Datasets#

Concatenated DataSet#

class kedro_partitioned.extras.datasets.concatenated_dataset.ConcatenatedDataSet(path, dataset, concat_func, filepath_arg='filepath', filename_suffix='', credentials=None, load_args=None, fs_args=None, overwrite=False, preprocess=<function identity>, preprocess_kwargs={}, filter=<function truthify>)[source]#

A partitioned DataSet that concatenates partitioned datasets.

Parameters:
  • path (str) – Path to the folder where the data is stored.

  • dataset (Union[str, Type[T], Dict[str, Any]]) – DataSet class to wrap.

  • concat_func (Callable[[Iterable[T]], T]) – Function to concatenate data.

  • filepath_arg (str, optional) – dataset’s path attribute. Defaults to “filepath”.

  • filename_suffix (str, optional) – partitioned suffix. Defaults to “”.

  • credentials (Dict[str, Any], optional) – credentials. Defaults to None.

  • load_args (Dict[str, Any], optional) – args for loading. Defaults to None.

  • fs_args (Dict[str, Any], optional) – args for fsspec. Defaults to None.

  • overwrite (bool, optional) – overwrite partitions. Defaults to False.

  • preprocess (Union[Callable[[T], T], str], optional) – Applied to each partition before concatenating. This argument can be a function, a lambda as a string, or a Python import path to a function. Defaults to identity.

  • preprocess_kwargs (Dict, optional) – Keyword arguments passed to the preprocess function. Defaults to {}.

  • filter (Union[Callable[[str], bool], str], optional) – Filters partitions by their relative paths. This argument can be a function, a lambda as a string, a Python import path to a function, or a regex. Defaults to truthify.
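
As an illustration of how these arguments fit together, here is a minimal sketch (not taken from the library docs; the folder name and the filter/preprocess strings are illustrative) that wraps pandas.CSVDataSet and merges the loaded partitions with pandas.concat:

import pandas as pd

from kedro_partitioned.extras.datasets.concatenated_dataset import ConcatenatedDataSet

clients = ConcatenatedDataSet(
    path="clients",                      # folder containing the partitions
    dataset="pandas.CSVDataSet",         # wrapped dataset class
    concat_func=pd.concat,               # how the loaded partitions are merged
    filter='lambda subpath: subpath.endswith(".csv")',  # keep only CSV partitions
    preprocess="lambda df: df.dropna()", # applied to each partition before concat
)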

Pandas Concatenated DataSet#

The Pandas concatenated dataset is syntactic sugar for the PartitionedDataSet that concatenates all DataFrame partitions into a single DataFrame.

For example, let’s say you have a folder structure like this:

clients/
├── brazil.csv
├── canada.csv
└── united_states.csv

And you want to load all the files as a single dataset. In this case, you could do something like this:

clients:
  type: kedro_partitioned.dataset.PandasConcatenatedDataSet
  path: clients
  dataset:
    type: pandas.CSVDataSet

Then, the clients dataset will be the concatenation of all DataFrames loaded from the clients/*.csv files.
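
Because concatenation happens at load time, a node can consume the catalog entry as a single DataFrame. A hedged sketch (the node and output names are illustrative, not part of the library):

from kedro.pipeline import Pipeline, node


def count_clients(clients):
    # `clients` arrives as one concatenated DataFrame, not a dict of partitions
    return len(clients)


client_pipeline = Pipeline(
    [node(count_clients, inputs="clients", outputs="n_clients")]
)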

class kedro_partitioned.extras.datasets.concatenated_dataset.PandasConcatenatedDataSet(path, dataset, filepath_arg='filepath', filename_suffix='', credentials=None, load_args=None, fs_args=None, overwrite=False, preprocess=<function identity>, preprocess_kwargs={}, filter=<function truthify>)[source]#

A partitioned dataset that concatenates loaded pandas DataFrames.

Parameters:
  • path (str) – Path to the folder where the data is stored.

  • dataset (Union[str, Type[T], Dict[str, Any]]) – DataSet class to wrap.

  • filepath_arg (str, optional) – dataset’s path attribute. Defaults to “filepath”.

  • filename_suffix (str, optional) – partitioned suffix. Defaults to “”.

  • credentials (Dict[str, Any], optional) – credentials. Defaults to None.

  • load_args (Dict[str, Any], optional) – args for loading. Defaults to None.

  • fs_args (Dict[str, Any], optional) – args for fsspec. Defaults to None.

  • overwrite (bool, optional) – overwrite partitions. Defaults to False.

  • preprocess (Callable[[T], T], optional) – Applied to each partition before concatenating. This argument can be a function, a lambda as a string, or a Python import path to a function. Defaults to identity.

  • preprocess_kwargs (Dict, optional) – Keyword arguments passed to the preprocess function. Defaults to {}.

  • filter (Union[Callable[[str], bool], str], optional) – Filters partitions by their relative paths. This argument can be a function, a lambda as a string, a Python import path to a function, or a regex. Defaults to truthify.

Example

>>> ds = PandasConcatenatedDataSet(
...     path='a/b/c',
...     dataset='pandas.CSVDataSet',
...     filter='lambda subpath: "test" in subpath',
...     preprocess='lambda df, col: df.rename(columns={"a": col})',
...     preprocess_kwargs={'col': 'b'})  
>>> ds
<...PandasConcatenatedDataSet object at 0x...>
>>> ds.filter('a/test/c.csv')
True
>>> ds.filter('a/b/c.csv')
False
>>> import pandas as pd
>>> df = pd.DataFrame({'a': [1]})
>>> ds._load_partition(('test', lambda: df))
   b
0  1

With imports:

>>> ds = PandasConcatenatedDataSet(
...     path='a/b/c',
...     dataset='pandas.CSVDataSet',
...     filter='kedro_partitioned.utils.other.falsify',
...     # the same can be done with preprocess
...     preprocess='lambda df, col: df.rename(columns={"a": col})',
...     preprocess_kwargs={'col': 'b'})  
>>> ds.filter('a/test/c.csv')
False

With regexes:

>>> ds = PandasConcatenatedDataSet(
...     path='a/b/c',
...     dataset='pandas.CSVDataSet',
...     filter='.+test.csv$',
...     preprocess='lambda df, col: df.rename(columns={"a": col})',
...     preprocess_kwargs={'col': 'b'})  
>>> ds.filter('a/test.csv/test.csv')
True
>>> ds.filter('a/test.csv/a.csv')
False
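
As the type hints above indicate, filter and preprocess also accept plain Python callables rather than strings. A minimal sketch under that assumption (paths are illustrative):

from kedro_partitioned.extras.datasets.concatenated_dataset import PandasConcatenatedDataSet

ds = PandasConcatenatedDataSet(
    path="a/b/c",
    dataset="pandas.CSVDataSet",
    filter=lambda subpath: subpath.endswith(".csv"),  # a callable instead of a lambda string
    preprocess=lambda df: df.assign(source="csv"),    # applied to each partition before concat
)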

Path Safe Partitioned DataSet#

class kedro_partitioned.io.PathSafePartitionedDataSet(path, dataset, filepath_arg='filepath', filename_suffix='', credentials=None, load_args=None, fs_args=None, overwrite=False)[source]#

Partitioned DataSet, but handles mixed relative and absolute paths.

For example, if the fsspec implementation you are using returns relative paths from a glob, but the path you specified is absolute, this dataset will be able to handle it.

Example

>>> ds = PathSafePartitionedDataSet(
...          path="http://abc.core/path/to",  # absolute
...          dataset="pandas.CSVDataSet",)
>>> ds._path_to_partition("path/to/partition1.csv")  # relative
'partition1.csv'
>>> ds = PartitionedDataSet(
...          path="http://abc.core/path/to",  # absolute
...          dataset="pandas.CSVDataSet",)
>>> ds._path_to_partition("path/to/partition1.csv")  # relative
'path/to/partition1.csv'

Note

It is recommended to use PathSafePartitionedDataSet instead of PartitionedDataSet in every step-parallelism scenario. This is important because handling paths safely is mandatory for the multinode partitioned dataset zip feature to work properly.
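
Since the constructor mirrors PartitionedDataSet, swapping it in is typically a one-line change. A hedged sketch assuming a local folder of CSV partitions (the path is illustrative):

from kedro_partitioned.io import PathSafePartitionedDataSet

clients = PathSafePartitionedDataSet(
    path="data/01_raw/clients",   # illustrative local folder of partitions
    dataset="pandas.CSVDataSet",
)
partitions = clients.load()  # dict of partition id -> load callable, as in PartitionedDataSet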