-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathnode.c
More file actions
155 lines (125 loc) · 4.46 KB
/
node.c
File metadata and controls
155 lines (125 loc) · 4.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
#include "node.h"
struct node *init_node(int32_t id, uint16_t *edges, uint32_t *socks, uint8_t num,
FILE *globallog) {
struct node *newnode;
char logmsg[60];
/*Initialize the node's memory, ID and global log pointer*/
newnode = (struct node*) malloc(sizeof(struct node));
newnode->globallog = globallog;
newnode->id = id;
/*And initialize its message queue*/
newnode->queue = init_queue();
/*log beginning of execution*/
snprintf(logmsg, 60, "Node %d has begun executing!", id);
log_msg(logmsg, globallog);
/*allocate and initialize edge list, with proper weight/socket pairs*/
uint8_t i;
newnode->neighs = init_neighs();
for (i = 0; i < num; i++) {
if(edges[id*num + i]) {
uint16_t weight, socket;
weight = edges[id*num + i];
socket = socks[id*num + i];
add_edge(newnode->neighs, weight, socket);
}
}
/*log edge initialization*/
snprintf(logmsg, 60, "Node %d has finished computing edges!", id);
log_msg(logmsg, globallog);
/*initialize local log file, named after the node's ID*/
char logfilename[7];
memset(logfilename, 0, 7);
snprintf(logfilename, 7, "%d.log", id);
newnode->log = fopen(logfilename, "w");
setbuf(newnode->log, NULL);
return newnode;
}
void run_node(struct node *node, void(*algo) (struct node *node)) {
uint32_t num_neighs = node->neighs->num;
struct thread_data tdata[num_neighs];
pthread_t tids[num_neighs];
uint32_t i;
char logmsg[60];
srand(time(NULL));
/*start message-receiving threads for each of the node's sockets*/
struct edge *aux = node->neighs->head;
for (i = 0; i < num_neighs; i++) {
tdata[i].sock = aux->sock;
tdata[i].queue = node->queue;
pthread_create(&tids[i],NULL,receiver_thread,(void*)&tdata[i]);
snprintf(logmsg, 60, "Node %d now has thread %li receiving on fd %d",
node->id, tids[i], tdata[i].sock);
log_msg(logmsg, node->log);
aux = aux->next;
}
/*sleep for a random amount of time so all nodes don't start simultaneously*/
randsleep();
/*log beginning of node execution and start algorithm*/
snprintf(logmsg, 60, "Node %d is beginning algorithm execution!", node->id);
log_msg(logmsg, node->log);
algo(node);
/*log node's execution finish*/
snprintf(logmsg, 50, "Node %d has finished algorithm execution!", node->id);
log_msg(logmsg, node->log);
/*terminate all of the node's receiving threads (we can't simply join them
because they run forever, so we forcibly terminate them first)*/
for (i = 0; i < num_neighs; i++) {
pthread_cancel(tids[i]);
pthread_join(tids[i], NULL);
}
}
void log_msg(char *msg, FILE *logfile) {
struct timeval tv;
struct tm *tm_info;
uint32_t ms;
char timestamp[15], hms[10];
/*get time of day in secs and ms/usecs*/
gettimeofday(&tv, NULL);
/*round up to the nearest second, if needed*/
ms = lrint(tv.tv_usec/1000.0);
if (ms >= 1000) {
ms -= 1000;
tv.tv_sec++;
}
/*convert secs to local time*/
tm_info = localtime(&tv.tv_sec);
/*concatenate localtime + milliseconds to final timestamp*/
strftime(hms, 10, "%H:%M:%S", tm_info);
snprintf(timestamp, 15, "%s.%03d", hms, ms);
/*print given log message with the appropriate timestamp*/
fprintf(logfile, "[%s] %s\n", timestamp, msg);
}
void free_node(struct node *node) {
char logmsg[60];
/*log the node's inevitable demise*/
snprintf(logmsg, 60, "Node %d says so long, and thanks for all the fish!",
node->id);
log_msg(logmsg, node->globallog);
/*close its fds and free its memory*/
fclose(node->log);
free_neighs(node->neighs);
free_queue(node->queue);
free(node);
}
void *receiver_thread(void *thread_data) {
/*retrieve thread data and initialize structures*/
struct thread_data *data = (struct thread_data *) thread_data;
struct msgqueue *queue = data->queue;
uint32_t sock = data->sock;
/*message buffers*/
uint8_t msg[50];
uint32_t len;
/*receive messages indefinitely, and insert them in the node's queue*/
while (1) {
memset(msg, 0, 50);
if ((len = recv(sock, msg, 50, 0)) > 0) {
randsleep();
enqueue(queue, msg, len);
}
}
}
void randsleep() {
long long int usec;
usec = rand() % 3500000L;
usleep(usec);
}