Skip to content

Commit 0105378

Browse files
Implement coerce_types for Timestamp(Second), Interval(MonthDayNano)
- Timestamp(Second) with coerce_types=true: store as INT64 with LogicalType::Timestamp(MILLIS), values multiplied by 1000
- Interval(MonthDayNano) with coerce_types=true: store as 12-byte Parquet INTERVAL, nanoseconds truncated to milliseconds
- Interval(MonthDayNano) with coerce_types=false: store as 16-byte raw FIXED_LEN_BYTE_ARRAY, preserving full nanosecond precision
- Add reader support for MonthDayNano from both 12-byte and 16-byte representations
- Add apply_hint rule for FixedSizeBinary(16) -> Interval(MonthDayNano)
- Remove NYI error for writing IntervalMonthDayNanoArray
- Add schema, round-trip, and edge-case tests
1 parent 68851ef commit 0105378

5 files changed

Lines changed: 544 additions & 28 deletions

File tree

parquet/src/arrow/array_reader/fixed_len_byte_array.rs

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,10 @@ use crate::errors::{ParquetError, Result};
2828
use crate::schema::types::ColumnDescPtr;
2929
use arrow_array::{
3030
ArrayRef, Decimal32Array, Decimal64Array, Decimal128Array, Decimal256Array,
31-
FixedSizeBinaryArray, Float16Array, IntervalDayTimeArray, IntervalYearMonthArray,
31+
FixedSizeBinaryArray, Float16Array, IntervalDayTimeArray, IntervalMonthDayNanoArray,
32+
IntervalYearMonthArray,
3233
};
33-
use arrow_buffer::{Buffer, IntervalDayTime, i256};
34+
use arrow_buffer::{Buffer, IntervalDayTime, IntervalMonthDayNano, i256};
3435
use arrow_data::ArrayDataBuilder;
3536
use arrow_schema::{DataType as ArrowType, IntervalUnit};
3637
use bytes::Bytes;
@@ -96,6 +97,14 @@ pub fn make_fixed_len_byte_array_reader(
9697
));
9798
}
9899
}
100+
ArrowType::Interval(IntervalUnit::MonthDayNano) => {
101+
if byte_length != 12 && byte_length != 16 {
102+
return Err(general_err!(
103+
"MonthDayNano interval must be 12 or 16 bytes, got {}",
104+
byte_length
105+
));
106+
}
107+
}
99108
ArrowType::Interval(_) => {
100109
if byte_length != 12 {
101110
// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#interval
@@ -222,7 +231,31 @@ impl ArrayReader for FixedLenByteArrayReader {
222231
Arc::new(IntervalDayTimeArray::from_unary(&binary, f)) as ArrayRef
223232
}
224233
IntervalUnit::MonthDayNano => {
225-
return Err(nyi_err!("MonthDayNano intervals not supported"));
234+
if binary.value_length() == 16 {
235+
// Raw 16-byte: months(4) + days(4) + nanoseconds(8)
236+
let f = |b: &[u8]| {
237+
IntervalMonthDayNano::new(
238+
i32::from_le_bytes(b[0..4].try_into().unwrap()),
239+
i32::from_le_bytes(b[4..8].try_into().unwrap()),
240+
i64::from_le_bytes(b[8..16].try_into().unwrap()),
241+
)
242+
};
243+
Arc::new(IntervalMonthDayNanoArray::from_unary(&binary, f))
244+
as ArrayRef
245+
} else {
246+
// Coerced 12-byte: months(4) + days(4) + millis(4)
247+
let f = |b: &[u8]| {
248+
let millis =
249+
i32::from_le_bytes(b[8..12].try_into().unwrap());
250+
IntervalMonthDayNano::new(
251+
i32::from_le_bytes(b[0..4].try_into().unwrap()),
252+
i32::from_le_bytes(b[4..8].try_into().unwrap()),
253+
millis as i64 * 1_000_000,
254+
)
255+
};
256+
Arc::new(IntervalMonthDayNanoArray::from_unary(&binary, f))
257+
as ArrayRef
258+
}
226259
}
227260
}
228261
}

parquet/src/arrow/arrow_reader/mod.rs

