Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
267 changes: 139 additions & 128 deletions CO2Meter.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
"""
Module for monitoring CO2 levels.
"""
import sys
import fcntl
import threading
Expand All @@ -9,135 +12,143 @@
HIDIOCSFEATURE_9 = 0xC0094806

def _co2_worker(weak_self):
while True:
self = weak_self()
if self is None:
break
self._read_data()
"""Worker to get data."""
while True:
self = weak_self()
if self is None:
break
self._read_data()

if not self._running:
break
del self
if not self._running:
break
del self


class CO2Meter:

_key = [0xc4, 0xc6, 0xc0, 0x92, 0x40, 0x23, 0xdc, 0x96]
_device = ""
_values = {}
_file = ""
_running = True
_callback = None

def __init__(self, device="/dev/hidraw0", callback=None):
self._device = device
self._callback = callback
self._file = open(device, "a+b", 0)

if sys.version_info >= (3,):
set_report = [0] + self._key
fcntl.ioctl(self._file, HIDIOCSFEATURE_9, bytearray(set_report))
else:
set_report_str = "\x00" + "".join(chr(e) for e in self._key)
fcntl.ioctl(self._file, HIDIOCSFEATURE_9, set_report_str)

thread = threading.Thread(target=_co2_worker, args=(weakref.ref(self),))
thread.daemon = True
thread.start()


def _read_data(self):
try:
result = self._file.read(8)
if sys.version_info >= (3,):
data = list(result)
else:
data = list(ord(e) for e in result)

decrypted = self._decrypt(data)
if decrypted[4] != 0x0d or (sum(decrypted[:3]) & 0xff) != decrypted[3]:
print(self._hd(data), " => ", self._hd(decrypted), "Checksum error")
else:
operation = decrypted[0]
val = decrypted[1] << 8 | decrypted[2]
self._values[operation] = val
if self._callback is not None:
if operation == CO2METER_CO2:
self._callback(sensor=operation, value=val)
elif operation == CO2METER_TEMP:
self._callback(sensor=operation,
value=round(val / 16.0 - 273.1, 1))
elif operation == CO2METER_HUM:
self._callback(sensor=operation, value=round(val / 100.0, 1))
except:
self._running = False


def _decrypt(self, data):
cstate = [0x48, 0x74, 0x65, 0x6D, 0x70, 0x39, 0x39, 0x65]
shuffle = [2, 4, 0, 7, 1, 6, 5, 3]

phase1 = [0] * 8
for i, j in enumerate(shuffle):
phase1[j] = data[i]

phase2 = [0] * 8
for i in range(8):
phase2[i] = phase1[i] ^ self._key[i]

phase3 = [0] * 8
for i in range(8):
phase3[i] = ((phase2[i] >> 3) | (phase2[(i-1+8)%8] << 5)) & 0xff

ctmp = [0] * 8
for i in range(8):
ctmp[i] = ((cstate[i] >> 4) | (cstate[i]<<4)) & 0xff

out = [0] * 8
for i in range(8):
out[i] = (0x100 + phase3[i] - ctmp[i]) & 0xff

return out


@staticmethod
def _hd(data):
return " ".join("%02X" % e for e in data)


def get_co2(self):
if not self._running:
raise IOError("worker thread couldn't read data")
result = {}
if CO2METER_CO2 in self._values:
result = {'co2': self._values[CO2METER_CO2]}

return result


def get_temperature(self):
if not self._running:
raise IOError("worker thread couldn't read data")
result = {}
if CO2METER_TEMP in self._values:
result = {'temperature': (self._values[CO2METER_TEMP]/16.0-273.15)}

return result


def get_humidity(self): # not implemented by all devices
if not self._running:
raise IOError("worker thread couldn't read data")
result = {}
if CO2METER_HUM in self._values:
result = {'humidity': (self._values[CO2METER_HUM]/100.0)}
return result


