# Source code for kedro_partitioned.io.path_safe_partitioned_dataset
"""A DataSet that is partitioned into multiple DataSets."""
from pathlib import PurePosixPath
import posixpath
from kedro.io import PartitionedDataSet
class PathSafePartitionedDataSet(PartitionedDataSet):
    """Partitioned DataSet that handles mixed relative and absolute paths.

    For example, if the fsspec implementation you are using returns relative
    paths from a glob, but the path you specified is absolute, this dataset
    is still able to derive the partition id.

    Example:
        >>> ds = PathSafePartitionedDataSet(
        ...     path="http://abc.core/path/to",  # absolute
        ...     dataset="pandas.CSVDataSet",)
        >>> ds._path_to_partition("path/to/partition1.csv")  # relative
        'partition1.csv'

        >>> ds = PartitionedDataSet(
        ...     path="http://abc.core/path/to",  # absolute
        ...     dataset="pandas.CSVDataSet",)
        >>> ds._path_to_partition("path/to/partition1.csv")  # relative
        'path/to/partition1.csv'
    """

    def _path_to_partition(self, path: str) -> str:
        """Take only the relative subpath from the partitioned dataset path.

        The parent implementation is applied first. Then, if the leading
        components of ``path`` are equal to a trailing run of components of
        the dataset path, that run is stripped from the front of the parent's
        result. The text is only removed when it is a true leading prefix, so
        a directory with the same name deeper inside a partition id is left
        untouched.

        Args:
            path (str): path to a partition

        Returns:
            str: relative subpath from the partitioned dataset path

        Example:
            >>> ds = PathSafePartitionedDataSet(
            ...     path="http://abc.core/path/to",
            ...     dataset="pandas.CSVDataSet",)
            >>> ds._path_to_partition("http://abc.core/path/to/partition1.csv")
            'partition1.csv'

            >>> ds = PathSafePartitionedDataSet(
            ...     path="data/path",
            ...     dataset="pandas.CSVDataSet",)
            >>> ds._path_to_partition("data/path/partition1.csv")
            'partition1.csv'

        Note:
            This dataset differs from the original one because it also
            handles non-absolute paths. An example of a package that returns
            relative paths is adlfs: it returns paths relative to the
            container, while the dataset has to be declared with the full
            URI of the folder. This prevents Kedro's partitioned dataset
            from splitting the partition path on the dataset path correctly.
        """
        subpath = super()._path_to_partition(path)
        raw_parts = PurePosixPath(path).parts
        base_parts = PurePosixPath(self._normalized_path).parts

        # Try the longest trailing run of the dataset path first, so that a
        # component repeated inside the dataset path cannot select a wrong,
        # shorter or misaligned overlap.
        for start in range(len(base_parts)):
            tail = base_parts[start:]
            # Compare whole components; an empty or too-short ``path`` simply
            # never matches instead of raising IndexError.
            if raw_parts[:len(tail)] != tail:
                continue
            prefix = str(PurePosixPath(*tail)) + posixpath.sep
            # Strip only at the very start: ``str.replace`` would also hit an
            # occurrence in the middle of the id and corrupt it.
            if subpath.startswith(prefix):
                return subpath[len(prefix):]
            # The overlap exists in ``path`` but the parent already removed
            # it from ``subpath``; nothing is left to strip.
            break
        return subpath