Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions 2025gcs/backend/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# # Dictionary to maintain vehicle state
# vehicle_data = {
# "last_time": 0,
# "lat": 0, /
# "lon": 0, /
# "rel_alt": 0,
# "alt": 0,
# "roll": 0, /
# "pitch": 0, /
# "yaw": 0, /
# "dlat": 0,
# "dlon": 0,
# "dalt": 0,
# "heading": 0,
# "groundspeed": 0,
# "throttle": 0,
# "climb": 0,
# "flight_mode": 0,
# "battery_voltage": 0,
# "battery_current": 0,
# "battery_remaining": 0,
# "is_dropped": False
# }

operation_sigchange = {
0: {
"battery_voltage": 22.2, #For total, For one cell: 3.7
"battery_remaining": 10, #In percent
"battery_current": 0,
"alt": 107 #In meters
},
1: ["last_time", "lat", "lon", "rel_alt", "roll", "pitch", "yaw", "dlat", "dlon", "dalt", "heading",
"groundspeed", "throttle", "climb", "flight_mode", "is_dropped"],
2: "Placeholder"
}

CRITICAL_THRESHOLD = {
"battery_voltage": 22.2, #For total, For one cell: 3.7
"battery_remaining": 10, #In percent
"battery_current": 0,
"alt": 107 #In meters
}

INFO_INCREMENT_THRESHOLDS = {
"alt": 5, # Alert every 5 meters altitude change
"groundspeed": 5 # Alert every 5 unit groundspeed change
# "airspeed": 5 # not yet availible but remove groundspeed when it is
}

CALLOUT_DEBOUNCE_TIME = 5.0 # Debounce time for repeated callouts in seconds (so we don't spam the same callout)

CRITICAL = 0
INFO = 1
DEBUG = 2
139 changes: 138 additions & 1 deletion 2025gcs/backend/helper.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
import os
import json
import threading
import time
import numpy as np
import sounddevice as sd
from piper.voice import PiperVoice
from pathlib import Path
from constants import CRITICAL, INFO, DEBUG, INFO_INCREMENT_THRESHOLDS, CRITICAL_THRESHOLD ,CALLOUT_DEBOUNCE_TIME


ODM_TAGS = os.path.join(os.path.dirname(__file__), 'data', 'ODM', 'odm_geotags.txt')
TARGETS_CACHE = os.path.join(os.path.dirname(__file__), 'data', 'TargetInformation.json')
Expand Down Expand Up @@ -41,4 +49,133 @@ def serialize(class_name : str, conf : float, lat : float, lon : float) -> None:
json.dump(data, file, indent=4)
print("Detection cached.")
except Exception as e:
print(f"Error appending to cache: {e}")
print(f"Error appending to cache: {e}")

# Piper TTS configuration
PIPER_MODEL_PATH = os.path.join(os.path.dirname(__file__), 'piper_model', 'en_US-ryan-low.onnx')
class CallOuts:
"""Manages text-to-speech call outs with priority levels."""
def __init__(self, voice_model_path: str = PIPER_MODEL_PATH):
self.silenced_calls = []
self.last_callout_time = {}
self.last_values = {} # Track previous values for increment detection
self.tts_lock = threading.Lock() # Prevent overlapping speech (allow only one at a time)

# Load Piper voice model
voice_path = Path(voice_model_path)
self.voice = PiperVoice.load(str(voice_path))

def speak(self, text: str):
"""Stream synthesized audio chunks directly to the speaker."""
stream = None
try:
for chunk in self.voice.synthesize(text):
if stream is None:
stream = sd.OutputStream(
samplerate = chunk.sample_rate,
channels = chunk.sample_channels,
dtype="int16"
)
stream.start()

audio = np.frombuffer(chunk.audio_int16_bytes, dtype=np.int16)
stream.write(audio)
finally:
if stream:
stream.stop()
stream.close()

def CallOut(self, level: int, msg: str):
"""Text to speech using levels for confiming call out"""
# Check if this call level is silenced
if level in self.silenced_calls:
print(f"[SILENCED] {msg}")
return

# Check debounce time
current_time = time.time()
if msg in self.last_callout_time:
time_since_left = current_time - self.last_callout_time[msg]
if time_since_left < CALLOUT_DEBOUNCE_TIME:
return

# Update last callout time
self.last_callout_time[msg] = current_time

# Speak in a seperate thread to avoid blocking
def threaded_speak():
with self.tts_lock:
self.speak(msg)

thread = threading.Thread(target=threaded_speak, daemon=True)
thread.start()

def get_silenced_calls(self):
"""Gets all the calls that are silenced"""
return self.silenced_calls

def silence_calls(self, call_level: int):
"""Adds call level to silenced calls"""
if call_level != CRITICAL:
if call_level not in self.silenced_calls:
self.silenced_calls.append(call_level)

def unsilence_calls (self, call_level: int):
"""Removes call level from silenced calls"""
if call_level in self.silenced_calls:
self.silenced_calls.remove(call_level)

def check_critical_thresholds(self, vehicle_data: dict):
"""Check vehicle data against critical thresholds"""
for key, critical_value in CRITICAL_THRESHOLD.items():
if key in vehicle_data:
current_value = vehicle_data[key]
if current_value <= critical_value:
if key == "battery_voltage":
msg = f"Critical battery: {current_value} volts"
elif key == "battery_remaining":
msg = f"Critical battery: {current_value} percent"
elif key == "alt":
msg = f"Critical altitude: {current_value} meters"
elif key == "battery_current":
msg = f"High current draw: {current_value} amp"
else:
msg = f"Critical {key.replace('_', ' ')}: {current_value}"
self.CallOut(CRITICAL, msg)

def check_info_increments(self, vehicle_data: dict):
"""Check vehicle data for information increments"""
for key, increment in INFO_INCREMENT_THRESHOLDS.items():
if key in vehicle_data:
current_value = vehicle_data[key]

if key not in self.last_values:
self.last_values[key] = current_value
continue

last_value = self.last_values[key]
difference = abs(current_value - last_value)

if difference >= increment:
if key == "alt":
msg = f"Altitude: {current_value} meters"
elif key == "groundspeed":
msg = f"Ground speed: {current_value} meters per second"
else:
msg = f"{key.replace('_', ' ')}: {current_value}"
self.CallOut(INFO, msg)
# Update to current value as new baseline
self.last_values[key] = current_value

def check_vehicle_data(self, vehicle_data: dict):
"""Check vehicle data and trigger TTS callouts as needed"""
self.check_critical_thresholds(vehicle_data)
self.check_info_increments(vehicle_data)

def shutdown(self):
"""Clean up resources if needed"""
print("Shutting down TTS...")
with self.tts_lock:
pass
# Give sounddevice time to cleanup
time.sleep(0.5)
Binary file added 2025gcs/backend/piper_model/en_US-ryan-low.onnx
Binary file not shown.
Loading