-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmaster.py
More file actions
executable file
·185 lines (168 loc) · 5.58 KB
/
master.py
File metadata and controls
executable file
·185 lines (168 loc) · 5.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
#!/usr/bin/env python
"""
The master program for CS5414 three phase commit project.
"""
import sys
import subprocess
import time
from threading import Thread, Lock
from socket import SOCK_STREAM, socket, AF_INET
# Host that every ClientHandler connects to.
address = 'localhost'
# pid -> ClientHandler for each process started by the 'start' command.
threads = {}
# message id -> command text of each 'msg' command sent, kept for resends.
msgs = {}
# Guards acked_list and wait_for_ack_list, which are shared between the
# main thread and the ClientHandler reader threads.
ack_lock = Lock()
# message id -> True for every 'ack' received so far.
acked_list = {}
# message id -> pid for every 'waitForAck' that has not been acked yet.
wait_for_ack_list = {}
# True while wait_for_ack_list still holds outstanding entries.
wait_for_ack = False
# True between sending a 'get' command and receiving its 'chatLog' reply.
wait_chat_log = False
class ClientHandler(Thread):
    """Reader thread that owns the TCP connection to one managed process.

    Received data is split into newline-terminated messages.  ``ack <id>``
    records the acknowledgement and clears any pending wait on that id;
    ``chatLog <log>`` prints the log and clears the chat-log wait flag.
    Anything else with at least two fields is reported as a wrong message.
    """

    def __init__(self, index, address, port):
        """Connect to ``(address, port)`` on behalf of process ``index``.

        Propagates whatever ``socket.connect`` raises when the connection
        cannot be established.
        """
        Thread.__init__(self)
        self.daemon = True  # a reader must never keep the master alive
        self.index = index
        self.sock = socket(AF_INET, SOCK_STREAM)
        self.sock.connect((address, port))
        self.buffer = ""
        self.valid = True

    def run(self):
        """Receive and dispatch messages until the connection ends."""
        while self.valid:
            if "\n" in self.buffer:
                line, self.buffer = self.buffer.split("\n", 1)
                self._dispatch(line.split())
                continue
            try:
                data = self.sock.recv(1024)
            except (IOError, OSError):
                # socket.error derives from IOError on Python 2.6+.
                print(sys.exc_info())
                self._disconnect()
                break
            if not data:
                # recv() returns an empty string once the peer has closed
                # the connection; without this check the loop would spin.
                self._disconnect()
                break
            self.buffer += data

    def _dispatch(self, fields):
        """Act on one received message, already split on whitespace."""
        global wait_chat_log, wait_for_ack
        if len(fields) < 2:
            return
        kind = fields[0]
        if kind == 'ack':
            try:
                mid = int(fields[1])
            except ValueError:
                # A malformed id must not kill the reader thread.
                print('WRONG MESSAGE: %s' % (fields,))
                return
            with ack_lock:
                acked_list[mid] = True
                wait_for_ack_list.pop(mid, None)
                if not wait_for_ack_list:
                    wait_for_ack = False
        elif kind == 'chatLog':
            # NOTE(review): only the first whitespace-delimited token of the
            # log is printed -- confirm logs never contain spaces.
            print(fields[1])
            wait_chat_log = False
        else:
            print('WRONG MESSAGE: %s' % (fields,))

    def _disconnect(self):
        """Mark the handler dead, deregister it and release the socket."""
        self.valid = False
        # The same index may already map to a newer handler (process
        # restarted); only remove the registry entry if it is still ours.
        if threads.get(self.index) is self:
            del threads[self.index]
        self.sock.close()

    def send(self, s):
        """Send ``s`` plus a trailing newline; no-op once the handler is closed."""
        if self.valid:
            # sendall() keeps writing until the whole message is out,
            # whereas send() may transmit only a prefix.
            self.sock.sendall(str(s) + '\n')

    def close(self):
        """Stop the reader loop and close the socket, ignoring socket errors."""
        self.valid = False
        try:
            self.sock.close()
        except (IOError, OSError):
            pass
def send(index, data, set_wait=False):
    """Forward ``data`` to the handler registered for process ``index``.

    Polls every 10 ms until no chat-log reply is outstanding.  When
    ``set_wait`` is true the chat-log wait flag is raised before the data
    goes out, so subsequent sends block until that flag is cleared again.

    Raises KeyError if no handler is registered for ``index``.
    """
    global threads, wait_chat_log
    while True:
        if not wait_chat_log:
            break
        time.sleep(0.01)
    target = int(index)
    if set_wait:
        wait_chat_log = True
    handler = threads[target]
    handler.send(data)
