Parallelism#
Step Parallelism#
Step Parallelism is a feature that allows the user to replicate a step into multiple nodes, each one processing a batch of the data. This is useful when the step is computationally expensive and the user wants to speed up the execution time without having to worry about load balancing.
Why Step Parallelism?#
Because Kedro may be used on many kinds of machines (clusters, local machines, etc.), it can be hard to find a solution that scales both horizontally and vertically with the same code. In order to solve this problem, we developed a solution that passes the responsibility of load balancing to the current runner. For example, if you use our step parallel solution with the SequentialRunner, Kedro will run it sequentially, but if you use the ParallelRunner, Kedro will run it in parallel without a single line of multiprocessing or multithreading code. The same applies to the Kedro-to-cloud converters, like kedro-azureml.
How does Step Parallelism work?#
There are three main layers in the step parallelism solution:
Input Layer: Responsible for slicing the input data into a JSON that specifies what partitions will be processed by each node.
Compute Layer: Responsible for processing each partition and saving it in a copy of the output (since no two nodes process the same partition, they won't run into race conditions).
Barrier Layer: Responsible for blocking the Kedro pipeline nodes that depend on the output of the step parallelism until all the compute nodes have finished processing the data (it doesn't perform any updates to the partitioned output).
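The three layers can be sketched in plain Python, with no Kedro involved. The function names and the JSON slice specification below are illustrative assumptions for this sketch, not the library's actual internals:

```python
import json

def slicer(partitions, n_slices):
    """Input Layer: split partition names into a JSON slice spec."""
    slices = [partitions[i::n_slices] for i in range(n_slices)]
    return json.dumps(slices)

def compute(slice_spec, slice_id, func, data):
    """Compute Layer: process only the partitions assigned to this node."""
    assigned = json.loads(slice_spec)[slice_id]
    # Each node writes only its own subset, so there are no race conditions.
    return {name: func(data[name]) for name in assigned}

def barrier(*slice_outputs):
    """Barrier Layer: runs only after every compute node has finished.
    It performs no updates; it just unblocks downstream nodes."""
    return None

data = {'p0': [1, 2], 'p1': [3, 4], 'p2': [5, 6]}
spec = slicer(sorted(data), n_slices=2)
out0 = compute(spec, 0, sum, data)   # processes 'p0' and 'p2'
out1 = compute(spec, 1, sum, data)   # processes 'p1'
merged = {**out0, **out1}            # {'p0': 3, 'p1': 7, 'p2': 11}
```

In a real run, each `compute` call would execute on a different process or cluster node, which is exactly the scheduling decision this design delegates to the runner.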
How to use it?#
In order to use the step parallelism feature, you need to create your nodes using the multinode or the multipipeline functions. These functions return a pipeline with the structure stated above. The multinode function is used when you want to replicate a node into multiple nodes, while the multipipeline function is used when you want to replicate a pipeline into multiple pipelines, i.e. slices are replaced by copies of the pipeline.
See the function docstrings below for more usage examples:
Multinode#
- kedro_partitioned.pipeline.multinode(func, partitioned_input, partitioned_output, name, configurator=None, other_inputs=[], tags=None, confirms=None, namespace=None, n_slices=2, filter=<function truthify>)[source]#
Creates multiple nodes to process partitioned data.
Multinodes are a way to implement step level parallelism. They are useful for processing independent data in parallel, managed by pipeline runners. For example, in Kedro, running the pipeline with ParallelRunner would enable the steps generated by multinode to run on multiple CPUs. At the same time, if you run this pipeline in a distributed context, you could rely on a "DistributedRunner" or on another pipeline manager like AzureML or Kubeflow, without having to change the code.
- Multinodes work like the following flowchart:
```
                      +--------------+
                      | Configurator |
                      |  parameter   |--+
                      +--------------+  |
                                        v
+-------------+    +->+------------+
| Slicer Node |----+  |  Slice-0   |--+    +--------------------+
|     my      |    +->| my-slice-0 |  +--->|  Synchronization   |
+-------------+    |  +------------+  |    | my-synchronization |
       ^           |       ...        |    +--------------------+
       |           +->+------------+  |              |
+-------------+    |  |  Slice-1   |--+              v
| Partitioned |----+  | my-slice-1 |  |      +-------------+
|  input-ds   |    |  +------------+  |      | Partitioned |
+-------------+    +->+------------+  |      |  output-ds  |
    Start             |  Slice-n   |--+      +-------------+
                      | my-slice-n |              End
                      +------------+
```
- Nodes specification:
- Slicer Node:
  - name: multinode name
  - inputs: partitioned inputs and configurator cached flags
  - outputs: JSON with a list of partitions for each slice
- Slice:
  - name: multinode name + slice id
  - inputs: partitioned inputs, slicer JSON, configurator data
  - outputs: subset of the partitioned outputs
- Synchronization:
  - name: multinode name + synchronization
  - inputs: subset of the partitioned outputs
  - outputs: partitioned outputs without data (synchronization only)
- Parameters:
func (Callable[[Args[Any]], Union[str, List[Any]]]) – Function executed by each of the n nodes. It takes n positional arguments, the first of which is one partition from the partitioned_input
partitioned_input (Union[str, List[str]]) – Name of the PartitionedDataSet used as input. If a list is provided, it will work like a zip(partitions_a, …, partitions_n)
partitioned_output (Union[str, List[str]]) – Name of the PartitionedDataSet used as output by func nodes
configurator (str, optional) – An input name of a partitioned parameter dict. e.g. ‘params:config’
name (str, optional) – Name prefix for the multiple nodes, and the name of the post node. Defaults to None.
other_inputs (List[str], optional) – Name of other inputs for func. Defaults to [].
tags (Union[str, Iterable[str]], optional) – List of tags for all nodes generated by this function. Defaults to None.
confirms (Union[str, List[str]], optional) – List of DataSets that should be confirmed before the execution of the nodes. Defaults to None.
namespace (str, optional) – Namespace the nodes belong to. Defaults to None.
n_slices (int) – Number of multinodes to build. Defaults to MAX_WORKERS + MAX_NODES
filter (IsFunction[str], optional) – Function to filter input partitions
- Return type:
Pipeline
- Returns:
Pipeline
Example
```python
>>> sortnodes = lambda pipe: sorted(pipe.nodes, key=lambda n: n.name)
>>> sortnodes(multinode(
...     func=max,
...     partitioned_input='a',
...     partitioned_output='b',
...     other_inputs=['d'],
...     n_slices=2,
...     name='x',))
[Node(nonefy, ['a'], 'b-slicer', 'x'),
 Node(max, ['b-slicer', 'a', 'd'], ['b-slice-0'], 'x-slice-0'),
 Node(max, ['b-slicer', 'a', 'd'], ['b-slice-1'], 'x-slice-1'),
 Node(nonefy, ['b-slice-0', 'b-slice-1'], ['b'], 'x-synchronization')]
```
Accepts multiple inputs (works like zip(*partitioneds)):
```python
>>> sortnodes(multinode(
...     func=max,
...     partitioned_input=['a', 'b'],
...     partitioned_output='c',
...     n_slices=2,
...     other_inputs=['d'],
...     name='x'))
[Node(nonefy, ['a', 'b'], 'c-slicer', 'x'),
 Node(max, ['c-slicer', 'a', 'b', 'd'], ['c-slice-0'], 'x-slice-0'),
 Node(max, ['c-slicer', 'a', 'b', 'd'], ['c-slice-1'], 'x-slice-1'),
 Node(nonefy, ['c-slice-0', 'c-slice-1'], ['c'], 'x-synchronization')]
```
Accepts multiple outputs:
```python
>>> sortnodes(multinode(
...     func=max,
...     partitioned_input='a',
...     partitioned_output=['b', 'c'],
...     n_slices=2,
...     other_inputs=['d'],
...     name='x'))
[Node(nonefy, ['a'], 'b-slicer', 'x'),
 Node(max, ['b-slicer', 'a', 'd'], ['b-slice-0', 'c-slice-0'], 'x-slice-0'),
 Node(max, ['b-slicer', 'a', 'd'], ['b-slice-1', 'c-slice-1'], 'x-slice-1'),
 Node(nonefy, ['b-slice-0', 'c-slice-0', 'b-slice-1', 'c-slice-1'], ['b', 'c'], 'x-synchronization')]
```
Tags and namespaces are allowed:
```python
>>> mn = multinode(
...     func=max,
...     partitioned_input='a',
...     partitioned_output=['b', 'c'],
...     name='x',
...     n_slices=2,
...     tags=['test_tag'],
...     namespace='namespace',)
>>> sortnodes(mn)
[Node(nonefy, ['a'], 'b-slicer', 'x'),
 Node(max, ['b-slicer', 'a'], ['b-slice-0', 'c-slice-0'], 'x-slice-0'),
 Node(max, ['b-slicer', 'a'], ['b-slice-1', 'c-slice-1'], 'x-slice-1'),
 Node(nonefy, ['b-slice-0', 'c-slice-0', 'b-slice-1', 'c-slice-1'], ['b', 'c'], 'x-synchronization')]
>>> all([n.tags == {'x', 'test_tag'} for n in mn.nodes])
True
>>> all([n.namespace == 'namespace' for n in mn.nodes])
True
```
Configurators
A configurator is a dict parameter declared in parameter YAML files that contains two sections: 'template' and 'configurators'. The 'template' section is a dict that contains the partition subpath pattern and its configurations. The 'configurators' section is a list of configurators. Each configurator is a dict that contains a 'target' list specifying replacements for the pattern, and a 'data' entry that is passed as an input to the multinode.
```yaml
config:
  template:
    pattern: 'a-part-{part}'
    # optional, overwrites '.*' as the regex when a target is set to '*'
    # any:
    #   part: '(a|b|c|d)'
    # optional, specifies priority of each target if left to right
    # order is not correct
    # hierarchy:
    #   - part
  configurators:
    - target:  # replaces pattern's {} from left to right order
        - - a
          - b
        # or a regex alternate syntax
        # - a|b
      cached: true  # will not run
      data:
        setting_a: 'foo'
        setting_b: 2
    - target:
        - c
      data:
        setting_a: 'zzz'
        setting_b: 4
    - target:
        - '*'
      data:
        setting_a: 'bar'
        setting_b: 1
```
In the example above, the configurator with target ['a', 'b'] will be the configurator of the partitions 'a-part-a' and 'a-part-b', the configurator with target 'c' will be the configurator of the partition 'a-part-c', and the configurator with target '*' will be the configurator of all other partitions.
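The matching behavior described above can be sketched in plain Python. The `to_regex` and `resolve` helpers and the ordering rule below are illustrative assumptions for this sketch, not the library's actual implementation:

```python
import re

# Hypothetical sketch of configurator resolution for pattern
# 'a-part-{part}'; not kedro-partitioned's real code.
CONFIG = {
    'template': {'pattern': 'a-part-{part}'},
    'configurators': [
        {'target': [['a', 'b']], 'data': {'setting_a': 'foo'}},
        {'target': ['c'], 'data': {'setting_a': 'zzz'}},
        {'target': ['*'], 'data': {'setting_a': 'bar'}},
    ],
}

def to_regex(pattern, target):
    """Replace each '{...}' slot with its target, left to right."""
    for repl in target:
        if isinstance(repl, list):   # list of alternates -> (a|b)
            repl = '(' + '|'.join(repl) + ')'
        elif repl == '*':            # wildcard -> match anything
            repl = '.*'
        pattern = re.sub(r'\{[^}]*\}', repl, pattern, count=1)
    return pattern + '$'

def resolve(partition, config):
    """Return the data of the first matching configurator; listing
    specific targets before '*' makes '*' act as a fallback."""
    pattern = config['template']['pattern']
    for conf in config['configurators']:
        if re.match(to_regex(pattern, conf['target']), partition):
            return conf['data']
    return None

print(resolve('a-part-b', CONFIG))  # matches target ['a', 'b']
print(resolve('a-part-z', CONFIG))  # falls back to target '*'
```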
Example
```python
>>> mn = multinode(
...     func=max,
...     partitioned_input='a',
...     partitioned_output=['b', 'c'],
...     name='x',
...     n_slices=2,
...     tags=['test_tag'],
...     namespace='namespace',
...     configurator='params:config')
>>> sortnodes(mn)
[Node(nonefy, ['a', 'params:config'], 'b-slicer', 'x'),
 Node(max, ['b-slicer', 'a', 'params:config'], ['b-slice-0', 'c-slice-0'], 'x-slice-0'),
 Node(max, ['b-slicer', 'a', 'params:config'], ['b-slice-1', 'c-slice-1'], 'x-slice-1'),
 Node(nonefy, ['b-slice-0', 'c-slice-0', 'b-slice-1', 'c-slice-1'], ['b', 'c'], 'x-synchronization')]
```
Note
The multinode name is also added as a tag to all nodes in order to allow running the multinode with kedro run --tag.
Warning
Every function must be declared considering that the partitioned inputs are the first arguments of the function, the configurator (if present) is the following argument, and the rest of the arguments are the other inputs.
Warning
The configurator syntax prioritizes more specific targets over generalist ones. However, if there is ambiguity between configurators, the wrong configurator may be used; e.g. if you have a configurator with target ['a', 'b'] and another with target ['a'], the first match will be used, i.e. the result depends on the declaration order of the configurator list.
Multipipeline#
- kedro_partitioned.pipeline.multipipeline(pipe, partitioned_input, name, configurator=None, tags=None, confirms=None, namespace=None, n_slices=2, max_simultaneous_steps=None, filter=<function truthify>)[source]#
Creates multiple pipelines to process partitioned data.
Multipipelines are the same as multinodes, but instead of adding a synchronization node for each step, they create small pipelines that work like a multinode, with a synchronization node only at the end of the pipeline. This makes it possible to process data in parallel without waiting for a whole multinode layer to finish its work.
See also
kedro_partitioned.multinode.multinode()
- Parameters:
pipe (Pipeline) – Pipeline to be parallelized by multiple nodes.
partitioned_input (Union[str, List[str]]) – Name of the PartitionedDataSet used as input. If a list is provided, it will work like a zip(partitions_a, …, partitions_n)
configurator (str, optional) – Name of partitioned parameters used as input. e.g. ‘params:configurators’
name (str, optional) – Name prefix for the multiple nodes, and the name of the post node. Defaults to None.
tags (Union[str, Iterable[str]], optional) – List of tags for all nodes generated by this function. Defaults to None.
confirms (Union[str, List[str]], optional) – List of DataSets that should be confirmed before the execution of the nodes. Defaults to None.
namespace (str, optional) – Namespace the nodes belong to. Defaults to None.
n_slices (int) – Number of multinodes to build. Defaults to MAX_WORKERS + MAX_NODES
max_simultaneous_steps (int) – Maximum number of slices created for each branch. Defaults to None.
filter (IsFunction[str]) – A function applied to each partition of the partitioned inputs. If the function returns False, the partition won’t be used.
- Return type:
Pipeline
- Returns:
Pipeline
Example
```python
>>> sortnodes = lambda pipe: sorted(pipe.nodes, key=lambda n: n.name)
>>> funcpipe = Pipeline([node(min, ['a', 'b'], 'c', name='abc'),
...                      node(max, ['c', 'd'], ['e'], name='def')])
>>> sortnodes(multipipeline(
...     funcpipe,
...     ['a'],
...     'x',
...     n_slices=2))
[Node(min, ['c-slicer', 'a', 'b'], ['c-slice-0'], 'abc-slice-0'),
 Node(min, ['c-slicer', 'a', 'b'], ['c-slice-1'], 'abc-slice-1'),
 Node(max, ['c-slicer', 'c-slice-0', 'd'], ['e-slice-0'], 'def-slice-0'),
 Node(max, ['c-slicer', 'c-slice-1', 'd'], ['e-slice-1'], 'def-slice-1'),
 Node(nonefy, ['a'], 'c-slicer', 'x'),
 Node(nonefy, ['e-slice-0', 'e-slice-1'], ['c', 'e'], 'x-synchronization')]
```
Max Simultaneous Steps:
This configuration defines the maximum number of steps per branch. Check the example below for more details:
```
max_simultaneous_steps = None
n_slices = 2

func   = pipe([A->B, B->C, [C, D]->E])
              B->D

output = pipe(A->B0, B0->C0, [C0, D0] -> E0)
              A->B1  B1->C1  [C1, D1] -> E1
              B0->D0
              B1->D1

max_simultaneous_steps = 2

output = pipe(A->B0, B0->C0, [C0, D0] -> E0)
              B0->D0
```
Warning
Every function must be declared considering that the partitioned inputs are the first arguments of the function, the configurator (if present) is the following argument, and the rest of the arguments are the other inputs.
Warning
The configurator syntax prioritizes more specific targets over generalist ones. However, if there is ambiguity between configurators, the wrong configurator may be used; e.g. if you have a configurator with target ['a', 'b'] and another with target ['a'], the first match will be used, i.e. the result depends on the declaration order of the configurator list.
Note
Prefer using the multipipeline over the multinode, since it decreases the IO cost and is more readable.