Skip to content
This repository was archived by the owner on Nov 16, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 47 additions & 4 deletions agogosml/agogosml/common/eventhub_streaming_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Event Hub streaming client."""

import asyncio
import json
import signal
from typing import Optional

Expand Down Expand Up @@ -97,6 +98,8 @@ def __init__(self, config): # pragma: no cover
EVENT_HUB_NAMESPACE
EVENT_HUB_SAS_KEY
EVENT_HUB_SAS_POLICY
EVENT_HUB_EPH_OPTIONS: str|dict|instanceof(EPHOptions) <- Accepts JSON str
EVENT_HUB_DEBUG <- Can be overwritten by EPH Options.
LEASE_CONTAINER_NAME
TIMEOUT

Expand Down Expand Up @@ -126,10 +129,9 @@ def __init__(self, config): # pragma: no cover
policy=user,
sas_key=key,
consumer_group=consumer_group)
self.eh_options = EPHOptions()
self.eh_options.release_pump_on_timeout = True
self.eh_options.auto_reconnect_on_error = False
self.eh_options.debug_trace = False

self.eh_options = self.create_eventhub_eph_options(config, self.logger)

self.storage_manager = AzureStorageCheckpointLeaseManager(
storage_account_name, storage_key,
lease_container_name)
Expand All @@ -153,6 +155,47 @@ def __init__(self, config): # pragma: no cover
self.logger.error('Failed to init EH send client: %s', ex)
raise

@staticmethod
def create_eventhub_eph_options(user_config: dict, logger: Logger) -> dict:
"""Create the Event Hub EPH Options class."""
eph_options = user_config.get("EVENT_HUB_EPH_OPTIONS")
if eph_options and isinstance(eph_options, str):
try:
eph_options = json.loads(eph_options)
except ValueError:
logger.warning("Could not parse EPH Options provided as a string. \
Using default EPH Options. Expecting JSON format.", exc_info=True)
eph_options = None

if eph_options and isinstance(eph_options, dict):
# TODO: Submit a PR to EventHub SDK to handle serialized configuration.
typed_ephoptions = EPHOptions()
try:
typed_ephoptions.max_batch_size = int(eph_options.get('max_batch_size', 10))
typed_ephoptions.prefetch_count = int(eph_options.get('prefetch_count', 300))
typed_ephoptions.receive_timeout = int(eph_options.get('receive_timeout', 60))
keep_alive = eph_options.get('keep_alive_interval', None)
if keep_alive is not None:
keep_alive = int(keep_alive)
typed_ephoptions.keep_alive_interval = keep_alive
typed_ephoptions.initial_offset_provider = eph_options.get('initial_offset_provider')
typed_ephoptions.debug_trace = bool(eph_options.get('debug_trace'))
typed_ephoptions.http_proxy = eph_options.get('http_proxy')
typed_ephoptions.auto_reconnect_on_error = bool(eph_options.get('auto_reconnect_on_error'))
return typed_ephoptions
except (TypeError, ValueError):
logger.warning("Could not parse EPH Options. Using default EPH Options.", exc_info=True)
eph_options = False

if eph_options and isinstance(eph_options, EPHOptions):
return eph_options

logger.info("Using default EPH Options.")
typed_ephoptions = EPHOptions()
typed_ephoptions.debug_trace = user_config.get("EVENT_HUB_DEBUG", False) == "True"
logger.debug("EPH Options Debug and Trace set to: {}".format(typed_ephoptions.debug_trace))
return typed_ephoptions

def start_receiving(self, on_message_received_callback): # pragma: no cover
self.loop = asyncio.get_event_loop()
try:
Expand Down
24 changes: 20 additions & 4 deletions agogosml/agogosml/common/kafka_streaming_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Kafka streaming client."""

import json
import signal
from datetime import datetime

