Skip to content

Commit 5dcce85

Browse files
authored
Merge pull request #14 from Juke34/stat-script-bug-fix
Stat script bug fix
2 parents 903e7d6 + 1f4219e commit 5dcce85

13 files changed

Lines changed: 1149 additions & 25 deletions

File tree

bin/stats/feature_aggregator.py

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
from Bio.SeqFeature import SeqFeature
2+
import numpy as np
3+
from numpy.typing import NDArray
4+
from utils import argmax
5+
6+
7+
class FeatureAggregator:
8+
def __init__(self, mode: str) -> None:
9+
# Variables for ID creation
10+
self.gene_indices: dict[str, int] = dict()
11+
self.gene_counter: int = 0
12+
13+
self.gene_subfeature_ids: dict[int, int] = dict()
14+
15+
# Variables for ID creation
16+
self.feature_indices: dict[str, int] = dict()
17+
self.feature_counter: int = 0
18+
19+
self.feature_ids: dict[str, int] = (
20+
dict()
21+
) # Used for checking if the feature has already been seen
22+
self.feature_counters: dict[str, int] = (
23+
dict()
24+
) # Used for assigning incremental IDs
25+
26+
# Variables for target selection
27+
self.has_cds: bool = False
28+
self.cds_lengths: NDArray
29+
self.exon_lengths: NDArray
30+
31+
# Define target selecting function by mode
32+
match mode:
33+
case "all":
34+
self.select_targets = self._select_all_targets
35+
case "cds_longest":
36+
self.select_targets = self._select_target_with_longest_aggr_cds_or_exon
37+
case _:
38+
raise Exception(f"Invalid target selection mode {mode}")
39+
40+
return None
41+
42+
def _select_all_targets(self, gene: SeqFeature) -> list[SeqFeature]:
43+
"""
44+
Return all the sub-features of a gene.
45+
"""
46+
return gene.sub_features
47+
48+
def _update_cds_and_exon_info(self, gene: SeqFeature) -> None:
49+
"""
50+
Update the information needed for selecting features based on aggregate CDS and exon lengths.
51+
"""
52+
# Reset selection variables
53+
self.cds_lengths = np.zeros(len(gene.sub_features), dtype=np.int32)
54+
self.exon_lengths = np.zeros(len(gene.sub_features), dtype=np.int32)
55+
self.has_cds: bool = False
56+
57+
for i, child in enumerate(gene.sub_features):
58+
for grand_child in child.sub_features:
59+
if grand_child.type == "exon":
60+
self.exon_lengths[i] += len(grand_child)
61+
if grand_child.type == "CDS":
62+
self.has_cds = True
63+
self.cds_lengths[i] += len(grand_child)
64+
65+
return None
66+
67+
def _select_target_with_longest_aggr_cds_or_exon(
68+
self, gene: SeqFeature
69+
) -> list[SeqFeature]:
70+
"""
71+
Return the gene sub-feature with the longest aggregate CDS if any sub-feature contains a CDS.
72+
Elsewise, the sub-feature with the longest aggregate exon length.
73+
"""
74+
self._update_cds_and_exon_info(gene)
75+
76+
if self.has_cds:
77+
return [gene.sub_features[argmax(self.cds_lengths)]]
78+
else:
79+
return [gene.sub_features[argmax(self.exon_lengths)]]
80+
81+
def create_aggregated(self, parent: SeqFeature, feature: SeqFeature) -> SeqFeature:
82+
new_feature = SeqFeature(
83+
location=feature.location,
84+
id=f"{feature.type}-aggregate-{parent.id}",
85+
type=feature.type + "-aggregate",
86+
qualifiers={"Parent": [parent.id]},
87+
)
88+
# The sub_feature attribute must be created manually because it is deprecated in BioPython
89+
new_feature.sub_features = []
90+
91+
new_subfeatures = []
92+
93+
# Copy the entire hierarchy, but this won't work for aggregating features beyond this point
94+
for child in feature.sub_features:
95+
new_subfeatures.append(self.create_aggregated(new_feature, child))
96+
97+
feature.sub_features += new_subfeatures
98+
99+
return new_feature
100+
101+
def aggregate_sub_features(self, gene: SeqFeature) -> None:
102+
if len(gene.sub_features) == 0:
103+
return None
104+
105+
if gene.id not in self.gene_indices:
106+
self.gene_counter += 1
107+
self.gene_indices[gene.id] = self.gene_counter
108+
109+
target_transcripts: list[SeqFeature] = self.select_targets(gene)
110+
111+
for transcript in target_transcripts:
112+
new_features = []
113+
114+
for child in transcript.sub_features:
115+
new_features.append(
116+
self.create_aggregated(transcript, child)
117+
)
118+
119+
transcript.sub_features += new_features
120+
121+
return None

0 commit comments

Comments
 (0)