Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
45b3d85
chore: add storage, worker and core bufioconfig interfaces :octocat:
Mcnoble1 Jul 5, 2025
a7f8c0d
feat: implement memoryStorage to hold records in memory :octocat:
Mcnoble1 Jul 5, 2025
c664cd2
feat: add file based staorage for buffer :octocat:
Mcnoble1 Jul 5, 2025
d89d64a
feat: add core bufio class for batching and flushing with metrics :oc…
Mcnoble1 Jul 11, 2025
5c124e9
chore: add simulated job generator :octocat:
Mcnoble1 Jul 11, 2025
c04f1df
chore: add entry point, memory and file storage demo :octocat:
Mcnoble1 Jul 11, 2025
8c03113
chore: add file-based storage adapter (JSON file) :octocat:
Mcnoble1 Jul 11, 2025
3e262a7
chore: updates :octocat:
Mcnoble1 Jul 11, 2025
078c776
fix: restructure codebase to be lean :octocat:
Mcnoble1 Jul 14, 2025
9abbb27
fix: restructure codebase to be lean :octocat:
Mcnoble1 Jul 14, 2025
9204b5d
fix: restructure codebase to be lean :octocat:
Mcnoble1 Jul 14, 2025
48a4e62
fix: restructure codebase to be lean :octocat:
Mcnoble1 Jul 14, 2025
263d35c
chore: merged main :octocat:
Mcnoble1 Jul 24, 2025
d2520fb
chore: merged main :octocat:
Mcnoble1 Jul 24, 2025
3c8ffc9
fix: chabge faker number and alphanumeric datatypes :octocat:
Mcnoble1 Jul 26, 2025
9d53eea
test: add basic io tests for initialization, default storage, pust, s…
Mcnoble1 Jul 26, 2025
f632a46
tests: add more io tests :octocat:
Mcnoble1 Aug 2, 2025
9d9169c
chore: add initial config checks to pass tests :octocat:
Mcnoble1 Aug 2, 2025
5c98e85
chore: split tests into unit and integration :octocat:
Mcnoble1 Aug 31, 2025
f6917e0
feat: add worker class :octocat:
Mcnoble1 Sep 6, 2025
a59cd71
chore: separate tests into unit and integration :octocat:
Mcnoble1 Sep 6, 2025
bf14bc3
tests: add integration tests for io core :octocat:
Mcnoble1 Sep 12, 2025
eaf55ad
chore: make IO methods async and change Io to IO :octocat:
Mcnoble1 Sep 28, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
"ts-node": "^10.9.1",
"typescript": "^4.9.4"
},
"dependencies": {},
"engines": {
"node": ">=18.0.0"
}
Expand Down
23 changes: 18 additions & 5 deletions src/io.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { MemoryStorage, Storage } from "./storage";

import { Worker } from "./worker";

