diff --git a/.gitignore b/.gitignore index d8eec88..7b35329 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ # Project exclude paths /cmake-build-debug/ -/data/ \ No newline at end of file +/data/ +/.idea/ +/.vscode/ \ No newline at end of file diff --git a/log/pcsr/propotion.log b/log/pcsr/propotion.log new file mode 100644 index 0000000..29968e0 --- /dev/null +++ b/log/pcsr/propotion.log @@ -0,0 +1,11 @@ +livejournal,16,e,insert-read,500000-500000,1.53587e+06 +livejournal,16,e,insert-read,500000-500000,1.56299e+06 +livejournal,16,e,insert-read,500000-500000,1.52229e+06 + +livejournal,16,e,insert-read,100000-900000,4.55711e+06 +livejournal,16,e,insert-read,100000-900000,4.58366e+06 +livejournal,16,e,insert-read,100000-900000,4.56831e+06 + +livejournal,16,e,insert-read,900000-100000,980870 +livejournal,16,e,insert-read,900000-100000,1.00775e+06 +livejournal,16,e,insert-read,900000-100000,1.00096e+06 diff --git a/sys/livegraph/CMakeLists.txt b/sys/livegraph/CMakeLists.txt index 9fe10a1..311b909 100644 --- a/sys/livegraph/CMakeLists.txt +++ b/sys/livegraph/CMakeLists.txt @@ -22,6 +22,9 @@ include_directories(include/algorithms) add_executable(livegraph_edge test/livegraph_test_edge.cpp) target_link_libraries(livegraph_edge lg tbb) +add_executable(livegraph_propotion test/livegraph_test_propotion.cpp) +target_link_libraries(livegraph_propotion lg tbb) + add_executable(livegraph_alg test/livegraph_test_alg.cpp) target_link_libraries(livegraph_alg lg tbb) diff --git a/sys/livegraph/test/livegraph_test.h b/sys/livegraph/test/livegraph_test.h index 10265da..abe698b 100644 --- a/sys/livegraph/test/livegraph_test.h +++ b/sys/livegraph/test/livegraph_test.h @@ -39,6 +39,8 @@ lg::vertex_t internal_id = 0; lg::Graph* G; std::vector new_srcs; std::vector new_dests; +std::vector query_srcs; +std::vector query_dests; uint32_t num_nodes; uint64_t num_edges; std::string src, dest; diff --git a/sys/livegraph/test/livegraph_test_propotion.cpp b/sys/livegraph/test/livegraph_test_propotion.cpp new file mode 100644 index 0000000..3ec78e3 --- /dev/null +++ b/sys/livegraph/test/livegraph_test_propotion.cpp @@ -0,0 +1,334 @@ +#include "livegraph_test.h" +#include + +template +void add_edges(graph *GA, vector &new_srcs, vector &new_dests, int num_threads){ + auto routine_insert_edges = [GA, &new_srcs, &new_dests](int thread_id, uint64_t start, uint64_t length){ + for(int64_t pos = start, end = start + length; pos < end; pos++){ + while(1){ + try{ + auto tx1 = GA->begin_transaction(); + vertex_dictionary_t::const_accessor accessor1, accessor2; + VertexDictionary->find(accessor1, new_srcs[pos]); + VertexDictionary->find(accessor2, new_dests[pos]); + lg::vertex_t internal_source_id = accessor1->second; + lg::vertex_t internal_destination_id = accessor2->second; + int w = 1;string_view weight { (char*) &w, sizeof(w) }; + tx1.put_edge(internal_source_id, /* label */ 0, internal_destination_id, weight); + tx1.commit(); + break; + } + catch (exception e){ + continue; + } + } + } + }; + int64_t edges_per_thread = new_srcs.size() / num_threads; + int64_t odd_threads = new_srcs.size() % num_threads; + vector threads; + int64_t start = 0; + for(int thread_id = 0; thread_id < num_threads; thread_id ++){ + int64_t length = edges_per_thread + (thread_id < odd_threads); + threads.emplace_back(routine_insert_edges, thread_id, start, length); + start += length; + } + for(auto& t : threads) t.join(); + threads.clear(); +} + +// template +// void insert_read(graph *GA, vector &new_srcs, vector &new_dests, vector &query_srcs, vector &query_dests,int num_threads){ +// auto routine_insert_edges = [GA, &new_srcs, &new_dests, &query_srcs, &query_dests](int thread_id, uint64_t start, uint64_t length){ +// for(int64_t pos = start, end = start + length; pos < end; pos++){ +// while(1){ +// try{ +// auto tx1 = GA->begin_transaction(); +// vertex_dictionary_t::const_accessor accessor1, accessor2; +// VertexDictionary->find(accessor1, new_srcs[pos]); +// VertexDictionary->find(accessor2, new_dests[pos]); +// lg::vertex_t internal_source_id = accessor1->second; +// lg::vertex_t internal_destination_id = accessor2->second; +// int w = 1;string_view weight { (char*) &w, sizeof(w) }; +// tx1.put_edge(internal_source_id, /* label */ 0, internal_destination_id, weight); +// tx1.commit(); +// break; +// } +// catch (exception e){ +// continue; +// } +// } +// while(1){ +// try{ +// auto tx2 = G->begin_read_only_transaction(); +// vertex_dictionary_t::const_accessor accessor1, accessor2; +// if(VertexDictionary->find(accessor1, query_srcs[pos*9+n]) && VertexDictionary->find(accessor2, query_dests[pos*9+n])){ +// lg::vertex_t internal_source_id = accessor1->second; +// lg::vertex_t internal_destination_id = accessor2->second; +// string_view lg_weight = tx2.get_edge(internal_source_id, /* label */ 0, internal_destination_id); +// } +// break; +// } +// catch (exception e){ +// continue; +// } +// } +// } +// }; +// int64_t edges_per_thread = new_srcs.size() / num_threads; +// int64_t odd_threads = new_srcs.size() % num_threads; +// vector threads; +// int64_t start = 0; +// for(int thread_id = 0; thread_id < num_threads; thread_id ++){ +// int64_t length = edges_per_thread + (thread_id < odd_threads); +// threads.emplace_back(routine_insert_edges, thread_id, start, length); +// start += length; +// } +// for(auto& t : threads) t.join(); +// threads.clear(); +// } + + +// template +// void insert_read(graph *GA, vector &new_srcs, vector &new_dests, vector &query_srcs, vector &query_dests,int num_threads){ +// auto routine_insert_edges = [GA, &new_srcs, &new_dests, &query_srcs, &query_dests](int thread_id, uint64_t start, uint64_t length){ +// for(int64_t pos = start, end = start + length; pos < end; pos++){ +// while(1){ +// try{ +// auto tx1 = GA->begin_transaction(); +// vertex_dictionary_t::const_accessor accessor1, accessor2; +// VertexDictionary->find(accessor1, new_srcs[pos]); +// VertexDictionary->find(accessor2, new_dests[pos]); +// lg::vertex_t internal_source_id = accessor1->second; +// lg::vertex_t internal_destination_id = accessor2->second; +// int w = 1;string_view weight { (char*) &w, sizeof(w) }; +// tx1.put_edge(internal_source_id, /* label */ 0, internal_destination_id, weight); +// tx1.commit(); +// break; +// } +// catch (exception e){ +// continue; +// } +// } +// for (int64_t n = 0; n < 9; n++){ +// while(1){ +// try{ +// auto tx2 = G->begin_read_only_transaction(); +// vertex_dictionary_t::const_accessor accessor1, accessor2; +// if(VertexDictionary->find(accessor1, query_srcs[pos*9+n]) && VertexDictionary->find(accessor2, query_dests[pos*9+n])){ +// lg::vertex_t internal_source_id = accessor1->second; +// lg::vertex_t internal_destination_id = accessor2->second; +// string_view lg_weight = tx2.get_edge(internal_source_id, /* label */ 0, internal_destination_id); +// } +// break; +// } +// catch (exception e){ +// continue; +// } +// } +// } +// } +// }; +// int64_t edges_per_thread = new_srcs.size() / num_threads; +// int64_t odd_threads = new_srcs.size() % num_threads; +// vector threads; +// int64_t start = 0; +// for(int thread_id = 0; thread_id < num_threads; thread_id ++){ +// int64_t length = edges_per_thread + (thread_id < odd_threads); +// threads.emplace_back(routine_insert_edges, thread_id, start, length); +// start += length; +// } +// for(auto& t : threads) t.join(); +// threads.clear(); +// } + + + +template +void insert_read(graph *GA, vector &new_srcs, vector &new_dests, vector &query_srcs, vector &query_dests,int num_threads){ + auto routine_insert_edges = [GA, &new_srcs, &new_dests, &query_srcs, &query_dests](int thread_id, uint64_t start, uint64_t length){ + for(int64_t pos = start, end = start + length; pos < end; pos++){ + for (int64_t n = 0; n < 9; n++){ + while(1){ + try{ + auto tx1 = GA->begin_transaction(); + vertex_dictionary_t::const_accessor accessor1, accessor2; + VertexDictionary->find(accessor1, new_srcs[pos*9+n]); + VertexDictionary->find(accessor2, new_dests[pos*9+n]); + lg::vertex_t internal_source_id = accessor1->second; + lg::vertex_t internal_destination_id = accessor2->second; + int w = 1;string_view weight { (char*) &w, sizeof(w) }; + tx1.put_edge(internal_source_id, /* label */ 0, internal_destination_id, weight); + tx1.commit(); + break; + } + catch (exception e){ + continue; + } + } + } + while(1){ + try{ + auto tx2 = G->begin_read_only_transaction(); + vertex_dictionary_t::const_accessor accessor1, accessor2; + if(VertexDictionary->find(accessor1, query_srcs[pos]) && VertexDictionary->find(accessor2, query_dests[pos])){ + lg::vertex_t internal_source_id = accessor1->second; + lg::vertex_t internal_destination_id = accessor2->second; + string_view lg_weight = tx2.get_edge(internal_source_id, /* label */ 0, internal_destination_id); + } + break; + } + catch (exception e){ + continue; + } + } + + } + }; + int64_t edges_per_thread = query_srcs.size() / num_threads; + int64_t odd_threads = query_srcs.size() % num_threads; + vector threads; + int64_t start = 0; + for(int thread_id = 0; thread_id < num_threads; thread_id ++){ + int64_t length = edges_per_thread + (thread_id < odd_threads); + threads.emplace_back(routine_insert_edges, thread_id, start, length); + start += length; + } + for(auto& t : threads) t.join(); + threads.clear(); +} + + + + + +template +void delete_edges(graph *GA, vector &new_srcs, vector &new_dests, int num_threads){ + auto routine_insert_edges = [GA, &new_srcs, &new_dests](int thread_id, uint64_t start, uint64_t length){ + for(int64_t pos = start, end = start + length; pos < end; pos++){ + while(1){ + try{ + vertex_dictionary_t::const_accessor accessor1, accessor2; + auto tx3 = G->begin_transaction(); + VertexDictionary->find(accessor1, new_srcs[pos]); + VertexDictionary->find(accessor2, new_dests[pos]); + lg::vertex_t internal_source_id = accessor1->second; + lg::vertex_t internal_destination_id = accessor2->second; + tx3.del_edge(internal_source_id, /* label */ 0, internal_destination_id); + tx3.commit(); + break; + } + catch (exception e){ + continue; + } + } + } + }; + int64_t edges_per_thread = new_srcs.size() / num_threads; + int64_t odd_threads = new_srcs.size() % num_threads; + vector threads; + int64_t start = 0; + for(int thread_id = 0; thread_id < num_threads; thread_id ++){ + int64_t length = edges_per_thread + (thread_id < odd_threads); + threads.emplace_back(routine_insert_edges, thread_id, start, length); + start += length; + } + for(auto& t : threads) t.join(); + threads.clear(); +} + + + + +void batch_ins_del_read(commandLine& P){ + PRINT("=============== Batch Insert BEGIN ==============="); + + auto gname = P.getOptionValue("-gname", "none"); + auto thd_num = P.getOptionLongValue("-core", 1); + auto log = P.getOptionValue("-log","none"); + std::ofstream log_file(log, ios::app); + + // std::vector update_sizes = {10, 100, 1000 ,10000,100000,1000000, 10000000};//10, 100, 1000 ,10000,100000,1000000, 10000000 + std::vector update_sizes = {900000}; + std::vector update_sizes2 = {100000}; + auto r = random_aspen(); + auto update_times = std::vector(); + size_t n_trials = 1; + for (size_t us=0; usget_max_vertex_id() ); + double avg_insert = 0; + double avg_delete = 0; + double avg_read = 0; + std::cout << "Running batch size: " << update_sizes[us] << std::endl; + + if (update_sizes[us] < 10000000) + n_trials = 20; + else n_trials = 5; + size_t updates_to_run = update_sizes[us]; + size_t updates_to_run2 = update_sizes2[us]; + auto perm = get_random_permutation(updates_to_run); + for (size_t ts=0; tsget_max_vertex_id(); + new_srcs.clear(); + new_dests.clear(); + query_srcs.clear(); + query_dests.clear(); + + double a = 0.5; + double b = 0.1; + double c = 0.1; + size_t nn = 1 << (log2_up(GN) - 1); + auto rmat = rMat(nn, r.ith_rand(100+ts), a, b, c); + for( uint32_t i = 0; i < updates_to_run; i++) { + std::pair edge = rmat(i); + new_srcs.push_back(edge.first); + new_dests.push_back(edge.second); + } + + // generate random deges from new_srcs and new_dests + std::default_random_engine generator; + std::uniform_int_distribution distribution(0, new_srcs.size() - 1); + for (size_t i = 0; i < updates_to_run2; i++) { + size_t index = distribution(generator); + query_srcs.push_back(new_srcs[index]); + query_dests.push_back(new_dests[index]); + } + + // insert and read edge + gettimeofday(&t_start, &tzp); + insert_read(G, new_srcs, new_dests, query_srcs, query_dests, thd_num); + gettimeofday(&t_end, &tzp); + avg_insert += cal_time_elapsed(&t_start, &t_end); + + // del edge + gettimeofday(&t_start, &tzp); + delete_edges(G, new_srcs, new_dests, thd_num); + gettimeofday(&t_end, &tzp); + } + double time_i = (double) avg_insert / n_trials; + double insert_throughput = (updates_to_run+updates_to_run2) / time_i; + printf("batch_size = %zu, average insert: %f, throughput %e\n", updates_to_run, time_i, insert_throughput); + log_file<< gname<<","< new_srcs; std::vector new_dests; +std::vector query_srcs; +std::vector query_dests; uint32_t num_nodes; uint64_t num_edges; std::string src, dest; diff --git a/sys/llama/tests/llama_test_propotion.cpp b/sys/llama/tests/llama_test_propotion.cpp new file mode 100644 index 0000000..f6722ba --- /dev/null +++ b/sys/llama/tests/llama_test_propotion.cpp @@ -0,0 +1,214 @@ +#include "llama_test.h" +#include "thread" + +template +void insert_read(Graph &graph, vector &new_srcs, vector &new_dests, std::vector &query_srcs, std::vector &query_dests, int num_threads){ + auto routine_insert_edges = [&graph, &new_srcs, &new_dests, &query_srcs, &query_dests](int thread_id, uint64_t start, uint64_t length){ + for(int64_t pos = start, end = start + length; pos < end; pos++){ + + while(1){ + try{ + graph.tx_begin(); + edge_t edge_id = graph.add_edge(new_srcs[pos], new_dests[pos]); + uint64_t w = 1; + graph.get_edge_property_64(g_llama_property_weights)->set(edge_id, *reinterpret_cast(&(w))); + graph.tx_commit(); + break; + } + catch (exception e){ + continue; + } + } + + // for(int64_t n = 0; n < 9; n++){ + // while(1){ + // try{ + // graph.tx_begin(); + // edge_t edge_id = graph.add_edge(new_srcs[pos*9+n], new_dests[pos*9+n]); + // uint64_t w = 1; + // graph.get_edge_property_64(g_llama_property_weights)->set(edge_id, *reinterpret_cast(&(w))); + // graph.tx_commit(); + // break; + // } + // catch (exception e){ + // continue; + // } + // } + // } + + + // auto* g = get_snapshot(G); + // volatile bool result = g->find(query_srcs[pos], query_dests[pos]); + + + // for(int64_t n = 0; n < 9; n++){ + // while(1){ + // try{ + // auto* g = get_snapshot(G); + // volatile bool result = g->find(query_srcs[pos*9+n], query_dests[pos*9+n]); + // break; + // } + // catch (exception e){ + // continue; + // } + // } + // } + } + }; + int64_t smaller_size = new_srcs.size() < query_srcs.size() ? new_srcs.size() : query_srcs.size(); + int64_t edges_per_thread = smaller_size / num_threads; + int64_t odd_threads = smaller_size % num_threads; + vector threads; + int64_t start = 0; + for(int thread_id = 0; thread_id < num_threads; thread_id ++){ + int64_t length = edges_per_thread + (thread_id < odd_threads); + threads.emplace_back(routine_insert_edges, thread_id, start, length); + start += length; + } + for(auto& t : threads) t.join(); + threads.clear(); +} + +template +void delete_edges(Graph &graph, vector &new_srcs, vector &new_dests, int num_threads){ + auto routine_insert_edges = [&graph, &new_srcs, &new_dests](int thread_id, uint64_t start, uint64_t length){ + for(int64_t pos = start, end = start + length; pos < end; pos++){ + while(1){ + try{ + graph.tx_begin(); + graph.delete_edge(new_srcs[pos], graph.find(new_srcs[pos], new_dests[pos])); + graph.tx_commit(); + break; + } + catch (exception e){ + continue; + } + } + } + }; + int64_t edges_per_thread = new_srcs.size() / num_threads; + int64_t odd_threads = new_srcs.size() % num_threads; + vector threads; + int64_t start = 0; + for(int thread_id = 0; thread_id < num_threads; thread_id ++){ + int64_t length = edges_per_thread + (thread_id < odd_threads); + threads.emplace_back(routine_insert_edges, thread_id, start, length); + start += length; + } + for(auto& t : threads) t.join(); + threads.clear(); +} + + +double test_read(commandLine& P) { + auto r = random_aspen(); + auto* g = get_snapshot(G); + uint64_t n = g->max_nodes(); + double a = 0.5; + double b = 0.1; + double c = 0.1; + size_t nn = 1 << (log2_up(n) - 1); + auto rmat = rMat(nn, r.ith_rand(100), a, b, c); + new_srcs.clear();new_dests.clear(); + uint32_t updates = num_edges/20; + for( uint32_t i = 0; i < updates; i++) { + std::pair edge = rmat(i); + new_srcs.push_back(edge.first); + new_dests.push_back(edge.second); + } + gettimeofday(&t_start, &tzp); + parallel_for(uint32_t i = 0; i < updates; i++) { + g->find(new_srcs[i], new_dests[i]); + } + gettimeofday(&t_end, &tzp); + return cal_time_elapsed(&t_start, &t_end); +} + + + +void batch_ins_del_read(commandLine& P){ + PRINT("=============== Batch Insert BEGIN ==============="); + + auto gname = P.getOptionValue("-gname", "none"); + auto thd_num = P.getOptionLongValue("-core", 1); + auto log = P.getOptionValue("-log","none"); + std::ofstream log_file(log, ios::app); + + ll_database Ga = *G; + ll_writable_graph& graph = *Ga.graph(); + // std::vector update_sizes = {10, 100, 1000 ,10000,100000,1000000, 10000000}; + std::vector update_sizes = {500000}; + std::vector update_sizes2 = {500000}; + auto r = random_aspen(); + auto update_times = std::vector(); + size_t n_trials = 1; + for (size_t us=0; us(nn, r.ith_rand(100+ts), a, b, c); + for( uint32_t i = 0; i < updates_to_run; i++) { + std::pair edge = rmat(i); + new_srcs.push_back(edge.first); + new_dests.push_back(edge.second); + } + // generate random deges from new_srcs and new_dests + std::default_random_engine generator; + std::uniform_int_distribution distribution(0, new_srcs.size() - 1); + for (size_t i = 0; i < updates_to_run2; i++) { + size_t index = distribution(generator); + query_srcs.push_back(new_srcs[index]); + query_dests.push_back(new_dests[index]); + } + + // insert edge + gettimeofday(&t_start, &tzp); + insert_read(graph, new_srcs, new_dests, query_srcs, query_dests, thd_num); + gettimeofday(&t_end, &tzp); + avg_insert += cal_time_elapsed(&t_start, &t_end); + + delete_edges(graph, new_srcs, new_dests, thd_num); + } + double time_i = (double) avg_insert / n_trials; + double insert_throughput = (updates_to_run+updates_to_run2) / time_i; + printf("batch_size = %zu, average insert: %f, throughput %e\n", updates_to_run, time_i, insert_throughput); + log_file<< gname<<","< new_srcs; std::vector new_dests; +std::vector query_srcs; +std::vector query_dests; uint32_t num_nodes; uint64_t num_edges; std::string src, dest; diff --git a/sys/pcsr/test/pcsr_test_propotion.cpp b/sys/pcsr/test/pcsr_test_propotion.cpp new file mode 100644 index 0000000..2ed8b70 --- /dev/null +++ b/sys/pcsr/test/pcsr_test_propotion.cpp @@ -0,0 +1,110 @@ + +// +// Created by zxy on 5/4/22. +// + +#include "pcsr_test.h" + +void batch_ins_del_read(commandLine& P){ + PRINT("=============== Batch Insert BEGIN ==============="); + + auto gname = P.getOptionValue("-gname", "none"); + auto thd_num = P.getOptionLongValue("-core", 1); + auto log = P.getOptionValue("-log","none"); + std::ofstream log_file(log, ios::app); + + PCSR &Ga = *G; + // std::vector update_sizes = {10, 100, 1000 ,10000,100000,1000000, 10000000}; + std::vector update_sizes = {900000}; + std::vector update_sizes2 = {100000}; + auto r = random_aspen(); + auto update_times = std::vector(); + size_t n_trials = 1; + for (size_t us=0; us(nn, r.ith_rand(100+ts), a, b, c); + for( uint32_t i = 0; i < updates_to_run; i++) { + std::pair edge = rmat(i); + new_srcs.push_back(edge.first); + new_dests.push_back(edge.second); + } + // generate random deges from new_srcs and new_dests + std::default_random_engine generator; + std::uniform_int_distribution distribution(0, new_srcs.size() - 1); + for (size_t i = 0; i < updates_to_run2; i++) { + size_t index = distribution(generator); + query_srcs.push_back(new_srcs[index]); + query_dests.push_back(new_dests[index]); + } + + + gettimeofday(&t_start, &tzp); + + // for (uint32_t i =0 ; i< 500000;i++){ + // Ga.add_edge_update(new_srcs[i],new_dests[i],1); + // Ga.find_value(query_srcs[i], query_dests[i]); + // } + // for (uint32_t i =0 ; i< 100000;i++){ + // Ga.add_edge_update(new_srcs[i],new_dests[i],1); + // for (uint32_t n = 0; n < 9; n++){ + // Ga.find_value(query_srcs[i*9+n], query_dests[i*9+n]); + // } + // } + for (uint32_t i =0 ; i< 100000;i++){ + for (uint32_t n = 0; n < 9; n++){ + Ga.add_edge_update(new_srcs[i*9+n], new_dests[i*9+n], 1); + } + Ga.find_value(query_srcs[i],query_dests[i]); + } + + gettimeofday(&t_end, &tzp); + avg_insert += cal_time_elapsed(&t_start, &t_end); + + } + double time_i = (double) avg_insert / n_trials; + double insert_throughput = (updates_to_run + updates_to_run2) / time_i; + printf("batch_size = %zu, average insert: %f, throughput %e\n", updates_to_run, time_i, insert_throughput); + log_file<< gname<<","< +//using adjedge_type = AdjEdge; +//using Storage = IndexedEdgeStorage; +//using adjlist_type = typename Storage::adjlist_type; + +void load_graph(commandLine& P){ + auto filename = P.getOptionValue("-f", "none"); + pair_uint *edges = get_edges_from_file_adj_sym(filename.c_str(), &num_edges, &num_nodes); + + G = new Graph (num_nodes, num_edges, false, true); + std::vector vv = G->alloc_vertex_array(); + G->fill_vertex_array(vv, 1); + + std::pair *raw_edges; + raw_edges = new std::pair[num_edges]; + for (uint32_t i = 0; i < num_edges; i++) { + raw_edges[i].first = edges[i].x; + raw_edges[i].second = edges[i].y; + } + + auto perm = get_random_permutation(num_edges); + + PRINT("=============== Load Graph BEGIN ==============="); + + gettimeofday(&t_start, &tzp); + for(uint32_t i=0; i< num_edges; i++){ + const auto &e = raw_edges[i]; + G->add_edge({e.first, e.second}, true); + } + + gettimeofday(&t_end, &tzp); + free(edges); + free(raw_edges); + new_srcs.clear(); + new_dests.clear(); + //float size_gb = G->get_size() / (float) 1073741824; + //PRINT("Load Graph: Nodes: " << G->get_n() <<" Edges: " << G->edges.N<< " Size: " << size_gb << " GB"); + PRINT("Load Graph: Nodes: " << num_nodes <<" Edges: "< update_sizes = {10, 100, 1000, 10000, 100000, 1000000, 10000000};// + std::vector update_sizes = {900000}; + std::vector update_sizes2 = {100000}; + auto r = random_aspen(); + auto update_times = std::vector(); + size_t n_trials = 1; + for (size_t us=0; us(nn, r.ith_rand(100+ts), a, b, c); + + std::pair *raw_edges; + std::pair *query_edges; + raw_edges = new std::pair[num_edges]; + query_edges = new std::pair[num_edges]; + for (uint32_t i = 0; i < updates_to_run; i++) { + std::pair edge = rmat(i); + raw_edges[i].first = edge.first; + raw_edges[i].second = edge.second; + } + + // generate random deges from new_srcs and new_dests + std::default_random_engine generator; + std::uniform_int_distribution distribution(0, updates_to_run - 1); + for (size_t i = 0; i < updates_to_run2; i++) { + size_t index = distribution(generator); + query_edges[i] = raw_edges[index]; + } + + + + gettimeofday(&t_start, &tzp); + + size_t smaller_updates = updates_to_run < updates_to_run2 ? updates_to_run : updates_to_run2; + cilk_for(uint32_t i=0; i< smaller_updates; i++){ + // const auto &e = raw_edges[i]; + // G->add_edge({e.first, e.second}, true); + + // const auto &q = query_edges[i]; + // auto adjitr = G->get_outgoing_adjlist_range(q.first); + // for(auto iter = adjitr.first ; iter != adjitr.second;iter++){ + // auto edge = *iter; + // uint64_t dst = edge.nbr; + // if(dst == q.second) break; + // } + + // const auto &e = raw_edges[i]; + // G->add_edge({e.first, e.second}, true); + + // for(uint32_t n = 0; n < 9; n++){ + // const auto &q = query_edges[i*9+n]; + // auto adjitr = G->get_outgoing_adjlist_range(q.first); + // for(auto iter = adjitr.first ; iter != adjitr.second;iter++){ + // auto edge = *iter; + // uint64_t dst = edge.nbr; + // if(dst == q.second) break; + // } + // } + + + for(uint32_t n = 0; n < 9; n++){ + const auto &e = raw_edges[i*9+n]; + G->add_edge({e.first, e.second}, true); + } + + const auto &q = query_edges[i]; + auto adjitr = G->get_outgoing_adjlist_range(q.first); + for(auto iter = adjitr.first ; iter != adjitr.second;iter++){ + auto edge = *iter; + uint64_t dst = edge.nbr; + if(dst == q.second) break; + } + + } + + gettimeofday(&t_end, &tzp); + avg_insert += cal_time_elapsed(&t_start, &t_end); + + + cilk_for(uint32_t i = 0; i < updates_to_run; i++) { + const auto &e = raw_edges[i]; + G->del_edge({e.first, e.second}, true); + } + + free(raw_edges); + } + double time_i = (double) avg_insert / n_trials; + double insert_throughput = (updates_to_run+updates_to_run2) / time_i; + printf("batch_size = %zu, average insert: %f, throughput %e\n", updates_to_run, time_i, insert_throughput); + log_file<< gname<<","< new_srcs; std::vector new_dests; +std::vector query_srcs; +std::vector query_dests; uint32_t num_nodes; uint64_t num_edges; std::string src, dest; diff --git a/sys/stinger/tests/stinger_test_propotion.cpp b/sys/stinger/tests/stinger_test_propotion.cpp new file mode 100644 index 0000000..bb093e8 --- /dev/null +++ b/sys/stinger/tests/stinger_test_propotion.cpp @@ -0,0 +1,172 @@ +#include "stinger_test.h" +#include + + +template +void insert_edges(graph *GA, std::vector &new_srcs, std::vector &new_dests, int num_threads){ + auto routine_insert_edges = [&](int thread_id, uint64_t start, uint64_t length){ + for(int64_t pos = start, end = start + length; pos < end; pos++){ + stinger_insert_edge(GA, 0,new_srcs[pos] , new_dests[pos], 1, 0); + } + }; + int64_t edges_per_thread = new_srcs.size() / num_threads; + int64_t odd_threads = new_srcs.size() % num_threads; + vector threads; + int64_t start = 0; + for(int thread_id = 0; thread_id < num_threads; thread_id ++){ + int64_t length = edges_per_thread + (thread_id < odd_threads); + threads.emplace_back(routine_insert_edges, thread_id, start, length); + start += length; + } + for(auto& t : threads) t.join(); + threads.clear(); +} + +template +void insert_read(graph *GA, std::vector &new_srcs, std::vector &new_dests, std::vector &query_srcs, std::vector &query_dests, int num_threads){ + auto routine_insert_edges = [&](int thread_id, uint64_t start, uint64_t length){ + // for(int64_t pos = start, end = start + length; pos < end; pos++){ + // stinger_insert_edge(GA, 0,new_srcs[pos], new_dests[pos], 1, 0); + // stinger_edge_touch(GA, query_srcs[pos], query_dests[pos], 0, 0); + // } + + // for(int64_t pos = start, end = start + length; pos < end; pos++){ + // stinger_insert_edge(GA, 0,new_srcs[pos], new_dests[pos], 1, 0); + // for(int64_t n = 0; n < 9; n++){ + // stinger_edge_touch(GA, query_srcs[pos*9+n], query_dests[pos*9+n], 0, 0); + // } + // } + + for(int64_t pos = start, end = start + length; pos < end; pos++){ + for(int64_t n = 0; n < 9; n++){ + stinger_insert_edge(GA, 0,new_srcs[pos*9+n], new_dests[pos*9+n], 1, 0); + } + stinger_edge_touch(GA, query_srcs[pos], query_dests[pos], 0, 0); + } + }; + + + int64_t smaller_size = new_srcs.size() < query_srcs.size() ? new_srcs.size() : query_srcs.size(); + int64_t edges_per_thread = smaller_size / num_threads; + int64_t odd_threads = smaller_size % num_threads; + vector threads; + int64_t start = 0; + for(int thread_id = 0; thread_id < num_threads; thread_id ++){ + int64_t length = edges_per_thread + (thread_id < odd_threads); + threads.emplace_back(routine_insert_edges, thread_id, start, length); + start += length; + } + for(auto& t : threads) t.join(); + threads.clear(); +} + +template +void delete_edges(graph *GA, std::vector &new_srcs, std::vector &new_dests, int num_threads){ + auto routine_insert_edges = [&](int thread_id, uint64_t start, uint64_t length){ + for(int64_t pos = start, end = start + length; pos < end; pos++){ + if(new_srcs[pos] != new_dests[pos]) + stinger_remove_edge (GA, 0 ,new_srcs[pos] , new_dests[pos] ); + } + }; + int64_t edges_per_thread = new_srcs.size() / num_threads; + int64_t odd_threads = new_srcs.size() % num_threads; + vector threads; + int64_t start = 0; + for(int thread_id = 0; thread_id < num_threads; thread_id ++){ + int64_t length = edges_per_thread + (thread_id < odd_threads); + threads.emplace_back(routine_insert_edges, thread_id, start, length); + start += length; + } + for(auto& t : threads) t.join(); + threads.clear(); +} + + + +void batch_ins_del_read(commandLine& P){ + PRINT("=============== Batch Insert BEGIN ==============="); + + auto gname = P.getOptionValue("-gname", "none"); + auto thd_num = P.getOptionLongValue("-core", 1); + auto log = P.getOptionValue("-log","none"); + std::ofstream log_file(log, ios::app); + + stinger &Ga = *G; + // std::vector update_sizes = {10, 100, 1000, 10000, 100000, 1000000, 10000000}; + std::vector update_sizes = {900000}; + std::vector update_sizes2 = {100000}; + auto r = random_aspen(); + auto update_times = std::vector(); + size_t n_trials = 1; + for (size_t us=0; us(nn, r.ith_rand(100+ts), a, b, c); + for( uint32_t i = 0; i < updates_to_run; i++) { + std::pair edge = rmat(i); + new_srcs.push_back(edge.first); + new_dests.push_back(edge.second); + } + + // generate random deges from new_srcs and new_dests + std::default_random_engine generator; + std::uniform_int_distribution distribution(0, new_srcs.size() - 1); + for (size_t i = 0; i < updates_to_run2; i++) { + size_t index = distribution(generator); + query_srcs.push_back(new_srcs[index]); + query_dests.push_back(new_dests[index]); + } + + gettimeofday(&t_start, &tzp); + insert_read(G, new_srcs, new_dests, query_srcs, query_dests, thd_num); + gettimeofday(&t_end, &tzp); + avg_insert += cal_time_elapsed(&t_start, &t_end); + + // delete + delete_edges(G, new_srcs, new_dests, thd_num); + } + double time_i = (double) avg_insert / n_trials; + double insert_throughput = (updates_to_run+updates_to_run2) / time_i; + printf("batch_size = %zu, average insert: %f, throughput %e\n", updates_to_run, time_i, insert_throughput); + log_file<< gname<<","< new_srcs; std::vector new_dests; +std::vector query_srcs; +std::vector query_dests; uint32_t num_nodes; uint64_t num_edges; std::string src, dest; diff --git a/sys/terrace/test/terrace_test_propotion.cpp b/sys/terrace/test/terrace_test_propotion.cpp new file mode 100644 index 0000000..c9b408d --- /dev/null +++ b/sys/terrace/test/terrace_test_propotion.cpp @@ -0,0 +1,162 @@ +#define CILK 1 +// #define OPENMP 1 +#include "terrace_test.h" + +void load_graph(commandLine& P){ + auto filename = P.getOptionValue("-f", "none"); + pair_uint *edges = get_edges_from_file_adj_sym(filename.c_str(), &num_edges, &num_nodes); + G = new Graph(num_nodes); + for (uint32_t i = 0; i < num_edges; i++) { + new_srcs.push_back(edges[i].x); + new_dests.push_back(edges[i].y); + } + auto perm = get_random_permutation(num_edges); + + PRINT("=============== Load Graph BEGIN ==============="); + gettimeofday(&t_start, &tzp); + G->add_edge_batch(new_srcs.data(), new_dests.data(), num_edges, perm); + gettimeofday(&t_end, &tzp); + free(edges); + new_srcs.clear(); + new_dests.clear(); + float size_gb = G->get_size() / (float) 1073741824; + PRINT("Load Graph: Nodes: " << G->get_num_vertices() <<" Edges: " << G->get_num_edges() << " Size: " << size_gb << " GB"); + print_time_elapsed("Load Graph Cost: ", &t_start, &t_end); + PRINT("Throughput: " << G->get_num_edges() / (float) cal_time_elapsed(&t_start, &t_end)); + PRINT("================ Load Graph END ================"); +} + + + +void batch_ins_del_read(commandLine& P){ + PRINT("=============== Batch Insert BEGIN ==============="); + + auto gname = P.getOptionValue("-gname", "none"); + auto thd_num = P.getOptionLongValue("-core", 1); + auto log = P.getOptionValue("-log","none"); + std::ofstream log_file(log, ios::app); + + // std::vector update_sizes = {10, 100, 1000, 10000, 100000, 1000000, 10000000}; + std::vector update_sizes = {900000}; + std::vector update_sizes2 = {100000}; + auto r = random_aspen(); + auto update_times = std::vector(); + size_t n_trials = 1; + for (size_t us=0; usget_num_vertices(); + + double a = 0.5; + double b = 0.1; + double c = 0.1; + size_t nn = 1 << (log2_up(num_nodes) - 1); + auto rmat = rMat(nn, r.ith_rand(100+ts), a, b, c); + for(uint32_t i = 0; i < updates_to_run; i++) { + std::pair edge = rmat(i); + new_srcs.push_back(edge.first); + new_dests.push_back(edge.second); + } + pair_uint *edges = (pair_uint*)calloc(updates_to_run, sizeof(pair_uint)); + for (uint32_t i = 0; i < updates_to_run; i++) { + edges[i].x = new_srcs[i]; + edges[i].y = new_dests[i]; + } + integerSort_y((pair_els*)edges, updates_to_run, num_nodes); + integerSort_x((pair_els*)edges, updates_to_run, num_nodes); + new_srcs.clear(); + new_srcs.clear(); + query_srcs.clear(); + query_dests.clear(); + + for (uint32_t i = 0; i < updates_to_run; i++) { + new_srcs.push_back(edges[i].x); + new_dests.push_back(edges[i].y); + } + free(edges); + + // generate random deges from new_srcs and new_dests + std::default_random_engine generator; + std::uniform_int_distribution distribution(0, new_srcs.size() - 1); + for (size_t i = 0; i < updates_to_run2; i++) { + size_t index = distribution(generator); + query_srcs.push_back(new_srcs[index]); + query_dests.push_back(new_dests[index]); + } + + gettimeofday(&t_start, &tzp); + + // G->add_edge_batch(new_srcs.data(), new_dests.data(), updates_to_run, perm); + // for(uint32_t i = 0; i < updates_to_run2; i++) { + // G->is_edge(query_srcs[i], query_dests[i]); + // } + + uint32_t smaller_updates = updates_to_run < updates_to_run2 ? updates_to_run : updates_to_run2; + + // for(uint32_t round = 0; round < smaller_updates; round++){ + // G->add_edge(new_srcs[round], new_dests[round]); + // G->is_edge(query_srcs[round], query_dests[round]); + // } + + // for(uint32_t round = 0; round < smaller_updates; round++){ + // G->add_edge(new_srcs[round], new_dests[round]); + // for(uint32_t p = 0;p < 9; p++) { + // G->is_edge(query_srcs[round*9+p], query_dests[round*9+p]); + // } + // } + + for(uint32_t round = 0; round < smaller_updates; round++){ + for(uint32_t p = 0;p < 9; p++) { + G->add_edge(new_srcs[round*9+p], new_dests[round*9+p]); + } + G->is_edge(query_srcs[round], query_dests[round]); + } + + + + gettimeofday(&t_end, &tzp); + avg_insert += cal_time_elapsed(&t_start, &t_end); + + // remove edges + // puts("debug-----remove edges"); + for(uint32_t i = 0; i < updates_to_run; i++) { + G->remove_edge(new_srcs[i], new_dests[i]); + } + } + double time_i = (double) avg_insert / n_trials; + double insert_throughput = (updates_to_run+updates_to_run2) / time_i; + printf("batch_size = %zu, average insert: %f, throughput %e\n", updates_to_run, time_i, insert_throughput); + log_file<< gname<<","< new_srcs; std::vector new_dests; +std::vector query_srcs; +std::vector query_dests; uint32_t num_nodes; uint64_t num_edges; std::string src, dest; diff --git a/sys/teseo/tests/teseo_test_propotion.cpp b/sys/teseo/tests/teseo_test_propotion.cpp new file mode 100644 index 0000000..1eb9007 --- /dev/null +++ b/sys/teseo/tests/teseo_test_propotion.cpp @@ -0,0 +1,276 @@ +// +// Created by zxy on 5/8/22. +// + +#include "teseo_test.h" + + +template +void insert_edges(graph &GA, std::vector &new_srcs, std::vector &new_dests, int num_threads){ + puts("------insert_edges------"); + auto routine_insert_edges = [&GA, &new_srcs, &new_dests](int thread_id, uint64_t start, uint64_t length){ + GA.register_thread(); + for(int64_t pos = start, end = start + length; pos < end; pos++){ + while(1){ + try{ + auto tx = GA.start_transaction(); + if(new_srcs[pos]!= new_dests[pos] && !tx.has_edge(new_srcs[pos], new_dests[pos])) { + tx.insert_edge(new_srcs[pos], new_dests[pos], 1.0); + tx.commit(); + } + break; + } + catch (exception e){ + continue; + } + } + } + GA.unregister_thread(); + }; + int64_t edges_per_thread = new_srcs.size() / num_threads; + int64_t odd_threads = new_srcs.size() % num_threads; + vector threads; + int64_t start = 0; + for(int thread_id = 0; thread_id < num_threads; thread_id ++){ + int64_t length = edges_per_thread + (thread_id < odd_threads); + threads.emplace_back(routine_insert_edges, thread_id, start, length); + start += length; + } + for(auto& t : threads) t.join(); + threads.clear(); +} + +template +void read_edges(graph &GA, std::vector &new_srcs, std::vector &new_dests, int num_threads){ + puts("------insert_edges------"); + auto routine_insert_edges = [&GA, &new_srcs, &new_dests](int thread_id, uint64_t start, uint64_t length){ + GA.register_thread(); + for(int64_t pos = start, end = start + length; pos < end; pos++){ + while(1){ + try{ + auto tx = GA.start_transaction(); + volatile auto result = tx.has_edge(query_srcs[pos], query_dests[pos]); // use volatile to make sure has_edge() is executed + tx.commit(); + break; + } + catch (exception e){ + continue; + } + } + } + GA.unregister_thread(); + }; + int64_t edges_per_thread = new_srcs.size() / num_threads; + int64_t odd_threads = new_srcs.size() % num_threads; + vector threads; + int64_t start = 0; + for(int thread_id = 0; thread_id < num_threads; thread_id ++){ + int64_t length = edges_per_thread + (thread_id < odd_threads); + threads.emplace_back(routine_insert_edges, thread_id, start, length); + start += length; + } + for(auto& t : threads) t.join(); + threads.clear(); +} + +template +void insert_read(graph &GA, std::vector &new_srcs, std::vector &new_dests, std::vector &query_srcs, std::vector &query_dests, int num_threads){ + auto routine_insert_edges = [&GA, &new_srcs, &new_dests, &query_srcs, &query_dests](int thread_id, uint64_t start, uint64_t length){ + GA.register_thread(); + for(int64_t pos = start, end = start + length; pos < end; pos++){ + // while(1){ + // try{ + // auto tx = GA.start_transaction(); + // if(new_srcs[pos]!= new_dests[pos] && !tx.has_edge(new_srcs[pos], new_dests[pos])) { + // tx.insert_edge(new_srcs[pos], new_dests[pos], 1.0); + // tx.commit(); + // } + // break; + // } + // catch (exception e){ + // continue; + // } + // } + + + for(int64_t n = 0; n < 9; n++){ + while(1){ + try{ + auto tx = GA.start_transaction(); + if(new_srcs[pos*9+n]!= new_dests[pos*9+n] && !tx.has_edge(new_srcs[pos*9+n], new_dests[pos*9+n])) { + tx.insert_edge(new_srcs[pos*9+n], new_dests[pos*9+n], 1.0); + tx.commit(); + } + break; + } + catch (exception e){ + continue; + } + } + } + + + + while(1){ + try{ + auto tx = GA.start_transaction(); + if(tx.has_edge(query_srcs[pos], query_dests[pos])) { + volatile auto result = tx.get_weight(query_srcs[pos], query_dests[pos]); + tx.commit(); + } + break; + } + catch (exception e){ + continue; + } + } + + // for(int64_t n = 0; n < 9; n++){ + // while(1){ + // try{ + // auto tx = GA.start_transaction(); + // if(tx.has_edge(query_srcs[pos*9+n], query_dests[pos*9+n])) { + // volatile auto result = tx.get_weight(query_srcs[pos*9+n], query_dests[pos*9+n]); + // } + // tx.commit(); + // break; + // } + // catch (exception e){ + // continue; + // } + // } + // } + } + GA.unregister_thread(); + }; + int64_t smaller_size = new_srcs.size() < query_srcs.size() ? new_srcs.size() : query_srcs.size(); + int64_t edges_per_thread = smaller_size / num_threads; + int64_t odd_threads = smaller_size % num_threads; + vector threads; + int64_t start = 0; + for(int thread_id = 0; thread_id < num_threads; thread_id ++){ + int64_t length = edges_per_thread + (thread_id < odd_threads); + threads.emplace_back(routine_insert_edges, thread_id, start, length); + start += length; + } + for(auto& t : threads) t.join(); + threads.clear(); +} + + +template +void delete_edges(graph &GA, std::vector &new_srcs, std::vector &new_dests, int num_threads){ + auto routine_insert_edges = [&GA, &new_srcs, &new_dests](int thread_id, uint64_t start, uint64_t length){ + GA.register_thread(); + for(int64_t pos = start, end = start + length; pos < end; pos++){ + while(1){ + try{ + auto tx = GA.start_transaction(); + if(new_srcs[pos]!= new_dests[pos] && tx.has_edge(new_srcs[pos], new_dests[pos])){ + tx.remove_edge(new_srcs[pos], new_dests[pos]); + tx.commit(); + } + break; + } + catch (exception e){ + continue; + } + } + } + GA.unregister_thread(); + }; + int64_t edges_per_thread = new_srcs.size() / num_threads; + int64_t odd_threads = new_srcs.size() % num_threads; + vector threads; + int64_t start = 0; + for(int thread_id = 0; thread_id < num_threads; thread_id ++){ + int64_t length = edges_per_thread + (thread_id < odd_threads); + threads.emplace_back(routine_insert_edges, thread_id, start, length); + start += length; + } + for(auto& t : threads) t.join(); + threads.clear(); +} + +void batch_ins_del_read(commandLine& P){ + auto gname = P.getOptionValue("-gname", "none"); + auto thd_num = P.getOptionLongValue("-core", 1); + auto log = P.getOptionValue("-log","none"); + std::ofstream log_file(log, ios::app); + + // std::vector update_sizes = {10,100,1000,10000,100000,1000000,10000000}; + std::vector update_sizes = {900000}; + std::vector update_sizes2 = {100000}; + std::vector avg_insert, avg_delete; + avg_insert.clear(); avg_delete.clear(); + for(size_t i=0; i(); + PRINT("=============== Batch Insert BEGIN ==============="); + for (size_t us=0; us(nn, r.ith_rand(100 + ts), a, b, c); + for (uint32_t i = 0; i < updates_to_run; i++) { + std::pair edge = rmat(i); + new_srcs.push_back(edge.first); + new_dests.push_back(edge.second); + } + + // generate random deges from new_srcs and new_dests + std::default_random_engine generator; + std::uniform_int_distribution distribution(0, new_srcs.size() - 1); + for (size_t i = 0; i < updates_to_run2; i++) { + size_t index = distribution(generator); + query_srcs.push_back(new_srcs[index]); + query_dests.push_back(new_dests[index]); + } + + gettimeofday(&t_start, &tzp); + insert_read(Ga, new_srcs, new_dests, query_srcs, query_dests, thd_num); + gettimeofday(&t_end, &tzp); + avg_insert[us] += cal_time_elapsed(&t_start, &t_end); + + delete_edges(Ga, new_srcs, new_dests, thd_num); + } + PRINT("=============== Batch Insert END ==============="); + } + + for (size_t us=0; us