Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions py-oxbow/oxbow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
)
from oxbow.oxbow import (
read_bam,
read_bam_references,
read_bcf,
read_bed,
read_bigbed,
Expand All @@ -36,6 +37,7 @@
read_gff,
read_gtf,
read_sam,
read_tabix,
read_vcf,
)

Expand Down Expand Up @@ -65,6 +67,7 @@
"read_fastq",
"read_sam",
"read_bam",
"read_bam_references",
"read_cram",
"read_bcf",
"read_vcf",
Expand All @@ -73,6 +76,7 @@
"read_bigwig",
"read_gff",
"read_gtf",
"read_tabix",
]

from_bam.__module__ = __name__
Expand Down
41 changes: 41 additions & 0 deletions py-oxbow/src/alignment.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::io::Seek;
use std::sync::Arc;

use arrow::array::{ArrayRef, RecordBatchIterator, StringArray, UInt32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::types::PyDict;
Expand Down Expand Up @@ -1348,3 +1351,41 @@ pub fn read_cram(

ipc.map_err(|e| PyErr::new::<PyValueError, _>(e.to_string()))
}

/// Return reference sequence names and lengths from a BAM file header as Arrow IPC bytes.
///
/// Parameters
/// ----------
/// src : str or file-like
///     The path to the BAM file or a file-like object.
///
/// Returns
/// -------
/// bytes
///     Arrow IPC bytes with "name" (Utf8) and "length" (UInt32) columns.
#[pyfunction]
pub fn read_bam_references(py: Python, src: Py<PyAny>) -> PyResult<Vec<u8>> {
    // Only the header is read; no alignment records are decoded.
    let buf_reader = pyobject_to_bufreader(py, src, true)?;
    let mut bam_reader = noodles::bam::io::Reader::from(buf_reader);
    let header = bam_reader.read_header()?;
    let scanner = BamScanner::new(header, None, None).map_err(to_py)?;
    let chrom_sizes = scanner.chrom_sizes();

    // Split (name, length) pairs into the two column vectors in one pass.
    let (names, lengths): (Vec<&str>, Vec<u32>) = chrom_sizes
        .iter()
        .map(|(name, len)| (name.as_str(), *len))
        .unzip();

    let schema = Arc::new(Schema::new(vec![
        Field::new("name", DataType::Utf8, false),
        Field::new("length", DataType::UInt32, false),
    ]));
    let columns = vec![
        Arc::new(StringArray::from(names)) as ArrayRef,
        Arc::new(UInt32Array::from(lengths)) as ArrayRef,
    ];
    let batch = RecordBatch::try_new(Arc::clone(&schema), columns)
        .map_err(|e| PyErr::new::<PyValueError, _>(e.to_string()))?;

    // Wrap the single batch in an iterator and serialize to Arrow IPC bytes.
    let batch_iter = RecordBatchIterator::new(vec![Ok(batch)], schema);
    batches_to_ipc(batch_iter).map_err(|e| PyErr::new::<PyValueError, _>(e.to_string()))
}
8 changes: 7 additions & 1 deletion py-oxbow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,18 @@ mod bed;
mod error;
mod gxf;
mod sequence;
mod tabix;
mod variant;
use crate::alignment::{read_bam, read_cram, read_sam, PyBamScanner, PyCramScanner, PySamScanner};
use crate::alignment::{
read_bam, read_bam_references, read_cram, read_sam, PyBamScanner, PyCramScanner, PySamScanner,
};
use crate::bbi::{
read_bigbed, read_bigwig, PyBBIFileType, PyBBIZoomScanner, PyBigBedScanner, PyBigWigScanner,
};
use crate::bed::{read_bed, PyBedScanner};
use crate::gxf::{read_gff, read_gtf, PyGffScanner, PyGtfScanner};
use crate::sequence::{read_fasta, read_fastq, PyFastaScanner, PyFastqScanner};
use crate::tabix::read_tabix;
use crate::util::partition_from_index;
use crate::variant::{read_bcf, read_vcf, PyBcfScanner, PyVcfScanner};

Expand Down Expand Up @@ -45,6 +49,7 @@ fn oxbow_sandbox(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_function(wrap_pyfunction!(read_fastq, m)?)?;
m.add_function(wrap_pyfunction!(read_sam, m)?)?;
m.add_function(wrap_pyfunction!(read_bam, m)?)?;
m.add_function(wrap_pyfunction!(read_bam_references, m)?)?;
m.add_function(wrap_pyfunction!(read_cram, m)?)?;
m.add_function(wrap_pyfunction!(read_gff, m)?)?;
m.add_function(wrap_pyfunction!(read_gtf, m)?)?;
Expand All @@ -53,6 +58,7 @@ fn oxbow_sandbox(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_function(wrap_pyfunction!(read_bed, m)?)?;
m.add_function(wrap_pyfunction!(read_bigbed, m)?)?;
m.add_function(wrap_pyfunction!(read_bigwig, m)?)?;
m.add_function(wrap_pyfunction!(read_tabix, m)?)?;
m.add_function(wrap_pyfunction!(partition_from_index, m)?)?;
m.add("__version__", env!("CARGO_PKG_VERSION"))?;
m.add("__core_version__", oxbow::VERSION)?;
Expand Down
114 changes: 114 additions & 0 deletions py-oxbow/src/tabix.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array, RecordBatchIterator, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use noodles::core::Region;
use noodles::csi;
use noodles::csi::io::IndexedRecord;
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::types::PyString;

use crate::filelike::PyFileLikeObject;
use crate::util::resolve_index;
use oxbow::util::batches_to_ipc;

/// Read records from a tabix-indexed file in the given genomic region.
///
/// Parameters
/// ----------
/// src : str or file-like
/// The path to the BGZF-compressed tabix-indexed file, or a seekable
/// file-like object.
/// region : str
/// Genomic region to query, e.g. ``"chr1:1000-2000"``.
/// index : str or file-like, optional
/// Path to the tabix index (.tbi or .csi), or a file-like object.
/// Required when ``src`` is a file-like object; inferred from ``src``
/// when ``src`` is a file path (appends ``.tbi``).
///
/// Returns
/// -------
/// bytes
/// Arrow IPC bytes with "chrom" (Utf8), "start" (Int32), "end" (Int32),
/// and "raw" (Utf8) columns. Coordinates are 1-based, matching the
/// tabix index convention.
///
/// Raises
/// ------
/// ValueError
/// If ``src`` is a file-like and ``index`` is not provided, or if the
/// region chromosome is not present in the index
/// ("missing reference sequence name").
#[pyfunction]
#[pyo3(signature = (src, region, index=None))]
pub fn read_tabix(
py: Python,
src: Py<PyAny>,
region: String,
index: Option<Py<PyAny>>,
) -> PyResult<Vec<u8>> {
let region = region
.parse::<Region>()
.map_err(|e| PyErr::new::<PyValueError, _>(e.to_string()))?;

let csi_index = resolve_index(py, &src, index)?;

let records: Vec<(String, i32, i32, String)> =
if let Ok(py_string) = src.cast_bound::<PyString>(py) {
let path = py_string.to_string();
let file = std::fs::File::open(&path)?;
collect_query(file, csi_index.into_boxed(), &region)
.map_err(|e| PyErr::new::<PyValueError, _>(e.to_string()))?
} else {
let file_like = PyFileLikeObject::new(src, true, false, true)?;
collect_query(file_like, csi_index.into_boxed(), &region)
.map_err(|e| PyErr::new::<PyValueError, _>(e.to_string()))?
};

let schema = Arc::new(Schema::new(vec![
Field::new("chrom", DataType::Utf8, false),
Field::new("start", DataType::Int32, false),
Field::new("end", DataType::Int32, false),
Field::new("raw", DataType::Utf8, false),
]));

let chroms: Vec<&str> = records.iter().map(|(c, _, _, _)| c.as_str()).collect();
let starts: Vec<i32> = records.iter().map(|(_, s, _, _)| *s).collect();
let ends: Vec<i32> = records.iter().map(|(_, _, e, _)| *e).collect();
let raws: Vec<&str> = records.iter().map(|(_, _, _, r)| r.as_str()).collect();

let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(chroms)) as ArrayRef,
Arc::new(Int32Array::from(starts)) as ArrayRef,
Arc::new(Int32Array::from(ends)) as ArrayRef,
Arc::new(StringArray::from(raws)) as ArrayRef,
],
)
.map_err(|e| PyErr::new::<PyValueError, _>(e.to_string()))?;

let reader = RecordBatchIterator::new(vec![Ok(batch)], schema);
batches_to_ipc(reader).map_err(|e| PyErr::new::<PyValueError, _>(e.to_string()))
}

