@@ -139,12 +139,12 @@ def connect(self) -> None:
139139 timeout = self .deadline + time .monotonic ()
140140 while not self .connected and time .monotonic () < timeout :
141141 if self ._is_fatal :
142- raise ProviderFatalError ("fatal gRPC status code" )
142+ raise ProviderFatalError ("Fatal gRPC status code received " )
143143 time .sleep (0.05 )
144144 logger .debug ("Finished blocking gRPC state initialization" )
145145
146146 if self ._is_fatal :
147- raise ProviderFatalError ("fatal gRPC status code" )
147+ raise ProviderFatalError ("Fatal gRPC status code received " )
148148
149149 if not self .connected :
150150 raise ProviderNotReadyError (
@@ -243,65 +243,69 @@ def _fetch_metadata(self) -> typing.Optional[sync_pb2.GetMetadataResponse]:
243243 else :
244244 raise e
245245
246+ def _handle_flag_response (
247+ self ,
248+ flag_rsp : sync_pb2 .SyncFlagsResponse ,
249+ context_values_response : typing .Optional [sync_pb2 .GetMetadataResponse ],
250+ ) -> bool :
251+ """Process a single flag response. Returns True if the loop should terminate."""
252+ flag_str = flag_rsp .flag_configuration
253+ logger .debug (f"Received flag configuration - { abs (hash (flag_str )) % (10 ** 8 )} " )
254+ self .flag_store .update (json .loads (flag_str ))
255+
256+ if not self .connected :
257+ context_values = {}
258+ if flag_rsp .sync_context :
259+ context_values = MessageToDict (flag_rsp .sync_context )
260+ elif context_values_response :
261+ context_values = MessageToDict (context_values_response )["metadata" ]
262+ self .emit_provider_ready (
263+ ProviderEventDetails (message = "gRPC sync connection established" ),
264+ context_values ,
265+ )
266+ self .connected = True
267+
268+ if not self .active :
269+ logger .debug ("Terminating gRPC sync thread" )
270+ return True
271+ return False
272+
273+ def _handle_rpc_error (self , e : grpc .RpcError ) -> bool :
274+ """Handle a gRPC RpcError. Returns True if the stream loop should stop."""
275+ logger .warning (f"SyncFlags stream error, { e .code ()= } { e .details ()= } " )
276+ if e .code ().name in self .config .fatal_status_codes :
277+ self ._is_fatal = True
278+ self .active = False
279+ self .emit_provider_error (
280+ ProviderEventDetails (
281+ message = f"Fatal gRPC status code: { e .code ()} " ,
282+ error_code = ErrorCode .PROVIDER_FATAL ,
283+ )
284+ )
285+ return True
286+ return False
287+
246288 def listen (self ) -> None :
247289 call_args = self .generate_grpc_call_args ()
248-
249290 request_args = self ._create_request_args ()
250291
251292 while self .active :
252293 try :
253294 context_values_response = self ._fetch_metadata ()
254-
255295 request = sync_pb2 .SyncFlagsRequest (** request_args )
256-
257296 logger .debug ("Setting up gRPC sync flags connection" )
258297 for flag_rsp in self .stub .SyncFlags (request , ** call_args ):
259- flag_str = flag_rsp .flag_configuration
260- logger .debug (
261- f"Received flag configuration - { abs (hash (flag_str )) % (10 ** 8 )} "
262- )
263- self .flag_store .update (json .loads (flag_str ))
264-
265- context_values = {}
266- if flag_rsp .sync_context :
267- context_values = MessageToDict (flag_rsp .sync_context )
268- elif context_values_response :
269- context_values = MessageToDict (context_values_response )[
270- "metadata"
271- ]
272-
273- if not self .connected :
274- self .emit_provider_ready (
275- ProviderEventDetails (
276- message = "gRPC sync connection established"
277- ),
278- context_values ,
279- )
280- self .connected = True
281-
282- if not self .active :
283- logger .debug ("Terminating gRPC sync thread" )
298+ if self ._handle_flag_response (flag_rsp , context_values_response ):
284299 return
285300 except grpc .RpcError as e : # noqa: PERF203
286- logger .warning (f"SyncFlags stream error, { e .code ()= } { e .details ()= } " )
287- if e .code ().name in self .config .fatal_status_codes :
288- self ._is_fatal = True
289- self .active = False
290- self .emit_provider_error (
291- ProviderEventDetails (
292- message = f"Fatal gRPC status code: { e .code ()} " ,
293- error_code = ErrorCode .PROVIDER_FATAL ,
294- )
295- )
301+ if self ._handle_rpc_error (e ):
296302 return
297303 except json .JSONDecodeError :
298304 logger .exception (
299- f "Could not parse JSON flag data from SyncFlags endpoint: { flag_str = } "
305+ "Could not parse JSON flag data from SyncFlags endpoint"
300306 )
301307 except ParseError :
302- logger .exception (
303- f"Could not parse flag data using flagd syntax: { flag_str = } "
304- )
308+ logger .exception ("Could not parse flag data using flagd syntax" )
305309
306310 def generate_grpc_call_args (self ) -> GrpcMultiCallableArgs :
307311 call_args : GrpcMultiCallableArgs = {"wait_for_ready" : True }
0 commit comments