forked from Nikhil-Jones/Fake-News-Detector
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgraph.py
More file actions
221 lines (162 loc) · 7.84 KB
/
graph.py
File metadata and controls
221 lines (162 loc) · 7.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
from collections import defaultdict, deque
import random
from datetime import datetime, timedelta
class GraphNode:
    """A single vertex in the spread graph.

    Attributes are set once in ``__init__``:

    * ``id`` / ``type`` -- the identifier and kind ("user" by default) given
      by the caller.
    * ``metadata`` -- the caller's dict, or a fresh empty dict when the
      argument is ``None`` or otherwise falsy (an empty dict passed in is
      therefore replaced by a new one).
    * ``timestamp`` -- naive local ``datetime`` captured at construction.
    * ``connections`` -- set of node ids this node links to.
    """

    def __init__(self, node_id, node_type="user", metadata=None):
        self.id, self.type = node_id, node_type
        self.metadata = metadata if metadata else {}
        self.connections = set()
        self.timestamp = datetime.now()

    def add_connection(self, other_node_id):
        """Record a link to ``other_node_id``; repeated ids are stored once."""
        self.connections |= {other_node_id}

    def get_connection_count(self):
        """Return how many distinct node ids this node links to."""
        count = len(self.connections)
        return count
class SpreadAnalysisGraph:
def __init__(self):
self.nodes = {}
self.edges = defaultdict(list)
self.posts = {}
self.users = {}
def add_node(self, node_id, node_type="user", metadata=None):
node = GraphNode(node_id, node_type, metadata)
self.nodes[node_id] = node
if node_type == "user":
self.users[node_id] = metadata or {}
elif node_type == "post":
self.posts[node_id] = metadata or {}
def add_edge(self, from_node, to_node, edge_type="shares"):
if from_node in self.nodes and to_node in self.nodes:
self.edges[from_node].append((to_node, edge_type))
self.nodes[from_node].add_connection(to_node)
def simulate_news_spread(self, post_id, initial_users, spread_probability=0.3):
self.add_node(post_id, "post", {"content": "sample_post"})
for user_id in initial_users:
self.add_node(user_id, "user", {"activity_level": random.uniform(0.1, 1.0)})
self.add_edge(user_id, post_id, "shares")
current_sharers = initial_users.copy()
spread_rounds = 0
max_rounds = 10
while current_sharers and spread_rounds < max_rounds:
new_sharers = []
for sharer in current_sharers:
potential_sharers = [f"user_{i}" for i in range(1, 100) if f"user_{i}" not in self.users]
for potential in potential_sharers[:5]:
if random.random() < spread_probability:
self.add_node(potential, "user", {"activity_level": random.uniform(0.1, 1.0)})
self.add_edge(potential, post_id, "shares")
new_sharers.append(potential)
current_sharers = new_sharers
spread_rounds += 1
return self.analyze_spread_patterns(post_id)
def analyze_spread_patterns(self, post_id):
if post_id not in self.nodes:
return {"error": "Post not found"}
sharers = []
for user_id, connections in self.edges.items():
for connected_id, edge_type in connections:
if connected_id == post_id and edge_type == "shares":
sharers.append(user_id)
total_sharers = len(sharers)
spread_velocity = self._calculate_spread_velocity(post_id)
echo_chamber_score = self._detect_echo_chambers(sharers)
bot_likelihood = self._detect_bot_behavior(sharers)
return {
"total_sharers": total_sharers,
"spread_velocity": spread_velocity,
"echo_chamber_score": echo_chamber_score,
"bot_likelihood": bot_likelihood,
"suspicious_patterns": self._identify_suspicious_patterns(total_sharers, spread_velocity, echo_chamber_score, bot_likelihood)
}
def _calculate_spread_velocity(self, post_id):
sharers = [user_id for user_id, connections in self.edges.items()
if any(connected_id == post_id and edge_type == "shares"
for connected_id, edge_type in connections)]
if len(sharers) <= 1:
return 0.0
time_span = random.uniform(0.1, 24.0)
velocity = len(sharers) / time_span
return velocity
def _detect_echo_chambers(self, sharers):
if len(sharers) < 3:
return 0.0
total_connections = 0
possible_connections = 0
for i, sharer1 in enumerate(sharers):
for sharer2 in sharers[i+1:]:
possible_connections += 1
if sharer1 in self.edges and any(connected_id == sharer2 for connected_id, _ in self.edges[sharer1]):
total_connections += 1
if possible_connections == 0:
return 0.0
echo_score = total_connections / possible_connections
return echo_score
def _detect_bot_behavior(self, sharers):
if len(sharers) < 2:
return 0.0
bot_indicators = 0
numeric_ids = []
for sharer in sharers:
if sharer.startswith("user_") and sharer[5:].isdigit():
numeric_ids.append(int(sharer[5:]))
if len(numeric_ids) > 2:
numeric_ids.sort()
sequential_count = 0
for i in range(1, len(numeric_ids)):
if numeric_ids[i] - numeric_ids[i-1] == 1:
sequential_count += 1
if sequential_count > len(numeric_ids) * 0.5:
bot_indicators += 1
high_activity_count = 0
for sharer in sharers:
if sharer in self.users:
activity = self.users[sharer].get("activity_level", 0.5)
if activity > 0.8:
high_activity_count += 1
if high_activity_count > len(sharers) * 0.7:
bot_indicators += 1
return min(bot_indicators / 2.0, 1.0)
def _identify_suspicious_patterns(self, total_sharers, spread_velocity, echo_chamber_score, bot_likelihood):
patterns = []
if spread_velocity > 10:
patterns.append("Unusually fast spread velocity")
if echo_chamber_score > 0.7:
patterns.append("High echo chamber activity")
if bot_likelihood > 0.6:
patterns.append("Potential bot network detected")
if total_sharers > 50 and spread_velocity > 5:
patterns.append("Suspicious viral pattern")
return patterns
def get_spread_explanations(self, analysis):
explanations = []
if analysis.get("error"):
return [f"Error: {analysis['error']}"]
total_sharers = analysis.get("total_sharers", 0)
spread_velocity = analysis.get("spread_velocity", 0)
echo_chamber_score = analysis.get("echo_chamber_score", 0)
bot_likelihood = analysis.get("bot_likelihood", 0)
suspicious_patterns = analysis.get("suspicious_patterns", [])
explanations.append(f"Total sharers: {total_sharers}")
explanations.append(f"Spread velocity: {spread_velocity:.2f} shares/hour")
explanations.append(f"Echo chamber score: {echo_chamber_score:.2f}")
explanations.append(f"Bot likelihood: {bot_likelihood:.2f}")
if suspicious_patterns:
explanations.append("Suspicious patterns detected:")
for pattern in suspicious_patterns:
explanations.append(f" - {pattern}")
else:
explanations.append("No suspicious spread patterns detected")
return explanations
if __name__ == "__main__":
    # Demo: seed a post with three users, run the randomised simulation and
    # print both the raw metrics and their human-readable explanation.
    post_id = "post_123"
    initial_users = ["user_1", "user_2", "user_3"]
    graph = SpreadAnalysisGraph()

    print("Simulating news spread...")
    analysis = graph.simulate_news_spread(
        post_id,
        initial_users,
        spread_probability=0.4,
    )

    print("\nSpread Analysis Results:")
    for key in analysis:
        value = analysis[key]
        print(f"{key}: {value}")

    print("\nExplanations:")
    explanations = graph.get_spread_explanations(analysis)
    for explanation in explanations:
        print(f" {explanation}")