From c48211941a8fc4cec108bc9b5f4240100254a3a6 Mon Sep 17 00:00:00 2001 From: Adam Li Date: Wed, 2 Nov 2022 22:29:50 -0400 Subject: [PATCH 1/5] Add initial working tsPC skeleton method and WIP tsPC method itself --- doc/references.bib | 8 + dodiscover/_protocol.py | 25 +- dodiscover/ci/oracle.py | 6 +- dodiscover/ci/simulate.py | 6 + dodiscover/constraint/fcialg.py | 4 +- dodiscover/constraint/skeleton.py | 125 ++---- dodiscover/constraint/timeseries/__init__.py | 1 + dodiscover/constraint/timeseries/skeleton.py | 361 ++++++++++++++++++ dodiscover/constraint/timeseries/tsfcialg.py | 0 dodiscover/constraint/timeseries/tspcalg.py | 168 ++++++++ dodiscover/constraint/timeseries/utils.py | 26 ++ dodiscover/constraint/utils.py | 81 ++++ dodiscover/context.py | 202 +++++++++- dodiscover/context_builder.py | 192 ++++++++-- dodiscover/typing.py | 1 + dodiscover/utils.py | 10 + pyproject.toml | 4 +- tests/unit_tests/constraint/test_skeleton.py | 2 +- .../constraint/timeseries/test_skeleton.py | 240 ++++++++++++ tests/unit_tests/test_context_builder.py | 38 +- 20 files changed, 1356 insertions(+), 144 deletions(-) create mode 100644 dodiscover/constraint/timeseries/__init__.py create mode 100644 dodiscover/constraint/timeseries/skeleton.py create mode 100644 dodiscover/constraint/timeseries/tsfcialg.py create mode 100644 dodiscover/constraint/timeseries/tspcalg.py create mode 100644 dodiscover/constraint/timeseries/utils.py create mode 100644 dodiscover/utils.py create mode 100644 tests/unit_tests/constraint/timeseries/test_skeleton.py diff --git a/doc/references.bib b/doc/references.bib index 10f0e8db..289508d1 100644 --- a/doc/references.bib +++ b/doc/references.bib @@ -16,6 +16,14 @@ @article{Colombo2012 url = {https://doi.org/10.1214/11-AOS940} } +@article{Entner2010causal, + title = {On causal discovery from time series data using FCI}, + author = {Entner, Doris and Hoyer, Patrik O}, + journal = {Probabilistic graphical models}, + pages = {121--128}, + year = {2010} +} + @article{Lopez2016revisiting, title = {Revisiting classifier two-sample tests}, author = {Lopez-Paz, David and Oquab, Maxime}, diff --git a/dodiscover/_protocol.py b/dodiscover/_protocol.py index bcc7a470..63208975 100644 --- a/dodiscover/_protocol.py +++ b/dodiscover/_protocol.py @@ -1,7 +1,9 @@ -from typing import Dict, FrozenSet, Iterable, Protocol +from typing import Dict, FrozenSet, Iterable, List, Protocol import networkx as nx +from dodiscover.typing import Column + class Graph(Protocol): """Protocol for graphs to work with dodiscover algorithms.""" @@ -27,7 +29,7 @@ def remove_node(self, u) -> None: """Remove a node from the graph.""" pass - def remove_edges_from(self, edges) -> None: + def remove_edges_from(self, edges: List) -> None: """Remove a set of edges from the graph.""" pass @@ -48,6 +50,25 @@ def to_undirected(self) -> nx.Graph: pass +class TimeSeriesGraph(Graph, Protocol): + """A protocol for time-series graphs.""" + + def lagged_neighbors(self, u: Column) -> Iterable: + """Return neighbors of u that are in a previous time point.""" + pass + + def contemporaneous_neighbors(self, u: Column) -> Iterable: + """Return neighbors of u that are in the same time point.""" + pass + + # TODO: refactor to + # 1. remove_forward_homologous_edges(self, u, v) + # 2. 
remove_backward_homologous_edges(self, u, v) + def set_auto_removal(self, auto: bool) -> None: + """Specify how to auto-remove homologous edges.""" + pass + + class EquivalenceClass(Graph, Protocol): """Protocol for equivalence class of graphs.""" diff --git a/dodiscover/ci/oracle.py b/dodiscover/ci/oracle.py index f0da16dc..a0c62ed0 100644 --- a/dodiscover/ci/oracle.py +++ b/dodiscover/ci/oracle.py @@ -1,4 +1,4 @@ -from typing import Optional, Set +from typing import Optional, Set, Union import networkx as nx import numpy as np @@ -6,7 +6,7 @@ from dodiscover.typing import Column -from .._protocol import Graph +from .._protocol import Graph, TimeSeriesGraph from .base import BaseConditionalIndependenceTest @@ -23,7 +23,7 @@ class Oracle(BaseConditionalIndependenceTest): _allow_multivariate_input: bool = True - def __init__(self, graph: Graph) -> None: + def __init__(self, graph: Union[Graph, TimeSeriesGraph]) -> None: self.graph = graph def test( diff --git a/dodiscover/ci/simulate.py b/dodiscover/ci/simulate.py index 8b030f53..e235b799 100644 --- a/dodiscover/ci/simulate.py +++ b/dodiscover/ci/simulate.py @@ -103,3 +103,9 @@ def nonlinear_additive_gaussian( Y = nonlinear_func(freq * (2 * Axy * X + Z * Azy + std * Y_noise + cause_var)) return X, Y, Z + + +def vector_auto_regressive_from_summary( + summary_G, max_lag=1, n_times=1000, random_state: int = None +): + pass diff --git a/dodiscover/constraint/fcialg.py b/dodiscover/constraint/fcialg.py index 06b87869..2a52c3e1 100644 --- a/dodiscover/constraint/fcialg.py +++ b/dodiscover/constraint/fcialg.py @@ -685,7 +685,9 @@ def learn_skeleton( d.pop("test_stat") if "pvalue" in d: d.pop("pvalue") - context = make_context(context).graph(new_init_graph).state_variable("PAG", pag).build() + context = ( + make_context(context).init_graph(new_init_graph).state_variable("PAG", pag).build() + ) # # now compute all possibly d-separating sets and learn a better skeleton skel_alg = LearnSemiMarkovianSkeleton( diff --git a/dodiscover/constraint/skeleton.py b/dodiscover/constraint/skeleton.py index b0512345..af7bd19a 100644 --- a/dodiscover/constraint/skeleton.py +++ b/dodiscover/constraint/skeleton.py @@ -1,7 +1,7 @@ import logging from collections import defaultdict -from itertools import chain, combinations -from typing import Iterable, Optional, Set, SupportsFloat, Tuple, Union +from itertools import chain +from typing import Optional, Set, Tuple import networkx as nx import numpy as np @@ -13,86 +13,11 @@ from ..context import Context from ..context_builder import make_context +from .utils import _find_neighbors_along_path, _iter_conditioning_set logger = logging.getLogger() -def _iter_conditioning_set( - possible_variables: Iterable, - x_var: Union[SupportsFloat, str], - y_var: Union[SupportsFloat, str], - size_cond_set: int, -) -> Iterable[Set]: - """Iterate function to generate the conditioning set. - - Parameters - ---------- - possible_variables : iterable - A set/list/dict of possible variables to consider for the conditioning set. - This can be for example, the current adjacencies. - x_var : node - The node for the 'x' variable. - y_var : node - The node for the 'y' variable. - size_cond_set : int - The size of the conditioning set to consider. If there are - less adjacent variables than this number, then all variables will be in the - conditioning set. - - Yields - ------ - Z : set - The set of variables for the conditioning set. 
- """ - exclusion_set = {x_var, y_var} - - all_adj_excl_current = [p for p in possible_variables if p not in exclusion_set] - - # loop through all possible combinations of the conditioning set size - for cond in combinations(all_adj_excl_current, size_cond_set): - cond_set = set(cond) - yield cond_set - - -def _find_neighbors_along_path(G: nx.Graph, start, end) -> Set: - """Find neighbors that are along a path from start to end. - - Parameters - ---------- - G : nx.Graph - The graph. - start : Node - The starting node. - end : Node - The ending node. - - Returns - ------- - nbrs : Set - The set of neighbors that are also along a path towards - the 'end' node. - """ - - def _assign_weight(u, v, edge_attr): - if u == node or v == node: - return np.inf - else: - return 1 - - nbrs = set() - for node in G.neighbors(start): - if not G.has_edge(start, node): - raise RuntimeError(f"{start} and {node} are not connected, but they are assumed to be.") - - # find a path from start node to end - path = nx.shortest_path(G, source=node, target=end, weight=_assign_weight) - if len(path) > 0: - if start in path: - raise RuntimeError("There is an error with the input. This is not possible.") - nbrs.add(node) - return nbrs - - class LearnSkeleton: """Learn a skeleton graph from a Markovian causal model. @@ -268,6 +193,9 @@ def _initialize_params(self) -> None: else: self.max_combinations_ = self.max_combinations + # track progress of the algorithm for which edges to remove to ensure stability + self.remove_edges = set() + def evaluate_edge( self, data: pd.DataFrame, X: Column, Y: Column, Z: Optional[Set[Column]] = None ) -> Tuple[float, float]: @@ -297,6 +225,24 @@ def evaluate_edge( self.n_ci_tests += 1 return test_stat, pvalue + def _preprocess_data(self, data: pd.DataFrame, context: Context): + if set(context.observed_variables) != set(data.columns.values): + raise RuntimeError( + "The observed variable names in data and context do not match: \n" + f"- {context.observed_variables} \n" + f"- {data.columns}" + ) + + edge_attrs = set( + chain.from_iterable(d.keys() for *_, d in context.init_graph.edges(data=True)) + ) + if "test_stat" in edge_attrs or "pvalue" in edge_attrs: + raise RuntimeError( + "Running skeleton discovery with adjacency graph " + "with 'test_stat' or 'pvalue' is not supported yet." + ) + return data, context + def fit(self, data: pd.DataFrame, context: Context) -> None: """Run structure learning to learn the skeleton of the causal graph. @@ -307,28 +253,19 @@ def fit(self, data: pd.DataFrame, context: Context) -> None: context : Context A context object. """ + data, context = self._preprocess_data(data, context) self.context = make_context(context).build() + # initialize learning parameters + self._initialize_params() + # get the initialized graph adj_graph = self.context.init_graph X = data - # track progress of the algorithm for which edges to remove to ensure stability - self.remove_edges = set() - - # initialize learning parameters - self._initialize_params() - # the size of the conditioning set will start off at the minimum size_cond_set = self.min_cond_set_size_ - edge_attrs = set(chain.from_iterable(d.keys() for *_, d in adj_graph.edges(data=True))) - if "test_stat" in edge_attrs or "pvalue" in edge_attrs: - raise RuntimeError( - "Running skeleton discovery with adjacency graph " - "with 'test_stat' or 'pvalue' is not supported yet." 
- ) - # store the absolute value of test-statistic values and pvalue for # every single candidate parent-child edge (X -> Y) nx.set_edge_attributes(adj_graph, np.inf, "test_stat") @@ -372,12 +309,6 @@ def fit(self, data: pd.DataFrame, context: Context) -> None: skeleton_method=self.skeleton_method, ) - logger.debug( - f"Adj({x_var}) without {y_var} with size={len(possible_adjacencies) - 1} " - f"with p={size_cond_set}. The possible variables to condition on are: " - f"{possible_variables}." - ) - # check that number of adjacencies is greater then the # cardinality of the conditioning set if len(possible_variables) < size_cond_set: diff --git a/dodiscover/constraint/timeseries/__init__.py b/dodiscover/constraint/timeseries/__init__.py new file mode 100644 index 00000000..91adae29 --- /dev/null +++ b/dodiscover/constraint/timeseries/__init__.py @@ -0,0 +1 @@ +from .skeleton import LearnTimeSeriesSkeleton diff --git a/dodiscover/constraint/timeseries/skeleton.py b/dodiscover/constraint/timeseries/skeleton.py new file mode 100644 index 00000000..9972f7aa --- /dev/null +++ b/dodiscover/constraint/timeseries/skeleton.py @@ -0,0 +1,361 @@ +import logging +from itertools import chain +from typing import Iterator, Optional, Set, Tuple + +import networkx as nx +import numpy as np +import pandas as pd + +from dodiscover._protocol import TimeSeriesGraph +from dodiscover.ci import BaseConditionalIndependenceTest +from dodiscover.constraint.config import SkeletonMethods +from dodiscover.context import ContextType +from dodiscover.typing import Column, SeparatingSet + +from ...context_builder import make_ts_context +from ..skeleton import LearnSkeleton +from ..utils import _iter_conditioning_set +from .utils import convert_ts_df_to_multiindex + +logger = logging.getLogger() + + +def nodes_in_time_order(G: TimeSeriesGraph) -> Iterator: + """Return nodes from G in time order starting from max-lag to t=0.""" + for t in range(G.max_lag, -1, -1): + for node in G.nodes_at(t): + yield node + + +class LearnTimeSeriesSkeleton(LearnSkeleton): + """Learn a skeleton time-series graph from a Markovian causal model. + + Parameters + ---------- + ci_estimator : BaseConditionalIndependenceTest + The conditional independence test function. + sep_set : dictionary of dictionary of list of set + Mapping node to other nodes to separating sets of variables. + If ``None``, then an empty dictionary of dictionary of list of sets + will be initialized. + alpha : float, optional + The significance level for the conditional independence test, by default 0.05. + min_cond_set_size : int + The minimum size of the conditioning set, by default 0. The number of variables + used in the conditioning set. + max_cond_set_size : int, optional + Maximum size of the conditioning set, by default None. Used to limit + the computation spent on the algorithm. + max_combinations : int, optional + The maximum number of conditional independence tests to run from the set + of possible conditioning sets. By default None, which means the algorithm will + check all possible conditioning sets. If ``max_combinations=n`` is set, then + for every conditioning set size, 'p', there will be at most 'n' CI tests run + before the conditioning set size 'p' is incremented. For controlling the size + of 'p', see ``min_cond_set_size`` and ``max_cond_set_size``. This can be used + in conjunction with ``keep_sorted`` parameter to only test the "strongest" + dependences. + skeleton_method : SkeletonMethods + The method to use for testing conditional independence. 
Must be one of + ('complete', 'neighbors', 'neighbors_path'). See Notes for more details. + keep_sorted : bool + Whether or not to keep the considered conditioning set variables in sorted + dependency order. If True (default) will sort the existing dependencies of each variable + by its dependencies from strongest to weakest (i.e. largest CI test statistic value + to lowest). This can be used in conjunction with ``max_combinations`` parameter + to only test the "strongest" dependences. + separate_lag_phase : bool, + Whether or not to separate the lagged and contemporaneous skeleton learning + phase. If False (default), then will test all CI dependences in the same loop. + contemporaneous_edges : bool, + Whether or not there are contemporaneous edges (i.e. edges that occur at the same time point + between two nodes). By default is True. + """ + + def __init__( + self, + ci_estimator: BaseConditionalIndependenceTest, + sep_set: Optional[SeparatingSet] = None, + alpha: float = 0.05, + min_cond_set_size: int = 0, + max_cond_set_size: Optional[int] = None, + max_combinations: Optional[int] = None, + skeleton_method: SkeletonMethods = SkeletonMethods.NBRS, + keep_sorted: bool = False, + latent_confounding: bool = False, + max_path_length: Optional[int] = None, + separate_lag_phase: bool = False, + contemporaneous_edges: bool = True, + **ci_estimator_kwargs, + ) -> None: + super().__init__( + ci_estimator, + sep_set, + alpha, + min_cond_set_size, + max_cond_set_size, + max_combinations, + skeleton_method, + keep_sorted, + **ci_estimator_kwargs, + ) + self.max_path_length = max_path_length + self.latent_confounding = latent_confounding + self.separate_lag_phase = separate_lag_phase + self.contemporaneous_edges = contemporaneous_edges + + def evaluate_edge( + self, data: pd.DataFrame, X: Column, Y: Column, Z: Optional[Set[Column]] = None + ) -> Tuple[float, float]: + """Evaluate an edge, but the data frame has columns as '_'.""" + return super().evaluate_edge(data, X, Y, Z) + + def _learn_skeleton( + self, data: pd.DataFrame, adj_graph: TimeSeriesGraph, nbr_search: str = "all" + ): + """Private method to learn a skeleton""" + # the size of the conditioning set will start off at the minimum + size_cond_set = self.min_cond_set_size_ + + # If there is latent confounding, we need to test all nodes starting from + # max-lag. 
Otherwise, we can restrict the tests to nodes at time t=0.
+        if self.latent_confounding:
+            # with latent confounders, nodes at every lag must be tested
+            testable_nodes = list(nodes_in_time_order(adj_graph))
+
+            # to do causal discovery of time-series graphs,
+            # homologous edges should not be removed automatically
+            adj_graph.set_auto_removal("forwards")
+        else:
+            # without latent confounders, only nodes at time t=0 must be tested
+            testable_nodes = set(adj_graph.nodes_at(0))
+
+            # to do causal discovery of time-series graphs,
+            # homologous edges should not be removed automatically
+            adj_graph.set_auto_removal("backwards")
+
+        logger.debug(f"Testable nodes: {testable_nodes}")
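+        # NOTE: a sketch of the intended auto-removal semantics (this assumes
+        # the pywhy-graphs time-series API): with "backwards" removal, deleting
+        # the edge ('x', -1) -- ('y', 0) also deletes its homologous copies
+        # further back in time, e.g. ('x', -2) -- ('y', -1) when max_lag >= 2,
+        # which is why only edges into time t=0 need to be tested explicitly.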
+        # Outer loop: iterate over 'size_cond_set' until stopping criterion is met
+        # - 'size_cond_set' > 'max_cond_set_size' or
+        # - All (X, Y) pairs have candidate conditioning sets of size < 'size_cond_set'
+        while True:
+            cont = False
+            # initialize set of edges to remove at the end of every loop
+            self.remove_edges = set()
+
+            # loop through every node
+            # Note: in time-series graphs, all nodes are defined as a 2-tuple
+            # of (<variable>, <lag>)
+            for y_var in testable_nodes:
+                # without latent confounding, we only consider variables at the required lag t=0
+                if not self.latent_confounding and y_var[1] != 0:
+                    continue
+
+                # TODO: need more efficient way of querying all possible edges
+                if nbr_search == "all":
+                    lagged_nbrs = adj_graph.lagged_neighbors(y_var)
+                    contemporaneous_nbrs = adj_graph.contemporaneous_neighbors(y_var)
+                    possible_adjacencies = set(lagged_nbrs).union(set(contemporaneous_nbrs))
+                elif nbr_search == "lagged":
+                    possible_adjacencies = adj_graph.lagged_neighbors(y_var)
+                elif nbr_search == "contemporaneous":
+                    possible_adjacencies = adj_graph.contemporaneous_neighbors(y_var)
+
+                logger.info(f"Considering node {y_var}...\n\n")
+
+                for x_var in possible_adjacencies:
+                    # a node cannot be a parent to itself in DAGs
+                    if y_var == x_var:
+                        continue
+
+                    # ignore fixed edges
+                    if (x_var, y_var) in self.context.included_edges.edges:
+                        continue
+
+                    # compute the possible variables used in the conditioning set
+                    possible_variables = self._compute_candidate_conditioning_sets(
+                        adj_graph,
+                        y_var,
+                        x_var,
+                        skeleton_method=self.skeleton_method,
+                    )
+
+                    logger.debug(
+                        f"Adj({x_var}) without {y_var} with size={len(possible_adjacencies) - 1} "
+                        f"with p={size_cond_set}. The possible variables to condition on are: "
+                        f"{possible_variables}."
+                    )
+
+                    # check that the number of adjacencies is greater than the
+                    # cardinality of the conditioning set
+                    if len(possible_variables) < size_cond_set:
+                        logger.debug(
+                            f"\n\nBreaking for {x_var}, {y_var}, {len(possible_adjacencies)}, "
+                            f"{size_cond_set}, {possible_variables}"
+                        )
+                        continue
+                    else:
+                        cont = True
+
+                    # generate iterator through the conditioning sets
+                    conditioning_sets = _iter_conditioning_set(
+                        possible_variables=possible_variables,
+                        x_var=x_var,
+                        y_var=y_var,
+                        size_cond_set=size_cond_set,
+                    )
+
+                    # now iterate through the possible parents
+                    for comb_idx, cond_set in enumerate(conditioning_sets):
+                        # check the number of combinations of possible parents we have tried
+                        # to use as a separating set
+                        if (
+                            self.max_combinations_ is not None
+                            and comb_idx >= self.max_combinations_
+                        ):
+                            break
+
+                        # compute conditional independence test
+                        test_stat, pvalue = self.evaluate_edge(data, x_var, y_var, set(cond_set))
+
+                        # if any "independence" is found through inability to reject
+                        # the null hypothesis, then we will break the loop comparing X and Y
+                        # and say X and Y are conditionally independent given 'cond_set'
+                        if pvalue > self.alpha:
+                            logger.debug(f"Removing {x_var} - {y_var} with {cond_set}.")
+                            break
+
+                    # post-process the CI test results
+                    removed_edge = self._postprocess_ci_test(
+                        adj_graph, x_var, y_var, cond_set, test_stat, pvalue
+                    )
+
+                    # summarize the comparison of XY
+                    self._summarize_xy_comparison(x_var, y_var, removed_edge, pvalue)
+
+            # finally remove edges after performing
+            # conditional independence tests
+            logger.info("\n---------------------------------------------")
+            logger.info(f"For p = {size_cond_set}, removing all edges: {self.remove_edges}")
+
+            # TODO: find a cleaner way to deduplicate edges marked for removal;
+            # an edge may be marked in both orientations, so keep only one of them
+            dedup_edges = set()
+            for u, v in self.remove_edges:
+                # the opposite orientation is already in there...
+                if (v, u) in dedup_edges:
+                    continue
+                dedup_edges.add((u, v))
+            self.remove_edges = dedup_edges
+
+            # Remove non-significant links
+            # Note: Removing edges at the end ensures "stability" of the algorithm
+            # with respect to the randomness choice of pairs of edges considered in the inner loop
+            adj_graph.remove_edges_from(self.remove_edges)
+
+            # increment the conditioning set size
+            size_cond_set += 1
+
+            # only allow conditioning set sizes up to maximum set number
+            if size_cond_set > self.max_cond_set_size_ or cont is False:
+                break
+
+        return adj_graph
+
+    def fit(self, data: pd.DataFrame, context: ContextType) -> None:
+        """Run structure learning to learn the skeleton of the causal graph.
+
+        Parameters
+        ----------
+        data : pd.DataFrame
+            The data to learn the causal graph from.
+        context : Context
+            A context object.
+        """
+        if self.separate_lag_phase and not self.contemporaneous_edges:
+            raise ValueError(
+                "Contemporaneous edges are assumed to be absent, but you also "
+                "specified separating the lagged and contemporaneous phases."
+ ) + + data, context = self._preprocess_data(data, context) + self.context = ( + make_ts_context(context).observed_variables(data.columns.values.tolist()).build() + ) + + # initialize learning parameters + self._initialize_params() + + # get the initialized graph + adj_graph: TimeSeriesGraph = self.context.init_graph + + # store the absolute value of test-statistic values and pvalue for + # every single candidate parent-child edge (X -> Y) + nx.set_edge_attributes(adj_graph, np.inf, "test_stat") + nx.set_edge_attributes(adj_graph, -1e-5, "pvalue") + + logger.info( + f"\n\nRunning skeleton phase with: \n" + f"max_combinations: {self.max_combinations_},\n" + f"min_cond_set_size: {self.min_cond_set_size_},\n" + f"max_cond_set_size: {self.max_cond_set_size_},\n" + ) + + # learn the skeleton graph + if self.separate_lag_phase: + # first do the lagged search + adj_graph = self._learn_skeleton(data, adj_graph, nbr_search="lagged") + + # then do contemporaneous + adj_graph = self._learn_skeleton(data, adj_graph, nbr_search="contemporaneous") + else: + adj_graph = self._learn_skeleton(data, adj_graph, nbr_search="all") + + # possibly remove all contemporaneous edges if there is + # no assumption of contemporaneous causal structure + # TODO: can make sure we don't inner-test the CI relations between contemporaneous edges + if not self.contemporaneous_edges: + auto_removal = adj_graph._auto_removal + adj_graph.set_auto_removal(None) + for u, v in adj_graph.edges: + if u[1] == v[1]: + adj_graph.remove_edge(u, v) + adj_graph.set_auto_removal(auto_removal) + + self.adj_graph_ = adj_graph + + def _preprocess_data(self, data: pd.DataFrame, context: ContextType): + """Preprocess data and context. + + In time-series causal discovery, dataframe of the shape (n_times, n_signals) + are re-formatted to a dataframe with (n_samples, n_signals x lag_points) + with a multi-index column with variable names at level 1 and lag time + points at level 2. For example, a multi-index column of ``[('x', 0), ('x', -1)]`` + would indicate the first column is the variable x at lag 0 and the second + column is variable x at lag 1. + """ + # first reformat data + max_lag = context.max_lag + data = convert_ts_df_to_multiindex(data, max_lag) + + # run preprocessing + if set(context.observed_variables) != set(data.columns.get_level_values("variable")): + raise RuntimeError( + "The observed variable names in data and context do not match: \n" + f"- {context.observed_variables} \n" + f"- {data.columns}" + ) + edge_attrs = set( + chain.from_iterable(d.keys() for *_, d in context.init_graph.edges(data=True)) + ) + if "test_stat" in edge_attrs or "pvalue" in edge_attrs: + raise RuntimeError( + "Running skeleton discovery with adjacency graph " + "with 'test_stat' or 'pvalue' is not supported yet." 
+ ) + + return data, context diff --git a/dodiscover/constraint/timeseries/tsfcialg.py b/dodiscover/constraint/timeseries/tsfcialg.py new file mode 100644 index 00000000..e69de29b diff --git a/dodiscover/constraint/timeseries/tspcalg.py b/dodiscover/constraint/timeseries/tspcalg.py new file mode 100644 index 00000000..bfb7e00f --- /dev/null +++ b/dodiscover/constraint/timeseries/tspcalg.py @@ -0,0 +1,168 @@ +import logging +from typing import Optional, Tuple + +import networkx as nx +import pandas as pd + +from dodiscover.ci.base import BaseConditionalIndependenceTest +from dodiscover.constraint.config import SkeletonMethods +from dodiscover.constraint.timeseries import LearnTimeSeriesSkeleton +from dodiscover.context import TimeSeriesContext +from dodiscover.context_builder import make_ts_context +from dodiscover.typing import SeparatingSet + +from ..._protocol import EquivalenceClass +from ..pcalg import PC + +logger = logging.getLogger() + + +class TSPC(PC): + def __init__( + self, + ci_estimator: BaseConditionalIndependenceTest, + alpha: float = 0.05, + min_cond_set_size: Optional[int] = None, + max_cond_set_size: Optional[int] = None, + max_combinations: Optional[int] = None, + skeleton_method: SkeletonMethods = SkeletonMethods.NBRS, + apply_orientations: bool = True, + max_iter: int = 1000, + contemporaneous_edges: bool = True, + **ci_estimator_kwargs, + ): + """[Experimental] Time-series PC algorithm. + + A PC algorithm specialized for time-series, which differs in two ways: + 1. learning the skeleton: during the removal of a non-contemporaneous edge, + remove all corresponding homologous edges. + 2. orienting edges: during the orientation of a non-contemporaneous edge, + remove all corresponding homologous edges. + + Homologous edges are edges that have repeating structure over time. + + Parameters + ---------- + ci_estimator : BaseConditionalIndependenceTest + _description_ + alpha : float, optional + _description_, by default 0.05 + min_cond_set_size : Optional[int], optional + _description_, by default None + max_cond_set_size : Optional[int], optional + _description_, by default None + max_combinations : Optional[int], optional + _description_, by default None + skeleton_method : SkeletonMethods, optional + _description_, by default SkeletonMethods.NBRS + apply_orientations : bool, optional + _description_, by default True + max_iter : int, optional + _description_, by default 1000 + contemporaneous_edges : bool + Whether or not to assume contemporaneous edges. + + References + ---------- + .. 
footbibliography:: + """ + super().__init__( + ci_estimator, + alpha, + min_cond_set_size, + max_cond_set_size, + max_combinations, + skeleton_method, + apply_orientations, + max_iter, + **ci_estimator_kwargs, + ) + self.contemporaneous_edges = contemporaneous_edges + + def learn_skeleton( + self, + data: pd.DataFrame, + context: TimeSeriesContext, + sep_set: Optional[SeparatingSet] = None, + ) -> Tuple[nx.Graph, SeparatingSet]: + skel_alg = LearnTimeSeriesSkeleton( + self.ci_estimator, + sep_set=sep_set, + alpha=self.alpha, + min_cond_set_size=self.min_cond_set_size, + max_cond_set_size=self.max_cond_set_size, + max_combinations=self.max_combinations, + skeleton_method=self.skeleton_method, + keep_sorted=False, + separate_lag_phase=False, + contemporaneous_edges=self.contemporaneous_edges, + **self.ci_estimator_kwargs, + ) + skel_alg.fit(data, context) + + skel_graph = skel_alg.adj_graph_ + sep_set = skel_alg.sep_set_ + self.n_ci_tests += skel_alg.n_ci_tests + return skel_graph, sep_set + + def fit(self, data: pd.DataFrame, context: TimeSeriesContext) -> None: + self.context_ = make_ts_context(context).build() + graph = self.context_.init_graph + self.init_graph_ = graph + self.fixed_edges_ = self.context_.included_edges + + # create a reference to the underlying data to be used + self.X_ = data + + # initialize graph object to apply learning + self.separating_sets_ = self._initialize_sep_sets(self.init_graph_) + + # learn skeleton graph and the separating sets per variable + graph, self.separating_sets_ = self.learn_skeleton( + self.X_, self.context_, self.separating_sets_ + ) + + # convert networkx.Graph to relevant causal graph object + graph = self.convert_skeleton_graph(graph) + + # orient edges on the causal graph object + if self.apply_orientations: + # first orient lagged edges + self.orient_lagged_edges(graph) + + if self.contemporaneous_edges: + # next orient contemporaneous edges if necessary + self.orient_contemporaneous_edges(graph) + + # store resulting data structures + self.graph_ = graph + + def orient_lagged_edges(self, graph): + # for every node in the PAG, evaluate neighbors that have any edge + for edge in graph.edges: + # get the variables in time-order + u, v = sorted(edge, key=lambda x: x[1]) + + # if we encountered a contemporaneous edge, continue + if u[1] == v[1]: + continue + + # now orient this edge as u -> v + graph.orient_uncertain_edge(u, v) + + def orient_contemporaneous_edges(self, graph): + # for all pairs of non-adjacent variables with a common neighbor + # check if we can orient the edge as a collider + self.orient_unshielded_triples(graph, self.separating_sets_) + self.orient_edges(graph) + + def convert_skeleton_graph(self, graph: nx.Graph) -> EquivalenceClass: + # TODO: make this an equivalence class graph + from pywhy_graphs import StationaryTimeSeriesGraph + + # TODO: convert to a 1-liner when networkx allows input of tuples as nodes + # convert Graph object to a undirected stationary ts-graph + edges = graph.edges + graph = StationaryTimeSeriesGraph() + graph.add_edges_from(edges) + return graph diff --git a/dodiscover/constraint/timeseries/utils.py b/dodiscover/constraint/timeseries/utils.py new file mode 100644 index 00000000..e304b3e4 --- /dev/null +++ b/dodiscover/constraint/timeseries/utils.py @@ -0,0 +1,26 @@ +import numpy as np + + +def convert_ts_df_to_multiindex(df, max_lag): + n_samples = df.shape[0] + + # add lag column denoting which row corresponds to which lag + # for a set time window of 'max_lag' + q, r = divmod(n_samples, max_lag 
+ 1) + lag_list = [lag for lag in range(max_lag, -1, -1)] + lag_col = q * lag_list + lag_list[:r] + df["lag"] = lag_col + + # add naming for the time-series variables + df.rename_axis("variable", axis=1, inplace=True) + + # compute which window each row in the dataframe is on + df["windowed_sample"] = np.arange(len(df)) // (max_lag + 1) + + # convert lag to '-' + df = df.assign(lag=-df["lag"]) + + # create a multi-index with the first level as variable and + # the second level as lag + df = df.pivot(index="windowed_sample", columns="lag") + return df diff --git a/dodiscover/constraint/utils.py b/dodiscover/constraint/utils.py index 00e6e2a2..b76a4724 100644 --- a/dodiscover/constraint/utils.py +++ b/dodiscover/constraint/utils.py @@ -1,3 +1,8 @@ +from itertools import combinations +from typing import Iterable, Set, SupportsFloat, Union + +import networkx as nx +import numpy as np import pandas as pd from dodiscover import Graph @@ -49,3 +54,79 @@ def is_in_sep_set( check_var in _sep_set for _sep_set in sep_set[x_var][y_var] ) return func(check_var in _sep_set for _sep_set in sep_set[x_var][y_var]) + + +def _iter_conditioning_set( + possible_variables: Iterable, + x_var: Union[SupportsFloat, str], + y_var: Union[SupportsFloat, str], + size_cond_set: int, +) -> Iterable[Set]: + """Iterate function to generate the conditioning set. + + Parameters + ---------- + possible_variables : iterable + A set/list/dict of possible variables to consider for the conditioning set. + This can be for example, the current adjacencies. + x_var : node + The node for the 'x' variable. + y_var : node + The node for the 'y' variable. + size_cond_set : int + The size of the conditioning set to consider. If there are + less adjacent variables than this number, then all variables will be in the + conditioning set. + + Yields + ------ + Z : set + The set of variables for the conditioning set. + """ + exclusion_set = {x_var, y_var} + + all_adj_excl_current = [p for p in possible_variables if p not in exclusion_set] + + # loop through all possible combinations of the conditioning set size + for cond in combinations(all_adj_excl_current, size_cond_set): + cond_set = set(cond) + yield cond_set + + +def _find_neighbors_along_path(G: nx.Graph, start, end) -> Set: + """Find neighbors that are along a path from start to end. + + Parameters + ---------- + G : nx.Graph + The graph. + start : Node + The starting node. + end : Node + The ending node. + + Returns + ------- + nbrs : Set + The set of neighbors that are also along a path towards + the 'end' node. + """ + + def _assign_weight(u, v, edge_attr): + if u == node or v == node: + return np.inf + else: + return 1 + + nbrs = set() + for node in G.neighbors(start): + if not G.has_edge(start, node): + raise RuntimeError(f"{start} and {node} are not connected, but they are assumed to be.") + + # find a path from start node to end + path = nx.shortest_path(G, source=node, target=end, weight=_assign_weight) + if len(path) > 0: + if start in path: + raise RuntimeError("There is an error with the input. 
This is not possible.") + nbrs.add(node) + return nbrs diff --git a/dodiscover/context.py b/dodiscover/context.py index 8830af85..fdb010d3 100644 --- a/dodiscover/context.py +++ b/dodiscover/context.py @@ -1,9 +1,11 @@ -from typing import Any, Dict, Set +import inspect +from typing import Any, Dict, Set, TypeVar import networkx as nx -from ._protocol import Graph +from ._protocol import TimeSeriesGraph from .typing import Column, NetworkxGraph +from .utils import dict_compare class Context: @@ -14,11 +16,11 @@ class Context: Parameters ---------- - variables : Set + observed_variables : Set Set of observed variables. If neither ``latents``, nor ``variables`` is set, then it is presumed that ``variables`` consists of the columns of ``data`` and ``latents`` is the empty set. - latents : Set + latent_variables : Set Set of latent "unobserved" variables. If neither ``latents``, nor ``variables`` is set, then it is presumed that ``variables`` consists of the columns of ``data`` and ``latents`` is the empty set. @@ -28,6 +30,10 @@ class Context: Included edges without direction. excluded_edges : nx.Graph Excluded edges without direction. + state_variables : dict + A dictionary of state variables that are preserved during the + causal discovery algorithm. For example, the FCI algorithm must store + the intermediate PAG before running a second phase of skeleton discovery. Raises ------ @@ -46,28 +52,112 @@ class Context: _variables: Set[Column] _latents: Set[Column] - _init_graph: Graph + _init_graph: nx.Graph _included_edges: nx.Graph _excluded_edges: nx.Graph _state_variables: Dict[str, Any] def __init__( self, - variables: Set[Column], - latents: Set[Column], - init_graph: Graph, + observed_variables: Set[Column], + latent_variables: Set[Column], + init_graph: nx.Graph, included_edges: NetworkxGraph, excluded_edges: NetworkxGraph, state_variables: Dict[str, Any], ) -> None: # set to class self._state_variables = state_variables - self._variables = variables - self._latents = latents + self._variables = observed_variables + self._latents = latent_variables self._init_graph = init_graph self._included_edges = included_edges self._excluded_edges = excluded_edges + @property + def _internal_graphs(self): + """Private property to store a list of the names of graph objects.""" + graphs = ["init_graph", "included_edges", "excluded_edges"] + return graphs + + @classmethod + def _get_param_names(cls): + """Get parameter names for the context.""" + # fetch the constructor or the original constructor before + # deprecation wrapping if any + init = getattr(cls.__init__, "deprecated_original", cls.__init__) + if init is object.__init__: + # No explicit constructor to introspect + return [] + + # introspect the constructor arguments to find the context parameters + # to represent + init_signature = inspect.signature(init) + # Consider the constructor parameters excluding 'self' + parameters = [ + p + for p in init_signature.parameters.values() + if p.name != "self" and p.kind != p.VAR_KEYWORD + ] + for p in parameters: + if p.kind == p.VAR_POSITIONAL: + raise RuntimeError( + "dodiscover context objects should always " + "specify their parameters in the signature" + " of their __init__ (no varargs)." + " %s with constructor %s doesn't " + " follow this convention." % (cls, init_signature) + ) + # Extract and sort argument names excluding 'self' + return sorted([p.name for p in parameters]) + + def get_params(self, deep=True): + """ + Get parameters for this context. 
+ + Parameters + ---------- + deep : bool, default=True + If True, will return the parameters for this context and + contained subobjects that are context. + + Returns + ------- + params : dict + Parameter names mapped to their values. + """ + out = dict() + for key in self._get_param_names(): + value = getattr(self, key) + if deep and hasattr(value, "get_params") and not isinstance(value, type): + deep_items = value.get_params().items() + out.update((key + "__" + k, val) for k, val in deep_items) + out[key] = value + return out + + def __eq__(self, context: "Context") -> bool: + context_params = context.get_params() + self_params = self.get_params() + + # graph objects that we must check explicitly + graph_comps = self._internal_graphs + context_graphs = [] + self_graphs = [] + for name in graph_comps: + context_graphs.append(context_params.pop(name)) + self_graphs.append(self_params.pop(name)) + + # check all graphs are isomorphic + for ctx_graph, self_graph in zip(context_graphs, self_graphs): + if not nx.is_isomorphic(ctx_graph, self_graph): + return False + + # finally check the rest + added, removed, modified, _ = dict_compare(context_params, self_params) + if len(added) > 0 or len(removed) > 0 or len(modified) > 0: + return False + return True + @property def included_edges(self) -> nx.Graph: return self._included_edges @@ -77,7 +167,7 @@ def excluded_edges(self) -> nx.Graph: return self._excluded_edges @property - def init_graph(self) -> Graph: + def init_graph(self) -> nx.Graph: return self._init_graph @property @@ -124,3 +214,93 @@ def state_variable(self, name: str) -> Any: raise RuntimeError(f"{name} is not a state variable: {self._state_variables}") return self._state_variables[name] + + +class TimeSeriesContext(Context): + """Context for time-series causal discovery. + + Parameters + ---------- + variables : Set + Set of observed variables. If neither ``latents``, + nor ``variables`` is set, then it is presumed that ``variables`` consists + of the columns of ``data`` and ``latents`` is the empty set. In a time-series + context, variables do not have a time-index. See Notes for details. + latents : Set + Set of latent "unobserved" variables. If neither ``latents``, + nor ``variables`` is set, then it is presumed that ``variables`` consists + of the columns of ``data`` and ``latents`` is the empty set. + init_graph : Graph + The graph to start with. + included_edges : nx.Graph + Included edges without direction. + excluded_edges : nx.Graph + Excluded edges without direction. + state_variables : Dict[str, Any] + _description_ + included_lag_edges : _type_ + _description_ + excluded_lag_edges : _type_ + _description_ + max_lag : int + _description_ + contemporaneous_edges : bool + Whether or not to assume contemporaneous edges. 
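+
+    Examples
+    --------
+    Instances are normally created through the builder API, as in this
+    usage sketch with a toy dataset:
+
+    >>> import pandas as pd
+    >>> from dodiscover import make_ts_context
+    >>> data = pd.DataFrame({"x": [0.1, 0.2, 0.3, 0.4], "y": [1.0, 0.9, 1.1, 1.0]})
+    >>> ctx = make_ts_context().variables(data=data).max_lag(2).build()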
+ """ + + def __init__( + self, + observed_variables: Set[Column], + latent_variables: Set[Column], + init_graph: nx.Graph, + included_edges: NetworkxGraph, + excluded_edges: NetworkxGraph, + state_variables: Dict[str, Any], + included_lag_edges: TimeSeriesGraph, + excluded_lag_edges: TimeSeriesGraph, + max_lag: int, + contemporaneous_edges: bool, + ) -> None: + super().__init__( + observed_variables, + latent_variables, + init_graph, + included_edges, + excluded_edges, + state_variables, + ) + self._max_lag = max_lag + self._included_lag_edges = included_lag_edges + self._excluded_lag_edges = excluded_lag_edges + self._contemporaneous_edges = contemporaneous_edges + + @property + def _internal_graphs(self): + """Private property to store a list of the names of graph objects.""" + graphs = [ + "init_graph", + "included_edges", + "excluded_edges", + "included_lag_edges", + "excluded_lag_edges", + ] + return graphs + + @property + def contemporaneous_edges(self) -> bool: + return self._contemporaneous_edges + + @property + def max_lag(self) -> int: + return self._max_lag + + @property + def included_lag_edges(self) -> TimeSeriesGraph: + return self._included_lag_edges + + @property + def excluded_lag_edges(self) -> TimeSeriesGraph: + return self._excluded_lag_edges + + +ContextType = TypeVar("ContextType", bound=Context) diff --git a/dodiscover/context_builder.py b/dodiscover/context_builder.py index dd331e0b..e07ce3b4 100644 --- a/dodiscover/context_builder.py +++ b/dodiscover/context_builder.py @@ -1,11 +1,12 @@ -from copy import copy, deepcopy +from copy import deepcopy +from itertools import product from typing import Any, Dict, Optional, Set, Tuple, cast import networkx as nx import pandas as pd -from ._protocol import Graph -from .context import Context +from ._protocol import TimeSeriesGraph +from .context import Context, TimeSeriesContext from .typing import Column, NetworkxGraph @@ -17,15 +18,19 @@ class ContextBuilder: `dodiscover.make_context` to build a Context data structure. """ - _graph: Optional[Graph] = None + _graph: Optional[nx.Graph] = None _included_edges: Optional[NetworkxGraph] = None _excluded_edges: Optional[NetworkxGraph] = None _observed_variables: Optional[Set[Column]] = None _latent_variables: Optional[Set[Column]] = None _state_variables: Dict[str, Any] = dict() - def graph(self, graph: Graph) -> "ContextBuilder": - """Set the partial graph to start with. + def init_graph(self, graph: nx.Graph) -> "ContextBuilder": + """Set the initial partial undirected graph to start with. + + For example, this could be the complete graph to start with, if there is + no prior knowledge. Or this could be a graph that is a continuation of a + previous causal discovery algorithm. Parameters ---------- @@ -40,18 +45,12 @@ def graph(self, graph: Graph) -> "ContextBuilder": self._graph = graph return self - def edges( - self, - include: Optional[NetworkxGraph] = None, - exclude: Optional[NetworkxGraph] = None, - ) -> "ContextBuilder": - """Set edge constraints to apply in discovery. + def excluded_edges(self, edges: NetworkxGraph) -> "ContextBuilder": + """Set excluded non-directional edge constraints to apply in discovery. 
Parameters ---------- - included : Optional[NetworkxGraph] - Edges that should be included in the resultant graph - excluded : Optional[NetworkxGraph] + edges : Optional[NetworkxGraph] Edges that must be excluded in the resultant graph Returns @@ -59,8 +58,33 @@ def edges( self : ContextBuilder The builder instance """ - self._included_edges = include - self._excluded_edges = exclude + self._excluded_edges = edges + return self + + def included_edges(self, edges: NetworkxGraph) -> "ContextBuilder": + """Set included non-directional edge constraints to apply in discovery. + + Parameters + ---------- + edges : Optional[NetworkxGraph] + Edges that must be included in the resultant graph + + Returns + ------- + self : ContextBuilder + The builder instance + """ + self._included_edges = edges + return self + + def latent_variables(self, latents: Set[Column]): + """Latent variables.""" + self._latent_variables = latents + return self + + def observed_variables(self, observed: Set[Column]): + """Observed variables.""" + self._observed_variables = observed return self def variables( @@ -145,13 +169,15 @@ def build(self) -> Context: if self._observed_variables is None: raise ValueError("Could not infer variables from data or given arguments.") + # initialize an empty graph object as the default for included/excluded edges empty_graph = lambda: nx.empty_graph(self._observed_variables, create_using=nx.Graph) + return Context( init_graph=self._interpolate_graph(), included_edges=self._included_edges or empty_graph(), excluded_edges=self._excluded_edges or empty_graph(), - variables=self._observed_variables, - latents=self._latent_variables or set(), + observed_variables=self._observed_variables, + latent_variables=self._latent_variables or set(), state_variables=self._state_variables, ) @@ -188,7 +214,7 @@ def _interpolate_graph(self) -> nx.Graph: raise ValueError("Must set variables() before building Context.") complete_graph = lambda: nx.complete_graph(self._observed_variables, create_using=nx.Graph) - has_all_variables = lambda g: set(g.nodes) == set(self._observed_variables) + has_all_variables = lambda g: set(g.nodes) == self._observed_variables # initialize the starting graph if self._graph is None: @@ -202,6 +228,109 @@ def _interpolate_graph(self) -> nx.Graph: return self._graph +class TimeSeriesContextBuilder(ContextBuilder): + """A builder class for creating TimeSeriesContext objects ergonomically. + + The context builder provides a way to capture assumptions, domain knowledge, + and data relevant for time-series. This should NOT be instantiated directly. + One should instead use `dodiscover.make_ts_context` to build a TimeSeriesContext + data structure. 
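+
+    Examples
+    --------
+    A usage sketch with a toy dataset; note that ``max_lag`` must be set
+    before calling ``build()``:
+
+    >>> import pandas as pd
+    >>> from dodiscover import make_ts_context
+    >>> df = pd.DataFrame({"x1": [0.5, 0.1, 0.3], "x2": [0.2, 0.4, 0.6]})
+    >>> builder = make_ts_context().variables(data=df)
+    >>> ctx = builder.max_lag(1).build()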
+    """
+
+    _contemporaneous_edges: Optional[bool] = None
+    _max_lag: Optional[int] = None
+    _included_lag_edges: Optional[TimeSeriesGraph] = None
+    _excluded_lag_edges: Optional[TimeSeriesGraph] = None
+
+    def _interpolate_graph(self) -> nx.Graph:
+        from pywhy_graphs.classes import StationaryTimeSeriesGraph
+        from pywhy_graphs.classes.timeseries import complete_ts_graph
+
+        if self._observed_variables is None:
+            raise ValueError("Must set variables() before building Context.")
+        if self._max_lag is None:
+            raise ValueError("Must set max_lag before building Context.")
+
+        # initialize the starting graph
+        if self._graph is None:
+            # default to assuming contemporaneous edges unless explicitly disabled
+            include_contemporaneous = (
+                True if self._contemporaneous_edges is None else self._contemporaneous_edges
+            )
+            # create a complete graph over all nodes and time points
+            complete_graph = complete_ts_graph(
+                variables=self._observed_variables,
+                max_lag=self._max_lag,
+                include_contemporaneous=include_contemporaneous,
+                create_using=StationaryTimeSeriesGraph,
+            )
+            return complete_graph
+        else:
+            if set(self._graph.variables) != set(self._observed_variables):
+                raise ValueError(
+                    f"The nodes within the initial graph, {self._graph.nodes}, "
+                    f"do not match the nodes in the passed in data, {self._observed_variables}."
+                )
+            has_all_nodes = lambda g: set(g.nodes) == set(
+                product(self._observed_variables, range(self._max_lag + 1))
+            )
+            if not has_all_nodes(self._graph):
+                raise RuntimeError("Graph does not contain all possible nodes.")
+
+            return self._graph
+
+    def init_graph(self, graph: TimeSeriesGraph) -> "TimeSeriesContextBuilder":
+        """Set the initial time-series graph to begin causal discovery with."""
+        return super().init_graph(graph)
+
+    def max_lag(self, lag: int) -> "TimeSeriesContextBuilder":
+        """Set the maximum time lag."""
+        if lag <= 0:
+            raise ValueError(f"Lag in time-series graphs should be > 0, not {lag}.")
+        self._max_lag = lag
+        return self
+
+    def contemporaneous_edges(self, present: bool) -> "TimeSeriesContextBuilder":
+        """Whether or not to assume contemporaneous edges."""
+        self._contemporaneous_edges = present
+        return self
+
+    def included_lag_edges(self, edges: TimeSeriesGraph) -> "TimeSeriesContextBuilder":
+        """Set a priori included lagged edges."""
+        self._included_lag_edges = edges
+        return self
+
+    def excluded_lag_edges(self, edges: TimeSeriesGraph) -> "TimeSeriesContextBuilder":
+        """Set a priori excluded lagged edges."""
+        self._excluded_lag_edges = edges
+        return self
+
+    def build(self) -> TimeSeriesContext:
+        """Build a time-series context object."""
+        from pywhy_graphs.classes.timeseries import empty_ts_graph
+
+        if self._observed_variables is None:
+            raise ValueError("Could not infer variables from data or given arguments.")
+
+        if self._max_lag is None:
+            raise ValueError("Max lag must be set before building time-series context")
+
+        # initialize an empty graph object as the default for included/excluded edges
+        empty_graph = lambda: nx.empty_graph(self._observed_variables, create_using=nx.Graph)
+        empty_time_graph = lambda: empty_ts_graph(self._observed_variables, max_lag=self._max_lag)
+
+        # build the context, assuming contemporaneous edges by default
+        return TimeSeriesContext(
+            init_graph=self._interpolate_graph(),
+            included_edges=self._included_edges or empty_graph(),
+            excluded_edges=self._excluded_edges or empty_graph(),
+            observed_variables=self._observed_variables,
+            latent_variables=self._latent_variables or set(),
+            state_variables=self._state_variables,
+            included_lag_edges=self._included_lag_edges or empty_time_graph(),
+            excluded_lag_edges=self._excluded_lag_edges or
empty_time_graph(), + max_lag=self._max_lag, + contemporaneous_edges=self._contemporaneous_edges or True, + ) + + def make_context(context: Optional[Context] = None) -> ContextBuilder: """Create a new ContextBuilder instance. @@ -219,8 +348,23 @@ def make_context(context: Optional[Context] = None) -> ContextBuilder: """ result = ContextBuilder() if context is not None: - result.graph(deepcopy(context.init_graph)) - result.edges(deepcopy(context.included_edges), deepcopy(context.excluded_edges)) - result.variables(copy(context.observed_variables), copy(context.latent_variables)) - result.state_variables(deepcopy(context.state_variables)) + params = context.get_params() + for param, value in params.items(): + if not hasattr(result, param): + raise RuntimeError(f"{param} is not a member of Context and ContexBuilder.") + # get the function for parameter + getattr(result, param)(deepcopy(value)) + return result + + +def make_ts_context(context: Optional[TimeSeriesContext] = None) -> TimeSeriesContextBuilder: + """Create a time-series context builder.""" + result = TimeSeriesContextBuilder() + if context is not None: + params = context.get_params() + for param, value in params.items(): + if not hasattr(result, param): + raise RuntimeError(f"{param} is not a member of Context and ContexBuilder.") + # get the function for parameter + getattr(result, param)(deepcopy(value)) return result diff --git a/dodiscover/typing.py b/dodiscover/typing.py index ddbbd3d6..01acfad2 100644 --- a/dodiscover/typing.py +++ b/dodiscover/typing.py @@ -8,4 +8,5 @@ # The separating set used in constraint-based causal discovery SeparatingSet = Dict[Column, Dict[Column, List[Set[Column]]]] +# The relevant networkx graphs accepted in this module NetworkxGraph = Union[nx.Graph, nx.DiGraph] diff --git a/dodiscover/utils.py b/dodiscover/utils.py new file mode 100644 index 00000000..210bd5c4 --- /dev/null +++ b/dodiscover/utils.py @@ -0,0 +1,10 @@ +def dict_compare(d1, d2): + """Compare two dictionaries.""" + d1_keys = set(d1.keys()) + d2_keys = set(d2.keys()) + shared_keys = d1_keys.intersection(d2_keys) + added = d1_keys - d2_keys + removed = d2_keys - d1_keys + modified = {o: (d1[o], d2[o]) for o in shared_keys if d1[o] != d2[o]} + same = set(o for o in shared_keys if d1[o] == d2[o]) + return added, removed, modified, same diff --git a/pyproject.toml b/pyproject.toml index f92ed1ce..cf55c830 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -195,7 +195,7 @@ convention = 'numpy' ignore-decorators = '(copy_doc|property|.*setter|.*getter|pyqtSlot|Slot)' match = '^(?!setup|__init__|test_).*\.py' match-dir = '^dodiscover.*' -add_ignore = 'D100,D104,D107' +add_ignore = 'D100,D104,D105,D107' [tool.mypy] ignore_missing_imports = true @@ -206,7 +206,7 @@ minversion = '6.0' addopts = '--durations 20 --junit-xml=junit-results.xml --verbose' filterwarnings = [] log_cli = true -log_cli_level = 'ERROR' +log_cli_level = -1 [tool.coverage.run] branch = true diff --git a/tests/unit_tests/constraint/test_skeleton.py b/tests/unit_tests/constraint/test_skeleton.py index 0b622e85..12f0f69a 100644 --- a/tests/unit_tests/constraint/test_skeleton.py +++ b/tests/unit_tests/constraint/test_skeleton.py @@ -188,7 +188,7 @@ def test_learn_pds_skeleton(): # learn the skeleton of the graph now with the first stage skeleton context = ( make_context(context) - .graph(first_stage_pag.to_undirected()) + .init_graph(first_stage_pag.to_undirected()) .state_variable("PAG", first_stage_pag) .build() ) diff --git 
a/tests/unit_tests/constraint/timeseries/test_skeleton.py b/tests/unit_tests/constraint/timeseries/test_skeleton.py new file mode 100644 index 00000000..c1b40ad4 --- /dev/null +++ b/tests/unit_tests/constraint/timeseries/test_skeleton.py @@ -0,0 +1,240 @@ +import networkx as nx +import numpy as np +import pandas as pd +import pytest +from pywhy_graphs import StationaryTimeSeriesDiGraph, StationaryTimeSeriesGraph + +from dodiscover.ci import Oracle +from dodiscover.constraint.timeseries import LearnTimeSeriesSkeleton +from dodiscover.constraint.timeseries.utils import convert_ts_df_to_multiindex +from dodiscover.context_builder import make_ts_context + +seed = 12345 +rng = np.random.default_rng(seed) + + +def test_skeleton_evaluate_edge(): + var_names = ["x1", "x2", "x3"] + ts_edges = [ + (("x1", -1), ("x1", 0)), + (("x1", -1), ("x2", 0)), + (("x3", -1), ("x2", 0)), + (("x3", -1), ("x3", 0)), + (("x1", 0), ("x3", 0)), + ] + max_lag = 1 + G = StationaryTimeSeriesDiGraph(max_lag=max_lag) + G.add_edges_from(ts_edges) + + # create a dataset, starting from the observed time-series + data_arr = rng.random((10, len(var_names))).round(2) + data = pd.DataFrame( + data=data_arr, + columns=var_names, + ) + + # create an oracle + oracle = Oracle(G) + alg = LearnTimeSeriesSkeleton(ci_estimator=oracle) + data = convert_ts_df_to_multiindex(data, max_lag) + + _, pvalue = alg.evaluate_edge(data, ("x1", 0), ("x1", -1)) + assert pvalue == 0.0 + + _, pvalue = alg.evaluate_edge(data, ("x1", 0), ("x2", 0)) + assert pvalue == 0.0 + + _, pvalue = alg.evaluate_edge(data, ("x1", 0), ("x2", 0), {("x1", -1)}) + assert pvalue == 1.0 + + _, pvalue = alg.evaluate_edge(data, ("x3", -1), ("x1", 0)) + assert pvalue == 0.0 + + _, pvalue = alg.evaluate_edge(data, ("x3", -1), ("x1", 0), {("x1", -1)}) + assert pvalue == 1.0 + + +def test_markovian_skeleton_oracle(): + r"""Test tsPC's skeleton algorithm assuming no latent confounders nor contemporaneous edges. + + Tests the ts skeleton method with an oracle from + Figure 1 and 2 of the tsFCI paper with the difference that the graph + is fully observed here. + + Figure 1, where "\>" and "/>" are edges pointing + down, or up diagonally. + + x1(t-2) -> x1(t-1) -> x1(t) + \> \> + x2(t-2) x2(t-1) x2(t) + /> /> + x3(t-2) -> x3(t-1) -> x3(t) + """ + var_names = ["x1", "x2", "x3"] + ts_edges = [ + (("x1", -1), ("x1", 0)), + (("x1", -1), ("x2", 0)), + (("x3", -1), ("x2", 0)), + (("x3", -1), ("x3", 0)), + ] + max_lag = 1 + G = StationaryTimeSeriesDiGraph(max_lag=max_lag) + G.add_edges_from(ts_edges) + + # create a dataset, starting from the observed time-series + data_arr = rng.random((10, len(var_names))).round(2) + data = pd.DataFrame( + data=data_arr, + columns=var_names, + ) + + # create an oracle + oracle = Oracle(G) + alg = LearnTimeSeriesSkeleton( + ci_estimator=oracle, separate_lag_phase=False, contemporaneous_edges=False + ) + + context = make_ts_context().max_lag(max_lag).variables(data=data).build() + + for edge in G.edges: + print(edge) + print(G.nodes) + alg.fit(data, context) + skel_graph = alg.adj_graph_ + + # all edges in skeleton are inside G + assert all(edge in skel_graph.edges for edge in G.edges) + + for edge in skel_graph.edges: + print(edge) + assert nx.is_isomorphic(skel_graph.to_undirected(), G.to_undirected()) + + +def test_markovian_skeleton_with_contemporaneous_edges(): + r"""Test tsFCI's "PC" algorithm skeleton assuming contemporaneous edges. + + Tests the ts skeleton method with an oracle from a modified version of + Figure 1 and 2 of the tsFCI paper. 
+ + Figure 1, where "\>" and "/>" are edges pointing + down, or up diagonally. + + x1(t-2) -> x1(t-1) -> x1(t) + \> \> + x2(t-2) x2(t-1) x2(t) + /> /> + x3(t-2) -> x3(t-1) -> x3(t) + + with contemporaneous edges + x1(t) -> x2(t); + x3(t) -> x2(t) + """ + var_names = ["x1", "x2", "x3"] + ts_edges = [ + (("x1", -1), ("x1", 0)), + (("x1", -1), ("x2", 0)), + (("x3", -1), ("x2", 0)), + (("x3", -1), ("x3", 0)), + (("x1", 0), ("x3", 0)), + (("x3", 0), ("x2", 0)), + ] + max_lag = 1 + G = StationaryTimeSeriesDiGraph(max_lag=max_lag) + G.add_edges_from(ts_edges) + + # create a dataset, starting from the observed time-series + data_arr = rng.random((10, len(var_names))).round(2) + data = pd.DataFrame( + data=data_arr, + columns=var_names, + ) + + # create an oracle + oracle = Oracle(G) + alg = LearnTimeSeriesSkeleton(ci_estimator=oracle) + context = make_ts_context().max_lag(max_lag).variables(data=data).build() + + # learn the graph + alg.fit(data, context) + skel_graph = alg.adj_graph_ + + # all edges in skeleton are inside G + assert all(edge in skel_graph.edges for edge in G.edges) + assert nx.is_isomorphic(skel_graph.to_undirected(), G.to_undirected()) + + +@pytest.mark.skip(reason="make work...") +def test_semi_markovian_skeleton_oracle(): + r"""Test tsFCI's "FCI" algorithm skeleton assuming latent confounders. + + Tests the ts skeleton method with an oracle from + Figure 1 and 2 of the tsFCI paper. + + Figure 1, where "\>" and "/>" are edges pointing + down, or up diagonally. + + x1(t-2) -> x1(t-1) -> x1(t) + \> \> + x2(t-2) x2(t-1) x2(t) + /> /> + x3(t-2) -> x3(t-1) -> x3(t) + + However, in the dataset, x3 is not observed. The expected + skeleton is the undirected graph version of Figure 2b. + + x1(t-2) -> x1(t-1) -> x1(t) + ^ + | \ \ + v > > + x2(t-2) <-> x2(t-1) <-> x2(t) + + and x2(t-2) <-> x2(t) + """ + var_names = ["x1", "x2", "x3"] + ts_edges = [ + (("x1", -1), ("x1", 0)), + (("x1", -1), ("x2", 0)), + (("x3", -1), ("x2", 0)), + (("x3", -1), ("x3", 0)), + ] + max_lag = 2 + G = StationaryTimeSeriesDiGraph(max_lag=max_lag) + G.add_edges_from(ts_edges) + + # create expected graph + expected_G = StationaryTimeSeriesGraph(max_lag=max_lag) + ts_edges = [ + (("x1", -1), ("x1", 0)), + (("x1", -1), ("x2", 0)), + (("x2", -1), ("x2", 0)), + (("x2", -2), ("x2", 0)), + (("x1", -2), ("x2", -2)), # contemporaneous edge at end + ] + expected_G.add_edges_from(ts_edges) + + # create a dataset, starting from the observed time-series + data_arr = rng.random((10, len(var_names))).round(2) + data = pd.DataFrame( + data=data_arr, + columns=var_names, + ) + data.drop("x3", axis=1, inplace=True) + + # create an oracle using the original graph`` + oracle = Oracle(G) + alg = LearnTimeSeriesSkeleton( + ci_estimator=oracle, + # separate_lag_phase=separate_lag_phase, + latent_confounding=True, + ) + + context = make_ts_context().max_lag(max_lag).variables(data=data).build() + alg.fit(data, context) + skel_graph = alg.adj_graph_ + + for edge in skel_graph.edges: + print(sorted(edge, key=lambda x: x[1])) + + # all edges in skeleton are inside G + assert all(edge in skel_graph.edges for edge in G.edges) + assert nx.is_isomorphic(skel_graph.to_undirected(), expected_G.to_undirected()) diff --git a/tests/unit_tests/test_context_builder.py b/tests/unit_tests/test_context_builder.py index 6f8c8e0a..ca866f62 100644 --- a/tests/unit_tests/test_context_builder.py +++ b/tests/unit_tests/test_context_builder.py @@ -1,8 +1,10 @@ import networkx as nx import numpy as np import pandas as pd +import pytest from dodiscover import 
make_context +from dodiscover.context_builder import make_ts_context seed = 12345 @@ -14,8 +16,8 @@ def make_df() -> pd.DataFrame: return pd.DataFrame(np.hstack((X, Y)), columns=["x", "y"]) -def test_constructor(): - ctx = make_context() +@pytest.mark.parametrize("ctx", [make_context(), make_ts_context()]) +def test_constructor(ctx): assert ctx is not None @@ -23,7 +25,7 @@ def test_build_with_initial_graph(): graph = nx.DiGraph() graph.add_edges_from([("x", "y")]) data = make_df() - ctx = make_context().graph(graph).variables(data=data).build() + ctx = make_context().init_graph(graph).variables(data=data).build() assert ctx.init_graph is graph @@ -31,3 +33,33 @@ def test_build_with_observed_and_latents(): ctx = make_context().variables(observed=set("x"), latents=set("y")).build() assert ctx.observed_variables == set("x") assert ctx.latent_variables == set("y") + + +def test_with_context(): + """Test make and builder with a previous context""" + graph = nx.DiGraph() + graph.add_edges_from([("x", "y")]) + data = make_df() + ctx = make_context().init_graph(graph).variables(data=data).build() + + new_ctx = make_context(ctx).build() + + # test equality + assert ctx == new_ctx + + +def test_ts_context(): + graph = nx.DiGraph() + graph.add_edges_from([("x", "y")]) + data = make_df() + max_lag = 2 + builder = make_ts_context().init_graph(graph).variables(data=data) + + with pytest.raises(ValueError, match="Max lag must be set before building time-series context"): + builder.build() + + ctx = builder.max_lag(max_lag).build() + new_ctx = make_ts_context(ctx).build() + + # test equality + assert ctx == new_ctx From b7a9eba39c26bf561af9adba10be0720192a288b Mon Sep 17 00:00:00 2001 From: Adam Li Date: Thu, 8 Dec 2022 17:15:03 -0500 Subject: [PATCH 2/5] Fixing tsPC. 
Does not work rn --- dodiscover/__init__.py | 2 +- dodiscover/_protocol.py | 22 ++- dodiscover/ci/base.py | 2 +- dodiscover/constraint/__init__.py | 4 +- dodiscover/constraint/config.py | 2 + dodiscover/constraint/pcalg.py | 53 +++-- dodiscover/constraint/skeleton.py | 6 +- dodiscover/constraint/timeseries/__init__.py | 4 +- dodiscover/constraint/timeseries/skeleton.py | 163 ++++++++++++---- dodiscover/constraint/timeseries/tsfcialg.py | 162 ++++++++++++++++ dodiscover/constraint/timeseries/tspcalg.py | 62 +++--- dodiscover/context.py | 5 +- dodiscover/context_builder.py | 28 +-- dodiscover/typing.py | 3 + pyproject.toml | 2 +- .../constraint/timeseries/test_fcialg.py | 117 +++++++++++ .../constraint/timeseries/test_pcalg.py | 183 ++++++++++++++++++ .../constraint/timeseries/test_skeleton.py | 26 ++- 18 files changed, 733 insertions(+), 113 deletions(-) create mode 100644 tests/unit_tests/constraint/timeseries/test_fcialg.py create mode 100644 tests/unit_tests/constraint/timeseries/test_pcalg.py diff --git a/dodiscover/__init__.py b/dodiscover/__init__.py index 446ee292..d15ddbe2 100644 --- a/dodiscover/__init__.py +++ b/dodiscover/__init__.py @@ -8,4 +8,4 @@ from ._protocol import EquivalenceClass, Graph from ._version import __version__ # noqa: F401 from .constraint import FCI, PC -from .context_builder import ContextBuilder, make_context +from .context_builder import ContextBuilder, make_context, make_ts_context diff --git a/dodiscover/_protocol.py b/dodiscover/_protocol.py index 63208975..86fa27b9 100644 --- a/dodiscover/_protocol.py +++ b/dodiscover/_protocol.py @@ -1,8 +1,8 @@ -from typing import Dict, FrozenSet, Iterable, List, Protocol +from typing import Dict, FrozenSet, Iterable, List, Optional, Protocol import networkx as nx -from dodiscover.typing import Column +from .typing import Column class Graph(Protocol): @@ -53,21 +53,29 @@ def to_undirected(self) -> nx.Graph: class TimeSeriesGraph(Graph, Protocol): """A protocol for time-series graphs.""" - def lagged_neighbors(self, u: Column) -> Iterable: + @property + def max_lag(self) -> int: + pass + + def lagged_neighbors(self, u: Column) -> List[Column]: """Return neighbors of u that are in a previous time point.""" pass - def contemporaneous_neighbors(self, u: Column) -> Iterable: + def contemporaneous_neighbors(self, u: Column) -> List[Column]: """Return neighbors of u that are in the same time point.""" pass # TODO: refactor to # 1. remove_forward_homologous_edges(self, u, v) # 2. 
remove_backward_homologous_edges(self, u, v) - def set_auto_removal(self, auto: bool) -> None: + def set_auto_removal(self, auto: Optional[str]) -> None: """Specify how to auto-remove homologous edges.""" pass + def nodes_at(self, t): + """Nodes at specific time point.""" + pass + class EquivalenceClass(Graph, Protocol): """Protocol for equivalence class of graphs.""" @@ -97,6 +105,10 @@ def bidirected_edge_name(self) -> str: """Name of the bidirected edges.""" pass + def get_graphs(self, graph_name: Optional[str] = None): + """Get edge sub-graph.""" + pass + def orient_uncertain_edge(self, u, v) -> None: """Orients an uncertain edge in the equivalence class to directed ``'u'*->'v'``.""" pass diff --git a/dodiscover/ci/base.py b/dodiscover/ci/base.py index 205a7d9e..5d9abbee 100644 --- a/dodiscover/ci/base.py +++ b/dodiscover/ci/base.py @@ -3,7 +3,7 @@ import pandas as pd -from dodiscover.typing import Column +from ..typing import Column class BaseConditionalIndependenceTest(metaclass=ABCMeta): diff --git a/dodiscover/constraint/__init__.py b/dodiscover/constraint/__init__.py index 6bf409ef..b4bf9018 100644 --- a/dodiscover/constraint/__init__.py +++ b/dodiscover/constraint/__init__.py @@ -1,3 +1,5 @@ +from .config import SkeletonMethods from .fcialg import FCI from .pcalg import PC -from .skeleton import LearnSemiMarkovianSkeleton, LearnSkeleton, SkeletonMethods +from .skeleton import LearnSemiMarkovianSkeleton, LearnSkeleton +from .timeseries import LearnTimeSeriesSkeleton, TimeSeriesFCI, TimeSeriesPC diff --git a/dodiscover/constraint/config.py b/dodiscover/constraint/config.py index b78c335d..4abc6fc1 100644 --- a/dodiscover/constraint/config.py +++ b/dodiscover/constraint/config.py @@ -31,3 +31,5 @@ class SkeletonMethods(Enum, metaclass=MetaEnum): NBRS_PATH = "neighbors_path" PDS = "pds" PDS_PATH = "pds_path" + PDS_T = "pds_t" + PDS_T_PATH = 'pds_t_path' diff --git a/dodiscover/constraint/pcalg.py b/dodiscover/constraint/pcalg.py index e4b79603..773ee7c6 100644 --- a/dodiscover/constraint/pcalg.py +++ b/dodiscover/constraint/pcalg.py @@ -1,16 +1,16 @@ import logging from itertools import combinations, permutations -from typing import Optional +from typing import Iterator, Optional import networkx as nx from dodiscover.ci.base import BaseConditionalIndependenceTest -from dodiscover.constraint.config import SkeletonMethods from dodiscover.constraint.utils import is_in_sep_set from dodiscover.typing import Column, SeparatingSet from .._protocol import EquivalenceClass from ._classes import BaseConstraintDiscovery +from .config import SkeletonMethods logger = logging.getLogger() @@ -121,6 +121,25 @@ def convert_skeleton_graph(self, graph: nx.Graph) -> EquivalenceClass: graph = CPDAG(incoming_undirected_edges=graph) return graph + def _orientable_triples(self, graph: EquivalenceClass) -> Iterator: + """Iterate through orientable triple edges. + + Used in orientation of colliders. + """ + for u in graph.nodes: + for v_i, v_j in combinations(graph.neighbors(u), 2): + yield (v_i, u, v_j) + + def _orientable_edges(self, graph: EquivalenceClass) -> Iterator: + """Iterate through orientable edges. + + Used in orientation of edges. + """ + for (i, j) in permutations(graph.nodes, 2): + if i == j: + continue + yield (i, j) + def orient_edges(self, graph: EquivalenceClass) -> None: """Orient edges in a skeleton graph to estimate the causal DAG, or CPDAG. @@ -134,17 +153,13 @@ def orient_edges(self, graph: EquivalenceClass) -> None: A skeleton graph. 
If ``None``, then will initialize PC using a complete graph. By default None.
        """
-        node_ids = graph.nodes
-
         # For all the combination of nodes i and j, apply the following
         # rules.
         idx = 0
         finished = False
         while idx < self.max_iter and not finished:  # type: ignore
             change_flag = False
-            for (i, j) in permutations(node_ids, 2):
-                if i == j:
-                    continue
+            for (i, j) in self._orientable_edges(graph):
                 # Rule 1: Orient i-j into i->j whenever there is an arrow k->i
                 # such that k and j are nonadjacent.
                 r1_add = self._apply_meek_rule1(graph, i, j)
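For intuition, Meek rule 1 (referenced in the loop above) can be sanity-checked in isolation. The sketch below is an illustrative standalone version, not the library's internal ``_apply_meek_rule1``, and the two-graph representation of a CPDAG is an assumption made for brevity::

    import networkx as nx

    def meek_rule1_sketch(directed: nx.DiGraph, undirected: nx.Graph):
        """Orient i - j into i -> j when some k -> i exists with k, j nonadjacent."""
        oriented = []
        # a full implementation would also try the reversed pair (j, i)
        for i, j in list(undirected.edges):
            for k in directed.predecessors(i):
                adjacent = (
                    undirected.has_edge(k, j)
                    or directed.has_edge(k, j)
                    or directed.has_edge(j, k)
                )
                if not adjacent:
                    undirected.remove_edge(i, j)
                    directed.add_edge(i, j)
                    oriented.append((i, j))
                    break
        return oriented

    # k -> i and i - j with k, j nonadjacent: rule 1 orients i -> j
    directed = nx.DiGraph([("k", "i")])
    undirected = nx.Graph([("i", "j")])
    assert meek_rule1_sketch(directed, undirected) == [("i", "j")]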
@@ -178,6 +193,11 @@ def orient_unshielded_triples(
     ) -> None:
         """Orient colliders given a graph and separation set.
 
+        Unshielded triples are ordered triples (v_i, u, v_j) such that v_i and
+        v_j are each adjacent to u, but there is no edge between v_i and v_j.
+        Such a triple is oriented as a collider v_i -> u <- v_j whenever 'u'
+        does not appear in any separating set of 'v_i' and 'v_j'.
+
         Parameters
         ----------
         graph : EquivalenceClass
@@ -186,16 +206,15 @@
             The separating set between any two nodes.
         """
         # for every node in the PAG, evaluate neighbors that have any edge
-        for u in graph.nodes:
-            for v_i, v_j in combinations(graph.neighbors(u), 2):
-                # Check that there is no edge of any type between
-                # v_i and v_j, else this is a "shielded" collider.
-                # Then check to see if 'u' is in "any" separating
-                # set. If it is not, then there is a collider.
-                if v_j not in graph.neighbors(v_i) and not is_in_sep_set(
-                    u, sep_set, v_i, v_j, mode="any"
-                ):
-                    self._orient_collider(graph, v_i, u, v_j)
+        for v_i, u, v_j in self._orientable_triples(graph):
+            # Check that there is no edge of any type between
+            # v_i and v_j, else this is a "shielded" collider.
+            # Then check to see if 'u' is in "any" separating
+            # set. If it is not, then there is a collider.
+            if v_j not in graph.neighbors(v_i) and not is_in_sep_set(
+                u, sep_set, v_i, v_j, mode="any"
+            ):
+                self._orient_collider(graph, v_i, u, v_j)
 
     def _orient_collider(
         self, graph: EquivalenceClass, v_i: Column, u: Column, v_j: Column
diff --git a/dodiscover/constraint/skeleton.py b/dodiscover/constraint/skeleton.py
index af7bd19a..80e5f26a 100644
--- a/dodiscover/constraint/skeleton.py
+++ b/dodiscover/constraint/skeleton.py
@@ -221,7 +221,13 @@ def evaluate_edge(
         """
         if Z is None:
             Z = set()
-        test_stat, pvalue = self.ci_estimator.test(data, {X}, {Y}, Z, **self.ci_estimator_kwargs)
+        try:
+            test_stat, pvalue = self.ci_estimator.test(
+                data, {X}, {Y}, Z, **self.ci_estimator_kwargs
+            )
+        except Exception:
+            logger.error(f"CI test failed for X={X}, Y={Y}, Z={Z}")
+            raise
         self.n_ci_tests += 1
         return test_stat, pvalue
 
diff --git a/dodiscover/constraint/timeseries/__init__.py b/dodiscover/constraint/timeseries/__init__.py
index 91adae29..ea5d7fdd 100644
--- a/dodiscover/constraint/timeseries/__init__.py
+++ b/dodiscover/constraint/timeseries/__init__.py
@@ -1 +1,3 @@
-from .skeleton import LearnTimeSeriesSkeleton
+from .skeleton import LearnTimeSeriesSemiMarkovianSkeleton, LearnTimeSeriesSkeleton
+from .tsfcialg import TimeSeriesFCI
+from .tspcalg import TimeSeriesPC
diff --git a/dodiscover/constraint/timeseries/skeleton.py b/dodiscover/constraint/timeseries/skeleton.py
index 9972f7aa..b3116480 100644
--- a/dodiscover/constraint/timeseries/skeleton.py
+++ b/dodiscover/constraint/timeseries/skeleton.py
@@ -8,11 +8,11 @@
 from dodiscover._protocol import TimeSeriesGraph
 from dodiscover.ci import BaseConditionalIndependenceTest
-from dodiscover.constraint.config import SkeletonMethods
-from dodiscover.context import ContextType
+from dodiscover.context import Context, TimeSeriesContext
 from dodiscover.typing import Column, SeparatingSet
 
 from ...context_builder import make_ts_context
+from ..config import SkeletonMethods
 from ..skeleton import LearnSkeleton
 from ..utils import _iter_conditioning_set
 from .utils import convert_ts_df_to_multiindex
@@ -30,6 +30,20 @@ def nodes_in_time_order(G: TimeSeriesGraph) -> Iterator:
 class LearnTimeSeriesSkeleton(LearnSkeleton):
     """Learn a skeleton time-series graph from a Markovian causal model.
 
+    Learning time-series causal graph skeletons is a more complex task compared to its
+    iid counterpart. There are a few key differences to be aware of:
+
+    1. Without extending the maximum lag, there is always latent confounding. Consider
+    as an example a two-variable, lag-1 ts-causal-graph:
+
+        t-1   t
+    X    o -> o
+    Y    o -> o
+
+    with Y(t-1) -> X(t). Assuming stationarity, the homologous edge
+    Y(t-2) -> X(t-1) holds as well, but Y(t-2) lies outside the modeled time
+    window, so X(t-1) and Y(t-1) share a common cause that the window cannot
+    represent.
+
     Parameters
     ----------
     ci_estimator : BaseConditionalIndependenceTest
@@ -82,7 +94,6 @@ def __init__(
         max_combinations: Optional[int] = None,
         skeleton_method: SkeletonMethods = SkeletonMethods.NBRS,
         keep_sorted: bool = False,
-        latent_confounding: bool = False,
         max_path_length: Optional[int] = None,
         separate_lag_phase: bool = False,
         contemporaneous_edges: bool = True,
@@ -100,7 +111,6 @@ def __init__(
         **ci_estimator_kwargs,
     )
         self.max_path_length = max_path_length
-        self.latent_confounding = latent_confounding
         self.separate_lag_phase = separate_lag_phase
         self.contemporaneous_edges = contemporaneous_edges
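The windowing subtlety described in the class docstring can be made concrete with the same ``StationaryTimeSeriesDiGraph`` API that the unit tests in this patch use; the snippet below is purely illustrative::

    from pywhy_graphs import StationaryTimeSeriesDiGraph

    # the two-variable, lag-1 example from the docstring above
    G = StationaryTimeSeriesDiGraph(max_lag=1)
    G.add_edges_from(
        [
            (("x", -1), ("x", 0)),  # X(t-1) -> X(t)
            (("y", -1), ("y", 0)),  # Y(t-1) -> Y(t)
            (("y", -1), ("x", 0)),  # Y(t-1) -> X(t)
        ]
    )

    # By stationarity, the homologous edge Y(t-2) -> X(t-1) also exists in the
    # underlying process, but Y(t-2) falls outside the max_lag=1 window, so
    # X(t-1) and Y(t-1) share a common cause the window cannot represent.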
@@ -118,23 +128,14 @@ def _learn_skeleton(
         size_cond_set = self.min_cond_set_size_
 
         # If there is latent confounding, we need to test all nodes starting from
-        # max-lag. Otherwise, we can
-        if self.latent_confounding:
-            # depends on whether or not there are latent confounders
-            testable_nodes = list(nodes_in_time_order(adj_graph))
-
-            # to do causal discovery of time-series graphs,
-            # homologous edges should not be removed automatically
-            adj_graph.set_auto_removal("forwards")
-        else:
-            # depends on whether or not there are latent confounders
-            testable_nodes = set(adj_graph.nodes_at(0))
+        # max-lag, because adjacencies are only repeated backwards in time.
+        testable_nodes = list(nodes_in_time_order(adj_graph))
 
-            # to do causal discovery of time-series graphs,
-            # homologous edges should not be removed automatically
-            adj_graph.set_auto_removal("backwards")
+        # to do causal discovery of time-series graphs,
+        # homologous edges should not be removed automatically
+        adj_graph.set_auto_removal("forwards")
 
-        print(f"Testable nodes: {testable_nodes}")
+        logger.debug(f"Testing nodes in the following order: {testable_nodes}")
 
         # Outer loop: iterate over 'size_cond_set' until stopping criterion is met
         # - 'size_cond_set' > 'max_cond_set_size' or
@@ -148,22 +153,20 @@
         # Note: in time-series graphs, all nodes are defined as a 2-tuple
         # of (variable name, lag)
         for y_var in testable_nodes:
-            # we only consider variables with required lab
-            if not self.latent_confounding and y_var[1] != 0:
-                continue
-
             # TODO: need more efficient way of querying all possible edges
             if nbr_search == "all":
                 lagged_nbrs = adj_graph.lagged_neighbors(y_var)
                 contemporaneous_nbrs = adj_graph.contemporaneous_neighbors(y_var)
-                possible_adjacencies = set(lagged_nbrs).union(set(contemporaneous_nbrs))
+                possible_adjacencies = list(set(lagged_nbrs).union(set(contemporaneous_nbrs)))
             elif nbr_search == "lagged":
                 possible_adjacencies = adj_graph.lagged_neighbors(y_var)
             elif nbr_search == "contemporaneous":
                 possible_adjacencies = adj_graph.contemporaneous_neighbors(y_var)
 
             logger.info(f"Considering node {y_var}...\n\n")
-
+            logger.debug(
+                f"Testing {y_var} against adjacencies {possible_adjacencies} "
+                f"with conditioning sets of size {size_cond_set}"
+            )
             for x_var in possible_adjacencies:
                 # a node cannot be a parent to itself in DAGs
                 if y_var == x_var:
                     continue
@@ -223,7 +229,7 @@
                     # the null hypothesis, then we will break the loop comparing X and Y
                     # and say X and Y are conditionally independent given 'cond_set'
                     if pvalue > self.alpha:
                         logger.debug(f"Removing {x_var} - {y_var} with {cond_set}.")
                         break
 
         # post-process the CI test results
@@ -244,7 +250,7 @@
         from_set = []
         to_set = []
         for u, v in self.remove_edges:
             # the opposite is already in there...
-            if v in from_set and u in to_set:
+            if v in to_set and u in from_set:
                 continue
             from_set.append(u)
             to_set.append(v)
@@ -255,7 +261,8 @@
         # Remove non-significant links
         # Note: Removing edges at the end ensures "stability" of the algorithm
         # with respect to the random choice of pairs of edges considered in the inner loop
-        adj_graph.remove_edges_from(self.remove_edges)
+        logger.debug(f"Removing edges {self.remove_edges}")
+        adj_graph.remove_edges_from(list(self.remove_edges))
 
         # increment the conditioning set size
         size_cond_set += 1
@@ -266,7 +273,7 @@
 
         return adj_graph
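For intuition, the inner loop's enumeration of candidate separating sets reduces to the following sketch; the real logic lives in ``_iter_conditioning_set``, and this standalone version only mirrors its documented behavior::

    from itertools import combinations

    def iter_conditioning_set_sketch(possible_variables, x_var, y_var, size_cond_set):
        """Yield candidate separating sets of a fixed size, excluding x and y."""
        candidates = [v for v in possible_variables if v not in (x_var, y_var)]
        for cond_set in combinations(candidates, size_cond_set):
            yield set(cond_set)

    # for y = ("x2", 0) with lagged neighbors ("x1", -1) and ("x3", -1), the
    # size-1 candidate sets when testing the edge to x = ("x1", -1) are:
    nbrs = [("x1", -1), ("x3", -1)]
    print(list(iter_conditioning_set_sketch(nbrs, ("x1", -1), ("x2", 0), 1)))
    # -> [{("x3", -1)}]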
-    def fit(self, data: pd.DataFrame, context: ContextType) -> None:
+    def fit(self, data: pd.DataFrame, context: Context) -> None:
         """Run structure learning to learn the skeleton of the causal graph.
 
         Parameters
@@ -284,7 +291,9 @@
         data, context = self._preprocess_data(data, context)
 
         self.context = (
-            make_ts_context(context).observed_variables(data.columns.values.tolist()).build()
+            make_ts_context(context)
+            .observed_variables(data.columns.get_level_values("variable").tolist())
+            .build()
         )
 
         # initialize learning parameters
@@ -319,16 +328,16 @@
         # no assumption of contemporaneous causal structure
         # TODO: make sure we don't re-test the CI relations between contemporaneous edges
         if not self.contemporaneous_edges:
-            auto_removal = adj_graph._auto_removal
+            auto_removal = adj_graph._auto_removal  # type: ignore
             adj_graph.set_auto_removal(None)
-            for u, v in adj_graph.edges:
+            for u, v in adj_graph.edges:  # type: ignore
                 if u[1] == v[1]:
-                    adj_graph.remove_edge(u, v)
+                    adj_graph.remove_edge(u, v)  # type: ignore
             adj_graph.set_auto_removal(auto_removal)
 
         self.adj_graph_ = adj_graph
 
-    def _preprocess_data(self, data: pd.DataFrame, context: ContextType):
+    def _preprocess_data(self, data: pd.DataFrame, context: TimeSeriesContext):
         """Preprocess data and context.
 
         In time-series causal discovery, a dataframe of shape (n_times, n_signals)
         is converted to a multi-index DataFrame, where each column is a
         (variable, lag) pair; e.g. the ('x', -1) column is variable x at lag 1.
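A rough sketch of that conversion, assuming the ``(variable, negative lag)`` column layout exercised by the unit tests (the actual helper is ``convert_ts_df_to_multiindex``)::

    import pandas as pd

    def convert_ts_df_to_multiindex_sketch(data: pd.DataFrame, max_lag: int) -> pd.DataFrame:
        """Turn an (n_times, n_signals) frame into lagged (variable, lag) columns."""
        lagged = {
            (var, -lag): data[var].shift(lag)
            for var in data.columns
            for lag in range(max_lag + 1)
        }
        # drop the leading rows that do not yet have a full lag history
        out = pd.DataFrame(lagged).dropna()
        out.columns = pd.MultiIndex.from_tuples(out.columns, names=["variable", "lag"])
        return out

    df = pd.DataFrame({"x": [1.0, 2.0, 3.0], "y": [4.0, 5.0, 6.0]})
    print(convert_ts_df_to_multiindex_sketch(df, max_lag=1).columns.tolist())
    # -> [('x', 0), ('x', -1), ('y', 0), ('y', -1)]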
""" # first reformat data - max_lag = context.max_lag + max_lag = context.max_lag # type: ignore data = convert_ts_df_to_multiindex(data, max_lag) # run preprocessing @@ -359,3 +368,89 @@ def _preprocess_data(self, data: pd.DataFrame, context: ContextType): ) return data, context + + +class LearnTimeSeriesSemiMarkovianSkeleton(LearnTimeSeriesSkeleton): + def __init__( + self, + ci_estimator: BaseConditionalIndependenceTest, + sep_set: Optional[SeparatingSet] = None, + alpha: float = 0.05, + min_cond_set_size: int = 0, + max_cond_set_size: Optional[int] = None, + max_combinations: Optional[int] = None, + skeleton_method: SkeletonMethods = SkeletonMethods.PDS_T, + keep_sorted: bool = False, + max_path_length: Optional[int] = None, + separate_lag_phase: bool = False, + contemporaneous_edges: bool = True, + **ci_estimator_kwargs, + ) -> None: + super().__init__( + ci_estimator, + sep_set, + alpha, + min_cond_set_size, + max_cond_set_size, + max_combinations, + skeleton_method, + keep_sorted, + **ci_estimator_kwargs, + ) + if max_path_length is None: + max_path_length = np.inf + self.max_path_length = max_path_length + self.separate_lag_phase = separate_lag_phase + self.contemporaneous_edges = contemporaneous_edges + + def _compute_candidate_conditioning_sets( + self, adj_graph: nx.Graph, x_var: Column, y_var: Column, skeleton_method: SkeletonMethods + ) -> Set[Column]: + import pywhy_graphs as pgraph + + # get PAG from the context object + pag = self.context.state_variable("PAG") + + if skeleton_method == SkeletonMethods.PDS: + # determine how we want to construct the candidates for separating nodes + # perform conditioning independence testing on all combinations + possible_variables = pgraph.pds( + pag, x_var, y_var, max_path_length=self.max_path_length # type: ignore + ) + elif skeleton_method == SkeletonMethods.PDS_PATH: + # determine how we want to construct the candidates for separating nodes + # perform conditioning independence testing on all combinations + possible_variables = pgraph.pds_path( + pag, x_var, y_var, max_path_length=self.max_path_length # type: ignore + ) + elif skeleton_method == SkeletonMethods.PDS_T: + # determine how we want to construct the candidates for separating nodes + # perform conditioning independence testing on all combinations + possible_variables = pgraph.pds_t( + pag, x_var, y_var, max_path_length=self.max_path_length # type: ignore + ) + elif skeleton_method == SkeletonMethods.PDS_T_PATH: + # determine how we want to construct the candidates for separating nodes + # perform conditioning independence testing on all combinations + possible_variables = pgraph.pds_t_path( + pag, x_var, y_var, max_path_length=self.max_path_length # type: ignore + ) + + if self.keep_sorted: + # Note it is assumed in public API that 'test_stat' is set + # inside the adj_graph + possible_variables = sorted( + possible_variables, + key=lambda n: adj_graph.edges[x_var, n]["test_stat"], + reverse=True, + ) # type: ignore + + if x_var in possible_variables: + possible_variables.remove(x_var) + if y_var in possible_variables: + possible_variables.remove(y_var) + + return possible_variables + + def fit(self, data: pd.DataFrame, context: TimeSeriesContext) -> None: + return super().fit(data, context) diff --git a/dodiscover/constraint/timeseries/tsfcialg.py b/dodiscover/constraint/timeseries/tsfcialg.py index e69de29b..fc70bf71 100644 --- a/dodiscover/constraint/timeseries/tsfcialg.py +++ b/dodiscover/constraint/timeseries/tsfcialg.py @@ -0,0 +1,162 @@ +import logging +from typing 
import Optional, Tuple + +import networkx as nx +import pandas as pd + +from dodiscover.ci.base import BaseConditionalIndependenceTest +from dodiscover.constraint.timeseries import LearnTimeSeriesSkeleton, LearnTimeSeriesSemiMarkovianSkeleton +from dodiscover.context_builder import make_ts_context +from dodiscover.context import Context +from dodiscover.typing import SeparatingSet + +from ..._protocol import EquivalenceClass +from ..config import SkeletonMethods +from ..fcialg import FCI + + +class TimeSeriesFCI(FCI): + def __init__( + self, + ci_estimator: BaseConditionalIndependenceTest, + alpha: float = 0.05, + min_cond_set_size: Optional[int] = None, + max_cond_set_size: Optional[int] = None, + max_combinations: Optional[int] = None, + skeleton_method: SkeletonMethods = SkeletonMethods.NBRS, + apply_orientations: bool = True, + max_iter: int = 1000, + max_path_length: Optional[int] = None, + selection_bias: bool = False, + pds_skeleton_method: SkeletonMethods = SkeletonMethods.PDS, + separate_lag_phase: bool = False, + contemporaneous_edges: bool = True, + **ci_estimator_kwargs, + ): + super().__init__( + ci_estimator, + alpha, + min_cond_set_size, + max_cond_set_size, + max_combinations, + skeleton_method, + apply_orientations, + max_iter, + max_path_length, + selection_bias, + pds_skeleton_method, + **ci_estimator_kwargs, + ) + self.separate_lag_phase = separate_lag_phase + self.contemporaneous_edges = contemporaneous_edges + + def learn_skeleton(self, data: pd.DataFrame, context: Context, sep_set: Optional[SeparatingSet] = None) -> Tuple[nx.Graph, SeparatingSet]: + import pywhy_graphs + + from dodiscover import make_ts_context + + # initially learn the skeleton + skel_alg = LearnTimeSeriesSkeleton( + self.ci_estimator, + sep_set=sep_set, + alpha=self.alpha, + min_cond_set_size=self.min_cond_set_size, + max_cond_set_size=self.max_cond_set_size, + max_combinations=self.max_combinations, + skeleton_method=self.skeleton_method, + keep_sorted=False, + separate_lag_phase=self.separate_lag_phase, + contemporaneous_edges=self.contemporaneous_edges, + **self.ci_estimator_kwargs, + ) + skel_alg.fit(data, context) + + skel_graph = skel_alg.adj_graph_ + sep_set = skel_alg.sep_set_ + self.n_ci_tests += skel_alg.n_ci_tests + + # convert the undirected skeleton graph to a PAG, where + # all left-over edges have a "circle" endpoint + pag = pywhy_graphs.StationaryTimeSeriesPAG( + incoming_circle_edges=skel_graph, name="PAG derived with tsFCI") + + # orient colliders + self.orient_unshielded_triples(pag, sep_set) + + # convert the adjacency graph + new_init_graph = pag.to_ts_undirected() + + # Update the Context: + # add the corresponding intermediate PAG now to the context + # new initialization graph + for (_, _, d) in new_init_graph.edges(data=True): + if "test_stat" in d: + d.pop("test_stat") + if "pvalue" in d: + d.pop("pvalue") + context = ( + make_ts_context(context).init_graph(new_init_graph).state_variable("PAG", pag).build() + ) + + # # now compute all possibly d-separating sets and learn a better skeleton + skel_alg = LearnTimeSeriesSemiMarkovianSkeleton( + self.ci_estimator, + sep_set=sep_set, + alpha=self.alpha, + min_cond_set_size=self.min_cond_set_size, + max_cond_set_size=self.max_cond_set_size, + max_combinations=self.max_combinations, + skeleton_method=self.pds_skeleton_method, + keep_sorted=False, + max_path_length=self.max_path_length, + separate_lag_phase=self.separate_lag_phase, + contemporaneous_edges=self.contemporaneous_edges, + **self.ci_estimator_kwargs, + ) + 
skel_alg.fit(data, context) + + skel_graph = skel_alg.adj_graph_ + sep_set = skel_alg.sep_set_ + self.n_ci_tests += skel_alg.n_ci_tests + return skel_graph, sep_set + + def fit(self, data: pd.DataFrame, context: Context) -> None: + self.context_ = make_ts_context(context).build() + graph = self.context_.init_graph + self.init_graph_ = graph + self.fixed_edges_ = self.context_.included_edges + + # create a reference to the underlying data to be used + self.X_ = data + + # initialize graph object to apply learning + self.separating_sets_ = self._initialize_sep_sets(self.init_graph_) + + # learn skeleton graph and the separating sets per variable + graph, self.separating_sets_ = self.learn_skeleton( + self.X_, self.context_, self.separating_sets_ + ) + + # convert networkx.Graph to relevant causal graph object + graph = self.convert_skeleton_graph(graph) + + # orient edges on the causal graph object + if self.apply_orientations: + # first orient lagged edges + self.orient_lagged_edges(graph) + + if self.contemporaneous_edges: + # next orient contemporaneous edges if necessary + self.orient_contemporaneous_edges(graph) + + # store resulting data structures + self.graph_ = graph + + def orient_lagged_edges(self, graph: EquivalenceClass): + pass + + def orient_contemporaneous_edges(self, graph: EquivalenceClass): + pass + + def convert_skeleton_graph(self, graph: nx.Graph) -> EquivalenceClass: + return super().convert_skeleton_graph(graph) \ No newline at end of file diff --git a/dodiscover/constraint/timeseries/tspcalg.py b/dodiscover/constraint/timeseries/tspcalg.py index bfb7e00f..987e865c 100644 --- a/dodiscover/constraint/timeseries/tspcalg.py +++ b/dodiscover/constraint/timeseries/tspcalg.py @@ -1,23 +1,24 @@ import logging -from typing import Optional, Tuple +from itertools import combinations, permutations +from typing import Iterator, Optional, Tuple import networkx as nx import pandas as pd from dodiscover.ci.base import BaseConditionalIndependenceTest -from dodiscover.constraint.config import SkeletonMethods -from dodiscover.constraint.timeseries import LearnTimeSeriesSkeleton -from dodiscover.context import TimeSeriesContext +from dodiscover.context import Context from dodiscover.context_builder import make_ts_context from dodiscover.typing import SeparatingSet from ..._protocol import EquivalenceClass +from ..config import SkeletonMethods from ..pcalg import PC +from .skeleton import LearnTimeSeriesSkeleton logger = logging.getLogger() -class TSPC(PC): +class TimeSeriesPC(PC): def __init__( self, ci_estimator: BaseConditionalIndependenceTest, @@ -28,6 +29,7 @@ def __init__( skeleton_method: SkeletonMethods = SkeletonMethods.NBRS, apply_orientations: bool = True, max_iter: int = 1000, + separate_lag_phase: bool = False, contemporaneous_edges: bool = True, **ci_estimator_kwargs, ): @@ -77,12 +79,13 @@ def __init__( max_iter, **ci_estimator_kwargs, ) + self.separate_lag_phase = separate_lag_phase self.contemporaneous_edges = contemporaneous_edges def learn_skeleton( self, data: pd.DataFrame, - context: TimeSeriesContext, + context: Context, sep_set: Optional[SeparatingSet] = None, ) -> Tuple[nx.Graph, SeparatingSet]: skel_alg = LearnTimeSeriesSkeleton( @@ -105,7 +108,7 @@ def learn_skeleton( self.n_ci_tests += skel_alg.n_ci_tests return skel_graph, sep_set - def fit(self, data: pd.DataFrame, context: TimeSeriesContext) -> None: + def fit(self, data: pd.DataFrame, context: Context) -> None: self.context_ = make_ts_context(context).build() graph = self.context_.init_graph 
self.init_graph_ = graph @@ -137,18 +140,14 @@ def fit(self, data: pd.DataFrame, context: TimeSeriesContext) -> None: # store resulting data structures self.graph_ = graph - def orient_lagged_edges(self, graph): - # for every node in the PAG, evaluate neighbors that have any edge - for edge in graph.edges: - # get the variables in time-order - u, v = sorted(edge, key=lambda x: x[1]) - - # if we encountered a contemporaneous edge, continue - if u[1] == v[1]: - continue - - # now orient this edge as u -> v - graph.orient_uncertain_edge(u, v) + def orient_lagged_edges(self, graph: EquivalenceClass): + undirected_subgraph = graph.get_graphs(graph.undirected_edge_name) + # get non-lag nodes + for node in undirected_subgraph.nodes_at(t=0): + # get all lagged nbrs + for nbr in undirected_subgraph.lagged_neighbors(node): + # now orient this edge as u -> v + graph.orient_uncertain_edge(nbr, node) def orient_contemporaneous_edges(self, graph): # for all pairs of non-adjacent variables with a common neighbor @@ -156,13 +155,22 @@ def orient_contemporaneous_edges(self, graph): self.orient_unshielded_triples(graph, self.separating_sets_) self.orient_edges(graph) + def _orientable_edges(self, graph: EquivalenceClass) -> Iterator: + for (i, j) in permutations(graph.nodes_at(t=0), 2): # type: ignore + if i == j: + continue + yield (i, j) + + def _orientable_triples(self, graph: EquivalenceClass) -> Iterator: + # for every node in the PAG, evaluate neighbors that have any edge + for u in graph.nodes_at(t=0): # type: ignore + for v_i, v_j in combinations(graph.neighbors(u), 2): + yield (v_i, u, v_j) + def convert_skeleton_graph(self, graph: nx.Graph) -> EquivalenceClass: - # TODO: make this an equivalence class graph - from pywhy_graphs import StationaryTimeSeriesGraph - - # TODO: convert to a 1-liner when networkx allows input of tuples as nodes - # convert Graph object to a undirected stationary ts-graph - edges = graph.edges - graph = StationaryTimeSeriesGraph() - graph.add_edges_from(edges) + from pywhy_graphs import StationaryTimeSeriesCPDAG + + # convert Graph object to a CPDAG object with + # all undirected edges + graph = StationaryTimeSeriesCPDAG(incoming_undirected_edges=graph) return graph diff --git a/dodiscover/context.py b/dodiscover/context.py index fdb010d3..c0c93b65 100644 --- a/dodiscover/context.py +++ b/dodiscover/context.py @@ -135,7 +135,10 @@ def get_params(self, deep=True): out[key] = value return out - def __eq__(self, context: "Context") -> bool: + def __eq__(self, context: object) -> bool: + if not isinstance(context, Context): + raise RuntimeError(f"Context is not comparable to non-context types {context}.") + context_params = context.get_params() self_params = self.get_params() diff --git a/dodiscover/context_builder.py b/dodiscover/context_builder.py index e07ce3b4..168407b7 100644 --- a/dodiscover/context_builder.py +++ b/dodiscover/context_builder.py @@ -1,5 +1,4 @@ from copy import deepcopy -from itertools import product from typing import Any, Dict, Optional, Set, Tuple, cast import networkx as nx @@ -214,7 +213,7 @@ def _interpolate_graph(self) -> nx.Graph: raise ValueError("Must set variables() before building Context.") complete_graph = lambda: nx.complete_graph(self._observed_variables, create_using=nx.Graph) - has_all_variables = lambda g: set(g.nodes) == self._observed_variables + has_all_variables = lambda g: set(g.nodes) == set(self._observed_variables) # initialize the starting graph if self._graph is None: @@ -268,36 +267,39 @@ def _interpolate_graph(self) -> 
nx.Graph: f"The nodes within the initial graph, {self._graph.nodes}, " f"do not match the nodes in the passed in data, {self._observed_variables}." ) - has_all_nodes = lambda g: set(g.nodes) == set( - product(self._observed_variables, range(self._max_lag + 1)) - ) - if not has_all_nodes(self._graph): - raise RuntimeError("Graph does not contain all possible nodes.") + + for var_name in self._observed_variables: + for lag in range(self._max_lag + 1): + if (var_name, -lag) not in self._graph.nodes: + raise RuntimeError( + f"Graph does not contain all possible nodes, " + f"such as {(var_name, -lag)}." + ) return self._graph - def init_graph(self, graph: TimeSeriesGraph) -> "TimeSeriesContextBuilder": + def init_graph(self, graph: TimeSeriesGraph) -> "ContextBuilder": """Set the initial time-series graph to begin causal discovery with.""" return super().init_graph(graph) - def max_lag(self, lag: int) -> "TimeSeriesContextBuilder": + def max_lag(self, lag: int) -> "ContextBuilder": """Set the maximum time lag.""" if lag <= 0: raise ValueError(f"Lag in time-series graphs should be > 0, not {lag}.") self._max_lag = lag return self - def contemporaneous_edges(self, present: bool) -> "TimeSeriesContextBuilder": + def contemporaneous_edges(self, present: bool) -> "ContextBuilder": """Whether or not to assume contemporaneous edges.""" self._contemporaneous_edges = present return self - def included_lag_edges(self, edges: TimeSeriesGraph) -> "TimeSeriesContextBuilder": + def included_lag_edges(self, edges: TimeSeriesGraph) -> "ContextBuilder": """Apriori set lagged edges.""" self._included_lag_edges = edges return self - def excluded_lag_edges(self, edges: TimeSeriesGraph) -> "TimeSeriesContextBuilder": + def excluded_lag_edges(self, edges: TimeSeriesGraph) -> "ContextBuilder": """Apriori excluded lagged edges.""" self._excluded_lag_edges = edges return self @@ -357,7 +359,7 @@ def make_context(context: Optional[Context] = None) -> ContextBuilder: return result -def make_ts_context(context: Optional[TimeSeriesContext] = None) -> TimeSeriesContextBuilder: +def make_ts_context(context: Optional[Context] = None) -> TimeSeriesContextBuilder: """Create a time-series context builder.""" result = TimeSeriesContextBuilder() if context is not None: diff --git a/dodiscover/typing.py b/dodiscover/typing.py index 01acfad2..426fd765 100644 --- a/dodiscover/typing.py +++ b/dodiscover/typing.py @@ -2,6 +2,9 @@ import networkx as nx +# from .context import Context as BaseContext +# from .context_builder import ContextBuilder as BaseContextBuilder + # Pandas DataFrame columns that are also compatible with Graph nodes Column = Union[int, float, str] diff --git a/pyproject.toml b/pyproject.toml index cf55c830..8ba84268 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -206,7 +206,7 @@ minversion = '6.0' addopts = '--durations 20 --junit-xml=junit-results.xml --verbose' filterwarnings = [] log_cli = true -log_cli_level = -1 +log_cli_level = 'ERROR' [tool.coverage.run] branch = true diff --git a/tests/unit_tests/constraint/timeseries/test_fcialg.py b/tests/unit_tests/constraint/timeseries/test_fcialg.py new file mode 100644 index 00000000..c8d89a8d --- /dev/null +++ b/tests/unit_tests/constraint/timeseries/test_fcialg.py @@ -0,0 +1,117 @@ +import networkx as nx +import numpy as np +import pandas as pd +import pytest +from pywhy_graphs import StationaryTimeSeriesDiGraph + +from dodiscover.ci import Oracle +from dodiscover.constraint.timeseries import TimeSeriesFCI +from dodiscover.constraint.timeseries.utils import 
convert_ts_df_to_multiindex +from dodiscover.context_builder import make_ts_context + +seed = 12345 +rng = np.random.default_rng(seed) + + +def test_evaluate_edge(): + var_names = ["x1", "x2", "x3"] + ts_edges = [ + (("x1", -1), ("x1", 0)), + (("x1", -1), ("x2", 0)), + (("x3", -1), ("x2", 0)), + (("x3", -1), ("x3", 0)), + (("x1", 0), ("x3", 0)), + ] + max_lag = 1 + G = StationaryTimeSeriesDiGraph(max_lag=max_lag) + G.add_edges_from(ts_edges) + + # create a dataset, starting from the observed time-series + data_arr = rng.random((10, len(var_names))).round(2) + data = pd.DataFrame( + data=data_arr, + columns=var_names, + ) + + # create an oracle + oracle = Oracle(G) + alg = TimeSeriesFCI(ci_estimator=oracle) + data = convert_ts_df_to_multiindex(data, max_lag) + + _, pvalue = alg.evaluate_edge(data, ("x1", 0), ("x1", -1)) + assert pvalue == 0.0 + + _, pvalue = alg.evaluate_edge(data, ("x1", 0), ("x2", 0)) + assert pvalue == 0.0 + + _, pvalue = alg.evaluate_edge(data, ("x1", 0), ("x2", 0), {("x1", -1)}) + assert pvalue == 1.0 + + _, pvalue = alg.evaluate_edge(data, ("x3", -1), ("x1", 0)) + assert pvalue == 0.0 + + _, pvalue = alg.evaluate_edge(data, ("x3", -1), ("x1", 0), {("x1", -1)}) + assert pvalue == 1.0 + + +def test_timeseries_fci_oracle(): + r"""Test tsFCI's algorithm assuming no contemporaneous edges. + + Tests the ts skeleton method with an oracle from + Figure 1 and 2 of the tsFCI paper with the difference that the graph + is fully observed here. + + Figure 1, where "\>" and "/>" are edges pointing + down, or up diagonally. + + x1(t-2) -> x1(t-1) -> x1(t) + \> \> + x2(t-2) x2(t-1) x2(t) + /> /> + x3(t-2) -> x3(t-1) -> x3(t) + """ + var_names = ["x1", "x2", "x3"] + ts_edges = [ + (("x1", -1), ("x1", 0)), + (("x1", -1), ("x2", 0)), + (("x3", -1), ("x2", 0)), + (("x3", -1), ("x3", 0)), + ] + max_lag = 1 + G = StationaryTimeSeriesDiGraph(max_lag=max_lag) + G.add_edges_from(ts_edges) + + # create a dataset, starting from the observed time-series + data_arr = rng.random((10, len(var_names))).round(2) + data = pd.DataFrame( + data=data_arr, + columns=var_names, + ) + print(data.columns) + data.drop('x3', axis=1, inplace=True) + + # create an oracle + oracle = Oracle(G) + alg = TimeSeriesFCI( + ci_estimator=oracle, + separate_lag_phase=False, + contemporaneous_edges=False + ) + + context = make_ts_context().max_lag(max_lag).variables(data=data).build() + + # learn the skeleton graph + skel_graph, sep_set = alg.learn_skeleton(data, context) + + # all edges in skeleton are inside G + assert all(edge in skel_graph.edges for edge in G.edges) + assert nx.is_isomorphic(skel_graph.to_undirected(), G.to_undirected()) + assert sep_set[("x1", 0)][("x1", -1)] == [] + assert {("x1", -1)} in sep_set[("x2", 0)][("x1", -2)] + + # now test the full fit algorithm + # alg.fit(data, context) + # learned_graph = alg.graph_ + + # # all edges in skeleton are inside G + # assert nx.is_isomorphic(learned_graph.to_directed(), G.to_directed()) diff --git a/tests/unit_tests/constraint/timeseries/test_pcalg.py b/tests/unit_tests/constraint/timeseries/test_pcalg.py new file mode 100644 index 00000000..10fa0d19 --- /dev/null +++ b/tests/unit_tests/constraint/timeseries/test_pcalg.py @@ -0,0 +1,183 @@ +import networkx as nx +import numpy as np +import pandas as pd +from pywhy_graphs import StationaryTimeSeriesDiGraph + +from dodiscover.ci import Oracle +from dodiscover.constraint.timeseries import TimeSeriesPC +from dodiscover.constraint.timeseries.utils import convert_ts_df_to_multiindex +from dodiscover.context_builder 
import make_ts_context + +seed = 12345 +rng = np.random.default_rng(seed) + + +def test_evaluate_edge(): + var_names = ["x1", "x2", "x3"] + ts_edges = [ + (("x1", -1), ("x1", 0)), + (("x1", -1), ("x2", 0)), + (("x3", -1), ("x2", 0)), + (("x3", -1), ("x3", 0)), + (("x1", 0), ("x3", 0)), + ] + max_lag = 1 + G = StationaryTimeSeriesDiGraph(max_lag=max_lag) + G.add_edges_from(ts_edges) + + # create a dataset, starting from the observed time-series + data_arr = rng.random((10, len(var_names))).round(2) + data = pd.DataFrame( + data=data_arr, + columns=var_names, + ) + + # create an oracle + oracle = Oracle(G) + alg = TimeSeriesPC(ci_estimator=oracle) + data = convert_ts_df_to_multiindex(data, max_lag) + + _, pvalue = alg.evaluate_edge(data, ("x1", 0), ("x1", -1)) + assert pvalue == 0.0 + + _, pvalue = alg.evaluate_edge(data, ("x1", 0), ("x2", 0)) + assert pvalue == 0.0 + + _, pvalue = alg.evaluate_edge(data, ("x1", 0), ("x2", 0), {("x1", -1)}) + assert pvalue == 1.0 + + _, pvalue = alg.evaluate_edge(data, ("x3", -1), ("x1", 0)) + assert pvalue == 0.0 + + _, pvalue = alg.evaluate_edge(data, ("x3", -1), ("x1", 0), {("x1", -1)}) + assert pvalue == 1.0 + + +class TestTsPCSimple: + """Test tsPC algorithm against the modified graph in the tsFCI paper.""" + + def setup(self): + pass + + def test_timeseries_pc_oracle(self): + r"""Test tsPC's algorithm assuming no contemporaneous edges. + + Tests the ts skeleton method with an oracle from + Figure 1 and 2 of the tsFCI paper with the difference that the graph + is fully observed here. + + Figure 1, where "\>" and "/>" are edges pointing + down, or up diagonally. + + x1(t-2) -> x1(t-1) -> x1(t) + \> \> + x2(t-2) x2(t-1) x2(t) + /> /> + x3(t-2) -> x3(t-1) -> x3(t) + """ + var_names = ["x1", "x2", "x3"] + ts_edges = [ + (("x1", -1), ("x1", 0)), + (("x1", -1), ("x2", 0)), + (("x3", -1), ("x2", 0)), + (("x3", -1), ("x3", 0)), + ] + max_lag = 1 + G = StationaryTimeSeriesDiGraph(max_lag=max_lag) + G.add_edges_from(ts_edges) + + # create a dataset, starting from the observed time-series + data_arr = rng.random((10, len(var_names))).round(2) + data = pd.DataFrame( + data=data_arr, + columns=var_names, + ) + + # create an oracle + oracle = Oracle(G) + alg = TimeSeriesPC(ci_estimator=oracle, separate_lag_phase=False, contemporaneous_edges=False) + + context = make_ts_context().max_lag(max_lag).variables(data=data).build() + + # learn the skeleton graph + skel_graph, sep_set = alg.learn_skeleton(data, context) + + # all edges in skeleton are inside G + assert all(edge in skel_graph.edges for edge in G.edges) + assert nx.is_isomorphic(skel_graph.to_undirected(), G.to_undirected()) + assert sep_set[("x1", -1)][("x3", -1)] == [] + assert {("x3", -1)} in sep_set[("x2", 0)][("x3", 0)] + + # now test the full fit algorithm + alg.fit(data, context) + learned_graph = alg.graph_ + + # all edges in skeleton are inside G + assert nx.is_isomorphic(learned_graph.to_directed(), G.to_directed()) + + + def test_timeseries_pc_contemporaneous(self): + """Test tsPC algorithm with contemporaneous edges. + + Uses figure 2 from PCMCI+ paper. 
+ """ + # t-1 t + # x o -> o + # ^ \> ^ + # y o -> o + # ^ ^ + # z o o + var_names = ["x", "y", "z"] + ts_edges = [ + (("x", -1), ("x", 0)), + (("x", -1), ("y", 0)), + (("y", -1), ("y", 0)), + (("y", 0), ("x", 0)), + (("z", 0), ("y", 0)), + ] + max_lag = 1 + G = StationaryTimeSeriesDiGraph(max_lag=max_lag) + G.add_edges_from(ts_edges) + + # create a dataset, starting from the observed time-series + data_arr = rng.random((10, len(var_names))).round(2) + data = pd.DataFrame( + data=data_arr, + columns=var_names, + ) + + # create an oracle + oracle = Oracle(G) + context = make_ts_context().max_lag(max_lag).variables(data=data).build() + + # First: test that the algorithm return the correct answer + # learn the skeleton graph without contemporaneous edges + alg = TimeSeriesPC(ci_estimator=oracle, separate_lag_phase=False, contemporaneous_edges=True) + skel_graph, _ = alg.learn_skeleton(data, context) + + # all edges in skeleton are inside G + assert all(edge in skel_graph.edges for edge in G.edges) + assert nx.is_isomorphic(skel_graph.to_undirected(), G.to_undirected()) + # now test the full fit algorithm + alg.fit(data, context) + learned_graph = alg.graph_ + + # all edges in skeleton are inside G + assert nx.is_isomorphic(learned_graph.to_directed(), G.to_directed()) + assert nx.is_isomorphic(learned_graph.sub_directed_graph(), G) + + # learn the skeleton graph without contemporaneous edges + alg = TimeSeriesPC(ci_estimator=oracle, separate_lag_phase=False, contemporaneous_edges=False) + skel_graph, _ = alg.learn_skeleton(data, context) + # all edges in skeleton are inside G + assert not all(edge in skel_graph.edges for edge in G.edges) + assert not nx.is_isomorphic(skel_graph.to_undirected(), G.to_undirected()) + + +class TestTsPCSimulated: + """Test tsPC algorithm against more complex simulated graphs.""" + + def setup(self): + pass + + \ No newline at end of file diff --git a/tests/unit_tests/constraint/timeseries/test_skeleton.py b/tests/unit_tests/constraint/timeseries/test_skeleton.py index c1b40ad4..86a71251 100644 --- a/tests/unit_tests/constraint/timeseries/test_skeleton.py +++ b/tests/unit_tests/constraint/timeseries/test_skeleton.py @@ -53,6 +53,12 @@ def test_skeleton_evaluate_edge(): _, pvalue = alg.evaluate_edge(data, ("x3", -1), ("x1", 0), {("x1", -1)}) assert pvalue == 1.0 + _, pvalue = alg.evaluate_edge(data, ("x2", -1), ("x3", 0), {}) + assert pvalue == 0.0 + + _, pvalue = alg.evaluate_edge(data, ("x2", -1), ("x3", 0), {('x3', -1)}) + assert pvalue == 0.0 + def test_markovian_skeleton_oracle(): r"""Test tsPC's skeleton algorithm assuming no latent confounders nor contemporaneous edges. 
@@ -96,17 +102,11 @@ def test_markovian_skeleton_oracle():
     context = make_ts_context().max_lag(max_lag).variables(data=data).build()
 
-    for edge in G.edges:
-        print(edge)
-    print(G.nodes)
     alg.fit(data, context)
     skel_graph = alg.adj_graph_
 
     # all edges in skeleton are inside G
     assert all(edge in skel_graph.edges for edge in G.edges)
-
-    for edge in skel_graph.edges:
-        print(edge)
     assert nx.is_isomorphic(skel_graph.to_undirected(), G.to_undirected())
 
 
@@ -150,16 +150,17 @@ def test_markovian_skeleton_with_contemporaneous_edges():
     )
 
     # create an oracle
-    oracle = Oracle(G)
+    oracle = Oracle(G.copy(double_max_lag=True))
     alg = LearnTimeSeriesSkeleton(ci_estimator=oracle)
-    context = make_ts_context().max_lag(max_lag).variables(data=data).build()
+    context = make_ts_context().max_lag(max_lag + 1).variables(data=data).build()
 
     # learn the graph
     alg.fit(data, context)
     skel_graph = alg.adj_graph_
+    skel_graph.set_max_lag(max_lag)
 
     # all edges in skeleton are inside G
     assert all(edge in skel_graph.edges for edge in G.edges)
     assert nx.is_isomorphic(skel_graph.to_undirected(), G.to_undirected())
 
 
@@ -224,8 +232,6 @@ def test_semi_markovian_skeleton_oracle():
     oracle = Oracle(G)
     alg = LearnTimeSeriesSkeleton(
         ci_estimator=oracle,
-        # separate_lag_phase=separate_lag_phase,
-        latent_confounding=True,
     )
 
     context = make_ts_context().max_lag(max_lag).variables(data=data).build()
From e58b2df65e9f01b20c1b62c2422a567b8321fbea Mon Sep 17 00:00:00 2001
From: Adam Li
Date: Mon, 2 Jan 2023 23:24:49 -0500
Subject: [PATCH 3/5] Fix edge iteration in PC edge orientation

Signed-off-by: Adam Li

---
 dodiscover/constraint/pcalg.py | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/dodiscover/constraint/pcalg.py b/dodiscover/constraint/pcalg.py
index 77abaedc..7123d4d9 100644
--- a/dodiscover/constraint/pcalg.py
+++ b/dodiscover/constraint/pcalg.py
@@ -135,10 +135,9 @@ def _orientable_edges(self, graph: EquivalenceClass) -> Iterator:
 
         Used in orientation of edges.
         """
-        for (i, j) in permutations(graph.nodes, 2):
-            if i == j:
-                continue
-            yield (i, j)
+        for i in graph.nodes:
+            for j in graph.neighbors(i):
+                yield (i, j)
From e2577d947d5d5165476373dc21209ee5c6a85ea5 Mon Sep 17 00:00:00 2001
From: Adam Li
Date: Mon, 9 Jan 2023 15:26:31 -0500
Subject: [PATCH 4/5] Begin tracking PR

Signed-off-by: Adam Li

---
 doc/tutorials/index.rst                            | 9 +++++++++
 doc/tutorials/timeseries/lpcmci-with-summary.ipynb | 0
 2 files changed, 9 insertions(+)
 create mode 100644 doc/tutorials/timeseries/lpcmci-with-summary.ipynb

diff --git a/doc/tutorials/index.rst b/doc/tutorials/index.rst
index 46070e2f..c18b6257 100644
--- a/doc/tutorials/index.rst
+++ b/doc/tutorials/index.rst
@@ -16,3 +16,12 @@ no latent confounders.
 
    markovian/example-pc-algo
 
+[Experimental] Time-series causal discovery
+===========================================
+We
+
+.. toctree::
+   :maxdepth: 1
+   :titlesonly:
+
+   timeseries/lpcmci-with-summary
\ No newline at end of file
diff --git a/doc/tutorials/timeseries/lpcmci-with-summary.ipynb b/doc/tutorials/timeseries/lpcmci-with-summary.ipynb
new file mode 100644
index 00000000..e69de29b
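The one-line fix in PATCH 3 above is easy to sanity-check in isolation: the corrected generator yields each skeleton edge once per direction, which is what the Meek-rule loop in ``orient_edges`` consumes. A standalone sketch, not the library code::

    import networkx as nx

    def orientable_edges_sketch(graph: nx.Graph):
        """Yield each skeleton edge as an ordered pair, once per direction."""
        for i in graph.nodes:
            for j in graph.neighbors(i):  # note: neighbors(i), not neighbors(j)
                yield (i, j)

    G = nx.Graph([("a", "b"), ("b", "c")])
    print(sorted(orientable_edges_sketch(G)))
    # -> [('a', 'b'), ('b', 'a'), ('b', 'c'), ('c', 'b')]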
From 6df8110917a6e8b12d628e4b4c0bb92c3e66269c Mon Sep 17 00:00:00 2001
From: Adam Li
Date: Tue, 10 Jan 2023 21:23:02 -0500
Subject: [PATCH 5/5] Adding section

Signed-off-by: Adam Li

---
 doc/tutorials/index.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/doc/tutorials/index.rst b/doc/tutorials/index.rst
index c18b6257..14ddbbb3 100644
--- a/doc/tutorials/index.rst
+++ b/doc/tutorials/index.rst
@@ -18,7 +18,7 @@ no latent confounders.
 
 [Experimental] Time-series causal discovery
 ===========================================
-We
+We include some tutorials on novel time-series causal discovery algorithms.
 
 .. toctree::
    :maxdepth: 1
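For readers following the series, the new tsPC entry point is exercised end-to-end in the unit tests of PATCH 2; a condensed version of that flow is sketched below, with the oracle standing in for a data-driven CI test. Per the PATCH 2 message the method is still work-in-progress, so treat this as the intended usage rather than a guarantee that ``fit`` already converges::

    import numpy as np
    import pandas as pd
    from pywhy_graphs import StationaryTimeSeriesDiGraph

    from dodiscover.ci import Oracle
    from dodiscover.constraint.timeseries import TimeSeriesPC
    from dodiscover.context_builder import make_ts_context

    # ground-truth stationary graph: x1(t-1) -> x1(t) and x1(t-1) -> x2(t)
    max_lag = 1
    G = StationaryTimeSeriesDiGraph(max_lag=max_lag)
    G.add_edges_from([(("x1", -1), ("x1", 0)), (("x1", -1), ("x2", 0))])

    # placeholder data; only the column names matter when using the oracle
    rng = np.random.default_rng(12345)
    data = pd.DataFrame(rng.random((10, 2)), columns=["x1", "x2"])

    alg = TimeSeriesPC(ci_estimator=Oracle(G), contemporaneous_edges=False)
    context = make_ts_context().max_lag(max_lag).variables(data=data).build()
    alg.fit(data, context)
    print(alg.graph_)  # learned time-series CPDAG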