diff --git a/plugins/dpsn/README.md b/plugins/dpsn/README.md new file mode 100644 index 00000000..ea29b8fb --- /dev/null +++ b/plugins/dpsn/README.md @@ -0,0 +1,204 @@ +# 🌐 DPSN Plugin for Virtuals Protocol (Python) + +> Decentralized Publish-Subscribe Network (DPSN) plugin for Virtuals Protocol agents, implemented in Python. + +[![Virtuals Protocol](https://img.shields.io/badge/Virtuals%20Protocol-plugin-blue)](https://virtuals.io/) +[![Version](https://img.shields.io/badge/version-alpha-orange)](https://github.com/virtuals-protocol/virtuals-game-python) +[![License](https://img.shields.io/badge/license-MIT-green)](../../LICENSE) + +## šŸ“‹ Overview + +This plugin enables Virtuals Protocol agents (written in Python) to connect to, subscribe to, and interact with data streams available on the [DPSN Data Streams Store](https://streams.dpsn.org/). + +Agents can leverage this plugin to consume real-time data for decision-making, reacting to events, or integrating external information feeds. + +To provide personalized data streams for your agents, you can create and publish data into your own DPSN topics using the [dpsn-client for Python](https://github.com/DPSN-org/dpsn-python-client). + +For more information, visit: +- [DPSN Official Website](https://dpsn.org) + +## ✨ Features + +- **Seamless Integration**: Connects Virtuals Protocol agents (Python) to the DPSN decentralized pub/sub network. +- **Real-time Data Handling**: Subscribe to topics and process incoming messages via a configurable callback. +- **Topic Management**: Provides agent-executable functions to `subscribe` and `unsubscribe` from DPSN topics. +- **Error Handling**: Includes basic error handling and logging for connection and subscription issues. +- **Graceful Shutdown**: Allows the agent to explicitly shut down the DPSN connection. + +## āš™ļø Configuration + +Ensure the following environment variables are set, typically in a `.env` file in your project root: + +> **Note**: The EVM private key (`PVT_KEY`) is used solely for signing authentication messages with the DPSN network. This process does not execute any on-chain transactions or incur gas fees. + +```dotenv +# Your EVM-compatible wallet private key (e.g., Metamask) +PVT_KEY=your_evm_wallet_private_key_here + +# The URL of the DPSN node to connect to (e.g., betanet.dpsn.org) +DPSN_URL=betanet.dpsn.org + +# Optional: Add VIRTUALS_API_KEY if required by your GameAgent setup +# VIRTUALS_API_KEY=your_virtuals_api_key_here +``` + +## šŸ“š Usage + +### Basic Setup + +The `DpsnPlugin` is designed to be used within the Virtuals Protocol Game SDK framework. You would typically instantiate it and potentially pass it to your `GameAgent` or similar construct. + +```python +# Import the pre-instantiated plugin (recommended) +from plugins.dpsn.dpsn_plugin_gamesdk.dpsn_plugin import DpsnPlugin +load_dotenv() + +dpsn_plugin=DpsnPlugin( + dpsn_url=os.getenv("DPSN_URL"), + pvt_key=os.getenv("PVT_KEY") + ) + +# Define a simple message handler +def handle_message(message_data): + topic = message_data.get('topic', 'unknown') + payload = message_data.get('payload', {}) + print(f"Message on {topic}: {payload}") + +# Register the message handler +dpsn_plugin.set_message_callback(handle_message) + +# Subscribe to a topic +status, message, details = dpsn_plugin.subscribe( + topic="0xe14768a6d8798e4390ec4cb8a4c991202c2115a5cd7a6c0a7ababcaf93b4d2d4/BTCUSDT/ticker" +) +print(f"Subscription status: {status}, Message: {message}") + +# Later when done: +dpsn_plugin.unsubscribe(topic="0xe14768a6d8798e4390ec4cb8a4c991202c2115a5cd7a6c0a7ababcaf93b4d2d4/BTCUSDT/ticker") +dpsn_plugin.shutdown() + +``` + +### Interacting via Agent Tasks + +The Game Agent interacts with the plugin by executing tasks that map to the plugin's `Function` objects. The exact syntax depends on your Game SDK's `runTask` or equivalent method. + +```python +from game_sdk.game.agent import Agent, WorkerConfig +from game_sdk.game.custom_types import FunctionResult +from dpsn_plugin_gamesdk.dpsn_plugin import DpsnPlugin + +load_dotenv() + +dpsn_plugin=DpsnPlugin( + dpsn_url=os.getenv("DPSN_URL"), + pvt_key=os.getenv("PVT_KEY") + ) +# --- Add Message Handler --- +def handle_incoming_message(message_data: dict): + """Callback function to process messages received via the plugin.""" + try: + topic = message_data.get('topic', 'N/A') + payload = message_data.get('payload', '{}') + timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + print(f"\n--- Message Received ({timestamp}) ---") + print(f"Topic: {topic}") + # Pretty print payload if it's likely JSON/dict + if isinstance(payload, (dict, list)): + print(f"Payload:\n{json.dumps(payload, indent=2)}") + return payload + else: + print(f"Payload: {payload}") + return payload + print("-----------------------------------") + except Exception as e: + print(f"Error in message handler: {e}") + +# Set the callback in the plugin instance *before* running the agent +dpsn_plugin.set_message_callback(handle_incoming_message) + +def get_agent_state_fn(function_result: FunctionResult, current_state: dict) -> dict: + """Update state based on the function results""" + init_state = {} + + if current_state is None: + return init_state + + if function_result.info is not None: + current_state.update(function_result.info) + + return current_state + +def get_worker_state(function_result: FunctionResult, current_state: dict) -> dict: + """Update state based on the function results""" + init_state = {} + + if current_state is None: + return init_state + + if function_result.info is not None: + current_state.update(function_result.info) + + return current_state + + +subscription_worker = WorkerConfig( + id="subscription_worker", + worker_description="Worker specialized in managing DPSN topic subscriptions, unsubscriptions, message handling, and shutdown.", + get_state_fn=get_worker_state, + action_space=[ + dpsn_plugin.get_function("subscribe"), + dpsn_plugin.get_function("unsubscribe"), + dpsn_plugin.get_function("shutdown") + ], +) + +# Initialize the agent +agent = Agent( + api_key=os.environ.get("GAME_API_KEY"), + name="DPSN Market Data Agent", + agent_goal="Monitor SOLUSDT market data from DPSN and process real-time updates.", + agent_description=( + "You are an AI agent specialized in DPSN market data processing" + "You can subscribe dpsn topic" + "after 5 minutes unsubscribe the topic" + "next 5 minutes close the connection" + "\n\nAvailable topics:" + "\n- 0xe14768a6d8798e4390ec4cb8a4c991202c2115a5cd7a6c0a7ababcaf93b4d2d4/SOLUSDT/ohlc" + ), + get_agent_state_fn=get_agent_state_fn, + workers=[ + subscription_worker + ] +) +``` + +## šŸ“– API Reference (`DpsnPlugin`) + +Key components of the `DpsnPlugin` class: + +- `__init__(dpsn_url: Optional[str] = ..., pvt_key: Optional[str] = ...)`: Constructor. Reads credentials from env vars by default. Raises `ValueError` if credentials are missing. +- `get_function(fn_name: str) -> Function`: Retrieves the `Function` object (`subscribe`, `unsubscribe`, `shutdown`) for the Game SDK. +- `set_message_callback(callback: Callable[[Dict[str, Any]], None])`: Registers the function to call when a message is received on a subscribed topic. The callback receives a dictionary (structure depends on the underlying `dpsn-client`). +- `subscribe(topic: str) -> Tuple[FunctionResultStatus, str, Dict[str, Any]]`: (Executable Function) Subscribes to a topic. Handles initialization if needed. Returns status, message, and details. +- `unsubscribe(topic: str) -> Tuple[FunctionResultStatus, str, Dict[str, Any]]`: (Executable Function) Unsubscribes from a topic. Handles initialization if needed. Returns status, message, and details. +- `shutdown() -> Tuple[FunctionResultStatus, str, Dict[str, Any]]`: (Executable Function) Disconnects the DPSN client gracefully. Returns status, message, and details. +- `_ensure_initialized()`: (Internal) Manages the lazy initialization of the DPSN client connection. Raises `DpsnInitializationError` on failure. + +### Agent-Executable Functions + +The plugin exposes the following functions intended to be called via the Game Agent's task execution mechanism: + +- **`subscribe`**: + - Description: Subscribe to a DPSN topic. + - Args: `topic` (string, required) - The topic to subscribe to. +- **`unsubscribe`**: + - Description: Unsubscribe from a DPSN topic. + - Args: `topic` (string, required) - The topic to unsubscribe from. +- **`shutdown`**: + - Description: Shutdown the DPSN client connection. + - Args: None. + + + +> In case of any queries regarding DPSN, please reach out to the team on [Telegram](https://t.me/dpsn_dev) šŸ“„. \ No newline at end of file diff --git a/plugins/dpsn/__init__.py b/plugins/dpsn/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/plugins/dpsn/dpsn_plugin_gamesdk/dpsn_plugin.py b/plugins/dpsn/dpsn_plugin_gamesdk/dpsn_plugin.py new file mode 100644 index 00000000..ecaed436 --- /dev/null +++ b/plugins/dpsn/dpsn_plugin_gamesdk/dpsn_plugin.py @@ -0,0 +1,284 @@ +import os +from dpsn_client.client import DpsnClient, DPSNError +from datetime import datetime +from game_sdk.game.custom_types import Function, Argument, FunctionResultStatus +from typing import Dict, Any, Callable, Tuple, Optional +import json +import logging +# Configure logging for the plugin (optional, but good practice) +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') +logger = logging.getLogger("DpsnPlugin") + +# {{ Add a custom exception for initialization errors }} +class DpsnInitializationError(Exception): + """Custom exception for DPSN initialization failures.""" + pass + +class DpsnPlugin: + """ + DPSN Plugin using the updated DpsnClient for handling connections, + subscriptions, and message handling. + """ + + def __init__(self, + dpsn_url:Optional[str] = os.getenv("DPSN_URL"), + pvt_key:Optional[str] = os.getenv("PVT_KEY") + ): + self.dpsn_url = dpsn_url + self.pvt_key = pvt_key + if not self.dpsn_url or not self.pvt_key: + raise ValueError("DPSN_URL and PVT_KEY are required.") + + # {{ Create the client instance here, but don't connect yet }} + self.client: DpsnClient | None = None + try: + if self.dpsn_url and self.pvt_key: + chain_options = { + "network": "testnet", # Consider making configurable + "wallet_chain_type": "ethereum" + } + connection_options = {"ssl": True} # Consider making configurable + + logger.info(f"Creating DpsnClient instance for {self.dpsn_url}") + self.client = DpsnClient( + dpsn_url=self.dpsn_url, + private_key=self.pvt_key, + chain_options=chain_options, + connection_options=connection_options + ) + # Setup internal error handler early + self.client.on_error += self._handle_client_error + else: + # Client remains None if credentials missing + logger.warning("DpsnClient not created due to missing URL or Key.") + + except Exception as e: + logger.exception("Unexpected error during DpsnClient instantiation:") + self.client = None # Ensure client is None on instantiation error + + # {{ Add initialization flag }} + self._initialized = False + self.message_callback: Callable[[Dict[str, Any]], None] | None = None + + + self._functions = { + "subscribe": Function( + fn_name="subscribe_to_topic", + fn_description="Subscribe to a DPSN topic to receive messages", + args=[ + Argument( + name="topic", + description="The DPSN topic to subscribe to", + type="string", + required=True + ) + ], + hint="Subscribes to a specific DPSN topic to receive messages. Will initialize connection if needed.", + executable=self.subscribe # Keep executable pointing to the public method + ), + "unsubscribe": Function( + fn_name="unsubscribe_to_topic", + fn_description="unsubscribe to a DPSN topic to stop receiving messages", + args=[ + Argument( + name="topic", + description="The DPSN topic to unsubscribe to", + type="string", + required=True + ) + ], + hint="Unsubscribes from a specific DPSN topic. Will initialize connection if needed.", + executable=self.unsubscribe + ), + "shutdown": Function( + fn_name="shutdown", + fn_description="Shutdown DPSN client connection", + args=[], + hint="Disconnects the DPSN client gracefully.", + executable=self.shutdown + ) + } + + def get_function(self, fn_name: str) -> Function: + """Get a specific function by name""" + if fn_name not in self._functions: + logger.error(f"Function '{fn_name}' not found in DpsnPlugin") + raise ValueError(f"Function '{fn_name}' not found") + return self._functions[fn_name] + + # {{ New private method to handle initialization logic }} + def _ensure_initialized(self): + """ + Ensures the DpsnClient is initialized. Runs initialization logic only once. + Raises DpsnInitializationError on failure. + """ + if self._initialized: + return # Already initialized + + if not self.client: + logger.error("Cannot initialize: DpsnClient instance was not created (likely missing credentials).") + raise DpsnInitializationError("Client not configured (missing URL/Key).") + + # Check if already connected (e.g., if init was called externally somehow) + if self.client.dpsn_broker and self.client.dpsn_broker.is_connected(): + logger.info("Client already connected. Marking as initialized.") + self._initialized = True + return + + logger.info("Initializing DpsnClient connection (first time)...") + try: + # Perform the actual initialization / connection + self.client.init({ + "retry_options": { + "max_retries": 3, + "initial_delay": 1000, + "max_delay": 5000 + } + }) + + # Apply message callback if it was set before initialization finished + if self.message_callback: + try: + # Ensure handler isn't added multiple times if init is re-entrant + self.client.on_msg -= self.message_callback + except ValueError: + pass # Wasn't added yet + self.client.on_msg += self.message_callback + logger.info("Message callback applied during initialization.") + + self._initialized = True # Mark as initialized *after* successful init + logger.info("DpsnClient initialized successfully.") + + except DPSNError as e: + logger.error(f"DPSN Initialization Error: Code={e.code}, Msg={e.message}") + self._initialized = False # Ensure flag is false on error + # Raise specific error to be caught by calling function + raise DpsnInitializationError(f"DPSNError ({e.code.name}): {e.message}") from e + except Exception as e: + logger.exception("Unexpected error during DPSN initialization:") + self._initialized = False # Ensure flag is false on error + raise DpsnInitializationError(f"Unexpected initialization error: {str(e)}") from e + + def subscribe(self, topic: str) -> Tuple[FunctionResultStatus, str, Dict[str, Any]]: + """Subscribes to a specific topic. Ensures client is initialized first.""" + + try: + self._ensure_initialized() + except DpsnInitializationError as e: + logger.error(f"Subscription failed for topic '{topic}' due to initialization error: {e}") + return (FunctionResultStatus.FAILED, f"Initialization failed: {e}", {"topic": topic}) + + # Existing checks (client should exist if _ensure_initialized passed) + if not self.client or not self.client.dpsn_broker or not self.client.dpsn_broker.is_connected(): + logger.warning(f"Subscribe attempt failed for topic '{topic}': Client not connected (post-init check).") + self._initialized = False # Reset flag if connection lost + return (FunctionResultStatus.FAILED, "Client not connected", {"topic": topic}) + + + try: + logger.info(f"Subscribing to topic: {topic}") + self.client.subscribe(topic) + logger.info(f"Successfully subscribed to topic: {topic}") + return (FunctionResultStatus.DONE, f"Successfully subscribed to topic: {topic}", {"subscribed_topic": topic}) + except DPSNError as e: + logger.error(f"DPSN Subscription Error for topic '{topic}': Code={e.code}, Msg={e.message}") + return (FunctionResultStatus.FAILED, f"Subscription error ({e.code.name}): {e.message}", {"topic": topic}) + except Exception as e: + logger.exception(f"Unexpected error during subscription to topic '{topic}':") + return (FunctionResultStatus.FAILED, f"Unexpected subscription error: {str(e)}", {"topic": topic}) + + def unsubscribe(self, topic: str) -> Tuple[FunctionResultStatus, str, Dict[str, Any]]: + """Unsubscribes from a specific topic. Ensures client is initialized first.""" + + try: + self._ensure_initialized() + except DpsnInitializationError as e: + logger.error(f"Unsubscription failed for topic '{topic}' due to initialization error: {e}") + return (FunctionResultStatus.FAILED, f"Initialization failed: {e}", {"topic": topic}) + + # Existing checks + if not self.client or not self.client.dpsn_broker or not self.client.dpsn_broker.is_connected(): + logger.warning(f"Unsubscribe attempt failed for topic '{topic}': Client not connected (post-init check).") + self._initialized = False + return (FunctionResultStatus.FAILED, "Client not connected", {"topic": topic}) + + try: + logger.info(f"Unsubscribing from topic: {topic}") + self.client.unsubscribe(topic) + logger.info(f"Successfully unsubscribed from topic: {topic}") + return (FunctionResultStatus.DONE, f"Successfully unsubscribed from topic: {topic}", {"unsubscribed_topic": topic}) + except DPSNError as e: + logger.error(f"DPSN Unsubscription Error for topic '{topic}': Code={e.code}, Msg={e.message}") + return (FunctionResultStatus.FAILED, f"Unsubscription error ({e.code.name}): {e.message}", {"topic": topic}) + except Exception as e: + logger.exception(f"Unexpected error during unsubscription from topic '{topic}':") + return (FunctionResultStatus.FAILED, f"Unexpected unsubscription error: {str(e)}", {"topic": topic}) + + + def shutdown(self) -> Tuple[FunctionResultStatus, str, Dict[str, Any]]: + """Disconnects the DPSN client if it was initialized.""" + if self._initialized and self.client: + try: + logger.info("Shutting down DpsnClient connection...") + self.client.disconnect() + logger.info("DpsnClient shutdown complete.") + self._initialized = False # Reset flag after successful disconnect + return (FunctionResultStatus.DONE, "DPSN client shutdown complete.", {}) + except DPSNError as e: + logger.error(f"DPSN Shutdown Error: Code={e.code}, Msg={e.message}") + self._initialized = False + return (FunctionResultStatus.FAILED, f"Shutdown error ({e.code.name}): {e.message}", {}) + except Exception as e: + logger.exception("Unexpected error during DPSN shutdown:") + self._initialized = False + return (FunctionResultStatus.FAILED, f"Unexpected shutdown error: {str(e)}", {}) + elif not self.client: + logger.info("Shutdown called but client was never created.") + return (FunctionResultStatus.DONE, "Client not configured.", {}) + else: + # Client exists but was never initialized or already shut down + logger.info("Shutdown called but client was not initialized or already shut down.") + self._initialized = False # Ensure flag is false + return (FunctionResultStatus.DONE, "Client was not active.", {}) + + def set_message_callback(self, callback: Callable[[Dict[str, Any]], None]): + """ + Sets the callback function. If client is initialized, applies it immediately. + If not initialized, stores it to be applied upon successful initialization. + """ + logger.info(f"Setting message callback to: {callback.__name__ if hasattr(callback, '__name__') else callback}") + + # Remove old callback first if client exists and callback was previously set + if self.client and self.message_callback: + try: + self.client.on_msg -= self.message_callback + logger.debug("Removed previous message callback.") + except ValueError: + pass # Ignore if it wasn't added or client changed + + self.message_callback = callback # Store the new callback + + # If client exists and is initialized, add the new callback immediately + # {{ Check _initialized flag }} + if self.client and self._initialized: + try: + self.client.on_msg += self.message_callback + logger.info("Message callback applied to initialized client.") + except Exception as e: + logger.exception("Failed to apply message callback to initialized client:") + elif self.client: + logger.info("Message callback stored and will be applied upon initialization.") + else: + logger.warning("Message callback stored, but client instance does not exist.") + + def _handle_client_error(self, error: DPSNError): + """Internal handler for errors emitted by the DpsnClient.""" + logger.error(f"[DpsnClient EVENT] Error received: Code={error.code.name if hasattr(error.code, 'name') else error.code}, Msg={error.message}, Status={error.status}") + + if self._initialized: + logger.warning("Marking DpsnPlugin as uninitialized due to client error.") + self._initialized = False + else: + logger.warning("Client error received, but plugin was already not marked as initialized.") + + \ No newline at end of file diff --git a/plugins/dpsn/examples/.env.example b/plugins/dpsn/examples/.env.example new file mode 100644 index 00000000..515842b3 --- /dev/null +++ b/plugins/dpsn/examples/.env.example @@ -0,0 +1,3 @@ +DPSN_URL=betanet.dpsn.org +PVT_KEY= +GAME_API_KEY=apt- \ No newline at end of file diff --git a/plugins/dpsn/examples/dpsn_agent.py b/plugins/dpsn/examples/dpsn_agent.py new file mode 100644 index 00000000..f78e9bba --- /dev/null +++ b/plugins/dpsn/examples/dpsn_agent.py @@ -0,0 +1,261 @@ +import os +import sys +from pathlib import Path +import json +from datetime import datetime +import time +import threading +import signal +from typing import Dict, Any, Tuple +from dotenv import load_dotenv +from game_sdk.game.agent import Agent, WorkerConfig +from game_sdk.game.custom_types import FunctionResult, FunctionResultStatus, Function, Argument +from dpsn_plugin_gamesdk.dpsn_plugin import DpsnPlugin +load_dotenv() + +dpsn_plugin = DpsnPlugin( + dpsn_url=os.getenv("DPSN_URL"), + pvt_key=os.getenv("PVT_KEY") +) + + +# Global message counter and timestamp +message_count = 0 +start_time = None +collected_messages = [] +task_completed = False # Flag to track if the main task has been completed + +# Update message handler to track count and time +def handle_incoming_message(message_data: dict): + """Callback function to process messages received via the plugin.""" + global message_count, start_time, collected_messages + + # Don't process messages if task is already completed + if task_completed: + return "Task already completed" + + # Initialize start time on first message + if start_time is None: + start_time = time.time() + + message_count += 1 + collected_messages.append(message_data) + + topic = message_data.get('topic', 'N/A') + payload = message_data.get('payload', '{}') + timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + + print(f"\n--- Message Received ({timestamp}) ---") + print(f"Topic: {topic}") + print(f"Message count: {message_count}") + print(f"Time elapsed: {time.time() - start_time:.1f} seconds") + + # Pretty print payload + if isinstance(payload, (dict, list)): + print(f"Payload:\n{json.dumps(payload, indent=2)}") + return json.dumps(payload) + else: + print(f"Payload: {payload}") + return str(payload) + +# Add a new function for the agent to check if collection is complete +def check_collection_status() -> tuple[FunctionResultStatus, str, dict]: + """Check if we have collected enough data (2 minutes or 3+ messages)""" + global message_count, start_time, collected_messages + + elapsed = 0 + if start_time: + elapsed = time.time() - start_time + + # Check conditions + time_condition = elapsed >= 120 # 2 minutes + count_condition = message_count >= 3 + + if time_condition or count_condition: + reason = "time limit reached" if time_condition else "message count reached" + summary = { + "messages_received": message_count, + "time_elapsed_seconds": elapsed, + "collection_complete": True, + "reason": reason, + "sample_messages": collected_messages[:3] # First 3 messages as samples + } + return (FunctionResultStatus.DONE, f"Data collection complete: {reason}", summary) + else: + summary = { + "messages_received": message_count, + "time_elapsed_seconds": elapsed, + "collection_complete": False, + "remaining_time": 120 - elapsed, + "remaining_messages": max(0, 3 - message_count) + } + return (FunctionResultStatus.DONE, "Still collecting data...", summary) + +# Function to mark a task completed after dpsn shutdown +def mark_complete_after_shutdown() -> tuple[FunctionResultStatus, str, dict]: + """Mark the task as complete after DPSN client shutdown""" + global task_completed + + # Only allow this to be called if DPSN was already shut down + if task_completed: + return (FunctionResultStatus.DONE, "Task already marked as complete.", {"status": "already_completed"}) + + # Explain what's happening + print("\n=== FINALIZING TASK ===") + print("1. DPSN Client has been shut down") + print("2. Marking task as complete") + print("3. Program will exit shortly") + task_completed = True + + # Schedule a delayed exit to allow time for the agent to report completion + def exit_program(): + print("\n=== TASK COMPLETE ===") + print("Exiting program as all tasks are complete...") + os._exit(0) # Force exit the program + + # Schedule exit after 5 seconds + timer = threading.Timer(5.0, exit_program) + timer.daemon = True + timer.start() + + return (FunctionResultStatus.DONE, "Task complete! Agent is now finished.", {"status": "success"}) + +# Create a function objects for the functions +check_status_function = Function( + fn_name="check_collection_status", + fn_description="Check if enough data has been collected (2 minutes or 3+ messages)", + args=[], + hint="Use this to check if it's time to unsubscribe (returns collection status)", + executable=check_collection_status +) + +complete_task_function = Function( + fn_name="mark_task_complete", + fn_description="Mark the agent task as complete and exit the program", + args=[], + hint="Use this as the VERY LAST step after unsubscribing and shutting down DPSN", + executable=mark_complete_after_shutdown +) + +# Set the callback in the plugin instance *before* running the agent +dpsn_plugin.set_message_callback(handle_incoming_message) +# --- End Message Handler Setup --- + +def get_agent_state_fn(function_result: FunctionResult, current_state: dict) -> dict: + """Update state based on the function results""" + init_state = {} + + if current_state is None: + current_state = init_state # Initialize if None + + # Check if function_result is None (initial call) + if function_result is None: + return current_state # Return current (likely initial) state + + if function_result.info is not None: + current_state.update(function_result.info) + + # Check if we have completion info + if function_result.info and 'status' in function_result.info and function_result.info['status'] == 'success': + current_state['task_completed'] = True + # If we're marking task as complete, set state to indicate we're done + print("Agent state updated: Task marked as complete.") + + # Add a delay if we just checked status and collection is NOT complete + # Check if the 'collection_complete' key exists in the info dict + # to infer that check_collection_status was likely the last function run. + if function_result.info and \ + "collection_complete" in function_result.info and \ + not function_result.info.get("collection_complete", True): + + wait_time = 5 # Wait for 5 seconds before next check + print(f"Collection not complete. Waiting {wait_time} seconds before next action...") + time.sleep(wait_time) + + return current_state + +def get_worker_state(function_result: FunctionResult, current_state: dict) -> dict: + """Update state based on the function results""" + init_state = {} + + if current_state is None: + return init_state + + if function_result.info is not None: + current_state.update(function_result.info) + + return current_state + +subscription_worker = WorkerConfig( + id="subscription_worker", + worker_description="Worker specialized in managing DPSN topic subscriptions, unsubscriptions, message handling, and shutdown.", + get_state_fn=get_worker_state, + action_space=[ + dpsn_plugin.get_function("subscribe"), + dpsn_plugin.get_function("unsubscribe"), + dpsn_plugin.get_function("shutdown"), + check_status_function, + complete_task_function # Add the task completion function + ], +) + + +# Initialize the agent +agent = Agent( + api_key=os.environ.get("GAME_API_KEY"), + name="DPSN Market Data Agent", + agent_goal="Monitor SOLUSDT market data from DPSN, process messages for 2 minutes, then clean up and exit.", + agent_description=( + "You are an AI agent specialized in DPSN market data processing. Follow these steps in order:\n\n" + + "1. IMMEDIATELY subscribe to this DPSN topic: 0xe14768a6d8798e4390ec4cb8a4c991202c2115a5cd7a6c0a7ababcaf93b4d2d4/SOLUSDT/ohlc\n" + + "2. Once subscribed, inform the user that you are listening for market data updates.\n" + + "3. When you receive messages on this topic:\n" + " - Report that you received a message\n" + + "4. After subscribing, periodically use the check_collection_status function to check if:\n" + " - 2 minutes have passed, OR\n" + " - At least 3 messages have been received\n" + + "5. Only when check_collection_status indicates collection is complete:\n" + " - Unsubscribe from the topic using the unsubscribe function\n" + " - Shut down the DPSN client connection using the shutdown function\n" + + "6. FINAL STEP: After shutting down the DPSN connection, call the mark_task_complete function.\n" + " This will finish your task and automatically exit the program after 5 seconds.\n" + " This must be the VERY LAST function you call.\n" + + "IMPORTANT: The program will exit automatically after you mark the task complete.\n" + + "Available functions:\n" + "- subscribe: Subscribe to a DPSN topic to receive data\n" + "- unsubscribe: Unsubscribe from a DPSN topic\n" + "- shutdown: Close the DPSN client connection\n" + "- check_collection_status: Check if collection is complete\n" + "- mark_task_complete: Mark the agent's task as complete and exit the program\n\n" + + "Available topic:\n" + "- 0xe14768a6d8798e4390ec4cb8a4c991202c2115a5cd7a6c0a7ababcaf93b4d2d4/SOLUSDT/ohlc" + ), + get_agent_state_fn=get_agent_state_fn, + workers=[ + subscription_worker + ] +) + +try: + print("\n=== DPSN AGENT STARTING ===") + print("This agent will:") + print("1. Subscribe to the SOLUSDT/ohlc topic") + print("2. Collect messages for 2 minutes or at least 3 messages") + print("3. Unsubscribe and clean up") + print("4. Exit automatically\n") + + agent.compile() + agent.run() +except Exception as e: + print(f"Error running agent:{e}") + import traceback + traceback.print_exc() diff --git a/plugins/dpsn/examples/dpsn_worker.py b/plugins/dpsn/examples/dpsn_worker.py new file mode 100644 index 00000000..fc19b31b --- /dev/null +++ b/plugins/dpsn/examples/dpsn_worker.py @@ -0,0 +1,133 @@ +import sys +import os +from dotenv import load_dotenv +from pathlib import Path +from typing import Dict, Any, List +from datetime import datetime +import time # Import time for sleep +load_dotenv() + +# Import FunctionResultStatus to check the status enum +from game_sdk.game.custom_types import FunctionResultStatus +from dpsn_plugin_gamesdk.dpsn_plugin import DpsnPlugin + +class DpsnWorker: + """ + DPSN Worker for processing market data and executing trades (Synchronous Version) + """ + def __init__(self): + self.plugin = DpsnPlugin( + dpsn_url=os.getenv("DPSN_URL"), + pvt_key=os.getenv("PVT_KEY") + ) + self.trades: List[Dict[str, Any]] = [] + self.is_running = False + + def process_message(self, message: Dict[str, Any]): + """Process incoming messages and execute trades""" + topic = message['topic'] + payload = message['payload'] + + # Log the message + print(f"\n Processing message:") + print(f"Topic: {topic}") + print(f"Payload: {payload}") + + # Execute trade if conditions are met (synchronous call) + trade = self.execute_trade(topic, payload) + if trade: + self.trades.append(trade) + print(f"šŸ’¼ Trade executed: {trade}") + + # Example use case of dpsn plugin worker + + def execute_trade(self, topic: str, payload: Dict[str, Any]) -> Dict[str, Any] | None: + """ + Demonstrates a potential use case for reacting to DPSN messages. + + **This is a highly simplified example for demonstration purposes only.** + + It simulates a basic trading decision based on a received price. + It lacks proper error handling, risk management, sophisticated logic, + and integration with actual trading systems. + + **Do NOT use this function in a real-world trading implementation.** + + Args: + topic: The topic the message was received on. + payload: The message payload. + + Returns: + A dictionary representing a trade order if conditions are met, + otherwise None. + """ + # Example trade execution logic (synchronous) + if "SOLUSDT" in topic: + # Ensure payload is a dictionary before accessing keys + if isinstance(payload, dict): + try: + price = float(payload.get('price', 0)) + if price > 100: + + return { + "action": "SELL", + "price": price, + "timestamp": datetime.now().isoformat(), + "status": "EXECUTED" + } + except (ValueError, TypeError) as e: + print(f"Error processing price from payload: {e}") + else: + print(f"Skipping trade execution: Payload is not a dictionary ({type(payload)})") + return None + + def start(self): + """Start the DPSN worker""" + self.plugin.set_message_callback(self.process_message) + + topics = [ + "0xe14768a6d8798e4390ec4cb8a4c991202c2115a5cd7a6c0a7ababcaf93b4d2d4/SOLUSDT/ticker", + # Add other topics if needed + ] + + print("Subscribing to topics (will initialize connection if needed)...") + for topic in topics: + result = self.plugin.subscribe(topic) + if result[0] != FunctionResultStatus.DONE: + error_message = result[1] if len(result) > 1 else f"Unknown subscription error for {topic}" + print(f"Failed to subscribe to {topic}: {error_message}") + else: + print(result[1]) # e.g., "Successfully subscribed to topic: ..." + + self.is_running = True + print("DPSN Worker Started") + + def stop(self): + """Stop the DPSN worker""" + print("Stopping DPSN Worker...") + self.is_running = False + # Consider unsubscribing from topics here if necessary + # for topic in topics: self.plugin.unsubscribe(topic) + self.plugin.shutdown() + print("DPSN Worker Stopped") + +def main(): + worker = DpsnWorker() + try: + worker.start() + print("Worker running... Press Ctrl+C to stop.") + # Keep the worker running using time.sleep + while worker.is_running: + time.sleep(1) # Check status every second + except KeyboardInterrupt: + print("\nCtrl+C detected. Shutting down worker...") + except Exception as e: + print(f"An error occurred: {e}") + finally: + if worker.is_running: + worker.stop() + +if __name__ == "__main__": + # Run main directly without asyncio + main() + \ No newline at end of file diff --git a/plugins/dpsn/examples/test_dpsn_game_functions.py b/plugins/dpsn/examples/test_dpsn_game_functions.py new file mode 100644 index 00000000..1da10366 --- /dev/null +++ b/plugins/dpsn/examples/test_dpsn_game_functions.py @@ -0,0 +1,98 @@ +import sys +import os +from pathlib import Path +import time +from datetime import datetime +from game_sdk.game.custom_types import Function, Argument, FunctionResultStatus +from dotenv import load_dotenv +load_dotenv() + + + +from dpsn_plugin_gamesdk.dpsn_plugin import DpsnPlugin + +dpsn_plugin=DpsnPlugin( + dpsn_url=os.getenv("DPSN_URL"), + pvt_key=os.getenv("PVT_KEY") +) + +def test_dpsn_connection(): + """Test DPSN connection and basic functionality""" + print("\nšŸ”„ Testing DPSN Connection...") + + + # Wait for connection to stabilize + time.sleep(1) + print("āœ… DPSN initialized successfully") + return True + +def test_subscribe_and_receive(): + """Test subscribing to topics and receiving messages""" + print("\nšŸ”„ Testing Subscription and Message Reception...") + + # Define message handler + def handle_message(message_data): + topic = message_data['topic'] + payload = message_data['payload'] + print(f"Received message on {topic}: {payload}") + + # Set the callback + dpsn_plugin.set_message_callback(handle_message) + + # Test topic + topic = "0xe14768a6d8798e4390ec4cb8a4c991202c2115a5cd7a6c0a7ababcaf93b4d2d4/SOLUSDT/ohlc" + + print(f"Subscribing to topic: {topic}") + result = dpsn_plugin.subscribe(topic) + if not result["success"]: + print(f"āŒ Failed to subscribe to topic: {result.get('error')}") + return False + + print("Subscription successful!") + print("\nWaiting for messages... (Press Ctrl+C to exit)") + + try: + while True: + if not dpsn_plugin.client.dpsn_broker.is_connected(): + print("Connection lost, attempting to reconnect...") + dpsn_plugin.initialize() + time.sleep(1) + dpsn_plugin.subscribe(topic) + time.sleep(1) + + except KeyboardInterrupt: + print("\nāš ļø Test interrupted by user") + return True + +def test_shutdown(): + """Test graceful shutdown""" + print("\nšŸ”„ Testing Shutdown...") + + status,message,extra = dpsn_plugin.shutdown() + if status is not FunctionResultStatus.DONE: + print(f"āŒ Failed to shutdown") + return False + + print("āœ… Shutdown successful") + return True + +def main(): + """Main test function""" + print("šŸš€ Starting DPSN Plugin Tests...") + + try: + # Test connection + if not test_dpsn_connection(): + return + + if not test_subscribe_and_receive(): + return + + except Exception as e: + print(f"\nāŒ Test failed with error: {str(e)}") + finally: + # Ensure we shutdown properly + test_shutdown() + +if __name__ == "__main__": + main() diff --git a/plugins/dpsn/plugin_metadata.yml b/plugins/dpsn/plugin_metadata.yml new file mode 100644 index 00000000..75dfeac6 --- /dev/null +++ b/plugins/dpsn/plugin_metadata.yml @@ -0,0 +1,14 @@ +# General Information +plugin_name: 'dpsn_plugin' +author: 'DPSN-Tech-Team' +logo_url: '' +release_date: '2025-04' + +# Description +short_description: 'DPSN plugin to subscribe to dpsn data streams for Virtuals Protocol agents' +detailed_description: 'This plugin allows Virtuals Protocol agents in Python to connect to and interact with real-time data streams from the DPSN Data Streams Store. Agents can use it for decision-making and event handling. Developers can also publish custom streams using the dpsn-python-client.' + +# Contact & Support +x_account_handle: '@DPSN_org' +support_contact: 'sanil@dpsn.org' +community_link: 'https://t.me/dpsn_dev' diff --git a/plugins/dpsn/pyproject.toml b/plugins/dpsn/pyproject.toml new file mode 100644 index 00000000..28b24d33 --- /dev/null +++ b/plugins/dpsn/pyproject.toml @@ -0,0 +1,26 @@ +[project] +name = "dpsn-plugin-gamesdk" +version = "0.0.1" +description = "DPSN Plugin for Python SDK for GAME by Virtuals" +authors = [{ name = "dpsn-dev", email = "sanil@dpsn.org" }] +readme = "README.md" +requires-python = ">=3.9" +classifiers = [ + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "Topic :: Software Development :: Libraries :: Python Modules", +] +dependencies = ["dpsn-client==1.0.0.post1", "game-sdk>=0.1.1"] +[build-system] +requires = ["poetry-core>=2.0.0,<3.0.0"] +build-backend = "poetry.core.masonry.api" + +[project.urls] +"Homepage" = "https://github.com/game-by-virtuals/game-python/plugins/dpsn" +"Bug Tracker" = "https://github.com/game-by-virtuals/game-python"