diff --git a/hyperbench/data/dataset.py b/hyperbench/data/dataset.py
index d04c716..76201f4 100644
--- a/hyperbench/data/dataset.py
+++ b/hyperbench/data/dataset.py
@@ -6,11 +6,10 @@
 import requests
 
 from enum import Enum
-from typing import Any, Dict, List, Optional, Tuple, Union
+from typing import Any, Dict, List, Optional, Tuple
 from torch import Tensor
 from torch.utils.data import Dataset as TorchDataset
 
-from hyperbench.types.hypergraph import HIFHypergraph
-from hyperbench.types.hdata import HData
+from hyperbench.types import HData, HIFHypergraph
 from hyperbench.utils.hif_utils import validate_hif_json
@@ -137,14 +136,14 @@ def __getitem__(self, index: int | List[int]) -> HData:
             sampled_edge_index, sampled_node_ids, sampled_edge_ids
         )
 
-        new_node_features = self.hdata.x[sampled_node_ids]
+        new_x = self.hdata.x[sampled_node_ids]
 
         new_edge_attr = None
         if self.hdata.edge_attr is not None and len(sampled_edge_ids) > 0:
             new_edge_attr = self.hdata.edge_attr[sampled_edge_ids]
 
         return HData(
-            x=new_node_features,
+            x=new_x,
             edge_index=new_edge_index,
             edge_attr=new_edge_attr,
             num_nodes=len(sampled_node_ids),
@@ -185,8 +184,9 @@ def process(self) -> HData:
                 ]
             )
         else:
-            # Fallback to zeros if no numeric attributes
-            x = torch.zeros((num_nodes, 1), dtype=torch.float)
+            # Fall back to ones if there are no node features; ones are better than
+            # zeros since they avoid zero multiplication, especially in early epochs
+            x = torch.ones((num_nodes, 1), dtype=torch.float)
 
         # remap node and edge IDs to 0-based contiguous IDs
         # Use dict comprehension for faster lookups
@@ -322,7 +322,7 @@ def __sample_edge_index(
         node_ids = edge_index[0]
         edge_ids = edge_index[1]
 
-        sampled_node_ids = torch.tensor(sampled_node_ids_list)
+        sampled_node_ids = torch.tensor(sampled_node_ids_list, device=node_ids.device)
 
         # Find incidences where the node is in our sampled node set
         # Example: edge_index[0] = [0, 0, 1, 2, 3, 4], sampled_node_ids = [0, 3]
@@ -405,9 +405,11 @@ def __to_0based_ids(
         Returns:
             Tensor of 0-based ids.
         """
-        id_to_0based_id = torch.zeros(n, dtype=torch.long)
+        device = original_ids.device
+
+        id_to_0based_id = torch.zeros(n, dtype=torch.long, device=device)
         n_ids_to_keep = len(ids_to_keep)
-        id_to_0based_id[ids_to_keep] = torch.arange(n_ids_to_keep)
+        id_to_0based_id[ids_to_keep] = torch.arange(n_ids_to_keep, device=device)
 
         return id_to_0based_id[original_ids]
diff --git a/hyperbench/data/loader.py b/hyperbench/data/loader.py
index 1fd8d58..b7f3752 100644
--- a/hyperbench/data/loader.py
+++ b/hyperbench/data/loader.py
@@ -97,13 +97,13 @@ def __batch_node_features(self, batch: List[HData]) -> Tuple[Tensor, int]:
         Returns:
             Tensor: Concatenated node features with shape (total_nodes, num_features).
         """
-        per_sample_node_features = [data.x for data in batch]
+        per_sample_x = [data.x for data in batch]
 
         # Stack all nodes along the node dimension from all samples into a single tensor
-        batched_node_features = torch.cat(per_sample_node_features, dim=0)
-        total_nodes = batched_node_features.size(0)
+        batched_x = torch.cat(per_sample_x, dim=0)
+        total_nodes = batched_x.size(0)
 
-        return batched_node_features, total_nodes
+        return batched_x, total_nodes
 
     def __batch_edges(self, batch: List[HData]) -> Tuple[Tensor, Optional[Tensor], int]:
         """Batches hyperedge indices and attributes, adjusting indices for concatenated nodes.
diff --git a/hyperbench/tests/data/dataset_test.py b/hyperbench/tests/data/dataset_test.py index 08c97ac..404e4ab 100644 --- a/hyperbench/tests/data/dataset_test.py +++ b/hyperbench/tests/data/dataset_test.py @@ -727,7 +727,7 @@ class TestDataset(Dataset): def test_process_with_no_node_attributes_fallback(): - """Test process() falls back to torch zeros when no numeric attributes.""" + """Test process() falls back to torch ones when no node features.""" mock_hypergraph = HIFHypergraph( network_type="undirected", nodes=[ @@ -746,7 +746,7 @@ class TestDataset(Dataset): dataset = TestDataset() assert dataset.hdata.x.shape == (2, 1) - assert torch.allclose(dataset.hdata.x, torch.tensor([[0.0], [0.0]])) + assert torch.allclose(dataset.hdata.x, torch.tensor([[1.0], [1.0]])) def test_process_with_single_node_attribute(): diff --git a/hyperbench/tests/types/graph_test.py b/hyperbench/tests/types/graph_test.py new file mode 100644 index 0000000..2739584 --- /dev/null +++ b/hyperbench/tests/types/graph_test.py @@ -0,0 +1,423 @@ +import pytest +import torch + +from hyperbench.types.graph import Graph + + +@pytest.fixture +def mock_single_edge_graph(): + return Graph([[0, 1]]) + + +@pytest.fixture +def mock_linear_graph(): + # Linear graph: 0-1-2-3 + return Graph([[0, 1], [1, 2], [2, 3]]) + + +@pytest.fixture +def mock_graph_with_only_selfloops(): + return Graph([[0, 0], [1, 1]]) + + +@pytest.fixture +def mock_graph_with_one_selfloop(): + return Graph([[0, 1], [1, 1], [2, 3]]) + + +@pytest.mark.parametrize( + "graph, expected_edges", + [ + pytest.param(Graph([]), [], id="empty_graph"), + pytest.param(Graph([[0, 1]]), [[0, 1]], id="single_edge"), + pytest.param( + Graph([[0, 1], [1, 2], [2, 3]]), + [[0, 1], [1, 2], [2, 3]], + id="linear_graph", + ), + ], +) +def test_init_edges(graph, expected_edges): + assert graph.edges == expected_edges + + +@pytest.mark.parametrize( + "graph, expected_num_nodes", + [ + pytest.param(Graph([]), 0, id="empty_graph"), + pytest.param(Graph([[0, 1]]), 2, id="single_edge"), + pytest.param(Graph([[0, 0]]), 1, id="single_edge_selfloop"), + pytest.param(Graph([[0, 1], [1, 2], [2, 3]]), 4, id="linear_graph"), + pytest.param(Graph([[0, 0], [1, 1]]), 2, id="only_selfloops"), + pytest.param(Graph([[0, 1], [1, 1], [2, 3]]), 4, id="one_selfloop"), + pytest.param(Graph([[0, 1], [2, 3]]), 4, id="disconnected_graph"), + pytest.param( + Graph([[0, 1], [0, 1], [1, 2]]), + 3, + id="duplicate_edges", + ), + pytest.param(Graph([[0, 1], [0, 2], [1, 2]]), 3, id="complete_graph"), + ], +) +def test_num_nodes(graph, expected_num_nodes): + assert graph.num_nodes == expected_num_nodes + + +@pytest.mark.parametrize( + "graph, expected_num_edges", + [ + pytest.param(Graph([]), 0, id="empty_graph"), + pytest.param(Graph([[0, 1]]), 1, id="single_edge"), + pytest.param(Graph([[0, 0]]), 1, id="single_edge_selfloop"), + pytest.param(Graph([[0, 1], [1, 2], [2, 3]]), 3, id="linear_graph"), + pytest.param(Graph([[0, 0], [1, 1]]), 2, id="only_selfloops"), + pytest.param(Graph([[0, 1], [1, 1], [2, 3]]), 3, id="one_selfloop"), + pytest.param(Graph([[0, 1], [2, 3]]), 2, id="disconnected_graph"), + pytest.param( + Graph([[0, 1], [0, 1], [1, 2]]), + 3, + id="duplicate_edges", + ), + pytest.param(Graph([[0, 1], [0, 2], [1, 2]]), 3, id="complete_graph"), + ], +) +def test_num_edges(graph, expected_num_edges): + assert graph.num_edges == expected_num_edges + + +@pytest.mark.parametrize( + "graph, expected_edges_after_removal", + [ + pytest.param(Graph([]), [], id="empty_graph"), + pytest.param( + Graph([[0, 1], 
[2, 3]]), + [[0, 1], [2, 3]], + id="no_selfloops", + ), + pytest.param(Graph([[0, 0]]), [], id="one_edge_one_selfloop"), + pytest.param(Graph([[0, 1], [1, 1]]), [[0, 1]], id="one_selfloop"), + pytest.param( + Graph([[0, 0], [1, 1], [2, 2]]), + [], + id="all_selfloops", + ), + pytest.param( + Graph([[0, 1], [1, 2], [2, 2]]), + [[0, 1], [1, 2]], + id="mixed_edges", + ), + pytest.param( + Graph([[0, 0], [0, 1], [1, 1], [1, 2]]), + [[0, 1], [1, 2]], + id="mixed_edges_multiple_selfloops", + ), + pytest.param( + Graph([[0, 0], [1, 1], [2, 2], [3, 4]]), + [[3, 4]], + id="multiple_consecutive_selfloops", + ), + ], +) +def test_remove_selfloops(graph, expected_edges_after_removal): + """Test removing self-loops for various graph configurations.""" + graph.remove_selfloops() + assert graph.edges == expected_edges_after_removal + + +def test_remove_selfloops_preserves_order(): + graph = Graph([[0, 1], [1, 1], [2, 3], [3, 3], [4, 5]]) + graph.remove_selfloops() + assert graph.edges == [[0, 1], [2, 3], [4, 5]] + + +@pytest.mark.parametrize( + "graph, expected_edge_index", + [ + pytest.param( + Graph([]), + torch.empty((2, 0), dtype=torch.long), + id="empty_graph", + ), + pytest.param( + Graph([[0, 1]]), + torch.tensor([[0], [1]], dtype=torch.long), + id="single_edge", + ), + pytest.param( + Graph([[0, 1], [1, 2]]), + torch.tensor([[0, 1], [1, 2]], dtype=torch.long), + id="multiple_edges", + ), + pytest.param( + Graph([[0, 1], [1, 2], [2, 3]]), + torch.tensor([[0, 1, 2], [1, 2, 3]], dtype=torch.long), + id="linear_graph", + ), + pytest.param( + Graph([[0, 0], [1, 1]]), + torch.tensor([[0, 1], [0, 1]], dtype=torch.long), + id="only_selfloops", + ), + pytest.param( + Graph([[0, 1], [0, 1], [1, 2]]), + torch.tensor([[0, 0, 1], [1, 1, 2]], dtype=torch.long), + id="duplicate_edges", + ), + ], +) +def test_to_edge_index(graph, expected_edge_index): + edge_index = graph.to_edge_index() + assert torch.equal(edge_index, expected_edge_index) + + +def test_to_edge_index_returns_long_dtype(mock_single_edge_graph): + edge_index = mock_single_edge_graph.to_edge_index() + assert edge_index.dtype == torch.long + + +def test_to_edge_index_large_graph(): + edges = [[i, i + 1] for i in range(1000)] + graph = Graph(edges) + + edge_index = graph.to_edge_index() + + assert edge_index.shape == (2, 1000) + assert edge_index[0, 0] == 0 + assert edge_index[1, -1] == 1000 + + +def test_to_edge_index_does_not_modify_graph(mock_linear_graph): + original_edges = [edge[:] for edge in mock_linear_graph.edges] + _ = mock_linear_graph.to_edge_index() + + assert mock_linear_graph.edges == original_edges + + +def test_to_edge_index_is_contiguous(mock_single_edge_graph): + """Test that to_edge_index returns a contiguous tensor.""" + edge_index = mock_single_edge_graph.to_edge_index() + assert edge_index.is_contiguous() + + +def test_to_edge_index_before_and_after_removal_all_selfloops( + mock_graph_with_only_selfloops, +): + edge_index_before = mock_graph_with_only_selfloops.to_edge_index() + assert edge_index_before.shape == (2, 2) + + mock_graph_with_only_selfloops.remove_selfloops() + edge_index_after = mock_graph_with_only_selfloops.to_edge_index() + + expected = torch.tensor([], dtype=torch.long).reshape(2, 0) + + assert edge_index_after.shape == (2, 0) + assert torch.equal(edge_index_after, expected) + + +def test_to_edge_index_before_and_after_removal_one_selfloops( + mock_graph_with_one_selfloop, +): + edge_index_before = mock_graph_with_one_selfloop.to_edge_index() + assert edge_index_before.shape == (2, 3) + + 
mock_graph_with_one_selfloop.remove_selfloops() + edge_index_after = mock_graph_with_one_selfloop.to_edge_index() + + expected = torch.tensor([[0, 2], [1, 3]]) + + assert edge_index_after.shape == (2, 2) + assert torch.equal(edge_index_after, expected) + + +def test_bidirectional_edges(): + graph = Graph([[0, 1], [1, 0]]) + assert graph.num_edges == 2 + assert graph.num_nodes == 2 + + edge_index = graph.to_edge_index() + + expected = torch.tensor([[0, 1], [1, 0]]) + + assert torch.equal(edge_index, expected) + + +def test_star_graph(): + """Test star graph (all edges connected to central node).""" + graph = Graph([[0, 1], [0, 2], [0, 3], [0, 4]]) + assert graph.num_nodes == 5 + assert graph.num_edges == 4 + + edge_index = graph.to_edge_index() + + assert edge_index.shape == (2, 4) + + +def test_cyclic_graph(): + """Test cyclic graph (a closed loop).""" + graph = Graph([[0, 1], [1, 2], [2, 3], [3, 0]]) + assert graph.num_nodes == 4 + assert graph.num_edges == 4 + + edge_index = graph.to_edge_index() + + assert edge_index.shape == (2, 4) + + +def test_from_directed_to_undirected_edge_index_single_directed_edge(): + """A single directed edge (0 -> 1) should produce bidirectional edges.""" + edge_index = torch.tensor([[0], [1]]) + + result = Graph.from_directed_to_undirected_edge_index(edge_index) + edges = set(zip(result[0].tolist(), result[1].tolist())) + + # Should contain both (0, 1) and (1, 0) + assert (0, 1) in edges + assert (1, 0) in edges + assert len(edges) == 2 + + +def test_from_directed_to_undirected_edge_index_already_undirected_does_not_create_duplicates(): + edge_index = torch.tensor([[0, 1], [1, 0]]) + + result = Graph.from_directed_to_undirected_edge_index(edge_index) + edges = set(zip(result[0].tolist(), result[1].tolist())) + + assert edges == {(0, 1), (1, 0)} + + +def test_from_directed_to_undirected_edge_index_removes_duplicate_edges(): + edge_index = torch.tensor([[0, 0, 1], [1, 1, 0]]) + + result = Graph.from_directed_to_undirected_edge_index(edge_index) + edges = set(zip(result[0].tolist(), result[1].tolist())) + + assert edges == {(0, 1), (1, 0)} + + +def test_from_directed_to_undirected_edge_index_triangle_directed(): + """ + A directed triangle should become a bidirectional triangle. 
+
+    Example:
+        Directed cycle: 0 -> 1 -> 2 -> 0
+        Bidirectional triangle: 0 <-> 1 <-> 2 <-> 0
+    """
+    edge_index = torch.tensor([[0, 1, 2], [1, 2, 0]])
+
+    result = Graph.from_directed_to_undirected_edge_index(edge_index)
+    edges = set(zip(result[0].tolist(), result[1].tolist()))
+
+    bidirectional_triangle = {(0, 1), (1, 0), (1, 2), (2, 1), (2, 0), (0, 2)}
+    assert edges == bidirectional_triangle
+
+
+def test_from_directed_to_undirected_edge_index_preserves_selfloops_in_input():
+    edge_index = torch.tensor([[0, 1, 1], [1, 0, 1]])  # (1, 1) is a self-loop
+
+    result = Graph.from_directed_to_undirected_edge_index(edge_index)
+    edges = set(zip(result[0].tolist(), result[1].tolist()))
+
+    assert (1, 1) in edges
+
+
+def test_from_directed_to_undirected_edge_index_empty_edge_index_returns_empty_tensor():
+    edge_index = torch.tensor([[], []])
+
+    result = Graph.from_directed_to_undirected_edge_index(edge_index)
+
+    assert result.shape == (2, 0)
+
+
+def test_from_directed_to_undirected_edge_index_with_selfloops_adds_all_selfloops():
+    edge_index = torch.tensor([[0, 1], [1, 2]])
+
+    result = Graph.from_directed_to_undirected_edge_index(
+        edge_index, with_selfloops=True
+    )
+    edges = set(zip(result[0].tolist(), result[1].tolist()))
+
+    # Should have self-loops for nodes 0, 1, 2 (inferred from max index)
+    assert (0, 0) in edges
+    assert (1, 1) in edges
+    assert (2, 2) in edges
+
+
+def test_from_directed_to_undirected_edge_index_with_selfloops_does_not_duplicate_selfloops():
+    edge_index = torch.tensor([[0, 1], [1, 1]])  # (1, 1) is already a self-loop
+
+    result = Graph.from_directed_to_undirected_edge_index(
+        edge_index, with_selfloops=True
+    )
+    edges = list(zip(result[0].tolist(), result[1].tolist()))
+
+    assert (0, 0) in edges
+    assert (1, 1) in edges
+
+
+@pytest.mark.parametrize(
+    "with_selfloops",
+    [
+        pytest.param(True, id="with_selfloops"),
+        pytest.param(False, id="without_selfloops"),
+    ],
+)
+def test_from_directed_to_undirected_edge_index_preserves_device(with_selfloops):
+    edge_index = torch.tensor([[0], [1]], device="cpu")
+
+    result = Graph.from_directed_to_undirected_edge_index(
+        edge_index, with_selfloops=with_selfloops
+    )
+
+    assert result.device == edge_index.device
+
+
+def test_from_directed_to_undirected_edge_index_disconnected_components():
+    # Two disconnected components: (0, 1) and (2, 3)
+    edge_index = torch.tensor([[0, 2], [1, 3]])
+
+    result = Graph.from_directed_to_undirected_edge_index(edge_index)
+    edges = set(zip(result[0].tolist(), result[1].tolist()))
+
+    expected = {(0, 1), (1, 0), (2, 3), (3, 2)}
+    assert edges == expected
+
+
+@pytest.mark.parametrize(
+    "edge_index, expected_num_undirected_edges",
+    [
+        pytest.param(
+            torch.tensor([[0], [1]]),
+            2,
+            id="single_edge_becomes_two",
+        ),
+        pytest.param(
+            torch.tensor([[0, 1], [1, 0]]),
+            2,
+            id="bidirectional_stays_two",
+        ),
+        pytest.param(
+            torch.tensor([[0, 1, 2], [1, 2, 0]]),
+            6,
+            id="directed_triangle_becomes_six",
+        ),
+        pytest.param(
+            torch.tensor([[0, 0], [1, 2]]),
+            4,
+            id="star_two_edges_becomes_four",
+        ),
+    ],
+)
+def test_from_directed_to_undirected_edge_index_edge_count(
+    edge_index, expected_num_undirected_edges
+):
+    result = Graph.from_directed_to_undirected_edge_index(edge_index)
+
+    assert result.shape[1] == expected_num_undirected_edges
+
+
+def test_from_directed_to_undirected_edge_index_dtype_preserved():
+    edge_index = torch.tensor([[0, 1], [1, 2]], dtype=torch.long)
+
+    result = Graph.from_directed_to_undirected_edge_index(edge_index)
+
+    assert result.dtype == 
edge_index.dtype diff --git a/hyperbench/tests/types/hdata_test.py b/hyperbench/tests/types/hdata_test.py new file mode 100644 index 0000000..9eccaa8 --- /dev/null +++ b/hyperbench/tests/types/hdata_test.py @@ -0,0 +1,82 @@ +import pytest +import torch + +from hyperbench.types import HData + + +@pytest.fixture +def mock_hdata(): + x = torch.randn(5, 4) # 5 nodes with 4 features each + edge_index = torch.tensor( + [ + [0, 1, 2, 3, 4, 0], # node IDs + [0, 0, 1, 1, 2, 2], + ] + ) # hyperedge IDs + edge_attr = torch.randn(3, 2) # 3 hyperedges with 2 features each + + return HData(x=x, edge_index=edge_index, edge_attr=edge_attr) + + +def test_hdata_to_cpu(mock_hdata): + returned = mock_hdata.to("cpu") + + assert returned is mock_hdata + assert mock_hdata.x.device.type == "cpu" + assert mock_hdata.edge_index.device.type == "cpu" + assert mock_hdata.edge_attr is not None + assert mock_hdata.edge_attr.device.type == "cpu" + + +def test_hdata_to_cpu_handles_none_edge_attr(mock_hdata): + mock_hdata.edge_attr = None + returned = mock_hdata.to("cpu") + + assert returned is mock_hdata + assert mock_hdata.x.device.type == "cpu" + assert mock_hdata.edge_index.device.type == "cpu" + assert mock_hdata.edge_attr is None + + +@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available") +def test_hdata_to_cuda(mock_hdata): + returned = mock_hdata.to("cuda") + + assert returned is mock_hdata + assert mock_hdata.x.device.type == "cuda" + assert mock_hdata.edge_index.device.type == "cuda" + assert mock_hdata.edge_attr is not None + assert mock_hdata.edge_attr.device.type == "cuda" + + +@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available") +def test_hdata_to_cuda_handles_none_edge_attr(mock_hdata): + mock_hdata.edge_attr = None + returned = mock_hdata.to("cuda") + + assert returned is mock_hdata + assert mock_hdata.x.device.type == "cuda" + assert mock_hdata.edge_index.device.type == "cuda" + assert mock_hdata.edge_attr is None + + +@pytest.mark.skipif(not torch.mps.is_available(), reason="MPS not available") +def test_hdata_to_mps(mock_hdata): + returned = mock_hdata.to("mps") + + assert returned is mock_hdata + assert mock_hdata.x.device.type == "mps" + assert mock_hdata.edge_index.device.type == "mps" + assert mock_hdata.edge_attr is not None + assert mock_hdata.edge_attr.device.type == "mps" + + +@pytest.mark.skipif(not torch.mps.is_available(), reason="MPS not available") +def test_hdata_to_mps_handles_none_edge_attr(mock_hdata): + mock_hdata.edge_attr = None + returned = mock_hdata.to("mps") + + assert returned is mock_hdata + assert mock_hdata.x.device.type == "mps" + assert mock_hdata.edge_index.device.type == "mps" + assert mock_hdata.edge_attr is None diff --git a/hyperbench/tests/types/hypergraph_test.py b/hyperbench/tests/types/hypergraph_test.py index d66e083..2ffa4bf 100644 --- a/hyperbench/tests/types/hypergraph_test.py +++ b/hyperbench/tests/types/hypergraph_test.py @@ -1,6 +1,8 @@ +import pytest import json +import torch -from hyperbench.types import HIFHypergraph +from hyperbench.types import HIFHypergraph, Hypergraph from hyperbench.tests import MOCK_BASE_PATH @@ -11,3 +13,119 @@ def test_build_HIFHypergraph_instance(): hypergraph = HIFHypergraph.from_hif(hiftext) assert isinstance(hypergraph, HIFHypergraph) + + +@pytest.mark.parametrize( + "edges, expected_edges", + [ + pytest.param([], [], id="empty_hypergraph"), + pytest.param([[0]], [[0]], id="single_node_single_edge"), + pytest.param( + [[0, 1, 2]], + [[0, 1, 2]], + 
id="single_edge_multiple_nodes", + ), + pytest.param( + [[0, 1], [2, 3, 4], [5]], + [[0, 1], [2, 3, 4], [5]], + id="multiple_edges", + ), + pytest.param( + [[0, 1, 2], [1, 2, 3], [2, 3, 4]], + [[0, 1, 2], [1, 2, 3], [2, 3, 4]], + id="multiple_overlapping_edges", + ), + pytest.param([[0, 0, 1]], [[0, 0, 1]], id="duplicate_node_within_edge"), + pytest.param([[9, 2, 5, 1]], [[9, 2, 5, 1]], id="unordered_nodes"), + ], +) +def test_init_preserves_edges(edges, expected_edges): + hypergraph = Hypergraph(edges) + assert hypergraph.edges == expected_edges + + +@pytest.mark.parametrize( + "edges, expected_num_nodes", + [ + pytest.param([], 0, id="empty_hypergraph"), + pytest.param([[0]], 1, id="single_node_single_edge"), + pytest.param([[0, 1, 2]], 3, id="multiple_nodes_single_edge"), + pytest.param([[0], [1], [2]], 3, id="three_singleton_edges"), + pytest.param([[0], [1], [1]], 2, id="three_singleton_edges_two_overlapping"), + pytest.param([[0, 1], [2, 3]], 4, id="two_disjoint_edges"), + pytest.param([[0, 1], [1, 2]], 3, id="two_overlapping_edges"), + pytest.param( + [[0, 1, 2], [1, 2, 3]], + 4, + id="overlapping_edges_multiple_nodes", + ), + pytest.param( + [[0, 1, 2], [3, 4, 5], [6, 7, 8]], + 9, + id="multiple_disjoint_edges", + ), + pytest.param([[5, 10, 15]], 3, id="non_contiguous_node_ids"), + pytest.param([[0, 0, 1]], 2, id="edge_with_duplicate_node"), + pytest.param([[0, 1], [0, 1, 2]], 3, id="edge_subset_of_another"), + pytest.param([[9, 2, 5, 1]], 4, id="unordered_node_ids"), + ], +) +def test_num_nodes(edges, expected_num_nodes): + hypergraph = Hypergraph(edges) + assert hypergraph.num_nodes == expected_num_nodes + + +@pytest.mark.parametrize( + "edges, expected_num_edges", + [ + pytest.param([], 0, id="empty_hypergraph"), + pytest.param([[0]], 1, id="single_edge_one_node"), + pytest.param([[0, 1, 2]], 1, id="single_edge_multiple_nodes"), + pytest.param([[0], [1], [2]], 3, id="three_singleton_edges"), + pytest.param([[0, 1], [2, 3]], 2, id="two_disjoint_edges"), + pytest.param([[0, 1], [1, 2]], 2, id="two_overlapping_edges"), + pytest.param( + [[0, 1, 2], [1, 2, 3], [3, 4]], + 3, + id="three_edges_with_overlap", + ), + ], +) +def test_num_edges(edges, expected_num_edges): + hypergraph = Hypergraph(edges) + assert hypergraph.num_edges == expected_num_edges + + +@pytest.mark.parametrize( + "edge_index_data, expected_edges", + [ + pytest.param([[[], []]], [], id="empty_hypergraph"), + pytest.param([[[0], [0]]], [[0]], id="single_node_single_edge"), + pytest.param( + [[[0, 1, 2, 3], [0, 0, 0, 0]]], + [[0, 1, 2, 3]], + id="multiple_nodes_single_edge", + ), + pytest.param( + [[[0, 1, 2], [0, 1, 2]]], + [[0], [1], [2]], + id="multiple_edges_single_nodes", + ), + pytest.param( + [[[0, 1, 2, 3], [0, 0, 1, 1]]], + [[0, 1], [2, 3]], + id="two_edges_multiple_nodes", + ), + pytest.param( + [[[0, 1, 2, 3, 4, 5], [0, 0, 1, 2, 2, 2]]], + [[0, 1], [2], [3, 4, 5]], + id="complex_varying_edge_sizes", + ), + ], +) +def test_from_edge_index_parametrized(edge_index_data, expected_edges): + nodes, edges = edge_index_data[0] + edge_index = torch.tensor([nodes, edges], dtype=torch.long) + hypergraph = Hypergraph.from_edge_index(edge_index) + + assert hypergraph.edges == expected_edges diff --git a/hyperbench/tests/utils/data_utils_test.py b/hyperbench/tests/utils/data_utils_test.py index d0f18bb..87fe363 100644 --- a/hyperbench/tests/utils/data_utils_test.py +++ b/hyperbench/tests/utils/data_utils_test.py @@ -1,4 +1,3 @@ -import pytest import torch from torch import Tensor diff --git 
a/hyperbench/tests/utils/graph_utils_test.py b/hyperbench/tests/utils/graph_utils_test.py new file mode 100644 index 0000000..2f958ae --- /dev/null +++ b/hyperbench/tests/utils/graph_utils_test.py @@ -0,0 +1,730 @@ +import pytest +import torch +import warnings + +from hyperbench.utils import ( + get_sparse_adjacency_matrix, + get_sparse_normalized_degree_matrix, + get_sparse_normalized_laplacian, + reduce_to_graph_edge_index_on_random_direction, + smoothing_with_gcn_laplacian_matrix, +) + + +@pytest.fixture(autouse=True) +def suppress_sparse_csr_warning(): + """ + Suppress PyTorch sparse CSR beta warning. + It could be avoided by doing sparse @ dense, as it doesn't trigger CSR warning. + However, it's inefficient for large graphs. + + Example: + ``` + AD = torch.sparse.mm(A, D.to_dense()) + L = torch.sparse.mm(D, AD).to_sparse_coo() + ``` + """ + with warnings.catch_warnings(): + warnings.filterwarnings( + "ignore", + message="Sparse CSR tensor support is in beta state", + category=UserWarning, + ) + yield + + +@pytest.fixture(autouse=True) +def seed(): + """Fix random seed for deterministic projections.""" + torch.manual_seed(42) + + +def test_get_sparse_adjacency_matrix_returns_sparse_tensor(): + edge_index = torch.tensor([[0, 1], [1, 0]]) + result = get_sparse_adjacency_matrix(edge_index, num_nodes=2) + + assert result.is_sparse + + +@pytest.mark.parametrize( + "edge_index, num_nodes", + [ + pytest.param(torch.tensor([[0, 1], [1, 0]]), 2, id="2_nodes"), + pytest.param(torch.tensor([[0, 1, 2], [1, 2, 0]]), 4, id="4_nodes_3_edges"), + pytest.param(torch.tensor([[], []], dtype=torch.long), 5, id="5_nodes_empty"), + ], +) +def test_get_sparse_adjacency_matrix_shape(edge_index, num_nodes): + result = get_sparse_adjacency_matrix(edge_index, num_nodes=num_nodes) + + assert result.shape == (num_nodes, num_nodes) + + +def test_get_sparse_adjacency_matrix_empty_edge_index(): + """Empty edge_index produces all-zero adjacency matrix when converted to dense.""" + edge_index = torch.tensor([[], []], dtype=torch.long) + result = get_sparse_adjacency_matrix(edge_index, num_nodes=3) + dense = result.to_dense() + + assert torch.all(dense == 0) + + +@pytest.mark.parametrize( + "edge_index, num_nodes, expected_entries", + [ + pytest.param( + torch.tensor([[0], [2]]), + 3, + [(0, 2, 1.0)], + id="single_directed_edge", + ), + pytest.param( + torch.tensor([[0, 1], [1, 0]]), + 2, + [(0, 1, 1.0), (1, 0, 1.0)], + id="undirected_edge", + ), + pytest.param( + torch.tensor([[1], [1]]), + 3, + [(1, 1, 1.0)], + id="self_loop", + ), + pytest.param( + torch.tensor([[0, 1, 2], [1, 2, 0]]), + 3, + [(0, 1, 1.0), (1, 2, 1.0), (2, 0, 1.0)], + id="triangle_directed", + ), + pytest.param( + torch.tensor([[0, 1, 2, 2], [1, 2, 0, 1]]), + 3, + [(0, 1, 1.0), (1, 2, 1.0), (2, 0, 1.0), (2, 1, 1.0)], + id="multiple_edges_between_nodes", + ), + pytest.param( + torch.tensor([[0, 1, 2, 2], [1, 2, 0, 0]]), + 3, + [(0, 1, 1.0), (1, 2, 1.0), (2, 0, 2.0)], # Duplicate edges are summed + id="duplicate_edges_to_same_target", + ), + ], +) +def test_get_sparse_adjacency_matrix_entries(edge_index, num_nodes, expected_entries): + result = get_sparse_adjacency_matrix(edge_index, num_nodes=num_nodes) + dense = result.to_dense() + + for row, col, val in expected_entries: + assert dense[row, col] == val + + +def test_get_sparse_adjacency_matrix_preserves_device(): + edge_index = torch.tensor([[0], [1]], device="cpu") + + result = get_sparse_adjacency_matrix(edge_index, num_nodes=2) + + assert result.device == edge_index.device + + 
+@pytest.mark.parametrize( + "edge_index, num_nodes, isolated_nodes", + [ + pytest.param( + torch.tensor([[0], [1]]), + 4, + [2, 3], + id="two_isolated_nodes", + ), + pytest.param( + torch.tensor([[0, 1], [1, 0]]), + 5, + [2, 3, 4], + id="three_isolated_nodes", + ), + ], +) +def test_get_sparse_adjacency_matrix_isolated_nodes( + edge_index, num_nodes, isolated_nodes +): + """Nodes not in edge_index have zero rows and columns.""" + result = get_sparse_adjacency_matrix(edge_index, num_nodes=num_nodes) + dense = result.to_dense() + + for node in isolated_nodes: + assert torch.all(dense[node, :] == 0) + assert torch.all(dense[:, node] == 0) + + +def test_get_sparse_normalized_degree_matrix_returns_sparse_tensor(): + edge_index = torch.tensor([[0, 1], [1, 0]]) + result = get_sparse_normalized_degree_matrix(edge_index, num_nodes=2) + + assert result.is_sparse + + +@pytest.mark.parametrize( + "edge_index, num_nodes", + [ + pytest.param(torch.tensor([[0, 1], [1, 0]]), 2, id="2_nodes"), + pytest.param(torch.tensor([[0, 1, 2], [1, 2, 0]]), 4, id="4_nodes_3_edges"), + pytest.param( + torch.tensor([[], []], dtype=torch.long), 5, id="5_nodes_no_edges" + ), + ], +) +def test_get_sparse_normalized_degree_matrix_shape(edge_index, num_nodes): + result = get_sparse_normalized_degree_matrix(edge_index, num_nodes=num_nodes) + + assert result.shape == (num_nodes, num_nodes) + + +def test_get_sparse_normalized_degree_matrix_is_diagonal(): + """All non-zero entries are on the diagonal.""" + edge_index = torch.tensor([[0, 1, 1, 2], [1, 0, 2, 1]]) + + result = get_sparse_normalized_degree_matrix(edge_index, num_nodes=3) + dense = result.to_dense() + + # Off-diagonal entries should be zero + for i in range(3): + for j in range(3): + if i != j: + assert dense[i, j] == 0 + + +@pytest.mark.parametrize( + "edge_index, num_nodes, expected_diagonal", + [ + pytest.param( + torch.tensor([[0, 1], [1, 0]]), + 2, + [1.0, 1.0], # degree 1 -> 1^-0.5 = 1 + id="degree_1_each", + ), + pytest.param( + torch.tensor([[0, 0, 1], [1, 2, 0]]), + 3, + # degrees [2, 1, 0] -> [2**-0.5 == 1 / 2**0.5, 1.0, 0] -> [0.707, 1, 0] + [1 / (2**0.5), 1.0, 0.0], + id="mixed_degrees", + ), + pytest.param( + torch.tensor([[0, 0, 0, 0], [1, 2, 3, 4]]), + 5, + [0.5, 0.0, 0.0, 0.0, 0.0], # degree 4 -> 4^-0.5 = 0.5, others are isolated + id="single_hub_node", + ), + ], +) +def test_get_sparse_normalized_degree_matrix_diagonal_values( + edge_index, num_nodes, expected_diagonal +): + result = get_sparse_normalized_degree_matrix(edge_index, num_nodes=num_nodes) + dense = result.to_dense() + + for i, expected_val in enumerate(expected_diagonal): + assert torch.isclose(dense[i, i], torch.tensor(expected_val), atol=1e-6) + + +def test_get_sparse_normalized_degree_matrix_isolated_nodes_are_zero(): + """Isolated nodes (degree 0) have 0 on diagonal, not inf.""" + edge_index = torch.tensor([[0], [1]]) + + result = get_sparse_normalized_degree_matrix(edge_index, num_nodes=4) + dense = result.to_dense() + + # Nodes 2 and 3 are isolated + assert dense[2, 2] == 0 + assert dense[3, 3] == 0 + # No inf values + assert not torch.any(torch.isinf(dense)) + + +def test_get_sparse_normalized_degree_matrix_empty_edge_index(): + """Empty edge_index produces all-zero matrix (all nodes isolated).""" + edge_index = torch.tensor([[], []], dtype=torch.long) + + result = get_sparse_normalized_degree_matrix(edge_index, num_nodes=3) + dense = result.to_dense() + + assert torch.all(dense == 0) + + +def test_get_sparse_normalized_degree_matrix_preserves_device(): + edge_index = 
torch.tensor([[0], [1]], device="cpu") + + result = get_sparse_normalized_degree_matrix(edge_index, num_nodes=2) + + assert result.device == edge_index.device + + +def test_get_sparse_normalized_laplacian_returns_sparse_tensor(): + edge_index = torch.tensor([[0, 1], [1, 0]]) + + result = get_sparse_normalized_laplacian(edge_index) + + assert result.is_sparse + + +@pytest.mark.parametrize( + "edge_index, num_nodes", + [ + pytest.param(torch.tensor([[0, 1], [1, 0]]), 2, id="2_nodes"), + pytest.param(torch.tensor([[0, 1, 2], [1, 2, 0]]), 4, id="4_nodes"), + pytest.param(torch.tensor([[0, 1], [1, 0]]), None, id="2_nodes_inferred"), + ], +) +def test_get_sparse_normalized_laplacian_shape(edge_index, num_nodes): + result = get_sparse_normalized_laplacian(edge_index, num_nodes=num_nodes) + expected_num_nodes = num_nodes if num_nodes else edge_index.max().item() + 1 + + assert result.shape == (expected_num_nodes, expected_num_nodes) + + +def test_get_sparse_normalized_laplacian_is_symmetric(): + """GCN Laplacian L = D^-1/2 * A * D^-1/2 is symmetric.""" + edge_index = torch.tensor([[0, 1, 2], [1, 2, 0]]) + + result = get_sparse_normalized_laplacian(edge_index) + dense = result.to_dense() + + assert torch.allclose(dense, dense.T, atol=1e-6) + + +def test_get_sparse_normalized_laplacian_self_loop_diagonal(): + """Single node graph has diagonal value 1 (self-loop normalized).""" + edge_index = torch.tensor([[0], [0]]) + + result = get_sparse_normalized_laplacian(edge_index, num_nodes=1) + dense = result.to_dense() + + # Self-loop only: degree = 1, so D^-1/2 * A * D^-1/2 = 1 * 1 * 1 = 1 + assert torch.isclose(dense[0, 0], torch.tensor(1.0), atol=1e-6) + + +@pytest.mark.parametrize( + "edge_index, num_nodes, expected_row_sum", + [ + pytest.param( + torch.tensor([[0, 1], [1, 0]]), + 2, + 1.0, # Each node has degree 2 (edge + self-loop), diagonal = 1/2 each + id="connected_graph", + ), + pytest.param( + torch.tensor([[0, 1, 2], [1, 2, 0]]), + 3, + 1.0, # Triangle: each node degree 3 (2 edges + self-loop), diag = 1/3 each + id="triangle_graph", + ), + ], +) +def test_get_sparse_normalized_laplacian_row_sum( + edge_index, num_nodes, expected_row_sum +): + """ + For connected graphs with self-loops, GCN normalization makes the + laplacian matrix row-stochastic: every row sums to 1.0. 
+    """
+    result = get_sparse_normalized_laplacian(edge_index, num_nodes=num_nodes)
+    dense = result.to_dense()
+
+    # Each row should sum to 1 for connected graphs with self-loops
+    for i in range(num_nodes):
+        assert torch.isclose(dense[i].sum(), torch.tensor(expected_row_sum), atol=1e-6)
+
+
+def test_get_sparse_normalized_laplacian_preserves_device():
+    edge_index = torch.tensor([[0, 1], [1, 0]], device="cpu")
+
+    result = get_sparse_normalized_laplacian(edge_index)
+
+    assert result.device == edge_index.device
+
+
+def test_get_sparse_normalized_laplacian_no_nan_or_inf():
+    edge_index = torch.tensor([[0, 1, 2], [1, 2, 0]])
+
+    result = get_sparse_normalized_laplacian(edge_index, num_nodes=4)
+    dense = result.to_dense()
+
+    assert not torch.any(torch.isnan(dense))
+    assert not torch.any(torch.isinf(dense))
+
+
+def test_get_sparse_normalized_laplacian_has_0_for_isolated_nodes():
+    edge_index = torch.tensor([[0], [1]])
+
+    result = get_sparse_normalized_laplacian(edge_index, num_nodes=4)
+    dense = result.to_dense()
+
+    assert torch.all(dense[2, :] == 0)
+    assert torch.all(dense[:, 2] == 0)
+    assert torch.all(dense[3, :] == 0)
+    assert torch.all(dense[:, 3] == 0)
+
+
+@pytest.mark.parametrize(
+    "x, edge_index, with_mediators, expected_num_edges",
+    [
+        pytest.param(
+            torch.tensor([[1.0, 0.0], [0.0, 1.0]]),
+            torch.tensor([[0, 1], [0, 0]]),
+            False,
+            1,  # One hyperedge, so one graph edge, no mediators to create additional edges
+            id="single_hyperedge_2_nodes_no_mediators",
+        ),
+        pytest.param(
+            torch.tensor([[1.0, 0.0], [0.0, 1.0]]),
+            torch.tensor([[0, 1], [0, 0]]),
+            True,
+            # Only 2 nodes and both are extremes (argmin/argmax)
+            # No mediators exist (mediators are nodes that are neither argmin nor argmax)
+            # So, with mediators enabled and no mediators -> 0 edges produced
+            0,
+            id="single_hyperedge_2_nodes_with_mediators_produces_no_edges",
+        ),
+        pytest.param(
+            torch.tensor([[1.0, 0.0, 0.0], [0.0, 1.0, 0.0], [0.0, 0.0, 1.0]]),
+            torch.tensor([[0, 1, 2], [0, 0, 0]]),
+            False,
+            1,  # One hyperedge, so one graph edge, no mediators to create additional edges
+            id="single_hyperedge_3_nodes_no_mediators",
+        ),
+        pytest.param(
+            torch.tensor([[1.0, 0.0, 0.0], [0.0, 1.0, 0.0], [0.0, 0.0, 1.0]]),
+            torch.tensor([[0, 1, 2], [0, 0, 0]]),
+            True,
+            2,  # If argmin = 0 and argmax = 2, mediator 1 creates 2 edges [0,1] and [1,2]
+            id="single_hyperedge_3_nodes_with_mediators",
+        ),
+        pytest.param(
+            torch.tensor([[1.0, 0.0], [0.0, 1.0], [0.5, 0.5], [1.0, 1.0]]),
+            torch.tensor([[0, 1, 2, 3], [0, 0, 0, 0]]),
+            True,
+            # 2 nodes are extremes (argmin/argmax), 2 are mediators
+            # Each mediator connects to both extremes: 2 mediators * 2 edges = 4 edges
+            4,
+            id="single_hyperedge_4_nodes_with_mediators",
+        ),
+        pytest.param(
+            torch.tensor([[1.0, 0.0], [0.0, 1.0], [0.5, 0.5], [1.0, 1.0]]),
+            torch.tensor([[0, 1, 2, 3], [0, 0, 1, 1]]),
+            False,
+            # Two hyperedges, each with 2 nodes -> 2 graph edges,
+            # there are no mediators to create additional edges
+            2,
+            id="two_hyperedges_no_mediators",
+        ),
+        pytest.param(
+            torch.tensor(
+                [
+                    [1.0, 0.0, 0.0],
+                    [0.0, 1.0, 0.0],
+                    [0.0, 0.0, 1.0],
+                    [0.5, 0.5, 0.0],
+                    [0.0, 0.5, 0.5],
+                ]
+            ),
+            torch.tensor([[0, 1, 2, 2, 3, 4], [0, 0, 0, 1, 1, 1]]),
+            True,
+            # Hyperedge 0 has 3 nodes -> 1 mediator -> 2 edges
+            # Hyperedge 1 has 3 nodes -> 1 mediator -> 2 edges
+            # -> 4 edges in total from the two mediators
+            4,
+            id="two_hyperedges_3_nodes_each_with_mediators",
+        ),
+    ],
+)
+def test_reduce_to_graph_edge_count(x, edge_index, with_mediators, expected_num_edges):
+    result = reduce_to_graph_edge_index_on_random_direction(
+        x, edge_index, with_mediators=with_mediators, remove_selfloops=False
+    )
+
+    assert result.shape[1] == expected_num_edges
+
+
+@pytest.mark.parametrize(
+    "x, edge_index",
+    [
+        pytest.param(
+            torch.tensor([[1.0, 0.0], [0.0, 1.0]]),
+            torch.tensor([[0, 1], [0, 0]]),
+            id="2_nodes_1_hyperedge",
+        ),
+        pytest.param(
+            torch.tensor([[1.0, 0.0, 0.0], [0.0, 1.0, 0.0], [0.0, 0.0, 1.0]]),
+            torch.tensor([[0, 1, 2], [0, 0, 0]]),
+            id="3_nodes_1_hyperedge",
+        ),
+        pytest.param(
+            torch.tensor([[1.0, 0.0], [0.0, 1.0], [0.5, 0.5], [1.0, 1.0]]),
+            torch.tensor([[0, 1, 2, 3], [0, 0, 1, 1]]),
+            id="4_nodes_2_hyperedges",
+        ),
+    ],
+)
+def test_reduce_to_graph_output_has_two_rows(x, edge_index):
+    result = reduce_to_graph_edge_index_on_random_direction(x, edge_index)
+
+    assert result.shape[0] == 2
+
+
+def test_reduce_to_graph_output_dtype_is_long():
+    x = torch.tensor([[1.0, 0.0], [0.0, 1.0]])
+    edge_index = torch.tensor([[0, 1], [0, 0]])
+
+    result = reduce_to_graph_edge_index_on_random_direction(x, edge_index)
+
+    assert result.dtype == torch.long
+
+
+def test_reduce_to_graph_output_nodes_are_within_bounds():
+    """All node indices in the output are valid indices from the input node set."""
+    x = torch.tensor(
+        [[1.0, 0.0, 0.0], [0.0, 1.0, 0.0], [0.0, 0.0, 1.0], [0.5, 0.5, 0.0]]
+    )
+    edge_index = torch.tensor([[0, 1, 2, 1, 2, 3], [0, 0, 0, 1, 1, 1]])
+
+    result = reduce_to_graph_edge_index_on_random_direction(x, edge_index)
+    num_nodes = x.shape[0]
+
+    assert result.min() >= 0
+    assert result.max() < num_nodes
+
+
+def test_reduce_to_graph_removes_selfloops():
+    # Duplicate node in hyperedge forces a self-loop: projections are identical,
+    # so argmax and argmin both select index 0, producing edge [0, 0].
+    x = torch.tensor([[1.0, 0.0], [0.0, 1.0]])
+    edge_index = torch.tensor([[0, 0], [0, 0]])
+
+    result = reduce_to_graph_edge_index_on_random_direction(
+        x, edge_index, remove_selfloops=True
+    )
+
+    # Either zero or one edge remains. One edge may remain because, after
+    # removing self-loops, multiple hyperedges can project to the same graph
+    # edge, in which case they are kept as a single edge.
+    # Example: hyperedges [[0,1,1],[0,0,2]] both project to graph edge [0,2]
+    assert result.shape[1] <= 1
+
+    if result.shape[1] > 0:
+        # If any edges remain, check that no self-loops are present
+        assert not torch.any(result[0] == result[1]).item()
+
+
+def test_reduce_to_graph_keeps_selfloops_when_disabled():
+    x = torch.tensor([[1.0, 0.0], [0.0, 1.0]])
+    edge_index = torch.tensor([[0, 0], [0, 0]])
+
+    result = reduce_to_graph_edge_index_on_random_direction(
+        x, edge_index, remove_selfloops=False
+    )
+
+    assert result.shape[1] == 1  # One node, one hyperedge
+    assert result[0, 0] == result[1, 0]  # Self-loop edge [0, 0] is preserved
+
+
+def test_reduce_to_graph_raises_on_single_node_hyperedge():
+    x = torch.tensor([[1.0, 0.0], [0.0, 1.0]])
+    edge_index = torch.tensor([[0], [0]])
+
+    with pytest.raises(
+        ValueError, match="The number of vertices in an hyperedge must be >= 2."
+    ):
+        reduce_to_graph_edge_index_on_random_direction(x, edge_index)
+
+
+@pytest.mark.parametrize(
+    "num_nodes, num_features",
+    [
+        pytest.param(2, 2, id="2x2"),
+        pytest.param(3, 4, id="3x4"),
+        pytest.param(5, 1, id="5x1"),
+        pytest.param(10, 8, id="10x8"),
+    ],
+)
+def test_smoothing_with_gcn_laplacian_output_shape_matches_x_shape(
+    num_nodes, num_features
+):
+    """Output shape should match input node feature matrix X shape (|V|, C)."""
+    x = torch.randn(num_nodes, num_features)
+    edge_index = torch.tensor([[i, (i + 1) % num_nodes] for i in range(num_nodes)]).T
+
+    laplacian = get_sparse_normalized_laplacian(edge_index, num_nodes=num_nodes)
+
+    result = smoothing_with_gcn_laplacian_matrix(x, laplacian)
+
+    assert result.shape == x.shape
+
+
+def test_smoothing_with_gcn_laplacian_with_identity_laplacian_returns_original_x():
+    """Smoothing with identity laplacian should return the original features."""
+    num_nodes = 3
+    x = torch.tensor([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])
+
+    # Create identity matrix as sparse tensor
+    indices = torch.arange(num_nodes).unsqueeze(0).repeat(2, 1)
+    values = torch.ones(num_nodes)
+    identity_laplacian = torch.sparse_coo_tensor(
+        indices, values, size=(num_nodes, num_nodes)
+    )
+
+    result = smoothing_with_gcn_laplacian_matrix(x, identity_laplacian)
+
+    assert torch.allclose(result, x, atol=1e-6)
+
+
+def test_smoothing_with_gcn_laplacian_zero_features():
+    """Zero features should remain zero after smoothing."""
+    x = torch.zeros(3, 2)
+    edge_index = torch.tensor([[0, 1, 2], [1, 2, 0]])
+    laplacian = get_sparse_normalized_laplacian(edge_index, num_nodes=3)
+
+    result = smoothing_with_gcn_laplacian_matrix(x, laplacian)
+
+    assert torch.allclose(result, torch.zeros_like(x), atol=1e-6)
+
+
+def test_smoothing_with_gcn_laplacian_single_node_returns_original_x():
+    """Single node with self-loop should return the original features."""
+    x = torch.tensor([[1.0, 2.0]])
+    edge_index = torch.tensor([[0], [0]])  # Self-loop
+    laplacian = get_sparse_normalized_laplacian(edge_index, num_nodes=1)
+
+    result = smoothing_with_gcn_laplacian_matrix(x, laplacian)
+
+    # A single node with a self-loop has L[0,0] = 1, so result = 1 * x = x,
+    # as the laplacian is [[1.0]]:
+    # result = L @ x = [[1.0]] @ [[1.0, 2.0]] = [[1.0 * 1.0, 1.0 * 2.0]] = [[1.0, 2.0]] = x
+    assert torch.allclose(result, x, atol=1e-6)
+
+
+def test_smoothing_with_gcn_laplacian_preserves_x_device():
+    device = torch.device("cpu")
+
+    x = torch.tensor([[1.0, 0.0], [0.0, 1.0]], device=device)
+    edge_index = torch.tensor([[0, 1], [1, 0]], device=device)
+    laplacian = get_sparse_normalized_laplacian(edge_index, num_nodes=2)
+
+    result = smoothing_with_gcn_laplacian_matrix(x, laplacian)
+
+    assert result.device == x.device
+
+
+def test_smoothing_with_gcn_laplacian_preserves_x_dtype():
+    x = torch.tensor([[1.0, 0.0], [0.0, 1.0]], dtype=torch.float32)
+    edge_index = torch.tensor([[0, 1], [1, 0]])
+    laplacian = get_sparse_normalized_laplacian(edge_index, num_nodes=2)
+
+    result = smoothing_with_gcn_laplacian_matrix(x, laplacian)
+
+    assert result.dtype == x.dtype
+
+
+def test_smoothing_with_gcn_laplacian_no_nan_or_inf():
+    x = torch.randn(5, 3)
+    edge_index = torch.tensor([[0, 1, 2, 3], [1, 2, 3, 4]])
+    laplacian = get_sparse_normalized_laplacian(edge_index, num_nodes=5)
+
+    result = smoothing_with_gcn_laplacian_matrix(x, laplacian)
+
+    assert not torch.any(torch.isnan(result))
+    assert not torch.any(torch.isinf(result))
+
+
+def 
test_smoothing_with_gcn_laplacian_returns_expected_x(): + x = torch.tensor([[1.0, 2.0], [3.0, 4.0]]) + edge_index = torch.tensor([[0, 1], [1, 0]]) + laplacian = get_sparse_normalized_laplacian(edge_index, num_nodes=2) + + result = smoothing_with_gcn_laplacian_matrix(x, laplacian) + + # For 2 nodes with bidirectional edge, GCN adds self-loops, so each node has degree 2. + # The GCN Laplacian L = D^-1/2 * A_hat * D^-1/2 = [[0.5, 0.5], + # [0.5, 0.5]] + # L @ x = [[0.5*1 + 0.5*3, 0.5*2 + 0.5*4], + # [0.5*1 + 0.5*3, 0.5*2 + 0.5*4]] + # = [[2.0, 3.0], + # [2.0, 3.0]] + expected = torch.tensor([[2.0, 3.0], [2.0, 3.0]]) + + assert torch.allclose(result, expected, atol=1e-6) + + +def test_smoothing_with_gcn_laplacian_is_equal_for_zero_and_no_drop_rate(): + """drop_rate=0 should produce the same result as no dropout.""" + x = torch.tensor([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]) + edge_index = torch.tensor([[0, 1, 2], [1, 2, 0]]) + laplacian = get_sparse_normalized_laplacian(edge_index, num_nodes=3) + + result_no_dropout = smoothing_with_gcn_laplacian_matrix(x, laplacian) + result_zero_dropout = smoothing_with_gcn_laplacian_matrix( + x, laplacian, drop_rate=0.0 + ) + + assert torch.allclose(result_no_dropout, result_zero_dropout, atol=1e-6) + + +def test_smoothing_with_gcn_laplacian_nonzero_drop_rate_changes_output(): + torch.manual_seed(123) + x = torch.tensor([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]) + edge_index = torch.tensor([[0, 1, 2], [1, 2, 0]]) + laplacian = get_sparse_normalized_laplacian(edge_index, num_nodes=3) + + result_no_dropout = smoothing_with_gcn_laplacian_matrix( + x, laplacian.clone(), drop_rate=0.0 + ) + result_with_dropout = smoothing_with_gcn_laplacian_matrix( + x, laplacian.clone(), drop_rate=0.7 + ) + + assert not torch.allclose(result_no_dropout, result_with_dropout, atol=1e-6) + + +def test_smoothing_with_gcn_laplacian_drop_rate_stochastic(): + """Different seeds should produce different outputs with dropout.""" + x = torch.tensor([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]) + edge_index = torch.tensor([[0, 1, 2], [1, 2, 0]]) + laplacian = get_sparse_normalized_laplacian(edge_index, num_nodes=3) + + torch.manual_seed(42) + result1 = smoothing_with_gcn_laplacian_matrix(x, laplacian.clone(), drop_rate=0.5) + + torch.manual_seed(99) + result2 = smoothing_with_gcn_laplacian_matrix(x, laplacian.clone(), drop_rate=0.5) + + # Different random seeds should produce different dropout masks + assert not torch.allclose(result1, result2, atol=1e-6) + + +def test_smoothing_with_gcn_laplacian_influences_connected_nodes(): + """ + Features of connected nodes should be aggregated. + For a connected graph with GCN normalization, smoothing should mix features from neighbors. 
+    """
+    # Two connected nodes with distinct features
+    x = torch.tensor([[1.0, 0.0], [0.0, 1.0]])
+    edge_index = torch.tensor([[0, 1], [1, 0]])  # Bidirectional edge
+    laplacian = get_sparse_normalized_laplacian(edge_index, num_nodes=2)
+
+    result = smoothing_with_gcn_laplacian_matrix(x, laplacian)
+
+    # After smoothing, node 0 should have some of node 1's features and vice versa
+    # Row sum of GCN laplacian is 1 for connected graphs, so features are mixed
+    assert result[0, 1] > 0  # Node 0 now has some of feature dimension 1 from node 1
+    assert result[1, 0] > 0  # Node 1 now has some of feature dimension 0 from node 0
+
+
+def test_smoothing_with_gcn_laplacian_isolated_nodes_have_zero_features():
+    x = torch.tensor([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])
+    edge_index = torch.tensor([[0], [1]])  # Only nodes 0 and 1 connected
+    laplacian = get_sparse_normalized_laplacian(edge_index, num_nodes=3)
+
+    result = smoothing_with_gcn_laplacian_matrix(x, laplacian)
+
+    # Node 2 is isolated, so its output should be zero
+    assert torch.allclose(result[2], torch.zeros(2), atol=1e-6)
diff --git a/hyperbench/tests/utils/hif_test.py b/hyperbench/tests/utils/hif_utils_test.py
similarity index 100%
rename from hyperbench/tests/utils/hif_test.py
rename to hyperbench/tests/utils/hif_utils_test.py
diff --git a/hyperbench/tests/utils/sparse_utils_test.py b/hyperbench/tests/utils/sparse_utils_test.py
new file mode 100644
index 0000000..6b628ca
--- /dev/null
+++ b/hyperbench/tests/utils/sparse_utils_test.py
@@ -0,0 +1,270 @@
+import pytest
+import re as regex
+import torch
+
+from hyperbench.utils import sparse_dropout
+
+
+@pytest.fixture
+def mock_indices():
+    return torch.tensor([[0, 1, 2], [0, 1, 2]])
+
+
+@pytest.fixture
+def mock_values():
+    return torch.tensor([1.0, 2.0, 3.0])
+
+
+def test_dropout_zero_probability(mock_indices, mock_values):
+    """Test that zero dropout probability returns the original sparse tensor."""
+    sparse_tensor = torch.sparse_coo_tensor(mock_indices, mock_values, (3, 3))
+
+    result = sparse_dropout(sparse_tensor, dropout_prob=0.0)
+
+    assert torch.allclose(result.coalesce().values(), mock_values)
+    assert torch.equal(result.coalesce().indices(), sparse_tensor.coalesce().indices())
+
+
+def test_dropout_full_probability(mock_indices, mock_values):
+    """Test that full dropout probability (1.0) drops all elements."""
+    sparse_tensor = torch.sparse_coo_tensor(mock_indices, mock_values, (3, 3))
+
+    result = sparse_dropout(sparse_tensor, dropout_prob=1.0)
+
+    # All values should be zero when fill_value is 0
+    assert torch.allclose(result.coalesce().values(), torch.zeros_like(mock_values))
+
+
+@pytest.mark.parametrize("invalid_prob", [-0.5, 1.5])
+def test_dropout_invalid_probability_out_of_range(
+    mock_indices, mock_values, invalid_prob
+):
+    """Test that an out-of-range dropout probability raises ValueError."""
+    sparse_tensor = torch.sparse_coo_tensor(mock_indices, mock_values, (3, 3))
+
+    with pytest.raises(
+        ValueError,
+        match=regex.escape("Dropout probability must be in the range [0, 1]"),
+    ):
+        sparse_dropout(sparse_tensor, dropout_prob=invalid_prob)
+
+
+def test_dropout_preserves_indices():
+    """Test that dropout preserves the sparsity pattern (indices) unchanged."""
+    indices = torch.tensor([[0, 1, 2, 0], [0, 1, 2, 2]])
+    values = torch.tensor([1.0, 2.0, 3.0, 4.0])
+    sparse_tensor = torch.sparse_coo_tensor(indices, values, (3, 3))
+
+    result = sparse_dropout(sparse_tensor, dropout_prob=0.5)
+
+    # Indices should remain the same (in coalesced form)
+    assert 
torch.equal(result.coalesce().indices(), sparse_tensor.coalesce().indices()) + + +def test_dropout_preserves_shape(): + """Test that dropout preserves the tensor shape.""" + shape = (5, 10) # Shape of the tensor if it were dense + indices = torch.tensor([[0, 2, 4], [1, 5, 9]]) + values = torch.tensor([1.0, 2.0, 3.0]) + sparse_tensor = torch.sparse_coo_tensor(indices, values, shape) + + result = sparse_dropout(sparse_tensor, dropout_prob=0.5) + + assert result.size() == shape + + +def test_dropout_preserves_dtype(): + """Test that dropout preserves the tensor dtype.""" + indices = torch.tensor([[0, 1], [0, 1]]) + values = torch.tensor([1.0, 2.0], dtype=torch.float32) + sparse_tensor = torch.sparse_coo_tensor( + indices, values, (2, 2), dtype=torch.float32 + ) + + result = sparse_dropout(sparse_tensor, dropout_prob=0.5) + + assert result.dtype == sparse_tensor.dtype + + +def test_dropout_with_fill_value_zero(mock_indices): + """Test dropout with fill_value=0.0 (default behavior).""" + values = torch.tensor([5.0, 10.0, 15.0]) + sparse_tensor = torch.sparse_coo_tensor(mock_indices, values, (3, 3)) + + result = sparse_dropout(sparse_tensor, dropout_prob=0.5, fill_value=0.0) + + coalesced = result.coalesce() + + # Values should be either original or zero + for val in coalesced.values(): + assert val in [0.0, 5.0, 10.0, 15.0] + + +def test_dropout_with_nonzero_fill_value(mock_indices): + """Test dropout with a non-zero fill_value.""" + values = torch.tensor([5.0, 10.0, 15.0]) + sparse_tensor = torch.sparse_coo_tensor(mock_indices, values, (3, 3)) + fill_value = 99.0 + + result = sparse_dropout(sparse_tensor, dropout_prob=0.5, fill_value=fill_value) + + coalesced = result.coalesce() + + # Values should be either original or fill_value + for val in coalesced.values(): + assert val in [5.0, 10.0, 15.0, fill_value] + + +def test_dropout_with_negative_values(): + """Test dropout with negative values in the sparse tensor.""" + indices = torch.tensor([[0, 1, 2], [0, 1, 2]]) + values = torch.tensor([-1.0, -5.0, -10.0]) + sparse_tensor = torch.sparse_coo_tensor(indices, values, (3, 3)) + + result = sparse_dropout(sparse_tensor, dropout_prob=0.5) + + # Should handle negative values correctly + assert result.size() == sparse_tensor.size() + assert result.coalesce().values().dtype == values.dtype + + +def test_dropout_preserves_cpu_device(): + """Test that dropout preserves the device.""" + device = torch.device("cpu") + + indices = torch.tensor([[0, 1], [0, 1]], device=device) + values = torch.tensor([1.0, 2.0], device=device) + sparse_tensor = torch.sparse_coo_tensor(indices, values, (2, 2), device=device) + + result = sparse_dropout(sparse_tensor, dropout_prob=0.5) + + assert result.device == sparse_tensor.device + + +@pytest.mark.skipif(not torch.cuda.is_available(), reason="Cuda not available") +def test_dropout_preserves_cuda_device(): + """Test that dropout preserves the device.""" + device = torch.device("cuda") + + indices = torch.tensor([[0, 1], [0, 1]], device=device) + values = torch.tensor([1.0, 2.0], device=device) + sparse_tensor = torch.sparse_coo_tensor(indices, values, (2, 2), device=device) + + result = sparse_dropout(sparse_tensor, dropout_prob=0.5) + + assert result.device == sparse_tensor.device + + +@pytest.mark.skipif(not torch.mps.is_available(), reason="MPS not available") +def test_dropout_preserves_mps_device(): + """Test that dropout preserves the device.""" + device = torch.device("mps") + + indices = torch.tensor([[0, 1], [0, 1]], device=device) + values = torch.tensor([1.0, 
2.0], device=device) + sparse_tensor = torch.sparse_coo_tensor(indices, values, (2, 2), device=device) + + result = sparse_dropout(sparse_tensor, dropout_prob=0.5) + + assert result.device == sparse_tensor.device + + +def test_dropout_fill_value_with_full_dropout(): + """Test that fill_value is applied correctly when dropout is 1.0.""" + indices = torch.tensor([[0, 1], [0, 1]]) + values = torch.tensor([1.0, 2.0]) + sparse_tensor = torch.sparse_coo_tensor(indices, values, (2, 2)) + fill_value = 7.0 + + result = sparse_dropout(sparse_tensor, dropout_prob=1.0, fill_value=fill_value) + + # All values should be the fill_value + assert torch.allclose( + result.coalesce().values(), torch.full_like(values, fill_value) + ) + + +def test_dropout_with_unsorted_indices(): + """Test that dropout handles unsorted indices correctly.""" + # Create a sparse tensor with unsorted/duplicate indices + indices = torch.tensor([[2, 0, 1, 0], [2, 0, 1, 0]]) + values = torch.tensor([3.0, 1.0, 2.0, 4.0]) + sparse_tensor = torch.sparse_coo_tensor(indices, values, (3, 3)) + + result = sparse_dropout(sparse_tensor, dropout_prob=0.5) + + coalesced = result.coalesce() + + # Should coalesce without errors + assert coalesced is not None + + # Original had 3 unique indices (0,0), (1,1), (2,2) + assert len(coalesced.indices()[0]) == 3 + assert len(coalesced.indices()[1]) == 3 + + +def test_dropout_single_element(): + """Test dropout on a sparse tensor with a single element.""" + indices = torch.tensor([[0], [0]]) + values = torch.tensor([42.0]) + sparse_tensor = torch.sparse_coo_tensor(indices, values, (1, 1)) + + result = sparse_dropout(sparse_tensor, dropout_prob=0.5) + + coalesced = result.coalesce() + + # Result should contain one value, either 0 or the original value + assert len(coalesced.values()) == 1 + assert coalesced.values().item() in [0.0, 42.0] + + +def test_dropout_large_sparse_matrix(): + """Test dropout on a large sparse matrix.""" + size = 1000 + num_nonzero_elements = 500 + rows = torch.randint(0, size, (num_nonzero_elements,)) + cols = torch.randint(0, size, (num_nonzero_elements,)) + + indices = torch.stack([rows, cols]) + values = torch.randn(num_nonzero_elements) + sparse_tensor = torch.sparse_coo_tensor(indices, values, (size, size)) + + result = sparse_dropout(sparse_tensor, dropout_prob=0.2) + + assert result.size() == sparse_tensor.size() + + +def test_dropout_returns_new_tensor(mock_indices, mock_values): + """Test that dropout returns a new tensor, not a reference to the original.""" + sparse_tensor = torch.sparse_coo_tensor(mock_indices, mock_values, (2, 2)) + + result = sparse_dropout(sparse_tensor, dropout_prob=0.0) + + # Even with 0 dropout, the returned tensor should be a different object + assert result is not sparse_tensor + + +def test_dropout_statistical_property_moderate_rate(): + """Test that dropout approximately respects the expected keep probability.""" + # Create a larger sparse tensor for statistical testing + num_elements = 1000 + + indices = torch.tensor([list(range(num_elements)), list(range(num_elements))]) + values = torch.ones(num_elements) + sparse_tensor = torch.sparse_coo_tensor( + indices, values, (num_elements, num_elements) + ) + + dropout_prob = 0.3 + keep_prob = 1 - dropout_prob # Keep ~70% of elements + + result = sparse_dropout(sparse_tensor, dropout_prob=dropout_prob, fill_value=0.0) + result_values = result.coalesce().values() + + # Count non-zero values (kept elements) + kept_count = (result_values != 0).sum().item() + actual_keep_prob = kept_count / 
num_elements + + # Allow 10% tolerance for statistical variance + tolerance = 0.1 + assert abs(actual_keep_prob - keep_prob) < tolerance diff --git a/hyperbench/train/negative_sampler.py b/hyperbench/train/negative_sampler.py index 57e54c1..5b40489 100644 --- a/hyperbench/train/negative_sampler.py +++ b/hyperbench/train/negative_sampler.py @@ -71,13 +71,14 @@ def sample(self, data: HData) -> HData: sampled_edge_indexes: List[Tensor] = [] sampled_edge_attrs: List[Tensor] = [] + device = data.x.device new_edge_id_offset = data.num_edges for new_edge_id in range(self.num_negative_samples): # Sample with multinomial without replacement to ensure unique node ids # and assign each node id equal probability of being selected by setting all of them to 1 # Example: num_nodes_per_sample=3, max_node_id=5 # -> possible output: [2, 0, 4] - equal_probabilities = torch.ones(data.num_nodes) + equal_probabilities = torch.ones(data.num_nodes, device=device) sampled_node_ids = torch.multinomial( equal_probabilities, self.num_nodes_per_sample, replacement=False ) @@ -86,7 +87,9 @@ def sample(self, data: HData) -> HData: # -> edge_index = [[2, 0, 4], # [3, 3, 3]] sampled_edge_id_tensor = torch.full( - (self.num_nodes_per_sample,), new_edge_id + new_edge_id_offset + (self.num_nodes_per_sample,), + new_edge_id + new_edge_id_offset, + device=device, ) sampled_edge_index = torch.stack( [sampled_node_ids, sampled_edge_id_tensor], dim=0 @@ -102,7 +105,7 @@ def sample(self, data: HData) -> HData: random_edge_attr = torch.randn_like(data.edge_attr[0]) sampled_edge_attrs.append(random_edge_attr) - negative_node_features = data.x[sorted(negative_node_ids)] + negative_x = data.x[sorted(negative_node_ids)] negative_edge_index = self.__new_negative_edge_index(sampled_edge_indexes) negative_edge_attr = ( torch.stack(sampled_edge_attrs, dim=0) @@ -111,7 +114,7 @@ def sample(self, data: HData) -> HData: ) return HData( - x=negative_node_features, + x=negative_x, edge_index=negative_edge_index, edge_attr=negative_edge_attr, num_nodes=len(negative_node_ids), @@ -137,31 +140,3 @@ def __new_negative_edge_index(self, sampled_edge_indexes: List[Tensor]) -> Tenso # [3, 4, 4, 3, 4, 3]] negative_edge_index = negative_edge_index[:, node_ids_order] return negative_edge_index - - -if __name__ == "__main__": - edge_index = torch.tensor([[0, 1, 2], [0, 1, 2]]) - x = torch.randn(3, 2) - edge_attr = torch.randn(3, 3) - print(f"Original node features:\n{x}") - print(f"Original edge_attr:\n{edge_attr}") - - sampler = RandomNegativeSampler(num_negative_samples=4, num_nodes_per_sample=2) - negative_hdata = sampler.sample( - HData(x=x, edge_index=edge_index, edge_attr=edge_attr) - ) - print(f"HData: {negative_hdata}") - - try: - RandomNegativeSampler(num_negative_samples=-1, num_nodes_per_sample=2) - except ValueError as e: - print(f"Caught expected exception: {e}") - try: - RandomNegativeSampler(num_negative_samples=2, num_nodes_per_sample=-1) - except ValueError as e: - print(f"Caught expected exception: {e}") - try: - s = RandomNegativeSampler(num_negative_samples=2, num_nodes_per_sample=10) - s.sample(HData(x=x, edge_index=edge_index, edge_attr=edge_attr)) - except ValueError as e: - print(f"Caught expected exception: {e}") diff --git a/hyperbench/train/trainer.py b/hyperbench/train/trainer.py index bdba4ef..c2fb6a1 100644 --- a/hyperbench/train/trainer.py +++ b/hyperbench/train/trainer.py @@ -14,7 +14,7 @@ class MultiModelTrainer: - """ + r""" A trainer class to handle training multiple models with individual trainers. 
    Args:
@@ -83,7 +83,7 @@ class MultiModelTrainer:
         enable_checkpointing: If ``True``, enable checkpointing.
             It will configure a default ModelCheckpoint callback if there is no user-defined ModelCheckpoint in
-            :paramref:`~lightning.pytorch.trainer.trainer.Trainer.callbacks`.
+            :paramref:`~hyperbench.train.MultiModelTrainer.callbacks`.
             Default: ``True``.

         enable_progress_bar: Whether to enable the progress bar by default.
diff --git a/hyperbench/types/__init__.py b/hyperbench/types/__init__.py
index d183b2b..ac37759 100644
--- a/hyperbench/types/__init__.py
+++ b/hyperbench/types/__init__.py
@@ -1,11 +1,14 @@
-from .hypergraph import HIFHypergraph
+from .graph import Graph
+from .hypergraph import HIFHypergraph, Hypergraph
 from .hdata import HData
 from .model import CkptStrategy, ModelConfig, TestResult

 __all__ = [
     "CkptStrategy",
-    "HIFHypergraph",
+    "Graph",
     "HData",
+    "HIFHypergraph",
+    "Hypergraph",
     "ModelConfig",
     "TestResult",
 ]
diff --git a/hyperbench/types/graph.py b/hyperbench/types/graph.py
new file mode 100644
index 0000000..3d87ccb
--- /dev/null
+++ b/hyperbench/types/graph.py
@@ -0,0 +1,120 @@
+import torch
+
+from torch import Tensor
+from typing import List
+
+
+class Graph:
+    """A simple graph data structure using edge list representation."""
+
+    def __init__(self, edges: List[List[int]]):
+        self.edges = edges
+
+    @property
+    def num_nodes(self) -> int:
+        """Return the number of nodes in the graph."""
+        nodes = set()
+        for edge in self.edges:
+            nodes.update(edge)
+        return len(nodes)
+
+    @property
+    def num_edges(self) -> int:
+        """Return the number of edges in the graph."""
+        return len(self.edges)
+
+    def remove_selfloops(self) -> None:
+        """
+        Remove self-loops from the graph in place.
+
+        ``self.edges`` is updated to contain only the edges that are not self-loops.
+        """
+        if self.num_edges == 0:
+            return
+
+        graph_edges_tensor = torch.tensor(self.edges, dtype=torch.long)
+
+        # Example: edges = [[0, 1],
+        #                   [1, 1],
+        #                   [2, 3]] shape (|E|, 2)
+        # -> no_selfloop_mask = [True, False, True]
+        # -> edges without self-loops = [[0, 1],
+        #                                [2, 3]]
+        no_selfloop_mask = graph_edges_tensor[:, 0] != graph_edges_tensor[:, 1]
+        self.edges = graph_edges_tensor[no_selfloop_mask].tolist()
+
+    def to_edge_index(self) -> Tensor:
+        """
+        Convert the graph to edge index representation.
+
+        Returns:
+            edge_index: Tensor of shape (2, |E|) representing edges.
+        """
+        if self.num_edges == 0:
+            return torch.empty((2, 0), dtype=torch.long)
+
+        # Example: edges = [[0, 1],
+        #                   [1, 2],
+        #                   [2, 3]] shape (|E|, 2)
+        # -> edge_index = [[0, 1, 2],
+        #                  [1, 2, 3]] shape (2, |E|)
+        edge_index = torch.tensor(self.edges, dtype=torch.long).t()
+        return edge_index
+
+    @classmethod
+    def from_directed_to_undirected_edge_index(
+        cls,
+        edge_index: Tensor,
+        with_selfloops: bool = False,
+    ) -> Tensor:
+        """
+        Convert a directed edge index to an undirected edge index by adding reverse edges.
+
+        Args:
+            edge_index: Tensor of shape ``(2, |E|)`` representing directed edges.
+            with_selfloops: Whether to add self-loops to each node. Defaults to ``False``.
+
+        Returns:
+            The undirected edge index tensor of shape ``(2, |E'|)``. If ``with_selfloops`` is ``True``, self-loops are added.
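+
+        Illustrative example (relies on ``torch.unique(..., dim=1)`` returning
+        columns in lexicographically sorted order)::
+
+            >>> ei = torch.tensor([[0, 1], [1, 2]])
+            >>> Graph.from_directed_to_undirected_edge_index(ei)
+            tensor([[0, 1, 1, 2],
+                    [1, 0, 2, 1]])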
+        """
+        src, dest = edge_index[0], edge_index[1]
+        src, dest = torch.cat([src, dest]), torch.cat([dest, src])
+
+        # Example: edge_index = [[0, 1, 2],
+        #                        [1, 0, 3]]
+        # -> after torch.stack([...], dim=0):
+        #    undirected_edge_index = [[0, 1, 2, 1, 0, 3],
+        #                             [1, 0, 3, 0, 1, 2]]
+        # -> after torch.unique(..., dim=1):
+        #    undirected_edge_index = [[0, 1, 2, 3],
+        #                             [1, 0, 3, 2]]
+        undirected_edge_index: Tensor = torch.stack([src, dest], dim=0).to(
+            edge_index.device
+        )
+        undirected_edge_index = cls.__remove_duplicate_edges(undirected_edge_index)
+
+        if with_selfloops:
+            # num_nodes assumes that the node indices in edge_index are in the range [0, num_nodes-1],
+            # as this is the default logic in the library dataset preprocessing.
+            num_nodes = int(undirected_edge_index.max().item()) + 1
+            src, dest = undirected_edge_index[0], undirected_edge_index[1]
+
+            # Add self-loops: A_hat = A + I (works as we assume node indices are in [0, num_nodes-1])
+            selfloop_indices = torch.arange(num_nodes, device=edge_index.device)
+            src = torch.cat([src, selfloop_indices])
+            dest = torch.cat([dest, selfloop_indices])
+            undirected_edge_index = torch.stack([src, dest], dim=0)
+            undirected_edge_index = cls.__remove_duplicate_edges(undirected_edge_index)
+
+        return undirected_edge_index
+
+    @classmethod
+    def __remove_duplicate_edges(cls, edge_index: Tensor) -> Tensor:
+        """Remove duplicate edges from the edge index."""
+        # Example: edge_index = [[0, 1, 2, 2, 0, 3, 2],
+        #                        [1, 0, 3, 2, 1, 2, 2]], shape (2, |E| = 7)
+        # -> after torch.unique(..., dim=1), which deduplicates columns and
+        #    returns them in lexicographically sorted order:
+        #    edge_index = [[0, 1, 2, 2, 3],
+        #                  [1, 0, 2, 3, 2]], shape (2, |E'| = 5)
+        return torch.unique(edge_index, dim=1)
diff --git a/hyperbench/types/hdata.py b/hyperbench/types/hdata.py
index a8df730..83938a6 100644
--- a/hyperbench/types/hdata.py
+++ b/hyperbench/types/hdata.py
@@ -1,4 +1,4 @@
-from torch import Tensor
+from torch import Tensor, device
 from typing import Optional

@@ -45,6 +45,13 @@ def __init__(
         max_edge_id = edge_index[1].max().item() if edge_index.size(1) > 0 else -1
         self.num_edges: int = num_edges if num_edges is not None else max_edge_id + 1

+    def to(self, device: device | str, non_blocking: bool = False) -> "HData":
+        self.x = self.x.to(device=device, non_blocking=non_blocking)
+        self.edge_index = self.edge_index.to(device=device, non_blocking=non_blocking)
+        if self.edge_attr is not None:
+            self.edge_attr = self.edge_attr.to(device=device, non_blocking=non_blocking)
+        return self
+
     def __repr__(self) -> str:
         return (
             f"{self.__class__.__name__}(\n"
diff --git a/hyperbench/types/hypergraph.py b/hyperbench/types/hypergraph.py
index ea038c6..09364bf 100644
--- a/hyperbench/types/hypergraph.py
+++ b/hyperbench/types/hypergraph.py
@@ -1,3 +1,4 @@
+from torch import Tensor
 from typing import Optional, List, Dict, Any, Literal

@@ -55,3 +56,46 @@ def num_nodes(self) -> int:
     def num_edges(self) -> int:
         """Return the number of edges in the hypergraph."""
         return len(self.edges)
+
+
+class Hypergraph:
+    """
+    A simple hypergraph data structure using edge list representation.
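+
+    Illustrative example (the node count is inferred from the distinct ids
+    appearing in the edge lists)::
+
+        >>> hg = Hypergraph([[0, 1, 2], [2, 3]])
+        >>> hg.num_nodes, hg.num_edges
+        (4, 2)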
+    """
+
+    def __init__(self, edges: List[List[int]]):
+        self.edges = edges
+
+    @property
+    def num_nodes(self) -> int:
+        """Return the number of nodes in the hypergraph."""
+        nodes = set()
+        for edge in self.edges:
+            nodes.update(edge)
+        return len(nodes)
+
+    @property
+    def num_edges(self) -> int:
+        """Return the number of edges in the hypergraph."""
+        return len(self.edges)
+
+    @classmethod
+    def from_edge_index(cls, edge_index: Tensor) -> "Hypergraph":
+        """
+        Create a Hypergraph from an edge index representation.
+
+        Args:
+            edge_index: Tensor of shape (2, |E|) representing hyperedges, where each column is (node, hyperedge).
+
+        Returns:
+            A ``Hypergraph`` instance.
+        """
+        if edge_index.size(1) < 1:
+            return cls(edges=[])
+
+        max_edge_id = int(edge_index[1].max().item())
+        edges = [
+            edge_index[0, edge_index[1] == edge_id].tolist()
+            for edge_id in range(max_edge_id + 1)
+        ]
+        return cls(edges=edges)
diff --git a/hyperbench/utils/__init__.py b/hyperbench/utils/__init__.py
index ad4010e..7c9c09d 100644
--- a/hyperbench/utils/__init__.py
+++ b/hyperbench/utils/__init__.py
@@ -1,4 +1,3 @@
-from .hif_utils import validate_hif_json
 from .data_utils import (
     empty_edgeattr,
     empty_edgeindex,
@@ -7,6 +6,15 @@
     empty_nodefeatures,
     to_non_empty_edgeattr,
 )
+from .graph_utils import (
+    get_sparse_adjacency_matrix,
+    get_sparse_normalized_degree_matrix,
+    get_sparse_normalized_laplacian,
+    reduce_to_graph_edge_index_on_random_direction,
+    smoothing_with_gcn_laplacian_matrix,
+)
+from .hif_utils import validate_hif_json
+from .sparse_utils import sparse_dropout

 __all__ = [
     "empty_edgeattr",
@@ -14,6 +22,12 @@
     "empty_hdata",
     "empty_hifhypergraph",
     "empty_nodefeatures",
-    "validate_hif_json",
+    "get_sparse_adjacency_matrix",
+    "get_sparse_normalized_degree_matrix",
+    "get_sparse_normalized_laplacian",
+    "reduce_to_graph_edge_index_on_random_direction",
+    "smoothing_with_gcn_laplacian_matrix",
+    "sparse_dropout",
     "to_non_empty_edgeattr",
+    "validate_hif_json",
 ]
diff --git a/hyperbench/utils/graph_utils.py b/hyperbench/utils/graph_utils.py
new file mode 100644
index 0000000..1203790
--- /dev/null
+++ b/hyperbench/utils/graph_utils.py
@@ -0,0 +1,215 @@
+import torch
+
+from torch import Tensor
+from typing import List, Optional
+from hyperbench.types import Graph, Hypergraph
+
+from .sparse_utils import sparse_dropout
+
+
+def get_sparse_adjacency_matrix(edge_index: Tensor, num_nodes: int) -> Tensor:
+    """
+    Compute the sparse adjacency matrix A from a graph edge index.
+    To obtain A_hat = A + I (the variant with self-loops used by GCN), add
+    self-loops to ``edge_index`` before calling this function.
+
+    Args:
+        edge_index: Edge index tensor of shape ``(2, |E|)``.
+        num_nodes: The number of nodes in the graph.
+
+    Returns:
+        The sparse adjacency matrix of shape ``(num_nodes, num_nodes)``.
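+
+    Illustrative example (one undirected edge between nodes 0 and 1; the
+    result is densified only for display)::
+
+        >>> ei = torch.tensor([[0, 1], [1, 0]])
+        >>> get_sparse_adjacency_matrix(ei, num_nodes=2).to_dense()
+        tensor([[0., 1.],
+                [1., 0.]])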
+    """
+    src, dest = edge_index
+
+    # Example: undirected_edge_index = [[0, 1, 2, 3],
+    #                                   [1, 0, 3, 2]]
+    # -> adj_values = [1, 1, 1, 1]
+    # -> adj_indices = [[0, 1, 2, 3],
+    #                   [1, 0, 3, 2]]
+    #          0  1  2  3
+    # -> A = [[0, 1, 0, 0],  0
+    #         [1, 0, 0, 0],  1
+    #         [0, 0, 0, 1],  2
+    #         [0, 0, 1, 0]]  3
+    # Note: We don't have duplicate edges in undirected_edge_index, but
+    # even if we did, torch.sparse_coo_tensor would sum them up automatically
+    adj_values = torch.ones(src.size(0), device=edge_index.device)
+    adj_indices = torch.stack([src, dest], dim=0)
+    adj_matrix = torch.sparse_coo_tensor(
+        adj_indices, adj_values, (num_nodes, num_nodes)
+    )
+    return adj_matrix
+
+
+def get_sparse_normalized_degree_matrix(edge_index: Tensor, num_nodes: int) -> Tensor:
+    """
+    Compute the sparse normalized degree matrix D^-1/2 from a graph edge index.
+
+    Args:
+        edge_index: Edge index tensor of shape ``(2, |E|)``.
+        num_nodes: The number of nodes in the graph.
+
+    Returns:
+        The sparse normalized degree matrix D^-1/2 of shape ``(num_nodes, num_nodes)``.
+    """
+    device = edge_index.device
+    src, _ = edge_index
+
+    # Compute the degree of each node; the degree vector starts as all zeros
+    degrees: Tensor = torch.zeros(num_nodes, device=device)
+
+    # Example: src = [0, 1, 2, 1], degrees = [0, 0, 0, 0]
+    # -> degrees[0] += 1 -> degrees = [1, 0, 0, 0]
+    # -> degrees[1] += 1 -> degrees = [1, 1, 0, 0]
+    # -> degrees[2] += 1 -> degrees = [1, 1, 1, 0]
+    # -> degrees[1] += 1 -> degrees = [1, 2, 1, 0]
+    # -> final degrees = [1, 2, 1, 0]
+    degree_initial_values = torch.ones(
+        src.size(0), device=device
+    )  # Each edge contributes 1 to the degree of the source node
+    degrees.scatter_add_(dim=0, index=src, src=degree_initial_values)
+
+    # Compute D^-1/2 == D^-0.5
+    degree_inv_sqrt: Tensor = degrees.pow(-0.5)
+    # Handle isolated nodes: a degree of 0 produces inf in degree_inv_sqrt, so reset those entries to 0
+    degree_inv_sqrt[degree_inv_sqrt == float("inf")] = 0
+
+    # Convert the degree vector to a diagonal sparse normalized matrix D
+    # Example: degree_inv_sqrt = [1, 0.707, 1, 0]
+    # -> diagonal_indices = [[0, 1, 2, 3],
+    #                        [0, 1, 2, 3]]
+    #          0  1      2  3
+    # -> D = [[1, 0,     0, 0],  0
+    #         [0, 0.707, 0, 0],  1
+    #         [0, 0,     1, 0],  2
+    #         [0, 0,     0, 0]]  3
+    diagonal_indices = torch.arange(num_nodes, device=device).unsqueeze(0).repeat(2, 1)
+    degree_matrix = torch.sparse_coo_tensor(
+        indices=diagonal_indices, values=degree_inv_sqrt, size=(num_nodes, num_nodes)
+    )
+    return degree_matrix
+
+
+def get_sparse_normalized_laplacian(
+    edge_index: Tensor,
+    num_nodes: Optional[int] = None,
+) -> Tensor:
+    """
+    Compute the sparse GCN-normalized Laplacian matrix from a graph edge index.
+
+    The GCN Laplacian is defined as: L_GCN = D_hat^-1/2 * A_hat * D_hat^-1/2,
+    where A_hat = A + I (adjacency with self-loops) and D_hat is the degree matrix of A_hat.
+
+    Args:
+        edge_index: Edge index tensor of shape ``(2, |E|)``.
+        num_nodes: The number of nodes in the graph. If ``None``,
+            it will be inferred from ``edge_index`` as ``edge_index.max().item() + 1``.
+
+    Returns:
+        The sparse symmetrically normalized Laplacian matrix of shape ``(num_nodes, num_nodes)``.
+    """
+    undirected_edge_index = Graph.from_directed_to_undirected_edge_index(
+        edge_index=edge_index, with_selfloops=True
+    )
+
+    # num_nodes assumes that the node indices in edge_index are in the range [0, num_nodes-1],
+    # as this is the default logic in the library dataset preprocessing.
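+    # Illustrative example: undirected_edge_index = [[0, 2, 1],
+    #                                                [2, 0, 1]]
+    # -> max node id = 2 -> inferred num_nodes = 3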
+    num_nodes = (
+        int(undirected_edge_index.max().item()) + 1 if num_nodes is None else num_nodes
+    )
+
+    degree_matrix = get_sparse_normalized_degree_matrix(
+        edge_index=undirected_edge_index, num_nodes=num_nodes
+    )
+
+    adj_matrix = get_sparse_adjacency_matrix(
+        edge_index=undirected_edge_index, num_nodes=num_nodes
+    )
+
+    # Compute normalized Laplacian matrix: L = D^-1/2 * A * D^-1/2
+    normalized_laplacian_matrix = torch.sparse.mm(
+        degree_matrix, torch.sparse.mm(adj_matrix, degree_matrix)
+    )
+    return normalized_laplacian_matrix.coalesce()
+
+
+def reduce_to_graph_edge_index_on_random_direction(
+    x: Tensor,
+    edge_index: Tensor,
+    with_mediators: bool = False,
+    remove_selfloops: bool = True,
+) -> Tensor:
+    r"""
+    Construct a graph from a hypergraph with the methods proposed in the `HyperGCN: A New Method of Training Graph Convolutional Networks on Hypergraphs `_ paper.
+    Reference implementation: `source `_.
+
+    Args:
+        x: Node feature matrix. Size ``(|V|, C)``.
+        edge_index: Hypergraph edge index. Size ``(2, |E|)``.
+        with_mediators: Whether to use mediators to transform the hyperedges into graph edges. Defaults to ``False``.
+        remove_selfloops: Whether to remove self-loops. Defaults to ``True``.
+
+    Returns:
+        The graph edge index. Size ``(2, |E'|)``.
+    """
+    device = x.device
+
+    hypergraph = Hypergraph.from_edge_index(edge_index)
+    hypergraph_edges: List[List[int]] = hypergraph.edges
+    graph_edges: List[List[int]] = []
+
+    # Random direction (feature_dim, 1) for projecting nodes in each hyperedge
+    # Geometrically, we are choosing a random line through the origin in ℝ^d, where d = feature_dim
+    random_direction = torch.rand((x.shape[1], 1), device=device)
+
+    for edge in hypergraph_edges:
+        num_nodes_in_edge = len(edge)
+        if num_nodes_in_edge < 2:
+            raise ValueError("The number of vertices in a hyperedge must be >= 2.")
+
+        # projections (num_nodes_in_edge,) contains a scalar value for each node in the hyperedge,
+        # indicating its projection onto the random vector 'random_direction'.
+        # Key idea: If two points are very far apart in ℝ^d, there is a high probability
+        # that a random projection will still separate them
+        projections = torch.matmul(x[edge], random_direction).squeeze()
+
+        # The indices of the nodes that are farthest apart in the direction of 'random_direction'
+        node_max_proj_idx = torch.argmax(projections)
+        node_min_proj_idx = torch.argmin(projections)
+
+        if not with_mediators:  # Just connect the two farthest nodes
+            graph_edges.append([edge[node_min_proj_idx], edge[node_max_proj_idx]])
+            continue
+
+        # With mediators, every other node in the hyperedge is connected to both extremes
+        for node_idx in range(num_nodes_in_edge):
+            if node_idx != node_max_proj_idx and node_idx != node_min_proj_idx:
+                graph_edges.append([edge[node_min_proj_idx], edge[node_idx]])
+                graph_edges.append([edge[node_max_proj_idx], edge[node_idx]])
+
+    graph = Graph(edges=graph_edges)
+    if remove_selfloops:
+        graph.remove_selfloops()
+
+    return graph.to_edge_index()
+
+
+def smoothing_with_gcn_laplacian_matrix(
+    x: Tensor,
+    laplacian_matrix: Tensor,
+    drop_rate: float = 0.0,
+) -> Tensor:
+    r"""
+    Return the feature matrix smoothed with the GCN Laplacian matrix.
+    Reference implementation: `source `_.
+
+    Args:
+        x: Node feature matrix. Size ``(|V|, C)``.
+        laplacian_matrix: Sparse GCN-normalized Laplacian matrix. Size ``(|V|, |V|)``.
+        drop_rate: Randomly drop connections of the Laplacian matrix with probability ``drop_rate``. Default: ``0.0``.
+
+    Returns:
+        The smoothed feature matrix. Size ``(|V|, C)``.
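+
+    Illustrative usage sketch (shape check only; the smoothed values depend on
+    the graph and the random features)::
+
+        >>> ei = torch.tensor([[0, 1], [1, 0]])
+        >>> laplacian = get_sparse_normalized_laplacian(ei)
+        >>> x = torch.randn(2, 4)
+        >>> smoothing_with_gcn_laplacian_matrix(x, laplacian).shape
+        torch.Size([2, 4])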
+    """
+    if drop_rate > 0.0:
+        laplacian_matrix = sparse_dropout(laplacian_matrix, drop_rate)
+    return laplacian_matrix.matmul(x)
diff --git a/hyperbench/utils/sparse_utils.py b/hyperbench/utils/sparse_utils.py
new file mode 100644
index 0000000..d3c477c
--- /dev/null
+++ b/hyperbench/utils/sparse_utils.py
@@ -0,0 +1,71 @@
+import torch
+
+from torch import Tensor
+
+
+def sparse_dropout(
+    sparse_tensor: Tensor,
+    dropout_prob: float,
+    fill_value: float = 0.0,
+) -> Tensor:
+    r"""Dropout function for sparse matrices. It returns a new sparse matrix with the same shape as the input, but with some elements dropped out.
+
+    Args:
+        sparse_tensor: The sparse matrix with format ``torch.sparse_coo_tensor``.
+        dropout_prob: Probability of an element to be dropped.
+        fill_value: The fill value for dropped elements. Defaults to ``0.0``.
+
+    Returns:
+        A new sparse matrix with the same shape as the input sparse matrix, but with some elements dropped out.
+    """
+    if dropout_prob > 1 or dropout_prob < 0:
+        raise ValueError("Dropout probability must be in the range [0, 1]")
+
+    device = sparse_tensor.device
+
+    # Sparse tensors may have unsorted indices or duplicate entries;
+    # 'coalesce()' will sum duplicates and sort indices to have a consistent format for dropout
+    sparse_tensor = sparse_tensor.coalesce()
+
+    # Nothing to drop, return the coalesced sparse tensor
+    if dropout_prob == 0:
+        return sparse_tensor
+
+    values = sparse_tensor.values()
+    indices = sparse_tensor.indices()
+
+    keep_prob = 1 - dropout_prob
+
+    # Generate a binary mask matching the shape of values for elements to keep
+    # 'torch.bernoulli()' samples 1 with probability keep_prob and 0 with probability dropout_prob
+    # Example: values = [0.5, 1.2, 3.4], keep_prob = 0.8
+    # -> keep_mask might be [1, 0, 1], meaning we keep the 1st and 3rd elements, drop the 2nd
+    keep_mask = torch.bernoulli(torch.full_like(values, keep_prob)).to(device)
+
+    if fill_value == 0.0:
+        # If fill_value is 0, just zero out the dropped elements,
+        # as keep_mask will be 0 for dropped elements and 1 for kept elements
+        # Example: values = [0.5, 1.2, 3.4], keep_mask = [1, 0, 1], fill_value = 0.0
+        # -> new_values = [0.5*1, 1.2*0, 3.4*1] = [0.5, 0.0, 3.4]
+        new_values = values * keep_mask
+    else:
+        # If fill_value is non-zero, we must fill the dropped elements with the specified fill_value instead of zero.
+        # 'torch.logical_not(keep_mask)' identifies the dropped elements, i.e. where the mask is 0.
+        # Example: values = [0.5, 1.2, 3.4], keep_mask = [1, 0, 1], fill_value = 9.9
+        # -> values_to_fill_mask = [0, 1, 0]
+        # -> fill_values = [0*9.9, 1*9.9, 0*9.9] = [0.0, 9.9, 0.0]
+        # -> new_values = [0.5*1 + 0.0, 1.2*0 + 9.9, 3.4*1 + 0.0] = [0.5, 9.9, 3.4]
+        values_to_fill_mask = torch.logical_not(keep_mask)
+        fill_values = values_to_fill_mask * fill_value
+        new_values = values * keep_mask + fill_values
+
+    # Reuse the original indices and shape to preserve sparsity while changing only the values
+    dropout_sparse_tensor = torch.sparse_coo_tensor(
+        indices=indices,
+        values=new_values,
+        size=sparse_tensor.size(),
+        dtype=sparse_tensor.dtype,
+        device=device,
+    )
+
+    return dropout_sparse_tensor
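
Usage sketch (illustrative, not part of the patch): `sparse_dropout` accepts any
2D COO sparse tensor, e.g. a normalized Laplacian, and keeps its indices and
shape while dropping stored values.

    import torch
    from hyperbench.utils import sparse_dropout

    indices = torch.tensor([[0, 1, 2], [1, 2, 0]])
    values = torch.ones(3)
    laplacian = torch.sparse_coo_tensor(indices, values, (3, 3))

    # Each stored value is zeroed with probability 0.3; indices are unchanged
    dropped = sparse_dropout(laplacian, dropout_prob=0.3)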