Skip to content

Commit 996c2bf

Browse files
committed
Add support for Decimal 32 and 64 REE arrays
1 parent 74ffa2f commit 996c2bf

File tree

1 file changed

+158
-6
lines changed
  • parquet/src/arrow/arrow_writer

1 file changed

+158
-6
lines changed

parquet/src/arrow/arrow_writer/mod.rs

Lines changed: 158 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1343,12 +1343,34 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usi
13431343
write_primitive(typed, array.values(), levels)
13441344
}
13451345
},
1346-
ArrowDataType::RunEndEncoded(_run_ends, _values) => {
1347-
Err(ParquetError::NYI(
1348-
"Int64ColumnWriter: Attempting to write an Arrow RunEndEncoded type that is not yet implemented"
1349-
.to_string(),
1350-
))
1351-
}
1346+
ArrowDataType::RunEndEncoded(_, values_field) => match values_field.data_type() {
1347+
ArrowDataType::Decimal64(_, _) => {
1348+
let array = arrow_cast::cast(column, values_field.data_type())?;
1349+
let array = array
1350+
.as_primitive::<Decimal64Type>()
1351+
.unary::<_, Int64Type>(|v| v);
1352+
write_primitive(typed, array.values(), levels)
1353+
}
1354+
ArrowDataType::Decimal128(_, _) => {
1355+
let array = arrow_cast::cast(column, values_field.data_type())?;
1356+
let array = array
1357+
.as_primitive::<Decimal128Type>()
1358+
.unary::<_, Int64Type>(|v| v as i64);
1359+
write_primitive(typed, array.values(), levels)
1360+
}
1361+
ArrowDataType::Decimal256(_, _) => {
1362+
let array = arrow_cast::cast(column, values_field.data_type())?;
1363+
let array = array
1364+
.as_primitive::<Decimal256Type>()
1365+
.unary::<_, Int64Type>(|v| v.as_i128() as i64);
1366+
write_primitive(typed, array.values(), levels)
1367+
}
1368+
_ => {
1369+
let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
1370+
let array = array.as_primitive::<Int64Type>();
1371+
write_primitive(typed, array.values(), levels)
1372+
}
1373+
},
13521374
_ => {
13531375
let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
13541376
let array = array.as_primitive::<Int64Type>();
@@ -4688,4 +4710,134 @@ mod tests {
46884710
let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(run_array)]).unwrap();
46894711
roundtrip(batch, None);
46904712
}
4713+
4714+
#[test]
fn arrow_writer_run_end_encoded_decimal32() {
    // Build a run-end-encoded array of Decimal32 values: two long runs.
    let mut run_builder = PrimitiveRunBuilder::<Int32Type, Decimal32Type>::new();
    run_builder.extend(
        std::iter::repeat(Some(12345i32))
            .take(100000)
            .chain(std::iter::repeat(Some(56789i32)).take(100000)),
    );
    let ree_array: RunArray<Int32Type> = run_builder.finish();
    let schema = Arc::new(Schema::new(vec![Field::new(
        "ree",
        ree_array.data_type().clone(),
        ree_array.is_nullable(),
    )]));

    // Round-trip through an in-memory parquet file.
    let mut buffer: Vec<u8> = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), None).unwrap();
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(ree_array)]).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    let reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();

    // Inspect the column metadata to confirm dictionary encoding was used.
    let column_meta = &reader.metadata().row_groups()[0].columns()[0];
    let used_dict_encoding = column_meta.encodings().any(|e| e == Encoding::RLE_DICTIONARY);
    let used_dict_page = column_meta.dictionary_page_offset().is_some();

    // The schema read back should still be run-end encoded, with the
    // default Decimal32 precision/scale (9, 2) for the values child.
    let run_ends_field = Field::new("run_ends", arrow_schema::DataType::Int32, false);
    let values_field = Field::new("values", arrow_schema::DataType::Decimal32(9, 2), true);
    let expected_schema = Arc::new(Schema::new(vec![Field::new(
        "ree",
        DataType::RunEndEncoded(Arc::new(run_ends_field), Arc::new(values_field)),
        false,
    )]));
    assert_eq!(&expected_schema, reader.schema());

    // Decode everything and verify the row count survived the round trip.
    let batches = reader
        .build()
        .unwrap()
        .collect::<ArrowResult<Vec<_>>>()
        .unwrap();
    assert_eq!(batches.len(), 196);
    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    assert_eq!(total_rows, 200000);

    assert!(used_dict_encoding, "RunArray should be dictionary encoded");
    assert!(used_dict_page, "RunArray should have dictionary page");
}
4778+
4779+
#[test]
fn arrow_writer_run_end_encoded_decimal64() {
    // Build a run-end-encoded array of Decimal64 values: two long runs.
    let mut run_builder = PrimitiveRunBuilder::<Int32Type, Decimal64Type>::new();
    run_builder.extend(
        std::iter::repeat(Some(12345i64))
            .take(100000)
            .chain(std::iter::repeat(Some(56789i64)).take(100000)),
    );
    let ree_array: RunArray<Int32Type> = run_builder.finish();
    let schema = Arc::new(Schema::new(vec![Field::new(
        "ree",
        ree_array.data_type().clone(),
        ree_array.is_nullable(),
    )]));

    // Round-trip through an in-memory parquet file.
    let mut buffer: Vec<u8> = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), None).unwrap();
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(ree_array)]).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    let reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();

    // Inspect the column metadata to confirm dictionary encoding was used.
    let column_meta = &reader.metadata().row_groups()[0].columns()[0];
    let used_dict_encoding = column_meta.encodings().any(|e| e == Encoding::RLE_DICTIONARY);
    let used_dict_page = column_meta.dictionary_page_offset().is_some();

    // The schema read back should still be run-end encoded, with the
    // default Decimal64 precision/scale (18, 6) for the values child.
    let run_ends_field = Field::new("run_ends", arrow_schema::DataType::Int32, false);
    let values_field = Field::new("values", arrow_schema::DataType::Decimal64(18, 6), true);
    let expected_schema = Arc::new(Schema::new(vec![Field::new(
        "ree",
        DataType::RunEndEncoded(Arc::new(run_ends_field), Arc::new(values_field)),
        false,
    )]));
    assert_eq!(&expected_schema, reader.schema());

    // Decode everything and verify the row count survived the round trip.
    let batches = reader
        .build()
        .unwrap()
        .collect::<ArrowResult<Vec<_>>>()
        .unwrap();
    assert_eq!(batches.len(), 196);
    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    assert_eq!(total_rows, 200000);

    assert!(used_dict_encoding, "RunArray should be dictionary encoded");
    assert!(used_dict_page, "RunArray should have dictionary page");
}
46914843
}

0 commit comments

Comments
 (0)