-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathDistributedQuorum.repy
More file actions
190 lines (178 loc) · 8.12 KB
/
DistributedQuorum.repy
File metadata and controls
190 lines (178 loc) · 8.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
#Quorum:consistency through voting based protocol
#Declaration of global variables
mycontext['vessels'] = ['193.196.39.10','216.48.80.12','203.178.133.2','129.93.229.138','134.121.64.4','131.179.150.70','128.223.8.113','139.19.158.225']
# Logical Clock
mycontext['lclock'] = 0
mycontext['boardContent']=''
mycontext['stoplock'] = getlock()
mycontext['port'] = 0
if getmyip() in mycontext['vessels']:
mycontext['vessels'].remove(getmyip())
# produce and send HTML response. It accepts dynamic board content as an argument
def renderHTML(sockobj,message):
htmlresponse = """<html><head><meta content="text/html; charset=utf-8" http-equiv="content-type">
<title>Blackboard GroupName 5</title></head><body><h2> Board Content</h2><br>
""" + message + """<h3>Submit to board</h3><textarea rows="4" cols="50" name="comment" form="usrform"></textarea>
<form action="" id="usrform" method="post"><input type="submit"></form> </body></html>"""
sockobj.send("HTTP/1.1 200 OK\r\nContent-type: text/html\r\n" + \
"Content-length: %i\r\n\r\n%s" % (len(htmlresponse), htmlresponse))
# choose random N/2 Nodes and return their IP addresses as a list
def SelectNodes_Randomly():
selected_nodeslist = []
for counter in range(0,len(mycontext['vessels'])/2):
rand_index= int(randomfloat()*len(mycontext['vessels']))
while(mycontext['vessels'][rand_index] in selected_nodeslist or mycontext['vessels'][rand_index] == getmyip()):
rand_index = int(randomfloat()*len(mycontext['vessels']))
selected_nodeslist.append(mycontext['vessels'][rand_index])
return selected_nodeslist
# Read board content, this function accepts the ip addresses of selected nodes (N/2)
# and tries to open TCP socket with each node and read its content (one at a time)
# returns the board content of each node contacted as a node
def readBoard_content(nodes):
# empty list to contain board content
boardlist = []
for node in nodes:
try:
# try establishing socket connection with nodes one at a time
socketobject = openconn(node,mycontext['port'],timeout=20)
#send read request (RREQUEST)
socketobject.send("RREQUEST")
board = socketobject.recv(1024)
# after recieving, append the board content to a list and close the socket
boardlist.append(board)
socketobject.close()
except:
print 'Exception in readBoard'
continue
return boardlist
# This function is the major role player in choosing the latest node
# It accepts the list of boards read from each node contacts
def chooseLatestBoard_content(boardlist):
timestamp = []
# first check if there is enough data (N/2) to be read otherwise comparsion is irrelevant
if len(boardlist) != len(mycontext['vessels'])/2:
return "failure"
# first chop off the time stamp of each board and put them in one list
for fboard in boardlist:
timestamp.append(fboard.split('-')[0])
# compute the latest time stamp
maxstamp = int(max(timestamp))
# find out the board conent with this latest time stamp
for fboard in boardlist:
cstamp = int(fboard.split('-')[0])
if cstamp == maxstamp:
chosenboard = fboard
break
# if the latest time stamp that is got from the board is greater than the local logical clock,
# update this clock by the one got from the boards
# In this case there is also a need to update its own board to the one got from external node
if maxstamp > mycontext['lclock']:
mycontext['lclock'] = maxstamp
chosenboard = chosenboard.split('-')[1]
mycontext['boardContent'] = chosenboard
return "success"
# This function is crucial for writing process
# it takes the message to be written as an argument
def lockandwrite(message):
# First choose N/2 nodes randomly and update nodes
nodes = SelectNodes_Randomly()
# Read the contents of these N/2 nodes first and update to the latest. This is achieved by calling
# the readBoard_content() function.
# Then choose the latest board, which is achieved by the chooseLatestBoard_content() function
if chooseLatestBoard_content(readBoard_content(nodes)) == "fail":
return
# prepare message to write which has the form of "current timestamp (logical clock)" + delimiter + "board content
messageTOSend = str(mycontext['lclock'] + 1) + '-' + mycontext['boardContent'] + message
# sort nodes by their IP addresses before contacting to aviod deadlock
nodes.sort()
# update local board and clock as well (N/2) + 1, which makes it N/2 + 1
mycontext['boardContent'] += message
mycontext['lclock'] += 1
try:
for node in nodes:
socketobject = openconn(node,mycontext['port'],timeout=20)
# Send remote lock request
socketobject.send("LOCK")
# wait to recieve acknowldegement of locking
ack = socketobject.recv(1024)
# if locking acknowledged, send the prepared messafe
if ack == "LOCK_ACK":
socketobject.send(messageTOSend)
# wait for update acknowledge messagev
ack = socketobject.recv(1024)
# if update acknowledged, close the socket with this node
if ack == "UPD":
socketobject.send("UNLOCK")
socketobject.close()
except:
# if write process fails generate exception and fail
print 'Exception in lockandwrite'
return False
return True
def board(ip, port, sockobj, thiscommhandle, listencommhandle):
try:
msgheader = sockobj.recv(1024) # Receive message,
except:
return
# React depending on message type: HTTP GET or POST, or some other type of communication.
if msgheader.startswith( 'GET' ):
#choose nodes, read, choose latest board and render it in HTML message
chooseLatestBoard_content(readBoard_content(SelectNodes_Randomly()))
renderHTML(sockobj,mycontext['boardContent'])
stopcomm(thiscommhandle)
elif msgheader.startswith( 'POST' ):
# lock since this operation has to exclusively access the global variables
mycontext['stoplock'].acquire()
# Extract message
loc = msgheader.index('comment=')
mesg = msgheader[loc+8:] + "<br>"
# call lockandwrite function to write to N/2 + 1 nodes
# details of this function documented in the function definition
lockandwrite(mesg)
# after writing, reder HTML with the latest contails
renderHTML(sockobj,mycontext['boardContent'])
stopcomm(thiscommhandle)
mycontext['stoplock'].release()
elif msgheader.startswith( 'RREQUEST' ):
# if a ReadRequest comes, then prepare message to send which is current time stamp + delimiter + board content
localb = str(mycontext['lclock']) + "-" + mycontext['boardContent']
sockobj.send(localb)
sockobj.close()
elif msgheader.startswith( 'LOCK' ):
# if lock request comes, try locking
# here the non-blocking form of the acquire function is used to avoid indefinite blocking
# if locking successful, send back acknlowedgement
if mycontext['stoplock'].acquire(False) is True:
try:
sockobj.send('LOCK_ACK')
# wait for further message after locking
newboard = sockobj.recv(1024)
# when getting the board content and clock, update the clock and the board
mycontext['boardContent'] = newboard.split('-')[1]
mycontext['lclock'] = int(newboard.split('-')[0])
# send acknowlegement back
sockobj.send('UPD')
# wait for unlock message and release the lock
ack = sockobj.recv(1024)
mycontext['stoplock'].release()
# socket is closed whether locking is successful or not
except:
# if exception happens, release the lock and leave
mycontext['stoplock'].release()
sockobj.close()
else:
stopcomm(thiscommhandle)
if callfunc == 'initialize':
if len(callargs) > 1:
raise Exception("Too many call arguments")
# Running remotely (assuming that we pass input argument only remotely):
# whenever this vessel gets a connection on its IPaddress:Clearinghouseport it'll call function board
elif len(callargs) == 1:
mycontext['port'] = int(callargs[0])
ip = getmyip()
# Running locally
# whenever we get a connection on 127.0.0.1:12345 we'll call board
else:
mycontext['port'] = 12345
ip = '127.0.0.1'
listencommhandle = waitforconn(ip,mycontext['port'],board)