forked from Abhiroop-tales/ACCORD-WWW24Demo
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdetection.py
More file actions
193 lines (161 loc) · 6.43 KB
/
detection.py
File metadata and controls
193 lines (161 loc) · 6.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
from abc import abstractmethod
from datetime import datetime
class ConstraintNode:
@abstractmethod
def add_constraint(self, constraint):
'''Add a constraint, initializing any child nodes
Args:
constraint: List[str], constraint list
'''
pass
@abstractmethod
def check(self, activity):
'''Determine if activity is a conflict based on info in this node and children
Args:
activity: Activity
'''
pass
class DocumentNode(ConstraintNode):
def __init__(self, constraint=None):
self.constraints = {}
if constraint:
self.add_constraint(constraint)
def add_constraint(self, constraint):
if constraint[1] not in self.constraints:
self.constraints[constraint[1]] = ActionNode(constraint)
else:
self.constraints[constraint[1]].add_constraint(constraint)
def check(self, activity):
if activity.doc_id in self.constraints:
return self.constraints[activity.doc_id].check(activity)
else:
return False
class ActionNode(ConstraintNode):
def __init__(self, constraint=None):
self.constraints = {}
if constraint:
self.add_constraint(constraint)
def add_constraint(self, constraint):
constraint_type = constraint[3]
# Convert time limit edit constraints to edit constraints for backward compatibility
if constraint_type == "Time Limit Edit":
constraint_type = "Can Edit"
if constraint_type not in self.constraints:
self.constraints[constraint_type] = ActorNode(constraint)
else:
self.constraints[constraint_type].add_constraint(constraint)
def check(self, activity):
if activity.actiontype in self.constraints:
return self.constraints[activity.actiontype].check(activity)
else:
return False
class ActorNode(ConstraintNode):
def __init__(self, constraint=None):
self.constraints = {}
if constraint:
self.add_constraint(constraint)
def add_constraint(self, constraint):
if constraint[4] not in self.constraints:
self.constraints[constraint[4]] = ConditionNode(constraint)
else:
self.constraints[constraint[4]] = ConditionNode(constraint)
def check(self, activity):
if activity.actor in self.constraints:
return self.constraints[activity.actor].check(activity)
else:
return False
class ConditionNode(ConstraintNode):
def __init__(self, constraint=None):
self.conditions = []
if constraint:
self.add_constraint(constraint)
def add_constraint(self, constraint):
comparator = constraint[6]
values = constraint[8].split(",")
values = [v for v in values if v and v != '-'] # Remove empty strings
if comparator and (constraint[3] == "Can Edit" or constraint[3] == "Time Limit Edit"):
values = [datetime.fromisoformat(v) for v in values]
self.conditions.append([comparator, values])
def check(self, activity):
for condition in self.conditions:
comparator = condition[0]
true_values = condition[1]
if not comparator:
return True
if comparator == "not in":
if activity.trueValue not in true_values:
return True
if comparator == "in":
if activity.trueValue in true_values:
return True
if comparator == "gt":
for val in true_values:
if activity.trueValue > val:
return True
if comparator == "lt":
for val in true_values:
if activity.trueValue < val:
return True
return False
class Activity:
'''Data associated with an event activity
Attributes:
actiontype: str, action type
doc_id: str
actor: str, actor email
trueValue: str | None, time for edit or target user for permission changes
'''
def __init__(self, log):
'''Initialize Activity attributes
Args:
log: List[str], line from logs describing events
'''
self.doc_id = log[2]
self.actor = log[5]
action = log[1]
if action[0:3] == "Per":
action_details = action.split("-")
new_permission = action_details[1].split(':')[1]
previous_permission = action_details[2].split(':')[1]
self.trueValue = action_details[3].split(':')[1]
if new_permission == "none":
self.actiontype = "Remove Permission"
elif previous_permission == "none":
self.actiontype = "Add Permission"
else:
self.actiontype = "Update Permission"
else:
self.actiontype = "Can " + action
if self.actiontype == "Can Edit":
self.trueValue = datetime.fromisoformat(log[0]) # Activity time
else:
self.trueValue = None
class ConflictDetectionEngine:
'''Store action constraints and check lists of activities against them
Attributes:
constraint_tree: ConstraintNode
'''
def __init__(self, action_constraints=[]):
'''Initialize internal data structures and store constraints'''
self.constraint_tree = DocumentNode()
self.load_constraints(action_constraints)
def load_constraints(self, action_constraints):
'''Parse and store an additional list of constraints'''
for constraint in action_constraints:
self.constraint_tree.add_constraint(constraint)
def check_conflicts(self, activities):
'''Flag which activities are conflicts using stored constraints'''
results = []
for activity in activities:
results.append(self.constraint_tree.check(Activity(activity)))
return results
def detectmain(logdata, action_constraints):
'''Detect which activities in logs are conflicts.
Args:
logdata: List[List[str]], activity descriptions in log format
action_constraints: List[List[str]], action constraints
Returns: list of booleans equal in length to logdata, indicating if each
activity was a conflict
'''
engine = ConflictDetectionEngine(action_constraints)
return engine.check_conflicts(logdata)