Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 119 additions & 0 deletions pya/backend/SoundDevice.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import sounddevice as sd
from .base import BackendBase, StreamBase
import numpy as np
import time

def translate_dict_keys(input_dict: dict, translate_dict: dict):
# Translates keys in input_dict to be replaced by the values that they are mapped to by translate_dict
return {
translate_dict.get(key, key): value
for key, value in input_dict.items()
}

class SoundDeviceBackend(BackendBase):
_boot_delay = 0.5 # a short delay to prevent PyAudio racing conditions
bs = 512

translate_dict = {
'hostapi': 'hostApi',
'max_input_channels': 'maxInputChannels',
'max_output_channels': 'maxOutputChannels',
'default_low_input_latency': 'defaultLowInputLatency',
'default_low_output_latency': 'defaultLowOutputLatency',
'default_high_input_latency': 'defaultHighInputLatency',
'default_high_output_latency': 'defaultHighOutputLatency',
'default_samplerate': 'defaultSampleRate',
}

def __init__(self, format=np.float32):
if not sd._initialized:
sd._initialize()
self.format = format
self.dtype = format
if format in [np.int16, "int16"]:
self.range = 32767
elif format in [np.float32, "float32"]:
self.range = 1.0
else:
raise AttributeError(f"Aserver: currently unsupported pyaudio format {self.format}")

def get_device_count(self):
return len(sd.query_devices())

def get_device_info_by_index(self, idx):
return translate_dict_keys(sd.query_devices(idx), self.translate_dict)

def get_default_input_device_info(self):
in_idx, _ = sd.default.device
return self.get_device_info_by_index(in_idx)

def get_default_output_device_info(self):
_, out_idx = sd.default.device
return self.get_device_info_by_index(out_idx)

def open(self, rate, channels, input_flag, output_flag, frames_per_buffer,
input_device_index=None, output_device_index=None, start=True,
input_host_api_specific_stream_info=None, output_host_api_specific_stream_info=None,
stream_callback=None):
if not input_flag and not output_flag:
raise ValueError("Input flag and output flag were both False!")
kwargs = dict(
samplerate=rate,
blocksize=frames_per_buffer,
device=(input_device_index, output_device_index),
channels=channels,
dtype=self.dtype,
extra_settings=(
input_host_api_specific_stream_info,
output_host_api_specific_stream_info
),
callback=self.make_callback(stream_callback, input_flag, output_flag)
)
if input_flag and output_flag:
stream = sd.Stream(**kwargs)
elif input_flag:
stream = sd.InputStream(**kwargs)
else:
stream = sd.OutputStream(**kwargs)
if start:
stream.start()
time.sleep(self._boot_delay) # give stream some time to be opened completely
return SoundDeviceStream(stream)

def process_buffer(self, buffer):
return buffer

def terminate(self):
sd._terminate()

def make_callback(self, server_callback, input_flag, output_flag):
if input_flag and output_flag:
def new_callback(indata, outdata, frames, time, status):
data = server_callback(indata, frames, time, status)
outdata[:len(data)] = data
return new_callback
if input_flag and not output_flag: # output flag false
return server_callback
# input flag false, output flag true
indata = np.array([])
def new_callback(outdata, frames, time, status):
data = server_callback(indata, frames, time, status)
outdata[:len(data)] = data
return new_callback



class SoundDeviceStream(StreamBase):
def __init__(self, sd_stream: sd.Stream):
self.sd_stream = sd_stream
def is_active(self):
return self.sd_stream.active

def start_stream(self):
self.sd_stream.start()

def stop_stream(self):
self.sd_stream.stop()

def close(self):
self.sd_stream.close()
7 changes: 6 additions & 1 deletion pya/backend/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@ def terminate(self):
@abstractmethod
def process_buffer(self, *args, **kwargs):
raise NotImplementedError


def get_devices(self) -> list[dict]:
return [
self.get_device_info_by_index(i)
for i in range(self.get_device_count())
]

class StreamBase(ABC):

Expand Down
101 changes: 54 additions & 47 deletions pya/gui/aservergui.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,122 +10,107 @@
class AserverGUI:

def __init__(self):
self.audio_devices = device_info(verbose=False)
self.index = None
self.input_flag = False
self.blocksize = 1024
self.sr = 44100
self.index = determine_backend().get_default_output_device_info()["index"]

self._init_views()
self._update_values()
self._update_views()

self.show() # render GUI

def _init_views(self):
# audio device selector dropdown
self.device_selector = widgets.Dropdown(
options=[
d["name"] for d in self.audio_devices if d["maxOutputChannels"] > 0
],
description="AOut Device:",
disabled=False,
value=self.audio_devices[self.index]["name"],
disabled=False
)

def on_pyagui_device_selection_change(change):
self.index = [
a["index"]
for a in self.audio_devices
if a["name"] == change["new"] and a["maxOutputChannels"] > 0
][0]
self.sr = int(self.audio_devices[self.index]["defaultSampleRate"])
self.sr_wdg.set_state({"value": self.sr})

self.sr_wdg.value = self.sr
self.device_selector.observe(on_pyagui_device_selection_change, names="value")

# input_flag selector
self.input_flag_wdg = widgets.Checkbox(
value=False, # Initialzustand
description="AudioIn",
indent=False, # Entfernt den zusätzlichen Einzug
layout=widgets.Layout(width="70px"),
)
self.input_flag = self.input_flag_wdg.value

