From b199c2daff944e67329b07460fe3dc3865fbf527 Mon Sep 17 00:00:00 2001
From: zhangliyuangit <495726603@qq.com>
Date: Mon, 30 Mar 2026 20:04:21 +0800
Subject: [PATCH] fix: avoid blocking Reactor scheduler thread during
 AutoContext compression

When AutoContextMemory compression is triggered in handlePreReasoning,
compressIfNeeded() internally calls .block() for LLM-based summarization.
Running this on the Reactor scheduler thread causes the thread to be
occupied until compression completes, preventing downstream streaming
chunks from being dispatched. This results in all chunks being flushed at
once after the entire Mono chain completes, breaking incremental
streaming.

Fix: wrap compressIfNeeded() in Mono.fromCallable(...).subscribeOn(
Schedulers.boundedElastic()) so the blocking work runs on the elastic
thread pool, keeping the Reactor scheduler thread free to dispatch
streaming chunks incrementally. Also extract the event-building logic
into a dedicated buildUpdatedEvent() method for clarity.
---
 .../memory/autocontext/AutoContextHook.java   | 26 ++++++++++++++++++--
 1 file changed, 24 insertions(+), 2 deletions(-)

diff --git a/agentscope-extensions/agentscope-extensions-autocontext-memory/src/main/java/io/agentscope/core/memory/autocontext/AutoContextHook.java b/agentscope-extensions/agentscope-extensions-autocontext-memory/src/main/java/io/agentscope/core/memory/autocontext/AutoContextHook.java
index f491e5d6f..7cd805aac 100644
--- a/agentscope-extensions/agentscope-extensions-autocontext-memory/src/main/java/io/agentscope/core/memory/autocontext/AutoContextHook.java
+++ b/agentscope-extensions/agentscope-extensions-autocontext-memory/src/main/java/io/agentscope/core/memory/autocontext/AutoContextHook.java
@@ -33,6 +33,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 /**
  * Hook for automatically registering AutoContextMemory integration with ReActAgent.
@@ -208,6 +209,12 @@ private Mono<PreCallEvent> handlePreCall(PreCallEvent event) {
      *
      * <p>This ensures compression happens at a deterministic point (before reasoning)
      * and the LLM receives the compressed context.
+     *
+     * <p>Compression is executed on {@link Schedulers#boundedElastic()} to avoid blocking
+     * the Reactor scheduler thread. Without this, the internal {@code .block()} calls inside
+     * {@link AutoContextMemory#compressIfNeeded()} would occupy the scheduler thread, preventing
+     * downstream streaming chunks from being dispatched until the entire Mono chain completes,
+     * causing all chunks to be flushed at once instead of incrementally.
      *
      * @param event the PreReasoningEvent
      * @return Mono containing the potentially modified event
     */
@@ -225,9 +232,24 @@ private Mono<PreReasoningEvent> handlePreReasoning(PreReasoningEvent event) {
             return Mono.just(event);
         }
 
-        // Trigger compression if needed (this modifies workingMemoryStorage in place)
-        autoContextMemory.compressIfNeeded();
+        // Trigger compression on a boundedElastic thread to avoid blocking the Reactor scheduler.
+        // compressIfNeeded() internally calls .block() for LLM-based compression, which must not
+        // run on a Reactor scheduler thread as it would stall downstream streaming chunk dispatch.
+        return Mono.fromCallable(autoContextMemory::compressIfNeeded)
+                .subscribeOn(Schedulers.boundedElastic())
+                .then(Mono.defer(() -> buildUpdatedEvent(event, autoContextMemory)));
+    }
+
+    /**
+     * Builds an updated PreReasoningEvent with compressed memory messages and system prompt
+     * instructions about compressed context offload tags.
+     *
+     * @param event the original PreReasoningEvent
+     * @param autoContextMemory the AutoContextMemory instance
+     * @return Mono containing the updated event
+     */
+    private Mono<PreReasoningEvent> buildUpdatedEvent(
+            PreReasoningEvent event, AutoContextMemory autoContextMemory) {
 
         // Always append system prompt instruction about compressed messages
         List<Msg> originalInputMessages = event.getInputMessages();
         List<Msg> newInputMessages = new ArrayList<>();