Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
282 changes: 281 additions & 1 deletion pypsa2smspp/transformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import xarray as xr
import os
from pypsa2smspp.transformation_config import TransformationConfig
from pysmspp import SMSNetwork, SMSFileType, Variable, Block, SMSConfig
from pysmspp import SMSNetwork, SMSFileType, Attribute, Dimension, Variable, Block, SMSConfig
from pypsa2smspp import logger
from copy import deepcopy
import pysmspp
Expand Down Expand Up @@ -899,6 +899,15 @@ def convert_to_blocks(self):
sn = SMSNetwork(file_type=SMSFileType.eBlockFile)
master = sn
index_id = 0

if False: # check if TSSB problem
name_id = 'TwoStageStochasticBlock'
sn = self.convert_to_twostagestochasticblock(master, index_id, name_id)

# InnerBlock for UC is inside StochasticBlock
master = sn.blocks[name_id]
name_id = 'InnerBlock'
index_id += 1

# -----------------
# Check if investment problem
Expand All @@ -925,6 +934,277 @@ def convert_to_blocks(self):
# Save final
self.sms_network = sn
return sn


def build_tssb_scenario_set(self):
"""
Build a DiscreteScenarioSet for a TSSB (two-stage stochastic block) structure.
This helper loads a benchmark network from ``fp_tssb`` and extracts the scenario
data from the DiscreteScenarioSet block to create a new in-memory scenario set.
"""
# TODO: this shall be completely revised to create the scenario set from the data of the StochasticNetwork, not from a benchmark file. The current implementation is just a placeholder. Demand shall be aggregated by node and time and scenarios shall be created from the aggregated demand profiles. pool_weights shall be created from the probabilities of the scenarios.
sn_benchmark = SMSNetwork(fp_tssb)

pool_weights = (
sn_benchmark.blocks["Block_0"]
.blocks["DiscreteScenarioSet"]
.variables["PoolWeights"]
.data
)

scenarios = (
sn_benchmark.blocks["Block_0"]
.blocks["DiscreteScenarioSet"]
.variables["Scenarios"]
.data
)

ScenarioSize = scenarios.shape[1]
NumberScenarios = scenarios.shape[0]

dds_block = Block(
block_type="DiscreteScenarioSet",
ScenarioSize=ScenarioSize,
NumberScenarios=NumberScenarios,
Scenarios=Variable(
"Scenarios",
"double",
("NumberScenarios", "ScenarioSize"),
scenarios,
),
PoolWeights=Variable(
"PoolWeights",
"double",
("NumberScenarios",),
pool_weights,
),
)

return dds_block


def build_tssb_abstract_path(self):
"""
Build an AbstractPath for a TSSB (two-stage stochastic block) structure.
"""
# TODO: extract this from unit types depending on expandability, accounting for x_battery/x_converter. For ThermalUnitBlocks, x_thermal is mapped and similarly for the other units
variables = [
"x_thermal",
"x_intermittent",
"x_battery",
"x_converter",
"x_intermittent",
]
locations = ["0", "1", "2", "2", "3"]

path_group_indices = np.array(
[str(item) for pair in zip(locations, variables) for item in pair],
dtype="object",
)

path_node_types = np.tile(["B", "V"], len(variables))

TotalLength = len(variables) * 2
PathDim = len(variables) # for AbstractPath

def mask_by_node_type(arr, path_node_types):
return np.ma.masked_array(arr, mask=path_node_types == "B")

path_element_indices = mask_by_node_type(np.zeros(TotalLength), path_node_types)
path_range_indices = mask_by_node_type(np.ones(TotalLength), path_node_types)

abstract_path_block = Block(
PathDim=Dimension("PathDim", PathDim),
TotalLength=Dimension("TotalLength", TotalLength),
PathElementIndices=Variable(
"PathElementIndices",
"u4",
("TotalLength",),
path_element_indices, # important to have missing values! only ones does not work
),
PathGroupIndices=Variable(
"PathGroupIndices",
"str",
("TotalLength",),
np.array(
path_group_indices,
dtype="object",
),
),
PathNodeTypes=Variable(
"PathNodeTypes",
"c",
("TotalLength",),
path_node_types,
),
PathRangeIndices=Variable(
"PathRangeIndices",
"u4",
("TotalLength",),
path_range_indices, # important to have missing values! only ones does not work
),
PathStart=Variable(
"PathStart",
"u4",
("PathDim",),
np.arange(0, TotalLength, 2, dtype=np.uint32), # ignored missing values
),
)

return abstract_path_block


def build_tssb_stochastic_block(self, TimeHorizon=24, NumberNodes=2, block=None):
"""
Build a StochasticBlock for a TSSB (two-stage stochastic block) structure.
"""
# TODO: this requires some minimal adaptations to properly link the required inputs with the input network. Moreover, the additional link is to properly link "block" that it will become the ucblock populated in the following steps. The current implementation is just a placeholder with dummy values.
NumberDataMappings = 1 # only demand suppored for now

set_size_demand = [0, 0]
set_elements_demand = [0, TimeHorizon * NumberNodes, 0, TimeHorizon * NumberNodes]
function_name_demand = ["UCBlock::set_active_power_demand"]

caller = ["B"] # The caller is a Block
caller_type = ["D"]
block_location = [0] # U CBlock

set_size = np.array(set_size_demand, dtype=np.uint32)
set_elements = np.array(set_elements_demand, dtype=np.uint32)

NumberDataMappings = set_size.shape[0] // 2
SetSize_dim = set_size.shape[0]
SetElements_dim = set_elements.shape[0]

