diff --git a/mgexpose/__init__.py b/mgexpose/__init__.py index b4d74cf..d4129ee 100644 --- a/mgexpose/__init__.py +++ b/mgexpose/__init__.py @@ -1,3 +1,6 @@ """ MGExpose """ __version__ = "3.7.6" + +# Don't import at package level to avoid import errors during installation +__all__ = ["batch_helpers", "downstream"] diff --git a/mgexpose/__main__.py b/mgexpose/__main__.py deleted file mode 100755 index c1cdb08..0000000 --- a/mgexpose/__main__.py +++ /dev/null @@ -1,354 +0,0 @@ -#!/usr/bin/env python - -# pylint: disable=R0912,R0914,R0915,R0913,R0917 - -""" Mobile genetic element annotation """ - -import contextlib -import gzip -import logging -import os -import pathlib - -from .gene_annotator import GeneAnnotator -from .handle_args import handle_args -from .island_processing import ( - generate_island_set, - annotate_islands, - evaluate_islands, - prepare_precomputed_islands -) -from .islands import MgeGenomicIsland -from .readers import read_fasta, read_prodigal_gff, read_mge_rules -from .gffio import read_genomic_islands_gff - -MGE_TABLE_HEADERS = \ - ("is_tn",) + \ - MgeGenomicIsland.TABLE_HEADERS[1:6] + \ - MgeGenomicIsland.TABLE_HEADERS[8:14] + \ - ("mgeR", "name", "genes",) - -logging.basicConfig( - level=logging.INFO, - format='[%(asctime)s] %(message)s' -) - -logger = logging.getLogger(__name__) - - -def process_islands(genes, genome_id, single_island=None, island_file=None, output_dir=None,): - """ helper function to declutter main() """ - precomputed_islands = prepare_precomputed_islands( - single_island=single_island, - island_file=island_file, - genome_id=genome_id, - ) - - if output_dir: - pang_calls_out = open( - os.path.join( - output_dir, - f"{genome_id}.pan_genome_calls.txt"), - "wt", - encoding="UTF-8", - ) - - islands_out = open( - os.path.join( - output_dir, - f"{genome_id}.pan_genome_islands.txt", - ), - "wt", - encoding="UTF-8", - ) - - raw_islands_out = open( - os.path.join( - output_dir, - "..", # temporary! this is only until i know if this is final output or not - f"{genome_id}.pan_genome_islands_raw.txt", - ), - "wt", - encoding="UTF-8", - ) - else: - pang_calls_out, islands_out, raw_islands_out = [contextlib.nullcontext() for _ in range(3)] - - with pang_calls_out, islands_out, raw_islands_out: - yield from generate_island_set( - genes, - pang_calls_out=pang_calls_out, - raw_islands_out=raw_islands_out, - islands_out=islands_out, - precomputed_islands=precomputed_islands, - ) - - -def dump_islands(islands, out_prefix, db, write_genes=False, add_functional_annotation=False): - """ dump genomic islands to intermediate gff """ - with open( - f"{out_prefix}.unannotated_islands.gff3", - "wt", encoding="UTF-8" - ) as _out: - print("##gff-version 3", file=_out) - for island in sorted(islands, key=lambda isl: isl.contig): - island.to_gff( - _out, db, write_genes=write_genes, - add_functional_annotation=add_functional_annotation, - intermediate_dump=True, - ) - - -def identify_recombinase_islands(islands, genome_id, mge_rules, output_dir=None): - """Identify MGE-islands according to a set of rules - using various signals annotated in the corresponding gene set. """ - if output_dir: - step1_out = open( - os.path.join( - output_dir, - f"{genome_id}.assign_mge.step1.txt", - ), - "wt", - encoding="UTF-8", - ) - - step2_out = open( - os.path.join( - output_dir, - f"{genome_id}.assign_mge.step2.txt", - ), - "wt", - encoding="UTF-8", - ) - - step3_out = open( - os.path.join( - output_dir, - f"{genome_id}.assign_mge.step3.txt", - ), - "wt", - encoding="UTF-8", - ) - - else: - step1_out, step2_out, step3_out = [contextlib.nullcontext() for _ in range(3)] - - with step1_out: - annotated_islands = list(annotate_islands(islands, outstream=step1_out)) - with step2_out, step3_out: - return list( - evaluate_islands( - annotated_islands, - read_mge_rules(mge_rules), - outstream=step2_out, - outstream2=step3_out - ) - ) - - -def write_final_results( - recombinase_islands, - output_dir, - genome_id, - output_suffix, - dbformat=None, - write_tsv=True, - write_gff=True, - write_genes_to_gff=True, - add_functional_annotation=False, - genome_seqs=None, -): - """ write final results """ - - outstream = contextlib.nullcontext() - gff_outstream = contextlib.nullcontext() - - out_prefix = os.path.join( - output_dir, - f"{genome_id}.{output_suffix}" - ) - - if write_tsv: - outstream = open( - f"{out_prefix}.txt", - "wt", - encoding="UTF-8", - ) - if write_gff: - gff_outstream = open( - f"{out_prefix}.gff3", - "wt", - encoding="UTF-8", - ) - - # Sort the list of MGEGenomicIslands based on contig names - sorted_islands = sorted(recombinase_islands, key=lambda isl: isl.contig) - islands_by_contig = {} - - with outstream, gff_outstream: - # TSV header - if write_tsv: - print(*MGE_TABLE_HEADERS, sep="\t", file=outstream) - # GFF3 header - if write_gff: - print("##gff-version 3", file=gff_outstream) - - # Start recording the outputs - for island in sorted_islands: - islands_by_contig.setdefault(island.contig, []).append(island) - # TSV: ignore gene-wise annotations; each line is recombinase island, - # all gene IDs are stored in a gene_list column - # assert genome_id == island.genome - if write_tsv: - island.to_tsv(outstream) - # GFF3: add individual genes annotation; - # parent lines are recombinase islands, children lines are genes - # GFF3 parent term: recombinase island - if write_gff: - island.to_gff( - gff_outstream, - source_db=dbformat, - write_genes=write_genes_to_gff, - add_functional_annotation=add_functional_annotation, - ) - - if genome_seqs is not None: - with gzip.open( - f"{out_prefix}.ffn.gz", - "wt", - ) as _out: - for header, seq in read_fasta(genome_seqs): - seqid, *_ = header.split(" ") - for island in islands_by_contig.get(seqid, []): - attribs = island.get_attribs() - try: - del attribs["ID"] - except KeyError: - pass - try: - del attribs["name"] - except KeyError: - pass - attrib_str = ";".join(f"{item[0]}={item[1]}" for item in attribs.items() if item[1]) - print( - f">{island.get_id()} {attrib_str}", seq[island.start - 1: island.end], sep="\n", file=_out - ) - - - - -def denovo_annotation(args, debug_dir=None): - """ denovo annotation """ - annotator = GeneAnnotator( - args.genome_id, - args.speci, - read_prodigal_gff(args.prodigal_gff), - include_genome_id=args.include_genome_id, - has_batch_data=args.allow_batch_data, - dbformat=args.dbformat, - ) - - annotated_genes = annotator.annotate_genes( - args.recombinase_hits, - ( - args.phage_eggnog_data, - args.phage_filter_terms, - ), - ( - args.txs_macsy_report, - args.txs_macsy_rules, - # args.macsy_version, - ), - clusters=args.cluster_data, - use_y_clusters=args.use_y_clusters, - core_threshold=(args.core_threshold, -1)[args.precomputed_core_genes], - output_dir=args.output_dir, - pyhmmer=args.pyhmmer_input, - ) - - out_prefix = os.path.join(args.output_dir, args.genome_id) - - genomic_islands = list( - process_islands( - annotated_genes, - args.genome_id, - single_island=args.single_island, - island_file=args.precomputed_islands, - output_dir=debug_dir, - ) - ) - - if args.dump_genomic_islands or args.skip_island_identification: - - dump_islands( - genomic_islands, - out_prefix, - args.dbformat, - write_genes=True, - add_functional_annotation=args.add_functional_annotation, - ) - - # - # test_islands = list(read_genomic_islands_gff(f"{out_prefix}.unannotated_islands.gff3")) - # dump_islands( - # test_islands, - # out_prefix + ".test", - # args.dbformat, - # write_genes=True, - # add_functional_annotation=args.add_functional_annotation, - # ) - - with open( - os.path.join(args.output_dir, f"{args.genome_id}.gene_info.txt"), - "wt", - encoding="UTF-8", - ) as _out: - annotator.dump_genes(_out) - - return genomic_islands - - -def main(): - """ main """ - - args = handle_args() - logger.info("ARGS: %s", str(args)) - - debug_dir = None - cdir = args.output_dir - if args.dump_intermediate_steps: - cdir = debug_dir = os.path.join(args.output_dir, "debug") - pathlib.Path(cdir).mkdir(exist_ok=True, parents=True) - - genomic_islands = None - if args.command == "denovo": - genomic_islands = denovo_annotation(args, debug_dir=debug_dir) - - elif args.command == "annotate": - raise NotImplementedError - - if not args.skip_island_identification: - - recombinase_islands = identify_recombinase_islands( - genomic_islands, - args.genome_id, - args.mge_rules, - output_dir=debug_dir, - ) - - if recombinase_islands: - write_final_results( - recombinase_islands, - args.output_dir, - args.genome_id, - args.output_suffix, - dbformat=args.dbformat, - write_gff=args.write_gff, - write_genes_to_gff=args.write_genes_to_gff, - add_functional_annotation=args.add_functional_annotation, - genome_seqs=args.extract_islands, - ) - - -if __name__ == "__main__": - main() diff --git a/mgexpose/base_logger.py b/mgexpose/base_logger.py new file mode 100644 index 0000000..47b94df --- /dev/null +++ b/mgexpose/base_logger.py @@ -0,0 +1,4 @@ +import logging + +logger = logging +logger.basicConfig(format='%(asctime)s - %(message)s', level=logging.INFO) \ No newline at end of file diff --git a/mgexpose/chunk_reader.py b/mgexpose/chunk_reader.py deleted file mode 100644 index 34d7037..0000000 --- a/mgexpose/chunk_reader.py +++ /dev/null @@ -1,31 +0,0 @@ -""" Module docstring """ - -import gzip - - -def get_lines_from_chunks(f: str, bufsize: int = 800000000): - """ - Provides generator access to the lines of large text files. - File is read chunk-wise into a buffer of the specified size. - Support gzip-compressed files. - - inputs: - - f: str -- filename - - bufsize: int -- size of buffer - - """ - gzipped = f.endswith(".gz") - with (gzip.open if gzipped else open)(f, "r") as _in: - tail = "" - while 1: - chunk = _in.read(bufsize) - if gzipped: - chunk = chunk.decode() - chunk = "".join((tail, chunk)) - if not chunk: - break - chunk = chunk.split("\n") - *lines, tail = chunk - yield from lines - if tail: - yield tail diff --git a/mgexpose/clustering_parser.py b/mgexpose/clustering_parser.py deleted file mode 100644 index 57a3979..0000000 --- a/mgexpose/clustering_parser.py +++ /dev/null @@ -1,182 +0,0 @@ -# pylint: disable=R0902,R0914 - -""" Functions for gene cluster parsing """ - -import logging -import os - -from collections import Counter -from contextlib import nullcontext -from dataclasses import dataclass - -from .chunk_reader import get_lines_from_chunks - - -logger = logging.getLogger(__name__) - - -def extract_genome_id(gene_id): - """ Extract genome id from gene id. """ - sep = "." if "." in gene_id else ("_" if "_" in gene_id else None) - if sep is None: - raise ValueError(f"gene `{gene_id}` does not seem to contain a genome_id.") - - return gene_id[:gene_id.rfind(sep)] - - -def parse_db_clusters(cluster_data): - """ Parse gene, cluster, is_core from tsv-stream. """ - return [ - tuple(line.strip().split("\t")) - for line in get_lines_from_chunks(cluster_data) - ] - - -def parse_full_seq_clusters( - genome_id_prefix, - genes, - cluster_data, - output_dir=None, -): - """ Parse data from linclust gene clustering. """ - - genomes = set() - cluster_genes = Counter() - gene_cluster_map = [] - - if output_dir is None: - write_data = True - cluster_genes_out = nullcontext() - gene_clusters_out = nullcontext() - else: - write_data = False - cluster_genes_out = open( - os.path.join(output_dir, f"{genome_id_prefix}.cluster_genes.txt"), - "wt", - encoding="UTF-8", - ) - gene_clusters_out = open( - os.path.join(output_dir, f"{genome_id_prefix}.gene_clusters.txt"), - "wt", - encoding="UTF-8", - ) - - with gene_clusters_out, cluster_genes_out: - for line in get_lines_from_chunks(cluster_data): - cluster_id, gene_id = line.split("\t") - if gene_id.startswith(genome_id_prefix): - gene_id = gene_id[len(genome_id_prefix) + 1:] - - gene = genes.get( - gene_id, - genes.get(gene_id[gene_id.rfind(".") + 1:]) - ) - if gene is not None: - logger.info("Adding cluster %s to gene %s", cluster_id, gene_id) - genome_id = gene.genome - gene_cluster_map.append((gene_id, cluster_id)) - else: - genome_id = extract_genome_id(gene_id) - - cluster_genes[cluster_id] += 1 - genomes.add(genome_id) - if write_data: - print(cluster_id, gene_id, sep="\t", file=cluster_genes_out) - print(gene_id, cluster_id, sep="\t", file=gene_clusters_out) - - if write_data: - with open( - os.path.join(output_dir, f"{genome_id_prefix}.genomes.txt"), - "wt", - encoding="UTF-8", - ) as genomes_out: - print(*sorted(genomes), sep="\n", file=genomes_out) - - n_genomes = len(genomes) - return n_genomes, cluster_genes, gene_cluster_map, genome_id_prefix in genomes - - -@dataclass -class RefGene: - """ Y-cluster reference gene class. """ - refset: str = None - speci: int = None - refset_id: int = None - is_core: bool = None - is_singleton: bool = None - is_unique: bool = None - prevalence: int = None - sp100_id: int = None - genome_id: int = None - gene_id: str = None - n_rep_genomes: int = None - n_rep_genes: int = None - - @classmethod - def from_string(cls, s): - """ Construct RefGene from cluster id string. """ - # proMGE095-00037-00002654-AN012-0000748630-000497735_01349-0000000104-0000000608 - fields = s.strip().split("-") - return cls( - fields[0], - int(fields[1]), - int(fields[2]), - fields[3][0] == "C", - fields[3][1] == "S", - fields[3][1] in "SU", - int(fields[3][2:]), - int(fields[4]), - int(fields[5][:fields[5].find("_")]), - fields[5], - int(fields[6]), - int(fields[7]), - ) - - -def evaluate_cluster(rep_id, cluster, genes): - """ Add cluster information from Y-cluster data. """ - ref_genes = {gene for gene in cluster if gene.startswith("proMGE") or gene[0] == "-"} - query_genes = cluster.difference(ref_genes) - - if query_genes: - if ref_genes: - if len(ref_genes) > 1: - # this is a heuristic -- - # we take the ref_gene with the largest represented genomes ([6]) - # to approximate the cluster's rep_genomes - ref_genes = sorted( - ((int(r.split("-")[6]), r) for r in ref_genes), - key=lambda x: x[0], reverse=True - ) - # print("MULTIREF-CLUSTER", ref_genes[0][1]) - # print(*(r[1] for r in ref_genes[1:]), sep="\n") - rep = ref_genes[0][1] - else: - rep = list(ref_genes)[0] - ref_gene = RefGene.from_string(rep) - else: - # this cluster doesn't contain any reference genes -> genes are accessory - ref_gene = RefGene(is_core=False, prevalence=0) - - for gene_id in query_genes: - gene = genes.get(gene_id) - if gene is not None: - gene.cluster = rep_id - gene.is_core = ref_gene.is_core - gene.prevalence = ref_gene.prevalence - - -def parse_y_clusters(cluster_data, genes): - """ Parse data from Y gene clustering approach. """ - - cluster, members = None, set() - for line in get_lines_from_chunks(cluster_data): - cluster_id, gene_id = line.split("\t") - if cluster_id != cluster: - if cluster is not None: - evaluate_cluster(cluster, members, genes) - cluster, members = cluster_id, set() - - members.add(gene_id) - - evaluate_cluster(cluster, members, genes) diff --git a/mgexpose/downstream.py b/mgexpose/downstream.py new file mode 100644 index 0000000..821a2af --- /dev/null +++ b/mgexpose/downstream.py @@ -0,0 +1,488 @@ +#!/usr/bin/env python +# pylint: disable=R0912,R0914,R0915 +import os + +from collections import Counter, defaultdict + +from .gffio import read_mge_genomic_islands_gff + +from .base_logger import logger + +def stat_nested(islands): + """Calculate statistics on nested MGEs and return as a dictionary.""" + total = 0 + nested = 0 + for island in islands: + if island.mge_type == "nested": + nested += 1 + total += 1 + nested_percentage = (nested / total * 100) if total > 0 else 0 + non_nested_percentage = ((total - nested) / total * 100) if total > 0 else 0 + + return { + "total": total, + "nested_count": nested, + "non_nested_count": total - nested, + "nested_percentage": nested_percentage, + "non_nested_percentage": non_nested_percentage, + } + + +def count_nested(islands): + """ + Calculate the count of nested MGEs. + + Parameters: + - islands: List of MGE objects. + + Returns: + - Integer count of nested MGEs. + """ + nested = sum(1 for island in islands if island.mge_type == "nested") + return nested + + +def stat_core(islands): + """Calculate statistics on MGEs in the core genome and return as a dictionary.""" + total = 0 + core = 0 + for island in islands: + if island.is_core: + core += 1 + total += 1 + core_prop = (core / total * 100) if total > 0 else 0 + acc_prop = ((total - core) / total * 100) if total > 0 else 0 + + return { + "total": total, + "core_count": core, + "accessory_count": total - core, + "core_prop": core_prop, + "acc_prop": acc_prop, + } + + +def count_core(islands): + """ + Calculate the count of MGEs in the core genome. + + Parameters: + - islands: List of MGE objects. + + Returns: + - Integer count of MGEs in the core genome. + """ + core = sum(1 for island in islands if island.is_core) + return core + + +def count_total_islands(islands): + """ + Count the total number of MGE islands. + + Parameters: + - islands: List of MGE objects. + + Returns: + - Integer count of total MGE islands. + """ + return len(list(islands)) + + +def stat_mge_type(islands): + """ + Calculate counts of each MGE type and return as a Counter object. + + Parameters: + - islands: List of MGE objects. + + Returns: + - Counter object with counts of each MGE type. + """ + mge_counts = Counter() + for island in islands: + try: + if island.mge_type == "nested": + mge_counts["nested"] += 1 + else: + # Get the first key (assuming there's only one key) + mge = next(iter(island.mge.keys())) + mge_counts[mge] += 1 + except Exception as e: + raise ValueError(f"Unknown or absent MGE type: {e}") + + return mge_counts + + +def stat_mean_genes(islands): + """Calculate the mean number of genes per MGE and return as a dictionary.""" + genes_lst = [island.n_genes for island in islands] + mean_genes = (sum(genes_lst) / len(genes_lst)) if genes_lst else 0 + + # Return the result as a dictionary + return {"mean_genes_per_mge": mean_genes} + + +def extract_cargo(island): + cargo_genes = [] + for gene in island.genes: + if (gene.phage is None) and (gene.recombinase is None) and (gene.secretion_system is None): + cargo_genes.append(gene) + return cargo_genes + + +def get_kegg_ko(gene): + for key, value in gene.eggnog: + if key == "kegg_ko": + return value + + +def get_cazy(gene): + for key, value in gene.eggnog: + if key == "cazy": + return value + + +def get_cog_category(gene): + if gene.eggnog: + for key, value in gene.eggnog: + if key == "cog_fcat": + if value: + return value + else: + return '-' + else: + return 'no_cog_fcat' + else: + return 'no_eggnog' + + +# SP95 for SPIRE +def get_gene_cluster(gene): + return gene.cluster + + +def get_gene_id(gene): + id_lst = gene.id.split('.') # orginal ID e.g. 28901.SAMN15849311.GCA_014242155_04079 + return id_lst[2] # extract to match CY clustering IDs e.g. GCA_xxx_xxx + +# Extract MGE recombinases +def extract_mger(island): + mgeR_genes = [] + for gene in island.genes: + if gene.recombinase: + try: + gene_id = gene.id + gene_cluster = get_gene_cluster(gene) + mgeR = gene.recombinase + annot = [gene_id, gene_cluster, mgeR] + mgeR_genes.append(annot) + except Exception as e: + logger.error(f"Error processing recombinase gene {gene}: {e}") + logger.error(traceback.format_exc()) + return mgeR_genes + + +# Extract secretion system genes +def extract_secretion_system(island): + secretion_genes = [] + for gene in island.genes: + if gene.secretion_system: + try: + gene_id = gene.id + gene_cluster = get_gene_cluster(gene) + info = gene.secretion_system + annot = [gene_id, gene_cluster, info] + secretion_genes.append(annot) + except Exception as e: + logger.error(f"Error processing secretion system gene {gene}: {e}") + logger.error(traceback.format_exc()) + return secretion_genes + + +# Extract phage genes +def extract_phage(island): + phage_genes = [] + for gene in island.genes: + if gene.phage: + try: + gene_id = gene.id + gene_cluster = get_gene_cluster(gene) + info = gene.phage + annot = [gene_id, gene_cluster, info] + phage_genes.append(annot) + except Exception as e: + logger.error(f"Error processing phage gene {gene}: {e}") + logger.error(traceback.format_exc()) + return phage_genes + + +def get_most_common_kegg_ko(genes): + """ + Calculate the most common KEGG KO annotations for a list of genes. + + Parameters: + - genes: List of gene objects. + + Returns: + - Counter object with counts of KEGG KO annotations. + """ + kos = [get_kegg_ko(gene) for gene in genes] + return Counter(kos) + + +def count_gene_clusters(genes, **kwargs): + """ + Count genes in gene clusters. + + Parameters: + - genes: List of gene objects. + + Returns: + - Counter object with counts of cluster genes. + """ + gene_clusters = [get_gene_cluster(gene) for gene in genes] + return Counter(gene_clusters) + + +def get_majority_cog_category(genes, **kwargs): + cluster_to_categories = defaultdict(list) + + for gene in genes: + gene_cluster = get_gene_cluster(gene) + cog_category = get_cog_category(gene) + cluster_to_categories[gene_cluster].append(cog_category) + + majority_cog_category = {} + + # Determine the majority cog category for each cluster + for cluster, categories in cluster_to_categories.items(): + category_count = Counter(categories) + majority_cog_category[cluster] = category_count.most_common(1)[0][0] # a list of tuples where each tuple is a category and its count; [0][0] extracts the category with the highest count. + #logger.info(majority_cog_category) + return majority_cog_category + + +def get_gene_annotation(genes, func, **kwargs): + gene_annot_dict = {} + for gene in genes: + gene_id = get_gene_id(gene) + func_annot = func(gene) + gene_annot_dict[gene_id] = func_annot + + return gene_annot_dict + + +def get_genes_cog_categories(genes, **kwargs): + return get_gene_annotation(genes, get_cog_category) + + +def get_genes_ko(genes, *_args): + return get_gene_annotation(genes, get_kegg_ko) + + +def get_genes_cazy(genes, *_args): + return get_gene_annotation(genes, get_cazy) + + +def get_genes_clusters(genes, **kwargs): + return get_gene_annotation(genes, get_gene_cluster) + + +def get_genes(genes, mge_id): + gene_ids = [get_gene_id(gene) for gene in genes] + return {mge_id: gene_ids} + + +def count_per_mge_cargo(islands, func): + """ + Extract and count cargo genes associated with each MGE type and return as a dictionary of Counter objects. + + Parameters: + - islands: List of MGE objects. + - func: function to extract cargo e.g. count KO terms or gene clusters + + Returns: + - Dictionary where keys are MGE types, and values are Counter objects with KEGG KO counts. + """ + mge_cargo_counts = { + "nested": Counter(), + "phage": Counter(), + "phage_like": Counter(), + "is_tn": Counter(), + "ce": Counter(), + "mi": Counter(), + "integron": Counter(), + "cellular": Counter(), + } + + for island in islands: + try: + cargo = extract_cargo(island) + if island.mge_type == "nested": + mge_cargo_counts["nested"].update(func(cargo)) + else: + # Get the first key (assuming there's only one key) + mge = next(iter(island.mge.keys())) + mge_cargo_counts[mge].update(func(cargo)) + except Exception as e: + raise ValueError(f"Error processing cargo for island: {e}") + + return mge_cargo_counts + + +def get_per_mge_cargo(islands, func): + mge_cargo_annot = { + "nested": {}, + "phage": {}, + "phage_like": {}, + "is_tn": {}, + "ce": {}, + "mi": {}, + "integron": {}, + "cellular": {}, + } + + for island in islands: + try: + cargo = extract_cargo(island) + mge_id = island.get_id() + if island.mge_type == "nested": + mge_cargo_annot["nested"].update(func(cargo, mge_id)) # Merges another dictionary into existing one + else: + # Get the first key (assuming there's only one key) + mge = next(iter(island.mge.keys())) + mge_cargo_annot[mge].update(func(cargo, mge_id)) + except Exception as e: + raise ValueError(f"Error processing cargo for island: {e}") + + return mge_cargo_annot + + +def get_machinery_genes_tsv(islands): + tsv_rows = [] + header = ['mge_id', 'mge', 'n_genes', 'gene_id', 'gene_cluster', 'feature_type', 'feature_info'] + tsv_rows.append('\t'.join(header)) + + for island in islands: + try: + mge_id = island.get_id() + n_genes = len(island.genes) + mge = ",".join(f"{k}:{v}" for k, v in island.mge.items()) + + recombinases = extract_mger(island) # list of [gene_id, gene_cluster, info] + conj_machinery = extract_secretion_system(island) + phage_machinery = extract_phage(island) + + for gene in recombinases: + gene_id, gene_cluster, info = gene + tsv_rows.append(f"{mge_id}\t{mge}\t{n_genes}\t{gene_id}\t{gene_cluster}\tmgeR\t{info}") + + for gene in conj_machinery: + gene_id, gene_cluster, info = gene + tsv_rows.append(f"{mge_id}\t{mge}\t{n_genes}\t{gene_id}\t{gene_cluster}\tsecretion_system\t{info}") + + for gene in phage_machinery: + gene_id, gene_cluster, info = gene + tsv_rows.append(f"{mge_id}\t{mge}\t{n_genes}\t{gene_id}\t{gene_cluster}\tphage\t{info}") + + except Exception as e: + raise ValueError(f"Error processing machinery for island {island}: {e}") + + tsv_output = '\n'.join(tsv_rows) + return tsv_output + +def get_cargo_genes_tsv(islands): + tsv_rows = [] + header = ['mge_id', 'mge', 'gene_ids', 'gene_clusters'] + tsv_rows.append('\t'.join(header)) + + for island in islands: + gene_ids = [] + gene_clusters = [] + try: + mge_id = island.get_id() + mge = ",".join(f"{k}:{v}" for k, v in island.mge.items()) + + cargo_genes = extract_cargo(island) + + for gene in cargo_genes: + gene_ids.append(gene.id) + gene_clusters.append(get_gene_cluster(gene)) + gene_ids = ';'.join(gene_ids) + gene_clusters = ';'.join(gene_clusters) + tsv_rows.append(f"{mge_id}\t{mge}\t{gene_ids}\t{gene_clusters}") + + except Exception as e: + raise ValueError(f"Error processing cargo for island {island}: {e}") + + tsv_output = '\n'.join(tsv_rows) + return tsv_output + +# Counting works with aggregation since the objects are small. Getting only works with batch saving, since the output is huge. +def count_cargo_gene_clusters(islands): + return count_per_mge_cargo(islands, count_gene_clusters) + + +def get_cargo_species_gene_clusters(islands): + return get_per_mge_cargo(islands, func=get_genes_clusters) + + +# Output: for each mge_type output a dictionary with geneID: COG. geneID supposed to be unique -> overwriting is okay. +def get_cargo_genes_cog(islands): + return get_per_mge_cargo(islands, func=get_genes_cog_categories) + + +def get_cargo_genes_ko(islands): + return get_per_mge_cargo(islands, func=get_genes_ko) + + +def get_cargo_genes_cazy(islands): + return get_per_mge_cargo(islands, func=get_genes_cazy) + + +# Output: for each mge_type output a dictionary mge_id: list(cargo_ids) +def get_cargo_genes(islands): + return get_per_mge_cargo(islands, func=get_genes) + + +def get_mge_basic_info(islands): + """ + Extract basic information for each MGE island. + + Parameters: + - islands: List of MGE objects. + + Returns: + - List of dictionaries with mge_id, mge, mge_type, n_genes, and size for each island. + """ + mge_info_list = [] + + for island in islands: + try: + mge_id = island.get_id() + mge = ",".join( + f"{k}:{v}" + for k, v in sorted(island.mge.items()) + ) if island.mge else "" + mge_type = island.mge_type if island.mge_type else "" + n_genes = island.n_genes + size = island.size + + mge_info = { + "mge_id": mge_id, + "mge": mge, + "mge_type": mge_type, + "n_genes": n_genes, + "size": size, + "is_core": int(island.is_core) # 1 = core, 0 = accessory + } + mge_info_list.append(mge_info) + + except Exception as e: + logger.error(f"Error processing island {island}: {e}") + continue + + return mge_info_list + + diff --git a/mgexpose/gene.py b/mgexpose/gene.py index 23e09b6..5d1eb59 100644 --- a/mgexpose/gene.py +++ b/mgexpose/gene.py @@ -1,4 +1,4 @@ -# pylint: disable=R0902,R0917,R0913 +# pylint: disable=R0902 """ Gene module """ @@ -34,7 +34,7 @@ class Gene: # specify optional annotations here # when adding new class variables, # otherwise output will be suppressed. - OPTIONAL_ANNOTATIONS = ("phage", "secretion_system", "secretion_rule", "recombinase", "eggnog",) + OPTIONAL_ANNOTATIONS = ("phage", "secretion_system", "secretion_rule", "recombinase", "eggnog") # these are only optional when core genome calculations # are disabled, e.g. co-transferred region inputs CLUSTER_ANNOTATIONS = ("cluster", "is_core",) @@ -42,15 +42,7 @@ class Gene: @staticmethod def rtype(is_core): """ Returns is_core-tag. """ - if is_core is None: - return "NA" return ("ACC", "COR")[is_core] - - @staticmethod - def is_core_gene(occ, n_genomes, core_threshold=0.95, strict=True): - if strict or n_genomes == 2 or n_genomes > 20: - return occ / n_genomes > core_threshold - return occ >= n_genomes - 1 def stringify_eggnog(self): """ convert eggnog annotation into gff-col9 key-value pairs """ @@ -107,16 +99,12 @@ def from_gff(cls, *cols): end=int(cols[4]), # end strand=cols[6], # strand recombinase=attribs.get("recombinase"), - cluster=attribs.get("cluster") or attribs.get("Cluster"), + cluster=attribs.get("cluster"), is_core=attribs.get("genome_type") == "COR", phage=attribs.get("phage"), secretion_system=attribs.get("secretion_system"), secretion_rule=attribs.get("secretion_rule"), - eggnog=tuple( - (k, attribs.get(k)) - for k in EggnogReader.EMAPPER_FIELDS["v2.1.2"] - if attribs.get(k) and k != "description" - ), + eggnog=tuple((k, attribs.get(k)) for k in EggnogReader.EMAPPER_FIELDS["v2.1.2"] if attribs.get(k) and k != "description"), ) def to_gff( @@ -125,17 +113,12 @@ def to_gff( genomic_island_id, add_functional_annotation=False, intermediate_dump=False, - add_header=False, ): """ dump gene to gff record """ - - if add_header: - print("##gff-version 3", file=gff_outstream) - attribs = { "ID": self.id, "Parent": genomic_island_id, - "cluster": self.cluster, + "Cluster": self.cluster, "size": len(self), "secretion_system": self.secretion_system, "secretion_rule": self.secretion_rule, diff --git a/mgexpose/gene_annotator.py b/mgexpose/gene_annotator.py deleted file mode 100644 index eb9153f..0000000 --- a/mgexpose/gene_annotator.py +++ /dev/null @@ -1,222 +0,0 @@ -# pylint: disable=R0912,R0913,R0914,R0917 - -""" Classes for integrating gene annotations. """ - -import logging - -from contextlib import nullcontext - -from .clustering_parser import parse_full_seq_clusters, parse_y_clusters, parse_db_clusters -from .gene import Gene -from .phage import PhageDetection -from .readers import ( - EggnogReader, - parse_macsyfinder_report, - read_recombinase_hits, -) - - -logger = logging.getLogger(__name__) - - -class GeneAnnotator: - """ GeneAnnotator class. """ - def __init__( - self, - genome_id, - speci, - genes, - include_genome_id=False, - has_batch_data=False, - dbformat=None - ): - logger.info("Creating new %s for genome=%s specI=%s", self.__class__, genome_id, speci) - self.genome_id = genome_id - self.speci = speci - self.genes = {} - self.has_batch_data = has_batch_data - self.include_genome_id = include_genome_id - - for gene_id, annotation in genes: - - if dbformat != "PG3": - gene_id = f'{annotation[0]}_{gene_id.split("_")[-1]}' - - logger.info("Adding gene %s", gene_id) - self.genes[gene_id] = Gene( - id=gene_id, - genome=self.genome_id, - speci=self.speci, - contig=annotation[0], - start=int(annotation[3]), - end=int(annotation[4]), - strand=annotation[6], - ) - - def add_recombinases(self, recombinases): - """ Add information from recombinase scan """ - for gene_id, recombinase in recombinases: - gene = self.genes.get(gene_id) - if gene is not None: - gene.recombinase = recombinase - - def add_cluster( - self, - cluster_data, - use_y_clusters=False, - core_threshold=0.95, - output_dir=None, - strict=True, - ): - """ Add information from gene clustering to allow for core/accessory gene classification """ - - if use_y_clusters: - parse_y_clusters(cluster_data, self.genes) - return None - - write_data = False - gene_clusters_out = nullcontext() - n_genomes = 0 - cluster_genes = {} - - with gene_clusters_out: - if cluster_data is not None: - - if core_threshold != -1: - n_genomes, cluster_genes, gene_cluster_map, _ = parse_full_seq_clusters( - self.genome_id, - self.genes, - cluster_data, - output_dir=output_dir, - ) - - logger.info( - "Parsed %s genomes with %s gene clusters.", - n_genomes, - len(cluster_genes), - ) - else: - gene_cluster_map = parse_db_clusters(cluster_data) - - n_genes = len(gene_cluster_map) - n_core_genes = sum(1 for _, _, is_core in gene_cluster_map if is_core) - logger.info( - "Parsed %s precomputed gene-cluster mappings with %s core genes (%s%%)", - n_genes, - n_core_genes, - round(n_core_genes / n_genes, 2), - ) - - for gene_id, *cluster in gene_cluster_map: - cluster, *is_core = cluster - is_core = is_core[0].lower() == "true" if is_core else None - if not self.include_genome_id or gene_id.startswith(self.genome_id): - gene = self.genes.get( - gene_id, - self.genes.get( - gene_id.replace(self.genome_id + ".", "") - ) - ) - logger.info( - "Checking cluster %s gene %s... %s", - str(cluster), - gene_id, - str(gene), - ) - if gene and gene.speci is not None: - gene.cluster = cluster - - if cluster_genes: - occ = cluster_genes[cluster] - # gene.is_core = any(( - # occ / n_genomes > core_threshold, - # (2 < n_genomes <= 20 and occ >= n_genomes - 1), - # (n_genomes == 2 and occ == 2), - # )) - gene.is_core = Gene.is_core_gene(occ, n_genomes, core_threshold=core_threshold, strict=strict,) - elif core_threshold == -1: - gene.is_core = is_core - - if write_data: - print(gene.id, gene.cluster, sep="\t", file=gene_clusters_out) - - return None - - def add_eggnog_annotation(self, eggnog_annotation): - """ Add eggnog output and phage signals to each gene """ - for gene_id, phage_data, eggnog_data in eggnog_annotation: - gene = self.genes.get(gene_id) - if gene is not None: - gene.eggnog = eggnog_data - gene.phage = phage_data - - def add_secretion_system(self, secretion_annotation): - """ Add information from txsscan """ - for gene_id, secretion_data in secretion_annotation: - system, rule, *_ = secretion_data - gene = self.genes.get(gene_id) - if gene is not None: - gene.secretion_system = system - gene.secretion_rule = rule - - def annotate_genes( - self, - recombinases, - eggnog_annotation, - secretion_annotation, - clusters=None, - use_y_clusters=False, - core_threshold=None, - output_dir=None, - pyhmmer=True, - ): - """ Annotate genes with MGE-relevant data. """ - self.add_recombinases( - read_recombinase_hits(recombinases, pyhmmer=pyhmmer,) - ) - if all(secretion_annotation): - self.add_secretion_system( - parse_macsyfinder_report( - *secretion_annotation[:2], - # macsy_version=secretion_annotation[-1], - ), - ) - if eggnog_annotation is not None: - phage_detection = PhageDetection(eggnog_annotation[1]) - - self.add_eggnog_annotation( - EggnogReader.parse_emapper( - eggnog_annotation[0], - phage_annotation=phage_detection, - ) - ) - if clusters is not None: - self.add_cluster( - clusters, - use_y_clusters=use_y_clusters, - core_threshold=core_threshold, - output_dir=output_dir, - ) - yield from self.genes.values() - - def dump_genes(self, outstream): - """ Write gene info to stream. """ - - headers = list(Gene().__dict__.keys()) - headers.remove("eggnog") - headers += EggnogReader.EMAPPER_FIELDS["v2.1.2"] - headers.remove("description") - - # print(*Gene().__dict__.keys(), sep="\t", file=outstream) - print(*headers, sep="\t", file=outstream) - for gene in self.genes.values(): - gene.stringify_speci() - eggnog_data = {} - if gene.eggnog: - eggnog_data = dict(gene.eggnog) - eggnog_cols = ( - eggnog_data.get(k) - for k in EggnogReader.EMAPPER_FIELDS["v2.1.2"] - if k != "description" - ) - print(gene, *eggnog_cols, sep="\t", file=outstream) diff --git a/mgexpose/gffio.py b/mgexpose/gffio.py index 9dcb99a..7be11a1 100644 --- a/mgexpose/gffio.py +++ b/mgexpose/gffio.py @@ -1,20 +1,20 @@ -""" GFF I/O -- wannabe serialisation module """ from .gene import Gene from .islands import GenomicIsland, MgeGenomicIsland -def read_island_gff(fn, island_cls): - """ Read island gff """ +from .base_logger import logger + +def read_genomic_islands_gff(fn): with open(fn, "rt", encoding="UTF-8") as _in: island = None for line in _in: line = line.strip() if line and line[0] != "#": cols = line.split("\t") - if cols[2] == island_cls.GFFTYPE: + if cols[2] == "region": if island is not None: yield island - island = island_cls.from_gff(*cols) + island = GenomicIsland.from_gff(*cols) elif cols[2] == "gene": gene = Gene.from_gff(*cols) if island is not None: @@ -23,13 +23,47 @@ def read_island_gff(fn, island_cls): raise ValueError("Found gene but no island.") if island is not None: yield island + +def read_mge_genomic_islands_gff(fn, relevant_ids=None, parse_mge_id=True): + """ + Generator function to read and parse MGEs and associated genes from a GFF file. + Parameters: + - fn: Path to the GFF file. + - relevant_ids: Optional set of relevant MGE IDs to filter. If None, all MGEs are processed. + - parse_mge_id: Boolean indicating whether to parse the MGE ID into components. + Yields: + - MgeGenomicIsland objects that match the relevant IDs or all if None. + """ + with open(fn, "rt", encoding="UTF-8") as _in: + island = None + for line in _in: + line = line.strip() + if line and line[0] != "#": + cols = line.split("\t") + attributes = {kv.split('=')[0]: kv.split('=')[1] for kv in cols[8].split(';') if '=' in kv} -def read_genomic_islands_gff(fn): - """ reads a set of genomic islands + genes from a gff3 """ - yield from read_island_gff(fn, GenomicIsland) + if cols[2] == "mobile_genetic_element": + mge_id = attributes.get("ID") + + if relevant_ids is None or mge_id in relevant_ids: + if island is not None: + yield island + island = MgeGenomicIsland.from_gff(*cols, parse_mge_id=parse_mge_id) + + elif cols[2] == "gene": + parent_id = attributes.get("Parent") -def read_mge_genomic_islands_gff(fn): - """ reads a set of mge genomic islands + genes from a gff3 """ - yield from read_island_gff(fn, MgeGenomicIsland) + if island is not None: + if relevant_ids is None or parent_id in relevant_ids: + gene = Gene.from_gff(*cols) + island.genes.add(gene) + else: + continue + else: + # This situation should not happen unless the GFF is malformed + raise ValueError("Found gene with no preceding island.") + + if island is not None: + yield island \ No newline at end of file diff --git a/mgexpose/handle_args.py b/mgexpose/handle_args.py deleted file mode 100644 index 6988298..0000000 --- a/mgexpose/handle_args.py +++ /dev/null @@ -1,194 +0,0 @@ -""" Module for argument handling """ - -import argparse - -from .readers import EggnogReader - - -from . import __version__ - -def handle_args(): - """ Argument handling """ - ap = argparse.ArgumentParser( - prog="mgexpose", - formatter_class=argparse.RawTextHelpFormatter, - ) - - ap.add_argument( - "--version", action="version", version="%(prog)s " + __version__ - ) - - # ap.add_argument("--output_dir", "-o", type=str, default=".") - # ap.add_argument("--dbformat", type=str, choices=("PG3", "SPIRE")) - # ap.add_argument("--write_gff", action="store_true") - # ap.add_argument("--write_genes_to_gff", action="store_true") - # ap.add_argument("--dump_intermediate_steps", action="store_true") - # ap.add_argument("--output_suffix", type=str, default="full_length_MGE_assignments") - # ap.add_argument("--debug", action="store_true") - - subparsers = ap.add_subparsers(dest="command", required=True) - - parent_subparser = argparse.ArgumentParser(add_help=False) - parent_subparser.add_argument("--output_dir", "-o", type=str, default=".") - parent_subparser.add_argument("--dbformat", type=str, choices=("PG3", "SPIRE")) - parent_subparser.add_argument("--write_gff", action="store_true") - parent_subparser.add_argument("--write_genes_to_gff", action="store_true") - parent_subparser.add_argument("--dump_intermediate_steps", action="store_true") - parent_subparser.add_argument( - "--output_suffix", type=str, default="full_length_MGE_assignments", - ) - parent_subparser.add_argument("--debug", action="store_true") - - denovo_ap = subparsers.add_parser( - "denovo", - help="Classify and annotate mobile genomic regions from annotated genes.", - parents=(parent_subparser,), - ) - denovo_ap.add_argument("genome_id", type=str) - denovo_ap.add_argument("prodigal_gff", type=str) - denovo_ap.add_argument("recombinase_hits", type=str) - denovo_ap.add_argument("mge_rules", type=str) - denovo_ap.add_argument("--speci", type=str, default="no_speci") - denovo_ap.add_argument("--txs_macsy_rules", type=str) - denovo_ap.add_argument("--txs_macsy_report", type=str) - denovo_ap.add_argument("--phage_eggnog_data", type=str) - denovo_ap.add_argument("--cluster_data", type=str) - denovo_ap.add_argument("--skip_island_identification", action="store_true") - denovo_ap.add_argument("--dump_genomic_islands", action="store_true") - denovo_ap.add_argument("--phage_filter_terms", type=str) - - denovo_ap.add_argument("--include_genome_id", action="store_true") - denovo_ap.add_argument("--core_threshold", type=float, default=0.95) - denovo_ap.add_argument( - "--allow_batch_data", - action="store_true", - help=( - "SPIRE annotation may have data that does not relate to the current bin." - " Ignore those data." - ), - ) - denovo_ap.add_argument( - "--use_y_clusters", - action="store_true", - help=( - "Gene clustering is performed against annotated" - " and redundancy-reduced reference sets." - ), - ) - denovo_ap.add_argument( - "--single_island", - action="store_true", - help="Input is genomic region, skips island computation." - ) - denovo_ap.add_argument( - "--precomputed_islands", - type=str, - help="Input is set of genomic regions, skips island computation." - ) - denovo_ap.add_argument( - "--precomputed_core_genes", - action="store_true", - help="Core/accessory gene sets were precomputed." - ) - - denovo_ap.add_argument( - "--add_functional_annotation", - action="store_true", - help="If specified, per gene emapper annotations are stored in the gff." - ) - # ensure newest eggnog version - denovo_ap.add_argument("--extract_islands", type=str) - - denovo_ap.add_argument("--pyhmmer_input", action="store_true") - - denovo_ap.set_defaults(func=None) # TODO - - identify_mobile_islands_ap = subparsers.add_parser( - "identify_mobile_islands", - help="Identify and classify genomic islands as mobile.", - parents=(parent_subparser,), - ) - - identify_mobile_islands_ap.add_argument("island_gff", type=str) - - identify_mobile_islands_ap.set_defaults(func=None) # TODO - - return ap.parse_args() - - -def handle_args_old(): - """ Argument handling """ - ap = argparse.ArgumentParser() - ap.add_argument("genome_id", type=str) - ap.add_argument("prodigal_gff", type=str) - ap.add_argument("recombinase_hits", type=str) - ap.add_argument("speci", type=str) - - ap.add_argument( - "txs_macsy_rules", - type=str, - help=( - "In macsyfinder v1, this is found in macsyfinder.summary(.txt)." - " In v2+, this is provided with the pipeline." - ), - ) - ap.add_argument("txs_macsy_report", type=str) - ap.add_argument("phage_eggnog_data", type=str) - ap.add_argument("mge_rules", type=str) - - ap.add_argument("--cluster_data", type=str) - ap.add_argument("--output_dir", "-o", type=str, default=".") - ap.add_argument("--phage_filter_terms", type=str) - ap.add_argument("--include_genome_id", action="store_true") - ap.add_argument("--core_threshold", type=float, default=0.95) - ap.add_argument("--macsy_version", type=int, choices=(1, 2), default=2) - ap.add_argument( - "--emapper_version", - type=str, - choices=EggnogReader.EMAPPER_FIELDS.keys(), - default="v2.1.2", - ) - ap.add_argument( - "--allow_batch_data", - action="store_true", - help=( - "SPIRE annotation may have data that does not relate to the current bin." - " Ignore those data." - ), - ) - ap.add_argument( - "--use_y_clusters", - action="store_true", - help=( - "Gene clustering is performed against annotated" - " and redundancy-reduced reference sets." - ), - ) - ap.add_argument( - "--single_island", - action="store_true", - help="Input is genomic region, skips island computation." - ) - ap.add_argument( - "--precomputed_islands", - type=str, - help="Input is set of genomic regions, skips island computation." - ) - ap.add_argument("--write_gff", action="store_true") - ap.add_argument("--write_genes_to_gff", action="store_true") - ap.add_argument("--add_functional_annotation", - action="store_true", - help="If specified, per gene emapper annotations are stored in the gff.") - # ensure newest eggnog version - ap.add_argument("--dump_intermediate_steps", action="store_true") - ap.add_argument("--output_suffix", type=str, default="full_length_MGE_assignments") - ap.add_argument("--dbformat", type=str, choices=("PG3", "SPIRE")) - ap.add_argument( - "--precomputed_core_genes", - action="store_true", - help="Core/accessory gene sets were precomputed." - ) - ap.add_argument("--skip_island_identification", action="store_true") - ap.add_argument("--extract_islands", type=str) - - return ap.parse_args() diff --git a/mgexpose/island_processing.py b/mgexpose/island_processing.py deleted file mode 100644 index 4b87bbc..0000000 --- a/mgexpose/island_processing.py +++ /dev/null @@ -1,193 +0,0 @@ -""" Module for processing mobile genetic islands """ - -import contextlib -import logging - - -from .islands import GenomicIsland, AnnotatedGenomicIsland, MgeGenomicIsland - - -logger = logging.getLogger(__name__) - - -def is_valid_stream(stream): - """ Checks if a stream-variable represents a valid stream. """ - return stream is not None and not isinstance( - stream, contextlib.nullcontext - ) - - -def check_island_genes(genes, precomputed_islands=None): - """ Check if genes have valid annotation and belong to a precomputed island. """ - has_precomputed_islands = False - if precomputed_islands is not None: - logger.info("Precomputed islands: %s", len(precomputed_islands)) - has_precomputed_islands = True - - for gene in genes: - is_annotated = gene.has_basic_annotation(skip_core_gene_computation=has_precomputed_islands) - add_gene = False - if is_annotated and has_precomputed_islands: - gene.contig = gene.contig.split(".")[-1] - for island in precomputed_islands.get(gene.contig, []): - log_str = ( - f"Checking gene={gene.contig}:" - f"{gene.start}-{gene.end} against " - f"{island.contig}:{island.start}-{island.end}: " - ) - - if gene.is_in_interval(island.start, island.end): - add_gene = True - if gene.speci is None or gene.speci == "no_speci": - gene.speci = {island.name} - else: - gene.speci.add(island.name) - island.add_gene(gene) - - logger.info("%s %s", log_str, str(add_gene)) - else: - add_gene = is_annotated - - log_str = ( - f"Gene {gene}: {is_annotated=} {add_gene=} ->" - f" contig set = {is_annotated and add_gene}" - ) - logger.info(log_str) - if add_gene: - yield gene - - -def filter_precomputed_islands(precomputed_islands, raw_island_stream=None, island_stream=None): - """ Return precomputed islands with mge signals. """ - for _, islands in precomputed_islands.items(): - seen_islands = set() - for island in islands: - island.dump(seen_islands, raw_outstream=raw_island_stream, outstream=island_stream) - if island.recombinases: - logger.info("GenomicIsland %s created.", str(island)) - yield island - - -def compute_islands(contigs, raw_island_stream=None, island_stream=None): - """ Form genomic islands from stretches of core or accessory genes, - the return those with mge signals.""" - for _, genes in sorted(contigs.items()): - seen_islands = set() - current_island = None - for gene in sorted(genes, key=lambda g: (g.start, g.end, g.strand)): - if current_island is None or current_island.is_core != gene.is_core: - if current_island is not None: - current_island.dump( - seen_islands, - raw_outstream=raw_island_stream, - outstream=island_stream - ) - if current_island.recombinases: - yield current_island - - current_island = GenomicIsland.from_gene(gene) - - current_island.add_gene(gene) - - if current_island is not None: - current_island.dump( - seen_islands, - raw_outstream=raw_island_stream, - outstream=island_stream - ) - if current_island.recombinases: - yield current_island - - -def generate_island_set( - genes, - pang_calls_out=None, - raw_islands_out=None, - islands_out=None, - precomputed_islands=None, -): - """ Compute mge islands """ - contigs = {} - logger.info("generate_island_set: collecting genes") - - for gene in check_island_genes(genes, precomputed_islands=precomputed_islands): - logger.info("Adding gene %s to contig set.", str(gene)) - if is_valid_stream(pang_calls_out): - print(gene, file=pang_calls_out) - if precomputed_islands is None: - contigs.setdefault( - (gene.speci, gene.contig), [] - ).append(gene) - - if is_valid_stream(islands_out): - print(*GenomicIsland.get_fieldnames(), sep="\t", file=islands_out) - island_stream = islands_out - else: - island_stream = islands_out - if is_valid_stream(raw_islands_out): - print(*GenomicIsland.RAW_TABLE_HEADER, sep="\t", file=raw_islands_out) - raw_island_stream = raw_islands_out - else: - raw_island_stream = None - - if precomputed_islands is not None: - yield from filter_precomputed_islands( - precomputed_islands, - raw_island_stream=raw_island_stream, - island_stream=island_stream, - ) - else: - yield from compute_islands( - contigs, - raw_island_stream=raw_island_stream, - island_stream=island_stream, - ) - - -def annotate_islands(islands, outstream=None): - """ Adds annotation to previously computed islands. """ - do_print = is_valid_stream(outstream) - if do_print: - print(*AnnotatedGenomicIsland.TABLE_HEADERS, sep="\t", file=outstream) - for island in sorted(islands, key=lambda x: (x.contig, x.start, x.end)): - annotated_island = AnnotatedGenomicIsland.from_genomic_island(island) - if do_print: - print(annotated_island, file=outstream) - yield annotated_island - - -def evaluate_islands(islands, rules, outstream=None, outstream2=None): - """ Classify/annotate mge islands according to present signals. """ - if is_valid_stream(outstream): - print(*MgeGenomicIsland.TABLE_HEADERS, sep="\t", file=outstream) - if is_valid_stream(outstream2): - print(*MgeGenomicIsland.TABLE_HEADERS, sep="\t", file=outstream2) - for island in islands: - mge_island = MgeGenomicIsland.from_annotated_genomic_island(island) - mge_island.evaluate_recombinases( - rules, - outstream=outstream if not isinstance(outstream, contextlib.nullcontext) else None, - outstream2=outstream2 if not isinstance(outstream2, contextlib.nullcontext) else None, - ) - - yield mge_island - - -def prepare_precomputed_islands(single_island=None, island_file=None, genome_id=None,): - """ Helper function to deal with precomputed regions/islands. """ - precomputed_islands = None - if single_island and island_file: - raise ValueError("Both --single_island and --precomputed_islands set.") - if single_island and not island_file: - precomputed_islands = [GenomicIsland.from_region_string(single_island, genome_id=genome_id,)] - elif not single_island and island_file: - with open(island_file, "rt", encoding="UTF-8",) as _in: - precomputed_islands = [GenomicIsland.from_region_string(line, genome_id=genome_id,) for line in _in] - - if precomputed_islands is not None: - precomputed_islands_by_contig = {} - for island in precomputed_islands: - precomputed_islands_by_contig.setdefault(island.contig, []).append(island) - precomputed_islands = precomputed_islands_by_contig - - return precomputed_islands diff --git a/mgexpose/islands.py b/mgexpose/islands.py index f22ea55..611b0e1 100644 --- a/mgexpose/islands.py +++ b/mgexpose/islands.py @@ -1,4 +1,4 @@ -# pylint: disable=C0116,C0301,R0902,R0916,R0913,R0917 +# pylint: disable=C0116,C0301,R0902,R0916 """ Data Structures Module @@ -18,7 +18,7 @@ import itertools as it import logging import sys -import warnings +import re from collections import Counter from dataclasses import dataclass, field @@ -26,14 +26,13 @@ from .gene import Gene from .recombinases import MgeRule, MGE_ALIASES - logger = logging.getLogger(__name__) @dataclass class GenomicIsland: '''The following class describes a generic genomic region - with one or more identified recombinases (recombinases). + with one or more identified recombinases. This region is then referred as Recombinase Island. The Genomic Island is represented by contig, start and end coordinates, set of genes, some of which are recombinases and MGE machinery. @@ -48,7 +47,6 @@ class GenomicIsland: "end", "gene_list", ) - GFFTYPE = "region" speci: str = None genome: str = None @@ -62,15 +60,6 @@ class GenomicIsland: # recombinases: list = field(default_factory=list) recombinases: Counter = field(default_factory=Counter) - @staticmethod - def parse_id(id_string): - """ Parse genome id, contig id, start and end coordinates from id string. - Reverses get_id(). """ - cols = id_string.split("_") - contig, coords = cols[3].split(':') - - return "_".join(cols[1:3]), contig, int(coords[0]), int(coords[1]) - @staticmethod def get_fieldnames(): """ Returns column headers for island table. """ @@ -88,12 +77,12 @@ def get_fieldnames(): ) @classmethod - def from_region_string(cls, region, genome_id=None,): + def from_region_string(cls, region): """ Creates island from a predefined region string. """ _, _, contig, start_end, *_ = region.strip().split(".") contig = contig.split(".")[-1] start, end = map(int, start_end.split("-")) - return cls(None, genome_id, None, contig, start, end, region.strip()) + return cls(None, None, None, contig, start, end, region.strip()) @classmethod def from_gene(cls, gene): @@ -120,8 +109,8 @@ def __str__(self): genes = ( f"{gene.id}.{gene.cluster}" for gene in sorted( - self.genes, key=lambda g: (g.start, g.end, g.strand) - ) + self.genes, key=lambda g: (g.start, g.end, g.strand) + ) ) return "\t".join( @@ -136,7 +125,7 @@ def add_gene(self, gene): """ Adds a gene to the island. """ if gene not in self.genes: self.end = max(self.end, gene.end) - if gene.recombinase: + if gene.recombinase is not None: # self.recombinases.append( # (f"{gene.id}.{gene.cluster}", gene.recombinase) # ) @@ -174,13 +163,20 @@ def get_id(self): @classmethod def from_gff(cls, *cols): - attribs = dict(item.split("=") for item in cols[-1].split(";")) - recombinases = Counter( - { - item.split(":")[0]: int(item.split(":")[1]) - for item in attribs["recombinases"].split(",") - } - ) + try: + attribs = dict(item.split("=") for item in cols[-1].split(";")) + except: + raise ValueError(f"not enough cols? {cols}") + + try: + recombinases = Counter( + dict( + item.split(":") + for item in attribs["recombinases"].split(",") + ) + ) + except: + raise ValueError(f"recombinase string weird? {attribs['recombinases'].split(',')}") return cls( attribs["specI"], @@ -189,23 +185,12 @@ def from_gff(cls, *cols): cols[0], # contig int(cols[3]), # start int(cols[4]), # end - genes=set(), recombinases=recombinases, + genes=set(), ) - def to_gff( - self, - gff_outstream, - source_db, - write_genes=False, - add_functional_annotation=False, - intermediate_dump=False, - add_header=False, - ): - - if add_header: - print("##gff-version 3", file=gff_outstream) - + def to_gff(self, gff_outstream, source_db, write_genes=False, add_functional_annotation=False, + intermediate_dump=False): island_id = self.get_id() attribs = { "ID": island_id, @@ -222,7 +207,7 @@ def to_gff( ) if self.recombinases else "" ), - "specI": self.speci, + "specI": self.speci, #TODO: does it work? } if self.name: attribs["name"] = self.name @@ -235,7 +220,7 @@ def to_gff( print( self.contig, source, - GenomicIsland.GFFTYPE, + "region", self.start, self.end, len(self), # Score field @@ -248,7 +233,7 @@ def to_gff( if write_genes: # GFF3 child term: genes - for gene in sorted(self.genes, key=lambda g: (g.start, g.end,)): + for gene in sorted(self.genes, key=lambda g: g.id): gene.to_gff( gff_outstream, genomic_island_id=island_id, @@ -370,7 +355,6 @@ class MgeGenomicIsland(AnnotatedGenomicIsland): "conj_man_count", "recombinases", ) - GFFTYPE = "mobile_genetic_element" integron: int = 0 cellular: int = 0 @@ -385,9 +369,14 @@ class MgeGenomicIsland(AnnotatedGenomicIsland): tn3_found: bool = False ser_found: bool = False + mge: Counter = field(default_factory=Counter) + mge_type: str = None + size: int = 0 + n_genes: int = 0 + def __post_init__(self): """ Apply annotations. """ - recombinases = (",".join(it.chain(*((r,) * v for r, v in self.recombinases.items())))).lower() + recombinases = (",".join(r for _, r in self.get_recombinases())).lower() for name, alias in MGE_ALIASES.items(): recombinases = recombinases.replace(name, alias) @@ -396,13 +385,13 @@ def __post_init__(self): # integron self.integron = int("integron" in recombinases) + # tag recombinase island with more than 3 recombinases + # self.c_nmi = int(len(self.recombinases) > 3) + self.c_nmi = sum(self.recombinases.values()) # self.recombinases = recombinases.split(",") if recombinases else [] self.recombinases = Counter(recombinases.split(",")) - # tag recombinase island with more than 3 recombinases - self.c_nmi = sum(self.recombinases.values()) > 3 - def __str__(self): """ String representation. """ return "\t".join( @@ -424,7 +413,61 @@ def __str__(self): self.name, ) ) - + + @staticmethod + def parse_mge_id(mge_id): + """ + Generalized parser for MGE IDs. + + Returns: + genome_id (str): parsed bin or genome identifier + contig (str): contig name (usually something like k141_32063) + start (int): start coordinate + end (int): end coordinate + """ + try: + # Extract coordinates + coord_match = re.search(r":(\d+)-(\d+)$", mge_id) + if not coord_match: + raise ValueError("No coordinates found in MGE ID.") + start, end = map(int, coord_match.groups()) + + # Remove leading MGE_ and SPIRE_ (if present) + cleaned = mge_id + if cleaned.startswith("MGE_"): + cleaned = cleaned[4:] + if cleaned.startswith("SPIRE_"): + cleaned = cleaned[6:] + + # Strip coordinates + core = cleaned.split(':')[0] + + # Assembly-style pattern (e.g., GCA_019800745.1) + if re.match(r"GCA_[\d.]+", core): + genome_id = core.split('_')[0] + '_' + core.split('_')[1] + contig = core.split('.')[-1] + return genome_id, contig, start, end + + # Bin-style pattern: extract contig and genome_id + kmer_match = re.search(r"(_k\d+_\d+)$", core) + if kmer_match: + contig = kmer_match.group(1)[1:] # remove leading underscore + genome_id = core[: -len(kmer_match.group(1))] # remove contig + return genome_id, contig, start, end + + # Underscore split pattern e.g. MGE_NT12001_NC_000913.3:5234-20508 + underscore_split = core.split('_', 1) + if len(underscore_split) == 2 and '.' in underscore_split[1]: + genome_id, contig = underscore_split + return genome_id, contig, start, end + + # Fallback for unknown formats + raise ValueError(f"Unrecognized MGE ID format: {mge_id}") + + except Exception as e: + raise ValueError(f"Failed to parse MGE ID '{mge_id}': {e}") + + def get_mge_metrics(self): """ Cast mge metrics to int. """ return tuple( @@ -458,8 +501,7 @@ def get_annotated_mge_metrics(self): def is_nested(annotated_mge_metrics): n_mges = sum(v for _, v in annotated_mge_metrics) if not n_mges: - # raise UserWarning("No MGEs were assigned to recombinase island") - warnings.warn("No MGEs were assigned to recombinase island") + raise UserWarning("No MGEs were assigned to recombinase island") # Solitary or nested MGE? return n_mges > 1 @@ -519,7 +561,7 @@ def evaluate_recombinases(self, rules, outstream=None, outstream2=None): self.c_tn = patch_c_tn if outstream: - print(self, sep="\t", file=outstream,) + print(self, sep="\t", file=outstream, ) # previous step in some cases generates overlap between Phage/Phage_like and Mobility island # this step specifically resolves such instances based on recombinase presence and presence/ @@ -537,7 +579,7 @@ def evaluate_recombinases(self, rules, outstream=None, outstream2=None): self.phage, self.c_mi = True, False if outstream2: - print(self, sep="\t", file=outstream2,) + print(self, sep="\t", file=outstream2, ) @classmethod def from_annotated_genomic_island(cls, ag_island): @@ -549,11 +591,13 @@ def from_annotated_genomic_island(cls, ag_island): def get_id(self): return f"MGE_{self.genome}_{self.contig}:{self.start}-{self.end}" - - def get_attribs(self): + + + def to_gff(self, gff_outstream, source_db, write_genes=False, add_functional_annotation=False): + island_id = self.get_id() mge_metrics = self.get_annotated_mge_metrics() attribs = { - "ID": self.get_id(), + "ID": island_id, "mge": ",".join(f"{k}:{v}" for k, v in mge_metrics), # Count each mge type "genome_type": Gene.rtype(self.is_core), "mge_type": self.mge_num_island_type(self.is_nested(mge_metrics)), @@ -570,52 +614,16 @@ def get_attribs(self): } if self.name: attribs["name"] = self.name - return attribs - - def to_gff( - self, - gff_outstream, - source_db, - write_genes=False, - add_functional_annotation=False, - intermediate_dump=False, - add_header=False, - ): - if add_header: - print("##gff-version 3", file=gff_outstream) - - # island_id = self.get_id() - # mge_metrics = self.get_annotated_mge_metrics() - # attribs = { - # "ID": island_id, - # "mge": ",".join(f"{k}:{v}" for k, v in mge_metrics), # Count each mge type - # "genome_type": Gene.rtype(self.is_core), - # "mge_type": self.mge_num_island_type(self.is_nested(mge_metrics)), - # "size": len(self), - # "n_genes": len(self.genes), - # "mgeR": ( - # ",".join( - # f"{k}:{v}" - # # for k, v in sorted(Counter(self.recombinases).items()) - # for k, v in sorted(self.recombinases.items()) - # ) - # if self.recombinases else "" - # ), - # } - # if self.name: - # attribs["name"] = self.name - attribs = self.get_attribs() attrib_str = ";".join(f"{item[0]}={item[1]}" for item in attribs.items() if item[1]) # Format the source column - source = ("proMGE", f"proMGE_{source_db}")[bool(source_db)] - # if source_db: - # source = f"proMGE_{source_db}" - # else: - # source = "proMGE" + if source_db: + source = f"proMGE_{source_db}" + else: + source = "proMGE" print( self.contig, source, - MgeGenomicIsland.GFFTYPE, + "mobile_genetic_element", self.start, self.end, len(self), # Score field @@ -628,15 +636,15 @@ def to_gff( if write_genes: # GFF3 child term: genes - for gene in sorted(self.genes, key=lambda g: (g.start, g.end,)): + for gene in sorted(self.genes, key=lambda g: g.id): gene.to_gff( gff_outstream, - genomic_island_id=attribs["ID"], + genomic_island_id=island_id, add_functional_annotation=add_functional_annotation, ) @classmethod - def from_gff(cls, *cols): + def from_gff(cls, *cols, parse_mge_id=True): try: attribs = dict(item.split("=") for item in cols[-1].split(";")) except: @@ -661,13 +669,16 @@ def from_gff(cls, *cols): ) except: raise ValueError(f"mge string weird? {attribs['mge'].split(',')}") + + if parse_mge_id: + genome_id, contig, start, end = cls.parse_mge_id(attribs["ID"]) + else: + genome_id = attribs.get("genome", "") + contig = cols[0] + start = int(cols[3]) + end = int(cols[4]) - if mges.get("is_tn"): - mges["c_tn"] = mges["is_tn"] - del mges["is_tn"] - genome_id, *_ = GenomicIsland.parse_id(attribs["ID"]) - # TODO: check coordinates and ID overlap return cls( "", # TODO: where to get/ how to handle specI genome_id, @@ -676,11 +687,10 @@ def from_gff(cls, *cols): int(cols[3]), # start int(cols[4]), # end recombinases=recombinases, - # mge=mges, - **mges, - # mge_type=attribs["mge_type"], - # size=int(attribs["size"]), - # n_genes=int(attribs["n_genes"]), + mge=mges, + mge_type=attribs["mge_type"], + size=int(attribs["size"]), + n_genes=int(attribs["n_genes"]), genes=set(), ) diff --git a/mgexpose/phage.py b/mgexpose/phage.py deleted file mode 100644 index 1d8fc73..0000000 --- a/mgexpose/phage.py +++ /dev/null @@ -1,126 +0,0 @@ -# pylint: disable=R0903 - -""" Phage detection via keyword search """ - -import re - - -class PhageDetection: - """ Class to detect phage signals in freetext functional gene annotation. """ - # VIRAL_STRUCTURES = re.compile( - # "portal|fiber|collar|terminase|prohead" - # "|baseplate|sheath|base-plate|tail|head|capsid|tube" - # ) - # m/portal|tail_fiber|terminase|prohead|baseplate|tail_sheath|Tail_sheath| - # base-plate|tail_protein|capsid|tail_tube/ - VIRAL_STRUCTURES_KEYWORDS = ( - r"base-?plate", - r"capsid", - r"portal", - r"prohead", - r"terminase", - r"tail_(fiber|protein|sheath|tube)", - ) - VIRAL_STRUCTURES = re.compile(r"|".join(VIRAL_STRUCTURES_KEYWORDS)) - - # EXCLUDE_LIST = re.compile( - # "ribosome|ribosomal|30s|50s|sipc|tafi" - # "|post-translational|mycolic|macrophage" - # ) - # m/ribosome|ribosomal|30S|50S|SipC|Tafi|post-translational|mycolic| - # macrophage|peptidoglycan|sickle|Rhophilin|ATPase|myelin|Cysteine/i) - EXCLUDED_KEYWORDS = ( - r"ribosom(e|al)", - r"[35]0s", - r"sipc", - r"tafi", - r"post-?translational", - r"my(elin|colic)", - r"macrophage", - r"peptido-?glycan", - r"sickle", - r"rhophilin", - r"atpase", - r"cysteine", - ) - EXCLUDE_LIST = re.compile(r"|".join(EXCLUDED_KEYWORDS)) - - # EXPECTED_PHAGES = re.compile("phage|bacteriophage|prophage|lamboid|lambda") - # m/phage|bacteriophage|prophage|lamboid|lambda|\bMu\b|Mu-like - PHAGE_KEYWORDS = ( - r"(bacterio|pro)?phage", - r"lamb(da|oid)", - r"mu(-like)?" - ) - EXPECTED_PHAGES = re.compile(r"|".join(PHAGE_KEYWORDS)) - - # EXTENDED_VIRAL_STRUCTURES = re.compile("holi|dna-packaging|mu-like|lysis|associated|membrane") - # m/holi|DNA-packaging|portal|fiber|collar|terminase|prohead|baseplate| - # sheath|base-plate|lysis|membrane_protein|tail|head|capsid|tube - EXTENDED_VIRAL_STRUCTURES_KEYWORDS = ( - r"holi", - r"dna-packaging", - r"portal", - r"fiber", - r"terminase", - r"prohead", - r"base-?plate", - r"sheath", - r"lysis", - r"membrane_protein", - r"tail", - r"head", - r"capsid", - r"tube", - ) - EXTENDED_VIRAL_STRUCTURES = re.compile(r"|".join(EXTENDED_VIRAL_STRUCTURES_KEYWORDS)) - - # EXCLUDE_INTEGRASE = re.compile("[pP]hage[ _]integrase") - # m/Phage integrase|phage integrase/i - INTEGRASE = re.compile(r"phage[ _]integrase") - - def __init__(self, phage_filter_file=None): - self.phage_filter = set() - if phage_filter_file is not None: - with open(phage_filter_file, "rt", encoding="UTF-8") as _in: - self.phage_filter = set(line.strip().split("\t")[0] for line in _in) - - def is_phage(self, eggnog_freetext, eggnog_og): - """ Filters phage-related parsed eggnog mapper output. - Returns binary phage signal. - """ - if eggnog_og in self.phage_filter: - return False - - viral_structure = PhageDetection.VIRAL_STRUCTURES.search(eggnog_freetext) - excluded_term = PhageDetection.EXCLUDE_LIST.search(eggnog_freetext) - if viral_structure and not excluded_term: - return True - - phage_structure = PhageDetection.EXPECTED_PHAGES.search(eggnog_freetext) - ext_viral_structure = PhageDetection.EXTENDED_VIRAL_STRUCTURES.search(eggnog_freetext) - integrase = PhageDetection.INTEGRASE.search(eggnog_freetext) - - if phage_structure and ext_viral_structure and not integrase: - return True - - return False - - # if all(( - # PhageDetection.VIRAL_STRUCTURES.search(eggnog_freetext), - # not PhageDetection.EXCLUDE_LIST.search(eggnog_freetext), - # )): - # self.phage_annotated.add(gene_id) - # return True - # if all(( - # gene_id not in self.phage_annotated, - # PhageDetection.EXPECTED_PHAGES.search(eggnog_freetext), - # any(( - # PhageDetection.VIRAL_STRUCTURES.search(eggnog_freetext), - # PhageDetection.EXTENDED_VIRAL_STRUCTURES.search(eggnog_freetext), - # )), - # not PhageDetection.EXCLUDE_INTEGRASE.search(eggnog_freetext), - # )): - # self.phage_annotated.add(gene_id) - # return True - # return False diff --git a/mgexpose/query_db.py b/mgexpose/query_db.py new file mode 100644 index 0000000..d8bb94a --- /dev/null +++ b/mgexpose/query_db.py @@ -0,0 +1,295 @@ +import psycopg2 +import pandas as pd +import json +import sys + +genome2speci_pg3_file = '/home/grekova/workspace/promge_website/data/genome_id2speci.tsv' + +df = pd.read_csv(genome2speci_pg3_file, sep="\t") + +genome2speci_ids = dict(zip(df["sample_name"], df["cluster_name"])) + +def get_gtdb_id(identifier): + """Extract the GTDB ID from a genome identifier.""" + parts = identifier.split('_') + return f"{parts[0]}_{parts[1]}" # GCA_xxxxxx.x + +def connect(params_dic): + """ Connect to the PostgreSQL database server """ + conn = None + try: + # connect to the PostgreSQL server + print('Connecting to the PostgreSQL database...') + conn = psycopg2.connect(**params_dic) + except (Exception, psycopg2.DatabaseError) as error: + print(error) + sys.exit(1) + print("Connection created successfully") + return conn + +def parse_mge_id(id): + #'GCA_009102765.1_371601.SAMN11944272.WDCH01000111:267-2860' + mge_dict = {'gtdb_id': '', + 'contig' : '', + 'start' : '', + 'end' : '' + } + id = id.replace(':', '_').split('_') + mge_dict['gtdb_id'] = (id[0] + '_' + id[1]) #GCA_xxxxxx.x + mge_dict['contig'] = (id[2]) + coordinates = [int(c) for c in id[3].split('-')] + mge_dict['start'] = (coordinates[0]) + mge_dict['end'] = (coordinates[1]) + return mge_dict + +def query_mge_annotations(conn): + ''' Query the database to get all levels of taxonomy, mge_type and recombinases from the mge table. + Args: + In: conn: psycopg connection + Out: result: df mge_id recombinase tax_domain tax_phylum tax_class tax_order tax_family tax_genus tax_species + + ''' + cursor = conn.cursor() + levels = ["clusters.{level}".format(level=level) for level in ['tax_domain', 'tax_phylum', 'tax_class', + 'tax_order', 'tax_family', 'tax_genus', + 'tax_species']] + levels_str = ', '.join(levels) + query = """ + SELECT contig || ':' || start_pos || '-' || end_pos AS contig_pos, + {levels_str} + FROM clusters AS clusters, pg3.mge AS mge + WHERE clusters.id = mge.cluster_id; + """.format(levels_str=levels_str) + cursor.execute(query) + + result = cursor.fetchall() + cursor.close() + columns = ['contig_pos'] + columns.extend([l.replace("clusters.", "") for l in levels]) + result = pd.DataFrame(result, columns=columns) + return result + +def get_taxa(speci_lst, cursor, level=None): + ''' Query the database to get taxonomy information + Args: + In: speci_lst (list): List of species names + cursor: psycopg cursor object + level (str, optional): Specific taxonomic level to query. If None, fetch all levels. + Out: result: (DataFrame) containing taxonomy information + ''' + levels = { + "tax_domain", "tax_phylum", "tax_class", "tax_order", "tax_family", "tax_genus", "tax_species" + } + + if level and level not in levels: + raise ValueError(f"Invalid level: {level}. Choose from {levels} or None for full taxonomy.") + + levels_str = f"clusters.{level}" if level else ', '.join(f"clusters.{lvl}" for lvl in levels) + + specI_str = ', '.join(['%s'] * len(speci_lst)) + + query = f""" + SELECT cluster_name, {levels_str} + FROM clusters AS clusters + WHERE clusters.cluster_name IN ({specI_str}); + """ + + cursor.execute(query, tuple(speci_lst)) + result = cursor.fetchall() + + columns = ['cluster_name'] + ([level] if level else list(levels)) + + return pd.DataFrame(result, columns=columns) if result else pd.DataFrame(columns=columns) + +def get_gtdb_taxa(sample_ids, db, cursor, level=None): + levels = { + "d", "p", "c", "o", "f", "g", "s" + } + if level and level not in levels: + raise ValueError(f"Invalid level: {level}. Choose from {levels} or None for full taxonomy.") + + if db == "pg3": + tax_table = "pg3.gtdb_r220" + sample_table = "pg3.samples" + assembly = "genome_id" + levels_str = f"t.{level}" if level else ', '.join(f"t.{lvl}" for lvl in levels) + elif db == "spire": + tax_table = "gtdb_r220" + sample_table = "bins" + assembly = "bin_id" + levels_str = f"{tax_table}.{level}" if level else ', '.join(f"{tax_table}.{lvl}" for lvl in levels) + else: + raise ValueError(f"Invalid db specification: {db}. pg3 or spire are allowed.") + + + sample_ids_str = ', '.join(['%s'] * len(sample_ids)) + + if db == "pg3": + query = f""" + SELECT sample_name, {levels_str} + FROM {sample_table} AS s, {tax_table} AS t + WHERE (s.sample_name IN ({sample_ids_str})) AND (s.id = t.sample_id); + """ + elif db == "spire": + query = f""" + SELECT bin_name, {levels_str} + FROM {sample_table} AS {sample_table}, {tax_table} AS {tax_table} + WHERE ({sample_table}.bin_name IN ({sample_ids_str})) AND ({sample_table}.id = {tax_table}.bin_id); + """ + else: + raise ValueError(f"Invalid db specification: {db}. pg3 or spire are allowed.") + + cursor.execute(query, tuple(sample_ids)) + result = cursor.fetchall() + + columns = [assembly] + ([level] if level else list(levels)) + + return pd.DataFrame(result, columns=columns) if result else pd.DataFrame(columns=columns) + + +def annotate_clustering_df(clustered_df, conn, level="tax_species"): + ''' Query taxonomy information and update DataFrame ''' + + print("Clustering df, nrows:", len(clustered_df)) + + genome2speci = {id: genome2speci_ids[get_gtdb_id(id)] for id in clustered_df.member_seq_100 if 'GCA_' in id} + clustered_df['speci'] = clustered_df['member_seq_100'].map(genome2speci) + specIs = list(set(genome2speci.values())) + print("# specI:", len(specIs)) + + cursor = conn.cursor() + + if level == "full": + result_df = get_taxa(specIs, cursor) + else: + result_df = get_taxa(specIs, cursor, level) + + print("Merging taxonomy") + clustered_df = clustered_df.merge(result_df, how="inner", left_on="speci", right_on="cluster_name") + + cursor.close() + return clustered_df + + +def get_speci_taxonomy_df(speci_lst, conn, level="tax_species"): + ''' For each specI cluster get taxonomy information and return as taxa_df dataframe''' + + specIs = list(set(speci_lst)) # Ensure that it is a list + print("# specI:", len(speci_lst)) + + cursor = conn.cursor() + + if level == "full": + taxa_df = get_taxa(speci_lst, cursor) + else: + taxa_df = get_taxa(speci_lst, cursor, level) + + cursor.close() + return taxa_df + + +def get_gtdb_taxonomy_df(sample_ids, db, conn, level="s"): + ''' For each sample_id (bin_id or genome_id) get gtdb taxonomy information and return as taxa_df dataframe''' + + sample_ids= list(set(sample_ids)) # Ensure that it is a list + print("# samples_ids:", len(sample_ids)) + + cursor = conn.cursor() + + if level == "full": + taxa_df = get_gtdb_taxa(sample_ids, db, cursor) + else: + taxa_df = get_gtdb_taxa(sample_ids, db, cursor, level) + + cursor.close() + return taxa_df + + +def get_microontology(sample_names, conn): + ''' + Query the database to get microontology information. + + Args: + sample_names (list): List of sample names + cursor (psycopg cursor): Active DB cursor + + Returns: + pd.DataFrame: DataFrame with sample_name, sample_id, term_id, term, term_array + ''' + + if len(sample_names) == 0: + return pd.DataFrame(columns=["sample_name", "study_id", "sample_id", "term"]) + cursor = conn.cursor() + + samples_str = ', '.join(['%s'] * len(sample_names)) + + query = f""" + SELECT + s.sample_name, + s.study_id, + mv.sample_id, + mt.term + FROM samples s + JOIN microntology_v3 mv ON s.id = mv.sample_id + JOIN LATERAL unnest(mv.microntology_terms) AS term_id ON TRUE + JOIN microntology_terms mt ON mt.id = term_id + WHERE s.sample_name IN ({samples_str}); + """ + + cursor.execute(query, tuple(sample_names)) + result = cursor.fetchall() + cursor.close() + columns = ["sample_name", "study_id", "sample_id", "term"] + + return pd.DataFrame(result, columns=columns) if result else pd.DataFrame(columns=columns) + +# Given a contig_name query for its length +# from this table +''' +spire=> SELECT * FROM pg3.contigs LIMIT 10; + id | sample_id | contig_name | length +----+-----------+----------------------------------+-------- + 1 | 1004653 | 938639.SAMN02441251.ATTB01000002 | 138806 + 2 | 1004653 | 938639.SAMN02441251.ATTB01000003 | 88240 + 3 | 1004653 | 938639.SAMN02441251.ATTB01000004 | 86762 +''' +def get_contig_length(contig_name, conn): + + cursor = conn.cursor() + query = f"SELECT length FROM pg3.contigs WHERE contig_name = %s;" + cursor.execute(query, (contig_name,)) + result = cursor.fetchone() + cursor.close() + return result[0] if result else None + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/mgexpose/readers.py b/mgexpose/readers.py deleted file mode 100644 index 23bf3be..0000000 --- a/mgexpose/readers.py +++ /dev/null @@ -1,215 +0,0 @@ -# pylint: disable=R0903 - -""" Module contains various reader/parser functions """ - -import csv -import gzip -import re -import sys - -from .chunk_reader import get_lines_from_chunks -from .recombinases import MgeRule - - -def read_fasta(f): - header, seq = None, [] - for line in get_lines_from_chunks(f): - if line[0] == ">": - if seq: - yield header, "".join(seq) - seq.clear() - header = line.strip()[1:] - else: - seq.append(line.strip()) - if seq: - yield header, "".join(seq) - - -def read_prodigal_gff(f): - """ Prodigal gff output reader. - - Returns (gene_id, gff_line) tuples via generator. - """ - for line in get_lines_from_chunks(f): - line = line.strip() - if line and line[0] != "#": - line = line.split("\t") - _id = [ - item.split("=")[1] - for item in line[8].split(";") - if item.startswith("ID") - ][0] - # gene_id = f"{line[0]}_{_id.split('_')[1]}" - # yield gene_id, line - yield _id, line - - -def read_recombinase_hits(f, pyhmmer=True): - """ Read hmmer output from recombinase scan. - - Returns (gene_id, mge_name) tuples via generator. - """ - with open(f, "rt", encoding="UTF-8") as _in: - for line in _in: - line = line.strip() - if line and line[0] != "#": - if pyhmmer: - gene_id, mge = line.split("\t")[:2] - else: - gene_id, _, mge, *_ = re.split(r"\s+", line) - yield gene_id, mge - - -# would love to add raw scan parsing to annotator, -# but then the upstream filtering doesn't work anymore... >:( -# def read_recombinase_scan(f): -# recombinase_hits = {} -# with open(f, "rt") as _in: -# for line in _in: -# line = line.strip() -# if line and line[0] != "#": -# gene_id, _, mge, pfam_acc, evalue, score, *_ = re.split(r"\s+", line) -# score = float(score) -# best_hit = recombinase_hits.get(gene_id) -# if best_hit is None or score > best_hit[0]: -# recombinase_hits[gene_id] = score, mge, pfam_acc, evalue - -# for gene_id, recombinase_annotation in recombinase_hits.items(): -# yield gene_id, recombinase_annotation - - -def parse_macsyfinder_rules(f, macsy_version=2): - """ Read macsyfinder rules. - - Returns dictionary {secretion_system: {mandatory: count, accessory: count}}. - """ - key_col, mandatory_col, accessory_col = (0, 1, 2) if macsy_version == 2 else (1, 5, 6) - - with open(f, "rt", encoding="UTF-8") as _in: - return { - row[key_col].replace("_putative", ""): { - "mandatory": int(row[mandatory_col]), - "accessory": int(row[accessory_col]), - } - for row_index, row in enumerate(csv.reader(_in, delimiter="\t")) - if row_index and row and not row[0].startswith("#") - } - - -def parse_macsyfinder_report(f, f_rules, macsy_version=2): - """ Read macsyfinder/txsscan results. - - Returns (gene_id, txsscan_results) tuples via generator. - """ - - rules = parse_macsyfinder_rules(f_rules, macsy_version=macsy_version) - - key_col, col1, col2 = (1, 4, 8) if macsy_version == 2 else (0, 6, 9) - - with open(f, "rt", encoding="UTF-8") as _in: - for line in _in: - line = line.strip() - if line and line[0] != "#": - line = re.split(r"\s+", line.strip()) - system = line[col1].replace("TXSS/", "") - rule = rules.get(system) - if rule is None: - print( - "WARNING: cannot find txsscan-rule for system:", - f"`{system}`", - file=sys.stderr, - ) - - if line and line[0] and line[0] != "replicon": - yield line[key_col], (system, rule, line[col2]) - - -def read_mge_rules(f, recombinase_scan=False): - """ Read MGE rules. - - Returns dictionary {mge: MgeRule}. - """ - with open(f, "rt", encoding="UTF-8") as _in: - rules = { - row[0].lower(): MgeRule(row[0], *(tuple(map(int, row[1:]))), recombinase_scan) - for i, row in enumerate(csv.reader(_in, delimiter="\t")) - if i != 0 - } - - # #special case for Tn3 since it can carry conjugative system# - # for rule_id, rule in rules.items(): - # if "tn3" in rule_id: - # rule.ce = 1 - - return rules - - -class EggnogReader: - """ - Class to read and parse Eggnog annotations. - Currently, phages are detected - based on the regex signals in the emapper 'description' field - """ - EMAPPER_FIELDS = { - "v1": {"cog_fcat": 11, "description": 12}, - "v2.0.0": {"cog_fcat": 20, "description": 21}, - "v2.0.2": {"cog_fcat": 20, "description": 21}, - "v2.1.0": {"cog_fcat": 9, "description": 10}, - "v2.1.2": {"cog_fcat": 6, "description": 7, - "seed_eggNOG_ortholog": 1, - "seed_ortholog_evalue": 2, - "seed_ortholog_score": 3, - "eggnog_ogs": 4, - "max_annot_lvl": 5, - "goes": 9, - "ec": 10, - "kegg_ko": 11, - "kegg_pathway": 12, - "kegg_module": 13, - "kegg_reaction": 14, - "kegg_rclass": 15, - "brite": 16, - "cazy": 18, - "bigg_reaction": 19, - "pfam": 20 - }, - } - - @staticmethod - def parse_emapper(f, emapper_version="v2.1.2", phage_annotation=None): - """ Parses emapper annotations output. - Returns (gene_id, phage_signal, eggnog) -> (str, boolean, tuple) tuples via generator. - """ - def filter_record(key, value, row): - return value < len(row) and row[value] and row[value] != "-" and key != "description" - - emapper_fields = EggnogReader.EMAPPER_FIELDS.get(emapper_version) - if emapper_fields is None: - raise ValueError(f"{emapper_version} is an unknown emapper annotation format.") - if f.endswith(".gz"): - emapper_stream = gzip.open(f, "rt") - else: - emapper_stream = open(f, "rt", encoding="UTF-8") - - with emapper_stream: - for row in csv.reader(emapper_stream, delimiter="\t"): - if row and row[0][0] != "#": - gene_id = row[0] - # Collect non-empty eggnog attributes - eggnog_gene_ann = tuple( - (key, row[value]) - for key, value in emapper_fields.items() - if filter_record(key, value, row) - ) - phage_signal = None - if phage_annotation is not None: - # note: freetext is converted to lower case here, - # so REs only have to match against lower! - eggnog_freetext = re.sub( - r"\s", "_", row[emapper_fields["description"]] - ).lower() - is_phage = phage_annotation.is_phage( - eggnog_freetext, row[emapper_fields["eggnog_ogs"]] - ) - phage_signal = (None, eggnog_freetext)[is_phage] - yield gene_id, phage_signal, eggnog_gene_ann diff --git a/mgexpose/recombinases.py b/mgexpose/recombinases.py deleted file mode 100644 index 2fc86d6..0000000 --- a/mgexpose/recombinases.py +++ /dev/null @@ -1,186 +0,0 @@ -# pylint: disable=R0916 - -""" Recombinase rules and aliases """ - -import itertools as it - -from dataclasses import dataclass - - -MGE_ALIASES = { - "c1_n1ser": "ser_tn", - "c2_n1ser": "ser_ce", - "c3_n1ser": "ser_lsr", - "casposons": "cas1", -} - - -@dataclass -class MgeRule: - '''The following class defines the set of rules used to determine MGE type. - Type classification is based on two criteria: - 1. Recombinase subfamily i.e. - 2. Structural information - - MGE categories include - - IS_Tn(tn) - - Phage(ph) - - CE(conjugative elements) - - Integron(int) - - Cellular(cell) - As well as newly introduced categories: - - Phage_like(pli) - - Mobility island(mi)''' - subfamily: str = None - is_tn: bool = False - phage: bool = False - ce: bool = False - integron: bool = False - cellular: bool = False - recombinase_scan: bool = False - - def __post_init__(self): - """ Deal with special case for Tn3 - since it can carry conjugative system - - ignored if the rule is used during recombinase-scans - """ - if all( - ( - self.subfamily is not None, - "tn3" in self.subfamily.lower(), - not self.recombinase_scan, - ) - ): - self.ce = 1 - - def get_signals(self): - """ Returns MGE signals of rule. """ - return tuple( - k - for k, v in self.__dict__.items() - if v and k not in ("subfamily", "recombinase_scan") - ) - - def c_tn_check(self, island): - """ Tn check. """ - # c_tn, n_recombinases = island.c_tn, len(island.recombinases) - c_tn, n_recombinases = island.c_tn, sum(island.recombinases.values()) - if self.is_tn and not self.cellular and not self.ce and not self.phage: - # IS_Tn - c_tn += 1 - elif ( - self.is_tn and self.ce and island.conj_man_count < 1 and - n_recombinases == 2 and not island.tn3_found and not island.ser_found - ): - # c2_n1ser(considers solo c2_n1ser and Tn3) - c_tn = 1 - elif self.is_tn and self.ce and island.conj_man_count < 1 and n_recombinases == 1: - # c2_n1ser(considers c2_n1ser and Tn3 as one tn when not together) - c_tn += 1 - - # disentangles recombinase shared by tn and ph - if (self.is_tn and self.phage and island.phage_count < 2): - c_tn += 1 - - return c_tn - - def patch_c_tn_check(self, island): - """Deals with special case when 2 recombinases. - - c_tn = is_tn and ce and conj_man_count < 1 and |recombinases|=2 and !(tn3 or ser) - """ - # old check was: - # if rule.is_tn and rule.ce and self.conj_man_count < 1 - # and len(self.recombinases) == 2 and not self.tn3_found: - # self.c_tn = True - # elif rule.is_tn and rule.ce and self.conj_man_count < 1 - # and len(self.recombinases) == 2 and not self.ser_found: - # self.c_tn = True - # old: - # if len(island.recombinases) == 2 and self.is_tn and self.ce and island.conj_man_count < 1: - # recombinase_types = list(island.recombinases) - # two_tn3 = "tn3" in recombinase_types[0] and "tn3" in recombinase_types[1] - # two_ser_ce = "ser_ce" in recombinase_types[0] and "ser_ce" in recombinase_types[1] - - # mixed = "tn3" in recombinase_types[0] or "tn3" in recombinase_types[1] - # mixed |= "ser_ce" in recombinase_types[0] or "ser_ce" in recombinase_types[1] - - # return two_tn3 != two_ser_ce or mixed - - if sum(island.recombinases.values()) == 2 and self.is_tn and self.ce and island.conj_man_count < 1: - # recombinase_types = ",".join(list(island.recombinases)) - recombinase_types = ",".join(it.chain(*it.chain((r,) * c for r, c in island.recombinases.items()))) - - mixed = "tn3" in recombinase_types and "ser_ce" in recombinase_types - - two_tn3 = recombinase_types.count("tn3") == 2 - two_ser_ce = recombinase_types.count("ser_ce") == 2 - - return (two_tn3 != two_ser_ce) or mixed - - return False - - def phage_check(self, island): - """Phage annotation based on recombinase presence - and phage structural genes in the neighbourhood. - """ - phage, c_mi, nov = island.phage, island.c_mi, island.nov - phage |= (self.is_tn and self.phage) - phage |= (self.phage and not self.ce) - phage |= (self.phage and self.ce) - - c_mi |= (not self.phage and self.ce) - nov = c_mi - - return phage, c_mi, nov - - def phage_like_check(self, island, is_brujita): - """Annotate phage_like element - (presence of phage specific recombinase and absence of phage structural genes - in the neighbourhood) - and mobility island - (presence of recombinase common to phages and conjugative elements - and absence of phage structural genes in the neighbourhood) - """ - c_pli, c_mi = island.c_pli, island.c_mi - if not self.is_tn: - if self.phage and (not self.ce or is_brujita): - c_pli = 1 - elif self.ce and (not self.phage or not is_brujita): - c_mi = 1 - - return c_pli, c_mi - - def conjug_element_check(self, island): - """Conjugative element annotation based on presence of recombinase - and presence of conjugative machinery genes in the neighbourhood - """ - c_ce, nov = island.c_ce, island.nov - if self.ce: - c_ce = 1 - elif self.phage: - c_ce = nov = 1 - elif all( - ( - bool(self.is_tn), - bool(self.ce), - # len(island.recombinases) >= 3, - sum(island.recombinases.values()) >= 3, - (island.tn3_found or island.ser_found) - ) - ): - c_ce = 1 - - return c_ce, nov - - def mobility_island_check(self, island): - """Annotate MI(Mobility island) presence of both - phage structural genes and conjugation machinery genes in the neighbourhood - """ - phage, c_mi, nov = island.phage, island.c_mi, island.nov - if self.is_tn and self.phage: - phage = 1 - else: - c_mi = nov = 1 - - return phage, c_mi, nov diff --git a/setup.cfg b/setup.cfg index 85077c7..767d236 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,3 +1,8 @@ +[options] +packages = find: +[options.packages.find] +include = mgexpose* [metadata] description_file = README.md -license_files = LICENSE \ No newline at end of file +license_files = LICENSE +