def get_data(self):
result = {}
result.update(self.get_co2())
result.update(self.get_temperature())
result.update(self.get_humidity())

return result
"""Class to monitor CO2."""
_key = [0xc4, 0xc6, 0xc0, 0x92, 0x40, 0x23, 0xdc, 0x96]
_device = ""
_values = {}
_file = ""
_running = True
_callback = None

def __init__(self, device="/dev/hidraw0", callback=None):
self._device = device
self._callback = callback
self._file = open(device, "a+b", 0)

if sys.version_info >= (3,):
set_report = [0] + self._key
fcntl.ioctl(self._file, HIDIOCSFEATURE_9, bytearray(set_report))
else:
set_report_str = "\x00" + "".join(chr(e) for e in self._key)
fcntl.ioctl(self._file, HIDIOCSFEATURE_9, set_report_str)

thread = threading.Thread(target=_co2_worker, args=(weakref.ref(self),))
thread.daemon = True
thread.start()


def _read_data(self):
"""read data from sensor."""
try:
result = self._file.read(8)
if sys.version_info >= (3,):
data = list(result)
else:
data = list(ord(e) for e in result)

decrypted = self._decrypt(data)
if decrypted[4] != 0x0d or (sum(decrypted[:3]) & 0xff) != decrypted[3]:
print(self._hd(data), " => ", self._hd(decrypted), "Checksum error")
else:
operation = decrypted[0]
val = decrypted[1] << 8 | decrypted[2]
self._values[operation] = val
if self._callback is not None:
if operation == CO2METER_CO2:
self._callback(sensor=operation, value=val)
elif operation == CO2METER_TEMP:
self._callback(sensor=operation,
value=round(val / 16.0 - 273.1, 1))
elif operation == CO2METER_HUM:
self._callback(sensor=operation, value=round(val / 100.0, 1))
except:
self._running = False


def _decrypt(self, data):
"""decrypt data from sensor."""
cstate = [0x48, 0x74, 0x65, 0x6D, 0x70, 0x39, 0x39, 0x65]
shuffle = [2, 4, 0, 7, 1, 6, 5, 3]

phase1 = [0] * 8
for i, j in enumerate(shuffle):
phase1[j] = data[i]

phase2 = [0] * 8
for i in range(8):
phase2[i] = phase1[i] ^ self._key[i]

phase3 = [0] * 8
for i in range(8):
phase3[i] = ((phase2[i] >> 3) | (phase2[(i-1+8)%8] << 5)) & 0xff

ctmp = [0] * 8
for i in range(8):
ctmp[i] = ((cstate[i] >> 4) | (cstate[i]<<4)) & 0xff

out = [0] * 8
for i in range(8):
out[i] = (0x100 + phase3[i] - ctmp[i]) & 0xff

return out


@staticmethod
def _hd(data):
"""combine data from sensor."""
return " ".join("%02X" % e for e in data)


def get_co2(self):
"""get CO2 data from sensor."""
if not self._running:
raise IOError("worker thread couldn't read data")
result = {}
if CO2METER_CO2 in self._values:
result = {'co2': self._values[CO2METER_CO2]}

return result


def get_temperature(self):
"""get temp data from sensor."""
if not self._running:
raise IOError("worker thread couldn't read data")
result = {}
if CO2METER_TEMP in self._values:
result = {'temperature': (self._values[CO2METER_TEMP]/16.0-273.15)}

return result


def get_humidity(self): # not implemented by all devices
"""get humidity data from sensor."""
if not self._running:
raise IOError("worker thread couldn't read data")
result = {}
if CO2METER_HUM in self._values:
result = {'humidity': (self._values[CO2METER_HUM]/100.0)}
return result


def get_data(self):
"""get all the data from sensor."""
result = {}
result.update(self.get_co2())
result.update(self.get_temperature())
result.update(self.get_humidity())

return result
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,14 @@ CO2METER_HUM = 0x44

