Nipype Quickstart

This is a very quick non-imaging introduction to Nipype workflows. For a more comprehensive introduction, check the next section of the tutorial.

Nipype architecture

Import a few things from nipype

In [ ]:
import os
from nipype import Workflow, Node, Function

Creating a Workflow with one Node that adds two numbers

In [ ]:
def sum(a, b):
    return a + b

wf = Workflow('hello')

adder = Node(Function(input_names=['a', 'b'],
                      output_names=['sum'],
                      function=sum), 
             name='a_plus_b')

adder.inputs.a = 1
adder.inputs.b = 3

wf.add_nodes([adder])

wf.base_dir = os.getcwd()

eg = wf.run()

list(eg.nodes())[0].result.outputs

Creating a second Node and connecting it to the hello Workflow

In [ ]:
def concat(a, b):
    return [a, b]


concater = Node(Function(input_names=['a', 'b'],
                         output_names=['some_list'],
                         function=concat), 
                name='concat_a_b')

wf.connect(adder, 'sum', concater, 'a')
concater.inputs.b = 3

eg = wf.run()
print(eg.nodes())

We can check the results of our Workflow; we should see a list:

In [ ]:
list(eg.nodes())[-1].result.outputs

Next, we will try to add another Node that adds one:

In [ ]:
def plus_one(a):
    return a + 1

plusone = Node(Function(input_names=['a'],
                        output_names=['out'],
                        function=plus_one), 
               name='add_1')

wf.connect(concater, 'some_list', plusone, 'a')

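try:
    eg = wf.run()
except RuntimeError as err:
    print("RuntimeError:", err)
else:
    # the run is expected to fail, so reaching this branch is itself an error
    raise RuntimeError("Expected the workflow to fail, but it ran cleanly")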

This time the workflow didn't execute cleanly and we got an error. We can use nipypecli to read the crashfile (note that if you have multiple crashfiles in the directory, you'll have to provide the full name of one):

In [ ]:
!nipypecli crash crash*

It clearly shows the problematic Node and its input. We tried to add an integer to a list, which is not allowed in Python.
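
The same failure can be reproduced in plain Python, without Nipype. This is a minimal sketch that assumes the list `[4, 3]`, which is what `concat` returns for the inputs used above; the variable names `some_list` and `error_message` are only illustrative.

```python
def plus_one(a):
    return a + 1

# assumed value: what concat(1 + 3, 3) returns in the workflow above
some_list = [4, 3]

try:
    plus_one(some_list)
except TypeError as err:
    # Python refuses to add an int to a list
    error_message = str(err)
    print("TypeError:", error_message)
```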

Let's try using MapNode

In [ ]:
from nipype import MapNode

plusone = MapNode(Function(input_names=['a'],
                           output_names=['out'],
                           function=plus_one), 
                  iterfield=['a'],
                  name='add_1')

wf = Workflow('hello_mapnode')

adder = Node(Function(input_names=['a', 'b'],
                      output_names=['sum'],
                      function=sum), 
             name='a_plus_b')

adder.inputs.a = 1
adder.inputs.b = 3
wf.connect(adder, 'sum', concater, 'a')
concater.inputs.b = 3

wf.connect(concater, 'some_list', plusone, 'a')

wf.base_dir = os.getcwd()

eg = wf.run()
print(eg.nodes())

Now the workflow finished without problems. Let's see the results from hello_mapnode.add_1:

In [ ]:
print(list(eg.nodes())[2].result.outputs)
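
As a rough mental model (not how Nipype is implemented internally), a MapNode with `iterfield=['a']` behaves like applying the function to each element of the incoming list and collecting the results. The sketch below mirrors the three functions in plain Python; `sum_two` is a hypothetical stand-in for the tutorial's `sum`, renamed to avoid shadowing the built-in.

```python
def sum_two(a, b):
    return a + b

def concat(a, b):
    return [a, b]

def plus_one(a):
    return a + 1

# a_plus_b -> concat_a_b, with the same inputs as in the workflow
some_list = concat(sum_two(1, 3), 3)

# add_1 as a MapNode: plus_one is applied to every element of the list
out = [plus_one(item) for item in some_list]
print(out)
```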

And now we will run the example with iterables:

In [ ]:
adder.iterables = ('a', [1, 2])
adder.inputs.b = 2

eg = wf.run()
print(eg.nodes())

Now we have 6 nodes. We can check the results for hello_mapnode.add_1.a1:

In [ ]:
list(eg.nodes())[5].result.outputs
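
One way to picture iterables is as a loop over the listed values, with the downstream part of the graph repeated once per value. The plain-Python sketch below is only an analogy under that assumption; the `branches` dictionary is illustrative and not a Nipype object.

```python
# adder.iterables = ('a', [1, 2]) with adder.inputs.b = 2
branches = {}
for a in [1, 2]:
    total = a + 2                                   # a_plus_b
    some_list = [total, 3]                          # concat_a_b with b = 3
    branches[a] = [item + 1 for item in some_list]  # add_1 (MapNode)

# three nodes are repeated for each of the two values of a
node_count = 3 * len(branches)
print(branches, node_count)
```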
In [ ]:
wf.write_graph(graph2use='exec')
In [ ]:
from IPython.display import Image

We can plot a general structure of the workflow:

In [ ]:
Image("hello_mapnode/graph.png")

And more detailed structure with all nodes:

In [ ]:
Image("hello_mapnode/graph_detailed.png")

We will introduce another iterable, this time for the concater Node:

In [ ]:
concater.iterables = ('b', [3, 4])
eg = wf.run()
eg.nodes();
In [ ]:
wf.write_graph(graph2use='exec')
In [ ]:
Image("hello_mapnode/graph_detailed.png")

Now we will introduce a JoinNode, which allows us to merge results together:

In [ ]:
def merge_and_scale_data(data2):
    import numpy as np
    return (np.array(data2) * 1000).tolist()


from nipype import JoinNode
joiner = JoinNode(Function(input_names=['data2'],
                          output_names=['data_scaled'],
                          function=merge_and_scale_data),
                 name='join_scale_data',
                 joinsource=adder,
                 joinfield=['data2'])

wf.connect(plusone, 'out', joiner, 'data2')

eg = wf.run()
eg.nodes()

Let's check the output of the hello_mapnode.join_scale_data.a0 node:

In [ ]:
list(eg.nodes())[0].result.outputs
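
Conceptually, the JoinNode gathers the `out` value from every branch created by the iterable on `adder` into one list and passes that list to the function. The sketch below is a simplified plain-Python analogy: it ignores the second iterable on `concater` (b is fixed at 3) and uses a hypothetical NumPy-free stand-in, `merge_and_scale`, for the scaling step.

```python
def merge_and_scale(data2):
    # pure-Python stand-in for (np.array(data2) * 1000).tolist()
    return [[value * 1000 for value in row] for row in data2]

collected = []
for a in [1, 2]:                       # the iterable on adder
    some_list = [a + 2, 3]             # a_plus_b, then concat_a_b with b = 3
    collected.append([item + 1 for item in some_list])  # add_1

# the join step receives one entry per value of a
data_scaled = merge_and_scale(collected)
print(data_scaled)
```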
In [ ]:
wf.write_graph(graph2use='exec')
In [ ]:
Image("hello_mapnode/graph.png")
In [ ]:
Image("hello_mapnode/graph_detailed.png")
In [ ]:
%time eg = wf.run(plugin='MultiProc', plugin_args={'n_procs': 2})
In [ ]:
wf.base_dir = os.path.join(os.getcwd(), 'alt')
In [ ]:
%time eg = wf.run(plugin='MultiProc', plugin_args={'n_procs': 2})
In [ ]:
%time eg = wf.run(plugin='MultiProc', plugin_args={'n_procs': 2})

Exercise 1

Create a workflow to calculate the sum of factorials of the integers between $n_{min}$ and $n_{max}$, i.e.:

$$\sum _{k=n_{min}}^{n_{max}} k! = n_{min}! + (n_{min}+1)! + \cdots + n_{max}!$$

For example, if $n_{min}=0$ and $n_{max}=3$: $$\sum _{k=0}^{3} k! = 0! + 1! +2! + 3! = 1 + 1 + 2 + 6 = 10$$
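
For checking the workflow's answer later, the formula can be evaluated directly in plain Python. This is only a reference computation, not the requested workflow; the helper name `sum_of_factorials` is hypothetical.

```python
import math

def sum_of_factorials(n_min, n_max):
    # k runs over n_min, n_min + 1, ..., n_max (both ends included)
    return sum(math.factorial(k) for k in range(n_min, n_max + 1))

print(sum_of_factorials(0, 3))  # 1 + 1 + 2 + 6 = 10
```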

In [ ]:
# write your code here

# 1. write 3 functions: one that returns a list of numbers from a specific range,
#    a second that returns n! (you can use math.factorial), and a third that sums the elements of a list

# 2. create a workflow and define the working directory

# 3. define 3 nodes using Node and MapNode and connect them within the workflow

# 4. run the workflow and check the results
In [ ]:
from nipype import Workflow, Node, MapNode, Function
import os

def range_fun(n_min, n_max):
    return list(range(n_min, n_max+1))

def factorial(n):
    # print("FACTORIAL, {}".format(n))
    import math
    return math.factorial(n)

def summing(terms):
    return sum(terms)

wf_ex1 = Workflow('ex1')
wf_ex1.base_dir = os.getcwd()

range_nd = Node(Function(input_names=['n_min', 'n_max'],
                         output_names=['range_list'],
                         function=range_fun), 
                name='range_list')

factorial_nd = MapNode(Function(input_names=['n'],
                                output_names=['fact_out'],
                                function=factorial), 
                       iterfield=['n'],
                       name='factorial')

summing_nd = Node(Function(input_names=['terms'],
                           output_names=['sum_out'],
                           function=summing), 
                  name='summing')


range_nd.inputs.n_min = 0
range_nd.inputs.n_max = 3

wf_ex1.add_nodes([range_nd])
wf_ex1.connect(range_nd, 'range_list', factorial_nd, 'n')
wf_ex1.connect(factorial_nd, 'fact_out', summing_nd, "terms")


eg = wf_ex1.run()

Let's print all nodes:

In [ ]:
eg.nodes()

The final result should be 10:

In [ ]:
list(eg.nodes())[2].result.outputs

We can also check the results of the two other nodes:

In [ ]:
print(list(eg.nodes())[0].result.outputs)
print(list(eg.nodes())[1].result.outputs)

Exercise 2

Create a workflow to calculate the following sum for a chosen $n$ and five different values of $x$: $0$, $\frac{1}{2} \pi$, $\pi$, $\frac{3}{2} \pi$, and $2 \pi$.

$\sum _{k=0}^{n} \frac{(-1)^{k}}{(2k+1)!} x^{2k+1} = x - \frac{x^{3}}{3!} + \frac{x^{5}}{5!} - \cdots$

In [ ]:
# write your solution here

# 1. write 3 functions: one that returns a list of numbers from a range between 0 and some n,
#    a second that returns the term for a specific k, and a third that sums the elements of a list

# 2. create a workflow and define the working directory

# 3. define 3 nodes using Node and MapNode and connect them within the workflow

# 4. use iterables for 5 values of x

# 5. run the workflow and check the final results for every value of x
In [ ]:
# we can reuse the functions from the previous exercise, but they need some edits
from nipype import Workflow, Node, MapNode, JoinNode, Function
import os
import math

def range_fun(n_max):
    return list(range(n_max+1))

def term(k, x):
    import math
    fract = math.factorial(2 * k + 1)
    polyn = x ** (2 * k + 1) 
    return (-1)**k * polyn / fract

def summing(terms):
    return sum(terms)

wf_ex2 = Workflow('ex2')
wf_ex2.base_dir = os.getcwd()

range_nd = Node(Function(input_names=['n_max'],
                         output_names=['range_list'],
                         function=range_fun), 
                name='range_list')

term_nd = MapNode(Function(input_names=['k', 'x'],
                           output_names=['term_out'],
                           function=term), 
                  iterfield=['k'],
                  name='term')

summing_nd = Node(Function(input_names=['terms'],
                           output_names=['sum_out'],
                           function=summing), 
                  name='summing')


range_nd.inputs.n_max = 15

x_list = [0, 0.5 * math.pi, math.pi, 1.5 * math.pi, 2 * math.pi]

term_nd.iterables = ('x', x_list)

wf_ex2.add_nodes([range_nd])
wf_ex2.connect(range_nd, 'range_list', term_nd, 'k')
wf_ex2.connect(term_nd, 'term_out', summing_nd, "terms")


eg = wf_ex2.run()

Let's check all nodes:

In [ ]:
eg.nodes()

Let's print all results of ex2.summing:

In [ ]:
print(list(eg.nodes())[2].result.outputs)
print(list(eg.nodes())[4].result.outputs)
print(list(eg.nodes())[6].result.outputs)
print(list(eg.nodes())[8].result.outputs)
print(list(eg.nodes())[10].result.outputs)

Great, we just implemented a pretty good sine function! These numbers should be approximately 0, 1, 0, -1 and 0. If they are not, try increasing $n_{max}$.
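
To see how close the truncated series gets, the same sum can be compared against `math.sin` in plain Python. This is a sketch outside Nipype; `sine_series` is a hypothetical helper that simply restates the formula from the exercise.

```python
import math

def sine_series(x, n_max):
    # partial sum of the series for sin(x), with terms k = 0 .. n_max
    return sum((-1) ** k * x ** (2 * k + 1) / math.factorial(2 * k + 1)
               for k in range(n_max + 1))

x_list = [0, 0.5 * math.pi, math.pi, 1.5 * math.pi, 2 * math.pi]

for x in x_list:
    approx = sine_series(x, 15)
    # print the approximation next to the library value and their difference
    print(x, approx, math.sin(x), abs(approx - math.sin(x)))
```

With `n_max = 15` the differences should be very small for all five values; lowering `n_max` makes the error at $2\pi$ grow first, since the largest $x$ needs the most terms.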

Exercise 2a

Use JoinNode to combine results from Exercise 2 in one container, e.g. a dictionary, that takes value $x$ as a key and the result from summing Node as a value.

In [ ]:
# write your code here

# 1. create an additional function that takes 2 lists and combines them into one container, e.g. dictionary

# 2. use JoinNode to define a new node that merges results from Exercise 2 and connect it to the workflow

# 3. run the workflow and check the results of the merging node
In [ ]:
def merge_results(results, x):
    return dict(zip(x, results))

join_nd = JoinNode(Function(input_names=['results', 'x'],
                            output_names=['results_cont'],
                            function=merge_results),
                   name='merge',
                   joinsource=term_nd, # this is the node that used iterables for x
                   joinfield=['results'])

# taking the list of arguments from the previous part 
join_nd.inputs.x = x_list

# connecting a new node to the summing_nd
wf_ex2.connect(summing_nd, "sum_out", join_nd, "results")

eg = wf_ex2.run()

Let's print all nodes:

In [ ]:
eg.nodes()

And the results from the merge Node:

In [ ]:
list(eg.nodes())[1].result.outputs
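
The merging function can also be tried on its own. The sketch below assumes that the joined results arrive in the same order as `x_list`, and uses `math.sin` values as stand-ins for the outputs of the summing Node.

```python
import math

def merge_results(results, x):
    # pair every x with the result at the same position
    return dict(zip(x, results))

x_list = [0, 0.5 * math.pi, math.pi, 1.5 * math.pi, 2 * math.pi]

# stand-in values; in the workflow these come from the summing Node
results = [math.sin(x) for x in x_list]

merged = merge_results(results, x_list)
for x, value in merged.items():
    print(x, value)
```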