/// Run a region query against an indexed reader and collect every hit as a
/// `(chrom, start, end, raw_line)` tuple.
///
/// Coordinates come straight from the index record's 1-based positions.
/// NOTE(review): `.get() as i32` narrows a usize position; positions above
/// `i32::MAX` would wrap — presumably out of range for tabix data, but
/// worth confirming.
fn collect_query<R: std::io::Read + std::io::Seek>(
    reader: R,
    index: Box<dyn csi::BinningIndex>,
    region: &Region,
) -> std::io::Result<Vec<(String, i32, i32, String)>> {
    let mut indexed_reader = csi::io::IndexedReader::new(reader, index);
    let mut rows = Vec::new();
    for result in indexed_reader.query(region)? {
        let record = result?;
        rows.push((
            record.indexed_reference_sequence_name().to_string(),
            record.indexed_start_position().get() as i32,
            record.indexed_end_position().get() as i32,
            record.as_ref().to_string(),
        ));
    }
    Ok(rows)
}
59 changes: 59 additions & 0 deletions py-oxbow/tests/test_scanners.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
from tests.utils import Input


def _read_ipc(ipc_bytes):
    """Deserialize Arrow IPC file bytes into a fully materialized pyarrow Table."""
    buffer = pa.BufferReader(ipc_bytes)
    return pa.ipc.open_file(buffer).read_all()


