Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/mito2/src/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use crate::error::{
};
use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
use crate::read::BoxedRecordBatchStream;
use crate::read::projection::ProjectionMapper;
use crate::read::flat_projection::FlatProjectionMapper;
use crate::read::scan_region::{PredicateGroup, ScanInput};
use crate::read::seq_scan::SeqScan;
use crate::region::options::{MergeMode, RegionOptions};
Expand Down Expand Up @@ -848,7 +848,7 @@ impl CompactionSstReaderBuilder<'_> {
}

fn build_scan_input(self) -> Result<ScanInput> {
let mapper = ProjectionMapper::all(&self.metadata)?;
let mapper = FlatProjectionMapper::all(&self.metadata)?;
let mut scan_input = ScanInput::new(self.sst_layer, mapper)
.with_files(self.inputs.to_vec())
.with_append_mode(self.append_mode)
Expand Down
37 changes: 24 additions & 13 deletions src/mito2/src/memtable/bulk/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use table::predicate::Predicate;

use crate::error::Result;
use crate::sst::parquet::file_range::{PreFilterMode, RangeBase};
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::flat_format::FlatReadFormat;
use crate::sst::parquet::prefilter::CachedPrimaryKeyFilter;
use crate::sst::parquet::reader::SimpleFilterContext;
use crate::sst::parquet::stats::RowGroupPruningStats;
Expand Down Expand Up @@ -77,14 +77,26 @@ impl BulkIterContext {
})
.collect();

let read_format = ReadFormat::new(
region_metadata.clone(),
projection,
true,
None,
"memtable",
skip_auto_convert,
)?;
let read_format = if let Some(column_ids) = projection {
FlatReadFormat::new(
region_metadata.clone(),
column_ids.iter().copied(),
None,
"memtable",
skip_auto_convert,
)?
} else {
FlatReadFormat::new(
region_metadata.clone(),
region_metadata
.column_metadatas
.iter()
.map(|col| col.column_id),
None,
"memtable",
skip_auto_convert,
)?
};

let dyn_filters = predicate
.as_ref()
Expand Down Expand Up @@ -143,11 +155,10 @@ impl BulkIterContext {

/// Extracts PK filters if flat format with dictionary-encoded PKs is used.
fn extract_pk_filters(
read_format: &ReadFormat,
read_format: &FlatReadFormat,
filters: &[SimpleFilterContext],
) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
let flat_format = read_format.as_flat()?;
if flat_format.batch_has_raw_pk_columns() {
if read_format.batch_has_raw_pk_columns() {
return None;
}
let metadata = read_format.metadata();
Expand Down Expand Up @@ -179,7 +190,7 @@ impl BulkIterContext {
Some(CachedPrimaryKeyFilter::new(inner))
}

pub(crate) fn read_format(&self) -> &ReadFormat {
pub(crate) fn read_format(&self) -> &FlatReadFormat {
&self.base.read_format
}

Expand Down
7 changes: 1 addition & 6 deletions src/mito2/src/memtable/bulk/part_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ impl EncodedBulkPartIter {
sequence: Option<SequenceRange>,
mem_scan_metrics: Option<MemScanMetrics>,
) -> error::Result<Self> {
assert!(context.read_format().as_flat().is_some());

let parquet_meta = encoded_part.metadata().parquet_metadata.clone();
let data = encoded_part.data().clone();
let series_count = encoded_part.metadata().num_series as usize;
Expand Down Expand Up @@ -238,8 +236,6 @@ impl BulkPartBatchIter {
series_count: usize,
mem_scan_metrics: Option<MemScanMetrics>,
) -> Self {
assert!(context.read_format().as_flat().is_some());

let pk_filter = context.build_pk_filter();

Self {
Expand Down Expand Up @@ -406,8 +402,7 @@ fn apply_combined_filters(
};

// Converts the format to the flat format.
let format = context.read_format().as_flat().unwrap();
let record_batch = format.convert_batch(record_batch, None)?;
let record_batch = context.read_format().convert_batch(record_batch, None)?;

let num_rows = record_batch.num_rows();
let mut combined_filter = None;
Expand Down
26 changes: 1 addition & 25 deletions src/mito2/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub mod series_scan;
pub mod stream;
pub(crate) mod unordered_scan;

use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

Expand All @@ -63,16 +63,13 @@ use futures::TryStreamExt;
use futures::stream::BoxStream;
use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::metadata::RegionMetadata;
use store_api::storage::{ColumnId, SequenceNumber, SequenceRange};

use crate::error::{
ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, DecodeSnafu, InvalidBatchSnafu,
Result,
};
use crate::memtable::{BoxedBatchIterator, BoxedRecordBatchIterator};
use crate::read::prune::PruneReader;

/// Storage internal representation of a batch of rows for a primary key (time series).
///
/// Rows are sorted by primary key, timestamp, sequence desc, op_type desc. Fields
Expand Down Expand Up @@ -573,24 +570,6 @@ impl Batch {
size
}

/// Collects the `(column id, data type)` pair of every field column that the
/// `projection` selects.
///
/// Output order follows `metadata.field_columns()`, not the order of ids in
/// `projection`; each field column contributes at most one entry no matter how
/// often its id occurs in `projection`.
pub(crate) fn projected_fields(
    metadata: &RegionMetadata,
    projection: &[ColumnId],
) -> Vec<(ColumnId, ConcreteDataType)> {
    // Build the lookup set once so each membership test below is a hash probe.
    let wanted: HashSet<ColumnId> = projection.iter().copied().collect();

    metadata
        .field_columns()
        .filter(|field| wanted.contains(&field.column_id))
        .map(|field| (field.column_id, field.column_schema.data_type.clone()))
        .collect()
}

/// Returns timestamps in a native slice or `None` if the batch is empty.
pub(crate) fn timestamps_native(&self) -> Option<&[i64]> {
if self.timestamps.is_empty() {
Expand Down Expand Up @@ -1111,8 +1090,6 @@ pub enum Source {
Iter(BoxedBatchIterator),
/// Source from a [BoxedBatchStream].
Stream(BoxedBatchStream),
/// Source from a [PruneReader].
PruneReader(PruneReader),
}

impl Source {
Expand All @@ -1122,7 +1099,6 @@ impl Source {
Source::Reader(reader) => reader.next_batch().await,
Source::Iter(iter) => iter.next().transpose(),
Source::Stream(stream) => stream.try_next().await,
Source::PruneReader(reader) => reader.next_batch().await,
}
}
}
Expand Down
Loading
Loading