diff --git a/src/__init__.py b/src/__init__.py
index 8309909..fa17a66 100644
--- a/src/__init__.py
+++ b/src/__init__.py
@@ -1,3 +1,4 @@
 from . import custom_pathsim_blocks
-from . import convert_to_python
+from . import pathsim_utils
 from . import backend
+from . import convert_to_python
diff --git a/src/backend.py b/src/backend.py
index 1b58970..a35dc9f 100644
--- a/src/backend.py
+++ b/src/backend.py
@@ -2,65 +2,16 @@ import json
 from flask import Flask, request, jsonify
 from flask_cors import CORS
-import math
-import numpy as np
+
 import plotly.graph_objects as go
 from plotly.subplots import make_subplots
 import plotly
 import json as plotly_json
-from pathsim import Simulation, Connection
-from pathsim.events import Event
-import pathsim.solvers
-from pathsim.blocks import (
-    Scope,
-    Block,
-    Constant,
-    StepSource,
-    PulseSource,
-    Amplifier,
-    Adder,
-    Multiplier,
-    Integrator,
-    Function,
-    Delay,
-    RNG,
-    PID,
-    Schedule,
-)
-from .custom_pathsim_blocks import Process, Splitter
-from .convert_to_python import convert_graph_to_python
-
-NAME_TO_SOLVER = {
-    "SSPRK22": pathsim.solvers.SSPRK22,
-    "SSPRK33": pathsim.solvers.SSPRK33,
-    "RKF21": pathsim.solvers.RKF21,
-}
-
-map_str_to_object = {
-    "constant": Constant,
-    "stepsource": StepSource,
-    "pulsesource": PulseSource,
-    "amplifier": Amplifier,
-    "amplifier_reverse": Amplifier,
-    "scope": Scope,
-    "splitter2": Splitter,
-    "splitter3": Splitter,
-    "adder": Adder,
-    "adder_reverse": Adder,
-    "multiplier": Multiplier,
-    "process": Process,
-    "process_horizontal": Process,
-    "rng": RNG,
-    "pid": PID,
-    "integrator": Integrator,
-    "function": Function,
-    "delay": Delay,
-}
-
-
-# app = Flask(__name__)
-# CORS(app, supports_credentials=True)
+from .convert_to_python import convert_graph_to_python
+from .pathsim_utils import make_pathsim_model
+from pathsim.blocks import Scope
 
 app = Flask(__name__)
 CORS(
@@ -145,126 +96,6 @@ def convert_to_python():
     return jsonify({"success": False, "error": f"Server error: {str(e)}"}), 500
 
-
-def find_node_by_id(node_id: str, nodes: list) -> dict:
-    for node in nodes:
-        if node["id"] == node_id:
-            return node
-    return None
-
-
-def find_block_by_id(block_id: str, blocks) -> Block:
-    for block in blocks:
-        if hasattr(block, "id") and block.id == block_id:
-            return block
-    return None
-
-
-def create_integrator(
-    node: dict, eval_namespace: dict = None
-) -> tuple[Block, list[Schedule]]:
-    if eval_namespace is None:
-        eval_namespace = globals()
-
-    block = Integrator(
-        initial_value=eval(node["data"]["initial_value"], eval_namespace)
-        if node["data"].get("initial_value") and node["data"]["initial_value"] != ""
-        else 0.0,
-    )
-    # add events to reset integrator if needed
-    events = []
-    if node["data"]["reset_times"] != "":
-
-        def reset_itg(_):
-            block.reset()
-
-        reset_times = eval(node["data"]["reset_times"], eval_namespace)
-        if isinstance(reset_times, (int, float)):
-            # If it's a single number, convert it to a list
-            reset_times = [reset_times]
-        for t in reset_times:
-            events.append(Schedule(t_start=t, t_end=t, func_act=reset_itg))
-    return block, events
-
-
-def create_function(node: dict, eval_namespace: dict = None) -> Block:
-    if eval_namespace is None:
-        eval_namespace = globals()
-
-    # Convert the expression string to a lambda function
-    expression = node["data"].get("expression", "x")
-
-    # Create a safe lambda function from the expression
-    # The expression should use 'x' as the variable
-    try:
-        # Create a lambda function from the expression string
-        # We'll allow common mathematical operations and numpy functions
-
-        # Safe namespace for eval - merge with global variables
-        safe_namespace = {
-            "x": 0,  # placeholder
-            "np": np,
-            "math": math,
-            "sin": np.sin,
-            "cos": np.cos,
-            "tan": np.tan,
-            "exp": np.exp,
-            "log": np.log,
-            "sqrt": np.sqrt,
-            "abs": abs,
-            "pow": pow,
-            "pi": np.pi,
-            "e": np.e,
-            **eval_namespace,  # Include global variables
-        }
-
-        # Test the expression first to ensure it's valid
-        eval(expression.replace("x", "1"), safe_namespace)
-
-        # Create the actual function
-        def func(x):
-            return eval(expression, {**safe_namespace, "x": x})
-
-    except Exception as e:
-        raise ValueError(f"Invalid function expression: {expression}. Error: {e}")
-
-    block = Function(func=func)
-    return block
-
-
-def create_scope(node: dict, edges, nodes) -> Scope:
-    # Find all incoming edges to this node and sort by source id for consistent ordering
-    incoming_edges = [edge for edge in edges if edge["target"] == node["id"]]
-    incoming_edges.sort(key=lambda x: x["source"])
-
-    # create labels for the scope based on incoming edges
-    labels = []
-    duplicate_labels = []
-    connections_order = []  # will be used later to make connections
-    for edge in incoming_edges:
-        source_node = find_node_by_id(edge["source"], nodes=nodes)
-        label = source_node["data"]["label"]
-
-        connections_order.append(edge["id"])
-
-        # If the label already exists, try to append the source handle to it (if it exists)
-        if label in labels or label in duplicate_labels:
-            duplicate_labels.append(label)
-            if edge["sourceHandle"]:
-                new_label = label + f" ({edge['sourceHandle']})"
-                label = new_label
-        labels.append(label)
-
-    for i, (edge, label) in enumerate(zip(incoming_edges, labels)):
-        if label in duplicate_labels:
-            if edge["sourceHandle"]:
-                labels[i] += f" ({edge['sourceHandle']})"
-
-    block = Scope(labels=labels)
-    block._connections_order = connections_order
-
-    return block
-
-
 # Function to convert graph to pathsim and run simulation
 @app.route("/run-pathsim", methods=["POST"])
 def run_pathsim():
@@ -335,393 +166,5 @@ def run_pathsim():
     return jsonify({"success": False, "error": f"Server error: {str(e)}"}), 500
 
-
-def make_global_variables(global_vars):
-    # Validate and exec global variables so that they are usable later in this script.
-    # Return a namespace dictionary containing the global variables
-    global_namespace = {}
-
-    for var in global_vars:
-        var_name = var.get("name", "").strip()
-        var_value = var.get("value", "")
-
-        # Validate variable name
-        if not var_name:
-            continue  # Skip empty names
-
-        if not var_name.isidentifier():
-            raise ValueError(
-                f"Invalid Python variable name: '{var_name}'. "
-                "Variable names must start with a letter or underscore, "
-                "and contain only letters, digits, and underscores."
-            )
-
-        # Check if it's a Python keyword
-        import keyword
-
-        if keyword.iskeyword(var_name):
-            raise ValueError(
-                f"'{var_name}' is a Python keyword and cannot be used as a variable name."
-            )
-
-        try:
-            # Execute in global namespace for backwards compatibility
-            exec(f"{var_name} = {var_value}", globals())
-            # Also store in local namespace for eval calls
-            global_namespace[var_name] = eval(var_value)
-        except Exception as e:
-            raise ValueError(f"Error setting global variable '{var_name}': {str(e)}")
-
-    return global_namespace
-
-
-def make_solver_params(solver_prms, eval_namespace=None):
-    extra_params = solver_prms.pop("extra_params", "")
-    if extra_params == "":
-        extra_params = {}
-    else:
-        extra_params = eval(extra_params, eval_namespace)
-        assert isinstance(extra_params, dict), "extra_params must be a dictionary"
-
-    for k, v in solver_prms.items():
-        if k not in ["Solver", "log"]:
-            try:
-                solver_prms[k] = eval(v, eval_namespace)
-            except Exception as e:
-                return jsonify(
-                    {"error": f"Invalid value for {k}: {v}. Error: {str(e)}"}
-                ), 400
-        elif k == "log":
-            if v == "true":
-                solver_prms[k] = True
-            elif v == "false":
-                solver_prms[k] = False
-            else:
-                return jsonify(
-                    {"error": f"Invalid value for {k}: {v}. Must be 'true' or 'false'."}
-                ), 400
-        elif k == "Solver":
-            if v not in NAME_TO_SOLVER:
-                return jsonify(
-                    {
-                        "error": f"Invalid solver: {v}. Must be one of {list(NAME_TO_SOLVER.keys())}."
-                    }
-                ), 400
-            solver_prms[k] = NAME_TO_SOLVER[v]
-
-    # remove solver duration from solver parameters
-    duration = float(solver_prms.pop("simulation_duration"))
-
-    assert not isinstance(solver_prms["Solver"], str), solver_prms["Solver"]
-
-    return solver_prms, extra_params, duration
-
-
-def auto_block_construction(node: dict, eval_namespace: dict = None) -> Block:
-    """
-    Automatically constructs a block object from a node dictionary.
-
-    Args:
-        node: The node dictionary containing block information.
-        eval_namespace: A namespace for evaluating expressions. Defaults to None.
-
-    Raises:
-        ValueError: If the block type is unknown or if there are issues with evaluation.
-
-    Returns:
-        The constructed block object.
- """ - if eval_namespace is None: - eval_namespace = globals() - - block_type = node["type"] - - if eval_namespace is None: - eval_namespace = globals() - - block_type = node["type"] - if block_type not in map_str_to_object: - raise ValueError(f"Unknown block type: {block_type}") - - block_class = map_str_to_object[block_type] - - # skip 'self' - parameters_for_class = block_class.__init__.__code__.co_varnames[1:] - - parameters = { - k: eval(v, eval_namespace) - for k, v in node["data"].items() - if k in parameters_for_class - } - return block_class(**parameters) - - -def make_blocks( - nodes: list[dict], edges: list[dict], eval_namespace: dict = None -) -> tuple[list[Block], list[Event]]: - blocks, events = [], [] - - for node in nodes: - block_type = node["type"] - - # Manual construction for some block types - if block_type == "integrator": - block, event_int = create_integrator(node, eval_namespace) - events.extend(event_int) - elif block_type == "function": - block = create_function(node, eval_namespace) - elif block_type == "scope": - block = create_scope(node, edges, nodes) - elif block_type == "stepsource": - block = StepSource( - amplitude=eval(node["data"]["amplitude"], eval_namespace), - tau=eval(node["data"]["delay"], eval_namespace), - ) - elif block_type == "delay": - block = Delay(tau=eval(node["data"]["delay"], eval_namespace)) - elif block_type == "splitter2": - block = Splitter( - n=2, - fractions=[ - eval(node["data"]["f1"], eval_namespace), - eval(node["data"]["f2"], eval_namespace), - ], - ) - elif block_type == "splitter3": - block = Splitter( - n=3, - fractions=[ - eval(node["data"]["f1"], eval_namespace), - eval(node["data"]["f2"], eval_namespace), - eval(node["data"]["f3"], eval_namespace), - ], - ) - else: # try automated construction - block = auto_block_construction(node, eval_namespace) - - block.id = node["id"] - block.label = node["data"]["label"] - blocks.append(block) - - return blocks, events - - -def make_blocks_old(nodes, edges, eval_namespace=None): - blocks, events = [], [] - for node in nodes: - # TODO this needs serious refactoring - if node["type"] == "constant": - block = Constant(value=eval(node["data"]["value"], eval_namespace)) - elif node["type"] == "stepsource": - block = StepSource( - amplitude=eval(node["data"]["amplitude"], eval_namespace), - tau=eval(node["data"]["delay"], eval_namespace), - ) - elif node["type"] == "pulsesource": - block = PulseSource( - amplitude=eval(node["data"]["amplitude"], eval_namespace), - T=eval(node["data"]["T"], eval_namespace), - t_rise=eval(node["data"]["t_rise"], eval_namespace), - t_fall=eval(node["data"]["t_fall"], eval_namespace), - tau=eval(node["data"]["tau"], eval_namespace), - duty=eval(node["data"]["duty"], eval_namespace), - ) - elif node["type"] in ["amplifier", "amplifier_reverse"]: - block = Amplifier(gain=eval(node["data"]["gain"], eval_namespace)) - elif node["type"] == "scope": - block = create_scope(node, edges, nodes) - elif node["type"] == "splitter2": - block = Splitter( - n=2, - fractions=[ - eval(node["data"]["f1"], eval_namespace), - eval(node["data"]["f2"], eval_namespace), - ], - ) - elif node["type"] == "splitter3": - block = Splitter( - n=3, - fractions=[ - eval(node["data"]["f1"], eval_namespace), - eval(node["data"]["f2"], eval_namespace), - eval(node["data"]["f3"], eval_namespace), - ], - ) - elif node["type"] == "adder": - # TODO handle custom operations - block = Adder() - elif node["type"] == "multiplier": - block = Multiplier() - elif node["type"] == "integrator": - block, 
events_int = create_integrator(node, eval_namespace) - events.extend(events_int) - elif node["type"] == "function": - block = create_function(node, eval_namespace) - elif node["type"] == "delay": - block = Delay(tau=eval(node["data"]["tau"], eval_namespace)) - elif node["type"] == "rng": - block = RNG( - sampling_rate=eval(node["data"]["sampling_rate"], eval_namespace) - ) - elif node["type"] == "pid": - block = PID( - Kp=eval(node["data"]["Kp"], eval_namespace) - if node["data"].get("Kp") - else 0, - Ki=eval(node["data"]["Ki"], eval_namespace) - if node["data"].get("Ki") - else 0, - Kd=eval(node["data"]["Kd"], eval_namespace) - if node["data"].get("Kd") - else 0, - f_max=eval(node["data"]["f_max"], eval_namespace) - if node["data"].get("f_max") - else 100, - ) - elif node["type"] in ["process", "process_horizontal"]: - block = Process( - residence_time=( - eval(node["data"]["residence_time"], eval_namespace) - if node["data"].get("residence_time") - and node["data"]["residence_time"] != "" - else 0 - ), - ic=( - eval(node["data"]["initial_value"], eval_namespace) - if node["data"].get("initial_value") - and node["data"]["initial_value"] != "" - else 0 - ), - gen=( - eval(node["data"]["source_term"], eval_namespace) - if node["data"].get("source_term") - and node["data"]["source_term"] != "" - else 0 - ), - ) - else: - raise ValueError(f"Unknown node type: {node['type']}") - block.id = node["id"] - block.label = node["data"]["label"] - blocks.append(block) - - return blocks, events - - -def make_connections(nodes, edges, blocks) -> list[Connection]: - # Create connections based on the sorted edges to match beta order - connections_pathsim = [] - - # Process each node and its sorted incoming edges to create connections - block_to_input_index = {b: 0 for b in blocks} - for node in nodes: - outgoing_edges = [edge for edge in edges if edge["source"] == node["id"]] - outgoing_edges.sort(key=lambda x: x["target"]) - - incoming_edges = [edge for edge in edges if edge["target"] == node["id"]] - incoming_edges.sort(key=lambda x: x["source"]) - - block = find_block_by_id(node["id"], blocks=blocks) - - for edge in outgoing_edges: - target_block = find_block_by_id(edge["target"], blocks=blocks) - if isinstance(block, Process): - if edge["sourceHandle"] == "inv": - output_index = 0 - elif edge["sourceHandle"] == "mass_flow_rate": - output_index = 1 - assert block.residence_time != 0, ( - "Residence time must be non-zero for mass flow rate output." - ) - else: - raise ValueError( - f"Invalid source handle '{edge['sourceHandle']}' for {edge}." - ) - elif isinstance(block, Splitter): - # Splitter outputs are always in order, so we can use the handle directly - assert edge["sourceHandle"], edge - output_index = int(edge["sourceHandle"].replace("source", "")) - 1 - if output_index >= block.n: - raise ValueError( - f"Invalid source handle '{edge['sourceHandle']}' for {edge}." 
-                    )
-            else:
-                output_index = 0
-
-            if isinstance(target_block, Scope):
-                input_index = target_block._connections_order.index(edge["id"])
-            else:
-                input_index = block_to_input_index[target_block]
-
-            connection = Connection(
-                block[output_index],
-                target_block[input_index],
-            )
-            connections_pathsim.append(connection)
-            block_to_input_index[target_block] += 1
-
-    return connections_pathsim
-
-
-def make_default_scope(nodes, blocks) -> tuple[Scope, list[Connection]]:
-    scope_default = Scope(
-        labels=[node["data"]["label"] for node in nodes],
-    )
-    scope_default.id = "scope_default"
-    scope_default.label = "Default Scope"
-
-    # Add connections to scope
-    connections_pathsim = []
-    input_index = 0
-    for block in blocks:
-        if block != scope_default:
-            connection = Connection(
-                block[0],
-                scope_default[input_index],
-            )
-            connections_pathsim.append(connection)
-            input_index += 1
-
-    return scope_default, connections_pathsim
-
-
-def make_pathsim_model(graph_data: dict) -> tuple[Simulation, float]:
-    nodes = graph_data.get("nodes", [])
-    edges = graph_data.get("edges", [])
-    solver_prms = graph_data.get("solverParams", {})
-    global_vars = graph_data.get("globalVariables", {})
-
-    # Get the global variables namespace to use in eval calls
-    global_namespace = make_global_variables(global_vars)
-
-    # Create a combined namespace that includes built-in functions and global variables
-    eval_namespace = {**globals(), **global_namespace}
-
-    solver_prms, extra_params, duration = make_solver_params(
-        solver_prms, eval_namespace
-    )
-
-    # Create blocks
-    blocks, events = make_blocks(nodes, edges, eval_namespace)
-
-    connections_pathsim = make_connections(nodes, edges, blocks)
-
-    # Add a Scope block if none exists
-    # This ensures that there is always a scope to collect outputs
-    if not any(isinstance(block, Scope) for block in blocks):
-        scope_default, connections_scope_def = make_default_scope(nodes, blocks)
-        blocks.append(scope_default)
-        connections_pathsim.extend(connections_scope_def)
-
-    # Create the simulation
-    simulation = Simulation(
-        blocks,
-        connections_pathsim,
-        events=events,
-        **solver_prms,  # Unpack solver parameters
-        **extra_params,  # Unpack extra parameters
-    )
-    return simulation, duration
-
-
 if __name__ == "__main__":
     app.run(port=8000, debug=True)
diff --git a/src/convert_to_python.py b/src/convert_to_python.py
index 22cfd24..9c8561d 100644
--- a/src/convert_to_python.py
+++ b/src/convert_to_python.py
@@ -1,151 +1,183 @@
 from jinja2 import Environment, FileSystemLoader
-import json
 import os
-
-
-def process_graph_data(json_file: str) -> dict:
-    """Process the JSON graph data and prepare it for template rendering."""
-    data = json.load(open(json_file))
-
-    return process_graph_data_from_dict(data)
-
-
-def main():
-    current_dir = os.path.dirname(os.path.abspath(__file__))
-    templates_dir = os.path.join(current_dir, "templates")
-
-    environment = Environment(loader=FileSystemLoader(templates_dir))
-    template = environment.get_template("template.py")
-
-    results_filename = os.path.join(current_dir, "..", "generated_script.py")
-
-    # Process the graph data
-    test_file_path = os.path.join(current_dir, "..", "saved_graphs", "test3.json")
-    context = process_graph_data(test_file_path)
-
-    # Render the template
-    with open(results_filename, mode="w", encoding="utf-8") as results:
-        results.write(template.render(context))
-        print(f"... wrote {results_filename}")
-
-
-def convert_graph_to_python(
-    graph_data: dict, output_filename: str = "generated_script.py"
-) -> str:
-    """Convert graph data to Python script and return the generated code."""
+from inspect import signature
+
+from pathsim.blocks import Scope
+from .custom_pathsim_blocks import (
+    Process,
+    Splitter,
+)
+from .pathsim_utils import (
+    map_str_to_object,
+    make_blocks,
+    make_global_variables,
+)
+
+
+def convert_graph_to_python(graph_data: dict) -> str:
+    """Convert graph data to a Python script as a string."""
     # Get the directory of this file to properly reference templates
     current_dir = os.path.dirname(os.path.abspath(__file__))
     templates_dir = os.path.join(current_dir, "templates")
 
     environment = Environment(loader=FileSystemLoader(templates_dir))
-    template = environment.get_template("template.py")
+    template = environment.get_template("template_with_macros.py")
 
     # Process the graph data
     context = process_graph_data_from_dict(graph_data)
 
     # Render the template
-    generated_code = template.render(context)
-
-    # Write to file
-    output_path = os.path.join(current_dir, "..", output_filename)
-    with open(output_path, mode="w", encoding="utf-8") as results:
-        results.write(generated_code)
+    return template.render(context)
+
+
+def process_node_data(nodes: list[dict], edges: list[dict]) -> list[dict]:
+    """
+    Given a list of node and edge data as dictionaries, process the nodes to create
+    variable names, class names, and expected arguments for each node.
+
+    Returns:
+        The processed node data with variable names, class names, and expected arguments.
+    """
+    nodes = nodes.copy()
+    used_var_names = set()
+
+    for node in nodes:
+        # Make a variable name from the label
+        invalid_chars = set("!@#$%^&*()+=[]{}|;:'\",.-<>?/\\`~")
+        base_var_name = node["data"]["label"].lower().replace(" ", "_")
+        for char in invalid_chars:
+            base_var_name = base_var_name.replace(char, "")
+
+        # Ensure the base variable name is a valid identifier
+        if not base_var_name.isidentifier():
+            raise ValueError(
+                f"Cannot derive a valid Python identifier from label '{node['data']['label']}' (got '{base_var_name}')."
+            )
 
-    return generated_code
+        # Make the variable name unique by appending a number if needed
+        var_name = base_var_name
+        counter = 1
+        while var_name in used_var_names:
+            var_name = f"{base_var_name}_{counter}"
+            counter += 1
+
+        node["var_name"] = var_name
+        used_var_names.add(var_name)
+
+        # Add pathsim class name
+        block_class = map_str_to_object.get(node["type"])
+        if block_class is None:
+            raise ValueError(f"Unknown block type: {node['type']}")
+        node["class_name"] = block_class.__name__
+        node["module_name"] = block_class.__module__
+
+        # Add expected arguments
+        node["expected_arguments"] = signature(block_class).parameters
+
+        # if it's a scope, find labels
+        if node["type"] == "scope":
+            incoming_edges = [edge for edge in edges if edge["target"] == node["id"]]
+            incoming_edges.sort(key=lambda x: x["source"])
+            node["labels"] = []
+            for incoming_edge in incoming_edges:
+                source_node = next(
+                    (n for n in nodes if n["id"] == incoming_edge["source"])
+                )
+
+                # TODO take care of duplicated labels
+                node["labels"].append(source_node["data"]["label"])
+    return nodes
+
+
+# TODO: this is effectively a duplicate of pathsim_utils.make_connections
+# need to refactor
+def make_edge_data(data: dict) -> list[dict]:
+    """
+    Process edges to add source/target variable names and ports.
+
+    Builds the actual pathsim blocks with ``make_blocks``, then resolves each
+    edge's output and input port indices the same way ``make_connections``
+    does.
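+
+    For illustration (hypothetical IDs), a plain edge
+    ``{"id": "e1-2", "source": "1", "target": "2"}`` between two single-port
+    blocks comes back with ``source_var_name`` and ``target_var_name`` filled
+    in and ``source_port == target_port == "[0]"``.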
+
+    The resolved indices are written back onto the edge dictionaries, so the
+    code generator can render each connection directly.
+
+    Args:
+        data: The graph data containing "nodes" and "edges".
+
+    Returns:
+        The processed edges with source/target variable names and ports.
+    """
+    data = data.copy()
+
+    # we need the namespace since we call make_blocks
+    namespace = make_global_variables(data["globalVariables"])
+    blocks, _ = make_blocks(data["nodes"], data["edges"], eval_namespace=namespace)
+
+    # Process each node and its sorted incoming edges to create connections
+    block_to_input_index = {b: 0 for b in blocks}
+    for node in data["nodes"]:
+        outgoing_edges = [
+            edge for edge in data["edges"] if edge["source"] == node["id"]
+        ]
+        outgoing_edges.sort(key=lambda x: x["target"])
+
+        block = next((b for b in blocks if b.id == node["id"]))
+
+        for edge in outgoing_edges:
+            target_block = next((b for b in blocks if b.id == edge["target"]))
+            target_node = next((n for n in data["nodes"] if n["id"] == edge["target"]))
+            if isinstance(block, Process):
+                if edge["sourceHandle"] == "inv":
+                    output_index = 0
+                elif edge["sourceHandle"] == "mass_flow_rate":
+                    output_index = 1
+                    assert block.residence_time != 0, (
+                        "Residence time must be non-zero for mass flow rate output."
+                    )
+                else:
+                    raise ValueError(
+                        f"Invalid source handle '{edge['sourceHandle']}' for {edge}."
+                    )
+            elif isinstance(block, Splitter):
+                # Splitter outputs are always in order, so we can use the handle directly
+                assert edge["sourceHandle"], edge
+                output_index = int(edge["sourceHandle"].replace("source", "")) - 1
+                if output_index >= block.n:
+                    raise ValueError(
+                        f"Invalid source handle '{edge['sourceHandle']}' for {edge}."
+                    )
+            else:
+                output_index = 0
+
+            if isinstance(target_block, Scope):
+                input_index = target_block._connections_order.index(edge["id"])
+            else:
+                input_index = block_to_input_index[target_block]
+
+            edge["source_var_name"] = node["var_name"]
+            edge["target_var_name"] = target_node["var_name"]
+            edge["source_port"] = f"[{output_index}]"
+            edge["target_port"] = f"[{input_index}]"
+            block_to_input_index[target_block] += 1
+
+    return data["edges"]
 
 
 def process_graph_data_from_dict(data: dict) -> dict:
-    """Process graph data from a dictionary (same as process_graph_data but takes dict instead of file path)."""
-    # Clean up labels for variable names
-    for block in data["nodes"]:
-        block["data"]["label"] = block["data"]["label"].lower().replace(" ", "_")
-
-    def find_node_by_id(node_id: str) -> dict:
-        for node in data["nodes"]:
-            if node["id"] == node_id:
-                return node
-        return None
+    """
+    Process graph data from a dictionary.
-
-    # Process each node to determine its incoming connections and betas
-    processed_blocks = []
-
-    for node in data["nodes"]:
-        # Find all incoming edges to this node
-        incoming_edges = [
-            edge for edge in data["edges"] if edge["target"] == node["id"]
-        ]
-
-        # Sort incoming edges by source id to ensure consistent ordering
-        incoming_edges.sort(key=lambda x: x["source"])
-
-        # Calculate transfer fractions and source blocks for this node
-        transfer_fractions = []
-        source_block_labels = []
-
-        for edge in incoming_edges:
-            source_node = find_node_by_id(edge["source"])
-            outgoing_edges = [
-                edge for edge in data["edges"] if edge["source"] == source_node["id"]
-            ]
-            # default transfer fraction split equally
-            f = edge["data"].get("weight", 1 / len(outgoing_edges))
-
-            # Create transfer fraction variable name
-            f_var_name = f"f_{source_node['data']['label']}_{node['data']['label']}"
-
-            transfer_fractions.append(
-                {
-                    "var_name": f_var_name,
-                    "value": f,
-                    "source_label": source_node["data"]["label"],
-                    "target_label": node["data"]["label"],
-                }
-            )
-            source_block_labels.append(source_node["data"]["label"])
-
-        # Create processed block info
-        processed_block = {
-            "id": node["id"],
-            "data": node["data"],
-            "transfer_fractions": transfer_fractions,
-            "source_block_labels": source_block_labels,
-            "incoming_edges": incoming_edges,
-        }
-        processed_blocks.append(processed_block)
-
-    # Collect all transfer fractions for global variable generation
-    all_transfer_fractions = []
-    for block in processed_blocks:
-        all_transfer_fractions.extend(block["transfer_fractions"])
-
-    # Create connection data with proper indexing
-    connection_data = []
-
-    # for nodes with several inputs, the order of the connection needs to
-    # be the same as the order of the transfer fractions (which are sorted by source id)
-    for block in processed_blocks:
-        target_input_index = 0
-        # Use the sorted incoming edges from each block to maintain order consistency
-        for edge in block["incoming_edges"]:
-            source_label = find_node_by_id(edge["source"])["data"]["label"]
-            target_label = block["data"]["label"]
-
-            connection_data.append(
-                {
-                    "source": source_label,
-                    "target": target_label,
-                    "target_input_index": target_input_index,
-                }
-            )
+    Adds variable names, class names, and expected arguments to nodes,
+    and processes edges to include source/target variable names and ports.
 
-            target_input_index += 1
+    This processed data can then be more easily used to generate Python code.
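+
+    Sketch (assuming a valid graph dict ``data`` with nodes labelled as in the
+    test files, e.g. "constant 0")::
+
+        context = process_graph_data_from_dict(data)
+        context["nodes"][0]["var_name"]     # e.g. "constant_0"
+        context["edges"][0]["source_port"]  # e.g. "[0]"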
+ """ + data = data.copy() - return { - "blocks": processed_blocks, - "connection_data": connection_data, - "transfer_fractions": all_transfer_fractions, - } + # Process nodes to create variable names and class names + data["nodes"] = process_node_data(data["nodes"], data["edges"]) + # Process to add source/target variable names to edges + ports + data["edges"] = make_edge_data(data) -if __name__ == "__main__": - main() + return data diff --git a/src/pathsim_utils.py b/src/pathsim_utils.py new file mode 100644 index 0000000..8218d3f --- /dev/null +++ b/src/pathsim_utils.py @@ -0,0 +1,452 @@ +import math +import numpy as np +from pathsim import Simulation, Connection +from pathsim.events import Event +import pathsim.solvers +from pathsim.blocks import ( + Scope, + Block, + Constant, + StepSource, + PulseSource, + Amplifier, + Adder, + Multiplier, + Integrator, + Function, + Delay, + RNG, + PID, + Schedule, +) +from .custom_pathsim_blocks import Process, Splitter +from flask import jsonify + +NAME_TO_SOLVER = { + "SSPRK22": pathsim.solvers.SSPRK22, + "SSPRK33": pathsim.solvers.SSPRK33, + "RKF21": pathsim.solvers.RKF21, +} +map_str_to_object = { + "constant": Constant, + "stepsource": StepSource, + "pulsesource": PulseSource, + "amplifier": Amplifier, + "amplifier_reverse": Amplifier, + "scope": Scope, + "splitter2": Splitter, + "splitter3": Splitter, + "adder": Adder, + "adder_reverse": Adder, + "multiplier": Multiplier, + "process": Process, + "process_horizontal": Process, + "rng": RNG, + "pid": PID, + "integrator": Integrator, + "function": Function, + "delay": Delay, +} + + +def find_node_by_id(node_id: str, nodes: list) -> dict: + for node in nodes: + if node["id"] == node_id: + return node + return None + + +def find_block_by_id(block_id: str, blocks) -> Block: + for block in blocks: + if hasattr(block, "id") and block.id == block_id: + return block + return None + + +def create_integrator( + node: dict, eval_namespace: dict = None +) -> tuple[Block, list[Schedule]]: + if eval_namespace is None: + eval_namespace = globals() + + block = Integrator( + initial_value=eval(node["data"]["initial_value"], eval_namespace) + if node["data"].get("initial_value") and node["data"]["initial_value"] != "" + else 0.0, + ) + # add events to reset integrator if needed + events = [] + if node["data"]["reset_times"] != "": + + def reset_itg(_): + block.reset() + + reset_times = eval(node["data"]["reset_times"], eval_namespace) + if isinstance(reset_times, (int, float)): + # If it's a single number, convert it to a list + reset_times = [reset_times] + for t in reset_times: + events.append(Schedule(t_start=t, t_end=t, func_act=reset_itg)) + return block, events + + +def create_function(node: dict, eval_namespace: dict = None) -> Block: + if eval_namespace is None: + eval_namespace = globals() + + # Convert the expression string to a lambda function + expression = node["data"].get("expression", "x") + + # Create a safe lambda function from the expression + # The expression should use 'x' as the variable + try: + # Create a lambda function from the expression string + # We'll allow common mathematical operations and numpy functions + + # Safe namespace for eval - merge with global variables + safe_namespace = { + "x": 0, # placeholder + "np": np, + "math": math, + "sin": np.sin, + "cos": np.cos, + "tan": np.tan, + "exp": np.exp, + "log": np.log, + "sqrt": np.sqrt, + "abs": abs, + "pow": pow, + "pi": np.pi, + "e": np.e, + **eval_namespace, # Include global variables + } + + # Test the expression first to 
ensure it's valid + eval(expression.replace("x", "1"), safe_namespace) + + # Create the actual function + def func(x): + return eval(expression, {**safe_namespace, "x": x}) + + except Exception as e: + raise ValueError(f"Invalid function expression: {expression}. Error: {e}") + + block = Function(func=func) + return block + + +def create_scope(node: dict, edges, nodes) -> Scope: + # Find all incoming edges to this node and sort by source id for consistent ordering + incoming_edges = [edge for edge in edges if edge["target"] == node["id"]] + incoming_edges.sort(key=lambda x: x["source"]) + + # create labels for the scope based on incoming edges + labels = [] + duplicate_labels = [] + connections_order = [] # will be used later to make connections + for edge in incoming_edges: + source_node = find_node_by_id(edge["source"], nodes=nodes) + label = source_node["data"]["label"] + connections_order.append(edge["id"]) + + # If the label already exists, try to append the source handle to it (if it exists) + if label in labels or label in duplicate_labels: + duplicate_labels.append(label) + if edge["sourceHandle"]: + new_label = label + f" ({edge['sourceHandle']})" + label = new_label + labels.append(label) + + for i, (edge, label) in enumerate(zip(incoming_edges, labels)): + if label in duplicate_labels: + if edge["sourceHandle"]: + labels[i] += f" ({edge['sourceHandle']})" + + block = Scope(labels=labels) + block._connections_order = connections_order + + return block + + +def make_global_variables(global_vars): + # Validate and exec global variables so that they are usable later in this script. + # Return a namespace dictionary containing the global variables + global_namespace = {} + + for var in global_vars: + var_name = var.get("name", "").strip() + var_value = var.get("value", "") + + # Validate variable name + if not var_name: + continue # Skip empty names + + if not var_name.isidentifier(): + raise ValueError( + f"Invalid Python variable name: '{var_name}'. " + "Variable names must start with a letter or underscore, " + "and contain only letters, digits, and underscores." + ) + + # Check if it's a Python keyword + import keyword + + if keyword.iskeyword(var_name): + raise ValueError( + f"'{var_name}' is a Python keyword and cannot be used as a variable name." + ) + + try: + # Execute in global namespace for backwards compatibility + exec(f"{var_name} = {var_value}", globals()) + # Also store in local namespace for eval calls + global_namespace[var_name] = eval(var_value) + except Exception as e: + raise ValueError(f"Error setting global variable '{var_name}': {str(e)}") + + return global_namespace + + +def make_solver_params(solver_prms, eval_namespace=None): + extra_params = solver_prms.pop("extra_params", "") + if extra_params == "": + extra_params = {} + else: + extra_params = eval(extra_params, eval_namespace) + assert isinstance(extra_params, dict), "extra_params must be a dictionary" + + for k, v in solver_prms.items(): + if k not in ["Solver", "log"]: + try: + solver_prms[k] = eval(v, eval_namespace) + except Exception as e: + return jsonify( + {"error": f"Invalid value for {k}: {v}. Error: {str(e)}"} + ), 400 + elif k == "log": + if v == "true": + solver_prms[k] = True + elif v == "false": + solver_prms[k] = False + else: + return jsonify( + {"error": f"Invalid value for {k}: {v}. Must be 'true' or 'false'."} + ), 400 + elif k == "Solver": + if v not in NAME_TO_SOLVER: + return jsonify( + { + "error": f"Invalid solver: {v}. Must be one of {list(NAME_TO_SOLVER.keys())}." 
+                )
+            solver_prms[k] = NAME_TO_SOLVER[v]
+
+    # remove solver duration from solver parameters
+    duration = float(solver_prms.pop("simulation_duration"))
+
+    assert not isinstance(solver_prms["Solver"], str), solver_prms["Solver"]
+
+    return solver_prms, extra_params, duration
+
+
+def auto_block_construction(node: dict, eval_namespace: dict = None) -> Block:
+    """
+    Automatically constructs a block object from a node dictionary.
+
+    Args:
+        node: The node dictionary containing block information.
+        eval_namespace: A namespace for evaluating expressions. Defaults to None.
+
+    Raises:
+        ValueError: If the block type is unknown or if there are issues with evaluation.
+
+    Returns:
+        The constructed block object.
+    """
+    if eval_namespace is None:
+        eval_namespace = globals()
+
+    block_type = node["type"]
+    if block_type not in map_str_to_object:
+        raise ValueError(f"Unknown block type: {block_type}")
+
+    block_class = map_str_to_object[block_type]
+
+    # skip 'self'
+    parameters_for_class = block_class.__init__.__code__.co_varnames[1:]
+
+    parameters = {
+        k: eval(v, eval_namespace)
+        for k, v in node["data"].items()
+        if k in parameters_for_class
+    }
+    return block_class(**parameters)
+
+
+def make_blocks(
+    nodes: list[dict], edges: list[dict], eval_namespace: dict = None
+) -> tuple[list[Block], list[Event]]:
+    blocks, events = [], []
+
+    for node in nodes:
+        block_type = node["type"]
+
+        # Manual construction for some block types
+        if block_type == "integrator":
+            block, event_int = create_integrator(node, eval_namespace)
+            events.extend(event_int)
+        elif block_type == "function":
+            block = create_function(node, eval_namespace)
+        elif block_type == "scope":
+            block = create_scope(node, edges, nodes)
+        elif block_type == "stepsource":
+            block = StepSource(
+                amplitude=eval(node["data"]["amplitude"], eval_namespace),
+                tau=eval(node["data"]["delay"], eval_namespace),
+            )
+        elif block_type == "splitter2":
+            block = Splitter(
+                n=2,
+                fractions=[
+                    eval(node["data"]["f1"], eval_namespace),
+                    eval(node["data"]["f2"], eval_namespace),
+                ],
+            )
+        elif block_type == "splitter3":
+            block = Splitter(
+                n=3,
+                fractions=[
+                    eval(node["data"]["f1"], eval_namespace),
+                    eval(node["data"]["f2"], eval_namespace),
+                    eval(node["data"]["f3"], eval_namespace),
+                ],
+            )
+        else:  # try automated construction
+            block = auto_block_construction(node, eval_namespace)
+
+        block.id = node["id"]
+        block.label = node["data"]["label"]
+        blocks.append(block)
+
+    return blocks, events
+
+
+def make_connections(nodes, edges, blocks) -> list[Connection]:
+    # Create connections based on the sorted edges to match beta order
+    connections_pathsim = []
+
+    # Process each node and its sorted incoming edges to create connections
+    block_to_input_index = {b: 0 for b in blocks}
+    for node in nodes:
+        outgoing_edges = [edge for edge in edges if edge["source"] == node["id"]]
+        outgoing_edges.sort(key=lambda x: x["target"])
+
+        incoming_edges = [edge for edge in edges if edge["target"] == node["id"]]
+        incoming_edges.sort(key=lambda x: x["source"])
+
+        block = find_block_by_id(node["id"], blocks=blocks)
+
+        for edge in outgoing_edges:
+            target_block = find_block_by_id(edge["target"], blocks=blocks)
+            if isinstance(block, Process):
+                if edge["sourceHandle"] == "inv":
+                    output_index = 0
+                elif edge["sourceHandle"] == "mass_flow_rate":
+                    output_index = 1
+                    assert block.residence_time != 0, (
+                        "Residence time must be non-zero for mass flow rate output."
+                    )
+                else:
+                    raise ValueError(
+                        f"Invalid source handle '{edge['sourceHandle']}' for {edge}."
+                    )
+            elif isinstance(block, Splitter):
+                # Splitter outputs are always in order, so we can use the handle directly
+                assert edge["sourceHandle"], edge
+                output_index = int(edge["sourceHandle"].replace("source", "")) - 1
+                if output_index >= block.n:
+                    raise ValueError(
+                        f"Invalid source handle '{edge['sourceHandle']}' for {edge}."
+                    )
+            else:
+                output_index = 0
+
+            if isinstance(target_block, Scope):
+                input_index = target_block._connections_order.index(edge["id"])
+            else:
+                input_index = block_to_input_index[target_block]
+
+            connection = Connection(
+                block[output_index],
+                target_block[input_index],
+            )
+            connections_pathsim.append(connection)
+            block_to_input_index[target_block] += 1
+
+    return connections_pathsim
+
+
+def make_default_scope(nodes, blocks) -> tuple[Scope, list[Connection]]:
+    scope_default = Scope(
+        labels=[node["data"]["label"] for node in nodes],
+    )
+    scope_default.id = "scope_default"
+    scope_default.label = "Default Scope"
+
+    # Add connections to scope
+    connections_pathsim = []
+    input_index = 0
+    for block in blocks:
+        if block != scope_default:
+            connection = Connection(
+                block[0],
+                scope_default[input_index],
+            )
+            connections_pathsim.append(connection)
+            input_index += 1
+
+    return scope_default, connections_pathsim
+
+
+def make_pathsim_model(graph_data: dict) -> tuple[Simulation, float]:
+    nodes = graph_data.get("nodes", [])
+    edges = graph_data.get("edges", [])
+    solver_prms = graph_data.get("solverParams", {})
+    global_vars = graph_data.get("globalVariables", {})
+
+    # Get the global variables namespace to use in eval calls
+    global_namespace = make_global_variables(global_vars)
+
+    # Create a combined namespace that includes built-in functions and global variables
+    eval_namespace = {**globals(), **global_namespace}
+
+    solver_prms, extra_params, duration = make_solver_params(
+        solver_prms, eval_namespace
+    )
+
+    # Create blocks
+    blocks, events = make_blocks(nodes, edges, eval_namespace)
+
+    connections_pathsim = make_connections(nodes, edges, blocks)
+
+    # Add a Scope block if none exists
+    # This ensures that there is always a scope to collect outputs
+    if not any(isinstance(block, Scope) for block in blocks):
+        scope_default, connections_scope_def = make_default_scope(nodes, blocks)
+        blocks.append(scope_default)
+        connections_pathsim.extend(connections_scope_def)
+
+    # Create the simulation
+    simulation = Simulation(
+        blocks,
+        connections_pathsim,
+        events=events,
+        **solver_prms,  # Unpack solver parameters
+        **extra_params,  # Unpack extra parameters
+    )
+    return simulation, duration
diff --git a/src/templates/__init__.py b/src/templates/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/templates/block_macros.py b/src/templates/block_macros.py
new file mode 100644
index 0000000..d83b1b9
--- /dev/null
+++ b/src/templates/block_macros.py
@@ -0,0 +1,64 @@
+{# Macro-based approach for block creation #}
+{% macro create_block(node) -%}
+{{ node["var_name"] }} = {{ node["module_name"] }}.{{ node["class_name"] }}(
+    {%- for arg in node["expected_arguments"] %}
+    {%- if node["data"].get(arg) -%}
+    {{ arg }}={{ node["data"].get(arg) }}{% if not loop.last %}, {% endif %}
+    {%- endif -%}
+    {%- endfor %}
+)
+{%- endmacro -%}
+
+
+{% macro create_integrator_block(node) -%}
+{{ create_block(node) }}
+
+{%- if node["data"].get("reset_times") %}
+def reset_itg(_):
+    {{ node["var_name"] }}.reset()
+
+for t in {{ node["data"].get("reset_times", "[]") }}:
+    events.append(
+        pathsim.events.Schedule(
+            t_start=t,
+            t_end=t,
+            func_act=reset_itg,
+        )
+    )
+{%- endif %}
+
+{%- endmacro -%}
+
+
+{% macro create_function_block(node) -%}
+
+def func(x):
+    return {{ node["data"]["expression"] }}
+
+{{ node["var_name"] }} = pathsim.blocks.Function(func=func)
+
+{%- endmacro -%}
+
+{% macro create_stepsource(node) -%}
+{{ node["var_name"] }} = pathsim.blocks.StepSource(
+    amplitude={{ node["data"]["amplitude"] }},
+    tau={{ node["data"]["delay"] }},
+)
+{%- endmacro -%}
+
+
+
+{% macro create_scope_block(node) -%}
+{{ node["var_name"] }} = pathsim.blocks.Scope(
+    labels={{ node["labels"] }}
+)
+
+{%- endmacro -%}
+
+{% macro create_connections(edges) -%}
+connections = [
+    {% for edge in edges -%}
+    Connection({{ edge["source_var_name"] }}{{ edge["source_port"] }}, {{ edge["target_var_name"] }}{{ edge["target_port"] }}),
+    {% endfor -%}
+]
+{%- endmacro -%}
diff --git a/src/templates/template.py b/src/templates/template.py
deleted file mode 100644
index eaac10d..0000000
--- a/src/templates/template.py
+++ /dev/null
@@ -1,76 +0,0 @@
-import pathsim
-import numpy as np
-import matplotlib.pyplot as plt
-
-from pathsim import Simulation, Connection
-from pathsim.blocks import ODE, Source, Scope, Block, Pulse
-from pathsim.solvers import RKBS32, RKF21
-from pathsim.events import ZeroCrossingDown, ZeroCrossingUp
-
-
-class Process(ODE):
-    def __init__(self, alpha=0, betas=[], gen=0, ic=0, name=None):
-        self.name = name
-        super().__init__(
-            func=lambda x, u, t: alpha * x
-            + sum(_u * _b for _u, _b in zip(u, betas))
-            + gen,
-            jac=lambda x, u, t: alpha,
-            initial_value=ic,
-        )
-
-
-# Create Process blocks
-{% for block in blocks -%}
-{% if block["data"]["residence_time"] != "" -%}
-tau_{{ block["data"]["label"] }} = {{ block["data"]["residence_time"] }}
-{% endif -%}
-{%- endfor %}
-
-{% for tf in transfer_fractions -%}
-{{ tf["var_name"] }} = {{ tf["value"] }}
-{% endfor %}
-
-{% for block in blocks %}
-{{ block["data"]["label"] }} = Process(
-    name="{{ block["data"]["label"] }}",
-    alpha={% if block["data"]["residence_time"] != "" %}-1 / tau_{{ block["data"]["label"] }}{% else %}0{% endif %},
-    betas=[{% for tf in block["transfer_fractions"] %}{{ tf["var_name"] }} / tau_{{ tf["source_label"] }}{% if not loop.last %}, {% endif %}{% endfor %}],
-    gen={% if block["data"]["source_term"] != "" %}{{ block["data"]["source_term"] }}{% else %}0{% endif %},
-    ic={% if block["data"]["initial_value"] != "" %}{{ block["data"]["initial_value"] }}{% else %}0{% endif %},
-)
-{% endfor %}
-
-# Create Scope block
-scope = Scope(
-    labels=[{% for block in blocks %}"{{ block["data"]["label"] }}"{% if not loop.last %}, {% endif %}{% endfor %}],
-)
-
-# Create blocks list
-blocks = [
-    {% for block in blocks %}{{ block["data"]["label"] }},
-    {% endfor %}scope,
-]
-
-# Create connections
-connections = [
-    # Process-to-process connections
-{% for conn in connection_data %}    Connection({{ conn["source"] }}, {{ conn["target"] }}[{{ conn["target_input_index"] }}]),
-{% endfor %}
-    # Process-to-scope connections
-{% for block in blocks %}    Connection({{ block["data"]["label"] }}, scope[{{ loop.index0 }}]),
-{% endfor %}]
-
-# Create simulation
-my_simulation = Simulation(blocks, connections, log=False)
-
-
-if __name__ == "__main__":
-    my_simulation.run(50)
-
-    my_simulation.save("simple.mdl")
-
-    fig, ax = scope.plot()
-
-    plt.show()
-    plt.show()
\ No newline at end of file
diff --git a/src/templates/template_with_macros.py b/src/templates/template_with_macros.py
new file mode 100644
index 0000000..0663e63
--- /dev/null
+++ b/src/templates/template_with_macros.py
@@ -0,0 +1,66 @@
+import pathsim
+from pathsim import Simulation, Connection
+import numpy as np
+import matplotlib.pyplot as plt
+import src
+{# Import macros #}
+{% from 'block_macros.py' import create_block, create_integrator_block, create_function_block, create_scope_block, create_stepsource, create_connections -%}
+
+# Create global variables
+{% for var in globalVariables -%}
+{{ var["name"] }} = {{ var["value"] }}
+{% endfor %}
+# Create blocks
+blocks, events = [], []
+
+{% for node in nodes -%}
+{%- if node["type"] == "integrator" -%}
+{{ create_integrator_block(node) }}
+{%- elif node["type"] == "stepsource" -%}
+{{ create_stepsource(node) }}
+{%- elif node["type"] == "function" -%}
+{{ create_function_block(node) }}
+{%- elif node["type"] == "scope" -%}
+{{ create_scope_block(node) }}
+{%- else -%}
+{{ create_block(node) }}
+{%- endif %}
+blocks.append({{ node["var_name"] }})
+
+{% endfor %}
+
+# Create connections
+
+{{ create_connections(edges) }}
+
+# Create simulation
+my_simulation = Simulation(
+    blocks,
+    connections,
+    events=events,
+    Solver=pathsim.solvers.{{ solverParams["Solver"] }},
+    dt={{ solverParams["dt"] }},
+    dt_max={{ solverParams["dt_max"] }},
+    dt_min={{ solverParams["dt_min"] }},
+    iterations_max={{ solverParams["iterations_max"] }},
+    log={{ solverParams["log"].capitalize() }},
+    tolerance_fpi={{ solverParams["tolerance_fpi"] }},
+    **{{ solverParams["extra_params"] }},
+)
+
+if __name__ == "__main__":
+    my_simulation.run({{ solverParams["simulation_duration"] }})
+
+    # Optional: Plotting results
+    scopes = [block for block in blocks if isinstance(block, pathsim.blocks.Scope)]
+    fig, axs = plt.subplots(nrows=len(scopes), sharex=True, figsize=(10, 5 * len(scopes)))
+    for i, scope in enumerate(scopes):
+        plt.sca(axs[i] if len(scopes) > 1 else axs)
+        time, data = scope.read()
+        # plot the recorded data
+        for p, d in enumerate(data):
+            lb = scope.labels[p] if p < len(scope.labels) else f"port {p}"
+            plt.plot(time, d, label=lb)
+        plt.legend()
+        plt.xlabel("Time")
+    plt.show()
diff --git a/test/test_backend.py b/test/test_backend.py
index 795ae47..ce689df 100644
--- a/test/test_backend.py
+++ b/test/test_backend.py
@@ -1,4 +1,8 @@
-from src.backend import create_integrator, auto_block_construction, create_function
+from src.pathsim_utils import (
+    create_integrator,
+    auto_block_construction,
+    create_function,
+)
 from src.custom_pathsim_blocks import Process, Splitter
 
 import pathsim.blocks
diff --git a/test/test_convert_python.py b/test/test_convert_python.py
new file mode 100644
index 0000000..ee05975
--- /dev/null
+++ b/test/test_convert_python.py
@@ -0,0 +1,131 @@
+from src.convert_to_python import convert_graph_to_python
+import json
+import pytest
+from pathlib import Path
+
+# Create sample graph data
+sample_data = {
+    "nodes": [
+        {
+            "id": "1",
+            "type": "constant",
+            "data": {
+                "label": "input_signal",
+                "value": "1.0",
+            },
+        },
+        {
+            "id": "2",
+            "type": "integrator",
+            "data": {
+                "label": "integrator_1",
+                "initial_value": "",
+                "reset_times": "[10, 20]",
+            },
+        },
+        {"id": "3", "type": "amplifier", "data": {"label": "amp_1", "gain": "2.0"}},
+        {
+            "id": "4",
+            "type": "function",
+            "data": {
+                "label": "func_block",
+                "expression": "x * 2 + 1",
+            },
+        },
+        {
+            "id": "5",
+            "type": "scope",
+            "data": {
+                "label": "scope_1",
+            },
+        },
+    ],
+    "edges": [
+        {"source": "1", "target": "2", "id": "e1-2"},
+        {"source": "2", "target": "3", "id": "e2-3"},
+        {"source": "3", "target": "4", "id": "e3-4"},
+        {"source": "3", "target": "5", "id": "e3-5"},
+        {"source": "4", "target": "5", "id": "e4-5"},
+    ],
+    "solverParams": {
+        "Solver": "SSPRK22",
+        "dt": "0.01",
+        "dt_max": "1.0",
+        "dt_min": "1e-6",
+        "extra_params": "{}",
+        "iterations_max": "100",
+        "log": "true",
+        "simulation_duration": "duration",
+        "tolerance_fpi": "1e-6",
+    },
+    "globalVariables": [
+        {"id": "1", "name": "duration", "nameError": "false", "value": "50.0"},
+        {"id": "2", "name": "a", "nameError": "false", "value": "2"},
+    ],
+}
+
+
+@pytest.mark.parametrize(
+    "data",
+    [
+        sample_data,
+        "test_files/constant_delay_scope.json",
+        "test_files/custom_nodes.json",
+        "test_files/same_label.json",
+    ],
+)
+def test_nested_templates(data):
+    """Test the nested template functionality."""
+
+    # Process the data
+    if not isinstance(data, dict):
+        # read from json file using path relative to current file
+        current_file_dir = Path(__file__).parent
+        file_path = current_file_dir / data
+        with open(file_path, "r") as f:
+            data = json.load(f)
+
+    code = convert_graph_to_python(data)
+    print(code)
+    # execute the generated code and check for errors
+    try:
+        exec(code)
+    except Exception as e:
+        pytest.fail(f"Generated code failed to execute: {e}")
+
+
+def test_stepsource_delay_converted_to_tau():
+    "Test that the delay parameter in a stepsource node is converted to tau in the generated code."
+    sample_data = {
+        "nodes": [
+            {
+                "id": "1",
+                "type": "stepsource",
+                "data": {
+                    "label": "input_signal",
+                    "delay": "3.0",
+                    "amplitude": "2.0",
+                },
+            },
+        ],
+        "edges": [],
+        "solverParams": {
+            "Solver": "SSPRK22",
+            "dt": "0.01",
+            "dt_max": "1.0",
+            "dt_min": "1e-6",
+            "extra_params": "{}",
+            "iterations_max": "100",
+            "log": "true",
+            "simulation_duration": "duration",
+            "tolerance_fpi": "1e-6",
+        },
+        "globalVariables": [],
+    }
+    code = convert_graph_to_python(sample_data)
+    assert "tau=3.0" in code
+
+
+if __name__ == "__main__":
+    test_nested_templates(sample_data)
diff --git a/test/test_files/constant_delay_scope.json b/test/test_files/constant_delay_scope.json
new file mode 100644
index 0000000..f43feab
--- /dev/null
+++ b/test/test_files/constant_delay_scope.json
@@ -0,0 +1,108 @@
+{
+  "nodes": [
+    {
+      "id": "0",
+      "type": "constant",
+      "position": {
+        "x": 200,
+        "y": 200
+      },
+      "data": {
+        "label": "constant 0",
+        "value": "1"
+      },
+      "measured": {
+        "width": 205,
+        "height": 53
+      }
+    },
+    {
+      "id": "1",
+      "type": "delay",
+      "position": {
+        "x": 454,
+        "y": 280
+      },
+      "data": {
+        "label": "delay 1",
+        "tau": "2"
+      },
+      "measured": {
+        "width": 96,
+        "height": 76
+      },
+      "selected": false,
+      "dragging": false
+    },
+    {
+      "id": "2",
+      "type": "scope",
+      "position": {
+        "x": 657,
+        "y": 220
+      },
+      "data": {
+        "label": "scope 2"
+      },
+      "measured": {
+        "width": 120,
+        "height": 140
+      },
+      "selected": false,
+      "dragging": false
+    }
+  ],
+  "edges": [
+    {
+      "id": "e0-1",
+      "source": "0",
+      "target": "1",
+      "sourceHandle": null,
+      "targetHandle": null,
+      "type": "smoothstep",
+      "data": {},
+      "style": {
+        "strokeWidth": 2,
+        "stroke": "#ECDFCC"
+      },
+      "markerEnd": {
+        "type": "arrowclosed",
+        "width": 20,
+        "height": 20,
+        "color": "#ECDFCC"
+      }
+    },
+    {
+      "id": "e1-2",
+      "source": "1",
+      "target": "2",
+      "sourceHandle": null,
+      "targetHandle": null,
+      "type": "smoothstep",
+      "data": {},
+      "style": {
+        "strokeWidth": 2,
+        "stroke": "#ECDFCC"
+      },
+      "markerEnd": {
+        "type": "arrowclosed",
+        "width": 20,
+        "height": 20,
+        "color": "#ECDFCC"
+      }
+    }
+  ],
+  "nodeCounter": 3,
+  "solverParams": {
+    "dt": "0.01",
+    "dt_min": "1e-6",
+    "dt_max": "1.0",
+    "Solver": "SSPRK22",
+    "tolerance_fpi": "1e-6",
+    "iterations_max": "100",
+    "log": "true",
+    "simulation_duration": "50.0",
+    "extra_params": "{}"
+  },
+  "globalVariables": []
+}
\ No newline at end of file
diff --git a/test/test_files/custom_nodes.json b/test/test_files/custom_nodes.json
new file mode 100644
index 0000000..48c8068
--- /dev/null
+++ b/test/test_files/custom_nodes.json
@@ -0,0 +1,77 @@
+{
+  "nodes": [
+    {
+      "id": "0",
+      "type": "process",
+      "position": {
+        "x": 567.3899893301207,
+        "y": 237.89224105198554
+      },
+      "data": {
+        "label": "process 0",
+        "residence_time": "1",
+        "source_term": "1",
+        "initial_value": "1"
+      },
+      "measured": {
+        "width": 200,
+        "height": 120
+      },
+      "selected": false,
+      "dragging": false
+    },
+    {
+      "id": "1",
+      "type": "splitter2",
+      "position": {
+        "x": 890.8731204009728,
+        "y": 445.4758224672106
+      },
+      "data": {
+        "label": "splitter2 1",
+        "f1": "0.5",
+        "f2": "0.5"
+      },
+      "measured": {
+        "width": 120,
+        "height": 120
+      },
+      "selected": false,
+      "dragging": false
+    },
+    {
+      "id": "2",
+      "type": "splitter3",
+      "position": {
+        "x": 1179.758987902621,
+        "y": 412.5260476393971
+      },
+      "data": {
+        "label": "splitter3 2",
+        "f1": "1/3",
+        "f2": "1/3",
+        "f3": "1/3"
+      },
+      "measured": {
+        "width": 120,
+        "height": 120
+      },
+      "selected": true,
+      "dragging": false
+    }
+  ],
+  "edges": [],
+  "nodeCounter": 3,
+  "solverParams": {
+    "dt": "0.01",
+    "dt_min": "1e-6",
+    "dt_max": "1.0",
+    "Solver": "SSPRK22",
+    "tolerance_fpi": "1e-6",
+    "iterations_max": "100",
+    "log": "true",
+    "simulation_duration": "50.0",
+    "extra_params": "{}"
+  },
+  "globalVariables": []
+}
\ No newline at end of file
diff --git a/test/test_files/same_label.json b/test/test_files/same_label.json
new file mode 100644
index 0000000..8e048c9
--- /dev/null
+++ b/test/test_files/same_label.json
@@ -0,0 +1,149 @@
+{
+  "nodes": [
+    {
+      "id": "1",
+      "type": "constant",
+      "position": {
+        "x": 267.2529959675403,
+        "y": 181.3890542768167
+      },
+      "data": {
+        "label": "name_of_block",
+        "value": "1"
+      },
+      "measured": {
+        "width": 206,
+        "height": 54
+      },
+      "selected": false,
+      "dragging": false
+    },
+    {
+      "id": "2",
+      "type": "scope",
+      "position": {
+        "x": 921.1032555042848,
+        "y": 400.9936264496624
+      },
+      "data": {
+        "label": "scope 2"
+      },
+      "measured": {
+        "width": 120,
+        "height": 140
+      },
+      "selected": false,
+      "dragging": false
+    },
+    {
+      "id": "3",
+      "type": "splitter2",
+      "position": {
+        "x": 568,
+        "y": 270
+      },
+      "data": {
+        "label": "my_splitter",
+        "f1": "0.5",
+        "f2": "0.5"
+      },
+      "measured": {
+        "width": 120,
+        "height": 120
+      },
+      "selected": false,
+      "dragging": false
+    }
+  ],
+  "edges": [
+    {
+      "id": "e1-2",
+      "source": "1",
+      "target": "2",
+      "sourceHandle": null,
+      "targetHandle": null,
+      "type": "smoothstep",
+      "data": {},
+      "style": {
+        "strokeWidth": 2,
+        "stroke": "#ECDFCC"
+      },
+      "markerEnd": {
+        "type": "arrowclosed",
+        "width": 20,
+        "height": 20,
+        "color": "#ECDFCC"
+      }
+    },
+    {
+      "id": "e1-3",
+      "source": "1",
+      "target": "3",
+      "sourceHandle": null,
+      "targetHandle": null,
+      "type": "smoothstep",
+      "data": {},
+      "style": {
+        "strokeWidth": 2,
+        "stroke": "#ECDFCC"
+      },
+      "markerEnd": {
+        "type": "arrowclosed",
+        "width": 20,
+        "height": 20,
+        "color": "#ECDFCC"
+      }
+    },
+    {
+      "id": "e3-2-from_source2",
+      "source": "3",
+      "target": "2",
+      "sourceHandle": "source2",
+      "targetHandle": null,
+      "type": "smoothstep",
+      "data": {},
+      "style": {
+        "strokeWidth": 2,
+        "stroke": "#ECDFCC"
+      },
+      "markerEnd": {
+        "type": "arrowclosed",
+        "width": 20,
+        "height": 20,
+        "color": "#ECDFCC"
+      }
+    },
+    {
+      "id": "e3-2-from_source1",
+      "source": "3",
+      "target": "2",
+      "sourceHandle": "source1",
+      "targetHandle": null,
+      "type": "smoothstep",
+      "data": {},
+      "style": {
+        "strokeWidth": 2,
+        "stroke": "#ECDFCC"
+      },
+      "markerEnd": {
+        "type": "arrowclosed",
+        "width": 20,
+        "height": 20,
+        "color": "#ECDFCC"
+      }
+    }
+  ],
+  "nodeCounter": 5,
+  "solverParams": {
+    "dt": "0.01",
+    "dt_min": "1e-6",
+    "dt_max": "1.0",
+    "Solver": "SSPRK22",
+    "tolerance_fpi": "1e-6",
+    "iterations_max": "100",
+    "log": "true",
+    "simulation_duration": "50.0",
+    "extra_params": "{}"
+  },
+  "globalVariables": []
+}
\ No newline at end of file