Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 121 additions & 30 deletions oxbow/src/alignment/model.rs

Large diffs are not rendered by default.

15 changes: 13 additions & 2 deletions oxbow/src/alignment/model/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@ use indexmap::IndexMap;
use noodles::sam::alignment::record::data::field::Tag;

use crate::batch::{Push, RecordBatchBuilder};
use crate::Select;
use crate::{CoordSystem, Select};

/// The coordinate system in which noodles returns alignment start positions.
/// `noodles::core::Position::get()` is always 1-based regardless of the underlying
/// format, so all alignment batch builders share this source coordinate system.
const SOURCE_CS: CoordSystem = CoordSystem::OneClosed;

use super::field::Push as _;
use super::field::{Field, FieldBuilder};
Expand Down Expand Up @@ -36,7 +41,7 @@ impl BatchBuilder {
tag_defs: Option<Vec<(String, String)>>,
capacity: usize,
) -> crate::Result<Self> {
let model = Model::new(fields, tag_defs)?;
let model = Model::new(fields, tag_defs, CoordSystem::OneClosed)?;
Self::from_model(&model, header, capacity)
}

Expand All @@ -52,13 +57,18 @@ impl BatchBuilder {
.map(|(name, _)| name.to_string())
.collect();

let coord_offset = model.coord_system().start_offset_from(SOURCE_CS);

let mut field_builders = IndexMap::new();
for field in model.fields() {
let builder = match field {
Field::Rname | Field::Rnext => {
FieldBuilder::with_refs(field.clone(), capacity, &ref_names)
.map_err(|e| crate::OxbowError::invalid_data(e.to_string()))?
}
Field::Pos | Field::Pnext => {
FieldBuilder::new(field.clone(), capacity).with_coord_offset(coord_offset)
}
_ => FieldBuilder::new(field.clone(), capacity),
};
field_builders.insert(field.clone(), builder);
Expand Down Expand Up @@ -301,6 +311,7 @@ mod tests {
let model = Model::new(
Select::Some(vec!["qname".into(), "pos".into()]),
Some(vec![("NM".into(), "i".into())]),
CoordSystem::OneClosed,
)
.unwrap();
let header = noodles::sam::Header::default();
Expand Down
54 changes: 36 additions & 18 deletions oxbow/src/alignment/model/field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,11 @@ pub enum FieldBuilder {
Qname(GenericStringBuilder<i32>),
Flag(UInt16Builder),
Rname(StringDictionaryBuilder<Int32Type>),
Pos(Int32Builder),
Pos(Int32Builder, i32),
Mapq(UInt8Builder),
Cigar(GenericStringBuilder<i32>),
Rnext(StringDictionaryBuilder<Int32Type>),
Pnext(Int32Builder),
Pnext(Int32Builder, i32),
Tlen(Int32Builder),
Seq(GenericStringBuilder<i32>),
Qual(GenericStringBuilder<i32>),
Expand All @@ -138,18 +138,32 @@ impl FieldBuilder {
Field::Qname => Self::Qname(GenericStringBuilder::<i32>::with_capacity(capacity, 1024)),
Field::Flag => Self::Flag(UInt16Builder::with_capacity(capacity)),
Field::Rname => Self::Rname(StringDictionaryBuilder::<Int32Type>::new()),
Field::Pos => Self::Pos(Int32Builder::with_capacity(capacity)),
Field::Pos => Self::Pos(Int32Builder::with_capacity(capacity), 0),
Field::Mapq => Self::Mapq(UInt8Builder::with_capacity(capacity)),
Field::Cigar => Self::Cigar(GenericStringBuilder::<i32>::with_capacity(capacity, 1024)),
Field::Rnext => Self::Rnext(StringDictionaryBuilder::<Int32Type>::new()),
Field::Pnext => Self::Pnext(Int32Builder::with_capacity(capacity)),
Field::Pnext => Self::Pnext(Int32Builder::with_capacity(capacity), 0),
Field::Tlen => Self::Tlen(Int32Builder::with_capacity(capacity)),
Field::Seq => Self::Seq(GenericStringBuilder::<i32>::with_capacity(capacity, 1024)),
Field::Qual => Self::Qual(GenericStringBuilder::<i32>::with_capacity(capacity, 1024)),
Field::End => Self::End(Int32Builder::with_capacity(capacity)),
}
}

/// Sets the coordinate offset for start position fields (`pos`, `pnext`).
///
/// The offset is added to start coordinates when appending records, converting
/// from the source coordinate system to the output coordinate system. Use
/// [`CoordSystem::start_offset_from`][crate::CoordSystem::start_offset_from] to
/// compute this value. Has no effect on other field variants.
pub fn with_coord_offset(self, offset: i32) -> Self {
match self {
Self::Pos(b, _) => Self::Pos(b, offset),
Self::Pnext(b, _) => Self::Pnext(b, offset),
other => other,
}
}

pub fn with_refs(
field: Field,
capacity: usize,
Expand Down Expand Up @@ -186,14 +200,14 @@ impl FieldBuilder {
let array = reset_dictarray_builder(builder);
Arc::new(array)
}
Self::Pos(builder) => Arc::new(builder.finish()),
Self::Pos(builder, _) => Arc::new(builder.finish()),
Self::Mapq(builder) => Arc::new(builder.finish()),
Self::Cigar(builder) => Arc::new(builder.finish()),
Self::Rnext(builder) => {
let array = reset_dictarray_builder(builder);
Arc::new(array)
}
Self::Pnext(builder) => Arc::new(builder.finish()),
Self::Pnext(builder, _) => Arc::new(builder.finish()),
Self::Tlen(builder) => Arc::new(builder.finish()),
Self::Seq(builder) => Arc::new(builder.finish()),
Self::Qual(builder) => Arc::new(builder.finish()),
Expand Down Expand Up @@ -226,10 +240,10 @@ impl Push<&noodles::sam::Record> for FieldBuilder {
.and_then(|result| result.ok().map(|(name, _)| name.to_string()));
builder.append_option(rname);
}
Self::Pos(builder) => {
Self::Pos(builder, offset) => {
let start = record
.alignment_start()
.and_then(|result| result.ok().map(|pos| pos.get() as i32));
.and_then(|result| result.ok().map(|pos| pos.get() as i32 + *offset));
builder.append_option(start);
}
Self::Mapq(builder) => {
Expand All @@ -247,10 +261,10 @@ impl Push<&noodles::sam::Record> for FieldBuilder {
.and_then(|result| result.ok().map(|(name, _)| name.to_string()));
builder.append_option(rnext);
}
Self::Pnext(builder) => {
Self::Pnext(builder, offset) => {
let start = record
.mate_alignment_start()
.and_then(|result| result.ok().map(|pos| pos.get() as i32));
.and_then(|result| result.ok().map(|pos| pos.get() as i32 + *offset));
builder.append_option(start);
}
Self::Tlen(builder) => {
Expand Down Expand Up @@ -294,10 +308,10 @@ impl Push<&noodles::bam::Record> for FieldBuilder {
.and_then(|result| result.ok().map(|(name, _)| name.to_string()));
builder.append_option(rname);
}
Self::Pos(builder) => {
Self::Pos(builder, offset) => {
let start = record
.alignment_start()
.and_then(|result| result.ok().map(|pos| pos.get() as i32));
.and_then(|result| result.ok().map(|pos| pos.get() as i32 + *offset));
builder.append_option(start);
}
Self::Mapq(builder) => {
Expand All @@ -312,10 +326,10 @@ impl Push<&noodles::bam::Record> for FieldBuilder {
.and_then(|result| result.ok().map(|(name, _)| name.to_string()));
builder.append_option(rnext);
}
Self::Pnext(builder) => {
Self::Pnext(builder, offset) => {
let start = record
.mate_alignment_start()
.and_then(|result| result.ok().map(|pos| pos.get() as i32));
.and_then(|result| result.ok().map(|pos| pos.get() as i32 + *offset));
builder.append_option(start);
}
Self::Tlen(builder) => {
Expand Down Expand Up @@ -359,8 +373,10 @@ impl Push<&noodles::sam::alignment::RecordBuf> for FieldBuilder {
.and_then(|result| result.ok().map(|(name, _)| String::from_utf8_lossy(name)));
builder.append_option(rname);
}
Self::Pos(builder) => {
let start = record.alignment_start().map(|pos| pos.get() as i32);
Self::Pos(builder, offset) => {
let start = record
.alignment_start()
.map(|pos| pos.get() as i32 + *offset);
builder.append_option(start);
}
Self::Mapq(builder) => {
Expand All @@ -375,8 +391,10 @@ impl Push<&noodles::sam::alignment::RecordBuf> for FieldBuilder {
.and_then(|result| result.ok().map(|(name, _)| String::from_utf8_lossy(name)));
builder.append_option(rnext);
}
Self::Pnext(builder) => {
let start = record.mate_alignment_start().map(|pos| pos.get() as i32);
Self::Pnext(builder, offset) => {
let start = record
.mate_alignment_start()
.map(|pos| pos.get() as i32 + *offset);
builder.append_option(start);
}
Self::Tlen(builder) => {
Expand Down
16 changes: 10 additions & 6 deletions oxbow/src/alignment/scanner/bam.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::alignment::model::BatchBuilder;
use crate::alignment::scanner::batch_iterator::{BatchIterator, QueryBatchIterator};
use crate::alignment::AlignmentModel;
use crate::util::query::{BgzfChunkReader, ByteRangeReader};
use crate::Select;
use crate::{CoordSystem, Region, Select};

/// A BAM scanner.
///
Expand All @@ -30,7 +30,8 @@ use crate::Select;
/// let header = fmt_reader.read_header().unwrap();
///
/// let tag_defs = Scanner::tag_defs(&mut fmt_reader, Some(1000)).unwrap();
/// let scanner = Scanner::new(header, Select::All, Some(tag_defs)).unwrap();
/// use oxbow::CoordSystem;
/// let scanner = Scanner::new(header, Select::All, Some(tag_defs), CoordSystem::OneClosed).unwrap();
/// let batches = scanner.scan(fmt_reader, None, None, Some(1000));
/// ```
pub struct Scanner {
Expand All @@ -43,12 +44,14 @@ impl Scanner {
///
/// - `fields`: standard SAM field selection.
/// - `tag_defs`: `None` → no tags column. `Some(vec![])` → empty struct.
/// - `coord_system`: output coordinate system for position columns.
pub fn new(
header: noodles::sam::Header,
fields: Select<String>,
tag_defs: Option<Vec<(String, String)>>,
coord_system: CoordSystem,
) -> crate::Result<Self> {
let model = AlignmentModel::new(fields, tag_defs)?;
let model = AlignmentModel::new(fields, tag_defs, coord_system)?;
Ok(Self { header, model })
}

Expand Down Expand Up @@ -172,13 +175,14 @@ impl Scanner {
pub fn scan_query<R: noodles::bgzf::io::BufRead + noodles::bgzf::io::Seek>(
&self,
fmt_reader: noodles::bam::io::Reader<R>,
region: noodles::core::Region,
region: Region,
index: impl BinningIndex,
columns: Option<Vec<String>>,
batch_size: Option<usize>,
limit: Option<usize>,
) -> crate::Result<impl RecordBatchReader> {
let batch_size = batch_size.unwrap_or(1024);
let region = region.to_noodles()?;
let interval = region.interval();

let batch_builder = self.build_batch_builder(columns, batch_size)?;
Expand Down Expand Up @@ -297,7 +301,7 @@ mod tests {
#[test]
fn test_scan_with_multithreaded_reader() {
let (header, fmt_reader) = mt_reader();
let scanner = Scanner::new(header, Select::All, None).unwrap();
let scanner = Scanner::new(header, Select::All, None, CoordSystem::OneClosed).unwrap();
let mut batches = scanner.scan(fmt_reader, None, None, Some(10)).unwrap();

let batch = batches.next().unwrap().unwrap();
Expand All @@ -308,7 +312,7 @@ mod tests {
#[test]
fn test_scan_query_with_multithreaded_reader() {
let (header, fmt_reader) = mt_reader();
let scanner = Scanner::new(header, Select::All, None).unwrap();
let scanner = Scanner::new(header, Select::All, None, CoordSystem::OneClosed).unwrap();

let index = noodles::bam::bai::fs::read("../fixtures/sample.bam.bai").unwrap();

Expand Down
12 changes: 8 additions & 4 deletions oxbow/src/alignment/scanner/cram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::alignment::model::tag::TagScanner;
use crate::alignment::model::BatchBuilder;
use crate::alignment::AlignmentModel;
use crate::batch::{Push, RecordBatchBuilder as _};
use crate::Select;
use crate::{CoordSystem, Region, Select};

/// A CRAM scanner.
///
Expand All @@ -35,7 +35,8 @@ use crate::Select;
/// let header = fmt_reader.read_header().unwrap();
///
/// let tag_defs = Scanner::tag_defs(&mut fmt_reader, &header, Some(1000)).unwrap();
/// let scanner = Scanner::new(header, Select::All, Some(tag_defs), repository).unwrap();
/// use oxbow::CoordSystem;
/// let scanner = Scanner::new(header, Select::All, Some(tag_defs), repository, CoordSystem::OneClosed).unwrap();
/// let batches = scanner.scan(fmt_reader, None, None, Some(1000));
/// ```
pub struct Scanner {
Expand All @@ -49,15 +50,17 @@ impl Scanner {
///
/// - `fields`: standard SAM field selection.
/// - `tag_defs`: `None` → no tags column. `Some(vec![])` → empty struct.
/// - `coord_system`: output coordinate system for position columns.
///
/// The FASTA repository is stored and used by scan methods for decoding.
pub fn new(
header: noodles::sam::Header,
fields: Select<String>,
tag_defs: Option<Vec<(String, String)>>,
repo: noodles::fasta::Repository,
coord_system: CoordSystem,
) -> crate::Result<Self> {
let model = AlignmentModel::new(fields, tag_defs)?;
let model = AlignmentModel::new(fields, tag_defs, coord_system)?;
Ok(Self {
header,
model,
Expand Down Expand Up @@ -207,13 +210,14 @@ impl Scanner {
pub fn scan_query<R: Read + Seek>(
&self,
fmt_reader: noodles::cram::io::Reader<R>,
region: noodles::core::Region,
region: Region,
index: noodles::cram::crai::Index,
columns: Option<Vec<String>>,
batch_size: Option<usize>,
limit: Option<usize>,
) -> crate::Result<impl RecordBatchReader> {
let batch_size = batch_size.unwrap_or(1024);
let region = region.to_noodles()?;
let interval = region.interval();

let batch_builder = self.build_batch_builder(columns, batch_size)?;
Expand Down
12 changes: 8 additions & 4 deletions oxbow/src/alignment/scanner/sam.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::alignment::model::BatchBuilder;
use crate::alignment::scanner::batch_iterator::{BatchIterator, QueryBatchIterator};
use crate::alignment::AlignmentModel;
use crate::util::query::{BgzfChunkReader, ByteRangeReader};
use crate::Select;
use crate::{CoordSystem, Region, Select};

/// A SAM scanner.
///
Expand All @@ -30,7 +30,8 @@ use crate::Select;
/// let header = fmt_reader.read_header().unwrap();
///
/// let tag_defs = Scanner::tag_defs(&mut fmt_reader, Some(1000)).unwrap();
/// let scanner = Scanner::new(header, Select::All, Some(tag_defs)).unwrap();
/// use oxbow::CoordSystem;
/// let scanner = Scanner::new(header, Select::All, Some(tag_defs), CoordSystem::OneClosed).unwrap();
/// let batches = scanner.scan(fmt_reader, None, None, Some(1000));
/// ```
pub struct Scanner {
Expand All @@ -43,12 +44,14 @@ impl Scanner {
///
/// - `fields`: standard SAM field selection.
/// - `tag_defs`: `None` → no tags column. `Some(vec![])` → empty struct.
/// - `coord_system`: output coordinate system for position columns.
pub fn new(
header: noodles::sam::Header,
fields: Select<String>,
tag_defs: Option<Vec<(String, String)>>,
coord_system: CoordSystem,
) -> crate::Result<Self> {
let model = AlignmentModel::new(fields, tag_defs)?;
let model = AlignmentModel::new(fields, tag_defs, coord_system)?;
Ok(Self { header, model })
}

Expand Down Expand Up @@ -172,13 +175,14 @@ impl Scanner {
pub fn scan_query<R: noodles::bgzf::io::BufRead + noodles::bgzf::io::Seek>(
&self,
fmt_reader: noodles::sam::io::Reader<R>,
region: noodles::core::Region,
region: Region,
index: impl BinningIndex,
columns: Option<Vec<String>>,
batch_size: Option<usize>,
limit: Option<usize>,
) -> crate::Result<impl RecordBatchReader> {
let batch_size = batch_size.unwrap_or(1024);
let region = region.to_noodles()?;
let interval = region.interval();

let batch_builder = self.build_batch_builder(columns, batch_size)?;
Expand Down
Loading