-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path29_openai_realtime.py
More file actions
126 lines (100 loc) · 3.42 KB
/
29_openai_realtime.py
File metadata and controls
126 lines (100 loc) · 3.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import os
import json
import base64
import numpy as np
import sounddevice as sd
import websocket
import wave
from datetime import datetime
from dotenv import load_dotenv
# Load environment variables
load_dotenv(".env")
# .env.local (if present) overrides values from .env for local development.
load_dotenv(".env.local", override=True)
# Falls back to "" when unset; the server will then reject the handshake.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
url = "wss://api.openai.com/v1/realtime?model=gpt-4o-mini-realtime-preview"
headers = [f"Authorization: Bearer {OPENAI_API_KEY}", "OpenAI-Beta: realtime=v1"]
# buffer to store full audio stream (raw PCM bytes accumulated across events)
audio_buffer = bytearray()
def save_audio_to_wav(pcm_bytes: bytes, sample_rate: int = 16000) -> None:
    """Write raw 16-bit mono PCM bytes to a timestamped WAV file in the CWD.

    Args:
        pcm_bytes: Little-endian signed 16-bit PCM samples, single channel.
        sample_rate: Sample rate (Hz) written to the WAV header. Defaults
            to 16000 to match the original hard-coded value; made a
            parameter so callers can match the stream's actual rate.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"response_{timestamp}.wav"
    filepath = os.path.join(os.getcwd(), filename)
    try:
        # Context manager closes the handle even if a write fails partway
        # through (the original leaked the handle on any exception).
        with wave.open(filepath, "wb") as wf:
            wf.setnchannels(1)  # mono
            wf.setsampwidth(2)  # 2 bytes/sample == 16-bit PCM
            wf.setframerate(sample_rate)
            wf.writeframes(pcm_bytes)
        print(f"💾 Saved full audio to {filepath}")
    except (OSError, wave.Error) as e:
        # Narrowed from bare Exception: disk/format problems are the
        # realistic failure modes here; anything else should surface.
        print(f"❌ Error saving WAV: {e}")
def on_open(ws):
    """Once the socket is up, queue one user message and request a response.

    Sends two events: a ``conversation.item.create`` carrying the user's
    text prompt, then a ``response.create`` asking the server to answer.
    """
    print("✅ Connected.")
    user_item = {
        "type": "conversation.item.create",
        "item": {
            "type": "message",
            "role": "user",
            "content": [
                {
                    "type": "input_text",
                    "text": "Tell me a fun fact about space.",
                }
            ],
        },
    }
    ws.send(json.dumps(user_item))
    ws.send(json.dumps({"type": "response.create"}))
def on_message(ws, message):
    """Handle one server event: buffer audio chunks; play and save on done.

    The realtime API streams synthesized audio as ``response.audio.delta``
    events whose ``delta`` field holds a base64 chunk of 16-bit PCM, then
    emits ``response.audio.done`` when the stream is complete.
    """
    global audio_buffer
    data = json.loads(message)
    event_type = data.get("type")

    if event_type == "response.audio.delta" and data.get("delta"):
        # Bug fix: the original matched type == "audio" with an
        # "audio.data" payload — a shape the realtime API does not send —
        # so audio_buffer stayed empty and nothing was played or saved.
        audio_buffer.extend(base64.b64decode(data["delta"]))
    elif event_type == "audio" and data.get("audio", {}).get("data"):
        # Original event shape kept as a fallback, in case a server
        # variant ever emits it.
        audio_buffer.extend(base64.b64decode(data["audio"]["data"]))
    elif event_type == "response.audio.done":
        print("✅ Audio streaming complete. Playing and saving...")
        if not audio_buffer:
            print("⚠️ No audio data received.")
            return
        try:
            # Scale int16 PCM into float32 [-1, 1) for sounddevice playback.
            samples = (
                np.frombuffer(audio_buffer, dtype=np.int16).astype(np.float32)
                / 32768.0
            )
            # NOTE(review): the realtime API documents 24 kHz PCM16 output,
            # but this script plays/saves at 16 kHz — confirm, otherwise
            # the audio will sound slowed down.
            sd.play(samples, samplerate=16000)
            sd.wait()  # Block until playback finishes
            print("✅ Audio playback complete")
        except Exception as e:
            # Best-effort playback: a missing or odd audio device should
            # not prevent the WAV from being saved below.
            print(f"❌ Audio playback failed: {e}")
        # Save full buffer
        save_audio_to_wav(bytes(audio_buffer))
        # Reset so the next response starts from an empty buffer.
        audio_buffer = bytearray()
    elif event_type != "ping":
        # Surface all other non-audio, non-ping events for debugging.
        print("📨 Event:", json.dumps(data, indent=2))
def on_error(ws, error):
    """Log a WebSocket-level failure; the library handles teardown."""
    print("❌ WebSocket error:", error)
def on_close(ws, close_status_code, close_msg):
    """Log connection shutdown.

    The status code and message are accepted to satisfy the
    websocket-client callback signature but are intentionally unused.
    """
    print("❎ Connection closed.")
def main() -> None:
    """Open the realtime WebSocket and block until the connection closes."""
    app = websocket.WebSocketApp(
        url,
        header=headers,
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )
    app.run_forever()


if __name__ == "__main__":
    # Guarded so importing this module (e.g. for testing) does not open a
    # network connection; running it as a script behaves as before.
    main()