Skip to content
This repository was archived by the owner on Jul 18, 2022. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions examples/multi-node/Test_Federate_p1u.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@


# logger.setLoggingLevel(logging.DEBUG)

GLOBAL_ENDPOINT: h.HelicsEndpoint
LOCAL_ENDPOINT: h.HelicsEndpoint

def pub_and_sub_calc(supply_voltage, last_loads, sub_loads, pub_voltages):
# calculate the voltage fed to feeders below
Expand All @@ -28,6 +29,11 @@ def pub_and_sub_calc(supply_voltage, last_loads, sub_loads, pub_voltages):
load_diff[i] = abs(last_loads[i] - new_loads[i])
logger.debug(f"total load difference: {load_diff}")

# Send some silly messages
h.helicsEndpointSendBytesTo(GLOBAL_ENDPOINT, "inner loop okay".encode("utf-8"), "p1u/p1u_status")
h.helicsEndpointSendBytesTo(GLOBAL_ENDPOINT, "inner loop okay".encode("utf-8"), "p1u_global_status")
h.helicsEndpointSendBytesTo(LOCAL_ENDPOINT, "local endpoint okay".encode("utf-8"), "p1u/p1u_status")

return new_loads


Expand Down Expand Up @@ -56,9 +62,15 @@ def run_p1u_federate(fed_name, broker_address, feeders):
h.helicsFederateInfoSetTimeProperty(fedinfo, h.helics_property_time_delta, deltat)

# Create value federate
vfed = h.helicsCreateValueFederate(fed_name, fedinfo)
vfed = h.helicsCreateCombinationFederate(fed_name, fedinfo)
print("Value federate created")

# Register a local endpoint
global GLOBAL_ENDPOINT
global LOCAL_ENDPOINT
LOCAL_ENDPOINT = h.helicsFederateRegisterEndpoint(vfed, "p1u_status", "string")
GLOBAL_ENDPOINT = h.helicsFederateRegisterGlobalEndpoint(vfed, "p1u_global_status", "string")

# Register the publications
pub_load = h.helicsFederateRegisterGlobalTypePublication(vfed, "Circuit.full_network.TotalPower.E", "double", "kW")
pub_voltages = []
Expand Down Expand Up @@ -113,12 +125,16 @@ def run_p1u_federate(fed_name, broker_address, feeders):
last_loads = new_loads
iters += 1

# Send some silly messages
h.helicsEndpointSendBytesTo(GLOBAL_ENDPOINT, "outer loop okay".encode("utf-8"), "p1u/p1u_status")
h.helicsEndpointSendBytesTo(GLOBAL_ENDPOINT, "outer loop okay".encode("utf-8"), "p1u_global_status")

h.helicsPublicationPublishDouble(pub_load, sum(last_loads))
logger.info(f"feeder loads {last_loads} at time {currenttime} after {iters} iters")
t += 1

# all other federates should have finished, so now you can close
h.helicsFederateFinalize(vfed)
h.helicsFederateDisconnect(vfed)
print(f"{fed_name}: Test Federate finalized")

h.helicsFederateDestroy(vfed)
Expand Down
5 changes: 4 additions & 1 deletion examples/multi-node/Test_Federate_p1uhs.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,12 @@ def run_p1uhs_federate(fed_name, broker_address=None):
h.helicsFederateInfoSetTimeProperty(fedinfo, h.helics_property_time_delta, deltat)

# Create value federate #
vfed = h.helicsCreateValueFederate(fed_name, fedinfo)
vfed = h.helicsCreateCombinationFederate(fed_name, fedinfo)
print("Value federate created")

# Register global endpoint
h.helicsFederateRegisterGlobalEndpoint(vfed, "p1uhs_global_status", "string")

# Register the publications #
pub_name = f"Circuit.feeder_p1u.{fed_name}.p1ux.TotalPower.E"
pub_load = h.helicsFederateRegisterGlobalTypePublication(vfed, pub_name, "double", "kW")
Expand Down
2 changes: 0 additions & 2 deletions examples/multi-node/Test_Federate_sub_trans.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ def run_sub_trans(fed_name, feeders, broker_address):

