Skip to content

Commit df15d8a

Browse files
committed
[AIT-96] feat: RealtimeChannel publish over WebSocket implementation
Implemented Spec points: ## Message Publishing Specifications (RTL6) ### RTL6c - Messages published on channels in specific states - Messages published when channel is not **ATTACHED** should be published immediately ### RTL6c2 - Message queuing behavior - Messages can be queued when connection/channel is not ready - Relates to processing queued messages when connection becomes ready ### RTL6c3 - Publishing without implicit attach ### RTL6c4 - Behavior when queueMessages client option is false ### RTL6d - Message bundling restrictions #### RTL6d1: Maximum message size limits for bundling - **RTL6d2**: All messages in bundle must have same clientId #### RTL6d3: Can only bundle messages for same channel - **RTL6d4**: Can only bundle messages with same action (MESSAGE or PRESENCE) #### RTL6d7: Cannot bundle idempotent messages with non-idempotent messages --- ## Message Acknowledgment (RTN7) ### RTN7a All **PRESENCE**, **MESSAGE**, **ANNOTATION**, and **OBJECT** ProtocolMessages sent to Ably expect either an **ACK** or **NACK** to confirm successful receipt or failure ### RTN7b Every ProtocolMessage requiring acknowledgment must contain a unique serially incrementing `msgSerial` integer starting at zero ### RTN7c If connection enters **SUSPENDED**, **CLOSED**, or **FAILED** state and ACK/NACK has not been received, client should fail those messages and remove them from retry queues ### RTN7d If `queueMessages` is false, messages entering **DISCONNECTED** state without acknowledgment should be treated as failed immediately ### RTN7e When connection state changes to **SUSPENDED**/**CLOSED**/**FAILED**, pending messages (submitted via RTL6c1 or RTL6c2) awaiting ACK/NACK should be considered failed --- ## Message Resending and Serial Handling (RTN19) ### RTN19a Upon reconnection after disconnection, client library must resend all pending messages awaiting acknowledgment, allowing the realtime system to respond with ACK/NACK ### RTN19a2 In the event of a new `connectionId` (connection not resumed), previous `msgSerials` are meaningless and must be reset. The `msgSerial` counter resets to 0 for the new connection --- ## Channel State and Reattachment (RTL3, RTL4, RTL5) ### RTL3c Channel state implications when connection goes into **SUSPENDED** ### RTL3d When connection enters **CONNECTED** state, channels in **ATTACHING**, **ATTACHED**, or **SUSPENDED** states should transition to **ATTACHING** and initiate attach sequence. Connection should process queued messages immediately without waiting for attach operations to finish ### RTL4c - Attach sequence - **RTL4c1**: ATTACH message includes channel serial to resume from previous message or attachment ### RTL5i If channel is **DETACHING**, re-send **DETACH** and remain in 'detaching' state
1 parent 7df692f commit df15d8a

File tree

5 files changed

+1090
-15
lines changed

5 files changed

+1090
-15
lines changed

ably/realtime/connectionmanager.py

Lines changed: 228 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,88 @@
2424
log = logging.getLogger(__name__)
2525

2626

