diff --git a/hummingbot/__init__.py b/hummingbot/__init__.py index 012045b6a05..4a6116aaa50 100644 --- a/hummingbot/__init__.py +++ b/hummingbot/__init__.py @@ -18,7 +18,6 @@ # Do not raise exceptions during log handling logging.setLogRecordFactory(StructLogRecord) logging.setLoggerClass(StructLogger) - _shared_executor = None _data_path = None _cert_path = None diff --git a/hummingbot/client/config/client_config_map.py b/hummingbot/client/config/client_config_map.py index 7e745d0c31c..852e167236b 100644 --- a/hummingbot/client/config/client_config_map.py +++ b/hummingbot/client/config/client_config_map.py @@ -658,6 +658,11 @@ class HyperliquidPerpetualRateSourceMode(ExchangeRateSourceModeBase): model_config = ConfigDict(title="hyperliquid_perpetual") +class BluefinPerpetualRateSourceMode(ExchangeRateSourceModeBase): + name: str = Field(default="bluefin_perpetual") + model_config = ConfigDict(title="bluefin_perpetual") + + class DeriveRateSourceMode(ExchangeRateSourceModeBase): name: str = Field(default="derive") model_config = ConfigDict(title="derive") @@ -666,6 +671,7 @@ class DeriveRateSourceMode(ExchangeRateSourceModeBase): RATE_SOURCE_MODES = { AscendExRateSourceMode.model_config["title"]: AscendExRateSourceMode, BinanceRateSourceMode.model_config["title"]: BinanceRateSourceMode, + BluefinPerpetualRateSourceMode.model_config["title"]: BluefinPerpetualRateSourceMode, CoinGeckoRateSourceMode.model_config["title"]: CoinGeckoRateSourceMode, CoinCapRateSourceMode.model_config["title"]: CoinCapRateSourceMode, DexalotRateSourceMode.model_config["title"]: DexalotRateSourceMode, diff --git a/hummingbot/connector/derivative/bluefin_perpetual/__init__.py b/hummingbot/connector/derivative/bluefin_perpetual/__init__.py new file mode 100644 index 00000000000..6bf71f99b86 --- /dev/null +++ b/hummingbot/connector/derivative/bluefin_perpetual/__init__.py @@ -0,0 +1 @@ +# Bluefin Perpetual connector for Hummingbot diff --git a/hummingbot/connector/derivative/bluefin_perpetual/bluefin_perpetual_api_order_book_data_source.py b/hummingbot/connector/derivative/bluefin_perpetual/bluefin_perpetual_api_order_book_data_source.py new file mode 100644 index 00000000000..9546ddfe144 --- /dev/null +++ b/hummingbot/connector/derivative/bluefin_perpetual/bluefin_perpetual_api_order_book_data_source.py @@ -0,0 +1,271 @@ +""" +Order book data source for Bluefin Perpetual connector. + +This data source consumes market events from the Bluefin SDK wrapper and +adapts them to Hummingbot order book and funding info messages. +""" +import asyncio +import time +from decimal import Decimal +from typing import TYPE_CHECKING, Any, Dict, List, Optional + +from hummingbot.connector.derivative.bluefin_perpetual import bluefin_perpetual_constants as CONSTANTS +from hummingbot.connector.derivative.bluefin_perpetual.data_sources.bluefin_data_source import BluefinDataSource +from hummingbot.core.data_type.common import TradeType +from hummingbot.core.data_type.funding_info import FundingInfo, FundingInfoUpdate +from hummingbot.core.data_type.order_book_message import OrderBookMessage, OrderBookMessageType +from hummingbot.core.data_type.perpetual_api_order_book_data_source import PerpetualAPIOrderBookDataSource +from hummingbot.logger import HummingbotLogger + +if TYPE_CHECKING: + from hummingbot.connector.derivative.bluefin_perpetual.bluefin_perpetual_derivative import ( + BluefinPerpetualDerivative, + ) + + +class BluefinPerpetualAPIOrderBookDataSource(PerpetualAPIOrderBookDataSource): + """Order book data source for Bluefin Perpetual.""" + + _logger: Optional[HummingbotLogger] = None + + def __init__( + self, + trading_pairs: List[str], + connector: "BluefinPerpetualDerivative", + data_source: BluefinDataSource, + domain: str = CONSTANTS.DOMAIN, + ): + super().__init__(trading_pairs) + self._connector = connector + self._data_source = data_source + self._domain = domain + + # Funding info cache keyed by HB trading pair + self._last_funding_info: Dict[str, FundingInfo] = {} + self._last_oracle_prices: Dict[str, Decimal] = {} + self._last_mark_prices: Dict[str, Decimal] = {} + + async def get_last_traded_prices( + self, trading_pairs: List[str], domain: Optional[str] = None + ) -> Dict[str, float]: + del domain + prices: Dict[str, float] = {} + for trading_pair in trading_pairs: + ticker = await self._data_source.get_market_ticker(trading_pair) + prices[trading_pair] = float(self._data_source.from_e9(getattr(ticker, "last_trade_price_e9", "0"))) + return prices + + async def get_funding_info(self, trading_pair: str) -> FundingInfo: + try: + ticker = await self._data_source.get_market_ticker(trading_pair) + funding_info = FundingInfo( + trading_pair=trading_pair, + index_price=self._data_source.from_e9(getattr(ticker, "oracle_price_e9", "0")), + mark_price=self._data_source.from_e9(getattr(ticker, "mark_price_e9", "0")), + next_funding_utc_timestamp=self._next_funding_time(), + rate=self._data_source.from_e9(getattr(ticker, "predicted_funding_rate_e9", "0")), + ) + self._last_funding_info[trading_pair] = funding_info + return funding_info + except (AttributeError, TypeError, ValueError, ArithmeticError): + return self._last_funding_info.get( + trading_pair, + FundingInfo( + trading_pair=trading_pair, + index_price=Decimal("0"), + mark_price=Decimal("0"), + next_funding_utc_timestamp=self._next_funding_time(), + rate=Decimal("0"), + ), + ) + + async def listen_for_funding_info(self, output: asyncio.Queue[Any]): + while True: + try: + event = await self._data_source.get_market_funding_event() + await self._parse_funding_info_message(raw_message=event, message_queue=output) + except (AttributeError, TypeError, ValueError): + self.logger().exception("Error processing funding info from Bluefin stream") + await asyncio.sleep(5) + + async def _parse_funding_info_message(self, raw_message: Any, message_queue: asyncio.Queue[Any]): + symbol = self._data_source.bluefin_to_hb_symbol(getattr(raw_message, "symbol", "")) + if not symbol: + return + + oracle_price_e9 = getattr(raw_message, "oracle_price_e9", None) + mark_price_e9 = getattr(raw_message, "mark_price_e9", None) + + if oracle_price_e9 is not None: + self._last_oracle_prices[symbol] = self._data_source.from_e9(oracle_price_e9) + if mark_price_e9 is not None: + self._last_mark_prices[symbol] = self._data_source.from_e9(mark_price_e9) + + if symbol in self._last_oracle_prices and symbol in self._last_mark_prices: + funding_info = FundingInfo( + trading_pair=symbol, + index_price=self._last_oracle_prices[symbol], + mark_price=self._last_mark_prices[symbol], + next_funding_utc_timestamp=self._next_funding_time(), + rate=Decimal("0"), + ) + self._last_funding_info[symbol] = funding_info + message_queue.put_nowait( + FundingInfoUpdate( + trading_pair=symbol, + index_price=funding_info.index_price, + mark_price=funding_info.mark_price, + next_funding_utc_timestamp=funding_info.next_funding_utc_timestamp, + rate=funding_info.rate, + ) + ) + + async def _parse_trade_message(self, raw_message: Any, message_queue: asyncio.Queue[Any]): + trades = getattr(raw_message, "trades", None) + if not trades: + return + + for trade in trades: + trading_pair = self._data_source.bluefin_to_hb_symbol(getattr(trade, "symbol", "")) + if not trading_pair: + continue + + side = str(getattr(getattr(trade, "side", None), "value", getattr(trade, "side", ""))).upper() + trade_type = float(TradeType.BUY.value) if side == "LONG" else float(TradeType.SELL.value) + price = self._data_source.from_e9(getattr(trade, "price_e9", "0")) + amount = self._data_source.from_e9(getattr(trade, "quantity_e9", "0")) + ts_ms = int(getattr(trade, "executed_at_millis", int(time.time() * 1000))) + + message_queue.put_nowait( + OrderBookMessage( + message_type=OrderBookMessageType.TRADE, + content={ + "trade_id": str(getattr(trade, "id", ts_ms)), + "trading_pair": trading_pair, + "trade_type": trade_type, + "amount": str(amount), + "price": str(price), + }, + timestamp=ts_ms * 1e-3, + ) + ) + + async def _parse_order_book_diff_message(self, raw_message: Any, message_queue: asyncio.Queue[Any]): + bids_e9 = getattr(raw_message, "bids_e9", None) + asks_e9 = getattr(raw_message, "asks_e9", None) + symbol = getattr(raw_message, "symbol", None) + if bids_e9 is None or asks_e9 is None or symbol is None: + return + + trading_pair = self._data_source.bluefin_to_hb_symbol(symbol) + update_id = int(getattr(raw_message, "last_update_id", getattr(raw_message, "updated_at_millis", 0))) + timestamp_ms = int(getattr(raw_message, "updated_at_millis", int(time.time() * 1000))) + + message_queue.put_nowait( + OrderBookMessage( + OrderBookMessageType.DIFF, + { + "trading_pair": trading_pair, + "update_id": update_id, + "bids": self._convert_levels_from_e9(bids_e9), + "asks": self._convert_levels_from_e9(asks_e9), + }, + timestamp=timestamp_ms * 1e-3, + ) + ) + + async def _parse_order_book_snapshot_message(self, raw_message: Any, message_queue: asyncio.Queue[Any]): + bids_e9 = getattr(raw_message, "bids_e9", None) + asks_e9 = getattr(raw_message, "asks_e9", None) + symbol = getattr(raw_message, "symbol", None) + if bids_e9 is None or asks_e9 is None or symbol is None: + return + + trading_pair = self._data_source.bluefin_to_hb_symbol(symbol) + update_id = int(getattr(raw_message, "orderbook_update_id", getattr(raw_message, "updated_at_millis", 0))) + timestamp_ms = int(getattr(raw_message, "updated_at_millis", int(time.time() * 1000))) + + message_queue.put_nowait( + OrderBookMessage( + OrderBookMessageType.SNAPSHOT, + { + "trading_pair": trading_pair, + "update_id": update_id, + "bids": self._convert_levels_from_e9(bids_e9), + "asks": self._convert_levels_from_e9(asks_e9), + }, + timestamp=timestamp_ms * 1e-3, + ) + ) + + async def _request_order_book_snapshot(self, trading_pair: str) -> Any: + return await self._data_source.get_orderbook(trading_pair) + + async def _order_book_snapshot(self, trading_pair: str) -> OrderBookMessage: + snapshot_response = await self._request_order_book_snapshot(trading_pair) + update_id = int(getattr(snapshot_response, "last_update_id", getattr(snapshot_response, "updated_at_millis", 0))) + timestamp_ms = int(getattr(snapshot_response, "updated_at_millis", int(time.time() * 1000))) + + return OrderBookMessage( + OrderBookMessageType.SNAPSHOT, + { + "trading_pair": trading_pair, + "update_id": update_id, + "bids": self._convert_levels_from_e9(getattr(snapshot_response, "bids_e9", [])), + "asks": self._convert_levels_from_e9(getattr(snapshot_response, "asks_e9", [])), + }, + timestamp=timestamp_ms * 1e-3, + ) + + async def listen_for_order_book_diffs(self, ev_loop: asyncio.AbstractEventLoop, output: asyncio.Queue[Any]): + while True: + try: + event = await self._data_source.get_market_order_book_event() + event_name = type(event).__name__ + if event_name == "OrderbookDiffDepthUpdate": + await self._parse_order_book_diff_message(event, output) + elif event_name == "OrderbookPartialDepthUpdate": + await self._parse_order_book_snapshot_message(event, output) + except (AttributeError, TypeError, ValueError): + self.logger().exception("Error processing order book/trade updates from Bluefin stream") + await asyncio.sleep(5) + + async def listen_for_trades(self, ev_loop: asyncio.AbstractEventLoop, output: asyncio.Queue[Any]): + while True: + try: + event = await self._data_source.get_market_trade_event() + await self._parse_trade_message(event, output) + except (AttributeError, TypeError, ValueError): + self.logger().exception("Error processing trade updates from Bluefin stream") + await asyncio.sleep(5) + + async def listen_for_order_book_snapshots(self, ev_loop: asyncio.AbstractEventLoop, output: asyncio.Queue[Any]): + while True: + await asyncio.sleep(60) + + async def subscribe_to_trading_pair(self, trading_pair: str) -> bool: + if trading_pair not in self._trading_pairs: + self._trading_pairs.append(trading_pair) + # The underlying SDK listener currently uses connection-level subscription setup. + return True + + async def unsubscribe_from_trading_pair(self, trading_pair: str) -> bool: + if trading_pair in self._trading_pairs: + self._trading_pairs.remove(trading_pair) + # The SDK listener does not expose per-pair unsubscribe yet. + return True + + def _next_funding_time(self) -> int: + current_time = int(time.time()) + return ((current_time // 3600) + 1) * 3600 + + def _convert_levels_from_e9(self, levels_e9: List[List[str]]) -> List[List[Decimal]]: + converted: List[List[Decimal]] = [] + for level in levels_e9: + if len(level) < 2: + continue + converted.append([ + self._data_source.from_e9(level[0]), + self._data_source.from_e9(level[1]), + ]) + return converted diff --git a/hummingbot/connector/derivative/bluefin_perpetual/bluefin_perpetual_auth.py b/hummingbot/connector/derivative/bluefin_perpetual/bluefin_perpetual_auth.py new file mode 100644 index 00000000000..f19208d9cd1 --- /dev/null +++ b/hummingbot/connector/derivative/bluefin_perpetual/bluefin_perpetual_auth.py @@ -0,0 +1,79 @@ +""" +Authentication module for Bluefin Perpetual connector. + +The Bluefin SDK handles authentication internally via JWT tokens. +This AuthBase implementation serves as a thin wrapper to integrate +with hummingbot's authentication system. +""" +from typing import Dict, Any + +from hummingbot.core.web_assistant.auth import AuthBase +from hummingbot.core.web_assistant.connections.data_types import RESTRequest, WSRequest + + +class BluefinPerpetualAuth(AuthBase): + """ + Authentication class for Bluefin Perpetual API. + + The Bluefin SDK manages authentication internally using: + 1. SuiWallet (created from mnemonic) + 2. JWT token obtained via signature-based login + 3. Automatic token refresh + + This class stores the credentials and provides them to the SDK data source. + """ + + def __init__(self, wallet_mnemonic: str, network: str): + """ + Initialize Bluefin authentication. + + :param wallet_mnemonic: 24-word mnemonic phrase + :param network: Network name ("MAINNET" or "STAGING") + """ + self._wallet_mnemonic = wallet_mnemonic + self._network = network + + @property + def wallet_mnemonic(self) -> str: + """Get wallet mnemonic.""" + return self._wallet_mnemonic + + @property + def network(self) -> str: + """Get network name.""" + return self._network + + async def rest_authenticate(self, request: RESTRequest) -> RESTRequest: + """ + Add authentication to REST request. + + Note: The Bluefin SDK handles REST authentication internally via JWT. + This method is a no-op for compatibility with hummingbot's auth system. + + :param request: REST request + :return: Unchanged request + """ + return request + + async def ws_authenticate(self, request: WSRequest) -> WSRequest: + """ + Add authentication to WebSocket request. + + Note: The Bluefin SDK handles WebSocket authentication internally via JWT. + This method is a no-op for compatibility with hummingbot's auth system. + + :param request: WebSocket request + :return: Unchanged request + """ + return request + + def get_headers(self) -> Dict[str, Any]: + """ + Get authentication headers. + + Note: The Bluefin SDK manages JWT headers internally. + Returns empty dict for compatibility. + + :return: Empty dict + """ + return {} diff --git a/hummingbot/connector/derivative/bluefin_perpetual/bluefin_perpetual_constants.py b/hummingbot/connector/derivative/bluefin_perpetual/bluefin_perpetual_constants.py new file mode 100644 index 00000000000..31b71659651 --- /dev/null +++ b/hummingbot/connector/derivative/bluefin_perpetual/bluefin_perpetual_constants.py @@ -0,0 +1,66 @@ +from hummingbot.core.api_throttler.data_types import RateLimit +from hummingbot.core.data_type.in_flight_order import OrderState + +EXCHANGE_NAME = "bluefin_perpetual" +MAX_ORDER_ID_LEN = None + +# Domains +DOMAIN = EXCHANGE_NAME +STAGING_DOMAIN = "bluefin_perpetual_staging" + +# Environment names used in SDK +MAINNET_ENV_NAME = "sui-prod" +STAGING_ENV_NAME = "sui-staging" + +# Base URLs - constructed from environment name +# For mainnet (sui-prod): +# Auth: https://auth.api.sui-prod.bluefin.io +# API: https://api.sui-prod.bluefin.io +# Trade: https://trade.api.sui-prod.bluefin.io +# Account WS: wss://stream.api.sui-prod.bluefin.io/ws/account +# Market WS: wss://stream.api.sui-prod.bluefin.io/ws/market +# For staging (sui-staging): +# Auth: https://auth.api.sui-staging.bluefin.io +# API: https://api.sui-staging.bluefin.io +# Trade: https://trade.api.sui-staging.bluefin.io +# Account WS: wss://stream.api.sui-staging.bluefin.io/ws/account +# Market WS: wss://stream.api.sui-staging.bluefin.io/ws/market + +def get_rest_url_for_env(env_name: str, service: str = "api") -> str: + """Get REST base URL for a given environment and service.""" + return f"https://{service}.api.{env_name}.bluefin.io" + +def get_ws_url_for_env(env_name: str, stream_type: str = "market") -> str: + """Get WebSocket URL for a given environment and stream type.""" + return f"wss://stream.api.{env_name}.bluefin.io/ws/{stream_type}" + +# Funding rate update interval +FUNDING_RATE_UPDATE_INTERVAL_SECOND = 3600 # Hourly + +# Collateral currency +CURRENCY = "USDC" + +# Order state mapping from Bluefin OrderStatus to Hummingbot OrderState +ORDER_STATE = { + "STANDBY": OrderState.OPEN, + "OPEN": OrderState.OPEN, + "PARTIALLY_FILLED_OPEN": OrderState.PARTIALLY_FILLED, + "PARTIALLY_FILLED_CANCELED": OrderState.CANCELED, + "FILLED": OrderState.FILLED, + "CANCELLED": OrderState.CANCELED, + "EXPIRED": OrderState.CANCELED, + "PARTIALLY_FILLED_EXPIRED": OrderState.CANCELED, + "UNSPECIFIED": OrderState.FAILED, +} + +# Heartbeat +HEARTBEAT_TIME_INTERVAL = 30.0 + +# Rate Limits +# Based on typical exchange limits - conservative estimate +MAX_REQUEST = 1200 +ALL_ENDPOINTS_LIMIT = "All" + +RATE_LIMITS = [ + RateLimit(ALL_ENDPOINTS_LIMIT, limit=MAX_REQUEST, time_interval=60), +] diff --git a/hummingbot/connector/derivative/bluefin_perpetual/bluefin_perpetual_derivative.py b/hummingbot/connector/derivative/bluefin_perpetual/bluefin_perpetual_derivative.py new file mode 100644 index 00000000000..b252c403160 --- /dev/null +++ b/hummingbot/connector/derivative/bluefin_perpetual/bluefin_perpetual_derivative.py @@ -0,0 +1,878 @@ +""" +Bluefin Perpetual derivative connector. + +Main connector class that implements PerpetualDerivativePyBase for Bluefin. +""" +# pyright: reportMissingTypeStubs=false, reportMissingImports=false + +import asyncio +import time +from decimal import Decimal +from typing import Any, AsyncIterable, Dict, List, Optional, Tuple + +from bidict import bidict + +from hummingbot.connector.constants import s_decimal_NaN +from hummingbot.connector.derivative.bluefin_perpetual import ( + bluefin_perpetual_constants as CONSTANTS, + bluefin_perpetual_web_utils as web_utils, +) +from hummingbot.connector.derivative.bluefin_perpetual.bluefin_perpetual_api_order_book_data_source import ( + BluefinPerpetualAPIOrderBookDataSource, +) +from hummingbot.connector.derivative.bluefin_perpetual.bluefin_perpetual_auth import BluefinPerpetualAuth +from hummingbot.connector.derivative.bluefin_perpetual.bluefin_perpetual_user_stream_data_source import ( + BluefinPerpetualUserStreamDataSource, +) +from hummingbot.connector.derivative.bluefin_perpetual.data_sources.bluefin_data_source import BluefinDataSource +from hummingbot.connector.derivative.position import Position +from hummingbot.connector.perpetual_derivative_py_base import PerpetualDerivativePyBase +from hummingbot.connector.trading_rule import TradingRule +from hummingbot.connector.utils import combine_to_hb_trading_pair +from hummingbot.core.api_throttler.data_types import RateLimit +from hummingbot.core.data_type.common import OrderType, PositionAction, PositionMode, PositionSide, TradeType +from hummingbot.core.data_type.in_flight_order import InFlightOrder, OrderState, OrderUpdate, TradeUpdate +from hummingbot.core.data_type.order_book_tracker_data_source import OrderBookTrackerDataSource +from hummingbot.core.data_type.trade_fee import TokenAmount, TradeFeeBase +from hummingbot.core.data_type.user_stream_tracker_data_source import UserStreamTrackerDataSource +from hummingbot.core.utils.estimate_fee import build_trade_fee +from hummingbot.core.web_assistant.web_assistants_factory import WebAssistantsFactory + +# Bluefin SDK imports +try: + from bluefin_pro_sdk import Order + from openapi_client import ( + OrderType as BluefinOrderType, + OrderSide as BluefinOrderSide, + OrderTimeInForce, + SelfTradePreventionType, + ) +except ImportError: + Order = None + BluefinOrderType = None + BluefinOrderSide = None + OrderTimeInForce = None + SelfTradePreventionType = None + + +class BluefinPerpetualDerivative(PerpetualDerivativePyBase): + """Bluefin Perpetual derivative connector.""" + + web_utils = web_utils + + SHORT_POLL_INTERVAL = 5.0 + LONG_POLL_INTERVAL = 120.0 + + def __init__( + self, + bluefin_perpetual_wallet_mnemonic: str, + bluefin_perpetual_network: str = "MAINNET", + trading_pairs: Optional[List[str]] = None, + trading_required: bool = True, + balance_asset_limit: Optional[Dict[str, Dict[str, Decimal]]] = None, + rate_limits_share_pct: Decimal = Decimal("100"), + ): + """ + Initialize Bluefin Perpetual connector. + + :param bluefin_perpetual_wallet_mnemonic: 24-word mnemonic + :param bluefin_perpetual_network: Network ("MAINNET" or "STAGING") + :param trading_pairs: List of trading pairs to trade + :param trading_required: Whether trading is required + :param balance_asset_limit: Balance limits + :param rate_limits_share_pct: Rate limit sharing percentage + """ + self._wallet_mnemonic = bluefin_perpetual_wallet_mnemonic + self._network = bluefin_perpetual_network + self._trading_required = trading_required + self._trading_pairs = trading_pairs or [] + self._domain = CONSTANTS.DOMAIN if bluefin_perpetual_network == "MAINNET" else CONSTANTS.STAGING_DOMAIN + self._is_starting_network = False + + # SDK wrapper is initialized lazily and reused by all connector components + self._data_source: BluefinDataSource = BluefinDataSource( + wallet_mnemonic=self._wallet_mnemonic, + network=self._network, + debug=False, + ) + + super().__init__(balance_asset_limit, rate_limits_share_pct) + + @property + def name(self) -> str: + """Get connector name.""" + return self._domain + + @property + def authenticator(self) -> Optional[BluefinPerpetualAuth]: + """Get authenticator.""" + if self._trading_required: + return BluefinPerpetualAuth( + wallet_mnemonic=self._wallet_mnemonic, + network=self._network + ) + return None + + @property + def rate_limits_rules(self) -> List[RateLimit]: + """Get rate limit rules.""" + return CONSTANTS.RATE_LIMITS + + @property + def domain(self) -> str: + """Get domain.""" + return self._domain + + @property + def client_order_id_max_length(self) -> int: + """Get max client order ID length.""" + return CONSTANTS.MAX_ORDER_ID_LEN or 64 + + @property + def client_order_id_prefix(self) -> str: + """Get client order ID prefix.""" + return "" + + @property + def trading_rules_request_path(self) -> str: + """Get trading rules request path.""" + return "" # Not used - SDK handles this + + @property + def trading_pairs_request_path(self) -> str: + """Get trading pairs request path.""" + return "" # Not used - SDK handles this + + @property + def check_network_request_path(self) -> str: + """Get check network request path.""" + return "" # Not used - SDK handles this + + @property + def funding_fee_poll_interval(self) -> int: + """Get funding fee poll interval in seconds.""" + return CONSTANTS.FUNDING_RATE_UPDATE_INTERVAL_SECOND + + @property + def trading_pairs(self) -> List[str]: + """Get trading pairs.""" + return self._trading_pairs + + @property + def is_cancel_request_in_exchange_synchronous(self) -> bool: + """Check if cancel request is synchronous.""" + return False # Async via SDK + + @property + def is_trading_required(self) -> bool: + """Check if trading is required.""" + return self._trading_required + + def supported_position_modes(self) -> List[PositionMode]: + """Get supported position modes.""" + # Bluefin only supports ONEWAY position mode + return [PositionMode.ONEWAY] + + def supported_order_types(self) -> List[OrderType]: + """ + :return: a list of OrderType supported by this connector + """ + return [OrderType.LIMIT, OrderType.MARKET, OrderType.LIMIT_MAKER] + + def get_buy_collateral_token(self, trading_pair: str) -> str: + """Get buy collateral token.""" + return CONSTANTS.CURRENCY # Always USDC + + def get_sell_collateral_token(self, trading_pair: str) -> str: + """Get sell collateral token.""" + return CONSTANTS.CURRENCY # Always USDC + + def _is_request_exception_related_to_time_synchronizer(self, request_exception: Exception) -> bool: + """Check if exception is related to time synchronizer.""" + return False # SDK handles time synchronization + + def _create_web_assistants_factory(self) -> WebAssistantsFactory: + """Create web assistants factory.""" + return web_utils.build_api_factory( + throttler=self._throttler, + auth=self._auth + ) + + def _create_order_book_data_source(self) -> OrderBookTrackerDataSource: + """Create order book data source.""" + return BluefinPerpetualAPIOrderBookDataSource( + trading_pairs=self._trading_pairs, + connector=self, + data_source=self._data_source, + domain=self._domain + ) + + def _create_user_stream_data_source(self) -> UserStreamTrackerDataSource: + """Create user stream data source.""" + return BluefinPerpetualUserStreamDataSource( + trading_pairs=self._trading_pairs, + connector=self, + data_source=self._data_source, + domain=self._domain + ) + + async def start_network(self): + """Start connector services and Bluefin SDK connections.""" + self._is_starting_network = True + try: + await self._ensure_data_source_started() + await self._data_source.create_market_data_stream(self._trading_pairs) + await self._data_source.create_account_data_stream() + await super().start_network() + finally: + self._is_starting_network = False + + async def stop_network(self): + """Stop connector services and Bluefin SDK connections.""" + await super().stop_network() + if not self._is_starting_network and self._data_source.is_initialized: + await self._data_source.shutdown() + + async def _ensure_data_source_started(self): + if not self._data_source.is_initialized: + await self._data_source.initialize() + + async def _make_trading_rules_request(self) -> Any: + """Make trading rules request.""" + await self._ensure_data_source_started() + return await self._data_source.get_exchange_info() + + async def _make_trading_pairs_request(self) -> Any: + """Make trading pairs request.""" + await self._ensure_data_source_started() + return await self._data_source.get_exchange_info() + + async def _update_trading_rules(self): + """Update trading rules from exchange.""" + exchange_info = await self._make_trading_rules_request() + trading_rules_list = await self._format_trading_rules(exchange_info) + self._trading_rules.clear() + for trading_rule in trading_rules_list: + self._trading_rules[trading_rule.trading_pair] = trading_rule + + async def _format_trading_rules(self, exchange_info_dict: Any) -> List[TradingRule]: + """ + Format trading rules from exchange info. + + :param exchange_info_dict: Exchange info response + :return: List of TradingRule objects + """ + trading_rules: List[TradingRule] = [] + + for market in exchange_info_dict.markets: + try: + # Convert Bluefin symbol to hummingbot symbol + bluefin_symbol = market.symbol + trading_pair = self._data_source.bluefin_to_hb_symbol(bluefin_symbol) + + trading_rules.append( + TradingRule( + trading_pair=trading_pair, + min_order_size=self._data_source.from_e9(market.min_trade_quantity_e9), + min_price_increment=self._data_source.from_e9(market.tick_size_e9), + min_base_amount_increment=self._data_source.from_e9(market.step_size_e9), + buy_order_collateral_token=CONSTANTS.CURRENCY, + sell_order_collateral_token=CONSTANTS.CURRENCY, + ) + ) + except (AttributeError, TypeError, ValueError, KeyError): + self.logger().exception("Error parsing trading rule for %s. Skipping.", getattr(market, "symbol", "unknown")) + + return trading_rules + + def _is_order_not_found_during_status_update_error(self, status_update_exception: Exception) -> bool: + """Check if error is order not found.""" + return "not found" in str(status_update_exception).lower() + + def _is_order_not_found_during_cancelation_error(self, cancelation_exception: Exception) -> bool: + """Check if cancellation error is order not found.""" + return "not found" in str(cancelation_exception).lower() + + async def _place_order( + self, + order_id: str, + trading_pair: str, + amount: Decimal, + trade_type: TradeType, + order_type: OrderType, + price: Decimal, + position_action: PositionAction = PositionAction.NIL, + **kwargs: Any, + ) -> Tuple[str, float]: + """ + Place an order. + + :param order_id: Client order ID + :param trading_pair: Trading pair + :param amount: Order amount + :param trade_type: BUY or SELL + :param order_type: LIMIT or MARKET + :param price: Order price + :param position_action: Position action + :return: Tuple of (exchange_order_id, timestamp) + """ + del kwargs + + if any(v is None for v in (Order, BluefinOrderType, BluefinOrderSide, OrderTimeInForce, SelfTradePreventionType)): + raise RuntimeError("Bluefin SDK models are unavailable. Verify bluefin_pro_sdk installation.") + + # Convert hummingbot order type to Bluefin order type + is_post_only = False + if order_type == OrderType.LIMIT: + bluefin_order_type = BluefinOrderType.LIMIT + elif order_type == OrderType.LIMIT_MAKER: + bluefin_order_type = BluefinOrderType.LIMIT + is_post_only = True + elif order_type == OrderType.MARKET: + bluefin_order_type = BluefinOrderType.MARKET + else: + raise ValueError(f"Unsupported order type: {order_type}") + + # Convert trade type to Bluefin order side + bluefin_side = BluefinOrderSide.LONG if trade_type == TradeType.BUY else BluefinOrderSide.SHORT + + await self._ensure_data_source_started() + + # Quantize price and amount + quantized_price = self.quantize_order_price(trading_pair, price) + quantized_amount = self.quantize_order_amount(trading_pair, amount) + + # Create order + order = Order( + client_order_id=order_id, + type=bluefin_order_type, + symbol=trading_pair, # Will be converted to Bluefin symbol in data_source + price_e9=self._data_source.to_e9(quantized_price), + quantity_e9=self._data_source.to_e9(quantized_amount), + side=bluefin_side, + leverage_e9=self._data_source.to_e9(Decimal("1")), # Default 1x leverage + is_isolated=False, # Cross margin by default + expires_at_millis=int(time.time() * 1000) + 120000, # 2 minute expiry + reduce_only=position_action == PositionAction.CLOSE, + post_only=is_post_only, + time_in_force=OrderTimeInForce.GTT, + self_trade_prevention_type=SelfTradePreventionType.TAKER, + ) + + # Place order via SDK with retry logic + max_retries = 3 + retry_delay = 1.0 + last_exception = None + + for attempt in range(max_retries): + try: + response = await self._data_source.place_order(order) + return response.order_hash, time.time() + except Exception as e: + last_exception = e + if attempt < max_retries - 1: + self.logger().warning( + f"Order placement failed (attempt {attempt + 1}/{max_retries}): {str(e)}. " + f"Retrying in {retry_delay}s..." + ) + await asyncio.sleep(retry_delay) + retry_delay *= 2 # Exponential backoff + else: + self.logger().error( + f"Order placement failed after {max_retries} attempts: {str(e)}", + exc_info=True + ) + + # If all retries failed, raise the last exception + raise last_exception or RuntimeError("Order placement failed with unknown error") + + async def _place_cancel(self, order_id: str, tracked_order: InFlightOrder): + """ + Cancel an order with retry logic. + + :param order_id: Client order ID + :param tracked_order: Tracked order + """ + await self._ensure_data_source_started() + exchange_order_id = tracked_order.exchange_order_id + + if exchange_order_id is None: + self.logger().warning(f"Order {order_id} has no exchange order ID. Cannot cancel.") + return + + max_retries = 3 + retry_delay = 0.5 + last_exception = None + + for attempt in range(max_retries): + try: + await self._data_source.cancel_order( + symbol=tracked_order.trading_pair, + order_hash=exchange_order_id + ) + return + except Exception as e: + last_exception = e + # Check if error is "order not found" - this is acceptable + if self._is_order_not_found_during_cancelation_error(e): + self.logger().info(f"Order {order_id} already cancelled or filled") + return + + if attempt < max_retries - 1: + self.logger().warning( + f"Order cancellation failed (attempt {attempt + 1}/{max_retries}): {str(e)}. " + f"Retrying in {retry_delay}s..." + ) + await asyncio.sleep(retry_delay) + retry_delay *= 2 + else: + self.logger().error( + f"Order cancellation failed after {max_retries} attempts: {str(e)}", + exc_info=True + ) + + # If all retries failed, raise the last exception + raise last_exception or RuntimeError("Order cancellation failed with unknown error") + + async def _update_balances(self): + """Update account balances with retry logic.""" + await self._ensure_data_source_started() + + max_retries = 2 + retry_delay = 1.0 + + for attempt in range(max_retries): + try: + account = await self._data_source.get_account() + + self._account_balances[CONSTANTS.CURRENCY] = Decimal("0") + self._account_available_balances[CONSTANTS.CURRENCY] = Decimal("0") + + for asset in getattr(account, "assets", []): + symbol = getattr(asset, "symbol", None) + if symbol is None: + continue + total_balance = self._data_source.from_e9(getattr(asset, "quantity_e9", "0")) + available_balance = self._data_source.from_e9(getattr(asset, "max_withdraw_quantity_e9", "0")) + self._account_balances[symbol] = total_balance + self._account_available_balances[symbol] = available_balance + + return # Success + + except Exception as e: + if attempt < max_retries - 1: + self.logger().warning( + f"Balance update failed (attempt {attempt + 1}/{max_retries}): {str(e)}. " + f"Retrying in {retry_delay}s..." + ) + await asyncio.sleep(retry_delay) + else: + self.logger().error( + f"Balance update failed after {max_retries} attempts: {str(e)}", + exc_info=True + ) + # Don't raise - allow connector to continue with stale data + # This prevents complete failure if balance API is temporarily unavailable + + async def _update_positions(self): + """Update positions with retry logic.""" + await self._ensure_data_source_started() + + max_retries = 2 + retry_delay = 1.0 + + for attempt in range(max_retries): + try: + account = await self._data_source.get_account() + + # Clear existing positions + self._account_positions.clear() + + # Update positions from account data + if hasattr(account, 'positions') and account.positions: + for pos_data in account.positions: + trading_pair = self._data_source.bluefin_to_hb_symbol(pos_data.symbol) + amount = self._data_source.from_e9(getattr(pos_data, "size_e9", "0")) + + if amount == Decimal("0"): + continue + + side_name = getattr(getattr(pos_data, "side", None), "value", str(getattr(pos_data, "side", ""))) + position_side = PositionSide.LONG if side_name.upper() == "LONG" else PositionSide.SHORT + + # Create position + position = Position( + trading_pair=trading_pair, + position_side=position_side, + unrealized_pnl=self._data_source.from_e9(getattr(pos_data, "unrealized_pnl_e9", "0")), + entry_price=self._data_source.from_e9(getattr(pos_data, "avg_entry_price_e9", "0")), + amount=abs(amount), + leverage=self._data_source.from_e9(getattr(pos_data, "client_set_leverage_e9", self._data_source.to_e9(Decimal("1")))), + ) + + position_key = self._perpetual_trading.position_key(trading_pair, position_side) + self._perpetual_trading.set_position(position_key, position) + + return # Success + + except Exception as e: + if attempt < max_retries - 1: + self.logger().warning( + f"Position update failed (attempt {attempt + 1}/{max_retries}): {str(e)}. " + f"Retrying in {retry_delay}s..." + ) + await asyncio.sleep(retry_delay) + else: + self.logger().error( + f"Position update failed after {max_retries} attempts: {str(e)}", + exc_info=True + ) + # Don't raise - allow connector to continue with stale data + # This prevents complete failure if position API is temporarily unavailable + + async def _set_trading_pair_leverage(self, trading_pair: str, leverage: int) -> Tuple[bool, str]: + """ + Set leverage for a trading pair with retry logic. + + :param trading_pair: Trading pair + :param leverage: Leverage value + :return: Tuple of (success, message) + """ + max_retries = 2 + retry_delay = 1.0 + + for attempt in range(max_retries): + try: + await self._ensure_data_source_started() + await self._data_source.set_leverage(trading_pair, Decimal(str(leverage))) + return True, "" + except (RuntimeError, ValueError, TypeError, AttributeError) as e: + if attempt < max_retries - 1: + self.logger().warning( + f"Set leverage failed (attempt {attempt + 1}/{max_retries}): {str(e)}. " + f"Retrying in {retry_delay}s..." + ) + await asyncio.sleep(retry_delay) + else: + error_msg = f"Failed to set leverage after {max_retries} attempts: {str(e)}" + self.logger().error(error_msg, exc_info=True) + return False, error_msg + + return False, "Set leverage failed with unknown error" + + async def _trading_pair_position_mode_set(self, mode: PositionMode, trading_pair: str) -> Tuple[bool, str]: + """ + Set position mode for a trading pair. + + Bluefin only supports ONEWAY mode, so this is a no-op. + + :param mode: Position mode + :param trading_pair: Trading pair + :return: Tuple of (success, message) + """ + if mode != PositionMode.ONEWAY: + return False, "Bluefin only supports ONEWAY position mode" + return True, "" + + async def _fetch_last_fee_payment(self, trading_pair: str) -> Tuple[float, Decimal, Decimal]: + """ + Fetch last funding fee payment. + + :param trading_pair: Trading pair + :return: Tuple of (timestamp, funding_rate, payment) + """ + del trading_pair + + try: + # Get funding rate history + history = await self._data_source.get_account_funding_rate_history(limit=1) + + if history and len(history.data) > 0: + last_payment = history.data[0] + timestamp = last_payment.executed_at_millis / 1000.0 + payment = self._data_source.from_e9(last_payment.payment_amount_e9) + rate = self._data_source.from_e9(last_payment.rate_e9) + return timestamp, rate, payment + else: + # No payment history + return 0.0, Decimal("0"), Decimal("0") + + except (RuntimeError, ValueError, TypeError, AttributeError) as e: + self.logger().error("Error fetching funding fee payment: %s", e) + return 0.0, Decimal("0"), Decimal("0") + + def _get_fee(self, + base_currency: str, + quote_currency: str, + order_type: OrderType, + order_side: TradeType, + position_action: PositionAction, + amount: Decimal, + price: Decimal = s_decimal_NaN, + is_maker: Optional[bool] = None) -> TradeFeeBase: + """ + Get fee for an order. + + :param base_currency: Base currency + :param quote_currency: Quote currency + :param order_type: Order type + :param order_side: Order side + :param amount: Order amount + :param price: Order price + :param is_maker: Whether order is maker + :return: TradeFeeBase + """ + is_maker = is_maker or False + return build_trade_fee( + self.name, + is_maker, + base_currency=base_currency, + quote_currency=quote_currency, + order_type=order_type, + order_side=order_side, + amount=amount, + price=price, + ) + + async def _update_trading_fees(self): + # Trading fees are static for now and configured in bluefin_perpetual_utils.DEFAULT_FEES. + pass + + async def _all_trade_updates_for_order(self, order: InFlightOrder) -> List[TradeUpdate]: + await self._ensure_data_source_started() + trade_updates: List[TradeUpdate] = [] + exchange_order_id = order.exchange_order_id or await order.get_exchange_order_id() + trades = await self._data_source.get_account_trades(symbol=order.trading_pair, limit=200) + for trade in trades: + if getattr(trade, "order_hash", None) != exchange_order_id: + continue + + fee_token = getattr(trade, "trading_fee_asset", None) or CONSTANTS.CURRENCY + fee_amount = self._data_source.from_e9(getattr(trade, "trading_fee_e9", "0")) + fee = TradeFeeBase.new_perpetual_fee( + fee_schema=self.trade_fee_schema(), + position_action=order.position, + percent_token=fee_token, + flat_fees=[TokenAmount(amount=fee_amount, token=fee_token)], + ) + + fill_price = self._data_source.from_e9(trade.price_e9) + fill_base_amount = self._data_source.from_e9(trade.quantity_e9) + quote_quantity_e9 = getattr(trade, "quote_quantity_e9", None) + fill_quote_amount = ( + self._data_source.from_e9(quote_quantity_e9) + if quote_quantity_e9 is not None + else fill_price * fill_base_amount + ) + + trade_updates.append( + TradeUpdate( + trade_id=trade.id, + client_order_id=order.client_order_id, + exchange_order_id=exchange_order_id, + trading_pair=order.trading_pair, + fill_timestamp=trade.executed_at_millis * 1e-3, + fill_price=fill_price, + fill_base_amount=fill_base_amount, + fill_quote_amount=fill_quote_amount, + fee=fee, + is_taker=not bool(getattr(trade, "is_maker", False)), + ) + ) + + return trade_updates + + async def _request_order_status(self, tracked_order: InFlightOrder) -> OrderUpdate: + await self._ensure_data_source_started() + exchange_order_id = tracked_order.exchange_order_id + if exchange_order_id is None: + exchange_order_id = await tracked_order.get_exchange_order_id() + + open_orders = await self._data_source.get_open_orders(tracked_order.trading_pair) + matching_order = next( + ( + o for o in open_orders + if getattr(o, "order_hash", None) == exchange_order_id + or getattr(o, "client_order_id", None) == tracked_order.client_order_id + ), + None, + ) + + if matching_order is None: + trades = await self._data_source.get_account_trades(symbol=tracked_order.trading_pair, limit=100) + was_filled = any(getattr(trade, "order_hash", None) == exchange_order_id for trade in trades) + return OrderUpdate( + trading_pair=tracked_order.trading_pair, + update_timestamp=self.current_timestamp, + new_state=OrderState.FILLED if was_filled else OrderState.CANCELED, + client_order_id=tracked_order.client_order_id, + exchange_order_id=exchange_order_id, + ) + + status_value = getattr(getattr(matching_order, "status", None), "value", str(getattr(matching_order, "status", ""))) + new_state = CONSTANTS.ORDER_STATE.get(status_value, OrderState.OPEN) + update_timestamp = getattr(matching_order, "updated_at_millis", int(self.current_timestamp * 1000)) * 1e-3 + return OrderUpdate( + trading_pair=tracked_order.trading_pair, + update_timestamp=update_timestamp, + new_state=new_state, + client_order_id=getattr(matching_order, "client_order_id", tracked_order.client_order_id), + exchange_order_id=getattr(matching_order, "order_hash", exchange_order_id), + ) + + async def _iter_user_event_queue(self) -> AsyncIterable[Any]: + while True: + try: + yield await self._user_stream_tracker.user_stream.get() + except (AttributeError, RuntimeError, TypeError, ValueError): + self.logger().network( + "Unknown error while reading Bluefin user stream. Retrying in 1 second.", + exc_info=True, + ) + await self._sleep(1.0) + + async def _user_stream_event_listener(self): + async for event_message in self._iter_user_event_queue(): + try: + self._process_user_stream_event(event_message) + except (AttributeError, RuntimeError, TypeError, ValueError): + self.logger().exception("Unexpected error in user stream listener loop.") + await self._sleep(5.0) + + def _initialize_trading_pair_symbols_from_exchange_info(self, exchange_info: Dict[str, Any]): + mapping = bidict() + for market in getattr(exchange_info, "markets", []): + symbol = getattr(market, "symbol", None) + if symbol is None: + continue + base = symbol.split("-")[0] + hb_trading_pair = combine_to_hb_trading_pair(base, "USD") + mapping[symbol] = hb_trading_pair + + self._set_trading_pair_symbol_map(mapping) + + def _process_user_stream_event(self, event: Any): + """ + Process user stream event. + + :param event: Event from user stream + """ + try: + event_name = type(event).__name__ + if event_name == "AccountOrderUpdate": + self._process_order_update(event) + elif event_name == "AccountTradeUpdate": + self._process_trade_update(event) + elif event_name == "AccountPositionUpdate": + self._process_position_update(event) + elif event_name == "AccountUpdate": + self._process_account_update(event) + except (AttributeError, RuntimeError, TypeError, ValueError): + self.logger().exception("Error processing user stream event") + + def _process_order_update(self, event: Any): + """Process order update event.""" + # AccountOrderUpdate is a oneOf type containing either ActiveOrderUpdate or OrderCancellationUpdate + if hasattr(event, 'actual_instance'): + actual_event = event.actual_instance + actual_event_name = type(actual_event).__name__ + + if actual_event_name == "ActiveOrderUpdate": + # Order fill or partial fill + client_order_id = actual_event.client_order_id + exchange_order_id = actual_event.order_hash + trading_pair = self._data_source.bluefin_to_hb_symbol(actual_event.symbol) + + # Map order status + new_state = CONSTANTS.ORDER_STATE.get(actual_event.status.value, None) + + if new_state: + order_update = OrderUpdate( + trading_pair=trading_pair, + update_timestamp=actual_event.updated_at_millis / 1000.0, + new_state=new_state, + client_order_id=client_order_id, + exchange_order_id=exchange_order_id, + ) + self._order_tracker.process_order_update(order_update) + + elif actual_event_name == "OrderCancellationUpdate": + # Order cancelled + client_order_id = actual_event.client_order_id + trading_pair = self._data_source.bluefin_to_hb_symbol(actual_event.symbol) + + order_update = OrderUpdate( + trading_pair=trading_pair, + update_timestamp=actual_event.created_at_millis / 1000.0, + new_state=CONSTANTS.ORDER_STATE["CANCELLED"], + client_order_id=client_order_id, + exchange_order_id=actual_event.order_hash, + ) + self._order_tracker.process_order_update(order_update) + + def _process_trade_update(self, event: Any): + """Process trade update event.""" + trade = event.trade + exchange_order_id = getattr(trade, "order_hash", "") + tracked_order = self._order_tracker.all_fillable_orders_by_exchange_order_id.get(exchange_order_id) + if tracked_order is None: + return + + fee_token = getattr(trade, "trading_fee_asset", None) or CONSTANTS.CURRENCY + fee_amount = self._data_source.from_e9(getattr(trade, "trading_fee_e9", "0")) + fee = TradeFeeBase.new_perpetual_fee( + fee_schema=self.trade_fee_schema(), + position_action=tracked_order.position, + percent_token=fee_token, + flat_fees=[TokenAmount(amount=fee_amount, token=fee_token)], + ) + + fill_price = self._data_source.from_e9(trade.price_e9) + fill_amount = self._data_source.from_e9(trade.quantity_e9) + fill_quote_amount = self._data_source.from_e9(trade.quote_quantity_e9) + + trade_update = TradeUpdate( + trade_id=trade.id, + client_order_id=tracked_order.client_order_id, + exchange_order_id=exchange_order_id, + trading_pair=tracked_order.trading_pair, + fill_timestamp=trade.executed_at_millis / 1000.0, + fill_price=fill_price, + fill_base_amount=fill_amount, + fill_quote_amount=fill_quote_amount, + fee=fee, + is_taker=not bool(getattr(trade, "is_maker", False)), + ) + self._order_tracker.process_trade_update(trade_update) + + def _process_position_update(self, event: Any): + """Process position update event.""" + trading_pair = self._data_source.bluefin_to_hb_symbol(event.symbol) + position_side = PositionSide.LONG if str(getattr(event.side, "value", event.side)).upper() == "LONG" else PositionSide.SHORT + amount = self._data_source.from_e9(event.size_e9) + if amount == Decimal("0"): + pos_key = self._perpetual_trading.position_key(trading_pair, position_side) + self._perpetual_trading.remove_position(pos_key) + return + + position = Position( + trading_pair=trading_pair, + position_side=position_side, + unrealized_pnl=self._data_source.from_e9(event.unrealized_pnl_e9), + entry_price=self._data_source.from_e9(event.avg_entry_price_e9), + amount=abs(amount), + leverage=self._data_source.from_e9(event.client_set_leverage_e9), + ) + pos_key = self._perpetual_trading.position_key(trading_pair, position_side) + self._perpetual_trading.set_position(pos_key, position) + + def _process_account_update(self, event: Any): + """Process account balance update event.""" + for asset in getattr(event, "assets", []): + symbol = getattr(asset, "symbol", None) + if symbol is None: + continue + self._account_balances[symbol] = self._data_source.from_e9(getattr(asset, "quantity_e9", "0")) + self._account_available_balances[symbol] = self._data_source.from_e9( + getattr(asset, "max_withdraw_quantity_e9", "0") + ) diff --git a/hummingbot/connector/derivative/bluefin_perpetual/bluefin_perpetual_user_stream_data_source.py b/hummingbot/connector/derivative/bluefin_perpetual/bluefin_perpetual_user_stream_data_source.py new file mode 100644 index 00000000000..84f2781e3fb --- /dev/null +++ b/hummingbot/connector/derivative/bluefin_perpetual/bluefin_perpetual_user_stream_data_source.py @@ -0,0 +1,122 @@ +""" +User stream data source for Bluefin Perpetual connector. + +Handles account events via the Bluefin SDK including: +- Order updates +- Trade updates +- Position updates +- Account balance updates +""" +import asyncio +from typing import TYPE_CHECKING, Any, List, Optional + +from hummingbot.core.web_assistant.ws_assistant import WSAssistant + +from hummingbot.connector.derivative.bluefin_perpetual import bluefin_perpetual_constants as CONSTANTS +from hummingbot.connector.derivative.bluefin_perpetual.data_sources.bluefin_data_source import BluefinDataSource +from hummingbot.core.data_type.user_stream_tracker_data_source import UserStreamTrackerDataSource +from hummingbot.logger import HummingbotLogger + +if TYPE_CHECKING: + from hummingbot.connector.derivative.bluefin_perpetual.bluefin_perpetual_derivative import ( + BluefinPerpetualDerivative, + ) + + +class BluefinPerpetualUserStreamDataSource(UserStreamTrackerDataSource): + """User stream data source for Bluefin Perpetual.""" + + HEARTBEAT_TIME_INTERVAL = 30.0 + _logger: Optional[HummingbotLogger] = None + + def __init__( + self, + trading_pairs: List[str], + connector: 'BluefinPerpetualDerivative', + data_source: BluefinDataSource, + domain: str = CONSTANTS.DOMAIN, + ): + """ + Initialize user stream data source. + + :param trading_pairs: List of trading pairs + :param connector: Parent connector instance + :param data_source: Bluefin SDK data source wrapper + :param domain: Domain (mainnet or staging) + """ + super().__init__() + self._connector = connector + self._data_source = data_source + self._domain = domain + self._trading_pairs = trading_pairs + self._last_recv_time = 0 + + @property + def last_recv_time(self) -> float: + """Get time of last received message.""" + return self._last_recv_time + + async def _connected_websocket_assistant(self) -> WSAssistant: + """Bluefin SDK manages websocket lifecycle in the shared data source.""" + factory = getattr(self._connector, "_web_assistants_factory") + return await factory.get_ws_assistant() + + async def _subscribe_channels(self, websocket_assistant: WSAssistant): + """Subscriptions are configured by BluefinDataSource when streams are created.""" + del websocket_assistant + + async def listen_for_user_stream(self, output: asyncio.Queue[Any]): + """ + Listen for user stream messages from the Bluefin SDK. + + Continuously receives account events and puts them in the output queue. + The events are forwarded from the BluefinDataSource which manages + the WebSocket connection via the SDK. + + :param output: Queue to put user stream messages + """ + event_getters = [ + self._data_source.get_account_order_event, + self._data_source.get_account_trade_event, + self._data_source.get_account_position_event, + self._data_source.get_account_balance_event, + ] + + while True: + pending_tasks = [] + try: + pending_tasks = [asyncio.create_task(getter()) for getter in event_getters] + done, pending = await asyncio.wait(pending_tasks, return_when=asyncio.FIRST_COMPLETED) + + # Cancel all pending tasks + for task in pending: + task.cancel() + + # Find first successfully completed task (without exception) + event = None + for task in done: + if not task.cancelled() and task.exception() is None: + event = task.result() + break + + # If no successful task was found, all getters failed/cancelled + if event is None: + continue + + # Update last receive time + self._last_recv_time = asyncio.get_event_loop().time() + + # Put event in output queue for processing by connector + output.put_nowait(event) + + self.logger().debug("Received account event: %s", type(event).__name__) + + except asyncio.CancelledError: + for task in pending_tasks: + task.cancel() + raise + except (AttributeError, RuntimeError, TypeError, ValueError): + self.logger().exception( + "Unexpected error while listening for user stream. Retrying after 5 seconds..." + ) + await asyncio.sleep(5) diff --git a/hummingbot/connector/derivative/bluefin_perpetual/bluefin_perpetual_utils.py b/hummingbot/connector/derivative/bluefin_perpetual/bluefin_perpetual_utils.py new file mode 100644 index 00000000000..8dd6b62ea9c --- /dev/null +++ b/hummingbot/connector/derivative/bluefin_perpetual/bluefin_perpetual_utils.py @@ -0,0 +1,50 @@ +from decimal import Decimal +from typing import Literal + +from pydantic import ConfigDict, Field, SecretStr + +from hummingbot.client.config.config_data_types import BaseConnectorConfigMap +from hummingbot.core.data_type.trade_fee import TradeFeeSchema + +# Bluefin is a decentralized exchange +CENTRALIZED = False + +# Example trading pair +EXAMPLE_PAIR = "BTC-USD" + +# Default fee structure +# Based on Bluefin's standard fee schedule +DEFAULT_FEES = TradeFeeSchema( + maker_percent_fee_decimal=Decimal("0.0001"), # 0.01% + taker_percent_fee_decimal=Decimal("0.0005"), # 0.05% + buy_percent_fee_deducted_from_returns=True +) + + +class BluefinPerpetualConfigMap(BaseConnectorConfigMap): + connector: str = "bluefin_perpetual" + + bluefin_perpetual_wallet_mnemonic: SecretStr = Field( + default=..., + json_schema_extra={ + "prompt": "Enter your Bluefin wallet mnemonic (24 words)", + "is_secure": True, + "is_connect_key": True, + "prompt_on_new": True, + } + ) + + bluefin_perpetual_network: Literal["MAINNET", "STAGING"] = Field( + default="MAINNET", + json_schema_extra={ + "prompt": "Select network (MAINNET/STAGING)", + "is_secure": False, + "is_connect_key": True, + "prompt_on_new": True, + } + ) + + model_config = ConfigDict(title="bluefin_perpetual") + + +KEYS = BluefinPerpetualConfigMap.model_construct() diff --git a/hummingbot/connector/derivative/bluefin_perpetual/bluefin_perpetual_web_utils.py b/hummingbot/connector/derivative/bluefin_perpetual/bluefin_perpetual_web_utils.py new file mode 100644 index 00000000000..8e2d7dfff80 --- /dev/null +++ b/hummingbot/connector/derivative/bluefin_perpetual/bluefin_perpetual_web_utils.py @@ -0,0 +1,70 @@ +import time +from typing import Optional + +import hummingbot.connector.derivative.bluefin_perpetual.bluefin_perpetual_constants as CONSTANTS +from hummingbot.core.api_throttler.async_throttler import AsyncThrottler +from hummingbot.core.web_assistant.auth import AuthBase +from hummingbot.core.web_assistant.connections.data_types import RESTRequest +from hummingbot.core.web_assistant.rest_pre_processors import RESTPreProcessorBase +from hummingbot.core.web_assistant.web_assistants_factory import WebAssistantsFactory + + +class BluefinPerpetualRESTPreProcessor(RESTPreProcessorBase): + """REST pre-processor for Bluefin API requests.""" + + async def pre_process(self, request: RESTRequest) -> RESTRequest: + if request.headers is None: + request.headers = {} + else: + request.headers = dict(request.headers) + request.headers["Content-Type"] = "application/json" + return request + + +def get_rest_url_for_endpoint(endpoint: str, domain: str = CONSTANTS.DOMAIN) -> str: + """ + Get the full REST URL for an endpoint based on domain. + + Note: The Bluefin SDK handles all REST communication internally. + This is provided for compatibility with hummingbot's web assistant pattern. + """ + env_name = CONSTANTS.MAINNET_ENV_NAME if domain == CONSTANTS.DOMAIN else CONSTANTS.STAGING_ENV_NAME + base_url = CONSTANTS.get_rest_url_for_env(env_name, service="api") + return base_url + endpoint + + +def get_ws_url(domain: str = CONSTANTS.DOMAIN, stream_type: str = "market") -> str: + """ + Get the WebSocket URL based on domain and stream type. + + Note: The Bluefin SDK handles all WebSocket communication internally. + This is provided for compatibility with hummingbot's web assistant pattern. + """ + env_name = CONSTANTS.MAINNET_ENV_NAME if domain == CONSTANTS.DOMAIN else CONSTANTS.STAGING_ENV_NAME + return CONSTANTS.get_ws_url_for_env(env_name, stream_type) + + +def build_api_factory( + throttler: Optional[AsyncThrottler] = None, + auth: Optional[AuthBase] = None) -> WebAssistantsFactory: + """Build web assistants factory with throttler and auth.""" + throttler = throttler or create_throttler() + api_factory = WebAssistantsFactory( + throttler=throttler, + rest_pre_processors=[BluefinPerpetualRESTPreProcessor()], + auth=auth) + return api_factory + + +def create_throttler() -> AsyncThrottler: + """Create async throttler with rate limits.""" + return AsyncThrottler(CONSTANTS.RATE_LIMITS) + + +async def get_current_server_time(throttler: AsyncThrottler, domain: str) -> float: + """ + Get current server time. + + Note: Returns local time as Bluefin SDK handles time synchronization internally. + """ + return time.time() diff --git a/hummingbot/connector/derivative/bluefin_perpetual/data_sources/__init__.py b/hummingbot/connector/derivative/bluefin_perpetual/data_sources/__init__.py new file mode 100644 index 00000000000..3a1d611278a --- /dev/null +++ b/hummingbot/connector/derivative/bluefin_perpetual/data_sources/__init__.py @@ -0,0 +1 @@ +# Bluefin data sources diff --git a/hummingbot/connector/derivative/bluefin_perpetual/data_sources/bluefin_data_source.py b/hummingbot/connector/derivative/bluefin_perpetual/data_sources/bluefin_data_source.py new file mode 100644 index 00000000000..7890ca0922b --- /dev/null +++ b/hummingbot/connector/derivative/bluefin_perpetual/data_sources/bluefin_data_source.py @@ -0,0 +1,446 @@ +""" +Bluefin SDK data source wrapper. + +This module wraps the BluefinProSdk to provide a clean interface +for the connector to interact with the Bluefin exchange. +""" +# pyright: reportMissingTypeStubs=false + +import asyncio +import logging +from decimal import Decimal +from types import TracebackType +from typing import Any, Dict, List, Optional, Type + +# Bluefin SDK imports +try: + from bluefin_pro_sdk import BluefinProSdk, Order, Environment + from crypto_helpers.wallet import SuiWallet + from openapi_client import CancelOrdersRequest + from openapi_client.models.account_data_stream import AccountDataStream + from openapi_client.models.market_data_stream_name import MarketDataStreamName + from openapi_client.models.market_subscription_streams import MarketSubscriptionStreams +except ImportError as e: + raise ImportError( + "bluefin_pro_sdk is required for Bluefin connector. " + "Please install it with: pip install bluefin-pro-sdk" + ) from e + +logger = logging.getLogger(__name__) + + +class BluefinDataSource: + """ + Wrapper for BluefinProSdk that manages the SDK lifecycle and provides + a clean interface for the connector. + """ + + def __init__(self, wallet_mnemonic: str, network: str = "MAINNET", debug: bool = False): + """ + Initialize Bluefin data source. + + :param wallet_mnemonic: 24-word mnemonic phrase + :param network: Network name ("MAINNET" or "STAGING") + :param debug: Enable debug logging in SDK + """ + self._wallet_mnemonic = wallet_mnemonic + self._network = network + self._debug = debug + + # SDK environment mapping + self._env = Environment.PRODUCTION if network == "MAINNET" else Environment.STAGING + + # Create wallet and SDK client + self._wallet = SuiWallet(mnemonic=wallet_mnemonic) + self._client: Optional[BluefinProSdk] = None + self._is_initialized = False + + # Symbol mapping: hummingbot (BTC-USD) <-> Bluefin (BTC-PERP) + self._hb_to_bluefin: Dict[str, str] = {} + + # WebSocket listeners + self._market_data_listener = None + self._account_data_listener = None + + # Event queues for streaming data with category fan-out + self._market_event_queue: asyncio.Queue[Any] = asyncio.Queue() + self._market_funding_event_queue: asyncio.Queue[Any] = asyncio.Queue() + self._market_order_book_event_queue: asyncio.Queue[Any] = asyncio.Queue() + self._market_trade_event_queue: asyncio.Queue[Any] = asyncio.Queue() + + self._account_event_queue: asyncio.Queue[Any] = asyncio.Queue() + self._account_order_event_queue: asyncio.Queue[Any] = asyncio.Queue() + self._account_trade_event_queue: asyncio.Queue[Any] = asyncio.Queue() + self._account_position_event_queue: asyncio.Queue[Any] = asyncio.Queue() + self._account_balance_event_queue: asyncio.Queue[Any] = asyncio.Queue() + + async def __aenter__(self): + """Async context manager entry.""" + await self.initialize() + return self + + async def __aexit__( + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ): + """Async context manager exit.""" + await self.shutdown() + + def _require_client(self) -> BluefinProSdk: + if self._client is None: + raise RuntimeError("Bluefin SDK client is not initialized") + return self._client + + async def initialize(self): + """Initialize the SDK client and login.""" + if self._is_initialized: + return + + try: + self._client = BluefinProSdk( + sui_wallet=self._wallet, + env=self._env, + debug=self._debug + ) + await self._require_client().init() + self._is_initialized = True + logger.info("Bluefin SDK initialized for network: %s", self._network) + + # Load exchange info to populate symbol mapping + await self._load_exchange_info() + + except (ValueError, TypeError, AttributeError) as e: + logger.error("Failed to initialize Bluefin SDK: %s", e) + raise + + async def shutdown(self): + """Shutdown the SDK client and cleanup resources.""" + try: + # Close WebSocket listeners + if self._market_data_listener is not None: + try: + market_listener: Any = self._market_data_listener + await market_listener.__aexit__(None, None, None) + except (AttributeError, RuntimeError, TypeError) as e: + logger.error("Error closing market data listener: %s", e) + self._market_data_listener = None + + if self._account_data_listener is not None: + try: + account_listener: Any = self._account_data_listener + await account_listener.__aexit__(None, None, None) + except (AttributeError, RuntimeError, TypeError) as e: + logger.error("Error closing account data listener: %s", e) + self._account_data_listener = None + + # Close SDK client + if self._client is not None: + try: + client: Any = self._require_client() + await client.__aexit__(None, None, None) + except (AttributeError, RuntimeError, TypeError) as e: + logger.error("Error closing SDK client: %s", e) + self._client = None + + self._is_initialized = False + logger.info("Bluefin SDK shutdown complete") + + except (RuntimeError, AttributeError, TypeError) as e: + logger.error("Error during Bluefin SDK shutdown: %s", e) + + async def _load_exchange_info(self): + """Load exchange info and build symbol mapping.""" + info = await self._require_client().exchange_data_api.get_exchange_info() + for market in info.markets: + # Bluefin uses "BTC-PERP" format, hummingbot uses "BTC-USD" + bluefin_symbol = market.symbol + base = bluefin_symbol.split("-")[0] # Extract "BTC" from "BTC-PERP" + hb_symbol = f"{base}-USD" # Create "BTC-USD" + self._hb_to_bluefin[hb_symbol] = bluefin_symbol + + logger.info("Loaded %s trading pairs from Bluefin", len(self._hb_to_bluefin)) + + def hb_to_bluefin_symbol(self, hb_symbol: str) -> str: + """Convert hummingbot symbol to Bluefin symbol.""" + return self._hb_to_bluefin.get(hb_symbol, hb_symbol) + + def bluefin_to_hb_symbol(self, bluefin_symbol: str) -> str: + """Convert Bluefin symbol to hummingbot symbol.""" + for hb_symbol, bf_symbol in self._hb_to_bluefin.items(): + if bf_symbol == bluefin_symbol: + return hb_symbol + return bluefin_symbol + + @staticmethod + def to_e9(value: Decimal) -> str: + """Convert Decimal to e9 string format.""" + return str(int(value * Decimal("1e9"))) + + @staticmethod + def from_e9(e9_str: str) -> Decimal: + """Convert e9 string to Decimal.""" + return Decimal(e9_str) / Decimal("1e9") + + # ====================== + # Exchange Data API + # ====================== + + async def get_exchange_info(self) -> Any: + """Get exchange information including markets and assets.""" + return await self._require_client().exchange_data_api.get_exchange_info() + + async def get_market_ticker(self, symbol: str) -> Any: + """Get market ticker for a symbol.""" + bluefin_symbol = self.hb_to_bluefin_symbol(symbol) + return await self._require_client().exchange_data_api.get_market_ticker(bluefin_symbol) + + async def get_all_market_tickers(self) -> Any: + """Get all market tickers.""" + return await self._require_client().exchange_data_api.get_all_market_ticker() + + async def get_orderbook(self, symbol: str) -> Any: + """Get orderbook depth for a symbol.""" + bluefin_symbol = self.hb_to_bluefin_symbol(symbol) + return await self._require_client().exchange_data_api.get_orderbook_depth(bluefin_symbol) + + async def get_funding_rate_history(self, symbol: str, limit: int = 1) -> Any: + """Get funding rate history for a symbol.""" + del limit + bluefin_symbol = self.hb_to_bluefin_symbol(symbol) + return await self._require_client().exchange_data_api.get_funding_rate_history( + symbol=bluefin_symbol + ) + + # ====================== + # Account Data API + # ====================== + + async def get_account(self, account_address: Optional[str] = None) -> Any: + """Get account details including balance and positions.""" + return await self._require_client().account_data_api.get_account_details( + account_address=account_address + ) + + async def get_account_trades( + self, + symbol: str, + start_time_at_millis: Optional[int] = None, + end_time_at_millis: Optional[int] = None, + limit: Optional[int] = None, + page: Optional[int] = None, + ) -> Any: + """Get account trade history.""" + bluefin_symbol = self.hb_to_bluefin_symbol(symbol) + return await self._require_client().account_data_api.get_account_trades( + symbol=bluefin_symbol, + start_time_at_millis=start_time_at_millis, + end_time_at_millis=end_time_at_millis, + limit=limit, + page=page, + ) + + async def get_account_funding_rate_history( + self, + account_address: Optional[str] = None, + limit: Optional[int] = None, + page: Optional[int] = None + ) -> Any: + """Get account funding rate history.""" + return await self._require_client().account_data_api.get_account_funding_rate_history( + account_address=account_address, + limit=limit, + page=page + ) + + # ====================== + # Trading API + # ====================== + + async def get_open_orders(self, symbol: str) -> Any: + """Get open orders for a symbol.""" + bluefin_symbol = self.hb_to_bluefin_symbol(symbol) + return await self._require_client().get_open_orders(bluefin_symbol) + + async def place_order(self, order: Order) -> Any: + """ + Place an order. + + :param order: Order object from bluefin_pro_sdk + :return: CreateOrderResponse with order_hash + """ + # Convert symbol if needed + order.symbol = self.hb_to_bluefin_symbol(order.symbol) + return await self._require_client().create_order(order) + + async def cancel_order(self, symbol: str, order_hash: Optional[str] = None) -> Any: + """ + Cancel order(s) for a symbol. + + :param symbol: Trading pair symbol + :param order_hash: Specific order hash to cancel (None cancels all) + :return: CancelOrdersResponse + """ + bluefin_symbol = self.hb_to_bluefin_symbol(symbol) + request = CancelOrdersRequest.model_construct( + symbol=bluefin_symbol, + order_hashes=[order_hash] if order_hash is not None else None, + ) + return await self._require_client().cancel_order(request) + + async def cancel_all_orders(self, symbol: str) -> Any: + """Cancel all orders for a symbol.""" + return await self.cancel_order(symbol, order_hash=None) + + async def set_leverage(self, symbol: str, leverage: Decimal) -> Any: + """ + Set leverage for a trading pair. + + :param symbol: Trading pair symbol + :param leverage: Leverage value (e.g., Decimal("10") for 10x) + :return: Response from update_leverage + """ + bluefin_symbol = self.hb_to_bluefin_symbol(symbol) + leverage_e9 = self.to_e9(leverage) + return await self._require_client().update_leverage( + symbol=bluefin_symbol, + leverage_e9=leverage_e9 + ) + + # ====================== + # WebSocket Streaming + # ====================== + + async def create_market_data_stream(self, symbols: List[str]) -> None: + """ + Create market data stream listener for given symbols. + + :param symbols: List of trading pair symbols (hummingbot format) + """ + if self._market_data_listener is not None: + logger.warning("Market data stream already exists") + return + + async def handler(msg: Any): + """Handler for market data events.""" + event_name = type(msg).__name__ + await self._market_event_queue.put(msg) + if event_name in {"OraclePriceUpdate", "MarkPriceUpdate"}: + await self._market_funding_event_queue.put(msg) + if event_name in {"OrderbookDiffDepthUpdate", "OrderbookPartialDepthUpdate"}: + await self._market_order_book_event_queue.put(msg) + if event_name == "RecentTradesUpdates": + await self._market_trade_event_queue.put(msg) + + self._market_data_listener = await self._require_client().create_market_data_stream_listener( + handler=handler + ) + + # Subscribe to streams for each symbol + subscriptions: List[Any] = [] + for symbol in symbols: + bluefin_symbol = self.hb_to_bluefin_symbol(symbol) + subscriptions.append( + MarketSubscriptionStreams( + symbol=bluefin_symbol, + streams=[ + MarketDataStreamName.ORACLE_PRICE, + MarketDataStreamName.MARK_PRICE, + MarketDataStreamName.DIFF_DEPTH_500_MS, + MarketDataStreamName.PARTIAL_DEPTH_5, + MarketDataStreamName.RECENT_TRADE, + ] + ) + ) + + await self._market_data_listener.subscribe(subscriptions) + logger.info("Market data stream created for %s symbols", len(symbols)) + + async def create_account_data_stream(self) -> None: + """Create account data stream listener.""" + if self._account_data_listener is not None: + logger.warning("Account data stream already exists") + return + + async def handler(msg: Any): + """Handler for account data events.""" + event_name = type(msg).__name__ + await self._account_event_queue.put(msg) + if event_name == "AccountOrderUpdate": + await self._account_order_event_queue.put(msg) + elif event_name == "AccountTradeUpdate": + await self._account_trade_event_queue.put(msg) + elif event_name == "AccountPositionUpdate": + await self._account_position_event_queue.put(msg) + elif event_name == "AccountUpdate": + await self._account_balance_event_queue.put(msg) + + self._account_data_listener = await self._require_client().create_account_data_stream_listener( + handler=handler + ) + + # Subscribe to all account streams + await self._account_data_listener.subscribe([ + AccountDataStream.ACCOUNTORDERUPDATE, + AccountDataStream.ACCOUNTTRADEUPDATE, + AccountDataStream.ACCOUNTPOSITIONUPDATE, + AccountDataStream.ACCOUNTUPDATE, + AccountDataStream.ACCOUNTTRANSACTIONUPDATE, + ]) + logger.info("Account data stream created") + + async def get_market_event(self) -> Any: + """Get next market event from queue (blocking).""" + return await self._market_event_queue.get() + + async def get_market_funding_event(self) -> Any: + """Get next funding-related market event.""" + return await self._market_funding_event_queue.get() + + async def get_market_order_book_event(self) -> Any: + """Get next order book-related market event.""" + return await self._market_order_book_event_queue.get() + + async def get_market_trade_event(self) -> Any: + """Get next trade-related market event.""" + return await self._market_trade_event_queue.get() + + async def get_account_event(self) -> Any: + """Get next account event from queue (blocking).""" + return await self._account_event_queue.get() + + async def get_account_order_event(self) -> Any: + """Get next account order event.""" + return await self._account_order_event_queue.get() + + async def get_account_trade_event(self) -> Any: + """Get next account trade event.""" + return await self._account_trade_event_queue.get() + + async def get_account_position_event(self) -> Any: + """Get next account position event.""" + return await self._account_position_event_queue.get() + + async def get_account_balance_event(self) -> Any: + """Get next account balance event.""" + return await self._account_balance_event_queue.get() + + # ====================== + # Helper Properties + # ====================== + + @property + def is_initialized(self) -> bool: + """Check if SDK is initialized.""" + return self._is_initialized + + @property + def wallet_address(self) -> str: + """Get wallet address.""" + return self._wallet.sui_address + + @property + def trading_pair_symbol_map(self) -> Dict[str, str]: + """Get the internal HB<->Bluefin trading pair map.""" + return self._hb_to_bluefin diff --git a/hummingbot/core/rate_oracle/rate_oracle.py b/hummingbot/core/rate_oracle/rate_oracle.py index 14422f95242..3b0cd9981c6 100644 --- a/hummingbot/core/rate_oracle/rate_oracle.py +++ b/hummingbot/core/rate_oracle/rate_oracle.py @@ -10,6 +10,7 @@ from hummingbot.core.rate_oracle.sources.aevo_rate_source import AevoRateSource from hummingbot.core.rate_oracle.sources.ascend_ex_rate_source import AscendExRateSource from hummingbot.core.rate_oracle.sources.binance_rate_source import BinanceRateSource +from hummingbot.core.rate_oracle.sources.bluefin_perpetual_rate_source import BluefinPerpetualRateSource from hummingbot.core.rate_oracle.sources.coin_cap_rate_source import CoinCapRateSource from hummingbot.core.rate_oracle.sources.coin_gecko_rate_source import CoinGeckoRateSource from hummingbot.core.rate_oracle.sources.coinbase_advanced_trade_rate_source import CoinbaseAdvancedTradeRateSource @@ -30,6 +31,7 @@ RATE_ORACLE_SOURCES = { "aevo_perpetual": AevoRateSource, "binance": BinanceRateSource, + "bluefin_perpetual": BluefinPerpetualRateSource, "coin_gecko": CoinGeckoRateSource, "coin_cap": CoinCapRateSource, "kucoin": KucoinRateSource, diff --git a/hummingbot/core/rate_oracle/sources/bluefin_perpetual_rate_source.py b/hummingbot/core/rate_oracle/sources/bluefin_perpetual_rate_source.py new file mode 100644 index 00000000000..ec65e5f802c --- /dev/null +++ b/hummingbot/core/rate_oracle/sources/bluefin_perpetual_rate_source.py @@ -0,0 +1,102 @@ +""" +Rate oracle source for Bluefin Perpetual. + +Provides real-time price data from Bluefin to the rate oracle system. +""" +from decimal import Decimal +from typing import TYPE_CHECKING, Dict, Optional + +from hummingbot.connector.utils import split_hb_trading_pair +from hummingbot.core.rate_oracle.sources.rate_source_base import RateSourceBase +from hummingbot.core.utils import async_ttl_cache + +if TYPE_CHECKING: + from hummingbot.connector.derivative.bluefin_perpetual.bluefin_perpetual_derivative import ( + BluefinPerpetualDerivative, + ) + + +class BluefinPerpetualRateSource(RateSourceBase): + """Rate source for Bluefin Perpetual exchange.""" + + def __init__(self): + super().__init__() + self._exchange: Optional["BluefinPerpetualDerivative"] = None + + @property + def name(self) -> str: + """Name of the rate source.""" + return "bluefin_perpetual" + + @async_ttl_cache(ttl=30, maxsize=1) + async def get_prices(self, quote_token: Optional[str] = None) -> Dict[str, Decimal]: + """ + Fetch current prices from Bluefin. + + :param quote_token: Optional filter for specific quote token + :return: Dict of trading_pair -> price + """ + self._ensure_exchange() + results = {} + try: + # Exchange method to get all pairs prices would need to be implemented + # For now, get prices from trading rules/market data + await self._exchange._update_trading_rules() + + for trading_pair in self._exchange.trading_rules: + if quote_token is not None: + base, quote = split_hb_trading_pair(trading_pair=trading_pair) + if quote != quote_token: + continue + + # Get mark price from data source if available + try: + if hasattr(self._exchange, "_data_source") and self._exchange._data_source: + market_symbols = await self._exchange._data_source.get_market_symbols() + # Find matching symbol and get mark price + exchange_symbol = await self._exchange.exchange_symbol_associated_to_pair(trading_pair) + for symbol_data in market_symbols: + if symbol_data.get("symbol") == exchange_symbol: + mark_price = symbol_data.get("markPrice") + if mark_price: + results[trading_pair] = Decimal(str(mark_price)) + break + except Exception as symbol_error: + self.logger().debug( + f"Error fetching price for {trading_pair}: {symbol_error}", + exc_info=True + ) + continue + + except Exception: + self.logger().exception( + msg="Unexpected error while retrieving rates from Bluefin. Check the log file for more info.", + ) + return results + + def _ensure_exchange(self): + """Ensure exchange instance exists.""" + if self._exchange is None: + self._exchange = self._build_bluefin_perpetual_connector_without_private_keys() + + def _build_bluefin_perpetual_connector_without_private_keys(self) -> "BluefinPerpetualDerivative": + """ + Build Bluefin connector instance for rate oracle (no real trading). + + Uses a test mnemonic for initialization since we only need price data. + """ + from hummingbot.connector.derivative.bluefin_perpetual.bluefin_perpetual_derivative import ( + BluefinPerpetualDerivative, + ) + + # Use test mnemonic for price feed only (no trading) + test_mnemonic = "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about" + + connector = BluefinPerpetualDerivative( + bluefin_perpetual_wallet_mnemonic=test_mnemonic, + bluefin_perpetual_network="MAINNET", + trading_pairs=[], + trading_required=False, + ) + + return connector diff --git a/test/hummingbot/connector/derivative/bluefin_perpetual/__init__.py b/test/hummingbot/connector/derivative/bluefin_perpetual/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/test/hummingbot/connector/derivative/bluefin_perpetual/test_bluefin_perpetual_api_order_book_data_source.py b/test/hummingbot/connector/derivative/bluefin_perpetual/test_bluefin_perpetual_api_order_book_data_source.py new file mode 100644 index 00000000000..7e4a21cadaf --- /dev/null +++ b/test/hummingbot/connector/derivative/bluefin_perpetual/test_bluefin_perpetual_api_order_book_data_source.py @@ -0,0 +1,93 @@ +import asyncio +from decimal import Decimal +from test.isolated_asyncio_wrapper_test_case import IsolatedAsyncioWrapperTestCase +from types import SimpleNamespace +from typing import Any +from unittest.mock import AsyncMock, MagicMock + +from hummingbot.connector.derivative.bluefin_perpetual.bluefin_perpetual_api_order_book_data_source import ( + BluefinPerpetualAPIOrderBookDataSource, +) +from hummingbot.core.data_type.funding_info import FundingInfoUpdate +from hummingbot.core.data_type.order_book_message import OrderBookMessageType + + +class BluefinPerpetualAPIOrderBookDataSourceTests(IsolatedAsyncioWrapperTestCase): + @classmethod + def setUpClass(cls) -> None: + super().setUpClass() + cls.trading_pair = "BTC-USD" + + def setUp(self) -> None: + super().setUp() + + self.connector = MagicMock() + self.data_source = MagicMock() + + def _from_e9(e9: Any) -> Decimal: + return Decimal(str(e9)) / Decimal("1e9") + + def _to_hb_symbol(symbol: Any) -> str: + return "BTC-USD" if symbol == "BTC-PERP" else str(symbol) + + self.data_source.from_e9.side_effect = _from_e9 + self.data_source.bluefin_to_hb_symbol.side_effect = _to_hb_symbol + self.data_source.get_market_ticker = AsyncMock( + return_value=SimpleNamespace(last_trade_price_e9="101250000000") + ) + + self.order_book_source = BluefinPerpetualAPIOrderBookDataSource( + trading_pairs=[self.trading_pair], + connector=self.connector, + data_source=self.data_source, + ) + + async def test_get_last_traded_prices_fetches_from_ticker(self): + prices = await self.order_book_source.get_last_traded_prices([self.trading_pair]) + + self.assertEqual({"BTC-USD": 101.25}, prices) + self.data_source.get_market_ticker.assert_awaited_once_with("BTC-USD") + + async def test_listen_for_order_book_diffs_forwards_diff_message(self): + output: asyncio.Queue[Any] = asyncio.Queue() + diff_event = type( + "OrderbookDiffDepthUpdate", + (), + { + "symbol": "BTC-PERP", + "bids_e9": [["100000000000", "2000000000"]], + "asks_e9": [["101000000000", "3000000000"]], + "last_update_id": 123, + "updated_at_millis": 1000, + }, + )() + + self.data_source.get_market_order_book_event = AsyncMock(side_effect=[diff_event, asyncio.CancelledError()]) + + with self.assertRaises(asyncio.CancelledError): + await self.order_book_source.listen_for_order_book_diffs(self.local_event_loop, output) + + message = output.get_nowait() + self.assertEqual(OrderBookMessageType.DIFF, message.type) + self.assertEqual("BTC-USD", message.content["trading_pair"]) + + async def test_listen_for_funding_info_emits_funding_update(self): + output: asyncio.Queue[Any] = asyncio.Queue() + funding_event = type( + "OraclePriceUpdate", + (), + { + "symbol": "BTC-PERP", + "oracle_price_e9": "99900000000", + "mark_price_e9": "100100000000", + }, + )() + + self.data_source.get_market_funding_event = AsyncMock(side_effect=[funding_event, asyncio.CancelledError()]) + + with self.assertRaises(asyncio.CancelledError): + await self.order_book_source.listen_for_funding_info(output) + + update = output.get_nowait() + self.assertIsInstance(update, FundingInfoUpdate) + self.assertEqual("BTC-USD", update.trading_pair) diff --git a/test/hummingbot/connector/derivative/bluefin_perpetual/test_bluefin_perpetual_auth.py b/test/hummingbot/connector/derivative/bluefin_perpetual/test_bluefin_perpetual_auth.py new file mode 100644 index 00000000000..70800930a06 --- /dev/null +++ b/test/hummingbot/connector/derivative/bluefin_perpetual/test_bluefin_perpetual_auth.py @@ -0,0 +1,73 @@ +""" +Tests for Bluefin Perpetual authentication. + +Since the Bluefin SDK handles authentication internally via JWT tokens, +these tests verify the auth wrapper properly stores credentials. +""" +import asyncio +import unittest +from typing import Awaitable +from unittest.mock import MagicMock + +from hummingbot.connector.derivative.bluefin_perpetual.bluefin_perpetual_auth import BluefinPerpetualAuth +from hummingbot.core.web_assistant.connections.data_types import RESTMethod, RESTRequest + + +class BluefinPerpetualAuthTests(unittest.TestCase): + """Test suite for BluefinPerpetualAuth class.""" + + def setUp(self) -> None: + """Set up test fixtures.""" + super().setUp() + self.test_mnemonic = "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about" + self.test_network = "MAINNET" + self.auth = BluefinPerpetualAuth( + wallet_mnemonic=self.test_mnemonic, + network=self.test_network, + ) + + def async_run_with_timeout(self, coroutine: Awaitable, timeout: int = 1): + """Run async coroutine with timeout.""" + ret = asyncio.get_event_loop().run_until_complete(asyncio.wait_for(coroutine, timeout)) + return ret + + def test_wallet_mnemonic_property(self): + """Test that wallet mnemonic is stored correctly.""" + self.assertEqual(self.test_mnemonic, self.auth.wallet_mnemonic) + + def test_network_property(self): + """Test that network is stored correctly.""" + self.assertEqual(self.test_network, self.auth.network) + + def test_rest_authenticate_passes_through(self): + """Test that REST authenticate returns request unchanged (SDK handles auth).""" + request = RESTRequest( + method=RESTMethod.POST, + url="https://test.url/api/orders", + data='{"symbol": "BTC-PERP"}', + is_auth_required=True, + ) + + authenticated_request = self.async_run_with_timeout(self.auth.rest_authenticate(request)) + + # Should return the same request unchanged + self.assertEqual(request, authenticated_request) + self.assertEqual(request.url, authenticated_request.url) + self.assertEqual(request.data, authenticated_request.data) + + def test_get_headers_returns_empty_dict(self): + """Test that get_headers returns empty dict (SDK manages JWT headers).""" + headers = self.auth.get_headers() + + self.assertEqual({}, headers) + self.assertIsInstance(headers, dict) + + def test_auth_with_staging_network(self): + """Test authentication with STAGING network.""" + staging_auth = BluefinPerpetualAuth( + wallet_mnemonic=self.test_mnemonic, + network="STAGING", + ) + + self.assertEqual("STAGING", staging_auth.network) + self.assertEqual(self.test_mnemonic, staging_auth.wallet_mnemonic) diff --git a/test/hummingbot/connector/derivative/bluefin_perpetual/test_bluefin_perpetual_derivative.py b/test/hummingbot/connector/derivative/bluefin_perpetual/test_bluefin_perpetual_derivative.py new file mode 100644 index 00000000000..5890ed45c69 --- /dev/null +++ b/test/hummingbot/connector/derivative/bluefin_perpetual/test_bluefin_perpetual_derivative.py @@ -0,0 +1,664 @@ +import asyncio +import json +import logging +import re +from decimal import Decimal +from typing import Callable, List, Optional +from unittest.mock import AsyncMock, MagicMock, patch + +import pandas as pd +from aioresponses import aioresponses + +import hummingbot.connector.derivative.bluefin_perpetual.bluefin_perpetual_constants as CONSTANTS +import hummingbot.connector.derivative.bluefin_perpetual.bluefin_perpetual_web_utils as web_utils +from hummingbot.connector.derivative.bluefin_perpetual.bluefin_perpetual_derivative import BluefinPerpetualDerivative +from hummingbot.connector.test_support.perpetual_derivative_test import AbstractPerpetualDerivativeTests +from hummingbot.connector.trading_rule import TradingRule +from hummingbot.connector.utils import combine_to_hb_trading_pair +from hummingbot.core.data_type.common import OrderType, PositionMode, TradeType +from hummingbot.core.data_type.in_flight_order import InFlightOrder +from hummingbot.core.data_type.trade_fee import AddedToCostTradeFee, TokenAmount, TradeFeeBase + + +# Valid BIP-39 test mnemonic (standard test mnemonic) +TEST_MNEMONIC = "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about" + + +class BluefinPerpetualDerivativeTests(AbstractPerpetualDerivativeTests.PerpetualDerivativeTests): + """Comprehensive test suite for Bluefin Perpetual connector using AbstractPerpetualDerivativeTests.""" + + _logger = logging.getLogger(__name__) + + @classmethod + def setUpClass(cls) -> None: + super().setUpClass() + cls.api_mnemonic = TEST_MNEMONIC + cls.base_asset = "BTC" + cls.quote_asset = "USD" + cls.trading_pair = combine_to_hb_trading_pair(cls.base_asset, cls.quote_asset) + + # ============================================================ + # AbstractExchangeConnectorTests Required Properties + # ============================================================ + + @property + def all_symbols_url(self): + """URL for fetching all trading pairs.""" + # Bluefin SDK handles this internally - return mock URL + url = web_utils.get_rest_url_for_endpoint("/markets") + return re.compile(f"^{url}".replace(".", r"\.").replace("?", r"\?") + ".*") + + @property + def latest_prices_url(self): + """URL for fetching latest prices.""" + url = web_utils.get_rest_url_for_endpoint("/marketData") + return re.compile(f"^{url}".replace(".", r"\.").replace("?", r"\?") + ".*") + + @property + def network_status_url(self): + """URL for checking network status.""" + url = web_utils.get_rest_url_for_endpoint("/status") + return re.compile(f"^{url}".replace(".", r"\.").replace("?", r"\?") + ".*") + + @property + def trading_rules_url(self): + """URL for fetching trading rules.""" + return self.all_symbols_url + + @property + def order_creation_url(self): + """URL for creating orders.""" + url = web_utils.get_rest_url_for_endpoint("/orders") + return re.compile(f"^{url}".replace(".", r"\.").replace("?", r"\?") + ".*") + + @property + def balance_url(self): + """URL for fetching account balance.""" + url = web_utils.get_rest_url_for_endpoint("/account") + return url + + @property + def funding_info_url(self): + """URL for fetching funding rate info.""" + url = web_utils.get_rest_url_for_endpoint("/fundingInfo") + return re.compile(f"^{url}".replace(".", r"\.").replace("?", r"\?") + ".*") + + @property + def funding_payment_url(self): + """URL for fetching funding payment history.""" + # Bluefin doesn't have separate funding payment endpoint + return None + + # ============================================================ + # Mock Response Properties + # ============================================================ + + @property + def all_symbols_request_mock_response(self): + """Mock response for all trading pairs.""" + return { + "markets": [ + { + "symbol": "BTC-PERP", + "baseAsset": "BTC", + "quoteAsset": "USD", + "tickSize": "0.1", + "stepSize": "0.001", + "minOrderSize": "0.001", + "maxLeverage": 50, + }, + { + "symbol": "ETH-PERP", + "baseAsset": "ETH", + "quoteAsset": "USD", + "tickSize": "0.01", + "stepSize": "0.01", + "minOrderSize": "0.01", + "maxLeverage": 50, + }, + ] + } + + @property + def latest_prices_request_mock_response(self): + """Mock response for latest prices.""" + return { + "marketData": [ + { + "symbol": "BTC-PERP", + "markPrice": str(self.expected_latest_price), + "indexPrice": "36700.0", + "lastPrice": str(self.expected_latest_price), + }, + { + "symbol": "ETH-PERP", + "markPrice": "1920.0", + "indexPrice": "1918.0", + "lastPrice": "1920.0", + }, + ] + } + + @property + def all_symbols_including_invalid_pair_mock_response(self): + """Mock response including an invalid pair.""" + return "INVALID-PAIR", self.all_symbols_request_mock_response + + @property + def network_status_request_successful_mock_response(self): + """Mock response for successful network status check.""" + return {"status": "ok", "timestamp": 1640780000000} + + @property + def trading_rules_request_mock_response(self): + """Mock response for trading rules.""" + return self.all_symbols_request_mock_response + + @property + def trading_rules_request_erroneous_mock_response(self): + """Mock response with erroneous trading rules (missing stepSize).""" + return { + "markets": [ + { + "symbol": "BTC-PERP", + "baseAsset": "BTC", + "quote Asset": "USD", + "tickSize": "0.1", + # Missing stepSize - will cause parsing error + "minOrderSize": "0.001", + }, + ] + } + + @property + def order_creation_request_successful_mock_response(self): + """Mock response for successful order creation.""" + return { + "success": True, + "orderId": self.expected_exchange_order_id, + "orderHash": "0xabc123def456", + "status": "PENDING", + } + + @property + def balance_request_mock_response_for_base_and_quote(self): + """Mock response for account balance.""" + return { + "address": "0x123", + "freeCollateral": "10000.0", + "totalAccountValue": "12000.0", + "totalNotionalPositionSize": "2000.0", + "availableMargin": "10000.0", + "positions": [], + } + + @property + def balance_request_mock_response_only_base(self): + """Mock response for balance with only base asset.""" + return self.balance_request_mock_response_for_base_and_quote + + @property + def expected_latest_price(self): + """Expected latest price for testing.""" + return 9999.9 + + @property + def expected_supported_order_types(self): + """Supported order types for Bluefin.""" + return [OrderType.LIMIT, OrderType.MARKET, OrderType.LIMIT_MAKER] + + @property + def expected_supported_position_modes(self) -> List[PositionMode]: + """Supported position modes for Bluefin.""" + return [PositionMode.ONEWAY] + + @property + def expected_trading_rule(self): + """Expected trading rule.""" + return TradingRule( + trading_pair=self.trading_pair, + min_base_amount_increment=Decimal("0.001"), + min_price_increment=Decimal("0.1"), + min_order_size=Decimal("0.001"), + min_notional_size=Decimal("10"), + buy_order_collateral_token=self.quote_asset, + sell_order_collateral_token=self.quote_asset, + ) + + @property + def expected_logged_error_for_erroneous_trading_rule(self): + """Expected error message for erroneous trading rule.""" + return f"Error parsing the trading pair rule" + + @property + def expected_exchange_order_id(self): + """Expected exchange order ID.""" + return "bluefin_order_123456" + + @property + def is_order_fill_http_update_included_in_status_update(self) -> bool: + """Whether order fill updates are included in status updates.""" + return True + + @property + def is_order_fill_http_update_executed_during_websocket_order_event_processing(self) -> bool: + """Whether HTTP fill updates happen during websocket processing.""" + return False + + @property + def expected_partial_fill_price(self) -> Decimal: + """Expected price for partial fill.""" + return Decimal("10000") + + @property + def expected_partial_fill_amount(self) -> Decimal: + """Expected amount for partial fill.""" + return Decimal("0.5") + + @property + def expected_fill_fee(self) -> TradeFeeBase: + """Expected fill fee.""" + return AddedToCostTradeFee( + percent_token=self.quote_asset, + flat_fees=[TokenAmount(token=self.quote_asset, amount=Decimal("5.0"))], + ) + + @property + def expected_fill_trade_id(self) -> str: + """Expected fill trade ID.""" + return "bluefin_trade_789" + + # ============================================================ + # Funding Rate Properties + # ============================================================ + + @property + def funding_info_mock_response(self): + """Mock response for funding info.""" + return { + "symbol": "BTC-PERP", + "markPrice": str(self.target_funding_info_mark_price), + "indexPrice": str(self.target_funding_info_index_price), + "fundingRate": str(self.target_funding_info_rate), + "nextFundingTime": self.target_funding_info_next_funding_utc_timestamp * 1000, + } + + @property + def empty_funding_payment_mock_response(self): + """Mock response for empty funding payment history.""" + return {"payments": []} + + @property + def funding_payment_mock_response(self): + """Mock response for funding payment history.""" + # Bluefin SDK handles funding internally + return {"payments": []} + + # ============================================================ + # Connector Creation and Configuration + # ============================================================ + + def exchange_symbol_for_tokens(self, base_token: str, quote_token: str) -> str: + """Convert base/quote tokens to exchange symbol.""" + return f"{base_token}-PERP" + + def create_exchange_instance(self): + """Create Bluefin exchange instance for testing.""" + with patch("hummingbot.connector.derivative.bluefin_perpetual.data_sources.bluefin_data_source.BluefinDataSource"): + exchange = BluefinPerpetualDerivative( + bluefin_perpetual_wallet_mnemonic=self.api_mnemonic, + bluefin_perpetual_network="MAINNET", + trading_pairs=[self.trading_pair], + trading_required=False, + ) + return exchange + + def validate_auth_credentials_present(self, request_call): + """Validate that auth credentials are present in request.""" + # Bluefin SDK handles authentication internally + pass + + def validate_order_creation_request(self, order: InFlightOrder, request_call): + """Validate order creation request.""" + # Bluefin SDK handles order creation internally + pass + + def validate_order_cancelation_request(self, order: InFlightOrder, request_call): + """Validate order cancellation request.""" + pass + + def validate_order_status_request(self, order: InFlightOrder, request_call): + """Validate order status request.""" + pass + + def validate_trades_request(self, order: InFlightOrder, request_call): + """Validate trades request.""" + pass + + def configure_successful_cancelation_response( + self, order: InFlightOrder, mock_api: aioresponses, callback: Optional[Callable] = lambda *args, **kwargs: None + ) -> str: + """Configure successful cancellation response.""" + # Bluefin SDK handles cancellation + return "" + + def configure_erroneous_cancelation_response( + self, order: InFlightOrder, mock_api: aioresponses, callback: Optional[Callable] = lambda *args, **kwargs: None + ) -> str: + """Configure erroneous cancellation response.""" + return "" + + def configure_order_not_found_error_cancelation_response( + self, order: InFlightOrder, mock_api: aioresponses, callback: Optional[Callable] = lambda *args, **kwargs: None + ) -> str: + """Configure order not found error for cancellation.""" + return "" + + def configure_order_not_found_error_order_status_response( + self, order: InFlightOrder, mock_api: aioresponses, callback: Optional[Callable] = lambda *args, **kwargs: None + ): + """Configure order not found error for status.""" + return "" + + def configure_completely_filled_order_status_response( + self, order: InFlightOrder, mock_api: aioresponses, callback: Optional[Callable] = lambda *args, **kwargs: None + ): + """Configure completely filled order status response.""" + return "" + + def configure_canceled_order_status_response( + self, order: InFlightOrder, mock_api: aioresponses, callback: Optional[Callable] = lambda *args, **kwargs: None + ): + """Configure canceled order status response.""" + return "" + + def configure_open_order_status_response( + self, order: InFlightOrder, mock_api: aioresponses, callback: Optional[Callable] = lambda *args, **kwargs: None + ) -> str: + """Configure open order status response.""" + return "" + + def configure_http_error_order_status_response( + self, order: InFlightOrder, mock_api: aioresponses, callback: Optional[Callable] = lambda *args, **kwargs: None + ) -> str: + """Configure HTTP error for order status.""" + return "" + + def configure_partially_filled_order_status_response( + self, order: InFlightOrder, mock_api: aioresponses, callback: Optional[Callable] = lambda *args, **kwargs: None + ) -> str: + """Configure partially filled order status response.""" + return "" + + def configure_partial_fill_trade_response( + self, order: InFlightOrder, mock_api: aioresponses, callback: Optional[Callable] = lambda *args, **kwargs: None + ) -> str: + """Configure partial fill trade response.""" + return "" + + def configure_full_fill_trade_response( + self, order: InFlightOrder, mock_api: aioresponses, callback: Optional[Callable] = lambda *args, **kwargs: None + ) -> str: + """Configure full fill trade response.""" + return "" + + def configure_erroneous_http_fill_trade_response( + self, order: InFlightOrder, mock_api: aioresponses, callback: Optional[Callable] = lambda *args, **kwargs: None + ) -> str: + """Configure erroneous fill trade response.""" + return "" + + def configure_successful_set_position_mode( + self, position_mode: PositionMode, mock_api: aioresponses, callback: Optional[Callable] = lambda *args, **kwargs: None + ): + """Configure successful position mode set.""" + pass + + def configure_failed_set_position_mode( + self, position_mode: PositionMode, mock_api: aioresponses, callback: Optional[Callable] = lambda *args, **kwargs: None + ): + """Configure failed position mode set.""" + pass + + def configure_failed_set_leverage( + self, leverage: int, mock_api: aioresponses, callback: Optional[Callable] = lambda *args, **kwargs: None + ): + """Configure failed leverage set.""" + return "", "Unable to set leverage" + + def configure_successful_set_leverage( + self, leverage: int, mock_api: aioresponses, callback: Optional[Callable] = lambda *args, **kwargs: None + ): + """Configure successful leverage set.""" + return "" + + def configure_one_successful_one_erroneous_cancel_all_response( + self, successful_order: InFlightOrder, erroneous_order: InFlightOrder, mock_api: aioresponses + ) -> List[str]: + """Configure mixed success/error cancel all response.""" + return [] + + def order_event_for_new_order_websocket_update(self, order: InFlightOrder): + """WebSocket event for new order.""" + return {} + + def order_event_for_canceled_order_websocket_update(self, order: InFlightOrder): + """WebSocket event for canceled order.""" + return {} + + def order_event_for_full_fill_websocket_update(self, order: InFlightOrder): + """WebSocket event for full fill.""" + return {} + + def trade_event_for_full_fill_websocket_update(self, order: InFlightOrder): + """WebSocket event for full fill trade.""" + return {} + + def position_event_for_full_fill_websocket_update(self, order: InFlightOrder, unrealized_pnl: float): + """WebSocket event for position update.""" + return {} + + @property + def balance_event_websocket_update(self): + """WebSocket event for balance update.""" + return {} + + def funding_info_event_for_websocket_update(self): + """WebSocket event for funding info update.""" + return {} + + def is_cancel_request_executed_synchronously_by_server(self) -> bool: + """Whether cancel requests are executed synchronously.""" + return False + + @property + def latest_trade_hist_timestamp(self) -> int: + """Latest trade history timestamp.""" + return 1640780000 + + # ============================================================ + # Position Mode Tests Override + # ============================================================ + + @aioresponses() + def test_set_position_mode_success(self, mock_api): + """Test setting position mode to ONEWAY (only supported mode).""" + self.exchange.set_position_mode(PositionMode.ONEWAY) + self.async_run_with_timeout(asyncio.sleep(0.1)) + self.assertTrue( + self.is_logged( + log_level="DEBUG", + message=f"Position mode switched to {PositionMode.ONEWAY}.", + ) + ) + + @aioresponses() + def test_set_position_mode_failure(self, mock_api): + """Test setting unsupported position mode.""" + self.exchange.set_position_mode(PositionMode.HEDGE) + self.assertTrue( + self.is_logged( + log_level="ERROR", + message=f"Position mode {PositionMode.HEDGE} is not supported. Mode not set.", + ) + ) + + # ============================================================ + # Bluefin-Specific Custom Tests + # ============================================================ + + # ============================================================ + # Bluefin-Specific Custom Tests + # ============================================================ + + @property + def connector(self): + """Alias for self.exchange for backward compatibility.""" + return self.exchange + + def test_supported_order_types_includes_limit_maker(self): + """Test that LIMIT_MAKER is in supported order types.""" + supported = self.exchange.supported_order_types() + + self.assertIn(OrderType.LIMIT, supported) + self.assertIn(OrderType.MARKET, supported) + self.assertIn(OrderType.LIMIT_MAKER, supported) + + def test_supported_position_modes_oneway_only(self): + """Test that only ONEWAY position mode is supported.""" + supported = self.exchange.supported_position_modes() + + self.assertEqual([PositionMode.ONEWAY], supported) + + def test_get_collateral_token_returns_usdc(self): + """Test that collateral token is USDC for all pairs.""" + buy_collateral = self.exchange.get_buy_collateral_token(self.trading_pair) + sell_collateral = self.exchange.get_sell_collateral_token(self.trading_pair) + + self.assertEqual("USDC", buy_collateral) + self.assertEqual("USDC", sell_collateral) + + @patch("hummingbot.connector.derivative.bluefin_perpetual.bluefin_perpetual_derivative.Order") + @patch("hummingbot.connector.derivative.bluefin_perpetual.bluefin_perpetual_derivative.BluefinOrderType") + async def test_place_order_limit_maker_sets_post_only(self, mock_order_type, mock_order): + """Test that LIMIT_MAKER orders set post_only flag.""" + # Setup mocks + mock_order_type.LIMIT = "LIMIT" + mock_order_instance = MagicMock() + mock_order.return_value = mock_order_instance + + # Mock data source + self.exchange._data_source = MagicMock() + self.exchange._data_source.to_e9 = lambda x: int(Decimal(str(x)) * Decimal("1e9")) + self.exchange._data_source.place_order = AsyncMock( + return_value=MagicMock(order_hash="test_hash_123") + ) + + # Mock quantize methods + self.exchange.quantize_order_price = MagicMock(return_value=Decimal("50000")) + self.exchange.quantize_order_amount = MagicMock(return_value=Decimal("1")) + + # Place LIMIT_MAKER order + order_id = "test_order_123" + trading_pair = self.trading_pair + amount = Decimal("1") + price = Decimal("50000") + + result = await self.exchange._place_order( + order_id=order_id, + trading_pair=trading_pair, + amount=amount, + trade_type=TradeType.BUY, + order_type=OrderType.LIMIT_MAKER, + price=price, + ) + + # Verify order was created with post_only=True + mock_order.assert_called_once() + call_kwargs = mock_order.call_args.kwargs + self.assertTrue(call_kwargs.get("post_only"), "LIMIT_MAKER order should have post_only=True") + + # Verify result + self.assertEqual("test_hash_123", result[0]) + + @patch("hummingbot.connector.derivative.bluefin_perpetual.bluefin_perpetual_derivative.Order") + @patch("hummingbot.connector.derivative.bluefin_perpetual.bluefin_perpetual_derivative.BluefinOrderType") + async def test_place_order_limit_does_not_set_post_only(self, mock_order_type, mock_order): + """Test that regular LIMIT orders do not set post_only flag.""" + # Setup mocks + mock_order_type.LIMIT = "LIMIT" + mock_order_instance = MagicMock() + mock_order.return_value = mock_order_instance + + # Mock data source + self.exchange._data_source = MagicMock() + self.exchange._data_source.to_e9 = lambda x: int(Decimal(str(x)) * Decimal("1e9")) + self.exchange._data_source.place_order = AsyncMock( + return_value=MagicMock(order_hash="test_hash_456") + ) + + # Mock quantize methods + self.exchange.quantize_order_price = MagicMock(return_value=Decimal("50000")) + self.exchange.quantize_order_amount = MagicMock(return_value=Decimal("1")) + + # Place regular LIMIT order + order_id = "test_order_456" + trading_pair = self.trading_pair + amount = Decimal("1") + price = Decimal("50000") + + result = await self.exchange._place_order( + order_id=order_id, + trading_pair=trading_pair, + amount=amount, + trade_type=TradeType.BUY, + order_type=OrderType.LIMIT, + price=price, + ) + + # Verify order was created with post_only=False + mock_order.assert_called_once() + call_kwargs = mock_order.call_args.kwargs + self.assertFalse(call_kwargs.get("post_only"), "Regular LIMIT order should have post_only=False") + + # Verify result + self.assertEqual("test_hash_456", result[0]) + + @patch("hummingbot.connector.derivative.bluefin_perpetual.bluefin_perpetual_derivative.asyncio.sleep", new_callable=AsyncMock) + async def test_place_order_retries_on_failure(self, mock_sleep): + """Test that place_order retries on transient failures.""" + # Mock data source that fails twice then succeeds + self.exchange._data_source = MagicMock() + self.exchange._data_source.to_e9 = lambda x: int(Decimal(str(x)) * Decimal("1e9")) + self.exchange._data_source.place_order = AsyncMock( + side_effect=[ + Exception("Network error"), + Exception("Temporary failure"), + MagicMock(order_hash="success_hash"), + ] + ) + + # Mock quantize methods + self.exchange.quantize_order_price = MagicMock(return_value=Decimal("50000")) + self.exchange.quantize_order_amount = MagicMock(return_value=Decimal("1")) + + # Place order + with patch("hummingbot.connector.derivative.bluefin_perpetual.bluefin_perpetual_derivative.Order"): + with patch("hummingbot.connector.derivative.bluefin_perpetual.bluefin_perpetual_derivative.BluefinOrderType"): + result = await self.exchange._place_order( + order_id="test_retry", + trading_pair=self.trading_pair, + amount=Decimal("1"), + trade_type=TradeType.BUY, + order_type=OrderType.LIMIT, + price=Decimal("50000"), + ) + + # Verify it retried and eventually succeeded + self.assertEqual(3, self.exchange._data_source.place_order.await_count) + self.assertEqual("success_hash", result[0]) + + # Verify sleep was called for retries (2 retries = 2 sleeps) + self.assertEqual(2, mock_sleep.await_count) + + diff --git a/test/hummingbot/connector/derivative/bluefin_perpetual/test_bluefin_perpetual_user_stream_data_source.py b/test/hummingbot/connector/derivative/bluefin_perpetual/test_bluefin_perpetual_user_stream_data_source.py new file mode 100644 index 00000000000..02a741c7119 --- /dev/null +++ b/test/hummingbot/connector/derivative/bluefin_perpetual/test_bluefin_perpetual_user_stream_data_source.py @@ -0,0 +1,47 @@ +import asyncio +from test.isolated_asyncio_wrapper_test_case import IsolatedAsyncioWrapperTestCase +from typing import Any +from unittest.mock import AsyncMock, MagicMock + +from hummingbot.connector.derivative.bluefin_perpetual.bluefin_perpetual_user_stream_data_source import ( + BluefinPerpetualUserStreamDataSource, +) + + +class BluefinPerpetualUserStreamDataSourceTests(IsolatedAsyncioWrapperTestCase): + @classmethod + def setUpClass(cls) -> None: + super().setUpClass() + cls.trading_pair = "BTC-USD" + + def setUp(self) -> None: + super().setUp() + + self.connector = MagicMock() + factory = MagicMock() + factory.get_ws_assistant = AsyncMock(return_value=MagicMock()) + setattr(self.connector, "_web_assistants_factory", factory) + + self.data_source = MagicMock() + self.data_source.get_account_order_event = AsyncMock(return_value={"event": "order"}) + self.data_source.get_account_trade_event = AsyncMock(side_effect=asyncio.CancelledError) + self.data_source.get_account_position_event = AsyncMock(side_effect=asyncio.CancelledError) + self.data_source.get_account_balance_event = AsyncMock(side_effect=asyncio.CancelledError) + + self.user_stream_source = BluefinPerpetualUserStreamDataSource( + trading_pairs=[self.trading_pair], + connector=self.connector, + data_source=self.data_source, + ) + + async def test_listen_for_user_stream_forwards_first_event(self): + output: asyncio.Queue[Any] = asyncio.Queue() + + task = self.local_event_loop.create_task(self.user_stream_source.listen_for_user_stream(output=output)) + + event = await asyncio.wait_for(output.get(), timeout=1) + self.assertEqual({"event": "order"}, event) + + task.cancel() + with self.assertRaises(asyncio.CancelledError): + await task diff --git a/test/hummingbot/connector/derivative/bluefin_perpetual/test_bluefin_perpetual_utils.py b/test/hummingbot/connector/derivative/bluefin_perpetual/test_bluefin_perpetual_utils.py new file mode 100644 index 00000000000..c287967658c --- /dev/null +++ b/test/hummingbot/connector/derivative/bluefin_perpetual/test_bluefin_perpetual_utils.py @@ -0,0 +1,78 @@ +""" +Tests for Bluefin Perpetual utility functions. + +Tests configuration validation, fee structures, and utility helpers. +""" +import unittest +from decimal import Decimal + +from hummingbot.connector.derivative.bluefin_perpetual.bluefin_perpetual_utils import ( + CENTRALIZED, + DEFAULT_FEES, + EXAMPLE_PAIR, + BluefinPerpetualConfigMap, +) + + +class BluefinPerpetualUtilsTests(unittest.TestCase): + """Test suite for Bluefin Perpetual utility functions.""" + + def test_centralized_constant(self): + """Test that CENTRALIZED is set to False (Bluefin is decentralized).""" + self.assertFalse(CENTRALIZED) + + def test_example_pair(self): + """Test that EXAMPLE_PAIR is properly set.""" + self.assertEqual("BTC-USD", EXAMPLE_PAIR) + + def test_default_fees_structure(self): + """Test that DEFAULT_FEES are properly configured.""" + # Verify fee schema exists + self.assertIsNotNone(DEFAULT_FEES) + + # Verify maker fee (0.01%) + maker_fee = DEFAULT_FEES.maker_percent_fee_decimal + self.assertEqual(Decimal("0.0001"), maker_fee) + + # Verify taker fee (0.05%) + taker_fee = DEFAULT_FEES.taker_percent_fee_decimal + self.assertEqual(Decimal("0.0005"), taker_fee) + + # Verify buy percent fee deducted from returns + self.assertTrue(DEFAULT_FEES.buy_percent_fee_deducted_from_returns) + + def test_config_map_has_required_fields(self): + """Test that BluefinPerpetualConfigMap has all required fields.""" + # Should be able to create config with all required fields + config = BluefinPerpetualConfigMap.model_construct( + bluefin_perpetual_wallet_mnemonic="test " * 23 + "word", + bluefin_perpetual_network="MAINNET", + ) + + self.assertEqual("bluefin_perpetual", config.connector) + self.assertIsNotNone(config.bluefin_perpetual_wallet_mnemonic) + self.assertEqual("MAINNET", config.bluefin_perpetual_network) + + def test_config_map_network_options(self): + """Test that network field accepts both MAINNET and STAGING.""" + # Test MAINNET + config_mainnet = BluefinPerpetualConfigMap.model_construct( + bluefin_perpetual_wallet_mnemonic="test " * 23 + "word", + bluefin_perpetual_network="MAINNET", + ) + self.assertEqual("MAINNET", config_mainnet.bluefin_perpetual_network) + + # Test STAGING + config_staging = BluefinPerpetualConfigMap.model_construct( + bluefin_perpetual_wallet_mnemonic="test " * 23 + "word", + bluefin_perpetual_network="STAGING", + ) + self.assertEqual("STAGING", config_staging.bluefin_perpetual_network) + + def test_config_map_title(self): + """Test that config map has proper title.""" + config = BluefinPerpetualConfigMap.model_construct( + bluefin_perpetual_wallet_mnemonic="test " * 23 + "word", + bluefin_perpetual_network="MAINNET", + ) + self.assertEqual("bluefin_perpetual", config.model_config.get("title")) diff --git a/test/hummingbot/connector/derivative/bluefin_perpetual/test_bluefin_perpetual_web_utils.py b/test/hummingbot/connector/derivative/bluefin_perpetual/test_bluefin_perpetual_web_utils.py new file mode 100644 index 00000000000..9f72da53d3d --- /dev/null +++ b/test/hummingbot/connector/derivative/bluefin_perpetual/test_bluefin_perpetual_web_utils.py @@ -0,0 +1,127 @@ +""" +Tests for Bluefin Perpetual web utilities. + +Tests URL construction, throttler creation, and web assistant factory. +""" +import unittest +from unittest.mock import MagicMock + +from hummingbot.connector.derivative.bluefin_perpetual import bluefin_perpetual_constants as CONSTANTS +from hummingbot.connector.derivative.bluefin_perpetual.bluefin_perpetual_web_utils import ( + build_api_factory, + create_throttler, + get_rest_url_for_endpoint, + get_ws_url, +) +from hummingbot.core.web_assistant.web_assistants_factory import WebAssistantsFactory + + +class BluefinPerpetualWebUtilsTests(unittest.TestCase): + """Test suite for Bluefin Perpetual web utilities.""" + + def setUp(self) -> None: + """Set up test fixtures.""" + super().setUp() + + def test_get_rest_url_for_endpoint_mainnet(self): + """Test REST URL construction for mainnet.""" + endpoint = "/markets" + url = get_rest_url_for_endpoint(endpoint, domain=CONSTANTS.DOMAIN) + + # Should contain mainnet environment name + self.assertIn("sui-prod", url) + self.assertIn(endpoint, url) + self.assertTrue(url.startswith("https://")) + + def test_get_rest_url_for_endpoint_staging(self): + """Test REST URL construction for staging.""" + endpoint = "/orders" + url = get_rest_url_for_endpoint(endpoint, domain=CONSTANTS.STAGING_DOMAIN) + + # Should contain staging environment name + self.assertIn("sui-staging", url) + self.assertIn(endpoint, url) + self.assertTrue(url.startswith("https://")) + + def test_get_ws_url_mainnet_market(self): + """Test WebSocket URL construction for mainnet market stream.""" + url = get_ws_url(domain=CONSTANTS.DOMAIN, stream_type="market") + + # Should contain mainnet environment and market stream + self.assertIn("sui-prod", url) + self.assertIn("/ws/market", url) + self.assertTrue(url.startswith("wss://")) + + def test_get_ws_url_mainnet_account(self): + """Test WebSocket URL construction for mainnet account stream.""" + url = get_ws_url(domain=CONSTANTS.DOMAIN, stream_type="account") + + # Should contain mainnet environment and account stream + self.assertIn("sui-prod", url) + self.assertIn("/ws/account", url) + self.assertTrue(url.startswith("wss://")) + + def test_get_ws_url_staging(self): + """Test WebSocket URL construction for staging.""" + url = get_ws_url(domain=CONSTANTS.STAGING_DOMAIN, stream_type="market") + + # Should contain staging environment + self.assertIn("sui-staging", url) + self.assertTrue(url.startswith("wss://")) + + def test_create_throttler(self): + """Test throttler creation.""" + throttler = create_throttler() + + # Should return valid throttler + self.assertIsNotNone(throttler) + # Should have rate limits configured + self.assertIsNotNone(throttler._rate_limits) + + def test_build_api_factory_without_auth(self): + """Test API factory creation without authentication.""" + factory = build_api_factory() + + # Should return WebAssistantsFactory + self.assertIsInstance(factory, WebAssistantsFactory) + self.assertIsNotNone(factory._throttler) + + def test_build_api_factory_with_custom_throttler(self): + """Test API factory creation with custom throttler.""" + custom_throttler = create_throttler() + factory = build_api_factory(throttler=custom_throttler) + + # Should use provided throttler + self.assertIs(factory._throttler, custom_throttler) + + def test_build_api_factory_with_auth(self): + """Test API factory creation with authentication.""" + mock_auth = MagicMock() + factory = build_api_factory(auth=mock_auth) + + # Should include auth + self.assertIs(factory._auth, mock_auth) + + def test_rest_url_service_variations(self): + """Test REST URL construction with different services.""" + # Test api service (default) + api_url = CONSTANTS.get_rest_url_for_env("sui-prod", service="api") + self.assertIn("api.api.sui-prod", api_url) + + # Test auth service + auth_url = CONSTANTS.get_rest_url_for_env("sui-prod", service="auth") + self.assertIn("auth.api.sui-prod", auth_url) + + # Test trade service + trade_url = CONSTANTS.get_rest_url_for_env("sui-prod", service="trade") + self.assertIn("trade.api.sui-prod", trade_url) + + def test_ws_url_stream_type_variations(self): + """Test WebSocket URL construction with different stream types.""" + # Test market stream + market_url = CONSTANTS.get_ws_url_for_env("sui-prod", stream_type="market") + self.assertIn("/ws/market", market_url) + + # Test account stream + account_url = CONSTANTS.get_ws_url_for_env("sui-prod", stream_type="account") + self.assertIn("/ws/account", account_url)