diff --git a/plugins/acp/acp_plugin_gamesdk/acp_plugin.py b/plugins/acp/acp_plugin_gamesdk/acp_plugin.py index 709967a..e6a42d0 100644 --- a/plugins/acp/acp_plugin_gamesdk/acp_plugin.py +++ b/plugins/acp/acp_plugin_gamesdk/acp_plugin.py @@ -1,8 +1,10 @@ +import ast import json import traceback +from contextlib import suppress from dataclasses import dataclass from datetime import datetime, timezone, timedelta -from typing import List, Dict, Any, Optional,Tuple +from typing import List, Dict, Any, Optional, Tuple, Union import requests @@ -10,7 +12,7 @@ from game_sdk.game.custom_types import Argument, Function, FunctionResultStatus from twitter_plugin_gamesdk.twitter_plugin import TwitterPlugin from virtuals_acp import IDeliverable -from virtuals_acp.models import ACPGraduationStatus, ACPOnlineStatus +from virtuals_acp.models import ACPGraduationStatus, ACPOnlineStatus, ACPJobPhase from acp_plugin_gamesdk.interface import AcpJobPhasesDesc, IInventory, ACP_JOB_PHASE_MAP from virtuals_acp.client import VirtualsACP @@ -241,7 +243,11 @@ def _search_agents_executable(self, reasoning: str, keyword: str) -> Tuple[Funct "wallet_address": agent.wallet_address, "offerings": ( [ - {"name": offering.type, "price": offering.price} + { + "name": offering.name, + "price": offering.price, + "requirement_schema": offering.requirement_schema, + } for offering in agent.offerings ] if agent.offerings @@ -287,22 +293,20 @@ def initiate_job(self) -> Function: description="The seller's agent wallet address you want to buy from", ) - price_arg = Argument( - name="price", + service_name_arg = Argument( + name="service_name", type="string", - description="Offered price for service", + description="The service name you want to purchase", ) - reasoning_arg = Argument( - name="reasoning", - type="string", - description="Why you are making this purchase request", - ) - - service_requirements_arg = Argument( - name="service_requirements", - type="string", - description="Detailed specifications for service-based items", + service_requirement_arg = Argument( + name="service_requirement", + type="string or python dictionary", + description=""" + Detailed specifications for service-based items, + give in python dictionary (NOT WRAPPED WITH '', just python dictionary as is) that MATCHES the service's + requirement schema if required, else give in the requirement in a natural language sentence. + """, ) require_evaluation_arg = Argument( @@ -317,7 +321,20 @@ def initiate_job(self) -> Function: description="Keyword to search for a evaluator", ) - args = [seller_wallet_address_arg, price_arg, reasoning_arg, service_requirements_arg, require_evaluation_arg, evaluator_keyword_arg] + reasoning_arg = Argument( + name="reasoning", + type="string", + description="Why you are making this purchase request", + ) + + args = [ + seller_wallet_address_arg, + service_name_arg, + service_requirement_arg, + require_evaluation_arg, + evaluator_keyword_arg, + reasoning_arg + ] if hasattr(self, 'twitter_plugin') and self.twitter_plugin is not None: tweet_content_arg = Argument( @@ -334,16 +351,13 @@ def initiate_job(self) -> Function: executable=self._initiate_job_executable ) - def _initiate_job_executable(self, seller_wallet_address: str, price: str, reasoning: str, service_requirements: str, require_evaluation: str, evaluator_keyword: str, tweet_content: Optional[str] = None) -> Tuple[FunctionResultStatus, str, dict]: + def _initiate_job_executable(self, seller_wallet_address: str, service_name: str, service_requirement: Union[str, Dict[str, Any]], require_evaluation: str, evaluator_keyword: str, reasoning: str, tweet_content: Optional[str] = None) -> Tuple[FunctionResultStatus, str, dict]: if isinstance(require_evaluation, str): require_evaluation = require_evaluation.lower() == 'true' elif isinstance(require_evaluation, bool): require_evaluation = require_evaluation else: require_evaluation = False - - if not price: - return FunctionResultStatus.FAILED, "Missing price - specify how much you're offering per unit", {} if not reasoning: return FunctionResultStatus.FAILED, "Missing reasoning - explain why you're making this purchase request", {} @@ -364,29 +378,48 @@ def _initiate_job_executable(self, seller_wallet_address: str, price: str, reaso return FunctionResultStatus.FAILED, "No evaluator found - try a different keyword", {} evaluator_address = validators[0].wallet_address - + expired_at = datetime.now(timezone.utc) + timedelta(minutes=self.job_expiry_duration_mins) - job_id = self.acp_client.initiate_job( - seller_wallet_address, - service_requirements, - float(price), + + agent = self.acp_client.get_agent(seller_wallet_address) + offering = next( + (o for o in agent.offerings if o.name == service_name), + None + ) + + if offering is None: + return ( + FunctionResultStatus.FAILED, + f"No offering found with name '{service_name}', available offerings are: {', '.join(str(agent.offerings))}", + {} + ) + + if isinstance(service_requirement, str): + # failsafe if GAME still passes in dictionary wrapped with quotes and treated as a str + if (sr := service_requirement.strip()).startswith("{") and sr.endswith("}"): + with suppress(ValueError, SyntaxError): + service_requirement = ast.literal_eval(sr) + + job_id = offering.initiate_job( + service_requirement, evaluator_address, expired_at ) + job = self.acp_client.get_job_by_onchain_id(job_id) if hasattr(self, 'twitter_plugin') and self.twitter_plugin is not None and tweet_content is not None: - self._tweet_job(job_id, f"{tweet_content} #{job_id}") + self._tweet_job(job, f"{tweet_content} #{job_id}") return FunctionResultStatus.DONE, json.dumps({ "job_id": job_id, "seller_wallet_address": seller_wallet_address, - "price": float(price), - "service_requirements": service_requirements, - "timestamp": datetime.now().timestamp(), + "service_name": service_name, + "service_requirements": service_requirement, + "timestamp": datetime.now().timestamp() }), {} except Exception as e: print(traceback.format_exc()) - return FunctionResultStatus.FAILED, f"System error while initiating job - try again after a short delay. {str(e)}", {} + return FunctionResultStatus.FAILED, f"Error while initiating job - try initiating the job again.\n{str(e)}", {} @property def respond_job(self) -> Function: @@ -436,30 +469,18 @@ def _respond_job_executable(self, job_id: int, decision: str, reasoning: str, tw return FunctionResultStatus.FAILED, "Missing reasoning - explain why you made this decision", {} try: - state = self.get_acp_state() - - job = next( - (c for c in state["jobs"]["active"]["as_a_seller"] if c["job_id"] == job_id), - None - ) + job = self.acp_client.get_job_by_onchain_id(job_id) if not job: return FunctionResultStatus.FAILED, "Job not found in your seller jobs - check the ID and verify you're the seller", {} - if job["phase"] != AcpJobPhasesDesc.REQUEST: - return FunctionResultStatus.FAILED, f"Cannot respond - job is in '{job['phase']}' phase, must be in 'request' phase", {} + if job.phase != ACPJobPhase.REQUEST: + return FunctionResultStatus.FAILED, f"Cannot respond - job is in '{ACP_JOB_PHASE_MAP.get(job.phase)}' phase, must be in 'request' phase", {} - self.acp_client.respond_to_job_memo( - job_id, - job["memo"][0]["id"], - decision == "ACCEPT", - reasoning - ) + job.respond(decision == "ACCEPT", None, reasoning) if hasattr(self, 'twitter_plugin') and self.twitter_plugin is not None and tweet_content is not None: - tweet_id = job.get("tweet_history", [])[0].get("tweet_id") if job.get("tweet_history") else None - if tweet_id: - self._tweet_job(job_id, tweet_content, tweet_id) + self._tweet_job(job, tweet_content) return FunctionResultStatus.DONE, json.dumps({ "job_id": job_id, @@ -477,19 +498,13 @@ def pay_job(self) -> Function: description="The job ID you are paying for", ) - amount_arg = Argument( - name="amount", - type="float", - description="The total amount to pay", # in Ether - ) - reasoning_arg = Argument( name="reasoning", type="string", description="Why you are making this payment", ) - args = [job_id_arg, amount_arg, reasoning_arg] + args = [job_id_arg, reasoning_arg] if hasattr(self, 'twitter_plugin') and self.twitter_plugin is not None: tweet_content_arg = Argument( @@ -506,46 +521,30 @@ def pay_job(self) -> Function: executable=self._pay_job_executable ) - def _pay_job_executable(self, job_id: int, amount: float, reasoning: str, tweet_content: Optional[str] = None) -> Tuple[FunctionResultStatus, str, dict]: + def _pay_job_executable(self, job_id: int, reasoning: str, tweet_content: Optional[str] = None) -> Tuple[FunctionResultStatus, str, dict]: if not job_id: return FunctionResultStatus.FAILED, "Missing job ID - specify which job you're paying for", {} - if not amount: - return FunctionResultStatus.FAILED, "Missing amount - specify how much you're paying", {} - if not reasoning: return FunctionResultStatus.FAILED, "Missing reasoning - explain why you're making this payment", {} try: - state = self.get_acp_state() - - job = next( - (c for c in state["jobs"]["active"]["as_a_buyer"] if c["job_id"] == job_id), - None - ) + job = self.acp_client.get_job_by_onchain_id(job_id) if not job: return FunctionResultStatus.FAILED, "Job not found in your buyer jobs - check the ID and verify you're the buyer", {} - if job["phase"] != AcpJobPhasesDesc.NEGOTIATION: - return FunctionResultStatus.FAILED, f"Cannot pay - job is in '{job['phase']}' phase, must be in 'negotiation' phase", {} + if job.phase != ACPJobPhase.NEGOTIATION: + return FunctionResultStatus.FAILED, f"Cannot pay - job is in '{ACP_JOB_PHASE_MAP.get(job.phase)}' phase, must be in 'negotiation' phase", {} - - self.acp_client.pay_for_job( - job_id, - job["memo"][0]["id"], - amount, - reasoning - ) + job.pay(job.price, reasoning) if hasattr(self, 'twitter_plugin') and self.twitter_plugin is not None and tweet_content is not None: - tweet_id = job.get("tweet_history", [])[0].get("tweet_id") if job.get("tweet_history") else None - if tweet_id: - self._tweet_job(job_id, tweet_content, tweet_id) + self._tweet_job(job, tweet_content) return FunctionResultStatus.DONE, json.dumps({ "job_id": job_id, - "amount_paid": amount, + "amount_paid": job.price, "timestamp": datetime.now().timestamp() }), {} except Exception as e: @@ -578,7 +577,7 @@ def deliver_job(self) -> Function: return Function( fn_name="deliver_job", - fn_description="Completes a sale by delivering items to the buyer", + fn_description="Deliver the requested result to the client. Use this function after producing the deliverable", args=args, executable=self._deliver_job_executable ) @@ -591,21 +590,16 @@ def _deliver_job_executable(self, job_id: int, reasoning: str, tweet_content: Op return FunctionResultStatus.FAILED, "Missing reasoning - explain why you're making this delivery", {} try: - state = self.get_acp_state() - - job = next( - (c for c in state["jobs"]["active"]["as_a_seller"] if c["job_id"] == job_id), - None - ) + job = self.acp_client.get_job_by_onchain_id(job_id) if not job: return FunctionResultStatus.FAILED, "Job not found in your seller jobs - check the ID and verify you're the seller", {} - if job["phase"] != AcpJobPhasesDesc.TRANSACTION: - return FunctionResultStatus.FAILED, f"Cannot deliver - job is in '{job['phase']}' phase, must be in 'transaction' phase", {} + if job.phase != ACPJobPhase.TRANSACTION: + return FunctionResultStatus.FAILED, f"Cannot deliver - job is in '{ACP_JOB_PHASE_MAP.get(job.phase)}' phase, must be in 'transaction' phase", {} produced = next( - (i for i in self.produced_inventory if i.job_id == job["job_id"]), + (i for i in self.produced_inventory if i.job_id == job.id), None ) @@ -617,15 +611,10 @@ def _deliver_job_executable(self, job_id: int, reasoning: str, tweet_content: Op value=produced.value ) - self.acp_client.submit_job_deliverable( - job_id, - deliverable, - ) + job.deliver(deliverable) if hasattr(self, 'twitter_plugin') and self.twitter_plugin is not None and tweet_content is not None: - tweet_id = job.get("tweet_history", [])[0].get("tweet_id") if job.get("tweet_history") else None - if tweet_id: - self._tweet_job(job_id, tweet_content, tweet_id) + self._tweet_job(job, tweet_content) return FunctionResultStatus.DONE, json.dumps({ "status": "success", @@ -637,23 +626,17 @@ def _deliver_job_executable(self, job_id: int, reasoning: str, tweet_content: Op print(traceback.format_exc()) return FunctionResultStatus.FAILED, f"System error while delivering items - try again after a short delay. {str(e)}", {} - def _tweet_job(self, job_id: int, content: str, tweet_id: Optional[str] = None): - if not hasattr(self, 'twitter_plugin') or self.twitter_plugin is None: - return - - job = self.acp_client.get_job_by_onchain_id(job_id) - if not job: - raise Exception("ERROR (tweetJob): Job not found") - - - if tweet_id : + def _tweet_job(self, job: ACPJob, tweet_content: str): + tweets = job.context.get("tweets", []) if job.context else [] + if tweets: + latest_tweet = max(tweets, key=lambda t: t["createdAt"]) + tweet_id = latest_tweet.get("tweetId", None) response = self.twitter_plugin.twitter_client.create_tweet( - text=content, + text=tweet_content, in_reply_to_tweet_id=tweet_id ) else: - response = self.twitter_plugin.twitter_client.create_tweet(text=content) - + response = self.twitter_plugin.twitter_client.create_tweet(text=tweet_content) role = "buyer" if job.client_address.lower() == self.acp_client.agent_address.lower() else "seller" @@ -661,7 +644,7 @@ def _tweet_job(self, job_id: int, content: str, tweet_id: Optional[str] = None): tweet_id = None if isinstance(response, dict): tweet_id = response.get('data', {}).get('id') or response.get('id') - + context = { **(job.context or {}), 'tweets': [ @@ -669,14 +652,14 @@ def _tweet_job(self, job_id: int, content: str, tweet_id: Optional[str] = None): { 'type': role, 'tweetId': tweet_id, - 'content': content, + 'content': tweet_content, 'createdAt': int(datetime.now().timestamp() * 1000) }, ], } response = requests.patch( - f"{self.acp_base_url}/jobs/{job_id}/context", + f"{self.acp_base_url}/jobs/{job.id}/context", headers={ "Content-Type": "application/json", "wallet-address": self.acp_client.agent_address, diff --git a/plugins/acp/acp_plugin_gamesdk/interface.py b/plugins/acp/acp_plugin_gamesdk/interface.py index eade02c..702396b 100644 --- a/plugins/acp/acp_plugin_gamesdk/interface.py +++ b/plugins/acp/acp_plugin_gamesdk/interface.py @@ -1,6 +1,6 @@ from dataclasses import dataclass from enum import Enum -from typing import Optional, List, Literal, Dict, Any +from typing import Optional, List, Literal, Dict, Any, Union from pydantic import BaseModel from virtuals_acp.models import ACPJobPhase, IDeliverable @@ -58,7 +58,7 @@ class IAcpJob(BaseModel): job_id: Optional[int] client_name: Optional[str] provider_name: Optional[str] - desc: str + desc: Union[str, Dict[str, Any]] price: str provider_address: Optional[str] phase: AcpJobPhasesDesc diff --git a/plugins/acp/examples/agentic/buyer.py b/plugins/acp/examples/agentic/buyer.py index b19d4c2..b667cea 100644 --- a/plugins/acp/examples/agentic/buyer.py +++ b/plugins/acp/examples/agentic/buyer.py @@ -143,7 +143,7 @@ def post_tweet(content: str, reasoning: str) -> Tuple[FunctionResultStatus, str, while True: print("🟢"*40) agent.step() - state = from_dict(data_class=AcpState, data=agent.agent_state, config=Config(type_hooks={AcpJobPhasesDesc: AcpJobPhasesDesc})) + state = AcpState.model_validate(agent.agent_state) print(Panel(f"{state}", title="Agent State", box=box.ROUNDED, title_align="left")) print("🔴"*40) input("\nPress any key to continue...\n") diff --git a/plugins/acp/examples/reactive/buyer.py b/plugins/acp/examples/reactive/buyer.py index de2f5c9..538ecd0 100644 --- a/plugins/acp/examples/reactive/buyer.py +++ b/plugins/acp/examples/reactive/buyer.py @@ -1,14 +1,15 @@ import threading +from collections import deque -from typing import Tuple -from game_sdk.game.agent import Agent, WorkerConfig +from typing import Tuple, Optional, Deque +from game_sdk.game.agent import Agent from game_sdk.game.custom_types import Argument, Function, FunctionResultStatus from acp_plugin_gamesdk.interface import AcpState, to_serializable_dict from acp_plugin_gamesdk.acp_plugin import AcpPlugin, AcpPluginOptions from acp_plugin_gamesdk.env import PluginEnvSettings from virtuals_acp.client import VirtualsACP -from virtuals_acp import ACPJob, ACPJobPhase +from virtuals_acp import ACPJob, ACPJobPhase, ACPMemo from virtuals_acp.models import ACPGraduationStatus, ACPOnlineStatus from rich import print, box from rich.panel import Panel @@ -24,14 +25,6 @@ env = PluginEnvSettings() -def on_evaluate(job: ACPJob): - for memo in job.memos: - if memo.next_phase == ACPJobPhase.COMPLETED: - print(f"Evaluating deliverable for job {job.id}") - # Auto-accept all deliverables for this example - job.evaluate(True) - break - # GAME Twitter Plugin options options = { "id": "twitter_plugin", @@ -64,19 +57,19 @@ def buyer(use_thread_lock: bool = True): return # Thread-safe job queue setup - job_queue = [] + job_queue: Deque[Tuple[ACPJob, Optional[ACPMemo]]] = deque() job_queue_lock = threading.Lock() job_event = threading.Event() # Thread-safe append with optional lock - def safe_append_job(job): + def safe_append_job(job: ACPJob, memo_to_sign: Optional[ACPMemo] = None): if use_thread_lock: print(f"[safe_append_job] Acquiring lock to append job {job.id}") with job_queue_lock: print(f"[safe_append_job] Lock acquired, appending job {job.id} to queue") - job_queue.append(job) + job_queue.append((job, memo_to_sign)) else: - job_queue.append(job) + job_queue.append((job, memo_to_sign)) # Thread-safe pop with optional lock def safe_pop_job(): @@ -84,19 +77,20 @@ def safe_pop_job(): print(f"[safe_pop_job] Acquiring lock to pop job") with job_queue_lock: if job_queue: - job = job_queue.pop(0) + job, memo_to_sign = job_queue.popleft() print(f"[safe_pop_job] Lock acquired, popped job {job.id}") - return job + return job, memo_to_sign else: print("[safe_pop_job] Queue is empty after acquiring lock") + return None, None else: if job_queue: - job = job_queue.pop(0) + job, memo_to_sign = job_queue.popleft() print(f"[safe_pop_job] Popped job {job.id} without lock") - return job + return job, memo_to_sign else: print("[safe_pop_job] Queue is empty (no lock)") - return None + return None, None # Background thread worker: process jobs one by one def job_worker(): @@ -105,11 +99,11 @@ def job_worker(): # Process all available jobs while True: - job = safe_pop_job() + job, memo_to_sign = safe_pop_job() if not job: break try: - process_job(job) + process_job(job, memo_to_sign) except Exception as e: print(f"❌ Error processing job: {e}") # Continue processing other jobs even if one fails @@ -124,23 +118,34 @@ def job_worker(): job_event.clear() # Event-triggered job task receiver - def on_new_task(job: ACPJob): + def on_new_task(job: ACPJob, memo_to_sign: ACPMemo): print(f"[on_new_task] Received job {job.id} (phase: {job.phase})") + safe_append_job(job, memo_to_sign) + job_event.set() + + def on_evaluate(job: ACPJob): + print(f"[on_evaluate] Received job {job.id}") safe_append_job(job) job_event.set() - def process_job(job: ACPJob): + def process_job(job: ACPJob, memo_to_sign: Optional[ACPMemo]): out = "" print(job.phase, "job.phase") - if job.phase == ACPJobPhase.NEGOTIATION: - for memo in job.memos: - print(memo.next_phase, "memo.next_phase") - if memo.next_phase == ACPJobPhase.TRANSACTION: - out += f"Buyer agent is reacting to job:\n{job}\n\n" - buyer_agent.get_worker("acp_worker").run( - f"Respond to the following transaction: {job}", - ) - out += "Buyer agent has responded to the job\n" + if ( + job.phase == ACPJobPhase.NEGOTIATION and + memo_to_sign is not None and + memo_to_sign.next_phase == ACPJobPhase.TRANSACTION + ): + out += f"Buyer agent is reacting to job:\n{job}\n\n" + buyer_agent.get_worker("acp_worker").run( + f"Respond to the following transaction: {job}", + ) + out += "Buyer agent has responded to the job\n" + elif job.phase == ACPJobPhase.EVALUATION: + out += f"Buyer agent is evaluating to job:\n{job}\n\n" + # Auto-accept all deliverables for this example + job.evaluate(True) + out += f"Buyer agent has evaluated the job:\n{job}\n\n" print(Panel(out, title="🔁 Reaction", box=box.ROUNDED, title_align="left", border_style="red")) @@ -154,10 +159,10 @@ def process_job(job: ACPJob): on_new_task=on_new_task, entity_id=env.BUYER_ENTITY_ID ), - twitter_plugin=TwitterPlugin(options), cluster="", #example cluster graduation_status=ACPGraduationStatus.ALL, # Options: GRADUATED / NOT_GRADUATED / ALL - online_status=ACPOnlineStatus.ALL # Options: ONLINE / OFFLINE / ALL + online_status=ACPOnlineStatus.ALL, # Options: ONLINE / OFFLINE / ALL + twitter_plugin=TwitterPlugin(options), ) ) @@ -167,44 +172,36 @@ def get_agent_state(_: None, _e: None) -> dict: return state_dict def post_tweet(content: str, reasoning: str) -> Tuple[FunctionResultStatus, str, dict]: - return FunctionResultStatus.DONE, "Tweet has been posted", {} - # if acp_plugin.twitter_plugin is not None: - # post_tweet_fn = acp_plugin.twitter_plugin.get_function('post_tweet') - # post_tweet_fn(content) - # return FunctionResultStatus.DONE, "Tweet has been posted", {} - - # return FunctionResultStatus.FAILED, "Twitter plugin is not initialized", {} - - core_worker = WorkerConfig( - id="core-worker", - worker_description="This worker is to post tweet", - action_space=[ - Function( - fn_name="post_tweet", - fn_description="This function is to post tweet", - args=[ - Argument( - name="content", - type="string", - description="The content of the tweet" - ), - Argument( - name="reasoning", - type="string", - description="The reasoning of the tweet" - ) - ], - executable=post_tweet + if acp_plugin.twitter_plugin is not None: + acp_plugin.twitter_plugin.twitter_client.create_tweet(text=content) + return FunctionResultStatus.DONE, "Tweet has been posted", {} + + return FunctionResultStatus.FAILED, "Twitter plugin is not initialized", {} + + post_tweet_fn = Function( + fn_name="post_tweet", + fn_description="This function is to post tweet", + args=[ + Argument( + name="content", + type="string", + description="The content of the tweet" + ), + Argument( + name="reasoning", + type="string", + description="The reasoning of the tweet" ) ], - get_state_fn=get_agent_state + executable=post_tweet ) acp_worker = acp_plugin.get_worker( { "functions": [ acp_plugin.search_agents_functions, - acp_plugin.initiate_job + acp_plugin.initiate_job, + post_tweet_fn ] } ) @@ -220,7 +217,7 @@ def post_tweet(content: str, reasoning: str) -> Tuple[FunctionResultStatus, str, {acp_plugin.agent_description} """, - workers=[core_worker, acp_worker], + workers=[acp_worker], get_agent_state_fn=get_agent_state ) diff --git a/plugins/acp/examples/reactive/seller.py b/plugins/acp/examples/reactive/seller.py index 8840888..8dea2bb 100644 --- a/plugins/acp/examples/reactive/seller.py +++ b/plugins/acp/examples/reactive/seller.py @@ -1,11 +1,11 @@ import threading -from typing import Tuple +from typing import Tuple, Optional, Deque from acp_plugin_gamesdk.acp_plugin import AcpPlugin, AcpPluginOptions from acp_plugin_gamesdk.interface import AcpState, IInventory, to_serializable_dict from acp_plugin_gamesdk.env import PluginEnvSettings from virtuals_acp.client import VirtualsACP -from virtuals_acp import ACPJob, ACPJobPhase +from virtuals_acp import ACPJob, ACPJobPhase, ACPMemo from game_sdk.game.custom_types import Argument, Function, FunctionResultStatus from game_sdk.game.agent import Agent from collections import deque @@ -56,20 +56,20 @@ def seller(use_thread_lock: bool = True): return # Thread-safe job queue setup - job_queue = deque() + job_queue: Deque[Tuple[ACPJob, Optional[ACPMemo]]] = deque() job_queue_lock = threading.Lock() job_event = threading.Event() # Thread-safe append wrapper - def safe_append_job(job): + def safe_append_job(job: ACPJob, memo_to_sign: Optional[ACPMemo] = None): if use_thread_lock: print("[append] Attempting to acquire job_queue_lock") with job_queue_lock: print("[append] Lock acquired. Appending job to queue:", job.id) - job_queue.append(job) + job_queue.append((job, memo_to_sign)) print(f"[append] Queue size is now {len(job_queue)}") else: - job_queue.append(job) + job_queue.append((job, memo_to_sign)) print(f"[append] Appended job (no lock). Queue size is now {len(job_queue)}") # Thread-safe pop wrapper @@ -79,19 +79,20 @@ def safe_pop_job(): with job_queue_lock: print("[pop] Lock acquired.") if job_queue: - job = job_queue.popleft() + job, memo_to_sign = job_queue.popleft() print(f"[pop] Job popped: {job.id}") - return job + return job, memo_to_sign else: print("[pop] Queue is empty.") + return None, None else: if job_queue: - job = job_queue.popleft() + job, memo_to_sign = job_queue.popleft() print(f"[pop] Job popped (no lock): {job.id}") - return job + return job, memo_to_sign else: print("[pop] Queue is empty (no lock).") - return None + return None, None # Background thread worker: process jobs one by one def job_worker(): @@ -100,11 +101,11 @@ def job_worker(): # Process all available jobs while True: - job = safe_pop_job() + job, memo_to_sign = safe_pop_job() if not job: break try: - process_job(job) + process_job(job, memo_to_sign) except Exception as e: print(f"❌ Error processing job: {e}") # Continue processing other jobs even if one fails @@ -119,31 +120,35 @@ def job_worker(): job_event.clear() # Event-triggered job task receiver - def on_new_task(job: ACPJob): + def on_new_task(job: ACPJob, memo_to_sign: ACPMemo): print(f"[on_new_task] New job received: {job.id}") - safe_append_job(job) + safe_append_job(job, memo_to_sign) job_event.set() print("[on_new_task] job_event set.") - def process_job(job: ACPJob): + def process_job(job: ACPJob, memo_to_sign: Optional[ACPMemo]): out = "" out += f"Reacting to job:\n{job}\n\n" prompt = "" - if job.phase == ACPJobPhase.REQUEST: - for memo in job.memos: - if memo.next_phase == ACPJobPhase.NEGOTIATION: - prompt = f""" + if ( + job.phase == ACPJobPhase.REQUEST and + memo_to_sign is not None and + memo_to_sign.next_phase == ACPJobPhase.NEGOTIATION + ): + prompt = f""" Respond to the following transaction: {job} decide whether you should accept the job or not. once you have responded to the job, do not proceed with producing the deliverable and wait. """ - elif job.phase == ACPJobPhase.TRANSACTION: - for memo in job.memos: - if memo.next_phase == ACPJobPhase.EVALUATION: - prompt = f""" + elif ( + job.phase == ACPJobPhase.TRANSACTION and + memo_to_sign is not None and + memo_to_sign.next_phase == ACPJobPhase.EVALUATION + ): + prompt = f""" Respond to the following transaction: {job} @@ -206,7 +211,7 @@ def generate_meme(description: str, job_id: int, reasoning: str) -> Tuple[Functi acp_plugin.add_produce_item(meme) - return FunctionResultStatus.DONE, f"Meme generated with the URL: {url}", {} + return FunctionResultStatus.DONE, f"Meme generated with the URL: {url}, next step is to deliver it to the client.", {} generate_meme_function = Function( fn_name="generate_meme", diff --git a/plugins/acp/pyproject.toml b/plugins/acp/pyproject.toml index 9c7e7e5..1ea99d6 100644 --- a/plugins/acp/pyproject.toml +++ b/plugins/acp/pyproject.toml @@ -12,7 +12,7 @@ game-sdk = ">=0.1.5" python-dotenv = "^1.1.0" dacite = "^1.9.2" rich = ">=13.9.4,<15.0.0" -virtuals-acp = "^0.1.22" +virtuals-acp = "^0.2.6" [build-system] requires = ["poetry-core"]