-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathclient.py
More file actions
93 lines (76 loc) · 3.3 KB
/
client.py
File metadata and controls
93 lines (76 loc) · 3.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import time
import random
import threading
from yadtq import YADTQ
# Kafka and Redis Configuration
KAFKA_BROKER = 'localhost:9092'
REDIS_BACKEND = 'redis://localhost:6379/0'
yadtq = YADTQ(broker=KAFKA_BROKER, backend=REDIS_BACKEND)
EMERGENCY_TYPES = ["medical", "fire", "police"]
def generate_random_location():
"""Generate a random location, occasionally outside Bangalore."""
# 80% chance to generate a valid Bangalore location
if random.random() < 0.8:
latitude = random.uniform(12.8, 13.1) # Valid latitude for Bangalore
longitude = random.uniform(77.5, 77.7) # Valid longitude for Bangalore
else:
# Generate a location outside Bangalore
latitude = random.uniform(10.0, 12.0) # Example latitude range outside Bangalore
longitude = random.uniform(75.0, 77.0) # Example longitude range outside Bangalore
return {"lat": latitude, "lon": longitude}
def create_task(emergency_type):
"""Create a task payload based on the emergency type."""
task = {"location": generate_random_location()}
if emergency_type == "fire":
task["priority"] = random.choice(["high", "medium", "low"])
elif emergency_type == "medical":
task["severity"] = random.choice(["critical", "severe", "moderate"])
elif emergency_type == "police":
task["threat_level"] = random.choice(["high", "medium", "low"])
return task
def simulate_fault():
if random.random() < 0.2: # 20% chance of a simulated fault
raise ConnectionError("Simulated connection error")
def send_task():
emergency_type = random.choice(EMERGENCY_TYPES)
task_data = create_task(emergency_type)
retries = 3
for attempt in range(1, retries + 1):
try:
simulate_fault()
task_id = yadtq.send_task(emergency_type, task_data)
print(f"Task submitted: {task_id}")
threading.Thread(target=yadtq.monitor_task_status, args=(task_id,), daemon=True).start()
break # Exit retry loop on success
except Exception as e:
print(f"Error submitting task: {e}")
if attempt < retries:
print(f"Retrying... (Attempt {attempt}/{retries})")
time.sleep(2 ** attempt)
else:
print(f"Failed to submit task after {retries} attempts.")
def print_task_updates(task_id, status):
print(f"Task {task_id} status: {status.get('status', 'unknown')}")
if status.get("status") in ["success", "failed"]:
if "result" in status:
print(f"Result: {status['result']}")
if "error" in status:
print(f"Error: {status['error']}")
def send_tasks_continuously():
while True:
send_task()
time.sleep(random.uniform(3,8 )) # Add a slight delay to avoid overwhelming the system
def print_heartbeat_updates(worker_id, status, task_count, timestamp):
print(f"Heartbeat received from worker {worker_id}:")
print(f" Status: {status}")
print(f" Task count: {task_count}")
print(f" Timestamp: {time.ctime(timestamp)}\n")
if __name__ == "__main__":
threading.Thread(target=send_tasks_continuously, daemon=True).start()
threading.Thread(
target=yadtq.monitor_heartbeats,
args=(print_heartbeat_updates,),
daemon=True
).start()
while True:
time.sleep(1)