Выполнение заданий одновременно с несколькими шагами

#nextflow

Вопрос:

Мне нужно запустить 10 000 трубопроводов, каждый из которых состоит из 5 шагов (процессов)

Еще одно требование состоит в том, что я хочу запустить около 300 одновременно. Это означает, что я хочу, чтобы началось 300, затем для каждого конвейера, выполнившего 5 шагов, я хочу запустить новый конвейер. Я не мог найти, как это сделать, используя каналы.

Некоторые начальные мысли: начните с разделения канала 10 000 на буферы из 300 элементов. Но это не помогает начать новую, когда она заканчивается…

 proteins = Channel.fromPath( '/some/path/*.fa' ).buffer( size: 300 )

process A {
  input:
  file query_file from proteins

  output:
}
process B {
}
 

Ответ №1:

Вы можете достичь примерно того, чего хотите, используя следующее в своем nextflow.config :

 process {

    maxForks = 300
}

executor {

    queueSize = 300
}
 
  • Директива maxForks устанавливает максимальное количество экземпляров процесса, которые могут выполняться параллельно. Установив это значение в файле nextflow.config, мы гарантируем, что оно будет полностью применено к каждому из ваших пяти процессов. Если у вас есть другие процессы, которые вы не хотите охватывать этой директивой, вы, конечно, можете использовать один или несколько селекторов процессов для выбора процессов, для которых вы хотите ограничить эту конфигурацию. В качестве альтернативы просто добавьте директиву в каждое из ваших пяти определений процесса.
  • Исполнитель queueSize просто определяет количество задач, которые он будет выполнять параллельно.

Это, конечно, не гарантирует завершения части из пяти процессов перед началом нового, но обычно это не вызывает особых проблем.

Комментарии:

1. Я вижу, но я боюсь, что когда закончится первый процесс, вместо второго шага будет другой экземпляр первого процесса. Я проверю, как именно это работает…

2. @MosheShaham Могу я спросить, почему это проблема? Обычно порядок выполнения не имеет значения, так как вам все равно нужно выполнить все задания.