diff --git a/agentscope-extensions/agentscope-extensions-autocontext-memory/src/main/java/io/agentscope/core/memory/autocontext/AutoContextHook.java b/agentscope-extensions/agentscope-extensions-autocontext-memory/src/main/java/io/agentscope/core/memory/autocontext/AutoContextHook.java index f491e5d6f..7cd805aac 100644 --- a/agentscope-extensions/agentscope-extensions-autocontext-memory/src/main/java/io/agentscope/core/memory/autocontext/AutoContextHook.java +++ b/agentscope-extensions/agentscope-extensions-autocontext-memory/src/main/java/io/agentscope/core/memory/autocontext/AutoContextHook.java @@ -33,6 +33,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; /** * Hook for automatically registering AutoContextMemory integration with ReActAgent. @@ -208,6 +209,12 @@ private Mono handlePreCall(PreCallEvent event) { *

<p>This ensures compression happens at a deterministic point (before reasoning) * and the LLM receives the compressed context. * + * <p>
Compression is executed on {@link Schedulers#boundedElastic()} to avoid blocking + * the Reactor scheduler thread. Without this, the internal {@code .block()} calls inside + * {@link AutoContextMemory#compressIfNeeded()} would occupy the scheduler thread, preventing + * downstream streaming chunks from being dispatched until the entire Mono chain completes, + * causing all chunks to be flushed at once instead of incrementally. + * * @param event the PreReasoningEvent * @return Mono containing the potentially modified event */ @@ -225,9 +232,24 @@ private Mono handlePreReasoning(PreReasoningEvent event) { return Mono.just(event); } - // Trigger compression if needed (this modifies workingMemoryStorage in place) - autoContextMemory.compressIfNeeded(); + // Trigger compression on a boundedElastic thread to avoid blocking the Reactor scheduler. + // compressIfNeeded() internally calls .block() for LLM-based compression, which must not + // run on a Reactor scheduler thread as it would stall downstream streaming chunk dispatch. + return Mono.fromCallable(autoContextMemory::compressIfNeeded) + .subscribeOn(Schedulers.boundedElastic()) + .then(Mono.defer(() -> buildUpdatedEvent(event, autoContextMemory))); + } + /** + * Builds an updated PreReasoningEvent with compressed memory messages and system prompt + * instructions about compressed context offload tags. + * + * @param event the original PreReasoningEvent + * @param autoContextMemory the AutoContextMemory instance + * @return Mono containing the updated event + */ + private Mono buildUpdatedEvent( + PreReasoningEvent event, AutoContextMemory autoContextMemory) { // Always append system prompt instruction about compressed messages List originalInputMessages = event.getInputMessages(); List newInputMessages = new ArrayList<>();