def exit(exit=False):
    """Close every connection, run ``./stopall`` and terminate the master.

    Args:
        exit: when true, do not wait for an outstanding chat-log reply
            (used by the timeout thread and when reading stdin fails).

    Never returns.
    """
    global threads, wait_chat_log
    import os
    import threading

    if not exit:
        while wait_chat_log:
            time.sleep(0.01)
    time.sleep(2)  # fixed pause before tearing the connections down
    # Iterate over a snapshot: reader threads delete their own entry from
    # ``threads`` when their socket dies, which would break iteration over
    # the live dict.
    for handler in list(threads.values()):
        handler.close()
    # Open /dev/null for writing (the default mode is read-only, so every
    # write by the child would fail) and release our handle once the child
    # has inherited it.
    with open('/dev/null', 'w') as devnull:
        subprocess.Popen(['./stopall'], stdout=devnull, stderr=devnull)
    time.sleep(0.1)
    if threading.current_thread().name != 'MainThread':
        # sys.exit() only raises SystemExit in the calling thread; from the
        # timeout thread the process must be terminated explicitly.
        sys.stdout.flush()
        sys.stderr.flush()
        os._exit(0)
    sys.exit(0)
def timeout():
    """Sleep for 120 seconds, then request a forced exit."""
    limit_seconds = 120
    time.sleep(limit_seconds)
    exit(exit=True)
def _await_acks():
    """Block until every id registered through ``waitForAck`` is acked.

    If acks are still missing after two seconds, the stored command of each
    outstanding message is sent once more to the pid recorded for it.
    """
    if not wait_for_ack:
        return
    time.sleep(2)
    if wait_for_ack:
        with ack_lock:
            pending = wait_for_ack_list.copy()
        for mid, pid in pending.items():
            # Only ids that were actually sent through 'msg' can be resent.
            if pid >= 0 and mid in msgs:
                send(pid, msgs[mid])
    while wait_for_ack:
        time.sleep(0.1)


def main():
    """Read commands from stdin and drive the managed processes.

    Each line has the form ``<pid> <command> [args...]``:

    * ``waitForAck <mid>`` -- later commands block until ``<mid>`` is acked.
    * ``start <arg> <port>`` -- launch ``./process`` and connect to ``<port>``.
    * ``msg <mid> ...``, ``crash*`` and ``get ...`` -- forwarded to the
      process; ``get`` additionally waits for the chat-log reply of any
      previous ``get``.

    A line consisting of ``exit``, or end of input, shuts everything down.
    """
    global threads, wait_chat_log
    global wait_for_ack
    timeout_thread = Thread(target=timeout, args=())
    timeout_thread.daemon = True
    timeout_thread.start()
    while True:
        try:
            line = sys.stdin.readline()
        except (KeyboardInterrupt, IOError, OSError):
            # Interrupted or broken stdin: shut down without waiting.
            exit(True)
        if line == '':  # end of input
            exit()
        line = line.strip()
        if line == 'exit':
            _await_acks()
            exit()
        sp1 = line.split(None, 1)
        if len(sp1) != 2:  # need at least "<pid> <command>"
            continue
        fields = line.split()
        pid = int(fields[0])
        cmd = fields[1]
        if cmd == 'waitForAck':
            mid = int(fields[2])
            with ack_lock:
                if mid not in acked_list:
                    wait_for_ack = True
                    wait_for_ack_list[mid] = pid
        elif cmd == 'start':
            port = int(fields[3])
            # Open /dev/null for writing (the default mode is read-only, so
            # the child's writes would fail) and close our handle once the
            # child has inherited it.
            with open('/dev/null', 'w') as devnull:
                subprocess.Popen(
                    ['./process', str(pid), fields[2], fields[3]],
                    stdout=devnull, stderr=devnull)
            # Give the process a moment to start listening.
            time.sleep(1)
            handler = ClientHandler(pid, address, port)
            threads[pid] = handler
            handler.start()
        else:
            # Pending waitForAck entries block every other command.
            _await_acks()
            payload = sp1[1]  # everything after the pid
            if cmd == 'msg':
                msgs[int(fields[2])] = payload
                send(pid, payload)
            elif cmd[:5] == 'crash':
                send(pid, payload)
            elif cmd == 'get':
                # A previous 'get' must be answered before the next one.
                while wait_chat_log:
                    time.sleep(0.1)
                time.sleep(1)
                send(pid, payload, set_wait=True)


if __name__ == '__main__':
    main()