Improve promise handling for very different promise runtimes (very slow vs. very fast) #17
sker65 wants to merge 12 commits into understand-ai:main
Conversation
src/asyncBufferedTransformer.ts (Outdated)
const existingPromise = buffer[index];
if (existingPromise) {
  yield await existingPromise.promise;
const result = await Promise.any(buffer.map(p => p?.promise).filter(p => !!p));
If I read the code right, you're changing the semantics here. asyncBufferedTransformer guarantees the same ordering of results as the promises that go in. This is important, e.g. when handling Kafka partitions or MongoDB logs.
If you want different semantics, please write a new method asyncBufferedUnorderedTransformer instead.
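A minimal sketch of what such an unordered variant could look like (the { promise } wrapper shape is taken from the quoted diff; everything else, including the signature, is an assumption rather than the library's actual API):

export async function* asyncBufferedUnorderedTransformer<T>(
  promises: AsyncIterable<{ promise: Promise<T> }>,
  bufferSize: number,
): AsyncIterable<T> {
  // Tag every buffered promise with a key so the winner of the race can be
  // removed from the buffer once it has been yielded.
  const buffer = new Map<symbol, Promise<{ key: symbol; value: T }>>();

  const drainOne = async (): Promise<T> => {
    // Promise.race settles with whichever buffered promise finishes first,
    // so results are yielded in completion order, not input order.
    const { key, value } = await Promise.race(buffer.values());
    buffer.delete(key);
    return value;
  };

  for await (const { promise } of promises) {
    const key = Symbol();
    buffer.set(key, promise.then((value) => ({ key, value })));
    if (buffer.size >= bufferSize) {
      yield await drainOne();
    }
  }
  while (buffer.size > 0) {
    yield await drainOne();
  }
}

As in the current implementation, the first rejection would propagate out of Promise.race and stop the stream.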
// Note: here we already pulled a promise _and_ have a buffer of bufferSize promises
// that's why bufferSize + 1 = numberOfParallelExecutions
if (buffer.size >= bufferSize) {
  const result = await Promise.any(Array.from(buffer).map(p => p.promise));
Sorry, I missed your code updates before.
I don't like how this adds a linear cost to every await. However, in practice bufferSize is probably relatively small, so this does not matter much.
I think this should use Promise.race, though, so that we get rejections as they occur (currently they would just be ignored here, because Promise.any only cares about fulfilled, i.e. non-rejected, promises).
The current error handling means the first rejection stops the stream. This is a bad design choice we made, so feel free to keep it. A better way, however, would be to catch exceptions from awaiting rejected promises and yield the rejected promise itself. Errors would then occur "naturally" and would not necessarily stop the stream; instead, it would be up to the caller. I would very much prefer that API, but I see that consistency with the existing behaviour might also be desired, so I'll leave it to the only user of this API to decide :)
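A hedged sketch of that second suggestion (all names here are illustrative, not the library's real internals): wrap each buffered promise so that it always fulfills with a settled record, race the buffer, and hand rejections to the caller as data instead of aborting the generator.

type Settled<T> = { ok: true; value: T } | { ok: false; error: unknown };

function settle<T>(promise: Promise<T>): Promise<Settled<T>> {
  // Never rejects: both outcomes are turned into a plain record.
  return promise.then(
    (value) => ({ ok: true as const, value }),
    (error) => ({ ok: false as const, error }),
  );
}

// Inside the buffered generator, the relevant change would roughly be:
// if (buffer.size >= bufferSize) {
//   const result = await Promise.race(Array.from(buffer).map(p => settle(p.promise)));
//   yield result; // the caller checks result.ok and decides whether to stop
// }

With plain Promise.race (without the settle wrapper) you keep the current fail-fast behaviour, but rejections surface as soon as they happen instead of being swallowed by Promise.any.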
// Note: here we already pulled a promise _and_ have a buffer of bufferSize promises
// that's why bufferSize + 1 = numberOfParallelExecutions
if (buffer.size >= bufferSize) {
  const result = await Promise.any(Array.from(buffer).map(p => p.promise));
If you want to avoid the array construction and iteration by Promise.race on every await (not strictly necessary, just something to consider):
You can implement an unbounded channel as a thin wrapper around a Web Streams ReadableStream:
export function channel() {
let controller;
const stream = new ReadableStream({
start(c) {
controller = c;
},
});
const reader = stream.getReader();
return {
send(value) {
controller.enqueue(value);
},
close() {
controller.close();
},
async receive() {
return reader.read(); // { value, done }
},
};
}

You can then wrap each promise with a function that send()s the settled result (both resolutions and rejections) into the stream. If the buffer is full, you yield await channel.receive() (plus error handling, see the other comment). No linear-time operation on each await needed :)
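A hedged sketch of how that could look when plugged into the buffered transformer (the { promise } wrapper shape comes from the quoted diff; the function name, the untyped channel, and the error handling are assumptions for illustration). Note that, as discussed in the first review comment, this yields results in completion order rather than input order:

export async function* asyncBufferedUnorderedTransformerViaChannel<T>(
  promises: AsyncIterable<{ promise: Promise<T> }>,
  bufferSize: number,
): AsyncIterable<T> {
  const chan = channel();
  let inFlight = 0;

  const pullOne = async (): Promise<T> => {
    // Constant-time: just read the next settled result from the channel,
    // no race over the whole buffer.
    const { value: settled } = await chan.receive();
    inFlight -= 1;
    if (!settled.ok) {
      throw settled.error; // or yield it to the caller, see the other comment
    }
    return settled.value;
  };

  for await (const { promise } of promises) {
    inFlight += 1;
    // Forward fulfillments and rejections into the channel as plain data.
    promise.then(
      (value) => chan.send({ ok: true, value }),
      (error) => chan.send({ ok: false, error }),
    );
    if (inFlight >= bufferSize) {
      yield await pullOne();
    }
  }
  while (inFlight > 0) {
    yield await pullOne();
  }
  chan.close();
}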