Skip to content
Open
125 changes: 124 additions & 1 deletion agentscope-core/src/main/java/io/agentscope/core/ReActAgent.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import io.agentscope.core.skill.SkillHook;
import io.agentscope.core.state.AgentMetaState;
import io.agentscope.core.state.SessionKey;
import io.agentscope.core.state.SimpleSessionKey;
import io.agentscope.core.state.StatePersistence;
import io.agentscope.core.state.ToolkitState;
import io.agentscope.core.tool.ToolExecutionContext;
Expand All @@ -73,6 +74,7 @@
import io.agentscope.core.util.ExceptionUtils;
import io.agentscope.core.util.MessageUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedHashSet;
Expand Down Expand Up @@ -149,11 +151,13 @@ public class ReActAgent extends StructuredOutputCapableAgent {
private final ExecutionConfig toolExecutionConfig;
private final GenerateOptions generateOptions;
private final PlanNotebook planNotebook;
private final SkillBox skillBox;
private final ToolExecutionContext toolExecutionContext;
private final StatePersistence statePersistence;
private List<Msg> anchorInputMsgs;
private List<String> anchorActiveGroups;

// ==================== Constructor ====================

private ReActAgent(Builder builder, Toolkit agentToolkit) {
super(
builder.name,
Expand All @@ -171,6 +175,7 @@ private ReActAgent(Builder builder, Toolkit agentToolkit) {
this.toolExecutionConfig = builder.toolExecutionConfig;
this.generateOptions = builder.generateOptions;
this.planNotebook = builder.planNotebook;
this.skillBox = builder.skillBox;
this.toolExecutionContext = builder.toolExecutionContext;
this.statePersistence =
builder.statePersistence != null
Expand All @@ -180,6 +185,73 @@ private ReActAgent(Builder builder, Toolkit agentToolkit) {

// ==================== New StateModule API ====================

/**
* Loads agent state from the session. When {@code resend} is {@code true}, restores from the
* most recent anchor (snapshot taken before the last call) and returns the original input
* messages, allowing the caller to re-execute with {@code agent.call(inputMsgs)}.
*
* @param session the session to load from
* @param sessionKey the session key
* @param resend if {@code true}, restore from anchor instead of current state
* @return the original input messages from the anchor when resend is true; empty list otherwise
* @throws IllegalStateException if resend is true but no anchor exists
*/
public List<Msg> loadIfExists(Session session, SessionKey sessionKey, boolean resend) {
if (session.exists(sessionKey)) {
loadFrom(session, sessionKey);
}
if (resend) {
if (anchorInputMsgs == null || anchorInputMsgs.isEmpty()) {
throw new IllegalStateException(
"No resend anchor found for the given session key. "
+ "An anchor is automatically saved before each call().");
}
// Restore all components from their in-memory anchors
if (statePersistence.memoryManaged()) {
// Restore memory by finding the first resend input message and
// truncating from that point, instead of using a separate anchor copy.
List<Msg> currentMessages = memory.getMessages();
String firstInputId = anchorInputMsgs.get(0).getId();
int cutIndex = -1;
for (int i = 0; i < currentMessages.size(); i++) {
if (currentMessages.get(i).getId().equals(firstInputId)) {
cutIndex = i;
break;
}
}
if (cutIndex >= 0) {
memory.deleteMessagesFrom(cutIndex);
}
}
if (statePersistence.planNotebookManaged() && planNotebook != null) {
planNotebook.restoreAnchor();
}
if (statePersistence.skillBoxManaged() && skillBox != null) {
skillBox.restoreAnchor();
}
if (statePersistence.toolkitManaged()
&& toolkit != null
&& anchorActiveGroups != null) {
toolkit.setActiveGroups(anchorActiveGroups);
}
return new ArrayList<>(anchorInputMsgs);
}
return Collections.emptyList();
}

/**
* Loads agent state from the session using a string session ID.
*
* @param session the session to load from
* @param sessionId the session identifier as a string
* @param resend if {@code true}, restore from anchor instead of current state
* @return the original input messages from the anchor when resend is true; empty list otherwise
* @throws IllegalStateException if resend is true but no anchor exists
*/
public List<Msg> loadIfExists(Session session, String sessionId, boolean resend) {
return loadIfExists(session, SimpleSessionKey.of(sessionId), resend);
}

/**
* Save agent state to the session using the new API.
*
Expand Down Expand Up @@ -221,6 +293,22 @@ public void saveTo(Session session, SessionKey sessionKey) {
if (statePersistence.planNotebookManaged() && planNotebook != null) {
planNotebook.saveTo(session, sessionKey);
}

// Save SkillBox if managed
if (statePersistence.skillBoxManaged() && skillBox != null) {
skillBox.saveTo(session, sessionKey);
}

// Save resend anchor data
if (anchorInputMsgs != null) {
session.save(sessionKey, "resend_input_msgs", anchorInputMsgs);
}
if (anchorActiveGroups != null) {
session.save(
sessionKey,
"toolkit_activeGroups_anchor",
new ToolkitState(anchorActiveGroups));
}
}

/**
Expand Down Expand Up @@ -256,6 +344,17 @@ public void loadFrom(Session session, SessionKey sessionKey) {
if (statePersistence.planNotebookManaged() && planNotebook != null) {
planNotebook.loadFrom(session, sessionKey);
}

// Load SkillBox if managed
if (statePersistence.skillBoxManaged() && skillBox != null) {
skillBox.loadFrom(session, sessionKey);
}

// Load resend anchor data
List<Msg> loadedAnchor = session.getList(sessionKey, "resend_input_msgs", Msg.class);
anchorInputMsgs = loadedAnchor.isEmpty() ? null : new ArrayList<>(loadedAnchor);
session.get(sessionKey, "toolkit_activeGroups_anchor", ToolkitState.class)
.ifPresent(state -> anchorActiveGroups = new ArrayList<>(state.activeGroups()));
}

// ==================== Protected API ====================
Expand All @@ -266,6 +365,7 @@ protected Mono<Msg> doCall(List<Msg> msgs) {

// No pending tools -> normal processing
if (pendingIds.isEmpty()) {
saveResendAnchor(msgs);
addToMemory(msgs);
return executeIteration(0);
}
Expand Down Expand Up @@ -815,6 +915,29 @@ private Mono<Msg> handleSummaryError(Throwable error) {

// ==================== Helper Methods ====================

/**
* Save a snapshot anchor of all stateful components before each call.
* The snapshot is saved to the agent's bound Session, so it persists
* regardless of the Session implementation (InMemory, Json, MySQL, etc.)
*
* @param inputMsgs the input messages from this call
*/
private void saveResendAnchor(List<Msg> inputMsgs) {
this.anchorInputMsgs = new ArrayList<>(inputMsgs);
if (statePersistence.memoryManaged()) {
memory.saveAnchor();
}
if (statePersistence.planNotebookManaged() && planNotebook != null) {
planNotebook.saveAnchor();
}
if (statePersistence.skillBoxManaged() && skillBox != null) {
skillBox.saveAnchor();
}
if (statePersistence.toolkitManaged() && toolkit != null) {
anchorActiveGroups = new ArrayList<>(toolkit.getActiveGroups());
}
}

/**
* Prepare messages for model input.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public class InMemoryMemory implements Memory {

private final List<Msg> messages = new CopyOnWriteArrayList<>();

private List<Msg> anchorMessages;

/** Key prefix for storage. */
private static final String KEY_PREFIX = "memory";

Expand Down Expand Up @@ -124,4 +126,53 @@ public void deleteMessage(int index) {
public void clear() {
messages.clear();
}

/**
* Deletes all messages from the specified index (inclusive) to the end.
*
* <p>This implementation uses {@link List#subList} for efficient bulk removal,
* avoiding per-element deletion overhead. If {@code fromIndex} is out of bounds
* (negative or >= size), this operation is a no-op (no exception thrown).
*
* @param fromIndex The start index (inclusive, 0-based)
*/
@Override
public void deleteMessagesFrom(int fromIndex) {
int size = messages.size();
if (fromIndex < 0 || fromIndex >= size) {
return;
}
messages.subList(fromIndex, size).clear();
}

// ==================== Anchor Implementation ====================

/**
* Saves the current message list as an anchor point.
*/
@Override
public void saveAnchor() {
anchorMessages = new ArrayList<>(messages);
}

/**
* Restores messages to the previously saved anchor point.
*/
@Override
public void restoreAnchor() {
if (anchorMessages != null) {
messages.clear();
messages.addAll(anchorMessages);
}
}

/**
* Returns whether an anchor point has been saved.
*
* @return true if an anchor exists, false otherwise
*/
@Override
public boolean hasAnchor() {
return anchorMessages != null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,20 @@ public interface Memory extends StateModule {
* is typically irreversible unless state has been persisted.
*/
void clear();

/**
* Deletes all messages from the specified index (inclusive) to the end.
*
* <p>If the index is out of bounds (negative or >= size), this operation should be a no-op
* rather than throwing an exception.
*
* @param fromIndex The start index (inclusive, 0-based)
*/
default void deleteMessagesFrom(int fromIndex) {
List<Msg> messages = getMessages();
int size = messages.size();
for (int i = size - 1; i >= fromIndex && i >= 0; i--) {
deleteMessage(i);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ public class PlanNotebook implements StateModule {
+ "'finish_plan' function.";

private Plan currentPlan;
private Plan anchorPlan;
private boolean hasAnchorFlag;
private final PlanToHint planToHint;
private final PlanStorage storage;
private final Integer maxSubtasks;
Expand Down Expand Up @@ -158,6 +160,10 @@ public static Builder builder() {
public void saveTo(Session session, SessionKey sessionKey) {
// Always save, even when null, to ensure cleared state is persisted
session.save(sessionKey, keyPrefix + "_state", new PlanNotebookState(currentPlan));
if (hasAnchorFlag) {
session.save(
sessionKey, keyPrefix + "_state_anchor", new PlanNotebookState(anchorPlan));
}
}

/**
Expand All @@ -172,6 +178,32 @@ public void loadFrom(Session session, SessionKey sessionKey) {
this.currentPlan = null;
session.get(sessionKey, keyPrefix + "_state", PlanNotebookState.class)
.ifPresent(state -> this.currentPlan = state.currentPlan());
hasAnchorFlag = false;
anchorPlan = null;
session.get(sessionKey, keyPrefix + "_state_anchor", PlanNotebookState.class)
.ifPresent(
state -> {
anchorPlan = state.currentPlan();
hasAnchorFlag = true;
});
}

@Override
public void saveAnchor() {
anchorPlan = currentPlan;
hasAnchorFlag = true;
}

@Override
public void restoreAnchor() {
if (hasAnchorFlag) {
currentPlan = anchorPlan;
}
}

@Override
public boolean hasAnchor() {
return hasAnchorFlag;
}

/** Builder for constructing PlanNotebook instances with customizable settings. */
Expand Down
Loading
Loading