Source code for kedro_partitioned.extras.datasets.concatenated_dataset

"""A DataSet that concatenates partitioned datasets."""
from concurrent.futures import ThreadPoolExecutor
import importlib
from typing import (
    Any,
    Callable,
    Dict,
    Generic,
    Iterable,
    Tuple,
    Type,
    TypeVar,
    Union
)

import pandas as pd
from kedro_partitioned.io.path_safe_partitioned_dataset import (
    PathSafePartitionedDataSet
)
from kedro_partitioned.utils.other import filter_or_regex, identity, truthify
from kedro_partitioned.utils.typing import PandasDataSets

T = TypeVar('T')


class ConcatenatedDataSet(PathSafePartitionedDataSet, Generic[T]):
    """A partitioned DataSet that concatenates partitioned datasets.

    Args:
        path (str): Path to the folder where the data is stored.
        dataset (Union[str, Type[T], Dict[str, Any]]): DataSet class to wrap.
        concat_func (Callable[[Iterable[T]], T]): Function used to
            concatenate the loaded partitions.
        filepath_arg (str, optional): dataset's path attribute.
            Defaults to "filepath".
        filename_suffix (str, optional): partition filename suffix.
            Defaults to "".
        credentials (Dict[str, Any], optional): credentials. Defaults to None.
        load_args (Dict[str, Any], optional): args for loading.
            Defaults to None.
        fs_args (Dict[str, Any], optional): args for fsspec. Defaults to None.
        overwrite (bool, optional): overwrite partitions. Defaults to False.
        preprocess (Union[Callable[[T], T], str], optional): applied to each
            partition before concatenating. This argument can be a function,
            a lambda as a string, or a Python import path to a function.
            Defaults to identity.
        preprocess_kwargs (Dict, optional): keyword arguments passed to the
            preprocess function. Defaults to {}.
        filter (Union[Callable[[str], bool], str], optional): filters
            partitions by their relative paths. This argument can be a
            function, a lambda as a string, a Python import path to a
            function, or a regex. Defaults to truthify.
    """

    _IMPORTLIB_SEPARATOR = '.'

    def __init__(
        self,
        path: str,
        dataset: Union[str, Type[T], Dict[str, Any]],
        concat_func: Callable[[Iterable[T]], T],
        filepath_arg: str = "filepath",
        filename_suffix: str = "",
        credentials: Dict[str, Any] = None,
        load_args: Dict[str, Any] = None,
        fs_args: Dict[str, Any] = None,
        overwrite: bool = False,
        preprocess: Union[Callable[[T], T], str] = identity,
        preprocess_kwargs: Dict = {},
        filter: Union[Callable[[str], bool], str] = truthify,
    ):
        """Initialize a ConcatenatedDataSet."""
        super().__init__(path, dataset, filepath_arg, filename_suffix,
                         credentials, load_args, fs_args, overwrite)
        self.concat_func = self._parse_function(concat_func)
        self.preprocess = self._parse_function(preprocess)
        self.filter = filter_or_regex(self._parse_function(filter))
        self.preprocess_kwargs = preprocess_kwargs

    @classmethod
    def _parse_function(cls, fn: Union[T, str]) -> Union[T, str, Callable]:
        """Parse a string as a lambda, or import it.

        If ``fn`` is a string starting with ``'lambda '`` it is evaluated as
        a lambda; if it contains a dot it is imported as a
        ``module.attribute`` path. Otherwise, or if parsing fails, ``fn`` is
        returned unchanged.

        Args:
            fn (Union[T, str])

        Returns:
            Union[T, str, Callable]

        Example:
            >>> fn = ConcatenatedDataSet._parse_function('lambda x: x+3')
            >>> fn(3)
            6

            >>> fn = ConcatenatedDataSet._parse_function(
            ...     'kedro_partitioned.utils.other.falsify')
            >>> fn(True)
            False

            >>> fn = ConcatenatedDataSet._parse_function('invalid')
            >>> fn
            'invalid'
        """
        try:
            if isinstance(fn, str):
                if fn.startswith('lambda '):
                    return eval(fn)
                elif cls._IMPORTLIB_SEPARATOR in fn:
                    module, func = fn.rsplit(cls._IMPORTLIB_SEPARATOR, 1)
                    return getattr(importlib.import_module(module), func)
        except Exception:
            # Strings that cannot be parsed or imported are returned as-is.
            pass
        return fn

    def _load_partition(self, data: Tuple[str, Callable[[], T]]) -> T:
        self._logger.info(f"Processing partition {data[0]}")
        return self.preprocess(data[1](), **self.preprocess_kwargs)

    def _load(self) -> T:
        partitions = super()._load()
        # Keep only the partitions whose relative path passes the filter.
        loaders = {k: v for k, v in partitions.items() if self.filter(k)}
        # Load and preprocess the partitions concurrently, then concatenate.
        with ThreadPoolExecutor() as pool:
            data_list = list(pool.map(self._load_partition, loaders.items()))
        return self.concat_func(data_list)
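
# Usage sketch (editorial addition, not part of the library): the base class
# is generic over the partition type, so any underlying DataSet can be paired
# with a matching concatenation strategy. The path, dataset type, partition
# layout and lambdas below are hypothetical and only illustrate how the
# constructor arguments are handled by ``_parse_function`` and
# ``filter_or_regex``.
#
#   >>> ds = ConcatenatedDataSet(
#   ...     path='data/01_raw/logs',           # hypothetical partition folder
#   ...     dataset='text.TextDataSet',        # each partition loads as a str
#   ...     concat_func='\n'.join,             # merges the loaded partitions
#   ...     preprocess='lambda s: s.strip()',  # lambda string, parsed at init
#   ...     filter=r'.+\.log$')                # regex, wrapped by filter_or_regex
#   >>> text = ds.load()  # partitions load in a ThreadPoolExecutor, then join
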

class PandasConcatenatedDataSet(ConcatenatedDataSet[PandasDataSets]):
    """A partitioned DataSet that concatenates loaded pandas DataFrames.

    Args:
        path (str): Path to the folder where the data is stored.
        dataset (Union[str, Type[T], Dict[str, Any]]): DataSet class to wrap.
        filepath_arg (str, optional): dataset's path attribute.
            Defaults to "filepath".
        filename_suffix (str, optional): partition filename suffix.
            Defaults to "".
        credentials (Dict[str, Any], optional): credentials. Defaults to None.
        load_args (Dict[str, Any], optional): args for loading.
            Defaults to None.
        fs_args (Dict[str, Any], optional): args for fsspec. Defaults to None.
        overwrite (bool, optional): overwrite partitions. Defaults to False.
        preprocess (Callable[[T], T], optional): applied to each partition
            before concatenating. This argument can be a function, a lambda
            as a string, or a Python import path to a function.
            Defaults to identity.
        preprocess_kwargs (Dict, optional): keyword arguments passed to the
            preprocess function. Defaults to {}.
        filter (Union[Callable[[str], bool], str], optional): filters
            partitions by their relative paths. This argument can be a
            function, a lambda as a string, a Python import path to a
            function, or a regex. Defaults to truthify.

    Example:
        >>> ds = PandasConcatenatedDataSet(
        ...     path='a/b/c',
        ...     dataset='pandas.CSVDataSet',
        ...     filter='lambda subpath: "test" in subpath',
        ...     preprocess='lambda df, col: df.rename(columns={"a": col})',
        ...     preprocess_kwargs={'col': 'b'})
        >>> ds  # doctest: +ELLIPSIS
        <...PandasConcatenatedDataSet object at 0x...>

        >>> ds.filter('a/test/c.csv')
        True
        >>> ds.filter('a/b/c.csv')
        False

        >>> df = pd.DataFrame({'a': [1]})
        >>> ds._load_partition(('test', lambda: df))
           b
        0  1

        With imports:

        >>> ds = PandasConcatenatedDataSet(
        ...     path='a/b/c',
        ...     dataset='pandas.CSVDataSet',
        ...     filter='kedro_partitioned.utils.other.falsify',
        ...     # the same can be done with preprocess
        ...     preprocess='lambda df, col: df.rename(columns={"a": col})',
        ...     preprocess_kwargs={'col': 'b'})  # doctest: +ELLIPSIS
        >>> ds.filter('a/test/c.csv')
        False

        With regexes:

        >>> ds = PandasConcatenatedDataSet(
        ...     path='a/b/c',
        ...     dataset='pandas.CSVDataSet',
        ...     filter='.+test.csv$',
        ...     preprocess='lambda df, col: df.rename(columns={"a": col})',
        ...     preprocess_kwargs={'col': 'b'})  # doctest: +ELLIPSIS
        >>> ds.filter('a/test.csv/test.csv')
        True
        >>> ds.filter('a/test.csv/a.csv')
        False
    """

    def __init__(
        self,
        path: str,
        dataset: Union[str, Type[T], Dict[str, Any]],
        filepath_arg: str = "filepath",
        filename_suffix: str = "",
        credentials: Dict[str, Any] = None,
        load_args: Dict[str, Any] = None,
        fs_args: Dict[str, Any] = None,
        overwrite: bool = False,
        preprocess: Callable[[T], T] = identity,
        preprocess_kwargs: Dict = {},
        filter: Union[Callable[[str], bool], str] = truthify,
    ):
        """Initialize a PandasConcatenatedDataSet."""
        super().__init__(path, dataset, pd.concat, filepath_arg,
                         filename_suffix, credentials, load_args, fs_args,
                         overwrite, preprocess, preprocess_kwargs, filter)
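
# Usage sketch (editorial addition, not part of the library): a minimal
# programmatic example, assuming the standard ``kedro.io.DataCatalog`` API.
# The dataset name, folder and regex are hypothetical; in a real Kedro
# project the same arguments would typically be declared in the catalog.
#
#   >>> from kedro.io import DataCatalog
#   >>> catalog = DataCatalog({
#   ...     'sales': PandasConcatenatedDataSet(
#   ...         path='data/01_raw/sales',
#   ...         dataset='pandas.CSVDataSet',
#   ...         filter=r'.+\.csv$'),
#   ... })
#   >>> sales = catalog.load('sales')  # matching partitions, pd.concat'ed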