async-ws-streams

Production WebSocket streaming toolkit for Python. Auto-reconnect, OHLC buffering, and intelligent rate limiting.


Features

  • Auto-reconnect with exponential backoff -- delays adapt to the failure type (handshake timeout, network error, or clean disconnect) and use decorrelated jitter to prevent thundering-herd reconnects.
  • DNS pre-resolution -- shaves 200-500ms off WebSocket handshakes by resolving DNS before the connection attempt.
  • Dedicated thread isolation -- the WebSocket runs on its own asyncio event loop in a daemon thread, so heavy processing on the main loop cannot block handshakes, keepalives, or message processing.
  • Thread-safe OHLC ring buffer -- aggregate trade ticks into candles, ingest native exchange OHLC feeds, bootstrap from REST, and export to pandas DataFrames.
  • Multi-timeframe synthesis -- automatically build M15, H1, H4, and D1 candles from lower-timeframe data.
  • Priority-based API rate limiting -- block low-priority calls first when quota runs tight; exponential backoff on rate-limit errors; built-in response cache.
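
To make the reconnect bullet concrete, here is a minimal sketch of decorrelated jitter with a per-error-type base delay. It is not the library's implementation: the names `BASE_DELAY`, `next_delay`, and `backoff_delays`, and the specific delay values, are assumptions chosen for illustration.

```python
import random

# Hypothetical base delays (seconds) per failure type; values are illustrative only.
BASE_DELAY = {
    "handshake_timeout": 2.0,
    "network_error": 1.0,
    "clean_disconnect": 0.5,
}

def next_delay(base: float, cap: float, previous: float, rng: random.Random) -> float:
    """Decorrelated jitter: draw uniformly from [base, previous * 3], clamp to cap."""
    return min(cap, rng.uniform(base, max(base, previous * 3)))

def backoff_delays(error_type: str, attempts: int, cap: float = 60.0, seed=None) -> list:
    """Return the sequence of sleep durations for consecutive failed attempts."""
    rng = random.Random(seed)
    base = BASE_DELAY[error_type]
    delay = base
    delays = []
    for _ in range(attempts):
        delay = next_delay(base, cap, delay, rng)
        delays.append(delay)
    return delays
```

Because each delay is drawn from a range that depends on the previous delay rather than on the attempt number, clients that disconnect at the same moment spread out quickly instead of retrying in lockstep.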

Installation

pip install async-ws-streams

Or install from source:

git clone https://github.com/TjTheDj2011/async-ws-streams.git
cd async-ws-streams
pip install -e .

Quick Start

WebSocketManager

Subclass WebSocketManager and implement the four abstract methods for your exchange or data provider:

import asyncio
import json
from datetime import datetime, timezone

from async_ws_streams import WebSocketManager, StreamConfig, PriceTick

class MyExchangeStream(WebSocketManager):
    async def _on_connect(self):
        # Send authentication or setup messages once the socket is open
        await self.ws.send('{"op": "auth", "key": "..."}')

    async def _send_subscribe(self, symbols):
        await self.ws.send(json.dumps({"op": "subscribe", "symbols": symbols}))

    async def _send_unsubscribe(self, symbols):
        await self.ws.send(json.dumps({"op": "unsubscribe", "symbols": symbols}))

    async def _parse_message(self, data) -> PriceTick | None:
        # Ignore messages that carry no price
        if "price" not in data:
            return None
        return PriceTick(
            symbol=data["symbol"],
            price=float(data["price"]),
            volume=float(data.get("volume", 0)),
            timestamp=datetime.now(timezone.utc),
            exchange="my_exchange",
        )

async def main():
    config = StreamConfig(
        url="wss://stream.example.com/v2",
        name="my-exchange",
        reconnect_attempts=-1,  # Never give up
    )
    stream = MyExchangeStream(config)
    stream.on_tick(lambda tick: print(f"{tick.symbol}: {tick.price}"))

    await stream.start()
    await stream.subscribe(["BTC/USD", "ETH/USD"])

    # Stream runs in background thread -- do other work here
    await asyncio.sleep(3600)
    await stream.stop()

asyncio.run(main())
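
The "runs in background thread" behaviour above can be pictured with a small standard-library sketch: a second asyncio event loop running in a daemon thread, with coroutines handed to it from the caller's thread. `BackgroundLoop` and `heartbeat` are hypothetical names used only for this illustration and say nothing about how WebSocketManager is structured internally.

```python
import asyncio
import threading

class BackgroundLoop:
    """Own an asyncio event loop that runs in a dedicated daemon thread."""

    def __init__(self):
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._run, name="ws-loop", daemon=True)

    def _run(self):
        asyncio.set_event_loop(self._loop)
        self._loop.run_forever()

    def start(self):
        self._thread.start()

    def submit(self, coro):
        """Schedule a coroutine on the background loop.

        Returns a concurrent.futures.Future that other threads can wait on.
        """
        return asyncio.run_coroutine_threadsafe(coro, self._loop)

    def stop(self):
        # Ask the loop to stop from outside its thread, then wait for the thread.
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join(timeout=5)
        self._loop.close()

async def heartbeat(count, interval=0.01):
    """Stand-in for keepalive work that must not be delayed by the main loop."""
    beats = 0
    for _ in range(count):
        await asyncio.sleep(interval)
        beats += 1
    return beats
```

A caller would run `bg = BackgroundLoop(); bg.start()`, submit work with `bg.submit(heartbeat(3))`, and read the result from the returned future. Heavy work on the caller's own loop does not delay coroutines scheduled this way, because they run on a different loop in a different thread.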

OHLCBuffer

Build and query OHLC candles from trade ticks or exchange feeds:

from datetime import datetime, timezone
from async_ws_streams import OHLCBuffer, OHLCCacheManager

# Single-symbol buffer
buf = OHLCBuffer("AAPL", interval_minutes=5, max_candles=500)
buf.update_from_trade(price=150.25, volume=100, timestamp=datetime.now(timezone.utc))
df = buf.to_dataframe(count=200)

# Multi-symbol manager with optional symbol normalisation
symbol_map = {
    "BTC": ["BTC/USD", "BTCUSD", "XXBTZUSD"],
    "ETH": ["ETH/USD", "ETHUSD"],
}
mgr = OHLCCacheManager(symbol_map=symbol_map, max_candles=500)

# All these resolve to the same buffer:
mgr.update_from_trade("BTC/USD", 50000.0, 0.5, datetime.now(timezone.utc))
mgr.update_from_trade("XXBTZUSD", 50001.0, 0.3, datetime.now(timezone.utc))

# Synthesise higher timeframes
mgr.aggregate_higher_timeframes("BTC")
candles = mgr.get_multi_timeframe_candles("BTC")  # {5: [...], 15: [...], ...}
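
For readers curious about what tick aggregation and higher-timeframe synthesis involve, the following is a simplified standard-library sketch, not the OHLCBuffer source. `Candle`, `TickRingBuffer`, and `rollup` are hypothetical names; the sketch assumes timezone-aware UTC timestamps and ticks that arrive in time order.

```python
import threading
from collections import deque
from dataclasses import dataclass, replace
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

@dataclass
class Candle:
    start: datetime
    open: float
    high: float
    low: float
    close: float
    volume: float

def bucket_start(ts: datetime, interval: timedelta) -> datetime:
    """Floor a timestamp to the start of its interval bucket."""
    return ts - ((ts - EPOCH) % interval)

class TickRingBuffer:
    """Aggregate ticks into fixed-interval candles, keeping the newest max_candles."""

    def __init__(self, interval_minutes=5, max_candles=500):
        self._interval = timedelta(minutes=interval_minutes)
        self._candles = deque(maxlen=max_candles)  # oldest candle is evicted when full
        self._lock = threading.Lock()

    def update_from_trade(self, price, volume, timestamp):
        start = bucket_start(timestamp, self._interval)
        with self._lock:
            last = self._candles[-1] if self._candles else None
            if last is not None and last.start == start:
                # Same bucket: extend the current candle.
                last.high = max(last.high, price)
                last.low = min(last.low, price)
                last.close = price
                last.volume += volume
            else:
                # New bucket: open a fresh candle.
                self._candles.append(Candle(start, price, price, price, price, volume))

    def snapshot(self):
        """Return copies so callers never see a candle mid-update."""
        with self._lock:
            return [replace(c) for c in self._candles]

def rollup(candles, interval_minutes):
    """Merge candles (oldest first) into a higher timeframe, e.g. 5-minute into 15-minute."""
    interval = timedelta(minutes=interval_minutes)
    merged = {}
    for c in candles:
        start = bucket_start(c.start, interval)
        agg = merged.get(start)
        if agg is None:
            merged[start] = Candle(start, c.open, c.high, c.low, c.close, c.volume)
        else:
            agg.high = max(agg.high, c.high)
            agg.low = min(agg.low, c.low)
            agg.close = c.close
            agg.volume += c.volume
    return list(merged.values())
```

The `deque(maxlen=...)` gives ring-buffer eviction for free, and the lock keeps a writer thread (the stream) and a reader thread (analysis code) from interleaving on the same candle.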

APIRateLimiter

Protect your API calls with priority-based throttling:

import asyncio
from async_ws_streams import APIRateLimiter, RateLimitConfig, APIPriority, create_rate_limiter

# Create a named limiter (singleton per name)
limiter = create_rate_limiter("my-api", RateLimitConfig(
    max_calls_per_minute=30,
    max_calls_per_hour=500,
    initial_backoff_seconds=5.0,
))

async def fetch_data(symbol: str) -> dict:
    # Your actual API call here
    ...

async def main():
    # High-priority call -- only blocked at very high quota usage
    result = await limiter.execute(
        fetch_data, "BTC/USD",
        priority=APIPriority.HIGH,
        cache_key="btc-data",
        cache_ttl=30,
    )

    # Low-priority call -- blocked early when quota is tight
    try:
        bg_result = await limiter.execute(
            fetch_data, "DOGE/USD",
            priority=APIPriority.LOW,
        )
    except Exception:
        pass  # Call rejected because quota usage is too high; retry later

    print(limiter.get_stats())

asyncio.run(main())
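
The idea of blocking low-priority calls first can be sketched as a sliding-window counter with a usage threshold per priority. This is an illustration under stated assumptions rather than APIRateLimiter's actual logic: `Priority`, `BLOCK_AT`, `PriorityWindowLimiter`, and the threshold values are all hypothetical, and the sketch is synchronous and not thread-safe.

```python
import time
from collections import deque
from enum import IntEnum

class Priority(IntEnum):
    CRITICAL = 0
    HIGH = 1
    MEDIUM = 2
    LOW = 3

# Hypothetical thresholds: a call is rejected once window usage reaches this fraction.
BLOCK_AT = {
    Priority.CRITICAL: 1.0,
    Priority.HIGH: 0.9,
    Priority.MEDIUM: 0.75,
    Priority.LOW: 0.5,
}

class PriorityWindowLimiter:
    """Sliding-window call counter that sheds low-priority calls first."""

    def __init__(self, max_calls, window_seconds=60.0, clock=time.monotonic):
        self._max_calls = max_calls
        self._window = window_seconds
        self._clock = clock          # injectable so the window can be tested
        self._calls = deque()        # timestamps of accepted calls, oldest first

    def usage(self):
        """Fraction of the quota used inside the current window."""
        now = self._clock()
        while self._calls and now - self._calls[0] >= self._window:
            self._calls.popleft()
        return len(self._calls) / self._max_calls

    def try_acquire(self, priority):
        """Record and allow the call, or return False if this priority is blocked."""
        if self.usage() >= BLOCK_AT[priority]:
            return False
        self._calls.append(self._clock())
        return True
```

With `max_calls=10`, low-priority calls stop being accepted after five calls in the window, high-priority calls after nine, and critical calls only when the quota is fully used.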

API Reference

manager module

| Class / Function | Description |
| --- | --- |
| StreamState | Enum of connection lifecycle states |
| StreamConfig | Configuration dataclass for a WebSocket stream |
| PriceTick | Standardised price tick dataclass |
| StreamStats | Runtime statistics and health metrics |
| WebSocketManager | Abstract base class -- subclass for your API |
| get_websocket_manager(name) | Retrieve a registered manager by name |
| register_websocket_manager | Register a manager in the global registry |

ohlc_buffer module

| Class / Function | Description |
| --- | --- |
| OHLCCandle | Single OHLC candle dataclass |
| OHLCBuffer | Thread-safe ring buffer for one symbol |
| OHLCCacheManager | Multi-symbol, multi-timeframe buffer manager |
| get_ohlc_cache_manager() | Get/create the global singleton manager |

rate_limiter module

| Class / Function | Description |
| --- | --- |
| APIPriority | Priority levels (CRITICAL, HIGH, MEDIUM, LOW) |
| RateLimitConfig | Rate limiter configuration dataclass |
| APIRateLimiter | Rate limiter with backoff, caching, priorities |
| create_rate_limiter(name) | Create/retrieve a named limiter instance |
| get_rate_limiter(name) | Retrieve a previously created limiter |

Requirements

  • Python >= 3.10
  • websockets >= 12.0
  • pandas >= 2.0
  • numpy >= 1.24

License

MIT -- see LICENSE for details.

Copyright 2026 Tiger Intelligence Systems.
