Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/**
* Hook for automatically registering AutoContextMemory integration with ReActAgent.
Expand Down Expand Up @@ -208,6 +209,12 @@ private Mono<PreCallEvent> handlePreCall(PreCallEvent event) {
* <p>This ensures compression happens at a deterministic point (before reasoning)
* and the LLM receives the compressed context.
*
* <p>Compression is executed on {@link Schedulers#boundedElastic()} so that it does not block
* the calling Reactor thread. Without this, the internal {@code .block()} calls inside
* {@link AutoContextMemory#compressIfNeeded()} would occupy that thread and prevent
* downstream streaming chunks from being dispatched until the entire Mono chain completes,
* so all chunks would be flushed at once instead of incrementally.
*
* @param event the PreReasoningEvent
* @return Mono containing the potentially modified event
*/
Expand All @@ -225,9 +232,24 @@ private Mono<PreReasoningEvent> handlePreReasoning(PreReasoningEvent event) {
return Mono.just(event);
}

// Trigger compression if needed (this modifies workingMemoryStorage in place)
autoContextMemory.compressIfNeeded();
// Run compression on a boundedElastic thread so it does not block the calling Reactor thread.
// compressIfNeeded() internally calls .block() for LLM-based compression; running it on the
// calling thread would stall downstream streaming chunk dispatch.
return Mono.fromCallable(autoContextMemory::compressIfNeeded)
.subscribeOn(Schedulers.boundedElastic())
.then(Mono.defer(() -> buildUpdatedEvent(event, autoContextMemory)));
}

/**
* Builds an updated PreReasoningEvent with compressed memory messages and system prompt
* instructions about compressed context offload tags.
*
* @param event the original PreReasoningEvent
* @param autoContextMemory the AutoContextMemory instance
* @return Mono containing the updated event
*/
private Mono<PreReasoningEvent> buildUpdatedEvent(
PreReasoningEvent event, AutoContextMemory autoContextMemory) {
// Always append system prompt instruction about compressed messages
List<Msg> originalInputMessages = event.getInputMessages();
List<Msg> newInputMessages = new ArrayList<>();
Expand Down
Loading