Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 113 additions & 0 deletions ProjectIF/btconnectest
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import serial
import time
import mycodo.utils as mycodo_utils # utility functions
from mycodo.config import MycodoConfig # config class
import logging

logger = logging.getLogger(__name__)

class BluetoothArduinoInterface:
def __init__(self, config):
self.config = config
self.bluetooth_port = self.config.get('arduino_bluetooth', 'port', fallback='/dev/rfcomm0') # change values
self.baud_rate = self.config.getint('arduino_bluetooth', 'baud_rate', fallback=9600) # change values
self.serial_connection = None
self.connect()

def connect(self):
try:
self.serial_connection = serial.Serial(self.bluetooth_port, self.baud_rate, timeout=1)
logger.info(f"Successfully connected to Arduino via Bluetooth on {self.bluetooth_port} at {self.baud_rate} bps")
except serial.SerialException as e:
logger.error(f"Error connecting to Arduino via Bluetooth on {self.bluetooth_port}: {e}")
self.serial_connection = None

def disconnect(self):
if self.serial_connection and self.serial_connection.is_open:
self.serial_connection.close()
logger.info(f"Disconnected from Arduino on {self.bluetooth_port}")
self.serial_connection = None

def send_command(self, command):
if self.serial_connection and self.serial_connection.is_open:
try:
encoded_command = (command + '\n').encode('utf-8') # Add newline as terminator
self.serial_connection.write(encoded_command)
logger.debug(f"Sent command to Arduino: {command}")
time.sleep(0.1) # Small delay for transmission
return True
except serial.SerialException as e:
logger.error(f"Error sending command '{command}' to Arduino: {e}")
self.connect() # Attempt to reconnect on error
return False
else:
logger.warning("Not connected to Arduino. Cannot send command.")
self.connect() # Attempt to reconnect if not connected
return False

def receive_data(self):
if self.serial_connection and self.serial_connection.is_open:
try:
if self.serial_connection.in_waiting > 0:
data_bytes = self.serial_connection.readline().strip()
data_str = data_bytes.decode('utf-8')
logger.debug(f"Received data from Arduino: {data_str}")
return data_str
return None
except serial.SerialException as e:
logger.error(f"Error receiving data from Arduino: {e}")
self.connect() # Attempt to reconnect on error
return None
else:
logger.warning("Not connected to Arduino. Cannot receive data.")
self.connect() # Attempt to reconnect if not connected
return None

# --- Integration into Mycodo Daemon (Conceptual) ---

def mycodo_daemon_setup():
config = MycodoConfig()
bluetooth_arduino = BluetoothArduinoInterface(config)
return bluetooth_arduino

def mycodo_daemon_loop(bluetooth_arduino):
last_send_time = time.time() # initialize
while True:
if mycodo_utils.check_if_time_elapsed(last_send_time, 60): # every 60 seconds, send
command_to_send = f"READ_SENSOR {mycodo_utils.generate_unique_id()}"
bluetooth_arduino.send_command(command_to_send)
last_send_time = time.time()

data_received = bluetooth_arduino.receive_data() # process data from arduino
if data_received:
logger.info(f"Data from Arduino: {data_received}")

time.sleep(0.1)


if __name__ == "__main__":
# tests BluetoothArduinoInterface class
mycodo_config = MycodoConfig()
if not mycodo_config.has_section('arduino_bluetooth'):
mycodo_config.add_section('arduino_bluetooth')

# TODO: Update Values
mycodo_config.set('arduino_bluetooth', 'port', '/dev/rfcomm0') # ??
mycodo_config.set('arduino_bluetooth', 'baud_rate', '115200') # ??
mycodo_config.save()

arduino_interface = BluetoothArduinoInterface(mycodo_config)

try:
arduino_interface.send_command("HELLO_ARDUINO")
time.sleep(2)
for i in range(5):
arduino_interface.send_command(f"SET_LED {i % 2}")
received = arduino_interface.receive_data()
if received:
print(f"Received: {received}")
time.sleep(1)
except KeyboardInterrupt:
print("Exiting...")
finally:
arduino_interface.disconnect()