From b0b9685e3adac976fb9e9f70a1bfa845f856e7aa Mon Sep 17 00:00:00 2001 From: Nicholas Hathaway Date: Mon, 24 Nov 2025 17:22:21 -0800 Subject: [PATCH] added several exporter functions to go from PMO to various tables; --- src/pmotools/cli.py | 81 ++- src/pmotools/pmo_engine/pmo_exporter.py | 642 ++++++++++++++++++ src/pmotools/pmo_engine/pmo_processor.py | 441 ------------ ..._library_sample_names_per_specimen_name.py | 4 +- .../scripts/pmo_to_tables/__init__.py | 0 .../export_library_sample_meta_table.py | 60 ++ .../export_panel_info_meta_table.py | 60 ++ .../export_project_info_meta_table.py | 60 ++ .../export_sequencing_info_meta_table.py | 60 ++ .../export_specimen_meta_table.py | 60 ++ .../export_specimen_travel_meta_table.py | 60 ++ .../export_target_info_meta_table.py | 60 ++ .../extract_allele_table.py | 4 +- .../extract_insert_of_panels.py | 5 +- .../extract_refseq_of_inserts_of_panels.py | 5 +- tests/test_pmo_engine/test_pmo_exporter.py | 277 ++++++++ tests/test_pmo_engine/test_pmo_processor.py | 124 ---- 17 files changed, 1415 insertions(+), 588 deletions(-) create mode 100644 src/pmotools/pmo_engine/pmo_exporter.py create mode 100644 src/pmotools/scripts/pmo_to_tables/__init__.py create mode 100644 src/pmotools/scripts/pmo_to_tables/export_library_sample_meta_table.py create mode 100644 src/pmotools/scripts/pmo_to_tables/export_panel_info_meta_table.py create mode 100644 src/pmotools/scripts/pmo_to_tables/export_project_info_meta_table.py create mode 100644 src/pmotools/scripts/pmo_to_tables/export_sequencing_info_meta_table.py create mode 100644 src/pmotools/scripts/pmo_to_tables/export_specimen_meta_table.py create mode 100644 src/pmotools/scripts/pmo_to_tables/export_specimen_travel_meta_table.py create mode 100644 src/pmotools/scripts/pmo_to_tables/export_target_info_meta_table.py rename src/pmotools/scripts/{extractors_from_pmo => pmo_to_tables}/extract_allele_table.py (97%) rename src/pmotools/scripts/{extract_info_from_pmo => 
pmo_to_tables}/extract_insert_of_panels.py (94%) rename src/pmotools/scripts/{extract_info_from_pmo => pmo_to_tables}/extract_refseq_of_inserts_of_panels.py (90%) create mode 100755 tests/test_pmo_engine/test_pmo_exporter.py diff --git a/src/pmotools/cli.py b/src/pmotools/cli.py index 855e48f..ce368ac 100644 --- a/src/pmotools/cli.py +++ b/src/pmotools/cli.py @@ -37,7 +37,7 @@ from pmotools.scripts.extractors_from_pmo.extract_pmo_with_read_filter import ( extract_pmo_with_read_filter, ) -from pmotools.scripts.extractors_from_pmo.extract_allele_table import ( +from pmotools.scripts.pmo_to_tables.extract_allele_table import ( extract_for_allele_table, ) @@ -66,13 +66,37 @@ ) # panel info subset -from pmotools.scripts.extract_info_from_pmo.extract_insert_of_panels import ( +from pmotools.scripts.pmo_to_tables.extract_insert_of_panels import ( extract_insert_of_panels, ) -from pmotools.scripts.extract_info_from_pmo.extract_refseq_of_inserts_of_panels import ( +from pmotools.scripts.pmo_to_tables.extract_refseq_of_inserts_of_panels import ( extract_refseq_of_inserts_of_panels, ) +# pmo to tables + +from pmotools.scripts.pmo_to_tables.export_specimen_meta_table import ( + export_specimen_meta_table, +) +from pmotools.scripts.pmo_to_tables.export_library_sample_meta_table import ( + export_library_sample_meta_table, +) +from pmotools.scripts.pmo_to_tables.export_project_info_meta_table import ( + export_project_info_meta_table, +) +from pmotools.scripts.pmo_to_tables.export_sequencing_info_meta_table import ( + export_sequencing_info_meta_table, +) +from pmotools.scripts.pmo_to_tables.export_specimen_travel_meta_table import ( + export_specimen_travel_meta_table, +) +from pmotools.scripts.pmo_to_tables.export_target_info_meta_table import ( + export_target_info_meta_table, +) +from pmotools.scripts.pmo_to_tables.export_panel_info_meta_table import ( + export_panel_info_meta_table, +) + @dataclass(frozen=True) class PmoCommand: @@ -115,17 +139,6 @@ class PmoCommand: 
"extract_pmo_with_read_filter": PmoCommand( extract_pmo_with_read_filter, "Extract with a read filter" ), - "extract_allele_table": PmoCommand( - extract_for_allele_table, - "Extract allele tables for tools like dcifer or moire", - ), - "extract_insert_of_panels": PmoCommand( - extract_insert_of_panels, "Extract inserts of panels from a PMO" - ), - "extract_refseq_of_inserts_of_panels": PmoCommand( - extract_refseq_of_inserts_of_panels, - "Extract ref_seq of panel inserts from a PMO", - ), }, "working_with_multiple_pmos": { "combine_pmos": PmoCommand( @@ -160,6 +173,46 @@ class PmoCommand: validate_pmo, "Validate a PMO file against a JSON Schema" ) }, + "pmo_to_table": { + "export_specimen_meta_table": PmoCommand( + export_specimen_meta_table, "export the specimen meta table from a PMO file" + ), + "export_library_sample_meta_table": PmoCommand( + export_library_sample_meta_table, + "export the library_sample meta table from a PMO file", + ), + "export_project_info_meta_table": PmoCommand( + export_project_info_meta_table, + "export the project_info meta table from a PMO file", + ), + "export_sequencing_info_meta_table": PmoCommand( + export_sequencing_info_meta_table, + "export the sequencing_info meta table from a PMO file", + ), + "export_specimen_travel_meta_table": PmoCommand( + export_specimen_travel_meta_table, + "export the specimen travel_info meta table from a PMO file", + ), + "export_target_info_meta_table": PmoCommand( + export_target_info_meta_table, + "export the target info meta table from a PMO file", + ), + "export_panel_info_meta_table": PmoCommand( + export_panel_info_meta_table, + "export the panel info meta table from a PMO file", + ), + "extract_allele_table": PmoCommand( + extract_for_allele_table, + "Extract allele tables for tools like dcifer or moire", + ), + "extract_insert_of_panels": PmoCommand( + extract_insert_of_panels, "Extract inserts of panels from a PMO" + ), + "extract_refseq_of_inserts_of_panels": PmoCommand( + 
#!/usr/bin/env python3
import copy
import json
import os
from collections import defaultdict
from typing import NamedTuple, Optional

import pandas as pd

# The pmotools-internal helpers (PMOChecker, PMOProcessor and the package
# version) are imported inside the few methods that use them, so the plain
# table exporters below only depend on the standard library and pandas.

# One BED-like record describing the insert of a target. Field order is the
# column order written by PMOExporter.write_bed_locs.
bed_loc_tuple = NamedTuple(
    "bed_loc",
    [
        ("chrom", str),
        ("start", int),
        ("end", int),
        ("name", str),
        ("score", float),
        ("strand", str),
        ("ref_seq", str),
        ("extra_info", str),
    ],
)


class PMOExporter(object):
    """
    A collection of functions to export information out of a PMO
    """

    @staticmethod
    def is_primitive(x) -> bool:
        """
        Check if a value can be written directly into a table cell
        :param x: the value to check
        :return: True if x is None or a str, int, float or bool
        """
        return x is None or isinstance(x, (str, int, float, bool))

    @staticmethod
    def is_primitive_list(x) -> bool:
        """
        Check if a value is a flat list/tuple made only of primitive values
        :param x: the value to check
        :return: True if x is a list or tuple whose items all pass is_primitive
        """
        return isinstance(x, (list, tuple)) and all(
            PMOExporter.is_primitive(i) for i in x
        )

    @staticmethod
    def is_exportable(x) -> bool:
        """
        Check if a value is either a primitive or a flat list of primitives
        :param x: the value to check
        :return: True if the value can be exported into a single table cell
        """
        return PMOExporter.is_primitive(x) or PMOExporter.is_primitive_list(x)

    @staticmethod
    def _flatten_record(
        record: dict, separator: str, derived_columns: Optional[dict] = None
    ) -> dict:
        """
        Turn one PMO record into one flat table row
        Primitive values are copied, flat lists are joined with the separator and
        every other value (nested objects, lists of objects) is left out
        :param record: the PMO object (a dict) to flatten
        :param separator: the separator to use for list values
        :param derived_columns: optional mapping of a key in record to a tuple of (output column name, function turning the value into the cell value), used to swap ids for names or to pull one field out of a nested object
        :return: a dict of column name to cell value, in the key order of record
        """
        derived_columns = derived_columns or {}
        row = {}
        for key, value in record.items():
            if key in derived_columns:
                # derived columns win over the generic handling, e.g. an id is an
                # int but should be exported as the name it points to
                column, resolve = derived_columns[key]
                row[column] = resolve(value)
            elif PMOExporter.is_primitive(value):
                row[key] = value
            elif PMOExporter.is_primitive_list(value):
                row[key] = separator.join(str(v) for v in value)
        return row

    @staticmethod
    def export_specimen_travel_meta_table(
        pmodata, separator: str = ","
    ) -> pd.DataFrame:
        """
        Export the travel information (travel_out_six_month) of the specimens of a PMO to a dataframe
        Creates one row per travel entry, specimens without travel entries create no rows
        :param pmodata: the pmo export the information from
        :param separator: the separator to use for list values
        :return: a pandas dataframe with specimen_name and the fields of each travel entry
        """
        rows = []
        for specimen in pmodata["specimen_info"]:
            if "travel_out_six_month" not in specimen:
                continue
            for travel_meta in specimen["travel_out_six_month"]:
                export_row = {"specimen_name": specimen["specimen_name"]}
                export_row.update(PMOExporter._flatten_record(travel_meta, separator))
                rows.append(export_row)
        return pd.DataFrame(rows)

    @staticmethod
    def export_specimen_meta_table(pmodata, separator: str = ",") -> pd.DataFrame:
        """
        Export the specimen meta information of a PMO to a dataframe
        Currently avoiding exporting values of complex object types like TravelInfo or Parasite densities, best to export such values in their own tables
        The project_id of a specimen is exported as the project_name it points to
        :param pmodata: the pmo export the information from
        :param separator: the separator to use for list values
        :return: a pandas dataframe of the specimen metadata, one row per specimen
        """
        derived_columns = {
            "project_id": (
                "project_name",
                lambda idx: pmodata["project_info"][idx]["project_name"],
            ),
        }
        return pd.DataFrame(
            [
                PMOExporter._flatten_record(specimen, separator, derived_columns)
                for specimen in pmodata["specimen_info"]
            ]
        )

    @staticmethod
    def export_library_sample_meta_table(pmodata, separator: str = ",") -> pd.DataFrame:
        """
        Export the library_sample meta information of a PMO to a dataframe
        The sequencing_info_id, specimen_id and panel_id are exported as the names they point to
        :param pmodata: the pmo export the information from
        :param separator: the separator to use for list values
        :return: a pandas dataframe of the library_sample metadata, one row per library sample
        """
        derived_columns = {
            "sequencing_info_id": (
                "sequencing_info_name",
                lambda idx: pmodata["sequencing_info"][idx]["sequencing_info_name"],
            ),
            "specimen_id": (
                "specimen_name",
                lambda idx: pmodata["specimen_info"][idx]["specimen_name"],
            ),
            "panel_id": (
                "panel_name",
                lambda idx: pmodata["panel_info"][idx]["panel_name"],
            ),
        }
        return pd.DataFrame(
            [
                PMOExporter._flatten_record(library_sample, separator, derived_columns)
                for library_sample in pmodata["library_sample_info"]
            ]
        )

    @staticmethod
    def export_sequencing_info_meta_table(
        pmodata, separator: str = ","
    ) -> pd.DataFrame:
        """
        Export the sequencing_info meta information of a PMO to a dataframe
        :param pmodata: the pmo export the information from
        :param separator: the separator to use for list values
        :return: a pandas dataframe of the sequencing_info metadata, one row per sequencing_info entry
        """
        return pd.DataFrame(
            [
                PMOExporter._flatten_record(sequencing_info, separator)
                for sequencing_info in pmodata["sequencing_info"]
            ]
        )

    @staticmethod
    def export_project_info_meta_table(pmodata, separator: str = ",") -> pd.DataFrame:
        """
        Export the project_info meta information of a PMO to a dataframe
        :param pmodata: the pmo export the information from
        :param separator: the separator to use for list values
        :return: a pandas dataframe of the project_info metadata, one row per project
        """
        return pd.DataFrame(
            [
                PMOExporter._flatten_record(project_info, separator)
                for project_info in pmodata["project_info"]
            ]
        )

    @staticmethod
    def export_panel_info_meta_table(pmodata, separator: str = ",") -> pd.DataFrame:
        """
        Export the panel meta information of a PMO to a dataframe
        Creates one row per panel and target, the reactions of the panel that contain the target are joined into the reaction_name column
        :param pmodata: the pmo export the information from
        :param separator: the separator to use for list values
        :return: a pandas dataframe of the panel metadata with target_name and reaction_name added
        """
        rows = []
        for panel_info in pmodata["panel_info"]:
            export_row = PMOExporter._flatten_record(panel_info, separator)
            # group the reaction names by target, keeping first-seen target order
            reactions_for_target = defaultdict(list)
            for reaction in panel_info["reactions"]:
                for target_id in reaction["panel_targets"]:
                    target_name = pmodata["target_info"][target_id]["target_name"]
                    reactions_for_target[target_name].append(reaction["reaction_name"])
            for target, reactions in reactions_for_target.items():
                export_row_per_target = copy.deepcopy(export_row)
                export_row_per_target["target_name"] = target
                export_row_per_target["reaction_name"] = separator.join(reactions)
                rows.append(export_row_per_target)
        return pd.DataFrame(rows)

    @staticmethod
    def export_target_info_meta_table(pmodata, separator: str = ",") -> pd.DataFrame:
        """
        Export the target meta information of a PMO to a dataframe
        Only the seq of the forward_primer and reverse_primer objects is exported, as forward_primer_seq and reverse_primer_seq
        :param pmodata: the pmo export the information from
        :param separator: the separator to use for list values
        :return: a pandas dataframe of the target metadata, one row per target
        """
        derived_columns = {
            "forward_primer": ("forward_primer_seq", lambda primer: primer["seq"]),
            "reverse_primer": ("reverse_primer_seq", lambda primer: primer["seq"]),
        }
        return pd.DataFrame(
            [
                PMOExporter._flatten_record(target_info, separator, derived_columns)
                for target_info in pmodata["target_info"]
            ]
        )

    @staticmethod
    def write_bed_locs(bed_locs: list[bed_loc_tuple], fnp, add_header: bool = False):
        """
        Write out a list of bed_loc_tuple to a file, will auto overwrite it
        :param bed_locs: a list of bed_loc_tuple
        :param fnp: output file path, will be overwritten if it exists
        :param add_header: add header of #chrom,start,end,name,score,strand,ref_seq,extra_info, starts with comment so tools will treat it as a comment line
        """
        header = [
            "#chrom",
            "start",
            "end",
            "name",
            "score",
            "strand",
            "ref_seq",
            "extra_info",
        ]
        with open(fnp, "w") as f:
            if add_header:
                # the header needs its own line ending, otherwise it runs into
                # the first record
                f.write("\t".join(header) + "\n")
            for bed_loc in bed_locs:
                f.write(
                    "\t".join(
                        [
                            bed_loc.chrom,
                            str(bed_loc.start),
                            str(bed_loc.end),
                            bed_loc.name,
                            str(bed_loc.score),
                            bed_loc.strand,
                            str(bed_loc.ref_seq),
                            bed_loc.extra_info,
                        ]
                    )
                )
                f.write("\n")

    @staticmethod
    def _insert_bed_loc(pmodata, target_id: int, extra_fields=()) -> bed_loc_tuple:
        """
        Build the bed_loc_tuple for the insert of a single target
        :param pmodata: the PMO to extract from
        :param target_id: the index of the target in target_info
        :param extra_fields: (key, value) pairs added to extra_info after genome_name_version
        :return: a bed_loc_tuple, score is the insert length (end - start), strand defaults to + and ref_seq to an empty string when not in the PMO
        :raises Exception: if the target has no insert_location
        """
        tar = pmodata["target_info"][target_id]
        if "insert_location" not in tar:
            raise Exception(
                "no insert_location in pmodata for target id "
                + str(target_id)
                + " target_name "
                + str(tar["target_name"])
                + ", cannot extract insert_location"
            )
        insert = tar["insert_location"]
        genome_info = pmodata["targeted_genomes"][insert["genome_id"]]
        genome_name_version = genome_info["name"] + "_" + genome_info["genome_version"]
        # extra_info has the form [key=value;key=value;]
        fields = [("genome_name_version", genome_name_version), *extra_fields]
        extra_info = "[" + "".join(f"{key}={value};" for key, value in fields) + "]"
        return bed_loc_tuple(
            insert["chrom"],
            insert["start"],
            insert["end"],
            tar["target_name"],
            insert["end"] - insert["start"],
            insert.get("strand", "+"),
            insert.get("ref_seq", ""),
            extra_info,
        )

    @staticmethod
    def _sort_bed_locs(bed_locs: list) -> list:
        """
        Sort bed locations by genomic location
        :param bed_locs: a list of bed_loc_tuple
        :return: a new list sorted by chrom, start and end
        """
        return sorted(bed_locs, key=lambda bed: (bed.chrom, bed.start, bed.end))

    @staticmethod
    def extract_targets_insert_bed_loc(
        pmodata, select_target_ids: list[int] = None, sort_output: bool = True
    ):
        """
        Extract out of a PMO the insert location for targets, will add ref seq if loaded into PMO
        :param pmodata: the PMO to extract from
        :param select_target_ids: a list of target ids to select, if None will select all targets
        :param sort_output: whether to sort output by genomic location
        :return: a list of target inserts, with named tuples with fields: chrom, start, end, name, score, strand, ref_seq, extra_info
        :raises Exception: if a selected target has no insert_location
        """
        if select_target_ids is None:
            select_target_ids = range(len(pmodata["target_info"]))
        bed_loc_out = [
            PMOExporter._insert_bed_loc(pmodata, target_id)
            for target_id in select_target_ids
        ]
        if sort_output:
            return PMOExporter._sort_bed_locs(bed_loc_out)
        return bed_loc_out

    @staticmethod
    def extract_panels_insert_bed_loc(
        pmodata, select_panel_ids: list[int] = None, sort_output: bool = True
    ):
        """
        Extract out of a PMO the insert location for panels, will add ref seq if loaded into PMO
        :param pmodata: the PMO to extract from
        :param select_panel_ids: a list of panels ids to select, if None will select all panels
        :param sort_output: whether to sort the inserts of each panel by genomic location
        :return: a dict with the panel id as key and the list of target inserts of that panel as value, inserts are named tuples with fields: chrom, start, end, name, score, strand, ref_seq, extra_info
        :raises Exception: if a target of a selected panel has no insert_location
        """
        bed_loc_out = {}
        if select_panel_ids is None:
            select_panel_ids = range(len(pmodata["panel_info"]))
        for panel_id in select_panel_ids:
            panel = pmodata["panel_info"][panel_id]
            bed_loc_out_per_panel = []
            for reaction in panel["reactions"]:
                for target_id in reaction["panel_targets"]:
                    bed_loc_out_per_panel.append(
                        PMOExporter._insert_bed_loc(
                            pmodata,
                            target_id,
                            (
                                ("panel", panel["panel_name"]),
                                ("reaction", reaction["reaction_name"]),
                            ),
                        )
                    )
            # sort within the panel and keep going; returning here would drop
            # every panel after the first one
            if sort_output:
                bed_loc_out_per_panel = PMOExporter._sort_bed_locs(
                    bed_loc_out_per_panel
                )
            bed_loc_out[panel_id] = bed_loc_out_per_panel
        return bed_loc_out

    @staticmethod
    def _check_fields_have_data(requested_fields, records, record_label: str):
        """
        Make sure every requested field is present in at least one record
        Records without the field are fine (they are exported as NA), a field that
        no record has is most likely a typo so it is reported
        :param requested_fields: the field names asked for
        :param records: an iterable of the dicts the fields are looked up in
        :param record_label: the name of the PMO section, used in the error message
        :raises Exception: listing the fields that no record has
        """
        fields_with_data = set()
        for record in records:
            fields_with_data.update(
                field for field in requested_fields if field in record
            )
        fields_with_no_data = set(requested_fields) - fields_with_data
        if fields_with_no_data:
            # sorted so the message is the same from run to run
            raise Exception(
                f"No {record_label} have data for fields: {', '.join(sorted(fields_with_no_data))}"
            )

    @staticmethod
    def extract_alleles_per_sample_table(
        pmodata,
        additional_specimen_info_fields: list[str] = None,
        additional_library_sample_info_fields: list[str] = None,
        additional_microhap_fields: list[str] = None,
        additional_representative_info_fields: list[str] = None,
        default_base_col_names: list[str] = None,
        jsonschema_fnp=None,
        validate_pmo: bool = False,
    ) -> pd.DataFrame:
        """
        Create a pd.Dataframe of sample, target and allele. Can optionally add on any other additional fields

        :param pmodata: the data to write from
        :param additional_specimen_info_fields: any additional fields to write from the specimen_info object
        :param additional_library_sample_info_fields: any additional fields to write from the library_samples object
        :param additional_microhap_fields: any additional fields to write from the microhap object
        :param additional_representative_info_fields: any additional fields to write from the representative_microhaplotype_sequences object
        :param default_base_col_names: The default column name for the sample, locus and allele, if None uses library_sample_name, target_name and mhap_id
        :param jsonschema_fnp: path to the jsonschema schema file to validate the PMO against, if None uses the schema in the schemas directory of the package for the installed pmotools version
        :param validate_pmo: whether to validate the PMO with a jsonschema
        :return: pandas dataframe with one row per bioinformatics run, library sample, target and allele
        :raises Exception: if a requested additional field is not present in any record, or if default_base_col_names does not have exactly 3 names
        """
        if default_base_col_names is None:
            default_base_col_names = ["library_sample_name", "target_name", "mhap_id"]

        # check input
        if validate_pmo:
            # only needed when validating, so imported here
            from pmotools.pmo_engine.pmo_checker import PMOChecker

            if jsonschema_fnp is None:
                from pmotools import __version__ as pmotools_version

                jsonschema_fnp = os.path.join(
                    os.path.dirname(
                        os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
                    ),
                    "schemas/",
                    f"portable_microhaplotype_object_v{pmotools_version}.schema.json",
                )
            with open(jsonschema_fnp) as f:
                checker = PMOChecker(json.load(f))
            checker.validate_pmo_json(pmodata)

        # Check to see if at least 1 record has each supplied field,
        # records without the field will have NA
        if additional_specimen_info_fields is not None:
            PMOExporter._check_fields_have_data(
                additional_specimen_info_fields,
                pmodata["specimen_info"],
                "specimen_info",
            )
        if additional_library_sample_info_fields is not None:
            PMOExporter._check_fields_have_data(
                additional_library_sample_info_fields,
                pmodata["library_sample_info"],
                "library_sample_info",
            )
        if additional_microhap_fields is not None:
            PMOExporter._check_fields_have_data(
                additional_microhap_fields,
                (
                    microhap_data
                    for detected_microhaplotypes in pmodata["detected_microhaplotypes"]
                    for library_samples_data in detected_microhaplotypes[
                        "library_samples"
                    ]
                    for target_data in library_samples_data["target_results"]
                    for microhap_data in target_data["mhaps"]
                ),
                "detected_microhaplotypes",
            )
        if additional_representative_info_fields is not None:
            PMOExporter._check_fields_have_data(
                additional_representative_info_fields,
                (
                    microhap_data
                    for target_data in pmodata["representative_microhaplotypes"][
                        "targets"
                    ]
                    for microhap_data in target_data["microhaplotypes"]
                ),
                "representative_microhaplotype_sequences",
            )

        if len(default_base_col_names) != 3:
            raise Exception(
                "Must have 3 default columns for allele counts, not {}".format(
                    len(default_base_col_names)
                )
            )

        from pmotools.pmo_engine.pmo_processor import PMOProcessor

        rows = []
        specimen_info = pmodata["specimen_info"]
        target_info = pmodata["target_info"]
        library_sample_info = pmodata["library_sample_info"]
        detected_microhaps = pmodata["detected_microhaplotypes"]
        rep_haps = pmodata["representative_microhaplotypes"]["targets"]
        bioinformatics_run_names = PMOProcessor.get_bioinformatics_run_names(pmodata)
        # (requested field names, record to read them from) pairs are filled in
        # per row below, in this column order
        for bio_run_for_detected_microhaps in detected_microhaps:
            bioinformatics_run_id = bio_run_for_detected_microhaps[
                "bioinformatics_run_id"
            ]
            for sample_data in bio_run_for_detected_microhaps["library_samples"]:
                library_meta = library_sample_info[sample_data["library_sample_id"]]
                specimen_meta = specimen_info[library_meta["specimen_id"]]
                for target_data in sample_data["target_results"]:
                    rep_target = rep_haps[target_data["mhaps_target_id"]]
                    target_name = target_info[rep_target["target_id"]]["target_name"]
                    for microhap_data in target_data["mhaps"]:
                        allele_id = microhap_data["mhap_id"]
                        rep_hap_meta = rep_target["microhaplotypes"][allele_id]
                        row = {
                            "bioinformatics_run_name": bioinformatics_run_names[
                                bioinformatics_run_id
                            ],
                            default_base_col_names[0]: library_meta[
                                "library_sample_name"
                            ],
                            default_base_col_names[1]: target_name,
                            default_base_col_names[2]: allele_id,
                        }
                        for fields, source in (
                            (additional_library_sample_info_fields, library_meta),
                            (additional_specimen_info_fields, specimen_meta),
                            (additional_microhap_fields, microhap_data),
                            (additional_representative_info_fields, rep_hap_meta),
                        ):
                            if fields is not None:
                                for field in fields:
                                    row[field] = source.get(field, "NA")
                        rows.append(row)
        # Build and return DataFrame
        return pd.DataFrame(rows)

    @staticmethod
    def list_library_sample_names_per_specimen_name(
        pmodata,
        select_specimen_ids: list[int] = None,
        select_specimen_names: list[str] = None,
    ) -> pd.DataFrame:
        """
        List all the library_sample_names per specimen_name
        :param pmodata: the PMO
        :param select_specimen_ids: a list of specimen_ids to select, if None, all specimen_ids are used
        :param select_specimen_names: a list of specimen_names to select, if None, all specimen_names are used
        :return: a pandas dataframe with 3 columns, specimen_name, library_sample_name, and library_sample_count (the number of library samples of that specimen), specimens without library samples are not listed
        :raises ValueError: if both select_specimen_ids and select_specimen_names are given
        """
        if select_specimen_ids is not None and select_specimen_names is not None:
            raise ValueError(
                "Cannot specify both select_specimen_ids and select_specimen_names"
            )
        if select_specimen_names is not None:
            from pmotools.pmo_engine.pmo_processor import PMOProcessor

            select_specimen_ids = PMOProcessor.get_index_of_specimen_names(
                pmodata, select_specimen_names
            )
        # a set so the membership test in the loop does not scan a list
        selected_ids = None if select_specimen_ids is None else set(select_specimen_ids)

        lib_samples_per_spec = defaultdict(list)
        for lib_sample in pmodata["library_sample_info"]:
            if selected_ids is None or lib_sample["specimen_id"] in selected_ids:
                specimen_name = pmodata["specimen_info"][lib_sample["specimen_id"]][
                    "specimen_name"
                ]
                lib_samples_per_spec[specimen_name].append(
                    lib_sample["library_sample_name"]
                )

        # one row per library sample, the count is repeated on each row
        data = [
            {
                "specimen_name": specimen_name,
                "library_sample_name": library_sample_name,
                "library_sample_count": len(library_sample_names),
            }
            for specimen_name, library_sample_names in lib_samples_per_spec.items()
            for library_sample_name in library_sample_names
        ]
        return pd.DataFrame(
            data,
            columns=["specimen_name", "library_sample_name", "library_sample_count"],
        )
library_sample_id, and library_sample_id_count(the number of library_sample_ids per specimen_id) - """ - if select_specimen_ids is not None and select_specimen_names is not None: - raise ValueError( - "Cannot specify both select_specimen_ids and select_specimen_names" - ) - lib_samples_per_spec = defaultdict(list[str]) - if select_specimen_names is not None: - select_specimen_ids = PMOProcessor.get_index_of_specimen_names( - pmodata, select_specimen_names - ) - for lib_sample in pmodata["library_sample_info"]: - if ( - select_specimen_ids is None - or lib_sample["specimen_id"] in select_specimen_ids - ): - lib_samples_per_spec[ - pmodata["specimen_info"][lib_sample["specimen_id"]]["specimen_name"] - ].append(lib_sample["library_sample_name"]) - - specimens_not_list = [] - for specimen in pmodata["specimen_info"]: - if specimen["specimen_name"] not in lib_samples_per_spec: - specimens_not_list.append(specimen["specimen_name"]) - - # Prepare the data for DataFrame creation - data = [] - for specimen_name, library_sample_names in lib_samples_per_spec.items(): - for library_sample_name in library_sample_names: - data.append( - { - "specimen_name": specimen_name, - "library_sample_name": library_sample_name, - "library_sample_count": len(library_sample_names), - } - ) - - # Create the DataFrame - df = pd.DataFrame( - data, - columns=["specimen_name", "library_sample_name", "library_sample_count"], - ) - return df - @staticmethod def count_specimen_per_meta_fields(pmodata) -> pd.DataFrame: """ @@ -628,190 +553,6 @@ def extract_allele_counts_freq_from_pmo( ["bioinformatics_run_id", "target_name", "mhap_id"] ).reset_index(drop=True) - @staticmethod - def extract_alleles_per_sample_table( - pmodata, - additional_specimen_info_fields: list[str] = None, - additional_library_sample_info_fields: list[str] = None, - additional_microhap_fields: list[str] = None, - additional_representative_info_fields: list[str] = None, - default_base_col_names: list[str] = [ - 
"library_sample_name", - "target_name", - "mhap_id", - ], - jsonschema_fnp=os.path.join( - os.path.dirname( - os.path.dirname(os.path.dirname(os.path.abspath(__file__))) - ), - "schemas/", - f"portable_microhaplotype_object_v{__pmotools_version__}.schema.json", - ), - validate_pmo: bool = False, - ) -> pd.DataFrame: - """ - Create a pd.Dataframe of sample, target and allele. Can optionally add on any other additional fields - - :param pmodata: the data to write from - :param additional_specimen_info_fields: any additional fields to write from the specimen_info object - :param additional_library_sample_info_fields: any additional fields to write from the library_samples object - :param additional_microhap_fields: any additional fields to write from the microhap object - :param additional_representative_info_fields: any additional fields to write from the representative_microhaplotype_sequences object - :param default_base_col_names: The default column name for the sample, locus and allele - :param jsonschema_fnp: path to the jsonschema schema file to validate the PMO against - :param validate_pmo: whether to validate the PMO with a jsonschema - :return: pandas dataframe - """ - - # check input - if validate_pmo: - with open(jsonschema_fnp) as f: - checker = PMOChecker(json.load(f)) - checker.validate_pmo_json(pmodata) - - # Check to see if at least 1 sample has supplied meta field - # samples without this meta field will have NA - if additional_specimen_info_fields is not None: - # Find meta fields that have at least some data - meta_fields_with_data = { - metafield - for metafield in additional_specimen_info_fields - for specimen_data in pmodata["specimen_info"] - if metafield in specimen_data - } - - # Determine meta fields with no samples having data - meta_fields_with_no_samples = ( - set(additional_specimen_info_fields) - meta_fields_with_data - ) - - if meta_fields_with_no_samples: - raise Exception( - f"No specimen_info have data for fields: {', 
'.join(meta_fields_with_no_samples)}" - ) - # Check to see if at least 1 sample has supplied meta field - # samples without this meta field will have NA - if additional_library_sample_info_fields is not None: - # Find meta fields that have at least some data - meta_fields_with_data = { - metafield - for metafield in additional_library_sample_info_fields - for library_data in pmodata["library_sample_info"] - if metafield in library_data - } - # Determine meta fields with no samples having data - meta_fields_with_no_samples = ( - set(additional_library_sample_info_fields) - meta_fields_with_data - ) - - if meta_fields_with_no_samples: - raise Exception( - f"No library_sample_info have data for fields: {', '.join(meta_fields_with_no_samples)}" - ) - - # Check to see if at least 1 haplotype has this field - # samples without this meta field will have NA - if additional_microhap_fields is not None: - # Find meta fields that have at least some data - additional_microhap_fields_with_data = { - additional_microhap_field - for additional_microhap_field in additional_microhap_fields - for detected_microhaplotypes in pmodata["detected_microhaplotypes"] - for library_samples_data in detected_microhaplotypes["library_samples"] - for target_data in library_samples_data["target_results"] - for microhap_data in target_data["mhaps"] - if additional_microhap_field in microhap_data - } - # Determine meta fields with no samples having data - additional_microhap_fields_with_no_samples = ( - set(additional_microhap_fields) - additional_microhap_fields_with_data - ) - - if additional_microhap_fields_with_no_samples: - raise Exception( - f"No detected_microhaplotypes have data for fields: {', '.join(additional_microhap_fields_with_no_samples)}" - ) - # Check to see if at least 1 haplotype has this field - # samples without this meta field will have NA - if additional_representative_info_fields is not None: - # Find meta fields that have at least some data - 
additional_microhap_fields_with_data = { - additional_microhap_field - for additional_microhap_field in additional_representative_info_fields - for target_data in pmodata["representative_microhaplotypes"]["targets"] - for microhap_data in target_data["microhaplotypes"] - if additional_microhap_field in microhap_data - } - # Determine meta fields with no samples having data - additional_microhap_fields_with_no_samples = ( - set(additional_representative_info_fields) - - additional_microhap_fields_with_data - ) - - if additional_microhap_fields_with_no_samples: - raise Exception( - f"No representative_microhaplotype_sequences have data for fields: {', '.join(additional_microhap_fields_with_no_samples)}" - ) - - if len(default_base_col_names) != 3: - raise Exception( - "Must have 3 default columns for allele counts, not {}".format( - len(default_base_col_names) - ) - ) - - rows = [] - specimen_info = pmodata["specimen_info"] - target_info = pmodata["target_info"] - library_sample_info = pmodata["library_sample_info"] - detected_microhaps = pmodata["detected_microhaplotypes"] - rep_haps = pmodata["representative_microhaplotypes"]["targets"] - bioinformatics_run_names = PMOProcessor.get_bioinformatics_run_names(pmodata) - for bio_run_for_detected_microhaps in detected_microhaps: - bioinformatics_run_id = bio_run_for_detected_microhaps[ - "bioinformatics_run_id" - ] - for sample_data in bio_run_for_detected_microhaps["library_samples"]: - library_sample_id = sample_data["library_sample_id"] - specimen_id = library_sample_info[library_sample_id]["specimen_id"] - library_meta = library_sample_info[library_sample_id] - specimen_meta = specimen_info[specimen_id] - for target_data in sample_data["target_results"]: - target_name = target_info[ - rep_haps[target_data["mhaps_target_id"]]["target_id"] - ]["target_name"] - for microhap_data in target_data["mhaps"]: - allele_id = microhap_data["mhap_id"] - # print(rep_haps[target_data["mhaps_target_id"]]) - rep_hap_meta = 
rep_haps[target_data["mhaps_target_id"]][ - "microhaplotypes" - ][allele_id] - row = { - "bioinformatics_run_name": bioinformatics_run_names[ - bioinformatics_run_id - ], - default_base_col_names[0]: library_meta[ - "library_sample_name" - ], - default_base_col_names[1]: target_name, - default_base_col_names[2]: allele_id, - } - if additional_library_sample_info_fields is not None: - for field in additional_library_sample_info_fields: - row[field] = library_meta.get(field, "NA") - if additional_specimen_info_fields is not None: - for field in additional_specimen_info_fields: - row[field] = specimen_meta.get(field, "NA") - if additional_microhap_fields is not None: - for field in additional_microhap_fields: - row[field] = microhap_data.get(field, "NA") - if additional_representative_info_fields is not None: - for field in additional_representative_info_fields: - row[field] = rep_hap_meta.get(field, "NA") - rows.append(row) - # Build and return DataFrame - return pd.DataFrame(rows) - @staticmethod def filter_pmo_by_library_sample_ids(pmodata, library_sample_ids: set[int]): """ @@ -1320,185 +1061,3 @@ def extract_from_pmo_with_read_filter(pmodata, read_filter: float): ) pmo_out["detected_microhaplotypes"].append(extracted_microhaps_for_id) return pmo_out - - @staticmethod - def write_bed_locs(bed_locs: list[bed_loc_tuple], fnp, add_header: bool = False): - """ - Write out a list of bed_loc_tuple to a file, will auto overwrite it - :param bed_locs: a list of bed_loc_tuple - :param fnp: output file path, will be overwritten if it exists - :param add_header: add header of #chrom,start end,name,score,strand,ref_seq,extra_info, starts with comment so tools will treat it as a comment line - """ - with open(fnp, "w") as f: - if add_header: - f.write( - "\t".join( - [ - "#chrom", - "start", - "end", - "name", - "score", - "strand", - "ref_seq", - "extra_info", - ] - ) - ) - for bed_loc in bed_locs: - f.write( - "\t".join( - [ - bed_loc.chrom, - str(bed_loc.start), - 
str(bed_loc.end), - bed_loc.name, - str(bed_loc.score), - bed_loc.strand, - str(bed_loc.ref_seq), - bed_loc.extra_info, - ] - ) - ) - f.write("\n") - - @staticmethod - def extract_targets_insert_bed_loc( - pmodata, select_target_ids: list[int] = None, sort_output: bool = True - ): - """ - Extract out of a PMO the insert location for targets, will add ref seq if loaded into PMO - :param pmodata: the PMO to extract from - :param select_target_ids: a list of target ids to select, if None will select all targets - :param sort_output: whether to sort output by genomic location - :return: a list of target inserts, with named tuples with fields: chrom, start, end, name, score, strand, extra_info, ref_seq - """ - # bed_loc = NamedTuple("bed_loc", [("chrom", str), ("start", int), ("end", int), ("name", str), ("score", float), ("strand", str), ("extra_info", str), ("ref_seq", str)]) - bed_loc_out = [] - if select_target_ids is None: - select_target_ids = list(range(len(pmodata["target_info"]))) - for target_id in select_target_ids: - tar = pmodata["target_info"][target_id] - if "insert_location" not in tar: - raise Exception( - "no insert_location in pmodata for target id " - + str(target_id) - + " target_name " - + str(tar["target_name"]) - + ", cannot extract insert_location" - ) - genome_info = pmodata["targeted_genomes"][ - tar["insert_location"]["genome_id"] - ] - genome_name_version = ( - genome_info["name"] + "_" + genome_info["genome_version"] - ) - extra_info = ( - str("[") + str("genome_name_version=") + genome_name_version + ";]" - ) - strand = ( - "+" - if "strand" not in tar["insert_location"] - else tar["insert_location"]["strand"] - ) - ref_seq = ( - "" - if "ref_seq" not in tar["insert_location"] - else tar["insert_location"]["ref_seq"] - ) - bed_loc_out.append( - bed_loc_tuple( - tar["insert_location"]["chrom"], - tar["insert_location"]["start"], - tar["insert_location"]["end"], - tar["target_name"], - tar["insert_location"]["end"] - 
tar["insert_location"]["start"], - strand, - ref_seq, - extra_info, - ) - ) - if sort_output: - return sorted(bed_loc_out, key=lambda bed: (bed.chrom, bed.start, bed.end)) - return bed_loc_out - - @staticmethod - def extract_panels_insert_bed_loc( - pmodata, select_panel_ids: list[int] = None, sort_output: bool = True - ): - """ - Extract out of a PMO the insert location for panels, will add ref seq if loaded into PMO - :param pmodata: the PMO to extract from - :param select_panel_ids: a list of panels ids to select, if None will select all panels - :param sort_output: whether to sort output by genomic location - :return: a list of target inserts, with named tuples with fields: chrom, start, end, name, score, strand, extra_info, ref_seq - """ - bed_loc_out = {} - if select_panel_ids is None: - select_panel_ids = list(range(len(pmodata["panel_info"]))) - for panel_id in select_panel_ids: - bed_loc_out_per_panel = [] - for reaction_id in range(len(pmodata["panel_info"][panel_id]["reactions"])): - for target_id in pmodata["panel_info"][panel_id]["reactions"][ - reaction_id - ]["panel_targets"]: - tar = pmodata["target_info"][target_id] - if "insert_location" not in tar: - raise Exception( - "no insert_location in pmodata for target id " - + str(target_id) - + " target_name " - + str(tar["target_name"]) - + ", cannot extract insert_location" - ) - genome_info = pmodata["targeted_genomes"][ - tar["insert_location"]["genome_id"] - ] - genome_name_version = ( - genome_info["name"] + "_" + genome_info["genome_version"] - ) - extra_info = ( - str("[") - + "genome_name_version=" - + genome_name_version - + ";" - + "panel=" - + pmodata["panel_info"][panel_id]["panel_name"] - + ";" - + "reaction=" - + pmodata["panel_info"][panel_id]["reactions"][reaction_id][ - "reaction_name" - ] - + ";" - + "]" - ) - strand = ( - "+" - if "strand" not in tar["insert_location"] - else tar["insert_location"]["strand"] - ) - ref_seq = ( - "" - if "ref_seq" not in tar["insert_location"] - else 
tar["insert_location"]["ref_seq"] - ) - bed_loc_out_per_panel.append( - bed_loc_tuple( - tar["insert_location"]["chrom"], - tar["insert_location"]["start"], - tar["insert_location"]["end"], - tar["target_name"], - tar["insert_location"]["end"] - - tar["insert_location"]["start"], - strand, - ref_seq, - extra_info, - ) - ) - if sort_output: - return sorted( - bed_loc_out_per_panel, - key=lambda bed: (bed.chrom, bed.start, bed.end), - ) - bed_loc_out[panel_id] = bed_loc_out_per_panel - return bed_loc_out diff --git a/src/pmotools/scripts/extract_info_from_pmo/list_library_sample_names_per_specimen_name.py b/src/pmotools/scripts/extract_info_from_pmo/list_library_sample_names_per_specimen_name.py index f34e5dd..de5a1cd 100755 --- a/src/pmotools/scripts/extract_info_from_pmo/list_library_sample_names_per_specimen_name.py +++ b/src/pmotools/scripts/extract_info_from_pmo/list_library_sample_names_per_specimen_name.py @@ -3,7 +3,7 @@ import sys -from pmotools.pmo_engine.pmo_processor import PMOProcessor +from pmotools.pmo_engine.pmo_exporter import PMOExporter from pmotools.pmo_engine.pmo_reader import PMOReader from pmotools.utils.small_utils import Utils @@ -46,7 +46,7 @@ def list_library_sample_names_per_specimen_name(): pmo = PMOReader.read_in_pmo(args.file) # count fields - info_df = PMOProcessor.list_library_sample_names_per_specimen_name(pmo) + info_df = PMOExporter.list_library_sample_names_per_specimen_name(pmo) # output info_df.to_csv( diff --git a/src/pmotools/scripts/pmo_to_tables/__init__.py b/src/pmotools/scripts/pmo_to_tables/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/pmotools/scripts/pmo_to_tables/export_library_sample_meta_table.py b/src/pmotools/scripts/pmo_to_tables/export_library_sample_meta_table.py new file mode 100644 index 0000000..b624799 --- /dev/null +++ b/src/pmotools/scripts/pmo_to_tables/export_library_sample_meta_table.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python3 +import argparse +import sys + + +from 
pmotools.pmo_engine.pmo_exporter import PMOExporter +from pmotools.pmo_engine.pmo_reader import PMOReader +from pmotools.utils.small_utils import Utils + + +def parse_args_export_library_sample_meta_table(): + parser = argparse.ArgumentParser() + parser.add_argument("--file", type=str, required=True, help="PMO file") + parser.add_argument( + "--output", type=str, default="STDOUT", required=False, help="output file" + ) + parser.add_argument( + "--delim", + default="tab", + type=str, + required=False, + help="the delimiter of the output text file, examples input tab,comma but can also be the actual delimiter", + ) + parser.add_argument( + "--overwrite", action="store_true", help="If output file exists, overwrite it" + ) + + return parser.parse_args() + + +def export_library_sample_meta_table(): + args = parse_args_export_library_sample_meta_table() + + # check files + output_delim, output_extension = Utils.process_delimiter_and_output_extension( + args.delim, gzip=args.output.endswith(".gz") + ) + args.output = ( + args.output + if "STDOUT" == args.output + else Utils.appendStrAsNeeded(args.output, output_extension) + ) + Utils.inputOutputFileCheck(args.file, args.output, args.overwrite) + + # read in PMO + pmo = PMOReader.read_in_pmo(args.file) + + # count fields + info_df = PMOExporter.export_library_sample_meta_table(pmo) + + # output + info_df.to_csv( + sys.stdout if "STDOUT" == args.output else args.output, + sep=output_delim, + index=False, + ) + + +if __name__ == "__main__": + export_library_sample_meta_table() diff --git a/src/pmotools/scripts/pmo_to_tables/export_panel_info_meta_table.py b/src/pmotools/scripts/pmo_to_tables/export_panel_info_meta_table.py new file mode 100644 index 0000000..a54fc18 --- /dev/null +++ b/src/pmotools/scripts/pmo_to_tables/export_panel_info_meta_table.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python3 +import argparse +import sys + + +from pmotools.pmo_engine.pmo_exporter import PMOExporter +from pmotools.pmo_engine.pmo_reader import 
PMOReader +from pmotools.utils.small_utils import Utils + + +def parse_args_export_panel_info_meta_table(): + parser = argparse.ArgumentParser() + parser.add_argument("--file", type=str, required=True, help="PMO file") + parser.add_argument( + "--output", type=str, default="STDOUT", required=False, help="output file" + ) + parser.add_argument( + "--delim", + default="tab", + type=str, + required=False, + help="the delimiter of the output text file, examples input tab,comma but can also be the actual delimiter", + ) + parser.add_argument( + "--overwrite", action="store_true", help="If output file exists, overwrite it" + ) + + return parser.parse_args() + + +def export_panel_info_meta_table(): + args = parse_args_export_panel_info_meta_table() + + # check files + output_delim, output_extension = Utils.process_delimiter_and_output_extension( + args.delim, gzip=args.output.endswith(".gz") + ) + args.output = ( + args.output + if "STDOUT" == args.output + else Utils.appendStrAsNeeded(args.output, output_extension) + ) + Utils.inputOutputFileCheck(args.file, args.output, args.overwrite) + + # read in PMO + pmo = PMOReader.read_in_pmo(args.file) + + # count fields + info_df = PMOExporter.export_panel_info_meta_table(pmo) + + # output + info_df.to_csv( + sys.stdout if "STDOUT" == args.output else args.output, + sep=output_delim, + index=False, + ) + + +if __name__ == "__main__": + export_panel_info_meta_table() diff --git a/src/pmotools/scripts/pmo_to_tables/export_project_info_meta_table.py b/src/pmotools/scripts/pmo_to_tables/export_project_info_meta_table.py new file mode 100644 index 0000000..9fbc64d --- /dev/null +++ b/src/pmotools/scripts/pmo_to_tables/export_project_info_meta_table.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python3 +import argparse +import sys + + +from pmotools.pmo_engine.pmo_exporter import PMOExporter +from pmotools.pmo_engine.pmo_reader import PMOReader +from pmotools.utils.small_utils import Utils + + +def parse_args_export_project_info_meta_table(): 
+ parser = argparse.ArgumentParser() + parser.add_argument("--file", type=str, required=True, help="PMO file") + parser.add_argument( + "--output", type=str, default="STDOUT", required=False, help="output file" + ) + parser.add_argument( + "--delim", + default="tab", + type=str, + required=False, + help="the delimiter of the output text file, examples input tab,comma but can also be the actual delimiter", + ) + parser.add_argument( + "--overwrite", action="store_true", help="If output file exists, overwrite it" + ) + + return parser.parse_args() + + +def export_project_info_meta_table(): + args = parse_args_export_project_info_meta_table() + + # check files + output_delim, output_extension = Utils.process_delimiter_and_output_extension( + args.delim, gzip=args.output.endswith(".gz") + ) + args.output = ( + args.output + if "STDOUT" == args.output + else Utils.appendStrAsNeeded(args.output, output_extension) + ) + Utils.inputOutputFileCheck(args.file, args.output, args.overwrite) + + # read in PMO + pmo = PMOReader.read_in_pmo(args.file) + + # count fields + info_df = PMOExporter.export_project_info_meta_table(pmo) + + # output + info_df.to_csv( + sys.stdout if "STDOUT" == args.output else args.output, + sep=output_delim, + index=False, + ) + + +if __name__ == "__main__": + export_project_info_meta_table() diff --git a/src/pmotools/scripts/pmo_to_tables/export_sequencing_info_meta_table.py b/src/pmotools/scripts/pmo_to_tables/export_sequencing_info_meta_table.py new file mode 100644 index 0000000..2c7334f --- /dev/null +++ b/src/pmotools/scripts/pmo_to_tables/export_sequencing_info_meta_table.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python3 +import argparse +import sys + + +from pmotools.pmo_engine.pmo_exporter import PMOExporter +from pmotools.pmo_engine.pmo_reader import PMOReader +from pmotools.utils.small_utils import Utils + + +def parse_args_export_sequencing_info_meta_table(): + parser = argparse.ArgumentParser() + parser.add_argument("--file", type=str, 
required=True, help="PMO file") + parser.add_argument( + "--output", type=str, default="STDOUT", required=False, help="output file" + ) + parser.add_argument( + "--delim", + default="tab", + type=str, + required=False, + help="the delimiter of the output text file, examples input tab,comma but can also be the actual delimiter", + ) + parser.add_argument( + "--overwrite", action="store_true", help="If output file exists, overwrite it" + ) + + return parser.parse_args() + + +def export_sequencing_info_meta_table(): + args = parse_args_export_sequencing_info_meta_table() + + # check files + output_delim, output_extension = Utils.process_delimiter_and_output_extension( + args.delim, gzip=args.output.endswith(".gz") + ) + args.output = ( + args.output + if "STDOUT" == args.output + else Utils.appendStrAsNeeded(args.output, output_extension) + ) + Utils.inputOutputFileCheck(args.file, args.output, args.overwrite) + + # read in PMO + pmo = PMOReader.read_in_pmo(args.file) + + # count fields + info_df = PMOExporter.export_sequencing_info_meta_table(pmo) + + # output + info_df.to_csv( + sys.stdout if "STDOUT" == args.output else args.output, + sep=output_delim, + index=False, + ) + + +if __name__ == "__main__": + export_sequencing_info_meta_table() diff --git a/src/pmotools/scripts/pmo_to_tables/export_specimen_meta_table.py b/src/pmotools/scripts/pmo_to_tables/export_specimen_meta_table.py new file mode 100644 index 0000000..5a45012 --- /dev/null +++ b/src/pmotools/scripts/pmo_to_tables/export_specimen_meta_table.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python3 +import argparse +import sys + + +from pmotools.pmo_engine.pmo_exporter import PMOExporter +from pmotools.pmo_engine.pmo_reader import PMOReader +from pmotools.utils.small_utils import Utils + + +def parse_args_export_specimen_meta_table(): + parser = argparse.ArgumentParser() + parser.add_argument("--file", type=str, required=True, help="PMO file") + parser.add_argument( + "--output", type=str, default="STDOUT", 
required=False, help="output file" + ) + parser.add_argument( + "--delim", + default="tab", + type=str, + required=False, + help="the delimiter of the output text file, examples input tab,comma but can also be the actual delimiter", + ) + parser.add_argument( + "--overwrite", action="store_true", help="If output file exists, overwrite it" + ) + + return parser.parse_args() + + +def export_specimen_meta_table(): + args = parse_args_export_specimen_meta_table() + + # check files + output_delim, output_extension = Utils.process_delimiter_and_output_extension( + args.delim, gzip=args.output.endswith(".gz") + ) + args.output = ( + args.output + if "STDOUT" == args.output + else Utils.appendStrAsNeeded(args.output, output_extension) + ) + Utils.inputOutputFileCheck(args.file, args.output, args.overwrite) + + # read in PMO + pmo = PMOReader.read_in_pmo(args.file) + + # count fields + info_df = PMOExporter.export_specimen_meta_table(pmo) + + # output + info_df.to_csv( + sys.stdout if "STDOUT" == args.output else args.output, + sep=output_delim, + index=False, + ) + + +if __name__ == "__main__": + export_specimen_meta_table() diff --git a/src/pmotools/scripts/pmo_to_tables/export_specimen_travel_meta_table.py b/src/pmotools/scripts/pmo_to_tables/export_specimen_travel_meta_table.py new file mode 100644 index 0000000..3594cc8 --- /dev/null +++ b/src/pmotools/scripts/pmo_to_tables/export_specimen_travel_meta_table.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python3 +import argparse +import sys + + +from pmotools.pmo_engine.pmo_exporter import PMOExporter +from pmotools.pmo_engine.pmo_reader import PMOReader +from pmotools.utils.small_utils import Utils + + +def parse_args_export_specimen_travel_meta_table(): + parser = argparse.ArgumentParser() + parser.add_argument("--file", type=str, required=True, help="PMO file") + parser.add_argument( + "--output", type=str, default="STDOUT", required=False, help="output file" + ) + parser.add_argument( + "--delim", + default="tab", + type=str, 
+ required=False, + help="the delimiter of the output text file, examples input tab,comma but can also be the actual delimiter", + ) + parser.add_argument( + "--overwrite", action="store_true", help="If output file exists, overwrite it" + ) + + return parser.parse_args() + + +def export_specimen_travel_meta_table(): + args = parse_args_export_specimen_travel_meta_table() + + # check files + output_delim, output_extension = Utils.process_delimiter_and_output_extension( + args.delim, gzip=args.output.endswith(".gz") + ) + args.output = ( + args.output + if "STDOUT" == args.output + else Utils.appendStrAsNeeded(args.output, output_extension) + ) + Utils.inputOutputFileCheck(args.file, args.output, args.overwrite) + + # read in PMO + pmo = PMOReader.read_in_pmo(args.file) + + # count fields + info_df = PMOExporter.export_specimen_travel_meta_table(pmo) + + # output + info_df.to_csv( + sys.stdout if "STDOUT" == args.output else args.output, + sep=output_delim, + index=False, + ) + + +if __name__ == "__main__": + export_specimen_travel_meta_table() diff --git a/src/pmotools/scripts/pmo_to_tables/export_target_info_meta_table.py b/src/pmotools/scripts/pmo_to_tables/export_target_info_meta_table.py new file mode 100644 index 0000000..05d6d4f --- /dev/null +++ b/src/pmotools/scripts/pmo_to_tables/export_target_info_meta_table.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python3 +import argparse +import sys + + +from pmotools.pmo_engine.pmo_exporter import PMOExporter +from pmotools.pmo_engine.pmo_reader import PMOReader +from pmotools.utils.small_utils import Utils + + +def parse_args_export_target_info_meta_table(): + parser = argparse.ArgumentParser() + parser.add_argument("--file", type=str, required=True, help="PMO file") + parser.add_argument( + "--output", type=str, default="STDOUT", required=False, help="output file" + ) + parser.add_argument( + "--delim", + default="tab", + type=str, + required=False, + help="the delimiter of the output text file, examples input tab,comma 
but can also be the actual delimiter", + ) + parser.add_argument( + "--overwrite", action="store_true", help="If output file exists, overwrite it" + ) + + return parser.parse_args() + + +def export_target_info_meta_table(): + args = parse_args_export_target_info_meta_table() + + # check files + output_delim, output_extension = Utils.process_delimiter_and_output_extension( + args.delim, gzip=args.output.endswith(".gz") + ) + args.output = ( + args.output + if "STDOUT" == args.output + else Utils.appendStrAsNeeded(args.output, output_extension) + ) + Utils.inputOutputFileCheck(args.file, args.output, args.overwrite) + + # read in PMO + pmo = PMOReader.read_in_pmo(args.file) + + # count fields + info_df = PMOExporter.export_target_info_meta_table(pmo) + + # output + info_df.to_csv( + sys.stdout if "STDOUT" == args.output else args.output, + sep=output_delim, + index=False, + ) + + +if __name__ == "__main__": + export_target_info_meta_table() diff --git a/src/pmotools/scripts/extractors_from_pmo/extract_allele_table.py b/src/pmotools/scripts/pmo_to_tables/extract_allele_table.py similarity index 97% rename from src/pmotools/scripts/extractors_from_pmo/extract_allele_table.py rename to src/pmotools/scripts/pmo_to_tables/extract_allele_table.py index 3a61d83..549028c 100755 --- a/src/pmotools/scripts/extractors_from_pmo/extract_allele_table.py +++ b/src/pmotools/scripts/pmo_to_tables/extract_allele_table.py @@ -8,6 +8,8 @@ from pmotools.utils.small_utils import Utils from pmotools.pmo_engine.pmo_checker import PMOChecker from pmotools.pmo_engine.pmo_processor import PMOProcessor +from pmotools.pmo_engine.pmo_exporter import PMOExporter + from pmotools import __version__ as __pmotools_version__ @@ -132,7 +134,7 @@ def extract_for_allele_table(): args.representative_haps_fields, "," ) - allele_table = PMOProcessor.extract_alleles_per_sample_table( + allele_table = PMOExporter.extract_alleles_per_sample_table( pmodata, 
additional_specimen_info_fields=args.specimen_info_meta_fields, additional_library_sample_info_fields=args.library_sample_info_meta_fields, diff --git a/src/pmotools/scripts/extract_info_from_pmo/extract_insert_of_panels.py b/src/pmotools/scripts/pmo_to_tables/extract_insert_of_panels.py similarity index 94% rename from src/pmotools/scripts/extract_info_from_pmo/extract_insert_of_panels.py rename to src/pmotools/scripts/pmo_to_tables/extract_insert_of_panels.py index 1687c41..b0da163 100755 --- a/src/pmotools/scripts/extract_info_from_pmo/extract_insert_of_panels.py +++ b/src/pmotools/scripts/pmo_to_tables/extract_insert_of_panels.py @@ -1,8 +1,7 @@ #!/usr/bin/env python3 import argparse - -from pmotools.pmo_engine.pmo_processor import PMOProcessor +from pmotools.pmo_engine.pmo_exporter import PMOExporter from pmotools.pmo_engine.pmo_reader import PMOReader from pmotools.utils.small_utils import Utils @@ -35,7 +34,7 @@ def extract_insert_of_panels(): pmo = PMOReader.read_in_pmo(args.file) # get panel insert locations - panel_bed_locs = PMOProcessor.extract_panels_insert_bed_loc(pmo) + panel_bed_locs = PMOExporter.extract_panels_insert_bed_loc(pmo) # write with Utils.smart_open_write(args.output) as f: diff --git a/src/pmotools/scripts/extract_info_from_pmo/extract_refseq_of_inserts_of_panels.py b/src/pmotools/scripts/pmo_to_tables/extract_refseq_of_inserts_of_panels.py similarity index 90% rename from src/pmotools/scripts/extract_info_from_pmo/extract_refseq_of_inserts_of_panels.py rename to src/pmotools/scripts/pmo_to_tables/extract_refseq_of_inserts_of_panels.py index 49c0112..f335334 100755 --- a/src/pmotools/scripts/extract_info_from_pmo/extract_refseq_of_inserts_of_panels.py +++ b/src/pmotools/scripts/pmo_to_tables/extract_refseq_of_inserts_of_panels.py @@ -1,8 +1,7 @@ #!/usr/bin/env python3 import argparse - -from pmotools.pmo_engine.pmo_processor import PMOProcessor +from pmotools.pmo_engine.pmo_exporter import PMOExporter from pmotools.pmo_engine.pmo_reader 
import PMOReader from pmotools.utils.small_utils import Utils @@ -30,7 +29,7 @@ def extract_refseq_of_inserts_of_panels(): pmo = PMOReader.read_in_pmo(args.file) # get panel insert locations - panel_bed_locs = PMOProcessor.extract_panels_insert_bed_loc(pmo) + panel_bed_locs = PMOExporter.extract_panels_insert_bed_loc(pmo) # write with Utils.smart_open_write(args.output) as f: diff --git a/tests/test_pmo_engine/test_pmo_exporter.py b/tests/test_pmo_engine/test_pmo_exporter.py new file mode 100755 index 0000000..84acb43 --- /dev/null +++ b/tests/test_pmo_engine/test_pmo_exporter.py @@ -0,0 +1,277 @@ +#!/usr/bin/env python3 +import hashlib +import os +import tempfile +import unittest +import json +import pandas as pd +from pmotools.pmo_engine.pmo_exporter import PMOExporter + + +def md5sum_of_fnp(filename): + with open(filename, "rb") as f: + return hashlib.md5(f.read()).hexdigest() + + +class TestPMOExporter(unittest.TestCase): + def setUp(self): + self.working_dir = os.path.dirname(os.path.abspath(__file__)) + self.test_dir = tempfile.TemporaryDirectory() + with open( + os.path.join( + os.path.dirname(self.working_dir), "data/combined_pmo_example.json" + ) + ) as f: + self.combined_pmo_data = json.load(f) + with open( + os.path.join( + os.path.dirname(self.working_dir), "data/minimum_pmo_example.json" + ) + ) as f: + self.minimum_pmo_data = json.load(f) + + def tearDown(self): + self.test_dir.cleanup() + + def test_list_library_sample_names_per_specimen_name(self): + id_counts = PMOExporter.list_library_sample_names_per_specimen_name( + self.minimum_pmo_data + ) + id_counts_check_data = { + "specimen_name": ["8025874217", "8025874266"], + "library_sample_name": ["8025874217_lib_name", "8025874266_lib_name"], + "library_sample_count": [1, 1], + } + id_counts_check_df = pd.DataFrame(id_counts_check_data) + pd.testing.assert_frame_equal(id_counts, id_counts_check_df) + + with open( + os.path.join( + os.path.dirname(self.working_dir), "data/minimum_pmo_example_2.json" + 
) + ) as f: + pmo_data_2 = json.load(f) + id_counts_2 = PMOExporter.list_library_sample_names_per_specimen_name( + pmo_data_2 + ) + id_counts_check_data_2 = { + "specimen_name": ["5tbx", "XUC009"], + "library_sample_name": ["5tbx_lib_name", "XUC009_lib_name"], + "library_sample_count": [1, 1], + } + id_counts_check_df_2 = pd.DataFrame(id_counts_check_data_2) + pd.testing.assert_frame_equal(id_counts_check_df_2, id_counts_2) + + def test_extract_targets_insert_bed_loc(self): + all_target_inserts = PMOExporter.extract_targets_insert_bed_loc( + self.combined_pmo_data, sort_output=True + ) + output1_fnp = os.path.join(self.test_dir.name, "all_target_inserts_test1.bed") + PMOExporter.write_bed_locs(all_target_inserts, output1_fnp) + self.assertEqual("b7e477fe327ad7ae85f78ddaa66c313c", md5sum_of_fnp(output1_fnp)) + + all_target_inserts_8_6_10 = PMOExporter.extract_targets_insert_bed_loc( + self.combined_pmo_data, [8, 6, 10, 20], sort_output=False + ) + output2_fnp = os.path.join( + self.test_dir.name, "all_target_inserts_test2_8_6_10.bed" + ) + PMOExporter.write_bed_locs(all_target_inserts_8_6_10, output2_fnp) + self.assertEqual("9be1da64a4794489e08eca11c240d879", md5sum_of_fnp(output2_fnp)) + + all_target_inserts_8_6_10_sorted = PMOExporter.extract_targets_insert_bed_loc( + self.combined_pmo_data, [8, 6, 10, 20], sort_output=True + ) + output3_fnp = os.path.join( + self.test_dir.name, "all_target_inserts_test2_8_6_10_sorted.bed" + ) + PMOExporter.write_bed_locs(all_target_inserts_8_6_10_sorted, output3_fnp) + self.assertEqual("1831cc6a6f9f2bc4036d1dfad90771a1", md5sum_of_fnp(output3_fnp)) + + def test_extract_panels_insert_bed_loc(self): + all_target_inserts = PMOExporter.extract_panels_insert_bed_loc( + self.combined_pmo_data, sort_output=True + ) + output_fnp = os.path.join(self.test_dir.name, "all_panel_inserts_test1.bed") + PMOExporter.write_bed_locs(all_target_inserts, output_fnp) + self.assertEqual("52b1f79a3a89f8265573fa54b5a7ce57", md5sum_of_fnp(output_fnp)) + + 
def test_extract_alleles_per_sample_table(self): + allele_data = PMOExporter.extract_alleles_per_sample_table( + self.combined_pmo_data + ).sort_values( + by=[ + "bioinformatics_run_name", + "library_sample_name", + "target_name", + "mhap_id", + ] + ) + output_fnp = os.path.join( + self.test_dir.name, "extracted_alleles_per_sample_table_no_extra_args.csv" + ) + allele_data.to_csv(output_fnp, index=False) + self.assertEqual("2898d87133e2e381612f3c0dea70122f", md5sum_of_fnp(output_fnp)) + + allele_data_with_seq_reads = PMOExporter.extract_alleles_per_sample_table( + self.combined_pmo_data, + additional_microhap_fields=["reads"], + additional_representative_info_fields=["seq"], + ).sort_values( + by=[ + "bioinformatics_run_name", + "library_sample_name", + "target_name", + "mhap_id", + ] + ) + output_fnp = os.path.join( + self.test_dir.name, + "extracted_alleles_per_sample_table_no_extra_args_with_seq_reads.csv", + ) + allele_data_with_seq_reads.to_csv(output_fnp, index=False) + self.assertEqual("744c1c0233066f030881c8b595b9ad5c", md5sum_of_fnp(output_fnp)) + + allele_data_with_seq_reads_panel_id_collection_country = ( + PMOExporter.extract_alleles_per_sample_table( + self.combined_pmo_data, + additional_microhap_fields=["reads"], + additional_representative_info_fields=["seq"], + additional_library_sample_info_fields=["panel_id"], + additional_specimen_info_fields=["collection_country"], + ).sort_values( + by=[ + "bioinformatics_run_name", + "library_sample_name", + "target_name", + "mhap_id", + ] + ) + ) + output_fnp = os.path.join( + self.test_dir.name, + "extracted_alleles_per_sample_table_no_extra_args_with_seq_reads_panel_id_collection_country.csv", + ) + allele_data_with_seq_reads_panel_id_collection_country.to_csv( + output_fnp, index=False + ) + self.assertEqual("c425004244e6af1386b6e7776da76fed", md5sum_of_fnp(output_fnp)) + + def test_export_specimen_meta_table(self): + spec_table = PMOExporter.export_specimen_meta_table(self.minimum_pmo_data) + 
spec_table.to_csv(os.path.join(self.test_dir.name, "specimen_meta_table.csv")) + self.assertEqual( + "8f94b8b774696e26c4ff6c8086e616a4", + md5sum_of_fnp(os.path.join(self.test_dir.name, "specimen_meta_table.csv")), + ) + + def test_export_target_info_meta_table(self): + target_info_table = PMOExporter.export_target_info_meta_table( + self.minimum_pmo_data + ) + target_info_table.to_csv( + os.path.join(self.test_dir.name, "target_info_table.csv") + ) + self.assertEqual( + "cb0319482c9da5f9d8b22fba955ce1c8", + md5sum_of_fnp(os.path.join(self.test_dir.name, "target_info_table.csv")), + ) + + def test_export_panel_info_meta_table(self): + panel_info_table = PMOExporter.export_panel_info_meta_table( + self.minimum_pmo_data + ) + panel_info_table.to_csv( + os.path.join(self.test_dir.name, "panel_info_table.csv") + ) + self.assertEqual( + "e5127ecaf7fe7950395d6f3d45f1c82a", + md5sum_of_fnp(os.path.join(self.test_dir.name, "panel_info_table.csv")), + ) + + def test_export_library_sample_meta_table(self): + library_sample_table = PMOExporter.export_library_sample_meta_table( + self.minimum_pmo_data + ) + library_sample_table.to_csv( + os.path.join(self.test_dir.name, "library_sample_table.csv") + ) + self.assertEqual( + "7c433a74d215708e9339b5f6dece0bf3", + md5sum_of_fnp(os.path.join(self.test_dir.name, "library_sample_table.csv")), + ) + + def test_export_sequencing_info_meta_table(self): + sequencing_info_table = PMOExporter.export_sequencing_info_meta_table( + self.minimum_pmo_data + ) + sequencing_info_table.to_csv( + os.path.join(self.test_dir.name, "sequencing_info_table.csv") + ) + self.assertEqual( + "1cc6fb83227752454cfc3ba63eac503b", + md5sum_of_fnp( + os.path.join(self.test_dir.name, "sequencing_info_table.csv") + ), + ) + + def test_export_project_info_meta_table(self): + project_info_table = PMOExporter.export_project_info_meta_table( + self.minimum_pmo_data + ) + project_info_table.to_csv( + os.path.join(self.test_dir.name, "project_info_table.csv") + ) + 
print(project_info_table) + self.assertEqual( + "e533098411cbd96de2733668e8475ab8", + md5sum_of_fnp(os.path.join(self.test_dir.name, "project_info_table.csv")), + ) + + def test_export_specimen_travel_meta_table(self): + test_pmo_with_travel_info = { + "specimen_info": [ + { + "specimen_name": "spec1", + "travel_out_six_month": [ + { + "travel_country": "Kenya", + "travel_start_date": "2024-01", + "travel_end_date": "2024-02", + }, + { + "travel_country": "Kenya", + "travel_start_date": "2024-04", + "travel_end_date": "2024-06", + }, + ], + }, + { + "specimen_name": "spec2", + "travel_out_six_month": [ + { + "travel_country": "Tanzania", + "travel_start_date": "2024-02-15", + "travel_end_date": "2024-02-27", + } + ], + }, + ] + } + specimen_trable_info_table = PMOExporter.export_specimen_travel_meta_table( + test_pmo_with_travel_info + ) + specimen_trable_info_table.to_csv( + os.path.join(self.test_dir.name, "specimen_trable_info_table.csv") + ) + print(specimen_trable_info_table) + self.assertEqual( + "0305350d655184aa385d3d1ddc9b3600", + md5sum_of_fnp( + os.path.join(self.test_dir.name, "specimen_trable_info_table.csv") + ), + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_pmo_engine/test_pmo_processor.py b/tests/test_pmo_engine/test_pmo_processor.py index 4d03a6e..3b4348b 100755 --- a/tests/test_pmo_engine/test_pmo_processor.py +++ b/tests/test_pmo_engine/test_pmo_processor.py @@ -41,35 +41,6 @@ def setUp(self): def tearDown(self): self.test_dir.cleanup() - def test_list_library_sample_names_per_specimen_name(self): - id_counts = PMOProcessor.list_library_sample_names_per_specimen_name( - self.minimum_pmo_data - ) - id_counts_check_data = { - "specimen_name": ["8025874217", "8025874266"], - "library_sample_name": ["8025874217_lib_name", "8025874266_lib_name"], - "library_sample_count": [1, 1], - } - id_counts_check_df = pd.DataFrame(id_counts_check_data) - pd.testing.assert_frame_equal(id_counts, id_counts_check_df) - - with open( - 
os.path.join( - os.path.dirname(self.working_dir), "data/minimum_pmo_example_2.json" - ) - ) as f: - pmo_data_2 = json.load(f) - id_counts_2 = PMOProcessor.list_library_sample_names_per_specimen_name( - pmo_data_2 - ) - id_counts_check_data_2 = { - "specimen_name": ["5tbx", "XUC009"], - "library_sample_name": ["5tbx_lib_name", "XUC009_lib_name"], - "library_sample_count": [1, 1], - } - id_counts_check_df_2 = pd.DataFrame(id_counts_check_data_2) - pd.testing.assert_frame_equal(id_counts_check_df_2, id_counts_2) - def test_count_targets_per_library_sample(self): targets_per_sample_counts = PMOProcessor.count_targets_per_library_sample( self.minimum_pmo_data @@ -338,101 +309,6 @@ def test_extract_allele_counts_freq_from_pmo(self): "cb34c7e1357e2e35024a89464b63f06c", ) - def test_extract_targets_insert_bed_loc(self): - all_target_inserts = PMOProcessor.extract_targets_insert_bed_loc( - self.combined_pmo_data, sort_output=True - ) - output1_fnp = os.path.join(self.test_dir.name, "all_target_inserts_test1.bed") - PMOProcessor.write_bed_locs(all_target_inserts, output1_fnp) - self.assertEqual("b7e477fe327ad7ae85f78ddaa66c313c", md5sum_of_fnp(output1_fnp)) - - all_target_inserts_8_6_10 = PMOProcessor.extract_targets_insert_bed_loc( - self.combined_pmo_data, [8, 6, 10, 20], sort_output=False - ) - output2_fnp = os.path.join( - self.test_dir.name, "all_target_inserts_test2_8_6_10.bed" - ) - PMOProcessor.write_bed_locs(all_target_inserts_8_6_10, output2_fnp) - self.assertEqual("9be1da64a4794489e08eca11c240d879", md5sum_of_fnp(output2_fnp)) - - all_target_inserts_8_6_10_sorted = PMOProcessor.extract_targets_insert_bed_loc( - self.combined_pmo_data, [8, 6, 10, 20], sort_output=True - ) - output3_fnp = os.path.join( - self.test_dir.name, "all_target_inserts_test2_8_6_10_sorted.bed" - ) - PMOProcessor.write_bed_locs(all_target_inserts_8_6_10_sorted, output3_fnp) - self.assertEqual("1831cc6a6f9f2bc4036d1dfad90771a1", md5sum_of_fnp(output3_fnp)) - - def 
test_extract_panels_insert_bed_loc(self): - all_target_inserts = PMOProcessor.extract_panels_insert_bed_loc( - self.combined_pmo_data, sort_output=True - ) - output_fnp = os.path.join(self.test_dir.name, "all_panel_inserts_test1.bed") - PMOProcessor.write_bed_locs(all_target_inserts, output_fnp) - self.assertEqual("52b1f79a3a89f8265573fa54b5a7ce57", md5sum_of_fnp(output_fnp)) - - def test_extract_alleles_per_sample_table(self): - allele_data = PMOProcessor.extract_alleles_per_sample_table( - self.combined_pmo_data - ).sort_values( - by=[ - "bioinformatics_run_name", - "library_sample_name", - "target_name", - "mhap_id", - ] - ) - output_fnp = os.path.join( - self.test_dir.name, "extracted_alleles_per_sample_table_no_extra_args.csv" - ) - allele_data.to_csv(output_fnp, index=False) - self.assertEqual("2898d87133e2e381612f3c0dea70122f", md5sum_of_fnp(output_fnp)) - - allele_data_with_seq_reads = PMOProcessor.extract_alleles_per_sample_table( - self.combined_pmo_data, - additional_microhap_fields=["reads"], - additional_representative_info_fields=["seq"], - ).sort_values( - by=[ - "bioinformatics_run_name", - "library_sample_name", - "target_name", - "mhap_id", - ] - ) - output_fnp = os.path.join( - self.test_dir.name, - "extracted_alleles_per_sample_table_no_extra_args_with_seq_reads.csv", - ) - allele_data_with_seq_reads.to_csv(output_fnp, index=False) - self.assertEqual("744c1c0233066f030881c8b595b9ad5c", md5sum_of_fnp(output_fnp)) - - allele_data_with_seq_reads_panel_id_collection_country = ( - PMOProcessor.extract_alleles_per_sample_table( - self.combined_pmo_data, - additional_microhap_fields=["reads"], - additional_representative_info_fields=["seq"], - additional_library_sample_info_fields=["panel_id"], - additional_specimen_info_fields=["collection_country"], - ).sort_values( - by=[ - "bioinformatics_run_name", - "library_sample_name", - "target_name", - "mhap_id", - ] - ) - ) - output_fnp = os.path.join( - self.test_dir.name, - 
"extracted_alleles_per_sample_table_no_extra_args_with_seq_reads_panel_id_collection_country.csv", - ) - allele_data_with_seq_reads_panel_id_collection_country.to_csv( - output_fnp, index=False - ) - self.assertEqual("c425004244e6af1386b6e7776da76fed", md5sum_of_fnp(output_fnp)) - def test_extract_from_pmo_with_read_filter(self): pmo_data_filtered = PMOProcessor.extract_from_pmo_with_read_filter( self.combined_pmo_data, 1000