From 4fd89cce16033c695d7ed637a01c21d363c9a5a9 Mon Sep 17 00:00:00 2001 From: Mark Eberlein Date: Tue, 20 Jul 2021 13:57:55 -0700 Subject: [PATCH] Add message cloning and rendering This commit adds the ability to clone message sources and destinations as well as cleaning up some behavior where the application would not exit when Ctrl-C was sent from the terminal. The exit behavior currently shows some errors, but does exit. It is worth looking at how to get this exit behavior to act in a clean manner in the future. --- examples/multi-node/Test_Federate_p1u.py | 22 +++- examples/multi-node/Test_Federate_p1uhs.py | 5 +- .../multi-node/Test_Federate_sub_trans.py | 2 - examples/multi-node/runner.json | 7 +- helics_cli/cli.py | 6 +- helics_cli/observer.py | 118 ++++++++++++------ helics_cli/utils/process.py | 14 ++- web/src/html/index.html | 58 ++++----- 8 files changed, 145 insertions(+), 87 deletions(-) diff --git a/examples/multi-node/Test_Federate_p1u.py b/examples/multi-node/Test_Federate_p1u.py index 421b366..c92a124 100644 --- a/examples/multi-node/Test_Federate_p1u.py +++ b/examples/multi-node/Test_Federate_p1u.py @@ -9,7 +9,8 @@ # logger.setLoggingLevel(logging.DEBUG) - +GLOBAL_ENDPOINT: h.HelicsEndpoint +LOCAL_ENDPOINT: h.HelicsEndpoint def pub_and_sub_calc(supply_voltage, last_loads, sub_loads, pub_voltages): # calculate the voltage fed to feeders below @@ -28,6 +29,11 @@ def pub_and_sub_calc(supply_voltage, last_loads, sub_loads, pub_voltages): load_diff[i] = abs(last_loads[i] - new_loads[i]) logger.debug(f"total load difference: {load_diff}") + # Send some silly messages + h.helicsEndpointSendBytesTo(GLOBAL_ENDPOINT, "inner loop okay".encode("utf-8"), "p1u/p1u_status") + h.helicsEndpointSendBytesTo(GLOBAL_ENDPOINT, "inner loop okay".encode("utf-8"), "p1u_global_status") + h.helicsEndpointSendBytesTo(LOCAL_ENDPOINT, "local endpoint okay".encode("utf-8"), "p1u/p1u_status") + return new_loads @@ -56,9 +62,15 @@ def run_p1u_federate(fed_name, broker_address, feeders): h.helicsFederateInfoSetTimeProperty(fedinfo, h.helics_property_time_delta, deltat) # Create value federate - vfed = h.helicsCreateValueFederate(fed_name, fedinfo) + vfed = h.helicsCreateCombinationFederate(fed_name, fedinfo) print("Value federate created") + # Register a local endpoint + global GLOBAL_ENDPOINT + global LOCAL_ENDPOINT + LOCAL_ENDPOINT = h.helicsFederateRegisterEndpoint(vfed, "p1u_status", "string") + GLOBAL_ENDPOINT = h.helicsFederateRegisterGlobalEndpoint(vfed, "p1u_global_status", "string") + # Register the publications pub_load = h.helicsFederateRegisterGlobalTypePublication(vfed, "Circuit.full_network.TotalPower.E", "double", "kW") pub_voltages = [] @@ -113,12 +125,16 @@ def run_p1u_federate(fed_name, broker_address, feeders): last_loads = new_loads iters += 1 + # Send some silly messages + h.helicsEndpointSendBytesTo(GLOBAL_ENDPOINT, "outer loop okay".encode("utf-8"), "p1u/p1u_status") + h.helicsEndpointSendBytesTo(GLOBAL_ENDPOINT, "outer loop okay".encode("utf-8"), "p1u_global_status") + h.helicsPublicationPublishDouble(pub_load, sum(last_loads)) logger.info(f"feeder loads {last_loads} at time {currenttime} after {iters} iters") t += 1 # all other federates should have finished, so now you can close - h.helicsFederateFinalize(vfed) + h.helicsFederateDisconnect(vfed) print(f"{fed_name}: Test Federate finalized") h.helicsFederateDestroy(vfed) diff --git a/examples/multi-node/Test_Federate_p1uhs.py b/examples/multi-node/Test_Federate_p1uhs.py index 598d384..06e0dd7 100644 --- a/examples/multi-node/Test_Federate_p1uhs.py +++ b/examples/multi-node/Test_Federate_p1uhs.py @@ -53,9 +53,12 @@ def run_p1uhs_federate(fed_name, broker_address=None): h.helicsFederateInfoSetTimeProperty(fedinfo, h.helics_property_time_delta, deltat) # Create value federate # - vfed = h.helicsCreateValueFederate(fed_name, fedinfo) + vfed = h.helicsCreateCombinationFederate(fed_name, fedinfo) print("Value federate created") + # Register global endpoint + h.helicsFederateRegisterGlobalEndpoint(vfed, "p1uhs_global_status", "string") + # Register the publications # pub_name = f"Circuit.feeder_p1u.{fed_name}.p1ux.TotalPower.E" pub_load = h.helicsFederateRegisterGlobalTypePublication(vfed, pub_name, "double", "kW") diff --git a/examples/multi-node/Test_Federate_sub_trans.py b/examples/multi-node/Test_Federate_sub_trans.py index f2ac39a..4d07082 100644 --- a/examples/multi-node/Test_Federate_sub_trans.py +++ b/examples/multi-node/Test_Federate_sub_trans.py @@ -44,8 +44,6 @@ def run_sub_trans(fed_name, feeders, broker_address): # Create value federate # vfed = h.helicsCreateValueFederate(fed_name, fedinfo) - print("this is a test print") - # time.sleep(100) logger.info("Value federate created") # Register the publication # diff --git a/examples/multi-node/runner.json b/examples/multi-node/runner.json index 4dcb603..4bb7ba2 100644 --- a/examples/multi-node/runner.json +++ b/examples/multi-node/runner.json @@ -1,12 +1,13 @@ { - "broker": { + "broker": { "federates": 3, "observer": { "directory": "../../", "host": "localhost", - "name": "observer", + "name": "broker", "include": ["Circuit.feeder_p1u.p1uhs0.p1ux.voltage", "Circuit.feeder_p1u.p1uhs0.p1ux.TotalPower.E", "full_network.voltage"], - "exclude": [] + "message_source": ["p1u/p1u_status"], + "message_destination": ["p1u/p1u_status", "p1u_global_status"] } }, "federates": [ diff --git a/helics_cli/cli.py b/helics_cli/cli.py index 79870ad..26f9b94 100644 --- a/helics_cli/cli.py +++ b/helics_cli/cli.py @@ -93,7 +93,7 @@ def setup(name, path, purge): @click.option("--silent", is_flag=True) @click.option("--no-log-files", is_flag=True, default=False) @click.option( - "--broker-loglevel", "--loglevel", "-l", type=int, default=3, help="Log level for HELICS broker", + "--broker-loglevel", "--loglevel", "-l", type=click.STRING, default="warning", help="Log level for HELICS broker", ) @click.option("--web", "-w", is_flag=True, default=False, help="Run the web interface on startup") def run(path, silent, no_log_files, broker_loglevel, web): @@ -186,7 +186,7 @@ def run(path, silent, no_log_files, broker_loglevel, web): for p in process_handler.process_list: p.wait() except KeyboardInterrupt: - click.echo("Warning: User interrupted processes. Terminating safely ...", status="info") + click.echo("Warning: User interrupted processes. Terminating safely ...") process_handler.shutdown() logger.debug("Closing output") for o in process_handler.output_list: @@ -248,7 +248,7 @@ def validate(path): "--n-federates", required=True, type=click.INT, help="Number of federates to observe", ) @click.option("--path", type=click.Path(exists=True), default="./", help="Internal path to config file used for filtering output") -@click.option("--broker_loglevel", "--loglevel", "-l", type=click.INT, default=2, help="Log level for HELICS broker") +@click.option("--broker_loglevel", "--loglevel", "-l", type=click.STRING, default="warning", help="Log level for HELICS broker") def observe(n_federates: int, path: str, log_level) -> int: return observer.run(n_federates, path, log_level) diff --git a/helics_cli/observer.py b/helics_cli/observer.py index 1622e14..011d52a 100644 --- a/helics_cli/observer.py +++ b/helics_cli/observer.py @@ -10,10 +10,21 @@ from .utils.message_handler import MessageHandler, SimpleMessage from .database import initialize_database, MetaData +import signal + + +def signal_handler(signal, frame): + h.helicsCloseLibrary() + h.helicsCleanupLibrary() + raise KeyboardInterrupt + + +signal.signal(signal.SIGINT, signal_handler) logger = logging.getLogger(__name__) OBSERVER_BROKER: h.HelicsBroker = None OBSERVER_FEDERATE: h.HelicsCombinationFederate = None +OBSERVER_ENDPOINT: h.HelicsEndpoint = None SERVER_MESSAGE_HANDLER: MessageHandler = None time_control = {"nonstop": True, "requested_time": 0.0, "exited": False} @@ -47,10 +58,12 @@ def init_combination_federate( OBSERVER_FEDERATE = h.helicsCreateCombinationFederate(core_name, fedinfo) -def write_database_data(db, federate: h.HelicsFederate, subscriptions=[], current_time=0.0): +def write_database_data(db, federate: h.HelicsFederate, subscriptions=None, current_time=0.0): + if subscriptions is None: + subscriptions = [] logger.debug("Making query ...") - federates = [name for name in federate.query("root", "federates") if name != "__observer__"] + federates = [name for name in federate.query("root", "federates") if not name.startswith("__observer__") and not name.endswith("_filters")] for name in federates: logger.debug(f"Query for exists: {name}") @@ -58,9 +71,8 @@ def write_database_data(db, federate: h.HelicsFederate, subscriptions=[], curren if federate.query(name, "state") == "disconnected": continue - logger.debug(f"Query for current_time: {name}") data = federate.query(name, "current_time") - logger.debug(f"data is: {data}") + logger.debug(f"Query for current_time: {name}, data is: {data}") try: granted_time = data["granted_time"] requested_time = data["requested_time"] @@ -76,7 +88,7 @@ def write_database_data(db, federate: h.HelicsFederate, subscriptions=[], curren if federate.query(name, "state") == "disconnected": continue logger.debug(f"Query for publications: {name}") - publications = federate.query(name, "publications") # .replace("[", "").replace("]", "").split(";") + publications = federate.query(name, "publications") logger.debug(f"{name} publishes: {publications}") for pub_str in publications: @@ -101,6 +113,28 @@ def write_database_data(db, federate: h.HelicsFederate, subscriptions=[], curren (pub_str, name, granted_time, value, 1), ) + logger.debug("checking for messages on observer clone endpoint") + if OBSERVER_ENDPOINT and OBSERVER_ENDPOINT.has_message(): + logger.debug("found messages") + db.execute("UPDATE Messages SET new_value=0;") + while OBSERVER_ENDPOINT.has_message(): + message = OBSERVER_ENDPOINT.get_message() + logger.debug("reading messages") + if message.is_valid(): + logger.debug("writing messages to database") + db.execute( + "INSERT INTO Messages(sender, destination, send_time, receive_time, value, new_value)" + " VALUES (?,?,?,?,?,?);", + ( + message.original_source, + message.original_destination, + message.time, + granted_time, + message.data, + 1 + ), + ) + db.commit() @@ -126,7 +160,7 @@ def process_message(message: SimpleMessage): if signal_data["operation"] == "STOP": logger.info("got STOP") time_control["exited"] = True - OBSERVER_FEDERATE.finalize() # TODO: see if this is the right way to exit. + OBSERVER_FEDERATE.disconnect() h.helicsBrokerDisconnect(OBSERVER_BROKER) # h.helicsBrokerClearTimeBarrier(OBSERVER_BROKER) return SimpleMessage("SIGNAL_RESPONSE", "{}") @@ -163,7 +197,7 @@ def ingest_messages(): SERVER_MESSAGE_HANDLER.send_server(response) -def run(n_federates: int, config_path: str, log_level: int, message_handler: MessageHandler = None): +def run(n_federates: int, config_path: str, log_level: str, message_handler: MessageHandler = None): file_out = logging.FileHandler("observer.log", mode="w") file_out.setLevel(logging.DEBUG) @@ -177,24 +211,16 @@ def run(n_federates: int, config_path: str, log_level: int, message_handler: Mes if SERVER_MESSAGE_HANDLER.Enabled: time_control["nonstop"] = False - try: - asyncio.run(_run(n_federates, config_path, log_level)) - except KeyboardInterrupt: - logger.info("User canceled operation") - except h.HelicsException: - logger.error("Failed and threw HelicsException") - h.helicsCloseLibrary() - return 1 - finally: - logger.debug("In finally") - h.helicsCloseLibrary() - return 0 + return asyncio.run(_run(n_federates, config_path, log_level)) -async def _run(n_federates: int, config_path: str, log_level: int = 2): +async def _run(n_federates: int, config_path: str, log_level: str = "warning"): path_to_config = os.path.abspath(config_path) path = os.path.dirname(path_to_config) + if log_level == "debug": + logger.setLevel(logging.DEBUG) + with open(path_to_config) as f: config = json.loads(f.read()) logger.info("Read config: %s", config["broker"]["observer"]) @@ -215,16 +241,15 @@ async def _run(n_federates: int, config_path: str, log_level: int = 2): logger.info("Creating observer federate") init_combination_federate("__observer__") + global OBSERVER_ENDPOINT + OBSERVER_ENDPOINT = OBSERVER_FEDERATE.register_global_endpoint("__observer_clone__") logger.info("Entering initializing mode") - # if not time_control["nonstop"]: - # h.helicsBrokerSetTimeBarrier(broker, 0.0) - # else: time.sleep(2) - federates = OBSERVER_FEDERATE.query("root", "federates") # .replace("[", "").replace("]", "").split(";") - federates = [name for name in federates if name != "__observer__"] + federates = OBSERVER_FEDERATE.query("root", "federates") + federates = [name for name in federates if not name.startswith("__")] logger.info(f"federates: {federates}") @@ -236,22 +261,41 @@ async def _run(n_federates: int, config_path: str, log_level: int = 2): time.sleep(1) # Assuming all federates have connected. - - logger.info("Querying all topics") - metadata["federates"] = ",".join([f for f in federates if not f.startswith("__")]) - publications = OBSERVER_FEDERATE.query("root", "publications") # .replace("[", "").replace("]", "").split(";") + publications = OBSERVER_FEDERATE.query("root", "publications") subscriptions = [] + logger.info(f"publications: {publications}") + # TODO: improve subscription filtering to be a bit more friendly for pub in publications: if "include" in config["broker"]["observer"].keys() and pub not in config["broker"]["observer"]["include"]: continue else: subscriptions.append(OBSERVER_FEDERATE.register_subscription(pub)) - # TODO: message clones - h.helicsBrokerSetTimeBarrier(OBSERVER_BROKER, 0.0) + endpoints = OBSERVER_FEDERATE.query("root", "endpoints") + logger.info(f"endpoints: {endpoints}") + + if "message_source" in config["broker"]["observer"].keys(): + message_source = config["broker"]["observer"]["message_source"] + message_destination = config["broker"]["observer"]["message_destination"] + + try: + # clone_filter = OBSERVER_FEDERATE.register_global_cloning_filter("__observer_clone__") + clone_filter = OBSERVER_FEDERATE.register_global_filter(h.HELICS_FILTER_TYPE_CLONE, f"__observer_clone__") + for endpoint in endpoints: + if endpoint in message_source or endpoint in message_destination: + logger.debug(f"registering clone for {endpoint}") + if endpoint in message_source: + clone_filter.add_source_target(endpoint) + if endpoint in message_destination: + clone_filter.add_destination_target(endpoint) + except h.HelicsException as ex: + logger.error(f"{ex}") + + if not time_control["nonstop"]: + h.helicsBrokerSetTimeBarrier(OBSERVER_BROKER, 0.0) try: logger.info("calling exec") @@ -283,9 +327,6 @@ async def _run(n_federates: int, config_path: str, log_level: int = 2): continue else: await asyncio.sleep(0.001) # There's probably a better way to do this, but this is the solution for now. - # time.sleep(1) # There's probably a better way to do this, but this is the solution for now. - # if not time_control["exited"]: - # ingest_messages(current_time) if not time_control["nonstop"] or current_time >= 9223372036.3: continue @@ -297,17 +338,14 @@ async def _run(n_federates: int, config_path: str, log_level: int = 2): logger.info("Finished observe.") logger.info("finalizing monitoring task") - # message_handler.cancel() - # try: - # await message_handler - # except asyncio.CancelledError: - # logger.info("Monitor shutdown") + logger.info("Closing database ...") db.close() - except KeyboardInterrupt: logger.debug("Observer got keyboard interrupt") raise + except h.HelicsException as ex: + logger.debug(f"Caught HelicsException: {ex}") finally: logger.debug("Observer finalizing") logger.info("Finalizing federate ...") diff --git a/helics_cli/utils/process.py b/helics_cli/utils/process.py index eeb2cce..41cd620 100644 --- a/helics_cli/utils/process.py +++ b/helics_cli/utils/process.py @@ -23,16 +23,18 @@ def __init__(self, process_list, output_list, has_web, message_handler, use_brok def shutdown(self): print("in shutdown...") if self.use_broker_process and self.broker_process.is_alive(): - self.broker_process.terminate() - # self.broker_process.kill() - self.broker_process.close() + # self.broker_process.terminate() + self.broker_process.kill() + # self.broker_process.close() + # self.web_process.join(3) print("shutdown broker") if self.has_web and self.web_process.is_alive(): - self.web_process.terminate() - # self.web_process.kill() - self.web_process.close() + # self.web_process.terminate() + self.web_process.kill() + # self.web_process.close() + # self.web_process.join(3) print("shutdown web") diff --git a/web/src/html/index.html b/web/src/html/index.html index e112693..13b5820 100644 --- a/web/src/html/index.html +++ b/web/src/html/index.html @@ -110,35 +110,35 @@

Publications

- - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
+

Messages

+ + + + + + + + + + +
senderdestinationsend_timereceive_time + value +
+