-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathvoting_score.py
More file actions
112 lines (82 loc) · 3.16 KB
/
voting_score.py
File metadata and controls
112 lines (82 loc) · 3.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
from datetime import datetime
import requests
import math
import sys
MAX_SCORE_PER_PROTOCOL = 100
TIMESTAMP_COEFFICIENT = 0.0000001
MIN_VOTE_SCORE = 0
MAX_VOTE_SCORE = 1
DIVERSITY_COEFFICIENT = 0.1
MIN_DIVERSITY_MULTIPLIER = 1
MAX_DIVERSITY_MULTIPLIER = 2
VERBOSE = True
def get_votes(address):
url = f"https://api.boardroom.info/v1/voters/{address}/votes"
votes = []
try:
response = requests.get(url)
data = response.json()
votes.extend(data["data"])
while "nextCursor" in data:
next_cursor = data["nextCursor"]
next_url = url + f"?cursor={next_cursor}"
response = requests.get(next_url)
data = response.json()
votes.extend(data["data"])
except Exception as err:
print(f"Error in getting votes of address: {address}")
print(err)
return votes
def decreasing_exponential_decay(C, k, t, b):
return C * math.exp(-k * t) + b
def increasing_exponential_decay(C, k, t, b):
return C * (1 - math.exp(-k * t)) + b
def time_weighted_score(now_timestamp, vote_timestamp):
timestamp_diff = now_timestamp - vote_timestamp
if timestamp_diff < 0:
return 0 # we don't support time travellers who have voted in the future!
vote_score = decreasing_exponential_decay(MAX_VOTE_SCORE, TIMESTAMP_COEFFICIENT, timestamp_diff, MIN_VOTE_SCORE)
return vote_score
def protocol_score(now_timestamp, votes_timestamps):
total_score = 0
for vote_timestamp in votes_timestamps:
vote_score = time_weighted_score(now_timestamp, vote_timestamp)
total_score += vote_score
return min(total_score, MAX_SCORE_PER_PROTOCOL)
def diversity_multiplier(protocols_count):
if protocols_count < 1:
return 0
multiplier = increasing_exponential_decay(
MAX_DIVERSITY_MULTIPLIER - MIN_DIVERSITY_MULTIPLIER,
DIVERSITY_COEFFICIENT,
protocols_count - 1, # the reason of -1 is to get the exact MIN_DIVERSITY_MULTIPLIER for 1 protocol
MIN_DIVERSITY_MULTIPLIER
)
return multiplier
def voting_score(address):
raw_votes = get_votes(address)
votes = {} # contains votes timestamps of each protocol seperately
for raw_vote in raw_votes:
protocol = raw_vote["protocol"]
timestamp = raw_vote["timestamp"]
if protocol not in votes:
votes[protocol] = []
votes[protocol].append(timestamp)
total_vote_scores = 0
now_timestamp = datetime.utcnow().timestamp() # use same now_timestamp for all votes for sake of consistency
for protocol in votes:
score = protocol_score(now_timestamp, votes[protocol])
if VERBOSE: print(f"{protocol}: {score}")
total_vote_scores += score
multiplier = diversity_multiplier(len(votes))
if VERBOSE: print(f"Diversity Multiplier: {multiplier}")
final_score = total_vote_scores * multiplier
return final_score
if __name__ == "__main__":
addresses = sys.argv[1:]
if len(addresses) < 1:
print("input address as an argument")
for address in addresses:
print(f"Address: {address}")
score = voting_score(address)
print(f"Voting Score: {round(score, 2)}\n")