Skip to content

Commit 96a88ad

Browse files
feat: introduce list for fatal status codes (#362)
* feat: introduce list for fatal status codes Signed-off-by: Konvalinka <lea.konvalinka@dynatrace.com> * todds changes Signed-off-by: Lea Konvalinka <lea.konvalinka@dynatrace.com> * cleanup, lint fix Signed-off-by: Lea Konvalinka <lea.konvalinka@dynatrace.com> --------- Signed-off-by: Konvalinka <lea.konvalinka@dynatrace.com> Signed-off-by: Lea Konvalinka <lea.konvalinka@dynatrace.com> Co-authored-by: Todd Baert <todd.baert@dynatrace.com>
1 parent 8592c28 commit 96a88ad

File tree

8 files changed

+161
-74
lines changed

8 files changed

+161
-74
lines changed

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ def __init__( # noqa: PLR0913
6565
default_authority: typing.Optional[str] = None,
6666
channel_credentials: typing.Optional[grpc.ChannelCredentials] = None,
6767
sync_metadata_disabled: typing.Optional[bool] = None,
68+
fatal_status_codes: typing.Optional[list[str]] = None,
6869
):
6970
"""
7071
Create an instance of the FlagdProvider
@@ -111,6 +112,7 @@ def __init__( # noqa: PLR0913
111112
default_authority=default_authority,
112113
channel_credentials=channel_credentials,
113114
sync_metadata_disabled=sync_metadata_disabled,
115+
fatal_status_codes=fatal_status_codes,
114116
)
115117
self.enriched_context: dict = {}
116118

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
GeneralError,
1919
InvalidContextError,
2020
ParseError,
21+
ProviderFatalError,
2122
ProviderNotReadyError,
2223
TypeMismatchError,
2324
)
@@ -61,11 +62,13 @@ def __init__(
6162
if self.config.cache == CacheType.LRU
6263
else None
6364
)
65+
logger.debug(self.config.fatal_status_codes)
6466

6567
self.retry_grace_period = config.retry_grace_period
6668
self.streamline_deadline_seconds = config.stream_deadline_ms * 0.001
6769
self.deadline = config.deadline_ms * 0.001
6870
self.connected = False
71+
self._is_fatal = False
6972
self.channel = self._generate_channel(config)
7073
self.stub = evaluation_pb2_grpc.ServiceStub(self.channel)
7174

@@ -151,9 +154,14 @@ def connect(self) -> None:
151154
## block until ready or deadline reached
152155
timeout = self.deadline + time.monotonic()
153156
while not self.connected and time.monotonic() < timeout:
157+
if self._is_fatal:
158+
break
154159
time.sleep(0.05)
155160
logger.debug("Finished blocking gRPC state initialization")
156161

162+
if self._is_fatal:
163+
raise ProviderFatalError("Fatal gRPC status code received")
164+
157165
if not self.connected:
158166
raise ProviderNotReadyError(
159167
"Blocking init finished before data synced. Consider increasing startup deadline to avoid inconsistent evaluations."
@@ -164,6 +172,8 @@ def monitor(self) -> None:
164172

165173
def _state_change_callback(self, new_state: ChannelConnectivity) -> None:
166174
logger.debug(f"gRPC state change: {new_state}")
175+
if self._is_fatal:
176+
return
167177
if (
168178
new_state == grpc.ChannelConnectivity.READY
169179
or new_state == grpc.ChannelConnectivity.IDLE
@@ -235,6 +245,16 @@ def listen(self) -> None:
235245
except grpc.RpcError as e: # noqa: PERF203
236246
# although it seems like this error log is not interesting, without it, the retry is not working as expected
237247
logger.debug(f"SyncFlags stream error, {e.code()=} {e.details()=}")
248+
if e.code().name in self.config.fatal_status_codes:
249+
self._is_fatal = True
250+
self.active = False
251+
self.emit_provider_error(
252+
ProviderEventDetails(
253+
message=f"Fatal gRPC status code: {e.code()}",
254+
error_code=ErrorCode.PROVIDER_FATAL,
255+
)
256+
)
257+
return
238258
except ParseError:
239259
logger.exception(
240260
f"Could not parse flag data using flagd syntax: {message=}"
@@ -399,8 +419,11 @@ def _resolve( # noqa: PLR0915 C901
399419
except grpc.RpcError as e:
400420
code = e.code()
401421
message = f"received grpc status code {code}"
422+
logger.debug(message)
402423

403-
if code == grpc.StatusCode.NOT_FOUND:
424+
if code.name in self.config.fatal_status_codes:
425+
raise ProviderFatalError(message) from e
426+
elif code == grpc.StatusCode.NOT_FOUND:
404427
raise FlagNotFoundError(message) from e
405428
elif code == grpc.StatusCode.INVALID_ARGUMENT:
406429
raise TypeMismatchError(message) from e

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py

Lines changed: 61 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,12 @@
1010

1111
from openfeature.evaluation_context import EvaluationContext
1212
from openfeature.event import ProviderEventDetails
13-
from openfeature.exception import ErrorCode, ParseError, ProviderNotReadyError
13+
from openfeature.exception import (
14+
ErrorCode,
15+
ParseError,
16+
ProviderFatalError,
17+
ProviderNotReadyError,
18+
)
1419
from openfeature.schemas.protobuf.flagd.sync.v1 import (
1520
sync_pb2,
1621
sync_pb2_grpc,
@@ -50,6 +55,7 @@ def __init__(
5055
self.emit_provider_stale = emit_provider_stale
5156

5257
self.connected = False
58+
self._is_fatal = False
5359
self.thread: typing.Optional[threading.Thread] = None
5460
self.timer: typing.Optional[threading.Timer] = None
5561

@@ -132,9 +138,14 @@ def connect(self) -> None:
132138
## block until ready or deadline reached
133139
timeout = self.deadline + time.monotonic()
134140
while not self.connected and time.monotonic() < timeout:
141+
if self._is_fatal:
142+
break
135143
time.sleep(0.05)
136144
logger.debug("Finished blocking gRPC state initialization")
137145

146+
if self._is_fatal:
147+
raise ProviderFatalError("Fatal gRPC status code received")
148+
138149
if not self.connected:
139150
raise ProviderNotReadyError(
140151
"Blocking init finished before data synced. Consider increasing startup deadline to avoid inconsistent evaluations."
@@ -145,6 +156,8 @@ def monitor(self) -> None:
145156

146157
def _state_change_callback(self, new_state: grpc.ChannelConnectivity) -> None:
147158
logger.debug(f"gRPC state change: {new_state}")
159+
if self._is_fatal:
160+
return
148161
if (
149162
new_state == grpc.ChannelConnectivity.READY
150163
or new_state == grpc.ChannelConnectivity.IDLE
@@ -228,55 +241,69 @@ def _fetch_metadata(self) -> typing.Optional[sync_pb2.GetMetadataResponse]:
228241
else:
229242
raise e
230243

244+
def _handle_flag_response(
245+
self,
246+
flag_rsp: sync_pb2.SyncFlagsResponse,
247+
context_values_response: typing.Optional[sync_pb2.GetMetadataResponse],
248+
) -> bool:
249+
"""Process a single flag response. Returns True if the loop should terminate."""
250+
flag_str = flag_rsp.flag_configuration
251+
logger.debug(f"Received flag configuration - {abs(hash(flag_str)) % (10**8)}")
252+
self.flag_store.update(json.loads(flag_str))
253+
254+
if not self.connected:
255+
context_values = {}
256+
if flag_rsp.sync_context:
257+
context_values = MessageToDict(flag_rsp.sync_context)
258+
elif context_values_response:
259+
context_values = MessageToDict(context_values_response)["metadata"]
260+
self.emit_provider_ready(
261+
ProviderEventDetails(message="gRPC sync connection established"),
262+
context_values,
263+
)
264+
self.connected = True
265+
266+
if not self.active:
267+
logger.debug("Terminating gRPC sync thread")
268+
return True
269+
return False
270+
271+
def _handle_rpc_error(self, e: grpc.RpcError) -> bool:
272+
"""Handle a gRPC RpcError. Returns True if the stream loop should stop."""
273+
logger.warning(f"SyncFlags stream error, {e.code()=} {e.details()=}")
274+
if e.code().name in self.config.fatal_status_codes:
275+
self._is_fatal = True
276+
self.active = False
277+
self.emit_provider_error(
278+
ProviderEventDetails(
279+
message=f"Fatal gRPC status code: {e.code()}",
280+
error_code=ErrorCode.PROVIDER_FATAL,
281+
)
282+
)
283+
return True
284+
return False
285+
231286
def listen(self) -> None:
232287
call_args = self.generate_grpc_call_args()
233-
234288
request_args = self._create_request_args()
235289

236290
while self.active:
237291
try:
238292
context_values_response = self._fetch_metadata()
239-
240293
request = sync_pb2.SyncFlagsRequest(**request_args)
241-
242294
logger.debug("Setting up gRPC sync flags connection")
243295
for flag_rsp in self.stub.SyncFlags(request, **call_args):
244-
flag_str = flag_rsp.flag_configuration
245-
logger.debug(
246-
f"Received flag configuration - {abs(hash(flag_str)) % (10**8)}"
247-
)
248-
self.flag_store.update(json.loads(flag_str))
249-
250-
context_values = {}
251-
if flag_rsp.sync_context:
252-
context_values = MessageToDict(flag_rsp.sync_context)
253-
elif context_values_response:
254-
context_values = MessageToDict(context_values_response)[
255-
"metadata"
256-
]
257-
258-
if not self.connected:
259-
self.emit_provider_ready(
260-
ProviderEventDetails(
261-
message="gRPC sync connection established"
262-
),
263-
context_values,
264-
)
265-
self.connected = True
266-
267-
if not self.active:
268-
logger.debug("Terminating gRPC sync thread")
296+
if self._handle_flag_response(flag_rsp, context_values_response):
269297
return
270298
except grpc.RpcError as e: # noqa: PERF203
271-
logger.debug(f"SyncFlags stream error, {e.code()=} {e.details()=}")
299+
if self._handle_rpc_error(e):
300+
return
272301
except json.JSONDecodeError:
273302
logger.exception(
274-
f"Could not parse JSON flag data from SyncFlags endpoint: {flag_str=}"
303+
"Could not parse JSON flag data from SyncFlags endpoint"
275304
)
276305
except ParseError:
277-
logger.exception(
278-
f"Could not parse flag data using flagd syntax: {flag_str=}"
279-
)
306+
logger.exception("Could not parse flag data using flagd syntax")
280307

281308
def generate_grpc_call_args(self) -> GrpcMultiCallableArgs:
282309
call_args: GrpcMultiCallableArgs = {"wait_for_ready": True}
Lines changed: 56 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,69 +1,98 @@
1-
import os.path
1+
import logging
2+
import os
23
import time
34
import typing
45
from pathlib import Path
56

67
import grpc
78
from grpc_health.v1 import health_pb2, health_pb2_grpc
8-
from testcontainers.core.container import DockerContainer
9-
from testcontainers.core.wait_strategies import LogMessageWaitStrategy
9+
from testcontainers.compose import DockerCompose
1010

1111
from openfeature.contrib.provider.flagd.config import ResolverType
1212

13+
logger = logging.getLogger(__name__)
14+
1315
HEALTH_CHECK = 8014
1416
LAUNCHPAD = 8080
17+
FORBIDDEN = 9212
18+
19+
20+
class FlagdContainer:
21+
"""Manages the docker-compose environment for flagd e2e tests.
1522
23+
Uses docker-compose to start both flagd and envoy containers,
24+
so the envoy forbidden endpoint (port 9212) returns a proper HTTP 403.
25+
"""
1626

17-
class FlagdContainer(DockerContainer):
1827
def __init__(
1928
self,
2029
feature: typing.Optional[str] = None,
2130
**kwargs,
2231
) -> None:
32+
self._test_harness_dir = (
33+
Path(__file__).parents[2] / "openfeature" / "test-harness"
34+
)
35+
self._version = (self._test_harness_dir / "version.txt").read_text().rstrip()
36+
2337
image: str = "ghcr.io/open-feature/flagd-testbed"
2438
if feature is not None:
2539
image = f"{image}-{feature}"
26-
path = Path(__file__).parents[2] / "openfeature/test-harness/version.txt"
27-
data = path.read_text().rstrip()
28-
super().__init__(f"{image}:v{data}", **kwargs)
29-
self.rpc = 8013
30-
self.ipr = 8015
40+
3141
self.flagDir = Path("./flags")
3242
self.flagDir.mkdir(parents=True, exist_ok=True)
33-
self.with_exposed_ports(self.rpc, self.ipr, HEALTH_CHECK, LAUNCHPAD)
34-
self.with_volume_mapping(os.path.abspath(self.flagDir.name), "/flags", "rw")
35-
self.waiting_for(LogMessageWaitStrategy("listening").with_startup_timeout(5))
3643

37-
def get_port(self, resolver_type: ResolverType):
44+
# Set environment variables for docker-compose substitution
45+
os.environ["IMAGE"] = image
46+
os.environ["VERSION"] = f"v{self._version}"
47+
os.environ["FLAGS_DIR"] = str(self.flagDir.absolute())
48+
49+
self._compose = DockerCompose(
50+
context=str(self._test_harness_dir),
51+
compose_file_name="docker-compose.yaml",
52+
wait=True,
53+
)
54+
55+
def get_port(self, resolver_type: ResolverType) -> int:
3856
if resolver_type == ResolverType.RPC:
39-
return self.get_exposed_port(self.rpc)
57+
return self._compose.get_service_port("flagd", 8013)
4058
else:
41-
return self.get_exposed_port(self.ipr)
59+
return self._compose.get_service_port("flagd", 8015)
4260

43-
def get_launchpad_url(self):
44-
return f"http://localhost:{self.get_exposed_port(LAUNCHPAD)}"
61+
def get_exposed_port(self, port: int) -> int:
62+
"""Get mapped port. For FORBIDDEN (9212) returns envoy port, otherwise flagd port."""
63+
if port == FORBIDDEN:
64+
return self._compose.get_service_port("envoy", FORBIDDEN)
65+
return self._compose.get_service_port("flagd", port)
66+
67+
def get_launchpad_url(self) -> str:
68+
port = self._compose.get_service_port("flagd", LAUNCHPAD)
69+
return f"http://localhost:{port}"
4570

4671
def start(self) -> "FlagdContainer":
47-
super().start()
48-
self._checker(self.get_container_host_ip(), self.get_exposed_port(HEALTH_CHECK))
72+
self._compose.start()
73+
host = self._compose.get_service_host("flagd", HEALTH_CHECK) or "localhost"
74+
port = self._compose.get_service_port("flagd", HEALTH_CHECK)
75+
self._checker(host, port)
4976
return self
5077

78+
def stop(self) -> None:
79+
self._compose.stop()
80+
5181
def _checker(self, host: str, port: int) -> None:
5282
# Give an extra second before continuing
5383
time.sleep(1)
54-
# Second we use the GRPC health check endpoint
55-
with grpc.insecure_channel(host + ":" + str(port)) as channel:
84+
# Use the GRPC health check endpoint
85+
with grpc.insecure_channel(f"{host}:{port}") as channel:
5686
health_stub = health_pb2_grpc.HealthStub(channel)
5787

5888
def health_check_call(stub: health_pb2_grpc.HealthStub):
5989
request = health_pb2.HealthCheckRequest()
60-
resp = stub.Check(request)
61-
if resp.status == health_pb2.HealthCheckResponse.SERVING:
62-
return True
63-
elif resp.status == health_pb2.HealthCheckResponse.NOT_SERVING:
90+
try:
91+
resp = stub.Check(request)
92+
return resp.status == health_pb2.HealthCheckResponse.SERVING
93+
except Exception:
6494
return False
6595

66-
# Should succeed
6796
# Check health status every 1 second for 30 seconds
6897
ok = False
6998
for _ in range(30):
@@ -73,4 +102,4 @@ def health_check_call(stub: health_pb2_grpc.HealthStub):
73102
time.sleep(1)
74103

75104
if not ok:
76-
raise ConnectionError("flagD not ready in time")
105+
raise ConnectionError("flagd not ready in time")

providers/openfeature-provider-flagd/tests/e2e/inprocess/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from tests.e2e.testfilter import TestFilter
55

66
resolver = ResolverType.IN_PROCESS
7-
feature_list = ["~targetURI", "~unixsocket", "~deprecated", "~forbidden"]
7+
feature_list = ["~targetURI", "~unixsocket", "~deprecated"]
88

99

1010
def pytest_collection_modifyitems(config, items):

providers/openfeature-provider-flagd/tests/e2e/rpc/conftest.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
"~sync",
1111
"~metadata",
1212
"~deprecated",
13-
"~forbidden",
1413
]
1514

1615

0 commit comments

Comments
 (0)