@@ -139,12 +139,12 @@ def connect(self) -> None:
139139 timeout = self .deadline + time .monotonic ()
140140 while not self .connected and time .monotonic () < timeout :
141141 if self ._is_fatal :
142- raise ProviderFatalError ( "fatal gRPC status code" )
142+ break
143143 time .sleep (0.05 )
144144 logger .debug ("Finished blocking gRPC state initialization" )
145145
146146 if self ._is_fatal :
147- raise ProviderFatalError ("fatal gRPC status code" )
147+ raise ProviderFatalError ("Fatal gRPC status code received " )
148148
149149 if not self .connected :
150150 raise ProviderNotReadyError (
@@ -189,8 +189,6 @@ def _state_change_callback(self, new_state: grpc.ChannelConnectivity) -> None:
189189 self .connected = False
190190
191191 def emit_error (self ) -> None :
192- if self ._is_fatal :
193- return
194192 logger .debug ("gRPC error emitted" )
195193 self .emit_provider_error (
196194 ProviderEventDetails (
@@ -243,65 +241,69 @@ def _fetch_metadata(self) -> typing.Optional[sync_pb2.GetMetadataResponse]:
243241 else :
244242 raise e
245243
244+ def _handle_flag_response (
245+ self ,
246+ flag_rsp : sync_pb2 .SyncFlagsResponse ,
247+ context_values_response : typing .Optional [sync_pb2 .GetMetadataResponse ],
248+ ) -> bool :
249+ """Process a single flag response. Returns True if the loop should terminate."""
250+ flag_str = flag_rsp .flag_configuration
251+ logger .debug (f"Received flag configuration - { abs (hash (flag_str )) % (10 ** 8 )} " )
252+ self .flag_store .update (json .loads (flag_str ))
253+
254+ if not self .connected :
255+ context_values = {}
256+ if flag_rsp .sync_context :
257+ context_values = MessageToDict (flag_rsp .sync_context )
258+ elif context_values_response :
259+ context_values = MessageToDict (context_values_response )["metadata" ]
260+ self .emit_provider_ready (
261+ ProviderEventDetails (message = "gRPC sync connection established" ),
262+ context_values ,
263+ )
264+ self .connected = True
265+
266+ if not self .active :
267+ logger .debug ("Terminating gRPC sync thread" )
268+ return True
269+ return False
270+
271+ def _handle_rpc_error (self , e : grpc .RpcError ) -> bool :
272+ """Handle a gRPC RpcError. Returns True if the stream loop should stop."""
273+ logger .warning (f"SyncFlags stream error, { e .code ()= } { e .details ()= } " )
274+ if e .code ().name in self .config .fatal_status_codes :
275+ self ._is_fatal = True
276+ self .active = False
277+ self .emit_provider_error (
278+ ProviderEventDetails (
279+ message = f"Fatal gRPC status code: { e .code ()} " ,
280+ error_code = ErrorCode .PROVIDER_FATAL ,
281+ )
282+ )
283+ return True
284+ return False
285+
246286 def listen (self ) -> None :
247287 call_args = self .generate_grpc_call_args ()
248-
249288 request_args = self ._create_request_args ()
250289
251290 while self .active :
252291 try :
253292 context_values_response = self ._fetch_metadata ()
254-
255293 request = sync_pb2 .SyncFlagsRequest (** request_args )
256-
257294 logger .debug ("Setting up gRPC sync flags connection" )
258295 for flag_rsp in self .stub .SyncFlags (request , ** call_args ):
259- flag_str = flag_rsp .flag_configuration
260- logger .debug (
261- f"Received flag configuration - { abs (hash (flag_str )) % (10 ** 8 )} "
262- )
263- self .flag_store .update (json .loads (flag_str ))
264-
265- context_values = {}
266- if flag_rsp .sync_context :
267- context_values = MessageToDict (flag_rsp .sync_context )
268- elif context_values_response :
269- context_values = MessageToDict (context_values_response )[
270- "metadata"
271- ]
272-
273- if not self .connected :
274- self .emit_provider_ready (
275- ProviderEventDetails (
276- message = "gRPC sync connection established"
277- ),
278- context_values ,
279- )
280- self .connected = True
281-
282- if not self .active :
283- logger .debug ("Terminating gRPC sync thread" )
296+ if self ._handle_flag_response (flag_rsp , context_values_response ):
284297 return
285298 except grpc .RpcError as e : # noqa: PERF203
286- logger .warning (f"SyncFlags stream error, { e .code ()= } { e .details ()= } " )
287- if e .code ().name in self .config .fatal_status_codes :
288- self ._is_fatal = True
289- self .active = False
290- self .emit_provider_error (
291- ProviderEventDetails (
292- message = f"Fatal gRPC status code: { e .code ()} " ,
293- error_code = ErrorCode .PROVIDER_FATAL ,
294- )
295- )
299+ if self ._handle_rpc_error (e ):
296300 return
297301 except json .JSONDecodeError :
298302 logger .exception (
299- f "Could not parse JSON flag data from SyncFlags endpoint: { flag_str = } "
303+ "Could not parse JSON flag data from SyncFlags endpoint"
300304 )
301305 except ParseError :
302- logger .exception (
303- f"Could not parse flag data using flagd syntax: { flag_str = } "
304- )
306+ logger .exception ("Could not parse flag data using flagd syntax" )
305307
306308 def generate_grpc_call_args (self ) -> GrpcMultiCallableArgs :
307309 call_args : GrpcMultiCallableArgs = {"wait_for_ready" : True }
0 commit comments