From 5ff5e9306f78c0bbf09aaf59505c3eece05e9054 Mon Sep 17 00:00:00 2001 From: Felix Jung Date: Wed, 8 Jun 2022 13:58:05 +0200 Subject: [PATCH 01/14] fixed bug and slightly refactored dispatcher helper --- ridepy/util/dispatchers/helper_functions.py | 25 ++++++++------- .../dispatchers_cython/cdispatchers_utils.h | 31 ++++++++++--------- ridepy/util/spaces_cython/boost_graph_space.h | 5 +-- 3 files changed, 34 insertions(+), 27 deletions(-) diff --git a/ridepy/util/dispatchers/helper_functions.py b/ridepy/util/dispatchers/helper_functions.py index c6b93780..cac458e6 100644 --- a/ridepy/util/dispatchers/helper_functions.py +++ b/ridepy/util/dispatchers/helper_functions.py @@ -171,29 +171,32 @@ def is_timewindow_violated_or_violation_worsened_due_to_insertion( if idx > len(stoplist) - 2: return False + # inserted stop incurs zero detour, and we don't have to wait + if ( + est_arrival_first_stop_after_insertion + <= stoplist[idx + 1].estimated_arrival_time + ): + return False + delta_cpat = ( est_arrival_first_stop_after_insertion - stoplist[idx + 1].estimated_arrival_time ) - # inserted stop incurs zero detour and we don't have to wait - if delta_cpat == 0: - return False - for stop in stoplist[idx + 1 :]: old_leeway = stop.time_window_max - stop.estimated_arrival_time new_leeway = old_leeway - delta_cpat if new_leeway < 0 and new_leeway < old_leeway: return True + elif stop.time_window_min >= stop.estimated_arrival_time + delta_cpat: + # We have to wait or arrive just on time, thus no need to check next stops + return False else: - old_departure = stop.estimated_departure_time - new_departure = max( - stop.time_window_min, stop.estimated_arrival_time + delta_cpat + # Otherwise we are incurring additional delay. 
Compute the remaining delay: + delta_cpat = ( + max(stop.time_window_min, stop.estimated_arrival_time + delta_cpat) + - stop.estimated_departure_time ) - delta_cpat = new_departure - old_departure - if delta_cpat == 0: - # no need to check next stops - break return False diff --git a/ridepy/util/dispatchers_cython/cdispatchers_utils.h b/ridepy/util/dispatchers_cython/cdispatchers_utils.h index 9e273813..b7cf0b9f 100644 --- a/ridepy/util/dispatchers_cython/cdispatchers_utils.h +++ b/ridepy/util/dispatchers_cython/cdispatchers_utils.h @@ -185,12 +185,15 @@ bool is_timewindow_violated_dueto_insertion( // double delta_cpat, old_leeway, new_leeway, old_departure, new_departure if (idx > static_cast(stoplist.size() - 2)) return false; - auto delta_cpat = (est_arrival_first_stop_after_insertion - - stoplist[idx + 1].estimated_arrival_time); - if (delta_cpat == 0) + // inserted stop incurs zero detour, and we don't have to wait + if (est_arrival_first_stop_after_insertion <= + stoplist[idx + 1].estimated_arrival_time) return false; + auto delta_cpat = (est_arrival_first_stop_after_insertion - + stoplist[idx + 1].estimated_arrival_time); + // BOOST_FOREACH(auto& stop, // boost::make_iterator_range(stoplist.begin()+idx, stoplist.end())) // Remember that the insertion is *after* idx'th stop. 
We need to check for @@ -201,17 +204,17 @@ bool is_timewindow_violated_dueto_insertion( if ((new_leeway < 0) && (new_leeway < old_leeway)) return true; - else { - auto old_departure = - max(stop->time_window_min, stop->estimated_arrival_time); - auto new_departure = - max(stop->time_window_min, stop->estimated_arrival_time + delta_cpat); - auto delta_cpat = new_departure - old_departure; - if (delta_cpat == 0) { - // no need to check next stops - return false; - } - } + else if (stop->time_window_min >= stop->estimated_arrival_time + delta_cpat) + // We have to wait or arrive just on time, thus no need to check next + // stops + return false; + else + // Otherwise we are incurring additional delay. Compute the remaining + // delay: + delta_cpat = max(stop->time_window_min, + stop->estimated_arrival_time + delta_cpat) - + max(stop->time_window_min, stop->estimated_arrival_time); + ; } return false; } diff --git a/ridepy/util/spaces_cython/boost_graph_space.h b/ridepy/util/spaces_cython/boost_graph_space.h index 81695a82..09d75dd1 100644 --- a/ridepy/util/spaces_cython/boost_graph_space.h +++ b/ridepy/util/spaces_cython/boost_graph_space.h @@ -108,8 +108,9 @@ class GraphSpace : public TransportSpace { GraphSpace(double velocity, vector vertex_vec, vector edge_vec, vector weight_vec) - : TransportSpace(), velocity(velocity), _g{vertex_vec.size()}, - vertex2label{get(vertex_name, _g)}, + : TransportSpace(), + velocity(velocity), _g{vertex_vec.size()}, vertex2label{get(vertex_name, + _g)}, _distances(static_cast(vertex_vec.size())), _predecessors(static_cast(vertex_vec.size())), _weights{weight_vec}, edge2weight{get(edge_weight, _g)} { From 517dd819422677f748288edceb5c63a9892918cc Mon Sep 17 00:00:00 2001 From: Felix Jung Date: Thu, 9 Jun 2022 11:49:39 +0200 Subject: [PATCH 02/14] this is a bit cumbersome... 
--- requirements.txt | 3 +- ridepy/extras/simulation_set.py | 202 ++++++++++++++++++++++++++------ 2 files changed, 168 insertions(+), 37 deletions(-) diff --git a/requirements.txt b/requirements.txt index eb69a673..bd1898c6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,4 +10,5 @@ matplotlib cython==3.0a6 loky typer -pyarrow \ No newline at end of file +pyarrow +frozendict diff --git a/ridepy/extras/simulation_set.py b/ridepy/extras/simulation_set.py index e8e8a5b5..0a003657 100644 --- a/ridepy/extras/simulation_set.py +++ b/ridepy/extras/simulation_set.py @@ -1,5 +1,8 @@ +from __future__ import annotations + import logging import os +import sys import warnings import loky @@ -10,6 +13,7 @@ import itertools as it import pandas as pd +from collections.abc import MutableSet from collections import defaultdict from copy import deepcopy from typing import ( @@ -23,6 +27,8 @@ ) from pathlib import Path +from frozendict import frozendict + from ridepy.util import make_sim_id from ridepy.util.analytics import ( get_system_quantities, @@ -352,7 +358,7 @@ def simulate_parameter_combinations( return sim_ids -class SimulationSet: +class SimulationSet(MutableSet): """ A set of simulations. The parameter space is defined through constant `base_params`, zipped `zip_params` and cartesian product `product_params`. A set of a single simulation @@ -360,13 +366,28 @@ class SimulationSet: allowing for parallelization of simulation runs at different parameters. """ + _wrapped_methods = [ + "add", + "copy", + "difference", + "difference_update", + "discard", + "intersection", + "intersection_update", + "symmetric_difference" "symmetric_difference_update", + "union", + "update", + ] + + _delegated_attrs = ["issubset", "issuperset"] + @staticmethod def _two_level_dict_update( base_dict: dict[str, dict[str, Any]], update_dict: dict[str, dict[str, Any]] ) -> dict: """ Update two-level nested dictionary with deepcopying, - where `update_dict` overwrites entries in `base_dict`. 
+ where `update_dict` updates entries in `base_dict`. Example ------- @@ -384,7 +405,7 @@ def _two_level_dict_update( base_dict two-level nested dict to update update_dict - two-level nested dict which overwrites entries in base_dict + two-level nested dict which updates entries in base_dict Returns ------- @@ -392,7 +413,8 @@ def _two_level_dict_update( """ d = deepcopy(base_dict) - # This sorted is needed otherwise detection of pre existing simulation run does not work. + # This `sorted` is necessary because otherwise the detection of + # existing simulation runs may fail. for outer_key in sorted(set(base_dict) | set(update_dict)): d[outer_key] = base_dict.get(outer_key, {}) | update_dict.get(outer_key, {}) return d @@ -424,6 +446,14 @@ def _zip_params_equal_length( else: return True + @staticmethod + def _freeze_two_level_dict( + d: dict[str, dict[str, Any]] + ) -> frozendict[str, frozendict[str, Any]]: + return frozendict( + {outer_key: frozendict(inner_dict) for outer_key, inner_dict in d.items()} + ) + @property def data_dir(self) -> Path: """ @@ -444,9 +474,10 @@ def data_dir(self, data_dir: Union[str, Path]) -> None: def __init__( self, *, - data_dir: Union[str, Path], + data_dir: Union[str, Path] = ".", base_params: Optional[dict[str, dict[str, Any]]] = None, zip_params: Optional[dict[str, dict[str, Sequence[Any]]]] = None, + single_combinations: Iterable[Optional[dict[str, dict[str, Any]]]] = None, product_params: Optional[dict[str, dict[str, Sequence[Any]]]] = None, cython: bool = True, debug: bool = False, @@ -472,6 +503,8 @@ def __init__( Dictionary setting parameters of which the cartesian product (i.e. all possible combinations of the supplied parameters) is created and varied throughout the simulation set, optional. + single_combinations + Iterable of single parameter combinations. Each item must fully specify a simulation run. cython Use cython. 
debug @@ -500,6 +533,8 @@ def __init__( self._event_path_suffix = event_path_suffix self._param_path_suffix = param_path_suffix + self.use_cython = cython + if cython: space_obj = CyEuclidean2D() dispatcher = CyBruteForceTotalTravelTimeMinimizingDispatcher @@ -532,9 +567,10 @@ def __init__( ), ) - base_params = base_params if base_params is not None else {} - zip_params = zip_params if zip_params is not None else {} - product_params = product_params if product_params is not None else {} + base_params = base_params or {} + zip_params = zip_params or {} + product_params = product_params or {} + single_combinations = single_combinations or {} if validate: # assert no unknown outer keys @@ -559,13 +595,19 @@ def __init__( assert self._zip_params_equal_length( zip_params ), "zipped parameters must be of equal length" + self.validated = True + else: + self.validated = False + self.single_combinations = single_combinations self._base_params = self._two_level_dict_update( self.default_base_params, base_params ) self._zip_params = zip_params self._product_params = product_params + self._update_parameter_combinations() + self._simulation_ids = None self.system_quantities_path = None @@ -598,6 +640,41 @@ def event_paths(self) -> list[Path]: for simulation_id in self.simulation_ids ] + @property + def base_params(self): + return self._base_params + + @property + def product_params(self): + return self._product_params + + @property + def zip_params(self): + return self._zip_params + + @property + def single_combinations(self): + return self._single_combinations + + @base_params.setter + def base_params(self, value): + self._base_params = value + self._update_parameter_combinations() + + @product_params.setter + def product_params(self, value): + self._product_params = value + self._update_parameter_combinations() + + @zip_params.setter + def zip_params(self, value): + self._zip_params = value + self._update_parameter_combinations() + + @single_combinations.setter + def 
single_combinations(self, value): + self._single_combinations = set(map(self._freeze_two_level_dict, value)) + @staticmethod def _make_joined_key_pairs_values( *, params: dict[str, dict[str, Sequence[Any]]], join_fn: Callable @@ -633,7 +710,7 @@ def _make_joined_key_pairs_values( return joined_key_pairs, joined_values_iter - def __iter__(self): + def _update_parameter_combinations(self): zipped_key_pairs, zipped_values_iter = self._make_joined_key_pairs_values( params=self._zip_params, join_fn=zip ) @@ -645,7 +722,7 @@ def __iter__(self): params=self._product_params, join_fn=it.product ) - def param_combinations() -> Iterator[dict[str, dict[str, Any]]]: + def param_combinations() -> Iterator[frozendict[str, dict[str, Any]]]: """ Generator yielding complete parameter sets which can be supplied to `perform_single_simulation`. @@ -665,13 +742,14 @@ def param_combinations() -> Iterator[dict[str, dict[str, Any]]]: ): d[outer_key][inner_key] = value - yield self._two_level_dict_update(self._base_params, d) + yield self._freeze_two_level_dict( + self._two_level_dict_update(self._base_params, d) + ) - self._param_combinations = param_combinations() - return self + self.param_combinations = self.single_combinations | set(param_combinations()) - def __next__(self): - return next(self._param_combinations) + def __iter__(self): + return iter(self.param_combinations) def run(self, dry_run=False): """ @@ -702,27 +780,6 @@ def run(self, dry_run=False): dry_run=dry_run, ) - def __len__(self) -> int: - """ - Number of simulations performed when calling `SimulationSet.run`. 
- """ - len_ = 1 - if self._zip_params: - len_ *= len(next(iter(next(iter(self._zip_params.values())).values()))) - if self._product_params: - len_ *= ft.reduce( - op.mul, - ( - len(inner_value) - for inner_dict in self._product_params.values() - for inner_value in inner_dict.values() - ), - ) - if not (self._zip_params or self._product_params): - len_ = 0 - - return len_ - def run_analytics( self, update_existing: bool = False, @@ -779,3 +836,76 @@ def run_analytics( system_quantities_df = pd.DataFrame(system_quantities, index=sim_ids) system_quantities_df.rename_axis("simulation_id", inplace=True) system_quantities_df.to_parquet(self.system_quantities_path) + + def legacy_len(self) -> int: + """ + Number of simulations performed when calling `SimulationSet.run`. + """ + len_ = 1 + if self._zip_params: + len_ *= len(next(iter(next(iter(self._zip_params.values())).values()))) + if self._product_params: + len_ *= ft.reduce( + op.mul, + ( + len(inner_value) + for inner_dict in self._product_params.values() + for inner_value in inner_dict.values() + ), + ) + if not (self._zip_params or self._product_params): + len_ = 0 + + len_ += len(self.single_combinations) + + return len_ + + def __len__(self): + return len(self.param_combinations) + + def __contains__(self, item): + return item in self.param_combinations + + def __getattr__(self, attr): + if attr in self._delegated_attrs: + return getattr(self.param_combinations, attr) + + @staticmethod + def _wrap_method(method, o): + # Note that this uses self's properties for everything except the simulation + # parameters. This is intended behavior. 
+ return lambda *args, **kwargs: SimulationSet( + single_combinations=getattr(o.param_combinations, method)(*args, **kwargs), + data_dir=o.data_dir, + cython=o.use_cython, + debug=o.debug, + max_workers=o.max_workers, + process_chunksize=o.process_chunksize, + jsonl_chunksize=o.jsonl_chunksize, + event_path_suffix=o._event_path_suffix, + param_path_suffix=o._param_path_suffix, + validate=o.validated, + ) + + def __new__(cls): + self = super(SimulationSet, cls).__new__(SimulationSet) + for method in self._wrapped_methods: + setattr(self, method, cls._wrap_method(method, self)) + + return self + + def add(self, item): + ... + + def discard(self, item): + ... + + def __str__(self): + ... + + def __repr__(self): + ... + + @classmethod + def _from_iterable(cls, iterable): + return cls(single_combinations=iterable) From eabfedbe50aacb1259182002b3c55b84e5f83289 Mon Sep 17 00:00:00 2001 From: Felix Jung Date: Thu, 9 Jun 2022 12:20:38 +0200 Subject: [PATCH 03/14] seems to work so far --- ridepy/extras/simulation_set.py | 32 ++++++++++++++-------------- ridepy/util/spaces.py | 11 ++++++++++ ridepy/util/spaces_cython/spaces.pyx | 13 +++++++++-- 3 files changed, 38 insertions(+), 18 deletions(-) diff --git a/ridepy/extras/simulation_set.py b/ridepy/extras/simulation_set.py index 0a003657..70611de3 100644 --- a/ridepy/extras/simulation_set.py +++ b/ridepy/extras/simulation_set.py @@ -473,11 +473,11 @@ def data_dir(self, data_dir: Union[str, Path]) -> None: def __init__( self, + single_combinations: Iterable[Optional[dict[str, dict[str, Any]]]] = None, *, data_dir: Union[str, Path] = ".", base_params: Optional[dict[str, dict[str, Any]]] = None, zip_params: Optional[dict[str, dict[str, Sequence[Any]]]] = None, - single_combinations: Iterable[Optional[dict[str, dict[str, Any]]]] = None, product_params: Optional[dict[str, dict[str, Sequence[Any]]]] = None, cython: bool = True, debug: bool = False, @@ -570,7 +570,6 @@ def __init__( base_params = base_params or {} zip_params = 
zip_params or {} product_params = product_params or {} - single_combinations = single_combinations or {} if validate: # assert no unknown outer keys @@ -599,10 +598,14 @@ def __init__( else: self.validated = False - self.single_combinations = single_combinations self._base_params = self._two_level_dict_update( self.default_base_params, base_params ) + self.single_combinations = ( + single_combinations + if single_combinations is not None + else [self._base_params] + ) self._zip_params = zip_params self._product_params = product_params @@ -742,9 +745,10 @@ def param_combinations() -> Iterator[frozendict[str, dict[str, Any]]]: ): d[outer_key][inner_key] = value - yield self._freeze_two_level_dict( - self._two_level_dict_update(self._base_params, d) - ) + if d: + yield self._freeze_two_level_dict( + self._two_level_dict_update(self._base_params, d) + ) self.param_combinations = self.single_combinations | set(param_combinations()) @@ -887,7 +891,7 @@ def _wrap_method(method, o): validate=o.validated, ) - def __new__(cls): + def __new__(cls, *args, **kwargs): self = super(SimulationSet, cls).__new__(SimulationSet) for method in self._wrapped_methods: setattr(self, method, cls._wrap_method(method, self)) @@ -900,12 +904,8 @@ def add(self, item): def discard(self, item): ... - def __str__(self): - ... - - def __repr__(self): - ... 
- - @classmethod - def _from_iterable(cls, iterable): - return cls(single_combinations=iterable) + # def __str__(self): + # return f"SimulationSet(...)" + # + # def __repr__(self): + # return f"SimulationSet(...)" diff --git a/ridepy/util/spaces.py b/ridepy/util/spaces.py index 9390a8b9..5d5d9d3f 100644 --- a/ridepy/util/spaces.py +++ b/ridepy/util/spaces.py @@ -97,6 +97,9 @@ def asdict(self): def __repr__(self): return make_repr("Euclidean", self.asdict()) + def __hash__(self): + hash(repr(self)) + class Euclidean1D(Euclidean): def __init__( @@ -466,6 +469,14 @@ def __reduce__(self): self.velocity, ) + def __hash__(self): + return ( + hash(self.vertices) + + hash(self.edges) + + hash(self.weights) + + hash(self.velocity) + ) + class DiGraph(Graph): def __init__( diff --git a/ridepy/util/spaces_cython/spaces.pyx b/ridepy/util/spaces_cython/spaces.pyx index 1eef3e18..9f9ac0e4 100644 --- a/ridepy/util/spaces_cython/spaces.pyx +++ b/ridepy/util/spaces_cython/spaces.pyx @@ -131,7 +131,10 @@ cdef class Euclidean2D(TransportSpace): del self.derived_ptr def __repr__(self): - return f"Euclidean2D(velocity={self.velocity})" + return f"Euclidean2D(coord_range={self.coord_range}, velocity={self.velocity})" + + def __hash__(self): + return hash(repr(self)) @property def velocity(self): @@ -178,7 +181,10 @@ cdef class Manhattan2D(TransportSpace): del self.derived_ptr def __repr__(self): - return f"Manhattan2D(velocity={self.velocity})" + return f"Manhattan2D(coord_range={self.coord_range}, velocity={self.velocity})" + + def __hash__(self): + return hash(repr(self)) @property def velocity(self): @@ -308,6 +314,9 @@ cdef class Graph(TransportSpace): self.velocity, ) + def __hash__(self): + return hash(self.vertices) + hash(self.edges) + hash(self.weights) + hash(self.velocity) + def asdict(self): return dict( vertices=self.vertices, From eb5f2e509b5089c2a0194b806c481b90ec622ad5 Mon Sep 17 00:00:00 2001 From: Felix Jung Date: Thu, 9 Jun 2022 12:26:48 +0200 Subject: [PATCH 04/14] 
cosmetics --- ridepy/extras/simulation_set.py | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/ridepy/extras/simulation_set.py b/ridepy/extras/simulation_set.py index 70611de3..97d1279e 100644 --- a/ridepy/extras/simulation_set.py +++ b/ridepy/extras/simulation_set.py @@ -899,13 +899,28 @@ def __new__(cls, *args, **kwargs): return self def add(self, item): + # This is dynamically overwritten in SimulationSet._wrap_method, + # but is necessary here to be able to instantiate the class. ... def discard(self, item): + # This is dynamically overwritten in SimulationSet._wrap_method + # but is necessary here to be able to instantiate the class. ... - # def __str__(self): - # return f"SimulationSet(...)" - # + def __str__(self): + return ( + f"SimulationSet(single_combinations=..., " + f"data_dir={self.data_dir!r}, " + f"cython={self.use_cython!r}, " + f" debug={self.debug!r}, " + f"max_workers={self.max_workers!r}, " + f"process_chunksize={self.process_chunksize!r}, " + f"jsonl_chunksize={self.jsonl_chunksize!r}, " + f"event_path_suffix={self._event_path_suffix!r}, " + f"param_path_suffix={self._param_path_suffix!r}, " + f"validate={self.validated!r})" + ) + # def __repr__(self): # return f"SimulationSet(...)" From d9e10c27a824be1da7c129fda1fbb8b8f50e1678 Mon Sep 17 00:00:00 2001 From: Felix Jung Date: Thu, 9 Jun 2022 12:30:35 +0200 Subject: [PATCH 05/14] refactored legacy len --- ridepy/extras/simulation_set.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/ridepy/extras/simulation_set.py b/ridepy/extras/simulation_set.py index 97d1279e..02367a0e 100644 --- a/ridepy/extras/simulation_set.py +++ b/ridepy/extras/simulation_set.py @@ -841,27 +841,29 @@ def run_analytics( system_quantities_df.rename_axis("simulation_id", inplace=True) system_quantities_df.to_parquet(self.system_quantities_path) - def legacy_len(self) -> int: + @staticmethod + def compute_cardinality_product_params_zip_params( + 
self, product_params, zip_params + ) -> int: """ - Number of simulations performed when calling `SimulationSet.run`. + Number of simulations performed when calling `SimulationSet.run`, + excluding single_combinations """ len_ = 1 - if self._zip_params: - len_ *= len(next(iter(next(iter(self._zip_params.values())).values()))) - if self._product_params: + if zip_params: + len_ *= len(next(iter(next(iter(zip_params.values())).values()))) + if product_params: len_ *= ft.reduce( op.mul, ( len(inner_value) - for inner_dict in self._product_params.values() + for inner_dict in product_params.values() for inner_value in inner_dict.values() ), ) - if not (self._zip_params or self._product_params): + if not (zip_params or product_params): len_ = 0 - len_ += len(self.single_combinations) - return len_ def __len__(self): From 8832560c12096908b35e2cb6e12ebf64259407bd Mon Sep 17 00:00:00 2001 From: Felix Jung Date: Thu, 9 Jun 2022 12:30:47 +0200 Subject: [PATCH 06/14] refactored legacy len --- ridepy/extras/simulation_set.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ridepy/extras/simulation_set.py b/ridepy/extras/simulation_set.py index 02367a0e..68eff491 100644 --- a/ridepy/extras/simulation_set.py +++ b/ridepy/extras/simulation_set.py @@ -843,7 +843,7 @@ def run_analytics( @staticmethod def compute_cardinality_product_params_zip_params( - self, product_params, zip_params + product_params, zip_params ) -> int: """ Number of simulations performed when calling `SimulationSet.run`, From b676e399d235a399ed49a69093484691a8392fba Mon Sep 17 00:00:00 2001 From: Felix Jung Date: Thu, 9 Jun 2022 14:59:31 +0200 Subject: [PATCH 07/14] seems to work fine apart from one weird bug --- ridepy/extras/simulation_set.py | 56 ++++++++++++++--------- ridepy/util/spaces.py | 18 +++++--- ridepy/util/spaces_cython/spaces.pyx | 10 ++++- test/test_extras.py | 66 ++++++++++++++++++---------- 4 files changed, 100 insertions(+), 50 deletions(-) diff --git 
a/ridepy/extras/simulation_set.py b/ridepy/extras/simulation_set.py index 68eff491..26486fe2 100644 --- a/ridepy/extras/simulation_set.py +++ b/ridepy/extras/simulation_set.py @@ -60,6 +60,20 @@ logger = logging.getLogger(__name__) +def freeze_two_level_dict( + d: dict[Any, dict[Any, Any]] +) -> frozendict[Any, frozendict[Any, Any]]: + return frozendict( + {outer_key: frozendict(inner_dict) for outer_key, inner_dict in d.items()} + ) + + +def thaw_two_level_dict( + d: frozendict[Any, frozendict[Any, Any]] +) -> dict[Any, dict[Any, Any]]: + return {outer_key: dict(inner_dict) for outer_key, inner_dict in d.items()} + + def make_file_path(sim_id: str, directory: Path, suffix: str): return directory / f"{sim_id}{suffix}" @@ -208,6 +222,7 @@ def perform_single_simulation( # we need a pseudorandom id that does not change if this function is called with the same params # the following does not guarantee a lack of collisions, and will fail if non-ascii characters are involved. tick = time() + params = thaw_two_level_dict(params) params_json = create_params_json(params=params) sim_id = make_sim_id(params_json) event_path = data_dir / f"{sim_id}{event_path_suffix}" @@ -415,7 +430,7 @@ def _two_level_dict_update( d = deepcopy(base_dict) # This `sorted` is necessary because otherwise the detection of # existing simulation runs may fail. 
- for outer_key in sorted(set(base_dict) | set(update_dict)): + for outer_key in sorted(set(base_dict) | set(update_dict), key=str): d[outer_key] = base_dict.get(outer_key, {}) | update_dict.get(outer_key, {}) return d @@ -446,14 +461,6 @@ def _zip_params_equal_length( else: return True - @staticmethod - def _freeze_two_level_dict( - d: dict[str, dict[str, Any]] - ) -> frozendict[str, frozendict[str, Any]]: - return frozendict( - {outer_key: frozendict(inner_dict) for outer_key, inner_dict in d.items()} - ) - @property def data_dir(self) -> Path: """ @@ -601,11 +608,17 @@ def __init__( self._base_params = self._two_level_dict_update( self.default_base_params, base_params ) - self.single_combinations = ( - single_combinations - if single_combinations is not None - else [self._base_params] - ) + if single_combinations is not None: + self._single_combinations = set( + map(freeze_two_level_dict, single_combinations) + ) + elif not self.compute_cardinality_product_params_zip_params( + product_params=product_params, zip_params=zip_params + ): + self._single_combinations = {freeze_two_level_dict(self.base_params)} + else: + self._single_combinations = set() + self._zip_params = zip_params self._product_params = product_params @@ -676,7 +689,8 @@ def zip_params(self, value): @single_combinations.setter def single_combinations(self, value): - self._single_combinations = set(map(self._freeze_two_level_dict, value)) + self._single_combinations = set(map(freeze_two_level_dict, value)) + self._update_parameter_combinations() @staticmethod def _make_joined_key_pairs_values( @@ -730,8 +744,8 @@ def param_combinations() -> Iterator[frozendict[str, dict[str, Any]]]: Generator yielding complete parameter sets which can be supplied to `perform_single_simulation`. 
""" - for zipped_params, multiplied_params in it.product( - zipped_values_iter, multiplied_values_iter + for zipped_params, multiplied_params in list( + it.product(zipped_values_iter, multiplied_values_iter) ): d = defaultdict(dict) @@ -746,7 +760,7 @@ def param_combinations() -> Iterator[frozendict[str, dict[str, Any]]]: d[outer_key][inner_key] = value if d: - yield self._freeze_two_level_dict( + yield freeze_two_level_dict( self._two_level_dict_update(self._base_params, d) ) @@ -843,14 +857,14 @@ def run_analytics( @staticmethod def compute_cardinality_product_params_zip_params( - product_params, zip_params + *, product_params, zip_params ) -> int: """ Number of simulations performed when calling `SimulationSet.run`, excluding single_combinations """ len_ = 1 - if zip_params: + if zip_params and next(iter(zip_params.values())): len_ *= len(next(iter(next(iter(zip_params.values())).values()))) if product_params: len_ *= ft.reduce( @@ -875,6 +889,8 @@ def __contains__(self, item): def __getattr__(self, attr): if attr in self._delegated_attrs: return getattr(self.param_combinations, attr) + else: + raise AttributeError(f"'SimulationSet' has no attribute '{attr}'") @staticmethod def _wrap_method(method, o): diff --git a/ridepy/util/spaces.py b/ridepy/util/spaces.py index 5d5d9d3f..8623c057 100644 --- a/ridepy/util/spaces.py +++ b/ridepy/util/spaces.py @@ -100,6 +100,9 @@ def __repr__(self): def __hash__(self): hash(repr(self)) + def __eq__(self, other): + return hash(self) == hash(other) + class Euclidean1D(Euclidean): def __init__( @@ -470,13 +473,18 @@ def __reduce__(self): ) def __hash__(self): - return ( - hash(self.vertices) - + hash(self.edges) - + hash(self.weights) - + hash(self.velocity) + return hash( + ( + hash(repr(self.vertices)), + hash(repr(self.edges)), + hash(repr(self.weights)), + hash(repr(self.velocity)), + ) ) + def __eq__(self, other): + return hash(self) == hash(other) + class DiGraph(Graph): def __init__( diff --git 
a/ridepy/util/spaces_cython/spaces.pyx b/ridepy/util/spaces_cython/spaces.pyx index 9f9ac0e4..8176bebe 100644 --- a/ridepy/util/spaces_cython/spaces.pyx +++ b/ridepy/util/spaces_cython/spaces.pyx @@ -314,8 +314,16 @@ cdef class Graph(TransportSpace): self.velocity, ) + def __hash__(self): - return hash(self.vertices) + hash(self.edges) + hash(self.weights) + hash(self.velocity) + return hash( + ( + hash(repr(self.vertices)), + hash(repr(self.edges)), + hash(repr(self.weights)), + hash(repr(self.velocity)), + ) + ) def asdict(self): return dict( diff --git a/test/test_extras.py b/test/test_extras.py index fee45021..56a89b97 100644 --- a/test/test_extras.py +++ b/test/test_extras.py @@ -18,6 +18,8 @@ ) from ridepy.extras.simulation_set import ( SimulationSet, + thaw_two_level_dict, + freeze_two_level_dict, ) from ridepy.util.analytics import ( get_stops_and_requests, @@ -129,7 +131,7 @@ def test_io_params(cython, tmp_path): print(params) print(restored_params) - assert params == restored_params + assert thaw_two_level_dict(params) == restored_params def test_param_scan_length(tmp_path): @@ -138,7 +140,7 @@ def test_param_scan_length(tmp_path): 2: {"z": [100, 200, 300]}, } params_to_product = { - 1: {"b": [21, 22], "d": [66, 67, 67, 68]}, + 1: {"b": [21, 22], "d": [66, 67, 68, 69]}, 2: {"w": [1000, 2000]}, } simulation_set = SimulationSet( @@ -148,11 +150,12 @@ def test_param_scan_length(tmp_path): validate=False, ) simulation_set._base_params = {} - res = list(iter(simulation_set)) + simulation_set._update_parameter_combinations() + res = set(iter(simulation_set)) - assert len(res) == 3 * 2 * 4 * 2 assert len([i for i in res if i[1]["a"] == 10]) == 2 * 4 * 2 assert len([i for i in res if i[2]["w"] == 1000]) == 3 * 2 * 4 + assert len(res) == 3 * 2 * 4 * 2 def test_param_scan(tmp_path): @@ -165,17 +168,23 @@ def test_param_scan(tmp_path): validate=False, ) simulation_set._base_params = {} - res = tuple(iter(simulation_set)) - - assert res == ( - {1: {"a": 8, "b": 88, 
"c": 100}, 2: {"z": 1000}}, - {1: {"a": 8, "b": 88, "c": 100}, 2: {"z": 2000}}, - {1: {"a": 8, "b": 88, "c": 200}, 2: {"z": 1000}}, - {1: {"a": 8, "b": 88, "c": 200}, 2: {"z": 2000}}, - {1: {"a": 9, "b": 99, "c": 100}, 2: {"z": 1000}}, - {1: {"a": 9, "b": 99, "c": 100}, 2: {"z": 2000}}, - {1: {"a": 9, "b": 99, "c": 200}, 2: {"z": 1000}}, - {1: {"a": 9, "b": 99, "c": 200}, 2: {"z": 2000}}, + simulation_set._update_parameter_combinations() + res = set(iter(simulation_set)) + + assert res == set( + map( + freeze_two_level_dict, + ( + {1: {"a": 8, "b": 88, "c": 100}, 2: {"z": 1000}}, + {1: {"a": 8, "b": 88, "c": 100}, 2: {"z": 2000}}, + {1: {"a": 8, "b": 88, "c": 200}, 2: {"z": 1000}}, + {1: {"a": 8, "b": 88, "c": 200}, 2: {"z": 2000}}, + {1: {"a": 9, "b": 99, "c": 100}, 2: {"z": 1000}}, + {1: {"a": 9, "b": 99, "c": 100}, 2: {"z": 2000}}, + {1: {"a": 9, "b": 99, "c": 200}, 2: {"z": 1000}}, + {1: {"a": 9, "b": 99, "c": 200}, 2: {"z": 2000}}, + ), + ) ) @@ -202,9 +211,12 @@ def test_param_scan_equivalent_to_cartesian_product(tmp_path): validate=False, ) simulation_set._base_params = {} - res = list(iter(simulation_set)) + simulation_set._update_parameter_combinations() + res = set(iter(simulation_set)) - assert res == list(param_scan_cartesian_product(params_to_product)) + assert res == set( + map(freeze_two_level_dict, param_scan_cartesian_product(params_to_product)) + ) def test_param_scan_equivalent_to_pure_zip(tmp_path): @@ -216,11 +228,17 @@ def test_param_scan_equivalent_to_pure_zip(tmp_path): validate=False, ) simulation_set._base_params = {} - res = list(iter(simulation_set)) - assert res == [ - {1: {"c": 100}, 2: {"z": 1000}}, - {1: {"c": 200}, 2: {"z": 2000}}, - ] + simulation_set._update_parameter_combinations() + res = set(iter(simulation_set)) + assert res == set( + map( + freeze_two_level_dict, + [ + {1: {"c": 100}, 2: {"z": 1000}}, + {1: {"c": 200}, 2: {"z": 2000}}, + ], + ) + ) def test_param_scan_dry_run(tmp_path): @@ -263,8 +281,8 @@ def 
test_param_scan_dry_run(tmp_path): stops, requests = get_stops_and_requests( space=params["general"]["space"], events=evs ) - assert len(stops) == i * 5 * 2 + 2 - assert len(requests) == i * 5 + assert len(stops) == params["general"]["n_reqs"] * 2 + 2 + assert len(requests) == params["general"]["n_reqs"] assert simulation_set.simulation_ids == simulation_set_dry_run.simulation_ids From 48561272c2f7fdf449c40b436f1686bdaea6e9fa Mon Sep 17 00:00:00 2001 From: Felix Jung Date: Fri, 10 Jun 2022 11:53:46 +0200 Subject: [PATCH 08/14] another bug fixed --- ridepy/extras/simulation_set.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/ridepy/extras/simulation_set.py b/ridepy/extras/simulation_set.py index 26486fe2..e26c763f 100644 --- a/ridepy/extras/simulation_set.py +++ b/ridepy/extras/simulation_set.py @@ -608,6 +608,26 @@ def __init__( self._base_params = self._two_level_dict_update( self.default_base_params, base_params ) + + # make parameters immutable + + if single_combinations: + for outer_dict in single_combinations: + for outer_key, inner_dict in outer_dict.items(): + for inner_key, inner_value in inner_dict.items(): + if isinstance(inner_value, dict): + outer_dict[outer_key][inner_key] = frozendict(inner_dict) + + for multi_params in [zip_params, product_params]: + if multi_params: + for outer_key, inner_dict in multi_params.items(): + for inner_key, inner_value in inner_dict.items(): + for i, multi_value in enumerate(inner_value): + if isinstance(multi_value, dict): + multi_params[outer_key][inner_key][i] = frozendict( + multi_value + ) + if single_combinations is not None: self._single_combinations = set( map(freeze_two_level_dict, single_combinations) From a7a412624e3f5de9eaa32e2c60b781504748ae08 Mon Sep 17 00:00:00 2001 From: Felix Jung Date: Fri, 10 Jun 2022 12:55:05 +0200 Subject: [PATCH 09/14] typo fixed --- ridepy/extras/simulation_set.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git 
a/ridepy/extras/simulation_set.py b/ridepy/extras/simulation_set.py index e26c763f..cd5f8e17 100644 --- a/ridepy/extras/simulation_set.py +++ b/ridepy/extras/simulation_set.py @@ -389,7 +389,8 @@ class SimulationSet(MutableSet): "discard", "intersection", "intersection_update", - "symmetric_difference" "symmetric_difference_update", + "symmetric_difference", + "symmetric_difference_update", "union", "update", ] From 7ad2dddbba55b9a90a6ae78744f2c27a0f531b6b Mon Sep 17 00:00:00 2001 From: Felix Jung Date: Fri, 10 Jun 2022 16:42:46 +0200 Subject: [PATCH 10/14] replaced analytics module for now --- ridepy/extras/io.py | 4 + ridepy/extras/simulation_set.py | 12 +- ridepy/util/analytics/__init__.py | 186 ++-- ridepy/util/analytics/__init__.py.legacy | 1063 ++++++++++++++++++++++ 4 files changed, 1188 insertions(+), 77 deletions(-) create mode 100644 ridepy/util/analytics/__init__.py.legacy diff --git a/ridepy/extras/io.py b/ridepy/extras/io.py index 9d8534e7..0816a59b 100644 --- a/ridepy/extras/io.py +++ b/ridepy/extras/io.py @@ -10,6 +10,8 @@ from typing import Iterable from pathlib import Path +from frozendict import frozendict + from ridepy.data_structures import TransportSpace from ridepy.util.spaces_cython import TransportSpace as CyTransportSpace @@ -29,6 +31,8 @@ class ParamsJSONEncoder(json.JSONEncoder): """ def default(self, obj): + if isinstance(obj, frozendict): + return dict(obj) # request generator? 
if isinstance(obj, type): return f"{obj.__module__}.{obj.__qualname__}" diff --git a/ridepy/extras/simulation_set.py b/ridepy/extras/simulation_set.py index cd5f8e17..e03ad987 100644 --- a/ridepy/extras/simulation_set.py +++ b/ridepy/extras/simulation_set.py @@ -494,7 +494,7 @@ def __init__( jsonl_chunksize: int = 1000, event_path_suffix: str = ".jsonl", param_path_suffix: str = "_params.json", - validate: bool = True, + validate: bool = False, ) -> None: """ @@ -532,6 +532,9 @@ def __init__( Check validity of the supplied dictionary (unknown outer and inner keys, equal length for ``zip_params``) """ + for method in self._wrapped_methods: + setattr(self, method, self._wrap_method(method, self)) + self.debug = debug self.max_workers = max_workers self.process_chunksize = process_chunksize @@ -930,13 +933,6 @@ def _wrap_method(method, o): validate=o.validated, ) - def __new__(cls, *args, **kwargs): - self = super(SimulationSet, cls).__new__(SimulationSet) - for method in self._wrapped_methods: - setattr(self, method, cls._wrap_method(method, self)) - - return self - def add(self, item): # This is dynamically overwritten in SimulationSet._wrap_method, # but is necessary here to be able to instantiate the class. diff --git a/ridepy/util/analytics/__init__.py b/ridepy/util/analytics/__init__.py index 761f3fc7..8ba21da9 100644 --- a/ridepy/util/analytics/__init__.py +++ b/ridepy/util/analytics/__init__.py @@ -377,8 +377,8 @@ def _add_locations_to_stoplist_dataframe(*, reqs, stops, space) -> pd.DataFrame: # this must be changed. 
# See also [issue #45](https://github.com/PhysicsOfMobility/ridepy/issues/45) locations.index = locations.index.set_levels( - locations.index.levels[1].map({"origin": 1.0, "destination": -1.0}), - 1, + levels=locations.index.levels[1].map({"origin": 1.0, "destination": -1.0}), + level=1, ) # finally fill the locations missing in the stops dataframe by joining on request_id and delta_occupancy @@ -843,29 +843,29 @@ def get_vehicle_quantities(stops: pd.DataFrame, requests: pd.DataFrame) -> pd.Da efficiency_dist = total_direct_dist / total_dist_driven efficiency_time = total_direct_time / total_time_driven - avg_system_stoplist_length_service_time = stops.groupby("vehicle_id").apply( - lambda gdf: ( - gdf["system_stoplist_length_service_time"] * gdf["state_duration"] - ).sum() - / gdf["state_duration"].sum() - ) - avg_system_stoplist_length_submission_time = stops.groupby("vehicle_id").apply( - lambda gdf: ( - gdf["system_stoplist_length_submission_time"] * gdf["state_duration"] - ).sum() - / gdf["state_duration"].sum() - ) - - avg_stoplist_length_service_time = stops.groupby("vehicle_id").apply( - lambda gdf: (gdf["stoplist_length_service_time"] * gdf["state_duration"]).sum() - / gdf["state_duration"].sum() - ) - avg_stoplist_length_submission_time = stops.groupby("vehicle_id").apply( - lambda gdf: ( - gdf["stoplist_length_submission_time"] * gdf["state_duration"] - ).sum() - / gdf["state_duration"].sum() - ) + # avg_system_stoplist_length_service_time = stops.groupby("vehicle_id").apply( + # lambda gdf: ( + # gdf["system_stoplist_length_service_time"] * gdf["state_duration"] + # ).sum() + # / gdf["state_duration"].sum() + # ) + # avg_system_stoplist_length_submission_time = stops.groupby("vehicle_id").apply( + # lambda gdf: ( + # gdf["system_stoplist_length_submission_time"] * gdf["state_duration"] + # ).sum() + # / gdf["state_duration"].sum() + # ) + # + # avg_stoplist_length_service_time = stops.groupby("vehicle_id").apply( + # lambda gdf: 
(gdf["stoplist_length_service_time"] * gdf["state_duration"]).sum() + # / gdf["state_duration"].sum() + # ) + # avg_stoplist_length_submission_time = stops.groupby("vehicle_id").apply( + # lambda gdf: ( + # gdf["stoplist_length_submission_time"] * gdf["state_duration"] + # ).sum() + # / gdf["state_duration"].sum() + # ) return pd.DataFrame( dict( @@ -880,10 +880,10 @@ def get_vehicle_quantities(stops: pd.DataFrame, requests: pd.DataFrame) -> pd.Da total_direct_time=total_direct_time, efficiency_dist=efficiency_dist, efficiency_time=efficiency_time, - avg_system_stoplist_length_service_time=avg_system_stoplist_length_service_time, - avg_system_stoplist_length_submission_time=avg_system_stoplist_length_submission_time, - avg_stoplist_length_service_time=avg_stoplist_length_service_time, - avg_stoplist_length_submission_time=avg_stoplist_length_submission_time, + # avg_system_stoplist_length_service_time=avg_system_stoplist_length_service_time, + # avg_system_stoplist_length_submission_time=avg_system_stoplist_length_submission_time, + # avg_stoplist_length_service_time=avg_stoplist_length_service_time, + # avg_stoplist_length_submission_time=avg_stoplist_length_submission_time, ) ).rename_axis("vehicle_id") @@ -961,7 +961,10 @@ def get_system_quantities( total_dist_driven = stops["dist_to_next"].sum() total_time_driven = stops["time_to_next"].sum() - avg_direct_dist = serviced_requests[("submitted", "direct_travel_distance")].mean() + avg_direct_dist_submitted = requests[("submitted", "direct_travel_distance")].mean() + avg_direct_dist_serviced = serviced_requests[ + ("submitted", "direct_travel_distance") + ].mean() avg_direct_time = serviced_requests[("submitted", "direct_travel_time")].mean() total_direct_dist = serviced_requests[("submitted", "direct_travel_distance")].sum() @@ -974,32 +977,32 @@ def get_system_quantities( rejection_ratio = 1 - len(serviced_requests) / len(requests) - _stops = stops.dropna( - subset=( - "system_stoplist_length_service_time", - 
"system_stoplist_length_submission_time", - ) - ) - avg_system_stoplist_length_service_time = ( - _stops["system_stoplist_length_service_time"] * _stops["state_duration"] - ).sum() / _stops["state_duration"].sum() - - avg_system_stoplist_length_submission_time = ( - _stops["system_stoplist_length_submission_time"] * _stops["state_duration"] - ).sum() / _stops["state_duration"].sum() - - # not sure if it is necessary to do it again... - _stops = stops.dropna( - subset=("stoplist_length_service_time", "stoplist_length_submission_time") - ) - - avg_stoplist_length_submission_time = ( - _stops["stoplist_length_submission_time"] * _stops["state_duration"] - ).sum() / _stops["state_duration"].sum() - - avg_stoplist_length_service_time = ( - _stops["stoplist_length_service_time"] * _stops["state_duration"] - ).sum() / _stops["state_duration"].sum() + # _stops = stops.dropna( + # subset=( + # "system_stoplist_length_service_time", + # "system_stoplist_length_submission_time", + # ) + # ) + # avg_system_stoplist_length_service_time = ( + # _stops["system_stoplist_length_service_time"] * _stops["state_duration"] + # ).sum() / _stops["state_duration"].sum() + # + # avg_system_stoplist_length_submission_time = ( + # _stops["system_stoplist_length_submission_time"] * _stops["state_duration"] + # ).sum() / _stops["state_duration"].sum() + # + # # not sure if it is necessary to do it again... 
+ # _stops = stops.dropna( + # subset=("stoplist_length_service_time", "stoplist_length_submission_time") + # ) + # + # avg_stoplist_length_submission_time = ( + # _stops["stoplist_length_submission_time"] * _stops["state_duration"] + # ).sum() / _stops["state_duration"].sum() + # + # avg_stoplist_length_service_time = ( + # _stops["stoplist_length_service_time"] * _stops["state_duration"] + # ).sum() / _stops["state_duration"].sum() stops["event_type"] = stops["delta_occupancy"].map({1.0: "pickup", -1.0: "dropoff"}) @@ -1031,33 +1034,78 @@ def get_system_quantities( avg_detour = requests["inferred", "relative_travel_time"].mean() - res = dict( + total_system_time = ( + stops.groupby("vehicle_id") + .apply( + lambda gdf: gdf.loc[gdf["request_id"] == -200, "timestamp"].item() + - gdf.loc[gdf["request_id"] == -100, "timestamp"].item() + ) + .sum() + ) + n_vehicles = len(stops.index.levels[0]) + n_vehicles_used = ( + requests.groupby(("serviced", "vehicle_id")).count().iloc[:, 0] > 0 + ).sum() + avg_request_rate_submitted = len(requests) / (total_system_time / n_vehicles) + avg_request_rate_serviced = len(serviced_requests) / ( + total_system_time / n_vehicles + ) + + avg_relative_travel_time = requests[("inferred", "relative_travel_time")].mean() + + res = {} + if params: + theoretical_request_rate = params.get("request_generator", {}).get( + "rate", np.nan + ) + velocity = params.get("general", {}).get("space").velocity + res |= dict( + n_vehicles=n_vehicles, + theoretical_request_rate=theoretical_request_rate, + max_pickup_delay=params.get("request_generator", {}).get( + "max_pickup_delay", np.nan + ), + max_delivery_delay_rel=params.get("request_generator", {}).get( + "max_delivery_delay_rel", np.nan + ), + velocity=velocity, + load_theoretical=(theoretical_request_rate * avg_direct_dist_submitted) + / (velocity * n_vehicles), + ) + + load_submitted = (avg_request_rate_submitted * avg_direct_dist_submitted) / ( + res["velocity"] * res["n_vehicles"] + ) + 
load_serviced = (avg_request_rate_serviced * avg_direct_dist_serviced) / ( + res["velocity"] * res["n_vehicles"] + ) + + res |= dict( avg_occupancy=avg_occupancy, + n_vehicles_used=n_vehicles_used, avg_segment_dist=avg_segment_dist, avg_segment_time=avg_segment_time, total_dist_driven=total_dist_driven, total_time_driven=total_time_driven, - avg_direct_dist=avg_direct_dist, + avg_direct_dist=avg_direct_dist_serviced, avg_direct_time=avg_direct_time, total_direct_dist=total_direct_dist, total_direct_time=total_direct_time, efficiency_dist=efficiency_dist, efficiency_time=efficiency_time, - avg_system_stoplist_length_service_time=avg_system_stoplist_length_service_time, - avg_system_stoplist_length_submission_time=avg_system_stoplist_length_submission_time, - avg_stoplist_length_service_time=avg_stoplist_length_service_time, - avg_stoplist_length_submission_time=avg_stoplist_length_submission_time, + # avg_system_stoplist_length_service_time=avg_system_stoplist_length_service_time, + # avg_system_stoplist_length_submission_time=avg_system_stoplist_length_submission_time, + # avg_stoplist_length_service_time=avg_stoplist_length_service_time, + # avg_stoplist_length_submission_time=avg_stoplist_length_submission_time, avg_waiting_time=avg_waiting_time, rejection_ratio=rejection_ratio, median_stoplist_length=median_stoplist_length, avg_detour=avg_detour, + avg_request_rate_submitted=avg_request_rate_submitted, + avg_request_rate_serviced=avg_request_rate_serviced, + load_submitted=load_submitted, + load_serviced=load_serviced, + avg_relative_travel_time=avg_relative_travel_time, ) - if params: - res |= dict( - n_vehicles=params["general"]["n_vehicles"], - request_rate=params["request_generator"]["rate"], - velocity=params["general"]["space"].velocity, - ) - return res diff --git a/ridepy/util/analytics/__init__.py.legacy b/ridepy/util/analytics/__init__.py.legacy new file mode 100644 index 00000000..761f3fc7 --- /dev/null +++ b/ridepy/util/analytics/__init__.py.legacy 
@@ -0,0 +1,1063 @@ +from collections import defaultdict + +from typing import Iterable, List, Optional, Dict, Any + +import functools as ft +import numpy as np +import pandas as pd + +from ridepy.data_structures import ( + TransportSpace, +) +from ridepy.events import VehicleStateEndEvent, VehicleStateBeginEvent + + +def _create_events_dataframe(events: Iterable[dict]) -> pd.DataFrame: + """ + Create a DataFrame of all logged events with their properties at columns. + + The schema of the returned DataFrame is the following, where + the index is an unnamed integer range index. + + .. code-block:: + + Column Dtype + ------ ----- + request_id int64 + timestamp float64 + vehicle_id float64 + event_type object + location Union[int, float64, Tuple[float64]] + origin Union[int, float64, Tuple[float64]] + destination Union[int, float64, Tuple[float64]] + pickup_timewindow_min float64 + pickup_timewindow_max float64 + delivery_timewindow_min float64 + delivery_timewindow_max float64 + + Parameters + ---------- + events + + Returns + ------- + events DataFrame, indexed by integer range + """ + return pd.DataFrame(events) + + +def _create_stoplist_dataframe(*, evs: pd.DataFrame) -> pd.DataFrame: + """ + Creates a DataFrame containing the stoplists of the transporters. + The location is only returned for the internal stops relating to + VehicleStateBeginEvent and VehicleStateEndEvent. For the rest of + the stops, location is nan. + + The schema of the returned DataFrame is the following, where + `vehicle_id` and `timestamp` are MultiIndex levels. + + .. 
code-block:: + + Column Dtype + ------ ----- + vehicle_id float64 + timestamp float64 + delta_occupancy float64 + request_id object + state_duration float64 + occupancy float64 + location Union[float64, int, Tuple[float64]] + + Parameters + ---------- + evs + events DataFrame + + Returns + ------- + stoplist DataFrame indexed by `vehicle_id` and `timestamp` + """ + # Create the initial dataframe: Select only events of type + # `PickupEvent` and `DeliveryEvent`, along with their vehicle_id, timestamp, + # and request_id + stops = evs[ + evs["event_type"].isin( + [ + "PickupEvent", + "DeliveryEvent", + "VehicleStateBeginEvent", + "VehicleStateEndEvent", + ] + ) + ][["vehicle_id", "timestamp", "event_type", "request_id", "location"]] + + # Translate PickupEvent and DeliveryEvent into an occupancy delta. + # NOTE: This assumes occupancy delta of +1/-1, i.e. only single-customer requests. + # If the simulator should allow for multi-customer requests in the future, + # this must be changed. + # See also [issue #45](https://github.com/PhysicsOfMobility/ridepy/issues/45) + stops["delta_occupancy"] = stops["event_type"].map( + defaultdict(float, PickupEvent=1, DeliveryEvent=-1) + ) + + # ... and drop the event_type as pickup/dropoff is now signified through delta_occupancy + stops.drop("event_type", axis=1, inplace=True) + + stops.sort_values(["vehicle_id", "timestamp", "request_id"], inplace=True) + + # Fix the stop order. The begin and end stops have identical timestamps as + # other stops, partially on the same vehicle. This is problematic as for + # proper computation of the state durations BEGIN and END **must** be first + # and last stops in every stoplist. 
+ def fix_start_stop_order(df): + # get absolute current positions of the BEGIN/END stops + i_start = (df["request_id"] == -100).argmax() + i_stop = (df["request_id"] == -200).argmax() + + # get dataframe's integer index values for the dummy stops + idx = df.index.to_list() + + k_start = idx[i_start] + k_stop = idx[i_stop] + + # delete the existing stops from the index list... + if i_start < i_stop: + i_stop -= 1 + else: + i_start -= 1 + + del idx[i_start] + del idx[i_stop] + + # ...and insert them at the correct positions + idx.insert(0, k_start) + idx.append(k_stop) + + return df.loc[idx] + + stops = stops.groupby("vehicle_id", as_index=False).apply(fix_start_stop_order) + + # compute the durations of every state and add them as a columns to the dataframe + stops["state_duration"] = ( + stops.groupby("vehicle_id")["timestamp"].diff().shift(-1).fillna(0) + ) + # compute the occupancy as delta_occupancy cumsum + stops["occupancy"] = stops.groupby("vehicle_id")["delta_occupancy"].cumsum() + + # set index to ('vehicle_id, 'stop_id'), where stop_id in 0...N for each vehicle + stops = ( + stops.groupby("vehicle_id") + .apply(lambda df: df.reset_index(drop=True)) + .drop("vehicle_id", axis=1) + ) + stops.index.rename(names="stop_id", level=1, inplace=True) + + # check total operational times of all vehicles are almost identical + iterator = iter(stops.groupby("vehicle_id")["state_duration"].sum()) + try: + first = next(iterator) + except StopIteration: + pass + else: + assert all(np.isclose(first, x) for x in iterator) + + return stops + + +def _create_transportation_requests_dataframe( + *, evs: pd.DataFrame, stops, space +) -> pd.DataFrame: + """ + Create DataFrame containing all requests. + This includes the *submitted* requests, the *accepted* requests and the *rejected* requests. + For the accepted requests, additional *serviced* and subsequently *inferred* data is given, + under the respectively named level-0 column index *source*. 
Level 1 specifying the actual + kind data of data is named *quantity*. + + The schema of the returned DataFrame is the following, + where `request_id` is the index. + + .. code-block:: + + Column Dtype + ------ ----- + request_id int64 + (submitted, timestamp) float64 + (submitted, origin) float64 + (submitted, destination) float64 + (submitted, pickup_timewindow_min) float64 + (submitted, pickup_timewindow_max) float64 + (submitted, delivery_timewindow_min) float64 + (submitted, delivery_timewindow_max) float64 + (submitted, direct_travel_distance) float64 + (submitted, direct_travel_time) float64 + (accepted, timestamp) float64 + (accepted, origin) float64 + (accepted, destination) float64 + (accepted, pickup_timewindow_min) float64 + (accepted, pickup_timewindow_max) float64 + (accepted, delivery_timewindow_min) float64 + (accepted, delivery_timewindow_max) float64 + (rejected, timestamp) float64 + (inferred, relative_travel_time) float64 + (inferred, travel_time) float64 + (inferred, waiting_time) float64 + (serviced, timestamp_dropoff) float64 + (serviced, timestamp_pickup) float64 + (serviced, vehicle_id) float64 + + Parameters + ---------- + evs + events DataFrame + stops + stoplists DataFrame + space + TransportSpace the simulations were performed on + + Returns + ------- + requests DataFrame indexed by `request_id` + """ + + # first turn all request submission, acceptance and rejection events into a dataframe + reqs = ( + evs[ + evs["event_type"].isin( + [ + "RequestSubmissionEvent", + "RequestAcceptanceEvent", + "RequestRejectionEvent", + ] + ) + ] # select only request events + .drop(["vehicle_id", "location"], axis=1) # drop now empty columns + .set_index( + ["request_id", "event_type"] + ) # set index to be request_id and event_type + .unstack() # unstack so that remaining index is request_id and column index is MultiIndex event_type as level_1 + .reorder_levels( + [1, 0], axis=1 + ) # switch column index order to have event_type as level_0 + 
.sort_index(axis=1) # sort so that columns are grouped by event_type + .drop( + [ + ("RequestRejectionEvent", "pickup_timewindow_min"), + ("RequestRejectionEvent", "pickup_timewindow_max"), + ("RequestRejectionEvent", "delivery_timewindow_min"), + ("RequestRejectionEvent", "delivery_timewindow_max"), + ("RequestRejectionEvent", "origin"), + ("RequestRejectionEvent", "destination"), + ], + axis=1, + errors="ignore", + ) # drop columns that are empty (nan) for rejections (errors=ignore b/c rejections may not have happened) + .rename( + { + "RequestAcceptanceEvent": "accepted", + "RequestSubmissionEvent": "submitted", + "RequestRejectionEvent": "rejected", + }, + axis=1, + level=0, + ) # rename event_types to the "data source" indicators "accepted", "submitted" and "rejected" + ) + reqs.columns.rename(["source", "quantity"], inplace=True) + + # Get properties of serviced requests from the stops dataframe: + stops_tmp = stops.reset_index()[ + ["request_id", "vehicle_id", "timestamp", "delta_occupancy"] + ].set_index("request_id") + + # - vehicle ID of the vehicle that serviced the request + reqs[("serviced", "vehicle_id")] = stops_tmp[stops_tmp["delta_occupancy"] > 0][ + "vehicle_id" + ] + + # - timestamp of the pickup stop + reqs[("serviced", "timestamp_pickup")] = stops_tmp[ + stops_tmp["delta_occupancy"] > 0 + ]["timestamp"] + + # - timestamp of the dropoff stop + reqs[("serviced", "timestamp_dropoff")] = stops_tmp[ + stops_tmp["delta_occupancy"] < 0 + ]["timestamp"] + + # - travel time + reqs[("inferred", "travel_time")] = ( + reqs[("serviced", "timestamp_dropoff")] - reqs[("serviced", "timestamp_pickup")] + ) + + # If transportation as submitted were submitted, calculate more properties. + # NOTE that these properties might equally well be computed using the + # inferred requests, but in case of differences between the requests + # the resulting change in behavior might not intended. 
Therefore so far + # we only compute these quantities if transportation_requests are submitted. + + # - direct travel time + # `to_list()` is necessary as for dimensionality > 1 the `pd.Series` will contain tuples + # which will not be understood as a dimension by `np.shape(...)` which subsequently confuses smartVectorize + # see https://github.com/PhysicsOfMobility/ridepy/issues/85 + reqs[("submitted", "direct_travel_time")] = space.t( + reqs[("submitted", "origin")].to_list(), + reqs[("submitted", "destination")].to_list(), + ) + + # - direct travel distance + # again: `to_list()` is necessary as for dimensionality > 1 the `pd.Series` will contain tuples + # which will not be understood as a dimension by `np.shape(...)` which subsequently confuses smartVectorize + # see https://github.com/PhysicsOfMobility/ridepy/issues/85 + reqs[("submitted", "direct_travel_distance")] = space.d( + reqs[("submitted", "origin")].to_list(), + reqs[("submitted", "destination")].to_list(), + ) + + # - waiting time + reqs[("inferred", "waiting_time")] = ( + reqs[("serviced", "timestamp_pickup")] + - reqs[("submitted", "pickup_timewindow_min")] + ) + + # - relative travel time + reqs[("inferred", "relative_travel_time")] = ( + reqs[("inferred", "travel_time")] / reqs[("submitted", "direct_travel_time")] + ) + + # TODO possibly add more properties to compute HERE + + # sort columns alphabetically + # FIXME this is open for debate + reqs.sort_values(["source", "quantity"], axis=1, inplace=True) + return reqs + + +def _add_locations_to_stoplist_dataframe(*, reqs, stops, space) -> pd.DataFrame: + """ + Add non-internal stops' locations to the stoplist DataFrame as inferred + from the *accepted* requests. + + The schema of the returned DataFrame is a superset of the following, where + `vehicle_id` and `timestamp` are MultiIndex levels. + + .. 
code-block:: + + Column Dtype + ------ ----- + vehicle_id float64 + timestamp float64 + delta_occupancy float64 + request_id object + state_duration float64 + occupancy float64 + location Union[float64, int, Tuple[float64]] + time_to_next float64 + dist_to_next float64 + + Parameters + ---------- + reqs + requests DataFrame + stops + stops DataFrame missing stop locations for non-internal stops + + Returns + ------- + stoplist DataFrame with added stop locations indexed by `vehicle_id` and `timestamp` + """ + + # use the requests' locations and reshape them into a DateFrame indexed by + # `request_id` and `delta_occupancy` + locations = reqs.loc[:, ("accepted", ["origin", "destination"])] + locations.columns = locations.columns.droplevel(0).rename("delta_occupancy") + locations = locations.stack().rename("location") + + # NOTE: This assumes occupancy delta of +1/-1, i.e. only single-customer requests. + # If the simulator should allow for multi-customer requests in the future, + # this must be changed. 
+ # See also [issue #45](https://github.com/PhysicsOfMobility/ridepy/issues/45) + locations.index = locations.index.set_levels( + locations.index.levels[1].map({"origin": 1.0, "destination": -1.0}), + 1, + ) + + # finally fill the locations missing in the stops dataframe by joining on request_id and delta_occupancy + # and subsequently filling up nan from the now index-aligned "location_tmp" series + stops["location"] = stops["location"].fillna( + stops.join(locations, on=["request_id", "delta_occupancy"], rsuffix="_tmp")[ + "location_tmp" + ], + ) + + def dist_time_to_next(df): + locs = df["location"] + + dist_to_next = space.d(locs[:-1].to_list(), locs[1:].to_list()) + df["dist_to_next"] = pd.Series(dist_to_next, index=df.index[:-1]) + + time_to_next = space.t(locs[:-1].to_list(), locs[1:].to_list()) + df["time_to_next"] = pd.Series(time_to_next, index=df.index[:-1]) + + return df + + stops = stops.groupby("vehicle_id").apply(dist_time_to_next) + + return stops[ + [ + "timestamp", + "delta_occupancy", + "request_id", + "state_duration", + "occupancy", + "location", + "dist_to_next", + "time_to_next", + ] + ] + + +def _add_insertion_stats_to_stoplist_dataframe(*, reqs, stops, space) -> pd.DataFrame: + """ + + The schema of the returned DataFrame is a superset of the following, where + `vehicle_id` and `timestamp` are MultiIndex levels. + + .. 
code-block:: + + Column Dtype + ------ ----- + vehicle_id float64 + stop_id int64 + timestamp float64 + delta_occupancy float64 + request_id int64 + state_duration float64 + occupancy float64 + location object + dist_to_next float64 + time_to_next float64 + timestamp_submitted float64 + insertion_index float64 + leg_1_dist_service_time float64 + leg_2_dist_service_time float64 + leg_direct_dist_service_time float64 + detour_dist_service_time float64 + leg_1_dist_submission_time float64 + leg_2_dist_submission_time float64 + leg_direct_dist_submission_time float64 + detour_dist_submission_time float64 + stoplist_length_submission_time float64 + stoplist_length_service_time float64 + avg_segment_dist_submission_time float64 + avg_segment_time_submission_time float64 + avg_segment_dist_service_time float64 + avg_segment_time_service_time float64 + system_stoplist_length_submission_time float64 + system_stoplist_length_service_time float64 + avg_system_segment_dist_submission_time float64 + avg_system_segment_time_submission_time float64 + avg_system_segment_dist_service_time float64 + avg_system_segment_time_service_time float64 + relative_insertion_position float64 + + + + Parameters + ---------- + reqs + requests DataFrame + stops + stops DataFrame missing stop locations for non-internal stops + + Returns + ------- + stoplist DataFrame with added stop locations indexed by `vehicle_id` and `timestamp` + """ + + stops = pd.merge( + stops, + reqs.submitted.timestamp, + how="left", + left_on="request_id", + right_index=True, + suffixes=(None, "_submitted"), + ) + actual_stops = stops.dropna(subset=("timestamp_submitted",)) + + def _properties_at_time(stop, full_sl, scope): + t = stop["timestamp"] + ts = stop["timestamp_submitted"] + pu = True if stop["delta_occupancy"] > 0 else False + + get_i_pu = lambda _sl, stop: (_sl["request_id"] == stop["request_id"]).argmax() + get_i_do = ( + lambda _sl, stop: len(_sl) + - (_sl["request_id"] == stop["request_id"])[::-1].argmax() 
+ - 1 + ) + + sl = full_sl[ + (full_sl["timestamp_submitted"] <= t) & (t <= full_sl["timestamp"]) + ] + + sl_s = full_sl[ + (full_sl["timestamp_submitted"] <= ts) & (ts <= full_sl["timestamp"]) + ] + + if pu: + i_pu_sl = get_i_pu(sl, stop) + i_pu_sl_s = get_i_pu(sl_s, stop) + i_do_sl_s = get_i_do(sl_s, stop) + + idx_pu = sl_s.iloc[i_pu_sl_s].name + idx_do = sl_s.iloc[i_do_sl_s].name + assert idx_pu == stop.name + + i_stop_sl = i_pu_sl + i_stop_sl_s = i_pu_sl_s + else: + i_do_sl = get_i_do(sl, stop) + + i_pu_sl_s = get_i_pu(sl_s, stop) + i_do_sl_s = get_i_do(sl_s, stop) + + idx_pu = sl_s.iloc[i_pu_sl_s].name + idx_do = sl_s.iloc[i_do_sl_s].name + assert idx_do == stop.name + + i_stop_sl = i_do_sl + i_stop_sl_s = i_do_sl_s + + res = {} + + if scope != "system": + res["insertion_index"] = len(sl_s[sl_s["timestamp"] < t]) + + def _get_legs(i_stop, _sl, time_desc): + l_sl = len(_sl) + if i_stop == 0 == l_sl - 1: + stop_leg_1 = 0 + stop_leg_2 = 0 + stop_leg_d = 0 + elif i_stop == 0: + stop_leg_1 = 0 + stop_leg_2 = space.d( + stop["location"], _sl.iloc[i_stop + 1]["location"] + ) + stop_leg_d = stop_leg_1 + elif i_stop == l_sl - 1: + stop_leg_1 = space.d( + _sl.iloc[i_stop - 1]["location"], stop["location"] + ) + stop_leg_2 = 0 + stop_leg_d = stop_leg_2 + else: + stop_leg_1 = space.d( + _sl.iloc[i_stop - 1]["location"], stop["location"] + ) + stop_leg_2 = space.d( + stop["location"], _sl.iloc[i_stop + 1]["location"] + ) + stop_leg_d = space.d( + _sl.iloc[i_stop - 1]["location"], + _sl.iloc[i_stop + 1]["location"], + ) + + stop_detour = stop_leg_1 + stop_leg_2 - stop_leg_d + + return { + f"leg_1_dist_{time_desc}_time": stop_leg_1, + f"leg_2_dist_{time_desc}_time": stop_leg_2, + f"leg_direct_dist_{time_desc}_time": stop_leg_d, + f"detour_dist_{time_desc}_time": stop_detour, + } + + res |= _get_legs(i_stop_sl, sl, "service") + res |= _get_legs(i_stop_sl_s, sl_s, "submission") + + if pu: + sl.drop([idx_pu, idx_do], inplace=True) + sl_s.drop([idx_pu, idx_do], inplace=True) + 
else: + sl.drop(idx_do, inplace=True) + sl_s.drop([idx_pu, idx_do], inplace=True) + + res[ + f"{'system_' if scope=='system' else ''}stoplist_length_submission_time" + ] = len(sl_s) + res[ + f"{'system_' if scope=='system' else ''}stoplist_length_service_time" + ] = len(sl) + + res[ + f"avg_{'system_' if scope=='system' else ''}segment_dist_submission_time" + ] = sl_s["dist_to_next"].mean() + res[ + f"avg_{'system_' if scope=='system' else ''}segment_time_submission_time" + ] = sl_s["time_to_next"].mean() + + res[ + f"avg_{'system_' if scope=='system' else ''}segment_dist_service_time" + ] = sl["dist_to_next"].mean() + res[ + f"avg_{'system_' if scope=='system' else ''}segment_time_service_time" + ] = sl["time_to_next"].mean() + + return res + + stops = stops.merge( + actual_stops.groupby("vehicle_id").apply( + lambda df: df.apply( + ft.partial(_properties_at_time, full_sl=df, scope="vehicle"), + axis=1, + result_type="expand", + ) + ), + left_index=True, + right_index=True, + how="left", + ) + + stops = stops.merge( + actual_stops.apply( + ft.partial(_properties_at_time, full_sl=stops, scope="system"), + axis=1, + result_type="expand", + ), + left_index=True, + right_index=True, + how="left", + ) + + with pd.option_context("mode.use_inf_as_na", True): + stops["relative_insertion_position"] = ( + stops["insertion_index"] / stops["stoplist_length_submission_time"] + ).fillna(1) + + return stops + + +def get_stops_and_requests_from_events_dataframe( + *, events_df: pd.DataFrame, space: TransportSpace +) -> tuple[pd.DataFrame, pd.DataFrame]: + """ + Prepare stops and requests dataframes from an events dataframe. + For details on the returned dataframes see doc on outside-facing `get_stops_and_requests`. 
+ + Parameters + ---------- + events_df + DataFrame indexed + space + + Returns + ------- + stops + dataframe indexed by `[vehicle_id, timestamp]` containing all stops + requests + dataframe indexed by `request_id` containing all requests + """ + stops_df = _create_stoplist_dataframe(evs=events_df) + requests_df = _create_transportation_requests_dataframe( + evs=events_df, stops=stops_df, space=space + ) + + try: + stops_df = _add_locations_to_stoplist_dataframe( + reqs=requests_df, stops=stops_df, space=space + ) + except KeyError: # TODO document this + pass + + return stops_df, requests_df + + +def get_stops_and_requests( + *, events: List[dict], space: TransportSpace +) -> tuple[pd.DataFrame, pd.DataFrame]: + """ + Prepare two DataFrames, containing stops and requests. + + # NOTE: This assumes occupancy delta of +1/-1, i.e. only single-customer requests. + # If the simulator should allow for multi-customer requests in the future, + # this must be changed. + # See also [issue #45](https://github.com/PhysicsOfMobility/ridepy/issues/45) + + The `stops` DataFrame returned has the following schema: + + .. 
code-block:: + + Column Dtype + ------ ----- + vehicle_id float64 + stop_id int64 + timestamp float64 + delta_occupancy float64 + request_id int64 + state_duration float64 + occupancy float64 + location object + dist_to_next float64 + time_to_next float64 + timestamp_submitted float64 + insertion_index float64 + leg_1_dist_service_time float64 + leg_2_dist_service_time float64 + leg_direct_dist_service_time float64 + detour_dist_service_time float64 + leg_1_dist_submission_time float64 + leg_2_dist_submission_time float64 + leg_direct_dist_submission_time float64 + detour_dist_submission_time float64 + stoplist_length_submission_time float64 + stoplist_length_service_time float64 + avg_segment_dist_submission_time float64 + avg_segment_time_submission_time float64 + avg_segment_dist_service_time float64 + avg_segment_time_service_time float64 + system_stoplist_length_submission_time float64 + system_stoplist_length_service_time float64 + avg_system_segment_dist_submission_time float64 + avg_system_segment_time_submission_time float64 + avg_system_segment_dist_service_time float64 + avg_system_segment_time_service_time float64 + relative_insertion_position float64 + + + The `requests` DataFrame returned has the following schema: + + .. 
code-block:: + + Column Dtype + ------ ----- + (submitted, timestamp) float64 + (submitted, origin) Union[float64, int, Tuple[float64]] + (submitted, destination) Union[float64, int, Tuple[float64]] + (submitted, pickup_timewindow_min) float64 + (submitted, pickup_timewindow_max) float64 + (submitted, delivery_timewindow_min) float64 + (submitted, delivery_timewindow_max) float64 + (submitted, direct_travel_distance) float64 + (submitted, direct_travel_time) float64 + (accepted, timestamp) float64 + (accepted, origin) Union[float64, int, Tuple[float64]] + (accepted, destination) Union[float64, int, Tuple[float64]] + (accepted, pickup_timewindow_min) float64 + (accepted, pickup_timewindow_max) float64 + (accepted, delivery_timewindow_min) float64 + (accepted, delivery_timewindow_max) float64 + (rejected, timestamp) float64 + (inferred, relative_travel_time) float64 + (inferred, travel_time) float64 + (inferred, waiting_time) float64 + (serviced, timestamp_dropoff) float64 + (serviced, timestamp_pickup) float64 + (serviced, vehicle_id) float64 + + Parameters + ---------- + events + list of all the events returned by the simulation + space + transportation space that was used for the simulations + + Returns + ------- + stops + dataframe indexed by `[vehicle_id, timestamp]` containing all stops + requests + dataframe indexed by `request_id` containing all requests + """ + + return get_stops_and_requests_from_events_dataframe( + events_df=_create_events_dataframe(events=events), space=space + ) + + +def get_vehicle_quantities(stops: pd.DataFrame, requests: pd.DataFrame) -> pd.DataFrame: + """ + Compute various quantities aggregated **per vehicle**. 
+ + Currently the following observables are returned: + + - avg_occupancy + - avg_segment_dist + - avg_segment_time + - total_dist_driven + - total_time_driven + - avg_direct_dist + - avg_direct_time + - total_direct_dist + - total_direct_time + - efficiency_dist + - efficiency_time + - avg_system_stoplist_length_service_time + - avg_system_stoplist_length_submission_time + - avg_stoplist_length_service_time + - avg_stoplist_length_submission_time + + Parameters + ---------- + stops + Stops dataframe + requests + Requests dataframe + + Returns + ------- + ``pd.DataFrame`` containing the aforementioned observables as columns, indexed by ``vehicle_id`` + """ + serviced_requests = ( + requests[requests[("rejected", "timestamp")].isna()] + if ("rejected", "timestamp") in requests + else requests + ) + + avg_occupancy = stops.groupby("vehicle_id").apply( + lambda gdf: (gdf["occupancy"] * gdf["state_duration"]).sum() + / gdf["state_duration"].sum() + ) + + avg_segment_dist = stops.groupby("vehicle_id")["dist_to_next"].mean() + avg_segment_time = stops.groupby("vehicle_id")["time_to_next"].mean() + + total_dist_driven = stops.groupby("vehicle_id")["dist_to_next"].sum() + total_time_driven = stops.groupby("vehicle_id")["time_to_next"].sum() + + avg_direct_dist = serviced_requests.groupby(("serviced", "vehicle_id")).apply( + lambda gdf: gdf.submitted.direct_travel_distance.mean() + ) + + avg_direct_time = serviced_requests.groupby(("serviced", "vehicle_id")).apply( + lambda gdf: gdf.submitted.direct_travel_time.mean() + ) + + total_direct_dist = serviced_requests.groupby(("serviced", "vehicle_id")).apply( + lambda gdf: gdf.submitted.direct_travel_distance.sum() + ) + + total_direct_time = serviced_requests.groupby(("serviced", "vehicle_id")).apply( + lambda gdf: gdf.submitted.direct_travel_time.sum() + ) + + efficiency_dist = total_direct_dist / total_dist_driven + efficiency_time = total_direct_time / total_time_driven + + avg_system_stoplist_length_service_time = 
stops.groupby("vehicle_id").apply( + lambda gdf: ( + gdf["system_stoplist_length_service_time"] * gdf["state_duration"] + ).sum() + / gdf["state_duration"].sum() + ) + avg_system_stoplist_length_submission_time = stops.groupby("vehicle_id").apply( + lambda gdf: ( + gdf["system_stoplist_length_submission_time"] * gdf["state_duration"] + ).sum() + / gdf["state_duration"].sum() + ) + + avg_stoplist_length_service_time = stops.groupby("vehicle_id").apply( + lambda gdf: (gdf["stoplist_length_service_time"] * gdf["state_duration"]).sum() + / gdf["state_duration"].sum() + ) + avg_stoplist_length_submission_time = stops.groupby("vehicle_id").apply( + lambda gdf: ( + gdf["stoplist_length_submission_time"] * gdf["state_duration"] + ).sum() + / gdf["state_duration"].sum() + ) + + return pd.DataFrame( + dict( + avg_occupancy=avg_occupancy, + avg_segment_dist=avg_segment_dist, + avg_segment_time=avg_segment_time, + total_dist_driven=total_dist_driven, + total_time_driven=total_time_driven, + avg_direct_dist=avg_direct_dist, + avg_direct_time=avg_direct_time, + total_direct_dist=total_direct_dist, + total_direct_time=total_direct_time, + efficiency_dist=efficiency_dist, + efficiency_time=efficiency_time, + avg_system_stoplist_length_service_time=avg_system_stoplist_length_service_time, + avg_system_stoplist_length_submission_time=avg_system_stoplist_length_submission_time, + avg_stoplist_length_service_time=avg_stoplist_length_service_time, + avg_stoplist_length_submission_time=avg_stoplist_length_submission_time, + ) + ).rename_axis("vehicle_id") + + +def get_system_quantities( + stops: pd.DataFrame, + requests: pd.DataFrame, + params: Optional[dict[str, dict[str, Any]]] = None, +) -> Dict[str, pd.DataFrame]: + """ + Compute various quantities aggregated for the entire simulation. 
+ + Currently the following observables are returned: + + - avg_occupancy + - avg_segment_dist + - avg_segment_time + - total_dist_driven + - total_time_driven + - avg_direct_dist + - avg_direct_time + - total_direct_dist + - total_direct_time + - efficiency_dist + - efficiency_time + - avg_system_stoplist_length_service_time + - avg_system_stoplist_length_submission_time + - avg_stoplist_length_service_time + - avg_stoplist_length_submission_time + - avg_waiting_time + - rejection_ratio + - median_stoplist_length + - avg_detour + - (n_vehicles) + - (request_rate) + - (velocity) + + Parameters + ---------- + stops + Stops dataframe + requests + Requests dataframe + params + Optional, adds more data (the fields in parentheses) to the result for convenience purposes + + Returns + ------- + dict containing the aforementioned observables + + Parameters + ---------- + stops + Stops dataframe + requests + Requests dataframe + + Returns + ------- + dict containing the aforementioned observables + """ + serviced_requests = ( + requests[requests[("rejected", "timestamp")].isna()] + if ("rejected", "timestamp") in requests + else requests + ) + + avg_occupancy = (stops["occupancy"] * stops["state_duration"]).sum() / stops[ + "state_duration" + ].sum() + + avg_segment_dist = stops["dist_to_next"].mean() + avg_segment_time = stops["time_to_next"].mean() + + total_dist_driven = stops["dist_to_next"].sum() + total_time_driven = stops["time_to_next"].sum() + + avg_direct_dist = serviced_requests[("submitted", "direct_travel_distance")].mean() + avg_direct_time = serviced_requests[("submitted", "direct_travel_time")].mean() + + total_direct_dist = serviced_requests[("submitted", "direct_travel_distance")].sum() + total_direct_time = serviced_requests[("submitted", "direct_travel_time")].sum() + + efficiency_dist = total_direct_dist / total_dist_driven + efficiency_time = total_direct_time / total_time_driven + + avg_waiting_time = serviced_requests.inferred.waiting_time.mean() + + 
rejection_ratio = 1 - len(serviced_requests) / len(requests) + + _stops = stops.dropna( + subset=( + "system_stoplist_length_service_time", + "system_stoplist_length_submission_time", + ) + ) + avg_system_stoplist_length_service_time = ( + _stops["system_stoplist_length_service_time"] * _stops["state_duration"] + ).sum() / _stops["state_duration"].sum() + + avg_system_stoplist_length_submission_time = ( + _stops["system_stoplist_length_submission_time"] * _stops["state_duration"] + ).sum() / _stops["state_duration"].sum() + + # not sure if it is necessary to do it again... + _stops = stops.dropna( + subset=("stoplist_length_service_time", "stoplist_length_submission_time") + ) + + avg_stoplist_length_submission_time = ( + _stops["stoplist_length_submission_time"] * _stops["state_duration"] + ).sum() / _stops["state_duration"].sum() + + avg_stoplist_length_service_time = ( + _stops["stoplist_length_service_time"] * _stops["state_duration"] + ).sum() / _stops["state_duration"].sum() + + stops["event_type"] = stops["delta_occupancy"].map({1.0: "pickup", -1.0: "dropoff"}) + + submission_events = requests.loc[ + :, [("submitted", "timestamp"), ("serviced", "vehicle_id")] + ].dropna() + submission_events.columns = ["timestamp", "vehicle_id"] + submission_events = ( + submission_events.reset_index() + .set_index(["vehicle_id", "timestamp"]) + .sort_index() + .assign(event_type="submission") + ) + + event_log = pd.concat( + [ + stops[["event_type", "request_id"]], + submission_events, + ], + axis="index", + ).sort_index() + + median_stoplist_length = ( + event_log["event_type"] + .map(dict(submission=2, pickup=-1, dropoff=-1)) + .cumsum() + .median() + ) + + avg_detour = requests["inferred", "relative_travel_time"].mean() + + res = dict( + avg_occupancy=avg_occupancy, + avg_segment_dist=avg_segment_dist, + avg_segment_time=avg_segment_time, + total_dist_driven=total_dist_driven, + total_time_driven=total_time_driven, + avg_direct_dist=avg_direct_dist, + 
avg_direct_time=avg_direct_time, + total_direct_dist=total_direct_dist, + total_direct_time=total_direct_time, + efficiency_dist=efficiency_dist, + efficiency_time=efficiency_time, + avg_system_stoplist_length_service_time=avg_system_stoplist_length_service_time, + avg_system_stoplist_length_submission_time=avg_system_stoplist_length_submission_time, + avg_stoplist_length_service_time=avg_stoplist_length_service_time, + avg_stoplist_length_submission_time=avg_stoplist_length_submission_time, + avg_waiting_time=avg_waiting_time, + rejection_ratio=rejection_ratio, + median_stoplist_length=median_stoplist_length, + avg_detour=avg_detour, + ) + + if params: + res |= dict( + n_vehicles=params["general"]["n_vehicles"], + request_rate=params["request_generator"]["rate"], + velocity=params["general"]["space"].velocity, + ) + + return res From 9440f04b5f4538456e4c48935d2d59992fde07a0 Mon Sep 17 00:00:00 2001 From: Felix Jung Date: Fri, 10 Jun 2022 17:20:31 +0200 Subject: [PATCH 11/14] refactor_analytics version --- ridepy/util/analytics/__init__.py | 70 +++++++++++++++++-------------- 1 file changed, 39 insertions(+), 31 deletions(-) diff --git a/ridepy/util/analytics/__init__.py b/ridepy/util/analytics/__init__.py index 8ba21da9..2058d5df 100644 --- a/ridepy/util/analytics/__init__.py +++ b/ridepy/util/analytics/__init__.py @@ -10,6 +10,8 @@ TransportSpace, ) from ridepy.events import VehicleStateEndEvent, VehicleStateBeginEvent +from ridepy.extras.io import create_params_json +from ridepy.util import make_sim_id def _create_events_dataframe(events: Iterable[dict]) -> pd.DataFrame: @@ -915,7 +917,7 @@ def get_system_quantities( - avg_stoplist_length_submission_time - avg_waiting_time - rejection_ratio - - median_stoplist_length + - avg_stoplist_length - avg_detour - (n_vehicles) - (request_rate) @@ -957,6 +959,8 @@ def get_system_quantities( avg_segment_dist = stops["dist_to_next"].mean() avg_segment_time = stops["time_to_next"].mean() + avg_segment_duration = 
stops["state_duration"].mean() + avg_idle_duration = (stops["state_duration"] - stops["time_to_next"]).mean() total_dist_driven = stops["dist_to_next"].sum() total_time_driven = stops["time_to_next"].sum() @@ -1004,33 +1008,42 @@ def get_system_quantities( # _stops["stoplist_length_service_time"] * _stops["state_duration"] # ).sum() / _stops["state_duration"].sum() - stops["event_type"] = stops["delta_occupancy"].map({1.0: "pickup", -1.0: "dropoff"}) - submission_events = requests.loc[ :, [("submitted", "timestamp"), ("serviced", "vehicle_id")] ].dropna() + submission_events.columns = ["timestamp", "vehicle_id"] submission_events = ( - submission_events.reset_index() + submission_events.reset_index(drop=True) .set_index(["vehicle_id", "timestamp"]) + .assign(n_stops_delta=2) + ) + + stops = stops.copy() + stops["n_stops_delta"] = stops["delta_occupancy"].map({1.0: -1, -1.0: -1, 0.0: 0}) + + stop_events = ( + stops[["timestamp", "n_stops_delta"]] + .reset_index("stop_id", drop=True) + .set_index("timestamp", append=True) + ) + + all_events = ( + pd.concat( + [ + submission_events, + stop_events, + ], + axis="index", + ) .sort_index() - .assign(event_type="submission") + .squeeze() ) - event_log = pd.concat( - [ - stops[["event_type", "request_id"]], - submission_events, - ], - axis="index", - ).sort_index() - - median_stoplist_length = ( - event_log["event_type"] - .map(dict(submission=2, pickup=-1, dropoff=-1)) - .cumsum() - .median() + avg_stoplist_length = ( + all_events.groupby("vehicle_id").cumsum().groupby("vehicle_id").mean().mean() ) + assert not avg_stoplist_length < 0 avg_detour = requests["inferred", "relative_travel_time"].mean() @@ -1051,8 +1064,6 @@ def get_system_quantities( total_system_time / n_vehicles ) - avg_relative_travel_time = requests[("inferred", "relative_travel_time")].mean() - res = {} if params: theoretical_request_rate = params.get("request_generator", {}).get( @@ -1071,20 +1082,20 @@ def get_system_quantities( velocity=velocity, 
load_theoretical=(theoretical_request_rate * avg_direct_dist_submitted) / (velocity * n_vehicles), + seat_capacity=params.get("general", {}).get("seat_capacity", np.nan), + load_submitted=(avg_request_rate_submitted * avg_direct_dist_submitted) + / (velocity * n_vehicles), + load_serviced=(avg_request_rate_serviced * avg_direct_dist_serviced) + / (velocity * n_vehicles), ) - load_submitted = (avg_request_rate_submitted * avg_direct_dist_submitted) / ( - res["velocity"] * res["n_vehicles"] - ) - load_serviced = (avg_request_rate_serviced * avg_direct_dist_serviced) / ( - res["velocity"] * res["n_vehicles"] - ) - res |= dict( avg_occupancy=avg_occupancy, n_vehicles_used=n_vehicles_used, avg_segment_dist=avg_segment_dist, avg_segment_time=avg_segment_time, + avg_segment_duration=avg_segment_duration, + avg_idle_duration=avg_idle_duration, total_dist_driven=total_dist_driven, total_time_driven=total_time_driven, avg_direct_dist=avg_direct_dist_serviced, @@ -1099,13 +1110,10 @@ def get_system_quantities( # avg_stoplist_length_submission_time=avg_stoplist_length_submission_time, avg_waiting_time=avg_waiting_time, rejection_ratio=rejection_ratio, - median_stoplist_length=median_stoplist_length, + avg_stoplist_length=avg_stoplist_length, avg_detour=avg_detour, avg_request_rate_submitted=avg_request_rate_submitted, avg_request_rate_serviced=avg_request_rate_serviced, - load_submitted=load_submitted, - load_serviced=load_serviced, - avg_relative_travel_time=avg_relative_travel_time, ) return res From 3f64c2015950c27b2bf8261a62e36079ae21defc Mon Sep 17 00:00:00 2001 From: Felix Jung Date: Mon, 13 Jun 2022 16:28:42 +0200 Subject: [PATCH 12/14] bugs fixed --- ridepy/extras/simulation_set.py | 73 +++++++++++++++++++-------------- 1 file changed, 42 insertions(+), 31 deletions(-) diff --git a/ridepy/extras/simulation_set.py b/ridepy/extras/simulation_set.py index e03ad987..9f5bf9d1 100644 --- a/ridepy/extras/simulation_set.py +++ b/ridepy/extras/simulation_set.py @@ -57,7 +57,7 
@@ read_params_json, ) -logger = logging.getLogger(__name__) +log = logging.getLogger(__name__) def freeze_two_level_dict( @@ -235,16 +235,16 @@ def perform_single_simulation( ): # assume that a previous simulation run already exists. this works because we write # to param_path *after* a successful simulation run. - logger.info( + log.info( f"Pre-existing param json exists for {params_json=} at {param_path=}, skipping simulation" ) return sim_id else: - logger.info( + log.info( f"No pre-existing param json exists for {params_json=} at {param_path=}, running simulation" ) if event_path.exists(): - logger.info( + log.info( f"Potentially incomplete simulation data exists at {event_path=}, this will be overwritten" ) event_path.unlink() @@ -382,11 +382,9 @@ class SimulationSet(MutableSet): """ _wrapped_methods = [ - "add", "copy", "difference", "difference_update", - "discard", "intersection", "intersection_update", "symmetric_difference", @@ -613,6 +611,9 @@ def __init__( self.default_base_params, base_params ) + if single_combinations: + single_combinations = list(single_combinations) + # make parameters immutable if single_combinations: @@ -633,9 +634,7 @@ def __init__( ) if single_combinations is not None: - self._single_combinations = set( - map(freeze_two_level_dict, single_combinations) - ) + self._single_combinations = set(single_combinations) elif not self.compute_cardinality_product_params_zip_params( product_params=product_params, zip_params=zip_params ): @@ -648,17 +647,20 @@ def __init__( self._update_parameter_combinations() - self._simulation_ids = None + self._simulation_ids = { + make_sim_id(params_json=create_params_json(params=params)) + for params in self.param_combinations + } self.system_quantities_path = None @property - def simulation_ids(self) -> list[str]: + def simulation_ids(self) -> set[str]: """ Get simulation IDs. 
""" # protect simulation ids - return self._simulation_ids if self._simulation_ids is not None else [] + return self._simulation_ids @property def param_paths(self) -> list[Path]: @@ -810,7 +812,7 @@ def run(self, dry_run=False): If True, do not actually simulate. """ - self._simulation_ids = simulate_parameter_combinations( + simulate_parameter_combinations( param_combinations=iter(self), data_dir=self.data_dir, debug=self.debug, @@ -920,28 +922,37 @@ def __getattr__(self, attr): def _wrap_method(method, o): # Note that this uses self's properties for everything except the simulation # parameters. This is intended behavior. - return lambda *args, **kwargs: SimulationSet( - single_combinations=getattr(o.param_combinations, method)(*args, **kwargs), - data_dir=o.data_dir, - cython=o.use_cython, - debug=o.debug, - max_workers=o.max_workers, - process_chunksize=o.process_chunksize, - jsonl_chunksize=o.jsonl_chunksize, - event_path_suffix=o._event_path_suffix, - param_path_suffix=o._param_path_suffix, - validate=o.validated, - ) + def wrapped(*args): + log.debug(f"Was told to {method}") + args = [arg.param_combinations for arg in args] + return SimulationSet( + single_combinations=getattr(o.param_combinations, method)(*args), + data_dir=o.data_dir, + cython=o.use_cython, + debug=o.debug, + max_workers=o.max_workers, + process_chunksize=o.process_chunksize, + jsonl_chunksize=o.jsonl_chunksize, + event_path_suffix=o._event_path_suffix, + param_path_suffix=o._param_path_suffix, + validate=o.validated, + ) + + return wrapped def add(self, item): - # This is dynamically overwritten in SimulationSet._wrap_method, - # but is necessary here to be able to instantiate the class. - ... 
+ log.debug(f"Was told to add") + self.param_combinations.add(item) + self._simulation_ids.add( + make_sim_id(params_json=create_params_json(params=item)) + ) def discard(self, item): - # This is dynamically overwritten in SimulationSet._wrap_method - # but is necessary here to be able to instantiate the class. - ... + log.debug(f"Was told to discard") + self.param_combinations.discard(item) + self._simulation_ids.discard( + make_sim_id(params_json=create_params_json(params=item)) + ) def __str__(self): return ( From 3a0e188a42211a4f4dc78e5f7bad83986ae79e6d Mon Sep 17 00:00:00 2001 From: Felix Jung Date: Mon, 13 Jun 2022 17:35:21 +0200 Subject: [PATCH 13/14] more updates to sim set --- ridepy/extras/simulation_set.py | 78 ++++++++++++++++++++++----------- 1 file changed, 53 insertions(+), 25 deletions(-) diff --git a/ridepy/extras/simulation_set.py b/ridepy/extras/simulation_set.py index 9f5bf9d1..925bab42 100644 --- a/ridepy/extras/simulation_set.py +++ b/ridepy/extras/simulation_set.py @@ -539,6 +539,9 @@ def __init__( self.jsonl_chunksize = jsonl_chunksize self.data_dir = Path(data_dir) + self._param_data_dir = self.data_dir + self._event_data_dir = self.data_dir + self._event_path_suffix = event_path_suffix self._param_path_suffix = param_path_suffix @@ -652,8 +655,6 @@ def __init__( for params in self.param_combinations } - self.system_quantities_path = None - @property def simulation_ids(self) -> set[str]: """ @@ -662,26 +663,6 @@ def simulation_ids(self) -> set[str]: # protect simulation ids return self._simulation_ids - @property - def param_paths(self) -> list[Path]: - """ - Get list of JSON parameter files. - """ - return [ - self.data_dir / f"{simulation_id}{self._param_path_suffix}" - for simulation_id in self.simulation_ids - ] - - @property - def event_paths(self) -> list[Path]: - """ - Get list of resulting output event JSON Lines file paths. 
- """ - return [ - self.data_dir / f"{simulation_id}{self._event_path_suffix}" - for simulation_id in self.simulation_ids - ] - @property def base_params(self): return self._base_params @@ -831,9 +812,7 @@ def run_analytics( requests_path_suffix: str = "_requests.pq", only_stops_and_requests: bool = False, # only compute stops and requests vehicle_quantities_path_suffix: str = "_vehicle_quantities.pq", - system_quantities_filename: str = "system_quantities.pq", ): - self.system_quantities_path = self.data_dir / system_quantities_filename if not self.simulation_ids: warnings.warn( @@ -845,7 +824,7 @@ def run_analytics( else: compute_vehicle_quantities = True compute_system_quantities = ( - not self.system_quantities_path.exists() or update_existing + self.system_quantities_path is None or update_existing ) with loky.get_reusable_executor(max_workers=self.max_workers) as executor: @@ -968,5 +947,54 @@ def __str__(self): f"validate={self.validated!r})" ) + def get_parameter(self, parameter: tuple[str, str]) -> pd.Series: + index = self.simulation_ids + values = [] + for sim_id in index: + params = get_params( + directory=self._param_data_dir, + sim_id=sim_id, + param_path_suffix=self._param_path_suffix, + ) + values.append(params[parameter[0]][parameter[1]]) + + return pd.Series(values, index=index, name=parameter) + # def __repr__(self): # return f"SimulationSet(...)" + @property + def system_quantities_path(self): + if (self.hpc_output_dir_local / "system_quantities.pq").exists(): + return self.hpc_output_dir_local / "system_quantities.pq" + + @property + def param_paths(self) -> list[Path]: + """ + Get list of JSON parameter files. 
+ """ + res = [] + + for simulation_id in self.simulation_ids: + param_path = ( + self.hpc_input_dir_local / f"{simulation_id}{self._param_path_suffix}" + ) + if param_path.exists(): + res.append(param_path) + + return res + + @property + def event_paths(self) -> list[Path]: + """ + Get list of resulting output event JSON Lines file paths. + """ + res = [] + + for simulation_id in self.simulation_ids: + event_path = ( + self.hpc_output_dir_local / f"{simulation_id}{self._event_path_suffix}" + ) + if event_path.exists(): + res.append(event_path) + + return res From 2f02cb8de18072fed54cd8eb2907871ce7a1fefd Mon Sep 17 00:00:00 2001 From: Felix Jung Date: Wed, 29 Jun 2022 18:59:47 +0200 Subject: [PATCH 14/14] removed bogus hpc stuff from sim set --- ridepy/extras/simulation_set.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/ridepy/extras/simulation_set.py b/ridepy/extras/simulation_set.py index 925bab42..b43608b3 100644 --- a/ridepy/extras/simulation_set.py +++ b/ridepy/extras/simulation_set.py @@ -962,10 +962,11 @@ def get_parameter(self, parameter: tuple[str, str]) -> pd.Series: # def __repr__(self): # return f"SimulationSet(...)" + @property def system_quantities_path(self): - if (self.hpc_output_dir_local / "system_quantities.pq").exists(): - return self.hpc_output_dir_local / "system_quantities.pq" + if (self.data_dir / "system_quantities.pq").exists(): + return self.data_dir / "system_quantities.pq" @property def param_paths(self) -> list[Path]: @@ -975,9 +976,7 @@ def param_paths(self) -> list[Path]: res = [] for simulation_id in self.simulation_ids: - param_path = ( - self.hpc_input_dir_local / f"{simulation_id}{self._param_path_suffix}" - ) + param_path = self.data_dir / f"{simulation_id}{self._param_path_suffix}" if param_path.exists(): res.append(param_path) @@ -991,9 +990,7 @@ def event_paths(self) -> list[Path]: res = [] for simulation_id in self.simulation_ids: - event_path = ( - self.hpc_output_dir_local / 
f"{simulation_id}{self._event_path_suffix}" - ) + event_path = self.data_dir / f"{simulation_id}{self._event_path_suffix}" if event_path.exists(): res.append(event_path)