From f2d9e0de9534f1c3e8912bbfdef25e281f1e5315 Mon Sep 17 00:00:00 2001 From: AnishRane Date: Sun, 30 Mar 2025 00:26:29 +0530 Subject: [PATCH 01/23] add: dpsn plugin code --- .../dpsn/examples/test_dpsn_game_functions.py | 95 +++++++++++++++++++ 1 file changed, 95 insertions(+) create mode 100644 plugins/dpsn/examples/test_dpsn_game_functions.py diff --git a/plugins/dpsn/examples/test_dpsn_game_functions.py b/plugins/dpsn/examples/test_dpsn_game_functions.py new file mode 100644 index 00000000..8dc3c75d --- /dev/null +++ b/plugins/dpsn/examples/test_dpsn_game_functions.py @@ -0,0 +1,95 @@ +import asyncio +import sys +import os +from pathlib import Path + +# Add the parent directory to Python path +parent_dir = str(Path(__file__).parent.parent) +sys.path.append(parent_dir) + +from dpsn_plugin_gamesdk.dpsn_plugin import plugin +import json +from datetime import datetime + +async def test_dpsn_connection(): + """Test DPSN connection and basic functionality""" + print("\nšŸ”„ Testing DPSN Connection...") + + # Initialize DPSN client + result = await plugin.initialize() + if not result["success"]: + print(f"āŒ Failed to initialize DPSN: {result.get('error')}") + return False + + print("āœ… DPSN initialized successfully") + return True + +async def test_subscribe_and_receive(): + """Test subscribing to topics and receiving messages""" + print("\nšŸ”„ Testing Subscription and Message Reception...") + + # Test topic + topic = "0xe14768a6d8798e4390ec4cb8a4c991202c2115a5cd7a6c0a7ababcaf93b4d2d4/SOLUSDT/ticker" + + # Subscribe to topic + result = await plugin.subscribe(topic) + if not result["success"]: + print(f"āŒ Failed to subscribe to topic: {result.get('error')}") + return False + + print(f"āœ… Subscribed to topic: {topic}") + + # Wait for some messages + print("ā³ Waiting for messages (2 seconds)...") + await asyncio.sleep(2) + + + # Check received messages + # messages = plugin.get_messages() + print(f"\nReceived {len(messages)} messages:") + for msg in messages: + print(f"Topic: {msg['topic']}") + print(f"Payload: {msg['payload']}") + print(f"Timestamp: {msg['timestamp']}\n") + + return True + +async def test_shutdown(): + """Test graceful shutdown""" + print("\nšŸ”„ Testing Shutdown...") + + result = await plugin.shutdown() + if not result["success"]: + print(f"āŒ Failed to shutdown: {result.get('error')}") + return False + + print("āœ… Shutdown successful") + return True + +async def main(): + """Main test function""" + print("šŸš€ Starting DPSN Plugin Tests...") + + try: + # Test connection + if not await test_dpsn_connection(): + return + + # Test subscription and message reception + if not await test_subscribe_and_receive(): + return + + # Test shutdown + if not await test_shutdown(): + return + + print("\n✨ All tests completed successfully!") + + except Exception as e: + print(f"\nāŒ Test failed with error: {str(e)}") + finally: + # Ensure we shutdown properly + await plugin.shutdown() + +if __name__ == "__main__": + asyncio.run(main()) From ac90b3b60d87e00add593d4a6a4feadf44e65903 Mon Sep 17 00:00:00 2001 From: AnishRane Date: Sun, 30 Mar 2025 00:30:37 +0530 Subject: [PATCH 02/23] add: dpsn plugin code --- plugins/dpsn/README.md | 0 plugins/dpsn/__init__.py | 0 .../dpsn/dpsn_plugin_gamesdk/dpsn_plugin.py | 94 +++++++++++++++++++ plugins/dpsn/plugin_metadata.yml | 0 plugins/dpsn/pyproject.toml | 14 +++ 5 files changed, 108 insertions(+) create mode 100644 plugins/dpsn/README.md create mode 100644 plugins/dpsn/__init__.py create mode 100644 plugins/dpsn/dpsn_plugin_gamesdk/dpsn_plugin.py create mode 100644 plugins/dpsn/plugin_metadata.yml create mode 100644 plugins/dpsn/pyproject.toml diff --git a/plugins/dpsn/README.md b/plugins/dpsn/README.md new file mode 100644 index 00000000..e69de29b diff --git a/plugins/dpsn/__init__.py b/plugins/dpsn/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/plugins/dpsn/dpsn_plugin_gamesdk/dpsn_plugin.py b/plugins/dpsn/dpsn_plugin_gamesdk/dpsn_plugin.py new file mode 100644 index 00000000..81190424 --- /dev/null +++ b/plugins/dpsn/dpsn_plugin_gamesdk/dpsn_plugin.py @@ -0,0 +1,94 @@ +import asyncio +import os +from dotenv import load_dotenv +from dpsn_client.client import DpsnClient, DPSNError +from datetime import datetime + +# Load .env variables +load_dotenv() + +class DpsnPlugin: + """ + DPSN Plugin for handling DPSN client connections and message handling + """ + + def __init__(self): + self.dpsn_url = os.getenv("DPSN_URL") + self.pvt_key = os.getenv("PVT_KEY") + self.client = None + self.messages = [] + + async def initialize(self): + """Initialize the DPSN client with default configuration""" + if not self.dpsn_url or not self.pvt_key: + return {"success": False, "error": "DPSN_URL and PVT_KEY must be set in .env file"} + + chain_options = { + "network": "testnet", + "wallet_chain_type": "ethereum" + } + + connection_options = { + "ssl": True, + "connect_timeout": 5000 + } + + try: + self.client = DpsnClient( + dpsn_url=self.dpsn_url, + private_key=self.pvt_key, + chain_options=chain_options, + connection_options=connection_options + ) + + self.client.event_bus.on("connected", self._on_connected) + self.client.event_bus.on("subscribe", self._on_subscribe) + self.client.event_bus.on("message", self._on_message) + + await self.client.init() + return {"success": self.client.connected} + + except DPSNError as e: + return {"success": False, "error": f"DPSNError: {str(e)}"} + + async def subscribe(self, topic: str): + """Subscribe to a specific topic""" + if not self.client or not self.client.connected: + return {"success": False, "error": "Client not initialized or not connected"} + + try: + await self.client.subscribe(topic) + return {"success": True, "topic": topic} + except DPSNError as e: + return {"success": False, "error": f"Subscription error: {str(e)}"} + + async def shutdown(self): + """Shutdown the DPSN client""" + if self.client: + await self.client.shutdown() + return {"success": True} + return {"success": False, "error": "No client to shutdown"} + + async def _on_connected(self): + """Default connected event handler""" + print("āœ… Connected to DPSN") + + async def _on_subscribe(self, info): + """Default subscribe event handler""" + print(f"šŸ“„ Subscribed to {info['topic']} (QoS:{info['qos']})") + + async def _on_message(self, msg): + """Default message event handler""" + message_data = { + "topic": msg["topic"], + "payload": msg["payload"], + "timestamp": datetime.now().isoformat() + } + self.messages.append(message_data) + return msg + # print(f"šŸ“Ø Message received -> {msg['topic']} : {msg['payload']}") + + +# Create plugin instance +plugin = DpsnPlugin() + \ No newline at end of file diff --git a/plugins/dpsn/plugin_metadata.yml b/plugins/dpsn/plugin_metadata.yml new file mode 100644 index 00000000..e69de29b diff --git a/plugins/dpsn/pyproject.toml b/plugins/dpsn/pyproject.toml new file mode 100644 index 00000000..b5a5ad88 --- /dev/null +++ b/plugins/dpsn/pyproject.toml @@ -0,0 +1,14 @@ +[project] +name = "dpsn-plugin-gamesdk" +version = "0.1.0" +description = "DPSN Plugin for Python SDK for GAME by Virtuals" +authors = [{ name = "dpsn-dev", email = "dev@dpsn.org" }] +readme = "README.md" +requires-python = ">=3.9" +dependencies = [ + "/Users/anishrane/workspace/dpsn-asyncio-py/asyncio-dpsn-client/dist/dpsn_client-0.1.0-py3-none-any.whl", +] + +[build-system] +requires = ["poetry-core>=2.0.0,<3.0.0"] +build-backend = "poetry.core.masonry.api" From 0f164b18ac40fc352288dc9d03ee3cbb61484c39 Mon Sep 17 00:00:00 2001 From: AnishRane Date: Sun, 30 Mar 2025 17:26:16 +0530 Subject: [PATCH 03/23] refactor: dpsn-plugin and example with message handler callback --- .../dpsn/examples/test_dpsn_game_functions.py | 29 +++++++++++-------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/plugins/dpsn/examples/test_dpsn_game_functions.py b/plugins/dpsn/examples/test_dpsn_game_functions.py index 8dc3c75d..3c661a02 100644 --- a/plugins/dpsn/examples/test_dpsn_game_functions.py +++ b/plugins/dpsn/examples/test_dpsn_game_functions.py @@ -24,10 +24,22 @@ async def test_dpsn_connection(): print("āœ… DPSN initialized successfully") return True + async def test_subscribe_and_receive(): """Test subscribing to topics and receiving messages""" print("\nšŸ”„ Testing Subscription and Message Reception...") + # Define message handler first + async def message_handler(message): + print(f"\nšŸ“Ø Received Message:") + print(f"Topic: {message['topic']}") + print(f"Payload: {message['payload']}") + print(f"Time: {message['timestamp']}") + print("-" * 50) + + # Set the callback before subscribing + plugin.set_message_callback(message_handler) + # Test topic topic = "0xe14768a6d8798e4390ec4cb8a4c991202c2115a5cd7a6c0a7ababcaf93b4d2d4/SOLUSDT/ticker" @@ -39,18 +51,11 @@ async def test_subscribe_and_receive(): print(f"āœ… Subscribed to topic: {topic}") - # Wait for some messages - print("ā³ Waiting for messages (2 seconds)...") - await asyncio.sleep(2) - - - # Check received messages - # messages = plugin.get_messages() - print(f"\nReceived {len(messages)} messages:") - for msg in messages: - print(f"Topic: {msg['topic']}") - print(f"Payload: {msg['payload']}") - print(f"Timestamp: {msg['timestamp']}\n") + # Wait for messages with shorter intervals to see if we're receiving them + print("ā³ Waiting for messages (30 seconds)...") + for _ in range(6): # Check every 5 seconds for 30 seconds total + await asyncio.sleep(5) + print("Checking for messages...") return True From 7eb53e88caebeebe25e17e08ea3d4c0fd1db9964 Mon Sep 17 00:00:00 2001 From: AnishRane Date: Sun, 30 Mar 2025 17:26:45 +0530 Subject: [PATCH 04/23] refactor: dpsn-plugin and example with message handler callback --- .../dpsn/dpsn_plugin_gamesdk/dpsn_plugin.py | 20 ++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/plugins/dpsn/dpsn_plugin_gamesdk/dpsn_plugin.py b/plugins/dpsn/dpsn_plugin_gamesdk/dpsn_plugin.py index 81190424..6e9e8388 100644 --- a/plugins/dpsn/dpsn_plugin_gamesdk/dpsn_plugin.py +++ b/plugins/dpsn/dpsn_plugin_gamesdk/dpsn_plugin.py @@ -16,7 +16,7 @@ def __init__(self): self.dpsn_url = os.getenv("DPSN_URL") self.pvt_key = os.getenv("PVT_KEY") self.client = None - self.messages = [] + self.message_callback = None # Single callback instead of array async def initialize(self): """Initialize the DPSN client with default configuration""" @@ -77,16 +77,26 @@ async def _on_subscribe(self, info): """Default subscribe event handler""" print(f"šŸ“„ Subscribed to {info['topic']} (QoS:{info['qos']})") + def set_message_callback(self, callback): + """Set a single callback function to handle messages""" + self.message_callback = callback + async def _on_message(self, msg): - """Default message event handler""" + """Process message immediately when received""" message_data = { "topic": msg["topic"], "payload": msg["payload"], "timestamp": datetime.now().isoformat() } - self.messages.append(message_data) - return msg - # print(f"šŸ“Ø Message received -> {msg['topic']} : {msg['payload']}") + + # Execute single callback if set + if self.message_callback: + try: + await self.message_callback(message_data) + except Exception as e: + print(f"Error in message callback: {e}") + + return message_data # Create plugin instance From e78c92eb916894a0c027fd59156f5761c4ab803c Mon Sep 17 00:00:00 2001 From: AnishRane Date: Mon, 31 Mar 2025 01:15:05 +0530 Subject: [PATCH 05/23] refactor: revert from async to synchronous dpsn python package --- .../dpsn/dpsn_plugin_gamesdk/dpsn_plugin.py | 283 ++++++++++++++---- plugins/dpsn/examples/.env.example | 3 + plugins/dpsn/examples/dpsn_agent.py | 107 +++++++ plugins/dpsn/examples/dpsn_worker.py | 124 ++++++++ .../dpsn/examples/test_dpsn_game_functions.py | 78 ++--- plugins/dpsn/pyproject.toml | 2 +- 6 files changed, 494 insertions(+), 103 deletions(-) create mode 100644 plugins/dpsn/examples/.env.example create mode 100644 plugins/dpsn/examples/dpsn_agent.py create mode 100644 plugins/dpsn/examples/dpsn_worker.py diff --git a/plugins/dpsn/dpsn_plugin_gamesdk/dpsn_plugin.py b/plugins/dpsn/dpsn_plugin_gamesdk/dpsn_plugin.py index 6e9e8388..d23ba954 100644 --- a/plugins/dpsn/dpsn_plugin_gamesdk/dpsn_plugin.py +++ b/plugins/dpsn/dpsn_plugin_gamesdk/dpsn_plugin.py @@ -1,103 +1,260 @@ -import asyncio import os from dotenv import load_dotenv from dpsn_client.client import DpsnClient, DPSNError from datetime import datetime +from game_sdk.game.custom_types import Function, Argument, FunctionResultStatus +from typing import Dict, Any, Callable, Tuple +import json +import logging # Load .env variables load_dotenv() +# Configure logging for the plugin (optional, but good practice) +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') +logger = logging.getLogger("DpsnPlugin") + class DpsnPlugin: """ - DPSN Plugin for handling DPSN client connections and message handling + DPSN Plugin using the updated DpsnClient for handling connections, + subscriptions, and message handling. """ def __init__(self): self.dpsn_url = os.getenv("DPSN_URL") self.pvt_key = os.getenv("PVT_KEY") - self.client = None - self.message_callback = None # Single callback instead of array + if not self.dpsn_url or not self.pvt_key: + logger.error("DPSN_URL and PVT_KEY must be set in .env file") + # Consider raising an error or handling this more gracefully + # depending on how the game SDK expects plugin failures + + self.client: DpsnClient | None = None # Type hint for clarity + self.message_callback: Callable[[Dict[str, Any]], None] | None = None # Type hint + + self._functions = { + "initialize": Function( + fn_name="initialize", + fn_description="Initialize DPSN client connection", + args=[], + hint="Initializes or ensures the DPSN client is connected.", + executable=self.initialize + ), + "subscribe": Function( + fn_name="subscribe", + fn_description="Subscribe to a DPSN topic", + args=[ + Argument( + name="topic", + description="The topic string to subscribe to", + type="string", + required=True + ) + ], + hint="Subscribes to a specific DPSN topic to receive messages.", + executable=self.subscribe + ), + # Added unsubscribe function + "unsubscribe": Function( + fn_name="unsubscribe", + fn_description="Unsubscribe from a DPSN topic", + args=[ + Argument( + name="topic", + description="The topic string to unsubscribe from", + type="string", + required=True + ) + ], + hint="Unsubscribes from a specific DPSN topic.", + executable=self.unsubscribe + ), + "shutdown": Function( + fn_name="shutdown", + fn_description="Shutdown DPSN client connection", + args=[], + hint="Disconnects the DPSN client gracefully.", + executable=self.shutdown + ) + # Potential future function: publish + } + + def get_function(self, fn_name: str) -> Function: + """Get a specific function by name""" + if fn_name not in self._functions: + # Log the error as well + logger.error(f"Function '{fn_name}' not found in DpsnPlugin") + raise ValueError(f"Function '{fn_name}' not found") + return self._functions[fn_name] - async def initialize(self): - """Initialize the DPSN client with default configuration""" + def initialize(self) -> Tuple[FunctionResultStatus, str, Dict[str, Any]]: + """ + Initializes the DpsnClient if not already done, or ensures it's connected. + Uses configuration from environment variables. + """ if not self.dpsn_url or not self.pvt_key: - return {"success": False, "error": "DPSN_URL and PVT_KEY must be set in .env file"} + logger.error("Cannot initialize: DPSN_URL and PVT_KEY are not set.") + # Return Tuple: (Status, Message, Info) + return ( + FunctionResultStatus.FAILED, + "DPSN_URL and PVT_KEY must be set in .env file", + {} + ) + + if self.client and self.client.dpsn_broker and self.client.dpsn_broker.is_connected(): + logger.info("Client already initialized and connected. Re-initializing.") chain_options = { - "network": "testnet", + "network": "testnet", "wallet_chain_type": "ethereum" } - - connection_options = { - "ssl": True, - "connect_timeout": 5000 - } + connection_options = {"ssl": True} try: - self.client = DpsnClient( - dpsn_url=self.dpsn_url, - private_key=self.pvt_key, - chain_options=chain_options, - connection_options=connection_options - ) + if not self.client: + logger.info(f"Creating DpsnClient for {self.dpsn_url}") + self.client = DpsnClient( + dpsn_url=self.dpsn_url, + private_key=self.pvt_key, + chain_options=chain_options, + connection_options=connection_options + ) + self.client.on_error += self._handle_client_error - self.client.event_bus.on("connected", self._on_connected) - self.client.event_bus.on("subscribe", self._on_subscribe) - self.client.event_bus.on("message", self._on_message) + logger.info("Initializing DpsnClient connection...") + self.client.init({ + "retry_options": { + "max_retries": 3, + "initial_delay": 1000, + "max_delay": 5000 + } + }) + logger.info("DpsnClient initialized successfully.") - await self.client.init() - return {"success": self.client.connected} + if self.message_callback: + try: + self.client.on_msg -= self.message_callback + except ValueError: + pass + self.client.on_msg += self.message_callback + logger.info("Message callback re-applied.") + + # Return Tuple: (Status, Message, Info) + return (FunctionResultStatus.DONE, "DPSN client initialized successfully.", {}) except DPSNError as e: - return {"success": False, "error": f"DPSNError: {str(e)}"} + logger.error(f"DPSN Initialization Error: Code={e.code}, Msg={e.message}") + if e.code in [DPSN_ERROR_CODES.INVALID_PRIVATE_KEY, DPSN_ERROR_CODES.BLOCKCHAIN_CONFIG_ERROR]: + self.client = None + # Return Tuple: (Status, Message, Info) + return ( + FunctionResultStatus.FAILED, + f"DPSNError ({e.code.name}): {e.message}", + {} + ) + except Exception as e: + logger.exception("Unexpected error during DPSN initialization:") + self.client = None + # Return Tuple: (Status, Message, Info) + return ( + FunctionResultStatus.FAILED, + f"Unexpected initialization error: {str(e)}", + {} + ) - async def subscribe(self, topic: str): - """Subscribe to a specific topic""" - if not self.client or not self.client.connected: - return {"success": False, "error": "Client not initialized or not connected"} + def subscribe(self, topic: str) -> Tuple[FunctionResultStatus, str, Dict[str, Any]]: + """Subscribes to a specific topic using the initialized client.""" + if not self.client: + logger.warning("Subscribe called but client is not initialized.") + return (FunctionResultStatus.FAILED, "Client not initialized", {}) + if not self.client.dpsn_broker or not self.client.dpsn_broker.is_connected(): + logger.warning(f"Subscribe attempt failed for topic '{topic}': Client not connected.") + return (FunctionResultStatus.FAILED, "Client not connected", {}) + try: - await self.client.subscribe(topic) - return {"success": True, "topic": topic} + logger.info(f"Subscribing to topic: {topic}") + self.client.subscribe(topic) + logger.info(f"Successfully subscribed to topic: {topic}") + # Return Tuple: (Status, Message, Info - include topic) + return (FunctionResultStatus.DONE, f"Successfully subscribed to topic: {topic}", {"subscribed_topic": topic}) except DPSNError as e: - return {"success": False, "error": f"Subscription error: {str(e)}"} + logger.error(f"DPSN Subscription Error for topic '{topic}': Code={e.code}, Msg={e.message}") + return (FunctionResultStatus.FAILED, f"Subscription error ({e.code.name}): {e.message}", {"topic": topic}) + except Exception as e: + logger.exception(f"Unexpected error during subscription to topic '{topic}':") + return (FunctionResultStatus.FAILED, f"Unexpected subscription error: {str(e)}", {"topic": topic}) - async def shutdown(self): - """Shutdown the DPSN client""" + # Added unsubscribe method + def unsubscribe(self, topic: str) -> Tuple[FunctionResultStatus, str, Dict[str, Any]]: + """Unsubscribes from a specific topic.""" + if not self.client: + logger.warning("Unsubscribe called but client is not initialized.") + return (FunctionResultStatus.FAILED, "Client not initialized", {}) + + if not self.client.dpsn_broker or not self.client.dpsn_broker.is_connected(): + logger.warning(f"Unsubscribe attempt failed for topic '{topic}': Client not connected.") + return (FunctionResultStatus.FAILED, "Client not connected", {}) + + try: + logger.info(f"Unsubscribing from topic: {topic}") + self.client.unsubscribe(topic) + logger.info(f"Successfully unsubscribed from topic: {topic}") + # Return Tuple: (Status, Message, Info - include topic) + return (FunctionResultStatus.DONE, f"Successfully unsubscribed from topic: {topic}", {"unsubscribed_topic": topic}) + except DPSNError as e: + logger.error(f"DPSN Unsubscription Error for topic '{topic}': Code={e.code}, Msg={e.message}") + return (FunctionResultStatus.FAILED, f"Unsubscription error ({e.code.name}): {e.message}", {"topic": topic}) + except Exception as e: + logger.exception(f"Unexpected error during unsubscription from topic '{topic}':") + return (FunctionResultStatus.FAILED, f"Unexpected unsubscription error: {str(e)}", {"topic": topic}) + + + def shutdown(self) -> Tuple[FunctionResultStatus, str, Dict[str, Any]]: + """Disconnects the DPSN client.""" if self.client: - await self.client.shutdown() - return {"success": True} - return {"success": False, "error": "No client to shutdown"} - - async def _on_connected(self): - """Default connected event handler""" - print("āœ… Connected to DPSN") - - async def _on_subscribe(self, info): - """Default subscribe event handler""" - print(f"šŸ“„ Subscribed to {info['topic']} (QoS:{info['qos']})") - - def set_message_callback(self, callback): - """Set a single callback function to handle messages""" - self.message_callback = callback - - async def _on_message(self, msg): - """Process message immediately when received""" - message_data = { - "topic": msg["topic"], - "payload": msg["payload"], - "timestamp": datetime.now().isoformat() - } - - # Execute single callback if set - if self.message_callback: try: - await self.message_callback(message_data) + logger.info("Shutting down DpsnClient connection...") + self.client.disconnect() + logger.info("DpsnClient shutdown complete.") + # Return Tuple: (Status, Message, Info) + return (FunctionResultStatus.DONE, "DPSN client shutdown complete.", {}) + except DPSNError as e: + logger.error(f"DPSN Shutdown Error: Code={e.code}, Msg={e.message}") + return (FunctionResultStatus.FAILED, f"Shutdown error ({e.code.name}): {e.message}", {}) except Exception as e: - print(f"Error in message callback: {e}") + logger.exception("Unexpected error during DPSN shutdown:") + return (FunctionResultStatus.FAILED, f"Unexpected shutdown error: {str(e)}", {}) + else: + logger.info("Shutdown called but no active client to shutdown.") + # Return Tuple: (Status, Message, Info) + return (FunctionResultStatus.DONE, "No active client to shutdown", {}) + + def set_message_callback(self, callback: Callable[[Dict[str, Any]], None]): + """ + Sets the callback function to handle incoming messages. + The callback receives a dictionary {'topic': str, 'payload': Any}. + Payload is automatically decoded (string or dict/list if JSON). + """ + logger.info(f"Setting message callback to: {callback.__name__ if hasattr(callback, '__name__') else callback}") - return message_data + # Remove old callback if client exists and callback was previously set + if self.client and self.message_callback: + try: + self.client.on_msg -= self.message_callback + except ValueError: + pass # Ignore if it wasn't added + + self.message_callback = callback # Store the new callback + + # Add the new callback if client exists + if self.client: + self.client.on_msg += self.message_callback + logger.info("Message callback applied to existing client.") + def _handle_client_error(self, error: DPSNError): + """Internal handler for errors emitted by the DpsnClient.""" + logger.error(f"[DpsnClient EVENT] Error received: Code={error.code.name}, Msg={error.message}, Status={error.status}") + # You could add more logic here, like notifying the game state # Create plugin instance plugin = DpsnPlugin() diff --git a/plugins/dpsn/examples/.env.example b/plugins/dpsn/examples/.env.example new file mode 100644 index 00000000..ccd5813d --- /dev/null +++ b/plugins/dpsn/examples/.env.example @@ -0,0 +1,3 @@ +DPSN_URL=betanet.dpsn.org +PVT_KEY=9192c2aa125bde2ed2e861f973a6124e0796722dc263dacac70719579df8d98b +GAME_API_KEY=apt-e52b807bf0a05d47cfdcd88413e7099c \ No newline at end of file diff --git a/plugins/dpsn/examples/dpsn_agent.py b/plugins/dpsn/examples/dpsn_agent.py new file mode 100644 index 00000000..949b7ded --- /dev/null +++ b/plugins/dpsn/examples/dpsn_agent.py @@ -0,0 +1,107 @@ +import os +import sys +import asyncio +from pathlib import Path +import json +from datetime import datetime + +parent_dir = str(Path(__file__).parent.parent) +sys.path.append(parent_dir) + +from game_sdk.game.agent import Agent, WorkerConfig +from game_sdk.game.custom_types import FunctionResult +from dpsn_plugin_gamesdk.dpsn_plugin import plugin + +# --- Add Message Handler --- +def handle_incoming_message(message_data: dict): + """Callback function to process messages received via the plugin.""" + try: + topic = message_data.get('topic', 'N/A') + payload = message_data.get('payload', '{}') + timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + print(f"\n--- Message Received ({timestamp}) ---") + print(f"Topic: {topic}") + # Pretty print payload if it's likely JSON/dict + if isinstance(payload, (dict, list)): + print(f"Payload:\n{json.dumps(payload, indent=2)}") + else: + print(f"Payload: {payload}") + print("-----------------------------------") + except Exception as e: + print(f"Error in message handler: {e}") + +# Set the callback in the plugin instance *before* running the agent +plugin.set_message_callback(handle_incoming_message) +# --- End Message Handler Setup --- + +def get_agent_state_fn(function_result: FunctionResult, current_state: dict) -> dict: + """Update state based on the function results""" + init_state = {} + + if current_state is None: + return init_state + + if function_result.info is not None: + current_state.update(function_result.info) + + return current_state + +def get_worker_state(function_result: FunctionResult, current_state: dict) -> dict: + """Update state based on the function results""" + init_state = {} + + if current_state is None: + return init_state + + if function_result.info is not None: + current_state.update(function_result.info) + + return current_state + +# Define workers for different DPSN operations +connection_worker = WorkerConfig( + id="connection_worker", + worker_description="Worker specialized in managing DPSN connection lifecycle", + get_state_fn=get_worker_state, + action_space=[ + plugin.get_function("initialize"), + plugin.get_function("shutdown") + ], +) + +subscription_worker = WorkerConfig( + id="subscription_worker", + worker_description="Worker specialized in managing DPSN topic subscriptions, unsubscriptions, and message handling", + get_state_fn=get_worker_state, + action_space=[ + plugin.get_function("subscribe"), + plugin.get_function("unsubscribe") + ], +) + +# Initialize the agent +agent = Agent( + api_key=os.environ.get("GAME_API_KEY"), + name="DPSN Market Data Agent", + agent_goal="Monitor SOLUSDT market data from DPSN and process real-time updates.", + agent_description=( + "You are an AI agent specialized in DPSN market data processing. " + "You can establish connections to DPSN, manage topic subscriptions, " + "and handle real-time market data messages. " + "You focus on SOLUSDT trading pair data and can process market updates " + "as they arrive. The available topics include price and trading information " + "for SOLUSDT pairs. You should first initialize the connection, then " + "subscribe to relevant topics, and process incoming messages." + "\n\nAvailable topics:" + "\n- 0xe14768a6d8798e4390ec4cb8a4c991202c2115a5cd7a6c0a7ababcaf93b4d2d4/SOLUSDT/ticker" + ), + get_agent_state_fn=get_agent_state_fn, + workers=[ + connection_worker, + subscription_worker + ] +) + +# Compile and run the agent +agent.compile() +agent.run() \ No newline at end of file diff --git a/plugins/dpsn/examples/dpsn_worker.py b/plugins/dpsn/examples/dpsn_worker.py new file mode 100644 index 00000000..03a99050 --- /dev/null +++ b/plugins/dpsn/examples/dpsn_worker.py @@ -0,0 +1,124 @@ +import sys +import os +from pathlib import Path +from typing import Dict, Any, List +from datetime import datetime +import time # Import time for sleep + +# Add the parent directory to Python path +parent_dir = str(Path(__file__).parent.parent) +sys.path.append(parent_dir) + +from dpsn_plugin_gamesdk.dpsn_plugin import plugin + +class DpsnWorker: + """ + DPSN Worker for processing market data and executing trades (Synchronous Version) + """ + def __init__(self): + self.plugin = plugin + self.trades: List[Dict[str, Any]] = [] + self.is_running = False + + def initialize(self): + """Initialize the DPSN worker""" + # Call synchronous initialize + result = self.plugin.initialize() + if not result["success"]: + raise Exception(f"Failed to initialize DPSN: {result.get('error')}") + print("DPSN Worker Initialized Successfully") + return True + + def process_message(self, message: Dict[str, Any]): + """Process incoming messages and execute trades""" + topic = message['topic'] + payload = message['payload'] + + # Log the message + print(f"\n Processing message:") + print(f"Topic: {topic}") + print(f"Payload: {payload}") + + # Execute trade if conditions are met (synchronous call) + trade = self.execute_trade(topic, payload) + if trade: + self.trades.append(trade) + print(f"šŸ’¼ Trade executed: {trade}") + + # Example use case of dpsn plugin worker + def execute_trade(self, topic: str, payload: Dict[str, Any]) -> Dict[str, Any] | None: + """Execute trades based on market data""" + # Example trade execution logic (synchronous) + if "SOLUSDT" in topic: + # Ensure payload is a dictionary before accessing keys + if isinstance(payload, dict): + try: + price = float(payload.get('price', 0)) + if price > 100: + return { + "action": "SELL", + "price": price, + "timestamp": datetime.now().isoformat(), + "status": "EXECUTED" + } + except (ValueError, TypeError) as e: + print(f"Error processing price from payload: {e}") + else: + print(f"Skipping trade execution: Payload is not a dictionary ({type(payload)})") + return None + + def start(self): + """Start the DPSN worker""" + self.initialize() + + # Set up message handler (synchronous) + self.plugin.set_message_callback(self.process_message) + + # Subscribe to topics (synchronous) + topics = [ + "0xe14768a6d8798e4390ec4cb8a4c991202c2115a5cd7a6c0a7ababcaf93b4d2d4/SOLUSDT/ticker", + # Add other topics if needed + ] + + print("Subscribing to topics...") + for topic in topics: + result = self.plugin.subscribe(topic) + if not result["success"]: + print(f"Failed to subscribe to {topic}: {result.get('error')}") + # Decide how to handle subscription failure (e.g., raise error, skip) + # continue + # raise Exception(f"Subscription failed for {topic}") + else: + print(f"Successfully subscribed to {topic}") + + self.is_running = True + print("DPSN Worker Started") + + def stop(self): + """Stop the DPSN worker""" + print("Stopping DPSN Worker...") + self.is_running = False + # Consider unsubscribing from topics here if necessary + # for topic in topics: self.plugin.unsubscribe(topic) + self.plugin.shutdown() + print("DPSN Worker Stopped") + +def main(): + worker = DpsnWorker() + try: + worker.start() + print("Worker running... Press Ctrl+C to stop.") + # Keep the worker running using time.sleep + while worker.is_running: + time.sleep(1) # Check status every second + except KeyboardInterrupt: + print("\nCtrl+C detected. Shutting down worker...") + except Exception as e: + print(f"An error occurred: {e}") + finally: + if worker.is_running: + worker.stop() + +if __name__ == "__main__": + # Run main directly without asyncio + main() \ No newline at end of file diff --git a/plugins/dpsn/examples/test_dpsn_game_functions.py b/plugins/dpsn/examples/test_dpsn_game_functions.py index 3c661a02..b82ad6b2 100644 --- a/plugins/dpsn/examples/test_dpsn_game_functions.py +++ b/plugins/dpsn/examples/test_dpsn_game_functions.py @@ -1,69 +1,75 @@ -import asyncio import sys import os from pathlib import Path +import time +from datetime import datetime # Add the parent directory to Python path parent_dir = str(Path(__file__).parent.parent) sys.path.append(parent_dir) from dpsn_plugin_gamesdk.dpsn_plugin import plugin -import json -from datetime import datetime -async def test_dpsn_connection(): +def test_dpsn_connection(): """Test DPSN connection and basic functionality""" print("\nšŸ”„ Testing DPSN Connection...") - # Initialize DPSN client - result = await plugin.initialize() + # Initialize DPSN client (without options since the method doesn't accept them) + result = plugin.initialize() if not result["success"]: print(f"āŒ Failed to initialize DPSN: {result.get('error')}") return False + # Wait for connection to stabilize + time.sleep(1) print("āœ… DPSN initialized successfully") return True - -async def test_subscribe_and_receive(): +def test_subscribe_and_receive(): """Test subscribing to topics and receiving messages""" print("\nšŸ”„ Testing Subscription and Message Reception...") - # Define message handler first - async def message_handler(message): - print(f"\nšŸ“Ø Received Message:") - print(f"Topic: {message['topic']}") - print(f"Payload: {message['payload']}") - print(f"Time: {message['timestamp']}") - print("-" * 50) + # Define message handler + def handle_message(message_data): + topic = message_data['topic'] + payload = message_data['payload'] + print(f"Received message on {topic}: {payload}") - # Set the callback before subscribing - plugin.set_message_callback(message_handler) + # Set the callback + plugin.set_message_callback(handle_message) # Test topic - topic = "0xe14768a6d8798e4390ec4cb8a4c991202c2115a5cd7a6c0a7ababcaf93b4d2d4/SOLUSDT/ticker" + topic = "0xe14768a6d8798e4390ec4cb8a4c991202c2115a5cd7a6c0a7ababcaf93b4d2d4/SOLUSDT/ohlc" - # Subscribe to topic - result = await plugin.subscribe(topic) + print(f"Subscribing to topic: {topic}") + result = plugin.subscribe(topic) if not result["success"]: print(f"āŒ Failed to subscribe to topic: {result.get('error')}") return False - print(f"āœ… Subscribed to topic: {topic}") + print("Subscription successful!") + print("\nWaiting for messages... (Press Ctrl+C to exit)") - # Wait for messages with shorter intervals to see if we're receiving them - print("ā³ Waiting for messages (30 seconds)...") - for _ in range(6): # Check every 5 seconds for 30 seconds total - await asyncio.sleep(5) - print("Checking for messages...") + try: + while True: + if not plugin.client.dpsn_broker.is_connected(): + print("Connection lost, attempting to reconnect...") + plugin.initialize() + time.sleep(1) + plugin.subscribe(topic) + time.sleep(1) + + except KeyboardInterrupt: + print("\nāš ļø Test interrupted by user") + return True return True -async def test_shutdown(): +def test_shutdown(): """Test graceful shutdown""" print("\nšŸ”„ Testing Shutdown...") - result = await plugin.shutdown() + result = plugin.shutdown() if not result["success"]: print(f"āŒ Failed to shutdown: {result.get('error')}") return False @@ -71,30 +77,24 @@ async def test_shutdown(): print("āœ… Shutdown successful") return True -async def main(): +def main(): """Main test function""" print("šŸš€ Starting DPSN Plugin Tests...") try: # Test connection - if not await test_dpsn_connection(): + if not test_dpsn_connection(): return # Test subscription and message reception - if not await test_subscribe_and_receive(): + if not test_subscribe_and_receive(): return - # Test shutdown - if not await test_shutdown(): - return - - print("\n✨ All tests completed successfully!") - except Exception as e: print(f"\nāŒ Test failed with error: {str(e)}") finally: # Ensure we shutdown properly - await plugin.shutdown() + test_shutdown() if __name__ == "__main__": - asyncio.run(main()) + main() diff --git a/plugins/dpsn/pyproject.toml b/plugins/dpsn/pyproject.toml index b5a5ad88..780d74e3 100644 --- a/plugins/dpsn/pyproject.toml +++ b/plugins/dpsn/pyproject.toml @@ -6,7 +6,7 @@ authors = [{ name = "dpsn-dev", email = "dev@dpsn.org" }] readme = "README.md" requires-python = ">=3.9" dependencies = [ - "/Users/anishrane/workspace/dpsn-asyncio-py/asyncio-dpsn-client/dist/dpsn_client-0.1.0-py3-none-any.whl", + "/Users/anishrane/workspace/dpsn_python_lib/dpsn-python-client/dist/dpsn_client-0.1.0-py3-none-any.whl", ] [build-system] From 63c8c223ad24960eac9e04911a879546bc5d99e6 Mon Sep 17 00:00:00 2001 From: AnishRane Date: Mon, 31 Mar 2025 14:25:35 +0530 Subject: [PATCH 06/23] refactor: plugin functions and return types --- plugins/dpsn/examples/dpsn_worker.py | 29 ++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/plugins/dpsn/examples/dpsn_worker.py b/plugins/dpsn/examples/dpsn_worker.py index 03a99050..fc519325 100644 --- a/plugins/dpsn/examples/dpsn_worker.py +++ b/plugins/dpsn/examples/dpsn_worker.py @@ -9,6 +9,8 @@ parent_dir = str(Path(__file__).parent.parent) sys.path.append(parent_dir) +# Import FunctionResultStatus to check the status enum +from game_sdk.game.custom_types import FunctionResultStatus from dpsn_plugin_gamesdk.dpsn_plugin import plugin class DpsnWorker: @@ -22,12 +24,15 @@ def __init__(self): def initialize(self): """Initialize the DPSN worker""" - # Call synchronous initialize result = self.plugin.initialize() - if not result["success"]: - raise Exception(f"Failed to initialize DPSN: {result.get('error')}") + # Check status using tuple index 0 + if result[0] != FunctionResultStatus.DONE: + # Get error message from tuple index 1 + error_message = result[1] if len(result) > 1 else "Unknown initialization error" + raise Exception(f"Failed to initialize DPSN: {error_message}") print("DPSN Worker Initialized Successfully") - return True + # No need to return True, can just proceed or return the result tuple if needed elsewhere + # return True def process_message(self, message: Dict[str, Any]): """Process incoming messages and execute trades""" @@ -71,10 +76,8 @@ def start(self): """Start the DPSN worker""" self.initialize() - # Set up message handler (synchronous) self.plugin.set_message_callback(self.process_message) - # Subscribe to topics (synchronous) topics = [ "0xe14768a6d8798e4390ec4cb8a4c991202c2115a5cd7a6c0a7ababcaf93b4d2d4/SOLUSDT/ticker", # Add other topics if needed @@ -83,13 +86,15 @@ def start(self): print("Subscribing to topics...") for topic in topics: result = self.plugin.subscribe(topic) - if not result["success"]: - print(f"Failed to subscribe to {topic}: {result.get('error')}") - # Decide how to handle subscription failure (e.g., raise error, skip) - # continue - # raise Exception(f"Subscription failed for {topic}") + # Check status using tuple index 0 + if result[0] != FunctionResultStatus.DONE: + # Get error message from tuple index 1 + error_message = result[1] if len(result) > 1 else f"Unknown subscription error for {topic}" + print(f"Failed to subscribe to {topic}: {error_message}") + # Consider raising Exception or implementing retry logic here else: - print(f"Successfully subscribed to {topic}") + # Success message is at index 1 + print(result[1]) # e.g., "Successfully subscribed to topic: ..." self.is_running = True print("DPSN Worker Started") From 9c5ab2f4754e8ebc794786495b7ca51067d956ae Mon Sep 17 00:00:00 2001 From: AnishRane Date: Mon, 31 Mar 2025 14:26:01 +0530 Subject: [PATCH 07/23] add: agent description --- plugins/dpsn/examples/dpsn_agent.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/plugins/dpsn/examples/dpsn_agent.py b/plugins/dpsn/examples/dpsn_agent.py index 949b7ded..311f92bd 100644 --- a/plugins/dpsn/examples/dpsn_agent.py +++ b/plugins/dpsn/examples/dpsn_agent.py @@ -92,8 +92,9 @@ def get_worker_state(function_result: FunctionResult, current_state: dict) -> di "as they arrive. The available topics include price and trading information " "for SOLUSDT pairs. You should first initialize the connection, then " "subscribe to relevant topics, and process incoming messages." + "Summarize the information received" "\n\nAvailable topics:" - "\n- 0xe14768a6d8798e4390ec4cb8a4c991202c2115a5cd7a6c0a7ababcaf93b4d2d4/SOLUSDT/ticker" + "\n- 0xe14768a6d8798e4390ec4cb8a4c991202c2115a5cd7a6c0a7ababcaf93b4d2d4/SOLUSDT/ohlc" ), get_agent_state_fn=get_agent_state_fn, workers=[ From 859d586e0187eca19da4cfbf09ab2c4ade113eda Mon Sep 17 00:00:00 2001 From: AnishRane Date: Mon, 31 Mar 2025 14:27:15 +0530 Subject: [PATCH 08/23] refactor: dpsn client intialization function to take optional params for configuring dpsn_url and pvt_key --- plugins/dpsn/dpsn_plugin_gamesdk/dpsn_plugin.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/plugins/dpsn/dpsn_plugin_gamesdk/dpsn_plugin.py b/plugins/dpsn/dpsn_plugin_gamesdk/dpsn_plugin.py index d23ba954..e25209b1 100644 --- a/plugins/dpsn/dpsn_plugin_gamesdk/dpsn_plugin.py +++ b/plugins/dpsn/dpsn_plugin_gamesdk/dpsn_plugin.py @@ -3,7 +3,7 @@ from dpsn_client.client import DpsnClient, DPSNError from datetime import datetime from game_sdk.game.custom_types import Function, Argument, FunctionResultStatus -from typing import Dict, Any, Callable, Tuple +from typing import Dict, Any, Callable, Tuple,Optional import json import logging @@ -20,14 +20,14 @@ class DpsnPlugin: subscriptions, and message handling. """ - def __init__(self): - self.dpsn_url = os.getenv("DPSN_URL") - self.pvt_key = os.getenv("PVT_KEY") + def __init__(self, + dpsn_url:Optional[str] = os.getenv("DPSN_URL"), + pvt_key:Optional[str] = os.getenv("PVT_KEY") + ): + self.dpsn_url = dpsn_url + self.pvt_key = pvt_key if not self.dpsn_url or not self.pvt_key: logger.error("DPSN_URL and PVT_KEY must be set in .env file") - # Consider raising an error or handling this more gracefully - # depending on how the game SDK expects plugin failures - self.client: DpsnClient | None = None # Type hint for clarity self.message_callback: Callable[[Dict[str, Any]], None] | None = None # Type hint @@ -53,7 +53,7 @@ def __init__(self): hint="Subscribes to a specific DPSN topic to receive messages.", executable=self.subscribe ), - # Added unsubscribe function + "unsubscribe": Function( fn_name="unsubscribe", fn_description="Unsubscribe from a DPSN topic", From e6c9a0536e5a66af9b2a371f8f5c0c179c0a6251 Mon Sep 17 00:00:00 2001 From: AnishRane Date: Mon, 31 Mar 2025 18:06:23 +0530 Subject: [PATCH 09/23] refactor:plugin code to handle dpsn client intialization --- plugins/dpsn/examples/dpsn_agent.py | 32 +++++++++------------------- plugins/dpsn/examples/dpsn_worker.py | 21 ++---------------- 2 files changed, 12 insertions(+), 41 deletions(-) diff --git a/plugins/dpsn/examples/dpsn_agent.py b/plugins/dpsn/examples/dpsn_agent.py index 311f92bd..47f377f6 100644 --- a/plugins/dpsn/examples/dpsn_agent.py +++ b/plugins/dpsn/examples/dpsn_agent.py @@ -1,6 +1,5 @@ import os import sys -import asyncio from pathlib import Path import json from datetime import datetime @@ -24,8 +23,10 @@ def handle_incoming_message(message_data: dict): # Pretty print payload if it's likely JSON/dict if isinstance(payload, (dict, list)): print(f"Payload:\n{json.dumps(payload, indent=2)}") + return payload else: print(f"Payload: {payload}") + return payload print("-----------------------------------") except Exception as e: print(f"Error in message handler: {e}") @@ -58,24 +59,15 @@ def get_worker_state(function_result: FunctionResult, current_state: dict) -> di return current_state -# Define workers for different DPSN operations -connection_worker = WorkerConfig( - id="connection_worker", - worker_description="Worker specialized in managing DPSN connection lifecycle", - get_state_fn=get_worker_state, - action_space=[ - plugin.get_function("initialize"), - plugin.get_function("shutdown") - ], -) subscription_worker = WorkerConfig( id="subscription_worker", - worker_description="Worker specialized in managing DPSN topic subscriptions, unsubscriptions, and message handling", + worker_description="Worker specialized in managing DPSN topic subscriptions, unsubscriptions, message handling, and shutdown.", get_state_fn=get_worker_state, action_space=[ plugin.get_function("subscribe"), - plugin.get_function("unsubscribe") + plugin.get_function("unsubscribe"), + plugin.get_function("shutdown") ], ) @@ -85,20 +77,16 @@ def get_worker_state(function_result: FunctionResult, current_state: dict) -> di name="DPSN Market Data Agent", agent_goal="Monitor SOLUSDT market data from DPSN and process real-time updates.", agent_description=( - "You are an AI agent specialized in DPSN market data processing. " - "You can establish connections to DPSN, manage topic subscriptions, " - "and handle real-time market data messages. " - "You focus on SOLUSDT trading pair data and can process market updates " - "as they arrive. The available topics include price and trading information " - "for SOLUSDT pairs. You should first initialize the connection, then " - "subscribe to relevant topics, and process incoming messages." - "Summarize the information received" + "You are an AI agent specialized in DPSN market data processing" + "You can subscribe dpsn topic" + "after 5 minutes unsubscribe the topic" + "next 5 minutes close the connection" "\n\nAvailable topics:" "\n- 0xe14768a6d8798e4390ec4cb8a4c991202c2115a5cd7a6c0a7ababcaf93b4d2d4/SOLUSDT/ohlc" ), get_agent_state_fn=get_agent_state_fn, workers=[ - connection_worker, + # connection_worker, subscription_worker ] ) diff --git a/plugins/dpsn/examples/dpsn_worker.py b/plugins/dpsn/examples/dpsn_worker.py index fc519325..f3b9f3bc 100644 --- a/plugins/dpsn/examples/dpsn_worker.py +++ b/plugins/dpsn/examples/dpsn_worker.py @@ -22,18 +22,6 @@ def __init__(self): self.trades: List[Dict[str, Any]] = [] self.is_running = False - def initialize(self): - """Initialize the DPSN worker""" - result = self.plugin.initialize() - # Check status using tuple index 0 - if result[0] != FunctionResultStatus.DONE: - # Get error message from tuple index 1 - error_message = result[1] if len(result) > 1 else "Unknown initialization error" - raise Exception(f"Failed to initialize DPSN: {error_message}") - print("DPSN Worker Initialized Successfully") - # No need to return True, can just proceed or return the result tuple if needed elsewhere - # return True - def process_message(self, message: Dict[str, Any]): """Process incoming messages and execute trades""" topic = message['topic'] @@ -60,6 +48,7 @@ def execute_trade(self, topic: str, payload: Dict[str, Any]) -> Dict[str, Any] | try: price = float(payload.get('price', 0)) if price > 100: + return { "action": "SELL", "price": price, @@ -74,8 +63,6 @@ def execute_trade(self, topic: str, payload: Dict[str, Any]) -> Dict[str, Any] | def start(self): """Start the DPSN worker""" - self.initialize() - self.plugin.set_message_callback(self.process_message) topics = [ @@ -83,17 +70,13 @@ def start(self): # Add other topics if needed ] - print("Subscribing to topics...") + print("Subscribing to topics (will initialize connection if needed)...") for topic in topics: result = self.plugin.subscribe(topic) - # Check status using tuple index 0 if result[0] != FunctionResultStatus.DONE: - # Get error message from tuple index 1 error_message = result[1] if len(result) > 1 else f"Unknown subscription error for {topic}" print(f"Failed to subscribe to {topic}: {error_message}") - # Consider raising Exception or implementing retry logic here else: - # Success message is at index 1 print(result[1]) # e.g., "Successfully subscribed to topic: ..." self.is_running = True From 5b984ccdf3c59810ac98f31716ab553a961ebc80 Mon Sep 17 00:00:00 2001 From: AnishRane Date: Mon, 31 Mar 2025 18:06:43 +0530 Subject: [PATCH 10/23] refactor:plugin code to handle dpsn client intialization --- .../dpsn/dpsn_plugin_gamesdk/dpsn_plugin.py | 254 ++++++++++-------- 1 file changed, 142 insertions(+), 112 deletions(-) diff --git a/plugins/dpsn/dpsn_plugin_gamesdk/dpsn_plugin.py b/plugins/dpsn/dpsn_plugin_gamesdk/dpsn_plugin.py index e25209b1..1023c5a2 100644 --- a/plugins/dpsn/dpsn_plugin_gamesdk/dpsn_plugin.py +++ b/plugins/dpsn/dpsn_plugin_gamesdk/dpsn_plugin.py @@ -3,7 +3,7 @@ from dpsn_client.client import DpsnClient, DPSNError from datetime import datetime from game_sdk.game.custom_types import Function, Argument, FunctionResultStatus -from typing import Dict, Any, Callable, Tuple,Optional +from typing import Dict, Any, Callable, Tuple, Optional import json import logging @@ -14,6 +14,11 @@ logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger("DpsnPlugin") +# {{ Add a custom exception for initialization errors }} +class DpsnInitializationError(Exception): + """Custom exception for DPSN initialization failures.""" + pass + class DpsnPlugin: """ DPSN Plugin using the updated DpsnClient for handling connections, @@ -27,18 +32,41 @@ def __init__(self, self.dpsn_url = dpsn_url self.pvt_key = pvt_key if not self.dpsn_url or not self.pvt_key: - logger.error("DPSN_URL and PVT_KEY must be set in .env file") - self.client: DpsnClient | None = None # Type hint for clarity - self.message_callback: Callable[[Dict[str, Any]], None] | None = None # Type hint + raise ValueError("DPSN_URL and PVT_KEY are required.") + + # {{ Create the client instance here, but don't connect yet }} + self.client: DpsnClient | None = None + try: + if self.dpsn_url and self.pvt_key: + chain_options = { + "network": "testnet", # Consider making configurable + "wallet_chain_type": "ethereum" + } + connection_options = {"ssl": True} # Consider making configurable + + logger.info(f"Creating DpsnClient instance for {self.dpsn_url}") + self.client = DpsnClient( + dpsn_url=self.dpsn_url, + private_key=self.pvt_key, + chain_options=chain_options, + connection_options=connection_options + ) + # Setup internal error handler early + self.client.on_error += self._handle_client_error + else: + # Client remains None if credentials missing + logger.warning("DpsnClient not created due to missing URL or Key.") + + except Exception as e: + logger.exception("Unexpected error during DpsnClient instantiation:") + self.client = None # Ensure client is None on instantiation error + + # {{ Add initialization flag }} + self._initialized = False + self.message_callback: Callable[[Dict[str, Any]], None] | None = None + self._functions = { - "initialize": Function( - fn_name="initialize", - fn_description="Initialize DPSN client connection", - args=[], - hint="Initializes or ensures the DPSN client is connected.", - executable=self.initialize - ), "subscribe": Function( fn_name="subscribe", fn_description="Subscribe to a DPSN topic", @@ -50,10 +78,9 @@ def __init__(self, required=True ) ], - hint="Subscribes to a specific DPSN topic to receive messages.", - executable=self.subscribe + hint="Subscribes to a specific DPSN topic to receive messages. Will initialize connection if needed.", + executable=self.subscribe # Keep executable pointing to the public method ), - "unsubscribe": Function( fn_name="unsubscribe", fn_description="Unsubscribe from a DPSN topic", @@ -65,7 +92,7 @@ def __init__(self, required=True ) ], - hint="Unsubscribes from a specific DPSN topic.", + hint="Unsubscribes from a specific DPSN topic. Will initialize connection if needed.", executable=self.unsubscribe ), "shutdown": Function( @@ -75,107 +102,88 @@ def __init__(self, hint="Disconnects the DPSN client gracefully.", executable=self.shutdown ) - # Potential future function: publish } def get_function(self, fn_name: str) -> Function: """Get a specific function by name""" if fn_name not in self._functions: - # Log the error as well logger.error(f"Function '{fn_name}' not found in DpsnPlugin") raise ValueError(f"Function '{fn_name}' not found") return self._functions[fn_name] - def initialize(self) -> Tuple[FunctionResultStatus, str, Dict[str, Any]]: + # {{ New private method to handle initialization logic }} + def _ensure_initialized(self): """ - Initializes the DpsnClient if not already done, or ensures it's connected. - Uses configuration from environment variables. + Ensures the DpsnClient is initialized. Runs initialization logic only once. + Raises DpsnInitializationError on failure. """ - if not self.dpsn_url or not self.pvt_key: - logger.error("Cannot initialize: DPSN_URL and PVT_KEY are not set.") - # Return Tuple: (Status, Message, Info) - return ( - FunctionResultStatus.FAILED, - "DPSN_URL and PVT_KEY must be set in .env file", - {} - ) - - if self.client and self.client.dpsn_broker and self.client.dpsn_broker.is_connected(): - logger.info("Client already initialized and connected. Re-initializing.") + if self._initialized: + return # Already initialized - chain_options = { - "network": "testnet", - "wallet_chain_type": "ethereum" - } - connection_options = {"ssl": True} + if not self.client: + logger.error("Cannot initialize: DpsnClient instance was not created (likely missing credentials).") + raise DpsnInitializationError("Client not configured (missing URL/Key).") + + # Check if already connected (e.g., if init was called externally somehow) + if self.client.dpsn_broker and self.client.dpsn_broker.is_connected(): + logger.info("Client already connected. Marking as initialized.") + self._initialized = True + return + logger.info("Initializing DpsnClient connection (first time)...") try: - if not self.client: - logger.info(f"Creating DpsnClient for {self.dpsn_url}") - self.client = DpsnClient( - dpsn_url=self.dpsn_url, - private_key=self.pvt_key, - chain_options=chain_options, - connection_options=connection_options - ) - self.client.on_error += self._handle_client_error - - logger.info("Initializing DpsnClient connection...") + # Perform the actual initialization / connection self.client.init({ "retry_options": { "max_retries": 3, - "initial_delay": 1000, + "initial_delay": 1000, "max_delay": 5000 } }) - logger.info("DpsnClient initialized successfully.") - - if self.message_callback: - try: - self.client.on_msg -= self.message_callback - except ValueError: - pass - self.client.on_msg += self.message_callback - logger.info("Message callback re-applied.") - # Return Tuple: (Status, Message, Info) - return (FunctionResultStatus.DONE, "DPSN client initialized successfully.", {}) + # Apply message callback if it was set before initialization finished + if self.message_callback: + try: + # Ensure handler isn't added multiple times if init is re-entrant + self.client.on_msg -= self.message_callback + except ValueError: + pass # Wasn't added yet + self.client.on_msg += self.message_callback + logger.info("Message callback applied during initialization.") + + self._initialized = True # Mark as initialized *after* successful init + logger.info("DpsnClient initialized successfully.") except DPSNError as e: logger.error(f"DPSN Initialization Error: Code={e.code}, Msg={e.message}") - if e.code in [DPSN_ERROR_CODES.INVALID_PRIVATE_KEY, DPSN_ERROR_CODES.BLOCKCHAIN_CONFIG_ERROR]: - self.client = None - # Return Tuple: (Status, Message, Info) - return ( - FunctionResultStatus.FAILED, - f"DPSNError ({e.code.name}): {e.message}", - {} - ) + self._initialized = False # Ensure flag is false on error + # Raise specific error to be caught by calling function + raise DpsnInitializationError(f"DPSNError ({e.code.name}): {e.message}") from e except Exception as e: - logger.exception("Unexpected error during DPSN initialization:") - self.client = None - # Return Tuple: (Status, Message, Info) - return ( - FunctionResultStatus.FAILED, - f"Unexpected initialization error: {str(e)}", - {} - ) + logger.exception("Unexpected error during DPSN initialization:") + self._initialized = False # Ensure flag is false on error + raise DpsnInitializationError(f"Unexpected initialization error: {str(e)}") from e def subscribe(self, topic: str) -> Tuple[FunctionResultStatus, str, Dict[str, Any]]: - """Subscribes to a specific topic using the initialized client.""" - if not self.client: - logger.warning("Subscribe called but client is not initialized.") - return (FunctionResultStatus.FAILED, "Client not initialized", {}) + """Subscribes to a specific topic. Ensures client is initialized first.""" + + try: + self._ensure_initialized() + except DpsnInitializationError as e: + logger.error(f"Subscription failed for topic '{topic}' due to initialization error: {e}") + return (FunctionResultStatus.FAILED, f"Initialization failed: {e}", {"topic": topic}) + + # Existing checks (client should exist if _ensure_initialized passed) + if not self.client or not self.client.dpsn_broker or not self.client.dpsn_broker.is_connected(): + logger.warning(f"Subscribe attempt failed for topic '{topic}': Client not connected (post-init check).") + self._initialized = False # Reset flag if connection lost + return (FunctionResultStatus.FAILED, "Client not connected", {"topic": topic}) + - if not self.client.dpsn_broker or not self.client.dpsn_broker.is_connected(): - logger.warning(f"Subscribe attempt failed for topic '{topic}': Client not connected.") - return (FunctionResultStatus.FAILED, "Client not connected", {}) - try: logger.info(f"Subscribing to topic: {topic}") - self.client.subscribe(topic) + self.client.subscribe(topic) logger.info(f"Successfully subscribed to topic: {topic}") - # Return Tuple: (Status, Message, Info - include topic) return (FunctionResultStatus.DONE, f"Successfully subscribed to topic: {topic}", {"subscribed_topic": topic}) except DPSNError as e: logger.error(f"DPSN Subscription Error for topic '{topic}': Code={e.code}, Msg={e.message}") @@ -184,22 +192,25 @@ def subscribe(self, topic: str) -> Tuple[FunctionResultStatus, str, Dict[str, An logger.exception(f"Unexpected error during subscription to topic '{topic}':") return (FunctionResultStatus.FAILED, f"Unexpected subscription error: {str(e)}", {"topic": topic}) - # Added unsubscribe method def unsubscribe(self, topic: str) -> Tuple[FunctionResultStatus, str, Dict[str, Any]]: - """Unsubscribes from a specific topic.""" - if not self.client: - logger.warning("Unsubscribe called but client is not initialized.") - return (FunctionResultStatus.FAILED, "Client not initialized", {}) + """Unsubscribes from a specific topic. Ensures client is initialized first.""" + + try: + self._ensure_initialized() + except DpsnInitializationError as e: + logger.error(f"Unsubscription failed for topic '{topic}' due to initialization error: {e}") + return (FunctionResultStatus.FAILED, f"Initialization failed: {e}", {"topic": topic}) - if not self.client.dpsn_broker or not self.client.dpsn_broker.is_connected(): - logger.warning(f"Unsubscribe attempt failed for topic '{topic}': Client not connected.") - return (FunctionResultStatus.FAILED, "Client not connected", {}) + # Existing checks + if not self.client or not self.client.dpsn_broker or not self.client.dpsn_broker.is_connected(): + logger.warning(f"Unsubscribe attempt failed for topic '{topic}': Client not connected (post-init check).") + self._initialized = False + return (FunctionResultStatus.FAILED, "Client not connected", {"topic": topic}) try: logger.info(f"Unsubscribing from topic: {topic}") self.client.unsubscribe(topic) logger.info(f"Successfully unsubscribed from topic: {topic}") - # Return Tuple: (Status, Message, Info - include topic) return (FunctionResultStatus.DONE, f"Successfully unsubscribed from topic: {topic}", {"unsubscribed_topic": topic}) except DPSNError as e: logger.error(f"DPSN Unsubscription Error for topic '{topic}': Code={e.code}, Msg={e.message}") @@ -210,52 +221,71 @@ def unsubscribe(self, topic: str) -> Tuple[FunctionResultStatus, str, Dict[str, def shutdown(self) -> Tuple[FunctionResultStatus, str, Dict[str, Any]]: - """Disconnects the DPSN client.""" - if self.client: + """Disconnects the DPSN client if it was initialized.""" + if self._initialized and self.client: try: logger.info("Shutting down DpsnClient connection...") self.client.disconnect() logger.info("DpsnClient shutdown complete.") - # Return Tuple: (Status, Message, Info) + self._initialized = False # Reset flag after successful disconnect return (FunctionResultStatus.DONE, "DPSN client shutdown complete.", {}) except DPSNError as e: logger.error(f"DPSN Shutdown Error: Code={e.code}, Msg={e.message}") + self._initialized = False return (FunctionResultStatus.FAILED, f"Shutdown error ({e.code.name}): {e.message}", {}) except Exception as e: logger.exception("Unexpected error during DPSN shutdown:") + self._initialized = False return (FunctionResultStatus.FAILED, f"Unexpected shutdown error: {str(e)}", {}) + elif not self.client: + logger.info("Shutdown called but client was never created.") + return (FunctionResultStatus.DONE, "Client not configured.", {}) else: - logger.info("Shutdown called but no active client to shutdown.") - # Return Tuple: (Status, Message, Info) - return (FunctionResultStatus.DONE, "No active client to shutdown", {}) + # Client exists but was never initialized or already shut down + logger.info("Shutdown called but client was not initialized or already shut down.") + self._initialized = False # Ensure flag is false + return (FunctionResultStatus.DONE, "Client was not active.", {}) def set_message_callback(self, callback: Callable[[Dict[str, Any]], None]): """ - Sets the callback function to handle incoming messages. - The callback receives a dictionary {'topic': str, 'payload': Any}. - Payload is automatically decoded (string or dict/list if JSON). + Sets the callback function. If client is initialized, applies it immediately. + If not initialized, stores it to be applied upon successful initialization. """ logger.info(f"Setting message callback to: {callback.__name__ if hasattr(callback, '__name__') else callback}") - # Remove old callback if client exists and callback was previously set + # Remove old callback first if client exists and callback was previously set if self.client and self.message_callback: try: self.client.on_msg -= self.message_callback + logger.debug("Removed previous message callback.") except ValueError: - pass # Ignore if it wasn't added + pass # Ignore if it wasn't added or client changed self.message_callback = callback # Store the new callback - # Add the new callback if client exists - if self.client: - self.client.on_msg += self.message_callback - logger.info("Message callback applied to existing client.") + # If client exists and is initialized, add the new callback immediately + # {{ Check _initialized flag }} + if self.client and self._initialized: + try: + self.client.on_msg += self.message_callback + logger.info("Message callback applied to initialized client.") + except Exception as e: + logger.exception("Failed to apply message callback to initialized client:") + elif self.client: + logger.info("Message callback stored and will be applied upon initialization.") + else: + logger.warning("Message callback stored, but client instance does not exist.") def _handle_client_error(self, error: DPSNError): """Internal handler for errors emitted by the DpsnClient.""" - logger.error(f"[DpsnClient EVENT] Error received: Code={error.code.name}, Msg={error.message}, Status={error.status}") - # You could add more logic here, like notifying the game state - + logger.error(f"[DpsnClient EVENT] Error received: Code={error.code.name if hasattr(error.code, 'name') else error.code}, Msg={error.message}, Status={error.status}") + + if self._initialized: + logger.warning("Marking DpsnPlugin as uninitialized due to client error.") + self._initialized = False + else: + logger.warning("Client error received, but plugin was already not marked as initialized.") + # Create plugin instance plugin = DpsnPlugin() \ No newline at end of file From d737393ff3c2e4c966bd8736e9837586daffd916 Mon Sep 17 00:00:00 2001 From: AnishRane Date: Tue, 1 Apr 2025 00:31:05 +0530 Subject: [PATCH 11/23] docs: add README.md for dpsn plugin --- plugins/dpsn/README.md | 193 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 193 insertions(+) diff --git a/plugins/dpsn/README.md b/plugins/dpsn/README.md index e69de29b..8d3a8149 100644 --- a/plugins/dpsn/README.md +++ b/plugins/dpsn/README.md @@ -0,0 +1,193 @@ +# 🌐 DPSN Plugin for Virtuals Protocol (Python) + +> Decentralized Publish-Subscribe Network (DPSN) plugin for Virtuals Protocol agents, implemented in Python. + +[![Virtuals Protocol](https://img.shields.io/badge/Virtuals%20Protocol-plugin-blue)](https://virtuals.io/) +[![Version](https://img.shields.io/badge/version-alpha-orange)](https://github.com/virtuals-protocol/virtuals-game-python) +[![License](https://img.shields.io/badge/license-MIT-green)](../../LICENSE) # Adjust path if needed + +## šŸ“‹ Overview + +This plugin enables Virtuals Protocol agents (written in Python) to connect to, subscribe to, and interact with data streams available on the [DPSN Data Streams Store](https://streams.dpsn.org/). + +Agents can leverage this plugin to consume real-time data for decision-making, reacting to events, or integrating external information feeds. + +To provide personalized data streams for your agents, you can create and publish data into your own DPSN topics using the [dpsn-client for Python](https://github.com/DPSN-org/dpsn-python-client). # Verify link/package name if available + +For more information, visit: +- [DPSN Official Website](https://dpsn.org) + +## ✨ Features + +- **Seamless Integration**: Connects Virtuals Protocol agents (Python) to the DPSN decentralized pub/sub network. +- **Real-time Data Handling**: Subscribe to topics and process incoming messages via a configurable callback. +- **Topic Management**: Provides agent-executable functions to `subscribe` and `unsubscribe` from DPSN topics. +- **Error Handling**: Includes basic error handling and logging for connection and subscription issues. +- **Graceful Shutdown**: Allows the agent to explicitly shut down the DPSN connection. + +## āš™ļø Configuration + +Ensure the following environment variables are set, typically in a `.env` file in your project root: + +> **Note**: The EVM private key (`PVT_KEY`) is used solely for signing authentication messages with the DPSN network. This process does not execute any on-chain transactions or incur gas fees. + +```dotenv +# Your EVM-compatible wallet private key (e.g., Metamask) +PVT_KEY=your_evm_wallet_private_key_here + +# The URL of the DPSN node to connect to (e.g., betanet.dpsn.org) +DPSN_URL=betanet.dpsn.org + +# Optional: Add VIRTUALS_API_KEY if required by your GameAgent setup +# VIRTUALS_API_KEY=your_virtuals_api_key_here +``` + +## šŸ“š Usage + +### Basic Setup + +The `DpsnPlugin` is designed to be used within the Virtuals Protocol Game SDK framework. You would typically instantiate it and potentially pass it to your `GameAgent` or similar construct. + +```python +# Import the pre-instantiated plugin (recommended) +from plugins.dpsn.dpsn_plugin_gamesdk.dpsn_plugin import plugin +# Define a simple message handler +def handle_message(message_data): + topic = message_data.get('topic', 'unknown') + payload = message_data.get('payload', {}) + print(f"Message on {topic}: {payload}") + +# Register the message handler +plugin.set_message_callback(handle_message) + +# Subscribe to a topic +status, message, details = plugin.subscribe( + topic="0xe14768a6d8798e4390ec4cb8a4c991202c2115a5cd7a6c0a7ababcaf93b4d2d4/BTCUSDT/ticker" +) +print(f"Subscription status: {status}, Message: {message}") + +# Later when done: +# plugin.unsubscribe(topic="0xe14768a6d8798e4390ec4cb8a4c991202c2115a5cd7a6c0a7ababcaf93b4d2d4/BTCUSDT/ticker") +# plugin.shutdown() + +``` + +### Interacting via Agent Tasks + +The Game Agent interacts with the plugin by executing tasks that map to the plugin's `Function` objects. The exact syntax depends on your Game SDK's `runTask` or equivalent method. + +```python +from game_sdk.game.agent import Agent, WorkerConfig +from game_sdk.game.custom_types import FunctionResult +from dpsn_plugin_gamesdk.dpsn_plugin import plugin + +# --- Add Message Handler --- +def handle_incoming_message(message_data: dict): + """Callback function to process messages received via the plugin.""" + try: + topic = message_data.get('topic', 'N/A') + payload = message_data.get('payload', '{}') + timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + print(f"\n--- Message Received ({timestamp}) ---") + print(f"Topic: {topic}") + # Pretty print payload if it's likely JSON/dict + if isinstance(payload, (dict, list)): + print(f"Payload:\n{json.dumps(payload, indent=2)}") + return payload + else: + print(f"Payload: {payload}") + return payload + print("-----------------------------------") + except Exception as e: + print(f"Error in message handler: {e}") + +# Set the callback in the plugin instance *before* running the agent +plugin.set_message_callback(handle_incoming_message) + +def get_agent_state_fn(function_result: FunctionResult, current_state: dict) -> dict: + """Update state based on the function results""" + init_state = {} + + if current_state is None: + return init_state + + if function_result.info is not None: + current_state.update(function_result.info) + + return current_state + +def get_worker_state(function_result: FunctionResult, current_state: dict) -> dict: + """Update state based on the function results""" + init_state = {} + + if current_state is None: + return init_state + + if function_result.info is not None: + current_state.update(function_result.info) + + return current_state + + +subscription_worker = WorkerConfig( + id="subscription_worker", + worker_description="Worker specialized in managing DPSN topic subscriptions, unsubscriptions, message handling, and shutdown.", + get_state_fn=get_worker_state, + action_space=[ + plugin.get_function("subscribe"), + plugin.get_function("unsubscribe"), + plugin.get_function("shutdown") + ], +) + +# Initialize the agent +agent = Agent( + api_key=os.environ.get("GAME_API_KEY"), + name="DPSN Market Data Agent", + agent_goal="Monitor SOLUSDT market data from DPSN and process real-time updates.", + agent_description=( + "You are an AI agent specialized in DPSN market data processing" + "You can subscribe dpsn topic" + "after 5 minutes unsubscribe the topic" + "next 5 minutes close the connection" + "\n\nAvailable topics:" + "\n- 0xe14768a6d8798e4390ec4cb8a4c991202c2115a5cd7a6c0a7ababcaf93b4d2d4/SOLUSDT/ohlc" + ), + get_agent_state_fn=get_agent_state_fn, + workers=[ + subscription_worker + ] +) +``` + +## šŸ“– API Reference (`DpsnPlugin`) + +Key components of the `DpsnPlugin` class: + +- `__init__(dpsn_url: Optional[str] = ..., pvt_key: Optional[str] = ...)`: Constructor. Reads credentials from env vars by default. Raises `ValueError` if credentials are missing. +- `get_function(fn_name: str) -> Function`: Retrieves the `Function` object (`subscribe`, `unsubscribe`, `shutdown`) for the Game SDK. +- `set_message_callback(callback: Callable[[Dict[str, Any]], None])`: Registers the function to call when a message is received on a subscribed topic. The callback receives a dictionary (structure depends on the underlying `dpsn-client`). +- `subscribe(topic: str) -> Tuple[FunctionResultStatus, str, Dict[str, Any]]`: (Executable Function) Subscribes to a topic. Handles initialization if needed. Returns status, message, and details. +- `unsubscribe(topic: str) -> Tuple[FunctionResultStatus, str, Dict[str, Any]]`: (Executable Function) Unsubscribes from a topic. Handles initialization if needed. Returns status, message, and details. +- `shutdown() -> Tuple[FunctionResultStatus, str, Dict[str, Any]]`: (Executable Function) Disconnects the DPSN client gracefully. Returns status, message, and details. +- `_ensure_initialized()`: (Internal) Manages the lazy initialization of the DPSN client connection. Raises `DpsnInitializationError` on failure. + +### Agent-Executable Functions + +The plugin exposes the following functions intended to be called via the Game Agent's task execution mechanism: + +- **`subscribe`**: + - Description: Subscribe to a DPSN topic. + - Args: `topic` (string, required) - The topic to subscribe to. +- **`unsubscribe`**: + - Description: Unsubscribe from a DPSN topic. + - Args: `topic` (string, required) - The topic to unsubscribe from. +- **`shutdown`**: + - Description: Shutdown the DPSN client connection. + - Args: None. + + + +> In case of any queries regarding DPSN, please reach out to the team on [Telegram](https://t.me/dpsn_dev) šŸ“„. + + From 1122c7325fe722b3d4c879fae754fe60849d4f33 Mon Sep 17 00:00:00 2001 From: AnishRane Date: Tue, 1 Apr 2025 10:29:30 +0530 Subject: [PATCH 12/23] refactor: dpsn agent example --- plugins/dpsn/examples/dpsn_agent.py | 50 +++++++++++++++++++++-------- 1 file changed, 36 insertions(+), 14 deletions(-) diff --git a/plugins/dpsn/examples/dpsn_agent.py b/plugins/dpsn/examples/dpsn_agent.py index 47f377f6..2ca7107a 100644 --- a/plugins/dpsn/examples/dpsn_agent.py +++ b/plugins/dpsn/examples/dpsn_agent.py @@ -23,13 +23,13 @@ def handle_incoming_message(message_data: dict): # Pretty print payload if it's likely JSON/dict if isinstance(payload, (dict, list)): print(f"Payload:\n{json.dumps(payload, indent=2)}") - return payload + return json.dumps(payload) else: print(f"Payload: {payload}") - return payload - print("-----------------------------------") + return str(payload) except Exception as e: print(f"Error in message handler: {e}") + return str(e) # Set the callback in the plugin instance *before* running the agent plugin.set_message_callback(handle_incoming_message) @@ -75,22 +75,44 @@ def get_worker_state(function_result: FunctionResult, current_state: dict) -> di agent = Agent( api_key=os.environ.get("GAME_API_KEY"), name="DPSN Market Data Agent", - agent_goal="Monitor SOLUSDT market data from DPSN and process real-time updates.", + agent_goal="Monitor SOLUSDT market data from DPSN, process messages for 2 minutes, then clean up.", agent_description=( - "You are an AI agent specialized in DPSN market data processing" - "You can subscribe dpsn topic" - "after 5 minutes unsubscribe the topic" - "next 5 minutes close the connection" - "\n\nAvailable topics:" - "\n- 0xe14768a6d8798e4390ec4cb8a4c991202c2115a5cd7a6c0a7ababcaf93b4d2d4/SOLUSDT/ohlc" + "You are an AI agent specialized in DPSN market data processing. Follow these steps in order:\n\n" + + "1. IMMEDIATELY subscribe to this DPSN topic: 0xe14768a6d8798e4390ec4cb8a4c991202c2115a5cd7a6c0a7ababcaf93b4d2d4/SOLUSDT/ohlc\n" + + "2. Once subscribed, inform the user that you are listening for market data updates.\n" + + "3. When you receive messages on this topic:\n" + " - Report that you received a message\n" + " - Summarize the content of each message in a clear, human-readable format\n" + + "4. After receiving messages for 2 minutes (or at least 3 messages, whichever comes first):\n" + " - Unsubscribe from the topic using the unsubscribe function\n" + " - Report that you have unsubscribed\n" + + "5. After unsubscribing:\n" + " - Shut down the DPSN client connection using the shutdown function\n" + " - Report that the connection has been closed\n" + + "Available functions:\n" + "- subscribe: Subscribe to a DPSN topic to receive data\n" + "- unsubscribe: Unsubscribe from a DPSN topic\n" + "- shutdown: Close the DPSN client connection\n\n" + + "Available topic:\n" + "- 0xe14768a6d8798e4390ec4cb8a4c991202c2115a5cd7a6c0a7ababcaf93b4d2d4/SOLUSDT/ohlc" ), get_agent_state_fn=get_agent_state_fn, workers=[ - # connection_worker, subscription_worker ] ) -# Compile and run the agent -agent.compile() -agent.run() \ No newline at end of file +try: + agent.compile() + agent.run() +except Exception as e: + print(f"Error running agent:{e}") + import traceback + traceback.print_exc() \ No newline at end of file From d06f0c156858a9507e3a02c1659cea4b6ccae016 Mon Sep 17 00:00:00 2001 From: AnishRane Date: Tue, 1 Apr 2025 14:25:43 +0530 Subject: [PATCH 13/23] add: docstring for dpsn example demonstration in worker example --- plugins/dpsn/examples/dpsn_worker.py | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/plugins/dpsn/examples/dpsn_worker.py b/plugins/dpsn/examples/dpsn_worker.py index f3b9f3bc..f8c94795 100644 --- a/plugins/dpsn/examples/dpsn_worker.py +++ b/plugins/dpsn/examples/dpsn_worker.py @@ -39,8 +39,27 @@ def process_message(self, message: Dict[str, Any]): print(f"šŸ’¼ Trade executed: {trade}") # Example use case of dpsn plugin worker + def execute_trade(self, topic: str, payload: Dict[str, Any]) -> Dict[str, Any] | None: - """Execute trades based on market data""" + """ + Demonstrates a potential use case for reacting to DPSN messages. + + **This is a highly simplified example for demonstration purposes only.** + + It simulates a basic trading decision based on a received price. + It lacks proper error handling, risk management, sophisticated logic, + and integration with actual trading systems. + + **Do NOT use this function in a real-world trading implementation.** + + Args: + topic: The topic the message was received on. + payload: The message payload. + + Returns: + A dictionary representing a trade order if conditions are met, + otherwise None. + """ # Example trade execution logic (synchronous) if "SOLUSDT" in topic: # Ensure payload is a dictionary before accessing keys From 3b8c7278f0199779858f501054ffebd18687614f Mon Sep 17 00:00:00 2001 From: AnishRane Date: Tue, 1 Apr 2025 14:27:03 +0530 Subject: [PATCH 14/23] chore:migrate dpsn-client library from local to published version --- plugins/dpsn/pyproject.toml | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/plugins/dpsn/pyproject.toml b/plugins/dpsn/pyproject.toml index 780d74e3..bb2780ad 100644 --- a/plugins/dpsn/pyproject.toml +++ b/plugins/dpsn/pyproject.toml @@ -5,10 +5,7 @@ description = "DPSN Plugin for Python SDK for GAME by Virtuals" authors = [{ name = "dpsn-dev", email = "dev@dpsn.org" }] readme = "README.md" requires-python = ">=3.9" -dependencies = [ - "/Users/anishrane/workspace/dpsn_python_lib/dpsn-python-client/dist/dpsn_client-0.1.0-py3-none-any.whl", -] - +dependencies = ["dpsn-client==1.0.0.post1"] [build-system] requires = ["poetry-core>=2.0.0,<3.0.0"] build-backend = "poetry.core.masonry.api" From cbc7f90c4974b13f3f2f2fc3b846d258e56175dc Mon Sep 17 00:00:00 2001 From: AnishRane Date: Tue, 1 Apr 2025 15:08:56 +0530 Subject: [PATCH 15/23] refactor: change agent behaviour --- plugins/dpsn/examples/dpsn_agent.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/plugins/dpsn/examples/dpsn_agent.py b/plugins/dpsn/examples/dpsn_agent.py index 2ca7107a..c2cd409a 100644 --- a/plugins/dpsn/examples/dpsn_agent.py +++ b/plugins/dpsn/examples/dpsn_agent.py @@ -3,12 +3,14 @@ from pathlib import Path import json from datetime import datetime +from collections import deque +from typing import Dict, Any, List parent_dir = str(Path(__file__).parent.parent) sys.path.append(parent_dir) from game_sdk.game.agent import Agent, WorkerConfig -from game_sdk.game.custom_types import FunctionResult +from game_sdk.game.custom_types import FunctionResult, Function, Argument, FunctionResultStatus from dpsn_plugin_gamesdk.dpsn_plugin import plugin # --- Add Message Handler --- @@ -18,8 +20,14 @@ def handle_incoming_message(message_data: dict): topic = message_data.get('topic', 'N/A') payload = message_data.get('payload', '{}') timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + + # Store the message + message_store.add_message(message_data) + print(f"\n--- Message Received ({timestamp}) ---") print(f"Topic: {topic}") + print(f"Message Count: {message_store.message_count}") + # Pretty print payload if it's likely JSON/dict if isinstance(payload, (dict, list)): print(f"Payload:\n{json.dumps(payload, indent=2)}") @@ -85,7 +93,6 @@ def get_worker_state(function_result: FunctionResult, current_state: dict) -> di "3. When you receive messages on this topic:\n" " - Report that you received a message\n" - " - Summarize the content of each message in a clear, human-readable format\n" "4. After receiving messages for 2 minutes (or at least 3 messages, whichever comes first):\n" " - Unsubscribe from the topic using the unsubscribe function\n" From 818ea6f06caf9c16d0047f40b57c3b6fb7efd01e Mon Sep 17 00:00:00 2001 From: AnishRane Date: Tue, 1 Apr 2025 15:17:47 +0530 Subject: [PATCH 16/23] remove:message store --- plugins/dpsn/examples/dpsn_agent.py | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/plugins/dpsn/examples/dpsn_agent.py b/plugins/dpsn/examples/dpsn_agent.py index c2cd409a..d8d92f01 100644 --- a/plugins/dpsn/examples/dpsn_agent.py +++ b/plugins/dpsn/examples/dpsn_agent.py @@ -3,14 +3,12 @@ from pathlib import Path import json from datetime import datetime -from collections import deque -from typing import Dict, Any, List parent_dir = str(Path(__file__).parent.parent) sys.path.append(parent_dir) from game_sdk.game.agent import Agent, WorkerConfig -from game_sdk.game.custom_types import FunctionResult, Function, Argument, FunctionResultStatus +from game_sdk.game.custom_types import FunctionResult from dpsn_plugin_gamesdk.dpsn_plugin import plugin # --- Add Message Handler --- @@ -20,14 +18,8 @@ def handle_incoming_message(message_data: dict): topic = message_data.get('topic', 'N/A') payload = message_data.get('payload', '{}') timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') - - # Store the message - message_store.add_message(message_data) - print(f"\n--- Message Received ({timestamp}) ---") print(f"Topic: {topic}") - print(f"Message Count: {message_store.message_count}") - # Pretty print payload if it's likely JSON/dict if isinstance(payload, (dict, list)): print(f"Payload:\n{json.dumps(payload, indent=2)}") From 874b4995d7818b1d2944cf8e9523c31f22e36d15 Mon Sep 17 00:00:00 2001 From: AnishRane Date: Tue, 1 Apr 2025 16:01:54 +0530 Subject: [PATCH 17/23] improved dpsn-client agent example --- plugins/dpsn/examples/dpsn_agent.py | 195 ++++++++++++++++++++++++---- 1 file changed, 167 insertions(+), 28 deletions(-) diff --git a/plugins/dpsn/examples/dpsn_agent.py b/plugins/dpsn/examples/dpsn_agent.py index d8d92f01..76f184f5 100644 --- a/plugins/dpsn/examples/dpsn_agent.py +++ b/plugins/dpsn/examples/dpsn_agent.py @@ -3,33 +3,135 @@ from pathlib import Path import json from datetime import datetime +import time +import threading +import signal +from typing import Dict, Any, Tuple parent_dir = str(Path(__file__).parent.parent) sys.path.append(parent_dir) from game_sdk.game.agent import Agent, WorkerConfig -from game_sdk.game.custom_types import FunctionResult +from game_sdk.game.custom_types import FunctionResult, FunctionResultStatus, Function, Argument from dpsn_plugin_gamesdk.dpsn_plugin import plugin -# --- Add Message Handler --- +# Global message counter and timestamp +message_count = 0 +start_time = None +collected_messages = [] +task_completed = False # Flag to track if the main task has been completed + +# Update message handler to track count and time def handle_incoming_message(message_data: dict): """Callback function to process messages received via the plugin.""" - try: - topic = message_data.get('topic', 'N/A') - payload = message_data.get('payload', '{}') - timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') - print(f"\n--- Message Received ({timestamp}) ---") - print(f"Topic: {topic}") - # Pretty print payload if it's likely JSON/dict - if isinstance(payload, (dict, list)): - print(f"Payload:\n{json.dumps(payload, indent=2)}") - return json.dumps(payload) - else: - print(f"Payload: {payload}") - return str(payload) - except Exception as e: - print(f"Error in message handler: {e}") - return str(e) + global message_count, start_time, collected_messages + + # Don't process messages if task is already completed + if task_completed: + return "Task already completed" + + # Initialize start time on first message + if start_time is None: + start_time = time.time() + + message_count += 1 + collected_messages.append(message_data) + + topic = message_data.get('topic', 'N/A') + payload = message_data.get('payload', '{}') + timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + + print(f"\n--- Message Received ({timestamp}) ---") + print(f"Topic: {topic}") + print(f"Message count: {message_count}") + print(f"Time elapsed: {time.time() - start_time:.1f} seconds") + + # Pretty print payload + if isinstance(payload, (dict, list)): + print(f"Payload:\n{json.dumps(payload, indent=2)}") + return json.dumps(payload) + else: + print(f"Payload: {payload}") + return str(payload) + +# Add a new function for the agent to check if collection is complete +def check_collection_status() -> tuple[FunctionResultStatus, str, dict]: + """Check if we have collected enough data (2 minutes or 3+ messages)""" + global message_count, start_time, collected_messages + + elapsed = 0 + if start_time: + elapsed = time.time() - start_time + + # Check conditions + time_condition = elapsed >= 120 # 2 minutes + count_condition = message_count >= 3 + + if time_condition or count_condition: + reason = "time limit reached" if time_condition else "message count reached" + summary = { + "messages_received": message_count, + "time_elapsed_seconds": elapsed, + "collection_complete": True, + "reason": reason, + "sample_messages": collected_messages[:3] # First 3 messages as samples + } + return (FunctionResultStatus.DONE, f"Data collection complete: {reason}", summary) + else: + summary = { + "messages_received": message_count, + "time_elapsed_seconds": elapsed, + "collection_complete": False, + "remaining_time": 120 - elapsed, + "remaining_messages": max(0, 3 - message_count) + } + return (FunctionResultStatus.DONE, "Still collecting data...", summary) + +# Function to mark a task completed after dpsn shutdown +def mark_complete_after_shutdown() -> tuple[FunctionResultStatus, str, dict]: + """Mark the task as complete after DPSN client shutdown""" + global task_completed + + # Only allow this to be called if DPSN was already shut down + if task_completed: + return (FunctionResultStatus.DONE, "Task already marked as complete.", {"status": "already_completed"}) + + # Explain what's happening + print("\n=== FINALIZING TASK ===") + print("1. DPSN Client has been shut down") + print("2. Marking task as complete") + print("3. Program will exit shortly") + task_completed = True + + # Schedule a delayed exit to allow time for the agent to report completion + def exit_program(): + print("\n=== TASK COMPLETE ===") + print("Exiting program as all tasks are complete...") + os._exit(0) # Force exit the program + + # Schedule exit after 5 seconds + timer = threading.Timer(5.0, exit_program) + timer.daemon = True + timer.start() + + return (FunctionResultStatus.DONE, "Task complete! Agent is now finished.", {"status": "success"}) + +# Create a function objects for the functions +check_status_function = Function( + fn_name="check_collection_status", + fn_description="Check if enough data has been collected (2 minutes or 3+ messages)", + args=[], + hint="Use this to check if it's time to unsubscribe (returns collection status)", + executable=check_collection_status +) + +complete_task_function = Function( + fn_name="mark_task_complete", + fn_description="Mark the agent task as complete and exit the program", + args=[], + hint="Use this as the VERY LAST step after unsubscribing and shutting down DPSN", + executable=mark_complete_after_shutdown +) # Set the callback in the plugin instance *before* running the agent plugin.set_message_callback(handle_incoming_message) @@ -40,11 +142,32 @@ def get_agent_state_fn(function_result: FunctionResult, current_state: dict) -> init_state = {} if current_state is None: - return init_state + current_state = init_state # Initialize if None + + # Check if function_result is None (initial call) + if function_result is None: + return current_state # Return current (likely initial) state if function_result.info is not None: current_state.update(function_result.info) + # Check if we have completion info + if function_result.info and 'status' in function_result.info and function_result.info['status'] == 'success': + current_state['task_completed'] = True + # If we're marking task as complete, set state to indicate we're done + print("Agent state updated: Task marked as complete.") + + # Add a delay if we just checked status and collection is NOT complete + # Check if the 'collection_complete' key exists in the info dict + # to infer that check_collection_status was likely the last function run. + if function_result.info and \ + "collection_complete" in function_result.info and \ + not function_result.info.get("collection_complete", True): + + wait_time = 5 # Wait for 5 seconds before next check + print(f"Collection not complete. Waiting {wait_time} seconds before next action...") + time.sleep(wait_time) + return current_state def get_worker_state(function_result: FunctionResult, current_state: dict) -> dict: @@ -59,7 +182,6 @@ def get_worker_state(function_result: FunctionResult, current_state: dict) -> di return current_state - subscription_worker = WorkerConfig( id="subscription_worker", worker_description="Worker specialized in managing DPSN topic subscriptions, unsubscriptions, message handling, and shutdown.", @@ -67,7 +189,9 @@ def get_worker_state(function_result: FunctionResult, current_state: dict) -> di action_space=[ plugin.get_function("subscribe"), plugin.get_function("unsubscribe"), - plugin.get_function("shutdown") + plugin.get_function("shutdown"), + check_status_function, + complete_task_function # Add the task completion function ], ) @@ -75,7 +199,7 @@ def get_worker_state(function_result: FunctionResult, current_state: dict) -> di agent = Agent( api_key=os.environ.get("GAME_API_KEY"), name="DPSN Market Data Agent", - agent_goal="Monitor SOLUSDT market data from DPSN, process messages for 2 minutes, then clean up.", + agent_goal="Monitor SOLUSDT market data from DPSN, process messages for 2 minutes, then clean up and exit.", agent_description=( "You are an AI agent specialized in DPSN market data processing. Follow these steps in order:\n\n" @@ -86,18 +210,26 @@ def get_worker_state(function_result: FunctionResult, current_state: dict) -> di "3. When you receive messages on this topic:\n" " - Report that you received a message\n" - "4. After receiving messages for 2 minutes (or at least 3 messages, whichever comes first):\n" - " - Unsubscribe from the topic using the unsubscribe function\n" - " - Report that you have unsubscribed\n" + "4. After subscribing, periodically use the check_collection_status function to check if:\n" + " - 2 minutes have passed, OR\n" + " - At least 3 messages have been received\n" - "5. After unsubscribing:\n" + "5. Only when check_collection_status indicates collection is complete:\n" + " - Unsubscribe from the topic using the unsubscribe function\n" " - Shut down the DPSN client connection using the shutdown function\n" - " - Report that the connection has been closed\n" + + "6. FINAL STEP: After shutting down the DPSN connection, call the mark_task_complete function.\n" + " This will finish your task and automatically exit the program after 5 seconds.\n" + " This must be the VERY LAST function you call.\n" + + "IMPORTANT: The program will exit automatically after you mark the task complete.\n" "Available functions:\n" "- subscribe: Subscribe to a DPSN topic to receive data\n" "- unsubscribe: Unsubscribe from a DPSN topic\n" - "- shutdown: Close the DPSN client connection\n\n" + "- shutdown: Close the DPSN client connection\n" + "- check_collection_status: Check if collection is complete\n" + "- mark_task_complete: Mark the agent's task as complete and exit the program\n\n" "Available topic:\n" "- 0xe14768a6d8798e4390ec4cb8a4c991202c2115a5cd7a6c0a7ababcaf93b4d2d4/SOLUSDT/ohlc" @@ -109,6 +241,13 @@ def get_worker_state(function_result: FunctionResult, current_state: dict) -> di ) try: + print("\n=== DPSN AGENT STARTING ===") + print("This agent will:") + print("1. Subscribe to the SOLUSDT/ohlc topic") + print("2. Collect messages for 2 minutes or at least 3 messages") + print("3. Unsubscribe and clean up") + print("4. Exit automatically\n") + agent.compile() agent.run() except Exception as e: From 4ce50f20725c76b97b073525bd78f432e118453f Mon Sep 17 00:00:00 2001 From: AnishRane Date: Tue, 1 Apr 2025 20:00:37 +0530 Subject: [PATCH 18/23] refactore: name and descriptions for plugin functions --- plugins/dpsn/dpsn_plugin_gamesdk/dpsn_plugin.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/plugins/dpsn/dpsn_plugin_gamesdk/dpsn_plugin.py b/plugins/dpsn/dpsn_plugin_gamesdk/dpsn_plugin.py index 1023c5a2..805f5c4b 100644 --- a/plugins/dpsn/dpsn_plugin_gamesdk/dpsn_plugin.py +++ b/plugins/dpsn/dpsn_plugin_gamesdk/dpsn_plugin.py @@ -68,12 +68,12 @@ def __init__(self, self._functions = { "subscribe": Function( - fn_name="subscribe", - fn_description="Subscribe to a DPSN topic", + fn_name="subscribe_to_topic", + fn_description="Subscribe to a DPSN topic to receive messages", args=[ Argument( name="topic", - description="The topic string to subscribe to", + description="The DPSN topic to subscribe to", type="string", required=True ) @@ -82,12 +82,12 @@ def __init__(self, executable=self.subscribe # Keep executable pointing to the public method ), "unsubscribe": Function( - fn_name="unsubscribe", - fn_description="Unsubscribe from a DPSN topic", + fn_name="unsubscribe_to_topic", + fn_description="unsubscribe to a DPSN topic to stop receiving messages", args=[ Argument( name="topic", - description="The topic string to unsubscribe from", + description="The DPSN topic to unsubscribe to", type="string", required=True ) From dda61110824162c1cacdcbcf35f3bf611f6abe70 Mon Sep 17 00:00:00 2001 From: AnishRane Date: Fri, 4 Apr 2025 12:34:40 +0530 Subject: [PATCH 19/23] chore: update plugin_metadata.yml --- plugins/dpsn/plugin_metadata.yml | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/plugins/dpsn/plugin_metadata.yml b/plugins/dpsn/plugin_metadata.yml index e69de29b..75dfeac6 100644 --- a/plugins/dpsn/plugin_metadata.yml +++ b/plugins/dpsn/plugin_metadata.yml @@ -0,0 +1,14 @@ +# General Information +plugin_name: 'dpsn_plugin' +author: 'DPSN-Tech-Team' +logo_url: '' +release_date: '2025-04' + +# Description +short_description: 'DPSN plugin to subscribe to dpsn data streams for Virtuals Protocol agents' +detailed_description: 'This plugin allows Virtuals Protocol agents in Python to connect to and interact with real-time data streams from the DPSN Data Streams Store. Agents can use it for decision-making and event handling. Developers can also publish custom streams using the dpsn-python-client.' + +# Contact & Support +x_account_handle: '@DPSN_org' +support_contact: 'sanil@dpsn.org' +community_link: 'https://t.me/dpsn_dev' From 0ec40aa9be94b0a352ac4ecfaa1b766d19652d5a Mon Sep 17 00:00:00 2001 From: Anish Rane Date: Fri, 4 Apr 2025 12:50:33 +0530 Subject: [PATCH 20/23] Update README.md --- plugins/dpsn/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugins/dpsn/README.md b/plugins/dpsn/README.md index 8d3a8149..9ce86f8c 100644 --- a/plugins/dpsn/README.md +++ b/plugins/dpsn/README.md @@ -4,7 +4,7 @@ [![Virtuals Protocol](https://img.shields.io/badge/Virtuals%20Protocol-plugin-blue)](https://virtuals.io/) [![Version](https://img.shields.io/badge/version-alpha-orange)](https://github.com/virtuals-protocol/virtuals-game-python) -[![License](https://img.shields.io/badge/license-MIT-green)](../../LICENSE) # Adjust path if needed +[![License](https://img.shields.io/badge/license-MIT-green)](../../LICENSE) ## šŸ“‹ Overview @@ -12,7 +12,7 @@ This plugin enables Virtuals Protocol agents (written in Python) to connect to, Agents can leverage this plugin to consume real-time data for decision-making, reacting to events, or integrating external information feeds. -To provide personalized data streams for your agents, you can create and publish data into your own DPSN topics using the [dpsn-client for Python](https://github.com/DPSN-org/dpsn-python-client). # Verify link/package name if available +To provide personalized data streams for your agents, you can create and publish data into your own DPSN topics using the [dpsn-client for Python](https://github.com/DPSN-org/dpsn-python-client). For more information, visit: - [DPSN Official Website](https://dpsn.org) From 50855eeacdbe690ced60077a680b7236fc45aee2 Mon Sep 17 00:00:00 2001 From: AnishRane Date: Tue, 22 Apr 2025 17:24:05 +0530 Subject: [PATCH 21/23] refactor: removed instance export from plugin and fixed code to adhere to POSIX standard --- .../dpsn/dpsn_plugin_gamesdk/dpsn_plugin.py | 9 +---- plugins/dpsn/examples/dpsn_agent.py | 25 +++++++----- plugins/dpsn/examples/dpsn_worker.py | 16 ++++---- .../dpsn/examples/test_dpsn_game_functions.py | 40 +++++++++---------- 4 files changed, 44 insertions(+), 46 deletions(-) diff --git a/plugins/dpsn/dpsn_plugin_gamesdk/dpsn_plugin.py b/plugins/dpsn/dpsn_plugin_gamesdk/dpsn_plugin.py index 805f5c4b..ecaed436 100644 --- a/plugins/dpsn/dpsn_plugin_gamesdk/dpsn_plugin.py +++ b/plugins/dpsn/dpsn_plugin_gamesdk/dpsn_plugin.py @@ -1,15 +1,10 @@ import os -from dotenv import load_dotenv from dpsn_client.client import DpsnClient, DPSNError from datetime import datetime from game_sdk.game.custom_types import Function, Argument, FunctionResultStatus from typing import Dict, Any, Callable, Tuple, Optional import json import logging - -# Load .env variables -load_dotenv() - # Configure logging for the plugin (optional, but good practice) logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger("DpsnPlugin") @@ -285,7 +280,5 @@ def _handle_client_error(self, error: DPSNError): self._initialized = False else: logger.warning("Client error received, but plugin was already not marked as initialized.") - -# Create plugin instance -plugin = DpsnPlugin() + \ No newline at end of file diff --git a/plugins/dpsn/examples/dpsn_agent.py b/plugins/dpsn/examples/dpsn_agent.py index 76f184f5..f78e9bba 100644 --- a/plugins/dpsn/examples/dpsn_agent.py +++ b/plugins/dpsn/examples/dpsn_agent.py @@ -7,13 +7,17 @@ import threading import signal from typing import Dict, Any, Tuple - -parent_dir = str(Path(__file__).parent.parent) -sys.path.append(parent_dir) - +from dotenv import load_dotenv from game_sdk.game.agent import Agent, WorkerConfig from game_sdk.game.custom_types import FunctionResult, FunctionResultStatus, Function, Argument -from dpsn_plugin_gamesdk.dpsn_plugin import plugin +from dpsn_plugin_gamesdk.dpsn_plugin import DpsnPlugin +load_dotenv() + +dpsn_plugin = DpsnPlugin( + dpsn_url=os.getenv("DPSN_URL"), + pvt_key=os.getenv("PVT_KEY") +) + # Global message counter and timestamp message_count = 0 @@ -134,7 +138,7 @@ def exit_program(): ) # Set the callback in the plugin instance *before* running the agent -plugin.set_message_callback(handle_incoming_message) +dpsn_plugin.set_message_callback(handle_incoming_message) # --- End Message Handler Setup --- def get_agent_state_fn(function_result: FunctionResult, current_state: dict) -> dict: @@ -187,14 +191,15 @@ def get_worker_state(function_result: FunctionResult, current_state: dict) -> di worker_description="Worker specialized in managing DPSN topic subscriptions, unsubscriptions, message handling, and shutdown.", get_state_fn=get_worker_state, action_space=[ - plugin.get_function("subscribe"), - plugin.get_function("unsubscribe"), - plugin.get_function("shutdown"), + dpsn_plugin.get_function("subscribe"), + dpsn_plugin.get_function("unsubscribe"), + dpsn_plugin.get_function("shutdown"), check_status_function, complete_task_function # Add the task completion function ], ) + # Initialize the agent agent = Agent( api_key=os.environ.get("GAME_API_KEY"), @@ -253,4 +258,4 @@ def get_worker_state(function_result: FunctionResult, current_state: dict) -> di except Exception as e: print(f"Error running agent:{e}") import traceback - traceback.print_exc() \ No newline at end of file + traceback.print_exc() diff --git a/plugins/dpsn/examples/dpsn_worker.py b/plugins/dpsn/examples/dpsn_worker.py index f8c94795..fc19b31b 100644 --- a/plugins/dpsn/examples/dpsn_worker.py +++ b/plugins/dpsn/examples/dpsn_worker.py @@ -1,24 +1,25 @@ import sys import os +from dotenv import load_dotenv from pathlib import Path from typing import Dict, Any, List from datetime import datetime import time # Import time for sleep - -# Add the parent directory to Python path -parent_dir = str(Path(__file__).parent.parent) -sys.path.append(parent_dir) +load_dotenv() # Import FunctionResultStatus to check the status enum from game_sdk.game.custom_types import FunctionResultStatus -from dpsn_plugin_gamesdk.dpsn_plugin import plugin +from dpsn_plugin_gamesdk.dpsn_plugin import DpsnPlugin class DpsnWorker: """ DPSN Worker for processing market data and executing trades (Synchronous Version) """ def __init__(self): - self.plugin = plugin + self.plugin = DpsnPlugin( + dpsn_url=os.getenv("DPSN_URL"), + pvt_key=os.getenv("PVT_KEY") + ) self.trades: List[Dict[str, Any]] = [] self.is_running = False @@ -128,4 +129,5 @@ def main(): if __name__ == "__main__": # Run main directly without asyncio - main() \ No newline at end of file + main() + \ No newline at end of file diff --git a/plugins/dpsn/examples/test_dpsn_game_functions.py b/plugins/dpsn/examples/test_dpsn_game_functions.py index b82ad6b2..1da10366 100644 --- a/plugins/dpsn/examples/test_dpsn_game_functions.py +++ b/plugins/dpsn/examples/test_dpsn_game_functions.py @@ -3,23 +3,24 @@ from pathlib import Path import time from datetime import datetime +from game_sdk.game.custom_types import Function, Argument, FunctionResultStatus +from dotenv import load_dotenv +load_dotenv() -# Add the parent directory to Python path -parent_dir = str(Path(__file__).parent.parent) -sys.path.append(parent_dir) -from dpsn_plugin_gamesdk.dpsn_plugin import plugin + +from dpsn_plugin_gamesdk.dpsn_plugin import DpsnPlugin + +dpsn_plugin=DpsnPlugin( + dpsn_url=os.getenv("DPSN_URL"), + pvt_key=os.getenv("PVT_KEY") +) def test_dpsn_connection(): """Test DPSN connection and basic functionality""" print("\nšŸ”„ Testing DPSN Connection...") - # Initialize DPSN client (without options since the method doesn't accept them) - result = plugin.initialize() - if not result["success"]: - print(f"āŒ Failed to initialize DPSN: {result.get('error')}") - return False - + # Wait for connection to stabilize time.sleep(1) print("āœ… DPSN initialized successfully") @@ -36,13 +37,13 @@ def handle_message(message_data): print(f"Received message on {topic}: {payload}") # Set the callback - plugin.set_message_callback(handle_message) + dpsn_plugin.set_message_callback(handle_message) # Test topic topic = "0xe14768a6d8798e4390ec4cb8a4c991202c2115a5cd7a6c0a7ababcaf93b4d2d4/SOLUSDT/ohlc" print(f"Subscribing to topic: {topic}") - result = plugin.subscribe(topic) + result = dpsn_plugin.subscribe(topic) if not result["success"]: print(f"āŒ Failed to subscribe to topic: {result.get('error')}") return False @@ -52,26 +53,24 @@ def handle_message(message_data): try: while True: - if not plugin.client.dpsn_broker.is_connected(): + if not dpsn_plugin.client.dpsn_broker.is_connected(): print("Connection lost, attempting to reconnect...") - plugin.initialize() + dpsn_plugin.initialize() time.sleep(1) - plugin.subscribe(topic) + dpsn_plugin.subscribe(topic) time.sleep(1) except KeyboardInterrupt: print("\nāš ļø Test interrupted by user") return True - return True - def test_shutdown(): """Test graceful shutdown""" print("\nšŸ”„ Testing Shutdown...") - result = plugin.shutdown() - if not result["success"]: - print(f"āŒ Failed to shutdown: {result.get('error')}") + status,message,extra = dpsn_plugin.shutdown() + if status is not FunctionResultStatus.DONE: + print(f"āŒ Failed to shutdown") return False print("āœ… Shutdown successful") @@ -86,7 +85,6 @@ def main(): if not test_dpsn_connection(): return - # Test subscription and message reception if not test_subscribe_and_receive(): return From 710219e79110a18e9704dfbc3ecec8676eda8977 Mon Sep 17 00:00:00 2001 From: AnishRane Date: Tue, 22 Apr 2025 17:24:37 +0530 Subject: [PATCH 22/23] refactor: updated README.md and env example file --- plugins/dpsn/README.md | 37 +++++++++++++++++++----------- plugins/dpsn/examples/.env.example | 4 ++-- 2 files changed, 26 insertions(+), 15 deletions(-) diff --git a/plugins/dpsn/README.md b/plugins/dpsn/README.md index 9ce86f8c..ea29b8fb 100644 --- a/plugins/dpsn/README.md +++ b/plugins/dpsn/README.md @@ -50,7 +50,14 @@ The `DpsnPlugin` is designed to be used within the Virtuals Protocol Game SDK fr ```python # Import the pre-instantiated plugin (recommended) -from plugins.dpsn.dpsn_plugin_gamesdk.dpsn_plugin import plugin +from plugins.dpsn.dpsn_plugin_gamesdk.dpsn_plugin import DpsnPlugin +load_dotenv() + +dpsn_plugin=DpsnPlugin( + dpsn_url=os.getenv("DPSN_URL"), + pvt_key=os.getenv("PVT_KEY") + ) + # Define a simple message handler def handle_message(message_data): topic = message_data.get('topic', 'unknown') @@ -58,17 +65,17 @@ def handle_message(message_data): print(f"Message on {topic}: {payload}") # Register the message handler -plugin.set_message_callback(handle_message) +dpsn_plugin.set_message_callback(handle_message) # Subscribe to a topic -status, message, details = plugin.subscribe( +status, message, details = dpsn_plugin.subscribe( topic="0xe14768a6d8798e4390ec4cb8a4c991202c2115a5cd7a6c0a7ababcaf93b4d2d4/BTCUSDT/ticker" ) print(f"Subscription status: {status}, Message: {message}") # Later when done: -# plugin.unsubscribe(topic="0xe14768a6d8798e4390ec4cb8a4c991202c2115a5cd7a6c0a7ababcaf93b4d2d4/BTCUSDT/ticker") -# plugin.shutdown() +dpsn_plugin.unsubscribe(topic="0xe14768a6d8798e4390ec4cb8a4c991202c2115a5cd7a6c0a7ababcaf93b4d2d4/BTCUSDT/ticker") +dpsn_plugin.shutdown() ``` @@ -79,8 +86,14 @@ The Game Agent interacts with the plugin by executing tasks that map to the plug ```python from game_sdk.game.agent import Agent, WorkerConfig from game_sdk.game.custom_types import FunctionResult -from dpsn_plugin_gamesdk.dpsn_plugin import plugin +from dpsn_plugin_gamesdk.dpsn_plugin import DpsnPlugin + +load_dotenv() +dpsn_plugin=DpsnPlugin( + dpsn_url=os.getenv("DPSN_URL"), + pvt_key=os.getenv("PVT_KEY") + ) # --- Add Message Handler --- def handle_incoming_message(message_data: dict): """Callback function to process messages received via the plugin.""" @@ -102,7 +115,7 @@ def handle_incoming_message(message_data: dict): print(f"Error in message handler: {e}") # Set the callback in the plugin instance *before* running the agent -plugin.set_message_callback(handle_incoming_message) +dpsn_plugin.set_message_callback(handle_incoming_message) def get_agent_state_fn(function_result: FunctionResult, current_state: dict) -> dict: """Update state based on the function results""" @@ -134,9 +147,9 @@ subscription_worker = WorkerConfig( worker_description="Worker specialized in managing DPSN topic subscriptions, unsubscriptions, message handling, and shutdown.", get_state_fn=get_worker_state, action_space=[ - plugin.get_function("subscribe"), - plugin.get_function("unsubscribe"), - plugin.get_function("shutdown") + dpsn_plugin.get_function("subscribe"), + dpsn_plugin.get_function("unsubscribe"), + dpsn_plugin.get_function("shutdown") ], ) @@ -188,6 +201,4 @@ The plugin exposes the following functions intended to be called via the Game Ag -> In case of any queries regarding DPSN, please reach out to the team on [Telegram](https://t.me/dpsn_dev) šŸ“„. - - +> In case of any queries regarding DPSN, please reach out to the team on [Telegram](https://t.me/dpsn_dev) šŸ“„. \ No newline at end of file diff --git a/plugins/dpsn/examples/.env.example b/plugins/dpsn/examples/.env.example index ccd5813d..515842b3 100644 --- a/plugins/dpsn/examples/.env.example +++ b/plugins/dpsn/examples/.env.example @@ -1,3 +1,3 @@ DPSN_URL=betanet.dpsn.org -PVT_KEY=9192c2aa125bde2ed2e861f973a6124e0796722dc263dacac70719579df8d98b -GAME_API_KEY=apt-e52b807bf0a05d47cfdcd88413e7099c \ No newline at end of file +PVT_KEY= +GAME_API_KEY=apt- \ No newline at end of file From 4d0567c506368df2906a7d1dee387632bbb18855 Mon Sep 17 00:00:00 2001 From: AnishRane Date: Wed, 23 Apr 2025 15:22:57 +0530 Subject: [PATCH 23/23] chore: update pyproject.toml --- plugins/dpsn/pyproject.toml | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/plugins/dpsn/pyproject.toml b/plugins/dpsn/pyproject.toml index bb2780ad..28b24d33 100644 --- a/plugins/dpsn/pyproject.toml +++ b/plugins/dpsn/pyproject.toml @@ -1,11 +1,26 @@ [project] name = "dpsn-plugin-gamesdk" -version = "0.1.0" +version = "0.0.1" description = "DPSN Plugin for Python SDK for GAME by Virtuals" -authors = [{ name = "dpsn-dev", email = "dev@dpsn.org" }] +authors = [{ name = "dpsn-dev", email = "sanil@dpsn.org" }] readme = "README.md" requires-python = ">=3.9" -dependencies = ["dpsn-client==1.0.0.post1"] +classifiers = [ + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "Topic :: Software Development :: Libraries :: Python Modules", +] +dependencies = ["dpsn-client==1.0.0.post1", "game-sdk>=0.1.1"] [build-system] requires = ["poetry-core>=2.0.0,<3.0.0"] build-backend = "poetry.core.masonry.api" + +[project.urls] +"Homepage" = "https://github.com/game-by-virtuals/game-python/plugins/dpsn" +"Bug Tracker" = "https://github.com/game-by-virtuals/game-python"