Decorators#

Concat Partitions#

This decorator concatenates the partitions of a partitioned dataset into a single DataFrame. It is similar to the ConcatenatedDataSet, but is applied as a decorator on a node function.

kedro_partitioned.pipeline.decorators.concat_partitions(partitioned_arg, filter=None, func=<function identity>, func_args=[])[source]#

Decorator that concatenates DataFrames in a partitioned dataset.

Parameters:
  • partitioned_arg (str) – name of the decorated function's partitioned dataset argument

  • filter (Union[str, Callable[[str], bool]]) – filter for partition keys: a regex string, or a Callable[[str], bool] predicate. A list of either may also be passed, in which case a key must match all of them. Defaults to None

  • func (Callable[[pd.DataFrame], pd.DataFrame]) – function applied to each partition. Defaults to identity

  • func_args (List[str]) – names of the decorated function's arguments forwarded to func. Defaults to []

Return type:

Callable[[Callable], Callable]

Returns:

Callable[[Callable], Callable]
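As the parameter list indicates, a string filter behaves like a regex over partition keys, while a callable is used directly as a predicate. A minimal sketch of that normalization (the helper name `as_predicate` is hypothetical, not part of the library):

```python
import re
from typing import Callable, Union


def as_predicate(spec: Union[str, Callable[[str], bool]]) -> Callable[[str], bool]:
    """Normalize a filter spec into a partition-key predicate (illustrative)."""
    if isinstance(spec, str):
        # Strings are matched as regexes against each partition key.
        return lambda key: re.match(spec, key) is not None
    return spec


keys = ['a', 'ab', 'c']
print([k for k in keys if as_predicate('ab?')(k)])               # ['a', 'ab']
print([k for k in keys if as_predicate(lambda k: 'a' in k)(k)])  # ['a', 'ab']
```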

Example

>>> import pandas as pd
>>> fake_partitioned = {'a': lambda: pd.DataFrame({'a': [1]}),
...                     'ab': lambda: pd.DataFrame({'a': [2]}),
...                     'c': lambda: pd.DataFrame({'a': [3]})}
>>> @concat_partitions(partitioned_arg='df')
... def foo(df):
...     return df
>>> foo(fake_partitioned)
   a
0  1
1  2
2  3
>>> @concat_partitions(partitioned_arg='df',
...                    func=lambda x: x.assign(d=x['a']+10))
... def foo(df):
...     return df
>>> foo(fake_partitioned)
   a   d
0  1  11
1  2  12
2  3  13
>>> @concat_partitions(partitioned_arg='df', filter='ab?')
... def foo(df):
...     return df
>>> foo(fake_partitioned)
   a
0  1
1  2
>>> @concat_partitions(partitioned_arg='df', filter='ab?',
...                    func=lambda x: x.assign(d=x['a']+10))
... def foo(df):
...     return df
>>> foo(fake_partitioned)
   a   d
0  1  11
1  2  12
>>> @concat_partitions(partitioned_arg='df', filter=lambda x: 'a' in x)
... def foo(df):
...     return df
>>> foo(fake_partitioned)
   a
0  1
1  2
>>> @concat_partitions(partitioned_arg='df', filter=lambda x: 'a' in x,
...                    func=lambda x: x.assign(d=x['a']+10))
... def foo(df):
...     return df
>>> foo(fake_partitioned)
   a   d
0  1  11
1  2  12
>>> @concat_partitions(partitioned_arg='df', filter=lambda x: 'a' in x,
...                    func=lambda x, arg1: x.assign(d=x['a']+arg1),
...                    func_args=['arg1'])
... def foo(df, arg1):
...     return df
>>> foo(fake_partitioned, 20)
   a   d
0  1  21
1  2  22
>>> @concat_partitions(partitioned_arg='df', filter='ggg')
... def foo(df):
...     return df
>>> foo(fake_partitioned)
Empty DataFrame
Columns: []
Index: []
>>> @concat_partitions(partitioned_arg='df', filter='ggg')
... def foo(df):
...     return df
>>> foo({})
Empty DataFrame
Columns: []
Index: []

Using helpers:

>>> from kedro_partitioned.pipeline.decorators.helper_factory import (
...     date_range_filter)
>>> dfn = date_range_filter(min_date='2020-02-02', format='%Y-%m-%d')
>>> date_part = {'p1/2020-01-01/s': lambda: pd.DataFrame({'a': [1]}),
...              'p1/2020-02-03/s': lambda: pd.DataFrame({'a': [2]}),
...              'p2/2020-05-03/s': lambda: pd.DataFrame({'a': [3]})}
>>> @concat_partitions(partitioned_arg='df', filter=dfn)
... def foo(df):
...     return df
>>> foo(date_part)
   a
0  2
1  3

Using multiple helpers:

>>> @concat_partitions(partitioned_arg='df', filter=[dfn, r'p1.*'])
... def foo(df):
...     return df
>>> foo(date_part)
   a
0  2

Split Into Partitions#

This decorator splits a dataset into partitions, performing the inverse of the concat_partitions decorator.

kedro_partitioned.pipeline.decorators.split_into_partitions(keys, folder_template=None, filename_template=None, output=0)[source]#

Splits a function's DataFrame output into a dict of <group_by_keys>: <group>.

Parameters:
  • keys (Iterable[str]) – column names to group by

  • folder_template (str) – template for the folder name. Key names can be referenced inside braces ({}). Defaults to None

  • filename_template (str) – template for the filename. Key names can be referenced inside braces ({}). Defaults to None

  • output (Union[str, int], optional) – key or index of the DataFrame in the function's output. Defaults to 0

Return type:

Callable

Returns:

Callable

Example

>>> import pandas as pd
>>> from pprint import pprint
>>> df = pd.DataFrame({'name': ['Apple', 'Pear'], 'price': [10, 15]})
>>> @split_into_partitions(
...     keys=['name', 'price'],
...     output=0)
... def foo(df):
...     return [df]
>>> pprint(foo(df))  
[{'Apple/10/Apple_10':     name  price
0  Apple     10,
'Pear/15/Pear_15':    name  price
1  Pear     15}]
>>> @split_into_partitions(
...     keys=['name', 'price'],
...     folder_template='part/{name}/{price}',
...     filename_template='{name}_{price}',
...     output='out')
... def foo(df):
...     return {'out': df}
>>> pprint(foo(df))  
{'out': {'part/Apple/10/Apple_10':     name  price
0  Apple     10,
     'part/Pear/15/Pear_15':    name  price
1  Pear     15}}
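The grouping and default key templating shown above can be sketched with a plain pandas groupby (an illustrative re-implementation, not the library's code):

```python
import pandas as pd


def split_sketch(df, keys, folder_template=None, filename_template=None):
    """Split a DataFrame into {partition_key: group} (illustrative only)."""
    # Defaults mirror the documented behavior: folder '<k1>/<k2>',
    # filename '<k1>_<k2>', built from the grouped key values.
    folder_template = folder_template or '/'.join('{%s}' % k for k in keys)
    filename_template = filename_template or '_'.join('{%s}' % k for k in keys)
    out = {}
    for values, group in df.groupby(list(keys)):
        named = dict(zip(keys, values))
        key = folder_template.format(**named) + '/' + filename_template.format(**named)
        out[key] = group
    return out


df = pd.DataFrame({'name': ['Apple', 'Pear'], 'price': [10, 15]})
parts = split_sketch(df, ['name', 'price'])
print(sorted(parts))  # ['Apple/10/Apple_10', 'Pear/15/Pear_15']
```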

List output#

This decorator wraps the output of a node in a list. It is useful for standardizing node declarations that expect list outputs.

kedro_partitioned.pipeline.decorators.list_output(f)[source]#

Turns a function's output into a list.

Parameters:

f (Callable) – function to wrap

Return type:

Callable

Returns:

Callable

Example

>>> @list_output
... def foo():
...     return 3
>>> foo()
[3]
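Such a decorator can be sketched in a few lines (illustrative, not the library's implementation):

```python
import functools


def list_output_sketch(f):
    """Wrap f so its return value is delivered inside a list."""
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        return [f(*args, **kwargs)]
    return wrapper


@list_output_sketch
def foo():
    return 3


print(foo())  # [3]
```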