Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Building a workflow with dynamic input without using deprecated dynamic API

I'd like to make a workflow which downloads the list of some FASTQ files from the remote server, checks md5 and runs some post-processing, e.g. aligning.

I understand how to implement this using two workflows:

  1. first download fastq files list file, e.g. md5 file.

  2. read the md5 file content and create corresponding targets in all rule for desired resulting files.

I'd like to do this in a single workflow. The incorrect workflow below shows the idea what I'd like to achieve.

  • in all rule input: section I don't know {sample} values before md5 file is download and parsed

  • I've tried to play with dynamic, checkpoints and subforkflows, but failed to achieve the desired result. As for dynamic I've managed only to implement this workflow only for dynamic("fastq/{sample}.fq.gz.md5") output.

  • Also, I'm interested in a solution which doesn't use dynamic because it is deprecated.

rule all:
    input:
         "md5",
         "bams/{sample}.bam",

rule download_files_list:
    output: "md5"
    #shell: "wget {}".format(config["url_files_list"])
    run:
        # For testing instead of downloading:
        content = """
        bfe583337fd68b3  ID_001_1.fq.gz
        1636b6756daa65f  ID_001_2.fq.gz
        0428baf25307249  ID_002_1.fq.gz
        de33d81ba5bfa62  ID_002_2.fq.gz
        """.strip()
        with open(output[0], mode="w") as f:
            print(content, file=f)

rule fastq_md5_files:
    input: "md5"
    output: "fastq/{sample}.fq.gz.md5"
    shell: "mkdir -p fastq && awk '{{ print $0 > (\"fastq/\" $2 \".md5\") }}' {input}"

rule download_fastq_and_check_md5:
    input: "fastq/{sample}.fq.gz.md5"
    output: "fastq/{sample}.fq.gz"
    #shell: "wget {}/{{sample}} && md5sum --check {{input}}".format(config["url_file_prefix"])
    shell: "touch {output}" 

rule align_fastq:
    input: "fastq/{sample}.fq.gz"
    output: "bams/{sample}.bam"
    shell: "touch {output}" # aligning task
like image 314
Roman Chernyatchik Avatar asked Jan 26 '23 21:01

Roman Chernyatchik


2 Answers

I've seen a lot of confusion over how to use the new checkpoint feature. Here is an illustrative example, simplified:

shell.prefix('set -vexu pipefail; ')

rule finish:
        input:
                "D/all.txt"

checkpoint A:
        output:
                mydir = directory("A")
        shell: """
                mkdir -p A
                N=$(( $RANDOM % 7 + 1))
                echo "N=$N"
                # Create a number of files. (
                for i in $(seq 1 $N); do
                        echo $i > "A/$i.txt"
                done
        """

rule B:
        output:
                txt = "B/{i}.txt",
        input:
                txt = "A/{i}.txt",
        shell: """
                mkdir -p B
                cp -f {input.txt} {output.txt}
        """

rule C:
        output:
                txt = "C/{i}.txt",
        input:
                txt = "B/{i}.txt",
        shell: """
                mkdir -p C
                cp -f {input.txt} {output.txt}
        """

def gatherForD_fromC_basedOnA(wildcards):
        checkpoint_output = checkpoints.A.get(**wildcards).output.mydir
        # That will raise if not ready.

        ivals = glob_wildcards(os.path.join(checkpoint_output,
                        "{i}.txt")).i
        print("ivals={}".format(ivals))
        return expand("C/{i}.txt", i=ivals)

rule D:
        output:
                combined = "D/all.txt",
        input:
                gathered = gatherForD_fromC_basedOnA,
        shell: """
                mkdir -p D
                cat {input.gathered} > {output.combined}
        """

Copy to snakefile and run it with

snakemake --verbose -p
  • Checkpoint/Rule A outputs a random number of files. (You could base those on an "input" section instead, of course.)
  • Rules B and C are parallel rules using standard snakemake "wildcards".
  • Rule D takes an unknown number of inputs, based on an input-generating function.
  • Function gatherForD_fromC_basedOnA waits for the output of checkpoint-rule A, but it names the output of rule C, which is finally consumed by rule D. As a result, snakemake knows what D will consume (after A is done). So it knows what C must produce. So it knows what B must produce.
  • Finally, rule finish waits for a specific, known file.
like image 128
cdunn2001 Avatar answered Jan 29 '23 09:01

cdunn2001


You could download the list of fastq files and extract from there the list of samples using pure python code before snakemake rules kicks in:

def download_files_list(output):
    """Download the list of fastq files and return the list
    of samples
    """
    content = """
    bfe583337fd68b3  ID_001_1.fq.gz
    1636b6756daa65f  ID_001_2.fq.gz
    0428baf25307249  ID_002_1.fq.gz
    de33d81ba5bfa62  ID_002_2.fq.gz
    """.strip()
    with open(output, mode="w") as f:
        print(content, file=f)
    return ['ID_001_1', 'ID_001_2', 'ID_002_1', 'ID_002_2']    

samples= download_files_list("md5")
wildcard_constraints:
    sample= '|'.join([re.escape(x) for x in samples]),

rule all:
    input:
         expand("bams/{sample}.bam", sample= samples),

rule fastq_md5_files:
    input: "md5"
    output: "fastq/{sample}.fq.gz.md5"
    shell: """awk '{{ print $0 > ("fastq/" $2 ".md5") }}' {input}"""

rule download_fastq_and_check_md5:
    input: "fastq/{sample}.fq.gz.md5"
    output: "fastq/{sample}.fq.gz"
    #shell: "wget {}/{{sample}} && md5sum --check {{input}}".format(config["url_file_prefix"])
    shell: "touch {output}" 

rule align_fastq:
    input: "fastq/{sample}.fq.gz"
    output: "bams/{sample}.bam"
    shell: "touch {output}" # aligning task

(I'm curious myself about a more snakemake-ish solution using checkpoints or similar)

like image 34
dariober Avatar answered Jan 29 '23 10:01

dariober