From 558dd3cee77644e7e2c9f2f7a33a2c9e85d2bdd2 Mon Sep 17 00:00:00 2001
From: Clint Zirker
Date: Tue, 8 Dec 2020 12:55:07 -0700
Subject: [PATCH 1/2] Allow bypassing Kinesis and writing directly to the
 bus. This is in response to the Nov 2020 AWS Kinesis outage.

Usage: leo.write and leo.load now accept `disable_kinesis: true` in their
options, or you can set the environment variable `DISABLE_KINESIS=true`.

Note: Bypassing Kinesis can create a race condition if 2 bots are writing
to the same queue. The race happens in the bot reading from that queue:
if it reads too quickly, it will checkpoint and potentially skip events
that were created by the second source bot but are still being written to
the DB. A workaround is to not read all the way to the current timestamp,
as this allows the in-flight events to land in the correct place before
being read.

Also, when there are 2 source bots, there is the possibility that they
generate the same eid. This happens when both bots begin the Kinesis
stream processor code on the same millisecond. There is no workaround for
this currently. However, ideas include giving each writing bot a block
record range for the eid, extending the eid to include a partition/writer
identifier between the timestamp and record number, or making queue
partitions a first-class feature.

This does NOT solve the Firehose case. Firehose is used to collect events
over the course of 1 minute, and then it emits an event to Kinesis. A
workaround for Firehose is to stop sending to Firehose and bypass Kinesis
instead.
---
 lib/kinesis-stream-processor.js | 358 ++++++++++++++++++++++++++++++++
 lib/stream/leo-stream.js        |   6 +-
 2 files changed, 363 insertions(+), 1 deletion(-)
 create mode 100644 lib/kinesis-stream-processor.js

diff --git a/lib/kinesis-stream-processor.js b/lib/kinesis-stream-processor.js
new file mode 100644
index 00000000..e9588b2c
--- /dev/null
+++ b/lib/kinesis-stream-processor.js
@@ -0,0 +1,358 @@
+"use strict";
+/**
+ * This code is lifted from the LeoBus Kinesis Processor:
+ * https://github.com/LeoPlatform/bus/blob/master/bots/kinesis_processor/index.js
+ *
+ * Changes from source:
+ * - Get config from `this` instead of `let leo = require("leo-sdk")`
+ * - Changed require statements to be local to this module instead of `leo-sdk`
+ * - Added `putRecords` as an interface to match `kinesis.putRecords`
+ * - Changed `new Buffer("", "base64")` to `Buffer.from("", "base64")`
+ * - Converted `console.log()` to `logger.log()`
+ *
+ * Calling this module:
+ * - `handler` and `putRecords` need to be called with `require("leo-sdk").streams` as the `this` parameter,
`require("kinesis-stream-processor").putRecords.call(leo.streams, data, callback)` + */ + +const moment = require("moment"); +const zlib = require("zlib"); +const async = require("async"); +const refUtil = require("./reference.js"); +const logger = require("leo-logger")("kinesis-stream-processor"); + +const pad = "0000000"; +const padLength = -1 * pad.length; +const ttlSeconds = parseInt(process.env.ttlSeconds) || 604800; // Seconds in a week + +exports.handler = function(event, context, callback) { + // Config data is pass via `this` + //const leo = require("../index"); + const ls = this; //leo.streams; + const cron = this.cron; //leo.bot; + const StreamTable = this.configuration.resources.LeoStream; + const EventTable = this.configuration.resources.LeoEvent; + + + let eventsToSkip = {}; + let botsToSkip = {}; + + if (process.env.skip_events) { + eventsToSkip = process.env.skip_events.split(",").reduce((out, e) => { + logger.log(`Skipping all events to queue "${e}"`); + out[refUtil.ref(e)] = true; + out[e] = true; + return out; + }, {}); + } + if (process.env.skip_bots) { + botsToSkip = process.env.skip_bots.split(",").reduce((out, e) => { + logger.log(`Skipping all events from bot "${e}"`); + out[e] = true; + return out; + }, {}); + } + + var timestamp = moment.utc(event.Records[0].kinesis.approximateArrivalTimestamp * 1000); + var ttl = Math.floor(timestamp.clone().add(ttlSeconds, "seconds").valueOf() / 1000); + + var diff = moment.duration(moment.utc().diff(timestamp)); + var currentTimeMilliseconds = moment.utc().valueOf(); + + var useS3Mode = false; + if (diff.asSeconds() > 3 || event.Records.length > 100) { + useS3Mode = true; + } + var events = {}; + var maxKinesis = {}; + var snapshots = {}; + var stats = {}; + + let eventIdFormat = "[z/]YYYY/MM/DD/HH/mm/"; + var eventId = timestamp.format(eventIdFormat) + timestamp.valueOf(); + var recordCount = 0; + + function getEventStream(event, forceEventId, archive = null) { + if (!(event in events)) { + var assignIds = ls.through((obj, done) => { + if (archive) { + obj.end = archive.end; + obj.start = archive.start; + } else { + if (forceEventId) { + obj.start = forceEventId + "-" + (pad + recordCount).slice(padLength); + obj.end = forceEventId + "-" + (pad + (recordCount + obj.end)).slice(padLength); + } else { + obj.start = eventId + "-" + (pad + recordCount).slice(padLength); + obj.end = eventId + "-" + (pad + (recordCount + obj.end)).slice(padLength); + } + obj.ttl = ttl; + } + maxKinesis[event].max = obj.end; + recordCount += obj.records; + obj.v = 2; + + for (let botid in obj.stats) { + if (!(botid in stats)) { + stats[botid] = { + [event]: obj.stats[botid] + }; + } else { + if (!(event in stats[botid])) { + stats[botid][event] = obj.stats[botid]; + } else { + let s = stats[botid][event]; + let r = obj.stats[botid]; + s.units += r.units; + s.start = r.start; + s.end = r.end; + s.checkpoint = r.checkpoint; + } + } + } + delete obj.stats; + delete obj.correlations; + + if (obj.records) { + done(null, obj); + } else { + done(); + } + }); + if (useS3Mode) { + events[event] = ls.pipeline(ls.toS3GzipChunks(event, {}), assignIds, ls.toDynamoDB(StreamTable)); + } else { + events[event] = ls.pipeline(ls.toGzipChunks(event, {}), assignIds, ls.toDynamoDB(StreamTable)); + } + maxKinesis[event] = { + max: null + }; + } + return events[event]; + } + + function closeStreams(callback) { + var tasks = []; + var eventUpdateTasks = []; + + for (let event in events) { + tasks.push((done) => { + logger.log("closing streams", event); + 
events[event].on("finish", () => { + logger.log("got finish from stream", event, maxKinesis[event].max); + eventUpdateTasks.push({ + table: EventTable, + key: { + event: event + }, + set: { + max_eid: maxKinesis[event].max, + timestamp: moment.now(), + v: 2 + } + }); + + if (event.match(/\/_archive$/)) { + let oEvent = event.replace(/\/_archive$/, ''); + + eventUpdateTasks.push({ + table: EventTable, + key: { + event: oEvent + }, + set: { + archive: { + end: maxKinesis[event].max + } + } + }); + } + done(); + }).on("error", (err) => { + logger.log(err); + done(err); + }); + events[event].end(); + }); + } + + Object.keys(snapshots).forEach(event => { + let oEvent = event.replace(/\/_snapshot$/, ''); + eventUpdateTasks.push({ + table: EventTable, + key: { + event: oEvent + }, + set: { + snapshot: snapshots[event] + } + }); + }); + + async.parallel(tasks, (err) => { + if (err) { + logger.log("error"); + logger.log(err); + callback(err); + } else { + logger.log("finished writing"); + ls.dynamodb.updateMulti(eventUpdateTasks, (err) => { + if (err) { + callback("Cannot write event locations to dynamoDB"); + } else { + var checkpointTasks = []; + for (let bot in stats) { + for (let event in stats[bot]) { + let stat = stats[bot][event]; + checkpointTasks.push(function(done) { + cron.checkpoint(bot, event, { + eid: eventId + "-" + (pad + stat.checkpoint).slice(padLength), + source_timestamp: stat.start, + started_timestamp: stat.end, + ended_timestamp: timestamp.valueOf(), + records: stat.units, + type: "write" + }, function(err) { + done(err); + }); + }); + } + } + logger.log("checkpointing"); + async.parallelLimit(checkpointTasks, 100, function(err) { + if (err) { + logger.log(err); + callback(err); + } else { + callback(null, "Successfully processed " + event.Records.length + " records."); + } + }); + } + }); + } + }); + } + + var stream = ls.parse(true); + ls.pipe(stream, ls.through((event, callback) => { + //We can't process it without these + if (event._cmd) { + if (event._cmd == "registerSnapshot") { + snapshots[refUtil.ref(event.event + "/_snapshot").queue().id] = { + start: "_snapshot/" + moment(event.start).format(eventIdFormat), + next: moment(event.next).format(eventIdFormat) + }; + } + return callback(); + } else if (!event.event || ((!event.id || !event.payload) && !event.s3) || eventsToSkip[refUtil.ref(event.event)] || botsToSkip[event.id]) { + return callback(null); + } + let forceEventId = null; + let archive = null; + if (event.archive) { + event.event = refUtil.ref(event.event + "/_archive").queue().id; + archive = { + start: event.start, + end: event.end + }; + } else if (event.snapshot) { + event.event = refUtil.ref(event.event + "/_snapshot").queue().id; + forceEventId = moment(event.snapshot).format(eventIdFormat) + timestamp.valueOf(); + } else { + event.event = refUtil.ref(event.event).queue().id; + } + + //If it is missing these, we can just create them. 
+		if (!event.timestamp) {
+			event.timestamp = currentTimeMilliseconds;
+		}
+		if (!event.event_source_timestamp) {
+			event.event_source_timestamp = event.timestamp;
+		}
+		if (typeof event.event_source_timestamp !== "number") {
+			event.event_source_timestamp = moment(event.event_source_timestamp).valueOf();
+		}
+		getEventStream(event.event, forceEventId, archive).write(event, callback);
+	}), function(err) {
+		if (err) {
+			callback(err);
+		} else {
+			closeStreams(callback);
+		}
+	});
+	event.Records.map((record) => {
+		if (record.kinesis.data[0] === 'H') {
+			stream.write(zlib.gunzipSync(Buffer.from(record.kinesis.data, 'base64')));
+		} else if (record.kinesis.data[0] === 'e' && record.kinesis.data[1] === 'J') {
+			stream.write(zlib.inflateSync(Buffer.from(record.kinesis.data, 'base64')));
+		} else if (record.kinesis.data[0] === 'e' && record.kinesis.data[1] === 'y') {
+			stream.write(Buffer.from(record.kinesis.data, 'base64').toString() + "\n");
+		}
+	});
+	stream.end();
+};
+
+exports.putRecords = function (event, callback){
+	// putRecords Event:
+	// let params = {
+	// 	Records:
+	// 		[{
+	// 			Data: "GZIP Buffer",
+	// 			ExplicitHashKey: "N/A",
+	// 			PartitionKey: "N/A"
+	// 		}],
+	// 	StreamName: "N/A"
+	// };
+	//
+	// putRecords Response:
+	// const response = {
+	// 	EncryptionType: "NONE",
+	// 	FailedRecordCount: 0,
+	// 	Records: [
+	// 		{
+	// 			SequenceNumber: "N/A",
+	// 			ShardId: "N/A",
+	// 			ErrorCode: "ProvisionedThroughputExceededException|InternalFailure",
+	// 			ErrorMessage: ""
+	// 		}
+	// 	],
+	// }
+
+	// kinesisStreamProcessor Event:
+	// let event = {
+	// 	Records: [{
+	// 		kinesis: {
+	// 			approximateArrivalTimestamp: TimestampInSeconds,
+	// 			data: "BASE64 Encoded GZIP string",
+	// 		}
+	// 	}]
+	// }
+
+	// Prepare an event to be handled by the Kinesis Stream Processor Code
+	const kinesisEvent = {
+		Records: event.Records.map(record => {
+			// record.approximateArrivalTimestamp = Date.now() / 1000;
+			// record.data = record.Data.toString("base64");
+			// delete record.Data;
+			return {
+				kinesis: {
+					approximateArrivalTimestamp: Date.now() / 1000,
+					data: record.Data.toString("base64")
+				}
+			}
+		})
+	};
+
+	// Send event to the processor and remap the response to look like a kinesis response
+	exports.handler.call(this, kinesisEvent, {}, (err) => {
+
+		// leo-sdk only uses FailedRecordCount & Records.ErrorCode
+		callback(err, {
+			FailedRecordCount: err ? event.Records.length : 0,
+			Records: event.Records.map(r => ({
+				ErrorCode: "InternalFailure",
+			}))
+		});
+
+	});
+};
\ No newline at end of file
diff --git a/lib/stream/leo-stream.js b/lib/stream/leo-stream.js
index 8307b28e..fbb56c88 100644
--- a/lib/stream/leo-stream.js
+++ b/lib/stream/leo-stream.js
@@ -22,6 +22,7 @@ var _streams = require("../streams");
 const logger = require("leo-logger")("leo-stream");
 const { promiseStreams } = require('leo-streams');
 const es = require('event-stream');
+const kinesisStreamProcessor = require("../kinesis-stream-processor");
 
 const twoHundredK = 1024 * 200;
 
@@ -75,6 +76,7 @@ module.exports = function(configure) {
 	var ls = {
 		s3: s3,
 		dynamodb: dynamodb,
+		kinesis: kinesis,
 		cron: cron,
 		configuration: configure,
 		through: _streams.through,
@@ -578,6 +580,8 @@
 			checkpoint: false
 		}, defaults[type], opts || {});
 
+		const putRecords = (process.env.DISABLE_KINESIS == "true" || opts.disable_kinesis) ? (...args) => kinesisStreamProcessor.putRecords.call(ls, ...args) : kinesis.putRecords;
+
 		var records, correlations;
 
 		function reset() {
@@ -655,7 +659,7 @@
 			} else {
 				logger.debug("sending", records.length, number, delay);
 				logger.time("kinesis request");
-				kinesis.putRecords({
+				putRecords.call(kinesis, {
 					Records: records.map((r) => {
 						return {
 							Data: r,

From 200b6586afcb8b4ca92d50addce84ae03bc67f18 Mon Sep 17 00:00:00 2001
From: Clint Zirker
Date: Tue, 8 Dec 2020 13:06:24 -0700
Subject: [PATCH 2/2] Remove commented code

---
 lib/kinesis-stream-processor.js | 8 ++------
 1 file changed, 2 insertions(+), 6 deletions(-)

diff --git a/lib/kinesis-stream-processor.js b/lib/kinesis-stream-processor.js
index e9588b2c..9f93a195 100644
--- a/lib/kinesis-stream-processor.js
+++ b/lib/kinesis-stream-processor.js
@@ -27,9 +27,8 @@ const ttlSeconds = parseInt(process.env.ttlSeconds) || 604800; // Seconds in a w
 
 exports.handler = function(event, context, callback) {
 	// Config data is passed via `this`
-	//const leo = require("../index");
-	const ls = this; //leo.streams;
-	const cron = this.cron; //leo.bot;
+	const ls = this;
+	const cron = this.cron;
 	const StreamTable = this.configuration.resources.LeoStream;
 	const EventTable = this.configuration.resources.LeoEvent;
 
@@ -331,9 +330,6 @@ exports.putRecords = function (event, callback){
 	// Prepare an event to be handled by the Kinesis Stream Processor Code
 	const kinesisEvent = {
 		Records: event.Records.map(record => {
-			// record.approximateArrivalTimestamp = Date.now() / 1000;
-			// record.data = record.Data.toString("base64");
-			// delete record.Data;
 			return {
 				kinesis: {
 					approximateArrivalTimestamp: Date.now() / 1000,
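
A minimal usage sketch of the bypass from a bot's perspective, assuming
the standard leo-sdk `load(botId, outQueue, opts)` signature; the bot id,
queue name, and payload below are placeholders:

    const leo = require("leo-sdk");

    // Per-call opt-in: events go straight to the bus tables through the
    // lifted kinesis-stream-processor code instead of through Kinesis.
    // Setting DISABLE_KINESIS=true in the environment has the same effect
    // for every writer in the process.
    const stream = leo.load("my-bot-id", "my-out-queue", {
        disable_kinesis: true
    });

    stream.write({ hello: "world" });
    stream.end(err => {
        if (err) console.error("Failed to flush events to the bus", err);
    });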