@@ -64,6 +64,13 @@ public class AutoContextConfig {
*/
int minCompressionTokenThreshold = 5000;

/**
* Timeout in milliseconds for a single LLM-based compression attempt.
* If the timeout is reached, AutoContext skips that compression candidate
* so the agent can continue reasoning.
*/
long compressionTimeoutMillis = 30_000;

/**
* Optional custom prompt configuration.
* If null, default prompts from {@link Prompts} will be used.
@@ -106,6 +113,10 @@ public int getMinCompressionTokenThreshold() {
return minCompressionTokenThreshold;
}

public long getCompressionTimeoutMillis() {
return compressionTimeoutMillis;
}

/**
* Gets the custom prompt configuration.
*
@@ -149,6 +160,7 @@ public static class Builder {
private int minConsecutiveToolMessages = 6;
private double currentRoundCompressionRatio = 0.3;
private int minCompressionTokenThreshold = 5000;
private long compressionTimeoutMillis = 30_000;
private PromptConfig customPrompt;

/**
@@ -254,6 +266,18 @@ public Builder minCompressionTokenThreshold(int minCompressionTokenThreshold) {
return this;
}

/**
* Sets the timeout in milliseconds for a single LLM-based compression attempt.
* If the timeout is reached, AutoContext skips that compression candidate.
*
* @param compressionTimeoutMillis the timeout in milliseconds
* @return this builder instance for method chaining
*/
public Builder compressionTimeoutMillis(long compressionTimeoutMillis) {
this.compressionTimeoutMillis = compressionTimeoutMillis;
return this;
}

/**
* Sets custom prompt configuration.
*
@@ -284,6 +308,7 @@ public AutoContextConfig build() {
config.minConsecutiveToolMessages = this.minConsecutiveToolMessages;
config.currentRoundCompressionRatio = this.currentRoundCompressionRatio;
config.minCompressionTokenThreshold = this.minCompressionTokenThreshold;
config.compressionTimeoutMillis = this.compressionTimeoutMillis;
config.customPrompt = this.customPrompt;
return config;
}
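
A minimal usage sketch for the new setting (illustrative only): it assumes AutoContextConfig exposes a static builder() factory for the Builder shown above, which is not visible in this diff; the setter and getter names come from the change itself.

// Hypothetical wiring; AutoContextConfig.builder() is assumed to exist as the Builder entry point.
AutoContextConfig config =
        AutoContextConfig.builder()
                .minCompressionTokenThreshold(5000)
                .compressionTimeoutMillis(10_000) // allow each LLM compression attempt at most 10 s
                .build();

// The compression code reads the value back through the new getter.
long budget = config.getCompressionTimeoutMillis(); // 10_000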
@@ -33,11 +33,13 @@
import io.agentscope.core.session.Session;
import io.agentscope.core.state.SessionKey;
import io.agentscope.core.state.StateModule;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -418,6 +420,10 @@ private boolean summaryCurrentRoundMessages(List<Msg> rawMessages) {

// Step 4: Merge and compress messages (typically tool calls and results)
Msg compressedMsg = mergeAndCompressCurrentRoundMessages(messagesToCompress);
if (compressedMsg == null) {
log.warn("Skipping current round compression because the compression model timed out");
return false;
}

// Build metadata for compression event
Map<String, Object> metadata = new HashMap<>();
@@ -526,6 +532,14 @@ private boolean summaryCurrentRoundLargeMessages(List<Msg> rawMessages) {

// Step 5: Generate summary using LLM
Msg summaryMsg = generateLargeMessageSummary(msg, uuid);
if (summaryMsg == null) {
clear(uuid);
log.warn(
"Skipping current round large message summary at index {} because the"
+ " compression model did not finish in time",
i);
continue;
}

// Build metadata for compression event
Map<String, Object> metadata = new HashMap<>();
@@ -599,13 +613,12 @@ private Msg generateLargeMessageSummary(Msg message, String offloadUuid) {
addPlanAwareHintIfNeeded(newMessages);

Msg block =
model.stream(newMessages, null, options)
.concatMap(chunk -> processChunk(chunk, context))
.then(Mono.defer(() -> Mono.just(context.buildFinalMessage())))
.onErrorResume(InterruptedException.class, Mono::error)
.block();
executeCompressionModelCall(newMessages, options, context, "large message summary");
if (block == null) {
return null;
}

if (block != null && block.getChatUsage() != null) {
if (block.getChatUsage() != null) {
log.info(
"Large message summary completed, input tokens: {}, output tokens: {}",
block.getChatUsage().getInputTokens(),
@@ -622,12 +635,12 @@ private Msg generateLargeMessageSummary(Msg message, String offloadUuid) {
metadata.put("_compress_meta", compressMeta);

// Preserve _chat_usage from the block if available
if (block != null && block.getChatUsage() != null) {
if (block.getChatUsage() != null) {
metadata.put(MessageMetadataKeys.CHAT_USAGE, block.getChatUsage());
}

// Create summary message preserving original role and name
String summaryContent = block != null ? block.getTextContent() : "";
String summaryContent = block.getTextContent();
String finalContent = summaryContent;
if (!offloadHint.isEmpty()) {
finalContent = summaryContent + "\n" + offloadHint;
@@ -658,7 +671,11 @@ private Msg mergeAndCompressCurrentRoundMessages(List<Msg> messages) {
offload(uuid, originalMessages);

// Use model to generate a compressed summary from message list
return generateCurrentRoundSummaryFromMessages(messages, uuid);
Msg summary = generateCurrentRoundSummaryFromMessages(messages, uuid);
if (summary == null) {
clear(uuid);
}
return summary;
}

@Override
@@ -755,22 +772,21 @@ private Msg generateCurrentRoundSummaryFromMessages(List<Msg> messages, String o
addPlanAwareHintIfNeeded(newMessages);

Msg block =
model.stream(newMessages, null, options)
.concatMap(chunk -> processChunk(chunk, context))
.then(Mono.defer(() -> Mono.just(context.buildFinalMessage())))
.onErrorResume(InterruptedException.class, Mono::error)
.block();
executeCompressionModelCall(newMessages, options, context, "current round summary");
if (block == null) {
return null;
}

// Extract token usage information
int inputTokens = 0;
int outputTokens = 0;
if (block != null && block.getChatUsage() != null) {
if (block.getChatUsage() != null) {
inputTokens = block.getChatUsage().getInputTokens();
outputTokens = block.getChatUsage().getOutputTokens();
}

// Calculate actual output character count (including all content blocks)
int actualCharCount = block != null ? MsgUtils.calculateMessageCharCount(block) : 0;
int actualCharCount = MsgUtils.calculateMessageCharCount(block);

log.info(
"Current round summary completed - original: {} chars, target: {} chars ({}%),"
@@ -792,18 +808,15 @@ private Msg generateCurrentRoundSummaryFromMessages(List<Msg> messages, String o
compressMeta.put("compressed_current_round", true);
Map<String, Object> metadata = new HashMap<>();
metadata.put("_compress_meta", compressMeta);
if (block != null && block.getChatUsage() != null) {
if (block.getChatUsage() != null) {
metadata.put(MessageMetadataKeys.CHAT_USAGE, block.getChatUsage());
}

// Create a compressed message
return Msg.builder()
.role(MsgRole.ASSISTANT)
.name("assistant")
.content(
TextBlock.builder()
.text((block != null ? block.getTextContent() : "") + offloadHint)
.build())
.content(TextBlock.builder().text(block.getTextContent() + offloadHint).build())
.metadata(metadata)
.build();
}
@@ -854,6 +867,12 @@ private boolean summaryToolsMessages(
offload(uuid, toolsMsg);

Msg toolsSummary = compressToolsInvocation(toolsMsg, uuid);
if (toolsSummary == null) {
clear(uuid);
log.warn(
"Skipping tool invocation compression because the compression model timed out");
return false;
}

// Build metadata for compression event
Map<String, Object> metadata = new HashMap<>();
@@ -1015,6 +1034,14 @@ private boolean summaryPreviousRoundMessages(List<Msg> rawMessages) {

// Step 6: Generate summary
Msg summaryMsg = summaryPreviousRoundConversation(messagesToSummarize, uuid);
if (summaryMsg == null) {
clear(uuid);
log.warn(
"Skipping previous round conversation summary for round {} because the"
+ " compression model did not finish in time",
pairIdx + 1);
continue;
}

// Build metadata for compression event
Map<String, Object> metadata = new HashMap<>();
@@ -1112,16 +1139,16 @@ private Msg summaryPreviousRoundConversation(List<Msg> messages, String offloadU
addPlanAwareHintIfNeeded(newMessages);

Msg block =
model.stream(newMessages, null, options)
.concatMap(chunk -> processChunk(chunk, context))
.then(Mono.defer(() -> Mono.just(context.buildFinalMessage())))
.onErrorResume(InterruptedException.class, Mono::error)
.block();
executeCompressionModelCall(
newMessages, options, context, "previous round conversation summary");
if (block == null) {
return null;
}

// Extract token usage information
int inputTokens = 0;
int outputTokens = 0;
if (block != null && block.getChatUsage() != null) {
if (block.getChatUsage() != null) {
inputTokens = block.getChatUsage().getInputTokens();
outputTokens = block.getChatUsage().getOutputTokens();
log.info(
@@ -1140,14 +1167,14 @@ private Msg summaryPreviousRoundConversation(List<Msg> messages, String offloadU
metadata.put("_compress_meta", compressMeta);

// Preserve _chat_usage from the block if available
if (block != null && block.getChatUsage() != null) {
if (block.getChatUsage() != null) {
metadata.put(MessageMetadataKeys.CHAT_USAGE, block.getChatUsage());
}

// Build the final message content:
// 1. LLM generated summary (contains ASSISTANT summary + tool compression)
// 2. Context offload tag with UUID at the end
String summaryContent = block != null ? block.getTextContent() : "";
String summaryContent = block.getTextContent();
String offloadTag =
offloadUuid != null
? String.format(Prompts.CONTEXT_OFFLOAD_TAG_FORMAT, offloadUuid)
@@ -1614,17 +1641,15 @@ private Msg compressToolsInvocation(List<Msg> messages, String offloadUUid) {
.build());
// Insert plan-aware hint message at the end to leverage recency effect
addPlanAwareHintIfNeeded(newMessages);
Msg block =
model.stream(newMessages, null, options)
.concatMap(chunk -> processChunk(chunk, context))
.then(Mono.defer(() -> Mono.just(context.buildFinalMessage())))
.onErrorResume(InterruptedException.class, Mono::error)
.block();
Msg block = executeCompressionModelCall(newMessages, options, context, "tool compression");
if (block == null) {
return null;
}

// Extract token usage information
int inputTokens = 0;
int outputTokens = 0;
if (block != null && block.getChatUsage() != null) {
if (block.getChatUsage() != null) {
inputTokens = block.getChatUsage().getInputTokens();
outputTokens = block.getChatUsage().getOutputTokens();
log.info(
Expand All @@ -1643,14 +1668,14 @@ private Msg compressToolsInvocation(List<Msg> messages, String offloadUUid) {
metadata.put("_compress_meta", compressMeta);

// Preserve _chat_usage from the block if available
if (block != null && block.getChatUsage() != null) {
if (block.getChatUsage() != null) {
metadata.put(MessageMetadataKeys.CHAT_USAGE, block.getChatUsage());
}

// Build the final message content:
// 1. LLM generated compressed tool invocation content
// 2. Context offload tag with UUID at the end
String compressedContent = block != null ? block.getTextContent() : "";
String compressedContent = block.getTextContent();
String offloadTag =
offloadUUid != null
? String.format(Prompts.CONTEXT_OFFLOAD_TAG_FORMAT, offloadUUid)
@@ -1670,6 +1695,30 @@ private Msg compressToolsInvocation(List<Msg> messages, String offloadUUid) {
.build();
}

private Msg executeCompressionModelCall(
List<Msg> messages,
GenerateOptions options,
ReasoningContext context,
String operationName) {
long timeoutMillis = Math.max(1L, autoContextConfig.getCompressionTimeoutMillis());

return model.stream(messages, null, options)
.concatMap(chunk -> processChunk(chunk, context))
.then(Mono.defer(() -> Mono.justOrEmpty(context.buildFinalMessage())))
.timeout(Duration.ofMillis(timeoutMillis))
.doOnError(
TimeoutException.class,
error ->
log.warn(
"AutoContext {} timed out after {} ms; skipping this"
+ " compression candidate",
operationName,
timeoutMillis))
.onErrorResume(TimeoutException.class, error -> Mono.empty())
.onErrorResume(InterruptedException.class, Mono::error)
.block();
}

private Mono<Msg> processChunk(ChatResponse chunk, ReasoningContext context) {
return Mono.just(chunk).doOnNext(context::processChunk).then(Mono.empty());
}
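
The helper above leans on Reactor's timeout operator: when the compression model is too slow, the TimeoutException is swallowed and the pipeline completes empty, so block() returns null and the caller clears the offloaded entry and skips that candidate. A self-contained sketch of that behavior, with a delayed Mono standing in for the model call (assumption: reactor-core on the classpath):

import java.time.Duration;
import java.util.concurrent.TimeoutException;
import reactor.core.publisher.Mono;

public class TimeoutSkipSketch {
    public static void main(String[] args) {
        // Stand-in for a compression model call that exceeds its budget.
        Mono<String> slowSummary = Mono.just("summary").delayElement(Duration.ofSeconds(5));

        String result =
                slowSummary
                        .timeout(Duration.ofMillis(200)) // the compressionTimeoutMillis budget
                        .onErrorResume(TimeoutException.class, e -> Mono.empty()) // swallow the timeout
                        .block(); // an empty Mono makes block() return null

        // null is exactly the signal the summary/compression callers use to skip a candidate.
        System.out.println(result); // prints "null"
    }
}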