27+
class PendingMessage:
28+
"""Represents a message awaiting acknowledgment from the server"""
29+
30+
def __init__(self, message: dict):
31+
self.message = message
32+
self.future: asyncio.Future | None = None
33+
action = message.get('action')
34+
35+
# Messages that require acknowledgment: MESSAGE, PRESENCE, ANNOTATION, OBJECT
36+
self.ack_required = action in (
37+
ProtocolMessageAction.MESSAGE,
38+
ProtocolMessageAction.PRESENCE,
39+
ProtocolMessageAction.ANNOTATION,
40+
ProtocolMessageAction.OBJECT,
41+
)
42+
43+
if self.ack_required:
44+
self.future = asyncio.Future()
45+
46+
47+
class PendingMessageQueue:
48+
"""Queue for tracking messages awaiting acknowledgment"""
49+
50+
def __init__(self):
51+
self.messages: list[PendingMessage] = []
52+
53+
def push(self, pending_message: PendingMessage) -> None:
54+
"""Add a message to the queue"""
55+
self.messages.append(pending_message)
56+
57+
def count(self) -> int:
58+
"""Return the number of pending messages"""
59+
return len(self.messages)
60+
61+
def complete_messages(self, serial: int, count: int, err: AblyException | None = None) -> None:
62+
"""Complete messages based on serial and count from ACK/NACK
63+
64+
Args:
65+
serial: The msgSerial of the first message being acknowledged
66+
count: The number of messages being acknowledged
67+
err: Error from NACK, or None for successful ACK
68+
"""
69+
log.debug(f'MessageQueue.complete_messages(): serial={serial}, count={count}, err={err}')
70+
71+
if not self.messages:
72+
log.warning('MessageQueue.complete_messages(): called on empty queue')
73+
return
74+
75+
first = self.messages[0]
76+
if first:
77+
start_serial = first.message.get('msgSerial')
78+
if start_serial is None:
79+
log.warning('MessageQueue.complete_messages(): first message has no msgSerial')
80+
return
81+
82+
end_serial = serial + count
83+
84+
if end_serial > start_serial:
85+
# Remove and complete the acknowledged messages
86+
num_to_complete = min(end_serial - start_serial, len(self.messages))
87+
completed_messages = self.messages[:num_to_complete]
88+
self.messages = self.messages[num_to_complete:]
89+
90+
for msg in completed_messages:
91+
if msg.future and not msg.future.done():
92+
if err:
93+
msg.future.set_exception(err)
94+
else:
95+
msg.future.set_result(None)
96+
97+
def complete_all_messages(self, err: AblyException) -> None:
98+
"""Complete all pending messages with an error"""
99+
while self.messages:
100+
msg = self.messages.pop(0)
101+
if msg.future and not msg.future.done():
102+
msg.future.set_exception(err)
103+
104+
def clear(self) -> None:
105+
"""Clear all messages from the queue"""
106+
self.messages.clear()
107+
108+
27109
class ConnectionManager(EventEmitter):
28110
def __init__(self, realtime: AblyRealtime, initial_state):
29111
self.options = realtime.options
@@ -43,6 +125,8 @@ def __init__(self, realtime: AblyRealtime, initial_state):
43125
self.__fallback_hosts: list[str] = self.options.get_fallback_realtime_hosts()
44126
self.queued_messages: Queue = Queue()
45127
self.__error_reason: AblyException | None = None
128+
self.msg_serial: int = 0
129+
self.pending_message_queue: PendingMessageQueue = PendingMessageQueue()
46130
super().__init__()
47131

48132
def enact_state_change(self, state: ConnectionState, reason: AblyException | None = None) -> None:
@@ -88,37 +172,119 @@ async def close_impl(self) -> None:
88172
self.notify_state(ConnectionState.CLOSED)
89173

90174
async def send_protocol_message(self, protocol_message: dict) -> None:
175+
"""Send a protocol message and optionally track it for acknowledgment
176+
177+
Args:
178+
protocol_message: protocol message dict (new message)
179+
Returns:
180+
None
181+
"""
182+
pending_message = PendingMessage(protocol_message)
183+
184+
# Assign msgSerial to messages that need acknowledgment
185+
if pending_message.ack_required:
186+
# New message - assign fresh serial
187+
protocol_message['msgSerial'] = self.msg_serial
188+
self.msg_serial += 1
189+
91190
if self.state in (
92191
ConnectionState.DISCONNECTED,
93192
ConnectionState.CONNECTING,
94193
):
95-
self.queued_messages.put(protocol_message)
96-
return
194+
self.queued_messages.put(pending_message)
195+
# For queued messages requiring ack, add to pending queue
196+
if pending_message.ack_required:
197+
self.pending_message_queue.push(pending_message)
198+
199+
if pending_message.ack_required:
200+
await pending_message.future
201+
return None
97202

