diff --git a/server-common/src/main/java/io/a2a/server/agentexecution/AgentExecutor.java b/server-common/src/main/java/io/a2a/server/agentexecution/AgentExecutor.java index caa0c5cbf..0dfce6088 100644 --- a/server-common/src/main/java/io/a2a/server/agentexecution/AgentExecutor.java +++ b/server-common/src/main/java/io/a2a/server/agentexecution/AgentExecutor.java @@ -3,8 +3,152 @@ import io.a2a.server.events.EventQueue; import io.a2a.spec.A2AError; +/** + * Core business logic interface for implementing A2A agent functionality. + *
+ * This is the primary extension point where agent developers implement their agent's behavior - + * LLM interactions, data processing, external API calls, or any custom logic. Along with an + * {@link io.a2a.spec.AgentCard}, implementing this interface is the minimum requirement to + * create a functioning A2A agent. + *
+ * + *{@code
+ * @ApplicationScoped
+ * public class MyAgentExecutorProducer {
+ * @Inject
+ * MyService myService; // Your business logic
+ *
+ * @Produces
+ * public AgentExecutor agentExecutor() {
+ * return new MyAgentExecutor(myService);
+ * }
+ * }
+ * }
+ *
+ * {@code
+ * public class WeatherAgentExecutor implements AgentExecutor {
+ * private final WeatherService weatherService;
+ *
+ * public WeatherAgentExecutor(WeatherService weatherService) {
+ * this.weatherService = weatherService;
+ * }
+ *
+ * @Override
+ * public void execute(RequestContext context, EventQueue eventQueue) {
+ * TaskUpdater updater = new TaskUpdater(context, eventQueue);
+ *
+ * // Initialize task if this is a new conversation
+ * if (context.getTask() == null) {
+ * updater.submit();
+ * }
+ * updater.startWork();
+ *
+ * // Extract user input from the message
+ * String userMessage = context.getUserInput("\n");
+ *
+ * // Process request (your business logic)
+ * String weatherData = weatherService.getWeather(userMessage);
+ *
+ * // Return result as artifact
+ * updater.addArtifact(List.of(new TextPart(weatherData, null)));
+ * updater.complete();
+ * }
+ *
+ * @Override
+ * public void cancel(RequestContext context, EventQueue eventQueue) {
+ * // Clean up resources and mark as canceled
+ * new TaskUpdater(context, eventQueue).cancel();
+ * }
+ * }
+ * }
+ *
+ * {@code
+ * updater.startWork();
+ * for (String chunk : llmService.stream(userInput)) {
+ * updater.addArtifact(List.of(new TextPart(chunk, null)));
+ * }
+ * updater.complete(); // Final event closes the queue
+ * }
+ *
+ * @see RequestContext
+ * @see io.a2a.server.tasks.TaskUpdater
+ * @see io.a2a.server.events.EventQueue
+ * @see io.a2a.server.requesthandlers.DefaultRequestHandler
+ * @see io.a2a.spec.AgentCard
+ */
public interface AgentExecutor {
+ /**
+ * Executes the agent's business logic for a message.
+ * + * Called asynchronously by {@link io.a2a.server.requesthandlers.DefaultRequestHandler} + * in a background thread when a client sends a message. Enqueue events to the queue as + * processing progresses. The queue remains open until you enqueue a final event + * (COMPLETED, FAILED, or CANCELED state). + *
+ *+ * Important: Don't throw exceptions for business logic errors. Instead, use + * {@code updater.fail(errorMessage)} to communicate failures to the client gracefully. + * Only throw {@link A2AError} for truly exceptional conditions. + *
+ * + * @param context the request context containing the message, task state, and configuration + * @param eventQueue the queue for enqueueing status updates and artifacts + * @throws A2AError if execution fails catastrophically (exception propagates to client) + */ void execute(RequestContext context, EventQueue eventQueue) throws A2AError; + /** + * Cancels an ongoing agent execution. + *+ * Called when a client requests task cancellation via the cancelTask operation. + * You should: + *
+ * Note: The {@link #execute(RequestContext, EventQueue)} method may still be + * running on another thread. Use appropriate synchronization or interruption mechanisms + * if your agent maintains cancellable state. + *
+ * Error Handling: + *
+ * This class encapsulates all the information an agent needs to process a request: + * the user's message, existing task state (for continuing conversations), configuration, + * and server call context. It's the primary way agents access request data. + *
+ * + *{@code
+ * public void execute(RequestContext context, EventQueue queue) {
+ * // Check if this is a new conversation or continuation
+ * Task existingTask = context.getTask();
+ * if (existingTask == null) {
+ * // New conversation - initialize
+ * } else {
+ * // Continuing conversation - access history
+ * List history = existingTask.history();
+ * }
+ *
+ * // Extract user input
+ * String userMessage = context.getUserInput("\n");
+ *
+ * // Access configuration if needed
+ * MessageSendConfiguration config = context.getConfiguration();
+ * boolean isBlocking = config != null && config.blocking();
+ *
+ * // Process and respond...
+ * }
+ * }
+ *
+ * {@code
+ * // Get all text parts joined with newlines
+ * String text = context.getUserInput("\n");
+ *
+ * // Get all text parts joined with spaces
+ * String text = context.getUserInput(" ");
+ * }
+ *
+ * @see AgentExecutor
+ * @see Message
+ * @see Task
+ * @see MessageSendConfiguration
+ */
public class RequestContext {
private @Nullable MessageSendParams params;
@@ -54,38 +110,128 @@ public RequestContext(
}
}
- public @Nullable MessageSendParams getParams() {
- return params;
- }
-
+ /**
+ * Returns the task identifier.
+ * + * This is auto-generated (UUID) if not provided by the client in the message parameters. + * It can be null if the context was not created from message parameters. + *
+ * + * @return the task ID + */ public @Nullable String getTaskId() { return taskId; } + /** + * Returns the conversation context identifier. + *+ * Conversation contexts group related tasks together (e.g., multiple tasks + * in the same user session). This is auto-generated (UUID) if not provided by the client + * in the message parameters. It can be null if the context was not created from message parameters. + *
+ * + * @return the context ID + */ public @Nullable String getContextId() { return contextId; } + /** + * Returns the existing task state, if this is a continuation of a conversation. + *+ * For new conversations, this is null. For continuing conversations, contains + * the full task state including history, artifacts, and status. + *
+ * Common Pattern: + *
{@code
+ * if (context.getTask() == null) {
+ * // New conversation - initialize state
+ * } else {
+ * // Continuing - access previous messages
+ * List history = context.getTask().history();
+ * }
+ * }
+ *
+ * @return the existing task, or null if this is a new conversation
+ */
public @Nullable Task getTask() {
return task;
}
+ /**
+ * Returns other tasks in the same conversation context.
+ * + * Useful for multi-task conversations where the agent needs to access + * state from related tasks. + *
+ * + * @return unmodifiable list of related tasks (empty if none) + */ public List+ * Contains the message parts (text, images, etc.) sent by the client. + * Use {@link #getUserInput(String)} for convenient text extraction. + *
+ * + * @return the message, or null if not available + * @see #getUserInput(String) + */ public @Nullable Message getMessage() { return params != null ? params.message() : null; } + /** + * Returns the request configuration. + *+ * Contains settings like blocking mode, push notification config, etc. + *
+ * + * @return the configuration, or null if not provided + */ public @Nullable MessageSendConfiguration getConfiguration() { return params != null ? params.configuration() : null; } + /** + * Returns the server call context. + *+ * Contains transport-specific information like authentication, headers, etc. + * Most agents don't need this. + *
+ * + * @return the call context, or null if not available + */ public @Nullable ServerCallContext getCallContext() { return callContext; } + /** + * Extracts all text content from the message and joins with the specified delimiter. + *+ * This is a convenience method for getting text input from messages that may contain + * multiple text parts. Non-text parts (images, etc.) are ignored. + *
+ * Examples: + *
{@code
+ * // Join with newlines (common for multi-paragraph input)
+ * String text = context.getUserInput("\n");
+ *
+ * // Join with spaces (common for single-line input)
+ * String text = context.getUserInput(" ");
+ *
+ * // Default delimiter is newline
+ * String text = context.getUserInput(null); // uses "\n"
+ * }
+ *
+ * @param delimiter the string to insert between text parts (null defaults to "\n")
+ * @return all text parts joined with delimiter, or empty string if no message
+ */
public String getUserInput(String delimiter) {
if (params == null) {
return "";
diff --git a/server-common/src/main/java/io/a2a/server/events/QueueManager.java b/server-common/src/main/java/io/a2a/server/events/QueueManager.java
index 54ef30e81..01e754fcb 100644
--- a/server-common/src/main/java/io/a2a/server/events/QueueManager.java
+++ b/server-common/src/main/java/io/a2a/server/events/QueueManager.java
@@ -2,30 +2,193 @@
import org.jspecify.annotations.Nullable;
+/**
+ * Manages {@link EventQueue} lifecycle for task-based event routing and consumption.
+ * + * The QueueManager is responsible for creating, storing, and managing event queues that + * coordinate asynchronous communication between agent executors (producers) and transport + * consumers. It supports both simple in-memory queuing and sophisticated patterns like + * queue tapping for resubscription and distributed event replication. + *
+ * + *{@code
+ * // Client disconnects and later reconnects
+ * EventQueue childQueue = queueManager.tap(taskId);
+ * if (childQueue != null) {
+ * // Receive events from this point forward
+ * // (Historical events before tap are not replayed)
+ * }
+ * }
+ * Use cases:
+ * {@code
+ * @ApplicationScoped
+ * @Alternative
+ * @Priority(50) // Higher than default InMemoryQueueManager
+ * public class KafkaQueueManager implements QueueManager {
+ * // Custom implementation with event replication
+ * }
+ * }
+ *
+ * + * Throws {@link TaskQueueExistsException} if a queue already exists for this task. + * Typically used internally - prefer {@link #createOrTap(String)} for most use cases. + *
+ * + * @param taskId the task identifier + * @param queue the queue to add + * @throws TaskQueueExistsException if queue already exists for this task ID + */ void add(String taskId, EventQueue queue); + /** + * Retrieves the MainQueue for a task, if it exists. + *+ * Returns the primary queue for the task. Does not create a new queue if none exists. + *
+ * + * @param taskId the task identifier + * @return the MainQueue, or null if no queue exists for this task + */ @Nullable EventQueue get(String taskId); + /** + * Creates a ChildQueue that receives copies of events from the MainQueue. + *+ * Use this for: + *
+ * The ChildQueue receives events enqueued AFTER it's created. Historical events + * are not replayed. + * + * @param taskId the task identifier + * @return a ChildQueue that receives future events, or null if the MainQueue doesn't exist + */ @Nullable EventQueue tap(String taskId); + /** + * Closes and removes the queue for a task. + *
+ * This closes the MainQueue and all ChildQueues, then removes it from the manager. + * Called during cleanup after task completion or error conditions. + *
+ * + * @param taskId the task identifier + */ void close(String taskId); + /** + * Creates a MainQueue if none exists, or taps the existing queue to create a ChildQueue. + *+ * This is the primary method used by {@link io.a2a.server.requesthandlers.DefaultRequestHandler}: + *
+ * Used internally to ensure the consumer is ready before the agent starts + * enqueueing events, avoiding race conditions where events might be enqueued + * before the consumer begins polling. + *
+ * + * @param eventQueue the queue to wait for + * @throws InterruptedException if interrupted while waiting + */ void awaitQueuePollerStart(EventQueue eventQueue) throws InterruptedException; + /** + * Returns an EventQueueBuilder for creating queues with task-specific configuration. + *+ * Implementations can override to provide custom queue configurations per task, + * such as different capacities, hooks, or event processors. + *
+ *+ * Default implementation returns a standard builder with no customization. + *
+ * + * @param taskId the task ID for context (may be used to customize queue configuration) + * @return a builder for creating event queues + */ default EventQueue.EventQueueBuilder getEventQueueBuilder(String taskId) { return EventQueue.builder(); } /** - * Get the count of active child queues for a given task. - * Used for testing to verify reference counting mechanism. + * Returns the number of active ChildQueues for a task. + *+ * Used for testing to verify reference counting and queue lifecycle management. + * In production, indicates how many consumers are actively subscribed to a task's events. + *
* * @param taskId the task ID - * @return number of active child queues, or -1 if queue doesn't exist + * @return number of active child queues, or -1 if the MainQueue doesn't exist */ int getActiveChildQueueCount(String taskId); } diff --git a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java index f3cb6f02e..ba377dd0c 100644 --- a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java +++ b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java @@ -70,6 +70,110 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Central request orchestrator that coordinates transport requests with agent execution, + * task persistence, event routing, and push notifications. + *+ * This class is the core of the A2A server runtime. It receives requests from transport + * layers (JSON-RPC, gRPC, REST), executes user-provided {@link AgentExecutor} logic + * asynchronously, manages event queues for response streaming, and ensures task state + * is persisted through {@link TaskStore}. + *
+ * + *+ * Transport Layer (JSON-RPC/gRPC/REST) + * ↓ calls DefaultRequestHandler methods + * DefaultRequestHandler (orchestrates) + * ↓ + * ┌─────────────┬──────────────┬─────────────────┬──────────────────┐ + * │ AgentExecutor│ TaskStore │ QueueManager │ PushNotification │ + * │ (user logic) │ (persistence)│ (event routing) │ (notifications) │ + * └─────────────┴──────────────┴─────────────────┴──────────────────┘ + *+ * + *
+ * Important: Avoid blocking operations on Vert.x worker threads - they are limited + * and shared across all requests. + *
+ * + *+ * Push notification configurations specify where and how to deliver task state updates + * to external systems (webhook URLs, authentication headers, etc.). A task can have + * multiple push notification configurations for different endpoints or use cases. + *
+ * + *{@code
+ * @ApplicationScoped
+ * @Alternative
+ * @Priority(50)
+ * public class JpaDatabasePushNotificationConfigStore implements PushNotificationConfigStore {
+ * @PersistenceContext
+ * EntityManager em;
+ *
+ * @Transactional
+ * public PushNotificationConfig setInfo(String taskId, PushNotificationConfig config) {
+ * // JPA persistence logic
+ * }
+ * }
+ * }
+ *
+ * + * If {@code notificationConfig.id()} is null or empty, it's set to the task ID. + * If a config with the same ID already exists for this task, it's replaced. + *
+ * * @param taskId the task ID * @param notificationConfig the push notification configuration - * @return the potentially updated push notification configuration + * @return the potentially updated configuration (with ID set if it was null) */ PushNotificationConfig setInfo(String taskId, PushNotificationConfig notificationConfig); /** - * Retrieves the push notification configuration for a task. - * @param params the parameters for listing push notification configurations - * @return the push notification configurations for a task, or with empty list if not found + * Retrieves push notification configurations for a task with pagination support. + *+ * Returns all configs if {@code params.pageSize()} is 0. Otherwise, returns up to + * {@code pageSize} configs and a continuation token for the next page. + *
+ * Pagination Example: + *
{@code
+ * // First page
+ * ListTaskPushNotificationConfigParams params =
+ * new ListTaskPushNotificationConfigParams(taskId, 10, null, tenant);
+ * ListTaskPushNotificationConfigResult result = store.getInfo(params);
+ *
+ * // Next page
+ * if (result.nextPageToken() != null) {
+ * params = new ListTaskPushNotificationConfigParams(
+ * taskId, 10, result.nextPageToken(), tenant);
+ * result = store.getInfo(params);
+ * }
+ * }
+ *
+ * @param params the query parameters including task ID, page size, and page token
+ * @return the configurations for this task (empty list if none found)
*/
ListTaskPushNotificationConfigResult getInfo(ListTaskPushNotificationConfigParams params);
/**
- * Deletes the push notification configuration for a task.
+ * Deletes a push notification configuration for a task.
+ * + * If {@code configId} is null, it defaults to the task ID (deletes the default config). + * If no config exists with the given ID, this method returns normally (idempotent). + *
+ * * @param taskId the task ID - * @param configId the push notification configuration + * @param configId the push notification configuration ID (null = use task ID) */ void deleteInfo(String taskId, String configId); diff --git a/server-common/src/main/java/io/a2a/server/tasks/PushNotificationSender.java b/server-common/src/main/java/io/a2a/server/tasks/PushNotificationSender.java index 81d577f46..2013d6a22 100644 --- a/server-common/src/main/java/io/a2a/server/tasks/PushNotificationSender.java +++ b/server-common/src/main/java/io/a2a/server/tasks/PushNotificationSender.java @@ -3,13 +3,93 @@ import io.a2a.spec.Task; /** - * Interface for sending push notifications for tasks. + * Interface for delivering push notifications containing task state updates to external systems. + *+ * Push notifications enable asynchronous, out-of-band communication of task progress to + * configured webhook URLs or messaging systems. This allows clients to receive updates + * without maintaining persistent connections or polling. + *
+ * + *+ * Push notifications are always sent AFTER the task state is persisted and the client + * has received the event, ensuring consistency. + *
+ * + *{@code
+ * @ApplicationScoped
+ * @Alternative
+ * @Priority(100)
+ * public class KafkaPushNotificationSender implements PushNotificationSender {
+ * @Inject
+ * KafkaProducer producer;
+ *
+ * @Override
+ * public void sendNotification(Task task) {
+ * producer.send("task-updates", task.id(), task);
+ * }
+ * }
+ * }
+ *
+ * + * Called after the task has been persisted to {@link TaskStore}. Retrieve push + * notification URLs or messaging configurations from {@link PushNotificationConfigStore} + * using {@code task.id()}. + *
+ *+ * Error Handling: Log errors but don't throw exceptions. Notifications are + * best-effort and should not fail the primary request. + *
+ * + * @param task the task with current state and artifacts to send */ void sendNotification(Task task); } diff --git a/server-common/src/main/java/io/a2a/server/tasks/TaskStore.java b/server-common/src/main/java/io/a2a/server/tasks/TaskStore.java index 00f9e56a2..18707fba2 100644 --- a/server-common/src/main/java/io/a2a/server/tasks/TaskStore.java +++ b/server-common/src/main/java/io/a2a/server/tasks/TaskStore.java @@ -6,11 +6,94 @@ import org.jspecify.annotations.Nullable; /** - * Storage interface for managing task persistence. + * Storage interface for managing task persistence across the task lifecycle. *- * Implementations can use in-memory storage, databases, or other persistence mechanisms. - * Default implementation is {@code InMemoryTaskStore}. + * TaskStore is responsible for persisting task state including status updates, artifacts, + * message history, and metadata. It's called by {@link io.a2a.server.requesthandlers.DefaultRequestHandler} + * and {@link TaskManager} to save task state as agents process requests and generate events. *
+ * + *{@code
+ * @ApplicationScoped
+ * public class InMemoryTaskStore implements TaskStore, TaskStateProvider {
+ * // Provides both persistence and state queries
+ * public boolean isTaskFinalized(String taskId) {
+ * Task task = tasks.get(taskId);
+ * return task != null && task.status().state().isFinal();
+ * }
+ * }
+ * }
+ *
+ * {@code
+ * @ApplicationScoped
+ * @Alternative
+ * @Priority(50) // Higher than default InMemoryTaskStore
+ * public class JpaDatabaseTaskStore implements TaskStore, TaskStateProvider {
+ * @PersistenceContext
+ * EntityManager em;
+ *
+ * @Transactional
+ * public void save(Task task) {
+ * TaskEntity entity = toEntity(task);
+ * em.merge(entity);
+ * }
+ * }
+ * }
+ *
+ *