From c128e783ca4c02f88cca59f35947035f84f64e13 Mon Sep 17 00:00:00 2001 From: Clint Zirker Date: Thu, 8 May 2025 11:36:02 -0600 Subject: [PATCH] Fixing issue with multiple firehose --- bots/s3-load-trigger/index.js | 86 ++++++++--------------------------- 1 file changed, 20 insertions(+), 66 deletions(-) diff --git a/bots/s3-load-trigger/index.js b/bots/s3-load-trigger/index.js index 8367c09..a712192 100644 --- a/bots/s3-load-trigger/index.js +++ b/bots/s3-load-trigger/index.js @@ -30,7 +30,7 @@ function listFilesFromKey(bucket, lastkey, opts, callback) { StartAfter: lastkey.toString(), MaxKeys: opts.limit, Prefix: opts.prefix - }, function (err, data) { + }, function(err, data) { if (err) { callback(err); } else { @@ -46,74 +46,28 @@ exports.handler = (event, context, callback) => { }; console.log(configure); console.log("Triggered By Event", JSON.stringify(event)); - dynamodb.getSetting(setting_id, (err, data) => { - if (err) { - callback(err); - } else { - let position = data && data.value || ""; - console.log("Position:", position); - listFilesFromKey(bucket, position, opts, function (err, files) { - if (err) { - console.log(err); - callback(err); - } else { - if (files.length == 0) { - console.log("No new Files"); - callback(); - return; - } - console.log(files); + let files = event.Records.map(r => ({ bucket: r.s3.bucket.name, key: r.s3.object.key })); + var lastKey = files[files.length - 1].key; - // var firstKey = files[0].Key; - var lastKey = files[files.length - 1].Key; - - var stream = leo.load("Leo_core_s3_load_trigger", "commands.s3_bus_load", { - debug: true - }); - stream.write({ - payload: { - command: "load", - files: files.map(file => { - return { - bucket: bucket, - key: file.Key - }; - }) - } - }); - stream.end((err) => { - if (err) { - callback(err); - } else { - dynamodb.saveSetting(setting_id, lastKey, function () { - callback(); - }); - } - }); + console.log(files); + var stream = leo.load("Leo_core_s3_load_trigger", "commands.s3_bus_load", { + debug: true + }); - // processing.single("commands.s3_bus_load", { - // command: "load", - // files: files.map(file => { - // return { - // bucket: bucket, - // key: file.Key - // }; - // }) - // }, { - // correlation_id: { - // source: bucket, - // start: `${firstKey}`, - // end: `${lastKey}` - // }, - // forceDynamoDB: true, - // id: "Leo_core_s3_load_trigger" - // }).then((response) => { - // dynamodb.saveSetting(setting_id, lastKey, function () { - // callback(); - // }); - // }).catch(callback); - } + stream.write({ + payload: { + command: "load", + files: files + } + }); + stream.end((err) => { + if (err) { + callback(err); + } else { + console.log("Last Key:", lastKey); + dynamodb.saveSetting(setting_id, lastKey, function() { + callback(); }); } });