Skip to content

Commit d727668

Browse files
committed
fix: upload Iceberg files via Worker R2 binding to fix miniflare list()
Miniflare's R2 list() doesn't see objects uploaded via the wrangler CLI. Add an /upload endpoint that writes through the Worker's DATA_BUCKET binding, then use it for Iceberg file uploads in the seed script. This ensures that R2 list() in loadIcebergFromR2 can discover the metadata.
1 parent 510a498 commit d727668

2 files changed

Lines changed: 36 additions & 22 deletions

File tree

scripts/seed-local-r2.ts

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -102,39 +102,43 @@ async function seedParquetFile(filePath: string, r2Key?: string): Promise<void>
102102
}
103103
}
104104

105+
/**
 * Upload one object through the Worker's `/upload` endpoint.
 *
 * The endpoint writes through the Worker's R2 binding rather than the wrangler
 * CLI, which avoids the miniflare inconsistency where `list()` does not see
 * objects that were uploaded out-of-band.
 *
 * @param r2Key - Destination object key; URL-encoded into the `key` query parameter.
 * @param data - Raw bytes sent as the POST body.
 * @throws Error when the response status is not OK. The message keeps the
 *   `Upload failed: <status>` prefix and adds the key plus the first 80
 *   characters of the response body, mirroring the Iceberg registration log.
 */
async function uploadViaWorker(r2Key: string, data: Uint8Array): Promise<void> {
  const resp = await fetch(`${BASE_URL}/upload?key=${encodeURIComponent(r2Key)}`, {
    method: "POST",
    body: data,
  });

  if (!resp.ok) {
    // Reading the body surfaces the Worker's own error text and also releases
    // the underlying connection before we throw.
    let detail = "";
    try {
      detail = (await resp.text()).slice(0, 80);
    } catch {
      // Body unreadable; the status code alone still identifies the failure.
    }
    const suffix = detail ? ` (${detail})` : "";
    throw new Error(`Upload failed: ${resp.status} for ${r2Key}${suffix}`);
  }

  // The success payload is not needed; cancel it so the connection is freed
  // promptly instead of waiting for garbage collection while many files are
  // uploaded back to back.
  await resp.body?.cancel();
}
113+
105114
/**
 * Seed one Iceberg table into R2 and trigger its discovery.
 *
 * Every file yielded by `walkDir(dir)` is read and uploaded under the key
 * `<basename(dir)>/<path relative to dir>`; afterwards a `/query` request with
 * `limit: 1` is sent for the table and its outcome is logged.
 *
 * This listing is a diff hunk, so both versions of the body are visible:
 *  - lines marked `-` are the removed implementation: `uploadFile(...)` followed
 *    by up to 8 query attempts spaced 2000 ms apart;
 *  - lines marked `+` are the replacement: `await uploadViaWorker(...)` followed
 *    by a single query attempt after a 500 ms pause.
 *
 * In the `+` version a failed or rejected query is only logged (response text and
 * error string truncated to 80 characters); the upload `await` sits outside the
 * `try`, so a rejection from `uploadViaWorker` propagates to the caller.
 *
 * @param dir - Directory holding the table's files; its basename is used as the
 *   table name and as the R2 key prefix.
 */
async function seedIcebergTable(dir: string): Promise<void> {
106115
const name = basename(dir);
107116
console.log(`\nSeeding Iceberg table: ${name}`);
117+
// Upload via Worker's R2 binding (not wrangler CLI) so R2 list() sees the objects
108118
for (const file of walkDir(dir)) {
109119
const relPath = relative(dir, file);
110120
const r2Key = `${name}/${relPath}`;
111121
const data = readFileSync(file);
112122
console.log(` PUT ${r2Key} (${(data.length / 1024).toFixed(0)} KB)`);
113-
uploadFile(r2Key, data);
123+
await uploadViaWorker(r2Key, data);
114124
}
115-
// Trigger Iceberg discovery immediately after upload (R2 list() needs objects to be visible).
116-
// Retry with delays since miniflare R2 list() has eventual consistency.
117-
let ok = false;
118-
for (let attempt = 0; attempt < 8 && !ok; attempt++) {
119-
if (attempt > 0) await new Promise(r => setTimeout(r, 2000));
120-
try {
121-
const resp = await fetch(`${BASE_URL}/query`, {
122-
method: "POST",
123-
headers: { "content-type": "application/json" },
124-
body: JSON.stringify({ table: name, filters: [], projections: ["id"], limit: 1 }),
125-
});
126-
if (resp.ok) {
127-
console.log(` Iceberg registered: ${name}`);
128-
ok = true;
129-
} else if (attempt >= 2) {
130-
// Only log after a few attempts to reduce noise
131-
console.log(` Iceberg attempt ${attempt + 1}: ${resp.status}`);
132-
}
133-
} catch {
134-
if (attempt >= 2) console.log(` Iceberg attempt ${attempt + 1}: fetch error`);
125+
// Trigger Iceberg discovery via query (should work immediately since R2 list() sees objects)
126+
await new Promise(r => setTimeout(r, 500));
127+
try {
128+
const resp = await fetch(`${BASE_URL}/query`, {
129+
method: "POST",
130+
headers: { "content-type": "application/json" },
131+
body: JSON.stringify({ table: name, filters: [], projections: ["id"], limit: 1 }),
132+
});
133+
if (resp.ok) {
134+
console.log(` Iceberg registered: ${name}`);
135+
} else {
136+
const text = await resp.text();
137+
console.log(` Iceberg registration: ${resp.status} (${text.slice(0, 80)})`);
135138
}
139+
} catch (err) {
140+
console.log(` Iceberg registration error: ${String(err).slice(0, 80)}`);
136141
}
137-
if (!ok) console.log(` Iceberg lazy-load deferred (will retry in bench)`);
138142
}
139143

140144
async function main(): Promise<void> {

src/worker.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,16 @@ export default {
5656
});
5757
}
5858

59+
// Direct R2 upload (local dev only — used by seed script to avoid miniflare list() inconsistency)
60+
if (url.pathname === "/upload" && request.method === "POST") {
61+
const key = url.searchParams.get("key");
62+
if (!key) return new Response("Missing ?key=", { status: 400 });
63+
await env.DATA_BUCKET.put(key, request.body);
64+
return new Response(JSON.stringify({ uploaded: key }), {
65+
headers: { "content-type": "application/json" },
66+
});
67+
}
68+
5969
// Write operations go to the Master DO (single writer)
6070
if (url.pathname === "/write" || url.pathname === "/refresh") {
6171
const masterId = env.MASTER_DO.idFromName("master");

0 commit comments

Comments
 (0)