diff --git a/py-oxbow/oxbow/__init__.py b/py-oxbow/oxbow/__init__.py index 0ee1db3..8fe67da 100644 --- a/py-oxbow/oxbow/__init__.py +++ b/py-oxbow/oxbow/__init__.py @@ -26,6 +26,7 @@ ) from oxbow.oxbow import ( read_bam, + read_bam_references, read_bcf, read_bed, read_bigbed, @@ -36,6 +37,7 @@ read_gff, read_gtf, read_sam, + read_tabix, read_vcf, ) @@ -65,6 +67,7 @@ "read_fastq", "read_sam", "read_bam", + "read_bam_references", "read_cram", "read_bcf", "read_vcf", @@ -73,6 +76,7 @@ "read_bigwig", "read_gff", "read_gtf", + "read_tabix", ] from_bam.__module__ = __name__ diff --git a/py-oxbow/src/alignment.rs b/py-oxbow/src/alignment.rs index 28e33eb..d742898 100644 --- a/py-oxbow/src/alignment.rs +++ b/py-oxbow/src/alignment.rs @@ -1,6 +1,9 @@ use std::io::Seek; use std::sync::Arc; +use arrow::array::{ArrayRef, RecordBatchIterator, StringArray, UInt32Array}; +use arrow::datatypes::{DataType, Field, Schema}; +use arrow::record_batch::RecordBatch; use pyo3::exceptions::PyValueError; use pyo3::prelude::*; use pyo3::types::PyDict; @@ -1348,3 +1351,41 @@ pub fn read_cram( ipc.map_err(|e| PyErr::new::<PyValueError, _>(e.to_string())) } + +/// Return reference sequence names and lengths from a BAM file header as Arrow IPC bytes. +/// +/// Parameters +/// ---------- +/// src : str or file-like +/// The path to the BAM file or a file-like object. +/// +/// Returns +/// ------- +/// bytes +/// Arrow IPC bytes with "name" (Utf8) and "length" (UInt32) columns.
+#[pyfunction] +pub fn read_bam_references(py: Python, src: Py<PyAny>) -> PyResult<Vec<u8>> { + let reader = pyobject_to_bufreader(py, src, true)?; + let mut fmt_reader = noodles::bam::io::Reader::from(reader); + let header = fmt_reader.read_header()?; + let scanner = BamScanner::new(header, None, None).map_err(to_py)?; + let chrom_sizes = scanner.chrom_sizes(); + + let schema = Arc::new(Schema::new(vec![ + Field::new("name", DataType::Utf8, false), + Field::new("length", DataType::UInt32, false), + ])); + let names: Vec<&str> = chrom_sizes.iter().map(|(n, _)| n.as_str()).collect(); + let lengths: Vec<u32> = chrom_sizes.iter().map(|(_, l)| *l).collect(); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(StringArray::from(names)) as ArrayRef, + Arc::new(UInt32Array::from(lengths)) as ArrayRef, + ], + ) + .map_err(|e| PyErr::new::<PyValueError, _>(e.to_string()))?; + + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema); + batches_to_ipc(reader).map_err(|e| PyErr::new::<PyValueError, _>(e.to_string())) +} diff --git a/py-oxbow/src/lib.rs b/py-oxbow/src/lib.rs index 22c8ec0..38c156f 100644 --- a/py-oxbow/src/lib.rs +++ b/py-oxbow/src/lib.rs @@ -9,14 +9,18 @@ mod bed; mod error; mod gxf; mod sequence; +mod tabix; mod variant; -use crate::alignment::{read_bam, read_cram, read_sam, PyBamScanner, PyCramScanner, PySamScanner}; +use crate::alignment::{ + read_bam, read_bam_references, read_cram, read_sam, PyBamScanner, PyCramScanner, PySamScanner, +}; use crate::bbi::{ read_bigbed, read_bigwig, PyBBIFileType, PyBBIZoomScanner, PyBigBedScanner, PyBigWigScanner, }; use crate::bed::{read_bed, PyBedScanner}; use crate::gxf::{read_gff, read_gtf, PyGffScanner, PyGtfScanner}; use crate::sequence::{read_fasta, read_fastq, PyFastaScanner, PyFastqScanner}; +use crate::tabix::read_tabix; use crate::util::partition_from_index; use crate::variant::{read_bcf, read_vcf, PyBcfScanner, PyVcfScanner}; @@ -45,6 +49,7 @@ fn oxbow_sandbox(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_function(wrap_pyfunction!(read_fastq, m)?)?; m.add_function(wrap_pyfunction!(read_sam, m)?)?; m.add_function(wrap_pyfunction!(read_bam, m)?)?; + m.add_function(wrap_pyfunction!(read_bam_references, m)?)?; m.add_function(wrap_pyfunction!(read_cram, m)?)?; m.add_function(wrap_pyfunction!(read_gff, m)?)?; m.add_function(wrap_pyfunction!(read_gtf, m)?)?; @@ -53,6 +58,7 @@ fn oxbow_sandbox(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_function(wrap_pyfunction!(read_bed, m)?)?; m.add_function(wrap_pyfunction!(read_bigbed, m)?)?; m.add_function(wrap_pyfunction!(read_bigwig, m)?)?; + m.add_function(wrap_pyfunction!(read_tabix, m)?)?; m.add_function(wrap_pyfunction!(partition_from_index, m)?)?; m.add("__version__", env!("CARGO_PKG_VERSION"))?; m.add("__core_version__", oxbow::VERSION)?; diff --git a/py-oxbow/src/tabix.rs b/py-oxbow/src/tabix.rs new file mode 100644 index 0000000..7f831ab --- /dev/null +++ b/py-oxbow/src/tabix.rs @@ -0,0 +1,114 @@ +use std::sync::Arc; + +use arrow::array::{ArrayRef, Int32Array, RecordBatchIterator, StringArray}; +use arrow::datatypes::{DataType, Field, Schema}; +use arrow::record_batch::RecordBatch; +use noodles::core::Region; +use noodles::csi; +use noodles::csi::io::IndexedRecord; +use pyo3::exceptions::PyValueError; +use pyo3::prelude::*; +use pyo3::types::PyString; + +use crate::filelike::PyFileLikeObject; +use crate::util::resolve_index; +use oxbow::util::batches_to_ipc; + +/// Read records from a tabix-indexed file in the given genomic region. +/// +/// Parameters +/// ---------- +/// src : str or file-like +/// The path to the BGZF-compressed tabix-indexed file, or a seekable +/// file-like object. +/// region : str +/// Genomic region to query, e.g. ``"chr1:1000-2000"``. +/// index : str or file-like, optional +/// Path to the tabix index (.tbi or .csi), or a file-like object. +/// Required when ``src`` is a file-like object; inferred from ``src`` +/// when ``src`` is a file path (appends ``.tbi``). 
+/// +/// Returns +/// ------- +/// bytes +/// Arrow IPC bytes with "chrom" (Utf8), "start" (Int32), "end" (Int32), +/// and "raw" (Utf8) columns. Coordinates are 1-based, matching the +/// tabix index convention. +/// +/// Raises +/// ------ +/// ValueError +/// If ``src`` is a file-like and ``index`` is not provided, or if the +/// region chromosome is not present in the index +/// ("missing reference sequence name"). +#[pyfunction] +#[pyo3(signature = (src, region, index=None))] +pub fn read_tabix( + py: Python, + src: Py<PyAny>, + region: String, + index: Option<Py<PyAny>>, +) -> PyResult<Vec<u8>> { + let region = region + .parse::<Region>() + .map_err(|e| PyErr::new::<PyValueError, _>(e.to_string()))?; + + let csi_index = resolve_index(py, &src, index)?; + + let records: Vec<(String, i32, i32, String)> = + if let Ok(py_string) = src.cast_bound::<PyString>(py) { + let path = py_string.to_string(); + let file = std::fs::File::open(&path)?; + collect_query(file, csi_index.into_boxed(), &region) + .map_err(|e| PyErr::new::<PyValueError, _>(e.to_string()))? + } else { + let file_like = PyFileLikeObject::new(src, true, false, true)?; + collect_query(file_like, csi_index.into_boxed(), &region) + .map_err(|e| PyErr::new::<PyValueError, _>(e.to_string()))?
+ }; + + let schema = Arc::new(Schema::new(vec![ + Field::new("chrom", DataType::Utf8, false), + Field::new("start", DataType::Int32, false), + Field::new("end", DataType::Int32, false), + Field::new("raw", DataType::Utf8, false), + ])); + + let chroms: Vec<&str> = records.iter().map(|(c, _, _, _)| c.as_str()).collect(); + let starts: Vec<i32> = records.iter().map(|(_, s, _, _)| *s).collect(); + let ends: Vec<i32> = records.iter().map(|(_, _, e, _)| *e).collect(); + let raws: Vec<&str> = records.iter().map(|(_, _, _, r)| r.as_str()).collect(); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(StringArray::from(chroms)) as ArrayRef, + Arc::new(Int32Array::from(starts)) as ArrayRef, + Arc::new(Int32Array::from(ends)) as ArrayRef, + Arc::new(StringArray::from(raws)) as ArrayRef, + ], + ) + .map_err(|e| PyErr::new::<PyValueError, _>(e.to_string()))?; + + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema); + batches_to_ipc(reader).map_err(|e| PyErr::new::<PyValueError, _>(e.to_string())) +} + +fn collect_query<R: std::io::Read + std::io::Seek>( + reader: R, + index: Box<dyn csi::BinningIndex>, + region: &Region, +) -> std::io::Result<Vec<(String, i32, i32, String)>> { + let mut indexed_reader = csi::io::IndexedReader::new(reader, index); + let query = indexed_reader.query(region)?; + query + .map(|result| { + let record = result?; + let chrom = record.indexed_reference_sequence_name().to_string(); + let start = record.indexed_start_position().get() as i32; + let end = record.indexed_end_position().get() as i32; + let raw = record.as_ref().to_string(); + Ok((chrom, start, end, raw)) + }) + .collect() +} diff --git a/py-oxbow/tests/test_scanners.py b/py-oxbow/tests/test_scanners.py index a083ecf..c3d31c0 100644 --- a/py-oxbow/tests/test_scanners.py +++ b/py-oxbow/tests/test_scanners.py @@ -8,6 +8,10 @@ from tests.utils import Input +def _read_ipc(ipc_bytes): + return pa.ipc.open_file(pa.BufferReader(ipc_bytes)).read_all() + + class TestPySamScanner: @pytest.mark.parametrize( "input", @@ -1011,3 +1015,58 @@ def test_scan_virtual_ranges(self): reader =
pa.RecordBatchReader.from_stream(data=stream, schema=pa.schema(schema)) batch2 = reader.read_next_batch() assert batch.to_pydict() == batch2.to_pydict() + + +class TestReadBamReferences: + def test_schema(self): + table = _read_ipc(ox.read_bam_references("data/sample.bam")) + assert table.schema.names == ["name", "length"] + assert pa.types.is_string(table.schema.field("name").type) + assert pa.types.is_uint32(table.schema.field("length").type) + + def test_returns_references(self): + table = _read_ipc(ox.read_bam_references("data/sample.bam")) + assert table.num_rows > 0 + assert "chr1" in table.column("name").to_pylist() + + def test_file_like_object(self): + with open("data/sample.bam", "rb") as f: + table = _read_ipc(ox.read_bam_references(f)) + assert table.num_rows > 0 + + def test_matches_path_result(self): + table_path = _read_ipc(ox.read_bam_references("data/sample.bam")) + with open("data/sample.bam", "rb") as f: + table_filelike = _read_ipc(ox.read_bam_references(f)) + assert table_path.to_pydict() == table_filelike.to_pydict() + + +class TestReadTabix: + def test_schema(self): + table = _read_ipc(ox.read_tabix("data/sample.bed.gz", "chr1")) + assert table.schema.names == ["chrom", "start", "end", "raw"] + assert pa.types.is_string(table.schema.field("chrom").type) + assert pa.types.is_int32(table.schema.field("start").type) + assert pa.types.is_int32(table.schema.field("end").type) + assert pa.types.is_string(table.schema.field("raw").type) + + def test_returns_records_for_region(self): + table = _read_ipc(ox.read_tabix("data/sample.bed.gz", "chr1")) + assert table.num_rows > 0 + assert all(c == "chr1" for c in table.column("chrom").to_pylist()) + + def test_explicit_index(self): + table_inferred = _read_ipc(ox.read_tabix("data/sample.bed.gz", "chr1")) + table_explicit = _read_ipc( + ox.read_tabix("data/sample.bed.gz", "chr1", index="data/sample.bed.gz.tbi") + ) + assert table_inferred.to_pydict() == table_explicit.to_pydict() + + def 
test_file_like_with_index(self): + with open("data/sample.bed.gz", "rb") as f: + table = _read_ipc(ox.read_tabix(f, "chr1", index="data/sample.bed.gz.tbi")) + assert table.num_rows > 0 + + def test_missing_region_raises(self): + with pytest.raises(ValueError): + ox.read_tabix("data/sample.bed.gz", "nonexistent_chrom")