From b259b3fb97c64d91ac2a155107943b773bf4a3aa Mon Sep 17 00:00:00 2001 From: Jack Harper Date: Mon, 2 Dec 2024 17:27:12 +0000 Subject: [PATCH 01/12] use fc00 and f144 instead of rf5k and f142 respectively --- BlockServerToKafka/forwarder_config.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/BlockServerToKafka/forwarder_config.py b/BlockServerToKafka/forwarder_config.py index 0e059f57..eb96b8bc 100644 --- a/BlockServerToKafka/forwarder_config.py +++ b/BlockServerToKafka/forwarder_config.py @@ -15,13 +15,13 @@ # http://opensource.org/licenses/eclipse-1.0.php from typing import List -from streaming_data_types.fbschemas.forwarder_config_update_rf5k.Protocol import ( +from streaming_data_types.fbschemas.forwarder_config_update_fc00.Protocol import ( Protocol, ) -from streaming_data_types.fbschemas.forwarder_config_update_rf5k.UpdateType import ( +from streaming_data_types.fbschemas.forwarder_config_update_fc00.UpdateType import ( UpdateType, ) -from streaming_data_types.forwarder_config_update_rf5k import StreamInfo, serialise_rf5k +from streaming_data_types.forwarder_config_update_fc00 import StreamInfo, serialise_fc00 class ForwarderConfig: @@ -29,7 +29,7 @@ class ForwarderConfig: Class that converts the pv information to a forwarder config message payload """ - def __init__(self, topic: str, epics_protocol: Protocol = Protocol.CA, schema: str = "f142"): + def __init__(self, topic: str, epics_protocol: Protocol = Protocol.CA, schema: str = "f144"): self.schema = schema self.topic = topic self.epics_protocol = epics_protocol From 6db5145a6cb0efc127acd02aaf256275a3c6e319 Mon Sep 17 00:00:00 2001 From: Jack Harper Date: Mon, 2 Dec 2024 17:32:25 +0000 Subject: [PATCH 02/12] Update forwarder_config.py --- BlockServerToKafka/forwarder_config.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/BlockServerToKafka/forwarder_config.py b/BlockServerToKafka/forwarder_config.py index eb96b8bc..0a558830 100644 --- a/BlockServerToKafka/forwarder_config.py +++ b/BlockServerToKafka/forwarder_config.py @@ -38,11 +38,11 @@ def _create_streams(self, pvs: List[str]) -> List[StreamInfo]: return [StreamInfo(pv, self.schema, self.topic, self.epics_protocol) for pv in pvs] def create_forwarder_configuration(self, pvs: List[str]) -> bytes: - return serialise_rf5k(UpdateType.ADD, self._create_streams(pvs)) + return serialise_fc00(UpdateType.ADD, self._create_streams(pvs)) def remove_forwarder_configuration(self, pvs: List[str]) -> bytes: - return serialise_rf5k(UpdateType.REMOVE, self._create_streams(pvs)) + return serialise_fc00(UpdateType.REMOVE, self._create_streams(pvs)) @staticmethod def remove_all_forwarder_configuration() -> bytes: - return serialise_rf5k(UpdateType.REMOVEALL, []) + return serialise_fc00(UpdateType.REMOVEALL, []) From f6440d6ba466af3ac5507415209354cd5a43efe1 Mon Sep 17 00:00:00 2001 From: Jack Harper Date: Mon, 2 Dec 2024 17:33:40 +0000 Subject: [PATCH 03/12] Update forwarder_config.py --- BlockServerToKafka/forwarder_config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/BlockServerToKafka/forwarder_config.py b/BlockServerToKafka/forwarder_config.py index 0a558830..d2243e38 100644 --- a/BlockServerToKafka/forwarder_config.py +++ b/BlockServerToKafka/forwarder_config.py @@ -29,7 +29,7 @@ class ForwarderConfig: Class that converts the pv information to a forwarder config message payload """ - def __init__(self, topic: str, epics_protocol: Protocol = Protocol.CA, schema: str = "f144"): + def __init__(self, topic: str, epics_protocol: Protocol = Protocol.CA, schema: str = "f144") -> None: self.schema = schema self.topic = topic self.epics_protocol = epics_protocol From f19f1dcc8f81b49ce0ddfc2f1bbf7b2d821334d8 Mon Sep 17 00:00:00 2001 From: Jack Harper Date: Fri, 6 Dec 2024 09:01:28 +0000 Subject: [PATCH 04/12] format, add test files --- BlockServerToKafka/forwarder_config.py | 4 ++- BlockServerToKafka/kafka_producer.py | 2 +- .../test_modules/test_forwarder_config.py | 30 +++++++++---------- 3 files changed, 19 insertions(+), 17 deletions(-) diff --git a/BlockServerToKafka/forwarder_config.py b/BlockServerToKafka/forwarder_config.py index d2243e38..0bc26698 100644 --- a/BlockServerToKafka/forwarder_config.py +++ b/BlockServerToKafka/forwarder_config.py @@ -29,7 +29,9 @@ class ForwarderConfig: Class that converts the pv information to a forwarder config message payload """ - def __init__(self, topic: str, epics_protocol: Protocol = Protocol.CA, schema: str = "f144") -> None: + def __init__( + self, topic: str, epics_protocol: Protocol = Protocol.CA, schema: str = "f144" + ) -> None: self.schema = schema self.topic = topic self.epics_protocol = epics_protocol diff --git a/BlockServerToKafka/kafka_producer.py b/BlockServerToKafka/kafka_producer.py index d2cab5d3..40d5c618 100644 --- a/BlockServerToKafka/kafka_producer.py +++ b/BlockServerToKafka/kafka_producer.py @@ -17,7 +17,7 @@ from typing import List from kafka import KafkaConsumer, KafkaProducer, errors -from streaming_data_types.fbschemas.forwarder_config_update_rf5k.Protocol import ( +from streaming_data_types.fbschemas.forwarder_config_update_fc00.Protocol import ( Protocol, ) diff --git a/BlockServerToKafka/test_modules/test_forwarder_config.py b/BlockServerToKafka/test_modules/test_forwarder_config.py index 12db74c8..3ec02e2e 100644 --- a/BlockServerToKafka/test_modules/test_forwarder_config.py +++ b/BlockServerToKafka/test_modules/test_forwarder_config.py @@ -15,11 +15,11 @@ # http://opensource.org/licenses/eclipse-1.0.php import unittest -from streaming_data_types.fbschemas.forwarder_config_update_rf5k.Protocol import ( +from streaming_data_types.fbschemas.forwarder_config_update_fc00.Protocol import ( Protocol, ) -from streaming_data_types.fbschemas.forwarder_config_update_rf5k.UpdateType import UpdateType -from streaming_data_types.forwarder_config_update_rf5k import deserialise_rf5k +from streaming_data_types.fbschemas.forwarder_config_update_fc00.UpdateType import UpdateType +from streaming_data_types.forwarder_config_update_fc00 import deserialise_fc00 from BlockServerToKafka.forwarder_config import ForwarderConfig @@ -33,7 +33,7 @@ class TestForwarderConfig(unittest.TestCase): @staticmethod def is_flatbuffers(payload): try: - deserialise_rf5k(payload) + deserialise_fc00(payload) except ValueError: return False return True @@ -51,24 +51,24 @@ def test_WHEN_new_forwarder_config_created_THEN_returns_configuration_update_con self, ): raw_output = self.kafka_forwarder.create_forwarder_configuration(self.config_with_one_block) - output = deserialise_rf5k(raw_output) + output = deserialise_fc00(raw_output) self.assertEqual(output.config_change, UpdateType.ADD) def test_WHEN_forwarder_config_removed_THEN_output_has_correct_command_type(self): raw_output = self.kafka_forwarder.remove_forwarder_configuration(self.config_with_one_block) - output = deserialise_rf5k(raw_output) + output = deserialise_fc00(raw_output) self.assertEqual(output.config_change, UpdateType.REMOVE) def test_WHEN_all_pvs_removed_THEN_output_has_correct_command_type(self): raw_output = self.kafka_forwarder.remove_all_forwarder_configuration() - output = deserialise_rf5k(raw_output) + output = deserialise_fc00(raw_output) self.assertEqual(output.config_change, UpdateType.REMOVEALL) def test_WHEN_new_forwarder_config_created_THEN_returns_flatbuffer_containing_streams_with_channels_and_converters( self, ): raw_output = self.kafka_forwarder.create_forwarder_configuration(self.config_with_one_block) - output = deserialise_rf5k(raw_output) + output = deserialise_fc00(raw_output) self.assertNotEqual(0, len(output[1])) for stream in output[1]: self.assertEqual(self.test_block_1, stream.channel) @@ -80,7 +80,7 @@ def test_GIVEN_using_version_4_WHEN_new_forwarder_config_created_THEN_returns_JS ): kafka_version_4 = ForwarderConfig(epics_protocol=Protocol.PVA, topic=self.test_topic) raw_output = kafka_version_4.create_forwarder_configuration(self.config_with_one_block) - output = deserialise_rf5k(raw_output) + output = deserialise_fc00(raw_output) self.assertNotEqual(0, len(output[1])) for stream in output[1]: self.assertEqual(stream.protocol, Protocol.PVA) @@ -89,7 +89,7 @@ def test_GIVEN_configuration_with_one_block_WHEN_new_forwarder_config_created_TH self, ): raw_output = self.kafka_forwarder.create_forwarder_configuration(self.config_with_one_block) - output = deserialise_rf5k(raw_output) + output = deserialise_fc00(raw_output) self.assertEqual(1, len(output[1])) def test_GIVEN_configuration_with_two_block_WHEN_new_forwarder_config_created_THEN_returns_JSON_containing_two_stream( @@ -98,14 +98,14 @@ def test_GIVEN_configuration_with_two_block_WHEN_new_forwarder_config_created_TH raw_output = self.kafka_forwarder.create_forwarder_configuration( self.config_with_two_blocks ) - output = deserialise_rf5k(raw_output) + output = deserialise_fc00(raw_output) self.assertEqual(2, len(output[1])) def test_GIVEN_configuration_with_one_block_WHEN_new_forwarder_config_created_THEN_returns_block_pv_string( self, ): raw_output = self.kafka_forwarder.create_forwarder_configuration(self.config_with_one_block) - output = deserialise_rf5k(raw_output) + output = deserialise_fc00(raw_output) stream = output[1][0] self.assertEqual(self.test_block_1, stream.channel) @@ -115,7 +115,7 @@ def test_GIVEN_configuration_with_two_blocks_WHEN_new_forwarder_config_created_T raw_output = self.kafka_forwarder.create_forwarder_configuration( self.config_with_two_blocks ) - output = deserialise_rf5k(raw_output) + output = deserialise_fc00(raw_output) for blk in [self.test_block_1, self.test_block_2]: self.assertTrue(blk in [stream.channel for stream in output[1]]) @@ -127,7 +127,7 @@ def test_GIVEN_configuration_with_one_block_WHEN_removed_old_forwarder_THEN_retu self, ): raw_output = self.kafka_forwarder.remove_forwarder_configuration(self.config_with_one_block) - output = deserialise_rf5k(raw_output) + output = deserialise_fc00(raw_output) self.assertEqual(self.test_block_1, output[1][0].channel) def test_GIVEN_configuration_with_two_blocks_WHEN_removed_old_forwarder_THEN_returns_JSON_containing_both_block_pv_string( @@ -136,6 +136,6 @@ def test_GIVEN_configuration_with_two_blocks_WHEN_removed_old_forwarder_THEN_ret raw_output = self.kafka_forwarder.remove_forwarder_configuration( self.config_with_two_blocks ) - output = deserialise_rf5k(raw_output) + output = deserialise_fc00(raw_output) for blk in [self.test_block_1, self.test_block_2]: self.assertTrue(blk in [stream.channel for stream in output[1]]) From 0bddee813b96b6c7a0506be0aa44d414d42130cd Mon Sep 17 00:00:00 2001 From: Jack Harper Date: Fri, 6 Dec 2024 09:08:45 +0000 Subject: [PATCH 05/12] fix tests and sort ruff out --- BlockServerToKafka/block_server_monitor.py | 18 ++++++++++-------- BlockServerToKafka/forwarder_config.py | 2 +- BlockServerToKafka/kafka_producer.py | 15 +++++++-------- .../test_modules/test_block_server_monitor.py | 3 +-- .../test_modules/test_forwarder_config.py | 3 +-- 5 files changed, 20 insertions(+), 21 deletions(-) diff --git a/BlockServerToKafka/block_server_monitor.py b/BlockServerToKafka/block_server_monitor.py index a4874496..a5672a8c 100644 --- a/BlockServerToKafka/block_server_monitor.py +++ b/BlockServerToKafka/block_server_monitor.py @@ -15,11 +15,12 @@ # http://opensource.org/licenses/eclipse-1.0.php import json from threading import RLock +from typing import Any import ca -from CaChannel import CaChannel, CaChannelException - from BlockServer.core.macros import BLOCK_PREFIX +from BlockServerToKafka.kafka_producer import ProducerWrapper +from CaChannel import CaChannel, CaChannelException from server_common.utilities import dehex_and_decompress, print_and_log @@ -30,7 +31,7 @@ class BlockServerMonitor: Uses a Channel Access Monitor. """ - def __init__(self, address, pvprefix, producer): + def __init__(self, address: str, pvprefix: str, producer: ProducerWrapper) -> None: self.PVPREFIX = pvprefix self.address = address self.channel = CaChannel() @@ -53,7 +54,7 @@ def __init__(self, address, pvprefix, producer): ) self.channel.pend_event() - def block_name_to_pv_name(self, blk): + def block_name_to_pv_name(self, blk: str) -> str: """ Converts a block name to a PV name by adding the prefixes. @@ -66,11 +67,12 @@ def block_name_to_pv_name(self, blk): return f"{self.PVPREFIX}{BLOCK_PREFIX}{blk}" @staticmethod - def convert_to_string(pv_array): + def convert_to_string(pv_array: bytearray) -> str: """ Convert from byte array to string and remove null characters. - We cannot get the number of elements in the array so convert to bytes and remove the null characters. + We cannot get the number of elements in the array so convert to bytes and remove the + null characters. Args: pv_array (bytearray): The byte array of PVs. @@ -81,7 +83,7 @@ def convert_to_string(pv_array): return bytearray(pv_array).decode("utf-8").replace("\x00", "") - def update_config(self, blocks): + def update_config(self, blocks: list[str]) -> None: """ Updates the forwarder configuration to monitor the supplied blocks. @@ -99,7 +101,7 @@ def update_config(self, blocks): self.producer.add_config(pvs) self.last_pvs = pvs - def update(self, epics_args, user_args): + def update(self, epics_args: dict[str, bytearray], user_args: Any) -> None: # noqa: ANN401 """ Updates the kafka config when the blockserver changes. This is called from the monitor. diff --git a/BlockServerToKafka/forwarder_config.py b/BlockServerToKafka/forwarder_config.py index 0bc26698..8f4d6450 100644 --- a/BlockServerToKafka/forwarder_config.py +++ b/BlockServerToKafka/forwarder_config.py @@ -37,7 +37,7 @@ def __init__( self.epics_protocol = epics_protocol def _create_streams(self, pvs: List[str]) -> List[StreamInfo]: - return [StreamInfo(pv, self.schema, self.topic, self.epics_protocol) for pv in pvs] + return [StreamInfo(pv, self.schema, self.topic, self.epics_protocol, 0) for pv in pvs] def create_forwarder_configuration(self, pvs: List[str]) -> bytes: return serialise_fc00(UpdateType.ADD, self._create_streams(pvs)) diff --git a/BlockServerToKafka/kafka_producer.py b/BlockServerToKafka/kafka_producer.py index 40d5c618..a10233fc 100644 --- a/BlockServerToKafka/kafka_producer.py +++ b/BlockServerToKafka/kafka_producer.py @@ -16,14 +16,13 @@ from time import sleep from typing import List +from BlockServerToKafka.forwarder_config import ForwarderConfig from kafka import KafkaConsumer, KafkaProducer, errors +from server_common.utilities import print_and_log from streaming_data_types.fbschemas.forwarder_config_update_fc00.Protocol import ( Protocol, ) -from BlockServerToKafka.forwarder_config import ForwarderConfig -from server_common.utilities import print_and_log - class ProducerWrapper: """ @@ -36,12 +35,12 @@ def __init__( config_topic: str, data_topic: str, epics_protocol: Protocol = Protocol.CA, - ): + ) -> None: self.topic = config_topic self.converter = ForwarderConfig(data_topic, epics_protocol) self._set_up_producer(server) - def _set_up_producer(self, server: str): + def _set_up_producer(self, server: str) -> None: try: self.client = KafkaConsumer(bootstrap_servers=server) self.producer = KafkaProducer(bootstrap_servers=server) @@ -67,7 +66,7 @@ def _set_up_producer(self, server: str): # Recursive call after waiting self._set_up_producer(server) - def add_config(self, pvs: List[str]): + def add_config(self, pvs: List[str]) -> None: """ Create a forwarder configuration to add more pvs to be monitored. @@ -79,7 +78,7 @@ def add_config(self, pvs: List[str]): def topic_exists(self, topic_name: str) -> bool: return topic_name in self.client.topics() - def remove_config(self, pvs: List[str]): + def remove_config(self, pvs: List[str]) -> None: """ Create a forwarder configuration to remove pvs that are being monitored. @@ -88,7 +87,7 @@ def remove_config(self, pvs: List[str]): message_buffer = self.converter.remove_forwarder_configuration(pvs) self.producer.send(self.topic, message_buffer) - def stop_all_pvs(self): + def stop_all_pvs(self) -> None: """ Sends a stop_all command to the forwarder to clear all configuration. """ diff --git a/BlockServerToKafka/test_modules/test_block_server_monitor.py b/BlockServerToKafka/test_modules/test_block_server_monitor.py index 8caa68b4..7fbc52ca 100644 --- a/BlockServerToKafka/test_modules/test_block_server_monitor.py +++ b/BlockServerToKafka/test_modules/test_block_server_monitor.py @@ -16,9 +16,8 @@ import unittest -from mock import MagicMock, patch - from BlockServerToKafka.block_server_monitor import BlockServerMonitor +from mock import MagicMock, patch class TestBlockServerMonitor(unittest.TestCase): diff --git a/BlockServerToKafka/test_modules/test_forwarder_config.py b/BlockServerToKafka/test_modules/test_forwarder_config.py index 3ec02e2e..0b224349 100644 --- a/BlockServerToKafka/test_modules/test_forwarder_config.py +++ b/BlockServerToKafka/test_modules/test_forwarder_config.py @@ -15,14 +15,13 @@ # http://opensource.org/licenses/eclipse-1.0.php import unittest +from BlockServerToKafka.forwarder_config import ForwarderConfig from streaming_data_types.fbschemas.forwarder_config_update_fc00.Protocol import ( Protocol, ) from streaming_data_types.fbschemas.forwarder_config_update_fc00.UpdateType import UpdateType from streaming_data_types.forwarder_config_update_fc00 import deserialise_fc00 -from BlockServerToKafka.forwarder_config import ForwarderConfig - class TestForwarderConfig(unittest.TestCase): test_schema = "schema" From 6bfd9d79ac66c572056fb7e76c05aafed3618604 Mon Sep 17 00:00:00 2001 From: Jack Harper Date: Fri, 6 Dec 2024 09:16:13 +0000 Subject: [PATCH 06/12] ruff being weird --- .../test_modules/test_block_server_monitor.py | 13 +++++++++++-- .../test_modules/test_forwarder_config.py | 4 +++- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/BlockServerToKafka/test_modules/test_block_server_monitor.py b/BlockServerToKafka/test_modules/test_block_server_monitor.py index 7fbc52ca..c1155673 100644 --- a/BlockServerToKafka/test_modules/test_block_server_monitor.py +++ b/BlockServerToKafka/test_modules/test_block_server_monitor.py @@ -29,7 +29,14 @@ class TestBlockServerMonitor(unittest.TestCase): @patch("CaChannel.CaChannel.add_masked_array_event") @patch("CaChannel.CaChannel.field_type") @patch("CaChannel.CaChannel.pend_event") - def setUp(self, mock_ca_channel, mock_search, mock_add_array, mock_field_type, mock_pend_event): + def setUp( + self, + mock_ca_channel, + mock_search, + mock_add_array, + mock_field_type, + mock_pend_event, + ): self.mock_producer = MagicMock() self.bs_monitor = BlockServerMonitor( self.test_address, self.test_prefix, self.mock_producer @@ -69,7 +76,9 @@ def test_WHEN_convert_nulls_THEN_empty_string_returned(self): arr = [0] * 10 self.assertEqual("", self.bs_monitor.convert_to_string(arr)) - def test_GIVEN_no_previous_pvs_WHEN_update_config_called_THEN_producer_is_called(self): + def test_GIVEN_no_previous_pvs_WHEN_update_config_called_THEN_producer_is_called( + self, + ): self.bs_monitor.update_config(["BLOCK"]) self.mock_producer.add_config.assert_called_once() diff --git a/BlockServerToKafka/test_modules/test_forwarder_config.py b/BlockServerToKafka/test_modules/test_forwarder_config.py index 0b224349..949fec12 100644 --- a/BlockServerToKafka/test_modules/test_forwarder_config.py +++ b/BlockServerToKafka/test_modules/test_forwarder_config.py @@ -19,7 +19,9 @@ from streaming_data_types.fbschemas.forwarder_config_update_fc00.Protocol import ( Protocol, ) -from streaming_data_types.fbschemas.forwarder_config_update_fc00.UpdateType import UpdateType +from streaming_data_types.fbschemas.forwarder_config_update_fc00.UpdateType import ( + UpdateType, +) from streaming_data_types.forwarder_config_update_fc00 import deserialise_fc00 From c2faea390120bf3104aa877b0fa3195914896469 Mon Sep 17 00:00:00 2001 From: Jack Harper Date: Fri, 6 Dec 2024 11:19:56 +0000 Subject: [PATCH 07/12] PEBCAK --- BlockServerToKafka/block_server_monitor.py | 3 +- BlockServerToKafka/kafka_producer.py | 5 ++-- BlockServerToKafka/test_modules/__init__.py | 28 ------------------- .../test_modules/test_block_server_monitor.py | 3 +- .../test_modules/test_forwarder_config.py | 3 +- 5 files changed, 9 insertions(+), 33 deletions(-) delete mode 100644 BlockServerToKafka/test_modules/__init__.py diff --git a/BlockServerToKafka/block_server_monitor.py b/BlockServerToKafka/block_server_monitor.py index a5672a8c..53bb9c69 100644 --- a/BlockServerToKafka/block_server_monitor.py +++ b/BlockServerToKafka/block_server_monitor.py @@ -18,9 +18,10 @@ from typing import Any import ca +from CaChannel import CaChannel, CaChannelException + from BlockServer.core.macros import BLOCK_PREFIX from BlockServerToKafka.kafka_producer import ProducerWrapper -from CaChannel import CaChannel, CaChannelException from server_common.utilities import dehex_and_decompress, print_and_log diff --git a/BlockServerToKafka/kafka_producer.py b/BlockServerToKafka/kafka_producer.py index a10233fc..76710ed4 100644 --- a/BlockServerToKafka/kafka_producer.py +++ b/BlockServerToKafka/kafka_producer.py @@ -16,13 +16,14 @@ from time import sleep from typing import List -from BlockServerToKafka.forwarder_config import ForwarderConfig from kafka import KafkaConsumer, KafkaProducer, errors -from server_common.utilities import print_and_log from streaming_data_types.fbschemas.forwarder_config_update_fc00.Protocol import ( Protocol, ) +from BlockServerToKafka.forwarder_config import ForwarderConfig +from server_common.utilities import print_and_log + class ProducerWrapper: """ diff --git a/BlockServerToKafka/test_modules/__init__.py b/BlockServerToKafka/test_modules/__init__.py deleted file mode 100644 index 882b923b..00000000 --- a/BlockServerToKafka/test_modules/__init__.py +++ /dev/null @@ -1,28 +0,0 @@ -from __future__ import absolute_import, division, print_function, unicode_literals - -# This file is part of the ISIS IBEX application. -# Copyright (C) 2012-2020 Science & Technology Facilities Council. -# All rights reserved. -# -# This program is distributed in the hope that it will be useful. -# This program and the accompanying materials are made available under the -# terms of the Eclipse Public License v1.0 which accompanies this distribution. -# EXCEPT AS EXPRESSLY SET FORTH IN THE ECLIPSE PUBLIC LICENSE V1.0, THE PROGRAM -# AND ACCOMPANYING MATERIALS ARE PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES -# OR CONDITIONS OF ANY KIND. See the Eclipse Public License v1.0 for more details. -# -# You should have received a copy of the Eclipse Public License v1.0 -# along with this program; if not, you can obtain a copy from -# https://www.eclipse.org/org/documents/epl-v10.php or -# http://opensource.org/licenses/eclipse-1.0.php -import os - - -def load_tests(loader, standard_tests, pattern): - """ - This function is needed by the load_tests protocol described at - https://docs.python.org/3/library/unittest.html#load-tests-protocol - The tests in this module are only added under Python 3. - """ - standard_tests.addTests(loader.discover(os.path.dirname(__file__), pattern=pattern)) - return standard_tests diff --git a/BlockServerToKafka/test_modules/test_block_server_monitor.py b/BlockServerToKafka/test_modules/test_block_server_monitor.py index c1155673..6adb57c0 100644 --- a/BlockServerToKafka/test_modules/test_block_server_monitor.py +++ b/BlockServerToKafka/test_modules/test_block_server_monitor.py @@ -16,9 +16,10 @@ import unittest -from BlockServerToKafka.block_server_monitor import BlockServerMonitor from mock import MagicMock, patch +from BlockServerToKafka.block_server_monitor import BlockServerMonitor + class TestBlockServerMonitor(unittest.TestCase): test_address = "TEST_ADDRESS" diff --git a/BlockServerToKafka/test_modules/test_forwarder_config.py b/BlockServerToKafka/test_modules/test_forwarder_config.py index 949fec12..920576c6 100644 --- a/BlockServerToKafka/test_modules/test_forwarder_config.py +++ b/BlockServerToKafka/test_modules/test_forwarder_config.py @@ -15,7 +15,6 @@ # http://opensource.org/licenses/eclipse-1.0.php import unittest -from BlockServerToKafka.forwarder_config import ForwarderConfig from streaming_data_types.fbschemas.forwarder_config_update_fc00.Protocol import ( Protocol, ) @@ -24,6 +23,8 @@ ) from streaming_data_types.forwarder_config_update_fc00 import deserialise_fc00 +from BlockServerToKafka.forwarder_config import ForwarderConfig + class TestForwarderConfig(unittest.TestCase): test_schema = "schema" From a9fb432961844e0e568bf653f60a239d713647e3 Mon Sep 17 00:00:00 2001 From: Jack Harper Date: Thu, 12 Dec 2024 12:17:00 +0000 Subject: [PATCH 08/12] make tests a bit less patch-happy, ruff and pyright fixes --- BlockServerToKafka/block_server_monitor.py | 4 +- BlockServerToKafka/forwarder_config.py | 16 +++-- BlockServerToKafka/kafka_producer.py | 4 +- BlockServerToKafka/test_modules/__init__.py | 0 .../test_modules/test_block_server_monitor.py | 63 ++++++++++--------- .../test_modules/test_forwarder_config.py | 41 +++++++++--- 6 files changed, 80 insertions(+), 48 deletions(-) create mode 100644 BlockServerToKafka/test_modules/__init__.py diff --git a/BlockServerToKafka/block_server_monitor.py b/BlockServerToKafka/block_server_monitor.py index 53bb9c69..7f92b8af 100644 --- a/BlockServerToKafka/block_server_monitor.py +++ b/BlockServerToKafka/block_server_monitor.py @@ -47,9 +47,9 @@ def __init__(self, address: str, pvprefix: str, producer: ProducerWrapper) -> No # Create the CA monitor callback self.channel.add_masked_array_event( - ca.dbf_type_to_DBR_STS(self.channel.field_type()), + ca.dbf_type_to_DBR_STS(self.channel.field_type()), # pyright: ignore 0, - ca.DBE_VALUE, + ca.DBE_VALUE, # pyright: ignore self.update, None, ) diff --git a/BlockServerToKafka/forwarder_config.py b/BlockServerToKafka/forwarder_config.py index 8f4d6450..3cd8b36d 100644 --- a/BlockServerToKafka/forwarder_config.py +++ b/BlockServerToKafka/forwarder_config.py @@ -30,21 +30,27 @@ class ForwarderConfig: """ def __init__( - self, topic: str, epics_protocol: Protocol = Protocol.CA, schema: str = "f144" + self, + topic: str, + epics_protocol: Protocol = Protocol.CA, # pyright: ignore + schema: str = "f144", ) -> None: self.schema = schema self.topic = topic self.epics_protocol = epics_protocol def _create_streams(self, pvs: List[str]) -> List[StreamInfo]: - return [StreamInfo(pv, self.schema, self.topic, self.epics_protocol, 0) for pv in pvs] + return [ + StreamInfo(pv, self.schema, self.topic, self.epics_protocol, 0) + for pv in pvs + ] # pyright: ignore def create_forwarder_configuration(self, pvs: List[str]) -> bytes: - return serialise_fc00(UpdateType.ADD, self._create_streams(pvs)) + return serialise_fc00(UpdateType.ADD, self._create_streams(pvs)) # pyright: ignore def remove_forwarder_configuration(self, pvs: List[str]) -> bytes: - return serialise_fc00(UpdateType.REMOVE, self._create_streams(pvs)) + return serialise_fc00(UpdateType.REMOVE, self._create_streams(pvs)) # pyright: ignore @staticmethod def remove_all_forwarder_configuration() -> bytes: - return serialise_fc00(UpdateType.REMOVEALL, []) + return serialise_fc00(UpdateType.REMOVEALL, []) # pyright: ignore diff --git a/BlockServerToKafka/kafka_producer.py b/BlockServerToKafka/kafka_producer.py index 76710ed4..756a496c 100644 --- a/BlockServerToKafka/kafka_producer.py +++ b/BlockServerToKafka/kafka_producer.py @@ -35,7 +35,7 @@ def __init__( server: str, config_topic: str, data_topic: str, - epics_protocol: Protocol = Protocol.CA, + epics_protocol: Protocol = Protocol.CA, # pyright: ignore ) -> None: self.topic = config_topic self.converter = ForwarderConfig(data_topic, epics_protocol) @@ -51,7 +51,7 @@ def _set_up_producer(self, server: str) -> None: ) except errors.NoBrokersAvailable: print_and_log(f"No brokers found on server: {server[0]}") - except errors.ConnectionError: + except errors.KafkaConnectionError: print_and_log("No server found, connection error") except errors.InvalidConfigurationError: print_and_log("Invalid configuration") diff --git a/BlockServerToKafka/test_modules/__init__.py b/BlockServerToKafka/test_modules/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/BlockServerToKafka/test_modules/test_block_server_monitor.py b/BlockServerToKafka/test_modules/test_block_server_monitor.py index 6adb57c0..c1980d18 100644 --- a/BlockServerToKafka/test_modules/test_block_server_monitor.py +++ b/BlockServerToKafka/test_modules/test_block_server_monitor.py @@ -21,70 +21,75 @@ from BlockServerToKafka.block_server_monitor import BlockServerMonitor +@patch("CaChannel.CaChannel") class TestBlockServerMonitor(unittest.TestCase): test_address = "TEST_ADDRESS" test_prefix = "TEST_PREFIX" - @patch("CaChannel.CaChannel") - @patch("CaChannel.CaChannel.searchw") - @patch("CaChannel.CaChannel.add_masked_array_event") - @patch("CaChannel.CaChannel.field_type") - @patch("CaChannel.CaChannel.pend_event") - def setUp( - self, - mock_ca_channel, - mock_search, - mock_add_array, - mock_field_type, - mock_pend_event, - ): + def setUp(self): self.mock_producer = MagicMock() self.bs_monitor = BlockServerMonitor( self.test_address, self.test_prefix, self.mock_producer ) - def test_WHEN_convert_one_char_to_string_THEN_returns_character(self): + def test_WHEN_convert_one_char_to_string_THEN_returns_character( + self, + mock_ca_channel, + ): c = "a" arr = [ord(c)] - self.assertEqual(c, self.bs_monitor.convert_to_string(arr)) + self.assertEqual(c, self.bs_monitor.convert_to_string(bytearray(arr))) - def test_WHEN_convert_many_chars_to_string_THEN_returns_characters(self): + def test_WHEN_convert_many_chars_to_string_THEN_returns_characters( + self, mock_ca_channel + ): chars = "hello world" arr = [ord(c) for c in chars] - self.assertEqual(chars, self.bs_monitor.convert_to_string(arr)) + self.assertEqual(chars, self.bs_monitor.convert_to_string(bytearray(arr))) - def test_WHEN_convert_chars_with_null_at_end_THEN_nulls_removed(self): + def test_WHEN_convert_chars_with_null_at_end_THEN_nulls_removed( + self, + mock_ca_channel, + ): chars = "hello world" arr = [ord(c) for c in chars] for i in range(3): arr.append(0) - self.assertEqual(chars, self.bs_monitor.convert_to_string(arr)) + self.assertEqual(chars, self.bs_monitor.convert_to_string(bytearray(arr))) - def test_WHEN_convert_chars_with_null_at_start_THEN_nulls_removed(self): + def test_WHEN_convert_chars_with_null_at_start_THEN_nulls_removed( + self, + mock_ca_channel, + ): chars = "hello world" arr = [ord(c) for c in chars] for i in range(3): arr.insert(0, 0) - self.assertEqual(chars, self.bs_monitor.convert_to_string(arr)) + self.assertEqual(chars, self.bs_monitor.convert_to_string(bytearray(arr))) - def test_WHEN_convert_chars_with_nulls_in_centre_THEN_nulls_removed(self): + def test_WHEN_convert_chars_with_nulls_in_centre_THEN_nulls_removed( + self, mock_ca_channel + ): chars = "hello world" arr = [ord(c) for c in chars] arr.insert(4, 0) - self.assertEqual(chars, self.bs_monitor.convert_to_string(arr)) + self.assertEqual(chars, self.bs_monitor.convert_to_string(bytearray(arr))) - def test_WHEN_convert_nulls_THEN_empty_string_returned(self): + def test_WHEN_convert_nulls_THEN_empty_string_returned( + self, + mock_ca_channel, + ): arr = [0] * 10 - self.assertEqual("", self.bs_monitor.convert_to_string(arr)) + self.assertEqual("", self.bs_monitor.convert_to_string(bytearray(arr))) def test_GIVEN_no_previous_pvs_WHEN_update_config_called_THEN_producer_is_called( - self, + self, mock_ca_channel ): self.bs_monitor.update_config(["BLOCK"]) self.mock_producer.add_config.assert_called_once() def test_GIVEN_no_previous_pvs_WHEN_update_config_called_THEN_producer_is_called_containing_block_name( - self, + self, mock_ca_channel ): block = "BLOCK" self.bs_monitor.update_config([block]) @@ -93,7 +98,7 @@ def test_GIVEN_no_previous_pvs_WHEN_update_config_called_THEN_producer_is_called ) def test_GIVEN_previous_pvs_WHEN_update_config_called_with_same_pvs_THEN_producer_is_not_called( - self, + self, mock_ca_channel ): block = "BLOCK" self.bs_monitor.update_config([block]) @@ -101,7 +106,7 @@ def test_GIVEN_previous_pvs_WHEN_update_config_called_with_same_pvs_THEN_produce self.mock_producer.add_config.assert_called_once() def test_GIVEN_previous_pvs_WHEN_update_config_called_with_different_pvs_THEN_producer_is_called( - self, + self, mock_ca_channel ): self.bs_monitor.update_config(["OLD_BLOCK"]) self.mock_producer.reset_mock() diff --git a/BlockServerToKafka/test_modules/test_forwarder_config.py b/BlockServerToKafka/test_modules/test_forwarder_config.py index 920576c6..97958050 100644 --- a/BlockServerToKafka/test_modules/test_forwarder_config.py +++ b/BlockServerToKafka/test_modules/test_forwarder_config.py @@ -46,18 +46,24 @@ def setUp(self): self.config_with_two_blocks = [self.test_block_1, self.test_block_2] def test_WHEN_new_forwarder_config_created_THEN_returns_valid_flatbuffers(self): - output = self.kafka_forwarder.create_forwarder_configuration(self.config_with_one_block) + output = self.kafka_forwarder.create_forwarder_configuration( + self.config_with_one_block + ) self.assertTrue(self.is_flatbuffers(output)) def test_WHEN_new_forwarder_config_created_THEN_returns_configuration_update_containing_add_command( self, ): - raw_output = self.kafka_forwarder.create_forwarder_configuration(self.config_with_one_block) + raw_output = self.kafka_forwarder.create_forwarder_configuration( + self.config_with_one_block + ) output = deserialise_fc00(raw_output) self.assertEqual(output.config_change, UpdateType.ADD) def test_WHEN_forwarder_config_removed_THEN_output_has_correct_command_type(self): - raw_output = self.kafka_forwarder.remove_forwarder_configuration(self.config_with_one_block) + raw_output = self.kafka_forwarder.remove_forwarder_configuration( + self.config_with_one_block + ) output = deserialise_fc00(raw_output) self.assertEqual(output.config_change, UpdateType.REMOVE) @@ -69,7 +75,9 @@ def test_WHEN_all_pvs_removed_THEN_output_has_correct_command_type(self): def test_WHEN_new_forwarder_config_created_THEN_returns_flatbuffer_containing_streams_with_channels_and_converters( self, ): - raw_output = self.kafka_forwarder.create_forwarder_configuration(self.config_with_one_block) + raw_output = self.kafka_forwarder.create_forwarder_configuration( + self.config_with_one_block + ) output = deserialise_fc00(raw_output) self.assertNotEqual(0, len(output[1])) for stream in output[1]: @@ -80,8 +88,13 @@ def test_WHEN_new_forwarder_config_created_THEN_returns_flatbuffer_containing_st def test_GIVEN_using_version_4_WHEN_new_forwarder_config_created_THEN_returns_JSON_containing_streams_with_pva_channel_type( self, ): - kafka_version_4 = ForwarderConfig(epics_protocol=Protocol.PVA, topic=self.test_topic) - raw_output = kafka_version_4.create_forwarder_configuration(self.config_with_one_block) + kafka_version_4 = ForwarderConfig( + epics_protocol=Protocol.PVA, # pyright: ignore noqa + topic=self.test_topic, + ) + raw_output = kafka_version_4.create_forwarder_configuration( + self.config_with_one_block + ) output = deserialise_fc00(raw_output) self.assertNotEqual(0, len(output[1])) for stream in output[1]: @@ -90,7 +103,9 @@ def test_GIVEN_using_version_4_WHEN_new_forwarder_config_created_THEN_returns_JS def test_GIVEN_configuration_with_one_block_WHEN_new_forwarder_config_created_THEN_returns_JSON_containing_one_stream( self, ): - raw_output = self.kafka_forwarder.create_forwarder_configuration(self.config_with_one_block) + raw_output = self.kafka_forwarder.create_forwarder_configuration( + self.config_with_one_block + ) output = deserialise_fc00(raw_output) self.assertEqual(1, len(output[1])) @@ -106,7 +121,9 @@ def test_GIVEN_configuration_with_two_block_WHEN_new_forwarder_config_created_TH def test_GIVEN_configuration_with_one_block_WHEN_new_forwarder_config_created_THEN_returns_block_pv_string( self, ): - raw_output = self.kafka_forwarder.create_forwarder_configuration(self.config_with_one_block) + raw_output = self.kafka_forwarder.create_forwarder_configuration( + self.config_with_one_block + ) output = deserialise_fc00(raw_output) stream = output[1][0] self.assertEqual(self.test_block_1, stream.channel) @@ -122,13 +139,17 @@ def test_GIVEN_configuration_with_two_blocks_WHEN_new_forwarder_config_created_T self.assertTrue(blk in [stream.channel for stream in output[1]]) def test_WHEN_removed_old_forwarder_THEN_JSON_returns_valid(self): - output = self.kafka_forwarder.remove_forwarder_configuration(self.config_with_one_block) + output = self.kafka_forwarder.remove_forwarder_configuration( + self.config_with_one_block + ) self.assertTrue(self.is_flatbuffers(output)) def test_GIVEN_configuration_with_one_block_WHEN_removed_old_forwarder_THEN_returns_JSON_containing_block_pv_string( self, ): - raw_output = self.kafka_forwarder.remove_forwarder_configuration(self.config_with_one_block) + raw_output = self.kafka_forwarder.remove_forwarder_configuration( + self.config_with_one_block + ) output = deserialise_fc00(raw_output) self.assertEqual(self.test_block_1, output[1][0].channel) From e33d522a6e8069592ac55633a542e25a24462b62 Mon Sep 17 00:00:00 2001 From: Jack Harper Date: Thu, 12 Dec 2024 12:19:27 +0000 Subject: [PATCH 09/12] ruff - i used the old version by mistake --- BlockServerToKafka/forwarder_config.py | 5 +-- .../test_modules/test_block_server_monitor.py | 8 ++--- .../test_modules/test_forwarder_config.py | 36 +++++-------------- 3 files changed, 12 insertions(+), 37 deletions(-) diff --git a/BlockServerToKafka/forwarder_config.py b/BlockServerToKafka/forwarder_config.py index 3cd8b36d..68e21268 100644 --- a/BlockServerToKafka/forwarder_config.py +++ b/BlockServerToKafka/forwarder_config.py @@ -40,10 +40,7 @@ def __init__( self.epics_protocol = epics_protocol def _create_streams(self, pvs: List[str]) -> List[StreamInfo]: - return [ - StreamInfo(pv, self.schema, self.topic, self.epics_protocol, 0) - for pv in pvs - ] # pyright: ignore + return [StreamInfo(pv, self.schema, self.topic, self.epics_protocol, 0) for pv in pvs] # pyright: ignore def create_forwarder_configuration(self, pvs: List[str]) -> bytes: return serialise_fc00(UpdateType.ADD, self._create_streams(pvs)) # pyright: ignore diff --git a/BlockServerToKafka/test_modules/test_block_server_monitor.py b/BlockServerToKafka/test_modules/test_block_server_monitor.py index c1980d18..12fb81ea 100644 --- a/BlockServerToKafka/test_modules/test_block_server_monitor.py +++ b/BlockServerToKafka/test_modules/test_block_server_monitor.py @@ -40,9 +40,7 @@ def test_WHEN_convert_one_char_to_string_THEN_returns_character( arr = [ord(c)] self.assertEqual(c, self.bs_monitor.convert_to_string(bytearray(arr))) - def test_WHEN_convert_many_chars_to_string_THEN_returns_characters( - self, mock_ca_channel - ): + def test_WHEN_convert_many_chars_to_string_THEN_returns_characters(self, mock_ca_channel): chars = "hello world" arr = [ord(c) for c in chars] self.assertEqual(chars, self.bs_monitor.convert_to_string(bytearray(arr))) @@ -67,9 +65,7 @@ def test_WHEN_convert_chars_with_null_at_start_THEN_nulls_removed( arr.insert(0, 0) self.assertEqual(chars, self.bs_monitor.convert_to_string(bytearray(arr))) - def test_WHEN_convert_chars_with_nulls_in_centre_THEN_nulls_removed( - self, mock_ca_channel - ): + def test_WHEN_convert_chars_with_nulls_in_centre_THEN_nulls_removed(self, mock_ca_channel): chars = "hello world" arr = [ord(c) for c in chars] arr.insert(4, 0) diff --git a/BlockServerToKafka/test_modules/test_forwarder_config.py b/BlockServerToKafka/test_modules/test_forwarder_config.py index 97958050..50b860d7 100644 --- a/BlockServerToKafka/test_modules/test_forwarder_config.py +++ b/BlockServerToKafka/test_modules/test_forwarder_config.py @@ -46,24 +46,18 @@ def setUp(self): self.config_with_two_blocks = [self.test_block_1, self.test_block_2] def test_WHEN_new_forwarder_config_created_THEN_returns_valid_flatbuffers(self): - output = self.kafka_forwarder.create_forwarder_configuration( - self.config_with_one_block - ) + output = self.kafka_forwarder.create_forwarder_configuration(self.config_with_one_block) self.assertTrue(self.is_flatbuffers(output)) def test_WHEN_new_forwarder_config_created_THEN_returns_configuration_update_containing_add_command( self, ): - raw_output = self.kafka_forwarder.create_forwarder_configuration( - self.config_with_one_block - ) + raw_output = self.kafka_forwarder.create_forwarder_configuration(self.config_with_one_block) output = deserialise_fc00(raw_output) self.assertEqual(output.config_change, UpdateType.ADD) def test_WHEN_forwarder_config_removed_THEN_output_has_correct_command_type(self): - raw_output = self.kafka_forwarder.remove_forwarder_configuration( - self.config_with_one_block - ) + raw_output = self.kafka_forwarder.remove_forwarder_configuration(self.config_with_one_block) output = deserialise_fc00(raw_output) self.assertEqual(output.config_change, UpdateType.REMOVE) @@ -75,9 +69,7 @@ def test_WHEN_all_pvs_removed_THEN_output_has_correct_command_type(self): def test_WHEN_new_forwarder_config_created_THEN_returns_flatbuffer_containing_streams_with_channels_and_converters( self, ): - raw_output = self.kafka_forwarder.create_forwarder_configuration( - self.config_with_one_block - ) + raw_output = self.kafka_forwarder.create_forwarder_configuration(self.config_with_one_block) output = deserialise_fc00(raw_output) self.assertNotEqual(0, len(output[1])) for stream in output[1]: @@ -92,9 +84,7 @@ def test_GIVEN_using_version_4_WHEN_new_forwarder_config_created_THEN_returns_JS epics_protocol=Protocol.PVA, # pyright: ignore noqa topic=self.test_topic, ) - raw_output = kafka_version_4.create_forwarder_configuration( - self.config_with_one_block - ) + raw_output = kafka_version_4.create_forwarder_configuration(self.config_with_one_block) output = deserialise_fc00(raw_output) self.assertNotEqual(0, len(output[1])) for stream in output[1]: @@ -103,9 +93,7 @@ def test_GIVEN_using_version_4_WHEN_new_forwarder_config_created_THEN_returns_JS def test_GIVEN_configuration_with_one_block_WHEN_new_forwarder_config_created_THEN_returns_JSON_containing_one_stream( self, ): - raw_output = self.kafka_forwarder.create_forwarder_configuration( - self.config_with_one_block - ) + raw_output = self.kafka_forwarder.create_forwarder_configuration(self.config_with_one_block) output = deserialise_fc00(raw_output) self.assertEqual(1, len(output[1])) @@ -121,9 +109,7 @@ def test_GIVEN_configuration_with_two_block_WHEN_new_forwarder_config_created_TH def test_GIVEN_configuration_with_one_block_WHEN_new_forwarder_config_created_THEN_returns_block_pv_string( self, ): - raw_output = self.kafka_forwarder.create_forwarder_configuration( - self.config_with_one_block - ) + raw_output = self.kafka_forwarder.create_forwarder_configuration(self.config_with_one_block) output = deserialise_fc00(raw_output) stream = output[1][0] self.assertEqual(self.test_block_1, stream.channel) @@ -139,17 +125,13 @@ def test_GIVEN_configuration_with_two_blocks_WHEN_new_forwarder_config_created_T self.assertTrue(blk in [stream.channel for stream in output[1]]) def test_WHEN_removed_old_forwarder_THEN_JSON_returns_valid(self): - output = self.kafka_forwarder.remove_forwarder_configuration( - self.config_with_one_block - ) + output = self.kafka_forwarder.remove_forwarder_configuration(self.config_with_one_block) self.assertTrue(self.is_flatbuffers(output)) def test_GIVEN_configuration_with_one_block_WHEN_removed_old_forwarder_THEN_returns_JSON_containing_block_pv_string( self, ): - raw_output = self.kafka_forwarder.remove_forwarder_configuration( - self.config_with_one_block - ) + raw_output = self.kafka_forwarder.remove_forwarder_configuration(self.config_with_one_block) output = deserialise_fc00(raw_output) self.assertEqual(self.test_block_1, output[1][0].channel) From 9340573042647f6ac044b979eb19d933324e42a8 Mon Sep 17 00:00:00 2001 From: Jack Harper Date: Thu, 19 Dec 2024 16:45:20 +0000 Subject: [PATCH 10/12] always use livedata --- start_bs_to_kafka_cmd.bat | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/start_bs_to_kafka_cmd.bat b/start_bs_to_kafka_cmd.bat index bbf18582..2c342d03 100644 --- a/start_bs_to_kafka_cmd.bat +++ b/start_bs_to_kafka_cmd.bat @@ -10,11 +10,4 @@ set EPICS_CAS_BEACON_ADDR_LIST=127.255.255.255 set PYTHONUNBUFFERED=TRUE -if "%ISIS_INSTRUMENT%" == "1" ( - set BROKER=livedata.isis.cclrc.ac.uk -) else ( - REM point at the test server - set BROKER=tenten.isis.cclrc.ac.uk -) - -%PYTHON3W% %MYDIRCD%\BlockServerToKafka\main.py -d %INSTRUMENT%_sampleEnv -c forwarder_config -b %BROKER%:9092 -p %MYPVPREFIX% +%PYTHON3W% %MYDIRCD%\BlockServerToKafka\main.py -d %INSTRUMENT%_sampleEnv -c forwarder_config -b livedata.isis.cclrc.ac.uk:9092 -p %MYPVPREFIX% From ef83bda2b27a6d18fa7288d1df707e3313c81cea Mon Sep 17 00:00:00 2001 From: Jack Harper Date: Wed, 8 Jan 2025 15:44:02 +0000 Subject: [PATCH 11/12] fix endless loop logic --- BlockServerToKafka/kafka_producer.py | 32 +++++++++++++++++----------- 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/BlockServerToKafka/kafka_producer.py b/BlockServerToKafka/kafka_producer.py index 756a496c..a27bfa89 100644 --- a/BlockServerToKafka/kafka_producer.py +++ b/BlockServerToKafka/kafka_producer.py @@ -22,7 +22,7 @@ ) from BlockServerToKafka.forwarder_config import ForwarderConfig -from server_common.utilities import print_and_log +from server_common.utilities import SEVERITY, print_and_log class ProducerWrapper: @@ -39,9 +39,14 @@ def __init__( ) -> None: self.topic = config_topic self.converter = ForwarderConfig(data_topic, epics_protocol) - self._set_up_producer(server) + while not self._set_up_producer(server): + print_and_log("Failed to create producer, retrying in 30s") + sleep(30) - def _set_up_producer(self, server: str) -> None: + def _set_up_producer(self, server: str) -> bool: + """ + Attempts to create a Kafka producer and consumer. Retries with a recursive call every 30s. + """ try: self.client = KafkaConsumer(bootstrap_servers=server) self.producer = KafkaProducer(bootstrap_servers=server) @@ -49,23 +54,26 @@ def _set_up_producer(self, server: str) -> None: print_and_log( f"WARNING: topic {self.topic} does not exist. It will be created by default." ) + return True except errors.NoBrokersAvailable: - print_and_log(f"No brokers found on server: {server[0]}") + print_and_log(f"No brokers found on server: {server[0]}", severity=SEVERITY.MAJOR) except errors.KafkaConnectionError: - print_and_log("No server found, connection error") + print_and_log("No server found, connection error", severity=SEVERITY.MAJOR) except errors.InvalidConfigurationError: - print_and_log("Invalid configuration") + print_and_log("Invalid configuration", severity=SEVERITY.MAJOR) quit() except errors.InvalidTopicError: print_and_log( "Invalid topic, to enable auto creation of topics set" - " auto.create.topics.enable to false in broker configuration" + " auto.create.topics.enable to false in broker configuration", + severity=SEVERITY.MAJOR, + ) + except Exception as e: + print_and_log( + f"Unexpected error while creating producer or consumer: {str(e)}", + severity=SEVERITY.MAJOR, ) - finally: - print_and_log("Retrying in 10s") - sleep(10) - # Recursive call after waiting - self._set_up_producer(server) + return False def add_config(self, pvs: List[str]) -> None: """ From 956d4a8072296fbae28aefe14a3d84f54098155a Mon Sep 17 00:00:00 2001 From: Jack Harper Date: Thu, 9 Jan 2025 11:12:30 +0000 Subject: [PATCH 12/12] fix CI by adding repo-level requirements.txt --- requirements.txt | 1 + 1 file changed, 1 insertion(+) create mode 100644 requirements.txt diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 00000000..dbe515b7 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +ess-streaming-data-types