Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ tzdata==2025.2
# -c ..\..\requirements.txt
# kombu
# pandas
urllib3==2.5.0
urllib3==2.6.3
# via
# -c ..\..\requirements.txt
# requests
Expand Down
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ dependencies = [
"omotes-sdk-python ~= 4.3.2",
"omotes-simulator-core==0.0.28",
"pyesdl==25.7",
"pandas ~= 2.2.2"
"pandas ~= 2.2.2",
"kpi-calculator>=0.1.1",
]

[project.optional-dependencies]
Expand Down Expand Up @@ -72,6 +73,7 @@ starting_version = "0.0.1"
[tool.pytest.ini_options]
addopts = "--cov=simulator_worker --cov-report html --cov-report term-missing --cov-fail-under 20"
testpaths = ["unit_test"]
python_files = ["test_*.py"]

[tool.coverage.run]
source = ["src"]
Expand Down
4 changes: 3 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ influxdb==5.3.2
# via omotes-simulator-core
kombu==5.5.4
# via celery
kpi-calculator==0.1.1
# via simulator-worker (..\..\pyproject.toml)
lxml==6.0.2
# via pyecore
msgpack==1.1.2
Expand Down Expand Up @@ -121,7 +123,7 @@ tzdata==2025.2
# via
# kombu
# pandas
urllib3==2.5.0
urllib3==2.6.3
# via requests
vine==5.1.0
# via
Expand Down
65 changes: 64 additions & 1 deletion src/simulator_worker/simulator_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from uuid import uuid4

import dotenv
from esdl.esdl_handler import EnergySystemHandler
from omotes_sdk.internal.orchestrator_worker_events.esdl_messages import EsdlMessage
from omotes_sdk.internal.worker.worker import UpdateProgressHandler, initialize_worker
from omotes_sdk.types import ProtobufDict
Expand All @@ -36,7 +37,11 @@
from omotes_simulator_core.infrastructure.simulation_manager import SimulationManager
from omotes_simulator_core.infrastructure.utils import pyesdl_from_string

from simulator_worker.utils import add_datetime_index, create_output_esdl
from simulator_worker.utils import (
add_datetime_index,
convert_simulator_to_kpi_format,
create_output_esdl,
)

dotenv.load_dotenv()

Expand Down Expand Up @@ -111,8 +116,66 @@ def simulator_worker_task(
len(result_indexed.columns),
result_indexed.shape,
)

# ===== Create output ESDL with simulation results =====
output_esdl = create_output_esdl(input_esdl, result_indexed)

# ===== KPI Calculation =====
logger.info("Calculating KPIs from simulation results...")

try:
from kpicalculator import KpiManager
from kpicalculator.common.constants import DEFAULT_SYSTEM_LIFETIME_YEARS

# Load ESDL for port-to-asset mapping
esh = pyesdl_from_string(input_esdl)

# Convert simulator format to KPI calculator format
timeseries_by_asset = convert_simulator_to_kpi_format(result_indexed, esh.energy_system)
logger.info(f"Converted time series for {len(timeseries_by_asset)} assets")

# Get system lifetime from workflow config
system_lifetime_value = workflow_config.get(
"system_lifetime", DEFAULT_SYSTEM_LIFETIME_YEARS
)
if isinstance(system_lifetime_value, (int, float, str)):
system_lifetime = float(system_lifetime_value)
else:
system_lifetime = DEFAULT_SYSTEM_LIFETIME_YEARS

# Use KpiManager directly to avoid duplicate ESDL parsing
kpi_manager = KpiManager()
kpi_manager.load_from_esdl(input_esdl, timeseries_dataframes=timeseries_by_asset)

# Calculate KPIs
kpi_results = kpi_manager.calculate_all_kpis(system_lifetime=system_lifetime)
logger.info("KPI calculation completed successfully")
capex_value = kpi_results.get("costs", {}).get("capex", {}).get("All", 0)
logger.debug(f"KPI results: CAPEX={capex_value:.2f} EUR")

# Add KPIs to output ESDL
output_esdl_with_kpis = kpi_manager.get_esdl_with_kpis(
kpi_results, level="system" # Can be "system", "area", or "asset"
)

# Serialize to string
esh_with_kpis = EnergySystemHandler()
esh_with_kpis.energy_system = output_esdl_with_kpis
output_esdl = esh_with_kpis.to_string()
logger.info("KPIs added to output ESDL successfully")

except Exception as e:
logger.error(
(
f"KPI calculation failed: {e}. "
f"Simulation will continue and return results without KPIs. "
f"Common causes: missing cost data in ESDL, invalid time series data, "
f"or kpi-calculator dependency issues. Check logs for details."
)
)
logger.debug(f"Stack trace: {traceback.format_exc()}")
# Continue without KPIs - don't fail the entire workflow

# Write output_esdl to file for debugging
# with open(f"result_{simulation_id}.esdl", "w") as file:
# file.writelines(output_esdl)
Expand Down
61 changes: 61 additions & 0 deletions src/simulator_worker/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,67 @@ def add_datetime_index(
return df


def convert_simulator_to_kpi_format(
simulator_result: pd.DataFrame, energy_system: esdl.EnergySystem
) -> Dict[str, pd.DataFrame]:
"""Convert simulator DataFrame to KPI calculator format.

Simulator format: columns are (port_id, property_name) tuples
KPI format: dict mapping asset_id to DataFrame with property columns

Args:
simulator_result: DataFrame from simulator with (port_id, property) columns
energy_system: ESDL energy system for port-to-asset mapping

Returns:
Dictionary mapping asset_id to property DataFrame

Example:
Input columns: [('port1', 'temp'), ('port1', 'pressure'), ('port2', 'temp')]
Output: {'asset1': DataFrame with columns ['temp', 'pressure'], ...}
"""
asset_data: Dict[str, pd.DataFrame] = {}
skipped_ports = 0
processed_ports = 0

for col in simulator_result.columns:
if not isinstance(col, tuple) or len(col) != 2:
continue # Skip non-property columns (like 'datetime')

port_id, property_name = col

# Find asset owning this port
try:
asset = find_asset_from_port(port_id, energy_system)
asset_id = asset.id
processed_ports += 1
except ValueError:
logger.warning(
(
f"Could not find asset for port {port_id} while processing property "
f"{property_name}. This may be due to a missing or incorrect port-to-asset "
f"mapping in the energy system. Please verify the ESDL structure."
)
)
skipped_ports += 1
continue

# Initialize asset DataFrame if not exists
if asset_id not in asset_data:
asset_data[asset_id] = pd.DataFrame(index=simulator_result.index)
logger.debug(f"Created time series container for asset {asset_id} ({asset.name})")

# Add property column
asset_data[asset_id][property_name] = simulator_result[col]

logger.info(
f"Converted simulator format to KPI format: {len(asset_data)} assets, "
f"{processed_ports} properties processed, {skipped_ports} skipped"
)

return asset_data


def get_profileQuantityAndUnit(property_name: str) -> esdl.esdl.QuantityAndUnitType:
"""Get the profile quantity and unit.

Expand Down
Loading