diff --git a/dev-requirements.txt b/dev-requirements.txt index 36672d2..e9e9c50 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -281,7 +281,7 @@ tzdata==2025.2 # -c ..\..\requirements.txt # kombu # pandas -urllib3==2.5.0 +urllib3==2.6.3 # via # -c ..\..\requirements.txt # requests diff --git a/pyproject.toml b/pyproject.toml index 20e3cd4..5267083 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,8 @@ dependencies = [ "omotes-sdk-python ~= 4.3.2", "omotes-simulator-core==0.0.28", "pyesdl==25.7", - "pandas ~= 2.2.2" + "pandas ~= 2.2.2", + "kpi-calculator>=0.1.1", ] [project.optional-dependencies] @@ -72,6 +73,7 @@ starting_version = "0.0.1" [tool.pytest.ini_options] addopts = "--cov=simulator_worker --cov-report html --cov-report term-missing --cov-fail-under 20" testpaths = ["unit_test"] +python_files = ["test_*.py"] [tool.coverage.run] source = ["src"] diff --git a/requirements.txt b/requirements.txt index 2f0d178..2a6eaa3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -46,6 +46,8 @@ influxdb==5.3.2 # via omotes-simulator-core kombu==5.5.4 # via celery +kpi-calculator==0.1.1 + # via simulator-worker (..\..\pyproject.toml) lxml==6.0.2 # via pyecore msgpack==1.1.2 @@ -121,7 +123,7 @@ tzdata==2025.2 # via # kombu # pandas -urllib3==2.5.0 +urllib3==2.6.3 # via requests vine==5.1.0 # via diff --git a/src/simulator_worker/simulator_worker.py b/src/simulator_worker/simulator_worker.py index 502d314..e5a1ad6 100644 --- a/src/simulator_worker/simulator_worker.py +++ b/src/simulator_worker/simulator_worker.py @@ -21,6 +21,7 @@ from uuid import uuid4 import dotenv +from esdl.esdl_handler import EnergySystemHandler from omotes_sdk.internal.orchestrator_worker_events.esdl_messages import EsdlMessage from omotes_sdk.internal.worker.worker import UpdateProgressHandler, initialize_worker from omotes_sdk.types import ProtobufDict @@ -36,7 +37,11 @@ from omotes_simulator_core.infrastructure.simulation_manager import SimulationManager from omotes_simulator_core.infrastructure.utils import pyesdl_from_string -from simulator_worker.utils import add_datetime_index, create_output_esdl +from simulator_worker.utils import ( + add_datetime_index, + convert_simulator_to_kpi_format, + create_output_esdl, +) dotenv.load_dotenv() @@ -111,8 +116,66 @@ def simulator_worker_task( len(result_indexed.columns), result_indexed.shape, ) + + # ===== Create output ESDL with simulation results ===== output_esdl = create_output_esdl(input_esdl, result_indexed) + # ===== KPI Calculation ===== + logger.info("Calculating KPIs from simulation results...") + + try: + from kpicalculator import KpiManager + from kpicalculator.common.constants import DEFAULT_SYSTEM_LIFETIME_YEARS + + # Load ESDL for port-to-asset mapping + esh = pyesdl_from_string(input_esdl) + + # Convert simulator format to KPI calculator format + timeseries_by_asset = convert_simulator_to_kpi_format(result_indexed, esh.energy_system) + logger.info(f"Converted time series for {len(timeseries_by_asset)} assets") + + # Get system lifetime from workflow config + system_lifetime_value = workflow_config.get( + "system_lifetime", DEFAULT_SYSTEM_LIFETIME_YEARS + ) + if isinstance(system_lifetime_value, (int, float, str)): + system_lifetime = float(system_lifetime_value) + else: + system_lifetime = DEFAULT_SYSTEM_LIFETIME_YEARS + + # Use KpiManager directly to avoid duplicate ESDL parsing + kpi_manager = KpiManager() + kpi_manager.load_from_esdl(input_esdl, timeseries_dataframes=timeseries_by_asset) + + # Calculate KPIs + kpi_results = kpi_manager.calculate_all_kpis(system_lifetime=system_lifetime) + logger.info("KPI calculation completed successfully") + capex_value = kpi_results.get("costs", {}).get("capex", {}).get("All", 0) + logger.debug(f"KPI results: CAPEX={capex_value:.2f} EUR") + + # Add KPIs to output ESDL + output_esdl_with_kpis = kpi_manager.get_esdl_with_kpis( + kpi_results, level="system" # Can be "system", "area", or "asset" + ) + + # Serialize to string + esh_with_kpis = EnergySystemHandler() + esh_with_kpis.energy_system = output_esdl_with_kpis + output_esdl = esh_with_kpis.to_string() + logger.info("KPIs added to output ESDL successfully") + + except Exception as e: + logger.error( + ( + f"KPI calculation failed: {e}. " + f"Simulation will continue and return results without KPIs. " + f"Common causes: missing cost data in ESDL, invalid time series data, " + f"or kpi-calculator dependency issues. Check logs for details." + ) + ) + logger.debug(f"Stack trace: {traceback.format_exc()}") + # Continue without KPIs - don't fail the entire workflow + # Write output_esdl to file for debugging # with open(f"result_{simulation_id}.esdl", "w") as file: # file.writelines(output_esdl) diff --git a/src/simulator_worker/utils.py b/src/simulator_worker/utils.py index 0a865b9..c955bef 100644 --- a/src/simulator_worker/utils.py +++ b/src/simulator_worker/utils.py @@ -85,6 +85,67 @@ def add_datetime_index( return df +def convert_simulator_to_kpi_format( + simulator_result: pd.DataFrame, energy_system: esdl.EnergySystem +) -> Dict[str, pd.DataFrame]: + """Convert simulator DataFrame to KPI calculator format. + + Simulator format: columns are (port_id, property_name) tuples + KPI format: dict mapping asset_id to DataFrame with property columns + + Args: + simulator_result: DataFrame from simulator with (port_id, property) columns + energy_system: ESDL energy system for port-to-asset mapping + + Returns: + Dictionary mapping asset_id to property DataFrame + + Example: + Input columns: [('port1', 'temp'), ('port1', 'pressure'), ('port2', 'temp')] + Output: {'asset1': DataFrame with columns ['temp', 'pressure'], ...} + """ + asset_data: Dict[str, pd.DataFrame] = {} + skipped_ports = 0 + processed_ports = 0 + + for col in simulator_result.columns: + if not isinstance(col, tuple) or len(col) != 2: + continue # Skip non-property columns (like 'datetime') + + port_id, property_name = col + + # Find asset owning this port + try: + asset = find_asset_from_port(port_id, energy_system) + asset_id = asset.id + processed_ports += 1 + except ValueError: + logger.warning( + ( + f"Could not find asset for port {port_id} while processing property " + f"{property_name}. This may be due to a missing or incorrect port-to-asset " + f"mapping in the energy system. Please verify the ESDL structure." + ) + ) + skipped_ports += 1 + continue + + # Initialize asset DataFrame if not exists + if asset_id not in asset_data: + asset_data[asset_id] = pd.DataFrame(index=simulator_result.index) + logger.debug(f"Created time series container for asset {asset_id} ({asset.name})") + + # Add property column + asset_data[asset_id][property_name] = simulator_result[col] + + logger.info( + f"Converted simulator format to KPI format: {len(asset_data)} assets, " + f"{processed_ports} properties processed, {skipped_ports} skipped" + ) + + return asset_data + + def get_profileQuantityAndUnit(property_name: str) -> esdl.esdl.QuantityAndUnitType: """Get the profile quantity and unit. diff --git a/unit_test/test_kpi_integration.py b/unit_test/test_kpi_integration.py new file mode 100644 index 0000000..2736bf5 --- /dev/null +++ b/unit_test/test_kpi_integration.py @@ -0,0 +1,201 @@ +"""Test KPI integration with simulator-worker.""" + +import datetime +import os +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pandas as pd +import pytest + +# Check if simulator_worker.utils can be imported (requires omotes_simulator_core) +UTILS_AVAILABLE = False +try: + from simulator_worker.utils import convert_simulator_to_kpi_format + + UTILS_AVAILABLE = True +except ImportError: + convert_simulator_to_kpi_format = None # type: ignore[assignment, misc] + +# Check if full simulator worker can be imported +SIMULATOR_AVAILABLE = False +try: + from simulator_worker.simulator_worker import simulator_worker_task + from omotes_simulator_core.infrastructure.utils import pyesdl_from_string + + SIMULATOR_AVAILABLE = True +except ImportError: + simulator_worker_task = None # type: ignore[assignment, misc] + pyesdl_from_string = None # type: ignore[assignment, misc] + + +@pytest.mark.skipif(not UTILS_AVAILABLE, reason="omotes_simulator_core not installed") +class TestConvertSimulatorToKpiFormat: + """Functional tests for convert_simulator_to_kpi_format.""" + + def test_converts_tuple_columns_to_asset_dict(self) -> None: + """Test that DataFrame with tuple columns is converted to asset dict.""" + # Create mock simulator output with tuple columns (port_id, property) + simulator_df = pd.DataFrame( + { + ("port_1", "temperature"): [300.0, 301.0, 302.0], + ("port_1", "pressure"): [100000.0, 100100.0, 100200.0], + ("port_2", "temperature"): [290.0, 291.0, 292.0], + }, + index=pd.date_range("2024-01-01", periods=3, freq="h"), + ) + + # Create mock energy system with port-to-asset mapping + mock_asset_1 = MagicMock() + mock_asset_1.id = "asset_1" + mock_asset_1.name = "Test Asset 1" + + mock_asset_2 = MagicMock() + mock_asset_2.id = "asset_2" + mock_asset_2.name = "Test Asset 2" + + mock_energy_system = MagicMock() + + # Mock find_asset_from_port to return appropriate assets + with patch("simulator_worker.utils.find_asset_from_port") as mock_find: + mock_find.side_effect = lambda port_id, es: ( + mock_asset_1 if port_id == "port_1" else mock_asset_2 + ) + + result = convert_simulator_to_kpi_format(simulator_df, mock_energy_system) + + # Verify structure + assert isinstance(result, dict) + assert "asset_1" in result + assert "asset_2" in result + + # Verify asset_1 has both properties from port_1 + assert "temperature" in result["asset_1"].columns + assert "pressure" in result["asset_1"].columns + assert list(result["asset_1"]["temperature"]) == [300.0, 301.0, 302.0] + + # Verify asset_2 has property from port_2 + assert "temperature" in result["asset_2"].columns + assert list(result["asset_2"]["temperature"]) == [290.0, 291.0, 292.0] + + def test_skips_non_tuple_columns(self) -> None: + """Test that non-tuple columns are skipped.""" + simulator_df = pd.DataFrame( + { + "datetime": ["2024-01-01", "2024-01-02"], # Non-tuple column + ("port_1", "temperature"): [300.0, 301.0], + } + ) + + mock_asset = MagicMock() + mock_asset.id = "asset_1" + mock_asset.name = "Test Asset" + mock_energy_system = MagicMock() + + with patch("simulator_worker.utils.find_asset_from_port") as mock_find: + mock_find.return_value = mock_asset + result = convert_simulator_to_kpi_format(simulator_df, mock_energy_system) + + # Only tuple column should be processed + assert "asset_1" in result + assert "temperature" in result["asset_1"].columns + assert "datetime" not in result["asset_1"].columns + + def test_handles_missing_port_gracefully(self) -> None: + """Test that missing port-to-asset mapping logs warning and continues.""" + simulator_df = pd.DataFrame( + { + ("port_1", "temperature"): [300.0, 301.0], + ("unknown_port", "temperature"): [290.0, 291.0], + } + ) + + mock_asset = MagicMock() + mock_asset.id = "asset_1" + mock_asset.name = "Test Asset" + mock_energy_system = MagicMock() + + with patch("simulator_worker.utils.find_asset_from_port") as mock_find: + # port_1 found, unknown_port raises ValueError + def find_side_effect(port_id: str, es: MagicMock) -> MagicMock: + if port_id == "port_1": + return mock_asset + raise ValueError(f"Port {port_id} not found") + + mock_find.side_effect = find_side_effect + result = convert_simulator_to_kpi_format(simulator_df, mock_energy_system) + + # Should have asset_1 but not fail on unknown_port + assert "asset_1" in result + assert len(result) == 1 + + def test_empty_dataframe_returns_empty_dict(self) -> None: + """Test that empty DataFrame returns empty dict.""" + simulator_df = pd.DataFrame() + mock_energy_system = MagicMock() + + result = convert_simulator_to_kpi_format(simulator_df, mock_energy_system) + + assert result == {} + + +@pytest.mark.skipif( + not SIMULATOR_AVAILABLE or os.getenv("INFLUXDB_HOSTNAME") is None, + reason="simulator-worker or InfluxDB not available" +) +class TestKPIEndToEndIntegration: + """Integration tests for end-to-end KPI calculation in simulator workflow.""" + + def test_kpis_included_in_output_esdl(self) -> None: + """Test that KPIs are calculated and included in output ESDL.""" + # Load test ESDL + test_esdl_path = Path(__file__).parent.parent / "testdata" / "test1.esdl" + with open(test_esdl_path, "r") as f: + input_esdl = f.read() + + # Configure workflow - use Unix timestamps (seconds since epoch) + start_time = datetime.datetime(2019, 1, 1, 0, 0, tzinfo=datetime.timezone.utc) + end_time = datetime.datetime(2019, 1, 1, 2, 0, tzinfo=datetime.timezone.utc) + + workflow_config: dict[str, list[float] | float | str | bool] = { + "timestep": 3600.0, # 1 hour in seconds + "start_time": start_time.timestamp(), # Unix timestamp + "end_time": end_time.timestamp(), # Unix timestamp + "system_lifetime": 30.0, + } + + # Mock progress handler + mock_progress = MagicMock() + + # Run simulation with KPI calculation + output_esdl, _ = simulator_worker_task( + input_esdl, workflow_config, mock_progress, "simulator" + ) + + # Verify output is not None + assert output_esdl is not None + assert len(output_esdl) > 0 + + # Parse output ESDL + esh = pyesdl_from_string(output_esdl) + energy_system = esh.energy_system + + # Check if KPIs are present in the ESDL + # KPIs should be in the energy system's KPIs collection + assert hasattr(energy_system, "KPIs"), "Energy system should have KPIs attribute" + kpis = energy_system.KPIs + + # KPIs must be present - if None, the integration failed + assert kpis is not None, "KPIs should be calculated and present in output ESDL" + + # Verify KPI structure + assert hasattr(kpis, "kpi"), "KPIs should have kpi collection" + kpi_list = kpis.kpi if hasattr(kpis.kpi, "__iter__") else [kpis.kpi] + + # Verify at least some KPIs were added + assert len(kpi_list) > 0, "At least one KPI should be present in output ESDL" + + # Verify KPI structure (should have name and value) + for kpi in kpi_list: + assert hasattr(kpi, "name"), "KPI should have a name" + assert hasattr(kpi, "value"), "KPI should have a value"