Source code for kedro_partitioned.pipeline.decorators.decorators

"""Decorators for node funcs."""
from concurrent.futures import ThreadPoolExecutor
from functools import wraps
import posixpath
import re
from typing import Any, Callable, Iterable, Union, List, Dict

import pandas as pd

from kedro_partitioned.pipeline.decorators.helper_factory import regex_filter
from kedro_partitioned.utils.typing import IsFunction
from kedro_partitioned.utils.other import kwargs_only, identity
from kedro_partitioned.utils.iterable import tolist


def concat_partitions(
    partitioned_arg: str,
    filter: Union[str, IsFunction[str], List[IsFunction[str]]] = None,
    func: Callable[[pd.DataFrame], pd.DataFrame] = identity,
    func_args: List[str] = [],
) -> Callable[[Callable], Callable]:
    """Decorator that concatenates DataFrames of a partitioned dataset.

    Args:
        partitioned_arg (str): name of the wrapped function's partitioned
            dataset argument.
        filter (Union[str, IsFunction[str], List[IsFunction[str]]]): filter
            for partition keys. Defaults to None.

            * str: a regex
            * Callable[[str], bool]: a predicate
            * a list combining the two

        func (Callable[[pd.DataFrame], pd.DataFrame]): function applied to
            each partition before concatenation. Defaults to identity.
        func_args (List[str]): names of the wrapped function's arguments
            that are forwarded to ``func``. Defaults to [].

    Returns:
        Callable[[Callable], Callable]

    Example:
        >>> fake_partitioned = {'a': lambda: pd.DataFrame({'a': [1]}),
        ...                     'ab': lambda: pd.DataFrame({'a': [2]}),
        ...                     'c': lambda: pd.DataFrame({'a': [3]})}

        >>> @concat_partitions(partitioned_arg='df')
        ... def foo(df):
        ...     return df

        >>> foo(fake_partitioned)
           a
        0  1
        1  2
        2  3

        >>> @concat_partitions(partitioned_arg='df',
        ...                    func=lambda x: x.assign(d=x['a']+10))
        ... def foo(df):
        ...     return df

        >>> foo(fake_partitioned)
           a   d
        0  1  11
        1  2  12
        2  3  13

        >>> @concat_partitions(partitioned_arg='df', filter='ab?')
        ... def foo(df):
        ...     return df

        >>> foo(fake_partitioned)
           a
        0  1
        1  2

        >>> @concat_partitions(partitioned_arg='df', filter='ab?',
        ...                    func=lambda x: x.assign(d=x['a']+10))
        ... def foo(df):
        ...     return df

        >>> foo(fake_partitioned)
           a   d
        0  1  11
        1  2  12

        >>> @concat_partitions(partitioned_arg='df', filter=lambda x: 'a' in x)
        ... def foo(df):
        ...     return df

        >>> foo(fake_partitioned)
           a
        0  1
        1  2

        >>> @concat_partitions(partitioned_arg='df', filter=lambda x: 'a' in x,
        ...                    func=lambda x: x.assign(d=x['a']+10))
        ... def foo(df):
        ...     return df

        >>> foo(fake_partitioned)
           a   d
        0  1  11
        1  2  12

        >>> @concat_partitions(partitioned_arg='df', filter=lambda x: 'a' in x,
        ...                    func=lambda x, arg1: x.assign(d=x['a']+arg1),
        ...                    func_args=['arg1'])
        ... def foo(df, arg1):
        ...     return df

        >>> foo(fake_partitioned, 20)
           a   d
        0  1  21
        1  2  22

        >>> @concat_partitions(partitioned_arg='df', filter='ggg')
        ... def foo(df):
        ...     return df

        >>> foo(fake_partitioned)
        Empty DataFrame
        Columns: []
        Index: []

        >>> @concat_partitions(partitioned_arg='df', filter='ggg')
        ... def foo(df):
        ...     return df

        >>> foo({})
        Empty DataFrame
        Columns: []
        Index: []

    Using helpers:
        >>> from kedro_partitioned.pipeline.decorators.helper_factory import (
        ...     date_range_filter)
        >>> dfn = date_range_filter(min_date='2020-02-02', format='%Y-%m-%d')
        >>> date_part = {'p1/2020-01-01/s': lambda: pd.DataFrame({'a': [1]}),
        ...              'p1/2020-02-03/s': lambda: pd.DataFrame({'a': [2]}),
        ...              'p2/2020-05-03/s': lambda: pd.DataFrame({'a': [3]})}

        >>> @concat_partitions(partitioned_arg='df', filter=dfn)
        ... def foo(df):
        ...     return df

        >>> foo(date_part)
           a
        0  2
        1  3

    Using multiple helpers:
        >>> from kedro_partitioned.utils.other import compose
        >>> from kedro_partitioned.pipeline.decorators.helper_factory import (
        ...     regex_filter)

        >>> @concat_partitions(partitioned_arg='df', filter=[dfn, r'p1.*'])
        ... def foo(df):
        ...     return df

        >>> foo(date_part)
           a
        0  2
    """
    if filter is None:
        def filter_fn(_: str) -> bool:
            return True
    elif isinstance(filter, str):
        regex = re.compile(filter)

        def filter_fn(x: str) -> bool:
            return bool(regex.search(x))
    else:
        filter_fns = [
            regex_filter(x) if isinstance(x, str) else x
            for x in tolist(filter)
        ]

        def filter_fn(x: str) -> bool:
            return all([fn(x) for fn in filter_fns])

    def decorator(f: Callable) -> Callable:
        @wraps(f)
        @kwargs_only(f)
        def wrapper(**kwargs: Any) -> Any:
            loaders_dict: Dict[str, Callable[[], pd.DataFrame]] =\
                kwargs[partitioned_arg]
            if len(loaders_dict) > 0:
                loaders_dict = {
                    k: v for k, v in loaders_dict.items() if filter_fn(k)
                }
                if len(loaders_dict) == 0:
                    # the filter removed every partition
                    partitions = [pd.DataFrame()]
                else:
                    # load partitions in parallel
                    loaders = list(loaders_dict.values())
                    with ThreadPoolExecutor() as pool:
                        partitions = list(
                            pool.map(
                                lambda x: func(
                                    x(),
                                    **{
                                        k: v for k, v in kwargs.items()
                                        if k in func_args
                                    }),
                                loaders))
                kwargs[partitioned_arg] =\
                    pd.concat(partitions).reset_index(drop=True)
            else:
                # no partitions in the folder
                kwargs[partitioned_arg] = pd.DataFrame()
            return f(**kwargs)
        return wrapper
    return decorator
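

# A minimal usage sketch (assumption, not part of the original module): one
# way `concat_partitions` could be wired into a Kedro node whose input is a
# partitioned dataset. The dataset names ('sales_partitioned', 'store_totals'),
# the regex, and the columns ('store', 'amount') are hypothetical placeholders.
def _example_concat_pipeline():
    from kedro.pipeline import node, pipeline

    @concat_partitions(partitioned_arg='sales', filter=r'2023-.*')
    def total_by_store(sales: pd.DataFrame) -> pd.DataFrame:
        # `sales` arrives here already concatenated from every partition
        # whose key matched the regex above.
        return sales.groupby('store', as_index=False)['amount'].sum()

    return pipeline([
        node(total_by_store, inputs='sales_partitioned',
             outputs='store_totals'),
    ])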
def list_output(f: Callable) -> Callable:
    """Turns a function's output into a single-element list.

    Args:
        f (Callable)

    Returns:
        Callable

    Example:
        >>> @list_output
        ... def foo():
        ...     return 3

        >>> foo()
        [3]
    """
    @wraps(f)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        return [f(*args, **kwargs)]
    return wrapper
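

# A small sketch (assumption, not part of the original module): `list_output`
# is handy when a Kedro node declares its outputs as a one-element list, since
# Kedro then expects the function to return a sequence of matching length.
# The dataset names ('raw', 'scores') are hypothetical.
def _example_list_output_node():
    from kedro.pipeline import node

    @list_output
    def build_scores(df: pd.DataFrame) -> pd.DataFrame:
        return df.assign(score=1)

    # `outputs` is a list, so the single DataFrame must come back wrapped
    # in a list as well.
    return node(build_scores, inputs='raw', outputs=['scores'])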
def split_into_partitions(
    keys: Union[str, Iterable[str]],
    folder_template: str = None,
    filename_template: str = None,
    output: Union[str, int] = 0,
) -> Callable:
    """Splits a DataFrame function output into a dict <group_by_keys>: <group>.

    Args:
        keys (Union[str, Iterable[str]]): column names to group by.
        folder_template (str): template name for the folder. Units of
            ``keys`` can be referenced inside braces ({}). Defaults to None.
        filename_template (str): template name for the filename. Units of
            ``keys`` can be referenced inside braces ({}). Defaults to None.
        output (Union[str, int], optional): key or index of the DataFrame
            within the function's output. Defaults to 0.

    Returns:
        Callable

    Example:
        >>> df = pd.DataFrame({'name': ['Apple', 'Pear'], 'price': [10, 15]})

        >>> @split_into_partitions(
        ...     keys=['name', 'price'],
        ...     output=0)
        ... def foo(df):
        ...     return [df]

        >>> pprint(foo(df))  # doctest: +NORMALIZE_WHITESPACE
        [{'Apple/10/Apple_10':     name  price
                                0  Apple     10,
          'Pear/15/Pear_15':    name  price
                             1  Pear     15}]

        >>> @split_into_partitions(
        ...     keys=['name', 'price'],
        ...     folder_template='part/{name}/{price}',
        ...     filename_template='{name}_{price}',
        ...     output='out')
        ... def foo(df):
        ...     return {'out': df}

        >>> pprint(foo(df))  # doctest: +NORMALIZE_WHITESPACE
        {'out': {'part/Apple/10/Apple_10':     name  price
                                            0  Apple     10,
                 'part/Pear/15/Pear_15':    name  price
                                         1  Pear     15}}
    """
    if isinstance(keys, str):
        keys = [keys]

    if folder_template is None:
        folder_template = posixpath.join(*['{' + str(k) + '}' for k in keys])

    if filename_template is None:
        filename_template = '_'.join(['{' + str(k) + '}' for k in keys])

    def decorator(f: Callable) -> Callable:
        @wraps(f)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            r = f(*args, **kwargs)

            is_df = False
            if isinstance(r, (tuple, list, dict)):
                df = r[output]
            else:
                df = r
                is_df = True

            template = posixpath.join(folder_template, filename_template)
            splitted = {
                template.format(**{
                    unit: key[ind] for ind, unit in enumerate(keys)
                }): group
                for key, group in df.groupby(keys)
            }

            if is_df:
                r = splitted
            else:
                r[output] = splitted
            return r
        return wrapper
    return decorator
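

# A minimal usage sketch (assumption, not part of the original module):
# `split_into_partitions` can feed a partitioned output dataset, since each
# key of the returned dict becomes a partition path. The dataset names
# ('raw_scores', 'scores_by_region') and the columns ('region', 'year',
# 'value') are hypothetical placeholders.
def _example_split_pipeline():
    from kedro.pipeline import node, pipeline

    @split_into_partitions(keys=['region', 'year'])
    def score(df: pd.DataFrame) -> pd.DataFrame:
        # Returns a single DataFrame; the decorator turns it into a dict
        # keyed by '{region}/{year}/{region}_{year}'.
        return df.assign(score=df['value'] * 2)

    return pipeline([
        node(score, inputs='raw_scores', outputs='scores_by_region'),
    ])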