-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathvad_processor.py
More file actions
277 lines (225 loc) · 10.6 KB
/
vad_processor.py
File metadata and controls
277 lines (225 loc) · 10.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
"""
Enhanced VAD (Voice Activity Detection) processor
Provides multiple VAD options including pipecat's advanced VAD
"""
import asyncio
import logging
import numpy as np
from typing import Optional, Callable, Dict, Any
from enum import Enum
logger = logging.getLogger(__name__)
class VADType(Enum):
    """Selectable VAD backend identifiers (string values match config names)."""

    DEEPGRAM = "deepgram"              # Deepgram's built-in, server-side VAD
    PIPECAT_SILERO = "pipecat_silero"  # pipecat's Silero VAD analyzer
    PIPECAT_WEBRTC = "pipecat_webrtc"  # pipecat's WebRTC VAD analyzer
    CUSTOM = "custom"                  # Local energy-based VAD implementation
class VADProcessor:
    """
    Voice Activity Detection processor with multiple VAD backends.

    Supported backends (see VADType): Deepgram's server-side VAD (the default;
    local processing is then just an energy estimate), pipecat's Silero or
    WebRTC analyzers, and a simple energy-based detector with hysteresis.
    Any backend that fails to initialize falls back to Deepgram.
    """

    def __init__(self, vad_type: VADType = VADType.DEEPGRAM, **kwargs):
        """
        Args:
            vad_type: Which VAD backend to use.
            **kwargs: Configuration overrides: sample_rate (Hz), chunk_size
                (samples), silence_duration_ms, speech_threshold,
                min_speech_duration_ms.
        """
        self.vad_type = vad_type
        self.vad_engine = None  # backend analyzer instance (pipecat engines only)
        self.is_speaking = False  # current speech state, used for edge detection
        self.speech_start_callback: Optional[Callable] = None
        self.speech_end_callback: Optional[Callable] = None
        # VAD configuration (defaults target 16 kHz, 16-bit mono PCM)
        self.config = {
            "sample_rate": kwargs.get("sample_rate", 16000),
            "chunk_size": kwargs.get("chunk_size", 1600),  # 100ms at 16kHz
            "silence_duration_ms": kwargs.get("silence_duration_ms", 1000),  # 1s silence to end speech
            "speech_threshold": kwargs.get("speech_threshold", 0.5),  # Speech probability threshold
            "min_speech_duration_ms": kwargs.get("min_speech_duration_ms", 250),  # Min speech duration
        }
        logger.info(f"🎤 Initializing VAD processor: {vad_type.value}")

    async def initialize(self):
        """Initialize the selected VAD engine; fall back to Deepgram on any failure."""
        try:
            if self.vad_type == VADType.PIPECAT_SILERO:
                await self._init_pipecat_silero()
            elif self.vad_type == VADType.PIPECAT_WEBRTC:
                await self._init_pipecat_webrtc()
            elif self.vad_type == VADType.CUSTOM:
                await self._init_custom_vad()
            else:
                # DEEPGRAM - no initialization needed, handled by Deepgram service
                logger.info("🎙️ Using Deepgram built-in VAD")
        except Exception as e:
            logger.error(f"❌ Failed to initialize VAD: {e}")
            logger.info("🔄 Falling back to Deepgram VAD")
            self.vad_type = VADType.DEEPGRAM

    async def _init_pipecat_silero(self):
        """Initialize pipecat Silero VAD; degrade to Deepgram if unavailable."""
        try:
            from pipecat.vad.silero import SileroVADAnalyzer
            self.vad_engine = SileroVADAnalyzer(
                sample_rate=self.config["sample_rate"],
                min_volume=0.6,
                # pipecat expects seconds; config stores milliseconds
                min_silence_duration=self.config["silence_duration_ms"] / 1000.0,
                min_speech_duration=self.config["min_speech_duration_ms"] / 1000.0,
            )
            logger.info("✅ Pipecat Silero VAD initialized")
        except ImportError:
            logger.warning("⚠️ Pipecat Silero VAD not available, falling back to Deepgram")
            self.vad_type = VADType.DEEPGRAM
        except Exception as e:
            logger.error(f"❌ Silero VAD initialization failed: {e}")
            self.vad_type = VADType.DEEPGRAM

    async def _init_pipecat_webrtc(self):
        """Initialize pipecat WebRTC VAD; degrade to Deepgram if unavailable."""
        try:
            from pipecat.vad.webrtc import WebRTCVADAnalyzer
            self.vad_engine = WebRTCVADAnalyzer(
                sample_rate=self.config["sample_rate"],
                aggressiveness=2,  # 0-3, higher = more aggressive
            )
            logger.info("✅ Pipecat WebRTC VAD initialized")
        except ImportError:
            logger.warning("⚠️ Pipecat WebRTC VAD not available, falling back to Deepgram")
            self.vad_type = VADType.DEEPGRAM
        except Exception as e:
            logger.error(f"❌ WebRTC VAD initialization failed: {e}")
            self.vad_type = VADType.DEEPGRAM

    async def _init_custom_vad(self):
        """Initialize custom VAD implementation (no engine object needed)."""
        logger.info("✅ Custom VAD initialized (energy-based)")
        # Custom VAD will use energy-based detection

    def set_callbacks(self, speech_start: Optional[Callable] = None, speech_end: Optional[Callable] = None):
        """Register callbacks fired on speech start/end; sync or async callables accepted."""
        self.speech_start_callback = speech_start
        self.speech_end_callback = speech_end

    async def process_audio(self, audio_data: bytes) -> Dict[str, Any]:
        """
        Process one chunk of audio and return VAD results.

        Args:
            audio_data: Raw audio bytes (16-bit linear PCM assumed).

        Returns:
            Dict with keys: is_speech (bool), confidence (float),
            speech_start (bool), speech_end (bool), energy (float).
            On processing errors a neutral all-false/zero result is returned.
        """
        try:
            if self.vad_type in (VADType.PIPECAT_SILERO, VADType.PIPECAT_WEBRTC):
                return await self._process_pipecat_vad(audio_data)
            if self.vad_type == VADType.CUSTOM:
                return await self._process_custom_vad(audio_data)
            # DEEPGRAM (and, defensively, any unrecognized type): energy-only placeholder.
            # The original fell through to an implicit None for unmatched types.
            return await self._process_deepgram_vad(audio_data)
        except Exception as e:
            logger.error(f"VAD processing error: {e}")
            return self._empty_result()

    @staticmethod
    def _empty_result() -> Dict[str, Any]:
        """Neutral VAD result used when processing fails."""
        return {
            "is_speech": False,
            "confidence": 0.0,
            "speech_start": False,
            "speech_end": False,
            "energy": 0.0
        }

    def _update_speech_state(self, is_speech: bool):
        """Record the new speaking state; return (speech_start, speech_end) edge flags."""
        was_speaking = self.is_speaking
        self.is_speaking = is_speech
        return (not was_speaking and is_speech, was_speaking and not is_speech)

    async def _fire_callbacks(self, speech_start: bool, speech_end: bool):
        """Invoke registered transition callbacks, supporting sync and async callables."""
        if speech_start and self.speech_start_callback:
            await self._invoke(self.speech_start_callback)
        if speech_end and self.speech_end_callback:
            await self._invoke(self.speech_end_callback)

    @staticmethod
    async def _invoke(callback: Callable):
        """Call a callback; await the result only if it is a coroutine."""
        result = callback()
        if asyncio.iscoroutine(result):
            await result

    async def _process_deepgram_vad(self, audio_data: bytes) -> Dict[str, Any]:
        """Process audio using Deepgram's built-in VAD (energy-only placeholder)."""
        # For Deepgram VAD, we rely on the WebSocket vad_events;
        # actual VAD happens in the Deepgram service, not here.
        energy = self._calculate_energy(audio_data)
        return {
            "is_speech": energy > 0.01,  # Simple energy threshold
            "confidence": min(energy * 10, 1.0),
            "speech_start": False,  # Handled by Deepgram vad_events
            "speech_end": False,  # Handled by Deepgram vad_events
            "energy": energy
        }

    async def _process_pipecat_vad(self, audio_data: bytes) -> Dict[str, Any]:
        """Process audio using a pipecat VAD engine; fall back to the energy path on error."""
        if not self.vad_engine:
            return await self._process_deepgram_vad(audio_data)
        try:
            # Convert 16-bit PCM bytes to float32 samples in [-1, 1]
            audio_np = np.frombuffer(audio_data, dtype=np.int16).astype(np.float32) / 32768.0
            # NOTE(review): assumes analyze_audio is awaitable and returns a dict
            # with "is_speech"/"confidence" keys — confirm against pipecat's API.
            vad_result = await self.vad_engine.analyze_audio(audio_np)
            speech_start, speech_end = self._update_speech_state(vad_result.get("is_speech", False))
            await self._fire_callbacks(speech_start, speech_end)
            return {
                "is_speech": self.is_speaking,
                "confidence": vad_result.get("confidence", 0.5),
                "speech_start": speech_start,
                "speech_end": speech_end,
                "energy": self._calculate_energy(audio_data)
            }
        except Exception as e:
            logger.error(f"Pipecat VAD error: {e}")
            return await self._process_deepgram_vad(audio_data)

    async def _process_custom_vad(self, audio_data: bytes) -> Dict[str, Any]:
        """Process audio using the custom energy-based VAD with hysteresis."""
        energy = self._calculate_energy(audio_data)
        # Hysteresis: starting speech requires more energy than continuing it,
        # which prevents rapid on/off flicker around a single threshold.
        speech_threshold_high = 0.02  # Threshold to start speech
        speech_threshold_low = 0.01  # Threshold to continue speech
        threshold = speech_threshold_low if self.is_speaking else speech_threshold_high
        speech_start, speech_end = self._update_speech_state(energy > threshold)
        await self._fire_callbacks(speech_start, speech_end)
        return {
            "is_speech": self.is_speaking,
            "confidence": min(energy * 50, 1.0),  # Convert energy to confidence
            "speech_start": speech_start,
            "speech_end": speech_end,
            "energy": energy
        }

    def _calculate_energy(self, audio_data: bytes) -> float:
        """Return normalized RMS energy in [0, 1] for 16-bit PCM bytes (0.0 on error)."""
        try:
            if len(audio_data) == 0:
                return 0.0
            # Convert to numpy array (raises on odd-length buffers; caught below)
            audio_np = np.frombuffer(audio_data, dtype=np.int16).astype(np.float32)
            # Calculate RMS energy, normalized by the int16 full scale
            rms = np.sqrt(np.mean(audio_np ** 2)) / 32768.0
            return float(rms)
        except Exception as e:
            logger.error(f"Energy calculation error: {e}")
            return 0.0

    async def cleanup(self):
        """Cleanup VAD resources (delegates to the engine if it supports cleanup)."""
        if self.vad_engine and hasattr(self.vad_engine, 'cleanup'):
            await self.vad_engine.cleanup()
        logger.info("🧹 VAD processor cleaned up")
# Factory function for easy VAD creation
async def create_vad_processor(vad_type: str = "deepgram", **kwargs) -> VADProcessor:
    """
    Build and initialize a VAD processor.

    Args:
        vad_type: Backend name ("deepgram", "pipecat_silero", "pipecat_webrtc",
            "custom"); unrecognized names fall back to Deepgram with a warning.
        **kwargs: Configuration overrides forwarded to VADProcessor.

    Returns:
        Initialized VADProcessor instance
    """
    try:
        selected = VADType(vad_type.lower())
    except ValueError:
        logger.warning(f"Unknown VAD type '{vad_type}', using Deepgram")
        selected = VADType.DEEPGRAM
    vad = VADProcessor(selected, **kwargs)
    await vad.initialize()
    return vad