-
Notifications
You must be signed in to change notification settings - Fork 0
Feature/multicontroller #2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
fece9ec
1b3aafe
aa65e4c
3aea7c8
a8ed0d2
857b953
c4a8338
0cdbf76
ec4cfd9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1 +1 @@ | ||
| 16.14.2 | ||
| 16.14.0 |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,6 +13,8 @@ | |
| // You should have received a copy of the GNU General Public License | ||
| // along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
|
|
||
| /* eslint max-statements: ["error", 25] */ | ||
|
|
||
| import EventEmitter from 'events' | ||
|
|
||
| /** | ||
|
|
@@ -32,6 +34,13 @@ export default class DataBuffer extends EventEmitter { | |
| #currentStatus | ||
| #cache | ||
|
|
||
| #semaphore = 'DIBS' | ||
| #closing = false | ||
| #foundSemaphore = false | ||
| #semaphoreChecking = false | ||
| #semaphoreAmountChecks = 10 | ||
| #semaphoreCheckCounter = 0 | ||
|
|
||
| /** | ||
| * Initialize the DataBuffer | ||
| * | ||
|
|
@@ -56,11 +65,17 @@ export default class DataBuffer extends EventEmitter { | |
| this.#currentStatus = this.#status.init | ||
| } | ||
|
|
||
| close () { | ||
| this.logger.debug('Closing DataBuffer') | ||
| this.removeAllListeners() | ||
| this.#closing = true | ||
| } | ||
|
|
||
| get ttl () { | ||
| return this.#stdTTL | ||
| } | ||
|
|
||
| get raceTime () { | ||
| get raceTimeMs () { | ||
| return this.#allowedRaceTimeMs | ||
| } | ||
|
|
||
|
|
@@ -90,15 +105,22 @@ export default class DataBuffer extends EventEmitter { | |
| * Return a promise that resolves with the data | ||
| * or undefined when it was expired, not available or timed out | ||
| * | ||
| * @returns {Promise<undefined|object|Array>} | ||
| * @returns {Promise<undefined|object|array>} | ||
| */ | ||
| async get () { | ||
| // If cache exists, set the status and proceed | ||
| try { | ||
| const result = await this.#cache.exists(this.key) | ||
| if (result !== false) { | ||
| this.#currentStatus = this.#status.finished | ||
| } | ||
| } catch (e) { } | ||
|
|
||
| // if the status is still set on initializing set a semaphore | ||
| if (this.#currentStatus === this.#status.init) { | ||
apiest-pon marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| this.#cache.set(this.key, this.#semaphore) | ||
| } | ||
|
|
||
| return this.waitForResponse() | ||
| } | ||
|
|
||
|
|
@@ -115,10 +137,39 @@ export default class DataBuffer extends EventEmitter { | |
| } | ||
|
|
||
| const result = await this.#cache.set(this.key, JSON.stringify(value), { EX: ttl }) | ||
| this.triggerItemSet(ttl) | ||
| return result | ||
| } | ||
|
|
||
| async checkSemaphore () { | ||
apiest-pon marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| await new Promise(resolve => setTimeout(resolve, this.#allowedRaceTimeMs / this.#semaphoreAmountChecks)) | ||
| this.logger.debug('Checking semaphore') | ||
| this.#semaphoreCheckCounter++ | ||
| const data = await this.#cache.get(this.key) | ||
| if (data !== this.#semaphore) { | ||
| this.logger.debug('Semaphore is replaced with real data!') | ||
| this.triggerItemSet() | ||
| this.#semaphoreChecking = false | ||
| this.#foundSemaphore = false | ||
| this.#semaphoreCheckCounter = 0 | ||
| return true | ||
| } | ||
|
|
||
| // Jump out of the recursion if this variable is set to false | ||
| // or when the racetime has passed | ||
| if (this.#semaphoreChecking === false || this.#semaphoreCheckCounter > this.#semaphoreAmountChecks) { | ||
| this.logger.debug('Semaphore is taking too long, aborting!') | ||
| this.#semaphoreCheckCounter = 0 | ||
| this.#semaphoreChecking = false | ||
| return false | ||
| } | ||
| return this.checkSemaphore() | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why recursion?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I could rewrite to while(true) loop with a break, but this seemed better controllable
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do need to convert to while loop |
||
| } | ||
|
|
||
| triggerItemSet (ttl = this.#stdTTL) { | ||
| this.#currentStatus = this.#status.finished | ||
| this.setExpiry(ttl) // reset expiry | ||
| this.emit(this.#status.finished) // notify all observers waiting for this request | ||
| return result | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -132,19 +183,33 @@ export default class DataBuffer extends EventEmitter { | |
| return undefined | ||
| } | ||
|
|
||
| // the status is finished if the data is still there return it | ||
| // the status is finished and if the data is still there return it | ||
| if (this.#currentStatus === this.#status.finished) { | ||
| const data = await this.#cache.get(this.key) | ||
|
|
||
| if (data === this.#semaphore) { | ||
| this.#currentStatus = this.#status.running | ||
| this.#foundSemaphore = true | ||
| this.logger.debug('Semaphore found') | ||
| // first one will start the checking, so there will only be one checkSemaphore process per databuffer | ||
| if (this.#semaphoreChecking === false) { | ||
| this.#semaphoreChecking = true | ||
| this.checkSemaphore() | ||
| } | ||
| return this.waitForResponse() | ||
| } | ||
|
|
||
| // it is possible that the cache is expired between exists call and the get call | ||
| // if that happens restart the process | ||
| if (data === undefined || data === null) { | ||
| const parsedJSON = this.tryParseJSONObject(data) | ||
|
|
||
| if (parsedJSON === false) { | ||
| this.#currentStatus = this.#status.running | ||
| return undefined | ||
| } | ||
|
|
||
| this.logger.trace(`Cache hit for key: ${this.key}`) | ||
| return JSON.parse(data) | ||
| this.logger.debug(`Cache hit for key: ${this.key}`) | ||
| return parsedJSON | ||
| } | ||
|
|
||
| // the status is running, so we wait until the cache gets set | ||
|
|
@@ -173,4 +238,21 @@ export default class DataBuffer extends EventEmitter { | |
| // return who's done first | ||
| return Promise.race([dataPromise, timeoutPromise]) | ||
| } | ||
|
|
||
| // StackOverflow: https://stackoverflow.com/a/20392392 | ||
| tryParseJSONObject (jsonString) { | ||
| try { | ||
| const o = JSON.parse(jsonString) | ||
|
|
||
| // Handle non-exception-throwing cases: | ||
| // Neither JSON.parse(false) or JSON.parse(1234) throw errors, hence the type-checking, | ||
| // but... JSON.parse(null) returns null, and typeof null === "object", | ||
| // so we must check for that, too. Thankfully, null is falsey, so this suffices: | ||
| if (o && typeof o === 'object') { | ||
| return o | ||
| } | ||
| } catch (e) { } | ||
|
|
||
| return false | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
stdTTL seems like a fixed value?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can set it in the constructor