diff --git a/BlockServerToKafka/__init__.py b/BlockServerToKafka/__init__.py deleted file mode 100644 index 0d887594..00000000 --- a/BlockServerToKafka/__init__.py +++ /dev/null @@ -1,15 +0,0 @@ -# This file is part of the ISIS IBEX application. -# Copyright (C) 2012-2020 Science & Technology Facilities Council. -# All rights reserved. -# -# This program is distributed in the hope that it will be useful. -# This program and the accompanying materials are made available under the -# terms of the Eclipse Public License v1.0 which accompanies this distribution. -# EXCEPT AS EXPRESSLY SET FORTH IN THE ECLIPSE PUBLIC LICENSE V1.0, THE PROGRAM -# AND ACCOMPANYING MATERIALS ARE PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES -# OR CONDITIONS OF ANY KIND. See the Eclipse Public License v1.0 for more details. -# -# You should have received a copy of the Eclipse Public License v1.0 -# along with this program; if not, you can obtain a copy from -# https://www.eclipse.org/org/documents/epl-v10.php or -# http://opensource.org/licenses/eclipse-1.0.php diff --git a/BlockServerToKafka/block_server_monitor.py b/BlockServerToKafka/block_server_monitor.py deleted file mode 100644 index 18f467f5..00000000 --- a/BlockServerToKafka/block_server_monitor.py +++ /dev/null @@ -1,122 +0,0 @@ -# This file is part of the ISIS IBEX application. -# Copyright (C) 2012-2020 Science & Technology Facilities Council. -# All rights reserved. -# -# This program is distributed in the hope that it will be useful. -# This program and the accompanying materials are made available under the -# terms of the Eclipse Public License v1.0 which accompanies this distribution. -# EXCEPT AS EXPRESSLY SET FORTH IN THE ECLIPSE PUBLIC LICENSE V1.0, THE PROGRAM -# AND ACCOMPANYING MATERIALS ARE PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES -# OR CONDITIONS OF ANY KIND. See the Eclipse Public License v1.0 for more details. -# -# You should have received a copy of the Eclipse Public License v1.0 -# along with this program; if not, you can obtain a copy from -# https://www.eclipse.org/org/documents/epl-v10.php or -# http://opensource.org/licenses/eclipse-1.0.php -import json -from threading import RLock -from typing import Any - -import ca -from CaChannel import CaChannel, CaChannelException -from server_common.helpers import BLOCK_PREFIX -from server_common.utilities import dehex_and_decompress, print_and_log - -from BlockServerToKafka.kafka_producer import ProducerWrapper - - -class BlockServerMonitor: - """ - Class that monitors the blockserver to see when the config has changed. - - Uses a Channel Access Monitor. - """ - - def __init__(self, address: str, pvprefix: str, producer: ProducerWrapper) -> None: - self.PVPREFIX = pvprefix - self.address = address - self.channel = CaChannel() - self.producer = producer - self.last_pvs = [] - self.monitor_lock = RLock() - try: - self.channel.searchw(self.address) - except CaChannelException: - print_and_log(f"Unable to find pv {self.address}") - return - - # Create the CA monitor callback - self.channel.add_masked_array_event( - ca.dbf_type_to_DBR_STS(self.channel.field_type()), # pyright: ignore - 0, - ca.DBE_VALUE, # pyright: ignore - self.update, - None, - ) - self.channel.pend_event() - - def block_name_to_pv_name(self, blk: str) -> str: - """ - Converts a block name to a PV name by adding the prefixes. - - Args: - blk (string): The name of the block. - - Returns: - string : the associated PV name. - """ - return f"{self.PVPREFIX}{BLOCK_PREFIX}{blk}" - - @staticmethod - def convert_to_string(pv_array: bytearray) -> str: - """ - Convert from byte array to string and remove null characters. - - We cannot get the number of elements in the array so convert to bytes and remove the - null characters. - - Args: - pv_array (bytearray): The byte array of PVs. - - Returns: - string : The string formed from the bytearray. - """ - - return bytearray(pv_array).decode("utf-8").replace("\x00", "") - - def update_config(self, blocks: list[str]) -> None: - """ - Updates the forwarder configuration to monitor the supplied blocks. - - Args: - blocks (list): Blocks in the BlockServer containing PV data. - - Returns: - None. - """ - - pvs = [self.block_name_to_pv_name(blk) for blk in blocks] - if pvs != self.last_pvs: - print_and_log(f"Blocks configuration changed to: {pvs}") - self.producer.remove_config(self.last_pvs) - self.producer.add_config(pvs) - self.last_pvs = pvs - - def update(self, epics_args: dict[str, bytearray], user_args: Any) -> None: # noqa: ANN401 - """ - Updates the kafka config when the blockserver changes. This is called from the monitor. - - Args: - epics_args (dict): Contains the information for the blockserver blocks PV. - user_args (dict): Not used. - - Returns: - None. - """ - - with self.monitor_lock: - data = self.convert_to_string(epics_args["pv_value"]) - data = dehex_and_decompress(bytes(data, encoding="utf-8")) - blocks = json.loads(data) - - self.update_config(blocks) diff --git a/BlockServerToKafka/forwarder_config.py b/BlockServerToKafka/forwarder_config.py deleted file mode 100644 index 68e21268..00000000 --- a/BlockServerToKafka/forwarder_config.py +++ /dev/null @@ -1,53 +0,0 @@ -# This file is part of the ISIS IBEX application. -# Copyright (C) 2012-2020 Science & Technology Facilities Council. -# All rights reserved. -# -# This program is distributed in the hope that it will be useful. -# This program and the accompanying materials are made available under the -# terms of the Eclipse Public License v1.0 which accompanies this distribution. -# EXCEPT AS EXPRESSLY SET FORTH IN THE ECLIPSE PUBLIC LICENSE V1.0, THE PROGRAM -# AND ACCOMPANYING MATERIALS ARE PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES -# OR CONDITIONS OF ANY KIND. See the Eclipse Public License v1.0 for more details. -# -# You should have received a copy of the Eclipse Public License v1.0 -# along with this program; if not, you can obtain a copy from -# https://www.eclipse.org/org/documents/epl-v10.php or -# http://opensource.org/licenses/eclipse-1.0.php -from typing import List - -from streaming_data_types.fbschemas.forwarder_config_update_fc00.Protocol import ( - Protocol, -) -from streaming_data_types.fbschemas.forwarder_config_update_fc00.UpdateType import ( - UpdateType, -) -from streaming_data_types.forwarder_config_update_fc00 import StreamInfo, serialise_fc00 - - -class ForwarderConfig: - """ - Class that converts the pv information to a forwarder config message payload - """ - - def __init__( - self, - topic: str, - epics_protocol: Protocol = Protocol.CA, # pyright: ignore - schema: str = "f144", - ) -> None: - self.schema = schema - self.topic = topic - self.epics_protocol = epics_protocol - - def _create_streams(self, pvs: List[str]) -> List[StreamInfo]: - return [StreamInfo(pv, self.schema, self.topic, self.epics_protocol, 0) for pv in pvs] # pyright: ignore - - def create_forwarder_configuration(self, pvs: List[str]) -> bytes: - return serialise_fc00(UpdateType.ADD, self._create_streams(pvs)) # pyright: ignore - - def remove_forwarder_configuration(self, pvs: List[str]) -> bytes: - return serialise_fc00(UpdateType.REMOVE, self._create_streams(pvs)) # pyright: ignore - - @staticmethod - def remove_all_forwarder_configuration() -> bytes: - return serialise_fc00(UpdateType.REMOVEALL, []) # pyright: ignore diff --git a/BlockServerToKafka/inst_pvs.py b/BlockServerToKafka/inst_pvs.py deleted file mode 100644 index ef61d173..00000000 --- a/BlockServerToKafka/inst_pvs.py +++ /dev/null @@ -1,52 +0,0 @@ -from threading import Timer - -from genie_python.mysql_abstraction_layer import SQLAbstraction -from server_common.utilities import print_and_log - -from BlockServerToKafka.kafka_producer import ProducerWrapper - -UPDATE_FREQUENCY_S = 30.0 - - -class InstPVs(object): - def __init__( - self, producer: ProducerWrapper, sql_abstraction: SQLAbstraction | None = None - ) -> None: - self._pvs: set[str] = set() - self._sql = ( - SQLAbstraction(dbid="iocdb", user="report", password="$report", host="localhost") - if sql_abstraction is None - else sql_abstraction - ) - self.producer = producer - - def schedule(self) -> None: - def action() -> None: - self.update_pvs_from_mysql() - self.schedule() - - job = Timer(UPDATE_FREQUENCY_S, action) - job.start() - - def update_pvs_from_mysql(self) -> None: - rows = self._sql.query('SELECT pvname, value FROM iocdb.pvinfo WHERE infoname="archive";') - if rows is None: - return - - pvs = set() - for row in rows: - basename, fields = row - assert isinstance(fields, str) - for field in fields.split(): - if all(c in "0123456789." for c in field): - # This is an archiving time period, e.g. the 5.0 in - # info(archive, "5.0 VAL") - # Ignore it - continue - pvs.add(f"{basename}.{field}") - - if self._pvs != pvs: - print_and_log(f"Inst configuration changed to: {pvs}") - self.producer.remove_config(list(self._pvs - pvs)) - self.producer.add_config(list(pvs - self._pvs)) - self._pvs = pvs diff --git a/BlockServerToKafka/kafka_producer.py b/BlockServerToKafka/kafka_producer.py deleted file mode 100644 index ae63f8b6..00000000 --- a/BlockServerToKafka/kafka_producer.py +++ /dev/null @@ -1,104 +0,0 @@ -# This file is part of the ISIS IBEX application. -# Copyright (C) 2012-2020 Science & Technology Facilities Council. -# All rights reserved. -# -# This program is distributed in the hope that it will be useful. -# This program and the accompanying materials are made available under the -# terms of the Eclipse Public License v1.0 which accompanies this distribution. -# EXCEPT AS EXPRESSLY SET FORTH IN THE ECLIPSE PUBLIC LICENSE V1.0, THE PROGRAM -# AND ACCOMPANYING MATERIALS ARE PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES -# OR CONDITIONS OF ANY KIND. See the Eclipse Public License v1.0 for more details. -# -# You should have received a copy of the Eclipse Public License v1.0 -# along with this program; if not, you can obtain a copy from -# https://www.eclipse.org/org/documents/epl-v10.php or -# http://opensource.org/licenses/eclipse-1.0.php -from time import sleep -from typing import List - -from kafka import KafkaConsumer, KafkaProducer, errors -from server_common.utilities import SEVERITY, print_and_log -from streaming_data_types.fbschemas.forwarder_config_update_fc00.Protocol import ( - Protocol, -) - -from BlockServerToKafka.forwarder_config import ForwarderConfig - - -class ProducerWrapper: - """ - A wrapper class for the kafka producer. - """ - - def __init__( - self, - server: str, - config_topic: str, - data_topic: str, - epics_protocol: Protocol = Protocol.CA, # pyright: ignore - ) -> None: - self.topic = config_topic - self.converter = ForwarderConfig(data_topic, epics_protocol) - while not self._set_up_producer(server): - print_and_log("Failed to create producer, retrying in 30s") - sleep(30) - - def _set_up_producer(self, server: str) -> bool: - """ - Attempts to create a Kafka producer and consumer. Retries with a recursive call every 30s. - """ - try: - self.client = KafkaConsumer(bootstrap_servers=server) - self.producer = KafkaProducer(bootstrap_servers=server) - if not self.topic_exists(self.topic): - print_and_log( - f"WARNING: topic {self.topic} does not exist. It will be created by default." - ) - return True - except errors.NoBrokersAvailable: - print_and_log(f"No brokers found on server: {server[0]}", severity=SEVERITY.MAJOR) - except errors.KafkaConnectionError: - print_and_log("No server found, connection error", severity=SEVERITY.MAJOR) - except errors.InvalidConfigurationError: - print_and_log("Invalid configuration", severity=SEVERITY.MAJOR) - quit() - except errors.InvalidTopicError: - print_and_log( - "Invalid topic, to enable auto creation of topics set" - " auto.create.topics.enable to false in broker configuration", - severity=SEVERITY.MAJOR, - ) - except Exception as e: - print_and_log( - f"Unexpected error while creating producer or consumer: {str(e)}", - severity=SEVERITY.MAJOR, - ) - return False - - def add_config(self, pvs: List[str]) -> None: - """ - Create a forwarder configuration to add more pvs to be monitored. - - :param pvs: A list of new PVs to add to the forwarder configuration. - """ - message_buffer = self.converter.create_forwarder_configuration(pvs) - self.producer.send(self.topic, message_buffer) - - def topic_exists(self, topic_name: str) -> bool: - return topic_name in self.client.topics() - - def remove_config(self, pvs: List[str]) -> None: - """ - Create a forwarder configuration to remove pvs that are being monitored. - - :param pvs: A list of PVs to remove from the forwarder configuration. - """ - message_buffer = self.converter.remove_forwarder_configuration(pvs) - self.producer.send(self.topic, message_buffer) - - def stop_all_pvs(self) -> None: - """ - Sends a stop_all command to the forwarder to clear all configuration. - """ - message_buffer = self.converter.remove_all_forwarder_configuration() - self.producer.send(self.topic, message_buffer) diff --git a/BlockServerToKafka/main.py b/BlockServerToKafka/main.py deleted file mode 100644 index 5fc92f4d..00000000 --- a/BlockServerToKafka/main.py +++ /dev/null @@ -1,101 +0,0 @@ -# This file is part of the ISIS IBEX application. -# Copyright (C) 2012-2020 Science & Technology Facilities Council. -# All rights reserved. -# -# This program is distributed in the hope that it will be useful. -# This program and the accompanying materials are made available under the -# terms of the Eclipse Public License v1.0 which accompanies this distribution. -# EXCEPT AS EXPRESSLY SET FORTH IN THE ECLIPSE PUBLIC LICENSE V1.0, THE PROGRAM -# AND ACCOMPANYING MATERIALS ARE PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES -# OR CONDITIONS OF ANY KIND. See the Eclipse Public License v1.0 for more details. -# -# You should have received a copy of the Eclipse Public License v1.0 -# along with this program; if not, you can obtain a copy from -# https://www.eclipse.org/org/documents/epl-v10.php or -# http://opensource.org/licenses/eclipse-1.0.php - -import os -import sys - -sys.path.insert(0, os.path.abspath(os.getcwd())) -from argparse import ArgumentParser -from os import environ -from time import sleep - -from BlockServerToKafka.block_server_monitor import BlockServerMonitor -from BlockServerToKafka.inst_pvs import InstPVs -from BlockServerToKafka.kafka_producer import ProducerWrapper - -if __name__ == "__main__": - parser = ArgumentParser() - - parser.add_argument( - "-d", - "--data", - help="Kafka topic to send Block PV data to", - type=str, - default="_sampleEnv", - ) - parser.add_argument( - "-c", - "--config", - help="Kafka topic to send forwarder config to", - type=str, - default="forwarder_config", - ) - parser.add_argument( - "-r", - "--runlog", - help="Kafka topic to send run log PV data to", - type=str, - default="_runLog", - ) - parser.add_argument( - "-b", - "--broker", - help="Location of the Kafka brokers (host:port)", - nargs="+", - type=str, - default="livedata.isis.cclrc.ac.uk:31092", - ) - parser.add_argument( - "-p", - "--pvprefix", - help="PV Prefix of the block server", - type=str, - default=environ["MYPVPREFIX"], - ) - - args = parser.parse_args() - KAFKA_DATA = args.data - KAFKA_RUNLOG = args.runlog - KAFKA_CONFIG = args.config - KAFKA_BROKER = args.broker - PREFIX = args.pvprefix - block_producer = ProducerWrapper(KAFKA_BROKER, KAFKA_CONFIG, KAFKA_DATA) - inst_producer = ProducerWrapper(KAFKA_BROKER, KAFKA_CONFIG, KAFKA_DATA) - monitor = BlockServerMonitor(f"{PREFIX}CS:BLOCKSERVER:BLOCKNAMES", PREFIX, block_producer) - runlog_producer = ProducerWrapper(KAFKA_BROKER, KAFKA_CONFIG, KAFKA_RUNLOG) - - dae_prefix = f"{PREFIX}DAE:" - runlog_producer.add_config( - [ - f"{dae_prefix}COUNTRATE", - f"{dae_prefix}BEAMCURRENT", - f"{dae_prefix}GOODFRAMES", - f"{dae_prefix}RAWFRAMES", - f"{dae_prefix}GOODUAH", - f"{dae_prefix}MEVENTS", - f"{dae_prefix}TOTALCOUNTS", - f"{dae_prefix}MONITORCOUNTS", - f"{dae_prefix}NPRATIO", - f"{dae_prefix}PERIOD", - f"{dae_prefix}TOTALUAMPS", - # todo how should we do run_status/icp_event/is_running/is_waiting? - ] - ) - - InstPVs(inst_producer).schedule() - - while True: - sleep(0.1) diff --git a/BlockServerToKafka/test_modules/__init__.py b/BlockServerToKafka/test_modules/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/BlockServerToKafka/test_modules/test_block_server_monitor.py b/BlockServerToKafka/test_modules/test_block_server_monitor.py deleted file mode 100644 index 4d0f0bb9..00000000 --- a/BlockServerToKafka/test_modules/test_block_server_monitor.py +++ /dev/null @@ -1,110 +0,0 @@ -# This file is part of the ISIS IBEX application. -# Copyright (C) 2012-2020 Science & Technology Facilities Council. -# All rights reserved. -# -# This program is distributed in the hope that it will be useful. -# This program and the accompanying materials are made available under the -# terms of the Eclipse Public License v1.0 which accompanies this distribution. -# EXCEPT AS EXPRESSLY SET FORTH IN THE ECLIPSE PUBLIC LICENSE V1.0, THE PROGRAM -# AND ACCOMPANYING MATERIALS ARE PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES -# OR CONDITIONS OF ANY KIND. See the Eclipse Public License v1.0 for more details. -# -# You should have received a copy of the Eclipse Public License v1.0 -# along with this program; if not, you can obtain a copy from -# https://www.eclipse.org/org/documents/epl-v10.php or -# http://opensource.org/licenses/eclipse-1.0.php - -import unittest - -from mock import MagicMock, patch - -from BlockServerToKafka.block_server_monitor import BlockServerMonitor - - -class TestBlockServerMonitor(unittest.TestCase): - test_address = "TEST_ADDRESS" - test_prefix = "TEST_PREFIX" - - @patch("CaChannel.CaChannel") - @patch("CaChannel.CaChannel.searchw") - @patch("CaChannel.CaChannel.add_masked_array_event") - @patch("CaChannel.CaChannel.field_type") - @patch("CaChannel.CaChannel.pend_event") - def setUp(self, *args, **kwargs): - self.mock_producer = MagicMock() - self.bs_monitor = BlockServerMonitor( - self.test_address, self.test_prefix, self.mock_producer - ) - - def test_WHEN_convert_one_char_to_string_THEN_returns_character( - self, - ): - c = "a" - arr = [ord(c)] - self.assertEqual(c, self.bs_monitor.convert_to_string(bytearray(arr))) - - def test_WHEN_convert_many_chars_to_string_THEN_returns_characters(self): - chars = "hello world" - arr = [ord(c) for c in chars] - self.assertEqual(chars, self.bs_monitor.convert_to_string(bytearray(arr))) - - def test_WHEN_convert_chars_with_null_at_end_THEN_nulls_removed( - self, - ): - chars = "hello world" - arr = [ord(c) for c in chars] - for i in range(3): - arr.append(0) - self.assertEqual(chars, self.bs_monitor.convert_to_string(bytearray(arr))) - - def test_WHEN_convert_chars_with_null_at_start_THEN_nulls_removed( - self, - ): - chars = "hello world" - arr = [ord(c) for c in chars] - for i in range(3): - arr.insert(0, 0) - self.assertEqual(chars, self.bs_monitor.convert_to_string(bytearray(arr))) - - def test_WHEN_convert_chars_with_nulls_in_centre_THEN_nulls_removed(self): - chars = "hello world" - arr = [ord(c) for c in chars] - arr.insert(4, 0) - self.assertEqual(chars, self.bs_monitor.convert_to_string(bytearray(arr))) - - def test_WHEN_convert_nulls_THEN_empty_string_returned( - self, - ): - arr = [0] * 10 - self.assertEqual("", self.bs_monitor.convert_to_string(bytearray(arr))) - - def test_GIVEN_no_previous_pvs_WHEN_update_config_called_THEN_producer_is_called( - self, - ): - self.bs_monitor.update_config(["BLOCK"]) - self.mock_producer.add_config.assert_called_once() - - def test_GIVEN_no_previous_pvs_WHEN_update_config_called_THEN_producer_is_called_containing_block_name( - self, - ): - block = "BLOCK" - self.bs_monitor.update_config([block]) - self.mock_producer.add_config.assert_called_with( - [self.bs_monitor.block_name_to_pv_name(block)] - ) - - def test_GIVEN_previous_pvs_WHEN_update_config_called_with_same_pvs_THEN_producer_is_not_called( - self, - ): - block = "BLOCK" - self.bs_monitor.update_config([block]) - self.bs_monitor.update_config([block]) - self.mock_producer.add_config.assert_called_once() - - def test_GIVEN_previous_pvs_WHEN_update_config_called_with_different_pvs_THEN_producer_is_called( - self, - ): - self.bs_monitor.update_config(["OLD_BLOCK"]) - self.mock_producer.reset_mock() - self.bs_monitor.update_config(["NEW_BLOCK"]) - self.mock_producer.add_config.assert_called_once() diff --git a/BlockServerToKafka/test_modules/test_forwarder_config.py b/BlockServerToKafka/test_modules/test_forwarder_config.py deleted file mode 100644 index 50b860d7..00000000 --- a/BlockServerToKafka/test_modules/test_forwarder_config.py +++ /dev/null @@ -1,146 +0,0 @@ -# This file is part of the ISIS IBEX application. -# Copyright (C) 2012-2020 Science & Technology Facilities Council. -# All rights reserved. -# -# This program is distributed in the hope that it will be useful. -# This program and the accompanying materials are made available under the -# terms of the Eclipse Public License v1.0 which accompanies this distribution. -# EXCEPT AS EXPRESSLY SET FORTH IN THE ECLIPSE PUBLIC LICENSE V1.0, THE PROGRAM -# AND ACCOMPANYING MATERIALS ARE PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES -# OR CONDITIONS OF ANY KIND. See the Eclipse Public License v1.0 for more details. -# -# You should have received a copy of the Eclipse Public License v1.0 -# along with this program; if not, you can obtain a copy from -# https://www.eclipse.org/org/documents/epl-v10.php or -# http://opensource.org/licenses/eclipse-1.0.php -import unittest - -from streaming_data_types.fbschemas.forwarder_config_update_fc00.Protocol import ( - Protocol, -) -from streaming_data_types.fbschemas.forwarder_config_update_fc00.UpdateType import ( - UpdateType, -) -from streaming_data_types.forwarder_config_update_fc00 import deserialise_fc00 - -from BlockServerToKafka.forwarder_config import ForwarderConfig - - -class TestForwarderConfig(unittest.TestCase): - test_schema = "schema" - test_topic = "topic" - test_block_1 = "block1" - test_block_2 = "block2" - - @staticmethod - def is_flatbuffers(payload): - try: - deserialise_fc00(payload) - except ValueError: - return False - return True - - def setUp(self): - self.kafka_forwarder = ForwarderConfig(self.test_topic, schema=self.test_schema) - self.config_with_one_block = [self.test_block_1] - self.config_with_two_blocks = [self.test_block_1, self.test_block_2] - - def test_WHEN_new_forwarder_config_created_THEN_returns_valid_flatbuffers(self): - output = self.kafka_forwarder.create_forwarder_configuration(self.config_with_one_block) - self.assertTrue(self.is_flatbuffers(output)) - - def test_WHEN_new_forwarder_config_created_THEN_returns_configuration_update_containing_add_command( - self, - ): - raw_output = self.kafka_forwarder.create_forwarder_configuration(self.config_with_one_block) - output = deserialise_fc00(raw_output) - self.assertEqual(output.config_change, UpdateType.ADD) - - def test_WHEN_forwarder_config_removed_THEN_output_has_correct_command_type(self): - raw_output = self.kafka_forwarder.remove_forwarder_configuration(self.config_with_one_block) - output = deserialise_fc00(raw_output) - self.assertEqual(output.config_change, UpdateType.REMOVE) - - def test_WHEN_all_pvs_removed_THEN_output_has_correct_command_type(self): - raw_output = self.kafka_forwarder.remove_all_forwarder_configuration() - output = deserialise_fc00(raw_output) - self.assertEqual(output.config_change, UpdateType.REMOVEALL) - - def test_WHEN_new_forwarder_config_created_THEN_returns_flatbuffer_containing_streams_with_channels_and_converters( - self, - ): - raw_output = self.kafka_forwarder.create_forwarder_configuration(self.config_with_one_block) - output = deserialise_fc00(raw_output) - self.assertNotEqual(0, len(output[1])) - for stream in output[1]: - self.assertEqual(self.test_block_1, stream.channel) - self.assertEqual(self.test_topic, stream.topic) - self.assertEqual(Protocol.CA, stream.protocol) - - def test_GIVEN_using_version_4_WHEN_new_forwarder_config_created_THEN_returns_JSON_containing_streams_with_pva_channel_type( - self, - ): - kafka_version_4 = ForwarderConfig( - epics_protocol=Protocol.PVA, # pyright: ignore noqa - topic=self.test_topic, - ) - raw_output = kafka_version_4.create_forwarder_configuration(self.config_with_one_block) - output = deserialise_fc00(raw_output) - self.assertNotEqual(0, len(output[1])) - for stream in output[1]: - self.assertEqual(stream.protocol, Protocol.PVA) - - def test_GIVEN_configuration_with_one_block_WHEN_new_forwarder_config_created_THEN_returns_JSON_containing_one_stream( - self, - ): - raw_output = self.kafka_forwarder.create_forwarder_configuration(self.config_with_one_block) - output = deserialise_fc00(raw_output) - self.assertEqual(1, len(output[1])) - - def test_GIVEN_configuration_with_two_block_WHEN_new_forwarder_config_created_THEN_returns_JSON_containing_two_stream( - self, - ): - raw_output = self.kafka_forwarder.create_forwarder_configuration( - self.config_with_two_blocks - ) - output = deserialise_fc00(raw_output) - self.assertEqual(2, len(output[1])) - - def test_GIVEN_configuration_with_one_block_WHEN_new_forwarder_config_created_THEN_returns_block_pv_string( - self, - ): - raw_output = self.kafka_forwarder.create_forwarder_configuration(self.config_with_one_block) - output = deserialise_fc00(raw_output) - stream = output[1][0] - self.assertEqual(self.test_block_1, stream.channel) - - def test_GIVEN_configuration_with_two_blocks_WHEN_new_forwarder_config_created_THEN_returns_both_block_pv_string( - self, - ): - raw_output = self.kafka_forwarder.create_forwarder_configuration( - self.config_with_two_blocks - ) - output = deserialise_fc00(raw_output) - for blk in [self.test_block_1, self.test_block_2]: - self.assertTrue(blk in [stream.channel for stream in output[1]]) - - def test_WHEN_removed_old_forwarder_THEN_JSON_returns_valid(self): - output = self.kafka_forwarder.remove_forwarder_configuration(self.config_with_one_block) - self.assertTrue(self.is_flatbuffers(output)) - - def test_GIVEN_configuration_with_one_block_WHEN_removed_old_forwarder_THEN_returns_JSON_containing_block_pv_string( - self, - ): - raw_output = self.kafka_forwarder.remove_forwarder_configuration(self.config_with_one_block) - output = deserialise_fc00(raw_output) - self.assertEqual(self.test_block_1, output[1][0].channel) - - def test_GIVEN_configuration_with_two_blocks_WHEN_removed_old_forwarder_THEN_returns_JSON_containing_both_block_pv_string( - self, - ): - raw_output = self.kafka_forwarder.remove_forwarder_configuration( - self.config_with_two_blocks - ) - output = deserialise_fc00(raw_output) - for blk in [self.test_block_1, self.test_block_2]: - self.assertTrue(blk in [stream.channel for stream in output[1]]) diff --git a/BlockServerToKafka/test_modules/test_inst_pvs.py b/BlockServerToKafka/test_modules/test_inst_pvs.py deleted file mode 100644 index f87d76b5..00000000 --- a/BlockServerToKafka/test_modules/test_inst_pvs.py +++ /dev/null @@ -1,42 +0,0 @@ -import unittest -from unittest.mock import MagicMock, patch - -from BlockServerToKafka.inst_pvs import InstPVs - - -class TestInstPVs(unittest.TestCase): - def test_calling_start_schedules_timer(self): - with patch("BlockServerToKafka.inst_pvs.Timer") as timer: - InstPVs(MagicMock(), MagicMock()).schedule() - - timer.assert_called_once() - timer.return_value.start.assert_called_once() - - def test_update_pvs_from_mysql(self): - mock_sql = MagicMock() - mock_sql.query.return_value = [("pv1", "VAL"), ("pv2", "VAL RBV"), ("pv3", "5.0 VAL")] - - mock_producer = MagicMock() - - inst_pvs = InstPVs(mock_producer, mock_sql) - inst_pvs.update_pvs_from_mysql() - - mock_producer.remove_config.assert_called_once_with([]) - self.assertCountEqual( - mock_producer.add_config.call_args.args[0], ["pv1.VAL", "pv2.VAL", "pv2.RBV", "pv3.VAL"] - ) - - # Check that more calls where SQL query returns the same thing - # don't cause extra updates - inst_pvs.update_pvs_from_mysql() - self.assertEqual(mock_producer.remove_config.call_count, 1) - self.assertEqual(mock_producer.add_config.call_count, 1) - - # If SQL call does return something different then should update - mock_sql.query.return_value = [("new_pv1", "VAL"), ("pv2", "VAL")] - inst_pvs.update_pvs_from_mysql() - - self.assertCountEqual( - mock_producer.remove_config.call_args.args[0], ["pv1.VAL", "pv2.RBV", "pv3.VAL"] - ) - self.assertCountEqual(mock_producer.add_config.call_args.args[0], ["new_pv1.VAL"]) diff --git a/README.md b/README.md index f1034b9d..e9ae58a6 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ Channel access servers which help run the instrument. They are written in python Contains: 1. [Archive Access](https://github.com/ISISComputingGroup/ibex_developers_manual/wiki/Logging-from-the-archive) -1. [Blocks Server (with kafka extension)](https://github.com/ISISComputingGroup/ibex_developers_manual/wiki/BlockServer) +1. [Blocks Server](https://github.com/ISISComputingGroup/ibex_developers_manual/wiki/BlockServer) 1. [Database Server](https://github.com/ISISComputingGroup/ibex_developers_manual/wiki/The-DatabaseServer) 1. [Reflectometry Server](https://github.com/ISISComputingGroup/ibex_developers_manual/wiki/Reflectometers) diff --git a/requirements.txt b/requirements.txt index 95e4fd5b..3b4d5c02 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,10 +1,8 @@ -ess-streaming-data-types genie-python git+https://github.com/ISISComputingGroup/pcaspy git+https://github.com/ISISComputingGroup/CaChannel.git contextlib2 GitPython -kafka_python lxml pysnmp tornado diff --git a/start_bs_to_kafka_cmd.bat b/start_bs_to_kafka_cmd.bat deleted file mode 100644 index 5bf20c53..00000000 --- a/start_bs_to_kafka_cmd.bat +++ /dev/null @@ -1,22 +0,0 @@ -setlocal - -set MYDIRCD=%~dp0 -call %MYDIRCD%..\..\..\config_env_base.bat -@echo on - -%HIDEWINDOW% h - -set "GETMACRO=%EPICS_KIT_ROOT%\support\icpconfig\master\bin\%EPICS_HOST_ARCH%\icpconfigGetMacro.exe" -set "MYIOCNAME=BSKAFKA" - -set "KAFKA_BROKER=livedata.isis.cclrc.ac.uk:31092" -REM allow local config override in globals.txt -for /f %%a in ( '%GETMACRO% "KAFKA_BROKER" %MYIOCNAME%' ) do ( set "KAFKA_BROKER=%%a" ) - -set EPICS_CAS_INTF_ADDR_LIST=127.0.0.1 -set EPICS_CAS_BEACON_ADDR_LIST=127.255.255.255 - -set PYTHONUNBUFFERED=TRUE - -@echo %DATE% %TIME% starting BS to Kafka -%PYTHON3W% %MYDIRCD%\BlockServerToKafka\main.py -d %INSTRUMENT%_sampleEnv -r %INSTRUMENT%_runLog -c %INSTRUMENT%_forwarderConfig -b %KAFKA_BROKER% -p %MYPVPREFIX%