Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ FetchContent_Declare(
GraphZeppelinVerifyCC

GIT_REPOSITORY https://github.com/GraphStreamingProject/GraphZeppelin
GIT_TAG refactor
GIT_TAG main
)

FetchContent_MakeAvailable(GraphZeppelinVerifyCC)
Expand Down Expand Up @@ -75,7 +75,9 @@ add_executable(mpi_dynamicCC_tests
test/mpi_graph_tiers_test.cpp

src/skiplist.cpp
src/sketchless_skiplist.cpp
src/euler_tour_tree.cpp
src/sketchless_euler_tour_tree.cpp
src/link_cut_tree.cpp
src/input_node.cpp
src/tier_node.cpp
Expand Down
4 changes: 3 additions & 1 deletion include/mpi_nodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "types.h"
#include "euler_tour_tree.h"
#include "sketchless_euler_tour_tree.h"
#include "link_cut_tree.h"
#include "mpi_functions.h"

Expand Down Expand Up @@ -58,6 +59,7 @@ class InputNode {
node_id_t num_nodes;
uint32_t num_tiers;
LinkCutTree link_cut_tree;
SketchlessEulerTourTree query_ett;
UpdateMessage* update_buffer;
int buffer_size;
int buffer_capacity;
Expand All @@ -68,7 +70,7 @@ class InputNode {
int isolation_count;
bool using_sliding_window = false;
public:
InputNode(node_id_t num_nodes, uint32_t num_tiers, int batch_size);
InputNode(node_id_t num_nodes, uint32_t num_tiers, int batch_size, int seed);
~InputNode();
void update(GraphUpdate update);
void process_all_updates();
Expand Down
54 changes: 54 additions & 0 deletions include/sketchless_euler_tour_tree.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#pragma once
#include <iostream>
#include <unordered_map>
#include <set>

#include <sketchless_skiplist.h>
#include "types.h"

class SketchlessEulerTourNode {

std::unordered_map<SketchlessEulerTourNode*, SketchlessSkipListNode*> edges;

SketchlessSkipListNode* allowed_caller = nullptr;
long seed = 0;

SketchlessSkipListNode* make_edge(SketchlessEulerTourNode* other);
void delete_edge(SketchlessEulerTourNode* other);

public:
const node_id_t vertex = 0;
const uint32_t tier = 0;

SketchlessEulerTourNode(long seed, node_id_t vertex, uint32_t tier);
SketchlessEulerTourNode(long seed);
~SketchlessEulerTourNode();
bool link(SketchlessEulerTourNode& other);
bool cut(SketchlessEulerTourNode& other);

bool isvalid() const;

SketchlessSkipListNode* get_root();

bool has_edge_to(SketchlessEulerTourNode* other);

std::set<SketchlessEulerTourNode*> get_component();

long get_seed() {return seed;};

friend std::ostream& operator<<(std::ostream& os, const SketchlessEulerTourNode& ett);
};

class SketchlessEulerTourTree {
long seed = 0;
public:
std::vector<SketchlessEulerTourNode> ett_nodes;

SketchlessEulerTourTree(node_id_t num_nodes, uint32_t tier_num, int seed);

void link(node_id_t u, node_id_t v);
void cut(node_id_t u, node_id_t v);
bool has_edge(node_id_t u, node_id_t v);
SketchlessSkipListNode* get_root(node_id_t u);
bool is_connected(node_id_t u, node_id_t v);
};
56 changes: 56 additions & 0 deletions include/sketchless_skiplist.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#pragma once

#include <set>

class SketchlessEulerTourNode;

extern long sketchless_skiplist_seed;
extern double sketchless_height_factor;

class SketchlessSkipListNode {

SketchlessSkipListNode* left = nullptr;
SketchlessSkipListNode* right = nullptr;
SketchlessSkipListNode* up = nullptr;
SketchlessSkipListNode* down = nullptr;
// Store the first node to the left on the next level up
SketchlessSkipListNode* parent = nullptr;

public:

SketchlessEulerTourNode* node;

SketchlessSkipListNode(SketchlessEulerTourNode* node);
static SketchlessSkipListNode* init_element(SketchlessEulerTourNode* node);
void uninit_element(bool delete_bdry);
void uninit_list();

// Returns the closest node on the next level up at or left of the current
SketchlessSkipListNode* get_parent();
// Returns the top left root node of the skiplist
SketchlessSkipListNode* get_root();
// Returns the bottom left boundary node of the skiplist
SketchlessSkipListNode* get_first();
// Returns the bottom right node of the skiplist
SketchlessSkipListNode* get_last();

std::set<SketchlessEulerTourNode*> get_component();

// Returns the root of a new skiplist formed by joining the lists containing left and right
static SketchlessSkipListNode* join(SketchlessSkipListNode* left, SketchlessSkipListNode* right);
template <typename... T>
static SketchlessSkipListNode* join(SketchlessSkipListNode* head, T*... tail);
// Returns the root of the left list after splitting to the left of the given node
static SketchlessSkipListNode* split_left(SketchlessSkipListNode* node);
// Returns the root of the right list after splitting to the right of the given node
static SketchlessSkipListNode* split_right(SketchlessSkipListNode* node);

bool isvalid();
SketchlessSkipListNode* next();
int print_list();
};

template <typename... T>
SketchlessSkipListNode* SketchlessSkipListNode::join(SketchlessSkipListNode* head, T*... tail) {
return join(head, join(tail...));
}
8 changes: 6 additions & 2 deletions src/euler_tour_tree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ EulerTourTree::EulerTourTree(node_id_t num_nodes, uint32_t tier_num, int seed) {
this->temp_sketch = new Sketch(sketch_len, seed);
}

EulerTourTree::~EulerTourTree() {
delete this->temp_sketch;
}

void EulerTourTree::link(node_id_t u, node_id_t v) {
ett_nodes[u].link(ett_nodes[v], temp_sketch);
}
Expand Down Expand Up @@ -52,8 +56,8 @@ EulerTourNode::EulerTourNode(long seed) : seed(seed) {
EulerTourNode::~EulerTourNode() {
// Final boundary nodes are a memory leak
// Need to somehow delete all the skiplist nodes at the end
// for (auto edge : edges)
// edge.second->uninit_element(false);
for (auto edge : edges)
edge.second->uninit_element(false);
}

SkipListNode* EulerTourNode::make_edge(EulerTourNode* other, Sketch* temp_sketch) {
Expand Down
50 changes: 39 additions & 11 deletions src/input_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@


long normal_refreshes = 0;
long dt_operation_time = 0;

InputNode::InputNode(node_id_t num_nodes, uint32_t num_tiers, int batch_size) : num_nodes(num_nodes), num_tiers(num_tiers), link_cut_tree(num_nodes){
InputNode::InputNode(node_id_t num_nodes, uint32_t num_tiers, int batch_size, int seed) :
num_nodes(num_nodes), num_tiers(num_tiers), link_cut_tree(num_nodes), query_ett(num_nodes, 0, seed) {
update_buffer = (UpdateMessage*) malloc(sizeof(UpdateMessage)*(batch_size+1));
buffer_capacity = batch_size+1;
UpdateMessage msg;
Expand Down Expand Up @@ -32,9 +34,9 @@ void InputNode::update(GraphUpdate update) {
void InputNode::process_updates() {
if (buffer_size == 1)
return;
// If less than 1/5 of the last updates are isolated use sliding window
// If less than 1/10 of the last updates are isolated use sliding window
bool prev_strat = using_sliding_window;
using_sliding_window = (isolation_count<history_size/5) ? true : false;
using_sliding_window = (isolation_count<history_size/10) ? true : false;
if (using_sliding_window != prev_strat)
std::cout << "SWITCHED TO " << (using_sliding_window ? "SLIDING WINDOW" : "NORMAL STRAT") << std::endl;
// Broadcast the batch of updates to all nodes
Expand All @@ -50,10 +52,12 @@ void InputNode::process_updates() {
minimum_isolated_update = std::min(minimum_isolated_update, greedy_batch_buffer[i]);
// If there was no isolated update just do the necessary cuts
if (minimum_isolated_update == MAX_INT) {
for (uint32_t i = 1; i < update_buffer[0].update.edge.src; i++) {
GraphUpdate update = update_buffer[i].update;
if (update.type == DELETE && link_cut_tree.has_edge(update.edge.src, update.edge.dst))
for (uint32_t i = 0; i < update_buffer[0].update.edge.src-1; i++) {
GraphUpdate update = update_buffer[i+1].update;
if (update.type == DELETE && link_cut_tree.has_edge(update.edge.src, update.edge.dst)) {
link_cut_tree.cut(update.edge.src, update.edge.dst);
query_ett.cut(update.edge.src, update.edge.dst);
}
// Update isolation history
isolation_count -= (int)isolation_history_queue.front();
isolation_history_queue.pop();
Expand All @@ -65,8 +69,10 @@ void InputNode::process_updates() {
// If there was an isolated update process all the updates up to that one
for (uint32_t i = 1; i < minimum_isolated_update; i++) {
GraphUpdate update = update_buffer[i].update;
unlikely_if (update.type == DELETE && link_cut_tree.has_edge(update.edge.src, update.edge.dst))
unlikely_if (update.type == DELETE && link_cut_tree.has_edge(update.edge.src, update.edge.dst)) {
link_cut_tree.cut(update.edge.src, update.edge.dst);
//query_ett[update.edge.src].cut(query_ett[update.edge.dst]);
}
// Update isolation history
isolation_count -= (int)isolation_history_queue.front();
isolation_history_queue.pop();
Expand All @@ -78,8 +84,12 @@ void InputNode::process_updates() {
int end_update_idx = using_sliding_window ? minimum_isolated_update+1 : update_buffer[0].update.edge.src;
for (int update_idx = minimum_isolated_update; update_idx < end_update_idx; update_idx++) {
GraphUpdate update = update_buffer[update_idx].update;
unlikely_if (update.type == DELETE && link_cut_tree.has_edge(update.edge.src, update.edge.dst))
START(dt_operation_timer1);
unlikely_if (update.type == DELETE && link_cut_tree.has_edge(update.edge.src, update.edge.dst)) {
link_cut_tree.cut(update.edge.src, update.edge.dst);
query_ett.cut(update.edge.src, update.edge.dst);
}
STOP(dt_operation_time, dt_operation_timer1);
uint32_t start_tier = 0;
normal_refreshes++;
bool this_update_isolated = false;
Expand Down Expand Up @@ -115,12 +125,16 @@ void InputNode::process_updates() {
std::ignore = broadcast;
EttUpdateMessage update_message;
bcast(&update_message, sizeof(EttUpdateMessage), rank);
START(dt_operation_timer2);
if (update_message.type == LINK) {
link_cut_tree.link(update_message.endpoint1, update_message.endpoint2, update_message.start_tier);
query_ett.link(update_message.endpoint1, update_message.endpoint2);
break;
} else if (update_message.type == CUT) {
link_cut_tree.cut(update_message.endpoint1, update_message.endpoint2);
query_ett.cut(update_message.endpoint1, update_message.endpoint2);
}
STOP(dt_operation_time, dt_operation_timer2);
}
}
}
Expand Down Expand Up @@ -149,19 +163,33 @@ void InputNode::process_all_updates() {

bool InputNode::connectivity_query(node_id_t a, node_id_t b) {
process_all_updates();
return link_cut_tree.find_root(a) == link_cut_tree.find_root(b);
return query_ett.is_connected(a, b);
}

std::vector<std::set<node_id_t>> InputNode::cc_query() {
process_all_updates();
return link_cut_tree.get_cc();
std::vector<std::set<node_id_t>> cc;
std::set<SketchlessEulerTourNode*> visited;
for (uint32_t i = 0; i < query_ett.ett_nodes.size(); i++) {
if (visited.find(&query_ett.ett_nodes[i]) == visited.end()) {
std::set<SketchlessEulerTourNode*> pointer_component = query_ett.ett_nodes[i].get_component();
std::set<node_id_t> component;
for (auto pointer : pointer_component) {
component.insert(pointer->vertex);
visited.insert(pointer);
}
cc.push_back(component);
}
}
return cc;
}

void InputNode::end() {
process_all_updates();
// Tell all nodes the stream is over
update_buffer[0].end = true;
bcast(update_buffer, sizeof(UpdateMessage)*buffer_capacity, 0);
std::cout << "============= INPUT NODE =============" << std::endl;
std::cout << "======================= INPUT NODE ======================" << std::endl;
std::cout << "Dynamic tree operations time (ms): " << dt_operation_time/1000 << std::endl;
std::cout << "Normal refreshes: " << normal_refreshes << std::endl;
}
Loading