High-performance, end-to-end encrypted message streaming for building connected services and devices. This repository contains the Python SDKs for both the gRPC and WebSocket transports as well as shared utilities.
Install the production-ready gRPC SDK from PyPI:
pip install ensync-sdkFor the WebSocket alternative:
pip install ensync-sdk-wsBoth packages automatically install ensync-core, which provides shared cryptography helpers, payload utilities, and rich error types.
import asyncio
import os
from dotenv import load_dotenv
from ensync_sdk import EnSyncEngine
load_dotenv()
async def main():
engine = EnSyncEngine() # Defaults to grpcs://node.gms.ensync.cloud
client = await engine.create_client(
os.environ["ENSYNC_APP_KEY"],
{"app_decrypt_key": os.environ.get("ENSYNC_SECRET_KEY")}
)
# Publish a message
await client.publish(
"orders/status",
["destinationAppId"],
{"order_id": "order-123", "status": "completed"},
{"source": "order-service"}
)
# Subscribe with decorator pattern
subscription = client.subscribe("orders/status")
@subscription.handler
async def handle_message(message):
print(f"Order {message['payload']['order_id']} is {message['payload']['status']}")
try:
await asyncio.Future() # Run until interrupted
except KeyboardInterrupt:
await subscription.unsubscribe()
await client.close()
if __name__ == "__main__":
asyncio.run(main())ensync/– Legacy compatibility package transitioning from event to message semantics.ensync-sdk/– Modern message-driven gRPC SDK (recommended for production workloads).ensync_sdk_ws– WebSocket transport mirroring the gRPC API surface for environments without HTTP/2.ensync-core/– Shared cryptography, payload helpers, and error classes consumed by every transport.
from ensync_sdk import EnSyncEngine
engine = EnSyncEngine("grpcs://node.gms.ensync.cloud")
client = await engine.create_client(
"your-app-key",
{"app_decrypt_key": "optional-per-client-secret"}
)Common engine options:
enableLogging(bool) – Emit informative logs to stdout.heartbeatInterval(int) – Interval (milliseconds) between heartbeat pings.reconnectInterval(int) – Initial delay (ms) before automatic reconnect attempts.maxReconnectAttempts(int) – Maximum reconnect tries before giving up.
message_id = await client.publish(
"notifications/email", # Message name
["targetAppId"], # Recipients' app IDs
{"to": "user@example.com", "subject": "Welcome"},
{"priority": "high"} # Optional metadata (not encrypted)
)
print(f"Published message: {message_id}")Notes:
- Payloads accept any JSON-serialisable structure.
- Metadata travels alongside the message unencrypted for routing, filtering, or debugging.
- Multiple recipients automatically enable hybrid encryption to optimise payload distribution.
subscription = client.subscribe(
"notifications/email",
auto_ack=True,
app_decrypt_key="per-subscription-secret"
)
@subscription.handler
async def handle_notification(message):
print(message["payload"]["subject"])Subscription options:
auto_ack(bool, defaultTrue) – Automatically acknowledge after the handler completes successfully.app_decrypt_key(str | None) – Override the decryption key for this subscription.
Incoming messages are delivered as dictionaries containing:
{
"idem": "message-unique-id",
"messageName": "notifications/email",
"block": 1024,
"timestamp": 1732912864123,
"payload": {"subject": "Welcome"},
"sender": "BASE64_APP_ID",
"metadata": {"priority": "high"}
}Both transports expose identical helpers:
# Pause delivery for maintenance
await subscription.pause("maintenance window")
# Resume delivery
await subscription.resume()
# Defer a message for later redelivery
await subscription.defer(message_idem, delay_ms=30_000, reason="retry later")
# Permanently discard a message
await subscription.discard(message_idem, "invalid payload")
# Replay a specific message on demand
replayed = await subscription.replay(message_idem)Pausing affects only the current client_id, allowing other workers subscribed to the same message stream to continue processing uninterrupted.
All transports raise subclasses of EnSyncError from ensync_core.error.
from ensync_sdk import EnSyncEngine
from ensync_core.error import EnSyncError
try:
client = await EnSyncEngine().create_client("app-key")
await client.publish("channel", ["recipient"], {"hello": "world"})
except EnSyncError as err:
print(f"EnSync error: {err}")Frequent error types:
EnSyncConnectionError– Authentication or network issues.EnSyncPublishError– Validation or server-side publish failures.EnSyncSubscriptionError– Problems establishing or maintaining subscriptions.EnSyncGenericError– Fallback for unexpected exceptions.
ensync-sdk/tests/– gRPC publisher/subscriber demonstration scripts.ensync-sdk-ws/tests/– WebSocket publisher/subscriber samples.ensync/tests/– Backwards-compatible examples for the legacy package.
Set the following environment variables before running the examples:
ENSYNC_APP_KEYENSYNC_SECRET_KEY(optional if you do not decrypt payloads)ENSYNC_URL(override the default messaging endpoint)
The WebSocket engine mirrors the gRPC API surface:
from ensync_sdk_ws import EnSyncEngine as WsEngine
ws_engine = WsEngine() # Defaults to wss://node.gms.ensync.cloud
ws_client = await ws_engine.create_client("your-app-key")
await ws_client.publish("alerts/critical", ["monitor"], {"status": "down"})Use the same subscribe, pause, resume, defer, discard, and replay helpers; only the underlying transport differs.
- Secure Keys – Load access and decrypt keys from environment variables or secret managers.
- Message Design
- Ensure message names are pre-registered within EnSync (e.g., via the EnSync UI) before publishing; clients may only publish to existing names
- Use hierarchical message names:
company/service/message-type - Keep payloads focused, minimal, and compliant with any schema registered for that message name (schemas are optional but enforced when present)
- Use metadata for non-sensitive routing information
- Monitor Encryption Latency – Inspect
client.encryption_durationsto profile crypto overhead. - Handle Backpressure – Pause subscriptions or defer messages when downstream systems are congested.
- Documentation – https://docs.ensync.cloud
- Python SDK on PyPI – https://pypi.org/project/ensync-sdk
- Support – Contact your EnSync solutions engineer or open an issue in this repository.