-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconcurrent_stack.cpp
More file actions
251 lines (212 loc) · 6.26 KB
/
concurrent_stack.cpp
File metadata and controls
251 lines (212 loc) · 6.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
#include <cassert>
#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <filesystem>
#include <iostream>
#include <random>
#include <string>
#include <omp.h>
#include <atomic>
using std::cout;
using std::endl;
using std::string;
using std::chrono::duration_cast;
using HR = std::chrono::high_resolution_clock;
using HRTimer = HR::time_point;
using std::chrono::microseconds;
using std::chrono::milliseconds;
using std::filesystem::path;
using namespace std;
static constexpr uint64_t RANDOM_SEED = 42;
static const uint32_t bucket_count = 1000;
static constexpr uint64_t MAX_OPERATIONS = 1e+15;
static const uint32_t SENTINEL_KEY = 0;
static const uint32_t SENTINEL_VALUE = 0;
static const uint32_t PROBING_RETRIES = (1 << 20);
static const uint32_t TOMBSTONE_KEY = UINT32_MAX;
#define NUM_THREADS 4
inline uint64_t packKeyValue(uint32_t key, uint32_t val) {
return (static_cast<uint64_t>(key) << 32) |
(static_cast<uint32_t>(val) & 0xFFFFFFFF);
}
// Function to unpack a 64-bit integer into two 32-bit integers
inline void unpackKeyValue(uint64_t value, uint32_t& key, uint32_t& val) {
key = static_cast<uint32_t>(value >> 32);
val = static_cast<uint32_t>(value & 0xFFFFFFFF);
}
void create_file(path pth, const uint32_t* data, uint64_t size) {
FILE* fptr = NULL;
fptr = fopen(pth.string().c_str(), "wb+");
fwrite(data, sizeof(uint32_t), size, fptr);
fclose(fptr);
}
/** Read n integer data from file given by pth and fill in the output variable
data */
void read_data(path pth, uint64_t n, uint32_t* data) {
FILE* fptr = fopen(pth.string().c_str(), "rb");
string fname = pth.string();
if (!fptr) {
string error_msg = "Unable to open file: " + fname;
perror(error_msg.c_str());
}
int freadStatus = fread(data, sizeof(uint32_t), n, fptr);
if (freadStatus == 0) {
string error_string = "Unable to read the file " + fname;
perror(error_string.c_str());
}
fclose(fptr);
}
class Node {
public:
uint32_t value;
Node* next;
Node(uint32_t value){
this->value = value;
this->next = nullptr;
}
};
class Stack {
public:
atomic<Node*> top; // How to set it to nullptr?
bool tryPush(Node* node){
Node* oldTop = top.load();
node->next = oldTop;
// return top.compare_exchange_weak(oldTop, node);
return top.compare_exchange_strong(oldTop, node);
}
void push(uint32_t value){
Node* node = new Node(value);
while(!tryPush(node)); // may implement backoff instead of busy-waiting
}
Node* tryPop(){
Node* oldTop = top.load();
if(!oldTop){
return new Node(-1);
}
Node* newTop = oldTop->next;
// return top.compare_exchange_weak(oldTop, newTop) ? oldTop : nullptr;
return top.compare_exchange_strong(oldTop, newTop) ? oldTop : nullptr;
}
int pop(){
Node* node;
while(!(node = tryPop())); // may implement backoff instead of busy-waiting
return node->value;
}
void print(){
int value;
printf("top");
while((value = pop()) >= 0){
printf(" -> %d", value);
}
printf("\n");
}
};
Stack stk;
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
void validFlagsDescription() {
cout << "add: specify number of push operations\n";
cout << "rns: the number of iterations\n";
cout << "rem: specify number of pop operations\n";
}
// These variables may get overwritten after parsing the CLI arguments
/** total number of insert queries */
uint64_t INSERT = 1e7;
/** total number of delete queries */
uint64_t DELETE = 5e6;
/** number of iterations */
uint64_t runs = 1;
// Code snippet to parse command line flags and initialize the variables
int parse_args(char* arg) {
string s = string(arg);
string s1;
uint64_t val;
try {
s1 = s.substr(0, 4);
string s2 = s.substr(5);
val = stol(s2);
} catch (...) {
cout << "Supported: " << std::endl;
cout << "-*=[], where * is:" << std::endl;
validFlagsDescription();
return 1;
}
if (s1 == "-add") {
INSERT = val;
} else if (s1 == "-rns") {
runs = val;
} else if (s1 == "-rem") {
DELETE = val;
} else {
std::cout << "Unsupported flag:" << s1 << "\n";
std::cout << "Use the below list flags:\n";
validFlagsDescription();
return 1;
}
return 0;
}
void* func(void* arg){
pthread_t tid = pthread_self();
const char* operation;
uint32_t p = rand() % 10;
int value;
if(p < 5){
operation = "pushed";
value = rand() % 100;
stk.push(value);
} else {
operation = "popped";
value = stk.pop();
}
pthread_mutex_lock(&lock);
printf("tid = %lu %s %d\n", tid, operation, value);
pthread_mutex_unlock(&lock);
return nullptr;
}
void operations(uint32_t ADD, uint32_t NUM_OPS, uint32_t *keys){
omp_set_num_threads(NUM_THREADS);
#pragma omp parallel for
for(uint32_t i = 0; i < NUM_OPS; i++)
{
int val;
if (i < ADD) stk.push(val);
else val = stk.pop();
}
}
int main(int argc, char* argv[]) {
for (int i = 1; i < argc; i++) {
int error = parse_args(argv[i]);
if (error == 1) {
cout << "Argument error, terminatih_keys_delng run.\n";
exit(EXIT_FAILURE);
}
}
uint64_t ADD = INSERT;
uint64_t REM = DELETE;
uint64_t NUM_OPS = ADD + REM;
cout << "NUM OPS: " << NUM_OPS << " ADD: " << ADD << " REM: " << REM << "\n";
auto* h_keys_insert = new uint32_t[ADD];
memset(h_keys_insert, 0, sizeof(uint32_t) * ADD);
// Use shared files filled with random numbers
path cwd = std::filesystem::current_path();
path path_insert_keys = cwd / "random_keys_insert.bin";
assert(std::filesystem::exists(path_insert_keys));
// Read data from file
read_data(path_insert_keys, ADD, h_keys_insert);
// Max limit of the uint32_t: 4,294,967,295
std::mt19937 gen(RANDOM_SEED);
std::uniform_int_distribution<uint32_t> dist_int(1, NUM_OPS);
float total_time = 0.0F;
HRTimer start, end;
for (int i = 0; i < runs; i++) {
start = HR::now();
operations(ADD, NUM_OPS, h_keys_insert);
end = HR::now();
float iter_time = duration_cast<milliseconds>(end - start).count();
total_time += iter_time;
}
cout << "Time taken by all operations (ms): " << total_time / runs << "\n";
delete[] h_keys_insert;
return EXIT_SUCCESS;
}