-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathsampling.py
More file actions
89 lines (73 loc) · 2.78 KB
/
sampling.py
File metadata and controls
89 lines (73 loc) · 2.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
"""
Module for splitting a graph layer into 'observed' and 'hidden' parts
"""
from typing import Dict, List, NamedTuple, Optional
import pandas as pd
from sklearn.model_selection import train_test_split
from utils import NodeLabel, display, node_set
class GraphData(NamedTuple):
    """An edge table bundled with a list of node labels."""

    # Edge table; functions in this module read its `node_1` and `node_2` columns.
    edges: pd.DataFrame
    # Node labels belonging to this part of the graph.
    nodes: List[NodeLabel]
class LayerSplit(NamedTuple):
    """One graph layer divided into an observed part and a hidden part.

    The constructors in this module fill ``node_index`` with the position of
    every node label in the full node list, and ``full`` with the unsplit data.
    """

    layer_id: int
    node_index: Dict[NodeLabel, int]
    observed: GraphData
    hidden: GraphData
    full: GraphData

    def print_summary(self):
        """Show node and edge counts of each part plus the observed/total ratio.

        Output goes through the ``display`` helper: first a title line, then a
        table with rows ``nodes`` / ``edges`` and columns ``total``,
        ``observed``, ``hidden`` and ``obs.ratio``.
        """
        # Pair order fixes the column order of the resulting table.
        parts = (
            ('total', self.full),
            ('observed', self.observed),
            ('hidden', self.hidden),
        )
        counts = {
            'nodes': {column: len(part.nodes) for column, part in parts},
            'edges': {column: len(part.edges) for column, part in parts},
        }
        summary = pd.DataFrame.from_dict(counts, orient='index')
        summary['obs.ratio'] = summary['observed'] / summary['total']

        display(f'Summary of random split. Layer id: {self.layer_id}')
        display(summary)

    @property
    def n(self):
        """Number of entries in ``node_index``."""
        return len(self.node_index)
def random_layer_split(edges: pd.DataFrame,
                       layer_id: int,
                       hidden_ratio: float = 0.5,
                       random_state: Optional[int] = None):
    """Randomly divide the nodes of a layer into observed and hidden groups.

    Nodes are taken from ``node_set(edges)`` and split with scikit-learn's
    ``train_test_split``; the "test" side becomes the hidden group. Edges are
    then divided by ``partition_into_observed_and_hidden``.

    Args:
        edges: Edge table of the layer.
        layer_id: Identifier stored on the returned split.
        hidden_ratio: Forwarded as ``test_size`` to ``train_test_split``, so
            the accepted values are whatever that function accepts.
        random_state: Forwarded as ``random_state`` to ``train_test_split``.

    Returns:
        A ``LayerSplit`` holding the observed, hidden and full graph data plus
        a mapping from each node label to its position in the full node list.
    """
    # NOTE(review): for a fixed random_state the split is reproducible only if
    # node_set yields the nodes in a stable order - confirm in utils.
    all_nodes = list(node_set(edges))

    observed_nodes, hidden_nodes = train_test_split(
        all_nodes,
        test_size=hidden_ratio,
        random_state=random_state,
    )
    observed_edges, hidden_edges = partition_into_observed_and_hidden(edges, hidden_nodes)

    return LayerSplit(
        layer_id=layer_id,
        node_index=dict(zip(all_nodes, range(len(all_nodes)))),
        observed=GraphData(edges=observed_edges, nodes=observed_nodes),
        hidden=GraphData(edges=hidden_edges, nodes=hidden_nodes),
        full=GraphData(edges=edges, nodes=all_nodes),
    )
def layer_split_with_no_observables(edges: pd.DataFrame, layer_id: int):
    """Build a split in which the whole layer is hidden and nothing is observed.

    Args:
        edges: Edge table of the layer.
        layer_id: Identifier stored on the returned split.

    Returns:
        A ``LayerSplit`` whose ``hidden`` and ``full`` parts are the complete
        graph and whose ``observed`` part has no nodes and a zero-row edge
        table with the same columns as ``edges``.
    """
    nodes = list(node_set(edges))
    node_index = {node_id: i for i, node_id in enumerate(nodes)}
    graph_data = GraphData(edges=edges, nodes=nodes)

    # Take a zero-row slice instead of building a frame from the column names
    # alone: the slice keeps the column dtypes and index type of `edges`,
    # whereas a data-less DataFrame makes every column object-dtype. This
    # matches the boolean-mask partitions produced by random_layer_split.
    # `.copy()` detaches the result from `edges`.
    empty_graph = GraphData(edges=edges.iloc[:0].copy(), nodes=[])

    return LayerSplit(
        layer_id=layer_id,
        node_index=node_index,
        observed=empty_graph,
        hidden=graph_data,
        full=graph_data,
    )
def partition_into_observed_and_hidden(edges, nodes_hidden):
    """Split an edge table by whether an edge touches any hidden node.

    An edge counts as hidden when its ``node_1`` or its ``node_2`` value is in
    ``nodes_hidden``; every other edge is observed.

    Args:
        edges: DataFrame with ``node_1`` and ``node_2`` columns.
        nodes_hidden: Collection of hidden node labels.

    Returns:
        A pair ``(edges_observed, edges_hidden)`` of row subsets of ``edges``.
    """
    first_endpoint_hidden = edges['node_1'].isin(nodes_hidden)
    second_endpoint_hidden = edges['node_2'].isin(nodes_hidden)
    touches_hidden = first_endpoint_hidden | second_endpoint_hidden
    return edges[~touches_hidden], edges[touches_hidden]