# Create value federate #
vfed = h.helicsCreateValueFederate(fed_name, fedinfo)
print("this is a test print")
# time.sleep(100)
logger.info("Value federate created")

# Register the publication #
Expand Down
7 changes: 4 additions & 3 deletions examples/multi-node/runner.json
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
{
"broker": {
"broker": {
"federates": 3,
"observer": {
"directory": "../../",
"host": "localhost",
"name": "observer",
"name": "broker",
"include": ["Circuit.feeder_p1u.p1uhs0.p1ux.voltage", "Circuit.feeder_p1u.p1uhs0.p1ux.TotalPower.E", "full_network.voltage"],
"exclude": []
"message_source": ["p1u/p1u_status"],
"message_destination": ["p1u/p1u_status", "p1u_global_status"]
}
},
"federates": [
Expand Down
6 changes: 3 additions & 3 deletions helics_cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def setup(name, path, purge):
@click.option("--silent", is_flag=True)
@click.option("--no-log-files", is_flag=True, default=False)
@click.option(
"--broker-loglevel", "--loglevel", "-l", type=int, default=3, help="Log level for HELICS broker",
"--broker-loglevel", "--loglevel", "-l", type=click.STRING, default="warning", help="Log level for HELICS broker",
)
@click.option("--web", "-w", is_flag=True, default=False, help="Run the web interface on startup")
def run(path, silent, no_log_files, broker_loglevel, web):
Expand Down Expand Up @@ -186,7 +186,7 @@ def run(path, silent, no_log_files, broker_loglevel, web):
for p in process_handler.process_list:
p.wait()
except KeyboardInterrupt:
click.echo("Warning: User interrupted processes. Terminating safely ...", status="info")
click.echo("Warning: User interrupted processes. Terminating safely ...")
process_handler.shutdown()
logger.debug("Closing output")
for o in process_handler.output_list:
Expand Down Expand Up @@ -248,7 +248,7 @@ def validate(path):
"--n-federates", required=True, type=click.INT, help="Number of federates to observe",
)
@click.option("--path", type=click.Path(exists=True), default="./", help="Internal path to config file used for filtering output")
@click.option("--broker_loglevel", "--loglevel", "-l", type=click.INT, default=2, help="Log level for HELICS broker")
@click.option("--broker_loglevel", "--loglevel", "-l", type=click.STRING, default="warning", help="Log level for HELICS broker")
def observe(n_federates: int, path: str, log_level) -> int:
return observer.run(n_federates, path, log_level)

Expand Down
118 changes: 78 additions & 40 deletions helics_cli/observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,21 @@
from .utils.message_handler import MessageHandler, SimpleMessage
from .database import initialize_database, MetaData

import signal


def signal_handler(signal, frame):
h.helicsCloseLibrary()
h.helicsCleanupLibrary()
raise KeyboardInterrupt


signal.signal(signal.SIGINT, signal_handler)
logger = logging.getLogger(__name__)

OBSERVER_BROKER: h.HelicsBroker = None
OBSERVER_FEDERATE: h.HelicsCombinationFederate = None
OBSERVER_ENDPOINT: h.HelicsEndpoint = None
SERVER_MESSAGE_HANDLER: MessageHandler = None
time_control = {"nonstop": True, "requested_time": 0.0, "exited": False}

Expand Down Expand Up @@ -47,20 +58,21 @@ def init_combination_federate(
OBSERVER_FEDERATE = h.helicsCreateCombinationFederate(core_name, fedinfo)


def write_database_data(db, federate: h.HelicsFederate, subscriptions=[], current_time=0.0):
def write_database_data(db, federate: h.HelicsFederate, subscriptions=None, current_time=0.0):
if subscriptions is None:
subscriptions = []
logger.debug("Making query ...")

federates = [name for name in federate.query("root", "federates") if name != "__observer__"]
federates = [name for name in federate.query("root", "federates") if not name.startswith("__observer__") and not name.endswith("_filters")]

for name in federates:
logger.debug(f"Query for exists: {name}")

if federate.query(name, "state") == "disconnected":
continue

logger.debug(f"Query for current_time: {name}")
data = federate.query(name, "current_time")
logger.debug(f"data is: {data}")
logger.debug(f"Query for current_time: {name}, data is: {data}")
try:
granted_time = data["granted_time"]
requested_time = data["requested_time"]
Expand All @@ -76,7 +88,7 @@ def write_database_data(db, federate: h.HelicsFederate, subscriptions=[], curren
if federate.query(name, "state") == "disconnected":
continue
logger.debug(f"Query for publications: {name}")
publications = federate.query(name, "publications") # .replace("[", "").replace("]", "").split(";")
publications = federate.query(name, "publications")
logger.debug(f"{name} publishes: {publications}")

for pub_str in publications:
Expand All @@ -101,6 +113,28 @@ def write_database_data(db, federate: h.HelicsFederate, subscriptions=[], curren
(pub_str, name, granted_time, value, 1),
)

logger.debug("checking for messages on observer clone endpoint")
if OBSERVER_ENDPOINT and OBSERVER_ENDPOINT.has_message():
logger.debug("found messages")
db.execute("UPDATE Messages SET new_value=0;")
while OBSERVER_ENDPOINT.has_message():
message = OBSERVER_ENDPOINT.get_message()
logger.debug("reading messages")
if message.is_valid():
logger.debug("writing messages to database")
db.execute(
"INSERT INTO Messages(sender, destination, send_time, receive_time, value, new_value)"
" VALUES (?,?,?,?,?,?);",
(
message.original_source,
message.original_destination,
message.time,
granted_time,
message.data,
1
),
)

db.commit()


Expand All @@ -126,7 +160,7 @@ def process_message(message: SimpleMessage):
if signal_data["operation"] == "STOP":
logger.info("got STOP")
time_control["exited"] = True
OBSERVER_FEDERATE.finalize() # TODO: see if this is the right way to exit.
OBSERVER_FEDERATE.disconnect()
h.helicsBrokerDisconnect(OBSERVER_BROKER)
# h.helicsBrokerClearTimeBarrier(OBSERVER_BROKER)
return SimpleMessage("SIGNAL_RESPONSE", "{}")
Expand Down Expand Up @@ -163,7 +197,7 @@ def ingest_messages():
SERVER_MESSAGE_HANDLER.send_server(response)


def run(n_federates: int, config_path: str, log_level: int, message_handler: MessageHandler = None):
def run(n_federates: int, config_path: str, log_level: str, message_handler: MessageHandler = None):

file_out = logging.FileHandler("observer.log", mode="w")
file_out.setLevel(logging.DEBUG)
Expand All @@ -177,24 +211,16 @@ def run(n_federates: int, config_path: str, log_level: int, message_handler: Mes
if SERVER_MESSAGE_HANDLER.Enabled:
time_control["nonstop"] = False

try:
asyncio.run(_run(n_federates, config_path, log_level))
except KeyboardInterrupt:
logger.info("User canceled operation")
except h.HelicsException:
logger.error("Failed and threw HelicsException")
h.helicsCloseLibrary()
return 1
finally:
logger.debug("In finally")
h.helicsCloseLibrary()
return 0
return asyncio.run(_run(n_federates, config_path, log_level))


async def _run(n_federates: int, config_path: str, log_level: int = 2):
async def _run(n_federates: int, config_path: str, log_level: str = "warning"):
path_to_config = os.path.abspath(config_path)
path = os.path.dirname(path_to_config)

if log_level == "debug":
logger.setLevel(logging.DEBUG)

with open(path_to_config) as f:
config = json.loads(f.read())
logger.info("Read config: %s", config["broker"]["observer"])
Expand All @@ -215,16 +241,15 @@ async def _run(n_federates: int, config_path: str, log_level: int = 2):
logger.info("Creating observer federate")

init_combination_federate("__observer__")
global OBSERVER_ENDPOINT
OBSERVER_ENDPOINT = OBSERVER_FEDERATE.register_global_endpoint("__observer_clone__")

logger.info("Entering initializing mode")

# if not time_control["nonstop"]:
# h.helicsBrokerSetTimeBarrier(broker, 0.0)
# else:
time.sleep(2)

federates = OBSERVER_FEDERATE.query("root", "federates") # .replace("[", "").replace("]", "").split(";")
federates = [name for name in federates if name != "__observer__"]
federates = OBSERVER_FEDERATE.query("root", "federates")
federates = [name for name in federates if not name.startswith("__")]

logger.info(f"federates: {federates}")

Expand All @@ -236,22 +261,41 @@ async def _run(n_federates: int, config_path: str, log_level: int = 2):
time.sleep(1)

# Assuming all federates have connected.

logger.info("Querying all topics")

metadata["federates"] = ",".join([f for f in federates if not f.startswith("__")])

publications = OBSERVER_FEDERATE.query("root", "publications") # .replace("[", "").replace("]", "").split(";")
publications = OBSERVER_FEDERATE.query("root", "publications")
subscriptions = []
logger.info(f"publications: {publications}")

# TODO: improve subscription filtering to be a bit more friendly
for pub in publications:
if "include" in config["broker"]["observer"].keys() and pub not in config["broker"]["observer"]["include"]:
continue
else:
subscriptions.append(OBSERVER_FEDERATE.register_subscription(pub))
# TODO: message clones

h.helicsBrokerSetTimeBarrier(OBSERVER_BROKER, 0.0)
endpoints = OBSERVER_FEDERATE.query("root", "endpoints")
logger.info(f"endpoints: {endpoints}")

if "message_source" in config["broker"]["observer"].keys():
message_source = config["broker"]["observer"]["message_source"]
message_destination = config["broker"]["observer"]["message_destination"]

try:
# clone_filter = OBSERVER_FEDERATE.register_global_cloning_filter("__observer_clone__")
clone_filter = OBSERVER_FEDERATE.register_global_filter(h.HELICS_FILTER_TYPE_CLONE, f"__observer_clone__")
for endpoint in endpoints:
if endpoint in message_source or endpoint in message_destination:
logger.debug(f"registering clone for {endpoint}")
if endpoint in message_source:
clone_filter.add_source_target(endpoint)
if endpoint in message_destination:
clone_filter.add_destination_target(endpoint)
except h.HelicsException as ex:
logger.error(f"{ex}")

if not time_control["nonstop"]:
h.helicsBrokerSetTimeBarrier(OBSERVER_BROKER, 0.0)

try:
logger.info("calling exec")
Expand Down Expand Up @@ -283,9 +327,6 @@ async def _run(n_federates: int, config_path: str, log_level: int = 2):
continue
else:
await asyncio.sleep(0.001) # There's probably a better way to do this, but this is the solution for now.
# time.sleep(1) # There's probably a better way to do this, but this is the solution for now.
# if not time_control["exited"]:
# ingest_messages(current_time)

if not time_control["nonstop"] or current_time >= 9223372036.3:
continue
Expand All @@ -297,17 +338,14 @@ async def _run(n_federates: int, config_path: str, log_level: int = 2):

logger.info("Finished observe.")
logger.info("finalizing monitoring task")
# message_handler.cancel()
# try:
# await message_handler
# except asyncio.CancelledError:
# logger.info("Monitor shutdown")

logger.info("Closing database ...")
db.close()

except KeyboardInterrupt:
logger.debug("Observer got keyboard interrupt")
raise
except h.HelicsException as ex:
logger.debug(f"Caught HelicsException: {ex}")
finally:
logger.debug("Observer finalizing")
logger.info("Finalizing federate ...")
Expand Down
14 changes: 8 additions & 6 deletions helics_cli/utils/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,18 @@ def __init__(self, process_list, output_list, has_web, message_handler, use_brok
def shutdown(self):
print("in shutdown...")
if self.use_broker_process and self.broker_process.is_alive():
self.broker_process.terminate()
# self.broker_process.kill()
self.broker_process.close()
# self.broker_process.terminate()
self.broker_process.kill()
# self.broker_process.close()
# self.web_process.join(3)

print("shutdown broker")

if self.has_web and self.web_process.is_alive():
self.web_process.terminate()
# self.web_process.kill()
self.web_process.close()
# self.web_process.terminate()
self.web_process.kill()
# self.web_process.close()
# self.web_process.join(3)

print("shutdown web")

Expand Down
Loading