|
| 1 | +#!/usr/bin/env python |
| 2 | + |
| 3 | +from BCBio import GFF |
| 4 | +from collections import deque |
| 5 | +from Bio.SeqFeature import SeqFeature, SimpleLocation |
| 6 | +import numpy as np |
| 7 | +from numpy.typing import NDArray |
| 8 | +from typing import Generator, Optional, Any, TextIO, Type |
| 9 | +from pathlib import Path |
| 10 | +from reditools_reader import ReditoolsReader |
| 11 | +from utils import NUC_CODE, SiteVariantData |
| 12 | +import argparse |
| 13 | + |
| 14 | +############################################## |
| 15 | +# Add a method to the SeqFeature class for easy retrieval |
| 16 | +# of **feature** identifiers (not sequence/record identifiers) |
| 17 | + |
| 18 | +def add_feature_id_method(cls: Type[SeqFeature]) -> None: |
| 19 | + def feature_id(self) -> str: |
| 20 | + return self.qualifiers["ID"][0] |
| 21 | + |
| 22 | + cls.feature_id = feature_id |
| 23 | + |
| 24 | + return None |
| 25 | + |
| 26 | + |
| 27 | +add_feature_id_method(SeqFeature) |
| 28 | + |
| 29 | +############################################## |
| 30 | + |
| 31 | +class FeatureManager: |
| 32 | + def __init__(self, features: list[SeqFeature], index_base: int = 1): |
| 33 | + """ |
| 34 | + Initialize a FeatureManager from a list of features. |
| 35 | +
|
| 36 | + The base of the location indices can be 0 (BCBio's and Python's default) |
| 37 | + or 1 (GFF3 format, default for this class). |
| 38 | + """ |
| 39 | + self.waiting_queue: deque[SeqFeature] = deque() |
| 40 | + self.active_queue: deque[SeqFeature] = deque() |
| 41 | + self.next_start = np.int64(0) |
| 42 | + self.next_end = np.iinfo(np.int64).max |
| 43 | + self.position = 0 |
| 44 | + |
| 45 | + waiting_list: list[SeqFeature] = [] # Temp list for sorting |
| 46 | + |
| 47 | + for feature in features: |
| 48 | + feature.location = SimpleLocation( |
| 49 | + feature.location.start + index_base, feature.location.end |
| 50 | + ) |
| 51 | + self._send_to_waiting_list(waiting_list, feature) |
| 52 | + |
| 53 | + waiting_list.sort(key=lambda x: (x.location.start, x.location.end)) |
| 54 | + |
| 55 | + self.waiting_queue.extend(waiting_list) |
| 56 | + |
| 57 | + self.next_start = self.waiting_queue[0].location.start |
| 58 | + self.next_end = self.waiting_queue[0].location.end |
| 59 | + |
| 60 | + self.counters: dict[str, np.int32] = dict() |
| 61 | + self.output_dir: Path = Path("") |
| 62 | + self.output_file_prefix: str = "" |
| 63 | + self.output_handles: dict[str, TextIO] = dict() |
| 64 | + |
| 65 | + return None |
| 66 | + |
| 67 | + def __len__(self) -> int: |
| 68 | + return len(self.active_queue) + len(self.waiting_queue) |
| 69 | + |
| 70 | + def open_output_file(self, ftype: str) -> None: |
| 71 | + """Create an output handle for a feature type""" |
| 72 | + hpath: Path = self.output_dir / ( |
| 73 | + self.output_file_prefix + ftype + "_edits.tsv" |
| 74 | + ) |
| 75 | + file_handle: TextIO = open(hpath, "w") |
| 76 | + self.output_handles[ftype] = file_handle |
| 77 | + file_handle.write("FeatureID\tSpan\tAC\n") |
| 78 | + |
| 79 | + return None |
| 80 | + |
| 81 | + def close_output_files(self) -> None: |
| 82 | + for handle in self.output_handles.values(): |
| 83 | + handle.close() |
| 84 | + |
| 85 | + return None |
| 86 | + |
| 87 | + def check_in(self, feature: SeqFeature) -> None: |
| 88 | + self.counters[feature.feature_id()] = np.int32(0) |
| 89 | + |
| 90 | + if feature.type not in self.output_handles: |
| 91 | + self.open_output_file(feature.type) |
| 92 | + |
| 93 | + return None |
| 94 | + |
| 95 | + def write_feature_data(self, feature) -> None: |
| 96 | + identifier = feature.feature_id() |
| 97 | + span = feature.location.end - feature.location.start + 1 |
| 98 | + edit_count = self.counters[identifier] |
| 99 | + handle = self.output_handles[feature.type] |
| 100 | + handle.write(identifier + "\t" + str(span) + "\t" + str(edit_count) + "\n") |
| 101 | + |
| 102 | + return None |
| 103 | + |
| 104 | + def check_out(self, feature: SeqFeature) -> None: |
| 105 | + self.write_feature_data(feature) |
| 106 | + del self.counters[feature.feature_id()] |
| 107 | + |
| 108 | + return None |
| 109 | + |
| 110 | + def _send_to_waiting_list( |
| 111 | + self, waiting_list: list[SeqFeature], feature: SeqFeature |
| 112 | + ) -> None: |
| 113 | + """Load features to the waiting queue in **post-order**""" |
| 114 | + if feature.sub_features: |
| 115 | + for child in feature.sub_features: |
| 116 | + self._send_to_waiting_list(waiting_list, child) |
| 117 | + |
| 118 | + waiting_list.append(feature) |
| 119 | + |
| 120 | + return None |
| 121 | + |
| 122 | + def insert_active_queue(self, new: SeqFeature) -> None: |
| 123 | + for i, old in enumerate(self.active_queue): |
| 124 | + if new.location.end < old.location.end: |
| 125 | + self.active_queue.insert(i, new) |
| 126 | + return None |
| 127 | + |
| 128 | + # If the queue was empty or the new end position is greater than all the old ones |
| 129 | + self.active_queue.append(new) |
| 130 | + |
| 131 | + return None |
| 132 | + |
| 133 | + def update_active(self, site_data: SiteVariantData) -> None: |
| 134 | + for feature in self.active_queue: |
| 135 | + self.counters[feature.feature_id()] += \ |
| 136 | + (site_data.strand == feature.location.strand) * \ |
| 137 | + site_data.frequencies[NUC_CODE['G']] |
| 138 | + |
| 139 | + return None |
| 140 | + |
| 141 | + def update_queues(self, pos: np.int64) -> bool: |
| 142 | + updated: bool = False |
| 143 | + |
| 144 | + # Remove items from the active queue whose end is behind the current position |
| 145 | + while self.active_queue and self.active_queue[0].location.end < pos: |
| 146 | + feature: SeqFeature = self.active_queue.popleft() |
| 147 | + self.check_out(feature) |
| 148 | + updated = True |
| 149 | + |
| 150 | + # Remove items from the waiting queue whose end is behind the current position |
| 151 | + while self.waiting_queue and self.waiting_queue[0].location.end < pos: |
| 152 | + # The feature is checked-in and immediately checked-out (it was never observed) |
| 153 | + feature: SeqFeature = self.waiting_queue.popleft() |
| 154 | + self.check_in(feature) |
| 155 | + self.check_out(feature) |
| 156 | + updated = True |
| 157 | + |
| 158 | + # Move items from the waiting queue to the active queue whose start is at or behind the current position |
| 159 | + while self.waiting_queue and self.waiting_queue[0].location.start <= pos: |
| 160 | + feature: SeqFeature = self.waiting_queue.popleft() |
| 161 | + |
| 162 | + if feature.location.end < pos: |
| 163 | + # Skip the feature if its end is past the current position as well |
| 164 | + # Check in and check out immediately to record the feature with 0 edits |
| 165 | + self.check_in(feature) |
| 166 | + self.check_out(feature) |
| 167 | + else: |
| 168 | + self.insert_active_queue(feature) |
| 169 | + self.check_in(feature) |
| 170 | + |
| 171 | + updated = True |
| 172 | + |
| 173 | + if self.active_queue: |
| 174 | + self.next_end = self.active_queue[0].location.end |
| 175 | + |
| 176 | + if self.waiting_queue: |
| 177 | + self.next_start = self.waiting_queue[0].location.start |
| 178 | + |
| 179 | + return updated |
| 180 | + |
| 181 | +if __name__ == "__main__": |
| 182 | + arg_parser = argparse.ArgumentParser(description="Count the number of A->G modifications per genomic feature") |
| 183 | + arg_parser.add_argument("--sites", "-s", type=str, help="File containing per-site base alteration data") |
| 184 | + arg_parser.add_argument("--gff", "-g", type=str, help="GFF3 file") |
| 185 | + arg_parser.add_argument("--format", "-f", type=str, choices=["reditools", "jacusa2"], default="reditools", help="Sites file format") |
| 186 | + args = arg_parser.parse_args() |
| 187 | + site_file = args.sites |
| 188 | + gff_file = args.gff |
| 189 | + site_format = args.format #TODO: Implement readers for more input formats |
| 190 | + |
| 191 | + gff_file = "result/agat_gff3/chr21_small_filtered_normalized.gff3" |
| 192 | + site_file = "result/edit_tables/serial_table.txt" |
| 193 | + |
| 194 | + with open(gff_file) as gff_handle, open(site_file) as edit_handle: |
| 195 | + for record in GFF.parse(gff_file): |
| 196 | + edit_reader = ReditoolsReader(edit_handle) |
| 197 | + fm = FeatureManager(record.features) |
| 198 | + record.features = None # For garbage collection |
| 199 | + |
| 200 | + edit_data: SiteVariantData = edit_reader.read() |
| 201 | + while edit_data and len(fm) > 0: |
| 202 | + fm.update_active(edit_data) |
| 203 | + updated = fm.update_queues(edit_data.position) |
| 204 | + edit_data = edit_reader.read() |
| 205 | + |
| 206 | + fm.close_output_files() |
0 commit comments