Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions proto/tino/data/v1/data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ service DataService {
rpc IngestData(IngestDataRequest) returns (stream IngestDataResponse);
rpc ListCatalog(ListCatalogRequest) returns (ListCatalogResponse);
rpc DeleteCatalog(DeleteCatalogRequest) returns (DeleteCatalogResponse);
rpc GetMarketQuote(GetMarketQuoteRequest) returns (GetMarketQuoteResponse);
rpc GetMarketKlines(GetMarketKlinesRequest) returns (GetMarketKlinesResponse);
rpc GetMarketOverview(GetMarketOverviewRequest) returns (GetMarketOverviewResponse);
rpc ListSupportedExchanges(ListSupportedExchangesRequest) returns (ListSupportedExchangesResponse);
}

message IngestDataRequest {
Expand Down Expand Up @@ -48,3 +52,60 @@ message DeleteCatalogRequest {
message DeleteCatalogResponse {
bool success = 1;
}

message GetMarketQuoteRequest {
string exchange = 1;
string symbol = 2;
}

message MarketQuote {
string exchange = 1;
string symbol = 2;
double last_price = 3;
double bid_price = 4;
double ask_price = 5;
double volume_24h = 6;
double high_24h = 7;
double low_24h = 8;
string timestamp = 9;
}

message GetMarketQuoteResponse {
MarketQuote quote = 1;
}

message GetMarketKlinesRequest {
string exchange = 1;
string symbol = 2;
string interval = 3;
int32 limit = 4;
}

message MarketKline {
int64 open_time = 1;
double open = 2;
double high = 3;
double low = 4;
double close = 5;
double volume = 6;
int64 close_time = 7;
}

message GetMarketKlinesResponse {
repeated MarketKline klines = 1;
}

message GetMarketOverviewRequest {
string exchange = 1;
repeated string symbols = 2;
}

message GetMarketOverviewResponse {
repeated MarketQuote quotes = 1;
}

message ListSupportedExchangesRequest {}

message ListSupportedExchangesResponse {
repeated string exchanges = 1;
}
192 changes: 192 additions & 0 deletions python/tests/test_data_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
import tempfile
from pathlib import Path
from typing import AsyncGenerator
from unittest.mock import AsyncMock, patch

import grpc
import pytest
import pytest_asyncio

from tino_daemon.exchanges.base_connector import Kline, Ticker
from tino_daemon.nautilus.catalog import DataCatalogWrapper
from tino_daemon.proto.tino.data.v1 import data_pb2, data_pb2_grpc
from tino_daemon.services.data import DataServiceServicer
Expand Down Expand Up @@ -54,6 +56,20 @@ async def data_server(
BAR_TYPE = "AAPL.XNAS-1-DAY-LAST-EXTERNAL"


class FakeContext:
"""Minimal gRPC context mock."""

def __init__(self) -> None:
self._code = grpc.StatusCode.OK
self._details = ""

def set_code(self, code: grpc.StatusCode) -> None:
self._code = code

def set_details(self, details: str) -> None:
self._details = details


@pytest.mark.asyncio
async def test_list_catalog_empty(data_server):
server, port = data_server
Expand Down Expand Up @@ -230,3 +246,179 @@ async def test_delete_catalog_after_ingest(data_server, sample_csv: Path):
after_resp = await stub.ListCatalog(data_pb2.ListCatalogRequest())
remaining = [e for e in after_resp.entries if e.bar_type == BAR_TYPE]
assert len(remaining) == 0


@pytest.mark.asyncio
async def test_get_market_quote_success(catalog: DataCatalogWrapper):
servicer = DataServiceServicer(catalog=catalog)
ctx = FakeContext()

mock_connector = AsyncMock()
mock_connector.get_ticker = AsyncMock(
return_value=Ticker(
symbol="BTCUSDT",
last_price=50000.0,
bid_price=49999.0,
ask_price=50001.0,
volume_24h=12345.0,
high_24h=51000.0,
low_24h=49000.0,
timestamp="1700000000000",
)
)

with patch(
"tino_daemon.services.data.get_connector",
return_value=mock_connector,
):
resp = await servicer.GetMarketQuote(
data_pb2.GetMarketQuoteRequest(exchange="binance", symbol="BTCUSDT"),
ctx,
)

assert resp.quote.symbol == "BTCUSDT"
assert resp.quote.last_price == 50000.0
assert ctx._code == grpc.StatusCode.OK


@pytest.mark.asyncio
async def test_get_market_quote_invalid_exchange(catalog: DataCatalogWrapper):
servicer = DataServiceServicer(catalog=catalog)
ctx = FakeContext()

with patch(
"tino_daemon.services.data.get_connector",
side_effect=ValueError("Unsupported exchange: unknown"),
):
resp = await servicer.GetMarketQuote(
data_pb2.GetMarketQuoteRequest(exchange="unknown", symbol="BTCUSDT"),
ctx,
)

assert resp.quote.symbol == ""
assert ctx._code == grpc.StatusCode.INVALID_ARGUMENT


@pytest.mark.asyncio
async def test_get_market_klines_success(catalog: DataCatalogWrapper):
servicer = DataServiceServicer(catalog=catalog)
ctx = FakeContext()

mock_connector = AsyncMock()
mock_connector.get_klines = AsyncMock(
return_value=[
Kline(
open_time=1700000000000,
open=50000.0,
high=51000.0,
low=49000.0,
close=50500.0,
volume=100.0,
close_time=1700003600000,
),
]
)

with patch(
"tino_daemon.services.data.get_connector",
return_value=mock_connector,
):
resp = await servicer.GetMarketKlines(
data_pb2.GetMarketKlinesRequest(
exchange="binance",
symbol="BTCUSDT",
interval="1h",
limit=1,
),
ctx,
)

assert len(resp.klines) == 1
assert resp.klines[0].close == 50500.0
assert ctx._code == grpc.StatusCode.OK


@pytest.mark.asyncio
async def test_get_market_klines_invalid_limit(catalog: DataCatalogWrapper):
servicer = DataServiceServicer(catalog=catalog)
ctx = FakeContext()

resp = await servicer.GetMarketKlines(
data_pb2.GetMarketKlinesRequest(
exchange="binance",
symbol="BTCUSDT",
interval="1h",
limit=-1,
),
ctx,
)

assert len(resp.klines) == 0
assert ctx._code == grpc.StatusCode.INVALID_ARGUMENT


@pytest.mark.asyncio
async def test_get_market_overview_success(catalog: DataCatalogWrapper):
servicer = DataServiceServicer(catalog=catalog)
ctx = FakeContext()

mock_connector = AsyncMock()
mock_connector.get_ticker = AsyncMock(
side_effect=[
Ticker(
symbol="BTCUSDT",
last_price=50000.0,
bid_price=49999.0,
ask_price=50001.0,
volume_24h=12345.0,
high_24h=51000.0,
low_24h=49000.0,
timestamp="1700000000000",
),
Ticker(
symbol="ETHUSDT",
last_price=3000.0,
bid_price=2999.0,
ask_price=3001.0,
volume_24h=54321.0,
high_24h=3200.0,
low_24h=2800.0,
timestamp="1700000000000",
),
]
)

with patch(
"tino_daemon.services.data.get_connector",
return_value=mock_connector,
):
resp = await servicer.GetMarketOverview(
data_pb2.GetMarketOverviewRequest(
exchange="binance",
symbols=["BTCUSDT", "ETHUSDT"],
),
ctx,
)

assert len(resp.quotes) == 2
assert resp.quotes[0].symbol == "BTCUSDT"
assert resp.quotes[1].symbol == "ETHUSDT"
assert ctx._code == grpc.StatusCode.OK


@pytest.mark.asyncio
async def test_list_supported_exchanges_success(catalog: DataCatalogWrapper):
servicer = DataServiceServicer(catalog=catalog)
ctx = FakeContext()

with patch(
"tino_daemon.services.data.list_exchanges",
return_value=["binance", "bybit", "okx"],
):
resp = await servicer.ListSupportedExchanges(
data_pb2.ListSupportedExchangesRequest(),
ctx,
)

assert resp.exchanges == ["binance", "bybit", "okx"]
assert ctx._code == grpc.StatusCode.OK
30 changes: 22 additions & 8 deletions python/tests/test_trading_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@
from tino_daemon.services.trading import TradingServiceServicer


async def collect_events(stream) -> list[Any]:
"""Drain a streaming RPC into a list with explicit call-site timeout."""
return [event async for event in stream]


class FakeTradingNode:
def __init__(self) -> None:
self.stop_calls: list[bool] = []
Expand Down Expand Up @@ -112,18 +117,22 @@ async def test_start_trading_streams_events(trading_server):
stub = trading_pb2_grpc.TradingServiceStub(channel)
request = trading_pb2.StartTradingRequest(
strategy_path="strategies/demo.py",
mode="live",
venue="BINANCE",
instruments=["BTCUSDT.BINANCE"],
config_json="{}",
)

events = [event async for event in stub.StartTrading(request)]
events = await asyncio.wait_for(
collect_events(stub.StartTrading(request)),
timeout=5.0,
)
types = [event.type for event in events]

assert trading_pb2.StartTradingResponse.EVENT_TYPE_STARTED in types
assert trading_pb2.StartTradingResponse.EVENT_TYPE_ORDER_FILLED in types
assert trading_pb2.StartTradingResponse.EVENT_TYPE_STOPPED in types
assert node.started_mode == "paper"
assert node.started_mode == "live"


@pytest.mark.asyncio
Expand Down Expand Up @@ -212,12 +221,17 @@ async def start_trading(self, **kwargs) -> None:
try:
async with grpc.aio.insecure_channel(f"localhost:{port}") as channel:
stub = trading_pb2_grpc.TradingServiceStub(channel)
events = [
event
async for event in stub.StartTrading(
trading_pb2.StartTradingRequest(strategy_path="strategies/demo.py")
)
]
events = await asyncio.wait_for(
collect_events(
stub.StartTrading(
trading_pb2.StartTradingRequest(
strategy_path="strategies/demo.py",
mode="live",
)
)
),
timeout=5.0,
)
error_events = [
event
for event in events
Expand Down
26 changes: 23 additions & 3 deletions python/tino_daemon/proto/tino/data/v1/data_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading