-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmicListener.py
More file actions
137 lines (103 loc) · 4.69 KB
/
micListener.py
File metadata and controls
137 lines (103 loc) · 4.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import sounddevice as sd # needed to control the microphone
import soundfile as sf # needed to create the audio files
import queue # needed for making the queue that handles real time audio
import sys # needed for file status
from openai import OpenAI # needed for calling OpenAI Audio API
import yaml # needed for config
import pika # needed to send messages out via RabbitMQ
import threading # needed for multi threads
from gpiozero import Button, LED # needed for button control
import numpy as np # needed for audio levels
import time # needed for sleep
class MicListener:
    """
    Class that handles listening to the microphone, transcribing the recorded
    audio with the OpenAI audio API and publishing the text to RabbitMQ
    """

    # scratch file written by piListener() and read back by transcribeAudio()
    AUDIO_FILE = "request.wav"
    # RMS level (in dB of the raw sample values) at or above which a block is recorded
    SILENCE_THRESHOLD_DB = -35
    # recordings that are not longer than this are not sent for transcription
    MIN_RECORDING_SECONDS = 0.1

    def __init__(self):
        """
        Initialization method

        Sets up the GPIO pins, the audio block queue, the default input
        device info, the OpenAI client (key read from ./configs/billing.yaml)
        and the RabbitMQ channel with the 'userOutput' and 'userTime' queues
        """
        self.recordStatus = False  # boolean for if the audio is being saved
        self.led = LED("BOARD8")
        self.led.off()  # just using this to turn the pin into a ground
        self.button = Button("BOARD10")  # the actual button pin
        self.queue = queue.Queue()  # audio blocks handed over by callback()
        self.decibels = -100  # sound level value

        # setup microphone
        self.deviceInfo = sd.query_devices(kind='input')

        # load config settings
        with open("./configs/billing.yaml", "r") as ymlfile:
            config = yaml.safe_load(ymlfile)

        # load openAI keys into client
        self.client = OpenAI(api_key=config["openai"]["API_KEY"])

        # set up RabbitMQ
        # increase heartbeat to deal with weird dropouts
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost', heartbeat=3600)
        )
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue='userOutput')
        self.channel.queue_declare(queue='userTime')

    def callback(self, indata, frames, time, status):
        """
        This is called (from a separate thread) for each audio block.

        Updates self.decibels with the RMS level of the block and puts a copy
        of the block on self.queue

        Args:
            indata (numpy.ndarray): the samples of the current audio block
            frames (int): number of frames in the block (unused)
            time: timing info passed by the audio stream (unused; the name
                shadows the time module only inside this method)
            status: stream status flags, printed to stderr when set
        """
        if status:
            print(status, file=sys.stderr)
        volume_norm = np.sqrt(np.mean(indata**2))
        # an all-zero block has no finite level, log10(0) is avoided on purpose
        self.decibels = 20 * np.log10(volume_norm) if volume_norm > 0 else -np.inf
        # the level is stored before the block is queued, so a consumer that
        # has just fetched a block always sees a level at least as new as it
        self.queue.put(indata.copy())

    def transcribeAudio(self):
        """
        Transcribes the recorded audio into a text string and returns it

        Returns:
            str: The text representing all speech recorded by the audio file,
            or "Error, recording was too short" when the recording is not
            longer than MIN_RECORDING_SECONDS
        """
        # check the file length to make sure audio file is long enough;
        # the handle is closed again as soon as the duration is known
        with sf.SoundFile(self.AUDIO_FILE) as recording:
            duration = recording.frames / recording.samplerate

        if duration <= self.MIN_RECORDING_SECONDS:
            return "Error, recording was too short"

        # keep the upload handle open only for the duration of the request
        with open(self.AUDIO_FILE, "rb") as audio_file:
            transcript = self.client.audio.transcriptions.create(
                model="whisper-1", file=audio_file, response_format="text"
            )
        return str(transcript)

    def _drainQueue(self):
        """
        Discards every audio block that is currently waiting in the queue
        """
        while True:
            try:
                self.queue.get_nowait()
            except queue.Empty:
                return

    def piListener(self):
        """
        records mic while the sound level is at or above SILENCE_THRESHOLD_DB,
        and stops as soon as the level drops below it after recording started

        The audio is written to AUDIO_FILE, which is overwritten on every call
        """
        print("press button to record")
        sampleRate = int(self.deviceInfo['default_samplerate'])

        # start from a clean slate: a level or blocks left over from the
        # previous call must not leak into the freshly truncated file
        self.recordStatus = False
        self.decibels = -100
        self._drainQueue()

        with sf.SoundFile(file=self.AUDIO_FILE, mode='w', samplerate=sampleRate, channels=1, subtype='PCM_16') as file:
            with sd.InputStream(samplerate=sampleRate, channels=1, callback=self.callback):
                while True:
                    # block until the callback delivers the next audio block
                    # instead of spinning on an empty queue
                    block = self.queue.get()
                    level = self.decibels
                    if level >= self.SILENCE_THRESHOLD_DB:
                        self.recordStatus = True
                        file.write(block)
                    elif level < self.SILENCE_THRESHOLD_DB and self.recordStatus:
                        self.recordStatus = False
                        print("Finished recording")
                        # leaving the with blocks stops the stream and closes the file
                        break
                    # otherwise: quiet block before any recording started, drop it

    def publishText(self, text):
        """
        pushes text out to the message queue

        Args:
            text (str): the text to add to the message queue
        """
        # the same body goes to both queues declared in __init__
        self.channel.basic_publish(exchange='', routing_key='userOutput', body=text)
        self.channel.basic_publish(exchange='', routing_key='userTime', body=text)
if __name__ == "__main__":
    # Script entry point: record, transcribe and publish in an endless loop.
    print("Running Mic Listener")
    listener = MicListener()
    while True:
        listener.piListener()
        transcript = listener.transcribeAudio()
        # transcribeAudio() reports problems as a string starting with "Error";
        # those are only printed locally and never published
        if transcript.startswith("Error"):
            print(transcript)
            continue
        listener.publishText(text=transcript)