diff --git a/package-lock.json b/package-lock.json index 24115fa..58098b5 100644 --- a/package-lock.json +++ b/package-lock.json @@ -11,6 +11,7 @@ "dependencies": { "axios": "^0.27.2", "bids-validator": "^1.9.3", + "carbites": "^1.0.6", "chai": "^4.3.6", "cors": "^2.8.5", "dotenv": "^16.0.0", @@ -2887,6 +2888,26 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/carbites": { + "version": "1.0.6", + "resolved": "https://registry.npmjs.org/carbites/-/carbites-1.0.6.tgz", + "integrity": "sha512-dS9IQvnrb5VIRvSTNz5Ff+mB9d2MFfi5mojtJi7Rlss79VeF190jr0sZdA7eW0CGHotvHkZaWuM6wgfD9PEFRg==", + "dependencies": { + "@ipld/car": "^3.0.1", + "@ipld/dag-cbor": "^6.0.3", + "@ipld/dag-pb": "^2.0.2", + "multiformats": "^9.0.4" + } + }, + "node_modules/carbites/node_modules/@ipld/dag-cbor": { + "version": "6.0.15", + "resolved": "https://registry.npmjs.org/@ipld/dag-cbor/-/dag-cbor-6.0.15.tgz", + "integrity": "sha512-Vm3VTSTwlmGV92a3C5aeY+r2A18zbH2amehNhsX8PBa3muXICaWrN8Uri85A5hLH7D7ElhE8PdjxD6kNqUmTZA==", + "dependencies": { + "cborg": "^1.5.4", + "multiformats": "^9.5.4" + } + }, "node_modules/caseless": { "version": "0.12.0", "resolved": "https://registry.npmjs.org/caseless/-/caseless-0.12.0.tgz", @@ -11157,6 +11178,28 @@ "quick-lru": "^4.0.1" } }, + "carbites": { + "version": "1.0.6", + "resolved": "https://registry.npmjs.org/carbites/-/carbites-1.0.6.tgz", + "integrity": "sha512-dS9IQvnrb5VIRvSTNz5Ff+mB9d2MFfi5mojtJi7Rlss79VeF190jr0sZdA7eW0CGHotvHkZaWuM6wgfD9PEFRg==", + "requires": { + "@ipld/car": "^3.0.1", + "@ipld/dag-cbor": "^6.0.3", + "@ipld/dag-pb": "^2.0.2", + "multiformats": "^9.0.4" + }, + "dependencies": { + "@ipld/dag-cbor": { + "version": "6.0.15", + "resolved": "https://registry.npmjs.org/@ipld/dag-cbor/-/dag-cbor-6.0.15.tgz", + "integrity": "sha512-Vm3VTSTwlmGV92a3C5aeY+r2A18zbH2amehNhsX8PBa3muXICaWrN8Uri85A5hLH7D7ElhE8PdjxD6kNqUmTZA==", + "requires": { + "cborg": "^1.5.4", + "multiformats": "^9.5.4" + } + } + } + }, "caseless": { "version": 
"0.12.0", "resolved": "https://registry.npmjs.org/caseless/-/caseless-0.12.0.tgz", diff --git a/package.json b/package.json index ccb3523..3c5851e 100644 --- a/package.json +++ b/package.json @@ -6,6 +6,7 @@ "dependencies": { "axios": "^0.27.2", "bids-validator": "^1.9.3", + "carbites": "^1.0.6", "chai": "^4.3.6", "cors": "^2.8.5", "dotenv": "^16.0.0", diff --git a/src/index.js b/src/index.js index ab346a1..e0610a5 100644 --- a/src/index.js +++ b/src/index.js @@ -20,4 +20,28 @@ app.use("/metadata", metadata); app.use("/uploadToEstuary", uploadToEstuary); app.use("/initializeUpload", initializeUpload); +// error handlers + +// development error handler +// will print stacktrace +if (app.get('env') === 'development') { + app.use(function(err, req, res, next) { + res.status(err.status || 500); + res.render('error', { + message: err.message, + error: err + }); + }); +} + +// production error handler +// no stacktraces leaked to user +app.use(function(err, req, res, next) { + res.status(err.status || 500); + res.render('error', { + message: err.message, + error: {} + }); +}); + module.exports = app; diff --git a/src/routes/metadata.js b/src/routes/metadata.js index 8e8952c..4618ec8 100644 --- a/src/routes/metadata.js +++ b/src/routes/metadata.js @@ -3,8 +3,8 @@ const router = express.Router(); const metadataService = require("../services/metadata.service"); -router.get("/datasets/", metadataService.getDatasetMetadata); -router.get("/datasets/published", metadataService.getPublishedDatasets); +router.get("/datasets/", metadataService.getDatasetMetadata); +router.get("/datasets/published", metadataService.getPublishedDatasets); router.get("/datasets/published/byUploader", metadataService.getPublishedDatasetsByUploader); router.get("/datasets/published/search", metadataService.searchPublishedDatasets); router.post("/datasets/publish", metadataService.publishDataset); diff --git a/src/routes/uploadToEstuary.js b/src/routes/uploadToEstuary.js index 02e3b77..f0f88cc 100644 
--- a/src/routes/uploadToEstuary.js +++ b/src/routes/uploadToEstuary.js @@ -15,7 +15,6 @@ const storage = multer.diskStorage({ }, }); -// const upload = multer({ dest: `estuaryUploads/${Date.now()}` }); const maxSize = 2 ** 20 * 500; // 2^20 == 1 MiB const upload = multer({ storage: storage, limits: { fileSize: maxSize } }); diff --git a/src/services/uploadToEstuary.service.js b/src/services/uploadToEstuary.service.js index f5710dc..3ab526d 100644 --- a/src/services/uploadToEstuary.service.js +++ b/src/services/uploadToEstuary.service.js @@ -13,6 +13,9 @@ const { msgCache } = require("../init"); const dbWrapper = require("../utils/dbWrapper"); const estuaryWrapper = require("../utils/estuaryWrapper"); const utils = require("../utils/utils"); +const { error } = require("console"); +const { CarReader } = require( '@ipld/car/reader'); +const dagCbor = require ('@ipld/dag-cbor'); const runBidsValidation = async (pathToDirectory) => { return new Promise((resolve) => { @@ -31,7 +34,9 @@ const runBidsValidation = async (pathToDirectory) => { (issues, summary) => { if (issues.errors.length > 0) { console.log("BIDS validation failed"); - resolve(); + console.log(issues.errors); + resolve({ summary: summary, issues: issues }); + } else { console.log("BIDS validation succeeded"); resolve({ summary: summary, issues: issues }); @@ -229,7 +234,6 @@ const insertMetadata = async (datasetMetadata, chunkMetadata, files) => { return false; } - // Chunk for (let numAttempts = 0; numAttempts < maxAttempts; numAttempts++) { chunk = generateChunk({ datasetId: dataset._id, @@ -312,10 +316,10 @@ const uploadFiles = async (req, res) => { const validatorData = await runBidsValidation(userDefinedRootDirLocal); if (!validatorData) { - await utils.removeFiles(timestampedFolder); - return res.status(400).json({ error: "BIDS validation failed." }); + await utils.removeFiles(timestampedFolder); + return res.status(400).json({ error: "BIDS validation failed." 
+ }); } - +// TODO: Replace the rest of the code in this function with uploadDirAsCar from estuaryWrapper const { root, filename: carFilename } = await packToFs({ input: userDefinedRootDirLocal, output: `${timestampedFolder}/${userDefinedRootDir}.car`, @@ -325,15 +329,16 @@ const uploadFiles = async (req, res) => { // Upload file console.log(`${new Date().toISOString()} Uploading ${carFilename} to Estuary`); const file = fs.createReadStream(carFilename); - const uploadResp = await estuaryWrapper.uploadFile(file, 3); + const uploadRespsplit = await estuaryWrapper.splitCars(carFilename.toString(), 3); + + //const uploadResp = await estuaryWrapper.uploadFile(file, 3); // const uploadResp = { cid: "0x124", estuaryId: "81" }; // THIS LINE IS FOR TESTING ONLY await utils.removeFiles(timestampedFolder); - if (!uploadResp) { + if (!uploadRespsplit) { console.log(`${new Date().toISOString()} Failed to upload ${carFilename} to Estuary`); return res.status(400).json({ error: "An error occurred trying to upload to Estuary. Try again later."
}); } - const newUploadCid = uploadResp.cid; - const newUploadEstuaryId = uploadResp.estuaryId; + const { cid: newUploadCid, estuaryId: newUploadEstuaryId } = uploadRespsplit; // Delete this file from Estuary and exit if the user has already uploaded a file with this CID const matchingChunkDocuments = await dbWrapper.getChunks({ "storageIds.cid": newUploadCid }); diff --git a/src/utils/estuaryWrapper.js b/src/utils/estuaryWrapper.js index a19bee9..e7f9d22 100644 --- a/src/utils/estuaryWrapper.js +++ b/src/utils/estuaryWrapper.js @@ -1,11 +1,22 @@ const axios = require("axios"); const fs = require("fs"); +const fsp = require("fs/promises"); const FormData = require("form-data"); -const { packToFs } = require("ipfs-car/pack/fs"); +const { packToFs } = require("ipfs-car/pack/fs"); const { FsBlockStore } = require("ipfs-car/blockstore/fs"); const utils = require("./utils"); +const { TreewalkCarSplitter } = require("carbites/treewalk"); +const { CarReader } = require("@ipld/car/reader"); +const dagCbor = require("@ipld/dag-cbor"); +const { packToBlob } = require("ipfs-car/pack/blob"); +const { MemoryBlockStore } = require("ipfs-car/blockstore/memory"); +const { pack } = require("ipfs-car/pack"); +const { Buffer } = require("buffer"); -const estuaryEndpoints = ["https://shuttle-4.estuary.tech/content/add", "https://api.estuary.tech/content/add"]; +const estuaryEndpoints = [ + "https://shuttle-4.estuary.tech/content/add", + "https://api.estuary.tech/content/add", +]; module.exports.getPinsList = async () => { try { @@ -41,6 +52,8 @@ module.exports.uploadFile = async (file, maxAttempts = 3) => { Authorization: "Bearer " + process.env.ESTUARY_API_KEY, }, }); + console.log(viewerResp.data.settings.uploadEndpoints); + const url = viewerResp.data.settings.uploadEndpoints[0]; // Upload file @@ -61,16 +74,60 @@ module.exports.uploadFile = async (file, maxAttempts = 3) => { } }; -module.exports.deleteFile = async (requestid, maxAttempts = 3) => { +module.exports.uploadFileAsCAR = async (file, maxAttempts = 3) => { + const formData = new FormData(); + const
chunkie = Buffer.from(file); + formData.append("data", chunkie, "chunk"); let numAttempts = 0; while (numAttempts < maxAttempts) { try { - const resp = await axios.delete(`https://api.estuary.tech/pinning/pins/${requestid}`, { + // Get URL of shuttle node with most space + const viewerResp = await axios.get("https://api.estuary.tech/viewer", { + headers: { + Authorization: "Bearer " + process.env.ESTUARY_API_KEY, + }, + }); + //const url = viewerResp.data.settings.uploadEndpoints[0]; + const url = "https://api.estuary.tech/content/add-car"; + //const url = "https://api.web3.storage/car" + console.log(url); + // Upload file + const resp = await axios.post(url, formData, { headers: { Authorization: "Bearer " + process.env.ESTUARY_API_KEY, }, + maxContentLength: Infinity, + maxBodyLength: Infinity, }); - console.log(`estuaryWrapper.deleteFile: Deleted file with requestid ${requestid}`); + console.log(resp.data); + return resp.data; + } catch (err) { + numAttempts++; + console.log(err.response); + console.log( + `estuaryWrapper.uploadFile: Error status: ${err.response?.status}. Error code: ${err.code}. 
Error message: ${err.message}` + ); + } + } +}; + +module.exports.deleteFile = async (requestid, maxAttempts = 3) => { + let numAttempts = 0; + while (numAttempts < maxAttempts) { + try { + const resp = await axios.delete( + `https://api.estuary.tech/pinning/pins/${requestid}`, + { + headers: { + Authorization: "Bearer " + process.env.ESTUARY_API_KEY, + }, + } + ); + + console.log( + `estuaryWrapper.deleteFile: Deleted file with requestid ${requestid}` + ); + return true; } catch (err) { numAttempts++; @@ -99,7 +156,7 @@ module.exports.uploadDirAsCar = async (pathToDir, pathToCar) => { console.log(`Uploading ${carFilename} to Estuary`); const file = fs.createReadStream(carFilename); - const uploadResp = await module.exports.uploadFile(file, 3); + const uploadResp = await module.exports.uploadFileAsCAR(file, 3); await utils.removeFiles(pathToCar); if (!uploadResp) console.log(`Failed to upload ${carFilename} to Estuary`); @@ -108,3 +165,36 @@ module.exports.uploadDirAsCar = async (pathToDir, pathToCar) => { console.log(err); } }; + +module.exports.splitCars = async (largeCar) => { + console.log("Chunking CAR File"); + try { + const bigCar = await CarReader.fromIterable(fs.createReadStream(largeCar)); + const [rootCid] = await bigCar.getRoots(); + const MAX_CAR_SIZE_BYTES = 100000000; // ~100 MB + let uploadResp = undefined; + let chunkie; + let cars = []; + if ((await fsp.stat(largeCar)).size <= MAX_CAR_SIZE_BYTES) { + uploadResp = await module.exports.uploadFileAsCAR(await fsp.readFile(largeCar), 3); + } else { + const { root, out } = await pack({ + input: fs.createReadStream(largeCar), + blockstore: new MemoryBlockStore(), + }); + console.log(root); + console.log(out); + for await (const c of out) { + uploadResp = await module.exports.uploadFileAsCAR(c, 3); + const newUploadCid = uploadResp.cid; + const newUploadEstuaryId = uploadResp.estuaryId; + // TODO: Store the chunks metadata, ie their cid, retrival url and estuaryID in Dataset Metadata + // TODO: Find a way to unpack and display the CAR file after combining the CHunks + } + } + return uploadResp; + } catch
(error) { + console.error(error.response ?? error); + return; + } +}; diff --git a/src/utils/utils.js b/src/utils/utils.js index 99248eb..6c3e4f2 100644 --- a/src/utils/utils.js +++ b/src/utils/utils.js @@ -10,7 +10,7 @@ module.exports.removeFiles = async (pathToFiles) => { await fse.remove(pathToFiles); console.log(`Removed ${pathToFiles}`); } catch (err) { - console.error(err); + console.warn(`Failed to remove ${pathToFiles}:`, err); } };