### Error handling
In Case the device can't be read anymore (e.g. it was unplugged) the worker thread will end in the background. Afterwards calls to any of the `get_*` functions will throw an `IOError`. You will need to handle any resetup, making sure that the device is there etc yourself.

### Icon Credit

Project icon made by
<a href="https://www.flaticon.com/authors/vectors-market"
title="Vectors Market">Vectors Market</a> from
<a href="https://www.flaticon.com/"
title="Flaticon">www.flaticon.com</a>
is licensed by
<a href="http://creativecommons.org/licenses/by/3.0/"
title="Creative Commons BY 3.0" target="_blank">CC 3.0 BY</a>
53 changes: 53 additions & 0 deletions co2sensor-prom
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#!/usr/bin/env python3
"""Example program to get data from CO2 sensor."""
import sys
import http.server
import CO2Meter

class Sensor(http.server.BaseHTTPRequestHandler):
"""Class to handle sensor requests."""

sensor_line = """# HELP sensor_%(m)s_level level for %(m)s.
# TYPE sensor_%(m)s_level gauge
sensor_%(m)s_level %(lvl)f
"""

def __init__(self, meter, *args):
self.meter = meter
http.server.BaseHTTPRequestHandler.__init__(self, *args)

def do_GET(self):
"""Handle get requests."""
self.send_response(200)
self.send_header('Content-Type',
'text/plain; version=0.0.4; charset=utf-8')
self.end_headers()
try:
levels = self.meter.get_data()
except IOError as sensorerr:
print("Failed to read sensor (%s)." % sensorerr)
print("Exiting; assuming disconnect.")
sys.exit(0)
for measurement in ['co2', 'temperature']:
if measurement in levels:
self.wfile.write(
bytes(self.sensor_line % {'m': measurement,
'lvl': levels[measurement]}, 'UTF-8'))


def main():
"""Main program."""
print("Opening sensor %s." % sys.argv[1])
try:
meter = CO2Meter.CO2Meter("/dev/%s" % sys.argv[1])
except OSError as sensorerr:
print("Failed to open sensor (%s)." % sensorerr)
print("Exiting; assuming disconnect.")
sys.exit(0)
handler = lambda *args: Sensor(meter, *args)
server_address = ('', 8000)
httpd = http.server.HTTPServer(server_address, handler)
httpd.serve_forever()

if __name__ == "__main__":
main()
5 changes: 5 additions & 0 deletions co2sensor-prom.d/90-co2sensor.rules
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Put this in: /etc/udev/rules.d/90-co2sensor.rules
# TODO: the remove step doesn't work?
# TODO: The add step does multiple calls with different values of %k.
ACTION=="remove", SUBSYSTEMS=="usb", ATTRS{idVendor}=="04d9", ATTRS{idProduct}=="a052", RUN+="/usr/bin/systemctl stop co2sensor@%k.service"
ACTION=="add", SUBSYSTEMS=="usb", ATTRS{idVendor}=="04d9", ATTRS{idProduct}=="a052", GROUP="plugdev", MODE="0660", SYMLINK+="co2sensor%n", TAG+="systemd", ENV{SYSTEMD_WANTS}="co2sensor@%k.service"
10 changes: 10 additions & 0 deletions co2sensor-prom.d/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# CO2Meter Support Files

Note these are Ubuntu specific paths.

* `90-co2sensor.rules` - This is the udev config. See the comment at top
of the file for where to install.
* `co2sensor@.service` - This is the systemd config file. See the
top of the file for where this goes.
* `sensor-overview.html` and `sensor.html` - Dashboard files for
prometheus.
9 changes: 9 additions & 0 deletions co2sensor-prom.d/co2sensor@.service
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Put this in: /lib/systemd/system/co2sensor@.service

[Unit]
Description=CO2Sensor

[Service]
ExecStart=/usr/local/bin/co2sensor-prom %I
KillMode=process
Restart=on-failure
Loading