From 0c6c9d022eace80b4eab1405ddcd8b5f2588f70c Mon Sep 17 00:00:00 2001 From: Tyler Payne Date: Thu, 16 Oct 2025 11:17:19 -0400 Subject: [PATCH 1/2] Protocol returns bool to indicate action persistence This lets the protocol avoid writing noisy actions to the database Empty FetchMessages actions are not persisted to the databse --- .../marketplace/protocol/fetch_messages.py | 13 +++++++++--- .../marketplace/protocol/protocol.py | 15 +++++++++---- .../platform/protocol/base.py | 11 ++++++++-- .../platform/server/routes/actions.py | 21 ++++++++++--------- 4 files changed, 41 insertions(+), 19 deletions(-) diff --git a/packages/magentic-marketplace/src/magentic_marketplace/marketplace/protocol/fetch_messages.py b/packages/magentic-marketplace/src/magentic_marketplace/marketplace/protocol/fetch_messages.py index 5ea6285b..a8d037bd 100644 --- a/packages/magentic-marketplace/src/magentic_marketplace/marketplace/protocol/fetch_messages.py +++ b/packages/magentic-marketplace/src/magentic_marketplace/marketplace/protocol/fetch_messages.py @@ -19,7 +19,7 @@ async def execute_fetch_messages( fetch_messages: FetchMessages, agent: AgentProfile, database: BaseDatabaseController, -) -> ActionExecutionResult: +) -> tuple[ActionExecutionResult, bool]: """Execute a fetch messages action. This function implements the message fetching functionality that was previously @@ -31,7 +31,9 @@ async def execute_fetch_messages( database: Database controller for accessing data Returns: - ActionExecutionResult containing the fetched messages + A tuple of (ActionExecutionResult, bool) where: + - ActionExecutionResult contains the fetched messages + - bool indicates whether messages were fetched (True if messages exist) """ messages, has_more = await _fetch_messages_from_database( @@ -43,7 +45,9 @@ async def execute_fetch_messages( has_more=has_more, ) - return ActionExecutionResult(content=response.model_dump(mode="json")) + return ActionExecutionResult(content=response.model_dump(mode="json")), len( + messages + ) > 0 async def _fetch_messages_from_database( @@ -100,6 +104,9 @@ async def _fetch_messages_from_database( received_messages = received_messages[: fetch_messages.limit - 1] has_more = True + if len(received_messages) > 0: + print(f"Fetched {len(received_messages)} messages") + return received_messages, has_more diff --git a/packages/magentic-marketplace/src/magentic_marketplace/marketplace/protocol/protocol.py b/packages/magentic-marketplace/src/magentic_marketplace/marketplace/protocol/protocol.py index 0f32634c..f840b898 100644 --- a/packages/magentic-marketplace/src/magentic_marketplace/marketplace/protocol/protocol.py +++ b/packages/magentic-marketplace/src/magentic_marketplace/marketplace/protocol/protocol.py @@ -35,12 +35,19 @@ async def execute_action( agent: AgentProfile, action: ActionExecutionRequest, database: BaseDatabaseController, - ) -> ActionExecutionResult: - """Execute an action.""" + ) -> tuple[ActionExecutionResult, bool]: + """Execute an action. + + Returns: + A tuple of (ActionExecutionResult, bool) where: + - ActionExecutionResult contains the action execution result + - bool indicates whether the action should be persisted to the database + + """ parsed_action = ActionAdapter.validate_python(action.parameters) if isinstance(parsed_action, SendMessage): - return await execute_send_message(parsed_action, database) + return await execute_send_message(parsed_action, database), True elif isinstance(parsed_action, FetchMessages): return await execute_fetch_messages(parsed_action, agent, database) @@ -48,6 +55,6 @@ async def execute_action( elif isinstance(parsed_action, Search): return await execute_search( search=parsed_action, agent=agent, database=database - ) + ), True else: raise ValueError(f"Unknown action type: {parsed_action.type}") diff --git a/packages/magentic-marketplace/src/magentic_marketplace/platform/protocol/base.py b/packages/magentic-marketplace/src/magentic_marketplace/platform/protocol/base.py index e89bc763..2e3c40bb 100644 --- a/packages/magentic-marketplace/src/magentic_marketplace/platform/protocol/base.py +++ b/packages/magentic-marketplace/src/magentic_marketplace/platform/protocol/base.py @@ -28,6 +28,13 @@ async def execute_action( agent: AgentProfile, action: ActionExecutionRequest, database: BaseDatabaseController, - ) -> ActionExecutionResult: - """Execute a specific action with the given name and parameters.""" + ) -> tuple[ActionExecutionResult, bool]: + """Execute a specific action with the given name and parameters. + + Returns: + A tuple of (ActionExecutionResult, bool) where: + - ActionExecutionResult contains the action execution result + - bool indicates whether the action should be persisted to the database + + """ ... diff --git a/packages/magentic-marketplace/src/magentic_marketplace/platform/server/routes/actions.py b/packages/magentic-marketplace/src/magentic_marketplace/platform/server/routes/actions.py index b8840df8..1d3f08a1 100644 --- a/packages/magentic-marketplace/src/magentic_marketplace/platform/server/routes/actions.py +++ b/packages/magentic-marketplace/src/magentic_marketplace/platform/server/routes/actions.py @@ -54,22 +54,23 @@ async def execute_action( agent_with_id = authenticated_agent.data.model_copy() agent_with_id.id = authenticated_agent.id # TODO why is this necessary? - result = await protocol.execute_action( + result, persist = await protocol.execute_action( agent=agent_with_id, action=request, database=db, ) - action_data = ActionRowData( - agent_id=authenticated_agent.id, request=request, result=result - ) - db_action = ActionRow( - id="", # auto-generated by DB - created_at=datetime.now(UTC), - data=action_data, - ) + if persist: + action_data = ActionRowData( + agent_id=authenticated_agent.id, request=request, result=result + ) + db_action = ActionRow( + id="", # auto-generated by DB + created_at=datetime.now(UTC), + data=action_data, + ) - await db.actions.create(db_action) + await db.actions.create(db_action) return result From b426e05a31b7075e5536376b9709e05808976fde Mon Sep 17 00:00:00 2001 From: Tyler Payne Date: Thu, 16 Oct 2025 17:44:38 -0400 Subject: [PATCH 2/2] Add options to drop empty or all FetchMessages actions --- .../src/magentic_marketplace/cli.py | 15 +++++++ .../experiments/run_experiment.py | 19 ++++++++- .../marketplace/protocol/fetch_messages.py | 41 ++++++++++++++----- .../marketplace/protocol/protocol.py | 30 ++++++++++++-- 4 files changed, 88 insertions(+), 17 deletions(-) diff --git a/packages/magentic-marketplace/src/magentic_marketplace/cli.py b/packages/magentic-marketplace/src/magentic_marketplace/cli.py index a3b95451..ccf313bc 100644 --- a/packages/magentic-marketplace/src/magentic_marketplace/cli.py +++ b/packages/magentic-marketplace/src/magentic_marketplace/cli.py @@ -94,6 +94,8 @@ def run_experiment_command(args): export_sqlite=args.export, export_dir=args.export_dir, export_filename=args.export_filename, + drop_empty_fetch_messages=args.drop_empty_fetch_messages, + drop_all_fetch_messages=args.drop_all_fetch_messages, ) ) @@ -297,6 +299,19 @@ def main(): help="Output filename for SQLite export (default: .db). Only used with --export.", ) + # Add mutually exclusive group for fetch messages persistence options + fetch_messages_group = experiment_parser.add_mutually_exclusive_group() + fetch_messages_group.add_argument( + "--drop-empty-fetch-messages", + action="store_true", + help="Don't save empty FetchMessages actions to the database (saves only non-empty fetches).", + ) + fetch_messages_group.add_argument( + "--drop-all-fetch-messages", + action="store_true", + help="Don't save any FetchMessages actions to the database.", + ) + # analytics subcommand analytics_parser = subparsers.add_parser( "analyze", help="Analyze marketplace simulation data" diff --git a/packages/magentic-marketplace/src/magentic_marketplace/experiments/run_experiment.py b/packages/magentic-marketplace/src/magentic_marketplace/experiments/run_experiment.py index a69cf579..3abfdf33 100644 --- a/packages/magentic-marketplace/src/magentic_marketplace/experiments/run_experiment.py +++ b/packages/magentic-marketplace/src/magentic_marketplace/experiments/run_experiment.py @@ -10,7 +10,10 @@ load_customers_from_yaml, ) from magentic_marketplace.marketplace.agents import BusinessAgent, CustomerAgent -from magentic_marketplace.marketplace.protocol.protocol import SimpleMarketplaceProtocol +from magentic_marketplace.marketplace.protocol.protocol import ( + FetchMessagesPersistence, + SimpleMarketplaceProtocol, +) from magentic_marketplace.platform.database import ( connect_to_postgresql_database, ) @@ -35,6 +38,8 @@ async def run_marketplace_experiment( export_sqlite: bool = False, export_dir: str | None = None, export_filename: str | None = None, + drop_empty_fetch_messages: bool = False, + drop_all_fetch_messages: bool = False, ): """Run a marketplace experiment using YAML configuration files.""" # Load businesses and customers from YAML files @@ -69,8 +74,18 @@ def database_factory(): server_port = s.getsockname()[1] print(f"Auto-assigned server port: {server_port}") + # Determine fetch messages persistence mode based on CLI flags + if drop_all_fetch_messages: + fetch_messages_persistence = FetchMessagesPersistence.NONE + elif drop_empty_fetch_messages: + fetch_messages_persistence = FetchMessagesPersistence.NON_EMPTY + else: + fetch_messages_persistence = FetchMessagesPersistence.ALL + marketplace_launcher = MarketplaceLauncher( - protocol=SimpleMarketplaceProtocol(), + protocol=SimpleMarketplaceProtocol( + fetch_messages_persistence=fetch_messages_persistence + ), database_factory=database_factory, host=server_host, port=server_port, diff --git a/packages/magentic-marketplace/src/magentic_marketplace/marketplace/protocol/fetch_messages.py b/packages/magentic-marketplace/src/magentic_marketplace/marketplace/protocol/fetch_messages.py index a8d037bd..f456eb4d 100644 --- a/packages/magentic-marketplace/src/magentic_marketplace/marketplace/protocol/fetch_messages.py +++ b/packages/magentic-marketplace/src/magentic_marketplace/marketplace/protocol/fetch_messages.py @@ -1,5 +1,7 @@ """FetchMessages action implementation for the simple marketplace.""" +from enum import Enum + from magentic_marketplace.platform.database.base import BaseDatabaseController from magentic_marketplace.platform.database.models import ActionRow from magentic_marketplace.platform.database.queries.base import ( @@ -15,10 +17,20 @@ from ..database import queries +class FetchMessagesPersistence(str, Enum): + """Enum for controlling FetchMessages action persistence.""" + + ALL = "all" # Save all fetch messages actions + NON_EMPTY = "non_empty" # Save only non-empty fetch messages + NONE = "none" # Don't save any fetch messages actions + + async def execute_fetch_messages( fetch_messages: FetchMessages, agent: AgentProfile, database: BaseDatabaseController, + agent_last_fetch_messages_count: dict[str, int], + persistence: FetchMessagesPersistence, ) -> tuple[ActionExecutionResult, bool]: """Execute a fetch messages action. @@ -29,25 +41,35 @@ async def execute_fetch_messages( fetch_messages: The fetch messages action containing query parameters agent: The agent fetching messages database: Database controller for accessing data + agent_last_fetch_messages_count: Dictionary mapping agent ids to the length of the messages returned by the last fetch + persistence: Strategy for when to persist FetchMessages actions Returns: A tuple of (ActionExecutionResult, bool) where: - ActionExecutionResult contains the fetched messages - - bool indicates whether messages were fetched (True if messages exist) + - bool indicates whether the action should be persisted """ messages, has_more = await _fetch_messages_from_database( fetch_messages, agent, database ) - response = FetchMessagesResponse( - messages=messages, - has_more=has_more, - ) + last_messages_count = agent_last_fetch_messages_count[agent.id] + agent_last_fetch_messages_count[agent.id] = len(messages) + + content = FetchMessagesResponse(messages=messages, has_more=has_more) + result = ActionExecutionResult(content=content.model_dump(mode="json")) - return ActionExecutionResult(content=response.model_dump(mode="json")), len( - messages - ) > 0 + # Determine if we should persist this FetchMessages action based on the persistence mode + if persistence == FetchMessagesPersistence.ALL: + persist = True + elif persistence == FetchMessagesPersistence.NON_EMPTY: + # Are there more messages now than before? + persist = last_messages_count < len(messages) + else: # FetchMessagesPersistence.NONE + persist = False + + return result, persist async def _fetch_messages_from_database( @@ -104,9 +126,6 @@ async def _fetch_messages_from_database( received_messages = received_messages[: fetch_messages.limit - 1] has_more = True - if len(received_messages) > 0: - print(f"Fetched {len(received_messages)} messages") - return received_messages, has_more diff --git a/packages/magentic-marketplace/src/magentic_marketplace/marketplace/protocol/protocol.py b/packages/magentic-marketplace/src/magentic_marketplace/marketplace/protocol/protocol.py index f840b898..91d9572a 100644 --- a/packages/magentic-marketplace/src/magentic_marketplace/marketplace/protocol/protocol.py +++ b/packages/magentic-marketplace/src/magentic_marketplace/marketplace/protocol/protocol.py @@ -1,5 +1,7 @@ """Simple marketplace protocol implementation.""" +from collections import defaultdict + from magentic_marketplace.platform.database.base import BaseDatabaseController from magentic_marketplace.platform.protocol.base import BaseMarketplaceProtocol from magentic_marketplace.platform.shared.models import ( @@ -14,7 +16,7 @@ Search, SendMessage, ) -from .fetch_messages import execute_fetch_messages +from .fetch_messages import FetchMessagesPersistence, execute_fetch_messages from .search import execute_search from .send_message import execute_send_message @@ -22,8 +24,22 @@ class SimpleMarketplaceProtocol(BaseMarketplaceProtocol): """Marketplace protocol.""" - def __init__(self): - """Initialize the marketplace protocol.""" + def __init__( + self, + fetch_messages_persistence: FetchMessagesPersistence = FetchMessagesPersistence.ALL, + ): + """Initialize the marketplace protocol. + + Args: + fetch_messages_persistence: Controls which FetchMessages actions are persisted to database. + - ALL (default): Save all FetchMessages actions + - NON_EMPTY: Save only FetchMessages that returned messages + - NONE: Don't save any FetchMessages actions + + """ + self.fetch_messages_persistence = fetch_messages_persistence + # Track how many messages were fetched by an agent in the last count, use it to determine if "new" messages were provided or not + self._to_agent_id_last_fetch_messages_count: dict[str, int] = defaultdict(int) def get_actions(self): """Define available actions in the marketplace.""" @@ -50,7 +66,13 @@ async def execute_action( return await execute_send_message(parsed_action, database), True elif isinstance(parsed_action, FetchMessages): - return await execute_fetch_messages(parsed_action, agent, database) + return await execute_fetch_messages( + parsed_action, + agent, + database, + self._to_agent_id_last_fetch_messages_count, + self.fetch_messages_persistence, + ) elif isinstance(parsed_action, Search): return await execute_search(