小果上次介绍了分析流程管理工具snakemake,以及讲解了他的基础用法,并搭建了一个基础的分析流程。在此基础上,小果继续带大家学习他强大的进阶命令,完善示例流程。
这些命令包括:
指定流程的线程数及CPU内核数。
为流程添加配置文件。
引入输入文件的函数。
添加规则参数。
为流程添加日志。
设置输出文件为临时文件或者保护文件
指定流程线程数
对于生信的某些工具,小果建议使用多个线程以加快计算速度。该如何实现呢?我们可以通过threads指令让Snakemake知道规则需要的线程。例如:
rule bwa_map:
input:
“data/genome.fa”,
“data/samples/{sample}.fastq”
output:
“mapped_reads/{sample}.bam”
threads: 8
shell:
“bwa mem -t {threads} {input} | samtools view -Sb – > {output}”
添加一行命令,就可以指定rule bwa_map使用8个线程。
值得一提的是snakemake会确保同时运行的所有作业的线程总数,不超过给定数量的CPU内核。
我们在执行snakemake工作流程时,使用–cores参数,指定使用的CPU内核数量。
snakemake –cores 10
比如这条命令,就会用10个CPU内核执行该工作流。
rule bwa_map指定了8个线程,那么snakemake就会用剩下两个内核去执行其他的任务,例如samtools_sort。
当提供的内核少于线程时,规则使用的线程数将减少到给定内核数。
如果给出的–cores没有数字,则使用所有可用的cores。
设置配置文件
SAMPLES = [“A”, “B”]
rule bcftools_call:
input:
fa=”data/genome.fa”,
bam=expand(“sorted_reads/{sample}.bam”, sample=SAMPLES),
bai=expand(“sorted_reads/{sample}.bam.bai”, sample=SAMPLES)
output:
“calls/all.vcf”
shell:
“bcftools mpileup -f {input.fa} {input.bam} | ”
“bcftools call -mv – > {output}”
上期讲解中我们使用SAMPLES列表,来匹配所有我们需要处理的文件。A,B。
但是,当新的数据来了需要我们的流程分析时,需要手动地更改,难以适应新的数据。
Snakemake考虑到了这种情况,它提供了一种配置文件机制。配置文件可以用JSON或YAML编写,并与指令一起使用。
configfile: “config.yaml”
我们只需要在工作流的顶端添加这一行命令。Snakemake将加载配置文件,并将其内容存储到名为的全局可用字典中。
我们创建一个config.yaml文件,编写samples的文件路径
samples:
A: data/samples/A.fastq
B: data/samples/B.fastq
现在我们就可以去掉SAMPLES列表,并将rule改为
rule bcftools_call:
input:
fa=”data/genome.fa”,
bam=expand(“sorted_reads/{sample}.bam”, sample=config[“samples”]),
bai=expand(“sorted_reads/{sample}.bam.bai”, sample=config[“samples”])
output:
“calls/all.vcf”
shell:
“bcftools mpileup -f {input.fa} {input.bam} | ”
“bcftools call -mv – > {output}”
定义输入函数
我们已经将sample文件的路径存储在配置文件中,因此我们还可以定义一个函数来为使用这些路径。
Snakemake 工作流的执行分为三个阶段。分别是
- 在初始化阶段,将解析定义工作流的文件,并实例化所有规则。
- 在DAG阶段,通过填充通配符并将输入文件与输出文件匹配,可以构建所有作业的有向无循环依赖图。
- 在调度阶段,执行作业的DAG,根据可用资源启动作业。
在初始化阶段,输入文件的函数会被执行,但是在这个阶段作业、通配符值和规则依赖关系还不知道。
我们需要将输入文件的确定推迟到DAG阶段。这可以通过在输入指令中指定输入函数而不是字符串来实现。对于规则,其工作原理如下:
def get_bwa_map_input_fastqs(wildcards):
return config[“samples”][wildcards.sample]
定义一个获取输入文件路径的函数。
rule bwa_map:
input:
“data/genome.fa”,
get_bwa_map_input_fastqs #调用输入函数
output:
“mapped_reads/{sample}.bam”
threads: 8
shell:
“bwa mem -t {threads} {input} | samtools view -Sb – > {output}”
这个函数会返回输入文件的路径,[wildcards.sample]通过这个命令可以返回文件
设置规则的参数
有时,shell命令不仅仅由输入和输出文件以及一些静态标志组成。有时候可能需要根据作业的通配符值设置其他参数。Snakemake允许使用指令params为规则定义任意参数。例如
rule bwa_map:
input:
“data/genome.fa”,
get_bwa_map_input_fastqs
output:
“mapped_reads/{sample}.bam”
params:
rg=r”@RG\tID:{sample}\tSM:{sample}”
threads: 8
shell:
“bwa mem -R ‘{params.rg}‘ -t {threads} {input} | samtools view -Sb – > {output}”
突变分析会考虑很多参数。一个特别重要的是先前的突变率(默认为1e-3)。
我们可以向配置文件中添加一个key并使用规则中的指令,将其调用到shell命令来配置这个key。
日志
在执行一个大型工作流时,通常需要将每个作业的日志记录输出存储到一个单独的文件中,而不是在多个作业并行运行时将所有日志记录输出打印到终端,这会导致输出混乱。为此,Snakemake允许为规则指定日志文件。可以通过添加log命令来完成。
rule bwa_map:
input:
“data/genome.fa”,
get_bwa_map_input_fastqs
output:
“mapped_reads/{sample}.bam”
params:
rg=r”@RG\tID:{sample}\tSM:{sample}”
log:
“logs/bwa_mem/{sample}.log”
threads: 8
shell:
“(bwa mem -R ‘{params.rg}‘ -t {threads} {input} | ”
“samtools view -Sb – > {output}) 2> {log}”
执行shell命令时,可以通过管道传输到日志文件中。
临时文件和保护文件
当我们运行一个大型工作流时,会产生许多中间文件,这些文件会占用大量的磁盘空间,并且也需要耗费计算资源和时间。为了避免这种情况可以将输出文件设置为临时文件,一旦需要它的作业执行完毕,就会删除这个临时文件,避免磁盘占用过多。
rule bwa_map:
input:
“data/genome.fa”,
get_bwa_map_input_fastqs
output:
temp(“mapped_reads/{sample}.bam”)
params:
rg=r”@RG\tID:{sample}\tSM:{sample}”
log:
“logs/bwa_mem/{sample}.log”
threads: 8
shell:
“(bwa mem -R ‘{params.rg}‘ -t {threads} {input} | ”
“samtools view -Sb – > {output}) 2> {log}”
有的时候,我们已经执行完了工作流程,得到了想要的结果,不希望结果遭到篡改。我们可以将结果问价你设置为保护文件。Snakemake会对文件系统中的输出文件进行写保护,这样就不会意外地覆盖或删除它。
流程总结
创建配置文件
samples:
A: data/samples/A.fastq
B: data/samples/B.fastq
prior_mutation_rate: 0.001
Snakemake工作流程文件Snakefile
configfile: “config.yaml” #指定配置文件
rule all:
input:
“plots/quals.svg”
def get_bwa_map_input_fastqs(wildcards): #定义输入函数
return config[“samples”][wildcards.sample]
rule bwa_map:
input:
“data/genome.fa”,
get_bwa_map_input_fastqs
output:
temp(“mapped_reads/{sample}.bam”)#设置临时文件
params:
rg=r”@RG\tID:{sample}\tSM:{sample}”
log:
“logs/bwa_mem/{sample}.log” #输出日志
threads: 8 #设定线程
shell:
“(bwa mem -R ‘{params.rg}‘ -t {threads} {input} | ”
“samtools view -Sb – > {output}) 2> {log}”
rule samtools_sort:
input:
“mapped_reads/{sample}.bam”
output:
protected(“sorted_reads/{sample}.bam”)
shell:
“samtools sort -T sorted_reads/{wildcards.sample} ”
“-O bam {input} > {output}”
rule samtools_index:
input:
“sorted_reads/{sample}.bam”
output:
“sorted_reads/{sample}.bam.bai”
shell:
“samtools index {input}”
rule bcftools_call:
input:
fa=”data/genome.fa”,
bam=expand(“sorted_reads/{sample}.bam”, sample=config[“samples”]),
bai=expand(“sorted_reads/{sample}.bam.bai”, sample=config[“samples”])
output:
“calls/all.vcf”
params:
rate=config[“prior_mutation_rate”]
log:
“logs/bcftools_call/all.log”
shell:
“(bcftools mpileup -f {input.fa} {input.bam} | ”
“bcftools call -mv -P {params.rate} – > {output}) 2> {log}”
rule plot_quals:
input:
“calls/all.vcf”
output:
“plots/quals.svg”
script:
“scripts/plot-quals.py”
好了小果今天的讲解就到这里了,下期再见~