Skip to content
22 changes: 21 additions & 1 deletion beeflow/client/bee_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,9 @@ def dag(wf_id: str = typer.Argument(..., callback=match_short_id),
output_dir: pathlib.Path = typer.Argument(...,
help='Path to the where the dag output will be'),
no_dag_dir: bool = typer.Option(False, '--no-dag-dir',
help='do not make a subdirectory within ouput_dir for the dags')):
help='do not make a subdirectory within ouput_dir for the dags'),
graphmls_dir: pathlib.Path = typer.Option(None, '--graphmls_dir',
help='Directory of Graphmls to convert into DAGs.')):
"""Export a DAG of the workflow to a GraphML file."""
output_dir = output_dir.resolve()
# Make sure output_dir is an absolute path and exists
Expand All @@ -730,6 +732,24 @@ def dag(wf_id: str = typer.Argument(..., callback=match_short_id),

# output_dir must be a string
output_dir = str(output_dir)

# Convert existing graphmls to DAGs if graphmls_dir was given
if graphmls_dir:
# Make sure graphmls_dir is an absolute path and exists
graphmls_dir = os.path.expanduser(graphmls_dir)
graphmls_dir = os.path.abspath(graphmls_dir)
if not os.path.exists(graphmls_dir):
error_exit(f'Path for graphmls directory "{graphmls_dir}" doesn\'t exist')
graphmls_dir = str(graphmls_dir)

# Convert
resp = wf_utils.convert_to_dag(wf_id, output_dir, graphmls_dir, no_dag_dir)
if resp:
error_exit(resp)
else:
typer.secho(f"DAG for workflow {_short_id(wf_id)} has been exported successfully.",
fg=typer.colors.GREEN)
return
# Check if the workflow is archived
wf_status = get_wf_status(wf_id)
if wf_status == 'Archived':
Expand Down
74 changes: 55 additions & 19 deletions beeflow/common/gdb/generate_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import os
import shutil
import subprocess
import xml.etree.ElementTree
import networkx as nx
import graphviz

Expand All @@ -10,28 +12,14 @@ def generate_viz(wf_id, output_dir, graphmls_dir, no_dag_dir, workflow_dir=None)
"""Generate a PNG of a workflow graph from a GraphML file."""
short_id = wf_id[:6]
graphml_path = graphmls_dir + "/" + short_id + ".graphml"
# Render png data
png_data = render_png_data(graphml_path)

if no_dag_dir:
dags_dir = output_dir
else:
dags_dir = output_dir + "/" + short_id + "-dags"
os.makedirs(dags_dir, exist_ok=True)

# Back up DAG and save
dags_dir = output_dir if no_dag_dir else os.path.join(output_dir, f"{short_id}-dags")
os.makedirs(dags_dir, exist_ok=True)
output_path = dags_dir + "/" + short_id + ".png"
backup_dag(output_path, dags_dir, short_id)

# Load the GraphML file using NetworkX
graph = nx.read_graphml(graphml_path)

# Initialize Graphviz graph
dot = graphviz.Digraph(comment='Hierarchical Graph')

# Add nodes and edges using helper functions
add_nodes_to_dot(graph, dot)
add_edges_to_dot(graph, dot)

# Render the graph and save as PNG
png_data = dot.pipe(format='png')
save_png(output_path, png_data)

if workflow_dir:
Expand All @@ -43,6 +31,37 @@ def generate_viz(wf_id, output_dir, graphmls_dir, no_dag_dir, workflow_dir=None)
save_png(archive_dag_path, png_data)


def generate_all_viz(wf_id, output_dir, graphmls_dir, no_dag_dir):
"Create DAGs from an exisiting collection of GraphMLs."
short_id = wf_id[:6]
dags_dir = output_dir if no_dag_dir else os.path.join(output_dir, f"{short_id}-dags")

msgs = []
first = True
for filename in os.listdir(graphmls_dir):
if filename.endswith('.graphml'):
name_without_ext = os.path.splitext(filename)[0]
output_path = dags_dir + "/" + name_without_ext + ".png"
graphml_path = os.path.join(graphmls_dir, filename)

try:
png_data = render_png_data(graphml_path)
if first:
os.makedirs(dags_dir, exist_ok=True)
first = False
save_png(output_path, png_data)

except(
nx.NetworkXError,
xml.etree.ElementTree.ParseError,
subprocess.CalledProcessError
) as exc:
err_msg = f'Error while generating visualization for {graphml_path}: {exc}'
msgs.append(err_msg)

return "\n".join(map(str, msgs))


def backup_dag(path, dags_dir, short_id):
"""Backup DAGs."""
if os.path.exists(path):
Expand All @@ -54,6 +73,23 @@ def backup_dag(path, dags_dir, short_id):
shutil.copy(path, backup_path)


def render_png_data(graphml_path):
"""Read a GraphML file, build the Graphviz Digraph, and return PNG bytes."""
# Load the GraphML file using NetworkX
graph = nx.read_graphml(graphml_path)

# Initialize Graphviz graph
dot = graphviz.Digraph(comment='Hierarchical Graph')

# Add nodes and edges using helper functions
add_nodes_to_dot(graph, dot)
add_edges_to_dot(graph, dot)

# Render the graph
png_data = dot.pipe(format='png')
return png_data


def add_nodes_to_dot(graph, dot):
"""Add nodes from the graph to the Graphviz object with labels and colors."""
label_to_color = {
Expand Down
20 changes: 19 additions & 1 deletion beeflow/wf_manager/resources/wf_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from beeflow.common import log as bee_logging
from beeflow.common.config_driver import BeeConfig as bc
from beeflow.common.gdb import neo4j_driver
from beeflow.common.gdb.generate_graph import generate_viz
from beeflow.common.gdb.generate_graph import generate_viz, generate_all_viz
from beeflow.common.gdb.graphml_key_updater import update_graphml
from beeflow.common.wf_interface import WorkflowInterface
from beeflow.common.connection import Connection
Expand Down Expand Up @@ -304,6 +304,24 @@ def export_dag(wf_id, output_dir, graphmls_dir, no_dag_dir, workflow_dir=None):
return dot_avail


def convert_to_dag(wf_id, output_dir, graphmls_dir, no_dag_dir):
"""
Convert a directory of graphmls into DAGs.

This function is used to turn graphmls that were generated
on a system without graphviz into DAGs on a system with graphviz.

Returns:
str or None: Returns an error message if Graphviz is not available or an
exception occurs; otherwise returns None on success.
"""
if not shutil.which("dot"):
msg = 'Unable to convert graphmls to DAGs. Graphviz is not available.'
else:
msg = generate_all_viz(wf_id, output_dir, graphmls_dir, no_dag_dir)
return msg


def start_workflow(wf_id):
"""Attempt to start the workflow, returning True if successful."""
db = connect_db(wfm_db, get_db_path())
Expand Down
3 changes: 2 additions & 1 deletion docs/sphinx/commands.rst
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,15 @@ Arguments:
Arguments:
WF_ID [required]

``beeflow dag``: Export a directed acyclic graph (DAG) of a submitted workflow. This command can be run at any point of the workflow. To see the DAG of a workflow before it runs, submit the workflow with the ``--no-start`` flag and then use the dag command. The DAGs are exported to $OUTPUT_DIR/$WD_ID-dags by default. If the ``no-dag-dir`` flag is specified when the dag command is run, the DAG will be exported to $OUTPUT_DIR. The dag command makes multiple versions of the DAGs. The most recent version is $WF_ID.png and the others are $WD_ID_v1.png, $WF_ID_v2.png ... where v1 is the oldest. See :ref:`workflow-visualization` for more information.
``beeflow dag``: Export a directed acyclic graph (DAG) of a submitted workflow. This command can be run at any point of the workflow. To see the DAG of a workflow before it runs, submit the workflow with the ``--no-start`` flag and then use the dag command. The DAGs are exported to $OUTPUT_DIR/$WD_ID-dags by default. If the ``no-dag-dir`` flag is specified when the dag command is run, the DAG will be exported to $OUTPUT_DIR. The dag command makes multiple versions of the DAGs. The most recent version is $WF_ID.png and the others are $WD_ID_v1.png, $WF_ID_v2.png ... where v1 is the oldest. If the ``graphmls_dir`` flag is specified when the dag command is run, BEE will convert the Graphmls in the specified directory into DAGs. See :ref:`workflow-visualization` for more information.

Arguments:
- WF_ID [required]
- OUTPUT_DIR, Directory for the output [required]

Options:
``no-dag-dir``: Do not make a subdirectory within the output_dir for the DAGs.
``graphmls_dir``: Directory of Graphmls to convert into DAGs.

Generating and Managing Configuration Files
===========================================
Expand Down
4 changes: 4 additions & 0 deletions docs/sphinx/visualization.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ most recent version is $WF_ID.png and the others are $WD_ID_v1.png,
$WF_ID_v2.png ... where v1 is the oldest. The graphmls used to make the DAGs are saved
in the workflow archive and are saved with their version number. These graphmls can
be useful for debugging when there are errors creating the DAGs.
If the --graphmls_dir flag is specified when the dag command is run, BEE will convert
the GraphMLs in the specified directory into DAGs. This is especially helpful
when transferring workflows from a system without Graphviz (BEE will only generate
GraphMLs in that case) to a system with Graphviz.

Example DAG
===========
Expand Down