From 403e8d2c3dc5788e7e8bf0ea283992ab81dee4a1 Mon Sep 17 00:00:00 2001 From: LearningGp Date: Wed, 1 Apr 2026 21:09:01 +0800 Subject: [PATCH] fix(telemetry): use max instead of sum for streaming token usage aggregation Made-with: Cursor --- .../StreamChatResponseAggregator.java | 15 ++-- .../StreamChatResponseAggregatorTest.java | 81 +++++++++++++++++++ 2 files changed, 88 insertions(+), 8 deletions(-) create mode 100644 agentscope-extensions/agentscope-extensions-studio/src/test/java/io/agentscope/core/tracing/telemetry/StreamChatResponseAggregatorTest.java diff --git a/agentscope-extensions/agentscope-extensions-studio/src/main/java/io/agentscope/core/tracing/telemetry/StreamChatResponseAggregator.java b/agentscope-extensions/agentscope-extensions-studio/src/main/java/io/agentscope/core/tracing/telemetry/StreamChatResponseAggregator.java index 4b6e72494..a6b744138 100644 --- a/agentscope-extensions/agentscope-extensions-studio/src/main/java/io/agentscope/core/tracing/telemetry/StreamChatResponseAggregator.java +++ b/agentscope-extensions/agentscope-extensions-studio/src/main/java/io/agentscope/core/tracing/telemetry/StreamChatResponseAggregator.java @@ -27,7 +27,6 @@ import io.agentscope.core.model.ChatUsage; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; /** * An aggregator for streaming {@link ChatResponse}. @@ -41,9 +40,9 @@ final class StreamChatResponseAggregator { private final ThinkingAccumulator thinkingAcc = new ThinkingAccumulator(); private final ToolCallsAccumulator toolCallsAcc = new ToolCallsAccumulator(); - // Usage - private final AtomicInteger inputTokens = new AtomicInteger(0); - private final AtomicInteger outputTokens = new AtomicInteger(0); + // Usage: take the max value from all chunks, since providers report cumulative totals + private int inputTokens; + private int outputTokens; private double time; private String finishReason; @@ -73,8 +72,8 @@ public void append(ChatResponse chunk) { ChatUsage usage = chunk.getUsage(); if (usage != null) { - inputTokens.addAndGet(usage.getInputTokens()); - outputTokens.addAndGet(usage.getOutputTokens()); + inputTokens = Math.max(inputTokens, usage.getInputTokens()); + outputTokens = Math.max(outputTokens, usage.getOutputTokens()); time = usage.getTime(); } @@ -95,8 +94,8 @@ public ChatResponse getResponse() { .content(contentBlocks) .usage( ChatUsage.builder() - .inputTokens(inputTokens.get()) - .outputTokens(outputTokens.get()) + .inputTokens(inputTokens) + .outputTokens(outputTokens) .time(time) .build()) .finishReason(finishReason) diff --git a/agentscope-extensions/agentscope-extensions-studio/src/test/java/io/agentscope/core/tracing/telemetry/StreamChatResponseAggregatorTest.java b/agentscope-extensions/agentscope-extensions-studio/src/test/java/io/agentscope/core/tracing/telemetry/StreamChatResponseAggregatorTest.java new file mode 100644 index 000000000..ad42246bd --- /dev/null +++ b/agentscope-extensions/agentscope-extensions-studio/src/test/java/io/agentscope/core/tracing/telemetry/StreamChatResponseAggregatorTest.java @@ -0,0 +1,81 @@ +/* + * Copyright 2024-2026 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.agentscope.core.tracing.telemetry; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import io.agentscope.core.message.TextBlock; +import io.agentscope.core.model.ChatResponse; +import io.agentscope.core.model.ChatUsage; +import java.util.List; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +@DisplayName("StreamChatResponseAggregator Tests") +class StreamChatResponseAggregatorTest { + + @Test + @DisplayName("Cumulative usage should take max, not sum") + void testCumulativeUsageTakesMax() { + StreamChatResponseAggregator agg = StreamChatResponseAggregator.create(); + + for (int i = 1; i <= 5; i++) { + agg.append( + ChatResponse.builder() + .id("test-id") + .content(List.of(TextBlock.builder().text("chunk" + i).build())) + .usage( + ChatUsage.builder() + .inputTokens(100) + .outputTokens(i * 20) + .time(i * 0.5) + .build()) + .finishReason(i == 5 ? "stop" : null) + .build()); + } + + ChatResponse response = agg.getResponse(); + assertEquals("test-id", response.getId()); + assertEquals(100, response.getUsage().getInputTokens()); + assertEquals(100, response.getUsage().getOutputTokens()); + assertEquals("stop", response.getFinishReason()); + } + + @Test + @DisplayName("Only last chunk carries usage (OpenAI style)") + void testOnlyLastChunkHasUsage() { + StreamChatResponseAggregator agg = StreamChatResponseAggregator.create(); + + for (int i = 0; i < 3; i++) { + agg.append( + ChatResponse.builder() + .id("openai-id") + .content(List.of(TextBlock.builder().text("part" + i).build())) + .build()); + } + + agg.append( + ChatResponse.builder() + .id("openai-id") + .usage(ChatUsage.builder().inputTokens(200).outputTokens(150).build()) + .finishReason("stop") + .build()); + + ChatResponse response = agg.getResponse(); + assertEquals(200, response.getUsage().getInputTokens()); + assertEquals(150, response.getUsage().getOutputTokens()); + } +}