diff --git a/pypsa2smspp/transformation.py b/pypsa2smspp/transformation.py index 64bdca0..bb48877 100644 --- a/pypsa2smspp/transformation.py +++ b/pypsa2smspp/transformation.py @@ -13,7 +13,7 @@ import xarray as xr import os from pypsa2smspp.transformation_config import TransformationConfig -from pysmspp import SMSNetwork, SMSFileType, Variable, Block, SMSConfig +from pysmspp import SMSNetwork, SMSFileType, Attribute, Dimension, Variable, Block, SMSConfig from pypsa2smspp import logger from copy import deepcopy import pysmspp @@ -899,6 +899,15 @@ def convert_to_blocks(self): sn = SMSNetwork(file_type=SMSFileType.eBlockFile) master = sn index_id = 0 + + if False: # check if TSSB problem + name_id = 'TwoStageStochasticBlock' + sn = self.convert_to_twostagestochasticblock(master, index_id, name_id) + + # InnerBlock for UC is inside StochasticBlock + master = sn.blocks[name_id] + name_id = 'InnerBlock' + index_id += 1 # ----------------- # Check if investment problem @@ -925,6 +934,277 @@ def convert_to_blocks(self): # Save final self.sms_network = sn return sn + + + def build_tssb_scenario_set(self): + """ + Build a DiscreteScenarioSet for a TSSB (two-stage stochastic block) structure. + This helper loads a benchmark network from ``fp_tssb`` and extracts the scenario + data from the DiscreteScenarioSet block to create a new in-memory scenario set. + """ + # TODO: this shall be completely revised to create the scenario set from the data of the StochasticNetwork, not from a benchmark file. The current implementation is just a placeholder. Demand shall be aggregated by node and time and scenarios shall be created from the aggregated demand profiles. pool_weights shall be created from the probabilities of the scenarios. + sn_benchmark = SMSNetwork(fp_tssb) + + pool_weights = ( + sn_benchmark.blocks["Block_0"] + .blocks["DiscreteScenarioSet"] + .variables["PoolWeights"] + .data + ) + + scenarios = ( + sn_benchmark.blocks["Block_0"] + .blocks["DiscreteScenarioSet"] + .variables["Scenarios"] + .data + ) + + ScenarioSize = scenarios.shape[1] + NumberScenarios = scenarios.shape[0] + + dds_block = Block( + block_type="DiscreteScenarioSet", + ScenarioSize=ScenarioSize, + NumberScenarios=NumberScenarios, + Scenarios=Variable( + "Scenarios", + "double", + ("NumberScenarios", "ScenarioSize"), + scenarios, + ), + PoolWeights=Variable( + "PoolWeights", + "double", + ("NumberScenarios",), + pool_weights, + ), + ) + + return dds_block + + + def build_tssb_abstract_path(self): + """ + Build an AbstractPath for a TSSB (two-stage stochastic block) structure. + """ + # TODO: extract this from unit types depending on expandability, accounting for x_battery/x_converter. For ThermalUnitBlocks, x_thermal is mapped and similarly for the other units + variables = [ + "x_thermal", + "x_intermittent", + "x_battery", + "x_converter", + "x_intermittent", + ] + locations = ["0", "1", "2", "2", "3"] + + path_group_indices = np.array( + [str(item) for pair in zip(locations, variables) for item in pair], + dtype="object", + ) + + path_node_types = np.tile(["B", "V"], len(variables)) + + TotalLength = len(variables) * 2 + PathDim = len(variables) # for AbstractPath + + def mask_by_node_type(arr, path_node_types): + return np.ma.masked_array(arr, mask=path_node_types == "B") + + path_element_indices = mask_by_node_type(np.zeros(TotalLength), path_node_types) + path_range_indices = mask_by_node_type(np.ones(TotalLength), path_node_types) + + abstract_path_block = Block( + PathDim=Dimension("PathDim", PathDim), + TotalLength=Dimension("TotalLength", TotalLength), + PathElementIndices=Variable( + "PathElementIndices", + "u4", + ("TotalLength",), + path_element_indices, # important to have missing values! only ones does not work + ), + PathGroupIndices=Variable( + "PathGroupIndices", + "str", + ("TotalLength",), + np.array( + path_group_indices, + dtype="object", + ), + ), + PathNodeTypes=Variable( + "PathNodeTypes", + "c", + ("TotalLength",), + path_node_types, + ), + PathRangeIndices=Variable( + "PathRangeIndices", + "u4", + ("TotalLength",), + path_range_indices, # important to have missing values! only ones does not work + ), + PathStart=Variable( + "PathStart", + "u4", + ("PathDim",), + np.arange(0, TotalLength, 2, dtype=np.uint32), # ignored missing values + ), + ) + + return abstract_path_block + + + def build_tssb_stochastic_block(self, TimeHorizon=24, NumberNodes=2, block=None): + """ + Build a StochasticBlock for a TSSB (two-stage stochastic block) structure. + """ + # TODO: this requires some minimal adaptations to properly link the required inputs with the input network. Moreover, the additional link is to properly link "block" that it will become the ucblock populated in the following steps. The current implementation is just a placeholder with dummy values. + NumberDataMappings = 1 # only demand suppored for now + + set_size_demand = [0, 0] + set_elements_demand = [0, TimeHorizon * NumberNodes, 0, TimeHorizon * NumberNodes] + function_name_demand = ["UCBlock::set_active_power_demand"] + + caller = ["B"] # The caller is a Block + caller_type = ["D"] + block_location = [0] # U CBlock + + set_size = np.array(set_size_demand, dtype=np.uint32) + set_elements = np.array(set_elements_demand, dtype=np.uint32) + + NumberDataMappings = set_size.shape[0] // 2 + SetSize_dim = set_size.shape[0] + SetElements_dim = set_elements.shape[0] + + if block is None: + block = Block( + id=Attribute("id", "0"), + filename=Attribute("filename", "EC_CO_Test_TUB.nc4[0]"), + ) + + stochastic_block = Block( + block_type="StochasticBlock", + NumberDataMappings=NumberDataMappings, + SetSize_dim=SetSize_dim, + SetElements_dim=SetElements_dim, + FunctionName=Variable( + "FunctionName", + "str", + ("NumberDataMappings",), + np.repeat( + np.array(function_name_demand, dtype="object"), + NumberDataMappings, + ), + ), + Caller=Variable( + "Caller", + "c", + ("NumberDataMappings",), + np.array(caller, dtype="object"), + ), + DataType=Variable( + "DataType", + "c", + ("NumberDataMappings",), + np.array(caller_type, dtype="object"), + ), + SetSize=Variable( + "SetSize", + "u4", + ("SetSize_dim",), + set_size, + ), + SetElements=Variable( + "SetElements", + "u4", + ("SetElements_dim",), + set_elements, + ), + AbstractPath=Block( + PathDim=Dimension("PathDim", len(block_location)), + TotalLength=Dimension("TotalLength", 0), + PathGroupIndices=Variable( + "PathGroupIndices", + "str", + ("TotalLength",), + np.array([], dtype="object"), + ), + PathElementIndices=Variable( + "PathElementIndices", + "u4", + ("TotalLength",), + [], # ignored missing values (masked array) + ), + PathRangeIndices=Variable( + "PathRangeIndices", + "u4", + ("TotalLength",), + [], # ignored missing values + ), + PathStart=Variable( + "PathStart", + "u4", + ("PathDim",), + np.array(block_location, dtype=np.uint32), + ), + PathNodeTypes=Variable("PathNodeTypes", "c", ("TotalLength",), []), + ), + Block=block, + ) + + return stochastic_block + + + def convert_to_twostagestochasticblock(self, master, index_id, name_id): + """ + Adds a TwoStageStochasticBlock to the SMSNetwork, which is used for stochastic problems. + + Parameters + ---------- + master : SMSNetwork + The root SMSNetwork object + index_id : int + ID for block naming + name_id : str + Name for the TwoStageStochasticBlock + + Returns + ------- + SMSNetwork + The updated SMSNetwork with the TwoStageStochasticBlock added. + """ + + dds = self.build_tssb_scenario_set() + abstract_path = self.build_tssb_abstract_path() + stochastic_block = self.build_tssb_stochastic_block() + master.add( + "TwoStageStochasticBlock", + "Block_0", + id="0", + NumberScenarios=Dimension( + "NumberScenarios", dds.dimensions["NumberScenarios"].value + ), + DiscreteScenarioSet=dds, + StaticAbstractPath=abstract_path, + StochasticBlock=stochastic_block, + ) + + # ----------------- + # TwoStageStochasticBlock dimensions (currently empty, but can be extended) + # ----------------- + kwargs = self.dimensions.get('TwoStageStochasticBlock', {}) + + # ----------------- + # Add TwoStageStochasticBlock itself + # ----------------- + master.add( + "TwoStageStochasticBlock", + name_id, + id=f"{index_id}", + **kwargs + ) + + return master def convert_to_investmentblock(self, master, index_id, name_id): """ diff --git a/test/conftest.py b/test/conftest.py index bf8b953..7809b6f 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -64,3 +64,6 @@ def get_test_cases(inputs_dir = HERE / "configs" / "data" / "test"): test_cases = get_test_cases() +def get_network(fp: Path | str) -> str: + """Helper to load a network from a .nc file.""" + return str(Path(HERE) / "networks" / fp) diff --git a/test/networks/pypsa_stoch_load.nc b/test/networks/pypsa_stoch_load.nc new file mode 100644 index 0000000..b215352 Binary files /dev/null and b/test/networks/pypsa_stoch_load.nc differ diff --git a/test/test_stochastic.py b/test/test_stochastic.py new file mode 100644 index 0000000..0b66861 --- /dev/null +++ b/test/test_stochastic.py @@ -0,0 +1,70 @@ +# -*- coding: utf-8 -*- +from pathlib import Path +import pytest +import pypsa + +from conftest import ( + create_test_config, + safe_remove, + test_cases, + REL_TOL, + ABS_TOL, + OUT_TEST, + get_network, +) + +from network_definition import NetworkDefinition +from pypsa2smspp.transformation import Transformation + +from pypsa2smspp.network_correction import ( + clean_ciclicity_storage, + add_slack_unit, +) + +def run_tssb(fp) -> None: + """ + UCBlock regression test: + - build network from Excel + - solve reference with PyPSA + - run full SMS++ pipeline in one call (config-driven) + - compare objectives + """ + + n = pypsa.Network(fp) + + # Work on a copy for reference solve + network = n.copy() + + # ---- (1) PyPSA optimization (reference) ---- + network.optimize(solver_name="highs") + + try: + obj_pypsa = float(network.objective + getattr(network, "objective_constant", 0.0)) + except Exception: + obj_pypsa = float(network.objective) + + # ---- (2) SMS++ pipeline (ONE CALL) ---- + transformation = Transformation() + n = transformation.run(network, verbose=False) + + obj_smspp = float(transformation.result.objective_value) + + # If you want to ensure UCBlock is used, either: + # (a) enforce run.mode: ucblock in the YAML used here, OR + # (b) if you expose transformation.last_mode_used, check it: + # + # if hasattr(transformation, "last_mode_used") and transformation.last_mode_used != "ucblock": + # pytest.skip(f"Not UCBlock mode for this case (mode={transformation.last_mode_used}).") + + assert obj_smspp == pytest.approx(obj_pypsa, rel=REL_TOL, abs=ABS_TOL) + + +def test_stochastic_network(fp=get_network("pypsa_stoch_load.nc")): + """ + Uses a dedicated YAML config that forces UCBlock mode (recommended). + """ + run_tssb(fp) + + +if __name__ == "__main__": + test_stochastic_network()