if block is None:
block = Block(
id=Attribute("id", "0"),
filename=Attribute("filename", "EC_CO_Test_TUB.nc4[0]"),
)

stochastic_block = Block(
block_type="StochasticBlock",
NumberDataMappings=NumberDataMappings,
SetSize_dim=SetSize_dim,
SetElements_dim=SetElements_dim,
FunctionName=Variable(
"FunctionName",
"str",
("NumberDataMappings",),
np.repeat(
np.array(function_name_demand, dtype="object"),
NumberDataMappings,
),
),
Caller=Variable(
"Caller",
"c",
("NumberDataMappings",),
np.array(caller, dtype="object"),
),
DataType=Variable(
"DataType",
"c",
("NumberDataMappings",),
np.array(caller_type, dtype="object"),
),
SetSize=Variable(
"SetSize",
"u4",
("SetSize_dim",),
set_size,
),
SetElements=Variable(
"SetElements",
"u4",
("SetElements_dim",),
set_elements,
),
AbstractPath=Block(
PathDim=Dimension("PathDim", len(block_location)),
TotalLength=Dimension("TotalLength", 0),
PathGroupIndices=Variable(
"PathGroupIndices",
"str",
("TotalLength",),
np.array([], dtype="object"),
),
PathElementIndices=Variable(
"PathElementIndices",
"u4",
("TotalLength",),
[], # ignored missing values (masked array)
),
PathRangeIndices=Variable(
"PathRangeIndices",
"u4",
("TotalLength",),
[], # ignored missing values
),
PathStart=Variable(
"PathStart",
"u4",
("PathDim",),
np.array(block_location, dtype=np.uint32),
),
PathNodeTypes=Variable("PathNodeTypes", "c", ("TotalLength",), []),
),
Block=block,
)

return stochastic_block


def convert_to_twostagestochasticblock(self, master, index_id, name_id):
"""
Adds a TwoStageStochasticBlock to the SMSNetwork, which is used for stochastic problems.

Parameters
----------
master : SMSNetwork
The root SMSNetwork object
index_id : int
ID for block naming
name_id : str
Name for the TwoStageStochasticBlock

Returns
-------
SMSNetwork
The updated SMSNetwork with the TwoStageStochasticBlock added.
"""

dds = self.build_tssb_scenario_set()
abstract_path = self.build_tssb_abstract_path()
stochastic_block = self.build_tssb_stochastic_block()
master.add(
"TwoStageStochasticBlock",
"Block_0",
id="0",
NumberScenarios=Dimension(
"NumberScenarios", dds.dimensions["NumberScenarios"].value
),
DiscreteScenarioSet=dds,
StaticAbstractPath=abstract_path,
StochasticBlock=stochastic_block,
)

# -----------------
# TwoStageStochasticBlock dimensions (currently empty, but can be extended)
# -----------------
kwargs = self.dimensions.get('TwoStageStochasticBlock', {})

# -----------------
# Add TwoStageStochasticBlock itself
# -----------------
master.add(
"TwoStageStochasticBlock",
name_id,
id=f"{index_id}",
**kwargs
)

return master

def convert_to_investmentblock(self, master, index_id, name_id):
"""
Expand Down
3 changes: 3 additions & 0 deletions test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,6 @@ def get_test_cases(inputs_dir = HERE / "configs" / "data" / "test"):

test_cases = get_test_cases()

def get_network(fp: Path | str) -> str:
"""Helper to load a network from a .nc file."""
return str(Path(HERE) / "networks" / fp)
Binary file added test/networks/pypsa_stoch_load.nc
Binary file not shown.
70 changes: 70 additions & 0 deletions test/test_stochastic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# -*- coding: utf-8 -*-
from pathlib import Path
import pytest
import pypsa

from conftest import (
create_test_config,
safe_remove,
test_cases,
REL_TOL,
ABS_TOL,
OUT_TEST,
get_network,
)

from network_definition import NetworkDefinition
from pypsa2smspp.transformation import Transformation

from pypsa2smspp.network_correction import (
clean_ciclicity_storage,
add_slack_unit,
)

def run_tssb(fp) -> None:
"""
UCBlock regression test:
- build network from Excel
- solve reference with PyPSA
- run full SMS++ pipeline in one call (config-driven)
- compare objectives
"""

n = pypsa.Network(fp)

# Work on a copy for reference solve
network = n.copy()

# ---- (1) PyPSA optimization (reference) ----
network.optimize(solver_name="highs")

try:
obj_pypsa = float(network.objective + getattr(network, "objective_constant", 0.0))
except Exception:
obj_pypsa = float(network.objective)

# ---- (2) SMS++ pipeline (ONE CALL) ----
transformation = Transformation()
n = transformation.run(network, verbose=False)

obj_smspp = float(transformation.result.objective_value)

# If you want to ensure UCBlock is used, either:
# (a) enforce run.mode: ucblock in the YAML used here, OR
# (b) if you expose transformation.last_mode_used, check it:
#
# if hasattr(transformation, "last_mode_used") and transformation.last_mode_used != "ucblock":
# pytest.skip(f"Not UCBlock mode for this case (mode={transformation.last_mode_used}).")

assert obj_smspp == pytest.approx(obj_pypsa, rel=REL_TOL, abs=ABS_TOL)


def test_stochastic_network(fp=get_network("pypsa_stoch_load.nc")):
"""
Uses a dedicated YAML config that forces UCBlock mode (recommended).
"""
run_tssb(fp)


if __name__ == "__main__":
test_stochastic_network()
Loading