Decorators
Concat Partitions
This decorator concatenates the partitions of a dataset into a single dataset. It is similar to ConcatenatedDataSet, but can be applied as a decorator on a node function.
- kedro_partitioned.pipeline.decorators.concat_partitions(partitioned_arg, filter=None, func=<function identity>, func_args=[])
Decorator that concatenates DataFrames in a partitioned dataset.
- Parameters:
partitioned_arg (str) – func’s partitioned dataset argument
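filter (Union[str, Callable[[str], bool]]) – filter for partition keys. Defaults to None. Accepted forms:
* str: a regex
* Callable[[str], bool]: a function applied to each partition key
func (Callable[[pd.DataFrame], pd.DataFrame]) – function applied to each partition. Defaults to identity
func_args (list) – names of the decorated function's arguments that are also passed to func, as in the func_args=['arg1'] example below. Defaults to []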
- Return type:
Callable[[Callable], Callable]
- Returns:
Callable[[Callable], Callable]
Example
>>> import pandas as pd
>>> from kedro_partitioned.pipeline.decorators import concat_partitions
>>> fake_partitioned = {'a': lambda: pd.DataFrame({'a': [1]}),
...                     'ab': lambda: pd.DataFrame({'a': [2]}),
...                     'c': lambda: pd.DataFrame({'a': [3]})}
>>> @concat_partitions(partitioned_arg='df')
... def foo(df):
...     return df
>>> foo(fake_partitioned)
   a
0  1
1  2
2  3

>>> @concat_partitions(partitioned_arg='df',
...                    func=lambda x: x.assign(d=x['a']+10))
... def foo(df):
...     return df
>>> foo(fake_partitioned)
   a   d
0  1  11
1  2  12
2  3  13

>>> @concat_partitions(partitioned_arg='df', filter='ab?')
... def foo(df):
...     return df
>>> foo(fake_partitioned)
   a
0  1
1  2

>>> @concat_partitions(partitioned_arg='df', filter='ab?',
...                    func=lambda x: x.assign(d=x['a']+10))
... def foo(df):
...     return df
>>> foo(fake_partitioned)
   a   d
0  1  11
1  2  12

>>> @concat_partitions(partitioned_arg='df', filter=lambda x: 'a' in x)
... def foo(df):
...     return df
>>> foo(fake_partitioned)
   a
0  1
1  2

>>> @concat_partitions(partitioned_arg='df', filter=lambda x: 'a' in x,
...                    func=lambda x: x.assign(d=x['a']+10))
... def foo(df):
...     return df
>>> foo(fake_partitioned)
   a   d
0  1  11
1  2  12

>>> @concat_partitions(partitioned_arg='df', filter=lambda x: 'a' in x,
...                    func=lambda x, arg1: x.assign(d=x['a']+arg1),
...                    func_args=['arg1'])
... def foo(df, arg1):
...     return df
>>> foo(fake_partitioned, 20)
   a   d
0  1  21
1  2  22

>>> @concat_partitions(partitioned_arg='df', filter='ggg')
... def foo(df):
...     return df
>>> foo(fake_partitioned)
Empty DataFrame
Columns: []
Index: []

>>> @concat_partitions(partitioned_arg='df', filter='ggg')
... def foo(df):
...     return df
>>> foo({})
Empty DataFrame
Columns: []
Index: []

Using helpers:
>>> from kedro_partitioned.pipeline.decorators.helper_factory import (
...     date_range_filter)
>>> dfn = date_range_filter(min_date='2020-02-02', format='%Y-%m-%d')
>>> date_part = {'p1/2020-01-01/s': lambda: pd.DataFrame({'a': [1]}),
...              'p1/2020-02-03/s': lambda: pd.DataFrame({'a': [2]}),
...              'p2/2020-05-03/s': lambda: pd.DataFrame({'a': [3]})}
>>> @concat_partitions(partitioned_arg='df', filter=dfn)
... def foo(df):
...     return df
>>> foo(date_part)
   a
0  2
1  3

Using multiple helpers:
>>> from kedro_partitioned.utils.other import compose
>>> from kedro_partitioned.pipeline.decorators.helper_factory import (
...     regex_filter)
>>> @concat_partitions(partitioned_arg='df', filter=[dfn, r'p1.*'])
... def foo(df):
...     return df
>>> foo(date_part)
   a
0  2

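The behaviour shown in the doctests above can be approximated with plain pandas. The following is only a minimal sketch under stated assumptions, not the library's implementation: `concat_partitions_sketch` and `_as_predicate` are hypothetical names, and it assumes that a string filter is a regex that may match anywhere in the partition key (the examples above do not distinguish between matching at the start of the key and anywhere in it).

```python
import re
from typing import Callable, Dict, Optional, Union

import pandas as pd

KeyFilter = Optional[Union[str, Callable[[str], bool]]]


def _as_predicate(key_filter: KeyFilter) -> Callable[[str], bool]:
    """Turn None, a regex string, or a callable into a key predicate."""
    if key_filter is None:
        return lambda key: True
    if isinstance(key_filter, str):
        pattern = re.compile(key_filter)
        # Assumption: the regex may match anywhere in the key.
        return lambda key: pattern.search(key) is not None
    return key_filter


def concat_partitions_sketch(
    partitions: Dict[str, Callable[[], pd.DataFrame]],
    key_filter: KeyFilter = None,
    func: Callable[[pd.DataFrame], pd.DataFrame] = lambda df: df,
) -> pd.DataFrame:
    """Hypothetical helper: load, transform and concatenate partitions."""
    keep = _as_predicate(key_filter)
    # Loaders are only called for keys that pass the filter.
    frames = [func(load()) for key, load in partitions.items() if keep(key)]
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)


fake_partitioned = {
    'a': lambda: pd.DataFrame({'a': [1]}),
    'ab': lambda: pd.DataFrame({'a': [2]}),
    'c': lambda: pd.DataFrame({'a': [3]}),
}
result = concat_partitions_sketch(
    fake_partitioned,
    key_filter='ab?',
    func=lambda df: df.assign(d=df['a'] + 10),
)
```

Here `result` holds the rows from partitions 'a' and 'ab' with the extra column `d`, and an empty input or a filter that matches nothing yields an empty DataFrame, mirroring the examples above.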
Split Into Partitions
This decorator splits a dataset into partitions. It does the opposite of the Concat Partitions decorator.
- kedro_partitioned.pipeline.decorators.split_into_partitions(keys, folder_template=None, filename_template=None, output=0)
Splits the DataFrame returned by a function into a dict of the form <group_by_keys>: <group>.
- Parameters:
keys (Iterable[str]) – Column names to group by.
folder_template (str) – Template name for the folder. Keys can be referenced inside braces ({}), e.g. 'part/{name}/{price}'. Defaults to None.
filename_template (str) – Template name for the filename. Keys can be referenced inside braces ({}), e.g. '{name}_{price}'. Defaults to None.
output (Union[str, int], optional) – Key or index of the DataFrame within the function's output. Defaults to 0.
- Return type:
Callable
- Returns:
Callable
Example
>>> from pprint import pprint
>>> import pandas as pd
>>> from kedro_partitioned.pipeline.decorators import split_into_partitions
>>> df = pd.DataFrame({'name': ['Apple', 'Pear'], 'price': [10, 15]})
>>> @split_into_partitions(
...     keys=['name', 'price'],
...     output=0)
... def foo(df):
...     return [df]
>>> pprint(foo(df))
[{'Apple/10/Apple_10':     name  price
0  Apple     10,
  'Pear/15/Pear_15':    name  price
1  Pear     15}]

>>> @split_into_partitions(
...     keys=['name', 'price'],
...     folder_template='part/{name}/{price}',
...     filename_template='{name}_{price}',
...     output='out')
... def foo(df):
...     return {'out': df}
>>> pprint(foo(df))
{'out': {'part/Apple/10/Apple_10':     name  price
0  Apple     10,
         'part/Pear/15/Pear_15':    name  price
1  Pear     15}}

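The splitting step can be illustrated with a plain pandas groupby. This is a hedged sketch rather than the library's code: `split_by_keys_sketch` is a hypothetical name, and joining the folder and filename templates with a '/' is an assumption inferred from the partition keys printed in the example above.

```python
from typing import Dict, Iterable

import pandas as pd


def split_by_keys_sketch(
    df: pd.DataFrame,
    keys: Iterable[str],
    folder_template: str,
    filename_template: str,
) -> Dict[str, pd.DataFrame]:
    """Hypothetical helper: map '<folder>/<filename>' to each group."""
    keys = list(keys)
    # Assumption: folder and filename are joined with a single slash.
    template = folder_template + '/' + filename_template
    partitions = {}
    for values, group in df.groupby(keys):
        # Older pandas versions yield a scalar for a single grouping key.
        if not isinstance(values, tuple):
            values = (values,)
        fields = dict(zip(keys, values))
        partitions[template.format(**fields)] = group
    return partitions


df = pd.DataFrame({'name': ['Apple', 'Pear'], 'price': [10, 15]})
parts = split_by_keys_sketch(
    df,
    keys=['name', 'price'],
    folder_template='part/{name}/{price}',
    filename_template='{name}_{price}',
)
```

Each value in `parts` is the sub-DataFrame for one combination of key values, with its original index preserved.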
List output
This decorator converts the output of a node into a list. Useful to standardize the declaration with lists.
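The page does not show this decorator's name or exact semantics, so the following is only an illustrative sketch. `list_output_sketch` is a hypothetical name, and the sketch assumes that a list is returned unchanged, a tuple is converted to a list, and any other value is wrapped in a single-element list.

```python
import functools
from typing import Any, Callable, List


def list_output_sketch(func: Callable[..., Any]) -> Callable[..., List[Any]]:
    """Hypothetical decorator: always return the node output as a list."""

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> List[Any]:
        result = func(*args, **kwargs)
        if isinstance(result, list):
            # Assumption: lists pass through untouched.
            return result
        if isinstance(result, tuple):
            # Assumption: multiple outputs returned as a tuple become a list.
            return list(result)
        # Assumption: a single output is wrapped in a one-element list.
        return [result]

    return wrapper


@list_output_sketch
def single_output(x):
    return x + 1


@list_output_sketch
def two_outputs(x):
    return x, x * 2


one = single_output(1)
two = two_outputs(3)
```

With a wrapper like this, a node's outputs can always be declared as a list, whether the function returns one value or several.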