diff --git a/library/icm20948/__init__.py b/library/icm20948/__init__.py index 06769e1..6313b29 100644 --- a/library/icm20948/__init__.py +++ b/library/icm20948/__init__.py @@ -1,5 +1,6 @@ import time import struct +from enum import IntFlag __version__ = '0.0.2' @@ -27,7 +28,12 @@ ICM20948_PWR_MGMT_1 = 0x06 ICM20948_PWR_MGMT_2 = 0x07 ICM20948_INT_PIN_CFG = 0x0F - +ICM20948_FIFO_EN_1 = 0x66 +ICM20948_FIFO_EN_2 = 0x67 +ICM20948_FIFO_RST = 0x68 +ICM20948_FIFO_MODE = 0x69 +ICM20948_FIFO_COUNT = 0x70 +ICM20948_FIFO_R_W = 0x72 ICM20948_ACCEL_SMPLRT_DIV_1 = 0x10 ICM20948_ACCEL_SMPLRT_DIV_2 = 0x11 ICM20948_ACCEL_INTEL_CTRL = 0x12 @@ -64,6 +70,25 @@ AK09916_CNTL2_MODE_TEST = 16 AK09916_CNTL3 = 0x32 +# Register Bits +ICM20948_I2C_SLV0_EN = 0x80 +ICM20948_I2C_SLV0_GRP = 0x08 +ICM20948_FIFO_EN = 0x40 +ICM20948_I2C_MST_EN = 0x20 +AK09916_READ = 0x80 + +ICM20948_FIFO_MODE_STREAM = 0x00 # overwrite old data with new +ICM20948_FIFO_MODE_SNAPSHOT = 0x01 # only write until fifo is full + +ICM20948_FIFO_TYPE_ACC = 0x0010 +ICM20948_FIFO_TYPE_GYR = 0x000E +ICM20948_FIFO_TYPE_GYR_X = 0x0002 +ICM20948_FIFO_TYPE_GYR_Y = 0x0004 +ICM20948_FIFO_TYPE_GYR_Z = 0x0008 +ICM20948_FIFO_TYPE_TEMP = 0x0001 +ICM20948_FIFO_TYPE_MAG = 0x0100 +ICM20948_FIFO_TYPE_ACC_GYR = 0x001E +ICM20948_FIFO_TYPE_ACC_GYR_MAG = 0x011E class ICM20948: def write(self, reg, value): @@ -77,7 +102,7 @@ def read(self, reg): def trigger_mag_io(self): user = self.read(ICM20948_USER_CTRL) - self.write(ICM20948_USER_CTRL, user | 0x20) + self.write(ICM20948_USER_CTRL, user | ICM20948_I2C_MST_EN) time.sleep(0.005) self.write(ICM20948_USER_CTRL, user) @@ -85,12 +110,97 @@ def read_bytes(self, reg, length=1): """Read byte(s) from the sensor.""" return self._bus.read_i2c_block_data(self._addr, reg, length) + def read_bytes_fifo(self, length=1): + """Read byte(s) from the fifo queue.""" + self.bank(0) + data = [] + # read data in 32 bytes chunks due to i2c limit + for i in range(length//32): + data += self.read_bytes(ICM20948_FIFO_R_W, 32) + data += self.read_bytes(ICM20948_FIFO_R_W, length % 32) + return data + def bank(self, value): """Switch register self.bank.""" if not self._bank == value: self.write(ICM20948_BANK_SEL, value << 4) self._bank = value + def enable_fifo(self, value=True, mode=ICM20948_FIFO_MODE_SNAPSHOT): + self.bank(0) + user = self.read(ICM20948_USER_CTRL) + if (value): + user |= ICM20948_FIFO_EN + else: + user &= ~ICM20948_FIFO_EN + self.write(ICM20948_USER_CTRL, user) + self.set_fifo_mode(mode) + + def set_fifo_mode(self, mode): + self.bank(0) + self.write(ICM20948_FIFO_MODE, mode) + + def start_fifo(self, type=ICM20948_FIFO_TYPE_ACC_GYR): + self._fifotype = type + if type >> 8 and (type & 0xff): + raise Exception("Magnetometer output currently cannot be discerned from other data in fifo queue") + self.bank(0) + self.write(ICM20948_FIFO_EN_1, type >> 8) + self.write(ICM20948_FIFO_EN_2, type & 0xff) + + def stop_fifo(self): + self.bank(0) + self.write(ICM20948_FIFO_EN_1, 0x00) + self.write(ICM20948_FIFO_EN_2, 0x00) + + def reset_fifo(self): + self.bank(0) + self.write(ICM20948_FIFO_RST, 0x01) + self.write(ICM20948_FIFO_RST, 0x00) + + def get_fifo_count(self): + self.bank(0) + data = self.read_bytes(ICM20948_FIFO_COUNT, 2) + count, = struct.unpack(">h", bytearray(data)) + return count + + def get_fifo_dataset_count(self): + def check_type(type): + return self._fifotype & type == type + count = self.get_fifo_count() + divider = 0 + if check_type(ICM20948_FIFO_TYPE_ACC): + divider += 6 + if check_type(ICM20948_FIFO_TYPE_GYR_X): + divider += 2 + if check_type(ICM20948_FIFO_TYPE_GYR_Y): + divider += 2 + if check_type(ICM20948_FIFO_TYPE_GYR_Z): + divider += 2 + if check_type(ICM20948_FIFO_TYPE_TEMP): + divider += 2 + if check_type(ICM20948_FIFO_TYPE_MAG): + divider += 6 + return count // divider + + def seek_fifo_begin(self): + """Advance FIFO Queue to start with first complete set of data. + Needs to be used when operating in continuous mode.""" + count = self.get_fifo_count() + set_length = 0 + if self._fifotype & ICM20948_FIFO_TYPE_ACC: + set_length += 6 + if self._fifotype & ICM20948_FIFO_TYPE_GYR_X: + set_length += 2 + if self._fifotype & ICM20948_FIFO_TYPE_GYR_Y: + set_length += 2 + if self._fifotype & ICM20948_FIFO_TYPE_GYR_Z: + set_length += 2 + if self._fifotype & ICM20948_FIFO_TYPE_TEMP: + set_length += 2 + offset = count % set_length + self.read_bytes_fifo(offset) + def mag_write(self, reg, value): """Write a byte to the slave magnetometer.""" self.bank(3) @@ -103,10 +213,10 @@ def mag_write(self, reg, value): def mag_read(self, reg): """Read a byte from the slave magnetometer.""" self.bank(3) - self.write(ICM20948_I2C_SLV0_ADDR, AK09916_I2C_ADDR | 0x80) + self.write(ICM20948_I2C_SLV0_ADDR, AK09916_I2C_ADDR | AK09916_READ) self.write(ICM20948_I2C_SLV0_REG, reg) self.write(ICM20948_I2C_SLV0_DO, 0xff) - self.write(ICM20948_I2C_SLV0_CTRL, 0x80 | 1) # Read 1 byte + self.write(ICM20948_I2C_SLV0_CTRL, ICM20948_I2C_SLV0_EN | 1) # Read 1 byte self.bank(0) self.trigger_mag_io() @@ -116,8 +226,8 @@ def mag_read(self, reg): def mag_read_bytes(self, reg, length=1): """Read up to 24 bytes from the slave magnetometer.""" self.bank(3) - self.write(ICM20948_I2C_SLV0_CTRL, 0x80 | 0x08 | length) - self.write(ICM20948_I2C_SLV0_ADDR, AK09916_I2C_ADDR | 0x80) + self.write(ICM20948_I2C_SLV0_CTRL, ICM20948_I2C_SLV0_EN | ICM20948_I2C_SLV0_GRP | length) + self.write(ICM20948_I2C_SLV0_ADDR, AK09916_I2C_ADDR | AK09916_READ) self.write(ICM20948_I2C_SLV0_REG, reg) self.write(ICM20948_I2C_SLV0_DO, 0xff) self.bank(0) @@ -154,6 +264,25 @@ def read_magnetometer_data(self, timeout=1.0): return x, y, z + def read_magnetometer_data_fifo(self, length=1): + data = self.read_bytes_fifo(6*length) + + values = [] + + for i in range(length): + x, y, z = struct.unpack("hhhhhh", bytearray(data[12*i:12*(i+1)])) + + ax /= gs + ay /= gs + az /= gs + + gx /= dps + gy /= dps + gz /= dps + + values.append([ax, ay, az, gx, gy, gz]) + + return values + def set_accelerometer_sample_rate(self, rate=125): """Set the accelerometer sample rate in Hz.""" self.bank(2) @@ -194,12 +349,14 @@ def set_accelerometer_sample_rate(self, rate=125): # TODO maybe use struct to pack and then write_bytes self.write(ICM20948_ACCEL_SMPLRT_DIV_1, (rate >> 8) & 0xff) self.write(ICM20948_ACCEL_SMPLRT_DIV_2, rate & 0xff) + return 1125 / (1 + rate) def set_accelerometer_full_scale(self, scale=16): """Set the accelerometer fulls cale range to +- the supplied value.""" self.bank(2) value = self.read(ICM20948_ACCEL_CONFIG) & 0b11111001 - value |= {2: 0b00, 4: 0b01, 8: 0b10, 16: 0b11}[scale] << 1 + self._acc_scale = {2: 0b00, 4: 0b01, 8: 0b10, 16: 0b11}[scale] + value |= self._acc_scale << 1 self.write(ICM20948_ACCEL_CONFIG, value) def set_accelerometer_low_pass(self, enabled=True, mode=5): @@ -217,12 +374,15 @@ def set_gyro_sample_rate(self, rate=125): # 125Hz sample rate - 1.125 kHz / (1 + rate) rate = int((1125.0 / rate) - 1) self.write(ICM20948_GYRO_SMPLRT_DIV, rate) + # return the actual sample rate + return 1125 / (1 + rate) def set_gyro_full_scale(self, scale=250): """Set the gyro full scale range to +- supplied value.""" self.bank(2) value = self.read(ICM20948_GYRO_CONFIG_1) & 0b11111001 - value |= {250: 0b00, 500: 0b01, 1000: 0b10, 2000: 0b11}[scale] << 1 + self._gyr_scale = {250: 0b00, 500: 0b01, 1000: 0b10, 2000: 0b11}[scale] + value |= self._gyr_scale << 1 self.write(ICM20948_GYRO_CONFIG_1, value) def set_gyro_low_pass(self, enabled=True, mode=5): @@ -246,6 +406,7 @@ def read_temperature(self): def __init__(self, i2c_addr=I2C_ADDR, i2c_bus=None): self._bank = -1 self._addr = i2c_addr + self._fifotype = ICM20948_FIFO_TYPE_ACC if i2c_bus is None: from smbus import SMBus @@ -254,6 +415,10 @@ def __init__(self, i2c_addr=I2C_ADDR, i2c_bus=None): self._bus = i2c_bus self.bank(0) + + self._acc_scale = (self.read(ICM20948_ACCEL_CONFIG) & 0x06) >> 1 + self._gyr_scale = (self.read(ICM20948_GYRO_CONFIG_1) & 0x06) >> 1 + if not self.read(ICM20948_WHO_AM_I) == CHIP_ID: raise RuntimeError("Unable to find ICM20948")