def on_input_flag_wdg_change(change):
self.input_flag = change["new"]

self.input_flag_wdg.observe(on_input_flag_wdg_change, names="value")

# sr selector
self.sr_wdg = widgets.IntText(
description="sr:",
value=self.sr,
layout=widgets.Layout(width="160px"),
)

def on_sr_wdg_value_change(change):
self.sr = change["new"]

self.sr_wdg.observe(on_sr_wdg_value_change, names="value")

# blocksize selector
self.blocksize_wdg = widgets.IntText(
description="bs:",
value=self.blocksize,
layout=widgets.Layout(width="160px"),
)

def on_bs_wdg_value_change(change):
self.blocksize = change["new"]

self.blocksize_wdg.observe(on_bs_wdg_value_change, names="value")

# reboot button
self.reboot_button = widgets.Button(
description="(re)boot", layout={"width": "75px"}
)

def on_reboot_button_click(change):
# TODO: change to Aserver.default
global s
if "s" in globals() and isinstance(s, Aserver):
s.shutdown_default_server()
s = startup(
sr=int(self.sr),
device=self.index,
input_flag=self.input_flag,
bs=self.blocksize,
)

# Save selected and default names before reload
prev_name = self.backend.get_device_info_by_index(self.index)["name"]
prev_default_name = self.backend.get_default_output_device_info()["name"]

Aserver.shutdown_default_server()
self.backend = determine_backend()
new_default_name = self.backend.get_default_output_device_info()["name"]
device_names = [d["name"] for d in self.backend.get_devices()]

# Reset device choice if previous device disconnected or default device changed
if prev_name not in device_names or prev_default_name != new_default_name:
self.index = None
else:
# Previously selected device might have changed its index after reload
self.index = device_names.index(prev_name)
# Launch new server
self._update_values()
self._update_views()
self.reboot_button.on_click(on_reboot_button_click)

# test tone button
self.test_tone_button = widgets.Button(
description="test tone", layout={"width": "70px"}
)

def on_test_tone_button_click(change):
from pya.agen.lib import SinOsc, Line

(SinOsc(800) * Line(0.1, 0, 0.2)).dup(2).play()

self.test_tone_button.on_click(on_test_tone_button_click)

# scope button
self.scope_button = widgets.Button(
description="ScopeGUI", layout={"width": "80px"}
)

def on_scope_button_click(change):
# TODO: change to Aserver.default
global s
if "s" in globals() and isinstance(s, Aserver):
s.scope_gui()

Aserver.default.scope_gui()
self.scope_button.on_click(on_scope_button_click)

# stop button
def on_pyagui_stop_button_click(b):
# TODO: change to Aserver.default
if "s" in globals():
s.stop()
else:
print("no pya server.")

Aserver.default.stop()
self.stop_button = widgets.Button(
description="Stop",
tooltip="Stop all scheduled events on AServer",
Expand All @@ -151,7 +136,29 @@ def on_pyagui_stop_button_click(b):
width="100%",
),
)
self.show() # render GUI

def _update_views(self):
# audio device selector dropdown
self.device_selector.options = [
d["name"] for d in self.audio_devices if d["maxOutputChannels"] > 0
]
self.device_selector.value = self.audio_devices[self.index]["name"]
# Input flag checkbox
self.input_flag_wdg.value = self.input_flag
# Sample rate input
self.sr_wdg.value = self.sr
# Block size input
self.blocksize_wdg.value = self.blocksize
def _update_values(self):
self.backend = startup(
sr=int(self.sr),
device=self.index,
input_flag=self.input_flag,
bs=self.blocksize
).backend
self.audio_devices = self.backend.get_devices()
if self.index is None:
self.index = self.backend.get_default_output_device_info()["index"]

def show(self):
display(self.all_widgets)
Expand Down
11 changes: 10 additions & 1 deletion pya/helper/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from pya.backend.base import BackendBase
from pya.backend.PyAudio import PyAudioBackend
from pya.backend.Jupyter import JupyterBackend
from pya.backend.SoundDevice import SoundDeviceBackend


def get_server_info():
Expand All @@ -28,6 +29,14 @@ def get_server_info():
return None


def try_sounddevice_backend(**kwargs) -> Optional["SoundDeviceBackend"]:
try:
from pya.backend.SoundDevice import SoundDeviceBackend
return SoundDeviceBackend(**kwargs)
except ImportError:
return None


def try_pyaudio_backend(**kwargs) -> Optional["PyAudioBackend"]:
try:
from pya.backend.PyAudio import PyAudioBackend
Expand Down Expand Up @@ -70,7 +79,7 @@ def determine_backend(force_webaudio=False, port=8765, **kwargs) -> "BackendBase
RuntimeError
if no Backend is available
"""
backend = None if force_webaudio else try_pyaudio_backend(**kwargs)
backend = None if force_webaudio else try_sounddevice_backend(**kwargs) or try_pyaudio_backend(**kwargs)
if backend is None:
backend = try_jupyter_backend(port=port, **kwargs)
if backend is None:
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ pyamapping
scipy>=1.7.3
matplotlib>=3.5.3
soundfile>=0.13.0
sounddevice