diff --git a/lib/mock-wrapper.js b/lib/mock-wrapper.js index f3d1afc..0b96815 100644 --- a/lib/mock-wrapper.js +++ b/lib/mock-wrapper.js @@ -63,18 +63,34 @@ function default_1(leoStream) { ...config === null || config === void 0 ? void 0 : config.parserOpts } }); + let counter = 0; if (fs_1.default.existsSync(queueDataFileJsonl)) { - mockStream = leoStream.pipeline(fs_1.default.createReadStream(queueDataFileJsonl), leoStream.split((value) => JSONparse(value))); + mockStream = leoStream.pipeline(fs_1.default.createReadStream(queueDataFileJsonl), leoStream.split((value) => { + counter++; + return JSONparse(value); + })); } else if (fs_1.default.existsSync(queueDataFileJson)) { mockStream = leoStream.pipeline( // They may be using a custom parser so we need to convert the json to a string and use the parser - leoStream.eventstream.readArray(requireFn(queueDataFileJson).map(l => JSON.stringify(l) + "\n")), leoStream.split((value) => JSONparse(value))); + leoStream.eventstream.readArray(requireFn(queueDataFileJson).map(l => JSON.stringify(l) + "\n")), leoStream.split((value) => { + counter++; + return JSONparse(value); + })); } else { mockStream = leoStream.eventstream.readArray([]); } mockStream.checkpoint = (callback) => callback(); + mockStream.get = () => { + const now = Date.now(); + const stats = { + eid: leoStream.eventIdFromTimestamp(now), + source_timestamp: now, + units: counter + }; + return stats; + }; return mockStream; }; let oldLoad = leoStream.load.bind(leoStream); diff --git a/lib/mock-wrapper.ts b/lib/mock-wrapper.ts index 683608f..697db0e 100644 --- a/lib/mock-wrapper.ts +++ b/lib/mock-wrapper.ts @@ -1,10 +1,10 @@ import { StreamUtil } from "./lib"; -import { ReadEvent, Event, ReadOptions, ReadableStream, WritableStream, TransformStream, WriteOptions, BaseWriteOptions, ReadableQueueStream } from "./types"; +import { ReadEvent, Event, ReadOptions, TransformStream, BaseWriteOptions, ReadableQueueStream } from "./types"; import fs from "fs"; import path from "path"; import util from "./aws-util"; import stream from "stream"; -import { Callback, CronData, Milliseconds, ReportCompleteOptions } from "./cron"; +import { Callback, Checkpoint, CronData, Milliseconds, ReportCompleteOptions } from "./cron"; //import uuid from "uuid"; import refUtil from "./reference"; import * as parserUtil from "./stream/helper/parser-util"; @@ -49,22 +49,39 @@ export default function (leoStream: LeoStream) { } }); + let counter = 0; + if (fs.existsSync(queueDataFileJsonl)) { mockStream = leoStream.pipeline( fs.createReadStream(queueDataFileJsonl), - leoStream.split((value) => JSONparse(value)) + leoStream.split((value) => { + counter++; + return JSONparse(value); + }) ); } else if (fs.existsSync(queueDataFileJson)) { mockStream = leoStream.pipeline( // They may be using a custom parser so we need to convert the json to a string and use the parser leoStream.eventstream.readArray(requireFn(queueDataFileJson).map(l => JSON.stringify(l) + "\n")), - leoStream.split((value) => JSONparse(value)) + leoStream.split((value) => { + counter++; + return JSONparse(value); + }) ); } else { mockStream = leoStream.eventstream.readArray([]); } mockStream.checkpoint = (callback) => callback(); + mockStream.get = () => { + const now = Date.now(); + const stats: Checkpoint = { + eid: leoStream.eventIdFromTimestamp(now), + source_timestamp: now, + units: counter + }; + return stats; + }; return mockStream; }; diff --git a/lib/streams.js b/lib/streams.js index 8edff79..acd1ec4 100644 --- a/lib/streams.js +++ b/lib/streams.js @@ -8,6 +8,7 @@ var zlib = require("zlib"); var fastCsv = require("fast-csv"); var write = require("./flushwrite.js"); const { S3 } = require("@aws-sdk/client-s3"); +const { Upload } = require("@aws-sdk/lib-storage"); const { NodeHttpHandler } = require('@smithy/node-http-handler'); var https = require("https"); @@ -551,11 +552,17 @@ let ls = module.exports = { toS3: (Bucket, File) => { var callback = null; var pass = new PassThrough(); - s3.upload({ - Bucket: Bucket, - Key: File, - Body: pass - }, (err) => { + const upload = new Upload({ + client: s3, + params: { + Bucket: Bucket, + Key: File, + Body: pass + } + }).done().then(() => { + logger.log("done uploading"); + callback(); + }).catch((err) => { logger.log("done uploading", err); callback(err); });