-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathproducer.py
More file actions
executable file
·134 lines (109 loc) · 4.45 KB
/
producer.py
File metadata and controls
executable file
·134 lines (109 loc) · 4.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#!/usr/bin/env python
import sys
import random
from random import choice
from argparse import ArgumentParser, FileType
from configparser import ConfigParser
from confluent_kafka import Producer
import time, json
from threading import Thread
from datetime import datetime
# Module-wide Kafka producer; recursive_producer() publishes through this
# single instance. The broker list is passed as one comma-separated string.
producer = Producer({"bootstrap.servers": "172.16.250.13, 172.16.250.14"})
def recursive_producer(topic, data, key):
    """Publish one message through the module-level ``producer``.

    If the producer's local queue is full (``produce`` raises
    ``BufferError``), wait one second, flush, and try again until the
    message is accepted. Any other exception propagates to the caller.

    Args:
        topic: Name of the topic to publish to.
        data: Message payload (the caller passes a JSON string).
        key: Message key.
    """
    # Retry with a loop rather than recursion so a long-lasting full queue
    # cannot exhaust the interpreter's recursion limit.
    while True:
        try:
            producer.produce(topic, data, key)
        except BufferError:
            print("Queue full, waiting 1 second . . .")
            time.sleep(1)  # seconds, matching the message above
            producer.flush()  # drain the queue before retrying
        else:
            break
    # Block until the message just queued has been delivered.
    producer.flush()
# Per-topic buffers; only referenced by the commented-out legacy code at the
# bottom of the file.
topics = {
    "power": [],
    "water": [],
    "heat": [],
}

# (low, high) bounds fed to random.randrange() in task(); the upper bound is
# therefore exclusive. Units are not stated anywhere in this file.
adult_power_consumption_range = (5, 15)
child_power_consumption_range = (4, 14)
electric_car_power_consumption_range = (24, 96)

# NOTE: "consumtion" is misspelled, but task() references these exact names.
adult_water_consumtion_range = (100, 180)
child_water_consumtion_range = (80, 180)

# Reference values used by task() to scale per-house heat and car usage.
average_house_size = 110
average_daily_electric_car_use = 15

# Simulation window as Unix timestamps: fixed start (2009-12-31 23:00:00 UTC)
# up to the moment this module is loaded.
start_timestamp = 1262300400
today = int(time.time())

