Skip to content

Commit 1cbf065

Browse files
committed
fix: add /register-iceberg endpoint to bypass miniflare R2 list() bug
Miniflare R2 list() doesn't reliably return objects even when uploaded through the Worker's DATA_BUCKET binding. Add /register-iceberg endpoint that accepts an explicit metadata.json key and uses only R2 get()/head() (which work reliably) to discover and register Iceberg tables. Seed script now uses this instead of query-based lazy loading.
1 parent d727668 commit 1cbf065

File tree

3 files changed

+102
-16
lines changed

3 files changed

+102
-16
lines changed

scripts/seed-local-r2.ts

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -122,22 +122,28 @@ async function seedIcebergTable(dir: string): Promise<void> {
122122
console.log(` PUT ${r2Key} (${(data.length / 1024).toFixed(0)} KB)`);
123123
await uploadViaWorker(r2Key, data);
124124
}
125-
// Trigger Iceberg discovery via query (should work immediately since R2 list() sees objects)
126-
await new Promise(r => setTimeout(r, 500));
127-
try {
128-
const resp = await fetch(`${BASE_URL}/query`, {
129-
method: "POST",
130-
headers: { "content-type": "application/json" },
131-
body: JSON.stringify({ table: name, filters: [], projections: ["id"], limit: 1 }),
132-
});
133-
if (resp.ok) {
134-
console.log(` Iceberg registered: ${name}`);
135-
} else {
136-
const text = await resp.text();
137-
console.log(` Iceberg registration: ${resp.status} (${text.slice(0, 80)})`);
125+
// Register Iceberg table explicitly (bypasses R2 list() which is unreliable in miniflare)
126+
// Find the metadata.json key we just uploaded
127+
const metadataFiles = walkDir(dir)
128+
.map(f => `${name}/${relative(dir, f)}`)
129+
.filter(k => k.endsWith(".metadata.json"));
130+
const metadataKey = metadataFiles[metadataFiles.length - 1]; // latest
131+
if (metadataKey) {
132+
try {
133+
const resp = await fetch(`${BASE_URL}/register-iceberg`, {
134+
method: "POST",
135+
headers: { "content-type": "application/json" },
136+
body: JSON.stringify({ table: name, metadataKey }),
137+
});
138+
if (resp.ok) {
139+
console.log(` Iceberg registered:`, await resp.json());
140+
} else {
141+
const text = await resp.text();
142+
console.log(` Iceberg registration: ${resp.status} (${text.slice(0, 80)})`);
143+
}
144+
} catch (err) {
145+
console.log(` Iceberg registration error: ${String(err).slice(0, 80)}`);
138146
}
139-
} catch (err) {
140-
console.log(` Iceberg registration error: ${String(err).slice(0, 80)}`);
141147
}
142148
}
143149

src/query-do.ts

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ export class QueryDO implements DurableObject {
8080
case "/tables": return this.handleListTables();
8181
case "/meta": return this.handleGetMeta(request);
8282
case "/diagnostics": return this.handleDiagnostics();
83+
case "/register-iceberg": return this.handleRegisterIceberg(request);
8384
default: return new Response("Not found", { status: 404 });
8485
}
8586
}
@@ -999,6 +1000,84 @@ export class QueryDO implements DurableObject {
9991000
return null;
10001001
}
10011002

