diff --git a/bin/collate_stats.py b/bin/collate_stats.py new file mode 100755 index 0000000..ce7920b --- /dev/null +++ b/bin/collate_stats.py @@ -0,0 +1,261 @@ +#!/usr/bin/env python3 + +import argparse +import glob +import os +import pathlib +import re +import sys + + +""" + 93 chimeras.qc.txt + 106 flagstats.txt + 56 orphans.qc.txt + 106 qc.txt + 106 raw.txt + 1 reports + 106 singles.qc.txt +""" + +""" +input -> + +O2.UC3-0.chimeras.qc.txt O2.UC3-0.orphans.qc.txt O2.UC3-0.raw.txt O2.UC3-0.singles.raw.txt +O2.UC3-0.flagstats.txt O2.UC3-0.qc.txt O2.UC3-0.singles.qc.txt + +prefix, _ = os.path.splitext(f) -> + +O2.UC3-0.chimeras.qc O2.UC3-0.orphans.qc O2.UC3-0.raw O2.UC3-0.singles.raw +O2.UC3-0.flagstats O2.UC3-0.qc O2.UC3-0.singles.qc + +prefix, suffix = os.path.splitext(prefix) -> + +O2.UC3-0.chimeras,qc O2.UC3-0.orphans,qc O2.UC3-0,raw O2.UC3-0.singles,raw +O2.UC3-0.flagstats O2.UC3-0,qc O2.UC3-0.singles,qc + + +""" + +def collect_readcounts(d): + readcounts = {} + files = glob.iglob(os.path.join(d, "*.txt")) + + for f in files: + prefix, _ = os.path.splitext(f) + prefix, suffix = os.path.splitext(prefix) + if suffix == ".raw": + prefix, suffix = os.path.splitext(prefix) + if suffix == ".singles": + suffix = ".singles.raw" + else: + prefix += suffix + suffix = ".raw" + elif suffix == ".qc": + prefix, suffix = os.path.splitext(prefix) + if suffix not in (".chimeras", ".singles", ".orphans"): + prefix += suffix + suffix = ".main" + else: + ... + else: + continue + + count_data = open(f).read().strip() + n_mates = int(count_data[0]) + n_reads = int(count_data.split("\t")[1]) + + prefix = os.path.basename(prefix) + + readcounts.setdefault(prefix, {})[suffix] = n_mates * n_reads + if suffix == ".raw": + readcounts[prefix]["paired_end"] = n_mates == 2 + readcounts[prefix]["single_end"] = readcounts[prefix].get("single_end", False) or n_mates == 1 + if suffix == ".singles.raw": + readcounts[prefix]["single_end"] = True + + return readcounts + + + + +def collect_readcounts_old(d): + readcounts = {} + if os.path.exists(os.path.join(d, "raw_counts")): + walk = os.walk(os.path.join(d, "raw_counts")) + pwd, dirs, files = next(walk) + + for f in files: + prefix, suffix = os.path.splitext(f) + if suffix == ".txt": + prefix, suffix = os.path.splitext(prefix) + if suffix not in (".chimeras", ".singles", ".orphans"): + prefix, suffix = prefix + suffix, ".main" + + count_data = open(os.path.join(pwd, f)).read().strip() + n_mates = int(count_data[0]) + n_reads = int(count_data.split("\t")[1]) + + readcounts.setdefault(prefix, {})[suffix] = n_mates * n_reads + if suffix == ".main": + readcounts[prefix]["paired_end"] = n_mates == 2 + readcounts[prefix]["single_end"] = readcounts[prefix].get("single_end", False) or n_mates == 1 + if suffix == ".singles": + readcounts[prefix]["single_end"] = True + + + return readcounts + + +def collect_flagstats(d): + + def extract(line): + line = line.strip().split(" ") + return int(line[0]) + int(line[2]) + + flagstats = {} + + files = glob.iglob(os.path.join(d, "*.flagstats.txt")) + for f in files: + lines = open(f).readlines() + sample = os.path.basename(f).replace(".flagstats.txt", "") + flagstats[sample] = { + "primary_alignments": extract(lines[0x1]), + "secondary_alignments": extract(lines[0x2]), + "npaired_mapped": extract(lines[0xC]), + "nsingles_mapped": extract(lines[0xD]) + (extract(lines[0x7]) - extract(lines[0x8])), + } + flagstats[sample]["n_alignments"] = flagstats[sample]["primary_alignments"] + flagstats[sample]["secondary_alignments"] + + return flagstats + 
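+# Note on collect_flagstats(): it addresses `samtools flagstat` output purely by
+# line position -- the hex indices (0x1, 0x2, 0x7, 0x8, 0xC, 0xD) refer to the
+# reference listing at the bottom of this file -- and extract() sums a line's
+# QC-passed and QC-failed counts. A minimal sketch of that arithmetic, assuming
+# the flagstat layout shown in that listing (illustration only, not pipeline code):
+#
+#   line = "51499902 + 0 primary mapped (100.00% : N/A)"  # listing line 0x7
+#   fields = line.strip().split(" ")                      # what extract() does
+#   qc_passed, qc_failed = int(fields[0]), int(fields[2])
+#   qc_passed + qc_failed                                 # -> 51499902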
+ + + + +def collect_flagstats_old(d): + + def extract(line): + line = line.strip().split(" ") + return int(line[0]) + int(line[2]) + + flagstats = {} + if os.path.exists(os.path.join(d, "stats")): + walk = os.walk(os.path.join(d, "stats")) + pwd, dirs, files = next(walk) + + for f in files: + lines = open(os.path.join(pwd, f)).readlines() + sample = f.replace(".flagstats.txt", "") + flagstats[sample] = { + "primary_alignments": extract(lines[0x1]), + "secondary_alignments": extract(lines[0x2]), + "npaired_mapped": extract(lines[0xC]), + "nsingles_mapped": extract(lines[0xD]) + (extract(lines[0x7]) - extract(lines[0x8])), + } + flagstats[sample]["n_alignments"] = flagstats[sample]["primary_alignments"] + flagstats[sample]["secondary_alignments"] + + return flagstats + + +def write_output(readcounts, flagstats, outfile): + samples = sorted(set(readcounts).union(flagstats)) + + header = ["sample", "pe", "se", "n_reads", "main_lib", "add_lib"] + header += ["n_clean", "clean_paired", "clean_single", "qc_orphans", "dc_orphans"] + header += ["n_aln", "pri_aln", "sec_aln", "pe_aln", "se_aln"] + print(*header, sep="\t", file=outfile) + + for sample in samples: + + # samples is the union of readcounts and flagstats keys, + # so guard against samples that only have flagstats entries + readcounts.setdefault(sample, {}) + + if readcounts[sample].get(".main") is None: + try: + readcounts[sample][".main"] = readcounts[sample][".singles"] + except KeyError: + readcounts[sample][".main"] = -1 + print(f"WARNING: Could find neither .singles nor .main counts for {sample}.", file=sys.stderr) + else: + del readcounts[sample][".singles"] + + if readcounts[sample].get(".raw") is None: + try: + readcounts[sample][".raw"] = readcounts[sample][".singles.raw"] + except KeyError: + readcounts[sample][".raw"] = -1 + print(f"WARNING: Could find neither .raw nor .singles.raw counts for {sample}.", file=sys.stderr) + else: + del readcounts[sample][".singles.raw"] + + line = ( + sample, + readcounts[sample].get("paired_end", False), + readcounts[sample].get("single_end", False), + ) + + line += ( + readcounts[sample].get(".raw", 0) + readcounts[sample].get(".singles.raw", 0), + readcounts[sample].get(".raw", 0), + readcounts[sample].get(".singles.raw", 0) + ) + + line += ( + readcounts[sample].get(".main", 0) + readcounts[sample].get(".singles", 0), + readcounts[sample].get(".main", 0), + readcounts[sample].get(".singles", 0), + readcounts[sample].get(".orphans", 0), + readcounts[sample].get(".chimeras", 0) + ) + + + # line = [sample, readcounts[sample].get("paired_end", False), readcounts[sample].get("single_end", False), sum(v for k, v in readcounts[sample].items() if k[0] == ".")] + # line += (readcounts[sample].get(key, 0) for key in (".main", ".singles", ".orphans", ".chimeras")) + line += tuple( + flagstats.get(sample, {}).get(key, 0) + for key in ( + "n_alignments", "primary_alignments", + "secondary_alignments", "npaired_mapped", "nsingles_mapped" + ) + ) + + print(*line, sep="\t", file=outfile) + + + +def main(): + + ap = argparse.ArgumentParser() + ap.add_argument("input_dir", type=str, default=".") + args = ap.parse_args() + + readcounts = collect_readcounts(args.input_dir) + flagstats = collect_flagstats(args.input_dir) + + # pathlib.Path(os.path.join(args.input_dir, "reports")).mkdir(exist_ok=True, parents=True) + + #with open(os.path.join(args.input_dir, "reports", "library_stats.tsv"), "wt") as _out: + write_output(readcounts, flagstats, sys.stdout) + + +if __name__ == "__main__": + main() + +""" +0 299821609 + 0 in total (QC-passed reads + QC-failed reads) +1 51499902 + 0 primary +2 244967945 + 0 secondary +3 3353762 + 0 supplementary +4 0 + 0 duplicates +5 0 + 0
primary duplicates +6 299821609 + 0 mapped (100.00% : N/A) +7 51499902 + 0 primary mapped (100.00% : N/A) +8 47968663 + 0 paired in sequencing +9 23987824 + 0 read1 +A 23980839 + 0 read2 +B 23298984 + 0 properly paired (48.57% : N/A) +C 46949898 + 0 with itself and mate mapped +D 1018765 + 0 singletons (2.12% : N/A) +E 23643000 + 0 with mate mapped to a different chr +F 9277385 + 0 with mate mapped to a different chr (mapQ>=5) +""" diff --git a/bin/prepare_fastqs.py b/bin/prepare_fastqs.py new file mode 100755 index 0000000..d22598a --- /dev/null +++ b/bin/prepare_fastqs.py @@ -0,0 +1,299 @@ +#!/usr/bin/env python3 + +import argparse +import itertools +import logging +import os +import pathlib +import re +import shutil +import subprocess +import sys + +from collections import Counter + + +logging.basicConfig( + level=logging.DEBUG, + format='[%(asctime)s] %(message)s' +) + +logger = logging.getLogger(__name__) + + +def check_pairwise(r1, r2): + """ Checks if two sets of read files contain the same prefixes. + + Input: + - r1/r2: lists of tuples (prefix, filename) of R1/R2 paired-end read files + + Raises error if one prefix is not found in either r1 or r2. + + """ + r1_r2 = tuple(prefix[:-1] for prefix, _ in itertools.chain(r1, r2)) + for prefix in set(r1_r2): + if r1_r2.count(prefix) != 2: + raise ValueError(f"Missing mates for prefix {prefix}.") + + +def transfer_file(source, dest, remote_input=False): + """ Transfers a file depending on its location and compressed state. + + Input: + - path to source file + - path to destination file + - whether source file is considered to be located on a remote file system + + """ + resolved_src = pathlib.Path(source).resolve() + + if os.path.splitext(source)[1][1:] in ("gz", "bz2"): + if remote_input: + # if file is on remote file system, copy it to destination + logging.debug('transfer_file: source=%s, dest=%s, remote_input=%s, action=copy', source, dest, remote_input) + shutil.copyfile(resolved_src, dest) + else: + # if file is compressed and on local fs, just symlink it + logging.debug('transfer_file: source=%s, dest=%s, remote_input=%s, action=symlink', source, dest, remote_input) + pathlib.Path(dest).symlink_to(resolved_src) + else: + # if file is not compressed, gzip it to destination + logging.debug('transfer_file: source=%s, dest=%s, remote_input=%s, action=gzip', source, dest, remote_input) + with open(dest, "wt") as _out: + subprocess.run(("gzip", "-c", resolved_src), stdout=_out) + + +def transfer_multifiles(files, dest, remote_input=False, compression=None): + """ Transfers a set of files depending on their location and compressed state. 
+ + Input: + - list of source file paths + - path to destination file + - whether source files are considered to be located on a remote file system + - the compression type of the files (supported: None, gz, bz2) + """ + + if len(files) > 1: + src_files = tuple(os.path.abspath(f) for f in files) # tuple(f.resolve() for f in files) + cat_cmd = ("cat", ) + src_files + + if compression in ("gz", "bz2"): + # multiple compressed files can just be concatenated + logging.debug('transfer_multifiles: compression=%s, remote_input=%s, action=concatenate', compression, remote_input) + with open(dest, "wt") as _out: + subprocess.run(cat_cmd, stdout=_out) + else: + # multiple uncompressed files will be cat | gzipped + logging.debug('transfer_multifiles: compression=%s, remote_input=%s, action=concatenate+gzip', compression, remote_input) + cat_pr = subprocess.Popen(cat_cmd, stdout=subprocess.PIPE) + with open(dest, "wt") as _out: + subprocess.run(("gzip", "-c", "-"), stdin=cat_pr.stdout, stdout=_out) + + else: + logging.debug('transfer_multifiles: single file, source=%s, dest=%s, remote_input=%s, action=defer->transfer_file', files[0], dest, remote_input) + transfer_file(files[0], dest, remote_input=remote_input) + + +def process_sample( + sample, fastqs, output_dir, + fastq_suffix_pattern, + remove_suffix=None, remote_input=False, +): + """ Checks if a set of fastq files in a directory is a valid collection + and transfers files to a destination dir upon success. + + Input: + - sample_id + - list of fastq files + - path to output directory + - suffix to strip off from filenames (e.g. _001) + - whether fastq files are located on remote file system + """ + + if not fastqs: + ... + elif len(fastqs) == 1: + # remove potential "single(s)" string from single fastq file name prefix + sample_sub = re.sub(r"[._]singles?", "", sample) + # 20221018: and attach it at the end of the sample name + # - this might be a temporary fix, but @93a73d0 + # single-end samples without .singles-suffix cause problems + # with fastqc results in the collate step + sample = sample_sub + ".singles" + sample_dir = os.path.join(output_dir, sample) + pathlib.Path(sample_dir).mkdir(parents=True, exist_ok=True) + + dest_compression = fastqs[0][fastqs[0].rfind(".") + 1:] + + dest = os.path.join(sample_dir, f"{sample}_R1.fastq.{dest_compression}") + transfer_file(fastqs[0], dest, remote_input=remote_input) + + elif fastqs: + + # check if all fastq files are compressed the same way + suffixes = Counter( + f[f.rfind("."):] in (".gz", ".bz2") for f in fastqs + ) + + if len(suffixes) > 1: + raise ValueError(f"sample: {sample} has mixed compressed and uncompressed input files. Please check.") + + if suffixes.most_common()[0][0]: + # all compressed + suffixes = Counter( + f[f.rfind(".") + 1:] for f in fastqs + ) + if len(suffixes) > 1: + raise ValueError(f"sample: {sample} has mixed gzip and bzip2 files. 
Please check.") + dest_compression = compression = suffixes.most_common()[0][0] + else: + dest_compression, compression = "gz", None + + # extract the file name prefixes + prefixes = [re.sub(fastq_suffix_pattern, "", os.path.basename(f)) for f in fastqs] + if remove_suffix: + # remove suffix pattern if requested + prefixes = [re.sub(remove_suffix + r"$", "", p) for p in prefixes] + + print("PRE", prefixes, file=sys.stderr) + + # partition fastqs into R1, R2, and 'other' sets + r1 = [(p, f) for p, f in zip(prefixes, fastqs) if re.search(r"[._R]1$", p)] + r2 = [(p, f) for p, f in zip(prefixes, fastqs) if re.search(r"[._R]2$", p)] + others = sorted(list(set(fastqs).difference({f for _, f in r1}).difference({f for _, f in r2}))) + + # check if R1/R2 sets have equal sizes or are empty + # R1 empty: potential scRNAseq (or any protocol with barcode reads in R1) + # R2 empty: typical single end reads with (R?)1 suffix + assert len(r2) == 0 or len(r1) == 0 or (r1 and len(r1) == len(r2)), "R1/R2 sets are not of the same length" + + # if R1 and R2 are of equal size, check if the prefixes match + if len(r1) == len(r2) and r1: + check_pairwise(r1, r2) + + # sort R1/R2 for concatenation, get rid off prefixes + r1 = sorted(f for _, f in r1) + r2 = sorted(f for _, f in r2) + + print("R1", r1, file=sys.stderr) + print("R2", r2, file=sys.stderr) + print("others", others, file=sys.stderr, flush=True) + + sample_dir = os.path.join(output_dir, sample) + + if r1 or r2: + + pathlib.Path(sample_dir).mkdir(parents=True, exist_ok=True) + + if r1: + # if R1 is not empty, transfer R1-files + dest = os.path.join(sample_dir, f"{sample}_R1.fastq.{dest_compression}") + transfer_multifiles(r1, dest, remote_input=remote_input, compression=compression) + if r2: + # if R2 is not empty, transfer R2-files, + # if R1 is empty, rename R2 to R1 so that files can be processed as normal single-end + target_r = "R2" if r1 else "R1" + dest = os.path.join(sample_dir, f"{sample}_{target_r}.fastq.{dest_compression}") + transfer_multifiles(r2, dest, remote_input=remote_input, compression=compression) + + if others: + # if single-end reads exist, + # transfer them to .singles + # these will be processed independently and merged with the paired-end reads + # at a later stage + sample_dir = sample_dir + ".singles" + pathlib.Path(sample_dir).mkdir(parents=True, exist_ok=True) + dest = os.path.join(sample_dir, f"{sample}.singles_R1.fastq.{dest_compression}") + transfer_multifiles(others, dest, remote_input=remote_input, compression=compression) + + +def is_fastq(f, valid_fastq_suffixes, valid_compression_suffixes): + """ Checks if a file is a fastq file (compressed or uncompressed.) 
+ + +def is_fastq(f, valid_fastq_suffixes, valid_compression_suffixes): + """ Checks if a file is a fastq file (compressed or uncompressed). + + Input: + - filename + + Output: + - true if file is fastq else false + + """ + prefix, suffix = os.path.splitext(f) + if suffix in valid_fastq_suffixes: + return True + if suffix in valid_compression_suffixes: + _, suffix = os.path.splitext(prefix) + return suffix in valid_fastq_suffixes + return False + + +def main(): + ap = argparse.ArgumentParser() + ap.add_argument("-i", "--input_dir", type=str, default=".") + ap.add_argument("-o", "--output_dir", type=str, default="prepared_samples") + ap.add_argument("-p", "--prefix", type=str, required=True) + ap.add_argument("--remote-input", action="store_true") + ap.add_argument("--remove-suffix", type=str, default=None) + ap.add_argument("--valid-fastq-suffixes", type=str, default="fastq,fq") + ap.add_argument("--valid-compression-suffixes", type=str, default="gz,bz2") + + args = ap.parse_args() + + valid_fastq_suffixes = tuple(f".{suffix}" for suffix in args.valid_fastq_suffixes.split(",")) + print(valid_fastq_suffixes) + valid_compression_suffixes = tuple(f".{suffix}" for suffix in args.valid_compression_suffixes.split(",")) + print(valid_compression_suffixes) + + fastq_file_suffix_pattern = r"[._](" + \ + args.valid_fastq_suffixes.replace(",", "|") + \ + ")([._](" + \ + args.valid_compression_suffixes.replace(",", "|") + \ + "))?$" + + fastq_mate_pattern = r"([._R][12])$" + + def collect_fastq_files(input_dir, valid_fastq_suffixes, valid_compression_suffixes): + return sorted( + os.path.join(input_dir, f) + for f in os.listdir(input_dir) + if is_fastq(f, valid_fastq_suffixes, valid_compression_suffixes) + ) + + try: + pwd, dirs, _ = next(os.walk(args.input_dir)) + except StopIteration: + raise ValueError(f"Could not find input directory {args.input_dir} ({os.path.abspath(args.input_dir)})") + + samples = {} + pathlib.Path(args.output_dir).mkdir(parents=True, exist_ok=True) + + for sample_dir in dirs: + sample, sample_dir = sample_dir, os.path.join(pwd, sample_dir) + + samples.setdefault(sample, []).extend( + collect_fastq_files(sample_dir, valid_fastq_suffixes, valid_compression_suffixes) + ) + + root_fastqs = collect_fastq_files(args.input_dir, valid_fastq_suffixes, valid_compression_suffixes) + + if samples and root_fastqs: + raise ValueError(f"Found {len(root_fastqs)} fastq files in input directory together with {len(samples)} sample directories.
Please check input data.") + elif root_fastqs: + for f in root_fastqs: + sample = re.sub(fastq_file_suffix_pattern, "", os.path.basename(f)) + sample = re.sub(fastq_mate_pattern, "", sample) + samples.setdefault(sample, []).append(f) + + # check and transfer the files + for sample, fastqs in samples.items(): + try: + process_sample( + sample, fastqs, args.output_dir, + fastq_file_suffix_pattern, + remove_suffix=args.remove_suffix, remote_input=args.remote_input + ) + except Exception as e: + raise ValueError(f"Encountered problems processing sample '{sample}': {e}.\nPlease check your file names.") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/bin/transfer.py b/bin/transfer.py new file mode 100755 index 0000000..cb4fc2c --- /dev/null +++ b/bin/transfer.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python3 + +import argparse +import gzip +import os +import pathlib +import shutil +import subprocess +import sys + + +def main(): + + ap = argparse.ArgumentParser() + ap.add_argument("--input_dir", "-i", type=str, default=".") + ap.add_argument("--output_dir", "-o", type=str, default="fastq") + args = ap.parse_args() + + pathlib.Path(args.output_dir).mkdir(parents=True, exist_ok=True) + + for f in sorted(os.listdir(args.input_dir)): + full_f = os.path.join(args.input_dir, f) + if pathlib.Path(full_f).is_symlink() and f != os.path.basename(__file__): + link_target = pathlib.Path(full_f).resolve() + sample = os.path.basename(os.path.dirname(link_target)) + print(f, sample) + if not sample: + raise NotImplementedError("Flat-directories not implemented.") + sample_dir = os.path.join(args.output_dir, sample) + print(sample_dir) + pathlib.Path(sample_dir).mkdir(parents=True, exist_ok=True) + dest = os.path.join(sample_dir, f) + if f.endswith(".gz"): + shutil.copyfile(link_target, dest) + else: + with open(dest + ".gz", "wt") as _out: + subprocess.run(("gzip", "-c", full_f), stdout=_out) + + + + + + + ... + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/config/params.yml b/config/params.yml index d8f0c9f..c666aad 100644 --- a/config/params.yml +++ b/config/params.yml @@ -43,10 +43,10 @@ qc_params_shotgun: "qtrim=rl trimq=3 maq=25 ktrim=r k=23 mink=11 hdist=1 ftm=5 e # kraken2 parameters # kraken2 db path -kraken_database: "/g/scb/zeller/jawirbel/total_RNAseq/databases/kraken2_standard" +# kraken_database: "/g/scb/zeller/jawirbel/total_RNAseq/databases/kraken2_standard" # kraken2_min_hit_groups -kraken2_min_hit_groups: 10 +# kraken2_min_hit_groups: 10 # pathseq parameters diff --git a/config/run.config b/config/run.config index 8dc5437..60eea65 100644 --- a/config/run.config +++ b/config/run.config @@ -23,6 +23,10 @@ profiles { process { cache = "lenient" container = "oras://ghcr.io/cschu/vortex_knight@sha256:f01a08ba83c59e9ddb5ce69c2f3bd2179e13122e631f4836444699a9f064dd8d" + withName: prepare_fastqs { + executor = 'local' + container = null + } withName: fastqc { errorStrategy = {task.attempt <= 3 ? 
"retry" : "ignore"} cpus = 2 diff --git a/main.nf b/main.nf index 005a7a6..6edf7c5 100644 --- a/main.nf +++ b/main.nf @@ -2,182 +2,42 @@ nextflow.enable.dsl=2 -include { bam2fq } from "./nevermore/modules/converters/bam2fq" -include { fq2bam } from "./nevermore/modules/converters/fq2bam" -include { prepare_fastqs } from "./nevermore/modules/converters/prepare_fastqs" -include { nevermore_simple_preprocessing } from "./nevermore/workflows/nevermore" -include { classify_sample } from "./nevermore/modules/functions" -include { remove_host_kraken2; remove_host_kraken2_individual } from "./nevermore/modules/decon/kraken2" -include { flagstats } from "./nevermore/modules/stats" -include { bam_analysis; fastq_analysis } from "./vlight/workflows/vlight" - - -def run_kraken2 = (!params.skip_kraken2 || params.run_kraken2) -def run_pathseq = (!params.skip_pathseq || params.run_pathseq) - -def get_basecounts = (!params.skip_basecounts || params.run_basecounts); -def convert_fastq2bam = (run_pathseq || get_basecounts); - -def do_preprocessing = (!params.skip_preprocessing || params.run_preprocessing) - -def run_bam_analysis = run_pathseq -def run_fastq_analysis = run_kraken2 - - -process collate_results { - publishDir params.output_dir, mode: params.publish_mode - - input: - path(results) - path(collate_script) - path(gtdb_markers) - - output: - path("collated/*.rds"), emit: collated, optional: true - - script: - """ - mkdir -p collated/ - - mkdir -p kraken2/ - (mv *kraken2_report.txt kraken2/) || : - - mkdir -p pathseq/ - (mv *pathseq.scores pathseq/) || : - - mkdir -p libsize/ - (mv *libsize.txt libsize/) || : - - mkdir -p liblayout/ - (mv *is_paired.txt liblayout/) || : - - mkdir -p flagstats/ - (mv *flagstats.txt flagstats/) || : - - mkdir -p raw_counts/ - (mv *.txt raw_counts/) || : - - Rscript ${collate_script} \ - --libdir \$(dirname \$(readlink ${collate_script})) \ - --gtdb_markers ${gtdb_markers} \ - --kraken2_res_path kraken2/ \ - --PathSeq_res_path pathseq/ \ - --libsize_res_path libsize/ \ - --lib_layout_res_path liblayout/ \ - --N_raw_counts_path raw_counts/ \ - --out_folder collated/ - """ +include { fastq_input; bam_input } from "./nevermore/workflows/input" +include { vlight_main } from "./vlight/workflows/vlight" + + +if (params.input_dir && params.remote_input_dir) { + log.info """ + Cannot process both --input_dir and --remote_input_dir. Please check input parameters. + """.stripIndent() + exit 1 +} else if (!params.input_dir && !params.remote_input_dir) { + log.info """ + Neither --input_dir nor --remote_input_dir set. 
+ """.stripIndent() + exit 1 } +params.do_bam2fq_conversion = true // vlight bam input requires this -workflow { - - fastq_ch = Channel - //.fromPath(params.input_dir + "/" + "**.{fastq,fq,fastq.gz,fq.gz}") - .fromPath(params.input_dir + "/" + "**[._]{fastq.gz,fq.gz}") - .map { file -> - def sample = file.name.replaceAll(/.?(fastq|fq)(.gz)?$/, "") - sample = sample.replaceAll(/_R?[12]$/, "") - return tuple(sample, file) - } - .groupTuple(sort: true) - .map { classify_sample(it[0], it[1]) } - - bam_ch = Channel - .fromPath(params.input_dir + "/" + "**.bam") - .map { file -> - def sample = file.name.replaceAll(/.bam$/, "") - return tuple(sample, file) - } - .groupTuple(sort: true) - .map { classify_sample(it[0], it[1]) } - - bam2fq(bam_ch) - - bfastq_ch = bam2fq.out.reads - .map { classify_sample(it[0].id, it[1]) } - - prepare_fastqs(fastq_ch) - - results_ch = Channel.empty() - - if (do_preprocessing) { - - raw_fastq_ch = prepare_fastqs.out.reads.concat(bfastq_ch) - - nevermore_simple_preprocessing(raw_fastq_ch) - - preprocessed_ch = nevermore_simple_preprocessing.out.main_reads_out - results_ch = results_ch - .concat(nevermore_simple_preprocessing.out.raw_counts) - .map { sample, files -> files } - - if (params.remove_host) { +def input_dir = (params.input_dir) ? params.input_dir : params.remote_input_dir - if (params.remove_host == "individual") { - remove_host_kraken2_individual(nevermore_simple_preprocessing.out.main_reads_out, params.remove_host_kraken2_db) - - preprocessed_ch = remove_host_kraken2_individual.out.reads - - - } else if (params.remove_host == "pair") { - - remove_host_kraken2(nevermore_simple_preprocessing.out.main_reads_out, params.remove_host_kraken2_db) - - preprocessed_ch = remove_host_kraken2.out.reads - - } - - } +workflow { + if (params.bam_input_pattern) { + bam_input( + Channel.fromPath(params.bam_input_pattern) + ) + fastq_ch = bam_input.out.bamfiles } else { - - preprocessed_ch = prepare_fastqs.out.reads - .concat(bfastq_ch) - - } - - - if (get_basecounts || run_bam_analysis) { - - fq2bam(preprocessed_ch) - - if (get_basecounts) { - - flagstats(fq2bam.out.reads) - - flagstat_results_ch = flagstats.out.flagstats - .concat(flagstats.out.counts) - .concat(flagstats.out.is_paired) - .map { sample, files -> files } - results_ch = results_ch.concat(flagstat_results_ch) - - } - - if (run_bam_analysis) { - - bam_analysis(fq2bam.out.reads) - results_ch = results_ch.concat(bam_analysis.out.results) - - } - - } - - if (run_fastq_analysis) { - - fastq_analysis(preprocessed_ch) - results_ch = results_ch.concat(fastq_analysis.out.results) - - } - - if (!params.skip_collate) { - collate_results( - results_ch.collect(), - "${projectDir}/scripts/ExtractProfiledCounts_210823.R", - params.GTDB_markers + fastq_input( + Channel.fromPath(input_dir + "/*", type: "dir") ) + fastq_ch = fastq_input.out.fastqs } + vlight_main(fastq_ch) + } diff --git a/nevermore/bin/collate_stats.py b/nevermore/bin/collate_stats.py index 471c0ed..ce7920b 100755 --- a/nevermore/bin/collate_stats.py +++ b/nevermore/bin/collate_stats.py @@ -212,7 +212,7 @@ def write_output(readcounts, flagstats, outfile): # line = [sample, readcounts[sample].get("paired_end", False), readcounts[sample].get("single_end", False), sum(v for k, v in readcounts[sample].items() if k[0] == ".")] # line += (readcounts[sample].get(key, 0) for key in (".main", ".singles", ".orphans", ".chimeras")) line += tuple( - flagstats[sample].get(key, 0) + flagstats.get(sample, {}).get(key, 0) for key in ( "n_alignments", "primary_alignments", 
"secondary_alignments", "npaired_mapped", "nsingles_mapped" diff --git a/nevermore/bin/prepare_fastqs.py b/nevermore/bin/prepare_fastqs.py index a991eea..d22598a 100755 --- a/nevermore/bin/prepare_fastqs.py +++ b/nevermore/bin/prepare_fastqs.py @@ -2,119 +2,297 @@ import argparse import itertools +import logging import os import pathlib import re +import shutil +import subprocess import sys +from collections import Counter -def check_pairwise(r1, r2): - pairs = {} - for p1, p2 in itertools.product( - tuple(p[:-1] for p, _ in r1), - tuple(p[:-1] for p, _ in r2) - ): - pairs.setdefault(p1, set()).add(1) - pairs.setdefault(p2, set()).add(2) - - for p, counts in pairs.items(): - if len(counts) < 2: - raise ValueError(f"Missing mates for prefix {p}, files={str(counts)}") - elif len(counts) > 2: - raise ValueError(f"Too many files for prefix {p}, files={str(counts)}") - else: - ... +logging.basicConfig( + level=logging.DEBUG, + format='[%(asctime)s] %(message)s' +) + +logger = logging.getLogger(__name__) -def process_sample(input_dir, sample_id, fastqs, output_dir, remove_suffix=None): +def check_pairwise(r1, r2): + """ Checks if two sets of read files contain the same prefixes. + + Input: + - r1/r2: lists of tuples (prefix, filename) of R1/R2 paired-end read files - print("#!/usr/bin/bash") + Raises error if one prefix is not found in either r1 or r2. - print("RESUF", remove_suffix, file=sys.stderr) + """ + r1_r2 = tuple(prefix[:-1] for prefix, _ in itertools.chain(r1, r2)) + for prefix in set(r1_r2): + if r1_r2.count(prefix) != 2: + raise ValueError(f"Missing mates for prefix {prefix}.") - if len(fastqs) == 1: - sample_id = re.sub(r"[._]singles?", "", sample_id) + ".singles" - pathlib.Path(os.path.join(output_dir, sample_id)).mkdir(parents=True, exist_ok=True) - print(f"cp {os.path.join(input_dir, fastqs[0])} {os.path.join(output_dir, sample_id, sample_id)}_R1.fastq.gz") +def transfer_file(source, dest, remote_input=False): + """ Transfers a file depending on its location and compressed state. + Input: + - path to source file + - path to destination file + - whether source file is considered to be located on a remote file system + """ + resolved_src = pathlib.Path(source).resolve() + if os.path.splitext(source)[1][1:] in ("gz", "bz2"): + if remote_input: + # if file is on remote file system, copy it to destination + logging.debug('transfer_file: source=%s, dest=%s, remote_input=%s, action=copy', source, dest, remote_input) + shutil.copyfile(resolved_src, dest) + else: + # if file is compressed and on local fs, just symlink it + logging.debug('transfer_file: source=%s, dest=%s, remote_input=%s, action=symlink', source, dest, remote_input) + pathlib.Path(dest).symlink_to(resolved_src) else: - prefixes = [re.sub(r"\.(fastq|fq).gz$", "", f) for f in fastqs] + # if file is not compressed, gzip it to destination + logging.debug('transfer_file: source=%s, dest=%s, remote_input=%s, action=gzip', source, dest, remote_input) + with open(dest, "wt") as _out: + subprocess.run(("gzip", "-c", resolved_src), stdout=_out) + + +def transfer_multifiles(files, dest, remote_input=False, compression=None): + """ Transfers a set of files depending on their location and compressed state. 
+ + Input: + - list of source file paths + - path to destination file + - whether source files are considered to be located on a remote file system + - the compression type of the files (supported: None, gz, bz2) + """ + + if len(files) > 1: + src_files = tuple(os.path.abspath(f) for f in files) # tuple(f.resolve() for f in files) + cat_cmd = ("cat", ) + src_files + + if compression in ("gz", "bz2"): + # multiple compressed files can just be concatenated + logging.debug('transfer_multifiles: compression=%s, remote_input=%s, action=concatenate', compression, remote_input) + with open(dest, "wt") as _out: + subprocess.run(cat_cmd, stdout=_out) + else: + # multiple uncompressed files will be cat | gzipped + logging.debug('transfer_multifiles: compression=%s, remote_input=%s, action=concatenate+gzip', compression, remote_input) + cat_pr = subprocess.Popen(cat_cmd, stdout=subprocess.PIPE) + with open(dest, "wt") as _out: + subprocess.run(("gzip", "-c", "-"), stdin=cat_pr.stdout, stdout=_out) + + else: + logging.debug('transfer_multifiles: single file, source=%s, dest=%s, remote_input=%s, action=defer->transfer_file', files[0], dest, remote_input) + transfer_file(files[0], dest, remote_input=remote_input) + + +def process_sample( + sample, fastqs, output_dir, + fastq_suffix_pattern, + remove_suffix=None, remote_input=False, +): + """ Checks if a set of fastq files in a directory is a valid collection + and transfers files to a destination dir upon success. + + Input: + - sample_id + - list of fastq files + - path to output directory + - suffix to strip off from filenames (e.g. _001) + - whether fastq files are located on remote file system + """ + + if not fastqs: + ... + elif len(fastqs) == 1: + # remove potential "single(s)" string from single fastq file name prefix + sample_sub = re.sub(r"[._]singles?", "", sample) + # 20221018: and attach it at the end of the sample name + # - this might be a temporary fix, but @93a73d0 + # single-end samples without .singles-suffix cause problems + # with fastqc results in the collate step + sample = sample_sub + ".singles" + sample_dir = os.path.join(output_dir, sample) + pathlib.Path(sample_dir).mkdir(parents=True, exist_ok=True) + + dest_compression = fastqs[0][fastqs[0].rfind(".") + 1:] + + dest = os.path.join(sample_dir, f"{sample}_R1.fastq.{dest_compression}") + transfer_file(fastqs[0], dest, remote_input=remote_input) + + elif fastqs: + + # check if all fastq files are compressed the same way + suffixes = Counter( + f[f.rfind("."):] in (".gz", ".bz2") for f in fastqs + ) + + if len(suffixes) > 1: + raise ValueError(f"sample: {sample} has mixed compressed and uncompressed input files. Please check.") + + if suffixes.most_common()[0][0]: + # all compressed + suffixes = Counter( + f[f.rfind(".") + 1:] for f in fastqs + ) + if len(suffixes) > 1: + raise ValueError(f"sample: {sample} has mixed gzip and bzip2 files. 
Please check.") + dest_compression = compression = suffixes.most_common()[0][0] + else: + dest_compression, compression = "gz", None + + # extract the file name prefixes + prefixes = [re.sub(fastq_suffix_pattern, "", os.path.basename(f)) for f in fastqs] if remove_suffix: + # remove suffix pattern if requested prefixes = [re.sub(remove_suffix + r"$", "", p) for p in prefixes] print("PRE", prefixes, file=sys.stderr) - r1 = [(p, f) for p, f in zip(prefixes, fastqs) if p.endswith("1")] - r2 = [(p, f) for p, f in zip(prefixes, fastqs) if p.endswith("2")] - others = set(fastqs).difference({f for _, f in r1}).difference({f for _, f in r2}) + # partition fastqs into R1, R2, and 'other' sets + r1 = [(p, f) for p, f in zip(prefixes, fastqs) if re.search(r"[._R]1$", p)] + r2 = [(p, f) for p, f in zip(prefixes, fastqs) if re.search(r"[._R]2$", p)] + others = sorted(list(set(fastqs).difference({f for _, f in r1}).difference({f for _, f in r2}))) - assert len(r2) == 0 or len(r1) == len(r2), "R1/R2 sets are not of the same length" - check_pairwise(r1, r2) + # check if R1/R2 sets have equal sizes or are empty + # R1 empty: potential scRNAseq (or any protocol with barcode reads in R1) + # R2 empty: typical single end reads with (R?)1 suffix + assert len(r2) == 0 or len(r1) == 0 or (r1 and len(r1) == len(r2)), "R1/R2 sets are not of the same length" - r1 = [os.path.join(input_dir, f) for f in sorted(f for _, f in r1)] - r2 = [os.path.join(input_dir, f) for f in sorted(f for _, f in r2)] + # if R1 and R2 are of equal size, check if the prefixes match + if len(r1) == len(r2) and r1: + check_pairwise(r1, r2) + # sort R1/R2 for concatenation, get rid off prefixes + r1 = sorted(f for _, f in r1) + r2 = sorted(f for _, f in r2) print("R1", r1, file=sys.stderr) print("R2", r2, file=sys.stderr) + print("others", others, file=sys.stderr, flush=True) - r1_cmd = " ".join(["cat"] + r1) + f" > {os.path.join(output_dir, sample_id, sample_id)}_R1.fastq.gz" - print(r1_cmd) - - if r2: - r2_cmd = " ".join(["cat"] + r2) + f" > {os.path.join(output_dir, sample_id, sample_id)}_R2.fastq.gz" - print(r2_cmd) - - if others: - sample_id = sample_id + ".singles" - pathlib.Path(os.path.join(output_dir, sample_id)).mkdir(parents=True, exist_ok=True) - other_cmd = " ".join(["cat"] + list(others)) + f" > {os.path.join(output_dir, sample_id, sample_id)}_R1.fastq.gz" - print(other_cmd) - - - # ['HD_Path_4_S81_L001_R1_001', 'HD_Path_4_S81_L001_R2_001', 'HD_Path_4_S81_L002_R1_001', 'HD_Path_4_S81_L002_R2_001'] - # ['HD_Path_4_S81_L001_R1', 'HD_Path_4_S81_L001_R2', 'HD_Path_4_S81_L002_R1', 'HD_Path_4_S81_L002_R2'] - + sample_dir = os.path.join(output_dir, sample) + if r1 or r2: + pathlib.Path(sample_dir).mkdir(parents=True, exist_ok=True) + if r1: + # if R1 is not empty, transfer R1-files + dest = os.path.join(sample_dir, f"{sample}_R1.fastq.{dest_compression}") + transfer_multifiles(r1, dest, remote_input=remote_input, compression=compression) + if r2: + # if R2 is not empty, transfer R2-files, + # if R1 is empty, rename R2 to R1 so that files can be processed as normal single-end + target_r = "R2" if r1 else "R1" + dest = os.path.join(sample_dir, f"{sample}_{target_r}.fastq.{dest_compression}") + transfer_multifiles(r2, dest, remote_input=remote_input, compression=compression) + if others: + # if single-end reads exist, + # transfer them to .singles + # these will be processed independently and merged with the paired-end reads + # at a later stage + sample_dir = sample_dir + ".singles" + pathlib.Path(sample_dir).mkdir(parents=True, 
+ + +def is_fastq(f, valid_fastq_suffixes, valid_compression_suffixes): + """ Checks if a file is a fastq file (compressed or uncompressed). + + Input: + - filename + + Output: + - true if file is fastq else false + + """ + prefix, suffix = os.path.splitext(f) + if suffix in valid_fastq_suffixes: + return True + if suffix in valid_compression_suffixes: + _, suffix = os.path.splitext(prefix) + return suffix in valid_fastq_suffixes + return False def main(): - ap = argparse.ArgumentParser() ap.add_argument("-i", "--input_dir", type=str, default=".") ap.add_argument("-o", "--output_dir", type=str, default="prepared_samples") - ap.add_argument("-s", "--sample_id", type=str, required=True) + ap.add_argument("-p", "--prefix", type=str, required=True) + ap.add_argument("--remote-input", action="store_true") ap.add_argument("--remove-suffix", type=str, default=None) + ap.add_argument("--valid-fastq-suffixes", type=str, default="fastq,fq") + ap.add_argument("--valid-compression-suffixes", type=str, default="gz,bz2") args = ap.parse_args() - pathlib.Path(os.path.join(args.output_dir, args.sample_id)).mkdir(parents=True, exist_ok=True) + valid_fastq_suffixes = tuple(f".{suffix}" for suffix in args.valid_fastq_suffixes.split(",")) + print(valid_fastq_suffixes) + valid_compression_suffixes = tuple(f".{suffix}" for suffix in args.valid_compression_suffixes.split(",")) + print(valid_compression_suffixes) + fastq_file_suffix_pattern = r"[._](" + \ + args.valid_fastq_suffixes.replace(",", "|") + \ + ")([._](" + \ + args.valid_compression_suffixes.replace(",", "|") + \ + "))?$" - walk = os.walk(args.input_dir) - try: - cwd, dirs, files = next(walk) - except StopIteration: - raise StopIteration(f"Invalid directory: {args.input_dir}") + fastq_mate_pattern = r"([._R][12])$" - fastqs = sorted(f for f in files if f.endswith(".fq.gz") or f.endswith(".fastq.gz")) - assert fastqs, "Could not find any fastq files." + def collect_fastq_files(input_dir, valid_fastq_suffixes, valid_compression_suffixes): + return sorted( + os.path.join(input_dir, f) + for f in os.listdir(input_dir) + if is_fastq(f, valid_fastq_suffixes, valid_compression_suffixes) + ) - print(cwd, fastqs, file=sys.stderr) try: - process_sample(cwd, args.sample_id, fastqs, args.output_dir, remove_suffix=args.remove_suffix) - except Exception as e: - raise ValueError(f"Encountered problems processing sample '{args.sample_id}': {e}.\nPlease check your file names.") - - - + pwd, dirs, _ = next(os.walk(args.input_dir)) + except StopIteration: + raise ValueError(f"Could not find input directory {args.input_dir} ({os.path.abspath(args.input_dir)})") + + samples = {} + pathlib.Path(args.output_dir).mkdir(parents=True, exist_ok=True) + + for sample_dir in dirs: + sample, sample_dir = sample_dir, os.path.join(pwd, sample_dir) + + samples.setdefault(sample, []).extend( + collect_fastq_files(sample_dir, valid_fastq_suffixes, valid_compression_suffixes) + ) + + root_fastqs = collect_fastq_files(args.input_dir, valid_fastq_suffixes, valid_compression_suffixes) + + if samples and root_fastqs: + raise ValueError(f"Found {len(root_fastqs)} fastq files in input directory together with {len(samples)} sample directories.
Please check input data.") + elif root_fastqs: + for f in root_fastqs: + sample = re.sub(fastq_file_suffix_pattern, "", os.path.basename(f)) + sample = re.sub(fastq_mate_pattern, "", sample) + samples.setdefault(sample, []).append(f) + + # check and transfer the files + for sample, fastqs in samples.items(): + try: + process_sample( + sample, fastqs, args.output_dir, + fastq_file_suffix_pattern, + remove_suffix=args.remove_suffix, remote_input=args.remote_input + ) + except Exception as e: + raise ValueError(f"Encountered problems processing sample '{sample}': {e}.\nPlease check your file names.") if __name__ == "__main__": diff --git a/nevermore/modules/align/bwa.nf b/nevermore/modules/align/bwa.nf index b93508f..ba8eeb6 100644 --- a/nevermore/modules/align/bwa.nf +++ b/nevermore/modules/align/bwa.nf @@ -17,14 +17,14 @@ process bwa_mem_align { def sort_reads2 = (sample.is_paired) ? "sortbyname.sh -Xmx${maxmem}g in=${sample.id}_R2.fastq.gz out=${sample.id}_R2.sorted.fastq.gz" : "" def blocksize = "-K 10000000" // shamelessly taken from NGLess - def sort_cmd = (do_name_sort) ? "samtools collate -@ ${sort_cpus} -o ${sample.id}.bam -" : "samtools sort -@ ${sort_cpus} -o ${sample.id}.bam -" + def sort_cmd = (do_name_sort) ? "samtools collate -@ ${sort_cpus} -o ${sample.id}.bam - tmp/collated_bam" : "samtools sort -@ ${sort_cpus} -o ${sample.id}.bam -" """ set -e -o pipefail + mkdir -p tmp/ sortbyname.sh -Xmx${maxmem}g in=${sample.id}_R1.fastq.gz out=${sample.id}_R1.sorted.fastq.gz ${sort_reads2} bwa mem -a -t ${align_cpus} ${blocksize} \$(readlink ${reference}) ${sample.id}_R1.sorted.fastq.gz ${reads2} | samtools view -F 4 -buSh - | ${sort_cmd} + rm -rvf tmp/ *.sorted.fastq.gz """ - // samtools sort -@ ${sort_cpus} -o ${sample.id}.bam - } diff --git a/nevermore/modules/align/helpers.nf b/nevermore/modules/align/helpers.nf index dc30210..7cf512c 100644 --- a/nevermore/modules/align/helpers.nf +++ b/nevermore/modules/align/helpers.nf @@ -1,6 +1,5 @@ process merge_and_sort { label 'samtools' - publishDir params.output_dir, mode: params.publish_mode input: tuple val(sample), path(bamfiles) @@ -12,19 +11,58 @@ process merge_and_sort { script: def sort_order = (do_name_sort) ? 
"-n" : "" + def merge_cmd = "" + // need a better detection for this if (bamfiles instanceof Collection && bamfiles.size() >= 2) { - """ - mkdir -p bam/ stats/bam - samtools merge -@ $task.cpus ${sort_order} bam/${sample}.bam ${bamfiles} - samtools flagstats bam/${sample}.bam > stats/bam/${sample}.flagstats.txt - """ + merge_cmd = "samtools merge -@ $task.cpus ${sort_order} bam/${sample}.bam ${bamfiles}" } else { - // i don't like this solution - """ - mkdir -p bam/ stats/bam - ln -s ../${bamfiles[0]} bam/${sample}.bam - samtools flagstats bam/${sample}.bam > stats/bam/${sample}.flagstats.txt - """ + merge_cmd = "ln -s ../${bamfiles[0]} bam/${sample}.bam" } + + """ + mkdir -p bam/ stats/bam/ + ${merge_cmd} + samtools flagstats bam/${sample}.bam > stats/bam/${sample}.flagstats.txt + """ +} + + +process db_filter { + label 'samtools' + + input: + tuple val(sample), path(bam) + path(db_bedfile) + + output: + tuple val(sample), path("filtered_bam/${sample}.bam"), emit: bam + tuple val(sample), path("stats/filtered_bam/${sample}.flagstats.txt"), emit: flagstats + + script: + """ + mkdir -p filtered_bam/ stats/filtered_bam/ + bedtools intersect -u -ubam -a ${bam} -b ${db_bedfile} > filtered_bam/${sample}.bam + samtools flagstats filtered_bam/${sample}.bam > stats/filtered_bam/${sample}.flagstats.txt + """ +} + +process db2bed3 { + input: + path(db) + + output: + path("db.bed3"), emit: db + + script: + """ + cp -v ${db} db.sqlite3 + sqlite3 db.sqlite3 'select seqid,start,end from annotatedsequence;' '.exit' | awk -F \\| -v OFS='\\t' '{print \$1,\$2-1,\$3}' | sort -k1,1 -k2,2g -k3,3g > db.bed3 + rm -v db.sqlite3* + + if [[ ! -s db.bed3 ]]; then + echo 'Empty database!' >> /dev/stderr + exit 1 + fi + """ } diff --git a/nevermore/modules/align/sam_align.nf b/nevermore/modules/align/sam_align.nf new file mode 100644 index 0000000..e65b639 --- /dev/null +++ b/nevermore/modules/align/sam_align.nf @@ -0,0 +1,42 @@ +process minimap2_align { + label 'align' + + input: + tuple val(sample), path(reads) + path(reference) + + output: + tuple val(sample), path("${sample.id}/${sample.id}.sam"), emit: sam + + script: + def reads = (sample.is_paired) ? "${sample.id}_R1.fastq.gz ${sample.id}_R2.fastq.gz" : "${sample.id}_R1.fastq.gz" + def mm_options = "--sam-hit-only -t ${task.cpus} -x sr --secondary=yes -a" + + """ + mkdir -p ${sample.id}/ + minimap2 ${mm_options} --split-prefix ${sample.id}_split ${reference} ${reads} > ${sample.id}/${sample.id}.sam + """ +} + + +process bwa_mem_align { + label 'align' + + input: + tuple val(sample), path(reads) + path(reference) + + output: + tuple val(sample), path("${sample.id}/${sample.id}.sam"), emit: sam + + script: + def reads = (sample.is_paired) ? 
"${sample.id}_R1.fastq.gz ${sample.id}_R2.fastq.gz" : "${sample.id}_R1.fastq.gz" + def bwa_options = "-a -t ${task.cpus} -K 10000000" + + """ + mkdir -p ${sample.id}/ + bwa mem ${bwa_options} \$(readlink ${reference}) ${reads} |\ + awk -F '\t' -v OFS='\t' 'substr(\$1,1,1) == "@" || !and(\$2, 4)' > ${sample.id}/${sample.id}.sam + """ + +} \ No newline at end of file diff --git a/nevermore/modules/collate.nf b/nevermore/modules/collate.nf index 2b14921..9dbf3ed 100644 --- a/nevermore/modules/collate.nf +++ b/nevermore/modules/collate.nf @@ -1,5 +1,5 @@ process collate_stats { - publishDir params.output_dir, mode: params.publish_mode + // publishDir params.output_dir, mode: params.publish_mode input: path(stats_files) diff --git a/nevermore/modules/converters/bam2fq.nf b/nevermore/modules/converters/bam2fq.nf index e264d74..9f806c0 100644 --- a/nevermore/modules/converters/bam2fq.nf +++ b/nevermore/modules/converters/bam2fq.nf @@ -1,6 +1,4 @@ process bam2fq { - publishDir params.output_dir, mode: params.publish_mode - input: tuple val(sample), path(bam) diff --git a/nevermore/modules/converters/fq2bam.nf b/nevermore/modules/converters/fq2bam.nf index 420937e..3138e5d 100644 --- a/nevermore/modules/converters/fq2bam.nf +++ b/nevermore/modules/converters/fq2bam.nf @@ -8,9 +8,15 @@ process fq2bam { script: def maxmem = task.memory.toGiga() def r2 = (sample.is_paired) ? "in2=${sample.id}_R2.fastq.gz" : "" + def qual_modifier = "" + if (params.pb_reads) { + qual_modifier = "qin=33" + } """ + set -e -o pipefail mkdir -p out/ - reformat.sh -Xmx${maxmem}g in=${sample.id}_R1.fastq.gz ${r2} trimreaddescription=t out=stdout.bam | samtools addreplacerg -r "ID:A" -r "SM:${sample.id}" -o out/${sample.id}.bam - + reformat.sh -Xmx${maxmem}g in=${sample.id}_R1.fastq.gz ${r2} trimreaddescription=t out=stdout.bam ${qual_modifier} | samtools addreplacerg -r "ID:A" -r "SM:${sample.id}" -o out/${sample.id}.bam - """ -} \ No newline at end of file + // https://forum.qiime2.org/t/bug-q2-itsxpresss-dependency-bbmap-cannot-handle-pacbio-ccs/17612/4 +} diff --git a/nevermore/modules/converters/fq2fa.nf b/nevermore/modules/converters/fq2fa.nf index 729e168..612e449 100644 --- a/nevermore/modules/converters/fq2fa.nf +++ b/nevermore/modules/converters/fq2fa.nf @@ -8,9 +8,15 @@ process fq2fa { script: def maxmem = task.memory.toGiga() def r2 = (sample.is_paired) ? 
"in2=${sample.id}_R2.fastq.gz out2=out/${sample.id}_R2.fasta" : "" + def qual_modifier = "" + if (params.pb_reads) { + qual_modifier = "qin=33" + } """ + set -e -o pipefail mkdir -p out/ - reformat.sh -Xmx${maxmem}g in=${sample.id}_R1.fastq.gz out=out/${sample.id}_R1.fasta ${r2} trimreaddescription=t + reformat.sh -Xmx${maxmem}g in=${sample.id}_R1.fastq.gz out=out/${sample.id}_R1.fasta ${r2} trimreaddescription=t ${qual_modifier} """ + // https://forum.qiime2.org/t/bug-q2-itsxpresss-dependency-bbmap-cannot-handle-pacbio-ccs/17612/4 } diff --git a/nevermore/modules/converters/merge_fastqs.nf b/nevermore/modules/converters/merge_fastqs.nf index b375f3a..74c6d65 100644 --- a/nevermore/modules/converters/merge_fastqs.nf +++ b/nevermore/modules/converters/merge_fastqs.nf @@ -6,10 +6,23 @@ process merge_single_fastqs { tuple val(sample), path("merged/${sample.id}_R1.fastq.gz"), emit: fastq script: + + def fastq_in = "" + def prefix = "" + def suffix = "" + if (fastqs instanceof Collection && fastqs.size() == 2) { + prefix = "cat ${fastqs} | gzip -dc - |" + fastq_in = "stdin.fastq" + suffix = " || { ec=\$?; [ \$ec -eq 141 ] && true || (exit \$ec); }" + } else { + fastq_in = "${fastqs[0]}" + } + """ + set -e -o pipefail mkdir -p merged/ - cat *.fastq.gz | sortbyname.sh in=stdin.gz out=merged/${sample.id}_R1.fastq.gz + ${prefix} sortbyname.sh in=${fastq_in} out=merged/${sample.id}_R1.fastq.gz ${suffix} """ - + // https://stackoverflow.com/questions/22464786/ignoring-bash-pipefail-for-error-code-141/72985727#72985727 } diff --git a/nevermore/modules/decon/dehumanise.nf b/nevermore/modules/decon/dehumanise.nf index dadbea2..e279691 100644 --- a/nevermore/modules/decon/dehumanise.nf +++ b/nevermore/modules/decon/dehumanise.nf @@ -1,5 +1,5 @@ process dehumanise { - publishDir "$output_dir", mode: params.publish_mode, pattern: "no_human/*/*.fastq.gz" + // publishDir "$output_dir", mode: params.publish_mode, pattern: "no_human/*/*.fastq.gz" input: tuple val(sample), path(fq) diff --git a/nevermore/modules/profilers/gffquant.nf b/nevermore/modules/profilers/gffquant.nf index 0b50caa..376c605 100644 --- a/nevermore/modules/profilers/gffquant.nf +++ b/nevermore/modules/profilers/gffquant.nf @@ -1,26 +1,65 @@ process run_gffquant { - publishDir "${params.output_dir}", mode: params.publish_mode + label "gffquant" input: - tuple val(sample), path(bam) - path(db) + tuple val(sample), path(alignments) + path(gq_db) output: tuple val(sample), path("profiles/${sample}/*.txt.gz"), emit: results + tuple val(sample), path("logs/${sample}.log") script: - def emapper_version = (params.emapper_version) ? "--emapper_version ${params.emapper_version}" : "" + def gq_output = "-o profiles/${sample}/${sample}" + + def gq_params = "-m ${params.gq_mode} --ambig_mode ${params.gq_ambig_mode}" + gq_params += (params.gq_strand_specific) ? " --strand_specific" : "" + gq_params += (params.gq_unmarked_orphans) ? " --unmarked_orphans" : "" + gq_params += (params.gq_calc_coverage) ? " --calc_coverage" : "" + gq_params += (params.gq_min_seqlen) ? (" --min_seqlen " + params.gq_min_seqlen) : "" + gq_params += (params.gq_min_identity) ? (" --min_identity " + params.gq_min_identity) : "" + gq_params += (params.bam_input_pattern || !params.large_reference) ? 
(" --format bam") : " --format sam" + + def gq_cmd = "gffquant ${gq_output} ${gq_params} gq_db.sqlite3" + + def mk_aln_sam = "" + if (params.bam_input_pattern) { + + if (params.do_name_sort) { + gq_cmd = "samtools collate -@ ${task.cpus} -O ${alignments} tmp/collated_bam | ${gq_cmd} -" + } else { + gq_cmd = "${gq_cmd} ${alignments}" + } + + } else if (params.large_reference) { + + mk_aln_sam += "echo 'Making alignment stream...'\n" + if (alignments instanceof Collection && alignments.size() >= 2) { + mk_aln_sam += "cat ${sample}.sam > tmp/alignments.sam \n" + mk_aln_sam += "grep -v '^@' ${sample}.singles.sam >> tmp/alignments.sam" + } else { + mk_aln_sam += "ln -s ${alignments[0]} tmp/alignments.sam" + } + gq_cmd = "cat tmp/alignments.sam | ${gq_cmd} -" + + } else { + + gq_cmd = "${gq_cmd} ${alignments}" + + } + """ - echo $sample $bam - mkdir -p logs/ - mkdir -p profiles/ - gffquant ${db} ${bam} -o profiles/${sample}/${sample} ${params.gffquant_params} > logs/${sample}.o 2> logs/${sample}.e + set -e -o pipefail + mkdir -p logs/ tmp/ profiles/ + echo 'Copying database...' + cp -v ${gq_db} gq_db.sqlite3 + ${mk_aln_sam} + ${gq_cmd} &> logs/${sample}.log + rm -rfv gq_db.sqlite3* tmp/ """ } - process collate_feature_counts { - publishDir "${params.output_dir}", mode: params.publish_mode input: tuple val(sample), path(count_tables) diff --git a/nevermore/modules/qc/bbduk.nf b/nevermore/modules/qc/bbduk.nf index 551e2f8..d97356c 100644 --- a/nevermore/modules/qc/bbduk.nf +++ b/nevermore/modules/qc/bbduk.nf @@ -1,6 +1,5 @@ process qc_bbduk { label 'bbduk' - // publishDir path: params.output_dir, mode: params.publish_mode input: tuple val(sample), path(reads) @@ -13,14 +12,16 @@ process qc_bbduk { script: def maxmem = task.memory.toGiga() - def read1 = "in1=${sample.id}_R1.fastq.gz out1=qc_reads/${sample.id}/${sample.id}_R1.fastq.gz" - def read2 = sample.is_paired ? "in2=${sample.id}_R2.fastq.gz out2=qc_reads/${sample.id}/${sample.id}_R2.fastq.gz outs=qc_reads/${sample.id}/${sample.id}.orphans_R1.fastq.gz" : "" + def compression = (reads[0].name.endsWith(".gz")) ? "gz" : "bz2" + read2 = (sample.is_paired) ? "in2=${sample.id}_R2.fastq.gz out2=qc_reads/${sample.id}/${sample.id}_R2.fastq.gz outs=qc_reads/${sample.id}/${sample.id}.orphans_R1.fastq.gz" : "" - trim_params = params.qc_params_shotgun + " ref=${adapters} minlen=${params.qc_minlen}" + def read1 = "in1=${sample.id}_R1.fastq.${compression} out1=qc_reads/${sample.id}/${sample.id}_R1.fastq.gz" + + def trim_params = params.qc_params_shotgun + " ref=${adapters} minlen=${params.qc_minlen}" + def stats_out = "stats=stats/qc/bbduk/${sample.id}.bbduk_stats.txt" """ - mkdir -p qc_reads/${sample.id} - mkdir -p stats/qc/bbduk/ - bbduk.sh -Xmx${maxmem}g t=${task.cpus} ${trim_params} stats=stats/qc/bbduk/${sample.id}.bbduk_stats.txt ${read1} ${read2} + mkdir -p qc_reads/${sample.id} stats/qc/bbduk/ + bbduk.sh -Xmx${maxmem}g t=${task.cpus} ${trim_params} ${stats_out} ${read1} ${read2} """ } diff --git a/nevermore/modules/qc/bbduk_amplicon.nf b/nevermore/modules/qc/bbduk_amplicon.nf index 87f029f..8fce802 100644 --- a/nevermore/modules/qc/bbduk_amplicon.nf +++ b/nevermore/modules/qc/bbduk_amplicon.nf @@ -20,7 +20,7 @@ process qc_bbduk_stepwise_amplicon { trim_params = "ref=${adapters} minlength=${params.qc_minlen}" } - def bbduk_call = "bbduk.sh -Xmx${maxmem} t=${task.cpus} ordered=t trd=t" + def bbduk_call = "bbduk.sh -Xmx${maxmem}g t=${task.cpus} ordered=t trd=t" ref_p5_r1 = (params.primers) ? 
"literal=" + params.primers.split(",")[0] : "ref=${adapters}" ref_p5_r2 = (params.primers && !sample.is_paired) ? "literal=" + params.primers.split(",")[1] : "ref=${adapters}" @@ -30,16 +30,16 @@ process qc_bbduk_stepwise_amplicon { if (!sample.is_paired) { """ mkdir -p ${sample.id}/ - mkdir -p stats/qc/bbduk/ - mkdir -p qc_reads/ + mkdir -p stats/qc/bbduk/${sample.id}/ + mkdir -p qc_reads/${sample.id}/ ${bbduk_call} ${trim_params} in1=${sample.id}_R1.fastq.gz out1=qc_reads/${sample.id}/${sample.id}_R1.fastq.gz stats=stats/qc/bbduk/${sample.id}/${sample.id}.fwd_bbduk_stats.txt lhist=stats/qc/bbduk/${sample.id}/${sample.id}.p5_lhist.txt ${bbduk_call} in1=qc_reads/${sample.id}/${sample.id}_R1.fastq.gz lhist=stats/qc/bbduk/${sample.id}/${sample.id}_R1.post_lhist.txt """ } else if (params.long_reads) { """ mkdir -p ${sample.id}/ - mkdir -p stats/qc/bbduk/ - mkdir -p qc_reads/ + mkdir -p stats/qc/bbduk/${sample.id}/ + mkdir -p qc_reads/${sample.id}/ mkdir -p tmp/ ${bbduk_call} ${ref_p5_r1} minlength=${params.qc_minlen} ${params.p5_primer_params} in1=${sample.id}_R1.fastq.gz out1=fwd_p5.fastq.gz ${bbduk_call} ${ref_p5_r2} minlength=${params.qc_minlen} ${params.p5_primer_params} in1=${sample.id}_R2.fastq.gz out1=rev_p5.fastq.gz @@ -56,8 +56,8 @@ process qc_bbduk_stepwise_amplicon { } else { """ mkdir -p ${sample.id}/ - mkdir -p stats/qc/bbduk/ - mkdir -p qc_reads/ + mkdir -p stats/qc/bbduk/${sample.id}/ + mkdir -p qc_reads/${sample.id}/ mkdir -p tmp/ ${bbduk_call} ${ref_p5_r1} minlength=${params.qc_minlen} ${params.p5_primer_params} in1=${sample.id}_R1.fastq.gz out1=fwd.fastq.gz ${bbduk_call} ${ref_p5_r2} minlength=${params.qc_minlen} ${params.p5_primer_params} in1=${sample.id}_R2.fastq.gz out1=rev.fastq.gz diff --git a/nevermore/modules/qc/fastqc.nf b/nevermore/modules/qc/fastqc.nf index e100dea..03b7918 100644 --- a/nevermore/modules/qc/fastqc.nf +++ b/nevermore/modules/qc/fastqc.nf @@ -1,6 +1,5 @@ process fastqc { - // publishDir params.output_dir, mode: params.publish_mode - + input: tuple val(sample), path(reads) val(stage) @@ -10,12 +9,15 @@ process fastqc { tuple val(sample), path("stats/${stage}/read_counts/${sample.id}.${stage}.txt"), emit: counts script: - def process_r2 = (sample.is_paired) ? "fastqc -t $task.cpus --extract --outdir=fastqc ${sample.id}_R2.fastq.gz && mv fastqc/${sample.id}_R2_fastqc/fastqc_data.txt fastqc/${sample.id}_R2_fastqc/${sample.id}_R2_fastqc_data.txt" : ""; + def compression = (reads[0].name.endsWith(".gz")) ? "gz" : "bz2" + + def fastqc_cmd = "fastqc -t ${task.cpus} --extract --outdir=fastqc" + def process_r2 = (sample.is_paired) ? 
"${fastqc_cmd} ${sample.id}_R2.fastq.${compression} && mv fastqc/${sample.id}_R2_fastqc/fastqc_data.txt fastqc/${sample.id}_R2_fastqc/${sample.id}_R2_fastqc_data.txt" : "" """ - set -e + set -e -o pipefail mkdir -p stats/${stage}/read_counts fastqc/ - fastqc -t $task.cpus --extract --outdir=fastqc ${sample.id}_R1.fastq.gz && mv fastqc/${sample.id}_R1_fastqc/fastqc_data.txt fastqc/${sample.id}_R1_fastqc/${sample.id}_R1_fastqc_data.txt + ${fastqc_cmd} ${sample.id}_R1.fastq.${compression} && mv fastqc/${sample.id}_R1_fastqc/fastqc_data.txt fastqc/${sample.id}_R1_fastqc/${sample.id}_R1_fastqc_data.txt ${process_r2} grep "Total Sequences" fastqc/*/*data.txt > seqcount.txt diff --git a/nevermore/modules/qc/multiqc.nf b/nevermore/modules/qc/multiqc.nf index 55fcf03..472e905 100644 --- a/nevermore/modules/qc/multiqc.nf +++ b/nevermore/modules/qc/multiqc.nf @@ -1,5 +1,5 @@ process multiqc { - publishDir params.output_dir, mode: params.publish_mode + // publishDir params.output_dir, mode: params.publish_mode input: path(reports) @@ -10,7 +10,7 @@ process multiqc { path("reports/${stage}.multiqc_report.html") script: - def send_report = (params.email && params.mailer) ? "echo . | ${params.mailer} -s 'multiqc_report' -a reports/${stage}.multiqc_report.html ${params.email}" : "" + def send_report = (false && params.email && params.mailer) ? "echo . | ${params.mailer} -s 'multiqc_report' -a reports/${stage}.multiqc_report.html ${params.email}" : "" """ mkdir -p reports/ multiqc -o reports/ -n ${stage}.multiqc_report.html -c ${multiqc_config} . diff --git a/nevermore/modules/stats.nf b/nevermore/modules/stats.nf index 40e239d..a493cff 100644 --- a/nevermore/modules/stats.nf +++ b/nevermore/modules/stats.nf @@ -1,5 +1,5 @@ process flagstats { - publishDir params.output_dir, mode: params.publish_mode + // publishDir params.output_dir, mode: params.publish_mode input: tuple val(sample), path(bam) diff --git a/nevermore/workflows/align.nf b/nevermore/workflows/align.nf index a9135d8..3959fbb 100644 --- a/nevermore/workflows/align.nf +++ b/nevermore/workflows/align.nf @@ -8,17 +8,15 @@ include { bwa_mem_align } from "../modules/align/bwa" include { merge_and_sort } from "../modules/align/helpers" include { merge_single_fastqs } from "../modules/converters/merge_fastqs" -def config_dir = (projectDir.endsWith("nevermore")) ? "${projectDir}/config" : "${projectDir}/nevermore/config" +def asset_dir = "${projectDir}/nevermore/assets" -workflow nevermore_align { +workflow nevermore_prep_align { take: - fastq_ch - + main: - /* route all single-read files into a common channel */ single_ch = fastq_ch @@ -69,6 +67,7 @@ workflow nevermore_align { concat with merged single-read files (takes care of single-end qc-survivors), concat with paired-end files, and route them into a channel for post-qc fastqc analysis + (THIS IS JUST FOR STATS PURPOSES, NO WORRIES, THE SE READS ARE PROCESSED PROPERLY!) 
diff --git a/nevermore/workflows/gffquant.nf b/nevermore/workflows/gffquant.nf
index fd7054b..7a5720e 100644
--- a/nevermore/workflows/gffquant.nf
+++ b/nevermore/workflows/gffquant.nf
@@ -11,18 +11,35 @@ workflow gffquant_flow {

 	run_gffquant(bam_ch, params.gffquant_db)

-	feature_count_ch = run_gffquant.out.results //.collect()
+	feature_count_ch = run_gffquant.out.results
 		.map { sample, files -> return files }
 		.flatten()
-		.filter { !it.name.endsWith("gene_counts.txt") }
-		.filter { !it.name.endsWith("seqname.uniq.txt") }
-		.filter { !it.name.endsWith("seqname.dist1.txt") }
+		.filter { !it.name.endsWith(".aln_stats.txt.gz") }
+		.filter { !it.name.endsWith("Counter.txt.gz") }
 		.map { file ->
-			def category = file.name.replaceAll(/\.txt$/, "")
+			def category = file.name
+				.replaceAll(/\.txt\.gz$/, "")
 				.replaceAll(/.+\./, "")
 			return tuple(category, file)
 		}
-		.groupTuple(sort:true)
+		.groupTuple(sort: true)
+
+	// expected per-sample gffquant outputs, e.g.:
+	// M0x20MCx1884.aln_stats.txt.gz  M0x20MCx1884.BRITE.txt.gz  M0x20MCx1884.EC_number.txt.gz  M0x20MCx1884.Gene_Ontology_terms.txt.gz  M0x20MCx1884.KEGG_Pathway.txt.gz  M0x20MCx1884.KEGG_TC.txt.gz
+	// M0x20MCx1884.AmbiguousSeqCounter.txt.gz  M0x20MCx1884.CAZy.txt.gz  M0x20MCx1884.eggNOG_OGs.txt.gz  M0x20MCx1884.KEGG_ko.txt.gz  M0x20MCx1884.KEGG_rclass.txt.gz  M0x20MCx1884.UniqueSeqCounter.txt.gz
+	// M0x20MCx1884.BiGG_Reaction.txt.gz  M0x20MCx1884.COG_Functional_Category.txt.gz  M0x20MCx1884.gene_counts.txt.gz  M0x20MCx1884.KEGG_Module.txt.gz  M0x20MCx1884.KEGG_Reaction.txt.gz

 	collate_feature_counts(feature_count_ch)
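
The category key in the hunk above is derived purely from the file name: strip the .txt.gz extension, then greedily strip everything up to the last remaining dot. A self-contained Groovy check, using a file name taken from the comment in the hunk:

    def category = "M0x20MCx1884.KEGG_Pathway.txt.gz"
        .replaceAll(/\.txt\.gz$/, "")  // -> "M0x20MCx1884.KEGG_Pathway"
        .replaceAll(/.+\./, "")        // greedy: strips through the last dot -> "KEGG_Pathway"
    assert category == "KEGG_Pathway"
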
diff --git a/nevermore/workflows/input.nf b/nevermore/workflows/input.nf
index d0f7265..860922d 100644
--- a/nevermore/workflows/input.nf
+++ b/nevermore/workflows/input.nf
@@ -1,6 +1,15 @@
 nextflow.enable.dsl=2

 include { classify_sample } from "../modules/functions"
+include { bam2fq } from "../modules/converters/bam2fq"
+
+def bam_suffix_pattern = null
+
+if (params.bam_input_pattern) {
+	bam_suffix_pattern = params.bam_input_pattern.replaceAll(/\*/, "")
+}
+
+def input_dir = (params.input_dir) ? params.input_dir : params.remote_input_dir

 process transfer_fastqs {
@@ -14,6 +23,7 @@ process transfer_fastqs {
 	"""
 }

+
 process transfer_bams {

 	input:
 	path(bamfiles)
@@ -26,21 +36,28 @@ process transfer_bams {
 	"""
 }

+
 process prepare_fastqs {
-	publishDir params.output_dir, mode: "${params.publish_mode}"

 	input:
-	tuple val(sample_id), path(files)
+	path(files)
+	val(remote_input)

 	output:
-	tuple val(sample_id), path("${sample_id}/*.fastq.gz"), emit: paired, optional: true
-	tuple val("${sample_id}.singles"), path("${sample_id}.singles/*.fastq.gz"), emit: single, optional: true
-	script:
+	path("fastq/*/*.fastq.{gz,bz2}"), emit: fastqs
+
+	script:
+	def remote_option = (remote_input) ? "--remote-input" : ""
+	def remove_suffix = (params.suffix_pattern) ? "--remove-suffix ${params.suffix_pattern}" : ""
+	def input_dir_prefix = (params.input_dir) ? params.input_dir : params.remote_input_dir
+
+	def custom_suffixes = (params.custom_fastq_file_suffixes) ? "--valid-fastq-suffixes ${params.custom_fastq_file_suffixes}" : ""
+
+	"""
+	prepare_fastqs.py -i . -o fastq/ -p ${input_dir_prefix} ${custom_suffixes} ${remote_option} ${remove_suffix}
 	"""
-	prepare_fastqs.py -i . -o . -s ${sample_id} --remove-suffix ${params.suffix_pattern} > run.sh
-	bash run.sh
-	"""
 }

+
 workflow remote_fastq_input {
 	take:
 		fastq_ch
@@ -55,50 +72,59 @@ workflow remote_fastq_input {
 }


+workflow remote_bam_input {
+	take:
+		bam_ch
+	main:
+		transfer_bams(bam_ch)
+		res_ch = transfer_bams.out.bamfiles.flatten()
+	emit:
+		bamfiles = res_ch
+}
+
+
 workflow fastq_input {
 	take:
 		fastq_ch

 	main:
-	if (params.remote_input_dir) {
-		fastq_ch = remote_fastq_input(fastq_ch)
-	}
+	prepare_fastqs(fastq_ch.collect(), (params.remote_input_dir != null))

-	fastq_ch = fastq_ch
-		.map { file ->
-			def sample = file.getParent().getName()
+	fastq_ch = prepare_fastqs.out.fastqs
+		.flatten()
+		.map { file ->
+			def sample = file.getParent().getName()
 			return tuple(sample, file)
-		}
-		.groupTuple(sort: true)
-
-	prepare_fastqs(fastq_ch)
-
-	fastq_ch = prepare_fastqs.out.paired
-		.concat(prepare_fastqs.out.single)
-		.map { classify_sample(it[0], it[1]) }
-
+		}.groupTuple(sort: true)
+		.map { classify_sample(it[0], it[1]) }

 	emit:
 	fastqs = fastq_ch
 }


+
 workflow bam_input {
 	take:
 		bam_ch

 	main:
-	transfer_bams(bam_ch.collect())
-	transfer_bams.out.bamfiles.view()
-	bam_ch = transfer_bams.out.bamfiles.flatten()
+
+	if (params.remote_input_dir) {
+		bam_ch = remote_bam_input(bam_ch.collect())
+	}
+
+	bam_ch = bam_ch
 		.map { file ->
-			def sample = file.name.replaceAll(/.bam$/, "")
+			def sample = file.name.replaceAll(bam_suffix_pattern, "").replaceAll(/\.$/, "")
 			return tuple(sample, file)
 		}
 		.groupTuple(sort: true)
 		.map { classify_sample(it[0], it[1]) }

-	bam2fq(bam_ch)
-	bam_ch = bam2fq.out.reads
-		.map { classify_sample(it[0].id, it[1]) }
+	if (params.do_bam2fq_conversion) {
+		bam2fq(bam_ch)
+		bam_ch = bam2fq.out.reads
+			.map { classify_sample(it[0].id, it[1]) }
+	}

 	emit:
 	bamfiles = bam_ch
-}
\ No newline at end of file
+}
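
Note how bam_input now derives sample names: bam_suffix_pattern is the glob params.bam_input_pattern with the asterisks removed, and it is applied as a regular expression, so a pattern like *.bam becomes the regex .bam (with an unescaped dot). A standalone Groovy sketch, assuming params.bam_input_pattern = "*.bam":

    def bam_suffix_pattern = "*.bam".replaceAll(/\*/, "")  // -> ".bam", used as a regex below
    def sample = "O2.UC3-0.bam"
        .replaceAll(bam_suffix_pattern, "")                // strip the suffix
        .replaceAll(/\.$/, "")                             // drop any trailing dot left behind
    assert sample == "O2.UC3-0"
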
"./align" def do_preprocessing = (!params.skip_preprocessing || params.run_preprocessing) -def config_dir = (projectDir.endsWith("nevermore")) ? "${projectDir}/config" : "${projectDir}/nevermore/config" - workflow nevermore_main { @@ -23,11 +21,11 @@ workflow nevermore_main { main: if (do_preprocessing) { - prepare_fastqs(fastq_ch) + //prepare_fastqs(fastq_ch) - raw_fastq_ch = prepare_fastqs.out.reads + //raw_fastq_ch = prepare_fastqs.out.reads - nevermore_simple_preprocessing(raw_fastq_ch) + nevermore_simple_preprocessing(fastq_ch) preprocessed_ch = nevermore_simple_preprocessing.out.main_reads_out if (!params.drop_orphans) { @@ -58,31 +56,40 @@ workflow nevermore_main { } - nevermore_align(preprocessed_ch) - + nevermore_prep_align(preprocessed_ch) + align_ch = Channel.empty() + if (do_preprocessing) { - collate_ch = nevermore_simple_preprocessing.out.raw_counts - .map { sample, file -> return file } - .collect() - .concat( - nevermore_align.out.read_counts - .map { sample, file -> return file } - .collect() - ) - .concat( - nevermore_align.out.aln_counts - .map { sample, file -> return file } - .collect() - ) - .collect() - - collate_stats( - collate_ch + .map { sample, file -> return file } + .collect() + .concat( + nevermore_prep_align.out.read_counts + .map { sample, file -> return file } + .collect() ) - + } + + if (!params.skip_alignment) { + nevermore_align(nevermore_prep_align.out.fastqs) + align_ch = nevermore_align.out.alignments + + if (do_preprocessing) { + collate_ch = collate_ch + .concat( + nevermore_align.out.aln_counts + .map { sample, file -> return file } + .collect() + ) + } + } + + if (do_preprocessing) { + collate_stats(collate_ch.collect()) } emit: - alignments = nevermore_align.out.alignments + alignments = align_ch + fastqs = nevermore_prep_align.out.fastqs + read_counts = nevermore_prep_align.out.read_counts } diff --git a/nevermore/workflows/prep.nf b/nevermore/workflows/prep.nf index 922ffac..e0b1777 100644 --- a/nevermore/workflows/prep.nf +++ b/nevermore/workflows/prep.nf @@ -52,7 +52,7 @@ workflow nevermore_simple_preprocessing { if (params.amplicon_seq) { - qc_bbduk_stepwise_amplicon(fastq_ch, "${asset_dir}/adapters.fa") + qc_bbduk_stepwise_amplicon(fastq_ch, "${asset_dir}/adapters.fa") // amplicon cannot yet do bz2 processed_reads_ch = processed_reads_ch.concat(qc_bbduk_stepwise_amplicon.out.reads) orphans_ch = orphans_ch.concat(qc_bbduk_stepwise_amplicon.out.orphans) diff --git a/nextflow.config b/nextflow.config index 0d02fc5..da25d8a 100644 --- a/nextflow.config +++ b/nextflow.config @@ -4,5 +4,14 @@ manifest { description = "Ensemble taxonomic microbiome profiling" name = "vortex_light" nextflowVersion = ">=21.10.04" - version = "0.7.3" + version = "0.9" } + +process { + withName: collate_stats { + publishDir = "${params.output_dir}" + executor = "local" + container = null + scratch = null + } +} \ No newline at end of file diff --git a/scripts/ExtractProfiledCounts_210823.R b/scripts/ExtractProfiledCounts_210823.R new file mode 100644 index 0000000..21939e0 --- /dev/null +++ b/scripts/ExtractProfiledCounts_210823.R @@ -0,0 +1,94 @@ +#!/usr/bin/env Rscript + + +library(optparse) +library(tidyverse) + +option_list = list( + make_option(c("-f", "--file"), type="character", default=NULL, + help="dataset file name", metavar="character"), + make_option(c("-o", "--out"), type="character", default="out.txt", + help="output file name [default= %default]", metavar="character") +); + +option_list = list( + make_option(c("--kraken2_res_path"), type="character", 
diff --git a/scripts/ExtractProfiledCounts_210823.R b/scripts/ExtractProfiledCounts_210823.R
new file mode 100644
index 0000000..21939e0
--- /dev/null
+++ b/scripts/ExtractProfiledCounts_210823.R
@@ -0,0 +1,94 @@
+#!/usr/bin/env Rscript
+
+library(optparse)
+library(tidyverse)
+
+option_list = list(
+	make_option(c("--kraken2_res_path"), type="character", default=NULL,
+		help="Path to folder with kraken2 result tables", metavar="character"),
+	make_option(c("--PathSeq_res_path"), type="character", default=NULL,
+		help="Path to folder with PathSeq result tables", metavar="character"),
+	make_option(c("--libsize_res_path"), type="character", default=NULL,
+		help="Path to folder with libsize results", metavar="character"),
+	make_option(c("--lib_layout_res_path"), type="character", default=NULL,
+		help="Path to folder with library_layout results", metavar="character"),
+	make_option(c("--flagstats_res_path"), type="character", default=NULL,
+		help="Path to folder with flagstats", metavar="character"),
+	make_option(c("--N_raw_counts_path"), type="character", default=NULL,
+		help="Path to folder with number of raw_counts", metavar="character"),
+
+	make_option(c("--out_folder"), type="character", default=NULL,
+		help="output folder path", metavar="character"),
+	make_option(c("--libdir"), type="character", default=".",
+		help="path to functions", metavar="character"),
+	make_option(c("--gtdb_markers"), type="character", default=NULL,
+		help="path to gtdb markers", metavar="character")
+);
+
+opt_parser = OptionParser(option_list=option_list);
+opt = parse_args(opt_parser);
+
+source(file.path(opt$libdir, "functions_read_in_profiled_data_210702.R"))
+
+out.folder <- opt$out_folder
+
+# kraken2
+message("kraken2")
+if (!(is.null(opt$kraken2_res_path))) {
+	if (length(list.files(opt$kraken2_res_path)) > 0) {
+		res.kraken2 <- .f_read_in_files_kraken2(path_to_folder = opt$kraken2_res_path,
+			tax.level = "genus")
+		saveRDS(res.kraken2, paste0(out.folder, "/res_kraken2.rds"))
+	} else {
+		message("kraken2 path empty")
+	}
+}
+
+# pathseq
+message("pathseq")
+if (!(is.null(opt$PathSeq_res_path))) {
+	if (length(list.files(opt$PathSeq_res_path)) > 0) {
+		res.pathseq <- .f_read_in_files_PathSeq(path_to_folder = opt$PathSeq_res_path,
+			tax.level = "genus")
+		saveRDS(res.pathseq, paste0(out.folder, "/res_pathseq.rds"))
+	} else {
+		message("PathSeq path empty")
+	}
+}
+
+# libsize
+if (!(is.null(opt$libsize_res_path))) {
+	if (length(list.files(opt$libsize_res_path)) > 0) {
+		res.libsize <- .f_read_in_libsize(path_to_folder = opt$libsize_res_path)
+		saveRDS(res.libsize, paste0(out.folder, "/res_libsize.rds"))
+	} else {
+		message("libsize path empty")
+	}
+}
+
+# liblayout
+if (!(is.null(opt$lib_layout_res_path))) {
+	if (length(list.files(opt$lib_layout_res_path)) > 0) {
+		res.lib_layout <- .f_read_in_lib_layout(path_to_folder = opt$lib_layout_res_path)
+		saveRDS(res.lib_layout, paste0(out.folder, "/res_lib_layout.rds"))
+	} else {
+		message("liblayout path empty")
+	}
+}
+
+# flagstats --> total numbers of reads before QC
+# Not needed anymore in vknight builds >= 827290457b
+if (!(is.null(opt$flagstats_res_path))) {
+	if (length(list.files(opt$flagstats_res_path)) > 0) {
+		res.flagstats <- .f_read_in_flagstats(path_to_folder = opt$flagstats_res_path)
+		saveRDS(res.flagstats, paste0(out.folder, "/res_flagstat_total_reads.rds"))
+	} else {
+		message("flagstats path empty")
+	}
+}
diff --git a/vlight/modules/collate.nf b/vlight/modules/collate.nf
new file mode 100644
index 0000000..c0ad687
--- /dev/null
+++ b/vlight/modules/collate.nf
@@ -0,0 +1,40 @@
+process collate_results {
+	publishDir params.output_dir, mode: params.publish_mode
+
+	input:
+	path(results)
+	path(collate_script)
+	path(gtdb_markers)
+
+	output:
+	path("collated/*.rds"), emit: collated, optional: true
+
+	script:
+	"""
+	mkdir -p collated/
+
+	mkdir -p pathseq/
+	(mv *pathseq.scores pathseq/) || :
+
+	mkdir -p libsize/
+	(mv *libsize.txt libsize/) || :
+
+	mkdir -p liblayout/
+	(mv *is_paired.txt liblayout/) || :
+
+	mkdir -p flagstats/
+	(mv *flagstats.txt flagstats/) || :
+
+	# NB: the catch-all *.txt must stay last, after the more specific patterns above
+	mkdir -p raw_counts/
+	(mv *.txt raw_counts/) || :
+
+	# readlink resolves the staged symlink so --libdir points at the real script's directory
+	Rscript ${collate_script} \
+		--libdir \$(dirname \$(readlink ${collate_script})) \
+		--gtdb_markers ${gtdb_markers} \
+		--PathSeq_res_path pathseq/ \
+		--libsize_res_path libsize/ \
+		--lib_layout_res_path liblayout/ \
+		--N_raw_counts_path raw_counts/ \
+		--out_folder collated/
+	"""
+}
\ No newline at end of file
diff --git a/vlight/modules/profilers/pathseq.nf b/vlight/modules/profilers/pathseq.nf
index 8f30c2b..d6be305 100644
--- a/vlight/modules/profilers/pathseq.nf
+++ b/vlight/modules/profilers/pathseq.nf
@@ -11,6 +11,14 @@ process pathseq {
 	script:
 	def maxmem = task.memory.toGiga()
+
+	def microbe_seq = ""
+	if (params.pathseq_db_microbe_fasta) {
+		microbe_seq = "--microbe-fasta ${params.pathseq_db_microbe_fasta}"
+	} else {
+		microbe_seq = "--microbe-dict ${params.pathseq_db_microbe_dict}"
+	}
+
 	"""
 	mkdir -p ${sample.id}
@@ -19,7 +27,7 @@ process pathseq {
 		--filter-bwa-image ${params.pathseq_db_filter_bwa_image} \\
 		--kmer-file ${params.pathseq_db_kmer_file} \\
 		--min-clipped-read-length ${params.pathseq_min_clipped_read_length} \\
-		--microbe-fasta ${params.pathseq_db_microbe_fasta} \\
+		${microbe_seq} \\
 		--microbe-bwa-image ${params.pathseq_db_microbe_bwa_image} \\
 		--taxonomy-file ${params.pathseq_db_taxonomy_file} \\
 		--output ${sample.id}/${sample.id}.pathseq.bam \\
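
With the change above, pathseq takes its microbe reference either as a FASTA or, failing that, as a sequence dictionary. A hypothetical params block (the paths are placeholders) showing the two alternatives; pathseq_db_microbe_fasta wins when both are set:

    params {
        pathseq_db_microbe_fasta = "/refs/pathseq/microbe.fa"
        // pathseq_db_microbe_dict = "/refs/pathseq/microbe.dict"  // used only if no fasta is set
    }
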
"individual") { + + remove_host_kraken2_individual(nevermore_simple_preprocessing.out.main_reads_out, params.remove_host_kraken2_db) + + preprocessed_ch = remove_host_kraken2_individual.out.reads + + + } else if (params.remove_host == "pair") { + + remove_host_kraken2(nevermore_simple_preprocessing.out.main_reads_out, params.remove_host_kraken2_db) + + preprocessed_ch = remove_host_kraken2.out.reads + + } + + } + + } else { + + preprocessed_ch = fastq_ch + + } + + // + /* perform post-qc fastqc analysis and generate multiqc report on merged single-read and paired-end sets */ + + fastqc(preprocessed_ch, "qc") + + multiqc( + fastqc.out.stats + .map { sample, report -> return report }.collect(), + "${asset_dir}/multiqc.config", + "qc" + ) + + // + + if (get_basecounts || run_bam_analysis) { + + fq2bam(preprocessed_ch) + + if (get_basecounts) { + + flagstats(fq2bam.out.reads) + + flagstat_results_ch = flagstats.out.flagstats + .concat(flagstats.out.counts) + .concat(flagstats.out.is_paired) + .map { sample, files -> files } + results_ch = results_ch.concat(flagstat_results_ch) + + } + + if (run_bam_analysis) { + + bam_analysis(fq2bam.out.reads) + results_ch = results_ch.concat(bam_analysis.out.results) + + } + + } + + + if (!params.skip_collate) { + collate_results( + results_ch.collect(), + "${projectDir}/scripts/ExtractProfiledCounts_210823.R", + params.GTDB_markers + ) + } + + collate_stats(results_ch) + + emit: + results = results_ch +}