#!/usr/bin/env nextflow
nextflow.enable.dsl=2

workflow {
    // create synthetic data if specified
    if ( params.make_inputs == true ) {
        make_inputs()

        emx_files = make_inputs.out.dataset
        labels_files = make_inputs.out.labels
        gmt_files = make_inputs.out.gmt_file
    }

    // otherwise load input files
    else {
        emx_files = Channel.fromFilePairs("${params.input_dir}/${params.emx_files}", size: 1, flat: true)
        labels_files = Channel.fromFilePairs("${params.input_dir}/${params.labels_files}", size: 1, flat: true)
        gmt_files = Channel.fromFilePairs("${params.input_dir}/${params.gmt_files}", size: 1, flat: true)
    }
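
    // Either way, each channel emits [ name, file ] tuples; with `size: 1` and
    // `flat: true`, fromFilePairs yields e.g. [ "example", example.emx.txt ]
    // (illustrative value, matching the names produced by make_inputs below).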

    // group dataset files by name
    datasets = emx_files.join(labels_files)

    // combine datasets with gmt files via cartesian product
    inputs_combined = datasets.combine(gmt_files)

    // perform phase 1 if specified
    if ( params.phase1 == true ) {
        // split gmt files into chunks
        phase1_split(gmt_files)

        gmt_chunks = phase1_split.out.chunks
            .flatMap { it[1].collect { c -> [it[0], c] } }
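
        // Illustrative shape (values not from the source): with params.chunks = 2,
        // phase1_split emits [ "example", [x00, x01] ], and the flatMap above
        // unrolls it into [ "example", x00 ] and [ "example", x01 ].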

        // combine datasets with gmt file chunks via cartesian product
        fg_chunks = datasets.combine(gmt_chunks)

        // perform foreground evaluation
        phase1_fg(fg_chunks)

        fg_scores = phase1_fg.out.score_chunks

        // perform background evaluation
        bg_chunks = Channel.from( 0 .. params.chunks-1 )

        phase1_bg(datasets, bg_chunks)

        // combine background scores with gmt files
        bg_scores = phase1_bg.out.score_chunks
            .combine(gmt_files)
            .map { [it[0], it[2], it[1]] }
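
        // The combine yields [ dataset, log, gmt_name, gmt_file ]; the map
        // keeps [ dataset, gmt_name, log ] so that background chunks line up
        // with the foreground tuples for the groupTuple below.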

        // combine fg and bg score chunks
        // group score chunks by dataset and gmt file
        score_chunks = Channel.empty()
            .mix(fg_scores, bg_scores)
            .groupTuple(by: [0, 1], size: 2 * params.chunks)
            .map { [it[0], it[1], it[2].sort { it.name }] }
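
        // Each dataset/GMT pair accumulates params.chunks foreground logs and
        // params.chunks background logs, hence size: 2 * params.chunks; the
        // sort keeps the merged score file in a deterministic chunk order.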

        // merge score chunks
        phase1_merge(score_chunks)

        phase1_scores = phase1_merge.out.scores

        // use scores to select gene sets
        phase1_select(inputs_combined, phase1_scores)

        gene_sets = phase1_select.out.gene_sets
    }
    else {
        gene_sets = Channel.empty()
    }

    // perform phase 2 (combinatoric search) if specified
    if ( params.phase2 == true ) {
        // split gene sets into chunks
        phase2_split(gene_sets)

        geneset_chunks = phase2_split.out.chunks
            .flatMap { it[2].collect { c -> [it[0], it[1], c] } }
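
        // As with the phase 1 chunks, this unrolls [ dataset, gmt_name, [chunks...] ]
        // into one [ dataset, gmt_name, chunk ] tuple per single-line chunk file.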

        // combine datasets with gene set chunks via cartesian product
        phase2_chunks = datasets.combine(geneset_chunks, by: 0)

        // evaluate gene sets via combinatoric search
        phase2_evaluate(phase2_chunks)

        // group scores by dataset and geneset list
        phase2_scores = phase2_evaluate.out.score_chunks
            .groupTuple(by: [0, 1])
            .map { [it[0], it[1], it[2].flatten()] }

        // use scores to select gene sets
        phase2_select(gene_sets, phase2_scores)
    }

    // perform phase 2 (random forest) if specified
    if ( params.phase2_rf == true ) {
        phase2_rf(inputs_combined, gene_sets)
    }
}
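
/*
 * A minimal params sketch (the values are placeholders, not from the source);
 * the parameter names below are the ones this script reads, typically set in
 * nextflow.config or on the command line:
 *
 * params {
 *     make_inputs  = false
 *     input_dir    = "input"
 *     output_dir   = "output"
 *     emx_files    = "*.emx.txt"
 *     labels_files = "*.labels.txt"
 *     gmt_files    = "*.genesets.txt"
 *     chunks       = 10
 *
 *     phase1              = true
 *     phase1_model        = "lr"
 *     phase1_random_min   = 1
 *     phase1_random_max   = 20
 *     phase1_random_iters = 10
 *     phase1_threshold    = 1
 *     phase1_nsets        = 5
 *
 *     phase2           = true
 *     phase2_model     = "lr"
 *     phase2_threshold = 1
 *     phase2_visualize = false
 *
 *     phase2_rf           = false
 *     phase2_rf_threshold = 1
 *     phase2_rf_visualize = false
 * }
 */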

/**
 * The make_inputs process generates synthetic input data
 * for an example run.
 */
process make_inputs {
    publishDir "${params.output_dir}", mode: "copy"

    output:
        tuple val("example"), path("example.emx.txt"), emit: dataset
        tuple val("example"), path("example.labels.txt"), emit: labels
        tuple val("example"), path("example.genesets.txt"), emit: gmt_file
        path("example.tsne.png")

    script:
        """
        make-inputs.py \
            --n-samples 1000 \
            --n-genes 100 \
            --n-classes 5 \
            --n-sets 10 \
            --visualize
        """
}
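
/*
 * Example invocation (illustrative): generate the synthetic dataset above
 * and run the phase 1 search on it:
 *
 *   nextflow run main.nf --make_inputs true --phase1 true
 */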

/**
 * The phase1_split process splits the input gmt file into chunks.
 */
process phase1_split {
    tag "${gmt_name}"

    input:
        tuple val(gmt_name), path(gmt_file)

    output:
        tuple val(gmt_name), path("*"), emit: chunks

    script:
        """
        echo "#TRACE chunks=${params.chunks}"
        echo "#TRACE gmt_lines=`cat ${gmt_file} | wc -l`"

        split -d -n l/${params.chunks} ${gmt_file} ""
        """
}
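
/*
 * Note: `split -n l/N` breaks the file into N chunks without splitting lines,
 * and `-d` with an empty prefix names them 00, 01, ..., so each chunk keeps
 * whole gene sets (one per line) and sorts back into order by filename.
 */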

/**
 * The phase1_fg process evaluates the gene sets in a single chunk
 * of a gmt file against a dataset.
 */
process phase1_fg {
    tag "${dataset}/${gmt_name}/${gmt_file.name}"
    label "gpu"

    input:
        tuple val(dataset), path(emx_file), path(labels_file), val(gmt_name), path(gmt_file)

    output:
        tuple val(dataset), val(gmt_name), path("*.log"), emit: score_chunks

    script:
        """
        echo "#TRACE chunks=${params.chunks}"
        echo "#TRACE gmt_lines=`cat ${gmt_file} | wc -l`"
        echo "#TRACE gmt_genes=`cat ${gmt_file} | wc -w`"
        echo "#TRACE n_rows=`tail -n +1 ${emx_file} | wc -l`"
        echo "#TRACE n_cols=`head -n +1 ${emx_file} | wc -w`"
        echo "#TRACE model=${params.phase1_model}"

        phase1-evaluate.py \
            --dataset ${emx_file} \
            --labels ${labels_file} \
            --model-config ${projectDir}/models.json \
            --model ${params.phase1_model} \
            --gene-sets ${gmt_file} \
            --outfile ${gmt_file}.log
        """
}

/**
 * The phase1_bg process performs a single chunk of background experiments.
 */
process phase1_bg {
    tag "${dataset}/${index}"
    label "gpu"

    input:
        tuple val(dataset), path(emx_file), path(labels_file)
        each index

    output:
        tuple val(dataset), path("*.log"), emit: score_chunks

    script:
        """
        echo "#TRACE random_min=${params.phase1_random_min}"
        echo "#TRACE random_max=${params.phase1_random_max}"
        echo "#TRACE chunks=${params.chunks}"
        echo "#TRACE index=${index}"
        echo "#TRACE n_rows=`tail -n +1 ${emx_file} | wc -l`"
        echo "#TRACE n_cols=`head -n +1 ${emx_file} | wc -w`"
        echo "#TRACE model=${params.phase1_model}"

        START=${params.phase1_random_min + index}
        STOP=${params.phase1_random_max}
        STEP=${params.chunks}

        phase1-evaluate.py \
            --dataset ${emx_file} \
            --labels ${labels_file} \
            --model-config ${projectDir}/models.json \
            --model ${params.phase1_model} \
            --random \
            --random-range \${START} \${STOP} \${STEP} \
            --random-iters ${params.phase1_random_iters} \
            --outfile \$(printf "%04d" ${index}).log
        """
}
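
/*
 * The START/STOP/STEP lines stripe the random set sizes across chunks: for
 * example (illustrative values), with phase1_random_min = 1,
 * phase1_random_max = 20 and chunks = 4, index 1 covers random sets of sizes
 * 2, 6, 10, 14, 18 while the other indices cover the remaining sizes.
 */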

/**
 * The phase1_merge process takes the score chunks from phase1_fg and
 * phase1_bg and merges them into a single score file for each
 * dataset/GMT pair.
 */
process phase1_merge {
    publishDir "${params.output_dir}/${dataset}", mode: "copy"
    tag "${dataset}/${gmt_name}"

    input:
        tuple val(dataset), val(gmt_name), path(chunks)

    output:
        tuple val(dataset), val(gmt_name), path("phase1-evaluate-*.log"), emit: scores

    script:
        """
        echo "#TRACE chunks=${params.chunks}"
        echo "#TRACE gmt_lines=`cat ${chunks} | wc -l`"

        # keep the header from the first chunk, then append the sorted
        # data rows from every chunk
        head -n +1 ${chunks[0]} > phase1-evaluate-${gmt_name}.log

        for f in ${chunks}; do
            tail -n +2 \$f >> temp
        done

        sort -V temp >> phase1-evaluate-${gmt_name}.log
        """
}

/**
 * The phase1_select process takes the score file for a dataset/GMT pair and
 * selects the gene sets which score significantly higher than the background.
 */
process phase1_select {
    publishDir "${params.output_dir}/${dataset}", mode: "copy"

    input:
        tuple val(dataset), path(emx_file), path(labels_file), val(gmt_name), path(gmt_file)
        tuple val(dataset), val(gmt_name), path(scores)

    output:
        tuple val(dataset), val(gmt_name), path("phase1-genesets.txt"), emit: gene_sets
        path("phase1-select-${gmt_name}.log")

    script:
        """
        echo "#TRACE chunks=${params.chunks}"
        echo "#TRACE gmt_lines=`cat ${gmt_file} | wc -l`"
        echo "#TRACE gmt_genes=`cat ${gmt_file} | wc -w`"

        phase1-select.py \
            --dataset ${emx_file} \
            --gene-sets ${gmt_file} \
            --scores ${scores} \
            --threshold ${params.phase1_threshold} \
            --n-sets ${params.phase1_nsets} \
            > phase1-select-${gmt_name}.log
        """
}

/**
 * The phase2_split process splits the gene set list from phase 1 into chunks.
 */
process phase2_split {
    tag "${dataset}/${gmt_name}"

    input:
        tuple val(dataset), val(gmt_name), path(gmt_file)

    output:
        tuple val(dataset), val(gmt_name), path("*"), emit: chunks

    script:
        """
        split -d -l 1 ${gmt_file} ""
        """
}
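
/*
 * Here `split -d -l 1` writes one gene set per chunk file (00, 01, ...), so
 * each phase2_evaluate task searches the subsets of a single gene set.
 */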

/**
 * The phase2_evaluate process performs subset analysis on a gene set.
 */
process phase2_evaluate {
    publishDir "${params.output_dir}/${dataset}", mode: "copy"
    tag "${dataset}/${gmt_name}/${gmt_file.name}"
    label "gpu"

    input:
        tuple val(dataset), path(emx_file), path(labels_file), val(gmt_name), path(gmt_file)

    output:
        tuple val(dataset), val(gmt_name), path("*_scores_*.txt"), optional: true, emit: score_chunks

    script:
        """
        phase2-evaluate.py \
            --dataset ${emx_file} \
            --labels ${labels_file} \
            --model-config ${projectDir}/models.json \
            --model ${params.phase2_model} \
            --gene-sets ${gmt_file} \
            --n-jobs 1 \
            --logdir .
        """
}

/**
 * The phase2_select process takes the list of gene sets selected by phase 1,
 * along with the subset scores from phase2_evaluate, and selects candidate
 * genes for each gene set.
 */
process phase2_select {
    publishDir "${params.output_dir}/${dataset}", mode: "copy"
    tag "${dataset}/${gmt_name}"

    input:
        tuple val(dataset), val(gmt_name), path("phase1-genesets.txt")
        tuple val(dataset), val(gmt_name), path(scores)

    output:
        tuple val(dataset), val(gmt_name), path("phase2-genesets.txt"), emit: gene_sets
        path("*.png"), optional: true

    script:
        """
        phase2-select.py \
            --gene-sets phase1-genesets.txt \
            --logdir . \
            --threshold ${params.phase2_threshold} \
            ${params.phase2_visualize ? "--visualize" : ""}
        """
}

/**
 * The phase2_rf process takes a list of gene sets selected by phase 1 and
 * uses the feature importances of a random forest to select candidate genes
 * for each gene set.
 */
process phase2_rf {
    publishDir "${params.output_dir}/${dataset}", mode: "copy"
    tag "${dataset}/${gmt_name}"

    input:
        tuple val(dataset), path(emx_file), path(labels_file), val(gmt_name), path(gmt_file)
        tuple val(dataset), val(gmt_name), path("phase1-genesets.txt")

    output:
        tuple val(dataset), val(gmt_name), path("phase2-rf-genesets.txt"), emit: gene_sets
        path("*.png"), optional: true

    script:
        """
        echo "#TRACE chunks=${params.chunks}"
        echo "#TRACE gmt_lines=`cat phase1-genesets.txt | wc -l`"
        echo "#TRACE gmt_genes=`cat phase1-genesets.txt | wc -w`"
        echo "#TRACE n_rows=`tail -n +1 ${emx_file} | wc -l`"
        echo "#TRACE n_cols=`head -n +1 ${emx_file} | wc -w`"

        phase2-rf.py \
            --dataset ${emx_file} \
            --labels ${labels_file} \
            --gene-sets phase1-genesets.txt \
            --n-jobs 1 \
            --threshold ${params.phase2_rf_threshold} \
            ${params.phase2_rf_visualize ? "--visualize" : ""}
        """
}