diff --git a/.nvmrc b/.nvmrc
index a1fe187..0b77208 100644
--- a/.nvmrc
+++ b/.nvmrc
@@ -1 +1 @@
-16.14.2
\ No newline at end of file
+16.14.0
\ No newline at end of file
diff --git a/README.md b/README.md
index fe89e23..e7486b3 100644
--- a/README.md
+++ b/README.md
@@ -8,9 +8,10 @@ The first request is responsible for setting the cache with the correct data. Th
More examples in the src/example directory
```javascript
-import DataBufferController from './DataBufferController.js'
+import {DataBufferController, Cache} from '@pondigitalsolutions/data-buffer-cache'
-const controller = new DataBufferController(null, console )
+const cache = new Cache()
+const controller = await DataBufferController.create({cache, logger: console})
const val = await controller.get('test')
console.log('Should be undefined', val) // undefined
diff --git a/src/DataBuffer.js b/src/DataBuffer.js
index 9975071..262ecc9 100644
--- a/src/DataBuffer.js
+++ b/src/DataBuffer.js
@@ -13,6 +13,8 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see .
+/* eslint max-statements: ["error", 25] */
+
import EventEmitter from 'events'
/**
@@ -32,6 +34,13 @@ export default class DataBuffer extends EventEmitter {
#currentStatus
#cache
+ #semaphore = 'DIBS'
+ #closing = false
+ #foundSemaphore = false
+ #semaphoreChecking = false
+ #semaphoreAmountChecks = 10
+ #semaphoreCheckCounter = 0
+
/**
* Initialize the DataBuffer
*
@@ -56,11 +65,17 @@ export default class DataBuffer extends EventEmitter {
this.#currentStatus = this.#status.init
}
+ close () {
+ this.logger.debug('Closing DataBuffer')
+ this.removeAllListeners()
+ this.#closing = true
+ }
+
get ttl () {
return this.#stdTTL
}
- get raceTime () {
+ get raceTimeMs () {
return this.#allowedRaceTimeMs
}
@@ -90,15 +105,22 @@ export default class DataBuffer extends EventEmitter {
* Return a promise that resolves with the data
* or undefined when it was expired, not available or timed out
*
- * @returns {Promise}
+ * @returns {Promise}
*/
async get () {
+ // If cache exists, set the status and proceed
try {
const result = await this.#cache.exists(this.key)
if (result !== false) {
this.#currentStatus = this.#status.finished
}
} catch (e) { }
+
+ // if the status is still set on initializing set a semaphore
+ if (this.#currentStatus === this.#status.init) {
+ this.#cache.set(this.key, this.#semaphore)
+ }
+
return this.waitForResponse()
}
@@ -115,10 +137,39 @@ export default class DataBuffer extends EventEmitter {
}
const result = await this.#cache.set(this.key, JSON.stringify(value), { EX: ttl })
+ this.triggerItemSet(ttl)
+ return result
+ }
+
+ async checkSemaphore () {
+ await new Promise(resolve => setTimeout(resolve, this.#allowedRaceTimeMs / this.#semaphoreAmountChecks))
+ this.logger.debug('Checking semaphore')
+ this.#semaphoreCheckCounter++
+ const data = await this.#cache.get(this.key)
+ if (data !== this.#semaphore) {
+ this.logger.debug('Semaphore is replaced with real data!')
+ this.triggerItemSet()
+ this.#semaphoreChecking = false
+ this.#foundSemaphore = false
+ this.#semaphoreCheckCounter = 0
+ return true
+ }
+
+ // Jump out of the recursion if this variable is set to false
+ // or when the racetime has passed
+ if (this.#semaphoreChecking === false || this.#semaphoreCheckCounter > this.#semaphoreAmountChecks) {
+ this.logger.debug('Semaphore is taking too long, aborting!')
+ this.#semaphoreCheckCounter = 0
+ this.#semaphoreChecking = false
+ return false
+ }
+ return this.checkSemaphore()
+ }
+
+ triggerItemSet (ttl = this.#stdTTL) {
this.#currentStatus = this.#status.finished
this.setExpiry(ttl) // reset expiry
this.emit(this.#status.finished) // notify all observers waiting for this request
- return result
}
/**
@@ -132,19 +183,33 @@ export default class DataBuffer extends EventEmitter {
return undefined
}
- // the status is finished if the data is still there return it
+ // the status is finished and if the data is still there return it
if (this.#currentStatus === this.#status.finished) {
const data = await this.#cache.get(this.key)
+ if (data === this.#semaphore) {
+ this.#currentStatus = this.#status.running
+ this.#foundSemaphore = true
+ this.logger.debug('Semaphore found')
+ // first one will start the checking, so there will only be one checkSemaphore process per databuffer
+ if (this.#semaphoreChecking === false) {
+ this.#semaphoreChecking = true
+ this.checkSemaphore()
+ }
+ return this.waitForResponse()
+ }
+
// it is possible that the cache is expired between exists call and the get call
// if that happens restart the process
- if (data === undefined || data === null) {
+ const parsedJSON = this.tryParseJSONObject(data)
+
+ if (parsedJSON === false) {
this.#currentStatus = this.#status.running
return undefined
}
- this.logger.trace(`Cache hit for key: ${this.key}`)
- return JSON.parse(data)
+ this.logger.debug(`Cache hit for key: ${this.key}`)
+ return parsedJSON
}
// the status is running, so we wait until the cache gets set
@@ -173,4 +238,21 @@ export default class DataBuffer extends EventEmitter {
// return who's done first
return Promise.race([dataPromise, timeoutPromise])
}
+
+ // StackOverflow: https://stackoverflow.com/a/20392392
+ tryParseJSONObject (jsonString) {
+ try {
+ const o = JSON.parse(jsonString)
+
+ // Handle non-exception-throwing cases:
+ // Neither JSON.parse(false) or JSON.parse(1234) throw errors, hence the type-checking,
+ // but... JSON.parse(null) returns null, and typeof null === "object",
+ // so we must check for that, too. Thankfully, null is falsey, so this suffices:
+ if (o && typeof o === 'object') {
+ return o
+ }
+ } catch (e) { }
+
+ return false
+ }
}
diff --git a/src/DataBufferController.js b/src/DataBufferController.js
index 2e047fa..f36a047 100644
--- a/src/DataBufferController.js
+++ b/src/DataBufferController.js
@@ -30,6 +30,15 @@ import DataBuffer from './DataBuffer.js'
* @typedef {import('./Cache.js').default} Cache
*/
+/**
+ * A DataBufferController Object
+ * @typedef {Object} DataBufferControllerObject
+ * @property {Cache} cache A Cache object
+ * @property {Logger} logger A Logger object
+ * @property {number} ttl Time To Live in seconds
+ * @property {number} raceTimeMs How long a request can be queued, before it is ignored and retried in milli-seconds
+ */
+
export default class DataBufferController {
#items
#cache
@@ -39,11 +48,7 @@ export default class DataBufferController {
/**
* Setup the Controller
*
- * @param {object} obj
- * @param {Cache} obj.cache A Cache object
- * @param {Logger} obj.logger A Logger object
- * @param {number} obj.ttl Time To Live in seconds
- * @param {number} obj.raceTimeMs How long a request can be queued, before it is ignored and retried in milli-seconds
+ * @param {DataBufferControllerObject} param
* @throws {Error} When the cache is not set
*/
constructor ({ cache, logger = console, ttl = 300, raceTimeMs = 30000 }) {
@@ -57,9 +62,6 @@ export default class DataBufferController {
}
this.#cache = cache
- // start the cache
- this.cacheStart()
-
// remove expired caches, call every 1/5 of the stdTTL
this.#intervalRef = setInterval(this.cacheCleaning.bind(this), this.ttl * this.#fifthSecondInMs)
}
@@ -67,9 +69,11 @@ export default class DataBufferController {
// cleanup
async close () {
this.logger.debug('Stopping DataBufferController')
+ clearInterval(this.#intervalRef)
await this.#cache.quit()
+ Object.values(this.#items).forEach(dataBuffer => dataBuffer.close())
this.#items = null
- clearInterval(this.#intervalRef)
+ return 'bye'
}
/**
@@ -146,4 +150,22 @@ export default class DataBufferController {
get amountOfCachedKeys () {
return Object.keys(this.#items).length
}
+
+ // returns the statusus of the cache, usefull for tests
+ get bufferStatus () {
+ return Object.values(this.#items).map(dataBuffer => dataBuffer.status)
+ }
+
+ /**
+ * Setup the Controller
+ *
+ * @param {DataBufferControllerObject} data
+ * @return {DataBufferController}
+ **/
+ static async create (data) {
+ const dbc = new DataBufferController(data)
+ // start the cache
+ await dbc.cacheStart()
+ return dbc
+ }
}
diff --git a/src/__tests__/databuffer.unit.js b/src/__tests__/databuffer.unit.js
index d3a4640..8cbcfa3 100644
--- a/src/__tests__/databuffer.unit.js
+++ b/src/__tests__/databuffer.unit.js
@@ -33,7 +33,7 @@ describe('Test the DataBuffer', () => {
])('Basic initialization', (params, expected) => {
const db = new DataBuffer(params)
expect(db.ttl).toEqual(expected.ttl)
- expect(db.raceTime).toEqual(expected.raceTimeMs)
+ expect(db.raceTimeMs).toEqual(expected.raceTimeMs)
expect(db.raceTimeInSeconds).toEqual(expected.raceTimeMs / 1000)
expect(db.logger).toBeDefined()
db.cleanUp()
diff --git a/src/__tests__/databuffercontroller.unit.js b/src/__tests__/databuffercontroller.unit.js
index 32e3f1a..9138bd1 100644
--- a/src/__tests__/databuffercontroller.unit.js
+++ b/src/__tests__/databuffercontroller.unit.js
@@ -13,7 +13,9 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see .
-import { expect, describe, test, afterAll } from '@jest/globals'
+/* eslint max-statements: ["error", 25] */
+
+import { expect, describe, test, jest } from '@jest/globals'
import DataBufferController from '../DataBufferController.js'
import Cache from '../Cache.js'
@@ -25,13 +27,10 @@ const logger = {
}
const cache = new Cache()
-const controller = new DataBufferController({ logger, ttl: 2, cache })
-
-afterAll(() => controller.close())
describe('Test the Controller', () => {
test('If the controller throws an error when the cache is not set', () => {
- expect(() => new DataBufferController({ logger, ttl: 1 })).toThrow('Cache is not provided.')
+ expect(DataBufferController.create({ logger, ttl: 1 })).rejects.toThrow('Cache is not provided.')
})
test.each([
@@ -40,15 +39,16 @@ describe('Test the Controller', () => {
[{ logger, cache, raceTimeMs: 20 }, { ttl: 300, raceTimeMs: 20 }],
[{ logger, cache, raceTimeMs: 20, ttl: 200 }, { ttl: 200, raceTimeMs: 20 }],
[{ cache }, { ttl: 300, raceTimeMs: 30000 }]
- ])('Basic initialization', (params, expected) => {
- const dbc = new DataBufferController(params)
+ ])('Basic initialization', async (params, expected) => {
+ const dbc = await DataBufferController.create(params)
expect(dbc.ttl).toEqual(expected.ttl)
- expect(dbc.raceTime).toEqual(expected.raceTime)
+ expect(dbc.raceTimeMs).toEqual(expected.raceTimeMs)
expect(dbc.logger).toBeDefined()
- dbc.close()
+ await dbc.close()
})
test('The basic functionallity', async () => {
+ const controller = await DataBufferController.create({ logger, ttl: 2, cache })
const key = 'controller_test'
const expected = await controller.get(key)
expect(expected).toEqual(undefined)
@@ -56,9 +56,11 @@ describe('Test the Controller', () => {
const expected2 = await controller.get(key)
expect(expected2).toEqual({ found: true })
+ await controller.close()
})
test('The sequential requests should be queued and waiting till the cache has been set', async () => {
+ const controller = await DataBufferController.create({ logger, ttl: 2, cache })
const key = 'test2'
const list = [
controller.get(key),
@@ -77,5 +79,84 @@ describe('Test the Controller', () => {
controller.set(key, { found: true })
const expected2 = await Promise.all(list)
expect(expected2.filter(item => item).length).toBe(9)
+ await controller.close()
+ })
+})
+
+describe('Multicontroller - single cache usecase', () => {
+ test('The sequential requests should be queued, also over multiple controllers, and waiting till the cache has been set', async () => {
+ const singleCache = new Cache()
+ const controllerA = await DataBufferController.create({ logger, cache: singleCache, ttl: 10, raceTimeMs: 10000 })
+ const controllerB = await DataBufferController.create({ logger, cache: singleCache, ttl: 10, raceTimeMs: 10000 })
+
+ const key = 'multitest'
+ const list = [
+ controllerA.get(key),
+ controllerA.get(key),
+ controllerA.get(key),
+ controllerA.get(key),
+ controllerA.get(key)
+ ]
+
+ const expectedA = await list[0]
+ expect(expectedA).toBe(undefined)
+ expect(controllerA.bufferStatus).toEqual(['running'])
+ expect(controllerB.bufferStatus).toEqual([])
+
+ list.push(controllerB.get(key))
+ // give async job a change to initialize
+ await new Promise(resolve => setTimeout(resolve, 500))
+
+ const expectedB = list[5]
+ expect(expectedB).not.toBe(undefined)
+
+ for (let i = 0; i < 4; i++) {
+ list.push(controllerB.get(key))
+ }
+
+ expect(controllerB.bufferStatus).toEqual(['running'])
+
+ controllerA.set(key, { found: 42 })
+
+ const expectedList = await Promise.all(list)
+ expect(expectedList.filter(item => item).length).toBe(9)
+
+ const result = await Promise.all([controllerA.close(), controllerB.close()])
+ expect(result).toEqual(['bye', 'bye'])
+ })
+
+ test('Multiple controllers, test the semaphore checker', async () => {
+ const singleCache = new Cache()
+ const controllerA = await DataBufferController.create({ logger, cache: singleCache, ttl: 10, raceTimeMs: 5000 })
+ const controllerB = await DataBufferController.create({ logger, cache: singleCache, ttl: 10, raceTimeMs: 2000 })
+ const debugSpy = jest.fn()
+ controllerB.logger.debug = (txt) => debugSpy(txt)
+
+ const key = 'semaphore-checker'
+ const list = [
+ controllerA.get(key),
+ controllerA.get(key),
+ controllerA.get(key),
+ controllerA.get(key),
+ controllerA.get(key)
+ ]
+ const expectedA = await list[0]
+ expect(expectedA).toBe(undefined)
+
+ list.push(controllerB.get(key))
+ // give async job a change to initialize
+ await new Promise(resolve => setTimeout(resolve, 2500))
+ expect(debugSpy).toBeCalledTimes(13)
+ expect(debugSpy).toHaveBeenCalledWith('Semaphore found')
+ expect(debugSpy).toHaveBeenNthCalledWith(10, 'Checking semaphore')
+ expect(debugSpy).toHaveBeenLastCalledWith('Semaphore is taking too long, aborting!')
+
+ const expectedB = await list[5]
+ expect(expectedB).toBe(undefined)
+
+ const result = await Promise.all([controllerA.close(), controllerB.close()])
+ expect(result).toEqual(['bye', 'bye'])
+ expect(debugSpy).toHaveBeenCalledWith('Closing DataBuffer')
+ expect(debugSpy).toHaveBeenCalledWith('Stopping DataBufferController')
})
})
diff --git a/src/__tests__/databuffercontroller2.unit.js b/src/__tests__/databuffercontroller2.unit.js
index 6ed466f..2544444 100644
--- a/src/__tests__/databuffercontroller2.unit.js
+++ b/src/__tests__/databuffercontroller2.unit.js
@@ -25,13 +25,13 @@ const logger = {
}
describe('Test the Controller', () => {
- test('The timer should call the CacheClean function periodically', () => {
+ test('The timer should call the CacheClean function periodically', async () => {
const cache = new Cache()
jest.useFakeTimers('modern')
jest.setSystemTime(new Date(2020, 12, 5, 20, 0, 0))
const cleanSpy = jest.fn()
const ttl = 2
- const controller = new DataBufferController({ logger, ttl, cache })
+ const controller = await DataBufferController.create({ logger, ttl, cache })
controller.logger.trace = (txt) => cleanSpy(txt)
expect(cleanSpy).toHaveBeenCalledTimes(0)
expect(controller.amountOfCachedKeys).toBe(0)
@@ -48,7 +48,7 @@ describe('Test the Controller', () => {
controller.close()
})
- test('Test the Cache emitters', () => {
+ test('Test the Cache emitters', async () => {
const cache = new Cache()
const cleanSpy1 = jest.fn()
@@ -61,11 +61,12 @@ describe('Test the Controller', () => {
}
const ttl = 2
- const controller = new DataBufferController({ logger: loggerSpy, cache, ttl, raceTimeMs: 20 })
+ const controller = await DataBufferController.create({ logger: loggerSpy, cache, ttl, raceTimeMs: 20 })
- expect(cleanSpy2).toHaveBeenCalledTimes(2)
+ expect(cleanSpy2).toHaveBeenCalledTimes(3)
expect(cleanSpy2).toHaveBeenCalledWith('Cache is Connected')
- expect(cleanSpy2).toHaveBeenLastCalledWith('Cache is Ready')
+ expect(cleanSpy2).toHaveBeenCalledWith('Cache is Ready')
+ expect(cleanSpy2).toHaveBeenLastCalledWith('Cache is Setup')
cache.emit('error', 'Emitted Error')
expect(cleanSpy1).toHaveBeenCalledTimes(2)
expect(cleanSpy1).toHaveBeenCalledWith('CACHE ERROR')
diff --git a/src/example/default.example.js b/src/example/default.example.js
index ebaad58..4af2e8c 100644
--- a/src/example/default.example.js
+++ b/src/example/default.example.js
@@ -18,7 +18,7 @@ import { Cache, DataBufferController } from '../index.js'
const logger = console
const inMemoryCache = new Cache()
-const controller = new DataBufferController({ logger, cache: inMemoryCache })
+const controller = await DataBufferController.create({ logger, cache: inMemoryCache })
const val = await controller.get('test')
logger.log('Should be undefined', val) // undefined
@@ -46,4 +46,4 @@ await controller.set('test_multi', { data: 'Buffer' })
logger.log(await Promise.all(vals))
-controller.close()
+await controller.close()
diff --git a/src/example/multiController.example.js b/src/example/multiController.example.js
new file mode 100644
index 0000000..0d845d2
--- /dev/null
+++ b/src/example/multiController.example.js
@@ -0,0 +1,31 @@
+import { Cache, DataBufferController } from '../index.js'
+
+const logger = console
+
+const singleCache = new Cache()
+const controllerA = await DataBufferController.create({ logger, cache: singleCache })
+const controllerB = await DataBufferController.create({ logger, cache: singleCache })
+
+const key = 'multitest'
+const list = [
+ controllerA.get(key),
+ controllerA.get(key),
+ controllerA.get(key),
+ controllerA.get(key),
+ controllerA.get(key)
+]
+await list[0]
+
+list.push(controllerB.get(key))
+list.push(controllerB.get(key))
+list.push(controllerB.get(key))
+
+await new Promise(resolve => setTimeout(resolve, 5000))
+
+controllerA.set(key, { found: 42 })
+
+Promise.all(list).then(res => {
+ logger.info(res)
+ controllerA.close()
+ controllerB.close()
+})
diff --git a/src/example/redis.example.js b/src/example/redis.example.js
index b02cad7..b38a96d 100644
--- a/src/example/redis.example.js
+++ b/src/example/redis.example.js
@@ -19,7 +19,7 @@ import { DataBufferController } from '../index.js'
const logger = console
const cache = createClient() // redis
-const controller = new DataBufferController({ logger, cache })
+const controller = await DataBufferController.create({ logger, cache })
const val = await controller.get('test')
logger.log('Should be undefined', val) // undefined
@@ -47,4 +47,4 @@ await controller.set('test_multi', { data: 'Buffer' })
logger.log(await Promise.all(vals))
-controller.close()
+await controller.close()