From e0b4b2ab29afadc48adb682cfd66ccdf71ba1888 Mon Sep 17 00:00:00 2001 From: Yin Ziyao Date: Wed, 15 May 2024 23:54:00 +0800 Subject: [PATCH 01/10] add mix operation test for teseo --- sys/teseo/CMakeLists.txt | 3 + sys/teseo/tests/teseo_test_propotion.cpp | 244 +++++++++++++++++++++++ 2 files changed, 247 insertions(+) create mode 100644 sys/teseo/tests/teseo_test_propotion.cpp diff --git a/sys/teseo/CMakeLists.txt b/sys/teseo/CMakeLists.txt index b6b2f40..518264e 100644 --- a/sys/teseo/CMakeLists.txt +++ b/sys/teseo/CMakeLists.txt @@ -30,6 +30,9 @@ TARGET_LINK_LIBRARIES(teseo_alg teseo1 td) add_executable(teseo_edge tests/teseo_test_edge.cpp) TARGET_LINK_LIBRARIES(teseo_edge teseo1 td) +add_executable(teseo_propotion tests/teseo_test_propotion.cpp) +TARGET_LINK_LIBRARIES(teseo_propotion teseo1 td) + add_executable(teseo_mem tests/teseo_test_memory.cpp) TARGET_LINK_LIBRARIES(teseo_mem teseo1 td) diff --git a/sys/teseo/tests/teseo_test_propotion.cpp b/sys/teseo/tests/teseo_test_propotion.cpp new file mode 100644 index 0000000..080d2ce --- /dev/null +++ b/sys/teseo/tests/teseo_test_propotion.cpp @@ -0,0 +1,244 @@ +// +// Created by yzy on 5/14/24. +// + +#include "teseo_test.h" + + +template +void insert_edges(graph &GA, std::vector &new_srcs, std::vector &new_dests, int num_threads){ + puts("------insert_edges------"); + auto routine_insert_edges = [&GA, &new_srcs, &new_dests](int thread_id, uint64_t start, uint64_t length){ + GA.register_thread(); + for(int64_t pos = start, end = start + length; pos < end; pos++){ + while(1){ + try{ + auto tx = GA.start_transaction(); + if(new_srcs[pos]!= new_dests[pos] && !tx.has_edge(new_srcs[pos], new_dests[pos])) { + tx.insert_edge(new_srcs[pos], new_dests[pos], 1.0); + tx.commit(); + } + break; + } + catch (exception e){ + continue; + } + } + } + GA.unregister_thread(); + }; + int64_t edges_per_thread = new_srcs.size() / num_threads; + int64_t odd_threads = new_srcs.size() % num_threads; + vector threads; + int64_t start = 0; + for(int thread_id = 0; thread_id < num_threads; thread_id ++){ + int64_t length = edges_per_thread + (thread_id < odd_threads); + threads.emplace_back(routine_insert_edges, thread_id, start, length); + start += length; + } + for(auto& t : threads) t.join(); + threads.clear(); +} + +template +void read_edges(graph &GA, std::vector &new_srcs, std::vector &new_dests, int num_threads){ + puts("------insert_edges------"); + auto routine_insert_edges = [&GA, &new_srcs, &new_dests](int thread_id, uint64_t start, uint64_t length){ + GA.register_thread(); + for(int64_t pos = start, end = start + length; pos < end; pos++){ + while(1){ + try{ + auto tx = GA.start_transaction(); + volatile auto result = tx.has_edge(query_srcs[pos], query_dests[pos]); // use volatile to make sure has_edge() is executed + tx.commit(); + break; + } + catch (exception e){ + continue; + } + } + } + GA.unregister_thread(); + }; + int64_t edges_per_thread = new_srcs.size() / num_threads; + int64_t odd_threads = new_srcs.size() % num_threads; + vector threads; + int64_t start = 0; + for(int thread_id = 0; thread_id < num_threads; thread_id ++){ + int64_t length = edges_per_thread + (thread_id < odd_threads); + threads.emplace_back(routine_insert_edges, thread_id, start, length); + start += length; + } + for(auto& t : threads) t.join(); + threads.clear(); +} + +template +void insert_read(graph &GA, std::vector &new_srcs, std::vector &new_dests, std::vector &query_srcs, std::vector &query_dests, int num_threads){ + auto routine_insert_edges = [&GA, &new_srcs, &new_dests, &query_srcs, &query_dests](int thread_id, uint64_t start, uint64_t length){ + GA.register_thread(); + for(int64_t pos = start, end = start + length; pos < end; pos++){ + while(1){ + try{ + auto tx = GA.start_transaction(); + if(new_srcs[pos]!= new_dests[pos] && !tx.has_edge(new_srcs[pos], new_dests[pos])) { + tx.insert_edge(new_srcs[pos], new_dests[pos], 1.0); + tx.commit(); + } + break; + } + catch (exception e){ + continue; + } + } + while(1){ + try{ + auto tx = GA.start_transaction(); + volatile auto result = tx.has_edge(query_srcs[pos], query_dests[pos]); // use volatile to make sure has_edge() is executed + tx.commit(); + break; + } + catch (exception e){ + continue; + } + } + } + GA.unregister_thread(); + }; + int64_t edges_per_thread = new_srcs.size() / num_threads; + int64_t odd_threads = new_srcs.size() % num_threads; + vector threads; + int64_t start = 0; + for(int thread_id = 0; thread_id < num_threads; thread_id ++){ + int64_t length = edges_per_thread + (thread_id < odd_threads); + threads.emplace_back(routine_insert_edges, thread_id, start, length); + start += length; + } + for(auto& t : threads) t.join(); + threads.clear(); +} + + +template +void delete_edges(graph &GA, std::vector &new_srcs, std::vector &new_dests, int num_threads){ + auto routine_insert_edges = [&GA, &new_srcs, &new_dests](int thread_id, uint64_t start, uint64_t length){ + GA.register_thread(); + for(int64_t pos = start, end = start + length; pos < end; pos++){ + while(1){ + try{ + auto tx = GA.start_transaction(); + if(new_srcs[pos]!= new_dests[pos] && tx.has_edge(new_srcs[pos], new_dests[pos])){ + tx.remove_edge(new_srcs[pos], new_dests[pos]); + tx.commit(); + } + break; + } + catch (exception e){ + continue; + } + } + } + GA.unregister_thread(); + }; + int64_t edges_per_thread = new_srcs.size() / num_threads; + int64_t odd_threads = new_srcs.size() % num_threads; + vector threads; + int64_t start = 0; + for(int thread_id = 0; thread_id < num_threads; thread_id ++){ + int64_t length = edges_per_thread + (thread_id < odd_threads); + threads.emplace_back(routine_insert_edges, thread_id, start, length); + start += length; + } + for(auto& t : threads) t.join(); + threads.clear(); +} + +void batch_ins_del_read(commandLine& P){ + auto gname = P.getOptionValue("-gname", "none"); + auto thd_num = P.getOptionLongValue("-core", 1); + auto log = P.getOptionValue("-log","none"); + std::ofstream log_file(log, ios::app); + + // std::vector update_sizes = {10,100,1000,10000,100000,1000000,10000000}; + std::vector update_sizes = {500000}; + std::vector update_sizes2 = {500000}; + std::vector avg_insert, avg_delete; + avg_insert.clear(); avg_delete.clear(); + for(size_t i=0; i(); + PRINT("=============== Batch Insert BEGIN ==============="); + for (size_t us=0; us(nn, r.ith_rand(100 + ts), a, b, c); + for (uint32_t i = 0; i < updates_to_run; i++) { + std::pair edge = rmat(i); + new_srcs.push_back(edge.first); + new_dests.push_back(edge.second); + } + + // generate random deges from new_srcs and new_dests + std::default_random_engine generator; + std::uniform_int_distribution distribution(0, new_srcs.size() - 1); + for (size_t i = 0; i < updates_to_run2; i++) { + size_t index = distribution(generator); + query_srcs.push_back(new_srcs[index]); + query_dests.push_back(new_dests[index]); + } + + gettimeofday(&t_start, &tzp); + // mix operation + insert_read(Ga, new_srcs, new_dests, query_srcs, query_dests, thd_num); + + // contrast test, do insert and read in group + // insert_edges(Ga, new_srcs, new_dests, thd_num); + // read_edges(Ga, query_srcs, query_dests, thd_num); + + + gettimeofday(&t_end, &tzp); + avg_insert[us] += cal_time_elapsed(&t_start, &t_end); + + delete_edges(Ga, new_srcs, new_dests, thd_num); + } + PRINT("=============== Batch Insert END ==============="); + } + + for (size_t us=0; us Date: Thu, 23 May 2024 23:29:18 +0800 Subject: [PATCH 02/10] Add hybrid test for pcsr. --- log/pcsr/propotion.log | 11 +++ sys/pcsr/CMakeLists.txt | 1 + sys/pcsr/test/pcsr_test.h | 2 + sys/pcsr/test/pcsr_test_propotion.cpp | 105 ++++++++++++++++++++++++++ 4 files changed, 119 insertions(+) create mode 100644 log/pcsr/propotion.log create mode 100644 sys/pcsr/test/pcsr_test_propotion.cpp diff --git a/log/pcsr/propotion.log b/log/pcsr/propotion.log new file mode 100644 index 0000000..29968e0 --- /dev/null +++ b/log/pcsr/propotion.log @@ -0,0 +1,11 @@ +livejournal,16,e,insert-read,500000-500000,1.53587e+06 +livejournal,16,e,insert-read,500000-500000,1.56299e+06 +livejournal,16,e,insert-read,500000-500000,1.52229e+06 + +livejournal,16,e,insert-read,100000-900000,4.55711e+06 +livejournal,16,e,insert-read,100000-900000,4.58366e+06 +livejournal,16,e,insert-read,100000-900000,4.56831e+06 + +livejournal,16,e,insert-read,900000-100000,980870 +livejournal,16,e,insert-read,900000-100000,1.00775e+06 +livejournal,16,e,insert-read,900000-100000,1.00096e+06 diff --git a/sys/pcsr/CMakeLists.txt b/sys/pcsr/CMakeLists.txt index a365bce..be5f4b3 100644 --- a/sys/pcsr/CMakeLists.txt +++ b/sys/pcsr/CMakeLists.txt @@ -11,6 +11,7 @@ include_directories(include/algorithms) add_executable(pcsr_alg test/pcsr_test_alg.cpp) add_executable(pcsr_edge test/pcsr_test_edge.cpp) +add_executable(pcsr_propotion test/pcsr_test_propotion.cpp) add_executable(pcsr_mem test/pcsr_test_memory.cpp) add_executable(pcsr_scala test/pcsr_test_scala.cpp) # -src 9 -maxiters 5 -f ../../../data/slashdot.adj diff --git a/sys/pcsr/test/pcsr_test.h b/sys/pcsr/test/pcsr_test.h index 07fc23b..8c741a0 100644 --- a/sys/pcsr/test/pcsr_test.h +++ b/sys/pcsr/test/pcsr_test.h @@ -29,6 +29,8 @@ PCSR *G; std::vector new_srcs; std::vector new_dests; +std::vector query_srcs; +std::vector query_dests; uint32_t num_nodes; uint64_t num_edges; std::string src, dest; diff --git a/sys/pcsr/test/pcsr_test_propotion.cpp b/sys/pcsr/test/pcsr_test_propotion.cpp new file mode 100644 index 0000000..1e5b890 --- /dev/null +++ b/sys/pcsr/test/pcsr_test_propotion.cpp @@ -0,0 +1,105 @@ +#include "pcsr_test.h" + +void batch_ins_del_read(commandLine& P){ + PRINT("=============== Batch Insert BEGIN ==============="); + + auto gname = P.getOptionValue("-gname", "none"); + auto thd_num = P.getOptionLongValue("-core", 1); + auto log = P.getOptionValue("-log","none"); + std::ofstream log_file(log, ios::app); + + PCSR &Ga = *G; + // std::vector update_sizes = {10, 100, 1000 ,10000,100000,1000000, 10000000}; + std::vector update_sizes = {900000}; + std::vector update_sizes2 = {100000}; + auto r = random_aspen(); + auto update_times = std::vector(); + size_t n_trials = 1; + for (size_t us=0; us(nn, r.ith_rand(100+ts), a, b, c); + for( uint32_t i = 0; i < updates_to_run; i++) { + std::pair edge = rmat(i); + new_srcs.push_back(edge.first); + new_dests.push_back(edge.second); + } + // generate random deges from new_srcs and new_dests + std::default_random_engine generator; + std::uniform_int_distribution distribution(0, new_srcs.size() - 1); + for (size_t i = 0; i < updates_to_run2; i++) { + size_t index = distribution(generator); + query_srcs.push_back(new_srcs[index]); + query_dests.push_back(new_dests[index]); + } + + + gettimeofday(&t_start, &tzp); + + // for (uint32_t i =0 ; i< 500000;i++){ + // Ga.add_edge_update(new_srcs[i],new_dests[i],1); + // Ga.find_value(query_srcs[i], query_dests[i]); + // } + // for (uint32_t i =0 ; i< 100000;i++){ + // Ga.add_edge_update(new_srcs[i],new_dests[i],1); + // for (uint32_t n = 0; n < 9; n++){ + // Ga.find_value(query_srcs[i*9+n], query_dests[i*9+n]); + // } + // } + for (uint32_t i =0 ; i< 100000;i++){ + for (uint32_t n = 0; n < 9; n++){ + Ga.add_edge_update(new_srcs[i*9+n], new_dests[i*9+n], 1); + } + Ga.find_value(query_srcs[i],query_dests[i]); + } + + gettimeofday(&t_end, &tzp); + avg_insert += cal_time_elapsed(&t_start, &t_end); + + } + double time_i = (double) avg_insert / n_trials; + double insert_throughput = 1000000 / time_i; + printf("batch_size = %zu, average insert: %f, throughput %e\n", updates_to_run, time_i, insert_throughput); + log_file<< gname<<","< Date: Wed, 29 May 2024 19:14:43 +0800 Subject: [PATCH 03/10] fix mistakes of teset_test_propotion --- sys/teseo/tests/teseo_test_propotion.cpp | 82 ++++++++++++++++-------- 1 file changed, 57 insertions(+), 25 deletions(-) diff --git a/sys/teseo/tests/teseo_test_propotion.cpp b/sys/teseo/tests/teseo_test_propotion.cpp index 080d2ce..1eb9007 100644 --- a/sys/teseo/tests/teseo_test_propotion.cpp +++ b/sys/teseo/tests/teseo_test_propotion.cpp @@ -1,5 +1,5 @@ // -// Created by yzy on 5/14/24. +// Created by zxy on 5/8/22. // #include "teseo_test.h" @@ -78,35 +78,74 @@ void insert_read(graph &GA, std::vector &new_srcs, std::vector threads; int64_t start = 0; for(int thread_id = 0; thread_id < num_threads; thread_id ++){ @@ -160,8 +199,8 @@ void batch_ins_del_read(commandLine& P){ std::ofstream log_file(log, ios::app); // std::vector update_sizes = {10,100,1000,10000,100000,1000000,10000000}; - std::vector update_sizes = {500000}; - std::vector update_sizes2 = {500000}; + std::vector update_sizes = {900000}; + std::vector update_sizes2 = {100000}; std::vector avg_insert, avg_delete; avg_insert.clear(); avg_delete.clear(); for(size_t i=0; i Date: Wed, 29 May 2024 19:16:12 +0800 Subject: [PATCH 04/10] fix mistakes of pcsr_test_propotion --- sys/pcsr/test/pcsr_test_propotion.cpp | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/sys/pcsr/test/pcsr_test_propotion.cpp b/sys/pcsr/test/pcsr_test_propotion.cpp index 1e5b890..2ed8b70 100644 --- a/sys/pcsr/test/pcsr_test_propotion.cpp +++ b/sys/pcsr/test/pcsr_test_propotion.cpp @@ -1,3 +1,8 @@ + +// +// Created by zxy on 5/4/22. +// + #include "pcsr_test.h" void batch_ins_del_read(commandLine& P){ @@ -80,7 +85,7 @@ void batch_ins_del_read(commandLine& P){ } double time_i = (double) avg_insert / n_trials; - double insert_throughput = 1000000 / time_i; + double insert_throughput = (updates_to_run + updates_to_run2) / time_i; printf("batch_size = %zu, average insert: %f, throughput %e\n", updates_to_run, time_i, insert_throughput); log_file<< gname<<","< Date: Wed, 29 May 2024 19:18:27 +0800 Subject: [PATCH 05/10] add hybrid test for terrace --- sys/terrace/CMakeLists.txt | 3 + sys/terrace/test/terrace_test_propotion.cpp | 162 ++++++++++++++++++++ 2 files changed, 165 insertions(+) create mode 100644 sys/terrace/test/terrace_test_propotion.cpp diff --git a/sys/terrace/CMakeLists.txt b/sys/terrace/CMakeLists.txt index dbc4983..81ca52c 100644 --- a/sys/terrace/CMakeLists.txt +++ b/sys/terrace/CMakeLists.txt @@ -19,6 +19,9 @@ add_subdirectory(src) add_executable(terrace_edge test/terrace_test_edge.cpp) target_link_libraries(terrace_edge terrc) +add_executable(terrace_propotion test/terrace_test_propotion.cpp) +target_link_libraries(terrace_propotion terrc) + add_executable(terrace_alg test/terrace_test_alg.cpp) target_link_libraries(terrace_alg terrc) diff --git a/sys/terrace/test/terrace_test_propotion.cpp b/sys/terrace/test/terrace_test_propotion.cpp new file mode 100644 index 0000000..c9b408d --- /dev/null +++ b/sys/terrace/test/terrace_test_propotion.cpp @@ -0,0 +1,162 @@ +#define CILK 1 +// #define OPENMP 1 +#include "terrace_test.h" + +void load_graph(commandLine& P){ + auto filename = P.getOptionValue("-f", "none"); + pair_uint *edges = get_edges_from_file_adj_sym(filename.c_str(), &num_edges, &num_nodes); + G = new Graph(num_nodes); + for (uint32_t i = 0; i < num_edges; i++) { + new_srcs.push_back(edges[i].x); + new_dests.push_back(edges[i].y); + } + auto perm = get_random_permutation(num_edges); + + PRINT("=============== Load Graph BEGIN ==============="); + gettimeofday(&t_start, &tzp); + G->add_edge_batch(new_srcs.data(), new_dests.data(), num_edges, perm); + gettimeofday(&t_end, &tzp); + free(edges); + new_srcs.clear(); + new_dests.clear(); + float size_gb = G->get_size() / (float) 1073741824; + PRINT("Load Graph: Nodes: " << G->get_num_vertices() <<" Edges: " << G->get_num_edges() << " Size: " << size_gb << " GB"); + print_time_elapsed("Load Graph Cost: ", &t_start, &t_end); + PRINT("Throughput: " << G->get_num_edges() / (float) cal_time_elapsed(&t_start, &t_end)); + PRINT("================ Load Graph END ================"); +} + + + +void batch_ins_del_read(commandLine& P){ + PRINT("=============== Batch Insert BEGIN ==============="); + + auto gname = P.getOptionValue("-gname", "none"); + auto thd_num = P.getOptionLongValue("-core", 1); + auto log = P.getOptionValue("-log","none"); + std::ofstream log_file(log, ios::app); + + // std::vector update_sizes = {10, 100, 1000, 10000, 100000, 1000000, 10000000}; + std::vector update_sizes = {900000}; + std::vector update_sizes2 = {100000}; + auto r = random_aspen(); + auto update_times = std::vector(); + size_t n_trials = 1; + for (size_t us=0; usget_num_vertices(); + + double a = 0.5; + double b = 0.1; + double c = 0.1; + size_t nn = 1 << (log2_up(num_nodes) - 1); + auto rmat = rMat(nn, r.ith_rand(100+ts), a, b, c); + for(uint32_t i = 0; i < updates_to_run; i++) { + std::pair edge = rmat(i); + new_srcs.push_back(edge.first); + new_dests.push_back(edge.second); + } + pair_uint *edges = (pair_uint*)calloc(updates_to_run, sizeof(pair_uint)); + for (uint32_t i = 0; i < updates_to_run; i++) { + edges[i].x = new_srcs[i]; + edges[i].y = new_dests[i]; + } + integerSort_y((pair_els*)edges, updates_to_run, num_nodes); + integerSort_x((pair_els*)edges, updates_to_run, num_nodes); + new_srcs.clear(); + new_srcs.clear(); + query_srcs.clear(); + query_dests.clear(); + + for (uint32_t i = 0; i < updates_to_run; i++) { + new_srcs.push_back(edges[i].x); + new_dests.push_back(edges[i].y); + } + free(edges); + + // generate random deges from new_srcs and new_dests + std::default_random_engine generator; + std::uniform_int_distribution distribution(0, new_srcs.size() - 1); + for (size_t i = 0; i < updates_to_run2; i++) { + size_t index = distribution(generator); + query_srcs.push_back(new_srcs[index]); + query_dests.push_back(new_dests[index]); + } + + gettimeofday(&t_start, &tzp); + + // G->add_edge_batch(new_srcs.data(), new_dests.data(), updates_to_run, perm); + // for(uint32_t i = 0; i < updates_to_run2; i++) { + // G->is_edge(query_srcs[i], query_dests[i]); + // } + + uint32_t smaller_updates = updates_to_run < updates_to_run2 ? updates_to_run : updates_to_run2; + + // for(uint32_t round = 0; round < smaller_updates; round++){ + // G->add_edge(new_srcs[round], new_dests[round]); + // G->is_edge(query_srcs[round], query_dests[round]); + // } + + // for(uint32_t round = 0; round < smaller_updates; round++){ + // G->add_edge(new_srcs[round], new_dests[round]); + // for(uint32_t p = 0;p < 9; p++) { + // G->is_edge(query_srcs[round*9+p], query_dests[round*9+p]); + // } + // } + + for(uint32_t round = 0; round < smaller_updates; round++){ + for(uint32_t p = 0;p < 9; p++) { + G->add_edge(new_srcs[round*9+p], new_dests[round*9+p]); + } + G->is_edge(query_srcs[round], query_dests[round]); + } + + + + gettimeofday(&t_end, &tzp); + avg_insert += cal_time_elapsed(&t_start, &t_end); + + // remove edges + // puts("debug-----remove edges"); + for(uint32_t i = 0; i < updates_to_run; i++) { + G->remove_edge(new_srcs[i], new_dests[i]); + } + } + double time_i = (double) avg_insert / n_trials; + double insert_throughput = (updates_to_run+updates_to_run2) / time_i; + printf("batch_size = %zu, average insert: %f, throughput %e\n", updates_to_run, time_i, insert_throughput); + log_file<< gname<<","< Date: Wed, 29 May 2024 19:21:44 +0800 Subject: [PATCH 06/10] add hybrid test for stinger --- .vscode/settings.json | 5 + sys/stinger/CMakeLists.txt | 3 + sys/stinger/tests/stinger_test.h | 2 + sys/stinger/tests/stinger_test_propotion.cpp | 172 +++++++++++++++++++ sys/terrace/test/terrace_test.h | 2 + sys/teseo/tests/teseo_test.h | 2 + 6 files changed, 186 insertions(+) create mode 100644 .vscode/settings.json create mode 100644 sys/stinger/tests/stinger_test_propotion.cpp diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..a3b2b51 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,5 @@ +{ + "files.associations": { + "vector": "cpp" + } +} \ No newline at end of file diff --git a/sys/stinger/CMakeLists.txt b/sys/stinger/CMakeLists.txt index 150a4fb..84855b4 100644 --- a/sys/stinger/CMakeLists.txt +++ b/sys/stinger/CMakeLists.txt @@ -18,6 +18,9 @@ target_link_libraries(stinger_alg stinger) add_executable(stinger_edge tests/stinger_test_edge.cpp) target_link_libraries(stinger_edge stinger) +add_executable(stinger_propotion tests/stinger_test_propotion.cpp) +target_link_libraries(stinger_propotion stinger) + add_executable(stinger_mem tests/stinger_test_memory.cpp) target_link_libraries(stinger_mem stinger) diff --git a/sys/stinger/tests/stinger_test.h b/sys/stinger/tests/stinger_test.h index c4eb54c..49e4ae7 100644 --- a/sys/stinger/tests/stinger_test.h +++ b/sys/stinger/tests/stinger_test.h @@ -37,6 +37,8 @@ using namespace std; struct stinger * G; std::vector new_srcs; std::vector new_dests; +std::vector query_srcs; +std::vector query_dests; uint32_t num_nodes; uint64_t num_edges; std::string src, dest; diff --git a/sys/stinger/tests/stinger_test_propotion.cpp b/sys/stinger/tests/stinger_test_propotion.cpp new file mode 100644 index 0000000..bb093e8 --- /dev/null +++ b/sys/stinger/tests/stinger_test_propotion.cpp @@ -0,0 +1,172 @@ +#include "stinger_test.h" +#include + + +template +void insert_edges(graph *GA, std::vector &new_srcs, std::vector &new_dests, int num_threads){ + auto routine_insert_edges = [&](int thread_id, uint64_t start, uint64_t length){ + for(int64_t pos = start, end = start + length; pos < end; pos++){ + stinger_insert_edge(GA, 0,new_srcs[pos] , new_dests[pos], 1, 0); + } + }; + int64_t edges_per_thread = new_srcs.size() / num_threads; + int64_t odd_threads = new_srcs.size() % num_threads; + vector threads; + int64_t start = 0; + for(int thread_id = 0; thread_id < num_threads; thread_id ++){ + int64_t length = edges_per_thread + (thread_id < odd_threads); + threads.emplace_back(routine_insert_edges, thread_id, start, length); + start += length; + } + for(auto& t : threads) t.join(); + threads.clear(); +} + +template +void insert_read(graph *GA, std::vector &new_srcs, std::vector &new_dests, std::vector &query_srcs, std::vector &query_dests, int num_threads){ + auto routine_insert_edges = [&](int thread_id, uint64_t start, uint64_t length){ + // for(int64_t pos = start, end = start + length; pos < end; pos++){ + // stinger_insert_edge(GA, 0,new_srcs[pos], new_dests[pos], 1, 0); + // stinger_edge_touch(GA, query_srcs[pos], query_dests[pos], 0, 0); + // } + + // for(int64_t pos = start, end = start + length; pos < end; pos++){ + // stinger_insert_edge(GA, 0,new_srcs[pos], new_dests[pos], 1, 0); + // for(int64_t n = 0; n < 9; n++){ + // stinger_edge_touch(GA, query_srcs[pos*9+n], query_dests[pos*9+n], 0, 0); + // } + // } + + for(int64_t pos = start, end = start + length; pos < end; pos++){ + for(int64_t n = 0; n < 9; n++){ + stinger_insert_edge(GA, 0,new_srcs[pos*9+n], new_dests[pos*9+n], 1, 0); + } + stinger_edge_touch(GA, query_srcs[pos], query_dests[pos], 0, 0); + } + }; + + + int64_t smaller_size = new_srcs.size() < query_srcs.size() ? new_srcs.size() : query_srcs.size(); + int64_t edges_per_thread = smaller_size / num_threads; + int64_t odd_threads = smaller_size % num_threads; + vector threads; + int64_t start = 0; + for(int thread_id = 0; thread_id < num_threads; thread_id ++){ + int64_t length = edges_per_thread + (thread_id < odd_threads); + threads.emplace_back(routine_insert_edges, thread_id, start, length); + start += length; + } + for(auto& t : threads) t.join(); + threads.clear(); +} + +template +void delete_edges(graph *GA, std::vector &new_srcs, std::vector &new_dests, int num_threads){ + auto routine_insert_edges = [&](int thread_id, uint64_t start, uint64_t length){ + for(int64_t pos = start, end = start + length; pos < end; pos++){ + if(new_srcs[pos] != new_dests[pos]) + stinger_remove_edge (GA, 0 ,new_srcs[pos] , new_dests[pos] ); + } + }; + int64_t edges_per_thread = new_srcs.size() / num_threads; + int64_t odd_threads = new_srcs.size() % num_threads; + vector threads; + int64_t start = 0; + for(int thread_id = 0; thread_id < num_threads; thread_id ++){ + int64_t length = edges_per_thread + (thread_id < odd_threads); + threads.emplace_back(routine_insert_edges, thread_id, start, length); + start += length; + } + for(auto& t : threads) t.join(); + threads.clear(); +} + + + +void batch_ins_del_read(commandLine& P){ + PRINT("=============== Batch Insert BEGIN ==============="); + + auto gname = P.getOptionValue("-gname", "none"); + auto thd_num = P.getOptionLongValue("-core", 1); + auto log = P.getOptionValue("-log","none"); + std::ofstream log_file(log, ios::app); + + stinger &Ga = *G; + // std::vector update_sizes = {10, 100, 1000, 10000, 100000, 1000000, 10000000}; + std::vector update_sizes = {900000}; + std::vector update_sizes2 = {100000}; + auto r = random_aspen(); + auto update_times = std::vector(); + size_t n_trials = 1; + for (size_t us=0; us(nn, r.ith_rand(100+ts), a, b, c); + for( uint32_t i = 0; i < updates_to_run; i++) { + std::pair edge = rmat(i); + new_srcs.push_back(edge.first); + new_dests.push_back(edge.second); + } + + // generate random deges from new_srcs and new_dests + std::default_random_engine generator; + std::uniform_int_distribution distribution(0, new_srcs.size() - 1); + for (size_t i = 0; i < updates_to_run2; i++) { + size_t index = distribution(generator); + query_srcs.push_back(new_srcs[index]); + query_dests.push_back(new_dests[index]); + } + + gettimeofday(&t_start, &tzp); + insert_read(G, new_srcs, new_dests, query_srcs, query_dests, thd_num); + gettimeofday(&t_end, &tzp); + avg_insert += cal_time_elapsed(&t_start, &t_end); + + // delete + delete_edges(G, new_srcs, new_dests, thd_num); + } + double time_i = (double) avg_insert / n_trials; + double insert_throughput = (updates_to_run+updates_to_run2) / time_i; + printf("batch_size = %zu, average insert: %f, throughput %e\n", updates_to_run, time_i, insert_throughput); + log_file<< gname<<","< new_srcs; std::vector new_dests; +std::vector query_srcs; +std::vector query_dests; uint32_t num_nodes; uint64_t num_edges; std::string src, dest; diff --git a/sys/teseo/tests/teseo_test.h b/sys/teseo/tests/teseo_test.h index 866c5d8..469e937 100644 --- a/sys/teseo/tests/teseo_test.h +++ b/sys/teseo/tests/teseo_test.h @@ -33,6 +33,8 @@ teseo::Teseo *G; OpenMP *OMP; std::vector new_srcs; std::vector new_dests; +std::vector query_srcs; +std::vector query_dests; uint32_t num_nodes; uint64_t num_edges; std::string src, dest; From 88cb612628205ffb43334c2ca80f44591ed59d40 Mon Sep 17 00:00:00 2001 From: Yin Ziyao Date: Wed, 29 May 2024 19:24:56 +0800 Subject: [PATCH 07/10] update gitignore --- .gitignore | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index d8eec88..7b35329 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ # Project exclude paths /cmake-build-debug/ -/data/ \ No newline at end of file +/data/ +/.idea/ +/.vscode/ \ No newline at end of file From 68328c801cef30d4c88b30b1ba388d80e0008ecd Mon Sep 17 00:00:00 2001 From: Yin Ziyao Date: Wed, 29 May 2024 19:27:50 +0800 Subject: [PATCH 08/10] add hybrid test for risgraph --- .vscode/settings.json | 5 - sys/risgraph/CMakeLists.txt | 5 + sys/risgraph/test/risgraph_test_propotion.cpp | 192 ++++++++++++++++++ 3 files changed, 197 insertions(+), 5 deletions(-) delete mode 100644 .vscode/settings.json create mode 100644 sys/risgraph/test/risgraph_test_propotion.cpp diff --git a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100644 index a3b2b51..0000000 --- a/.vscode/settings.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "files.associations": { - "vector": "cpp" - } -} \ No newline at end of file diff --git a/sys/risgraph/CMakeLists.txt b/sys/risgraph/CMakeLists.txt index b26d24a..b5c960f 100644 --- a/sys/risgraph/CMakeLists.txt +++ b/sys/risgraph/CMakeLists.txt @@ -32,6 +32,11 @@ target_link_libraries(risgraph_edge tbb) set_target_properties(risgraph_edge PROPERTIES COMPILE_FLAGS ${COMPILE_FLAGS64} LINK_FLAGS ${LINK_FLAGS_COMMON}) +add_executable(risgraph_propotion test/risgraph_test_propotion.cpp) +target_link_libraries(risgraph_propotion tbb) +set_target_properties(risgraph_propotion PROPERTIES COMPILE_FLAGS ${COMPILE_FLAGS64} + LINK_FLAGS ${LINK_FLAGS_COMMON}) + add_executable(risgraph_mem test/risgraph_test_memory.cpp) target_link_libraries(risgraph_mem tbb) set_target_properties(risgraph_mem PROPERTIES COMPILE_FLAGS ${COMPILE_FLAGS64} diff --git a/sys/risgraph/test/risgraph_test_propotion.cpp b/sys/risgraph/test/risgraph_test_propotion.cpp new file mode 100644 index 0000000..30362f0 --- /dev/null +++ b/sys/risgraph/test/risgraph_test_propotion.cpp @@ -0,0 +1,192 @@ +// +// Created by zxy on 5/8/22. +// +#include "risgraph_test.h" +#include "utils/io_util.h" +#include "utils/rmat_util.h" +#include "type.hpp" +#include "io.hpp" +#include "storage.hpp" +#include +//using adjedge_type = AdjEdge; +//using Storage = IndexedEdgeStorage; +//using adjlist_type = typename Storage::adjlist_type; + +void load_graph(commandLine& P){ + auto filename = P.getOptionValue("-f", "none"); + pair_uint *edges = get_edges_from_file_adj_sym(filename.c_str(), &num_edges, &num_nodes); + + G = new Graph (num_nodes, num_edges, false, true); + std::vector vv = G->alloc_vertex_array(); + G->fill_vertex_array(vv, 1); + + std::pair *raw_edges; + raw_edges = new std::pair[num_edges]; + for (uint32_t i = 0; i < num_edges; i++) { + raw_edges[i].first = edges[i].x; + raw_edges[i].second = edges[i].y; + } + + auto perm = get_random_permutation(num_edges); + + PRINT("=============== Load Graph BEGIN ==============="); + + gettimeofday(&t_start, &tzp); + for(uint32_t i=0; i< num_edges; i++){ + const auto &e = raw_edges[i]; + G->add_edge({e.first, e.second}, true); + } + + gettimeofday(&t_end, &tzp); + free(edges); + free(raw_edges); + new_srcs.clear(); + new_dests.clear(); + //float size_gb = G->get_size() / (float) 1073741824; + //PRINT("Load Graph: Nodes: " << G->get_n() <<" Edges: " << G->edges.N<< " Size: " << size_gb << " GB"); + PRINT("Load Graph: Nodes: " << num_nodes <<" Edges: "< update_sizes = {10, 100, 1000, 10000, 100000, 1000000, 10000000};// + std::vector update_sizes = {900000}; + std::vector update_sizes2 = {100000}; + auto r = random_aspen(); + auto update_times = std::vector(); + size_t n_trials = 1; + for (size_t us=0; us(nn, r.ith_rand(100+ts), a, b, c); + + std::pair *raw_edges; + std::pair *query_edges; + raw_edges = new std::pair[num_edges]; + query_edges = new std::pair[num_edges]; + for (uint32_t i = 0; i < updates_to_run; i++) { + std::pair edge = rmat(i); + raw_edges[i].first = edge.first; + raw_edges[i].second = edge.second; + } + + // generate random deges from new_srcs and new_dests + std::default_random_engine generator; + std::uniform_int_distribution distribution(0, updates_to_run - 1); + for (size_t i = 0; i < updates_to_run2; i++) { + size_t index = distribution(generator); + query_edges[i] = raw_edges[index]; + } + + + + gettimeofday(&t_start, &tzp); + + size_t smaller_updates = updates_to_run < updates_to_run2 ? updates_to_run : updates_to_run2; + cilk_for(uint32_t i=0; i< smaller_updates; i++){ + // const auto &e = raw_edges[i]; + // G->add_edge({e.first, e.second}, true); + + // const auto &q = query_edges[i]; + // auto adjitr = G->get_outgoing_adjlist_range(q.first); + // for(auto iter = adjitr.first ; iter != adjitr.second;iter++){ + // auto edge = *iter; + // uint64_t dst = edge.nbr; + // if(dst == q.second) break; + // } + + // const auto &e = raw_edges[i]; + // G->add_edge({e.first, e.second}, true); + + // for(uint32_t n = 0; n < 9; n++){ + // const auto &q = query_edges[i*9+n]; + // auto adjitr = G->get_outgoing_adjlist_range(q.first); + // for(auto iter = adjitr.first ; iter != adjitr.second;iter++){ + // auto edge = *iter; + // uint64_t dst = edge.nbr; + // if(dst == q.second) break; + // } + // } + + + for(uint32_t n = 0; n < 9; n++){ + const auto &e = raw_edges[i*9+n]; + G->add_edge({e.first, e.second}, true); + } + + const auto &q = query_edges[i]; + auto adjitr = G->get_outgoing_adjlist_range(q.first); + for(auto iter = adjitr.first ; iter != adjitr.second;iter++){ + auto edge = *iter; + uint64_t dst = edge.nbr; + if(dst == q.second) break; + } + + } + + gettimeofday(&t_end, &tzp); + avg_insert += cal_time_elapsed(&t_start, &t_end); + + + cilk_for(uint32_t i = 0; i < updates_to_run; i++) { + const auto &e = raw_edges[i]; + G->del_edge({e.first, e.second}, true); + } + + free(raw_edges); + } + double time_i = (double) avg_insert / n_trials; + double insert_throughput = (updates_to_run+updates_to_run2) / time_i; + printf("batch_size = %zu, average insert: %f, throughput %e\n", updates_to_run, time_i, insert_throughput); + log_file<< gname<<","< Date: Wed, 29 May 2024 19:30:36 +0800 Subject: [PATCH 09/10] add hybrid test for llama --- sys/llama/CMakeLists.txt | 1 + sys/llama/tests/llama_test.h | 2 + sys/llama/tests/llama_test_propotion.cpp | 214 +++++++++++++++++++++++ 3 files changed, 217 insertions(+) create mode 100644 sys/llama/tests/llama_test_propotion.cpp diff --git a/sys/llama/CMakeLists.txt b/sys/llama/CMakeLists.txt index 228a152..7b70981 100644 --- a/sys/llama/CMakeLists.txt +++ b/sys/llama/CMakeLists.txt @@ -17,6 +17,7 @@ include_directories(include/utils) include_directories(include/algorithms) add_executable(llama_edge tests/llama_test_edge.cpp) +add_executable(llama_propotion tests/llama_test_propotion.cpp) add_executable(llama_alg tests/llama_test_alg.cpp) add_executable(llama_mem tests/llama_test_memory.cpp) add_executable(llama_scala tests/llama_test_scala.cpp) diff --git a/sys/llama/tests/llama_test.h b/sys/llama/tests/llama_test.h index b404e0a..8018356 100644 --- a/sys/llama/tests/llama_test.h +++ b/sys/llama/tests/llama_test.h @@ -33,6 +33,8 @@ ll_database *G; std::vector new_srcs; std::vector new_dests; +std::vector query_srcs; +std::vector query_dests; uint32_t num_nodes; uint64_t num_edges; std::string src, dest; diff --git a/sys/llama/tests/llama_test_propotion.cpp b/sys/llama/tests/llama_test_propotion.cpp new file mode 100644 index 0000000..f6722ba --- /dev/null +++ b/sys/llama/tests/llama_test_propotion.cpp @@ -0,0 +1,214 @@ +#include "llama_test.h" +#include "thread" + +template +void insert_read(Graph &graph, vector &new_srcs, vector &new_dests, std::vector &query_srcs, std::vector &query_dests, int num_threads){ + auto routine_insert_edges = [&graph, &new_srcs, &new_dests, &query_srcs, &query_dests](int thread_id, uint64_t start, uint64_t length){ + for(int64_t pos = start, end = start + length; pos < end; pos++){ + + while(1){ + try{ + graph.tx_begin(); + edge_t edge_id = graph.add_edge(new_srcs[pos], new_dests[pos]); + uint64_t w = 1; + graph.get_edge_property_64(g_llama_property_weights)->set(edge_id, *reinterpret_cast(&(w))); + graph.tx_commit(); + break; + } + catch (exception e){ + continue; + } + } + + // for(int64_t n = 0; n < 9; n++){ + // while(1){ + // try{ + // graph.tx_begin(); + // edge_t edge_id = graph.add_edge(new_srcs[pos*9+n], new_dests[pos*9+n]); + // uint64_t w = 1; + // graph.get_edge_property_64(g_llama_property_weights)->set(edge_id, *reinterpret_cast(&(w))); + // graph.tx_commit(); + // break; + // } + // catch (exception e){ + // continue; + // } + // } + // } + + + // auto* g = get_snapshot(G); + // volatile bool result = g->find(query_srcs[pos], query_dests[pos]); + + + // for(int64_t n = 0; n < 9; n++){ + // while(1){ + // try{ + // auto* g = get_snapshot(G); + // volatile bool result = g->find(query_srcs[pos*9+n], query_dests[pos*9+n]); + // break; + // } + // catch (exception e){ + // continue; + // } + // } + // } + } + }; + int64_t smaller_size = new_srcs.size() < query_srcs.size() ? new_srcs.size() : query_srcs.size(); + int64_t edges_per_thread = smaller_size / num_threads; + int64_t odd_threads = smaller_size % num_threads; + vector threads; + int64_t start = 0; + for(int thread_id = 0; thread_id < num_threads; thread_id ++){ + int64_t length = edges_per_thread + (thread_id < odd_threads); + threads.emplace_back(routine_insert_edges, thread_id, start, length); + start += length; + } + for(auto& t : threads) t.join(); + threads.clear(); +} + +template +void delete_edges(Graph &graph, vector &new_srcs, vector &new_dests, int num_threads){ + auto routine_insert_edges = [&graph, &new_srcs, &new_dests](int thread_id, uint64_t start, uint64_t length){ + for(int64_t pos = start, end = start + length; pos < end; pos++){ + while(1){ + try{ + graph.tx_begin(); + graph.delete_edge(new_srcs[pos], graph.find(new_srcs[pos], new_dests[pos])); + graph.tx_commit(); + break; + } + catch (exception e){ + continue; + } + } + } + }; + int64_t edges_per_thread = new_srcs.size() / num_threads; + int64_t odd_threads = new_srcs.size() % num_threads; + vector threads; + int64_t start = 0; + for(int thread_id = 0; thread_id < num_threads; thread_id ++){ + int64_t length = edges_per_thread + (thread_id < odd_threads); + threads.emplace_back(routine_insert_edges, thread_id, start, length); + start += length; + } + for(auto& t : threads) t.join(); + threads.clear(); +} + + +double test_read(commandLine& P) { + auto r = random_aspen(); + auto* g = get_snapshot(G); + uint64_t n = g->max_nodes(); + double a = 0.5; + double b = 0.1; + double c = 0.1; + size_t nn = 1 << (log2_up(n) - 1); + auto rmat = rMat(nn, r.ith_rand(100), a, b, c); + new_srcs.clear();new_dests.clear(); + uint32_t updates = num_edges/20; + for( uint32_t i = 0; i < updates; i++) { + std::pair edge = rmat(i); + new_srcs.push_back(edge.first); + new_dests.push_back(edge.second); + } + gettimeofday(&t_start, &tzp); + parallel_for(uint32_t i = 0; i < updates; i++) { + g->find(new_srcs[i], new_dests[i]); + } + gettimeofday(&t_end, &tzp); + return cal_time_elapsed(&t_start, &t_end); +} + + + +void batch_ins_del_read(commandLine& P){ + PRINT("=============== Batch Insert BEGIN ==============="); + + auto gname = P.getOptionValue("-gname", "none"); + auto thd_num = P.getOptionLongValue("-core", 1); + auto log = P.getOptionValue("-log","none"); + std::ofstream log_file(log, ios::app); + + ll_database Ga = *G; + ll_writable_graph& graph = *Ga.graph(); + // std::vector update_sizes = {10, 100, 1000 ,10000,100000,1000000, 10000000}; + std::vector update_sizes = {500000}; + std::vector update_sizes2 = {500000}; + auto r = random_aspen(); + auto update_times = std::vector(); + size_t n_trials = 1; + for (size_t us=0; us(nn, r.ith_rand(100+ts), a, b, c); + for( uint32_t i = 0; i < updates_to_run; i++) { + std::pair edge = rmat(i); + new_srcs.push_back(edge.first); + new_dests.push_back(edge.second); + } + // generate random deges from new_srcs and new_dests + std::default_random_engine generator; + std::uniform_int_distribution distribution(0, new_srcs.size() - 1); + for (size_t i = 0; i < updates_to_run2; i++) { + size_t index = distribution(generator); + query_srcs.push_back(new_srcs[index]); + query_dests.push_back(new_dests[index]); + } + + // insert edge + gettimeofday(&t_start, &tzp); + insert_read(graph, new_srcs, new_dests, query_srcs, query_dests, thd_num); + gettimeofday(&t_end, &tzp); + avg_insert += cal_time_elapsed(&t_start, &t_end); + + delete_edges(graph, new_srcs, new_dests, thd_num); + } + double time_i = (double) avg_insert / n_trials; + double insert_throughput = (updates_to_run+updates_to_run2) / time_i; + printf("batch_size = %zu, average insert: %f, throughput %e\n", updates_to_run, time_i, insert_throughput); + log_file<< gname<<","< Date: Wed, 29 May 2024 19:32:18 +0800 Subject: [PATCH 10/10] add hybrid test for livegraph --- sys/livegraph/CMakeLists.txt | 3 + sys/livegraph/test/livegraph_test.h | 2 + .../test/livegraph_test_propotion.cpp | 334 ++++++++++++++++++ 3 files changed, 339 insertions(+) create mode 100644 sys/livegraph/test/livegraph_test_propotion.cpp diff --git a/sys/livegraph/CMakeLists.txt b/sys/livegraph/CMakeLists.txt index 9fe10a1..311b909 100644 --- a/sys/livegraph/CMakeLists.txt +++ b/sys/livegraph/CMakeLists.txt @@ -22,6 +22,9 @@ include_directories(include/algorithms) add_executable(livegraph_edge test/livegraph_test_edge.cpp) target_link_libraries(livegraph_edge lg tbb) +add_executable(livegraph_propotion test/livegraph_test_propotion.cpp) +target_link_libraries(livegraph_propotion lg tbb) + add_executable(livegraph_alg test/livegraph_test_alg.cpp) target_link_libraries(livegraph_alg lg tbb) diff --git a/sys/livegraph/test/livegraph_test.h b/sys/livegraph/test/livegraph_test.h index 10265da..abe698b 100644 --- a/sys/livegraph/test/livegraph_test.h +++ b/sys/livegraph/test/livegraph_test.h @@ -39,6 +39,8 @@ lg::vertex_t internal_id = 0; lg::Graph* G; std::vector new_srcs; std::vector new_dests; +std::vector query_srcs; +std::vector query_dests; uint32_t num_nodes; uint64_t num_edges; std::string src, dest; diff --git a/sys/livegraph/test/livegraph_test_propotion.cpp b/sys/livegraph/test/livegraph_test_propotion.cpp new file mode 100644 index 0000000..3ec78e3 --- /dev/null +++ b/sys/livegraph/test/livegraph_test_propotion.cpp @@ -0,0 +1,334 @@ +#include "livegraph_test.h" +#include + +template +void add_edges(graph *GA, vector &new_srcs, vector &new_dests, int num_threads){ + auto routine_insert_edges = [GA, &new_srcs, &new_dests](int thread_id, uint64_t start, uint64_t length){ + for(int64_t pos = start, end = start + length; pos < end; pos++){ + while(1){ + try{ + auto tx1 = GA->begin_transaction(); + vertex_dictionary_t::const_accessor accessor1, accessor2; + VertexDictionary->find(accessor1, new_srcs[pos]); + VertexDictionary->find(accessor2, new_dests[pos]); + lg::vertex_t internal_source_id = accessor1->second; + lg::vertex_t internal_destination_id = accessor2->second; + int w = 1;string_view weight { (char*) &w, sizeof(w) }; + tx1.put_edge(internal_source_id, /* label */ 0, internal_destination_id, weight); + tx1.commit(); + break; + } + catch (exception e){ + continue; + } + } + } + }; + int64_t edges_per_thread = new_srcs.size() / num_threads; + int64_t odd_threads = new_srcs.size() % num_threads; + vector threads; + int64_t start = 0; + for(int thread_id = 0; thread_id < num_threads; thread_id ++){ + int64_t length = edges_per_thread + (thread_id < odd_threads); + threads.emplace_back(routine_insert_edges, thread_id, start, length); + start += length; + } + for(auto& t : threads) t.join(); + threads.clear(); +} + +// template +// void insert_read(graph *GA, vector &new_srcs, vector &new_dests, vector &query_srcs, vector &query_dests,int num_threads){ +// auto routine_insert_edges = [GA, &new_srcs, &new_dests, &query_srcs, &query_dests](int thread_id, uint64_t start, uint64_t length){ +// for(int64_t pos = start, end = start + length; pos < end; pos++){ +// while(1){ +// try{ +// auto tx1 = GA->begin_transaction(); +// vertex_dictionary_t::const_accessor accessor1, accessor2; +// VertexDictionary->find(accessor1, new_srcs[pos]); +// VertexDictionary->find(accessor2, new_dests[pos]); +// lg::vertex_t internal_source_id = accessor1->second; +// lg::vertex_t internal_destination_id = accessor2->second; +// int w = 1;string_view weight { (char*) &w, sizeof(w) }; +// tx1.put_edge(internal_source_id, /* label */ 0, internal_destination_id, weight); +// tx1.commit(); +// break; +// } +// catch (exception e){ +// continue; +// } +// } +// while(1){ +// try{ +// auto tx2 = G->begin_read_only_transaction(); +// vertex_dictionary_t::const_accessor accessor1, accessor2; +// if(VertexDictionary->find(accessor1, query_srcs[pos*9+n]) && VertexDictionary->find(accessor2, query_dests[pos*9+n])){ +// lg::vertex_t internal_source_id = accessor1->second; +// lg::vertex_t internal_destination_id = accessor2->second; +// string_view lg_weight = tx2.get_edge(internal_source_id, /* label */ 0, internal_destination_id); +// } +// break; +// } +// catch (exception e){ +// continue; +// } +// } +// } +// }; +// int64_t edges_per_thread = new_srcs.size() / num_threads; +// int64_t odd_threads = new_srcs.size() % num_threads; +// vector threads; +// int64_t start = 0; +// for(int thread_id = 0; thread_id < num_threads; thread_id ++){ +// int64_t length = edges_per_thread + (thread_id < odd_threads); +// threads.emplace_back(routine_insert_edges, thread_id, start, length); +// start += length; +// } +// for(auto& t : threads) t.join(); +// threads.clear(); +// } + + +// template +// void insert_read(graph *GA, vector &new_srcs, vector &new_dests, vector &query_srcs, vector &query_dests,int num_threads){ +// auto routine_insert_edges = [GA, &new_srcs, &new_dests, &query_srcs, &query_dests](int thread_id, uint64_t start, uint64_t length){ +// for(int64_t pos = start, end = start + length; pos < end; pos++){ +// while(1){ +// try{ +// auto tx1 = GA->begin_transaction(); +// vertex_dictionary_t::const_accessor accessor1, accessor2; +// VertexDictionary->find(accessor1, new_srcs[pos]); +// VertexDictionary->find(accessor2, new_dests[pos]); +// lg::vertex_t internal_source_id = accessor1->second; +// lg::vertex_t internal_destination_id = accessor2->second; +// int w = 1;string_view weight { (char*) &w, sizeof(w) }; +// tx1.put_edge(internal_source_id, /* label */ 0, internal_destination_id, weight); +// tx1.commit(); +// break; +// } +// catch (exception e){ +// continue; +// } +// } +// for (int64_t n = 0; n < 9; n++){ +// while(1){ +// try{ +// auto tx2 = G->begin_read_only_transaction(); +// vertex_dictionary_t::const_accessor accessor1, accessor2; +// if(VertexDictionary->find(accessor1, query_srcs[pos*9+n]) && VertexDictionary->find(accessor2, query_dests[pos*9+n])){ +// lg::vertex_t internal_source_id = accessor1->second; +// lg::vertex_t internal_destination_id = accessor2->second; +// string_view lg_weight = tx2.get_edge(internal_source_id, /* label */ 0, internal_destination_id); +// } +// break; +// } +// catch (exception e){ +// continue; +// } +// } +// } +// } +// }; +// int64_t edges_per_thread = new_srcs.size() / num_threads; +// int64_t odd_threads = new_srcs.size() % num_threads; +// vector threads; +// int64_t start = 0; +// for(int thread_id = 0; thread_id < num_threads; thread_id ++){ +// int64_t length = edges_per_thread + (thread_id < odd_threads); +// threads.emplace_back(routine_insert_edges, thread_id, start, length); +// start += length; +// } +// for(auto& t : threads) t.join(); +// threads.clear(); +// } + + + +template +void insert_read(graph *GA, vector &new_srcs, vector &new_dests, vector &query_srcs, vector &query_dests,int num_threads){ + auto routine_insert_edges = [GA, &new_srcs, &new_dests, &query_srcs, &query_dests](int thread_id, uint64_t start, uint64_t length){ + for(int64_t pos = start, end = start + length; pos < end; pos++){ + for (int64_t n = 0; n < 9; n++){ + while(1){ + try{ + auto tx1 = GA->begin_transaction(); + vertex_dictionary_t::const_accessor accessor1, accessor2; + VertexDictionary->find(accessor1, new_srcs[pos*9+n]); + VertexDictionary->find(accessor2, new_dests[pos*9+n]); + lg::vertex_t internal_source_id = accessor1->second; + lg::vertex_t internal_destination_id = accessor2->second; + int w = 1;string_view weight { (char*) &w, sizeof(w) }; + tx1.put_edge(internal_source_id, /* label */ 0, internal_destination_id, weight); + tx1.commit(); + break; + } + catch (exception e){ + continue; + } + } + } + while(1){ + try{ + auto tx2 = G->begin_read_only_transaction(); + vertex_dictionary_t::const_accessor accessor1, accessor2; + if(VertexDictionary->find(accessor1, query_srcs[pos]) && VertexDictionary->find(accessor2, query_dests[pos])){ + lg::vertex_t internal_source_id = accessor1->second; + lg::vertex_t internal_destination_id = accessor2->second; + string_view lg_weight = tx2.get_edge(internal_source_id, /* label */ 0, internal_destination_id); + } + break; + } + catch (exception e){ + continue; + } + } + + } + }; + int64_t edges_per_thread = query_srcs.size() / num_threads; + int64_t odd_threads = query_srcs.size() % num_threads; + vector threads; + int64_t start = 0; + for(int thread_id = 0; thread_id < num_threads; thread_id ++){ + int64_t length = edges_per_thread + (thread_id < odd_threads); + threads.emplace_back(routine_insert_edges, thread_id, start, length); + start += length; + } + for(auto& t : threads) t.join(); + threads.clear(); +} + + + + + +template +void delete_edges(graph *GA, vector &new_srcs, vector &new_dests, int num_threads){ + auto routine_insert_edges = [GA, &new_srcs, &new_dests](int thread_id, uint64_t start, uint64_t length){ + for(int64_t pos = start, end = start + length; pos < end; pos++){ + while(1){ + try{ + vertex_dictionary_t::const_accessor accessor1, accessor2; + auto tx3 = G->begin_transaction(); + VertexDictionary->find(accessor1, new_srcs[pos]); + VertexDictionary->find(accessor2, new_dests[pos]); + lg::vertex_t internal_source_id = accessor1->second; + lg::vertex_t internal_destination_id = accessor2->second; + tx3.del_edge(internal_source_id, /* label */ 0, internal_destination_id); + tx3.commit(); + break; + } + catch (exception e){ + continue; + } + } + } + }; + int64_t edges_per_thread = new_srcs.size() / num_threads; + int64_t odd_threads = new_srcs.size() % num_threads; + vector threads; + int64_t start = 0; + for(int thread_id = 0; thread_id < num_threads; thread_id ++){ + int64_t length = edges_per_thread + (thread_id < odd_threads); + threads.emplace_back(routine_insert_edges, thread_id, start, length); + start += length; + } + for(auto& t : threads) t.join(); + threads.clear(); +} + + + + +void batch_ins_del_read(commandLine& P){ + PRINT("=============== Batch Insert BEGIN ==============="); + + auto gname = P.getOptionValue("-gname", "none"); + auto thd_num = P.getOptionLongValue("-core", 1); + auto log = P.getOptionValue("-log","none"); + std::ofstream log_file(log, ios::app); + + // std::vector update_sizes = {10, 100, 1000 ,10000,100000,1000000, 10000000};//10, 100, 1000 ,10000,100000,1000000, 10000000 + std::vector update_sizes = {900000}; + std::vector update_sizes2 = {100000}; + auto r = random_aspen(); + auto update_times = std::vector(); + size_t n_trials = 1; + for (size_t us=0; usget_max_vertex_id() ); + double avg_insert = 0; + double avg_delete = 0; + double avg_read = 0; + std::cout << "Running batch size: " << update_sizes[us] << std::endl; + + if (update_sizes[us] < 10000000) + n_trials = 20; + else n_trials = 5; + size_t updates_to_run = update_sizes[us]; + size_t updates_to_run2 = update_sizes2[us]; + auto perm = get_random_permutation(updates_to_run); + for (size_t ts=0; tsget_max_vertex_id(); + new_srcs.clear(); + new_dests.clear(); + query_srcs.clear(); + query_dests.clear(); + + double a = 0.5; + double b = 0.1; + double c = 0.1; + size_t nn = 1 << (log2_up(GN) - 1); + auto rmat = rMat(nn, r.ith_rand(100+ts), a, b, c); + for( uint32_t i = 0; i < updates_to_run; i++) { + std::pair edge = rmat(i); + new_srcs.push_back(edge.first); + new_dests.push_back(edge.second); + } + + // generate random deges from new_srcs and new_dests + std::default_random_engine generator; + std::uniform_int_distribution distribution(0, new_srcs.size() - 1); + for (size_t i = 0; i < updates_to_run2; i++) { + size_t index = distribution(generator); + query_srcs.push_back(new_srcs[index]); + query_dests.push_back(new_dests[index]); + } + + // insert and read edge + gettimeofday(&t_start, &tzp); + insert_read(G, new_srcs, new_dests, query_srcs, query_dests, thd_num); + gettimeofday(&t_end, &tzp); + avg_insert += cal_time_elapsed(&t_start, &t_end); + + // del edge + gettimeofday(&t_start, &tzp); + delete_edges(G, new_srcs, new_dests, thd_num); + gettimeofday(&t_end, &tzp); + } + double time_i = (double) avg_insert / n_trials; + double insert_throughput = (updates_to_run+updates_to_run2) / time_i; + printf("batch_size = %zu, average insert: %f, throughput %e\n", updates_to_run, time_i, insert_throughput); + log_file<< gname<<","<