export class BufIO<T, U> {
export class IO<T, U> {
private storage: Storage<T>;
private worker: Worker<T, U>;
private flushInterval: number;
Expand All @@ -17,22 +17,35 @@ export class BufIO<T, U> {
flushInterval?: number;
onError?: (err: Error, records: T[]) => void;
}) {
if (!config.worker) {
throw new Error("Worker must be provided");
}
if (config.batchSize <= 0) {
throw new Error("batchSize must be greater than 0");
}
this.storage = config.storage ?? new MemoryStorage<T>();
this.worker = config.worker;
this.batchSize = config.batchSize;
this.flushInterval = config.flushInterval ?? 5000;
this.onError = config.onError;
}

public push(record: T) {
public async push(record: T) {
if (record === null || record === undefined) return;
this.storage.put(record);
}

public start() {
this.intervalId = setInterval(() => this.flush(), this.flushInterval);
public async start() {
if (this.intervalId && !(this.intervalId as any)._destroyed === false)
return;
if (!this.intervalId || (this.intervalId as any)._destroyed) {
this.intervalId = setInterval(() => {
this.flush();
}, this.flushInterval);
}
}

public stop() {
public async stop() {
if (this.intervalId) clearInterval(this.intervalId);
}

Expand Down
File renamed without changes.
1 change: 1 addition & 0 deletions src/worker/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from "./base";
10 changes: 10 additions & 0 deletions tests/helpers/worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { Worker } from "../../src/worker/base";

export class TestWorker<T> implements Worker<T, T> {
public processed: T[] = [];

async work(records: T[]): Promise<T[]> {
this.processed.push(...records);
return records;
}
}
130 changes: 130 additions & 0 deletions tests/integration/io.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import chai, { expect } from "chai";
import chaiAsPromised from "chai-as-promised";
import { IO } from "../../src/io";
import { MemoryStorage } from "../../src/storage/memory";
import { Worker } from "../../src/worker";
import { TestWorker } from "../helpers/worker";

chai.use(chaiAsPromised);

type RecordType = { id: number; value: string };

describe("IO Integration", () => {
let storage: MemoryStorage<RecordType>;
let worker: TestWorker<RecordType>;
let io: IO<RecordType, RecordType>;

beforeEach(() => {
storage = new MemoryStorage<RecordType>();
worker = new TestWorker();
io = new IO({
storage,
worker,
batchSize: 2,
flushInterval: 100,
});
});

afterEach(() => {
io.stop();
});

it("should initialize with MemoryStorage and Worker", () => {
expect(io).to.have.property("storage").that.equals(storage);
expect(io).to.have.property("worker").that.equals(worker);
});

it("should push records into storage correctly", () => {
io.push({ id: 1, value: "A" });
io.push({ id: 2, value: "B" });
const records = storage.get(10);
expect(records).to.deep.equal([
{ id: 1, value: "A" },
{ id: 2, value: "B" },
]);
});

it("should flush records manually", async () => {
io.push({ id: 1, value: "A" });
io.push({ id: 2, value: "B" });

await (io as any).flush();

expect(worker.processed).to.deep.equal([
{ id: 1, value: "A" },
{ id: 2, value: "B" },
]);

const remaining = storage.get(10);
expect(remaining).to.be.empty;
});

it("should automatically flush records at interval", async () => {
io.start();
io.push({ id: 1, value: "X" });
io.push({ id: 2, value: "Y" });

await new Promise((resolve) => setTimeout(resolve, 200)); // wait for interval flush

expect(worker.processed).to.deep.include.members([
{ id: 1, value: "X" },
{ id: 2, value: "Y" },
]);
});

it("should respect batchSize when flushing", async () => {
io.push({ id: 1, value: "one" });
io.push({ id: 2, value: "two" });
io.push({ id: 3, value: "three" });

await (io as any).flush();

// first 2 processed, 1 left in storage
expect(worker.processed).to.deep.equal([
{ id: 1, value: "one" },
{ id: 2, value: "two" },
]);
expect(storage.get(10)).to.deep.equal([{ id: 3, value: "three" }]);
});

it("should handle worker errors gracefully", async () => {
class FailingWorker implements Worker<RecordType, RecordType> {
async work(): Promise<RecordType[]> {
throw new Error("worker failed");
}
}
const failingIO = new IO({
storage: new MemoryStorage<RecordType>(),
worker: new FailingWorker(),
batchSize: 2,
flushInterval: 100,
});
failingIO.push({ id: 99, value: "bad" });

await expect((failingIO as any).flush()).to.be.fulfilled;
});

it("should stop flushing when stop() is called", async () => {
io.start();
io.stop();

io.push({ id: 1, value: "stop-test" });
await new Promise((resolve) => setTimeout(resolve, 200));

expect(worker.processed).to.not.deep.include({ id: 1, value: "stop-test" });
});

it("should handle concurrent push and flush safely", async () => {
io.push({ id: 1, value: "X" });
const flushPromise = (io as any).flush();
io.push({ id: 2, value: "Y" });
await flushPromise;

// at least one record should remain in storage, but none should be lost
const all = [...worker.processed, ...storage.get(10)];
expect(all).to.deep.include.members([
{ id: 1, value: "X" },
{ id: 2, value: "Y" },
]);
});
});
7 changes: 0 additions & 7 deletions tests/io.spec.ts

This file was deleted.

Loading