Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 94 additions & 111 deletions plugins/acp/acp_plugin_gamesdk/acp_plugin.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
import ast
import json
import traceback
from contextlib import suppress
from dataclasses import dataclass
from datetime import datetime, timezone, timedelta
from typing import List, Dict, Any, Optional,Tuple
from typing import List, Dict, Any, Optional, Tuple, Union

import requests

from game_sdk.game.agent import WorkerConfig
from game_sdk.game.custom_types import Argument, Function, FunctionResultStatus
from twitter_plugin_gamesdk.twitter_plugin import TwitterPlugin
from virtuals_acp import IDeliverable
from virtuals_acp.models import ACPGraduationStatus, ACPOnlineStatus
from virtuals_acp.models import ACPGraduationStatus, ACPOnlineStatus, ACPJobPhase

from acp_plugin_gamesdk.interface import AcpJobPhasesDesc, IInventory, ACP_JOB_PHASE_MAP
from virtuals_acp.client import VirtualsACP
Expand Down Expand Up @@ -241,7 +243,11 @@ def _search_agents_executable(self, reasoning: str, keyword: str) -> Tuple[Funct
"wallet_address": agent.wallet_address,
"offerings": (
[
{"name": offering.type, "price": offering.price}
{
"name": offering.name,
"price": offering.price,
"requirement_schema": offering.requirement_schema,
}
for offering in agent.offerings
]
if agent.offerings
Expand Down Expand Up @@ -287,22 +293,20 @@ def initiate_job(self) -> Function:
description="The seller's agent wallet address you want to buy from",
)

price_arg = Argument(
name="price",
service_name_arg = Argument(
name="service_name",
type="string",
description="Offered price for service",
description="The service name you want to purchase",
)

reasoning_arg = Argument(
name="reasoning",
type="string",
description="Why you are making this purchase request",
)

service_requirements_arg = Argument(
name="service_requirements",
type="string",
description="Detailed specifications for service-based items",
service_requirement_arg = Argument(
name="service_requirement",
type="string or python dictionary",
description="""
Detailed specifications for service-based items,
give in python dictionary (NOT WRAPPED WITH '', just python dictionary as is) that MATCHES the service's
requirement schema if required, else give in the requirement in a natural language sentence.
""",
)

require_evaluation_arg = Argument(
Expand All @@ -317,7 +321,20 @@ def initiate_job(self) -> Function:
description="Keyword to search for a evaluator",
)

args = [seller_wallet_address_arg, price_arg, reasoning_arg, service_requirements_arg, require_evaluation_arg, evaluator_keyword_arg]
reasoning_arg = Argument(
name="reasoning",
type="string",
description="Why you are making this purchase request",
)

args = [
seller_wallet_address_arg,
service_name_arg,
service_requirement_arg,
require_evaluation_arg,
evaluator_keyword_arg,
reasoning_arg
]

if hasattr(self, 'twitter_plugin') and self.twitter_plugin is not None:
tweet_content_arg = Argument(
Expand All @@ -334,16 +351,13 @@ def initiate_job(self) -> Function:
executable=self._initiate_job_executable
)

def _initiate_job_executable(self, seller_wallet_address: str, price: str, reasoning: str, service_requirements: str, require_evaluation: str, evaluator_keyword: str, tweet_content: Optional[str] = None) -> Tuple[FunctionResultStatus, str, dict]:
def _initiate_job_executable(self, seller_wallet_address: str, service_name: str, service_requirement: Union[str, Dict[str, Any]], require_evaluation: str, evaluator_keyword: str, reasoning: str, tweet_content: Optional[str] = None) -> Tuple[FunctionResultStatus, str, dict]:
if isinstance(require_evaluation, str):
require_evaluation = require_evaluation.lower() == 'true'
elif isinstance(require_evaluation, bool):
require_evaluation = require_evaluation
else:
require_evaluation = False

if not price:
return FunctionResultStatus.FAILED, "Missing price - specify how much you're offering per unit", {}

if not reasoning:
return FunctionResultStatus.FAILED, "Missing reasoning - explain why you're making this purchase request", {}
Expand All @@ -364,29 +378,48 @@ def _initiate_job_executable(self, seller_wallet_address: str, price: str, reaso
return FunctionResultStatus.FAILED, "No evaluator found - try a different keyword", {}

evaluator_address = validators[0].wallet_address

expired_at = datetime.now(timezone.utc) + timedelta(minutes=self.job_expiry_duration_mins)
job_id = self.acp_client.initiate_job(
seller_wallet_address,
service_requirements,
float(price),

agent = self.acp_client.get_agent(seller_wallet_address)
offering = next(
(o for o in agent.offerings if o.name == service_name),
None
)

if offering is None:
return (
FunctionResultStatus.FAILED,
f"No offering found with name '{service_name}', available offerings are: {', '.join(str(agent.offerings))}",
{}
)

if isinstance(service_requirement, str):
# failsafe if GAME still passes in dictionary wrapped with quotes and treated as a str
if (sr := service_requirement.strip()).startswith("{") and sr.endswith("}"):
with suppress(ValueError, SyntaxError):
service_requirement = ast.literal_eval(sr)

job_id = offering.initiate_job(
service_requirement,
evaluator_address,
expired_at
)
job = self.acp_client.get_job_by_onchain_id(job_id)

if hasattr(self, 'twitter_plugin') and self.twitter_plugin is not None and tweet_content is not None:
self._tweet_job(job_id, f"{tweet_content} #{job_id}")
self._tweet_job(job, f"{tweet_content} #{job_id}")

return FunctionResultStatus.DONE, json.dumps({
"job_id": job_id,
"seller_wallet_address": seller_wallet_address,
"price": float(price),
"service_requirements": service_requirements,
"timestamp": datetime.now().timestamp(),
"service_name": service_name,
"service_requirements": service_requirement,
"timestamp": datetime.now().timestamp()
}), {}
except Exception as e:
print(traceback.format_exc())
return FunctionResultStatus.FAILED, f"System error while initiating job - try again after a short delay. {str(e)}", {}
return FunctionResultStatus.FAILED, f"Error while initiating job - try initiating the job again.\n{str(e)}", {}

@property
def respond_job(self) -> Function:
Expand Down Expand Up @@ -436,30 +469,18 @@ def _respond_job_executable(self, job_id: int, decision: str, reasoning: str, tw
return FunctionResultStatus.FAILED, "Missing reasoning - explain why you made this decision", {}

try:
state = self.get_acp_state()

job = next(
(c for c in state["jobs"]["active"]["as_a_seller"] if c["job_id"] == job_id),
None
)
job = self.acp_client.get_job_by_onchain_id(job_id)

if not job:
return FunctionResultStatus.FAILED, "Job not found in your seller jobs - check the ID and verify you're the seller", {}

if job["phase"] != AcpJobPhasesDesc.REQUEST:
return FunctionResultStatus.FAILED, f"Cannot respond - job is in '{job['phase']}' phase, must be in 'request' phase", {}
if job.phase != ACPJobPhase.REQUEST:
return FunctionResultStatus.FAILED, f"Cannot respond - job is in '{ACP_JOB_PHASE_MAP.get(job.phase)}' phase, must be in 'request' phase", {}

self.acp_client.respond_to_job_memo(
job_id,
job["memo"][0]["id"],
decision == "ACCEPT",
reasoning
)
job.respond(decision == "ACCEPT", None, reasoning)

if hasattr(self, 'twitter_plugin') and self.twitter_plugin is not None and tweet_content is not None:
tweet_id = job.get("tweet_history", [])[0].get("tweet_id") if job.get("tweet_history") else None
if tweet_id:
self._tweet_job(job_id, tweet_content, tweet_id)
self._tweet_job(job, tweet_content)

return FunctionResultStatus.DONE, json.dumps({
"job_id": job_id,
Expand All @@ -477,19 +498,13 @@ def pay_job(self) -> Function:
description="The job ID you are paying for",
)

amount_arg = Argument(
name="amount",
type="float",
description="The total amount to pay", # in Ether
)

reasoning_arg = Argument(
name="reasoning",
type="string",
description="Why you are making this payment",
)

args = [job_id_arg, amount_arg, reasoning_arg]
args = [job_id_arg, reasoning_arg]

if hasattr(self, 'twitter_plugin') and self.twitter_plugin is not None:
tweet_content_arg = Argument(
Expand All @@ -506,46 +521,30 @@ def pay_job(self) -> Function:
executable=self._pay_job_executable
)

def _pay_job_executable(self, job_id: int, amount: float, reasoning: str, tweet_content: Optional[str] = None) -> Tuple[FunctionResultStatus, str, dict]:
def _pay_job_executable(self, job_id: int, reasoning: str, tweet_content: Optional[str] = None) -> Tuple[FunctionResultStatus, str, dict]:
if not job_id:
return FunctionResultStatus.FAILED, "Missing job ID - specify which job you're paying for", {}

if not amount:
return FunctionResultStatus.FAILED, "Missing amount - specify how much you're paying", {}

if not reasoning:
return FunctionResultStatus.FAILED, "Missing reasoning - explain why you're making this payment", {}

try:
state = self.get_acp_state()

job = next(
(c for c in state["jobs"]["active"]["as_a_buyer"] if c["job_id"] == job_id),
None
)
job = self.acp_client.get_job_by_onchain_id(job_id)

if not job:
return FunctionResultStatus.FAILED, "Job not found in your buyer jobs - check the ID and verify you're the buyer", {}

if job["phase"] != AcpJobPhasesDesc.NEGOTIATION:
return FunctionResultStatus.FAILED, f"Cannot pay - job is in '{job['phase']}' phase, must be in 'negotiation' phase", {}
if job.phase != ACPJobPhase.NEGOTIATION:
return FunctionResultStatus.FAILED, f"Cannot pay - job is in '{ACP_JOB_PHASE_MAP.get(job.phase)}' phase, must be in 'negotiation' phase", {}


self.acp_client.pay_for_job(
job_id,
job["memo"][0]["id"],
amount,
reasoning
)
job.pay(job.price, reasoning)

if hasattr(self, 'twitter_plugin') and self.twitter_plugin is not None and tweet_content is not None:
tweet_id = job.get("tweet_history", [])[0].get("tweet_id") if job.get("tweet_history") else None
if tweet_id:
self._tweet_job(job_id, tweet_content, tweet_id)
self._tweet_job(job, tweet_content)

return FunctionResultStatus.DONE, json.dumps({
"job_id": job_id,
"amount_paid": amount,
"amount_paid": job.price,
"timestamp": datetime.now().timestamp()
}), {}
except Exception as e:
Expand Down Expand Up @@ -578,7 +577,7 @@ def deliver_job(self) -> Function:

return Function(
fn_name="deliver_job",
fn_description="Completes a sale by delivering items to the buyer",
fn_description="Deliver the requested result to the client. Use this function after producing the deliverable",
args=args,
executable=self._deliver_job_executable
)
Expand All @@ -591,21 +590,16 @@ def _deliver_job_executable(self, job_id: int, reasoning: str, tweet_content: Op
return FunctionResultStatus.FAILED, "Missing reasoning - explain why you're making this delivery", {}

try:
state = self.get_acp_state()

job = next(
(c for c in state["jobs"]["active"]["as_a_seller"] if c["job_id"] == job_id),
None
)
job = self.acp_client.get_job_by_onchain_id(job_id)

if not job:
return FunctionResultStatus.FAILED, "Job not found in your seller jobs - check the ID and verify you're the seller", {}

if job["phase"] != AcpJobPhasesDesc.TRANSACTION:
return FunctionResultStatus.FAILED, f"Cannot deliver - job is in '{job['phase']}' phase, must be in 'transaction' phase", {}
if job.phase != ACPJobPhase.TRANSACTION:
return FunctionResultStatus.FAILED, f"Cannot deliver - job is in '{ACP_JOB_PHASE_MAP.get(job.phase)}' phase, must be in 'transaction' phase", {}

produced = next(
(i for i in self.produced_inventory if i.job_id == job["job_id"]),
(i for i in self.produced_inventory if i.job_id == job.id),
None
)

Expand All @@ -617,15 +611,10 @@ def _deliver_job_executable(self, job_id: int, reasoning: str, tweet_content: Op
value=produced.value
)

self.acp_client.submit_job_deliverable(
job_id,
deliverable,
)
job.deliver(deliverable)

if hasattr(self, 'twitter_plugin') and self.twitter_plugin is not None and tweet_content is not None:
tweet_id = job.get("tweet_history", [])[0].get("tweet_id") if job.get("tweet_history") else None
if tweet_id:
self._tweet_job(job_id, tweet_content, tweet_id)
self._tweet_job(job, tweet_content)

return FunctionResultStatus.DONE, json.dumps({
"status": "success",
Expand All @@ -637,46 +626,40 @@ def _deliver_job_executable(self, job_id: int, reasoning: str, tweet_content: Op
print(traceback.format_exc())
return FunctionResultStatus.FAILED, f"System error while delivering items - try again after a short delay. {str(e)}", {}

def _tweet_job(self, job_id: int, content: str, tweet_id: Optional[str] = None):
if not hasattr(self, 'twitter_plugin') or self.twitter_plugin is None:
return

job = self.acp_client.get_job_by_onchain_id(job_id)
if not job:
raise Exception("ERROR (tweetJob): Job not found")


if tweet_id :
def _tweet_job(self, job: ACPJob, tweet_content: str):
tweets = job.context.get("tweets", []) if job.context else []
if tweets:
latest_tweet = max(tweets, key=lambda t: t["createdAt"])
tweet_id = latest_tweet.get("tweetId", None)
response = self.twitter_plugin.twitter_client.create_tweet(
text=content,
text=tweet_content,
in_reply_to_tweet_id=tweet_id
)
else:
response = self.twitter_plugin.twitter_client.create_tweet(text=content)

response = self.twitter_plugin.twitter_client.create_tweet(text=tweet_content)

role = "buyer" if job.client_address.lower() == self.acp_client.agent_address.lower() else "seller"

# Safely extract tweet ID
tweet_id = None
if isinstance(response, dict):
tweet_id = response.get('data', {}).get('id') or response.get('id')

context = {
**(job.context or {}),
'tweets': [
*((job.context or {}).get('tweets', [])),
{
'type': role,
'tweetId': tweet_id,
'content': content,
'content': tweet_content,
'createdAt': int(datetime.now().timestamp() * 1000)
},
],
}

response = requests.patch(
f"{self.acp_base_url}/jobs/{job_id}/context",
f"{self.acp_base_url}/jobs/{job.id}/context",
headers={
"Content-Type": "application/json",
"wallet-address": self.acp_client.agent_address,
Expand Down
Loading