Skip to content

Commit 86173d3

Browse files
committed
wip: rework decode
1 parent 029a788 commit 86173d3

38 files changed

Lines changed: 3592 additions & 39 deletions

avro/src/decode/block.rs

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -25,7 +25,6 @@ pub struct BlockStateMachine {
2525
impl BlockStateMachine {
2626
pub fn new_with_tape(command_tape: CommandTape, tape: Vec<ItemRead>) -> Self {
2727
Self {
28-
// This clone is *cheap*
2928
command_tape,
3029
tape_or_fsm: TapeOrFsm::Tape(tape),
3130
left_in_current_block: 0,

avro/src/decode/commands.rs

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -385,7 +385,7 @@ impl<'a> CommandTapeBuilder<'a> {
385385
self.tape.push(CommandTape::BYTES);
386386
Ok(1)
387387
}
388-
Schema::String | Schema::Uuid => {
388+
Schema::String | Schema::Uuid(_) => {
389389
self.tape.push(CommandTape::STRING);
390390
Ok(1)
391391
}
@@ -512,6 +512,7 @@ impl<'a> CommandTapeBuilder<'a> {
512512

513513
#[cfg(test)]
514514
mod tests {
515+
use crate::schema::UuidSchema;
515516
use super::*;
516517

517518
#[test]
@@ -636,7 +637,7 @@ mod tests {
636637
&[CommandTape::STRING]
637638
);
638639
assert_eq!(
639-
CommandTape::build_from_schema(&Schema::Uuid, &HashMap::new())
640+
CommandTape::build_from_schema(&Schema::Uuid(UuidSchema::String), &HashMap::new())
640641
.unwrap()
641642
.inner
642643
.as_ref(),

avro/src/decode/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -450,7 +450,7 @@ pub fn value_from_tape_internal(
450450
}
451451
.into()),
452452
},
453-
Schema::Uuid => match tape.next().ok_or(ValueFromTapeError::UnexpectedEndOfTape)? {
453+
Schema::Uuid(_) => match tape.next().ok_or(ValueFromTapeError::UnexpectedEndOfTape)? {
454454
ItemRead::String(string) => Uuid::from_str(&string)
455455
.map(Value::Uuid)
456456
.map_err(|e| Details::ConvertStrToUuid(e).into()),
@@ -605,6 +605,7 @@ mod tests {
605605
use pretty_assertions::assert_eq;
606606
use std::collections::HashMap;
607607
use uuid::Uuid;
608+
use crate::schema::UuidSchema;
608609

609610
#[test]
610611
fn test_decode_array_without_size() -> TestResult {
@@ -1116,7 +1117,7 @@ mod tests {
11161117
let mut buffer = Vec::new();
11171118
encode(&value, &schema, &mut buffer).expect(&success(&value, &schema));
11181119

1119-
let result = from_avro_datum(&Schema::Uuid, &mut &buffer[..], None)?;
1120+
let result = from_avro_datum(&Schema::Uuid(UuidSchema::String), &mut &buffer[..], None)?;
11201121
assert_eq!(result, value);
11211122

11221123
Ok(())

avro/src/decode2/codec.rs

Lines changed: 182 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,182 @@
1+
use crate::{
2+
Codec,
3+
decode2::{Fsm, FsmControlFlow, FsmResult},
4+
};
5+
use oval::Buffer;
6+
7+
pub struct CodecStateMachine<T: Fsm> {
8+
sub_machine: Option<T>,
9+
codec: Decoder,
10+
buffer: Buffer,
11+
}
12+
13+
impl<T: Fsm> CodecStateMachine<T> {
14+
pub fn new(sub_machine: T, codec: Codec) -> Self {
15+
Self {
16+
sub_machine: Some(sub_machine),
17+
codec: codec.into(),
18+
buffer: Buffer::with_capacity(1024),
19+
}
20+
}
21+
22+
pub fn reset(&mut self, sub_machine: T) {
23+
self.buffer.reset();
24+
self.sub_machine = Some(sub_machine);
25+
self.codec.reset();
26+
}
27+
}
28+
29+
pub enum Decoder {
30+
Null,
31+
Deflate(Box<miniz_oxide::inflate::stream::InflateState>),
32+
#[cfg(feature = "snappy")]
33+
Snappy(snap::raw::Decoder),
34+
#[cfg(feature = "zstandard")]
35+
Zstandard(zstd::stream::raw::Decoder<'static>),
36+
#[cfg(feature = "bzip")]
37+
Bzip2(bzip2::Decompress),
38+
#[cfg(feature = "xz")]
39+
Xz(liblzma::stream::Stream),
40+
}
41+
42+
impl From<Codec> for Decoder {
43+
fn from(value: Codec) -> Self {
44+
match value {
45+
Codec::Null => Self::Null,
46+
Codec::Deflate(_) => {
47+
use miniz_oxide::{DataFormat::Raw, inflate::stream::InflateState};
48+
Self::Deflate(InflateState::new_boxed(Raw))
49+
}
50+
#[cfg(feature = "snappy")]
51+
Codec::Snappy => Self::Snappy(snap::raw::Decoder::new()),
52+
#[cfg(feature = "zstandard")]
53+
Codec::Zstandard(_) => Self::Zstandard(zstd::stream::raw::Decoder::new().unwrap()),
54+
#[cfg(feature = "bzip")]
55+
Codec::Bzip2(_) => Self::Bzip2(bzip2::Decompress::new(false)),
56+
#[cfg(feature = "xz")]
57+
Codec::Xz(_) => {
58+
Self::Xz(liblzma::stream::Stream::new_auto_decoder(u64::MAX, 0).unwrap())
59+
}
60+
}
61+
}
62+
}
63+
64+
impl Decoder {
65+
pub fn reset(&mut self) {
66+
match self {
67+
Decoder::Null => {}
68+
Decoder::Deflate(decoder) => {
69+
decoder.reset_as(miniz_oxide::inflate::stream::MinReset);
70+
}
71+
#[cfg(feature = "snappy")]
72+
Decoder::Snappy(_decoder) => {} // No reset needed
73+
#[cfg(feature = "zstandard")]
74+
Decoder::Zstandard(decoder) => zstd::stream::raw::Operation::reinit(decoder).unwrap(),
75+
#[cfg(feature = "bzip")]
76+
Decoder::Bzip2(decoder) => {
77+
// No reset/reinit API available
78+
let _drop = std::mem::replace(decoder, bzip2::Decompress::new(false));
79+
}
80+
#[cfg(feature = "xz")]
81+
Decoder::Xz(decoder) => {
82+
// No reset/reinit API available
83+
let _drop = std::mem::replace(
84+
decoder,
85+
liblzma::stream::Stream::new_auto_decoder(u64::MAX, 0).unwrap(),
86+
);
87+
}
88+
}
89+
}
90+
}
91+
92+
impl<T: Fsm> Fsm for CodecStateMachine<T> {
93+
type Output = (T::Output, Self);
94+
95+
fn parse(mut self, buffer: &mut Buffer) -> FsmResult<Self, Self::Output> {
96+
let buffer = match &mut self.codec {
97+
Decoder::Null => buffer,
98+
Decoder::Deflate(decoder) => {
99+
use miniz_oxide::{MZFlush, StreamResult, inflate::stream::inflate};
100+
let StreamResult {
101+
bytes_consumed,
102+
bytes_written,
103+
status,
104+
} = inflate(decoder, buffer.data(), self.buffer.space(), MZFlush::None);
105+
status.unwrap();
106+
buffer.consume(bytes_consumed);
107+
self.buffer.fill(bytes_written);
108+
109+
&mut self.buffer
110+
}
111+
#[cfg(feature = "snappy")]
112+
Decoder::Snappy(_decoder) => {
113+
todo!("Snap has no streaming decoder")
114+
}
115+
#[cfg(feature = "zstandard")]
116+
Decoder::Zstandard(decoder) => {
117+
use zstd::stream::raw::{Operation, Status};
118+
let Status {
119+
bytes_read,
120+
bytes_written,
121+
..
122+
} = decoder
123+
.run_on_buffers(buffer.data(), self.buffer.space())
124+
.map_err(crate::error::Details::ZstdDecompress)?;
125+
buffer.consume(bytes_read);
126+
self.buffer.fill(bytes_written);
127+
128+
&mut self.buffer
129+
}
130+
#[cfg(feature = "bzip")]
131+
Decoder::Bzip2(decoder) => {
132+
let prev_total_in = decoder.total_in();
133+
let prev_total_out = decoder.total_out();
134+
135+
let _status = decoder
136+
.decompress(buffer.data(), self.buffer.space())
137+
.unwrap();
138+
139+
let consumed = decoder.total_in() - prev_total_in;
140+
let filled = decoder.total_out() - prev_total_out;
141+
142+
buffer.consume(usize::try_from(consumed).unwrap());
143+
self.buffer.fill(usize::try_from(filled).unwrap());
144+
145+
&mut self.buffer
146+
}
147+
#[cfg(feature = "xz")]
148+
Decoder::Xz(decoder) => {
149+
use liblzma::stream::Action::Run;
150+
151+
let prev_total_in = decoder.total_in();
152+
let prev_total_out = decoder.total_out();
153+
154+
let _status = decoder
155+
.process(buffer.data(), self.buffer.space(), Run)
156+
.unwrap();
157+
158+
let consumed = decoder.total_in() - prev_total_in;
159+
let filled = decoder.total_out() - prev_total_out;
160+
161+
buffer.consume(usize::try_from(consumed).unwrap());
162+
self.buffer.fill(usize::try_from(filled).unwrap());
163+
164+
&mut self.buffer
165+
}
166+
};
167+
match self
168+
.sub_machine
169+
.take()
170+
.expect("CodecStateMachine was not reset!")
171+
.parse(buffer)?
172+
{
173+
FsmControlFlow::NeedMore(fsm) => {
174+
self.sub_machine = Some(fsm);
175+
Ok(FsmControlFlow::NeedMore(self))
176+
}
177+
FsmControlFlow::Done(result) => {
178+
Ok(FsmControlFlow::Done((result, self)))
179+
}
180+
}
181+
}
182+
}

0 commit comments

Comments
 (0)