JoinNode, synchronize and itersource

JoinNode has the opposite effect of iterables. Where iterables split the execution workflow into many different branches, a JoinNode merges them back into one node. A JoinNode generalizes MapNode: it operates in conjunction with an upstream iterable node to reassemble the downstream results.

Simple example

Let's consider the very simple example depicted at the top of this page:

from nipype import Node, JoinNode, Workflow

# Specify fake input node A
a = Node(interface=A(), name="a")

# Iterate over fake node B's input 'in_file'
b = Node(interface=B(), name="b")
b.iterables = ('in_file', [file1, file2])

# Pass results on to fake node C
c = Node(interface=C(), name="c")

# Join forked execution workflow in fake node D
d = JoinNode(interface=D(),
             joinsource="b",
             joinfield="in_files",
             name="d")

# Put everything into a workflow as usual
workflow = Workflow(name="workflow")
workflow.connect([(a, b, [('subject', 'subject')]),
                  (b, c, [('out_file', 'in_file')]),
                  (c, d, [('out_file', 'in_files')])
                  ])

As you can see, setting up a JoinNode is rather simple. The only differences to a normal Node are the joinsource and joinfield arguments: joinsource specifies the node from which the information to join comes, and joinfield specifies the input field of the JoinNode through which the joined information enters the node.

This example assumes that interface A has one output subject, interface B has two inputs subject and in_file and one output out_file, interface C has one input in_file and one output out_file, and interface D has one list input in_files. file1 and file2 are input image file names.

As with iterables and the MapNode iterfield, the joinfield can be a list of fields. Thus, the declaration in the previous example is equivalent to the following:

d = JoinNode(interface=D(),
             joinsource="b",
             joinfield=["in_files"],
             name="d")

The joinfield defaults to all of the JoinNode input fields, so the declaration is also equivalent to the following:

d = JoinNode(interface=D(),
             joinsource="b",
             name="d")

In this example, the node C out_file outputs are collected into the JoinNode D in_files input list. The in_files order is the same as the upstream B node iterables order.
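This order guarantee can be illustrated in plain Python (a sketch with hypothetical file names, not Nipype code):

```python
# Hypothetical file names standing in for node B's iterable values.
in_files = ["file1.nii", "file2.nii"]

def run_b(f):
    # stand-in for interface B: one out_file per in_file
    return f.replace(".nii", "_b.nii")

def run_c(f):
    # stand-in for interface C
    return f.replace("_b.nii", "_bc.nii")

# The JoinNode D receives a single list, ordered like B's iterables.
joined_in_files = [run_c(run_b(f)) for f in in_files]
print(joined_in_files)  # ['file1_bc.nii', 'file2_bc.nii']
```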

The JoinNode input can be filtered for unique values by specifying the unique flag, e.g.:

d = JoinNode(interface=D(),
             joinsource="b",
             unique=True,
             name="d")
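Conceptually, unique=True corresponds roughly to an order-preserving de-duplication of the joined list, which in plain Python (a sketch, not Nipype internals) looks like:

```python
values = [3, 1, 3, 2, 1]
# dict keys preserve insertion order, so this de-duplicates while keeping order
unique_values = list(dict.fromkeys(values))
print(unique_values)  # [3, 1, 2]
```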

synchronize

The Node iterables parameter can be a single field or a list of fields. If it is a list, then execution is performed over all permutations of the list items. For example:

b.iterables = [("m", [1, 2]), ("n", [3, 4])]

results in the execution graph:

where B13 has inputs m = 1, n = 3, B14 has inputs m = 1, n = 4, etc.
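The permutation expansion can be sketched in plain Python:

```python
from itertools import product

# iterables = [("m", [1, 2]), ("n", [3, 4])] expands into all combinations,
# yielding one copy of node B per (m, n) pair
branches = list(product([1, 2], [3, 4]))
print(branches)  # [(1, 3), (1, 4), (2, 3), (2, 4)]
```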

The synchronize parameter synchronizes the iterables lists, e.g.:

b.iterables = [("m", [1, 2]), ("n", [3, 4])]
b.synchronize = True

results in the execution graph:

where the iterable inputs are selected in lock-step by index, i.e.:

(*m*, *n*) = (1, 3) and (2, 4)

for B13 and B24, respectively.
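The lock-step selection behaves like Python's zip (a sketch, not Nipype code):

```python
# with synchronize=True the iterables lists are consumed in lock-step by index
branches = list(zip([1, 2], [3, 4]))
print(branches)  # [(1, 3), (2, 4)]
```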

itersource

The itersource feature allows you to expand a downstream iterable based on a mapping of an upstream iterable. For example:

a = Node(interface=A(), name="a")
b = Node(interface=B(), name="b")
b.iterables = ("m", [1, 2])
c = Node(interface=C(), name="c")
d = Node(interface=D(), name="d")
d.itersource = ("b", "m")
d.iterables = [("n", {1:[3,4], 2:[5,6]})]
my_workflow = Workflow(name="my_workflow")
my_workflow.connect([(a,b,[('out_file','in_file')]),
                     (b,c,[('out_file','in_file')]),
                     (c,d,[('out_file','in_file')])
                     ])

results in the execution graph:

In this example, all interfaces have input in_file and output out_file. In addition, interface B has input m and interface D has input n. A Python dictionary associates the B node input value with the downstream D node n iterable values.
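The resulting branch expansion can be sketched in plain Python:

```python
m_values = [1, 2]                  # upstream B iterable
n_map = {1: [3, 4], 2: [5, 6]}     # D's n iterable, keyed on B's m value

# each upstream m value expands into its own set of downstream n branches
branches = [(m, n) for m in m_values for n in n_map[m]]
print(branches)  # [(1, 3), (1, 4), (2, 5), (2, 6)]
```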

This example can be extended with a summary JoinNode:

e = JoinNode(interface=E(), joinsource="d",
             joinfield="in_files", name="e")
my_workflow.connect(d, 'out_file',
                    e, 'in_files')

resulting in the graph:

The combination of iterables, MapNode, JoinNode, synchronize and itersource enables the creation of arbitrarily complex workflow graphs. The astute workflow builder will recognize that this flexibility is both a blessing and a curse. These advanced features are handy additions to the Nipype toolkit when used judiciously.

More realistic JoinNode example

Let's consider another example where we have one node that iterates over 3 different IDs and generates random numbers. Another node joins those three numbers (each coming from a separate branch of the workflow) into one list. To make the whole thing a bit more realistic, the second node will use the Function interface to do something with those numbers before spitting them out again.

In [ ]:
from nipype import JoinNode, Node, Workflow
from nipype.interfaces.utility import Function, IdentityInterface
In [ ]:
def get_data_from_id(id):
    """Generate a random number based on id"""
    import numpy as np
    return id + np.random.rand()

def merge_and_scale_data(data2):
    """Scale the input list by 1000"""
    import numpy as np
    return (np.array(data2) * 1000).tolist()


node1 = Node(Function(input_names=['id'],
                      output_names=['data1'],
                      function=get_data_from_id),
             name='get_data')
node1.iterables = ('id', [1, 2, 3])

node2 = JoinNode(Function(input_names=['data2'],
                          output_names=['data_scaled'],
                          function=merge_and_scale_data),
                 name='scale_data',
                 joinsource=node1,
                 joinfield=['data2'])
In [ ]:
wf = Workflow(name='testjoin')
wf.connect(node1, 'data1', node2, 'data2')
eg = wf.run()
180514-09:36:12,614 workflow INFO:
	 Workflow testjoin settings: ['check', 'execution', 'logging', 'monitoring']
180514-09:36:12,622 workflow INFO:
	 Running serially.
180514-09:36:12,623 workflow INFO:
	 [Node] Setting-up "testjoin.get_data" in "/tmp/tmpallwswjs/testjoin/_id_3/get_data".
180514-09:36:12,626 workflow INFO:
	 [Node] Running "get_data" ("nipype.interfaces.utility.wrappers.Function")
180514-09:36:12,633 workflow INFO:
	 [Node] Finished "testjoin.get_data".
180514-09:36:12,634 workflow INFO:
	 [Node] Setting-up "testjoin.get_data" in "/tmp/tmplyw137sg/testjoin/_id_2/get_data".
180514-09:36:12,639 workflow INFO:
	 [Node] Running "get_data" ("nipype.interfaces.utility.wrappers.Function")
180514-09:36:12,644 workflow INFO:
	 [Node] Finished "testjoin.get_data".
180514-09:36:12,645 workflow INFO:
	 [Node] Setting-up "testjoin.get_data" in "/tmp/tmpjrq52bq4/testjoin/_id_1/get_data".
180514-09:36:12,650 workflow INFO:
	 [Node] Running "get_data" ("nipype.interfaces.utility.wrappers.Function")
180514-09:36:12,655 workflow INFO:
	 [Node] Finished "testjoin.get_data".
180514-09:36:12,657 workflow INFO:
	 [Node] Setting-up "testjoin.scale_data" in "/tmp/tmpa4vcpwpf/testjoin/scale_data".
180514-09:36:12,662 workflow INFO:
	 [Node] Running "scale_data" ("nipype.interfaces.utility.wrappers.Function")
180514-09:36:12,667 workflow INFO:
	 [Node] Finished "testjoin.scale_data".
In [ ]:
wf.write_graph(graph2use='exec')
from IPython.display import Image
Image(filename='graph_detailed.png')
180514-09:36:12,850 workflow INFO:
	 Generated workflow graph: /home/neuro/nipype_tutorial/notebooks/graph.png (graph2use=exec, simple_form=True).
Out[ ]:

Now, let's look at the input and output of the JoinNode:

In [ ]:
res = [node for node in eg.nodes() if 'scale_data' in node.name][0].result
res.outputs
Out[ ]:
data_scaled = [1150.7970627481727, 2711.321904675856, 3786.9480560460365]
In [ ]:
res.inputs
Out[ ]:
{'function_str': 'def merge_and_scale_data(data2):\n    """Scale the input list by 1000"""\n    import numpy as np\n    return (np.array(data2) * 1000).tolist()\n',
 'ignore_exception': False,
 'data2': [1.1507970627481727, 2.711321904675856, 3.7869480560460365]}

Extending to multiple nodes

We extend the workflow by using three nodes. Note that even in this workflow, the joinsource corresponds to the node containing the iterables and the joinfield corresponds to the input port of the JoinNode that aggregates the iterable branches. As before, the graph below shows how the execution process is set up.

In [ ]:
def get_data_from_id(id):
    import numpy as np
    return id + np.random.rand()

def scale_data(data2):
    import numpy as np
    return data2

def replicate(data3, nreps=2):
    return data3 * nreps

node1 = Node(Function(input_names=['id'],
                      output_names=['data1'],
                      function=get_data_from_id),
             name='get_data')
node1.iterables = ('id', [1, 2, 3])

node2 = Node(Function(input_names=['data2'],
                      output_names=['data_scaled'],
                      function=scale_data),
             name='scale_data')

node3 = JoinNode(Function(input_names=['data3'],
                          output_names=['data_repeated'],
                          function=replicate),
                 name='replicate_data',
                 joinsource=node1,
                 joinfield=['data3'])
In [ ]:
wf = Workflow(name='testjoin')
wf.connect(node1, 'data1', node2, 'data2')
wf.connect(node2, 'data_scaled', node3, 'data3')
eg = wf.run()
180514-09:36:12,912 workflow INFO:
	 Workflow testjoin settings: ['check', 'execution', 'logging', 'monitoring']
180514-09:36:12,923 workflow INFO:
	 Running serially.
180514-09:36:12,924 workflow INFO:
	 [Node] Setting-up "testjoin.get_data" in "/tmp/tmpurv1ab1s/testjoin/_id_3/get_data".
180514-09:36:12,929 workflow INFO:
	 [Node] Running "get_data" ("nipype.interfaces.utility.wrappers.Function")
180514-09:36:12,934 workflow INFO:
	 [Node] Finished "testjoin.get_data".
180514-09:36:12,935 workflow INFO:
	 [Node] Setting-up "testjoin.scale_data" in "/tmp/tmphdccd_nu/testjoin/_id_3/scale_data".
180514-09:36:12,940 workflow INFO:
	 [Node] Running "scale_data" ("nipype.interfaces.utility.wrappers.Function")
180514-09:36:12,946 workflow INFO:
	 [Node] Finished "testjoin.scale_data".
180514-09:36:12,947 workflow INFO:
	 [Node] Setting-up "testjoin.get_data" in "/tmp/tmph559889y/testjoin/_id_2/get_data".
180514-09:36:12,951 workflow INFO:
	 [Node] Running "get_data" ("nipype.interfaces.utility.wrappers.Function")
180514-09:36:12,956 workflow INFO:
	 [Node] Finished "testjoin.get_data".
180514-09:36:12,957 workflow INFO:
	 [Node] Setting-up "testjoin.scale_data" in "/tmp/tmpidkaq4t9/testjoin/_id_2/scale_data".
180514-09:36:12,961 workflow INFO:
	 [Node] Running "scale_data" ("nipype.interfaces.utility.wrappers.Function")
180514-09:36:12,965 workflow INFO:
	 [Node] Finished "testjoin.scale_data".
180514-09:36:12,966 workflow INFO:
	 [Node] Setting-up "testjoin.get_data" in "/tmp/tmpmss2apc1/testjoin/_id_1/get_data".
180514-09:36:12,971 workflow INFO:
	 [Node] Running "get_data" ("nipype.interfaces.utility.wrappers.Function")
180514-09:36:12,977 workflow INFO:
	 [Node] Finished "testjoin.get_data".
180514-09:36:12,978 workflow INFO:
	 [Node] Setting-up "testjoin.scale_data" in "/tmp/tmp_yafi4jn/testjoin/_id_1/scale_data".
180514-09:36:12,982 workflow INFO:
	 [Node] Running "scale_data" ("nipype.interfaces.utility.wrappers.Function")
180514-09:36:12,986 workflow INFO:
	 [Node] Finished "testjoin.scale_data".
180514-09:36:12,987 workflow INFO:
	 [Node] Setting-up "testjoin.replicate_data" in "/tmp/tmped1rvpme/testjoin/replicate_data".
180514-09:36:12,993 workflow INFO:
	 [Node] Running "replicate_data" ("nipype.interfaces.utility.wrappers.Function")
180514-09:36:12,998 workflow INFO:
	 [Node] Finished "testjoin.replicate_data".
In [ ]:
wf.write_graph(graph2use='exec')
Image(filename='graph_detailed.png')
180514-09:36:13,168 workflow INFO:
	 Generated workflow graph: /home/neuro/nipype_tutorial/notebooks/graph.png (graph2use=exec, simple_form=True).
Out[ ]:

Exercise 1

You have a list of subjects' dates of birth (DOB) in a few different formats: ["10 February 1984", "March 5 1990", "April 2 1782", "June 6, 1988", "12 May 1992"], and you want to sort the list.

You can use a Node with iterables to extract day, month, and year; datetime.datetime to unify the formats so that they can be compared; and a JoinNode to sort the list.

In [ ]:
# write your solution here
In [ ]:
# the list of all DOB
dob_subjects = ["10 February 1984", "March 5 1990", "April 2 1782", "June 6, 1988", "12 May 1992"]
In [ ]:
# let's start by creating a Node with iterables to split all strings from the list
from nipype import Node, JoinNode, Function, Workflow

def split_dob(dob_string):
    return dob_string.split()

split_node = Node(Function(input_names=["dob_string"],
                           output_names=["split_list"],
                           function=split_dob),
                  name="splitting")

#split_node.inputs.dob_string = "10 February 1984"
split_node.iterables = ("dob_string", dob_subjects)
In [ ]:
# and now let's work on the date format more, independently for every element

# sometimes the second element has an extra "," that we should remove
def remove_comma(str_list):
    str_list[1] = str_list[1].replace(",", "")
    return str_list

cleaning_node = Node(Function(input_names=["str_list"],
                              output_names=["str_list_clean"],
                              function=remove_comma),
                     name="cleaning")


# now we can extract year, month, and day from our list and create a ``datetime.datetime`` object
def datetime_format(date_list):
    import datetime
    # year is always the last
    year = int(date_list[2])
    # day and month can be in the first or second position
    # we can use datetime.datetime.strptime to convert name of the month to integer
    try:
        day = int(date_list[0])
        month = datetime.datetime.strptime(date_list[1], "%B").month
    except ValueError:
        day = int(date_list[1])
        month = datetime.datetime.strptime(date_list[0], "%B").month
    # and create datetime.datetime format
    return datetime.datetime(year, month, day)


datetime_node = Node(Function(input_names=["date_list"],
                              output_names=["datetime"],
                              function=datetime_format),
                     name="datetime")
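
The try/except above handles both orderings of day and month. A quick standalone check of the parsing logic (the function is duplicated here so the snippet runs on its own):

```python
import datetime

def datetime_format(date_list):
    year = int(date_list[2])          # year is always last
    try:
        day = int(date_list[0])       # "10 February 1984" style
        month = datetime.datetime.strptime(date_list[1], "%B").month
    except ValueError:
        day = int(date_list[1])       # "March 5 1990" style
        month = datetime.datetime.strptime(date_list[0], "%B").month
    return datetime.datetime(year, month, day)

print(datetime_format(["10", "February", "1984"]))  # 1984-02-10 00:00:00
print(datetime_format(["March", "5", "1990"]))      # 1990-03-05 00:00:00
```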

In [ ]:
# now we are ready to create JoinNode and sort the list of DOB

def sorting_dob(datetime_list):
    datetime_list.sort()
    return datetime_list

sorting_node = JoinNode(Function(input_names=["datetime_list"],
                                 output_names=["dob_sorted"],
                                 function=sorting_dob),
                        joinsource=split_node,  # the node that used iterables
                        joinfield=['datetime_list'],
                        name="sorting")
In [ ]:
# and we're ready to create workflow

ex1_wf = Workflow(name="sorting_dob")
ex1_wf.connect(split_node, "split_list", cleaning_node, "str_list")
ex1_wf.connect(cleaning_node, "str_list_clean", datetime_node, "date_list")
ex1_wf.connect(datetime_node, "datetime", sorting_node, "datetime_list")
In [ ]:
# you can check the graph
from IPython.display import Image
ex1_wf.write_graph(graph2use='exec')
Image(filename='graph_detailed.png')
180514-09:36:13,444 workflow INFO:
	 Generated workflow graph: /home/neuro/nipype_tutorial/notebooks/graph.png (graph2use=exec, simple_form=True).
Out[ ]:
In [ ]:
# and run the workflow
ex1_res = ex1_wf.run()
180514-09:36:13,458 workflow INFO:
	 Workflow sorting_dob settings: ['check', 'execution', 'logging', 'monitoring']
180514-09:36:13,476 workflow INFO:
	 Running serially.
180514-09:36:13,477 workflow INFO:
	 [Node] Setting-up "sorting_dob.splitting" in "/tmp/tmpezj8dmgv/sorting_dob/_dob_string_12May1992/splitting".
180514-09:36:13,481 workflow INFO:
	 [Node] Running "splitting" ("nipype.interfaces.utility.wrappers.Function")
180514-09:36:13,487 workflow INFO:
	 [Node] Finished "sorting_dob.splitting".
180514-09:36:13,488 workflow INFO:
	 [Node] Setting-up "sorting_dob.cleaning" in "/tmp/tmphh102ffs/sorting_dob/_dob_string_12May1992/cleaning".
180514-09:36:13,492 workflow INFO:
	 [Node] Running "cleaning" ("nipype.interfaces.utility.wrappers.Function")
180514-09:36:13,496 workflow INFO:
	 [Node] Finished "sorting_dob.cleaning".
180514-09:36:13,497 workflow INFO:
	 [Node] Setting-up "sorting_dob.datetime" in "/tmp/tmp7p1jfwkp/sorting_dob/_dob_string_12May1992/datetime".
180514-09:36:13,502 workflow INFO:
	 [Node] Running "datetime" ("nipype.interfaces.utility.wrappers.Function")
180514-09:36:13,510 workflow INFO:
	 [Node] Finished "sorting_dob.datetime".
180514-09:36:13,511 workflow INFO:
	 [Node] Setting-up "sorting_dob.splitting" in "/tmp/tmp9tl3l645/sorting_dob/_dob_string_June6.1988/splitting".
180514-09:36:13,515 workflow INFO:
	 [Node] Running "splitting" ("nipype.interfaces.utility.wrappers.Function")
180514-09:36:13,522 workflow INFO:
	 [Node] Finished "sorting_dob.splitting".
180514-09:36:13,523 workflow INFO:
	 [Node] Setting-up "sorting_dob.cleaning" in "/tmp/tmp2m_1nki_/sorting_dob/_dob_string_June6.1988/cleaning".
180514-09:36:13,528 workflow INFO:
	 [Node] Running "cleaning" ("nipype.interfaces.utility.wrappers.Function")
180514-09:36:13,533 workflow INFO:
	 [Node] Finished "sorting_dob.cleaning".
180514-09:36:13,534 workflow INFO:
	 [Node] Setting-up "sorting_dob.datetime" in "/tmp/tmpa3mt80s3/sorting_dob/_dob_string_June6.1988/datetime".
180514-09:36:13,539 workflow INFO:
	 [Node] Running "datetime" ("nipype.interfaces.utility.wrappers.Function")
180514-09:36:13,545 workflow INFO:
	 [Node] Finished "sorting_dob.datetime".
180514-09:36:13,546 workflow INFO:
	 [Node] Setting-up "sorting_dob.splitting" in "/tmp/tmpn3qjarnu/sorting_dob/_dob_string_April21782/splitting".
180514-09:36:13,550 workflow INFO:
	 [Node] Running "splitting" ("nipype.interfaces.utility.wrappers.Function")
180514-09:36:13,554 workflow INFO:
	 [Node] Finished "sorting_dob.splitting".
180514-09:36:13,555 workflow INFO:
	 [Node] Setting-up "sorting_dob.cleaning" in "/tmp/tmphbay6bx1/sorting_dob/_dob_string_April21782/cleaning".
180514-09:36:13,561 workflow INFO:
	 [Node] Running "cleaning" ("nipype.interfaces.utility.wrappers.Function")
180514-09:36:13,566 workflow INFO:
	 [Node] Finished "sorting_dob.cleaning".
180514-09:36:13,567 workflow INFO:
	 [Node] Setting-up "sorting_dob.datetime" in "/tmp/tmp0dubwg6o/sorting_dob/_dob_string_April21782/datetime".
180514-09:36:13,572 workflow INFO:
	 [Node] Running "datetime" ("nipype.interfaces.utility.wrappers.Function")
180514-09:36:13,577 workflow INFO:
	 [Node] Finished "sorting_dob.datetime".
180514-09:36:13,578 workflow INFO:
	 [Node] Setting-up "sorting_dob.splitting" in "/tmp/tmpospydkf7/sorting_dob/_dob_string_March51990/splitting".
180514-09:36:13,582 workflow INFO:
	 [Node] Running "splitting" ("nipype.interfaces.utility.wrappers.Function")
180514-09:36:13,588 workflow INFO:
	 [Node] Finished "sorting_dob.splitting".
180514-09:36:13,589 workflow INFO:
	 [Node] Setting-up "sorting_dob.cleaning" in "/tmp/tmpc3x_jvi6/sorting_dob/_dob_string_March51990/cleaning".
180514-09:36:13,594 workflow INFO:
	 [Node] Running "cleaning" ("nipype.interfaces.utility.wrappers.Function")
180514-09:36:13,599 workflow INFO:
	 [Node] Finished "sorting_dob.cleaning".
180514-09:36:13,600 workflow INFO:
	 [Node] Setting-up "sorting_dob.datetime" in "/tmp/tmpepegpwe0/sorting_dob/_dob_string_March51990/datetime".
180514-09:36:13,605 workflow INFO:
	 [Node] Running "datetime" ("nipype.interfaces.utility.wrappers.Function")
180514-09:36:13,610 workflow INFO:
	 [Node] Finished "sorting_dob.datetime".
180514-09:36:13,611 workflow INFO:
	 [Node] Setting-up "sorting_dob.splitting" in "/tmp/tmp4lwlc8tx/sorting_dob/_dob_string_10February1984/splitting".
180514-09:36:13,615 workflow INFO:
	 [Node] Running "splitting" ("nipype.interfaces.utility.wrappers.Function")
180514-09:36:13,620 workflow INFO:
	 [Node] Finished "sorting_dob.splitting".
180514-09:36:13,621 workflow INFO:
	 [Node] Setting-up "sorting_dob.cleaning" in "/tmp/tmprplt3p6y/sorting_dob/_dob_string_10February1984/cleaning".
180514-09:36:13,625 workflow INFO:
	 [Node] Running "cleaning" ("nipype.interfaces.utility.wrappers.Function")
180514-09:36:13,631 workflow INFO:
	 [Node] Finished "sorting_dob.cleaning".
180514-09:36:13,632 workflow INFO:
	 [Node] Setting-up "sorting_dob.datetime" in "/tmp/tmptukudtu7/sorting_dob/_dob_string_10February1984/datetime".
180514-09:36:13,637 workflow INFO:
	 [Node] Running "datetime" ("nipype.interfaces.utility.wrappers.Function")
180514-09:36:13,642 workflow INFO:
	 [Node] Finished "sorting_dob.datetime".
180514-09:36:13,643 workflow INFO:
	 [Node] Setting-up "sorting_dob.sorting" in "/tmp/tmpa6s4k__t/sorting_dob/sorting".
180514-09:36:13,649 workflow INFO:
	 [Node] Running "sorting" ("nipype.interfaces.utility.wrappers.Function")
180514-09:36:13,656 workflow INFO:
	 [Node] Finished "sorting_dob.sorting".
In [ ]:
# you can check list of all nodes
ex1_res.nodes()
Out[ ]:
NodeView((<nipype.pipeline.engine.nodes.JoinNode object at 0x7f9d6ff0b898>, <nipype.pipeline.engine.nodes.Node object at 0x7f9d6ff4e128>, <nipype.pipeline.engine.nodes.Node object at 0x7f9d6ff4e4a8>, <nipype.pipeline.engine.nodes.Node object at 0x7f9d6ff4eba8>, <nipype.pipeline.engine.nodes.Node object at 0x7f9d6ff4e898>, <nipype.pipeline.engine.nodes.Node object at 0x7f9d6ff4e940>, <nipype.pipeline.engine.nodes.Node object at 0x7f9d6ff4e2e8>, <nipype.pipeline.engine.nodes.Node object at 0x7f9d6ff4e9b0>, <nipype.pipeline.engine.nodes.Node object at 0x7f9d6ff4e8d0>, <nipype.pipeline.engine.nodes.Node object at 0x7f9d6ff4eeb8>, <nipype.pipeline.engine.nodes.Node object at 0x7f9d6ff84978>, <nipype.pipeline.engine.nodes.Node object at 0x7f9d6ff84eb8>, <nipype.pipeline.engine.nodes.Node object at 0x7f9d6ff84278>, <nipype.pipeline.engine.nodes.Node object at 0x7f9d6ff842b0>, <nipype.pipeline.engine.nodes.Node object at 0x7f9d6ff84128>, <nipype.pipeline.engine.nodes.Node object at 0x7f9d6ff84be0>))
In [ ]:
# and check the results from sorting_dob.sorting
list(ex1_res.nodes())[0].result.outputs
Out[ ]:
dob_sorted = [datetime.datetime(1782, 4, 2, 0, 0), datetime.datetime(1984, 2, 10, 0, 0), datetime.datetime(1988, 6, 6, 0, 0), datetime.datetime(1990, 3, 5, 0, 0), datetime.datetime(1992, 5, 12, 0, 0)]
