From 4ac3ac98a4cae468dee670939a9f8c9839b444f5 Mon Sep 17 00:00:00 2001 From: Clint Zirker Date: Thu, 22 Aug 2019 10:53:36 -0600 Subject: [PATCH 1/2] added missing let --- lib/mock.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/mock.js b/lib/mock.js index 9c6cdd49..a8235ca8 100644 --- a/lib/mock.js +++ b/lib/mock.js @@ -24,7 +24,7 @@ const overrideLeoFunctions = (data = {}, leo=leosdk) => { toDynamoDB: true, testId: process.pid }, data); - readQueueObjectArray = data.queues; + let readQueueObjectArray = data.queues; leo.configuration.validate = () => true; let testId = data.testId; From 650d3ba95afd2e9a71064b9f3e9e513f86087eed Mon Sep 17 00:00:00 2001 From: Clint Zirker Date: Tue, 26 Nov 2019 11:54:51 -0700 Subject: [PATCH 2/2] Fixing issues with reading and ranges --- lib/stream/leo-stream.js | 38 +++++++++++++++++++++++++++++--------- 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/lib/stream/leo-stream.js b/lib/stream/leo-stream.js index ec6a8601..a3a9f2fe 100644 --- a/lib/stream/leo-stream.js +++ b/lib/stream/leo-stream.js @@ -1232,29 +1232,45 @@ module.exports = function(configure) { } let getEvents; - if (leoEvent.v >= 2 || !leoEvent.max_eid) { var max_eid = (configure.registry && configure.registry.__cron && configure.registry.__cron.maxeid) || opts.maxOverride || leoEvent.max_eid || ''; + var max_eid_query = max_eid; + var max_end_params = {}; + + // If the end is not on a record boundry + // make sure the containing record is included + if (leoEvent.max_eid !== max_eid){ + max_eid_query = max_eid.replace(/-\d+/, "~"); + max_end_params = { + FilterExpression: "#start <= :max_eid ", + ExpressionAttributeNames: { + "#start": "start" + }, + ExpressionAttributeValues: { + ":max_eid": max_eid + } + } + } var table_name = configure.resources.LeoStream; var eid = "eid"; var range_key = "end"; getEvents = function(callback) { - var params = { + var params = extend(true, { TableName: table_name, KeyConditionExpression: "#event = :event and #key between :start and :maxkey", ExpressionAttributeNames: { "#event": "event", - "#key": range_key, + "#key": range_key }, Limit: 50, ExpressionAttributeValues: { ":event": queue, - ":maxkey": usingSnapshot ? snapshotEnd.replace("_snapshot/", "") + 9 : max_eid, + ":maxkey": usingSnapshot ? snapshotEnd.replace("_snapshot/", "") + 9 : max_eid_query, ":start": usingSnapshot ? start.replace("_snapshot/", "") : start }, "ReturnConsumedCapacity": 'TOTAL' - }; + }, max_end_params); logger.debug(params); dynamodb.docClient.query(params, function(err, data) { logger.debug("Consumed Capacity", data && data.ConsumedCapacity); @@ -1263,6 +1279,7 @@ module.exports = function(configure) { callback(err); return; } + callback(null, data.Items); }); }; @@ -1400,14 +1417,15 @@ module.exports = function(configure) { } }), ls.write((obj, done) => { var e = obj.obj; - totalSize += obj.length; if (!item.archive) { e.eid = createEId(eid++); } let isGreaterThanStart = e.eid.localeCompare(start) > 0; + let isLessThanEnd = e.eid.localeCompare(max_eid) <= 0; - if (!isPassDestroyed && isGreaterThanStart && totalCount < opts.limit && totalSize < opts.size) { //Then it is greater than the event they were looking at + if (!isPassDestroyed && isGreaterThanStart && isLessThanEnd && totalCount < opts.limit && totalSize < opts.size) { //Then it is greater than the event they were looking at totalCount++; + totalSize += obj.length; pass.throttledWrite(e, done); } else { //otherwise it means we had a number in the middle of a file if (isPassDestroyed || totalCount >= opts.limit || totalSize >= opts.size) { @@ -1442,7 +1460,8 @@ module.exports = function(configure) { item.payload = JSON.parse(item.payload); } - if (!isPassDestroyed && item.eid.localeCompare(start) > 0 && totalCount < opts.limit && totalSize < opts.size) { //Then it is greater than the event they were looking at + let isLessThanEnd = item.eid.localeCompare(max_eid) <= 0; + if (!isPassDestroyed && item.eid.localeCompare(start) > 0 && isLessThanEnd && totalCount < opts.limit && totalSize < opts.size) { //Then it is greater than the event they were looking at totalCount++; pass.throttledWrite(item, done); } else { //otherwise it means we had a number in the middle of a file @@ -1457,7 +1476,8 @@ module.exports = function(configure) { }); pump(gzip, split(JSON.parse), ls.write((e, done) => { e.eid = createEId(e.eid); - if (!isPassDestroyed && e.eid.localeCompare(start) > 0 && totalCount < opts.limit && totalSize < opts.size) { //Then it is greater than the event they were looking at + let isLessThanEnd = e.eid.localeCompare(max_eid) <= 0; + if (!isPassDestroyed && e.eid.localeCompare(start) > 0 && isLessThanEnd && totalCount < opts.limit && totalSize < opts.size) { //Then it is greater than the event they were looking at totalCount++; pass.throttledWrite(e, done); } else { //otherwise it means we had a number in the middle of a file