From 2f58376e0242206b4c6efdd926e54195e95fffcc Mon Sep 17 00:00:00 2001 From: maartenvandenbrande Date: Tue, 18 Jun 2024 15:09:02 +0200 Subject: [PATCH 1/2] UnionIterator skips not readable subiterators --- asynciterator.ts | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index 38f5872..715f9a2 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -1717,7 +1717,7 @@ export class MultiTransformIterator extends TransformIterator { @extends module:asynciterator.BufferedIterator */ export class UnionIterator extends BufferedIterator { - private _sources : InternalSource[] = []; + private _sources : {read: boolean, source: InternalSource}[] = []; private _pending? : { loading: boolean, sources?: AsyncIterator>> }; private _currentSource = -1; protected _destroySources: boolean; @@ -1784,10 +1784,14 @@ export class UnionIterator extends BufferedIterator { if (isPromise(source)) source = wrap(source) as any as InternalSource; if (!source.done) { - this._sources.push(source); + const sourceObj = { read: true, source }; + this._sources.push(sourceObj); source[DESTINATION] = this; source.on('error', destinationEmitError); - source.on('readable', destinationFillBuffer); + source.on('readable', () => { + sourceObj.read = true; + destinationFillBuffer.bind(>sourceObj.source)(); + }); source.on('end', destinationRemoveEmptySources); } } @@ -1796,9 +1800,9 @@ export class UnionIterator extends BufferedIterator { protected _removeEmptySources() { this._sources = this._sources.filter((source, index) => { // Adjust the index of the current source if needed - if (source.done && index <= this._currentSource) + if (source.source.done && index <= this._currentSource) this._currentSource--; - return !source.done; + return !source.source.done; }); this._fillBuffer(); } @@ -1818,7 +1822,7 @@ export class UnionIterator extends BufferedIterator { this._currentSource = (this._currentSource + 1) % this._sources.length; const source = this._sources[this._currentSource]; // Attempt to read an item from that source - if ((item = source.read()) !== null) { + if ((item = source.source.read()) !== null) { count--; this._push(item); } @@ -1837,7 +1841,7 @@ export class UnionIterator extends BufferedIterator { // Destroy all sources that are still readable if (this._destroySources) { for (const source of this._sources) - source.destroy(); + source.source.destroy(); // Also close the sources stream if applicable if (this._pending) { From ffc6c1e0aa7d1820b6bd579a10dbc52bbe95296c Mon Sep 17 00:00:00 2001 From: maartenvandenbrande Date: Tue, 18 Jun 2024 16:19:06 +0200 Subject: [PATCH 2/2] Fix lint, rename variable, add skip in _read() function --- asynciterator.ts | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index 715f9a2..4da632e 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -1717,7 +1717,7 @@ export class MultiTransformIterator extends TransformIterator { @extends module:asynciterator.BufferedIterator */ export class UnionIterator extends BufferedIterator { - private _sources : {read: boolean, source: InternalSource}[] = []; + private _sources : { requiresRead: boolean, source: InternalSource }[] = []; private _pending? : { loading: boolean, sources?: AsyncIterator>> }; private _currentSource = -1; protected _destroySources: boolean; @@ -1784,12 +1784,12 @@ export class UnionIterator extends BufferedIterator { if (isPromise(source)) source = wrap(source) as any as InternalSource; if (!source.done) { - const sourceObj = { read: true, source }; + const sourceObj = { requiresRead: true, source }; this._sources.push(sourceObj); source[DESTINATION] = this; source.on('error', destinationEmitError); source.on('readable', () => { - sourceObj.read = true; + sourceObj.requiresRead = true; destinationFillBuffer.bind(>sourceObj.source)(); }); source.on('end', destinationRemoveEmptySources); @@ -1821,6 +1821,9 @@ export class UnionIterator extends BufferedIterator { // Pick the next source this._currentSource = (this._currentSource + 1) % this._sources.length; const source = this._sources[this._currentSource]; + if (!source.source.readable && !source.requiresRead) + continue; + source.requiresRead = false; // Attempt to read an item from that source if ((item = source.source.read()) !== null) { count--;