Skip to content

Commit 2de3db0

Browse files
committed
Fix polars missing dictionary_page_offset
1 parent 706d261 commit 2de3db0

8 files changed

+163
-32
lines changed

src/plan.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ export function parquetPlan({ metadata, rowStart = 0, rowEnd = Infinity, columns
6060
startByte: offsetIndexStart,
6161
endByte: offsetIndexStart + chunk.offset_index_length,
6262
},
63-
bounds: { startByte, endByte },
63+
range: { startByte, endByte },
6464
})
6565
} else {
6666
chunks.push({

src/rowgroup.js

Lines changed: 18 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -18,33 +18,28 @@ import { flatten } from './utils.js'
1818
* @returns {AsyncRowGroup} resolves to column data
1919
*/
2020
export function readRowGroup(options, { metadata }, groupPlan) {
21-
const { file, compressors, utf8 } = options
22-
2321
/** @type {AsyncColumn[]} */
2422
const asyncColumns = []
25-
/** @type {ParquetParsers} */
26-
const parsers = { ...DEFAULT_PARSERS, ...options.parsers }
2723

2824
// read column data
29-
for (const chunkPlan of groupPlan.chunks) {
30-
const { codec, path_in_schema: pathInSchema, type } = chunkPlan.columnMetadata
25+
for (const chunk of groupPlan.chunks) {
26+
const { data_page_offset, dictionary_page_offset, path_in_schema: pathInSchema } = chunk.columnMetadata
3127
const schemaPath = getSchemaPath(metadata.schema, pathInSchema)
3228
const columnDecoder = {
3329
pathInSchema,
34-
type,
3530
element: schemaPath[schemaPath.length - 1].element,
3631
schemaPath,
37-
codec,
38-
parsers,
39-
compressors,
40-
utf8,
32+
parsers: { ...DEFAULT_PARSERS, ...options.parsers },
33+
...options,
34+
...chunk.columnMetadata,
4135
}
36+
let { startByte, endByte } = chunk.range
4237

4338
// non-offset-index case
44-
if (!('offsetIndex' in chunkPlan)) {
39+
if (!('offsetIndex' in chunk)) {
4540
asyncColumns.push({
4641
pathInSchema,
47-
data: Promise.resolve(file.slice(chunkPlan.range.startByte, chunkPlan.range.endByte))
42+
data: Promise.resolve(options.file.slice(startByte, endByte))
4843
.then(buffer => {
4944
const reader = { view: new DataView(buffer), offset: 0 }
5045
return readColumn(reader, groupPlan, columnDecoder, options.onPage)
@@ -57,37 +52,30 @@ export function readRowGroup(options, { metadata }, groupPlan) {
5752
asyncColumns.push({
5853
pathInSchema,
5954
// fetch offset index
60-
data: Promise.resolve(file.slice(chunkPlan.offsetIndex.startByte, chunkPlan.offsetIndex.endByte))
55+
data: Promise.resolve(options.file.slice(chunk.offsetIndex.startByte, chunk.offsetIndex.endByte))
6156
.then(async arrayBuffer => {
62-
const offsetIndex = readOffsetIndex({ view: new DataView(arrayBuffer), offset: 0 })
6357
// use offset index to read only necessary pages
6458
const { selectStart, selectEnd } = groupPlan
65-
const pages = offsetIndex.page_locations
66-
let startByte = NaN
67-
let endByte = NaN
59+
const pages = readOffsetIndex({ view: new DataView(arrayBuffer), offset: 0 }).page_locations
6860
let skipped = 0
61+
// include the dictionary page if present; Polars omits dictionary_page_offset, so also detect a dictionary when data_page_offset precedes the first page listed in the offset index
62+
const hasDict = dictionary_page_offset || data_page_offset < pages[0].offset
6963
for (let i = 0; i < pages.length; i++) {
7064
const page = pages[i]
7165
const pageStart = Number(page.first_row_index)
7266
const pageEnd = i + 1 < pages.length
7367
? Number(pages[i + 1].first_row_index)
7468
: groupPlan.groupRows // last page extends to end of row group
7569
// check if page overlaps with [selectStart, selectEnd)
76-
if (pageStart < selectEnd && pageEnd > selectStart) {
77-
if (Number.isNaN(startByte)) {
78-
startByte = Number(page.offset)
79-
skipped = pageStart
80-
}
70+
if (!skipped && !hasDict && pageEnd > selectStart) {
71+
startByte = Number(page.offset)
72+
skipped = pageStart
73+
}
74+
if (pageStart < selectEnd) {
8175
endByte = Number(page.offset) + page.compressed_page_size
8276
}
8377
}
84-
// include dictionary page so readColumn can decode dictionary-encoded values
85-
const dictOffset = chunkPlan.columnMetadata.dictionary_page_offset
86-
if (dictOffset !== undefined) {
87-
startByte = Number(dictOffset)
88-
skipped = 0
89-
}
90-
const buffer = await file.slice(startByte, endByte)
78+
const buffer = await options.file.slice(startByte, endByte)
9179
const reader = { view: new DataView(buffer), offset: 0 }
9280
// adjust row selection for skipped pages
9381
const adjustedGroupPlan = skipped ? {

src/types.d.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,7 @@ interface ChunkFull {
458458
interface ChunkOffsetIndexed {
459459
columnMetadata: ColumnMetaData
460460
offsetIndex: ByteRange
461-
bounds: ByteRange
461+
range: ByteRange
462462
}
463463

464464
export interface ColumnDecoder {
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
[
2+
[
3+
"alice",
4+
1
5+
],
6+
[
7+
"bob",
8+
2
9+
],
10+
[
11+
"charlie",
12+
3
13+
]
14+
]
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
{
2+
"version": 1,
3+
"schema": [
4+
{
5+
"name": "root",
6+
"num_children": 2
7+
},
8+
{
9+
"type": "BYTE_ARRAY",
10+
"repetition_type": "OPTIONAL",
11+
"name": "name",
12+
"converted_type": "UTF8",
13+
"logical_type": {
14+
"type": "STRING"
15+
}
16+
},
17+
{
18+
"type": "INT64",
19+
"repetition_type": "OPTIONAL",
20+
"name": "value"
21+
}
22+
],
23+
"num_rows": 3,
24+
"row_groups": [
25+
{
26+
"columns": [
27+
{
28+
"file_offset": 91,
29+
"meta_data": {
30+
"type": "BYTE_ARRAY",
31+
"encodings": [
32+
"PLAIN",
33+
"RLE",
34+
"RLE_DICTIONARY"
35+
],
36+
"path_in_schema": [
37+
"name"
38+
],
39+
"codec": "UNCOMPRESSED",
40+
"num_values": 3,
41+
"total_uncompressed_size": 87,
42+
"total_compressed_size": 87,
43+
"data_page_offset": 4,
44+
"statistics": {
45+
"null_count": 0,
46+
"max_value": "charlie",
47+
"min_value": "alice"
48+
}
49+
},
50+
"offset_index_offset": 337,
51+
"offset_index_length": 10,
52+
"column_index_offset": 279,
53+
"column_index_length": 27
54+
},
55+
{
56+
"file_offset": 226,
57+
"meta_data": {
58+
"type": "INT64",
59+
"encodings": [
60+
"PLAIN",
61+
"RLE",
62+
"RLE_DICTIONARY"
63+
],
64+
"path_in_schema": [
65+
"value"
66+
],
67+
"codec": "UNCOMPRESSED",
68+
"num_values": 3,
69+
"total_uncompressed_size": 88,
70+
"total_compressed_size": 88,
71+
"data_page_offset": 138,
72+
"statistics": {
73+
"null_count": 0,
74+
"max_value": 3,
75+
"min_value": 1
76+
}
77+
},
78+
"offset_index_offset": 347,
79+
"offset_index_length": 11,
80+
"column_index_offset": 306,
81+
"column_index_length": 31
82+
}
83+
],
84+
"total_byte_size": 175,
85+
"num_rows": 3,
86+
"file_offset": 4,
87+
"total_compressed_size": 175,
88+
"ordinal": 0
89+
}
90+
],
91+
"key_value_metadata": [
92+
{
93+
"key": "ARROW:schema",
94+
"value": "/////6kAAAAEAAAA8v///xQAAAAEAAEAAAAKAAsACAAKAAQA+P///wwAAAAIAAgAAAAEAAIAAABAAAAABAAAALT///8oAAAAEAAAAAgAAAABAgAAAAAAAPT///9AAAAAAQAAAAgACQAEAAgABQAAAHZhbHVlAAAA7P///ywAAAAgAAAAGAAAAAEUAAAQABIABAAQABEACAAAAAwAAAAAAPz///8EAAQABAAAAG5hbWUA"
95+
}
96+
],
97+
"created_by": "Polars",
98+
"metadata_length": 464
99+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
[
2+
[
3+
{
4+
"page_locations": [
5+
{
6+
"offset": 44,
7+
"compressed_page_size": 47,
8+
"first_row_index": 0
9+
}
10+
]
11+
},
12+
{
13+
"page_locations": [
14+
{
15+
"offset": 175,
16+
"compressed_page_size": 51,
17+
"first_row_index": 0
18+
}
19+
]
20+
}
21+
]
22+
]
830 Bytes
Binary file not shown.

test/read.test.js

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,14 @@ describe('parquetRead', () => {
257257
expect(counting.bytes).toBe(892)
258258
})
259259

260+
it('uses OffsetIndex when dictionary_page_offset is missing (polars)', async () => {
261+
// Polars writes RLE_DICTIONARY columns without setting dictionary_page_offset
262+
const file = await asyncBufferFromFile('test/files/offset_index_no_dict_offset.parquet')
263+
const allRows = await parquetReadObjects({ file })
264+
const rows = await parquetReadObjects({ file, rowEnd: 1, useOffsetIndex: true })
265+
expect(rows).toEqual(allRows.slice(0, 1))
266+
})
267+
260268
it('reads only required row groups on the boundary', async () => {
261269
const originalFile = await asyncBufferFromFile('test/files/alpha.parquet')
262270
const metadata = await parquetMetadataAsync(originalFile)

0 commit comments

Comments
 (0)