98203
if self.state == ConnectionState.CONNECTED:
99-
if self.transport:
100-
await self.transport.send(protocol_message)
101-
else:
102-
log.exception(
103-
"ConnectionManager.send_protocol_message(): can not send message with no active transport"
104-
)
105-
return
204+
return await self.send_protocol_message_on_connected_state(pending_message)
205+
206+
error = AblyException(f"ConnectionManager.send_protocol_message(): called in {self.state}", 500, 50000)
207+
if pending_message.future:
208+
pending_message.future.set_exception(error)
209+
raise error
106210

107-
raise AblyException(f"ConnectionManager.send_protocol_message(): called in {self.state}", 500, 50000)
211+
async def send_protocol_message_on_connected_state(self, pending_message: PendingMessage) -> None:
212+
if self.state == ConnectionState.CONNECTED and self.transport:
213+
# Add to pending queue before sending
214+
if pending_message.ack_required:
215+
self.pending_message_queue.push(pending_message)
216+
217+
await self.transport.send(pending_message.message)
218+
else:
219+
log.exception(
220+
"ConnectionManager.send_pending_message(): can not send message with no active transport"
221+
)
222+
if pending_message.future:
223+
pending_message.future.set_exception(
224+
AblyException("No active transport", 500, 50000)
225+
)
226+
if pending_message.ack_required:
227+
await pending_message.future
228+
return None
108229

109230
def send_queued_messages(self) -> None:
110231
log.info(f'ConnectionManager.send_queued_messages(): sending {self.queued_messages.qsize()} message(s)')
111232
while not self.queued_messages.empty():
112-
asyncio.create_task(self.send_protocol_message(self.queued_messages.get()))
233+
pending_message = self.queued_messages.get()
234+
asyncio.create_task(self.send_protocol_message_on_connected_state(pending_message))
235+
236+
def requeue_pending_messages(self) -> None:
237+
"""RTN19a: Requeue messages awaiting ACK/NACK when transport disconnects
238+
239+
These messages will be resent when connection becomes CONNECTED again.
240+
RTN19a2: msgSerial is preserved for resume, reset for new connection.
241+
"""
242+
pending_count = self.pending_message_queue.count()
243+
if pending_count == 0:
244+
return
245+
246+
log.info(
247+
f'ConnectionManager.requeue_pending_messages(): '
248+
f'requeuing {pending_count} pending message(s) for resend'
249+
)
250+
251+
# Get all pending messages and add them back to the queue
252+
# They'll be sent again when we reconnect
253+
pending_messages = list(self.pending_message_queue.messages)
254+
255+
# Add back to front of queue (FIFO but priority over new messages)
256+
# Store the entire PendingMessage object to preserve Future
257+
for pending_msg in reversed(pending_messages):
258+
# PendingMessage object retains its Future, msgSerial
259+
self.queued_messages.put(pending_msg)
260+
261+
# Clear the message queue since we're requeueing them all
262+
# When they're resent, the existing Future will be resolved
263+
self.pending_message_queue.clear()
113264

114265
def fail_queued_messages(self, err) -> None:
115266
log.info(
116267
f"ConnectionManager.fail_queued_messages(): discarding {self.queued_messages.qsize()} messages;" +
117268
f" reason = {err}"
118269
)
270+
error = err or AblyException("Connection failed", 80000, 500)
119271
while not self.queued_messages.empty():
120-
msg = self.queued_messages.get()
121-
log.exception(f"ConnectionManager.fail_queued_messages(): Failed to send protocol message: {msg}")
272+
pending_msg = self.queued_messages.get()
273+
log.exception(
274+
f"ConnectionManager.fail_queued_messages(): Failed to send protocol message: "
275+
f"{pending_msg.message}"
276+
)
277+
# Fail the Future if it exists
278+
if pending_msg.future and not pending_msg.future.done():
279+
pending_msg.future.set_exception(error)
280+
281+
# Also fail all pending messages awaiting acknowledgment
282+
if self.pending_message_queue.count() > 0:
283+
count = self.pending_message_queue.count()
284+
log.info(
285+
f"ConnectionManager.fail_queued_messages(): failing {count} pending messages"
286+
)
287+
self.pending_message_queue.complete_all_messages(error)
122288

