This is a very quick non-imaging introduction to Nipype workflows. For a more comprehensive introduction, check the next section of the tutorial.
This notebook is taken from the reproducible-imaging repository.
import os
from nipype import Workflow, Node, Function
Creating a Workflow with one Node that adds two numbers
def sum(a, b):
    return a + b
wf = Workflow('hello')
adder = Node(Function(input_names=['a', 'b'],
                      output_names=['sum'],
                      function=sum),
             name='a_plus_b')
adder.inputs.a = 1
adder.inputs.b = 3
wf.add_nodes([adder])
wf.base_dir = os.getcwd()
eg = wf.run()
list(eg.nodes())[0].result.outputs
Creating a second Node and connecting it to the hello Workflow
def concat(a, b):
    return [a, b]
concater = Node(Function(input_names=['a', 'b'],
                         output_names=['some_list'],
                         function=concat),
                name='concat_a_b')
wf.connect(adder, 'sum', concater, 'a')
concater.inputs.b = 3
eg = wf.run()
print(eg.nodes())
We can now check the results of our Workflow; we should see a list:
list(eg.nodes())[-1].result.outputs
Next, we will try to add another Node that adds one:
def plus_one(a):
    return a + 1
plusone = Node(Function(input_names=['a'],
                        output_names=['out'],
                        function=plus_one),
               name='add_1')
wf.connect(concater, 'some_list', plusone, 'a')
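try:
    eg = wf.run()
except RuntimeError as err:
    print("RuntimeError:", err)
else:
    # The run is expected to fail; reaching this branch means it did not.
    raise RuntimeError("Expected the workflow to fail, but it ran cleanly")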
This time the workflow didn't execute cleanly and we got an error. We can use nipypecli to read the crashfile (note that if you have multiple crashfiles in the directory, you'll have to provide the full name):
!nipypecli crash crash*
It clearly shows the problematic Node and its input. We tried to add an integer to a list, which is not allowed in Python.
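The same failure can be reproduced in plain Python, without Nipype. The sketch below is only an illustration: `some_list` is a stand-in for the output of the concat_a_b node, and the list comprehension hints at what applying the function element by element would look like.

```python
def plus_one(a):
    return a + 1

some_list = [4, 3]  # stand-in for the output of concat_a_b

# Passing the whole list fails: a list and an int cannot be added.
try:
    plus_one(some_list)
except TypeError as err:
    error_message = str(err)
    print("TypeError:", error_message)

# Applying the function to each element separately works.
per_element = [plus_one(item) for item in some_list]
print(per_element)  # [5, 4]
```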
Let's try using MapNode
from nipype import MapNode
plusone = MapNode(Function(input_names=['a'],
                           output_names=['out'],
                           function=plus_one),
                  iterfield=['a'],
                  name='add_1')
wf = Workflow('hello_mapnode')
adder = Node(Function(input_names=['a', 'b'],
                      output_names=['sum'],
                      function=sum),
             name='a_plus_b')
adder.inputs.a = 1
adder.inputs.b = 3
wf.connect(adder, 'sum', concater, 'a')
concater.inputs.b = 3
wf.connect(concater, 'some_list', plusone, 'a')
wf.base_dir = os.getcwd()
eg = wf.run()
print(eg.nodes())
Now the workflow finished without problems. Let's see the results from hello_mapnode.add_1:
print(list(eg.nodes())[2].result.outputs)
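As a cross-check, here is a plain-Python sketch of what the three connected nodes compute. It does not use Nipype at all; `add_numbers` is a hypothetical name standing in for the function wrapped by the a_plus_b node, and the list comprehension mimics what `iterfield=['a']` asks the MapNode to do.

```python
# Plain-Python stand-ins for the three Function nodes (no Nipype involved).
def add_numbers(a, b):      # plays the role of the a_plus_b node
    return a + b

def concat(a, b):           # plays the role of the concat_a_b node
    return [a, b]

def plus_one(a):            # the function wrapped by the add_1 MapNode
    return a + 1

total = add_numbers(1, 3)
some_list = concat(total, 3)
# iterfield=['a'] means: call plus_one once per element of the list.
out = [plus_one(item) for item in some_list]
print(out)  # [5, 4]
```

If the workflow ran as intended, the `out` field printed above should hold the same two values.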
And now we will run the example with iterables:
adder.iterables = ('a', [1, 2])
adder.inputs.b = 2
eg = wf.run()
print(eg.nodes())
Now we have 6 nodes. We can check the results for hello_mapnode.add_1.a1:
list(eg.nodes())[5].result.outputs
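One way to think about iterables is as a loop around the whole downstream graph: every value of `a` gets its own copy of the three nodes. The sketch below is a plain-Python approximation under that assumption, using the inputs set so far (`b = 2` for the adder, `b = 3` for the concater); it is not how Nipype is implemented.

```python
a_values = [1, 2]                 # adder.iterables = ('a', [1, 2])

results = {}
for a in a_values:
    total = a + 2                 # adder.inputs.b = 2
    some_list = [total, 3]        # concater.inputs.b = 3
    results[a] = [item + 1 for item in some_list]  # add_1 MapNode

n_nodes = 3 * len(a_values)       # three nodes per value of a

print(results)  # {1: [4, 4], 2: [5, 4]}
print(n_nodes)  # 6
```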
wf.write_graph(graph2use='exec')
from IPython.display import Image
We can plot a general structure of the workflow:
Image("hello_mapnode/graph.png")
And more detailed structure with all nodes:
Image("hello_mapnode/graph_detailed.png")
We will introduce another iterable, this time for the concater Node:
concater.iterables = ('b', [3, 4])
eg = wf.run()
eg.nodes();
wf.write_graph(graph2use='exec')
Image("hello_mapnode/graph_detailed.png")
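With two iterables in the graph, the add_1 node is evaluated for every combination of `a` and `b`. A minimal plain-Python sketch of that expansion, assuming the combinations form a Cartesian product, could look like this:

```python
from itertools import product

a_values = [1, 2]   # iterable on the adder
b_values = [3, 4]   # iterable on the concater

results = {}
for a, b in product(a_values, b_values):
    total = a + 2                      # adder.inputs.b = 2
    some_list = [total, b]             # concater now receives b from its iterable
    results[(a, b)] = [item + 1 for item in some_list]

for key, value in results.items():
    print(key, value)
```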
Now we will introduce a JoinNode, which allows us to merge results together:
def merge_and_scale_data(data2):
    import numpy as np
    return (np.array(data2) * 1000).tolist()
from nipype import JoinNode
joiner = JoinNode(Function(input_names=['data2'],
                           output_names=['data_scaled'],
                           function=merge_and_scale_data),
                  name='join_scale_data',
                  joinsource=adder,
                  joinfield=['data2'])
wf.connect(plusone, 'out', joiner, 'data2')
eg = wf.run()
eg.nodes()
Let's check the output of the hello_mapnode.join_scale_data.a0 node:
list(eg.nodes())[0].result.outputs
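The sketch below approximates the join in plain Python. It rests on two assumptions that are not spelled out above: that the join collects one add_1 result per value of `a` (because `joinsource=adder`), in the order of the iterable, and that it does so separately for each value of `b`. The NumPy expression is replaced by a pure-Python equivalent so the example has no dependencies.

```python
a_values = [1, 2]   # iterable on the adder (the join source)
b_values = [3, 4]   # iterable on the concater

def plus_one_outputs(a, b):
    # adder (b = 2) -> concater -> add_1 MapNode, as in the workflow
    some_list = [a + 2, b]
    return [item + 1 for item in some_list]

def merge_and_scale_data(data2):
    # Pure-Python version of (np.array(data2) * 1000).tolist()
    return [[value * 1000 for value in row] for row in data2]

joined = {}
for b in b_values:
    # Assumed join behaviour: gather the results for every a into one list.
    data2 = [plus_one_outputs(a, b) for a in a_values]
    joined[b] = merge_and_scale_data(data2)

print(joined[3])  # [[4000, 4000], [5000, 4000]]
print(joined[4])  # [[4000, 5000], [5000, 5000]]
```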
wf.write_graph(graph2use='exec')
Image("hello_mapnode/graph.png")
Image("hello_mapnode/graph_detailed.png")
%time eg = wf.run(plugin='MultiProc', plugin_args={'n_procs': 2})
wf.base_dir = os.path.join(os.getcwd(), 'alt')
%time eg = wf.run(plugin='MultiProc', plugin_args={'n_procs': 2})
%time eg = wf.run(plugin='MultiProc', plugin_args={'n_procs': 2})
Create a workflow to calculate the sum of the factorials of the integers from $n_{min}$ to $n_{max}$, i.e.:
$$\sum _{k=n_{min}}^{n_{max}} k! = n_{min}! + (n_{min}+1)! + \cdots + n_{max}!$$
For example, if $n_{min}=0$ and $n_{max}=3$: $$\sum _{k=0}^{3} k! = 0! + 1! + 2! + 3! = 1 + 1 + 2 + 6 = 10$$
# write your code here
# 1. write 3 functions: one that returns a list of numbers from a specific range,
#    a second that returns n! (you can use math.factorial), and a third that sums the elements of a list
# 2. create a workflow and define the working directory
# 3. define 3 nodes using Node and MapNode and connect them within the workflow
# 4. run the workflow and check the results
from nipype import Workflow, Node, MapNode, Function
import os
def range_fun(n_min, n_max):
    return list(range(n_min, n_max+1))
def factorial(n):
    # print("FACTORIAL, {}".format(n))
    import math
    return math.factorial(n)
def summing(terms):
    return sum(terms)
wf_ex1 = Workflow('ex1')
wf_ex1.base_dir = os.getcwd()
range_nd = Node(Function(input_names=['n_min', 'n_max'],
                         output_names=['range_list'],
                         function=range_fun),
                name='range_list')
factorial_nd = MapNode(Function(input_names=['n'],
                                output_names=['fact_out'],
                                function=factorial),
                       iterfield=['n'],
                       name='factorial')
summing_nd = Node(Function(input_names=['terms'],
                           output_names=['sum_out'],
                           function=summing),
                  name='summing')
range_nd.inputs.n_min = 0
range_nd.inputs.n_max = 3
wf_ex1.add_nodes([range_nd])
wf_ex1.connect(range_nd, 'range_list', factorial_nd, 'n')
wf_ex1.connect(factorial_nd, 'fact_out', summing_nd, "terms")
eg = wf_ex1.run()
Let's print all nodes:
eg.nodes()
The final result should be 10:
list(eg.nodes())[2].result.outputs
We can also check the results of the two other nodes:
print(list(eg.nodes())[0].result.outputs)
print(list(eg.nodes())[1].result.outputs)
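The values reported by the three nodes can be cross-checked without Nipype. This short sketch recomputes each stage directly; the variable names simply mirror the output names used in the workflow.

```python
import math

n_min, n_max = 0, 3

range_list = list(range(n_min, n_max + 1))          # range_list node
fact_out = [math.factorial(n) for n in range_list]  # factorial MapNode
sum_out = sum(fact_out)                             # summing node

print(range_list)  # [0, 1, 2, 3]
print(fact_out)    # [1, 1, 2, 6]
print(sum_out)     # 10
```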
Create a workflow to calculate the following sum for chosen $n$ and five different values of $x$: $0$, $\frac{1}{2} \pi$, $\pi$, $\frac{3}{2} \pi$, and $2 \pi$.
$$\sum_{k=0}^{n} \frac{(-1)^{k}}{(2k+1)!} x^{2k+1} = x - \frac{x^{3}}{3!} + \frac{x^{5}}{5!} - \cdots$$
# write your solution here
# 1. write 3 functions: one that returns a list of numbers from a range between 0 and some n,
#    a second that returns a term for a specific k, and a third that sums the elements of a list
# 2. create a workflow and define the working directory
# 3. define 3 nodes using Node and MapNode and connect them within the workflow
# 4. use iterables for 5 values of x
# 5. run the workflow and check the final results for every value of x
# we can reuse functions from the previous exercise, but they need some edits
from nipype import Workflow, Node, MapNode, JoinNode, Function
import os
import math
def range_fun(n_max):
    return list(range(n_max+1))
def term(k, x):
    import math
    fract = math.factorial(2 * k + 1)
    polyn = x ** (2 * k + 1)
    return (-1)**k * polyn / fract
def summing(terms):
    return sum(terms)
wf_ex2 = Workflow('ex2')
wf_ex2.base_dir = os.getcwd()
range_nd = Node(Function(input_names=['n_max'],
                         output_names=['range_list'],
                         function=range_fun),
                name='range_list')
term_nd = MapNode(Function(input_names=['k', 'x'],
                           output_names=['term_out'],
                           function=term),
                  iterfield=['k'],
                  name='term')
summing_nd = Node(Function(input_names=['terms'],
                           output_names=['sum_out'],
                           function=summing),
                  name='summing')
range_nd.inputs.n_max = 15
x_list = [0, 0.5 * math.pi, math.pi, 1.5 * math.pi, 2 * math.pi]
term_nd.iterables = ('x', x_list)
wf_ex2.add_nodes([range_nd])
wf_ex2.connect(range_nd, 'range_list', term_nd, 'k')
wf_ex2.connect(term_nd, 'term_out', summing_nd, "terms")
eg = wf_ex2.run()
Let's check all nodes:
eg.nodes()
Let's print all results of ex2.summing:
print(list(eg.nodes())[2].result.outputs)
print(list(eg.nodes())[4].result.outputs)
print(list(eg.nodes())[6].result.outputs)
print(list(eg.nodes())[8].result.outputs)
print(list(eg.nodes())[10].result.outputs)
Great, we just implemented a pretty good sine function! These numbers should be approximately 0, 1, 0, -1 and 0. If they are not, try increasing $n_{max}$.
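To see how close the truncated series gets, it can be compared with `math.sin` in plain Python. The helper `sine_series` below is a hypothetical name introduced only for this check; it evaluates the same sum as the workflow, and the second loop shows how the error grows when `n_max` is too small.

```python
import math

def sine_series(x, n_max):
    # Same sum as the workflow: terms for k = 0 .. n_max
    return sum((-1) ** k * x ** (2 * k + 1) / math.factorial(2 * k + 1)
               for k in range(n_max + 1))

x_list = [0, 0.5 * math.pi, math.pi, 1.5 * math.pi, 2 * math.pi]

# Largest deviation from math.sin over the five x values
errors_15 = [abs(sine_series(x, 15) - math.sin(x)) for x in x_list]
errors_3 = [abs(sine_series(x, 3) - math.sin(x)) for x in x_list]

print("max error, n_max=15:", max(errors_15))
print("max error, n_max=3: ", max(errors_3))
```

The error is largest for $x = 2\pi$, which is why the higher-order terms matter most there.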
Use JoinNode to combine the results from Exercise 2 in one container, e.g. a dictionary, that takes the value of $x$ as a key and the result from the summing Node as a value.
# write your code here
# 1. create an additional function that takes 2 lists and combines them into one container, e.g. dictionary
# 2. use JoinNode to define a new node that merges results from Exercise 2 and connect it to the workflow
# 3. run the workflow and check the results of the merging node
def merge_results(results, x):
    return dict(zip(x, results))
join_nd = JoinNode(Function(input_names=['results', 'x'],
                            output_names=['results_cont'],
                            function=merge_results),
                   name='merge',
                   joinsource=term_nd,  # this is the node that used iterables for x
                   joinfield=['results'])
# taking the list of arguments from the previous part
join_nd.inputs.x = x_list
# connecting a new node to the summing_nd
wf_ex2.connect(summing_nd, "sum_out", join_nd, "results")
eg = wf_ex2.run()
Let's print all nodes:
eg.nodes()
And the results from the merge Node:
list(eg.nodes())[1].result.outputs
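The pairing done by `merge_results` relies on the joined results arriving in the same order as `x_list`; that ordering is an assumption here, not something the code enforces. A plain-Python sketch of the expected container, using rounded `math.sin` values as stand-ins for the summing outputs:

```python
import math

def merge_results(results, x):
    # Pair each x with the result at the same position.
    return dict(zip(x, results))

x_list = [0, 0.5 * math.pi, math.pi, 1.5 * math.pi, 2 * math.pi]

# Stand-ins for the five sum_out values (assumed to be in x_list order).
results = [round(math.sin(x), 6) for x in x_list]

merged = merge_results(results, x_list)
for x, value in merged.items():
    print(f"x = {x:.4f} -> {value}")
```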