class TestPySamScanner:
@pytest.mark.parametrize(
"input",
Expand Down Expand Up @@ -1011,3 +1015,58 @@ def test_scan_virtual_ranges(self):
reader = pa.RecordBatchReader.from_stream(data=stream, schema=pa.schema(schema))
batch2 = reader.read_next_batch()
assert batch.to_pydict() == batch2.to_pydict()


class TestReadBamReferences:
    """Tests for ``ox.read_bam_references`` (BAM header reference table)."""

    def test_schema(self):
        """The IPC payload exposes Utf8 "name" and UInt32 "length" columns."""
        refs = _read_ipc(ox.read_bam_references("data/sample.bam"))
        assert refs.schema.names == ["name", "length"]
        assert pa.types.is_string(refs.schema.field("name").type)
        assert pa.types.is_uint32(refs.schema.field("length").type)

    def test_returns_references(self):
        """A known reference (chr1) from the sample file is present."""
        refs = _read_ipc(ox.read_bam_references("data/sample.bam"))
        assert refs.num_rows > 0
        assert "chr1" in refs.column("name").to_pylist()

    def test_file_like_object(self):
        """A seekable binary handle is accepted in place of a path."""
        with open("data/sample.bam", "rb") as handle:
            refs = _read_ipc(ox.read_bam_references(handle))
        assert refs.num_rows > 0

    def test_matches_path_result(self):
        """Path and file-like inputs yield identical tables."""
        from_path = _read_ipc(ox.read_bam_references("data/sample.bam"))
        with open("data/sample.bam", "rb") as handle:
            from_filelike = _read_ipc(ox.read_bam_references(handle))
        assert from_path.to_pydict() == from_filelike.to_pydict()


class TestReadTabix:
    """Tests for ``ox.read_tabix`` (region queries over tabix-indexed files)."""

    def test_schema(self):
        """The IPC payload exposes chrom/start/end/raw with expected types."""
        result = _read_ipc(ox.read_tabix("data/sample.bed.gz", "chr1"))
        assert result.schema.names == ["chrom", "start", "end", "raw"]
        assert pa.types.is_string(result.schema.field("chrom").type)
        assert pa.types.is_int32(result.schema.field("start").type)
        assert pa.types.is_int32(result.schema.field("end").type)
        assert pa.types.is_string(result.schema.field("raw").type)

    def test_returns_records_for_region(self):
        """Every returned record lies on the queried chromosome."""
        result = _read_ipc(ox.read_tabix("data/sample.bed.gz", "chr1"))
        assert result.num_rows > 0
        assert all(chrom == "chr1" for chrom in result.column("chrom").to_pylist())

    def test_explicit_index(self):
        """Passing the .tbi path explicitly matches the inferred-index result."""
        inferred = _read_ipc(ox.read_tabix("data/sample.bed.gz", "chr1"))
        explicit = _read_ipc(
            ox.read_tabix("data/sample.bed.gz", "chr1", index="data/sample.bed.gz.tbi")
        )
        assert inferred.to_pydict() == explicit.to_pydict()

    def test_file_like_with_index(self):
        """File-like sources work when an index is supplied alongside."""
        with open("data/sample.bed.gz", "rb") as handle:
            result = _read_ipc(
                ox.read_tabix(handle, "chr1", index="data/sample.bed.gz.tbi")
            )
        assert result.num_rows > 0

    def test_missing_region_raises(self):
        """Querying a chromosome absent from the index raises ValueError."""
        with pytest.raises(ValueError):
            ox.read_tabix("data/sample.bed.gz", "nonexistent_chrom")