Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 0 additions & 15 deletions __init__.py
Original file line number Diff line number Diff line change
@@ -1,15 +0,0 @@
# This file is part of the ISIS IBEX application.
# Copyright (C) 2012-2020 Science & Technology Facilities Council.
# All rights reserved.
#
# This program is distributed in the hope that it will be useful.
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License v1.0 which accompanies this distribution.
# EXCEPT AS EXPRESSLY SET FORTH IN THE ECLIPSE PUBLIC LICENSE V1.0, THE PROGRAM
# AND ACCOMPANYING MATERIALS ARE PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND. See the Eclipse Public License v1.0 for more details.
#
# You should have received a copy of the Eclipse Public License v1.0
# along with this program; if not, you can obtain a copy from
# https://www.eclipse.org/org/documents/epl-v10.php or
# http://opensource.org/licenses/eclipse-1.0.php
57 changes: 15 additions & 42 deletions block_server_monitor.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,16 @@
# This file is part of the ISIS IBEX application.
# Copyright (C) 2012-2020 Science & Technology Facilities Council.
# All rights reserved.
#
# This program is distributed in the hope that it will be useful.
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License v1.0 which accompanies this distribution.
# EXCEPT AS EXPRESSLY SET FORTH IN THE ECLIPSE PUBLIC LICENSE V1.0, THE PROGRAM
# AND ACCOMPANYING MATERIALS ARE PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND. See the Eclipse Public License v1.0 for more details.
#
# You should have received a copy of the Eclipse Public License v1.0
# along with this program; if not, you can obtain a copy from
# https://www.eclipse.org/org/documents/epl-v10.php or
# http://opensource.org/licenses/eclipse-1.0.php
import json
import logging
import numpy as np
import numpy.typing as npt

from epics import PV
from threading import RLock

from server_common.helpers import BLOCK_PREFIX
from server_common.utilities import dehex_and_decompress, print_and_log
from ibex_non_ca_helpers.compress_hex import dehex_and_decompress

from kafka_producer import ProducerWrapper

logger = logging.getLogger(__name__)

class BlockServerMonitor:
"""
Expand All @@ -33,17 +22,18 @@ class BlockServerMonitor:
def __init__(self, address: str, pvprefix: str, producer: ProducerWrapper) -> None:
self.pv_prefix = pvprefix
self.address = address
self.channel = PV(self.address)
self.channel = PV(self.address, auto_monitor=True, callback=self.update)
self.producer = producer
self.last_pvs = []
self.monitor_lock = RLock()


connected = self.channel.wait_for_connection(timeout=5)
if not connected:
print_and_log(f"Unable to find pv {self.address}")
logger.error(f"Unable to find pv {self.address}")
return
logger.info(f"Connected to {self.address}")

self.pv.add_callback(self.update)

def block_name_to_pv_name(self, blk: str) -> str:
"""
Expand All @@ -55,24 +45,8 @@ def block_name_to_pv_name(self, blk: str) -> str:
Returns:
string : the associated PV name.
"""
return f"{self.pv_prefix}{BLOCK_PREFIX}{blk}"

@staticmethod
def convert_to_string(pv_array: bytearray) -> str:
"""
Convert from byte array to string and remove null characters.

We cannot get the number of elements in the array so convert to bytes and remove the
null characters.
return f"{self.pv_prefix}CS:SB:{blk}"

Args:
pv_array (bytearray): The byte array of PVs.

Returns:
string : The string formed from the bytearray.
"""

return bytearray(pv_array).decode("utf-8").replace("\x00", "")

def update_config(self, blocks: list[str]) -> None:
"""
Expand All @@ -87,12 +61,12 @@ def update_config(self, blocks: list[str]) -> None:

pvs = [self.block_name_to_pv_name(blk) for blk in blocks]
if pvs != self.last_pvs:
print_and_log(f"Blocks configuration changed to: {pvs}")
logger.info(f"Blocks configuration changed to: {pvs}")
self.producer.remove_config(self.last_pvs)
self.producer.add_config(pvs)
self.last_pvs = pvs

def update(self, _, value: bytearray, **kwargs) -> None: # noqa: ANN401
def update(self, value: npt.NDArray[np.uint8], **kwargs) -> None: # noqa: ANN401
"""
Updates the kafka config when the blockserver changes. This is called from the monitor.

Expand All @@ -105,8 +79,7 @@ def update(self, _, value: bytearray, **kwargs) -> None: # noqa: ANN401
"""

with self.monitor_lock:
data = self.convert_to_string(value)
data = dehex_and_decompress(bytes(data, encoding="utf-8"))
logger.info("new update %s ", value)
data = dehex_and_decompress(value.tobytes())
blocks = json.loads(data)