123289
async def ping(self) -> float:
124290
if self.__ping_future:
@@ -149,6 +315,16 @@ def on_connected(self, connection_details: ConnectionDetails, connection_id: str
149315
reason: AblyException | None = None) -> None:
150316
self.__fail_state = ConnectionState.DISCONNECTED
151317

318+
# RTN19a2: Reset msgSerial if connectionId changed (new connection)
319+
prev_connection_id = self.connection_id
320+
connection_id_changed = prev_connection_id is not None and prev_connection_id != connection_id
321+
322+
if connection_id_changed:
323+
log.info('ConnectionManager.on_connected(): New connectionId; resetting msgSerial')
324+
self.msg_serial = 0
325+
# Note: In JS they call resetSendAttempted() here, but we don't need it
326+
# because we fail all pending messages on disconnect per RTN7e
327+
152328
self.__connection_details = connection_details
153329
self.connection_id = connection_id
154330

@@ -244,7 +420,36 @@ def on_heartbeat(self, id: str | None) -> None:
244420
self.__ping_future.set_result(None)
245421
self.__ping_future = None
246422

423+
def on_ack(self, serial: int, count: int) -> None:
424+
"""Handle ACK protocol message from server
425+
426+
Args:
427+
serial: The msgSerial of the first message being acknowledged
428+
count: The number of messages being acknowledged
429+
"""
430+
log.debug(f'ConnectionManager.on_ack(): serial={serial}, count={count}')
431+
self.pending_message_queue.complete_messages(serial, count)
432+
433+
def on_nack(self, serial: int, count: int, err: AblyException | None) -> None:
434+
"""Handle NACK protocol message from server
435+
436+
Args:
437+
serial: The msgSerial of the first message being rejected
438+
count: The number of messages being rejected
439+
err: Error information from the server
440+
"""
441+
if not err:
442+
err = AblyException('Unable to send message; channel not responding', 50001, 500)
443+
444+
log.error(f'ConnectionManager.on_nack(): serial={serial}, count={count}, err={err}')
445+
self.pending_message_queue.complete_messages(serial, count, err)
446+
247447
def deactivate_transport(self, reason: AblyException | None = None):
448+
# RTN19a: Before disconnecting, requeue any pending messages
449+
# so they'll be resent on reconnection
450+
if self.transport:
451+
log.info('ConnectionManager.deactivate_transport(): requeuing pending messages')
452+
self.requeue_pending_messages()
248453
self.transport = None
249454
self.notify_state(ConnectionState.DISCONNECTED, reason)
250455

@@ -383,8 +588,16 @@ def notify_state(self, state: ConnectionState, reason: AblyException | None = No
383588
ConnectionState.SUSPENDED,
384589
ConnectionState.FAILED,
385590
):
591+
# RTN7e: Fail pending messages on SUSPENDED, CLOSED, FAILED
386592
self.fail_queued_messages(reason)
387593
self.ably.channels._propagate_connection_interruption(state, reason)
594+
elif state == ConnectionState.DISCONNECTED and not self.options.queue_messages:
595+
# RTN7d: If queueMessages is false, fail pending messages on DISCONNECTED
596+
log.info(
597+
'ConnectionManager.notify_state(): queueMessages is false; '
598+
'failing pending messages on DISCONNECTED'
599+
)
600+
self.fail_queued_messages(reason)
388601

389602
def start_transition_timer(self, state: ConnectionState, fail_state: ConnectionState | None = None) -> None:
390603
log.debug(f'ConnectionManager.start_transition_timer(): transition state = {state}')
@@ -466,6 +679,8 @@ def cancel_retry_timer(self) -> None:
466679
def disconnect_transport(self) -> None:
467680
log.info('ConnectionManager.disconnect_transport()')
468681
if self.transport:
682+
# RTN19a: Requeue pending messages before disposing transport
683+
self.requeue_pending_messages()
469684
self.disconnect_transport_task = asyncio.create_task(self.transport.dispose())
470685

471686
async def on_auth_updated(self, token_details: TokenDetails):

0 commit comments

Comments
 (0)