Expand All @@ -24,6 +25,8 @@ def __init__(self, config): # pragma: no cover
KAFKA_ADDRESS
KAFKA_CONSUMER_GROUP
KAFKA_TOPIC
KAFKA_CONFIG: str|dict
KAFKA_DEBUB: str
TIMEOUT
EVENTHUB_KAFKA_CONNECTION_STRING
"""
Expand All @@ -44,7 +47,7 @@ def __init__(self, config): # pragma: no cover
else:
self.timeout = None

kafka_config = self.create_kafka_config(config)
kafka_config = self.create_kafka_config(config, self.logger)
self.admin = admin.AdminClient(kafka_config)

if config.get("KAFKA_CONSUMER_GROUP") is None:
Expand All @@ -58,22 +61,35 @@ def __init__(self, config): # pragma: no cover
signal.signal(signal.SIGTERM, self.exit_gracefully)

@staticmethod
def create_kafka_config(user_config: dict) -> dict: # pragma: no cover
def create_kafka_config(user_config: dict, logger: Logger) -> dict:
"""Create the kafka configuration."""
if not user_config.get("KAFKA_ADDRESS"):
raise ValueError("KAFKA_ADDRESS is not set in the config object.")

base_config = user_config.get("KAFKA_CONFIG") or {}
if base_config and isinstance(base_config, str):
try:
base_config = json.loads(base_config)
except ValueError:
logger.warning("Could not parse Kafka Config provided as a string. Expecting JSON format.")
base_config = {}

config = {
**base_config,
"bootstrap.servers": user_config.get("KAFKA_ADDRESS"),
"enable.auto.commit": False,
"auto.offset.reset": "latest",
"default.topic.config": {'auto.offset.reset': 'latest'},
"default.topic.config": {'auto.offset.reset': 'latest'}
}

if user_config.get('KAFKA_CONSUMER_GROUP') is not None:
config['group.id'] = user_config['KAFKA_CONSUMER_GROUP']

if user_config.get('KAFKA_DEBUG') is not None:
config['debug'] = user_config['KAFKA_DEBUG']
logger.debug("Kafka Debug set to: {}".format(config['debug']))

if user_config.get('EVENTHUB_KAFKA_CONNECTION_STRING'):
if user_config.get('EVENTHUB_KAFKA_CONNECTION_STRING') is not None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here we have to ensure that user_config.get('EVENTHUB_KAFKA_CONNECTION_STRING') won't return an empty string if it's not set

ssl_location = user_config.get('SSL_CERT_LOCATION') or '/etc/ssl/certs/ca-certificates.crt'
kakfa_config = {
'security.protocol': "SASL_SSL",
Expand Down
20 changes: 12 additions & 8 deletions agogosml/agogosml/utils/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,29 +65,33 @@ def _telemetry(self) -> TelemetryClient:
channel = TelemetryChannel(context, queue)
return TelemetryClient(ikey, telemetry_channel=channel)

def debug(self, message: str, *args):
def debug(self, message: str, *args, **kwargs):
"""Log debug message."""
self._log(logging.DEBUG, message, *args)
self._log(logging.DEBUG, message, *args, **kwargs)

def info(self, message: str, *args):
def info(self, message: str, *args, **kwargs):
"""Log info message."""
self._log(logging.INFO, message, *args)
self._log(logging.INFO, message, *args, **kwargs)

def error(self, message: str, *args):
def warning(self, message: str, *args, **kwargs):
"""Log warning message."""
self._log(logging.WARNING, message, *args, **kwargs)

def error(self, message: str, *args, **kwargs):
"""Log error message."""
self._log(logging.ERROR, message, *args)
self._log(logging.ERROR, message, *args, **kwargs)

def event(self, name: str, props: Optional[Dict[str, str]] = None):
"""Log an event."""
props = props or {}
self._logger.info('Event %s: %r', name, props)
self._telemetry.track_event(name, props)

def _log(self, level: int, message: str, *args):
def _log(self, level: int, message: str, *args, **kwargs):
"""Log a message."""
if not self._logger.isEnabledFor(level):
return

self._logger.log(level, message, *args)
self._logger.log(level, message, *args, **kwargs)
self._telemetry.track_trace(
message, severity=logging.getLevelName(level))
106 changes: 106 additions & 0 deletions agogosml/tests/test_eventhub_streaming_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
"""Event Hub Streaming Class Unit Tests"""

from unittest.mock import patch

from azure.eventprocessorhost import EPHOptions
from agogosml.common.eventhub_streaming_client import EventHubStreamingClient


@patch('agogosml.utils.logger.Logger')
def test_create_eventhub_eph_options(mock_logger):
"""Test the create eventhub eph options method."""

# Pass in empty config
eph_options = EventHubStreamingClient.create_eventhub_eph_options({}, mock_logger)

# Assert default config is returned.
assert isinstance(eph_options, EPHOptions)
assert not eph_options.debug_trace

# Pass in null config
config = {
'EVENT_HUB_EPH_OPTIONS': None
}
eph_options = EventHubStreamingClient.create_eventhub_eph_options(config, mock_logger)
# Assert default config is returned.
assert isinstance(eph_options, EPHOptions)
assert not eph_options.debug_trace

# Pass in invalid string
config = {
'EVENT_HUB_EPH_OPTIONS': 'invalid'
}
eph_options = EventHubStreamingClient.create_eventhub_eph_options(config, mock_logger)
# Assert default config is returned.
assert mock_logger.warning.called
assert isinstance(eph_options, EPHOptions)
assert not eph_options.debug_trace

mock_logger.reset_mock()

# Pass in invalid string and valid debug variable
config = {
'EVENT_HUB_EPH_OPTIONS': 'invalid',
'EVENT_HUB_DEBUG': 'True'
}
eph_options = EventHubStreamingClient.create_eventhub_eph_options(config, mock_logger)
# Assert default config is returned.
assert mock_logger.warning.called
assert isinstance(eph_options, EPHOptions)
assert eph_options.debug_trace

mock_logger.reset_mock()

# Pass in string and valid debug variable
config = {
'EVENT_HUB_EPH_OPTIONS': '{"debug_trace": "True"}',
'EVENT_HUB_DEBUG': 'False'
}
eph_options = EventHubStreamingClient.create_eventhub_eph_options(config, mock_logger)
# Assert default config is returned.
assert not mock_logger.warning.called
assert isinstance(eph_options, EPHOptions)
# NOTE: EVENT_HUB_EPH_OPTIONS should overwrite EVENT_HUB_DEBUG
assert eph_options.debug_trace

mock_logger.reset_mock()

# Pass in valid string
config = {
'EVENT_HUB_EPH_OPTIONS': '{"keep_alive_interval": "30"}'
}
eph_options = EventHubStreamingClient.create_eventhub_eph_options(config, mock_logger)
# Assert default config is returned.
assert not mock_logger.warning.called
assert isinstance(eph_options, EPHOptions)
assert eph_options.keep_alive_interval == 30

mock_logger.reset_mock()

# Pass in invalid string
config = {
'EVENT_HUB_EPH_OPTIONS': '{"keep_alive_interval": "True"}'
}
eph_options = EventHubStreamingClient.create_eventhub_eph_options(config, mock_logger)
# Assert default config is returned.
assert mock_logger.warning.called
assert isinstance(eph_options, EPHOptions)
assert eph_options.keep_alive_interval is None

mock_logger.reset_mock()

# Pass in custom EPH Options
eph_options = EPHOptions()
eph_options.keep_alive_interval = "-1"
eph_options.debug_trace = True
config = {
'EVENT_HUB_EPH_OPTIONS': eph_options,
'EVENT_HUB_DEBUG': 'False'
}
eph_options = EventHubStreamingClient.create_eventhub_eph_options(config, mock_logger)
# Assert default config is returned.
assert not mock_logger.warning.called
assert isinstance(eph_options, EPHOptions)
# NOTE: EVENT_HUB_EPH_OPTIONS should overwrite EVENT_HUB_DEBUG
assert eph_options.keep_alive_interval == "-1"
assert eph_options.debug_trace
Loading