11from __future__ import annotations
22import asyncio
33import logging
4- from typing import Optional , TYPE_CHECKING
4+ from typing import Optional , TYPE_CHECKING , Dict , Any , Union
55from ably .realtime .connection import ConnectionState
66from ably .transport .websockettransport import ProtocolMessageAction
77from ably .rest .channel import Channel , Channels as RestChannels
1414
1515if TYPE_CHECKING :
1616 from ably .realtime .realtime import AblyRealtime
17+ from ably .util .crypto import CipherParams
1718
1819log = logging .getLogger (__name__ )
1920
2021
22+ class ChannelOptions :
23+ """Channel options for Ably Realtime channels
24+
25+ Attributes
26+ ----------
27+ cipher : CipherParams, optional
28+ Requests encryption for this channel when not null, and specifies encryption-related parameters.
29+ params : Dict[str, str], optional
30+ Channel parameters that configure the behavior of the channel.
31+ """
32+
33+ def __init__ (self , cipher : Optional [CipherParams ] = None , params : Optional [dict ] = None ):
34+ self .__cipher = cipher
35+ self .__params = params
36+ # Validate params
37+ if self .__params and not isinstance (self .__params , dict ):
38+ raise AblyException ("params must be a dictionary" , 40000 , 400 )
39+
40+ @property
41+ def cipher (self ):
42+ """Get cipher configuration"""
43+ return self .__cipher
44+
45+ @cipher .setter
46+ def cipher (self , value ):
47+ """Set cipher configuration"""
48+ self .__cipher = value
49+
50+ @property
51+ def params (self ) -> Dict [str , str ]:
52+ """Get channel parameters"""
53+ return self .__params
54+
55+ @params .setter
56+ def params (self , value : Dict [str , str ]):
57+ """Set channel parameters"""
58+ if value and not isinstance (value , dict ):
59+ raise AblyException ("params must be a dictionary" , 40000 , 400 )
60+ self .__params = value or {}
61+
62+ def __eq__ (self , other ):
63+ """Check equality with another ChannelOptions instance"""
64+ if not isinstance (other , ChannelOptions ):
65+ return False
66+
67+ return (self .__cipher == other .__cipher and
68+ self .__params == other .__params )
69+
70+ def __hash__ (self ):
71+ """Make ChannelOptions hashable"""
72+ return hash ((
73+ self .__cipher ,
74+ tuple (sorted (self .__params .items ())) if self .__params else None ,
75+ ))
76+
77+ def to_dict (self ) -> Dict [str , Any ]:
78+ """Convert to dictionary representation"""
79+ result = {}
80+ if self .__cipher is not None :
81+ result ['cipher' ] = self .__cipher
82+ if self .__params :
83+ result ['params' ] = self .__params
84+ return result
85+
86+ @classmethod
87+ def from_dict (cls , options_dict : Dict [str , Any ]) -> 'ChannelOptions' :
88+ """Create ChannelOptions from dictionary"""
89+ if not isinstance (options_dict , dict ):
90+ raise AblyException ("options must be a dictionary" , 40000 , 400 )
91+
92+ return cls (
93+ cipher = options_dict .get ('cipher' ),
94+ params = options_dict .get ('params' ),
95+ )
96+
97+
2198class RealtimeChannel (EventEmitter , Channel ):
2299 """
23100 Ably Realtime Channel
@@ -43,23 +120,39 @@ class RealtimeChannel(EventEmitter, Channel):
43120 Unsubscribe to messages from a channel
44121 """
45122
46- def __init__ (self , realtime : AblyRealtime , name : str ):
123+ def __init__ (self , realtime : AblyRealtime , name : str , channel_options : Optional [ ChannelOptions ] = None ):
47124 EventEmitter .__init__ (self )
48125 self .__name = name
49126 self .__realtime = realtime
50127 self .__state = ChannelState .INITIALIZED
51128 self .__message_emitter = EventEmitter ()
52129 self .__state_timer : Optional [Timer ] = None
53130 self .__attach_resume = False
131+ self .__attach_serial : Optional [str ] = None
54132 self .__channel_serial : Optional [str ] = None
55133 self .__retry_timer : Optional [Timer ] = None
56134 self .__error_reason : Optional [AblyException ] = None
135+ self .__channel_options = channel_options or ChannelOptions ()
57136
58137 # Used to listen to state changes internally, if we use the public event emitter interface then internals
59138 # will be disrupted if the user called .off() to remove all listeners
60139 self .__internal_state_emitter = EventEmitter ()
61140
62- Channel .__init__ (self , realtime , name , {})
141+ # Pass channel options as dictionary to parent Channel class
142+ Channel .__init__ (self , realtime , name , self .__channel_options .to_dict ())
143+
144+ async def set_options (self , channel_options : ChannelOptions ) -> None :
145+ """Set channel options"""
146+ old_channel_options = self .__channel_options
147+ self .__channel_options = channel_options
148+ # Update parent class options
149+ self .options = channel_options .to_dict ()
150+
151+ if self .should_reattach_to_set_options (old_channel_options , channel_options ):
152+ self ._attach_impl ()
153+ state_change = await self .__internal_state_emitter .once_async ()
154+ if state_change .current in (ChannelState .SUSPENDED , ChannelState .FAILED ):
155+ raise state_change .reason
63156
64157 # RTL4
65158 async def attach (self ) -> None :
@@ -108,6 +201,7 @@ def _attach_impl(self):
108201 # RTL4c
109202 attach_msg = {
110203 "action" : ProtocolMessageAction .ATTACH ,
204+ "params" : self .__channel_options .params ,
111205 "channel" : self .name ,
112206 }
113207
@@ -292,8 +386,6 @@ def _on_message(self, proto_msg: dict) -> None:
292386 action = proto_msg .get ('action' )
293387 # RTL4c1
294388 channel_serial = proto_msg .get ('channelSerial' )
295- if channel_serial :
296- self .__channel_serial = channel_serial
297389 # TM2a, TM2c, TM2f
298390 Message .update_inner_message_fields (proto_msg )
299391
@@ -314,6 +406,8 @@ def _on_message(self, proto_msg: dict) -> None:
314406 if not resumed :
315407 state_change = ChannelStateChange (self .state , ChannelState .ATTACHED , resumed , exception )
316408 self ._emit ("update" , state_change )
409+ self .__attach_serial = channel_serial
410+ self .__channel_serial = channel_serial
317411 elif self .state == ChannelState .ATTACHING :
318412 self ._notify_state (ChannelState .ATTACHED , resumed = resumed )
319413 else :
@@ -327,6 +421,7 @@ def _on_message(self, proto_msg: dict) -> None:
327421 self ._request_state (ChannelState .ATTACHING )
328422 elif action == ProtocolMessageAction .MESSAGE :
329423 messages = Message .from_encoded_array (proto_msg .get ('messages' ))
424+ self .__channel_serial = channel_serial
330425 for message in messages :
331426 self .__message_emitter ._emit (message .name , message )
332427 elif action == ProtocolMessageAction .ERROR :
@@ -431,6 +526,11 @@ def __on_retry_timer_expire(self) -> None:
431526 log .info ("RealtimeChannel retry timer expired, attempting a new attach" )
432527 self ._request_state (ChannelState .ATTACHING )
433528
529+ def should_reattach_to_set_options (self , old_options : ChannelOptions , new_options : ChannelOptions ) -> bool :
530+ if self .state != ChannelState .ATTACHING and self .state != ChannelState .ATTACHED :
531+ return False
532+ return old_options != new_options
533+
434534 # RTL23
435535 @property
436536 def name (self ) -> str :
@@ -466,19 +566,38 @@ class Channels(RestChannels):
466566 """
467567
468568 # RTS3
469- def get (self , name : str ) -> RealtimeChannel :
569+ def get (self , name : str , options : Optional [ Union [ dict , ChannelOptions ]] = None ) -> RealtimeChannel :
470570 """Creates a new RealtimeChannel object, or returns the existing channel object.
471571
472572 Parameters
473573 ----------
474574
475575 name: str
476576 Channel name
577+ options: ChannelOptions or dict, optional
578+ Channel options for the channel
477579 """
580+ # Convert dict to ChannelOptions if needed
581+ if options is not None :
582+ if isinstance (options , dict ):
583+ options = ChannelOptions .from_dict (options )
584+ elif not isinstance (options , ChannelOptions ):
585+ raise AblyException ("options must be ChannelOptions instance or dictionary" , 40000 , 400 )
586+
478587 if name not in self .__all :
479- channel = self .__all [name ] = RealtimeChannel (self .__ably , name )
588+ channel = self .__all [name ] = RealtimeChannel (self .__ably , name , options )
480589 else :
481590 channel = self .__all [name ]
591+ # Update options if channel is not attached or currently attaching
592+ if channel .should_reattach_to_set_options (channel .__channel_options , options ):
593+ raise AblyException (
594+ 'Channels.get() cannot be used to set channel options that would cause the channel to '
595+ 'reattach. Please, use RealtimeChannel.setOptions() instead.' ,
596+ 400 ,
597+ 40000
598+ )
599+ else :
600+ channel .set_options (options )
482601 return channel
483602
484603 # RTS4
0 commit comments