diff --git a/agogosml/agogosml/common/eventhub_streaming_client.py b/agogosml/agogosml/common/eventhub_streaming_client.py index d2e79618..ec3c833a 100644 --- a/agogosml/agogosml/common/eventhub_streaming_client.py +++ b/agogosml/agogosml/common/eventhub_streaming_client.py @@ -1,6 +1,7 @@ """Event Hub streaming client.""" import asyncio +import json import signal from typing import Optional @@ -97,6 +98,8 @@ def __init__(self, config): # pragma: no cover EVENT_HUB_NAMESPACE EVENT_HUB_SAS_KEY EVENT_HUB_SAS_POLICY + EVENT_HUB_EPH_OPTIONS: str|dict|instanceof(EPHOptions) <- Accepts JSON str + EVENT_HUB_DEBUG <- Can be overwritten by EPH Options. LEASE_CONTAINER_NAME TIMEOUT @@ -126,10 +129,9 @@ def __init__(self, config): # pragma: no cover policy=user, sas_key=key, consumer_group=consumer_group) - self.eh_options = EPHOptions() - self.eh_options.release_pump_on_timeout = True - self.eh_options.auto_reconnect_on_error = False - self.eh_options.debug_trace = False + + self.eh_options = self.create_eventhub_eph_options(config, self.logger) + self.storage_manager = AzureStorageCheckpointLeaseManager( storage_account_name, storage_key, lease_container_name) @@ -153,6 +155,47 @@ def __init__(self, config): # pragma: no cover self.logger.error('Failed to init EH send client: %s', ex) raise + @staticmethod + def create_eventhub_eph_options(user_config: dict, logger: Logger) -> dict: + """Create the Event Hub EPH Options class.""" + eph_options = user_config.get("EVENT_HUB_EPH_OPTIONS") + if eph_options and isinstance(eph_options, str): + try: + eph_options = json.loads(eph_options) + except ValueError: + logger.warning("Could not parse EPH Options provided as a string. \ + Using default EPH Options. Expecting JSON format.", exc_info=True) + eph_options = None + + if eph_options and isinstance(eph_options, dict): + # TODO: Submit a PR to EventHub SDK to handle serialized configuration. 
+ typed_ephoptions = EPHOptions() + try: + typed_ephoptions.max_batch_size = int(eph_options.get('max_batch_size', 10)) + typed_ephoptions.prefetch_count = int(eph_options.get('prefetch_count', 300)) + typed_ephoptions.receive_timeout = int(eph_options.get('receive_timeout', 60)) + keep_alive = eph_options.get('keep_alive_interval', None) + if keep_alive is not None: + keep_alive = int(keep_alive) + typed_ephoptions.keep_alive_interval = keep_alive + typed_ephoptions.initial_offset_provider = eph_options.get('initial_offset_provider') + typed_ephoptions.debug_trace = bool(eph_options.get('debug_trace')) + typed_ephoptions.http_proxy = eph_options.get('http_proxy') + typed_ephoptions.auto_reconnect_on_error = bool(eph_options.get('auto_reconnect_on_error')) + return typed_ephoptions + except (TypeError, ValueError): + logger.warning("Could not parse EPH Options. Using default EPH Options.", exc_info=True) + eph_options = False + + if eph_options and isinstance(eph_options, EPHOptions): + return eph_options + + logger.info("Using default EPH Options.") + typed_ephoptions = EPHOptions() + typed_ephoptions.debug_trace = user_config.get("EVENT_HUB_DEBUG", False) == "True" + logger.debug("EPH Options Debug and Trace set to: {}".format(typed_ephoptions.debug_trace)) + return typed_ephoptions + def start_receiving(self, on_message_received_callback): # pragma: no cover self.loop = asyncio.get_event_loop() try: diff --git a/agogosml/agogosml/common/kafka_streaming_client.py b/agogosml/agogosml/common/kafka_streaming_client.py index 794e39a9..f506c66f 100644 --- a/agogosml/agogosml/common/kafka_streaming_client.py +++ b/agogosml/agogosml/common/kafka_streaming_client.py @@ -1,5 +1,6 @@ """Kafka streaming client.""" +import json import signal from datetime import datetime @@ -24,6 +25,8 @@ def __init__(self, config): # pragma: no cover KAFKA_ADDRESS KAFKA_CONSUMER_GROUP KAFKA_TOPIC + KAFKA_CONFIG: str|dict + KAFKA_DEBUG: str TIMEOUT EVENTHUB_KAFKA_CONNECTION_STRING """ @@ 
-44,7 +47,7 @@ def __init__(self, config): # pragma: no cover else: self.timeout = None - kafka_config = self.create_kafka_config(config) + kafka_config = self.create_kafka_config(config, self.logger) self.admin = admin.AdminClient(kafka_config) if config.get("KAFKA_CONSUMER_GROUP") is None: @@ -58,13 +61,25 @@ def __init__(self, config): # pragma: no cover signal.signal(signal.SIGTERM, self.exit_gracefully) @staticmethod - def create_kafka_config(user_config: dict) -> dict: # pragma: no cover + def create_kafka_config(user_config: dict, logger: Logger) -> dict: """Create the kafka configuration.""" + if not user_config.get("KAFKA_ADDRESS"): + raise ValueError("KAFKA_ADDRESS is not set in the config object.") + + base_config = user_config.get("KAFKA_CONFIG") or {} + if base_config and isinstance(base_config, str): + try: + base_config = json.loads(base_config) + except ValueError: + logger.warning("Could not parse Kafka Config provided as a string. Expecting JSON format.") + base_config = {} + config = { + **base_config, "bootstrap.servers": user_config.get("KAFKA_ADDRESS"), "enable.auto.commit": False, "auto.offset.reset": "latest", - "default.topic.config": {'auto.offset.reset': 'latest'}, + "default.topic.config": {'auto.offset.reset': 'latest'} } if user_config.get('KAFKA_CONSUMER_GROUP') is not None: @@ -72,8 +87,9 @@ def create_kafka_config(user_config: dict) -> dict: # pragma: no cover if user_config.get('KAFKA_DEBUG') is not None: config['debug'] = user_config['KAFKA_DEBUG'] + logger.debug("Kafka Debug set to: {}".format(config['debug'])) - if user_config.get('EVENTHUB_KAFKA_CONNECTION_STRING'): + if user_config.get('EVENTHUB_KAFKA_CONNECTION_STRING') is not None: ssl_location = user_config.get('SSL_CERT_LOCATION') or '/etc/ssl/certs/ca-certificates.crt' kakfa_config = { 'security.protocol': "SASL_SSL", diff --git a/agogosml/agogosml/utils/logger.py b/agogosml/agogosml/utils/logger.py index a06bb8a8..06902a03 100644 --- a/agogosml/agogosml/utils/logger.py 
+++ b/agogosml/agogosml/utils/logger.py @@ -65,17 +65,21 @@ def _telemetry(self) -> TelemetryClient: channel = TelemetryChannel(context, queue) return TelemetryClient(ikey, telemetry_channel=channel) - def debug(self, message: str, *args): + def debug(self, message: str, *args, **kwargs): """Log debug message.""" - self._log(logging.DEBUG, message, *args) + self._log(logging.DEBUG, message, *args, **kwargs) - def info(self, message: str, *args): + def info(self, message: str, *args, **kwargs): """Log info message.""" - self._log(logging.INFO, message, *args) + self._log(logging.INFO, message, *args, **kwargs) - def error(self, message: str, *args): + def warning(self, message: str, *args, **kwargs): + """Log warning message.""" + self._log(logging.WARNING, message, *args, **kwargs) + + def error(self, message: str, *args, **kwargs): """Log error message.""" - self._log(logging.ERROR, message, *args) + self._log(logging.ERROR, message, *args, **kwargs) def event(self, name: str, props: Optional[Dict[str, str]] = None): """Log an event.""" @@ -83,11 +87,11 @@ def event(self, name: str, props: Optional[Dict[str, str]] = None): self._logger.info('Event %s: %r', name, props) self._telemetry.track_event(name, props) - def _log(self, level: int, message: str, *args): + def _log(self, level: int, message: str, *args, **kwargs): """Log a message.""" if not self._logger.isEnabledFor(level): return - self._logger.log(level, message, *args) + self._logger.log(level, message, *args, **kwargs) self._telemetry.track_trace( message, severity=logging.getLevelName(level)) diff --git a/agogosml/tests/test_eventhub_streaming_client.py b/agogosml/tests/test_eventhub_streaming_client.py new file mode 100644 index 00000000..b1de0b9e --- /dev/null +++ b/agogosml/tests/test_eventhub_streaming_client.py @@ -0,0 +1,106 @@ +"""Event Hub Streaming Class Unit Tests""" + +from unittest.mock import patch + +from azure.eventprocessorhost import EPHOptions +from 
agogosml.common.eventhub_streaming_client import EventHubStreamingClient + + +@patch('agogosml.utils.logger.Logger') +def test_create_eventhub_eph_options(mock_logger): + """Test the create eventhub eph options method.""" + + # Pass in empty config + eph_options = EventHubStreamingClient.create_eventhub_eph_options({}, mock_logger) + + # Assert default config is returned. + assert isinstance(eph_options, EPHOptions) + assert not eph_options.debug_trace + + # Pass in null config + config = { + 'EVENT_HUB_EPH_OPTIONS': None + } + eph_options = EventHubStreamingClient.create_eventhub_eph_options(config, mock_logger) + # Assert default config is returned. + assert isinstance(eph_options, EPHOptions) + assert not eph_options.debug_trace + + # Pass in invalid string + config = { + 'EVENT_HUB_EPH_OPTIONS': 'invalid' + } + eph_options = EventHubStreamingClient.create_eventhub_eph_options(config, mock_logger) + # Assert default config is returned. + assert mock_logger.warning.called + assert isinstance(eph_options, EPHOptions) + assert not eph_options.debug_trace + + mock_logger.reset_mock() + + # Pass in invalid string and valid debug variable + config = { + 'EVENT_HUB_EPH_OPTIONS': 'invalid', + 'EVENT_HUB_DEBUG': 'True' + } + eph_options = EventHubStreamingClient.create_eventhub_eph_options(config, mock_logger) + # Assert default config is returned. + assert mock_logger.warning.called + assert isinstance(eph_options, EPHOptions) + assert eph_options.debug_trace + + mock_logger.reset_mock() + + # Pass in string and valid debug variable + config = { + 'EVENT_HUB_EPH_OPTIONS': '{"debug_trace": "True"}', + 'EVENT_HUB_DEBUG': 'False' + } + eph_options = EventHubStreamingClient.create_eventhub_eph_options(config, mock_logger) + # Assert default config is returned. 
+ assert not mock_logger.warning.called + assert isinstance(eph_options, EPHOptions) + # NOTE: EVENT_HUB_EPH_OPTIONS should overwrite EVENT_HUB_DEBUG + assert eph_options.debug_trace + + mock_logger.reset_mock() + + # Pass in valid string + config = { + 'EVENT_HUB_EPH_OPTIONS': '{"keep_alive_interval": "30"}' + } + eph_options = EventHubStreamingClient.create_eventhub_eph_options(config, mock_logger) + # Assert default config is returned. + assert not mock_logger.warning.called + assert isinstance(eph_options, EPHOptions) + assert eph_options.keep_alive_interval == 30 + + mock_logger.reset_mock() + + # Pass in invalid string + config = { + 'EVENT_HUB_EPH_OPTIONS': '{"keep_alive_interval": "True"}' + } + eph_options = EventHubStreamingClient.create_eventhub_eph_options(config, mock_logger) + # Assert default config is returned. + assert mock_logger.warning.called + assert isinstance(eph_options, EPHOptions) + assert eph_options.keep_alive_interval is None + + mock_logger.reset_mock() + + # Pass in custom EPH Options + eph_options = EPHOptions() + eph_options.keep_alive_interval = "-1" + eph_options.debug_trace = True + config = { + 'EVENT_HUB_EPH_OPTIONS': eph_options, + 'EVENT_HUB_DEBUG': 'False' + } + eph_options = EventHubStreamingClient.create_eventhub_eph_options(config, mock_logger) + # Assert default config is returned. 
+ assert not mock_logger.warning.called + assert isinstance(eph_options, EPHOptions) + # NOTE: EVENT_HUB_EPH_OPTIONS should overwrite EVENT_HUB_DEBUG + assert eph_options.keep_alive_interval == "-1" + assert eph_options.debug_trace diff --git a/agogosml/tests/test_kafka_streaming_client.py b/agogosml/tests/test_kafka_streaming_client.py new file mode 100644 index 00000000..3f0aff43 --- /dev/null +++ b/agogosml/tests/test_kafka_streaming_client.py @@ -0,0 +1,122 @@ +"""Kafka Streaming Client Unit Tests""" + +from unittest.mock import patch + +import pytest + +from agogosml.common.kafka_streaming_client import KafkaStreamingClient + + +@patch('agogosml.utils.logger.Logger') +def test_create_kafka_config(mock_logger): + """Test the create kafka config method.""" + + with pytest.raises(ValueError): + kafka_config = KafkaStreamingClient.create_kafka_config({}, mock_logger) + + # Pass in base config + user_config = { + 'KAFKA_ADDRESS': 'localhost' + } + kafka_config = KafkaStreamingClient.create_kafka_config(user_config, mock_logger) + # Should return default config. + assert not kafka_config['enable.auto.commit'] + + # Pass in invalid user config + user_config = { + 'KAFKA_ADDRESS': 'localhost', + 'KAFKA_CONFIG': None + } + kafka_config = KafkaStreamingClient.create_kafka_config(user_config, mock_logger) + # Should return default config. + assert not kafka_config['enable.auto.commit'] + + # Pass in invalid user config + user_config = { + 'KAFKA_ADDRESS': 'localhost', + 'KAFKA_CONFIG': 'invalid_string' + } + kafka_config = KafkaStreamingClient.create_kafka_config(user_config, mock_logger) + # Should return default config. 
+ assert mock_logger.warning.called + assert not kafka_config['enable.auto.commit'] + + mock_logger.reset_mock() + + # Pass in empty user config + user_config = { + 'KAFKA_ADDRESS': 'localhost', + 'KAFKA_CONFIG': '{}' + } + kafka_config = KafkaStreamingClient.create_kafka_config(user_config, mock_logger) + assert not mock_logger.warning.called + assert not kafka_config['enable.auto.commit'] + + mock_logger.reset_mock() + + # Pass in valid user config + user_config = { + 'KAFKA_ADDRESS': 'localhost', + 'KAFKA_CONFIG': {"client.id": "test"} + } + kafka_config = KafkaStreamingClient.create_kafka_config(user_config, mock_logger) + assert not mock_logger.warning.called + assert not kafka_config['enable.auto.commit'] + assert kafka_config['client.id'] == 'test' + + mock_logger.reset_mock() + + # Pass in valid user config + user_config = { + 'KAFKA_ADDRESS': 'localhost', + 'KAFKA_CONFIG': '{"client.id": "test"}' + } + kafka_config = KafkaStreamingClient.create_kafka_config(user_config, mock_logger) + assert not mock_logger.warning.called + assert not kafka_config['enable.auto.commit'] + assert kafka_config['client.id'] == 'test' + + mock_logger.reset_mock() + + # Pass in valid user config + user_config = { + 'KAFKA_ADDRESS': 'localhost', + 'KAFKA_CONFIG': '{"enabled.auto.commit": true}' + } + kafka_config = KafkaStreamingClient.create_kafka_config(user_config, mock_logger) + # Should squash 'enabled.auto.commit' to library setting. + assert not mock_logger.warning.called + assert not kafka_config['enable.auto.commit'] + + mock_logger.reset_mock() + + # Pass in valid user config + user_config = { + 'KAFKA_ADDRESS': 'localhost', + 'KAFKA_CONFIG': '{"enabled.auto.commit": true}', + 'KAFKA_CONSUMER_GROUP': 'helloworld', + 'KAFKA_DEBUG': 'consumer' + } + kafka_config = KafkaStreamingClient.create_kafka_config(user_config, mock_logger) + # Should squash 'enabled.auto.commit' to library setting. 
+ assert not mock_logger.warning.called + assert mock_logger.debug.called + assert not kafka_config['enable.auto.commit'] + assert kafka_config['group.id'] == 'helloworld' + assert kafka_config['debug'] == 'consumer' + + mock_logger.reset_mock() + + # Pass in valid user config + user_config = { + 'KAFKA_ADDRESS': 'localhost', + 'EVENTHUB_KAFKA_CONNECTION_STRING': 'localhost' + } + kafka_config = KafkaStreamingClient.create_kafka_config(user_config, mock_logger) + # Should squash 'enabled.auto.commit' to library setting. + assert not mock_logger.warning.called + assert not mock_logger.debug.called + assert not kafka_config['enable.auto.commit'] + assert kafka_config['client.id'] == 'agogosml' + + mock_logger.reset_mock()