搭建生信分析流水线,如工厂一样24小时运转Snakemake——进阶命令

小果上次介绍了分析流程管理工具snakemake,以及讲解了他的基础用法,并搭建了一个基础的分析流程。在此基础上,小果继续带大家学习他强大的进阶命令,完善示例流程。

这些命令包括:

指定流程的线程数及CPU内核数。

为流程添加配置文件。

引入输入文件的函数。

添加规则参数。

为流程添加日志。

设置输出文件为临时文件或者保护文件

指定流程线程数

对于生信的某些工具,小果建议使用多个线程以加快计算速度。该如何实现呢?我们可以通过threads指令让Snakemake知道规则需要的线程。例如:

rule bwa_map:

input:

“data/genome.fa”,

“data/samples/{sample}.fastq”

output:

“mapped_reads/{sample}.bam”

threads: 8

shell:

“bwa mem -t {threads} {input} | samtools view -Sb – > {output}

添加一行命令,就可以指定rule bwa_map使用8个线程。

值得一提的是snakemake会确保同时运行的所有作业的线程总数,不超过给定数量的CPU内核。

我们在执行snakemake工作流程时,使用–cores参数,指定使用的CPU内核数量。

snakemake –cores 10

比如这条命令,就会用10个CPU内核执行该工作流。

rule bwa_map指定了8个线程,那么snakemake就会用剩下两个内核去执行其他的任务,例如samtools_sort。

当提供的内核少于线程时,规则使用的线程数将减少到给定内核数。

如果给出的–cores没有数字,则使用所有可用的cores。

设置配置文件

SAMPLES = [“A”, “B”]

rule bcftools_call:

input:

fa=”data/genome.fa”,

bam=expand(“sorted_reads/{sample}.bam”, sample=SAMPLES),

bai=expand(“sorted_reads/{sample}.bam.bai”, sample=SAMPLES)

output:

“calls/all.vcf”

shell:

“bcftools mpileup -f {input.fa} {input.bam} | ”

“bcftools call -mv – > {output}

上期讲解中我们使用SAMPLES列表,来匹配所有我们需要处理的文件。A,B。

但是,当新的数据来了需要我们的流程分析时,需要手动地更改,难以适应新的数据。

Snakemake考虑到了这种情况,它提供了一种配置文件机制。配置文件可以用JSON或YAML编写,并与指令一起使用。

configfile: “config.yaml”

我们只需要在工作流的顶端添加这一行命令。Snakemake将加载配置文件,并将其内容存储到名为的全局可用字典中。

我们创建一个config.yaml文件,编写samples的文件路径

samples:

A: data/samples/A.fastq

B: data/samples/B.fastq

现在我们就可以去掉SAMPLES列表,并将rule改为

rule bcftools_call:

input:

fa=”data/genome.fa”,

bam=expand(“sorted_reads/{sample}.bam”, sample=config[“samples”]),

bai=expand(“sorted_reads/{sample}.bam.bai”, sample=config[“samples”])

output:

“calls/all.vcf”

shell:

“bcftools mpileup -f {input.fa} {input.bam} | ”

“bcftools call -mv – > {output}

定义输入函数

我们已经将sample文件的路径存储在配置文件中,因此我们还可以定义一个函数来为使用这些路径。

Snakemake 工作流的执行分为三个阶段。分别是

  1. 在初始化阶段,将解析定义工作流的文件,并实例化所有规则。
  2. 在DAG阶段,通过填充通配符并将输入文件与输出文件匹配,可以构建所有作业的有向无循环依赖图。
  3. 在调度阶段,执行作业的DAG,根据可用资源启动作业。

在初始化阶段,输入文件的函数会被执行,但是在这个阶段作业、通配符值和规则依赖关系还不知道。

我们需要将输入文件的确定推迟到DAG阶段。这可以通过在输入指令中指定输入函数而不是字符串来实现。对于规则,其工作原理如下:

def get_bwa_map_input_fastqs(wildcards):

return config[“samples”][wildcards.sample]

定义一个获取输入文件路径的函数。

rule bwa_map:

input:

“data/genome.fa”,

get_bwa_map_input_fastqs #调用输入函数

output:

“mapped_reads/{sample}.bam”

threads: 8

shell:

“bwa mem -t {threads} {input} | samtools view -Sb – > {output}

这个函数会返回输入文件的路径,[wildcards.sample]通过这个命令可以返回文件

设置规则的参数

