From 767235a0f70e2baf8647691c581975e0dc435c98 Mon Sep 17 00:00:00 2001 From: Rashmi V Abbigeri Date: Mon, 14 Nov 2022 02:22:44 +0530 Subject: [PATCH 01/11] strategy1 --- src/routes/metadata.js | 4 ++-- src/routes/uploadToEstuary.js | 1 - src/services/uploadToEstuary.service.js | 14 +++++++++----- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/src/routes/metadata.js b/src/routes/metadata.js index 8e8952c..4618ec8 100644 --- a/src/routes/metadata.js +++ b/src/routes/metadata.js @@ -3,8 +3,8 @@ const router = express.Router(); const metadataService = require("../services/metadata.service"); -router.get("/datasets/", metadataService.getDatasetMetadata); -router.get("/datasets/published", metadataService.getPublishedDatasets); +router.get("/datasets/", metadataService.getDatasetMetadata); +router.get("/datasets/published", metadataService.getPublishedDatasets); router.get("/datasets/published/byUploader", metadataService.getPublishedDatasetsByUploader); router.get("/datasets/published/search", metadataService.searchPublishedDatasets); router.post("/datasets/publish", metadataService.publishDataset); diff --git a/src/routes/uploadToEstuary.js b/src/routes/uploadToEstuary.js index 02e3b77..f0f88cc 100644 --- a/src/routes/uploadToEstuary.js +++ b/src/routes/uploadToEstuary.js @@ -15,7 +15,6 @@ const storage = multer.diskStorage({ }, }); -// const upload = multer({ dest: `estuaryUploads/${Date.now()}` }); const maxSize = 2 ** 20 * 500; // 2^20 == 1 MiB const upload = multer({ storage: storage, limits: { fileSize: maxSize } }); diff --git a/src/services/uploadToEstuary.service.js b/src/services/uploadToEstuary.service.js index f5710dc..4142653 100644 --- a/src/services/uploadToEstuary.service.js +++ b/src/services/uploadToEstuary.service.js @@ -13,6 +13,7 @@ const { msgCache } = require("../init"); const dbWrapper = require("../utils/dbWrapper"); const estuaryWrapper = require("../utils/estuaryWrapper"); const utils = require("../utils/utils"); +const { error } = require("console"); const runBidsValidation = async (pathToDirectory) => { return new Promise((resolve) => { @@ -31,7 +32,9 @@ const runBidsValidation = async (pathToDirectory) => { (issues, summary) => { if (issues.errors.length > 0) { console.log("BIDS validation failed"); - resolve(); + console.log(issues.errors); + resolve({ summary: summary, issues: issues }); + } else { console.log("BIDS validation succeeded"); resolve({ summary: summary, issues: issues }); @@ -229,7 +232,6 @@ const insertMetadata = async (datasetMetadata, chunkMetadata, files) => { return false; } - // Chunk for (let numAttempts = 0; numAttempts < maxAttempts; numAttempts++) { chunk = generateChunk({ datasetId: dataset._id, @@ -312,19 +314,21 @@ const uploadFiles = async (req, res) => { const validatorData = await runBidsValidation(userDefinedRootDirLocal); if (!validatorData) { - await utils.removeFiles(timestampedFolder); - return res.status(400).json({ error: "BIDS validation failed." }); + await utils.removeFiles(timestampedFolder); + return res.status(400).json({ error: "BIDS validation failed." + validatorData.issues.key }); } - +// TODO: Replace the rest of the code in this function with uploadDirAsCar from estuaryWrapper const { root, filename: carFilename } = await packToFs({ input: userDefinedRootDirLocal, output: `${timestampedFolder}/${userDefinedRootDir}.car`, blockstore: new FsBlockStore(), + maxChunkSize: 262144 }); // Upload file console.log(`${new Date().toISOString()} Uploading ${carFilename} to Estuary`); const file = fs.createReadStream(carFilename); + const uploadResp = await estuaryWrapper.uploadFile(file, 3); // const uploadResp = { cid: "0x124", estuaryId: "81" }; // THIS LINE IS FOR TESTING ONLY await utils.removeFiles(timestampedFolder); From 7ecb8228b867bcb78dc454d62b46f11a7c1405de Mon Sep 17 00:00:00 2001 From: Rashmi V Abbigeri Date: Mon, 14 Nov 2022 19:29:12 +0530 Subject: [PATCH 02/11] Split Car files --- package-lock.json | 43 +++++++++++++++++++ package.json | 1 + src/services/uploadToEstuary.service.js | 4 +- src/utils/estuaryWrapper.js | 56 +++++++++++++++++++++---- 4 files changed, 96 insertions(+), 8 deletions(-) diff --git a/package-lock.json b/package-lock.json index 24115fa..58098b5 100644 --- a/package-lock.json +++ b/package-lock.json @@ -11,6 +11,7 @@ "dependencies": { "axios": "^0.27.2", "bids-validator": "^1.9.3", + "carbites": "^1.0.6", "chai": "^4.3.6", "cors": "^2.8.5", "dotenv": "^16.0.0", @@ -2887,6 +2888,26 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/carbites": { + "version": "1.0.6", + "resolved": "https://registry.npmjs.org/carbites/-/carbites-1.0.6.tgz", + "integrity": "sha512-dS9IQvnrb5VIRvSTNz5Ff+mB9d2MFfi5mojtJi7Rlss79VeF190jr0sZdA7eW0CGHotvHkZaWuM6wgfD9PEFRg==", + "dependencies": { + "@ipld/car": "^3.0.1", + "@ipld/dag-cbor": "^6.0.3", + "@ipld/dag-pb": "^2.0.2", + "multiformats": "^9.0.4" + } + }, + "node_modules/carbites/node_modules/@ipld/dag-cbor": { + "version": "6.0.15", + "resolved": "https://registry.npmjs.org/@ipld/dag-cbor/-/dag-cbor-6.0.15.tgz", + "integrity": "sha512-Vm3VTSTwlmGV92a3C5aeY+r2A18zbH2amehNhsX8PBa3muXICaWrN8Uri85A5hLH7D7ElhE8PdjxD6kNqUmTZA==", + "dependencies": { + "cborg": "^1.5.4", + "multiformats": "^9.5.4" + } + }, "node_modules/caseless": { "version": "0.12.0", "resolved": "https://registry.npmjs.org/caseless/-/caseless-0.12.0.tgz", @@ -11157,6 +11178,28 @@ "quick-lru": "^4.0.1" } }, + "carbites": { + "version": "1.0.6", + "resolved": "https://registry.npmjs.org/carbites/-/carbites-1.0.6.tgz", + "integrity": "sha512-dS9IQvnrb5VIRvSTNz5Ff+mB9d2MFfi5mojtJi7Rlss79VeF190jr0sZdA7eW0CGHotvHkZaWuM6wgfD9PEFRg==", + "requires": { + "@ipld/car": "^3.0.1", + "@ipld/dag-cbor": "^6.0.3", + "@ipld/dag-pb": "^2.0.2", + "multiformats": "^9.0.4" + }, + "dependencies": { + "@ipld/dag-cbor": { + "version": "6.0.15", + "resolved": "https://registry.npmjs.org/@ipld/dag-cbor/-/dag-cbor-6.0.15.tgz", + "integrity": "sha512-Vm3VTSTwlmGV92a3C5aeY+r2A18zbH2amehNhsX8PBa3muXICaWrN8Uri85A5hLH7D7ElhE8PdjxD6kNqUmTZA==", + "requires": { + "cborg": "^1.5.4", + "multiformats": "^9.5.4" + } + } + } + }, "caseless": { "version": "0.12.0", "resolved": "https://registry.npmjs.org/caseless/-/caseless-0.12.0.tgz", diff --git a/package.json b/package.json index ccb3523..3c5851e 100644 --- a/package.json +++ b/package.json @@ -6,6 +6,7 @@ "dependencies": { "axios": "^0.27.2", "bids-validator": "^1.9.3", + "carbites": "^1.0.6", "chai": "^4.3.6", "cors": "^2.8.5", "dotenv": "^16.0.0", diff --git a/src/services/uploadToEstuary.service.js b/src/services/uploadToEstuary.service.js index 4142653..759e22a 100644 --- a/src/services/uploadToEstuary.service.js +++ b/src/services/uploadToEstuary.service.js @@ -14,6 +14,8 @@ const dbWrapper = require("../utils/dbWrapper"); const estuaryWrapper = require("../utils/estuaryWrapper"); const utils = require("../utils/utils"); const { error } = require("console"); +const { CarReader } = require( '@ipld/car/reader'); +const dagCbor = require ('@ipld/dag-cbor'); const runBidsValidation = async (pathToDirectory) => { return new Promise((resolve) => { @@ -322,12 +324,12 @@ const uploadFiles = async (req, res) => { input: userDefinedRootDirLocal, output: `${timestampedFolder}/${userDefinedRootDir}.car`, blockstore: new FsBlockStore(), - maxChunkSize: 262144 }); // Upload file console.log(`${new Date().toISOString()} Uploading ${carFilename} to Estuary`); const file = fs.createReadStream(carFilename); + const uploadRespsplit = await estuaryWrapper.splitCars(carFilename, 3); const uploadResp = await estuaryWrapper.uploadFile(file, 3); // const uploadResp = { cid: "0x124", estuaryId: "81" }; // THIS LINE IS FOR TESTING ONLY diff --git a/src/utils/estuaryWrapper.js b/src/utils/estuaryWrapper.js index a19bee9..b4b3238 100644 --- a/src/utils/estuaryWrapper.js +++ b/src/utils/estuaryWrapper.js @@ -4,8 +4,14 @@ const FormData = require("form-data"); const { packToFs } = require("ipfs-car/pack/fs"); const { FsBlockStore } = require("ipfs-car/blockstore/fs"); const utils = require("./utils"); +const { TreewalkCarSplitter } = require("carbites/treewalk"); +const { CarReader } = require("@ipld/car/reader"); +const dagCbor = require("@ipld/dag-cbor"); -const estuaryEndpoints = ["https://shuttle-4.estuary.tech/content/add", "https://api.estuary.tech/content/add"]; +const estuaryEndpoints = [ + "https://shuttle-4.estuary.tech/content/add", + "https://api.estuary.tech/content/add", +]; module.exports.getPinsList = async () => { try { @@ -65,12 +71,17 @@ module.exports.deleteFile = async (requestid, maxAttempts = 3) => { let numAttempts = 0; while (numAttempts < maxAttempts) { try { - const resp = await axios.delete(`https://api.estuary.tech/pinning/pins/${requestid}`, { - headers: { - Authorization: "Bearer " + process.env.ESTUARY_API_KEY, - }, - }); - console.log(`estuaryWrapper.deleteFile: Deleted file with requestid ${requestid}`); + const resp = await axios.delete( + `https://api.estuary.tech/pinning/pins/${requestid}`, + { + headers: { + Authorization: "Bearer " + process.env.ESTUARY_API_KEY, + }, + } + ); + console.log( + `estuaryWrapper.deleteFile: Deleted file with requestid ${requestid}` + ); return true; } catch (err) { numAttempts++; @@ -108,3 +119,34 @@ module.exports.uploadDirAsCar = async (pathToDir, pathToCar) => { console.log(err); } }; + +module.exports.splitCars = async (largeCar) => { + console.log("Chunking CAR File"); + try { + const bigCar = await CarReader.fromIterable(fs.createReadStream(largeCar)); + const [rootCid] = await bigCar.getRoots(); + const MaxCarSize1MB = 100000000; + let cars = []; + //const targetSize = 64 * 1024 //1024 * 1024 * 100 // chunk to ~100MB CARs or 64 KB + const splitter = new TreewalkCarSplitter(bigCar, MaxCarSize1MB); + let uploadResp; + for await (const car of splitter.cars()) { + // Each `car` is an AsyncIterable + const reader = await CarReader.fromIterable(car); + const [splitCarRootCid] = await reader.getRoots(); + console.assert(rootCid.equals(splitCarRootCid)); + for await (const chunk of smallCar) { + let chunkie = fs.createReadStream(chunk); + + uploadResp = await module.exports.uploadFile(chunkie, 3); + + // all cars will have the same root + cars.push(chunk); + } + } + return uploadResp; + } catch (error) { + console.error(error); + return; + } +}; From 530a8b75d06353821127842f75a9855372fa0f6d Mon Sep 17 00:00:00 2001 From: Rashmi V Abbigeri Date: Fri, 18 Nov 2022 03:34:03 +0530 Subject: [PATCH 03/11] progress --- src/utils/estuaryWrapper.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/utils/estuaryWrapper.js b/src/utils/estuaryWrapper.js index b4b3238..dc7e539 100644 --- a/src/utils/estuaryWrapper.js +++ b/src/utils/estuaryWrapper.js @@ -135,7 +135,7 @@ module.exports.splitCars = async (largeCar) => { const reader = await CarReader.fromIterable(car); const [splitCarRootCid] = await reader.getRoots(); console.assert(rootCid.equals(splitCarRootCid)); - for await (const chunk of smallCar) { + for await (const chunk of car) { let chunkie = fs.createReadStream(chunk); uploadResp = await module.exports.uploadFile(chunkie, 3); From 803206c6e20bb13dd273c260844ce2bac5141b79 Mon Sep 17 00:00:00 2001 From: Rashmi V Abbigeri Date: Mon, 21 Nov 2022 00:47:29 +0530 Subject: [PATCH 04/11] create car chunks --- src/services/uploadToEstuary.service.js | 6 +-- src/utils/estuaryWrapper.js | 71 +++++++++++++++++++------ 2 files changed, 59 insertions(+), 18 deletions(-) diff --git a/src/services/uploadToEstuary.service.js b/src/services/uploadToEstuary.service.js index 759e22a..fae21d2 100644 --- a/src/services/uploadToEstuary.service.js +++ b/src/services/uploadToEstuary.service.js @@ -329,12 +329,12 @@ const uploadFiles = async (req, res) => { // Upload file console.log(`${new Date().toISOString()} Uploading ${carFilename} to Estuary`); const file = fs.createReadStream(carFilename); - const uploadRespsplit = await estuaryWrapper.splitCars(carFilename, 3); + const uploadRespsplit = await estuaryWrapper.splitCars(carFilename.toString(), 3); - const uploadResp = await estuaryWrapper.uploadFile(file, 3); + //const uploadResp = await estuaryWrapper.uploadFile(file, 3); // const uploadResp = { cid: "0x124", estuaryId: "81" }; // THIS LINE IS FOR TESTING ONLY await utils.removeFiles(timestampedFolder); - if (!uploadResp) { + if (!uploadRespsplit) { console.log(`${new Date().toISOString()} Failed to upload ${carFilename} to Estuary`); return res.status(400).json({ error: "An error occurred trying to upload to Estuary. Try again later." }); } diff --git a/src/utils/estuaryWrapper.js b/src/utils/estuaryWrapper.js index dc7e539..abb5653 100644 --- a/src/utils/estuaryWrapper.js +++ b/src/utils/estuaryWrapper.js @@ -38,6 +38,40 @@ module.exports.uploadFile = async (file, maxAttempts = 3) => { const formData = new FormData(); formData.append("data", file); + let numAttempts = 0; + while (numAttempts < maxAttempts) { + try { + // Get URL of shuttle node with most space + const viewerResp = await axios.get("https://api.estuary.tech/viewer", { + headers: { + Authorization: "Bearer " + process.env.ESTUARY_API_KEY, + }, + }); + console.log(viewerResp.data.settings.uploadEndpoints); + + const url = viewerResp.data.settings.uploadEndpoints[0]; + + // Upload file + const resp = await axios.post(url, formData, { + headers: { + Authorization: "Bearer " + process.env.ESTUARY_API_KEY, + }, + maxContentLength: Infinity, + maxBodyLength: Infinity, + }); + return resp.data; + } catch (err) { + numAttempts++; + console.log( + `estuaryWrapper.uploadFile: Error status: ${err.response?.status}. Error code: ${err.code}. Error message: ${err.message}` + ); + } + } +}; + +module.exports.uploadFileAsCAR = async (file, maxAttempts = 3) => { + const formData = new FormData(); + formData.append("data", file); let numAttempts = 0; while (numAttempts < maxAttempts) { try { @@ -49,10 +83,12 @@ module.exports.uploadFile = async (file, maxAttempts = 3) => { }); const url = viewerResp.data.settings.uploadEndpoints[0]; + console.log(url); // Upload file const resp = await axios.post(url, formData, { headers: { Authorization: "Bearer " + process.env.ESTUARY_API_KEY, + "Content-Type": "application/octet-stream", }, maxContentLength: Infinity, maxBodyLength: Infinity, @@ -110,7 +146,7 @@ module.exports.uploadDirAsCar = async (pathToDir, pathToCar) => { console.log(`Uploading ${carFilename} to Estuary`); const file = fs.createReadStream(carFilename); - const uploadResp = await module.exports.uploadFile(file, 3); + const uploadResp = await module.exports.uploadFileAsCAR(file, 3); await utils.removeFiles(pathToCar); if (!uploadResp) console.log(`Failed to upload ${carFilename} to Estuary`); @@ -126,24 +162,29 @@ module.exports.splitCars = async (largeCar) => { const bigCar = await CarReader.fromIterable(fs.createReadStream(largeCar)); const [rootCid] = await bigCar.getRoots(); const MaxCarSize1MB = 100000000; + let uploadResp = undefined; + let chunkie; let cars = []; //const targetSize = 64 * 1024 //1024 * 1024 * 100 // chunk to ~100MB CARs or 64 KB - const splitter = new TreewalkCarSplitter(bigCar, MaxCarSize1MB); - let uploadResp; - for await (const car of splitter.cars()) { - // Each `car` is an AsyncIterable - const reader = await CarReader.fromIterable(car); - const [splitCarRootCid] = await reader.getRoots(); - console.assert(rootCid.equals(splitCarRootCid)); - for await (const chunk of car) { - let chunkie = fs.createReadStream(chunk); - - uploadResp = await module.exports.uploadFile(chunkie, 3); - - // all cars will have the same root - cars.push(chunk); + + if (largeCar.size <= MaxCarSize1MB) { + cars.push(car); + } else { + // when size exceeds MaxCarSize1MB, split it into an AsyncIterable + const splitter = new TreewalkCarSplitter(bigCar, MaxCarSize1MB); + + for await (const smallCar of splitter.cars()) { + for await (const chunk of smallCar) { + cars.push(chunk); + } } } + + for await (const c of cars) { + chunkie = new Buffer.from(c); + uploadResp = await module.exports.uploadFileAsCAR(chunkie, 3); + } + return uploadResp; } catch (error) { console.error(error); From b9ad1142db2973cb06c116d20a49dd1261f5f24b Mon Sep 17 00:00:00 2001 From: Rashmi V Abbigeri Date: Mon, 21 Nov 2022 04:08:30 +0530 Subject: [PATCH 05/11] add error handlers --- src/index.js | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/src/index.js b/src/index.js index ab346a1..e0610a5 100644 --- a/src/index.js +++ b/src/index.js @@ -20,4 +20,28 @@ app.use("/metadata", metadata); app.use("/uploadToEstuary", uploadToEstuary); app.use("/initializeUpload", initializeUpload); +// error handlers + +// development error handler +// will print stacktrace +if (app.get('env') === 'development') { + app.use(function(err, req, res, next) { + res.status(err.status || 500); + res.render('error', { + message: err.message, + error: err + }); + }); +} + +// production error handler +// no stacktraces leaked to user +app.use(function(err, req, res, next) { + res.status(err.status || 500); + res.render('error', { + message: err.message, + error: {} + }); +}); + module.exports = app; From 1510adb08d04ee624d12563e4aca1cdbc8061b57 Mon Sep 17 00:00:00 2001 From: Rashmi V Abbigeri Date: Mon, 21 Nov 2022 15:11:19 +0530 Subject: [PATCH 06/11] fs promises --- src/utils/estuaryWrapper.js | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/utils/estuaryWrapper.js b/src/utils/estuaryWrapper.js index abb5653..601403a 100644 --- a/src/utils/estuaryWrapper.js +++ b/src/utils/estuaryWrapper.js @@ -1,5 +1,6 @@ const axios = require("axios"); const fs = require("fs"); +const fsp = require("fs/promises") const FormData = require("form-data"); const { packToFs } = require("ipfs-car/pack/fs"); const { FsBlockStore } = require("ipfs-car/blockstore/fs"); @@ -71,7 +72,9 @@ module.exports.uploadFile = async (file, maxAttempts = 3) => { module.exports.uploadFileAsCAR = async (file, maxAttempts = 3) => { const formData = new FormData(); - formData.append("data", file); + chunkie = new Buffer.from(file); + + formData.append("data", chunkie, "chunk"); let numAttempts = 0; while (numAttempts < maxAttempts) { try { @@ -81,8 +84,8 @@ module.exports.uploadFileAsCAR = async (file, maxAttempts = 3) => { Authorization: "Bearer " + process.env.ESTUARY_API_KEY, }, }); - const url = viewerResp.data.settings.uploadEndpoints[0]; - + //const url = viewerResp.data.settings.uploadEndpoints[0]; + const url = "https://api.estuary.tech/content/add-car" console.log(url); // Upload file const resp = await axios.post(url, formData, { @@ -115,8 +118,8 @@ module.exports.deleteFile = async (requestid, maxAttempts = 3) => { }, } ); - console.log( - `estuaryWrapper.deleteFile: Deleted file with requestid ${requestid}` + + `estuaryWrapper.deleteFile: Deleted file with requestid ${requestid}` ); return true; } catch (err) { @@ -181,8 +184,7 @@ module.exports.splitCars = async (largeCar) => { } for await (const c of cars) { - chunkie = new Buffer.from(c); - uploadResp = await module.exports.uploadFileAsCAR(chunkie, 3); + uploadResp = await module.exports.uploadFileAsCAR(c, 3); } return uploadResp; From 80b28ec6edddbfb1a1cae3455c96757027421ef9 Mon Sep 17 00:00:00 2001 From: Rashmi V Abbigeri Date: Mon, 21 Nov 2022 18:04:19 +0530 Subject: [PATCH 07/11] log --- src/utils/estuaryWrapper.js | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/utils/estuaryWrapper.js b/src/utils/estuaryWrapper.js index 601403a..0021895 100644 --- a/src/utils/estuaryWrapper.js +++ b/src/utils/estuaryWrapper.js @@ -1,6 +1,6 @@ const axios = require("axios"); const fs = require("fs"); -const fsp = require("fs/promises") +const fsp = require("fs/promises"); const FormData = require("form-data"); const { packToFs } = require("ipfs-car/pack/fs"); const { FsBlockStore } = require("ipfs-car/blockstore/fs"); @@ -85,7 +85,7 @@ module.exports.uploadFileAsCAR = async (file, maxAttempts = 3) => { }, }); //const url = viewerResp.data.settings.uploadEndpoints[0]; - const url = "https://api.estuary.tech/content/add-car" + const url = "https://api.estuary.tech/content/add-car"; console.log(url); // Upload file const resp = await axios.post(url, formData, { @@ -119,8 +119,10 @@ module.exports.deleteFile = async (requestid, maxAttempts = 3) => { } ); - `estuaryWrapper.deleteFile: Deleted file with requestid ${requestid}` + console.log( + `estuaryWrapper.deleteFile: Deleted file with requestid ${requestid}` ); + return true; } catch (err) { numAttempts++; From 801233237768aef7b79c3bbcd141eef5ec2a9071 Mon Sep 17 00:00:00 2001 From: Rashmi V Abbigeri Date: Tue, 22 Nov 2022 05:44:12 +0530 Subject: [PATCH 08/11] trail and error --- src/utils/estuaryWrapper.js | 30 ++++++++++++++++++++++-------- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/src/utils/estuaryWrapper.js b/src/utils/estuaryWrapper.js index 0021895..1be7d0a 100644 --- a/src/utils/estuaryWrapper.js +++ b/src/utils/estuaryWrapper.js @@ -2,12 +2,15 @@ const axios = require("axios"); const fs = require("fs"); const fsp = require("fs/promises"); const FormData = require("form-data"); -const { packToFs } = require("ipfs-car/pack/fs"); +const { packToF } = require("ipfs-car/pack/fs"); const { FsBlockStore } = require("ipfs-car/blockstore/fs"); const utils = require("./utils"); const { TreewalkCarSplitter } = require("carbites/treewalk"); const { CarReader } = require("@ipld/car/reader"); const dagCbor = require("@ipld/dag-cbor"); +const { packToBlob } = require("ipfs-car/pack/blob"); + +const { MemoryBlockStore } = require("ipfs-car/blockstore/memory"); const estuaryEndpoints = [ "https://shuttle-4.estuary.tech/content/add", @@ -72,9 +75,8 @@ module.exports.uploadFile = async (file, maxAttempts = 3) => { module.exports.uploadFileAsCAR = async (file, maxAttempts = 3) => { const formData = new FormData(); - chunkie = new Buffer.from(file); - formData.append("data", chunkie, "chunk"); + //formData.append("data", file, "chunk"); let numAttempts = 0; while (numAttempts < maxAttempts) { try { @@ -84,21 +86,23 @@ module.exports.uploadFileAsCAR = async (file, maxAttempts = 3) => { Authorization: "Bearer " + process.env.ESTUARY_API_KEY, }, }); - //const url = viewerResp.data.settings.uploadEndpoints[0]; - const url = "https://api.estuary.tech/content/add-car"; + const url = viewerResp.data.settings.uploadEndpoints[0]; + //const url = "https://api.estuary.tech/content/add-car"; + //const url = "https://api.web3.storage/car" console.log(url); // Upload file - const resp = await axios.post(url, formData, { + const resp = await axios.post(url, file, { headers: { Authorization: "Bearer " + process.env.ESTUARY_API_KEY, - "Content-Type": "application/octet-stream", }, maxContentLength: Infinity, maxBodyLength: Infinity, }); + console.log(resp.data) return resp.data; } catch (err) { numAttempts++; + console.log(err) console.log( `estuaryWrapper.uploadFile: Error status: ${err.response?.status}. Error code: ${err.code}. Error message: ${err.message}` ); @@ -173,13 +177,23 @@ module.exports.splitCars = async (largeCar) => { //const targetSize = 64 * 1024 //1024 * 1024 * 100 // chunk to ~100MB CARs or 64 KB if (largeCar.size <= MaxCarSize1MB) { - cars.push(car); + cars.push(largeCar); } else { // when size exceeds MaxCarSize1MB, split it into an AsyncIterable const splitter = new TreewalkCarSplitter(bigCar, MaxCarSize1MB); for await (const smallCar of splitter.cars()) { + // const reader = await CarReader.fromIterable(smallCar); + // const [splitCarRootCid] = await reader.getRoots(); + // console.log(splitCarRootCid) + for await (const chunk of smallCar) { + const { root, car } = await packToBlob({ + input: chunk, + blockstore: new MemoryBlockStore(), + }); + console.log(root); + console.log(car); cars.push(chunk); } } From 4b9aa81a670b3c9562761b4b01fc5f1fe9b573e0 Mon Sep 17 00:00:00 2001 From: Rashmi V Abbigeri Date: Tue, 22 Nov 2022 18:42:52 +0530 Subject: [PATCH 09/11] it works --- src/utils/estuaryWrapper.js | 40 ++++++++++++++++++------------------- src/utils/utils.js | 1 - 2 files changed, 20 insertions(+), 21 deletions(-) diff --git a/src/utils/estuaryWrapper.js b/src/utils/estuaryWrapper.js index 1be7d0a..954f941 100644 --- a/src/utils/estuaryWrapper.js +++ b/src/utils/estuaryWrapper.js @@ -9,8 +9,9 @@ const { TreewalkCarSplitter } = require("carbites/treewalk"); const { CarReader } = require("@ipld/car/reader"); const dagCbor = require("@ipld/dag-cbor"); const { packToBlob } = require("ipfs-car/pack/blob"); - const { MemoryBlockStore } = require("ipfs-car/blockstore/memory"); +const { pack } = require("ipfs-car/pack"); +const { Buffer } = require("buffer"); const estuaryEndpoints = [ "https://shuttle-4.estuary.tech/content/add", @@ -75,8 +76,8 @@ module.exports.uploadFile = async (file, maxAttempts = 3) => { module.exports.uploadFileAsCAR = async (file, maxAttempts = 3) => { const formData = new FormData(); - - //formData.append("data", file, "chunk"); +const chunkie = Buffer.from(file) + formData.append("data", chunkie, "chunk"); let numAttempts = 0; while (numAttempts < maxAttempts) { try { @@ -86,23 +87,23 @@ module.exports.uploadFileAsCAR = async (file, maxAttempts = 3) => { Authorization: "Bearer " + process.env.ESTUARY_API_KEY, }, }); - const url = viewerResp.data.settings.uploadEndpoints[0]; - //const url = "https://api.estuary.tech/content/add-car"; + //const url = viewerResp.data.settings.uploadEndpoints[0]; + const url = "https://api.estuary.tech/content/add-car"; //const url = "https://api.web3.storage/car" console.log(url); // Upload file - const resp = await axios.post(url, file, { + const resp = await axios.post(url, formData, { headers: { Authorization: "Bearer " + process.env.ESTUARY_API_KEY, }, maxContentLength: Infinity, maxBodyLength: Infinity, }); - console.log(resp.data) + console.log(resp.data); return resp.data; } catch (err) { numAttempts++; - console.log(err) + console.log(err.response); console.log( `estuaryWrapper.uploadFile: Error status: ${err.response?.status}. Error code: ${err.code}. Error message: ${err.message}` ); @@ -174,6 +175,13 @@ module.exports.splitCars = async (largeCar) => { let uploadResp = undefined; let chunkie; let cars = []; + + const { root, out } = await pack({ + input: fs.createReadStream(largeCar), + blockstore: new MemoryBlockStore(), + }); + console.log(root); + console.log(out); //const targetSize = 64 * 1024 //1024 * 1024 * 100 // chunk to ~100MB CARs or 64 KB if (largeCar.size <= MaxCarSize1MB) { @@ -183,29 +191,21 @@ module.exports.splitCars = async (largeCar) => { const splitter = new TreewalkCarSplitter(bigCar, MaxCarSize1MB); for await (const smallCar of splitter.cars()) { - // const reader = await CarReader.fromIterable(smallCar); - // const [splitCarRootCid] = await reader.getRoots(); - // console.log(splitCarRootCid) - for await (const chunk of smallCar) { - const { root, car } = await packToBlob({ - input: chunk, - blockstore: new MemoryBlockStore(), - }); - console.log(root); - console.log(car); + + cars.push(chunk); } } } - for await (const c of cars) { + for await (const c of out) { uploadResp = await module.exports.uploadFileAsCAR(c, 3); } return uploadResp; } catch (error) { - console.error(error); + console.error(error.response); return; } }; diff --git a/src/utils/utils.js b/src/utils/utils.js index 99248eb..6c3e4f2 100644 --- a/src/utils/utils.js +++ b/src/utils/utils.js @@ -10,7 +10,6 @@ module.exports.removeFiles = async (pathToFiles) => { await fse.remove(pathToFiles); console.log(`Removed ${pathToFiles}`); } catch (err) { - console.error(err); } }; From 25d714b72c1515edab0b9c46450a6e45d71b4e51 Mon Sep 17 00:00:00 2001 From: Rashmi V Abbigeri Date: Tue, 22 Nov 2022 18:45:34 +0530 Subject: [PATCH 10/11] get cid and estuaryid --- src/services/uploadToEstuary.service.js | 3 +-- src/utils/estuaryWrapper.js | 6 ++++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/services/uploadToEstuary.service.js b/src/services/uploadToEstuary.service.js index fae21d2..3ab526d 100644 --- a/src/services/uploadToEstuary.service.js +++ b/src/services/uploadToEstuary.service.js @@ -338,8 +338,7 @@ const uploadFiles = async (req, res) => { console.log(`${new Date().toISOString()} Failed to upload ${carFilename} to Estuary`); return res.status(400).json({ error: "An error occurred trying to upload to Estuary. Try again later." }); } - const newUploadCid = uploadResp.cid; - const newUploadEstuaryId = uploadResp.estuaryId; + // Delete this file from Estuary and exit if the user has already uploaded a file with this CID const matchingChunkDocuments = await dbWrapper.getChunks({ "storageIds.cid": newUploadCid }); diff --git a/src/utils/estuaryWrapper.js b/src/utils/estuaryWrapper.js index 954f941..12392dd 100644 --- a/src/utils/estuaryWrapper.js +++ b/src/utils/estuaryWrapper.js @@ -192,8 +192,7 @@ module.exports.splitCars = async (largeCar) => { for await (const smallCar of splitter.cars()) { for await (const chunk of smallCar) { - - + cars.push(chunk); } } @@ -201,6 +200,9 @@ module.exports.splitCars = async (largeCar) => { for await (const c of out) { uploadResp = await module.exports.uploadFileAsCAR(c, 3); + const newUploadCid = uploadResp.cid; + const newUploadEstuaryId = uploadResp.estuaryId; + } return uploadResp; From 670b1ff4dda4a0c83d6b6fe42cefe4962bab9b6a Mon Sep 17 00:00:00 2001 From: Rashmi V Abbigeri Date: Wed, 23 Nov 2022 01:36:19 +0530 Subject: [PATCH 11/11] todos --- src/utils/estuaryWrapper.js | 39 +++++++++++++------------------------ 1 file changed, 13 insertions(+), 26 deletions(-) diff --git a/src/utils/estuaryWrapper.js b/src/utils/estuaryWrapper.js index 12392dd..e7f9d22 100644 --- a/src/utils/estuaryWrapper.js +++ b/src/utils/estuaryWrapper.js @@ -76,7 +76,7 @@ module.exports.uploadFile = async (file, maxAttempts = 3) => { module.exports.uploadFileAsCAR = async (file, maxAttempts = 3) => { const formData = new FormData(); -const chunkie = Buffer.from(file) + const chunkie = Buffer.from(file); formData.append("data", chunkie, "chunk"); let numAttempts = 0; while (numAttempts < maxAttempts) { @@ -175,36 +175,23 @@ module.exports.splitCars = async (largeCar) => { let uploadResp = undefined; let chunkie; let cars = []; - - const { root, out } = await pack({ - input: fs.createReadStream(largeCar), - blockstore: new MemoryBlockStore(), - }); - console.log(root); - console.log(out); - //const targetSize = 64 * 1024 //1024 * 1024 * 100 // chunk to ~100MB CARs or 64 KB - if (largeCar.size <= MaxCarSize1MB) { cars.push(largeCar); } else { - // when size exceeds MaxCarSize1MB, split it into an AsyncIterable - const splitter = new TreewalkCarSplitter(bigCar, MaxCarSize1MB); - - for await (const smallCar of splitter.cars()) { - for await (const chunk of smallCar) { - - cars.push(chunk); - } + const { root, out } = await pack({ + input: fs.createReadStream(largeCar), + blockstore: new MemoryBlockStore(), + }); + console.log(root); + console.log(out); + for await (const c of out) { + uploadResp = await module.exports.uploadFileAsCAR(c, 3); + const newUploadCid = uploadResp.cid; + const newUploadEstuaryId = uploadResp.estuaryId; + // TODO: Store the chunks metadata, ie their cid, retrival url and estuaryID in Dataset Metadata + // TODO: Find a way to unpack and display the CAR file after combining the CHunks } } - - for await (const c of out) { - uploadResp = await module.exports.uploadFileAsCAR(c, 3); - const newUploadCid = uploadResp.cid; - const newUploadEstuaryId = uploadResp.estuaryId; - - } - return uploadResp; } catch (error) { console.error(error.response);