-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathevent_consistency.repy
More file actions
executable file
·116 lines (107 loc) · 5.83 KB
/
event_consistency.repy
File metadata and controls
executable file
·116 lines (107 loc) · 5.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#This program implements an eventual consistency algorithm for a simple distributed blackboard using Repy and
#uses the Seattle framework to run it in distributed vessels
#========================================================================================================
# Shared state. mycontext is not defined in this file; it is the global
# dictionary supplied by the Repy runtime and is used here in place of
# module-level variables.

# IP addresses of every vessel taking part in the blackboard. Smaller
# experiments used only the first three or the first six entries of this list.
mycontext['vessels'] = [
    '137.132.80.106', '202.202.43.199', '157.92.44.104',
    '192.33.90.67', '130.104.72.213', '128.10.18.53',
    '204.123.28.57', '132.187.230.2', '210.32.181.184',
]

# Port used both for listening and for contacting the other vessels; the
# real value is filled in by the start-up code.
mycontext['port_number'] = 0

# Logical clock: the sequence number stamped on the next locally posted message.
mycontext['my_clock'] = 1

# Messages known to this vessel, the most recent local post, and the
# rendered board text / timing string that the HTML page displays.
mycontext['my_messages'] = []
mycontext['post_message'] = ""
mycontext['board'] = ""
mycontext['elapsed_time'] = ""

# Mutual exclusion: clock_lock guards the logical clock while it is updated,
# board_lock guards the board while messages are posted and received.
mycontext['clock_lock'] = getlock()
mycontext['board_lock'] = getlock()
#========================================================================================================
# function that returns the html page of the board
def response():
    # Build and return the HTML page for the blackboard: the current
    # mycontext['board'] text, the mycontext['elapsed_time'] string, and the
    # form used to submit a new comment.
    page_template = """<html> <head>
<meta content="text/html; charset=utf-8" http-equiv="content-type">
<title>Blackboard Group 5</title> </head> <body>
<h4>BlackBoard</h4>
<p>%s</p>
<p>Elapsed time for consistency: %s</p>
<br>
<textarea rows="4" cols="50" name="comment" form="usrform"></textarea>
<form action="" id="usrform" method="post"> <input type="submit"> </form></body> </html>"""
    board_text = mycontext['board']
    elapsed = mycontext['elapsed_time']
    return page_template % (board_text, elapsed)
#========================================================================================================
def send_to_other_vessels(posted_comment):
try:
for vessel in mycontext['vessels']: # looping through the vessels
mycontext['clock_lock'].acquire() # Lock
if vessel !=getmyip(): # excluding the vessel which posting
sockobj=openconn(vessel,mycontext['port_number']) # open a TCP connection to all other vessels
sockobj.send(str(posted_comment))
sockobj.close()
mycontext['clock_lock'].release()
except Exception, e:
print " can't send message to vessels :"+str(e)
#========================================================================================================
def update_my_clock(sequence_num):
    # Advance the local logical clock after receiving a message stamped with
    # sequence_num. Following the Lamport receive rule, the clock becomes
    # max(local clock, received stamp) + 1, so the next local post is always
    # ordered after the message just received.
    #
    # sequence_num may be an int or its decimal text form (board() passes the
    # text sliced from the wire message). A stamp that cannot be parsed counts
    # as 0, so the clock still ticks once for the receive event.
    try:
        received = int(sequence_num)
    except (ValueError, TypeError):
        received = 0
    mycontext['clock_lock'].acquire()
    try:
        mycontext['my_clock'] = max(mycontext['my_clock'], received) + 1
    finally:
        mycontext['clock_lock'].release()
#========================================================================================================
def _message_order_key(message):
    # Sort key for a wire message of the form "<seq>-<ip>:<comment>".
    # The sequence number is compared numerically (so 10 sorts after 2); the
    # raw message text breaks ties, which gives every vessel holding the same
    # messages the same order. An unparsable sequence number counts as 0.
    try:
        sequence = int(message.split('-', 1)[0])
    except ValueError:
        sequence = 0
    return (sequence, message)


