Parallelism#

Step Parallelism#

Step Parallelism is a feature that lets the user replicate a step into multiple nodes, each one processing a batch of data. This is useful when the step is computationally expensive and the user wants to reduce the execution time without having to worry about load balancing.

Why Step Parallelism?#

Because Kedro may run in many environments (clusters, local machines, and so on), it can be hard to find a solution that scales both horizontally and vertically with the same code. To solve this problem, we developed a solution that delegates load balancing to the current runner. For example, if you use step parallelism with the SequentialRunner, Kedro runs the slices sequentially; with the ParallelRunner, Kedro runs them in parallel, without you writing a single line of multiprocessing or multithreading code. The same applies to Kedro-to-cloud converters such as kedro-azureml.
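The idea of leaving scheduling to the runner can be illustrated without Kedro. The sketch below is only an analogy: `process_slice`, `run_sequentially` and `run_in_parallel` are hypothetical names, and a thread pool stands in for a parallel runner (Kedro's ParallelRunner uses processes instead). The point is that the work items stay the same and only the component that executes them changes.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List


def process_slice(slice_id: int) -> str:
    """Stand-in for the work done by one slice node."""
    return f"slice-{slice_id} done"


def run_sequentially(tasks: List[Callable[[], str]]) -> List[str]:
    """Analogy for a sequential runner: one task after the other."""
    return [task() for task in tasks]


def run_in_parallel(tasks: List[Callable[[], str]], max_workers: int = 2) -> List[str]:
    """Analogy for a parallel runner: the same tasks, executed by a pool."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(task) for task in tasks]
        # Results are collected in submission order.
        return [future.result() for future in futures]


# The task list is built once and contains no scheduling logic.
tasks = [lambda i=i: process_slice(i) for i in range(3)]
sequential_results = run_sequentially(tasks)
parallel_results = run_in_parallel(tasks)
```

Both calls produce the same results; only the execution strategy differs, which is the property step parallelism relies on.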

How does Step Parallelism work?#

%%{init: {'theme':'dark'}}%%
graph LR
  subgraph Input
    PartitionedInput[(Partitioned Input)] --> Slicer
    Slicer --> JSON[(JSON)]
  end
  subgraph Compute
    PartitionedInput --> Slice0[Slice 0]
    PartitionedInput --> Slice1[Slice 1]
    PartitionedInput --> Slice...[Slice ...]
    PartitionedInput --> Slicen[Slice n]
    JSON --> Slice0
    JSON --> Slice1
    JSON --> Slice...
    JSON --> Slicen
    Slice0 --> PartitionedOutputCopy0[(Partitioned Output Copy 0)]
    Slice1 --> PartitionedOutputCopy1[(Partitioned Output Copy 1)]
    Slice... --> PartitionedOutputCopy...[(Partitioned Output Copy ...)]
    Slicen --> PartitionedOutputCopyn[(Partitioned Output Copy n)]
  end
  subgraph Barrier
    PartitionedOutputCopy0 --> Synchronization
    PartitionedOutputCopy1 --> Synchronization
    PartitionedOutputCopy... --> Synchronization
    PartitionedOutputCopyn --> Synchronization
    Synchronization --> PartitionedOutput[(Partitioned Output)]
  end

There are three main layers in the step parallelism solution:

  1. Input Layer: Responsible for slicing the input data into a JSON that specifies what partitions will be processed by each node.

  2. Compute Layer: Responsible for processing each partition and saving it in a copy of the output. Since no two nodes process the same partition, the nodes cannot run into race conditions.

  3. Barrier Layer: Responsible for blocking the Kedro pipeline nodes that depend on the output of the step parallelism until all the compute nodes have finished processing the data (it doesn’t perform any updates in the partitioned output).
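The three layers can be sketched in plain Python. This is a simplified model, not the package's implementation: the function names are hypothetical, the round-robin assignment of partitions to slices is an assumption, and an in-memory dict stands in for a partitioned dataset.

```python
from typing import Callable, Dict, List

# In-memory stand-in for a partitioned input: partition name -> data.
store: Dict[str, List[int]] = {"p0": [1, 2], "p1": [3], "p2": [4, 5, 6]}


def slicer(partitions: List[str], n_slices: int) -> Dict[int, List[str]]:
    """Input layer: decide which partitions each slice will process.

    Round-robin assignment is an assumption made for this sketch.
    """
    ordered = sorted(partitions)
    return {i: ordered[i::n_slices] for i in range(n_slices)}


def compute_slice(
    slice_id: int,
    plan: Dict[int, List[str]],
    func: Callable[[List[int]], int],
) -> Dict[str, int]:
    """Compute layer: process only the partitions assigned to this slice."""
    return {name: func(store[name]) for name in plan[slice_id]}


def synchronize(slice_outputs: List[Dict[str, int]]) -> List[str]:
    """Barrier layer: completes only once every slice output is available.

    It reports which partitions were written and does not modify them.
    """
    written: List[str] = []
    for output in slice_outputs:
        written.extend(output)
    return sorted(written)


plan = slicer(list(store), n_slices=2)
outputs = [compute_slice(slice_id, plan, sum) for slice_id in plan]
finished = synchronize(outputs)
```

Because the plan assigns each partition to exactly one slice, the slices never write the same key, which is why no locking is needed in the compute layer.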

How to use it?#

In order to use the step parallelism feature, you need to create your nodes using the multinode or the multipipeline functions. These functions return a pipeline with the structure stated above. The multinode function is used when you want to replicate a node into multiple nodes, while the multipipeline function is used when you want to replicate a pipeline into multiple pipelines, i.e. slices are replaced by a copy of the pipeline.
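As a minimal sketch of what a call might look like, the snippet below wraps `multinode` in a pipeline factory. The import path and keyword arguments follow the signature documented on this page; the dataset names, the `params:factor` parameter and the `scale_partition` function are hypothetical, and the sketch assumes that each call to the function receives the loaded data of one partition.

```python
from typing import List


def scale_partition(partition: List[float], factor: float) -> List[float]:
    """Process one partition.

    The partition comes first; `factor` arrives through `other_inputs`.
    """
    return [value * factor for value in partition]


def create_pipeline():
    """Build a pipeline that applies `scale_partition` in 4 slices."""
    # Imported lazily so the function above can be tested on its own.
    from kedro_partitioned.pipeline import multinode

    return multinode(
        func=scale_partition,
        partitioned_input="raw_measurements",      # hypothetical dataset name
        partitioned_output="scaled_measurements",  # hypothetical dataset name
        other_inputs=["params:factor"],            # hypothetical parameter
        name="scale",
        n_slices=4,
    )
```

Keeping the per-partition function free of any Kedro imports makes it easy to unit test before wiring it into a pipeline.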

See the function docstrings below for more usage examples:

Multinode#

kedro_partitioned.pipeline.multinode(func, partitioned_input, partitioned_output, name, configurator=None, other_inputs=[], tags=None, confirms=None, namespace=None, n_slices=2, filter=<function truthify>)

Creates multiple nodes to process partitioned data.

Multinodes are a way to implement step-level parallelism. They are useful for processing independent data in parallel, managed by pipeline runners. For example, in Kedro, running the pipeline with ParallelRunner lets the steps generated by multinode run on multiple CPUs. Likewise, if you run this pipeline in a distributed context, you could rely on a “DistributedRunner” or on another pipeline manager such as AzureML or Kubeflow, without having to change the code.

Multinodes work like the following flowchart:
+--------------+
| Configurator |
| parameter    |-+
+--------------+ |
            |    |
            v    |
+-------------+  +->+------------+     +--------------------+
| Slicer Node |--+->| Slice-0    |--+->| Synchronization    |
| my          |  +->| my-slice-0 |  |  | my-synchronization |
+-------------+  |  +------------+  |  +--------------------+
            ^    |          ...     |       |
            |    +->+------------+  |       v
+-------------+  +->| Slice-1    |--|   +-------------+
| Partitioned |--+->| my-slice-1 |  |   | Partitioned |
| input-ds    |  |  +------------+  |   | output-ds   |
+-------------+  +->+------------+  |   +-------------+
Start            +->| Slice-n    |--+   End
                 +->| my-slice-n |
                    +------------+
Nodes specification:

Slicer Node:
  • name: multinode name
  • inputs: partitioned inputs and configurator cached flags
  • outputs: json with a list of partitions for each slice

Slice:
  • name: multinode name + slice id
  • inputs: partitioned inputs, slicer json, configurator data
  • outputs: subset of the partitioned outputs

Synchronization:
  • name: multinode name + synchronization
  • inputs: subset of the partitioned outputs
  • outputs: partitioned outputs without data (synchronization only)

Parameters:
  • func (Callable[[Args[Any]], Union[str, List[Any]]]) – Function executed by each of the n nodes. It takes n positional arguments, the first of which is one partition from the partitioned_input

  • partitioned_input (Union[str, List[str]]) – Name of the PartitionedDataSet used as input. If a list is provided, it will work like a zip(partitions_a, …, partitions_n)

  • partitioned_output (Union[str, List[str]]) – Name of the PartitionedDataSet used as output by func nodes

  • configurator (str, optional) – An input name of a partitioned parameter dict. e.g. ‘params:config’

  • name (str, optional) – Name prefix for the multiple nodes, and the name of the post node. Defaults to None.

  • other_inputs (List[str], optional) – Name of other inputs for func. Defaults to [].

  • tags (Union[str, Iterable[str]], optional) – List of tags for all nodes generated by this function. Defaults to None.

  • confirms (Union[str, List[str]], optional) – List of DataSets that should be confirmed before the execution of the nodes. Defaults to None.

  • namespace (str, optional) – Namespace the nodes belong to. Defaults to None.

  • n_slices (int) – Number of multinodes to build. Defaults to MAX_WORKERS + MAX_NODES

  • filter (IsFunction[str], optional) – Function to filter input partitions

Return type:

Pipeline

Returns:

Pipeline
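The `filter` parameter described above takes a predicate over strings. The sketch below shows what such a predicate could look like, assuming it is called with the partition name; `keep_2023` and `select_partitions` are hypothetical helpers that only mimic the selection step.

```python
from typing import Callable, Iterable, List


def keep_2023(partition_name: str) -> bool:
    """Predicate: keep only partitions whose name starts with '2023-'."""
    return partition_name.startswith("2023-")


def select_partitions(
    names: Iterable[str], predicate: Callable[[str], bool]
) -> List[str]:
    """Mimic the filtering step: drop partitions the predicate rejects."""
    return [name for name in names if predicate(name)]


selected = select_partitions(["2022-12", "2023-01", "2023-02"], keep_2023)
```

Under that assumption, a predicate like `keep_2023` would be passed as `filter=keep_2023`.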

Example

>>> sortnodes = lambda pipe: sorted(pipe.nodes, key=lambda n: n.name)
>>> sortnodes(multinode(
...     func=max,
...     partitioned_input='a',
...     partitioned_output='b',
...     other_inputs=['d'],
...     n_slices=2,
...     name='x',)) 
[Node(nonefy, ['a'], 'b-slicer', 'x'),
 Node(max, ['b-slicer', 'a', 'd'], ['b-slice-0'], 'x-slice-0'),
 Node(max, ['b-slicer', 'a', 'd'], ['b-slice-1'], 'x-slice-1'),
 Node(nonefy, ['b-slice-0', 'b-slice-1'], ['b'], 'x-synchronization')]

Accepts multiple inputs (works like zip(*partitioneds)):

>>> sortnodes(multinode(
...     func=max,
...     partitioned_input=['a', 'b'],
...     partitioned_output='c',
...     n_slices=2,
...     other_inputs=['d'],
...     name='x')) 
[Node(nonefy, ['a', 'b'], 'c-slicer', 'x'),
 Node(max, ['c-slicer', 'a', 'b', 'd'], ['c-slice-0'], 'x-slice-0'),
 Node(max, ['c-slicer', 'a', 'b', 'd'], ['c-slice-1'], 'x-slice-1'),
 Node(nonefy, ['c-slice-0', 'c-slice-1'], ['c'], 'x-synchronization')]

Accepts multiple outputs:

>>> sortnodes(multinode(
...     func=max,
...     partitioned_input='a',
...     partitioned_output=['b', 'c'],
...     n_slices=2,
...     other_inputs=['d'],
...     name='x')) 
[Node(nonefy, ['a'], 'b-slicer', 'x'),
 Node(max, ['b-slicer', 'a', 'd'], ['b-slice-0', 'c-slice-0'], 'x-slice-0'),
 Node(max, ['b-slicer', 'a', 'd'], ['b-slice-1', 'c-slice-1'], 'x-slice-1'),
 Node(nonefy, ['b-slice-0', 'c-slice-0', 'b-slice-1', 'c-slice-1'], ['b', 'c'], 'x-synchronization')]

Tags and namespaces are allowed:

>>> mn = multinode(
...     func=max,
...     partitioned_input='a',
...     partitioned_output=['b', 'c'],
...     name='x',
...     n_slices=2,
...     tags=['test_tag'],
...     namespace='namespace',)
>>> sortnodes(mn) 
[Node(nonefy, ['a'], 'b-slicer', 'x'),
 Node(max, ['b-slicer', 'a'], ['b-slice-0', 'c-slice-0'], 'x-slice-0'),
 Node(max, ['b-slicer', 'a'], ['b-slice-1', 'c-slice-1'], 'x-slice-1'),
 Node(nonefy, ['b-slice-0', 'c-slice-0', 'b-slice-1', 'c-slice-1'], ['b', 'c'], 'x-synchronization')]
>>> all([n.tags == {'x', 'test_tag'} for n in mn.nodes])
True
>>> all([n.namespace == 'namespace' for n in mn.nodes])
True

Configurators

A configurator is a dict parameter declared in the parameters YAML files that contains two sections: ‘template’ and ‘configurators’. The ‘template’ section is a dict that contains the partition subpath pattern and its settings. The ‘configurators’ section is a list of configurators. Each configurator is a dict that contains a ‘target’ list specifying replacements for the pattern’s placeholders, and a ‘data’ entry that is passed to the multinode.

config:
  template:
    pattern: 'a-part-{part}'

  #    optional, overwrites '.*' as the regex when a
  # v  target is set to '*'
  # any:
  #  part: '(a|b|c|d)'

  #    optional, specifies priority of each target if left
  # v  to right order is not correct
  # hierarchy:
  #  - part

  configurators:
    -
       target: # replaces pattern's {} from left to right order
         -
           - a
           - b
       # or a regex alternate syntax
       # - a|b
       cached: true # will not run
       data:
         setting_a: 'foo'
         setting_b: 2
    -
       target:
         - c
       data:
         setting_a: 'zzz'
         setting_b: 4
    -
       target:
         - '*'
       data:
         setting_a: 'bar'
         setting_b: 1

In the example above, the configurator with target [‘a’, ‘b’] applies to the partitions ‘a-part-a’ and ‘a-part-b’, the configurator with target ‘c’ applies to the partition ‘a-part-c’, and the configurator with target ‘*’ applies to all other partitions.
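The matching described above can be approximated with regular expressions. The sketch below is not the package's resolver: `target_to_regex`, `matches` and `find_configurator` are hypothetical helpers, the config dict mirrors the YAML example, and the first configurator in list order that matches wins, so the wildcard entry must come last.

```python
import re
from typing import Any, Dict, List, Optional, Union

Target = Union[str, List[str]]

# Python equivalent of the YAML example.
CONFIG: Dict[str, Any] = {
    "template": {"pattern": "a-part-{part}"},
    "configurators": [
        {"target": [["a", "b"]], "cached": True,
         "data": {"setting_a": "foo", "setting_b": 2}},
        {"target": ["c"], "data": {"setting_a": "zzz", "setting_b": 4}},
        {"target": ["*"], "data": {"setting_a": "bar", "setting_b": 1}},
    ],
}


def target_to_regex(target: Target) -> str:
    """Turn one target entry into a regex group."""
    if isinstance(target, list):  # list of literal alternatives
        return "(?:" + "|".join(re.escape(item) for item in target) + ")"
    if target == "*":  # wildcard, '.*' by default
        return "(?:.*)"
    return "(?:" + target + ")"  # regex alternate syntax, e.g. 'a|b'


def matches(pattern: str, targets: List[Target], partition: str) -> bool:
    """Fill the pattern's placeholders left to right and test the partition."""
    literals = re.split(r"\{\w+\}", pattern)
    if len(literals) - 1 != len(targets):
        raise ValueError("one target is needed per placeholder")
    regex = re.escape(literals[0])
    for target, literal in zip(targets, literals[1:]):
        regex += target_to_regex(target) + re.escape(literal)
    return re.fullmatch(regex, partition) is not None


def find_configurator(partition: str, config: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Return the first configurator whose target matches the partition."""
    pattern = config["template"]["pattern"]
    for configurator in config["configurators"]:
        if matches(pattern, configurator["target"], partition):
            return configurator
    return None
```

For example, `find_configurator("a-part-z", CONFIG)["data"]["setting_a"]` evaluates to `'bar'`, because only the wildcard entry matches that partition.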

Example

>>> mn = multinode(
...     func=max,
...     partitioned_input='a',
...     partitioned_output=['b', 'c'],
...     name='x',
...     n_slices=2,
...     tags=['test_tag'],
...     namespace='namespace',
...     configurator='params:config')
>>> sortnodes(mn) 
[Node(nonefy, ['a', 'params:config'], 'b-slicer', 'x'),
 Node(max, ['b-slicer', 'a', 'params:config'], ['b-slice-0', 'c-slice-0'], 'x-slice-0'),
 Node(max, ['b-slicer', 'a', 'params:config'], ['b-slice-1', 'c-slice-1'], 'x-slice-1'),
 Node(nonefy, ['b-slice-0', 'c-slice-0', 'b-slice-1', 'c-slice-1'], ['b', 'c'], 'x-synchronization')]

Note

The multinode name is also added as a tag to all nodes, so that the multinode can be run with kedro run --tag.

Warning

Every function must be declared so that the partitioned inputs are its first arguments, the configurator (if present) is the next argument, and the remaining arguments are the other inputs.

Warning

The configurator syntax prioritizes more specific targets over more general ones. However, if configurators are ambiguous, the wrong one may be used. For example, if you have one configurator with target [‘a’, ‘b’] and another with target [‘a’], the first match is used, i.e. the result is random or depends on the order of the list.

Multipipeline#

kedro_partitioned.pipeline.multipipeline(pipe, partitioned_input, name, configurator=None, tags=None, confirms=None, namespace=None, n_slices=2, max_simultaneous_steps=None, filter=<function truthify>)

Creates multiple pipelines to process partitioned data.

Multipipelines are the same as multinodes, but instead of adding a synchronization node for each step, they create small pipelines that work like a multinode, with a synchronization only at the end of the pipeline. This makes it possible to process data in parallel without waiting for a whole multinode layer to finish its work.

See also

kedro_partitioned.multinode.multinode()
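The difference in barrier placement can be modelled in a few lines. This is a toy, single-threaded simulation with hypothetical function names; it only records the order in which steps and barriers would be allowed to occur, not how a runner schedules them.

```python
from typing import Callable, Dict, List, Tuple

Step = Callable[[int], int]


def double(value: int) -> int:
    return value * 2


def increment(value: int) -> int:
    return value + 1


def run_multinode_style(
    slices: Dict[int, int], steps: List[Step]
) -> Tuple[Dict[int, int], List[str]]:
    """Chained multinodes: every step ends with a barrier over all slices."""
    log: List[str] = []
    current = dict(slices)
    for step in steps:
        for slice_id in list(current):
            current[slice_id] = step(current[slice_id])
            log.append(f"{step.__name__}:{slice_id}")
        log.append("barrier")  # all slices wait here before the next step
    return current, log


def run_multipipeline_style(
    slices: Dict[int, int], steps: List[Step]
) -> Tuple[Dict[int, int], List[str]]:
    """Multipipeline: each slice runs all steps; one barrier at the end."""
    log: List[str] = []
    results: Dict[int, int] = {}
    for slice_id, value in slices.items():
        for step in steps:
            value = step(value)
            log.append(f"{step.__name__}:{slice_id}")
        results[slice_id] = value
    log.append("barrier")  # single synchronization point
    return results, log


slices = {0: 1, 1: 10}
mn_results, mn_log = run_multinode_style(slices, [double, increment])
mp_results, mp_log = run_multipipeline_style(slices, [double, increment])
```

Both strategies compute the same values, but the multipipeline log contains a single barrier, so a fast slice can move on to its next step while slower slices are still working on the previous one.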

Parameters:
  • pipe (Pipeline) – Pipeline to be parallelized by multiple nodes.

  • partitioned_input (Union[str, List[str]]) – Name of the PartitionedDataSet used as input. If a list is provided, it will work like a zip(partitions_a, …, partitions_n)

  • configurator (str, optional) – Name of partitioned parameters used as input. e.g. ‘params:configurators’

  • name (str, optional) – Name prefix for the multiple nodes, and the name of the post node. Defaults to None.

  • tags (Union[str, Iterable[str]], optional) – List of tags for all nodes generated by this function. Defaults to None.

  • confirms (Union[str, List[str]], optional) – List of DataSets that should be confirmed before the execution of the nodes. Defaults to None.

  • namespace (str, optional) – Namespace the nodes belong to. Defaults to None.

  • n_slices (int) – Number of multinodes to build. Defaults to MAX_WORKERS + MAX_NODES

  • max_simultaneous_steps (int) – Maximum number of slices created for each branch. Defaults to None.

  • filter (IsFunction[str]) – A function applied to each partition of the partitioned inputs. If the function returns False, the partition won’t be used.

Return type:

Pipeline

Returns:

Pipeline

Example

>>> sortnodes = lambda pipe: sorted(pipe.nodes, key=lambda n: n.name)
>>> funcpipe = Pipeline([node(min, ['a', 'b'], 'c', name='abc'),
...                      node(max, ['c', 'd'], ['e'], name='def')])
>>> sortnodes(multipipeline(
...     funcpipe,
...     ['a'],
...     'x',
...     n_slices=2)) 
[Node(min, ['c-slicer', 'a', 'b'], ['c-slice-0'], 'abc-slice-0'),
 Node(min, ['c-slicer', 'a', 'b'], ['c-slice-1'], 'abc-slice-1'),
 Node(max, ['c-slicer', 'c-slice-0', 'd'], ['e-slice-0'], 'def-slice-0'),
 Node(max, ['c-slicer', 'c-slice-1', 'd'], ['e-slice-1'], 'def-slice-1'),
 Node(nonefy, ['a'], 'c-slicer', 'x'),
 Node(nonefy, ['e-slice-0', 'e-slice-1'], ['c', 'e'], 'x-synchronization')]

Max Simultaneous Steps:

This configuration defines the maximum number of steps per branch. Check the example below for more details:

max_simultaneous_steps = None
n_slices = 2
func = pipe([A->B, B->C, [C, D]->E])
                B->D
output = pipe(A->B0, B0->C0, [C0, D0] -> E0)
                A->B1  B1->C1  [C1, D1] -> E1
                        B0->D0
                        B1->D1

max_simultaneous_steps = 2
output: pipe(A->B0, B0->C0, [C0, D0] -> E0)
                    B0->D0

Warning

Every function must be declared so that the partitioned inputs are its first arguments, the configurator (if present) is the next argument, and the remaining arguments are the other inputs.

Warning

The configurator syntax prioritizes more specific targets over more general ones. However, if configurators are ambiguous, the wrong one may be used. For example, if you have one configurator with target [‘a’, ‘b’] and another with target [‘a’], the first match is used, i.e. the result is random or depends on the order of the list.

Note

Prefer using the multipipeline over the multinode, since it decreases the IO cost and it is more readable.