Lines changed: 295 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1605,11 +1605,13 @@ pub(crate) mod tests {
16051605
use arrow_array::cast::AsArray;
16061606
use arrow_array::types::{
16071607
Date32Type, Date64Type, Decimal32Type, Decimal64Type, Decimal128Type, Decimal256Type,
1608-
DecimalType, Float16Type, Float32Type, Float64Type, Time32MillisecondType,
1609-
Time64MicrosecondType,
1608+
DecimalType, Float16Type, Float32Type, Float64Type, IntervalMonthDayNanoType,
1609+
Time32MillisecondType, Time64MicrosecondType, TimestampMillisecondType,
16101610
};
16111611
use arrow_array::*;
1612-
use arrow_buffer::{ArrowNativeType, Buffer, IntervalDayTime, NullBuffer, i256};
1612+
use arrow_buffer::{
1613+
ArrowNativeType, Buffer, IntervalDayTime, IntervalMonthDayNano, NullBuffer, i256,
1614+
};
16131615
use arrow_data::{ArrayData, ArrayDataBuilder};
16141616
use arrow_schema::{DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit};
16151617
use arrow_select::concat::concat_batches;
@@ -2176,6 +2178,296 @@ pub(crate) mod tests {
21762178

21772179
Ok(())
21782180
}
2181+
#[test]
2182+
fn test_timestamp_second_roundtrip() -> Result<()> {
2183+
use arrow_array::TimestampSecondArray;
2184+
2185+
let schema = Arc::new(Schema::new(vec![
2186+
Field::new(
2187+
"ts-second-no-tz",
2188+
ArrowDataType::Timestamp(TimeUnit::Second, None),
2189+
false,
2190+
),
2191+
Field::new(
2192+
"ts-second-utc",
2193+
ArrowDataType::Timestamp(TimeUnit::Second, Some("UTC".into())),
2194+
false,
2195+
),
2196+
]));
2197+
2198+
let mut default_buf = Vec::with_capacity(1024);
2199+
let mut coerce_buf = Vec::with_capacity(1024);
2200+
2201+
let coerce_props = WriterProperties::builder().set_coerce_types(true).build();
2202+
2203+
let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
2204+
let mut coerce_writer =
2205+
ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;
2206+
2207+
let original = RecordBatch::try_new(
2208+
schema,
2209+
vec![
2210+
Arc::new(TimestampSecondArray::from(vec![
2211+
0, 1, -1, 1_000_000, -1_000_000,
2212+
])),
2213+
Arc::new(
2214+
TimestampSecondArray::from(vec![0, 1, -1, 1_000_000, -1_000_000])
2215+
.with_timezone("UTC"),
2216+
),
2217+
],
2218+
)?;
2219+
2220+
default_writer.write(&original)?;
2221+
coerce_writer.write(&original)?;
2222+
2223+
default_writer.close()?;
2224+
coerce_writer.close()?;
2225+
2226+
let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
2227+
let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;
2228+
2229+
let default_ret = default_reader.next().unwrap()?;
2230+
let coerce_ret = coerce_reader.next().unwrap()?;
2231+
2232+
// Default writer: lossless round-trip
2233+
assert_eq!(default_ret, original);
2234+
2235+
// Coerce writer: values are stored as milliseconds, read back as Timestamp(Millisecond)
2236+
// Values should be original * 1000
2237+
let coerce_no_tz = coerce_ret
2238+
.column(0)
2239+
.as_primitive::<TimestampMillisecondType>();
2240+
let coerce_utc = coerce_ret
2241+
.column(1)
2242+
.as_primitive::<TimestampMillisecondType>();
2243+
assert_eq!(
2244+
coerce_no_tz.values().as_ref(),
2245+
&[0, 1000, -1000, 1_000_000_000, -1_000_000_000]
2246+
);
2247+
assert_eq!(
2248+
coerce_utc.values().as_ref(),
2249+
&[0, 1000, -1000, 1_000_000_000, -1_000_000_000]
2250+
);
2251+
2252+
Ok(())
2253+
}
2254+
2255+
#[test]
2256+
fn test_interval_month_day_nano_roundtrip() -> Result<()> {
2257+
use arrow_array::IntervalMonthDayNanoArray;
2258+
use arrow_buffer::IntervalMonthDayNano;
2259+
2260+
let schema = Arc::new(Schema::new(vec![Field::new(
2261+
"interval-mdn",
2262+
ArrowDataType::Interval(arrow_schema::IntervalUnit::MonthDayNano),
2263+
false,
2264+
)]));
2265+
2266+
let mut default_buf = Vec::with_capacity(1024);
2267+
let mut coerce_buf = Vec::with_capacity(1024);
2268+
2269+
let coerce_props = WriterProperties::builder().set_coerce_types(true).build();
2270+
2271+
let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
2272+
let mut coerce_writer =
2273+
ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;
2274+
2275+
let original = RecordBatch::try_new(
2276+
schema,
2277+
vec![Arc::new(IntervalMonthDayNanoArray::from(vec![
2278+
IntervalMonthDayNano::new(1, 2, 3_000_000), // exactly 3ms
2279+
IntervalMonthDayNano::new(-1, -2, -3_000_000), // exactly -3ms
2280+
IntervalMonthDayNano::new(12, 30, 5_500_000_000), // 5500ms = 5.5s
2281+
IntervalMonthDayNano::new(0, 0, 999_999), // sub-millisecond
2282+
]))],
2283+
)?;
2284+
2285+
default_writer.write(&original)?;
2286+
coerce_writer.write(&original)?;
2287+
2288+
default_writer.close()?;
2289+
coerce_writer.close()?;
2290+
2291+
let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
2292+
let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;
2293+
2294+
let default_ret = default_reader.next().unwrap()?;
2295+
let coerce_ret = coerce_reader.next().unwrap()?;
2296+
2297+
// Default writer (16-byte raw): lossless round-trip
2298+
assert_eq!(default_ret, original);
2299+
2300+
// Coerce writer (12-byte INTERVAL): nanoseconds truncated to milliseconds
2301+
let coerce_col = coerce_ret
2302+
.column(0)
2303+
.as_primitive::<IntervalMonthDayNanoType>();
2304+
assert_eq!(
2305+
coerce_col.value(0),
2306+
IntervalMonthDayNano::new(1, 2, 3_000_000)
2307+
); // exact
2308+
assert_eq!(
2309+
coerce_col.value(1),
2310+
IntervalMonthDayNano::new(-1, -2, -3_000_000)
2311+
); // exact
2312+
assert_eq!(
2313+
coerce_col.value(2),
2314+
IntervalMonthDayNano::new(12, 30, 5_500_000_000)
2315+
); // exact
2316+
assert_eq!(coerce_col.value(3), IntervalMonthDayNano::new(0, 0, 0)); // sub-ms truncated to 0
2317+
2318+
Ok(())
2319+
}
2320+
2321+
#[test]
2322+
fn test_timestamp_second_coerce_edge_cases() -> Result<()> {
2323+
use arrow_array::TimestampSecondArray;
2324+
2325+
let schema = Arc::new(Schema::new(vec![Field::new(
2326+
"ts",
2327+
ArrowDataType::Timestamp(TimeUnit::Second, None),
2328+
true,
2329+
)]));
2330+
2331+
let coerce_props = WriterProperties::builder().set_coerce_types(true).build();
2332+
let mut buf = Vec::with_capacity(1024);
2333+
let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), Some(coerce_props))?;
2334+
2335+
// Edge cases: large values near overflow boundary, zero, nulls
2336+
let max_safe = i64::MAX / 1000; // largest value that won't overflow * 1000
2337+
let min_safe = i64::MIN / 1000;
2338+
let original = RecordBatch::try_new(
2339+
schema,
2340+
vec![Arc::new(TimestampSecondArray::from(vec![
2341+
Some(0),
2342+
Some(max_safe),
2343+
Some(min_safe),
2344+
None,
2345+
Some(1),
2346+
Some(-1),
2347+
]))],
2348+
)?;
2349+
2350+
writer.write(&original)?;
2351+
writer.close()?;
2352+
2353+
let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
2354+
let ret = reader.next().unwrap()?;
2355+
2356+
let col = ret.column(0).as_primitive::<TimestampMillisecondType>();
2357+
assert_eq!(col.value(0), 0);
2358+
assert_eq!(col.value(1), max_safe * 1000);
2359+
assert_eq!(col.value(2), min_safe * 1000);
2360+
assert!(col.is_null(3)); // null preserved
2361+
assert_eq!(col.value(4), 1000);
2362+
assert_eq!(col.value(5), -1000);
2363+
2364+
Ok(())
2365+
}
2366+
2367+
#[test]
2368+
fn test_interval_month_day_nano_coerce_edge_cases() -> Result<()> {
2369+
use arrow_array::IntervalMonthDayNanoArray;
2370+
2371+
let schema = Arc::new(Schema::new(vec![Field::new(
2372+
"interval-mdn",
2373+
ArrowDataType::Interval(arrow_schema::IntervalUnit::MonthDayNano),
2374+
true,
2375+
)]));
2376+
2377+
// Test coerce_types=false: lossless with nulls
2378+
let mut default_buf = Vec::with_capacity(1024);
2379+
let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
2380+
2381+
let original = RecordBatch::try_new(
2382+
schema.clone(),
2383+
vec![Arc::new(IntervalMonthDayNanoArray::from(vec![
2384+
Some(IntervalMonthDayNano::new(1, 2, -3_000_000)), // negative ms
2385+
Some(IntervalMonthDayNano::new(0, 0, -999_999)), // negative sub-ms
2386+
Some(IntervalMonthDayNano::new(0, 0, 2_000_000_000_000)), // large ns
2387+
None, // null
2388+
]))],
2389+
)?;
2390+
2391+
default_writer.write(&original)?;
2392+
default_writer.close()?;
2393+
2394+
let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
2395+
let ret = reader.next().unwrap()?;
2396+
// Raw 16-byte: lossless round-trip including nulls
2397+
assert_eq!(ret, original);
2398+
2399+
// Test coerce_types=true: truncation edge cases
2400+
let coerce_props = WriterProperties::builder().set_coerce_types(true).build();
2401+
let mut coerce_buf = Vec::with_capacity(1024);
2402+
let mut coerce_writer = ArrowWriter::try_new(&mut coerce_buf, schema, Some(coerce_props))?;
2403+
2404+
coerce_writer.write(&original)?;
2405+
coerce_writer.close()?;
2406+
2407+
let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;
2408+
let ret = reader.next().unwrap()?;
2409+
2410+
let col = ret.column(0).as_primitive::<IntervalMonthDayNanoType>();
2411+
// -3_000_000 ns / 1_000_000 = -3 ms → -3 * 1_000_000 = -3_000_000 ns (exact)
2412+
assert_eq!(col.value(0), IntervalMonthDayNano::new(1, 2, -3_000_000));
2413+
// -999_999 ns / 1_000_000 = 0 ms (truncation toward zero) → 0 ns
2414+
assert_eq!(col.value(1), IntervalMonthDayNano::new(0, 0, 0));
2415+
// 2_000_000_000_000 ns / 1_000_000 = 2_000_000 ms → 2_000_000 * 1_000_000 = 2_000_000_000_000 ns
2416+
assert_eq!(
2417+
col.value(2),
2418+
IntervalMonthDayNano::new(0, 0, 2_000_000_000_000)
2419+
);
2420+
assert!(col.is_null(3)); // null preserved
2421+
2422+
Ok(())
2423+
}
2424+
2425+
#[test]
2426+
fn test_interval_year_month_day_time_unaffected_by_coerce() -> Result<()> {
2427+
use arrow_array::{IntervalDayTimeArray, IntervalYearMonthArray};
2428+
2429+
let schema = Arc::new(Schema::new(vec![
2430+
Field::new(
2431+
"ym",
2432+
ArrowDataType::Interval(arrow_schema::IntervalUnit::YearMonth),
2433+
false,
2434+
),
2435+
Field::new(
2436+
"dt",
2437+
ArrowDataType::Interval(arrow_schema::IntervalUnit::DayTime),
2438+
false,
2439+
),
2440+
]));
2441+
2442+
let coerce_props = WriterProperties::builder().set_coerce_types(true).build();
2443+
let mut buf = Vec::with_capacity(1024);
2444+
let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), Some(coerce_props))?;
2445+
2446+
let original = RecordBatch::try_new(
2447+
schema,
2448+
vec![
2449+
Arc::new(IntervalYearMonthArray::from(vec![0, 12, -6, 100])),
2450+
Arc::new(IntervalDayTimeArray::from(vec![
2451+
IntervalDayTime::new(0, 0),
2452+
IntervalDayTime::new(30, 1000),
2453+
IntervalDayTime::new(-1, -500),
2454+
IntervalDayTime::new(365, 86_400_000),
2455+
])),
2456+
],
2457+
)?;
2458+
2459+
writer.write(&original)?;
2460+
writer.close()?;
2461+
2462+
let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
2463+
let ret = reader.next().unwrap()?;
2464+
2465+
// YearMonth and DayTime should round-trip losslessly even with coerce_types=true
2466+
assert_eq!(ret, original);
2467+
2468+
Ok(())
2469+
}
2470+
21792471
struct RandFixedLenGen {}
21802472

21812473
impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {

0 commit comments

Comments (0)