def sort_post():
    # Order mycontext['my_messages'] by sequence number and rebuild
    # mycontext['board'] from the comment part of every message.
    #
    # board_lock is taken for the duration of the sort, so the caller must not
    # already hold it. The board text is assembled locally and stored in one
    # assignment, so a page rendered meanwhile never sees a half-built board.
    mycontext['board_lock'].acquire()
    try:
        mycontext['my_messages'].sort(key=_message_order_key)
        comments = []
        for message in mycontext['my_messages']:
            separator = message.find(':')
            if separator >= 0:
                # keep everything after the first ':' so that comments which
                # themselves contain ':' are shown in full
                comments.append(message[separator + 1:])
            # a message without ':' has no comment part and is not displayed
        board_text = "".join(comments)
    finally:
        mycontext['board_lock'].release()
    mycontext['board'] = board_text
#========================================================================================================
def update_board(message):
    # Add message to the list of known messages. The append is done while
    # holding board_lock, the lock this file reserves for board updates.
    guard = mycontext['board_lock']
    known_messages = mycontext['my_messages']
    guard.acquire()
    known_messages.append(message)
    guard.release()
#========================================================================================================
# the board function containing the GET and POST methods
def _escape_html(text):
    # Replace the HTML metacharacters in text so that a posted comment is
    # displayed literally instead of being interpreted as markup.
    return text.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')


def _send_board_page(sockobj):
    # Send the current board page as a complete HTTP 200 response. The page is
    # rendered once so that the Content-length header matches the body sent.
    page = response()
    sockobj.send("HTTP/1.1 200 OK\r\nContent-type: text/html\r\n" + \
                 "Content-length: %i\r\n\r\n%s" % (len(page), page))


def board(ip, port, sockobj, thiscommhandle, listencommhandle):
    # Connection callback registered with waitforconn. Handles three kinds of
    # incoming data on the same port:
    #   GET   - reply with the board page
    #   POST  - stamp the submitted comment with the logical clock, store it,
    #           broadcast it to the other vessels, reply with the board page
    #   other - treated as a "<seq>-<ip>:<comment>" message from another
    #           vessel; malformed input is ignored
    # The connection handle is always stopped, even if an error occurs.
    #
    # NOTE(review): only the first 1024 bytes of a request are read, so a POST
    # whose form data lies beyond that (or arrives in a later packet) contains
    # no 'comment=' field here and is answered without posting anything.
    try:
        msgheader = sockobj.recv(1024)
        if msgheader.startswith('GET'):
            _send_board_page(sockobj)
        elif msgheader.startswith('POST'):
            # the form data follows the blank line that ends the headers
            body = msgheader.split('\r\n\r\n', 1)[-1]
            marker = body.find('comment=')
            if marker >= 0:
                posted_comment = _escape_html(body[marker + len('comment='):])
                # Read and advance the clock in one critical section so that
                # two simultaneous posts can never share a sequence number.
                mycontext['clock_lock'].acquire()
                try:
                    sequence_num = mycontext['my_clock']
                    mycontext['my_clock'] = sequence_num + 1
                finally:
                    mycontext['clock_lock'].release()
                # The message is kept in a local so another handler thread
                # cannot swap it before it is stored and broadcast.
                message = str(sequence_num) + "-" + getmyip() + ":" + posted_comment + '<br>'
                mycontext['post_message'] = message
                update_board(message)
                mycontext['elapsed_time'] = str(getruntime())
                send_to_other_vessels(message)   # broadcast to neighbours
                sort_post()
            _send_board_page(sockobj)
        else:
            # sequence number is the text before the first '-'
            sequence_num = msgheader.strip().split('-')[0]
            if sequence_num.isdigit() and ':' in msgheader:
                update_my_clock(sequence_num)    # update the logical clock
                update_board(msgheader)          # update the board
                sort_post()
    finally:
        stopcomm(thiscommhandle)                 # stop the current handler
# the main function
if callfunc == 'initialize':
    # Program entry point: pick the listening address and port, then start
    # accepting connections with board() as the handler.
    if len(callargs) > 1:
        raise Exception("too many call arguments")
    elif len(callargs) == 1:
        # Running remotely: the port number is the single command-line argument
        mycontext['port_number'] = int(callargs[0])
        ip = getmyip()
    else:
        # Running locally: local address 127.0.0.1 and port number 12345.
        # The port must be stored in mycontext because that is the value used
        # for listening below and for contacting the other vessels.
        port = 12345
        mycontext['port_number'] = port
        ip = '127.0.0.1'
    listencommhandle = waitforconn(ip, mycontext['port_number'], board)