Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"dependencies": {
"axios": "^0.27.2",
"bids-validator": "^1.9.3",
"carbites": "^1.0.6",
"chai": "^4.3.6",
"cors": "^2.8.5",
"dotenv": "^16.0.0",
Expand Down
24 changes: 24 additions & 0 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,28 @@ app.use("/metadata", metadata);
app.use("/uploadToEstuary", uploadToEstuary);
app.use("/initializeUpload", initializeUpload);

// error handlers

// development error handler
// will print stacktrace (the full error object is sent to the client)
if (app.get('env') === 'development') {
  // The 4-argument signature is what makes Express treat this middleware
  // as an error handler; `_next` is unused but required for the arity check.
  app.use((err, _req, res, _next) => {
    const statusCode = err.status || 500;
    res.status(statusCode);
    // NOTE(review): res.render assumes a view engine and an 'error' template
    // are configured elsewhere — confirm, otherwise rendering will throw.
    res.render('error', { message: err.message, error: err });
  });
}

// production error handler
// no stacktraces leaked to user
app.use((err, _req, res, _next) => {
  // 4-argument signature marks this as an Express error handler;
  // `_next` must remain in the signature even though it is unused.
  res.status(err.status || 500);
  // Send an empty `error` object so no stack trace reaches the client.
  res.render('error', { message: err.message, error: {} });
});

module.exports = app;
4 changes: 2 additions & 2 deletions src/routes/metadata.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ const router = express.Router();

const metadataService = require("../services/metadata.service");

router.get("/datasets/", metadataService.getDatasetMetadata);
router.get("/datasets/published", metadataService.getPublishedDatasets);
router.get("/datasets/", metadataService.getDatasetMetadata);
router.get("/datasets/published", metadataService.getPublishedDatasets);
router.get("/datasets/published/byUploader", metadataService.getPublishedDatasetsByUploader);
router.get("/datasets/published/search", metadataService.searchPublishedDatasets);
router.post("/datasets/publish", metadataService.publishDataset);
Expand Down
1 change: 0 additions & 1 deletion src/routes/uploadToEstuary.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ const storage = multer.diskStorage({
},
});

// const upload = multer({ dest: `estuaryUploads/${Date.now()}` });
const maxSize = 2 ** 20 * 500; // 2^20 == 1 MiB
const upload = multer({ storage: storage, limits: { fileSize: maxSize } });

Expand Down
23 changes: 14 additions & 9 deletions src/services/uploadToEstuary.service.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ const { msgCache } = require("../init");
const dbWrapper = require("../utils/dbWrapper");
const estuaryWrapper = require("../utils/estuaryWrapper");
const utils = require("../utils/utils");
const { error } = require("console");
const { CarReader } = require( '@ipld/car/reader');
const dagCbor = require ('@ipld/dag-cbor');

