|
| 1 | +"""Shared utilities for Hail-based DivRef pipeline tools.""" |
| 2 | + |
| 3 | +from typing import Any |
| 4 | + |
| 5 | +import hail as hl |
| 6 | + |
| 7 | +HailPath = str |
| 8 | +"""Type alias for filesystem paths accepted by Hail: local, GCS (gs://), or HDFS (hdfs://).""" |
| 9 | + |
| 10 | + |
def to_hashable_items(d: dict[str, Any]) -> tuple[tuple[str, Any], ...]:
    """
    Turn a dictionary into a deterministic, hashable representation.

    Items are ordered by key so that equal dictionaries always produce the
    same tuple, making the result usable as a cache or dict key.

    Args:
        d: Dictionary to convert.

    Returns:
        Tuple of (key, value) pairs, sorted by key.
    """
    # Dict keys are unique, so sorting the item pairs orders purely by key.
    items = list(d.items())
    items.sort()
    return tuple(items)
| 22 | + |
| 23 | + |
def get_haplo_sequence(context_size: int, variants: Any) -> Any:
    """
    Construct a haplotype sequence string with flanking genomic context.

    Builds a sequence by combining alternate alleles from each variant with
    intervening reference sequence, bounded by context_size flanking bases on
    each side.

    Args:
        context_size: Number of reference bases to include flanking each end.
        variants: Hail array expression of variant structs with locus and alleles fields.

    Returns:
        Hail string expression representing the full haplotype sequence.
    """
    # NOTE(review): this reads alleles[0] as the reference allele and
    # alleles[1] as the single alternate, i.e. assumes biallelic variants —
    # confirm with callers. All variants are also assumed to share the
    # leftmost variant's contig (only min_variant.locus.contig is used).
    sorted_variants = hl.sorted(variants, key=lambda x: x.locus.position)
    min_variant = sorted_variants[0]
    max_variant = sorted_variants[-1]
    min_pos = min_variant.locus.position
    max_pos = max_variant.locus.position
    # Length of the last variant's reference allele; extends the fetched
    # window so the final variant's ref span is fully covered.
    max_variant_size = hl.len(max_variant.alleles[0])
    # One reference fetch covering: context_size bases before the first
    # variant, every base through the end of the last variant's ref allele,
    # and context_size bases after it.
    full_context = hl.get_sequence(
        min_variant.locus.contig,
        min_pos,
        before=context_size,
        after=(max_pos - min_pos + max_variant_size + context_size - 1),
        reference_genome="GRCh38",
    )

    # (min_pos - index_translation) equals context_size, mapping locus positions to string indices
    index_translation = min_pos - context_size

    def get_chunk_until_next_variant(i: Any) -> Any:
        # Emit variant i's alternate allele plus the reference bases that
        # follow it, up to the next variant (or the trailing context for the
        # last variant).
        v = sorted_variants[i]
        variant_size = hl.len(v.alleles[0])
        reference_buffer_size = hl.if_else(
            i == hl.len(sorted_variants) - 1,
            context_size,
            sorted_variants[i + 1].locus.position - (v.locus.position + variant_size),
        )
        # String index just past variant i's reference span in full_context.
        start = v.locus.position - index_translation + variant_size
        return v.alleles[1] + full_context[start : start + reference_buffer_size]

    # Leading context, then each variant's alt allele with its trailing
    # reference chunk, concatenated without separators.
    return full_context[:context_size] + hl.delimit(
        hl.range(hl.len(sorted_variants)).map(get_chunk_until_next_variant),
        "",
    )
| 71 | + |
| 72 | + |
def variant_distance(v1: Any, v2: Any) -> Any:
    """
    Calculate the number of reference bases between two variants.

    The gap is measured from the end of v1's reference allele to the start of
    v2. For example: 1:1:A:T and 1:3:A:T have distance 1 (one base between
    them), while 1:1:AA:T and 1:3:A:T have distance 0 (the deletion closes
    the gap).

    Args:
        v1: First variant Hail struct with locus and alleles fields.
        v2: Second variant Hail struct with locus and alleles fields.

    Returns:
        Hail int32 expression for the number of reference bases between v1 and v2.
    """
    # Position immediately past v1's reference allele span.
    v1_ref_end = v1.locus.position + hl.len(v1.alleles[0])
    return v2.locus.position - v1_ref_end
| 88 | + |
| 89 | + |
def split_haplotypes(ht: Any, window_size: int) -> Any:
    """
    Split multi-variant haplotypes at gaps of window_size or more bases.

    Haplotypes whose adjacent variants are separated by at least window_size
    reference bases are broken into sub-haplotypes at those gaps.
    Sub-haplotypes with fewer than two variants are discarded.

    Args:
        ht: Hail table with variants, haplotype, and gnomad_freqs array fields.
        window_size: Gap size (reference bases between adjacent variants) at
            which a haplotype is split.

    Returns:
        Hail table with haplotypes exploded into sub-haplotypes by window.
    """
    # Indices i where the gap between variants[i - 1] and variants[i] is wide
    # enough to split on. The range starts at 1, so the original `(i == 0)`
    # condition here was unreachable and has been removed.
    breakpoints = hl.range(1, hl.len(ht.variants)).filter(
        lambda i: variant_distance(ht.variants[i - 1], ht.variants[i]) >= window_size
    )

    def get_range(i: Any) -> Any:
        # Segment i runs from the previous breakpoint (or 0) up to, but not
        # including, breakpoint i (or the end of the variants array).
        start_index = hl.if_else(i == 0, 0, breakpoints[i - 1])
        end_index = hl.if_else(i == hl.len(breakpoints), hl.len(ht.variants), breakpoints[i])
        return hl.range(start_index, end_index)

    # One index range per segment; single-variant segments are dropped.
    split_hap_indices = (
        hl.range(0, hl.len(breakpoints) + 1).map(get_range).filter(lambda r: hl.len(r) > 1)
    )
    ht = ht.annotate(haplotype_indices=split_hap_indices)
    ht = ht.explode("haplotype_indices")
    # Subset the parallel arrays to each sub-haplotype's indices.
    ht = ht.annotate(
        haplotype=ht.haplotype_indices.map(lambda i: ht.haplotype[i]),
        variants=ht.haplotype_indices.map(lambda i: ht.variants[i]),
        gnomad_freqs=ht.haplotype_indices.map(lambda i: ht.gnomad_freqs[i]),
    )
    return ht.drop("haplotype_indices")