diff --git a/test/conftest.py b/test/conftest.py index 33aa9cf..a602f64 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -364,44 +364,14 @@ def build_base_investmentblock(b, innerblock=None): return b -def build_tssb_block(fp_tssb): +def build_tssb_scenario_set(fp_tssb): """ - Build a test SMSNetwork containing a TSSB (two-stage stochastic block) structure. - This helper loads a benchmark network from ``fp_tssb`` and uses it as a template - to construct a new in-memory network in block-file mode. The new network - contains a top-level block with a discrete scenario set, one or more abstract - paths, and a stochastic block whose dimensions (scenario size, number of - scenarios, and data mappings) and key data arrays are consistent with those - of the benchmark network. - - Parameters - ---------- - fp_tssb : str or os.PathLike - Path to an existing TSSB network. - - Returns - ------- - SMSNetwork - A newly created :class:`pysmspp.SMSNetwork` instance with - ``file_type=SMSFileType.eBlockFile``. + Build a DiscreteScenarioSet for a TSSB (two-stage stochastic block) structure. + This helper loads a benchmark network from ``fp_tssb`` and extracts the scenario + data from the DiscreteScenarioSet block to create a new in-memory scenario set. """ sn_benchmark = SMSNetwork(fp_tssb) - sn = SMSNetwork(file_type=SMSFileType.eBlockFile) - - ScenarioSize = 48 # For DiscreteScenarioSet - NumberScenarios = 9 # For DiscreteScenarioSet - NumberDataMappings = NumberScenarios # for StochasticBlock - - PathDim = 5 # for AbstractPath - TotalLength = 10 # for AbstractPath - - SizeDim_perScenario = 2 # for StochBlock - SizeElements_perScenario = 4 # for StochBlock - PathDim2 = 9 # for AbstractPath in StochasticBlock - - # Variables of DiscreteScenarioSet - pool_weights = ( sn_benchmark.blocks["Block_0"] .blocks["DiscreteScenarioSet"] @@ -416,9 +386,53 @@ def build_tssb_block(fp_tssb): .data ) - # Variables of StaticAbstractPath + ScenarioSize = scenarios.shape[1] + NumberScenarios = scenarios.shape[0] + + dds_block = Block( + block_type="DiscreteScenarioSet", + ScenarioSize=ScenarioSize, + NumberScenarios=NumberScenarios, + Scenarios=Variable( + "Scenarios", + "double", + ("NumberScenarios", "ScenarioSize"), + scenarios, + ), + PoolWeights=Variable( + "PoolWeights", + "double", + ("NumberScenarios",), + pool_weights, + ), + ) + + return dds_block + + +def build_tssb_abstract_path(): + """ + Build an AbstractPath for a TSSB (two-stage stochastic block) structure. + """ + # TODO: extract this from unit types depending on expandability, accounting for x_battery/x_converter + variables = [ + "x_thermal", + "x_intermittent", + "x_battery", + "x_converter", + "x_intermittent", + ] + locations = ["0", "1", "2", "2", "3"] + + path_group_indices = np.array( + [str(item) for pair in zip(locations, variables) for item in pair], + dtype="object", + ) + + path_node_types = np.tile(["B", "V"], len(variables)) - path_node_types = np.tile(["B", "V"], TotalLength // 2) + TotalLength = len(variables) * 2 + PathDim = len(variables) # for AbstractPath def mask_by_node_type(arr, path_node_types): return np.ma.masked_array(arr, mask=path_node_types == "B") @@ -426,164 +440,183 @@ def mask_by_node_type(arr, path_node_types): path_element_indices = mask_by_node_type(np.zeros(TotalLength), path_node_types) path_range_indices = mask_by_node_type(np.ones(TotalLength), path_node_types) - # Variables of StochasticBlock - - set_size = ( - sn_benchmark.blocks["Block_0"] - .blocks["StochasticBlock"] - .variables["SetSize"] - .data + abstract_path_block = Block( + PathDim=Dimension("PathDim", PathDim), + TotalLength=Dimension("TotalLength", TotalLength), + PathElementIndices=Variable( + "PathElementIndices", + "u4", + ("TotalLength",), + path_element_indices, # important to have missing values! only ones does not work + ), + PathGroupIndices=Variable( + "PathGroupIndices", + "str", + ("TotalLength",), + np.array( + path_group_indices, + dtype="object", + ), + ), + PathNodeTypes=Variable( + "PathNodeTypes", + "c", + ("TotalLength",), + path_node_types, + ), + PathRangeIndices=Variable( + "PathRangeIndices", + "u4", + ("TotalLength",), + path_range_indices, # important to have missing values! only ones does not work + ), + PathStart=Variable( + "PathStart", + "u4", + ("PathDim",), + np.arange(0, TotalLength, 2, dtype=np.uint32), # ignored missing values + ), ) - set_elements = ( - sn_benchmark.blocks["Block_0"] - .blocks["StochasticBlock"] - .variables["SetElements"] - .data - ) + return abstract_path_block - sn.add( - "TwoStageStochasticBlock", - "Block_0", - id="0", - NumberScenarios=NumberScenarios, - DiscreteScenarioSet=Block( - block_type="DiscreteScenarioSet", - ScenarioSize=ScenarioSize, - NumberScenarios=NumberScenarios, - Scenarios=Variable( - "Scenarios", - "double", - ("NumberScenarios", "ScenarioSize"), - scenarios, - ), - PoolWeights=Variable( - "PoolWeights", - "double", - ("NumberScenarios",), - pool_weights, + +def build_tssb_stochastic_block(TimeHorizon=24, NumberNodes=2, block=None): + """ + Build a StochasticBlock for a TSSB (two-stage stochastic block) structure. + """ + NumberDataMappings = 1 # only demand suppored for now + + set_size_demand = [0, 0] + set_elements_demand = [0, TimeHorizon * NumberNodes, 0, TimeHorizon * NumberNodes] + function_name_demand = ["UCBlock::set_active_power_demand"] + + caller = ["B"] # The caller is a Block + caller_type = ["D"] + block_location = [0] # U CBlock + + set_size = np.array(set_size_demand, dtype=np.uint32) + set_elements = np.array(set_elements_demand, dtype=np.uint32) + + NumberDataMappings = set_size.shape[0] // 2 + SetSize_dim = set_size.shape[0] + SetElements_dim = set_elements.shape[0] + + if block is None: + block = Block( + id=Attribute("id", "0"), + filename=Attribute("filename", "EC_CO_Test_TUB.nc4[0]"), + ) + + stochastic_block = Block( + block_type="StochasticBlock", + NumberDataMappings=NumberDataMappings, + SetSize_dim=SetSize_dim, + SetElements_dim=SetElements_dim, + FunctionName=Variable( + "FunctionName", + "str", + ("NumberDataMappings",), + np.repeat( + np.array(function_name_demand, dtype="object"), + NumberDataMappings, ), ), - StaticAbstractPath=Block( - PathDim=Dimension("PathDim", PathDim), - TotalLength=Dimension("TotalLength", TotalLength), - PathElementIndices=Variable( - "PathElementIndices", - "u4", - ("TotalLength",), - path_element_indices, # important to have missing values! only ones does not work - ), + Caller=Variable( + "Caller", + "c", + ("NumberDataMappings",), + np.array(caller, dtype="object"), + ), + DataType=Variable( + "DataType", + "c", + ("NumberDataMappings",), + np.array(caller_type, dtype="object"), + ), + SetSize=Variable( + "SetSize", + "u4", + ("SetSize_dim",), + set_size, + ), + SetElements=Variable( + "SetElements", + "u4", + ("SetElements_dim",), + set_elements, + ), + AbstractPath=Block( + PathDim=Dimension("PathDim", len(block_location)), + TotalLength=Dimension("TotalLength", 0), PathGroupIndices=Variable( "PathGroupIndices", "str", ("TotalLength",), - np.array( - [ - "0", - "x_thermal", - "1", - "x_intermittent", - "2", - "x_battery", - "2", - "x_converter", - "3", - "x_intermittent", - ], - dtype="object", - ), + np.array([], dtype="object"), ), - PathNodeTypes=Variable( - "PathNodeTypes", - "c", + PathElementIndices=Variable( + "PathElementIndices", + "u4", ("TotalLength",), - path_node_types, + [], # ignored missing values (masked array) ), PathRangeIndices=Variable( "PathRangeIndices", "u4", ("TotalLength",), - path_range_indices, # important to have missing values! only ones does not work + [], # ignored missing values ), PathStart=Variable( "PathStart", "u4", ("PathDim",), - np.arange(0, TotalLength, 2, dtype=np.uint32), # ignored missing values + np.array(block_location, dtype=np.uint32), ), + PathNodeTypes=Variable("PathNodeTypes", "c", ("TotalLength",), []), ), - StochasticBlock=Block( - block_type="StochasticBlock", - NumberDataMappings=NumberDataMappings, - SetSize_dim=SizeDim_perScenario * NumberDataMappings, - SetElements_dim=SizeElements_perScenario * NumberDataMappings, - FunctionName=Variable( - "FunctionName", - "str", - ("NumberDataMappings",), - np.repeat( - np.array(["UCBlock::set_active_power_demand"], dtype="object"), - NumberDataMappings, - ), - ), - Caller=Variable( - "Caller", - "c", - ("NumberDataMappings",), - np.repeat(["B"], NumberDataMappings), - ), - DataType=Variable( - "DataType", - "c", - ("NumberDataMappings",), - np.repeat(["D"], NumberDataMappings), - ), - SetSize=Variable( - "SetSize", - "u4", - ("SetSize_dim",), - set_size, - ), - SetElements=Variable( - "SetElements", - "u4", - ("SetElements_dim",), - set_elements, - ), - AbstractPath=Block( - PathDim=Dimension("PathDim", PathDim2), - TotalLength=Dimension("TotalLength", 0), # Unlimited - PathGroupIndices=Variable( - "PathGroupIndices", - "str", - ("TotalLength",), - np.array([], dtype="object"), - ), - PathElementIndices=Variable( - "PathElementIndices", - "u4", - ("TotalLength",), - [], # ignored missing values (masked array) - ), - PathRangeIndices=Variable( - "PathRangeIndices", - "u4", - ("TotalLength",), - [], # ignored missing values - ), - PathStart=Variable( - "PathStart", - "u4", - ("PathDim",), - np.repeat([0], PathDim2), # ignored missing values - ), - PathNodeTypes=Variable("PathNodeTypes", "c", ("TotalLength",), []), - ), - Block=Block( - id=Attribute("id", "0"), - filename=Attribute("filename", "EC_CO_Test_TUB.nc4[0]"), - ), + Block=block, + ) + + return stochastic_block + + +def build_tssb_block(fp_tssb): + """ + Build a test SMSNetwork containing a TSSB (two-stage stochastic block) structure. + This helper loads a benchmark network from ``fp_tssb`` and uses it as a template + to construct a new in-memory network in block-file mode. The new network + contains a top-level block with a discrete scenario set, one or more abstract + paths, and a stochastic block whose dimensions (scenario size, number of + scenarios, and data mappings) and key data arrays are consistent with those + of the benchmark network. + + Parameters + ---------- + fp_tssb : str or os.PathLike + Path to an existing TSSB network. + + Returns + ------- + SMSNetwork + A newly created :class:`pysmspp.SMSNetwork` instance with + ``file_type=SMSFileType.eBlockFile``. + """ + sn = SMSNetwork(file_type=SMSFileType.eBlockFile) + + dds = build_tssb_scenario_set(fp_tssb) + abstract_path = build_tssb_abstract_path() + stochastic_block = build_tssb_stochastic_block() + sn.add( + "TwoStageStochasticBlock", + "Block_0", + id="0", + NumberScenarios=Dimension( + "NumberScenarios", dds.dimensions["NumberScenarios"].value ), + DiscreteScenarioSet=dds, + StaticAbstractPath=abstract_path, + StochasticBlock=stochastic_block, ) return sn