-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtt_temp_pub.py
More file actions
100 lines (75 loc) · 2.57 KB
/
tt_temp_pub.py
File metadata and controls
100 lines (75 loc) · 2.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import json
import time
import random
import subprocess
import re
from examon.plugin.examonapp import ExamonApp
from examon.plugin.sensorreader import SensorReader
board_list = ["0x1000146118320bc", "0x1000146118320be", "0x100014611832087", "0x1000146118321e2", "0x1000146118320bc", "0x1000146118320be", "0x100014611832087", "0x1000146118321e2"]
class Sensor:
def __init__(self, sensor_name='tt_temp_sensor'):
self.sensor_name = sensor_name
def get_sensor_data(self, sensor_num=8):
payload = []
rawsmi = subprocess.run("tt-smi -s", shell=True, capture_output=True, text=True)
jsonsnap = json.loads(rawsmi.stdout)
for s in range(0, sensor_num):
payload.append({
'sensor_name': self.sensor_name,
'id': board_list[s],
'value': next((device['telemetry']['asic_temperature'] for device in jsonsnap.get('device_info', []) if device.get('smbus_telem', {}).get('BOARD_ID') == board_list[s]), None),
#'power': next((device['telemetry']['power'] for device in jsonsnap.get('device_info', []) if device.get('smbus_telem', {}).get('BOARD_ID') == board_list[s]), None),
#'clk': next((device['telemetry']['aiclk'] for device in jsonsnap.get('device_info', []) if device.get('smbus_telem', {}).get('BOARD_ID') == board_list[s]), None),
})
return payload
def read_data(self):
pass
def read_data(sr):
# get timestamp and data
timestamp = int(time.time()*1000)
raw_packet = sr.sensor.get_sensor_data()
# build the examon metric
examon_data = []
for raw_data in raw_packet:
metric = {}
metric['name'] = raw_data['sensor_name']
metric['value'] = raw_data['value']
#metric['clk'] = raw_data['clk']
#metric['power'] = raw_data['power']
metric['timestamp'] = timestamp
metric['tags'] = sr.get_tags()
metric['tags']['id'] = str(raw_data['id'])
examon_data.append(metric)
# * worker id (string) useful for debug/log
worker_id = sr.sensor.sensor_name
return (worker_id, examon_data,)
def worker(conf, tags):
"""
Worker process code
"""
# sensor instance
sensor = Sensor()
# SensorReader app
sr = SensorReader(conf, sensor)
# add read_data callback
sr.read_data = read_data
# set the default tags
sr.add_tags(tags)
# run the worker loop
sr.run()
if __name__ == '__main__':
# start creating an Examon app
app = ExamonApp()
app.parse_opt()
# for checking
print("Config:")
print(json.dumps(app.conf, indent=4))
# set default metrics tags
tags = app.examon_tags()
tags['org'] = 'E4'
tags['plugin'] = 'tt_temp_pub'
tags['chnl'] = 'data'
# add a worker
app.add_worker(worker, app.conf, tags)
# run!
app.run()