-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathinsert_blob.py
More file actions
80 lines (65 loc) · 2.49 KB
/
insert_blob.py
File metadata and controls
80 lines (65 loc) · 2.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
from azure.storage.blob import ContentSettings, BlockBlobService
from random import randrange, random, choice
from datetime import datetime
import json
import time
import sys
from threading import Thread
from queue import Queue
def upload(i, q):
print("thread {} started".format(i))
while True:
work_item = q.get()
contents = work_item[0]
path = work_item[1]
block_blob_service.create_blob_from_text(container_name,
path,
contents,
content_settings=ContentSettings(content_type='application/json'),
max_connections=10)
q.task_done()
print("\nthread {}, upload {} complete".format(i, path))
num_threads = 5
sleep_time = 60.0
max_size = 10 * 1024 * 1024 #10 Meg
tenant = '1'
num_devices = 10;
account_key = '#REPLACEME'
account_name = '#REPLACEME'
block_blob_service = BlockBlobService(account_name = account_name, account_key = account_key)
path = 'raw_input/'
container_name = 'tenant' + tenant
msg_array = ''
task_queue = Queue()
for i in range(num_threads):
worker = Thread(target=upload, args=(i, task_queue, ))
#worker.setDaemon(True)
worker.start()
while True:
try:
msg = {}
is_error = choice([True, False])
device_id = randrange(1, num_devices)
temperature = randrange(-50, 50) + random()
pressure = randrange(0, 500) + random()
ts = datetime.now().isoformat()
msg["deviceId"] = "tenant" + tenant + "-" + str(device_id)
msg["temperature"] = temperature
if not is_error:
msg["pressure"] = pressure
msg["ts"] = ts
msg["source"] = "upload"
msg_array += json.dumps(msg) + '\n'
current_size = len(msg_array)
progress = int( current_size / max_size * 100)
if current_size >= max_size:
dtnow = datetime.utcnow()
filename = str(dtnow.year) + '/' + str(dtnow.month).zfill(2) + '/' + str(dtnow.day).zfill(2) + '/' + \
str(dtnow.hour).zfill(2) + '/' + str(int(dtnow.minute) - int(dtnow.minute) % 15).zfill(2) + '/' + \
str(dtnow.second).zfill(2) + '.' + str(dtnow.microsecond).zfill(5) + ".json"
task_queue.put((msg_array, path + filename))
print("\nupload queue size is {}".format(task_queue.qsize()))
msg_array = ''
time.sleep(sleep_time)
except Exception as e:
print(e)