# Month number (as a string) -> (low, high) bounds of the multiplier that
# task() draws with random.uniform() for each reading in that month.
monthly_variables = {
    "1": (1.25, 1.75),
    "2": (1.25, 1.75),
    "3": (1.10, 1.60),
    "4": (1.05, 1.50),
    "5": (1.00, 1.40),
    "6": (1.00, 1.25),
    "7": (0.90, 1.15),
    "8": (0.80, 1.10),
    "9": (1.00, 1.30),
    "10": (1.10, 1.45),
    "11": (1.15, 1.55),
    "12": (1.25, 1.65),
}
def task(thread_name, house):
    """Generate hourly power, water and heat readings for one house and publish them.

    Steps from ``start_timestamp`` up to (but not including) ``today`` in
    3600-second increments. For every step it draws a random monthly
    multiplier and random per-person consumption figures, derives one
    reading per topic, and sends each reading as a JSON message via
    ``recursive_producer`` to the "power", "water" and "heat" topics.

    Args:
        thread_name: Label of the worker thread; not used by the function.
        house: Mapping with the keys "id", "no_children", "no_adults",
            "house_size_m2" and "no_electric_cars".
    """
    house_id = house["id"]
    children = house["no_children"]
    adults = house["no_adults"]
    house_size = house["house_size_m2"]
    electric_cars = house["no_electric_cars"]

    # These depend only on the house, so compute them once outside the loop.
    heat_multipler = house_size / average_house_size
    electric_car_consumption = electric_cars * average_daily_electric_car_use

    # range() advances the timestamp on every iteration, including skipped
    # ones, so the `continue` below can never stall the loop.
    for old_time in range(start_timestamp, today, 3600):
        # NOTE: fromtimestamp() interprets the timestamp in local time.
        month = datetime.fromtimestamp(old_time).month
        multipler_range = monthly_variables.get(str(month))
        if not multipler_range:
            # No multiplier configured for this month: skip the reading
            # before the range is unpacked (unpacking None would raise).
            continue

        # Random draws are kept in their original order so seeded runs
        # produce the same sequence as before.
        multipler = random.uniform(*multipler_range)
        adult_power_consumption = adults * random.randrange(*adult_power_consumption_range)
        children_power_consumption = children * random.randrange(*child_power_consumption_range)
        adult_water_consumtion = adults * random.randrange(*adult_water_consumtion_range)
        child_water_consumtion = children * random.randrange(*child_water_consumtion_range)

        # Each total is divided by 24 before publishing (presumably daily
        # figures turned into hourly readings — TODO confirm).
        # NOTE(review): the original comment called part of the heat formula
        # a temporary value; the hard-coded factor 2 looks like that
        # placeholder — confirm.
        heat_consumption = (((2 * house_size) * heat_multipler) * multipler) / 24
        power_kwh = (
            (adult_power_consumption + children_power_consumption + electric_car_consumption)
            * multipler
        ) / 24
        water_m3 = ((adult_water_consumtion + child_water_consumtion) * multipler) / 24

        # The same key is used for all three messages of this time step.
        key = f'{old_time}-{house_id}'
        recursive_producer(
            "power",
            json.dumps({"house_id": house_id, "timestamp": old_time, "kwh": power_kwh}),
            key,
        )
        recursive_producer(
            "water",
            json.dumps({"house_id": house_id, "timestamp": old_time, "m3": water_m3}),
            key,
        )
        recursive_producer(
            "heat",
            json.dumps({"house_id": house_id, "timestamp": old_time, "kwh": heat_consumption}),
            key,
        )
if __name__ == '__main__':
    # Usage: producer.py DATA_FILE
    # DATA_FILE is a JSON document that iterates as a sequence of house
    # records; each record is handed to task() on its own thread.
    parser = ArgumentParser()
    parser.add_argument('data_file', type=FileType('r'))
    args = parser.parse_args()

    # Delivery report handler. NOTE(review): nothing in this block registers
    # it with the producer, so it is currently never invoked.
    def delivery_callback(err, msg):
        if err:
            print('ERROR: Message failed delivery: {}'.format(err))
        else:
            print("Produced event to topic {topic}: key = {key:12} value = {value:12}".format(
                topic=msg.topic(), key=msg.key().decode('utf-8'), value=msg.value().decode('utf-8')))

    # argparse has already opened the file; read from that handle and close
    # it, instead of leaking it and reopening by a "./"-prefixed name (which
    # fails for absolute paths and for "-" / stdin).
    with args.data_file as f:
        houses = json.load(f)

    # One worker thread per house, named Thread-1, Thread-2, ...
    threads = [
        Thread(target=task, args=(f"Thread-{i}", house))
        for i, house in enumerate(houses, start=1)
    ]
    for thread in threads:
        thread.start()
    # Wait for every house's full history to be produced before exiting.
    for thread in threads:
        thread.join()
# with open("./power_data.json", "w+") as f:
# f.writelines(json.dumps(topics["power"]))
# with open("./water_data.json", "w+") as f:
# f.writelines(json.dumps(topics["water"]))
# with open("./heat_data.json", "w+") as f:
# f.writelines(json.dumps(topics["heat"]))
# count = 0
# for _ in range(10):
# user_id = choice(user_ids)
# product = choice(products)
# for topic, value in topics.items():
# for data in value:
# # for key, keyvalue in data.items():
# recursive_producer(data)
# # count += 1
# # Block until the messages are sent.
# producer.flush()