-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmapper.cpp
More file actions
115 lines (94 loc) · 3.22 KB
/
mapper.cpp
File metadata and controls
115 lines (94 loc) · 3.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
#include "mapper.h"
/**
 * Mapper class constructor.
 *
 * Stores the given pointers and values in the corresponding members; the
 * constructor itself performs no other work.
 *
 * @param map_          map that receives one partial list per reducer index
 * @param filesQueue_   priority queue of input files still to be processed
 * @param mutex_        mutex held by execute() while it takes a file from the queue
 * @param barrier_      barrier waited on by pthread_execute() after execute() returns
 * @param nrOfReducers_ number of reducers, i.e. number of partial lists to fill
 * @param f_            callback invoked as f(&element, &reducerIndex); a true
 *                      result makes execute() append the element to that list
 */
template <typename K, typename V>
Mapper<K, V>::Mapper(unordered_map<K, list<V>> *map_,
                     priority_queue<InputFile, vector<InputFile>, CompareFile> *filesQueue_,
                     pthread_mutex_t *mutex_,
                     pthread_barrier_t *barrier_,
                     int nrOfReducers_,
                     FUNC f_) {
    // Processing parameters
    this->f = f_;
    this->nrOfReducers = nrOfReducers_;

    // Synchronization primitives
    this->barrier = barrier_;
    this->mutex = mutex_;

    // Input queue and output map
    this->filesQueue = filesQueue_;
    this->map = map_;
}
// Mapper class destructor.
// The destructor performs no explicit cleanup, so the compiler-generated
// body is used.
template <typename K, typename V>
Mapper<K, V>::~Mapper() = default;
/**
 * Executes the map phase for this Mapper.
 *
 * First inserts an empty partial list into the output map for every reducer
 * index in [0, nrOfReducers). Then repeatedly takes the top file from the
 * priority queue and, for each element read from it, calls
 * f(&element, &reducerIndex) for every reducer index; when f returns true the
 * element is appended to that reducer's partial list.
 *
 * File layout, as read below: the first line holds the number of elements,
 * followed by one element per line. Reading stops early if the file has fewer
 * lines than announced.
 *
 * The function returns once the queue is found empty. A failed mutex
 * lock/unlock prints a message and terminates the process with exit(-1).
 * A file that cannot be opened is reported on stdout and skipped.
 */
template <typename K, typename V>
void Mapper<K, V>::execute() {
    // Initialize the partial lists for the reducers
    const list<V> emptyList;
    for (int i = 0; i < nrOfReducers; i++) {
        map->insert({i, emptyList});
    }

    // Keep taking files until the queue is drained
    while (true) {
        // Critical section (only one thread can take a file at a time).
        // The queue is inspected only while the mutex is held: testing
        // empty() outside the lock would race with another thread's pop().
        int r = pthread_mutex_lock(mutex);
        if (r) {
            printf("ERROR: Mutex lock failed!\n");
            exit(-1);
        }

        // Take the next file from the queue, if there is one
        string filename;
        const bool emptyQueue = filesQueue->empty();
        if (!emptyQueue) {
            filename = filesQueue->top().name;
            filesQueue->pop();
        }

        r = pthread_mutex_unlock(mutex);
        if (r) {
            // Message text is Romanian for "Error unlocking the mutex"
            printf("Eroare la deblocarea mutex-ului");
            exit(-1);
        }
        // End of critical section

        // Nothing left to process
        if (emptyQueue) {
            return;
        }

        // Read the input file and apply the function to its elements
        ifstream file(filename);
        if (!file.is_open()) {
            cout << "ERROR: Unable to open " << filename << "\n";
            continue;
        }

        // Header line: number of elements that follow. Initialized to 0
        // because operator>> leaves its target untouched when the line is
        // empty, which would otherwise make the loop bound indeterminate.
        int inputs = 0;
        string buffer;
        getline(file, buffer);
        stringstream header(buffer);
        header >> inputs;

        for (int i = 0; i < inputs; i++) {
            if (!getline(file, buffer)) {
                break;
            }

            // Parse the element from the current line. Value-initialized so
            // that a blank line never yields an indeterminate value.
            stringstream line(buffer);
            V element{};
            line >> element;

            // Apply the function to the element for every reducer
            for (int j = 0; j < nrOfReducers; j++) {
                const bool result = f(static_cast<void *>(&element), static_cast<void *>(&j));
                // If the element is valid for this reducer, store it in the
                // matching partial list. operator[] is used so a missing key
                // gets a fresh list instead of dereferencing end().
                if (result) {
                    (*map)[j].push_back(element);
                }
            }
        }
        // The ifstream closes the file when it goes out of scope here
    }
}
/**
 * Static thread entry point that runs a Mapper.
 *
 * Casts the argument back to a Mapper<K, V>, runs execute() on it, then waits
 * on the mapper's barrier before ending the thread.
 *
 * @param arg pointer to the Mapper<K, V> instance to run, passed as void*
 * @return does not return normally; the thread ends through pthread_exit
 *         with a null exit value
 *
 * A failing barrier wait prints a message and terminates the process with
 * exit(-1), in the same way execute() handles mutex errors.
 */
template <typename K, typename V>
void* Mapper<K, V>::pthread_execute(void *arg) {
    // Recover the Mapper instance handed to the thread
    Mapper<K, V> *mapper = static_cast<Mapper<K, V> *>(arg);

    // Execute the Mapper
    mapper->execute();

    // Wait at the barrier (per the original note: until all Mappers have
    // finished and all Reducers are ready to start). Exactly one waiter gets
    // PTHREAD_BARRIER_SERIAL_THREAD, which is a success value; anything else
    // non-zero is an error and must not be mistaken for synchronization.
    const int r = pthread_barrier_wait(mapper->barrier);
    if (r != 0 && r != PTHREAD_BARRIER_SERIAL_THREAD) {
        printf("ERROR: Barrier wait failed!\n");
        exit(-1);
    }

    // Exit from the thread
    pthread_exit(nullptr);
}