有时,shell命令不仅仅由输入和输出文件以及一些静态标志组成。有时候可能需要根据作业的通配符值设置其他参数。Snakemake允许使用指令params为规则定义任意参数。例如

rule bwa_map:

input:

“data/genome.fa”,

get_bwa_map_input_fastqs

output:

“mapped_reads/{sample}.bam”

params:

rg=r”@RG\tID:{sample}\tSM:{sample}

threads: 8

shell:

“bwa mem -R ‘{params.rg}‘ -t {threads} {input} | samtools view -Sb – > {output}

突变分析会考虑很多参数。一个特别重要的是先前的突变率(默认为1e-3)。

我们可以向配置文件中添加一个key并使用规则中的指令,将其调用到shell命令来配置这个key。

日志

在执行一个大型工作流时,通常需要将每个作业的日志记录输出存储到一个单独的文件中,而不是在多个作业并行运行时将所有日志记录输出打印到终端,这会导致输出混乱。为此,Snakemake允许为规则指定日志文件。可以通过添加log命令来完成。

rule bwa_map:

input:

“data/genome.fa”,

get_bwa_map_input_fastqs

output:

“mapped_reads/{sample}.bam”

params:

rg=r”@RG\tID:{sample}\tSM:{sample}

log:

“logs/bwa_mem/{sample}.log”

threads: 8

shell:

“(bwa mem -R ‘{params.rg}‘ -t {threads} {input} | ”

“samtools view -Sb – > {output}) 2> {log}

执行shell命令时,可以通过管道传输到日志文件中。

临时文件和保护文件

当我们运行一个大型工作流时,会产生许多中间文件,这些文件会占用大量的磁盘空间,并且也需要耗费计算资源和时间。为了避免这种情况可以将输出文件设置为临时文件,一旦需要它的作业执行完毕,就会删除这个临时文件,避免磁盘占用过多。

rule bwa_map:

input:

“data/genome.fa”,

get_bwa_map_input_fastqs

output:

temp(“mapped_reads/{sample}.bam”)

params:

rg=r”@RG\tID:{sample}\tSM:{sample}

log:

“logs/bwa_mem/{sample}.log”

threads: 8

shell:

“(bwa mem -R ‘{params.rg}‘ -t {threads} {input} | ”

“samtools view -Sb – > {output}) 2> {log}

有的时候,我们已经执行完了工作流程,得到了想要的结果,不希望结果遭到篡改。我们可以将结果问价你设置为保护文件。Snakemake会对文件系统中的输出文件进行写保护,这样就不会意外地覆盖或删除它。

流程总结

创建配置文件

samples:

A: data/samples/A.fastq

B: data/samples/B.fastq

prior_mutation_rate: 0.001

Snakemake工作流程文件Snakefile

configfile: “config.yaml” #指定配置文件

rule all:

input:

“plots/quals.svg”

def get_bwa_map_input_fastqs(wildcards): #定义输入函数

return config[“samples”][wildcards.sample]

rule bwa_map:

input:

“data/genome.fa”,

get_bwa_map_input_fastqs

output:

temp(“mapped_reads/{sample}.bam”)#设置临时文件

params:

rg=r”@RG\tID:{sample}\tSM:{sample}

log:

“logs/bwa_mem/{sample}.log” #输出日志

threads: 8 #设定线程

shell:

“(bwa mem -R ‘{params.rg}‘ -t {threads} {input} | ”

“samtools view -Sb – > {output}) 2> {log}

rule samtools_sort:

input:

“mapped_reads/{sample}.bam”

output:

protected(“sorted_reads/{sample}.bam”)

shell:

“samtools sort -T sorted_reads/{wildcards.sample}

“-O bam {input} > {output}

rule samtools_index:

input:

“sorted_reads/{sample}.bam”

output:

“sorted_reads/{sample}.bam.bai”

shell:

“samtools index {input}

rule bcftools_call:

input:

fa=”data/genome.fa”,

bam=expand(“sorted_reads/{sample}.bam”, sample=config[“samples”]),

bai=expand(“sorted_reads/{sample}.bam.bai”, sample=config[“samples”])

output:

“calls/all.vcf”

params:

rate=config[“prior_mutation_rate”]

log:

“logs/bcftools_call/all.log”

shell:

“(bcftools mpileup -f {input.fa} {input.bam} | ”

“bcftools call -mv -P {params.rate} – > {output}) 2> {log}

rule plot_quals:

input:

“calls/all.vcf”

output:

“plots/quals.svg”

script:

“scripts/plot-quals.py”

好了小果今天的讲解就到这里了,下期再见~