const runBidsValidation = async (pathToDirectory) => {
return new Promise((resolve) => {
Expand All @@ -31,7 +34,9 @@ const runBidsValidation = async (pathToDirectory) => {
(issues, summary) => {
if (issues.errors.length > 0) {
console.log("BIDS validation failed");
resolve();
console.log(issues.errors);
resolve({ summary: summary, issues: issues });

} else {
console.log("BIDS validation succeeded");
resolve({ summary: summary, issues: issues });
Expand Down Expand Up @@ -229,7 +234,6 @@ const insertMetadata = async (datasetMetadata, chunkMetadata, files) => {
return false;
}

// Chunk
for (let numAttempts = 0; numAttempts < maxAttempts; numAttempts++) {
chunk = generateChunk({
datasetId: dataset._id,
Expand Down Expand Up @@ -312,10 +316,10 @@ const uploadFiles = async (req, res) => {

const validatorData = await runBidsValidation(userDefinedRootDirLocal);
if (!validatorData) {
await utils.removeFiles(timestampedFolder);
return res.status(400).json({ error: "BIDS validation failed." });
await utils.removeFiles(timestampedFolder);
return res.status(400).json({ error: "BIDS validation failed." + validatorData.issues.key });
}

// TODO: Replace the rest of the code in this function with uploadDirAsCar from estuaryWrapper
const { root, filename: carFilename } = await packToFs({
input: userDefinedRootDirLocal,
output: `${timestampedFolder}/${userDefinedRootDir}.car`,
Expand All @@ -325,15 +329,16 @@ const uploadFiles = async (req, res) => {
// Upload file
console.log(`${new Date().toISOString()} Uploading ${carFilename} to Estuary`);
const file = fs.createReadStream(carFilename);
const uploadResp = await estuaryWrapper.uploadFile(file, 3);
const uploadRespsplit = await estuaryWrapper.splitCars(carFilename.toString(), 3);

//const uploadResp = await estuaryWrapper.uploadFile(file, 3);
// const uploadResp = { cid: "0x124", estuaryId: "81" }; // THIS LINE IS FOR TESTING ONLY
await utils.removeFiles(timestampedFolder);
if (!uploadResp) {
if (!uploadRespsplit) {
console.log(`${new Date().toISOString()} Failed to upload ${carFilename} to Estuary`);
return res.status(400).json({ error: "An error occurred trying to upload to Estuary. Try again later." });
}
const newUploadCid = uploadResp.cid;
const newUploadEstuaryId = uploadResp.estuaryId;


// Delete this file from Estuary and exit if the user has already uploaded a file with this CID
const matchingChunkDocuments = await dbWrapper.getChunks({ "storageIds.cid": newUploadCid });
Expand Down
102 changes: 96 additions & 6 deletions src/utils/estuaryWrapper.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,22 @@
const axios = require("axios");
const fs = require("fs");
const fsp = require("fs/promises");
const FormData = require("form-data");
const { packToFs } = require("ipfs-car/pack/fs");
const { packToF } = require("ipfs-car/pack/fs");
const { FsBlockStore } = require("ipfs-car/blockstore/fs");
const utils = require("./utils");
const { TreewalkCarSplitter } = require("carbites/treewalk");
const { CarReader } = require("@ipld/car/reader");
const dagCbor = require("@ipld/dag-cbor");
const { packToBlob } = require("ipfs-car/pack/blob");
const { MemoryBlockStore } = require("ipfs-car/blockstore/memory");
const { pack } = require("ipfs-car/pack");
const { Buffer } = require("buffer");

const estuaryEndpoints = ["https://shuttle-4.estuary.tech/content/add", "https://api.estuary.tech/content/add"];
const estuaryEndpoints = [
"https://shuttle-4.estuary.tech/content/add",
"https://api.estuary.tech/content/add",
];

module.exports.getPinsList = async () => {
try {
Expand Down Expand Up @@ -41,6 +52,8 @@ module.exports.uploadFile = async (file, maxAttempts = 3) => {
Authorization: "Bearer " + process.env.ESTUARY_API_KEY,
},
});
console.log(viewerResp.data.settings.uploadEndpoints);

const url = viewerResp.data.settings.uploadEndpoints[0];

// Upload file
Expand All @@ -61,16 +74,60 @@ module.exports.uploadFile = async (file, maxAttempts = 3) => {
}
};

module.exports.deleteFile = async (requestid, maxAttempts = 3) => {
module.exports.uploadFileAsCAR = async (file, maxAttempts = 3) => {
const formData = new FormData();
const chunkie = Buffer.from(file);
formData.append("data", chunkie, "chunk");
let numAttempts = 0;
while (numAttempts < maxAttempts) {
try {
const resp = await axios.delete(`https://api.estuary.tech/pinning/pins/${requestid}`, {
// Get URL of shuttle node with most space
const viewerResp = await axios.get("https://api.estuary.tech/viewer", {
headers: {
Authorization: "Bearer " + process.env.ESTUARY_API_KEY,
},
});
//const url = viewerResp.data.settings.uploadEndpoints[0];
const url = "https://api.estuary.tech/content/add-car";
//const url = "https://api.web3.storage/car"
console.log(url);
// Upload file
const resp = await axios.post(url, formData, {
headers: {
Authorization: "Bearer " + process.env.ESTUARY_API_KEY,
},
maxContentLength: Infinity,
maxBodyLength: Infinity,
});
console.log(`estuaryWrapper.deleteFile: Deleted file with requestid ${requestid}`);
console.log(resp.data);
return resp.data;
} catch (err) {
numAttempts++;
console.log(err.response);
console.log(
`estuaryWrapper.uploadFile: Error status: ${err.response?.status}. Error code: ${err.code}. Error message: ${err.message}`
);
}
}
};

module.exports.deleteFile = async (requestid, maxAttempts = 3) => {
let numAttempts = 0;
while (numAttempts < maxAttempts) {
try {
const resp = await axios.delete(
`https://api.estuary.tech/pinning/pins/${requestid}`,
{
headers: {
Authorization: "Bearer " + process.env.ESTUARY_API_KEY,
},
}
);

console.log(
`estuaryWrapper.deleteFile: Deleted file with requestid ${requestid}`
);

return true;
} catch (err) {
numAttempts++;
Expand Down Expand Up @@ -99,7 +156,7 @@ module.exports.uploadDirAsCar = async (pathToDir, pathToCar) => {

console.log(`Uploading ${carFilename} to Estuary`);
const file = fs.createReadStream(carFilename);
const uploadResp = await module.exports.uploadFile(file, 3);
const uploadResp = await module.exports.uploadFileAsCAR(file, 3);

await utils.removeFiles(pathToCar);
if (!uploadResp) console.log(`Failed to upload ${carFilename} to Estuary`);
Expand All @@ -108,3 +165,36 @@ module.exports.uploadDirAsCar = async (pathToDir, pathToCar) => {
console.log(err);
}
};

// Upload an on-disk CAR file to Estuary, splitting it into bounded-size
// chunks first when it is too large to send in one request.
//
// largeCar    - path to the CAR file on disk. (The old code read
//               `largeCar.size` on the path string, which was always
//               undefined, so the single-upload branch could never run —
//               fixed by stat'ing the path.)
// maxAttempts - per-chunk upload retry budget, forwarded to uploadFileAsCAR.
//               New optional parameter; the existing caller already passes 3.
//
// Returns the Estuary response of the last uploaded chunk, or undefined when
// any upload fails or an error is thrown.
module.exports.splitCars = async (largeCar, maxAttempts = 3) => {
  console.log("Chunking CAR File");
  try {
    // Renamed from `MaxCarSize1MB`, which misleadingly held 100,000,000
    // bytes (~100 MB), not 1 MB.
    const MAX_CAR_BYTES = 100000000;
    const { size } = await fsp.stat(largeCar);
    let uploadResp = undefined;
    if (size <= MAX_CAR_BYTES) {
      // Small enough to upload as-is. The old code only collected the path
      // into an unused array and fell through to `return undefined`, which
      // the caller treats as a failed upload.
      uploadResp = await module.exports.uploadFileAsCAR(await fsp.readFile(largeCar), maxAttempts);
    } else {
      // Split the CAR along its DAG so every chunk is a complete, valid CAR.
      // (The old code re-packed the raw stream with `pack` instead of
      // splitting it; TreewalkCarSplitter is the intended tool and is
      // already imported at the top of this file.)
      const reader = await CarReader.fromIterable(fs.createReadStream(largeCar));
      const splitter = new TreewalkCarSplitter(reader, MAX_CAR_BYTES);
      for await (const car of splitter.cars()) {
        // Each `car` is an async iterable of byte chunks; buffer it before
        // handing it to the uploader.
        const parts = [];
        for await (const part of car) parts.push(part);
        uploadResp = await module.exports.uploadFileAsCAR(Buffer.concat(parts), maxAttempts);
        if (!uploadResp) return undefined; // abort on first failed chunk
        // TODO: store each chunk's metadata (cid, retrieval URL, estuaryId)
        // in the dataset metadata.
        // TODO: find a way to unpack and display the CAR after recombining
        // the chunks.
      }
    }
    return uploadResp;
  } catch (error) {
    // Log the error itself; `error.response` is only set on axios failures
    // and logged `undefined` for everything else.
    console.error(error);
    return undefined;
  }
};
1 change: 0 additions & 1 deletion src/utils/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ module.exports.removeFiles = async (pathToFiles) => {
await fse.remove(pathToFiles);
console.log(`Removed ${pathToFiles}`);
} catch (err) {
console.error(err);
}
};

Expand Down