diff --git a/__init__.py b/__init__.py index 0d88759..e69de29 100644 --- a/__init__.py +++ b/__init__.py @@ -1,15 +0,0 @@ -# This file is part of the ISIS IBEX application. -# Copyright (C) 2012-2020 Science & Technology Facilities Council. -# All rights reserved. -# -# This program is distributed in the hope that it will be useful. -# This program and the accompanying materials are made available under the -# terms of the Eclipse Public License v1.0 which accompanies this distribution. -# EXCEPT AS EXPRESSLY SET FORTH IN THE ECLIPSE PUBLIC LICENSE V1.0, THE PROGRAM -# AND ACCOMPANYING MATERIALS ARE PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES -# OR CONDITIONS OF ANY KIND. See the Eclipse Public License v1.0 for more details. -# -# You should have received a copy of the Eclipse Public License v1.0 -# along with this program; if not, you can obtain a copy from -# https://www.eclipse.org/org/documents/epl-v10.php or -# http://opensource.org/licenses/eclipse-1.0.php diff --git a/block_server_monitor.py b/block_server_monitor.py index 7a51190..642ce6b 100644 --- a/block_server_monitor.py +++ b/block_server_monitor.py @@ -1,27 +1,16 @@ -# This file is part of the ISIS IBEX application. -# Copyright (C) 2012-2020 Science & Technology Facilities Council. -# All rights reserved. -# -# This program is distributed in the hope that it will be useful. -# This program and the accompanying materials are made available under the -# terms of the Eclipse Public License v1.0 which accompanies this distribution. -# EXCEPT AS EXPRESSLY SET FORTH IN THE ECLIPSE PUBLIC LICENSE V1.0, THE PROGRAM -# AND ACCOMPANYING MATERIALS ARE PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES -# OR CONDITIONS OF ANY KIND. See the Eclipse Public License v1.0 for more details. -# -# You should have received a copy of the Eclipse Public License v1.0 -# along with this program; if not, you can obtain a copy from -# https://www.eclipse.org/org/documents/epl-v10.php or -# http://opensource.org/licenses/eclipse-1.0.php import json +import logging +import numpy as np +import numpy.typing as npt + from epics import PV from threading import RLock -from server_common.helpers import BLOCK_PREFIX -from server_common.utilities import dehex_and_decompress, print_and_log +from ibex_non_ca_helpers.compress_hex import dehex_and_decompress from kafka_producer import ProducerWrapper +logger = logging.getLogger(__name__) class BlockServerMonitor: """ @@ -33,17 +22,18 @@ class BlockServerMonitor: def __init__(self, address: str, pvprefix: str, producer: ProducerWrapper) -> None: self.pv_prefix = pvprefix self.address = address - self.channel = PV(self.address) + self.channel = PV(self.address, auto_monitor=True, callback=self.update) self.producer = producer self.last_pvs = [] self.monitor_lock = RLock() + connected = self.channel.wait_for_connection(timeout=5) if not connected: - print_and_log(f"Unable to find pv {self.address}") + logger.error(f"Unable to find pv {self.address}") return + logger.info(f"Connected to {self.address}") - self.pv.add_callback(self.update) def block_name_to_pv_name(self, blk: str) -> str: """ @@ -55,24 +45,8 @@ def block_name_to_pv_name(self, blk: str) -> str: Returns: string : the associated PV name. """ - return f"{self.pv_prefix}{BLOCK_PREFIX}{blk}" - - @staticmethod - def convert_to_string(pv_array: bytearray) -> str: - """ - Convert from byte array to string and remove null characters. - - We cannot get the number of elements in the array so convert to bytes and remove the - null characters. + return f"{self.pv_prefix}CS:SB:{blk}" - Args: - pv_array (bytearray): The byte array of PVs. - - Returns: - string : The string formed from the bytearray. - """ - - return bytearray(pv_array).decode("utf-8").replace("\x00", "") def update_config(self, blocks: list[str]) -> None: """ @@ -87,12 +61,12 @@ def update_config(self, blocks: list[str]) -> None: pvs = [self.block_name_to_pv_name(blk) for blk in blocks] if pvs != self.last_pvs: - print_and_log(f"Blocks configuration changed to: {pvs}") + logger.info(f"Blocks configuration changed to: {pvs}") self.producer.remove_config(self.last_pvs) self.producer.add_config(pvs) self.last_pvs = pvs - def update(self, _, value: bytearray, **kwargs) -> None: # noqa: ANN401 + def update(self, value: npt.NDArray[np.uint8], **kwargs) -> None: # noqa: ANN401 """ Updates the kafka config when the blockserver changes. This is called from the monitor. @@ -105,8 +79,7 @@ def update(self, _, value: bytearray, **kwargs) -> None: # noqa: ANN401 """ with self.monitor_lock: - data = self.convert_to_string(value) - data = dehex_and_decompress(bytes(data, encoding="utf-8")) + logger.info("new update %s ", value) + data = dehex_and_decompress(value.tobytes()) blocks = json.loads(data) - self.update_config(blocks) diff --git a/forwarder_config.py b/forwarder_config.py index 68e2126..360e7d7 100644 --- a/forwarder_config.py +++ b/forwarder_config.py @@ -1,18 +1,3 @@ -# This file is part of the ISIS IBEX application. -# Copyright (C) 2012-2020 Science & Technology Facilities Council. -# All rights reserved. -# -# This program is distributed in the hope that it will be useful. -# This program and the accompanying materials are made available under the -# terms of the Eclipse Public License v1.0 which accompanies this distribution. -# EXCEPT AS EXPRESSLY SET FORTH IN THE ECLIPSE PUBLIC LICENSE V1.0, THE PROGRAM -# AND ACCOMPANYING MATERIALS ARE PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES -# OR CONDITIONS OF ANY KIND. See the Eclipse Public License v1.0 for more details. -# -# You should have received a copy of the Eclipse Public License v1.0 -# along with this program; if not, you can obtain a copy from -# https://www.eclipse.org/org/documents/epl-v10.php or -# http://opensource.org/licenses/eclipse-1.0.php from typing import List from streaming_data_types.fbschemas.forwarder_config_update_fc00.Protocol import ( @@ -47,7 +32,3 @@ def create_forwarder_configuration(self, pvs: List[str]) -> bytes: def remove_forwarder_configuration(self, pvs: List[str]) -> bytes: return serialise_fc00(UpdateType.REMOVE, self._create_streams(pvs)) # pyright: ignore - - @staticmethod - def remove_all_forwarder_configuration() -> bytes: - return serialise_fc00(UpdateType.REMOVEALL, []) # pyright: ignore diff --git a/inst_pvs.py b/inst_pvs.py index 92d04fd..6c6740f 100644 --- a/inst_pvs.py +++ b/inst_pvs.py @@ -1,23 +1,27 @@ +import logging from threading import Timer -from genie_python.mysql_abstraction_layer import SQLAbstraction -from server_common.utilities import print_and_log from kafka_producer import ProducerWrapper UPDATE_FREQUENCY_S = 30.0 +logger = logging.getLogger(__name__) +import mysql.connector + class InstPVs(object): def __init__( - self, producer: ProducerWrapper, sql_abstraction: SQLAbstraction | None = None + self, producer: ProducerWrapper ) -> None: self._pvs: set[str] = set() - self._sql = ( - SQLAbstraction(dbid="iocdb", user="report", password="$report", host="localhost") - if sql_abstraction is None - else sql_abstraction - ) + self._sql = mysql.connector.connect( + database="iocdb", + host="localhost", + port=3306, + user="report", + password="$report") + self.producer = producer def schedule(self) -> None: @@ -25,17 +29,22 @@ def action() -> None: self.update_pvs_from_mysql() self.schedule() + self.update_pvs_from_mysql() job = Timer(UPDATE_FREQUENCY_S, action) job.start() def update_pvs_from_mysql(self) -> None: - rows = self._sql.query('SELECT pvname, value FROM iocdb.pvinfo WHERE infoname="archive";') - if rows is None: + cursor = self._sql.cursor() + query = 'SELECT pvname, value FROM iocdb.pvinfo WHERE infoname="archive";' + cursor.execute(query) + rows = cursor.fetchall() + if not rows: + logger.error(f"No data from query ({query}") + cursor.close() return pvs = set() - for row in rows: - basename, fields = row + for (basename, fields) in rows: assert isinstance(fields, str) for field in fields.split(): if all(c in "0123456789." for c in field): @@ -46,7 +55,8 @@ def update_pvs_from_mysql(self) -> None: pvs.add(f"{basename}.{field}") if self._pvs != pvs: - print_and_log(f"Inst configuration changed to: {pvs}") + logger.info(f"Inst configuration changed to: {pvs}") self.producer.remove_config(list(self._pvs - pvs)) self.producer.add_config(list(pvs - self._pvs)) self._pvs = pvs + cursor.close() diff --git a/kafka_producer.py b/kafka_producer.py index 0932f93..65672d6 100644 --- a/kafka_producer.py +++ b/kafka_producer.py @@ -1,29 +1,16 @@ -# This file is part of the ISIS IBEX application. -# Copyright (C) 2012-2020 Science & Technology Facilities Council. -# All rights reserved. -# -# This program is distributed in the hope that it will be useful. -# This program and the accompanying materials are made available under the -# terms of the Eclipse Public License v1.0 which accompanies this distribution. -# EXCEPT AS EXPRESSLY SET FORTH IN THE ECLIPSE PUBLIC LICENSE V1.0, THE PROGRAM -# AND ACCOMPANYING MATERIALS ARE PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES -# OR CONDITIONS OF ANY KIND. See the Eclipse Public License v1.0 for more details. -# -# You should have received a copy of the Eclipse Public License v1.0 -# along with this program; if not, you can obtain a copy from -# https://www.eclipse.org/org/documents/epl-v10.php or -# http://opensource.org/licenses/eclipse-1.0.php +import logging from time import sleep from typing import List -from kafka import KafkaConsumer, KafkaProducer, errors -from server_common.utilities import SEVERITY, print_and_log +from confluent_kafka import Producer from streaming_data_types.fbschemas.forwarder_config_update_fc00.Protocol import ( Protocol, ) from forwarder_config import ForwarderConfig +logger = logging.getLogger(__name__) + class ProducerWrapper: """ @@ -40,7 +27,7 @@ def __init__( self.topic = config_topic self.converter = ForwarderConfig(data_topic, epics_protocol) while not self._set_up_producer(server): - print_and_log("Failed to create producer, retrying in 30s") + logger.error("Failed to create producer, retrying in 30s") sleep(30) def _set_up_producer(self, server: str) -> bool: @@ -48,30 +35,16 @@ def _set_up_producer(self, server: str) -> bool: Attempts to create a Kafka producer and consumer. Retries with a recursive call every 30s. """ try: - self.client = KafkaConsumer(bootstrap_servers=server) - self.producer = KafkaProducer(bootstrap_servers=server) + self.producer = Producer({"bootstrap.servers": server}) if not self.topic_exists(self.topic): - print_and_log( + logger.warning( f"WARNING: topic {self.topic} does not exist. It will be created by default." ) return True - except errors.NoBrokersAvailable: - print_and_log(f"No brokers found on server: {server[0]}", severity=SEVERITY.MAJOR) - except errors.KafkaConnectionError: - print_and_log("No server found, connection error", severity=SEVERITY.MAJOR) - except errors.InvalidConfigurationError: - print_and_log("Invalid configuration", severity=SEVERITY.MAJOR) - quit() - except errors.InvalidTopicError: - print_and_log( - "Invalid topic, to enable auto creation of topics set" - " auto.create.topics.enable to false in broker configuration", - severity=SEVERITY.MAJOR, - ) - except Exception as e: - print_and_log( - f"Unexpected error while creating producer or consumer: {str(e)}", - severity=SEVERITY.MAJOR, + + except Exception: + logger.exception( + f"Unexpected error while creating producer or consumer: ", ) return False @@ -82,10 +55,10 @@ def add_config(self, pvs: List[str]) -> None: :param pvs: A list of new PVs to add to the forwarder configuration. """ message_buffer = self.converter.create_forwarder_configuration(pvs) - self.producer.send(self.topic, message_buffer) + self.producer.produce(self.topic, message_buffer) def topic_exists(self, topic_name: str) -> bool: - return topic_name in self.client.topics() + return topic_name in self.producer.list_topics(topic_name).topics def remove_config(self, pvs: List[str]) -> None: """ @@ -94,11 +67,4 @@ def remove_config(self, pvs: List[str]) -> None: :param pvs: A list of PVs to remove from the forwarder configuration. """ message_buffer = self.converter.remove_forwarder_configuration(pvs) - self.producer.send(self.topic, message_buffer) - - def stop_all_pvs(self) -> None: - """ - Sends a stop_all command to the forwarder to clear all configuration. - """ - message_buffer = self.converter.remove_all_forwarder_configuration() - self.producer.send(self.topic, message_buffer) + self.producer.produce(self.topic, message_buffer) diff --git a/main.py b/main.py index 9df49f9..b8850fa 100644 --- a/main.py +++ b/main.py @@ -1,19 +1,4 @@ -# This file is part of the ISIS IBEX application. -# Copyright (C) 2012-2020 Science & Technology Facilities Council. -# All rights reserved. -# -# This program is distributed in the hope that it will be useful. -# This program and the accompanying materials are made available under the -# terms of the Eclipse Public License v1.0 which accompanies this distribution. -# EXCEPT AS EXPRESSLY SET FORTH IN THE ECLIPSE PUBLIC LICENSE V1.0, THE PROGRAM -# AND ACCOMPANYING MATERIALS ARE PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES -# OR CONDITIONS OF ANY KIND. See the Eclipse Public License v1.0 for more details. -# -# You should have received a copy of the Eclipse Public License v1.0 -# along with this program; if not, you can obtain a copy from -# https://www.eclipse.org/org/documents/epl-v10.php or -# http://opensource.org/licenses/eclipse-1.0.php - +import logging from argparse import ArgumentParser from os import environ from time import sleep @@ -22,6 +7,9 @@ from inst_pvs import InstPVs from kafka_producer import ProducerWrapper +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + if __name__ == "__main__": parser = ArgumentParser() @@ -30,27 +18,23 @@ "--data", help="Kafka topic to send Block PV data to", type=str, - default="_sampleEnv", ) parser.add_argument( "-c", "--config", help="Kafka topic to send forwarder config to", type=str, - default="forwarder_config", ) parser.add_argument( "-r", "--runlog", help="Kafka topic to send run log PV data to", type=str, - default="_runLog", ) parser.add_argument( "-b", "--broker", help="Location of the Kafka brokers (host:port)", - nargs="+", type=str, default="livedata.isis.cclrc.ac.uk:31092", ) @@ -59,7 +43,7 @@ "--pvprefix", help="PV Prefix of the block server", type=str, - default=environ["MYPVPREFIX"], + default=environ.get("MYPVPREFIX"), ) args = parser.parse_args() diff --git a/pyproject.toml b/pyproject.toml index 30f38f0..12e2df1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,9 +29,9 @@ classifiers = [ dependencies = [ "pyepics", "ess-streaming-data-types", -"genie-python", -"server_common@git+https://github.com/ISISComputingGroup/server_common.git" - +"ibex-non-ca-helpers", + "confluent_kafka", + "mysql-conector-python" ] [project.optional-dependencies]