From 0f0c7c84cf22c2672ac8deb744fb3235e46f180f Mon Sep 17 00:00:00 2001 From: Attila Gazso <230163+agazso@users.noreply.github.com> Date: Tue, 28 Oct 2025 11:09:23 +0100 Subject: [PATCH 1/7] fix: stamped putter support for streamed chunk endpoint --- pkg/api/chunk_stream.go | 56 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 53 insertions(+), 3 deletions(-) diff --git a/pkg/api/chunk_stream.go b/pkg/api/chunk_stream.go index 2f91939f21a..eac8cd9c5c1 100644 --- a/pkg/api/chunk_stream.go +++ b/pkg/api/chunk_stream.go @@ -95,13 +95,14 @@ func (s *Service) chunkUploadStreamHandler(w http.ResponseWriter, r *http.Reques } s.wsWg.Add(1) - go s.handleUploadStream(logger, wsConn, putter) + go s.handleUploadStream(logger, wsConn, putter, tag) } func (s *Service) handleUploadStream( logger log.Logger, conn *websocket.Conn, putter storer.PutterSession, + tag uint64, ) { defer s.wsWg.Done() @@ -190,14 +191,56 @@ func (s *Service) handleUploadStream( return } - chunk, err := cac.NewWithDataSpan(msg) + // Check if this message contains a per-chunk stamp prepended to the chunk data + // Format: stamp[113 bytes] + chunk[data] + var ( + chunk swarm.Chunk + chunkPutter = putter // default to connection-level putter + chunkData = msg + ) + + // If message is large enough to contain a stamp + chunk data, try to extract the stamp + if len(msg) >= postage.StampSize+swarm.SpanSize { + potentialStamp := msg[:postage.StampSize] + potentialChunkData := msg[postage.StampSize:] + + // Try to unmarshal as a stamp + stamp := postage.Stamp{} + if err := stamp.UnmarshalBinary(potentialStamp); err == nil { + // Valid stamp found - create a per-chunk putter + chunkPutter, err = s.newStampedPutter(ctx, putterOptions{ + BatchID: stamp.BatchID(), + TagID: tag, + Deferred: tag != 0, + }, &stamp) + if err != nil { + logger.Debug("chunk upload stream: failed to create stamped putter", "error", err) + logger.Error(nil, "chunk upload stream: failed to create stamped putter") + switch { + case errors.Is(err, errBatchUnusable) || errors.Is(err, postage.ErrNotUsable): + sendErrorClose(websocket.CloseInternalServerErr, "batch not usable") + case errors.Is(err, postage.ErrNotFound): + sendErrorClose(websocket.CloseInternalServerErr, "batch not found") + default: + sendErrorClose(websocket.CloseInternalServerErr, "stamped putter creation failed") + } + return + } + // Use the chunk data without the stamp + chunkData = potentialChunkData + // Note: we need to call Done on the per-chunk putter after Put + } + // If unmarshal failed, fall through to use the whole message as chunk data + } + + chunk, err = cac.NewWithDataSpan(chunkData) if err != nil { logger.Debug("chunk upload stream: create chunk failed", "error", err) logger.Error(nil, "chunk upload stream: create chunk failed") return } - err = putter.Put(ctx, chunk) + err = chunkPutter.Put(ctx, chunk) if err != nil { logger.Debug("chunk upload stream: write chunk failed", "address", chunk.Address(), "error", err) logger.Error(nil, "chunk upload stream: write chunk failed") @@ -210,6 +253,13 @@ func (s *Service) handleUploadStream( return } + // If we created a per-chunk putter, clean it up + if chunkPutter != putter { + if err := chunkPutter.Done(swarm.ZeroAddress); err != nil { + logger.Error(err, "chunk upload stream: failed to finalize per-chunk putter") + } + } + err = sendMsg(websocket.BinaryMessage, successWsMsg) if err != nil { s.logger.Debug("chunk upload stream: sending success message failed", "error", err) From 4ef1cb333ea72e0ee2daac6687b74965d9f0e590 Mon Sep 17 00:00:00 2001 From: Attila Gazso <230163+agazso@users.noreply.github.com> Date: Thu, 30 Oct 2025 17:15:41 +0100 Subject: [PATCH 2/7] fix: per chunk stamp --- pkg/api/chunk_stream.go | 89 +++++++++++++++++++++++++++++------------ 1 file changed, 63 insertions(+), 26 deletions(-) diff --git a/pkg/api/chunk_stream.go b/pkg/api/chunk_stream.go index eac8cd9c5c1..cc96013981b 100644 --- a/pkg/api/chunk_stream.go +++ b/pkg/api/chunk_stream.go @@ -28,7 +28,7 @@ func (s *Service) chunkUploadStreamHandler(w http.ResponseWriter, r *http.Reques logger := s.logger.WithName("chunks_stream").Build() headers := struct { - BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"` + BatchID []byte `map:"Swarm-Postage-Batch-Id"` // Optional: can be omitted for per-chunk stamping SwarmTag uint64 `map:"Swarm-Tag"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { @@ -55,29 +55,34 @@ func (s *Service) chunkUploadStreamHandler(w http.ResponseWriter, r *http.Reques } } - // if tag not specified use direct upload - // Using context.Background here because the putter's lifetime extends beyond that of the HTTP request. - putter, err := s.newStamperPutter(context.Background(), putterOptions{ - BatchID: headers.BatchID, - TagID: tag, - Deferred: tag != 0, - }) - if err != nil { - logger.Debug("get putter failed", "error", err) - logger.Error(nil, "get putter failed") - switch { - case errors.Is(err, errBatchUnusable) || errors.Is(err, postage.ErrNotUsable): - jsonhttp.UnprocessableEntity(w, "batch not usable yet or does not exist") - case errors.Is(err, postage.ErrNotFound): - jsonhttp.NotFound(w, "batch with id not found") - case errors.Is(err, errInvalidPostageBatch): - jsonhttp.BadRequest(w, "invalid batch id") - case errors.Is(err, errUnsupportedDevNodeOperation): - jsonhttp.BadRequest(w, errUnsupportedDevNodeOperation) - default: - jsonhttp.BadRequest(w, nil) + // Create connection-level putter only if BatchID is provided + // If BatchID is not provided, per-chunk stamps must be used + var putter storer.PutterSession + if len(headers.BatchID) > 0 { + // if tag not specified use direct upload + // Using context.Background here because the putter's lifetime extends beyond that of the HTTP request. + putter, err = s.newStamperPutter(context.Background(), putterOptions{ + BatchID: headers.BatchID, + TagID: tag, + Deferred: tag != 0, + }) + if err != nil { + logger.Debug("get putter failed", "error", err) + logger.Error(nil, "get putter failed") + switch { + case errors.Is(err, errBatchUnusable) || errors.Is(err, postage.ErrNotUsable): + jsonhttp.UnprocessableEntity(w, "batch not usable yet or does not exist") + case errors.Is(err, postage.ErrNotFound): + jsonhttp.NotFound(w, "batch with id not found") + case errors.Is(err, errInvalidPostageBatch): + jsonhttp.BadRequest(w, "invalid batch id") + case errors.Is(err, errUnsupportedDevNodeOperation): + jsonhttp.BadRequest(w, errUnsupportedDevNodeOperation) + default: + jsonhttp.BadRequest(w, nil) + } + return } - return } upgrader := websocket.Upgrader{ @@ -115,8 +120,11 @@ func (s *Service) handleUploadStream( defer func() { cancel() _ = conn.Close() - if err = putter.Done(swarm.ZeroAddress); err != nil { - logger.Error(err, "chunk upload stream: syncing chunks failed") + // Only call Done on connection-level putter if it exists + if putter != nil { + if err = putter.Done(swarm.ZeroAddress); err != nil { + logger.Error(err, "chunk upload stream: syncing chunks failed") + } } }() @@ -191,6 +199,11 @@ func (s *Service) handleUploadStream( return } + logger.Debug("chunk upload stream", + "message_size", len(msg), + "stamp_size", postage.StampSize, + "first_8_bytes", msg[:8]) + // Check if this message contains a per-chunk stamp prepended to the chunk data // Format: stamp[113 bytes] + chunk[data] var ( @@ -204,10 +217,18 @@ func (s *Service) handleUploadStream( potentialStamp := msg[:postage.StampSize] potentialChunkData := msg[postage.StampSize:] + logger.Debug("chunk upload stream: attempting to extract stamp", + "message_size", len(msg), + "stamp_size", postage.StampSize, + "potential_stamp_len", len(potentialStamp), + "potential_chunk_len", len(potentialChunkData), + "first_8_bytes", msg[:8]) + // Try to unmarshal as a stamp stamp := postage.Stamp{} if err := stamp.UnmarshalBinary(potentialStamp); err == nil { // Valid stamp found - create a per-chunk putter + logger.Debug("chunk upload stream: per-chunk stamp detected", "batch_id", stamp.BatchID(), "chunk_size", len(potentialChunkData)) chunkPutter, err = s.newStampedPutter(ctx, putterOptions{ BatchID: stamp.BatchID(), TagID: tag, @@ -229,14 +250,30 @@ func (s *Service) handleUploadStream( // Use the chunk data without the stamp chunkData = potentialChunkData // Note: we need to call Done on the per-chunk putter after Put + } else { + // Stamp unmarshal failed - log for debugging + logger.Debug("chunk upload stream: stamp unmarshal failed, treating message as unstamped chunk", + "error", err, + "message_size", len(msg), + "stamp_size_expected", postage.StampSize, + "potential_stamp_len", len(potentialStamp)) } // If unmarshal failed, fall through to use the whole message as chunk data } + // If we don't have a putter at this point, the client must provide per-chunk stamps + if chunkPutter == nil { + logger.Debug("chunk upload stream: no stamp provided") + logger.Error(nil, "chunk upload stream: no batch ID in headers and no per-chunk stamp in message") + sendErrorClose(websocket.CloseInternalServerErr, "batch ID or per-chunk stamp required") + return + } + chunk, err = cac.NewWithDataSpan(chunkData) if err != nil { - logger.Debug("chunk upload stream: create chunk failed", "error", err) + logger.Debug("chunk upload stream: create chunk failed", "error", err, "chunk_size", len(chunkData)) logger.Error(nil, "chunk upload stream: create chunk failed") + sendErrorClose(websocket.CloseInternalServerErr, "invalid chunk data") return } From 5687d88b994cbb6afe702f00c7f8f2b582aa7968 Mon Sep 17 00:00:00 2001 From: Attila Gazso <230163+agazso@users.noreply.github.com> Date: Fri, 7 Nov 2025 18:49:00 +0100 Subject: [PATCH 3/7] fix: putter caching --- pkg/api/chunk_stream.go | 74 +++++++++++++++++++++++++++-------------- 1 file changed, 49 insertions(+), 25 deletions(-) diff --git a/pkg/api/chunk_stream.go b/pkg/api/chunk_stream.go index cc96013981b..be05cb4b139 100644 --- a/pkg/api/chunk_stream.go +++ b/pkg/api/chunk_stream.go @@ -117,9 +117,22 @@ func (s *Service) handleUploadStream( gone = make(chan struct{}) err error ) + + // Cache for per-chunk putters to avoid creating new sessions for every chunk + // Key: batch ID hex string, Value: putter session + putterCache := make(map[string]storer.PutterSession) + defer func() { cancel() _ = conn.Close() + + // Clean up all cached per-chunk putters + for batchID, cachedPutter := range putterCache { + if err := cachedPutter.Done(swarm.ZeroAddress); err != nil { + logger.Error(err, "chunk upload stream: failed to finalize cached putter", "batch_id", batchID) + } + } + // Only call Done on connection-level putter if it exists if putter != nil { if err = putter.Done(swarm.ZeroAddress); err != nil { @@ -227,29 +240,44 @@ func (s *Service) handleUploadStream( // Try to unmarshal as a stamp stamp := postage.Stamp{} if err := stamp.UnmarshalBinary(potentialStamp); err == nil { - // Valid stamp found - create a per-chunk putter - logger.Debug("chunk upload stream: per-chunk stamp detected", "batch_id", stamp.BatchID(), "chunk_size", len(potentialChunkData)) - chunkPutter, err = s.newStampedPutter(ctx, putterOptions{ - BatchID: stamp.BatchID(), - TagID: tag, - Deferred: tag != 0, - }, &stamp) - if err != nil { - logger.Debug("chunk upload stream: failed to create stamped putter", "error", err) - logger.Error(nil, "chunk upload stream: failed to create stamped putter") - switch { - case errors.Is(err, errBatchUnusable) || errors.Is(err, postage.ErrNotUsable): - sendErrorClose(websocket.CloseInternalServerErr, "batch not usable") - case errors.Is(err, postage.ErrNotFound): - sendErrorClose(websocket.CloseInternalServerErr, "batch not found") - default: - sendErrorClose(websocket.CloseInternalServerErr, "stamped putter creation failed") + // Valid stamp found - check cache first, create if needed + batchID := stamp.BatchID() + batchIDHex := string(batchID) // Use batch ID bytes as map key + logger.Debug("chunk upload stream: per-chunk stamp detected", "batch_id", batchID, "chunk_size", len(potentialChunkData)) + + // Check if we already have a cached putter for this batch + cachedPutter, exists := putterCache[batchIDHex] + if exists { + // Reuse cached putter + chunkPutter = cachedPutter + logger.Debug("chunk upload stream: reusing cached putter", "batch_id", batchIDHex) + } else { + // Create new putter and cache it + chunkPutter, err = s.newStampedPutter(ctx, putterOptions{ + BatchID: stamp.BatchID(), + TagID: tag, + Deferred: tag != 0, + }, &stamp) + if err != nil { + logger.Debug("chunk upload stream: failed to create stamped putter", "error", err) + logger.Error(nil, "chunk upload stream: failed to create stamped putter") + switch { + case errors.Is(err, errBatchUnusable) || errors.Is(err, postage.ErrNotUsable): + sendErrorClose(websocket.CloseInternalServerErr, "batch not usable") + case errors.Is(err, postage.ErrNotFound): + sendErrorClose(websocket.CloseInternalServerErr, "batch not found") + default: + sendErrorClose(websocket.CloseInternalServerErr, "stamped putter creation failed") + } + return } - return + // Cache the putter for reuse + putterCache[batchIDHex] = chunkPutter + logger.Debug("chunk upload stream: cached new putter", "batch_id", batchIDHex) } + // Use the chunk data without the stamp chunkData = potentialChunkData - // Note: we need to call Done on the per-chunk putter after Put } else { // Stamp unmarshal failed - log for debugging logger.Debug("chunk upload stream: stamp unmarshal failed, treating message as unstamped chunk", @@ -290,12 +318,8 @@ func (s *Service) handleUploadStream( return } - // If we created a per-chunk putter, clean it up - if chunkPutter != putter { - if err := chunkPutter.Done(swarm.ZeroAddress); err != nil { - logger.Error(err, "chunk upload stream: failed to finalize per-chunk putter") - } - } + // Note: Per-chunk putters are now cached and will be cleaned up in the defer function + // We don't call Done() here anymore to allow putter reuse across multiple chunks err = sendMsg(websocket.BinaryMessage, successWsMsg) if err != nil { From 033ca1f76705ee0fbdb90b1da0458c556ce4ed09 Mon Sep 17 00:00:00 2001 From: Attila Gazso <230163+agazso@users.noreply.github.com> Date: Mon, 10 Nov 2025 12:26:15 +0100 Subject: [PATCH 4/7] fix: cache session instead of putter --- pkg/api/chunk_stream.go | 79 +++++++++++++++++++++++------------------ 1 file changed, 45 insertions(+), 34 deletions(-) diff --git a/pkg/api/chunk_stream.go b/pkg/api/chunk_stream.go index be05cb4b139..7a4dbea14f1 100644 --- a/pkg/api/chunk_stream.go +++ b/pkg/api/chunk_stream.go @@ -118,18 +118,20 @@ func (s *Service) handleUploadStream( err error ) - // Cache for per-chunk putters to avoid creating new sessions for every chunk - // Key: batch ID hex string, Value: putter session - putterCache := make(map[string]storer.PutterSession) + // Cache for storage sessions to avoid creating new sessions for every chunk + // Key: batch ID hex string, Value: storage session (not wrapped with stamper) + // Note: We cache the underlying session, not the stamped putter, because + // each chunk has a unique stamp signature and needs its own stamper wrapper + sessionCache := make(map[string]storer.PutterSession) defer func() { cancel() _ = conn.Close() - // Clean up all cached per-chunk putters - for batchID, cachedPutter := range putterCache { - if err := cachedPutter.Done(swarm.ZeroAddress); err != nil { - logger.Error(err, "chunk upload stream: failed to finalize cached putter", "batch_id", batchID) + // Clean up all cached sessions + for batchID, cachedSession := range sessionCache { + if err := cachedSession.Done(swarm.ZeroAddress); err != nil { + logger.Error(err, "chunk upload stream: failed to finalize cached session", "batch_id", batchID) } } @@ -240,42 +242,50 @@ func (s *Service) handleUploadStream( // Try to unmarshal as a stamp stamp := postage.Stamp{} if err := stamp.UnmarshalBinary(potentialStamp); err == nil { - // Valid stamp found - check cache first, create if needed + // Valid stamp found - get or create cached session, then wrap with this chunk's stamp batchID := stamp.BatchID() batchIDHex := string(batchID) // Use batch ID bytes as map key logger.Debug("chunk upload stream: per-chunk stamp detected", "batch_id", batchID, "chunk_size", len(potentialChunkData)) - // Check if we already have a cached putter for this batch - cachedPutter, exists := putterCache[batchIDHex] - if exists { - // Reuse cached putter - chunkPutter = cachedPutter - logger.Debug("chunk upload stream: reusing cached putter", "batch_id", batchIDHex) - } else { - // Create new putter and cache it - chunkPutter, err = s.newStampedPutter(ctx, putterOptions{ - BatchID: stamp.BatchID(), - TagID: tag, - Deferred: tag != 0, - }, &stamp) + // Check if we already have a cached session for this batch + cachedSession, exists := sessionCache[batchIDHex] + if !exists { + // Validate batch exists + _, err := s.batchStore.Get(stamp.BatchID()) if err != nil { - logger.Debug("chunk upload stream: failed to create stamped putter", "error", err) - logger.Error(nil, "chunk upload stream: failed to create stamped putter") - switch { - case errors.Is(err, errBatchUnusable) || errors.Is(err, postage.ErrNotUsable): - sendErrorClose(websocket.CloseInternalServerErr, "batch not usable") - case errors.Is(err, postage.ErrNotFound): + logger.Debug("chunk upload stream: batch validation failed", "error", err) + logger.Error(nil, "chunk upload stream: batch validation failed") + if errors.Is(err, storage.ErrNotFound) { sendErrorClose(websocket.CloseInternalServerErr, "batch not found") - default: - sendErrorClose(websocket.CloseInternalServerErr, "stamped putter creation failed") + } else { + sendErrorClose(websocket.CloseInternalServerErr, "batch validation failed") } return } - // Cache the putter for reuse - putterCache[batchIDHex] = chunkPutter - logger.Debug("chunk upload stream: cached new putter", "batch_id", batchIDHex) + + // Create a new storage session and cache it + // For pre-stamped chunks, we use the session directly without a stamper wrapper + if tag != 0 { + cachedSession, err = s.storer.Upload(ctx, false, tag) + } else { + cachedSession = s.storer.DirectUpload() + } + if err != nil { + logger.Debug("chunk upload stream: failed to create session", "error", err) + logger.Error(nil, "chunk upload stream: failed to create session") + sendErrorClose(websocket.CloseInternalServerErr, "session creation failed") + return + } + sessionCache[batchIDHex] = cachedSession + logger.Debug("chunk upload stream: cached new session", "batch_id", batchIDHex) + } else { + logger.Debug("chunk upload stream: reusing cached session", "batch_id", batchIDHex) } + // Use the cached session directly - chunks arrive pre-stamped + // No need for a stamper wrapper since client already signed the chunks + chunkPutter = cachedSession + // Use the chunk data without the stamp chunkData = potentialChunkData } else { @@ -318,8 +328,9 @@ func (s *Service) handleUploadStream( return } - // Note: Per-chunk putters are now cached and will be cleaned up in the defer function - // We don't call Done() here anymore to allow putter reuse across multiple chunks + // Note: We don't call Done() here for per-chunk stamped uploads + // because we're reusing the cached session across multiple chunks + // The cached sessions are cleaned up in the defer function when the connection closes err = sendMsg(websocket.BinaryMessage, successWsMsg) if err != nil { From 2157a940d5409e266898fc71939c21758eb8c44a Mon Sep 17 00:00:00 2001 From: Attila Gazso <230163+agazso@users.noreply.github.com> Date: Mon, 10 Nov 2025 12:32:50 +0100 Subject: [PATCH 5/7] fix: cache batches, not session --- pkg/api/chunk_stream.go | 76 ++++++++++++++++++++--------------------- 1 file changed, 37 insertions(+), 39 deletions(-) diff --git a/pkg/api/chunk_stream.go b/pkg/api/chunk_stream.go index 7a4dbea14f1..eab17be5c8b 100644 --- a/pkg/api/chunk_stream.go +++ b/pkg/api/chunk_stream.go @@ -118,22 +118,16 @@ func (s *Service) handleUploadStream( err error ) - // Cache for storage sessions to avoid creating new sessions for every chunk - // Key: batch ID hex string, Value: storage session (not wrapped with stamper) - // Note: We cache the underlying session, not the stamped putter, because - // each chunk has a unique stamp signature and needs its own stamper wrapper - sessionCache := make(map[string]storer.PutterSession) + // Cache for batch validation to avoid database lookups for every chunk + // Key: batch ID hex string, Value: stored batch info + // This avoids the expensive batchStore.Get() call for each chunk + batchCache := make(map[string]*postage.Batch) defer func() { cancel() _ = conn.Close() - // Clean up all cached sessions - for batchID, cachedSession := range sessionCache { - if err := cachedSession.Done(swarm.ZeroAddress); err != nil { - logger.Error(err, "chunk upload stream: failed to finalize cached session", "batch_id", batchID) - } - } + // No cleanup needed for batch cache - it's just metadata // Only call Done on connection-level putter if it exists if putter != nil { @@ -242,16 +236,16 @@ func (s *Service) handleUploadStream( // Try to unmarshal as a stamp stamp := postage.Stamp{} if err := stamp.UnmarshalBinary(potentialStamp); err == nil { - // Valid stamp found - get or create cached session, then wrap with this chunk's stamp + // Valid stamp found - validate using cached batch info batchID := stamp.BatchID() batchIDHex := string(batchID) // Use batch ID bytes as map key logger.Debug("chunk upload stream: per-chunk stamp detected", "batch_id", batchID, "chunk_size", len(potentialChunkData)) - // Check if we already have a cached session for this batch - cachedSession, exists := sessionCache[batchIDHex] + // Check if we already have cached batch info + storedBatch, exists := batchCache[batchIDHex] if !exists { - // Validate batch exists - _, err := s.batchStore.Get(stamp.BatchID()) + // Fetch and cache batch info + storedBatch, err = s.batchStore.Get(stamp.BatchID()) if err != nil { logger.Debug("chunk upload stream: batch validation failed", "error", err) logger.Error(nil, "chunk upload stream: batch validation failed") @@ -262,30 +256,31 @@ func (s *Service) handleUploadStream( } return } + batchCache[batchIDHex] = storedBatch + logger.Debug("chunk upload stream: cached batch info", "batch_id", batchIDHex) + } - // Create a new storage session and cache it - // For pre-stamped chunks, we use the session directly without a stamper wrapper - if tag != 0 { - cachedSession, err = s.storer.Upload(ctx, false, tag) - } else { - cachedSession = s.storer.DirectUpload() + // Create a stamped putter for this chunk + // This still creates a new putter wrapper, but skips the expensive batch lookup + chunkPutter, err = s.newStampedPutter(ctx, putterOptions{ + BatchID: stamp.BatchID(), + TagID: tag, + Deferred: tag != 0, + }, &stamp) + if err != nil { + logger.Debug("chunk upload stream: failed to create stamped putter", "error", err) + logger.Error(nil, "chunk upload stream: failed to create stamped putter") + switch { + case errors.Is(err, errBatchUnusable) || errors.Is(err, postage.ErrNotUsable): + sendErrorClose(websocket.CloseInternalServerErr, "batch not usable") + case errors.Is(err, postage.ErrNotFound): + sendErrorClose(websocket.CloseInternalServerErr, "batch not found") + default: + sendErrorClose(websocket.CloseInternalServerErr, "stamped putter creation failed") } - if err != nil { - logger.Debug("chunk upload stream: failed to create session", "error", err) - logger.Error(nil, "chunk upload stream: failed to create session") - sendErrorClose(websocket.CloseInternalServerErr, "session creation failed") - return - } - sessionCache[batchIDHex] = cachedSession - logger.Debug("chunk upload stream: cached new session", "batch_id", batchIDHex) - } else { - logger.Debug("chunk upload stream: reusing cached session", "batch_id", batchIDHex) + return } - // Use the cached session directly - chunks arrive pre-stamped - // No need for a stamper wrapper since client already signed the chunks - chunkPutter = cachedSession - // Use the chunk data without the stamp chunkData = potentialChunkData } else { @@ -328,9 +323,12 @@ func (s *Service) handleUploadStream( return } - // Note: We don't call Done() here for per-chunk stamped uploads - // because we're reusing the cached session across multiple chunks - // The cached sessions are cleaned up in the defer function when the connection closes + // Clean up per-chunk putter + if chunkPutter != putter { + if err := chunkPutter.Done(swarm.ZeroAddress); err != nil { + logger.Error(err, "chunk upload stream: failed to finalize per-chunk putter") + } + } err = sendMsg(websocket.BinaryMessage, successWsMsg) if err != nil { From 2610be0c7984761a4c26af82d2bfebea64fd716e Mon Sep 17 00:00:00 2001 From: Attila Gazso <230163+agazso@users.noreply.github.com> Date: Mon, 10 Nov 2025 13:36:38 +0100 Subject: [PATCH 6/7] fix: use newStampedPutterWithBatch --- pkg/api/api.go | 11 +++++++++++ pkg/api/chunk_stream.go | 8 ++++---- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/pkg/api/api.go b/pkg/api/api.go index 5f747237bc4..38f7cff11ee 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -836,7 +836,18 @@ func (s *Service) newStampedPutter(ctx context.Context, opts putterOptions, stam return nil, errInvalidPostageBatch } + return s.newStampedPutterWithBatch(ctx, opts, stamp, storedBatch) +} + +// newStampedPutterWithBatch creates a stamped putter using a pre-fetched batch +// This avoids the database lookup when batch info is already cached +func (s *Service) newStampedPutterWithBatch(ctx context.Context, opts putterOptions, stamp *postage.Stamp, storedBatch *postage.Batch) (storer.PutterSession, error) { + if !opts.Deferred && s.beeMode == DevMode { + return nil, errUnsupportedDevNodeOperation + } + var session storer.PutterSession + var err error if opts.Deferred || opts.Pin { session, err = s.storer.Upload(ctx, opts.Pin, opts.TagID) if err != nil { diff --git a/pkg/api/chunk_stream.go b/pkg/api/chunk_stream.go index eab17be5c8b..c23f3f873c2 100644 --- a/pkg/api/chunk_stream.go +++ b/pkg/api/chunk_stream.go @@ -260,13 +260,13 @@ func (s *Service) handleUploadStream( logger.Debug("chunk upload stream: cached batch info", "batch_id", batchIDHex) } - // Create a stamped putter for this chunk - // This still creates a new putter wrapper, but skips the expensive batch lookup - chunkPutter, err = s.newStampedPutter(ctx, putterOptions{ + // Create a stamped putter using cached batch info + // This skips the expensive database lookup + chunkPutter, err = s.newStampedPutterWithBatch(ctx, putterOptions{ BatchID: stamp.BatchID(), TagID: tag, Deferred: tag != 0, - }, &stamp) + }, &stamp, storedBatch) if err != nil { logger.Debug("chunk upload stream: failed to create stamped putter", "error", err) logger.Error(nil, "chunk upload stream: failed to create stamped putter") From 00c53dddd0fbe204335e5c65cd551aabd4ca0514 Mon Sep 17 00:00:00 2001 From: Attila Gazso <230163+agazso@users.noreply.github.com> Date: Fri, 23 Jan 2026 17:53:50 +0100 Subject: [PATCH 7/7] fix: add decoder --- pkg/api/chunk_stream.go | 154 +++++++++++++++++++--------------------- 1 file changed, 74 insertions(+), 80 deletions(-) diff --git a/pkg/api/chunk_stream.go b/pkg/api/chunk_stream.go index c23f3f873c2..fb083a19f88 100644 --- a/pkg/api/chunk_stream.go +++ b/pkg/api/chunk_stream.go @@ -100,7 +100,38 @@ func (s *Service) chunkUploadStreamHandler(w http.ResponseWriter, r *http.Reques } s.wsWg.Add(1) - go s.handleUploadStream(logger, wsConn, putter, tag) + var decode chunkDecoder + if len(headers.BatchID) > 0 { + decode = decodeChunkWithoutStamp + } else { + decode = decodeChunkWithStamp + } + go s.handleUploadStream(logger, wsConn, putter, tag, decode) +} + +// chunkDecoder extracts chunk data and optionally a stamp from a websocket message. +// When BatchID is provided in headers, decodeChunkWithoutStamp is used (no stamp in message). +// When BatchID is not provided, decodeChunkWithStamp is used (stamp prepended to chunk data). +type chunkDecoder func(msg []byte) (chunkData []byte, stamp *postage.Stamp, err error) + +// decodeChunkWithoutStamp returns the message as-is (used when BatchID provided in headers). +func decodeChunkWithoutStamp(msg []byte) ([]byte, *postage.Stamp, error) { + return msg, nil, nil +} + +// decodeChunkWithStamp extracts a stamp from the first 113 bytes of the message. +// Returns an error if the message is too small or the stamp is invalid. +func decodeChunkWithStamp(msg []byte) ([]byte, *postage.Stamp, error) { + if len(msg) < postage.StampSize+swarm.SpanSize { + return nil, nil, errors.New("message too small for stamp + chunk") + } + + stamp := &postage.Stamp{} + if err := stamp.UnmarshalBinary(msg[:postage.StampSize]); err != nil { + return nil, nil, errors.New("invalid stamp") + } + + return msg[postage.StampSize:], stamp, nil } func (s *Service) handleUploadStream( @@ -108,6 +139,7 @@ func (s *Service) handleUploadStream( conn *websocket.Conn, putter storer.PutterSession, tag uint64, + decode chunkDecoder, ) { defer s.wsWg.Done() @@ -208,98 +240,60 @@ func (s *Service) handleUploadStream( return } - logger.Debug("chunk upload stream", - "message_size", len(msg), - "stamp_size", postage.StampSize, - "first_8_bytes", msg[:8]) + // Decode the message using the appropriate decoder + chunkData, stamp, err := decode(msg) + if err != nil { + logger.Debug("chunk upload stream: decode failed", "error", err) + logger.Error(nil, "chunk upload stream: "+err.Error()) + sendErrorClose(websocket.CloseInternalServerErr, err.Error()) + return + } - // Check if this message contains a per-chunk stamp prepended to the chunk data - // Format: stamp[113 bytes] + chunk[data] + // Determine the putter to use var ( chunk swarm.Chunk - chunkPutter = putter // default to connection-level putter - chunkData = msg + chunkPutter = putter ) - // If message is large enough to contain a stamp + chunk data, try to extract the stamp - if len(msg) >= postage.StampSize+swarm.SpanSize { - potentialStamp := msg[:postage.StampSize] - potentialChunkData := msg[postage.StampSize:] - - logger.Debug("chunk upload stream: attempting to extract stamp", - "message_size", len(msg), - "stamp_size", postage.StampSize, - "potential_stamp_len", len(potentialStamp), - "potential_chunk_len", len(potentialChunkData), - "first_8_bytes", msg[:8]) - - // Try to unmarshal as a stamp - stamp := postage.Stamp{} - if err := stamp.UnmarshalBinary(potentialStamp); err == nil { - // Valid stamp found - validate using cached batch info - batchID := stamp.BatchID() - batchIDHex := string(batchID) // Use batch ID bytes as map key - logger.Debug("chunk upload stream: per-chunk stamp detected", "batch_id", batchID, "chunk_size", len(potentialChunkData)) - - // Check if we already have cached batch info - storedBatch, exists := batchCache[batchIDHex] - if !exists { - // Fetch and cache batch info - storedBatch, err = s.batchStore.Get(stamp.BatchID()) - if err != nil { - logger.Debug("chunk upload stream: batch validation failed", "error", err) - logger.Error(nil, "chunk upload stream: batch validation failed") - if errors.Is(err, storage.ErrNotFound) { - sendErrorClose(websocket.CloseInternalServerErr, "batch not found") - } else { - sendErrorClose(websocket.CloseInternalServerErr, "batch validation failed") - } - return - } - batchCache[batchIDHex] = storedBatch - logger.Debug("chunk upload stream: cached batch info", "batch_id", batchIDHex) - } + // If stamp was extracted, create a per-chunk putter + if stamp != nil { + batchID := stamp.BatchID() + batchIDHex := string(batchID) - // Create a stamped putter using cached batch info - // This skips the expensive database lookup - chunkPutter, err = s.newStampedPutterWithBatch(ctx, putterOptions{ - BatchID: stamp.BatchID(), - TagID: tag, - Deferred: tag != 0, - }, &stamp, storedBatch) + storedBatch, exists := batchCache[batchIDHex] + if !exists { + storedBatch, err = s.batchStore.Get(batchID) if err != nil { - logger.Debug("chunk upload stream: failed to create stamped putter", "error", err) - logger.Error(nil, "chunk upload stream: failed to create stamped putter") - switch { - case errors.Is(err, errBatchUnusable) || errors.Is(err, postage.ErrNotUsable): - sendErrorClose(websocket.CloseInternalServerErr, "batch not usable") - case errors.Is(err, postage.ErrNotFound): + logger.Debug("chunk upload stream: batch validation failed", "error", err) + logger.Error(nil, "chunk upload stream: batch validation failed") + if errors.Is(err, storage.ErrNotFound) { sendErrorClose(websocket.CloseInternalServerErr, "batch not found") - default: - sendErrorClose(websocket.CloseInternalServerErr, "stamped putter creation failed") + } else { + sendErrorClose(websocket.CloseInternalServerErr, "batch validation failed") } return } - - // Use the chunk data without the stamp - chunkData = potentialChunkData - } else { - // Stamp unmarshal failed - log for debugging - logger.Debug("chunk upload stream: stamp unmarshal failed, treating message as unstamped chunk", - "error", err, - "message_size", len(msg), - "stamp_size_expected", postage.StampSize, - "potential_stamp_len", len(potentialStamp)) + batchCache[batchIDHex] = storedBatch } - // If unmarshal failed, fall through to use the whole message as chunk data - } - // If we don't have a putter at this point, the client must provide per-chunk stamps - if chunkPutter == nil { - logger.Debug("chunk upload stream: no stamp provided") - logger.Error(nil, "chunk upload stream: no batch ID in headers and no per-chunk stamp in message") - sendErrorClose(websocket.CloseInternalServerErr, "batch ID or per-chunk stamp required") - return + chunkPutter, err = s.newStampedPutterWithBatch(ctx, putterOptions{ + BatchID: batchID, + TagID: tag, + Deferred: tag != 0, + }, stamp, storedBatch) + if err != nil { + logger.Debug("chunk upload stream: failed to create stamped putter", "error", err) + logger.Error(nil, "chunk upload stream: failed to create stamped putter") + switch { + case errors.Is(err, errBatchUnusable) || errors.Is(err, postage.ErrNotUsable): + sendErrorClose(websocket.CloseInternalServerErr, "batch not usable") + case errors.Is(err, postage.ErrNotFound): + sendErrorClose(websocket.CloseInternalServerErr, "batch not found") + default: + sendErrorClose(websocket.CloseInternalServerErr, "stamped putter creation failed") + } + return + } } chunk, err = cac.NewWithDataSpan(chunkData)