HttpFetchStream lost chunks #8201

@60-hz

Description

I was frequently losing chunks when using HttpFetchStream with a streaming API. Claude identified two bugs causing the dropped chunks:

  1. Concurrent reads – setInterval(read, 50) fires new reader.read() calls before previous ones resolve, causing silent data loss on the ReadableStream.
  2. No inter-chunk buffer – a single network chunk can split a JSON payload mid-line; without buffering incomplete lines across chunks, JSON.parse fails and those tokens are dropped.

Fix: replace the interval with a recursive .then() loop, and accumulate decoded text in a buffer, processing only complete \n-terminated lines.
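To show the buffering part of the fix in isolation, here is a minimal sketch in plain JavaScript, outside of the op API (makeLineBuffer is a hypothetical helper name used only for this example):

```javascript
// Sketch: inter-chunk line buffer. Network chunks can split an SSE/JSON
// line anywhere, so we only hand complete "\n"-terminated lines to onLine.
function makeLineBuffer(onLine)
{
    let buffer = "";
    return function feed(chunkText)
    {
        buffer += chunkText;
        const lines = buffer.split("\n");
        buffer = lines.pop();            // keep the trailing incomplete line for the next chunk
        lines.forEach((line) =>
        {
            line = line.trim();
            if (line) onLine(line);
        });
    };
}

// A JSON payload split across two chunks is still parsed once, whole:
const received = [];
const feed = makeLineBuffer((line) => received.push(JSON.parse(line)));
feed('{"token":"Hel');
feed('lo"}\n{"token":"world"}\n');
// received now holds [{ token: "Hello" }, { token: "world" }]
```

Without the buffer, the first feed() call would hit JSON.parse('{"token":"Hel') and throw, dropping that token.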

Here is a fixed version with comments as an example. I also added a very handy "done" output trigger that fires on stream end.

const inres    = op.inObject("Fetch Response"),
      outObje  = op.outObject("Result"),
      outTrig  = op.outTrigger("Received Result"),
      outStart = op.outTrigger("Started"),
      outDone  = op.outTrigger("Done");          // new: fires when stream is fully received

inres.onChange = start;

let reader;
let decoder;
let buffer = "";                                 // new: inter-chunk buffer to handle split JSON lines

function start()
{
    const response = inres.get();
    if (!response) return;

    outStart.trigger();
    buffer = "";                                 // new: reset buffer on each new stream

    try
    {
        reader  = response.body.getReader();
        decoder = new TextDecoder("utf-8");
        readNext();                              // changed: single recursive call instead of setInterval
    }
    catch (e) { console.log("stream init error", e); }
}

function readNext()
{
    reader.read().then(({ done, value }) =>      // changed: proper destructuring of done + value
    {
        if (done)
        {
            outDone.trigger();                   // new: universal stream end, works with any SSE API
            return;
        }

        buffer += decoder.decode(value, { stream: true }); // new: accumulate into buffer (no .trim() on raw chunk)

        const lines = buffer.split("\n");
        buffer = lines.pop();                    // new: keep last incomplete line in buffer for next chunk

        lines.forEach((line) =>
        {
            line = line.trim();
            if (!line) return;
            if (line === "data: [DONE]") return; // changed: just skip OpenAI-specific marker, Done trigger is handled above
            if (line.startsWith("data: ")) line = line.slice(6);
            else return;

            try
            {
                const json = JSON.parse(line);
                outObje.setRef(json);
                outTrig.trigger();
            }
            catch (err) { console.log("JSON parse error:", err, "line:", line); }
        });

        readNext();                              // changed: recursive call AFTER processing, never concurrent
    })
    .catch((err) => { console.log("read error", err); });
}