self.update_config(blocks)
19 changes: 0 additions & 19 deletions forwarder_config.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,3 @@
# This file is part of the ISIS IBEX application.
# Copyright (C) 2012-2020 Science & Technology Facilities Council.
# All rights reserved.
#
# This program is distributed in the hope that it will be useful.
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License v1.0 which accompanies this distribution.
# EXCEPT AS EXPRESSLY SET FORTH IN THE ECLIPSE PUBLIC LICENSE V1.0, THE PROGRAM
# AND ACCOMPANYING MATERIALS ARE PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND. See the Eclipse Public License v1.0 for more details.
#
# You should have received a copy of the Eclipse Public License v1.0
# along with this program; if not, you can obtain a copy from
# https://www.eclipse.org/org/documents/epl-v10.php or
# http://opensource.org/licenses/eclipse-1.0.php
from typing import List

from streaming_data_types.fbschemas.forwarder_config_update_fc00.Protocol import (
Expand Down Expand Up @@ -47,7 +32,3 @@ def create_forwarder_configuration(self, pvs: List[str]) -> bytes:

def remove_forwarder_configuration(self, pvs: List[str]) -> bytes:
return serialise_fc00(UpdateType.REMOVE, self._create_streams(pvs)) # pyright: ignore

@staticmethod
def remove_all_forwarder_configuration() -> bytes:
return serialise_fc00(UpdateType.REMOVEALL, []) # pyright: ignore
36 changes: 23 additions & 13 deletions inst_pvs.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,50 @@
import logging
from threading import Timer

from genie_python.mysql_abstraction_layer import SQLAbstraction
from server_common.utilities import print_and_log

from kafka_producer import ProducerWrapper

UPDATE_FREQUENCY_S = 30.0

logger = logging.getLogger(__name__)
import mysql.connector


class InstPVs(object):
def __init__(
self, producer: ProducerWrapper, sql_abstraction: SQLAbstraction | None = None
self, producer: ProducerWrapper
) -> None:
self._pvs: set[str] = set()
self._sql = (
SQLAbstraction(dbid="iocdb", user="report", password="$report", host="localhost")
if sql_abstraction is None
else sql_abstraction
)
self._sql = mysql.connector.connect(
database="iocdb",
host="localhost",
port=3306,
user="report",
password="$report")

self.producer = producer

def schedule(self) -> None:
def action() -> None:
self.update_pvs_from_mysql()
self.schedule()

self.update_pvs_from_mysql()
job = Timer(UPDATE_FREQUENCY_S, action)
job.start()

def update_pvs_from_mysql(self) -> None:
rows = self._sql.query('SELECT pvname, value FROM iocdb.pvinfo WHERE infoname="archive";')
if rows is None:
cursor = self._sql.cursor()
query = 'SELECT pvname, value FROM iocdb.pvinfo WHERE infoname="archive";'
cursor.execute(query)
rows = cursor.fetchall()
if not rows:
logger.error(f"No data from query ({query}")
cursor.close()
return

pvs = set()
for row in rows:
basename, fields = row
for (basename, fields) in rows:
assert isinstance(fields, str)
for field in fields.split():
if all(c in "0123456789." for c in field):
Expand All @@ -46,7 +55,8 @@ def update_pvs_from_mysql(self) -> None:
pvs.add(f"{basename}.{field}")

if self._pvs != pvs:
print_and_log(f"Inst configuration changed to: {pvs}")
logger.info(f"Inst configuration changed to: {pvs}")
self.producer.remove_config(list(self._pvs - pvs))
self.producer.add_config(list(pvs - self._pvs))
self._pvs = pvs
cursor.close()
62 changes: 14 additions & 48 deletions kafka_producer.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,16 @@
# This file is part of the ISIS IBEX application.
# Copyright (C) 2012-2020 Science & Technology Facilities Council.
# All rights reserved.
#
# This program is distributed in the hope that it will be useful.
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License v1.0 which accompanies this distribution.
# EXCEPT AS EXPRESSLY SET FORTH IN THE ECLIPSE PUBLIC LICENSE V1.0, THE PROGRAM
# AND ACCOMPANYING MATERIALS ARE PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND. See the Eclipse Public License v1.0 for more details.
#
# You should have received a copy of the Eclipse Public License v1.0
# along with this program; if not, you can obtain a copy from
# https://www.eclipse.org/org/documents/epl-v10.php or
# http://opensource.org/licenses/eclipse-1.0.php
import logging
from time import sleep
from typing import List

from kafka import KafkaConsumer, KafkaProducer, errors
from server_common.utilities import SEVERITY, print_and_log
from confluent_kafka import Producer
from streaming_data_types.fbschemas.forwarder_config_update_fc00.Protocol import (
Protocol,
)

from forwarder_config import ForwarderConfig

logger = logging.getLogger(__name__)


class ProducerWrapper:
"""
Expand All @@ -40,38 +27,24 @@ def __init__(
self.topic = config_topic
self.converter = ForwarderConfig(data_topic, epics_protocol)
while not self._set_up_producer(server):
print_and_log("Failed to create producer, retrying in 30s")
logger.error("Failed to create producer, retrying in 30s")
sleep(30)

def _set_up_producer(self, server: str) -> bool:
"""
Attempts to create a Kafka producer and consumer. Retries with a recursive call every 30s.
"""
try:
self.client = KafkaConsumer(bootstrap_servers=server)
self.producer = KafkaProducer(bootstrap_servers=server)
self.producer = Producer({"bootstrap.servers": server})
if not self.topic_exists(self.topic):
print_and_log(
logger.warning(
f"WARNING: topic {self.topic} does not exist. It will be created by default."
)
return True
except errors.NoBrokersAvailable:
print_and_log(f"No brokers found on server: {server[0]}", severity=SEVERITY.MAJOR)
except errors.KafkaConnectionError:
print_and_log("No server found, connection error", severity=SEVERITY.MAJOR)
except errors.InvalidConfigurationError:
print_and_log("Invalid configuration", severity=SEVERITY.MAJOR)
quit()
except errors.InvalidTopicError:
print_and_log(
"Invalid topic, to enable auto creation of topics set"
" auto.create.topics.enable to false in broker configuration",
severity=SEVERITY.MAJOR,
)
except Exception as e:
print_and_log(
f"Unexpected error while creating producer or consumer: {str(e)}",
severity=SEVERITY.MAJOR,

except Exception:
logger.exception(
f"Unexpected error while creating producer or consumer: ",
)
return False

Expand All @@ -82,10 +55,10 @@ def add_config(self, pvs: List[str]) -> None:
:param pvs: A list of new PVs to add to the forwarder configuration.
"""
message_buffer = self.converter.create_forwarder_configuration(pvs)
self.producer.send(self.topic, message_buffer)
self.producer.produce(self.topic, message_buffer)

def topic_exists(self, topic_name: str) -> bool:
return topic_name in self.client.topics()
return topic_name in self.producer.list_topics(topic_name).topics

def remove_config(self, pvs: List[str]) -> None:
"""
Expand All @@ -94,11 +67,4 @@ def remove_config(self, pvs: List[str]) -> None:
:param pvs: A list of PVs to remove from the forwarder configuration.
"""
message_buffer = self.converter.remove_forwarder_configuration(pvs)
self.producer.send(self.topic, message_buffer)

def stop_all_pvs(self) -> None:
"""
Sends a stop_all command to the forwarder to clear all configuration.
"""
message_buffer = self.converter.remove_all_forwarder_configuration()
self.producer.send(self.topic, message_buffer)
self.producer.produce(self.topic, message_buffer)
26 changes: 5 additions & 21 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,4 @@
# This file is part of the ISIS IBEX application.
# Copyright (C) 2012-2020 Science & Technology Facilities Council.
# All rights reserved.
#
# This program is distributed in the hope that it will be useful.
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License v1.0 which accompanies this distribution.
# EXCEPT AS EXPRESSLY SET FORTH IN THE ECLIPSE PUBLIC LICENSE V1.0, THE PROGRAM
# AND ACCOMPANYING MATERIALS ARE PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND. See the Eclipse Public License v1.0 for more details.
#
# You should have received a copy of the Eclipse Public License v1.0
# along with this program; if not, you can obtain a copy from
# https://www.eclipse.org/org/documents/epl-v10.php or
# http://opensource.org/licenses/eclipse-1.0.php

import logging
from argparse import ArgumentParser
from os import environ
from time import sleep
Expand All @@ -22,6 +7,9 @@
from inst_pvs import InstPVs
from kafka_producer import ProducerWrapper

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

if __name__ == "__main__":
parser = ArgumentParser()

Expand All @@ -30,27 +18,23 @@
"--data",
help="Kafka topic to send Block PV data to",
type=str,
default="_sampleEnv",
)
parser.add_argument(
"-c",
"--config",
help="Kafka topic to send forwarder config to",
type=str,
default="forwarder_config",
)
parser.add_argument(
"-r",
"--runlog",
help="Kafka topic to send run log PV data to",
type=str,
default="_runLog",
)
parser.add_argument(
"-b",
"--broker",
help="Location of the Kafka brokers (host:port)",
nargs="+",
type=str,
default="livedata.isis.cclrc.ac.uk:31092",
)
Expand All @@ -59,7 +43,7 @@
"--pvprefix",
help="PV Prefix of the block server",
type=str,
default=environ["MYPVPREFIX"],
default=environ.get("MYPVPREFIX"),
)

args = parser.parse_args()
Expand Down
Loading