1003+
/** Register an Iceberg table by explicit metadata path (bypasses R2 list()). */
1004+
private async handleRegisterIceberg(request: Request): Promise<Response> {
1005+
const { table, metadataKey } = (await request.json()) as { table: string; metadataKey: string };
1006+
if (!table || !metadataKey) return this.json({ error: "Missing table or metadataKey" }, 400);
1007+
1008+
const result = await this.loadIcebergByKey(table, metadataKey);
1009+
if (!result) return this.json({ error: "Failed to load Iceberg metadata" }, 500);
1010+
return this.json({ registered: true, table, totalRows: result.totalRows, files: result.parquetFiles.length });
1011+
}
1012+
1013+
/** Load an Iceberg table from an explicit metadata.json key (no R2 list() needed). */
1014+
private async loadIcebergByKey(tableName: string, metadataKey: string): Promise<IcebergDatasetMeta | null> {
1015+
const metaObj = await this.env.DATA_BUCKET.get(metadataKey);
1016+
if (!metaObj) return null;
1017+
1018+
const metaJson = await metaObj.text();
1019+
const icebergMeta = parseIcebergMetadata(metaJson);
1020+
if (!icebergMeta) return null;
1021+
1022+
// Derive prefix from metadataKey (e.g., "bench_iceberg_100k/metadata/v1.metadata.json" → "bench_iceberg_100k/")
1023+
const prefix = metadataKey.replace(/metadata\/.*$/, "");
1024+
1025+
const manifestListKey = `${prefix}${icebergMeta.manifestListPath}`;
1026+
const manifestListObj = await this.env.DATA_BUCKET.get(manifestListKey);
1027+
if (!manifestListObj) return null;
1028+
1029+
const manifestListBytes = await manifestListObj.arrayBuffer();
1030+
const parquetPaths = extractParquetPathsFromManifest(manifestListBytes);
1031+
if (parquetPaths.length === 0) return null;
1032+
1033+
// Load each Parquet file's metadata
1034+
const fragmentMetas = new Map<number, TableMeta>();
1035+
let totalRows = 0;
1036+
for (let i = 0; i < parquetPaths.length; i++) {
1037+
const parquetKey = parquetPaths[i].startsWith(prefix) ? parquetPaths[i] : `${prefix}${parquetPaths[i]}`;
1038+
const head = await this.env.DATA_BUCKET.head(parquetKey);
1039+
if (!head) continue;
1040+
1041+
const fileSize = BigInt(head.size);
1042+
const tailObj = await this.env.DATA_BUCKET.get(parquetKey, {
1043+
range: { offset: Math.max(0, Number(fileSize) - 8), length: Math.min(8, Number(fileSize)) },
1044+
});
1045+
if (!tailObj) continue;
1046+
1047+
const tailBuf = await tailObj.arrayBuffer();
1048+
const footerLen = getParquetFooterLength(tailBuf);
1049+
if (!footerLen) continue;
1050+
1051+
const footerObj = await this.env.DATA_BUCKET.get(parquetKey, {
1052+
range: { offset: Number(fileSize) - footerLen - 8, length: footerLen },
1053+
});
1054+
if (!footerObj) continue;
1055+
1056+
const parquetFileMeta = parseParquetFooter(await footerObj.arrayBuffer());
1057+
if (!parquetFileMeta) continue;
1058+
1059+
const meta = parquetMetaToTableMeta(parquetFileMeta, parquetKey, fileSize);
1060+
meta.name = parquetPaths[i];
1061+
fragmentMetas.set(i, meta);
1062+
totalRows += meta.totalRows;
1063+
}
1064+
1065+
if (fragmentMetas.size === 0) return null;
1066+
1067+
const dataset: IcebergDatasetMeta = {
1068+
name: tableName, r2Prefix: prefix,
1069+
schema: icebergMeta.schema, snapshotId: icebergMeta.currentSnapshotId,
1070+
parquetFiles: parquetPaths, fragmentMetas, totalRows, updatedAt: Date.now(),
1071+
};
1072+
this.datasetCache.set(tableName, {
1073+
name: tableName, r2Prefix: prefix,
1074+
manifest: { version: 0, fragments: parquetPaths.map((p, idx) => ({ id: idx, filePath: p, physicalRows: 0 })), totalRows, schema: [] },
1075+
fragmentMetas, totalRows, updatedAt: Date.now(),
1076+
});
1077+
this.evictDatasetCache();
1078+
return dataset;
1079+
}
1080+
10021081
/** Discover an Iceberg table in R2 by listing metadata/ for .metadata.json files. */
10031082
private async loadIcebergFromR2(tableName: string): Promise<IcebergDatasetMeta | null> {
10041083
for (const prefix of [`${tableName}/`, `data/${tableName}/`]) {

src/worker.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@ export default {
7878
url.pathname === "/query" ||
7979
url.pathname === "/query/stream" ||
8080
url.pathname === "/tables" ||
81-
url.pathname === "/meta"
81+
url.pathname === "/meta" ||
82+
url.pathname === "/register-iceberg"
8283
) {
8384
// Use a deterministic name per region so each region gets one Query DO.
8485
// The CF-Ray header contains the datacenter code (e.g., "SJC", "NRT").

0 commit comments

Comments
 (0)