diff --git a/examples/multi-node/Test_Federate_p1u.py b/examples/multi-node/Test_Federate_p1u.py index 421b366..c92a124 100644 --- a/examples/multi-node/Test_Federate_p1u.py +++ b/examples/multi-node/Test_Federate_p1u.py @@ -9,7 +9,8 @@ # logger.setLoggingLevel(logging.DEBUG) - +GLOBAL_ENDPOINT: h.HelicsEndpoint +LOCAL_ENDPOINT: h.HelicsEndpoint def pub_and_sub_calc(supply_voltage, last_loads, sub_loads, pub_voltages): # calculate the voltage fed to feeders below @@ -28,6 +29,11 @@ def pub_and_sub_calc(supply_voltage, last_loads, sub_loads, pub_voltages): load_diff[i] = abs(last_loads[i] - new_loads[i]) logger.debug(f"total load difference: {load_diff}") + # Send some silly messages + h.helicsEndpointSendBytesTo(GLOBAL_ENDPOINT, "inner loop okay".encode("utf-8"), "p1u/p1u_status") + h.helicsEndpointSendBytesTo(GLOBAL_ENDPOINT, "inner loop okay".encode("utf-8"), "p1u_global_status") + h.helicsEndpointSendBytesTo(LOCAL_ENDPOINT, "local endpoint okay".encode("utf-8"), "p1u/p1u_status") + return new_loads @@ -56,9 +62,15 @@ def run_p1u_federate(fed_name, broker_address, feeders): h.helicsFederateInfoSetTimeProperty(fedinfo, h.helics_property_time_delta, deltat) # Create value federate - vfed = h.helicsCreateValueFederate(fed_name, fedinfo) + vfed = h.helicsCreateCombinationFederate(fed_name, fedinfo) print("Value federate created") + # Register a local endpoint + global GLOBAL_ENDPOINT + global LOCAL_ENDPOINT + LOCAL_ENDPOINT = h.helicsFederateRegisterEndpoint(vfed, "p1u_status", "string") + GLOBAL_ENDPOINT = h.helicsFederateRegisterGlobalEndpoint(vfed, "p1u_global_status", "string") + # Register the publications pub_load = h.helicsFederateRegisterGlobalTypePublication(vfed, "Circuit.full_network.TotalPower.E", "double", "kW") pub_voltages = [] @@ -113,12 +125,16 @@ def run_p1u_federate(fed_name, broker_address, feeders): last_loads = new_loads iters += 1 + # Send some silly messages + h.helicsEndpointSendBytesTo(GLOBAL_ENDPOINT, "outer loop okay".encode("utf-8"), "p1u/p1u_status") + h.helicsEndpointSendBytesTo(GLOBAL_ENDPOINT, "outer loop okay".encode("utf-8"), "p1u_global_status") + h.helicsPublicationPublishDouble(pub_load, sum(last_loads)) logger.info(f"feeder loads {last_loads} at time {currenttime} after {iters} iters") t += 1 # all other federates should have finished, so now you can close - h.helicsFederateFinalize(vfed) + h.helicsFederateDisconnect(vfed) print(f"{fed_name}: Test Federate finalized") h.helicsFederateDestroy(vfed) diff --git a/examples/multi-node/Test_Federate_p1uhs.py b/examples/multi-node/Test_Federate_p1uhs.py index 598d384..06e0dd7 100644 --- a/examples/multi-node/Test_Federate_p1uhs.py +++ b/examples/multi-node/Test_Federate_p1uhs.py @@ -53,9 +53,12 @@ def run_p1uhs_federate(fed_name, broker_address=None): h.helicsFederateInfoSetTimeProperty(fedinfo, h.helics_property_time_delta, deltat) # Create value federate # - vfed = h.helicsCreateValueFederate(fed_name, fedinfo) + vfed = h.helicsCreateCombinationFederate(fed_name, fedinfo) print("Value federate created") + # Register global endpoint + h.helicsFederateRegisterGlobalEndpoint(vfed, "p1uhs_global_status", "string") + # Register the publications # pub_name = f"Circuit.feeder_p1u.{fed_name}.p1ux.TotalPower.E" pub_load = h.helicsFederateRegisterGlobalTypePublication(vfed, pub_name, "double", "kW") diff --git a/examples/multi-node/Test_Federate_sub_trans.py b/examples/multi-node/Test_Federate_sub_trans.py index f2ac39a..4d07082 100644 --- a/examples/multi-node/Test_Federate_sub_trans.py +++ b/examples/multi-node/Test_Federate_sub_trans.py @@ -44,8 +44,6 @@ def run_sub_trans(fed_name, feeders, broker_address): # Create value federate # vfed = h.helicsCreateValueFederate(fed_name, fedinfo) - print("this is a test print") - # time.sleep(100) logger.info("Value federate created") # Register the publication # diff --git a/examples/multi-node/runner.json b/examples/multi-node/runner.json index 4dcb603..4bb7ba2 100644 --- a/examples/multi-node/runner.json +++ b/examples/multi-node/runner.json @@ -1,12 +1,13 @@ { - "broker": { + "broker": { "federates": 3, "observer": { "directory": "../../", "host": "localhost", - "name": "observer", + "name": "broker", "include": ["Circuit.feeder_p1u.p1uhs0.p1ux.voltage", "Circuit.feeder_p1u.p1uhs0.p1ux.TotalPower.E", "full_network.voltage"], - "exclude": [] + "message_source": ["p1u/p1u_status"], + "message_destination": ["p1u/p1u_status", "p1u_global_status"] } }, "federates": [ diff --git a/helics_cli/cli.py b/helics_cli/cli.py index 79870ad..26f9b94 100644 --- a/helics_cli/cli.py +++ b/helics_cli/cli.py @@ -93,7 +93,7 @@ def setup(name, path, purge): @click.option("--silent", is_flag=True) @click.option("--no-log-files", is_flag=True, default=False) @click.option( - "--broker-loglevel", "--loglevel", "-l", type=int, default=3, help="Log level for HELICS broker", + "--broker-loglevel", "--loglevel", "-l", type=click.STRING, default="warning", help="Log level for HELICS broker", ) @click.option("--web", "-w", is_flag=True, default=False, help="Run the web interface on startup") def run(path, silent, no_log_files, broker_loglevel, web): @@ -186,7 +186,7 @@ def run(path, silent, no_log_files, broker_loglevel, web): for p in process_handler.process_list: p.wait() except KeyboardInterrupt: - click.echo("Warning: User interrupted processes. Terminating safely ...", status="info") + click.echo("Warning: User interrupted processes. Terminating safely ...") process_handler.shutdown() logger.debug("Closing output") for o in process_handler.output_list: @@ -248,7 +248,7 @@ def validate(path): "--n-federates", required=True, type=click.INT, help="Number of federates to observe", ) @click.option("--path", type=click.Path(exists=True), default="./", help="Internal path to config file used for filtering output") -@click.option("--broker_loglevel", "--loglevel", "-l", type=click.INT, default=2, help="Log level for HELICS broker") +@click.option("--broker_loglevel", "--loglevel", "-l", type=click.STRING, default="warning", help="Log level for HELICS broker") def observe(n_federates: int, path: str, log_level) -> int: return observer.run(n_federates, path, log_level) diff --git a/helics_cli/observer.py b/helics_cli/observer.py index 1622e14..011d52a 100644 --- a/helics_cli/observer.py +++ b/helics_cli/observer.py @@ -10,10 +10,21 @@ from .utils.message_handler import MessageHandler, SimpleMessage from .database import initialize_database, MetaData +import signal + + +def signal_handler(signal, frame): + h.helicsCloseLibrary() + h.helicsCleanupLibrary() + raise KeyboardInterrupt + + +signal.signal(signal.SIGINT, signal_handler) logger = logging.getLogger(__name__) OBSERVER_BROKER: h.HelicsBroker = None OBSERVER_FEDERATE: h.HelicsCombinationFederate = None +OBSERVER_ENDPOINT: h.HelicsEndpoint = None SERVER_MESSAGE_HANDLER: MessageHandler = None time_control = {"nonstop": True, "requested_time": 0.0, "exited": False} @@ -47,10 +58,12 @@ def init_combination_federate( OBSERVER_FEDERATE = h.helicsCreateCombinationFederate(core_name, fedinfo) -def write_database_data(db, federate: h.HelicsFederate, subscriptions=[], current_time=0.0): +def write_database_data(db, federate: h.HelicsFederate, subscriptions=None, current_time=0.0): + if subscriptions is None: + subscriptions = [] logger.debug("Making query ...") - federates = [name for name in federate.query("root", "federates") if name != "__observer__"] + federates = [name for name in federate.query("root", "federates") if not name.startswith("__observer__") and not name.endswith("_filters")] for name in federates: logger.debug(f"Query for exists: {name}") @@ -58,9 +71,8 @@ def write_database_data(db, federate: h.HelicsFederate, subscriptions=[], curren if federate.query(name, "state") == "disconnected": continue - logger.debug(f"Query for current_time: {name}") data = federate.query(name, "current_time") - logger.debug(f"data is: {data}") + logger.debug(f"Query for current_time: {name}, data is: {data}") try: granted_time = data["granted_time"] requested_time = data["requested_time"] @@ -76,7 +88,7 @@ def write_database_data(db, federate: h.HelicsFederate, subscriptions=[], curren if federate.query(name, "state") == "disconnected": continue logger.debug(f"Query for publications: {name}") - publications = federate.query(name, "publications") # .replace("[", "").replace("]", "").split(";") + publications = federate.query(name, "publications") logger.debug(f"{name} publishes: {publications}") for pub_str in publications: @@ -101,6 +113,28 @@ def write_database_data(db, federate: h.HelicsFederate, subscriptions=[], curren (pub_str, name, granted_time, value, 1), ) + logger.debug("checking for messages on observer clone endpoint") + if OBSERVER_ENDPOINT and OBSERVER_ENDPOINT.has_message(): + logger.debug("found messages") + db.execute("UPDATE Messages SET new_value=0;") + while OBSERVER_ENDPOINT.has_message(): + message = OBSERVER_ENDPOINT.get_message() + logger.debug("reading messages") + if message.is_valid(): + logger.debug("writing messages to database") + db.execute( + "INSERT INTO Messages(sender, destination, send_time, receive_time, value, new_value)" + " VALUES (?,?,?,?,?,?);", + ( + message.original_source, + message.original_destination, + message.time, + granted_time, + message.data, + 1 + ), + ) + db.commit() @@ -126,7 +160,7 @@ def process_message(message: SimpleMessage): if signal_data["operation"] == "STOP": logger.info("got STOP") time_control["exited"] = True - OBSERVER_FEDERATE.finalize() # TODO: see if this is the right way to exit. + OBSERVER_FEDERATE.disconnect() h.helicsBrokerDisconnect(OBSERVER_BROKER) # h.helicsBrokerClearTimeBarrier(OBSERVER_BROKER) return SimpleMessage("SIGNAL_RESPONSE", "{}") @@ -163,7 +197,7 @@ def ingest_messages(): SERVER_MESSAGE_HANDLER.send_server(response) -def run(n_federates: int, config_path: str, log_level: int, message_handler: MessageHandler = None): +def run(n_federates: int, config_path: str, log_level: str, message_handler: MessageHandler = None): file_out = logging.FileHandler("observer.log", mode="w") file_out.setLevel(logging.DEBUG) @@ -177,24 +211,16 @@ def run(n_federates: int, config_path: str, log_level: int, message_handler: Mes if SERVER_MESSAGE_HANDLER.Enabled: time_control["nonstop"] = False - try: - asyncio.run(_run(n_federates, config_path, log_level)) - except KeyboardInterrupt: - logger.info("User canceled operation") - except h.HelicsException: - logger.error("Failed and threw HelicsException") - h.helicsCloseLibrary() - return 1 - finally: - logger.debug("In finally") - h.helicsCloseLibrary() - return 0 + return asyncio.run(_run(n_federates, config_path, log_level)) -async def _run(n_federates: int, config_path: str, log_level: int = 2): +async def _run(n_federates: int, config_path: str, log_level: str = "warning"): path_to_config = os.path.abspath(config_path) path = os.path.dirname(path_to_config) + if log_level == "debug": + logger.setLevel(logging.DEBUG) + with open(path_to_config) as f: config = json.loads(f.read()) logger.info("Read config: %s", config["broker"]["observer"]) @@ -215,16 +241,15 @@ async def _run(n_federates: int, config_path: str, log_level: int = 2): logger.info("Creating observer federate") init_combination_federate("__observer__") + global OBSERVER_ENDPOINT + OBSERVER_ENDPOINT = OBSERVER_FEDERATE.register_global_endpoint("__observer_clone__") logger.info("Entering initializing mode") - # if not time_control["nonstop"]: - # h.helicsBrokerSetTimeBarrier(broker, 0.0) - # else: time.sleep(2) - federates = OBSERVER_FEDERATE.query("root", "federates") # .replace("[", "").replace("]", "").split(";") - federates = [name for name in federates if name != "__observer__"] + federates = OBSERVER_FEDERATE.query("root", "federates") + federates = [name for name in federates if not name.startswith("__")] logger.info(f"federates: {federates}") @@ -236,22 +261,41 @@ async def _run(n_federates: int, config_path: str, log_level: int = 2): time.sleep(1) # Assuming all federates have connected. - - logger.info("Querying all topics") - metadata["federates"] = ",".join([f for f in federates if not f.startswith("__")]) - publications = OBSERVER_FEDERATE.query("root", "publications") # .replace("[", "").replace("]", "").split(";") + publications = OBSERVER_FEDERATE.query("root", "publications") subscriptions = [] + logger.info(f"publications: {publications}") + # TODO: improve subscription filtering to be a bit more friendly for pub in publications: if "include" in config["broker"]["observer"].keys() and pub not in config["broker"]["observer"]["include"]: continue else: subscriptions.append(OBSERVER_FEDERATE.register_subscription(pub)) - # TODO: message clones - h.helicsBrokerSetTimeBarrier(OBSERVER_BROKER, 0.0) + endpoints = OBSERVER_FEDERATE.query("root", "endpoints") + logger.info(f"endpoints: {endpoints}") + + if "message_source" in config["broker"]["observer"].keys(): + message_source = config["broker"]["observer"]["message_source"] + message_destination = config["broker"]["observer"]["message_destination"] + + try: + # clone_filter = OBSERVER_FEDERATE.register_global_cloning_filter("__observer_clone__") + clone_filter = OBSERVER_FEDERATE.register_global_filter(h.HELICS_FILTER_TYPE_CLONE, f"__observer_clone__") + for endpoint in endpoints: + if endpoint in message_source or endpoint in message_destination: + logger.debug(f"registering clone for {endpoint}") + if endpoint in message_source: + clone_filter.add_source_target(endpoint) + if endpoint in message_destination: + clone_filter.add_destination_target(endpoint) + except h.HelicsException as ex: + logger.error(f"{ex}") + + if not time_control["nonstop"]: + h.helicsBrokerSetTimeBarrier(OBSERVER_BROKER, 0.0) try: logger.info("calling exec") @@ -283,9 +327,6 @@ async def _run(n_federates: int, config_path: str, log_level: int = 2): continue else: await asyncio.sleep(0.001) # There's probably a better way to do this, but this is the solution for now. - # time.sleep(1) # There's probably a better way to do this, but this is the solution for now. - # if not time_control["exited"]: - # ingest_messages(current_time) if not time_control["nonstop"] or current_time >= 9223372036.3: continue @@ -297,17 +338,14 @@ async def _run(n_federates: int, config_path: str, log_level: int = 2): logger.info("Finished observe.") logger.info("finalizing monitoring task") - # message_handler.cancel() - # try: - # await message_handler - # except asyncio.CancelledError: - # logger.info("Monitor shutdown") + logger.info("Closing database ...") db.close() - except KeyboardInterrupt: logger.debug("Observer got keyboard interrupt") raise + except h.HelicsException as ex: + logger.debug(f"Caught HelicsException: {ex}") finally: logger.debug("Observer finalizing") logger.info("Finalizing federate ...") diff --git a/helics_cli/utils/process.py b/helics_cli/utils/process.py index eeb2cce..41cd620 100644 --- a/helics_cli/utils/process.py +++ b/helics_cli/utils/process.py @@ -23,16 +23,18 @@ def __init__(self, process_list, output_list, has_web, message_handler, use_brok def shutdown(self): print("in shutdown...") if self.use_broker_process and self.broker_process.is_alive(): - self.broker_process.terminate() - # self.broker_process.kill() - self.broker_process.close() + # self.broker_process.terminate() + self.broker_process.kill() + # self.broker_process.close() + # self.web_process.join(3) print("shutdown broker") if self.has_web and self.web_process.is_alive(): - self.web_process.terminate() - # self.web_process.kill() - self.web_process.close() + # self.web_process.terminate() + self.web_process.kill() + # self.web_process.close() + # self.web_process.join(3) print("shutdown web") diff --git a/web/src/html/index.html b/web/src/html/index.html index e112693..13b5820 100644 --- a/web/src/html/index.html +++ b/web/src/html/index.html @@ -110,35 +110,35 @@
| sender | +destination | +send_time | +receive_time | ++ value + | +
|---|