diff --git a/AGENTS.md b/AGENTS.md index ec087c0..dcf24e0 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -6,7 +6,48 @@ - Use the linux kernel guidelines for commenting insofar as they are applicable to Go (e.g. avoid stating the obvious) - Use `any` instead of `interface{}` and in general use modern Go +- Make liberal use of line breaks; don't try to stuff structs onto one line # Error handling - Wrap errors when passing them back up the stack +- Do not silently ignore errors +- Be defensive with external I/O +- Do not be defensive with internal state (e.g. nil pointers, empty strings) + - It is the responsibility of the caller to make sure internal objects and parameters are initialized before calling a function that uses them. + - Let it crash or panic. + +# Editing + +- Do the simplest thing that works, but no simpler +- Do not mix task categories + - If asked to make a logic change, do not reorganise or refactor code + - If asked to refactor code, do not make logic or functionality changes + +# Documentation + +- Ensure that new command line options and env vars are documented in the CLI help and the README.md. 
+ +# Building and linting + +- Build with `go build -o voxinput .` +- Lint with `go vet .` +- After updates to version.txt, the Nix flake or go.mod build with `nix build .` + - If vendor hash errors are found, use `fakeHash` then `nix build .` to get the correct hash + +# Running + +- Use the private scripts in the bringup/ directory +- Suggest creating bringup scripts if they do not exist already +- Example of a bringup script for realtime transcription +```sh +#!/bin/sh +export OPENAI_BASE_URL=http://localai-host:8081/v1 OPENAI_WS_BASE_URL=ws://localai-host:8081/v1/realtime VOXINPUT_TRANSCRIPTION_MODEL=whisper-large-turbo +export VOXINPUT_PROMPT="VoxInput LocalAI ROS2 LLM NixOS struct env var" +./voxinput listen +``` +- Example of a bringup script for transcribing from a monitor device +```sh +#!/bin/sh +OPENAI_BASE_URL=http://ledbx:8081/v1 OPENAI_WS_BASE_URL=ws://ledbx:8081/v1/realtime VOXINPUT_TRANSCRIPTION_MODEL=whisper-large-turbo VOXINPUT_CAPTURE_DEVICE="Monitor of iFi (by AMR) HD USB Audio Analog Stereo" ./voxinput listen --output-file /tmp/transcript.txt +``` diff --git a/README.md b/README.md index 619261b..e7606e3 100644 --- a/README.md +++ b/README.md @@ -69,12 +69,17 @@ Unless you don't mind running VoxInput as root, then you also need to ensure the - `OPENAI_BASE_URL` or `VOXINPUT_BASE_URL`: The base URL of the OpenAI compatible API server: defaults to `http://localhost:8080/v1` - `VOXINPUT_LANG` or `LANG`: Language code for transcription (defaults to empty). - `VOXINPUT_TRANSCRIPTION_MODEL`: Transcription model (default: `whisper-1`). +- `VOXINPUT_ASSISTANT_MODEL`: Assistant model (default: `none`). +- `VOXINPUT_ASSISTANT_VOICE`: Assistant voice (default: `alloy`). - `VOXINPUT_TRANSCRIPTION_TIMEOUT`: Timeout duration (default: `30s`). - `VOXINPUT_SHOW_STATUS`: Show GUI notifications (`yes`/`no`, default: `yes`). - `VOXINPUT_CAPTURE_DEVICE`: Specific audio capture device name (run `voxinput devices` to list). 
- `VOXINPUT_OUTPUT_FILE`: Path to save the transcribed text to a file instead of typing it with dotool. +- `VOXINPUT_MODE`: Realtime mode (transcription|assistant, default: transcription). - `XDG_RUNTIME_DIR` or `VOXINPUT_RUNTIME_DIR`: Used for the PID and state files, defaults to `/run/voxinput` if niether are present +**Warning**: Assistant mode is WIP and you may need a particular version of LocalAI's realtime API to run it because I am developing both in lockstep. Eventually though it should be compatible with at least OpenAI or LocalAI. + ### Commands - **`listen`**: Start speech to text daemon. @@ -83,6 +88,7 @@ Unless you don't mind running VoxInput as root, then you also need to ensure the - `--no-show-status`: Don't show when recording has started or stopped. - `--output-file `: Save transcript to file instead of typing. - `--prompt `: Text used to condition model output. Could be previously transcribed text or uncommon words you expect to use + - `--mode `: Realtime mode (default: transcription) ```bash ./voxinput listen @@ -226,7 +232,10 @@ The realtime mode has a UI to display various actions being taken by VoxInput. 
H - [x] GUI and system tray - [x] Voice detection and activation (partial, see below) - [ ] Code words to start and stop transcription -- [ ] Allow user to describe a button they want to press (requires submitting screen shot and transcription to LocalAGI) +- [ ] Assistant mode + - [x] Voice conversations with an LLM + - [ ] Submit desktop images to a VLM to allow it to click on items + - [ ] Use tool calls or MCP to allow the VLM/LLM to perform actions ## Signals diff --git a/flake.nix b/flake.nix index fc8d23d..cc3c687 100644 --- a/flake.nix +++ b/flake.nix @@ -46,7 +46,8 @@ # Path to the source code src = ./.; - vendorHash = "sha256-+67Ajh+Jy5+mpYQCiUXDG5EKg72YtW0v9IUuswkmUXM="; #nixpkgs.lib.fakeHash; + vendorHash = "sha256-UH4oOSSl1Xq3aVzI+N97UrsYq/MN2M/ehud8RFHNoAg="; + # vendorHash = nixpkgs.lib.fakeHash; nativeBuildInputs = with pkgs; [ makeWrapper diff --git a/go.mod b/go.mod index fa82198..71997cc 100644 --- a/go.mod +++ b/go.mod @@ -4,13 +4,11 @@ go 1.24.2 require ( fyne.io/fyne/v2 v2.7.1 - github.com/WqyJh/go-openai-realtime v0.6.0 + github.com/WqyJh/go-openai-realtime v0.6.1 github.com/gen2brain/malgo v0.11.24 github.com/sashabaranov/go-openai v1.41.2 ) -replace github.com/WqyJh/go-openai-realtime v0.6.0 => github.com/richiejp/go-openai-realtime v0.6.1-fix-created-event - require ( fyne.io/systray v1.11.1-0.20250603113521-ca66a66d8b58 // indirect github.com/BurntSushi/toml v1.5.0 // indirect diff --git a/go.sum b/go.sum index 5f48ed4..ea3f3ce 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ fyne.io/systray v1.11.1-0.20250603113521-ca66a66d8b58 h1:eA5/u2XRd8OUkoMqEv3IBlF fyne.io/systray v1.11.1-0.20250603113521-ca66a66d8b58/go.mod h1:RVwqP9nYMo7h5zViCBHri2FgjXF7H2cub7MAq4NSoLs= github.com/BurntSushi/toml v1.5.0 h1:W5quZX/G/csjUnuI8SUYlsHs9M38FC7znL0lIO+DvMg= github.com/BurntSushi/toml v1.5.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= +github.com/WqyJh/go-openai-realtime v0.6.1 h1:QhWRrbx9ZeixN/kVH/5VyEgjoMei7xisHqw7ybbef2E= 
+github.com/WqyJh/go-openai-realtime v0.6.1/go.mod h1:BCN7J7AUbfSFkLLVnhGWF2OkvoQ7GqTWrU/w+d+QwR4= github.com/WqyJh/jsontools v0.3.1 h1:zKT+DvxUSTji06ZcjsbQzZ48PycFZDI0OGATmmFhJ+U= github.com/WqyJh/jsontools v0.3.1/go.mod h1:Gk2OlyXjAJmYNZ0aUbEXGHq4I5ihGRjXxVuUprWtkss= github.com/coder/websocket v1.8.12 h1:5bUXkEPPIbewrnkU8LTCLVaxi4N4J8ahufH2vlo4NAo= @@ -61,8 +63,6 @@ github.com/pkg/profile v1.7.0 h1:hnbDkaNWPCLMO9wGLdBFTIZvzDrDfBM2072E1S9gJkA= github.com/pkg/profile v1.7.0/go.mod h1:8Uer0jas47ZQMJ7VD+OHknK4YDY07LPUC6dEvqDjvNo= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/richiejp/go-openai-realtime v0.6.1-fix-created-event h1:Q9cdsuGl6cWnQ8ceFLNYBcUe/C1ZX/6dJ10qttlrZbk= -github.com/richiejp/go-openai-realtime v0.6.1-fix-created-event/go.mod h1:BCN7J7AUbfSFkLLVnhGWF2OkvoQ7GqTWrU/w+d+QwR4= github.com/rymdport/portal v0.4.2 h1:7jKRSemwlTyVHHrTGgQg7gmNPJs88xkbKcIL3NlcmSU= github.com/rymdport/portal v0.4.2/go.mod h1:kFF4jslnJ8pD5uCi17brj/ODlfIidOxlgUDTO5ncnC4= github.com/sashabaranov/go-openai v1.41.2 h1:vfPRBZNMpnqu8ELsclWcAvF19lDNgh1t6TVfFFOPiSM= diff --git a/internal/audio/audio.go b/internal/audio/audio.go index aa44735..1af33d0 100644 --- a/internal/audio/audio.go +++ b/internal/audio/audio.go @@ -17,7 +17,6 @@ type StreamConfig struct { Format malgo.FormatType Channels int SampleRate int - DeviceType malgo.DeviceType MalgoContext malgo.Context CaptureDeviceID *malgo.DeviceID } @@ -35,10 +34,7 @@ func (config StreamConfig) asDeviceConfig(deviceType malgo.DeviceType) malgo.Dev if config.SampleRate != 0 { deviceConfig.SampleRate = uint32(config.SampleRate) } - if config.DeviceType != 0 { - deviceConfig.DeviceType = config.DeviceType - } - if config.CaptureDeviceID != nil { + if config.CaptureDeviceID != nil && (deviceType == malgo.Capture || deviceType == malgo.Duplex) { deviceConfig.Capture.DeviceID = 
config.CaptureDeviceID.Pointer() } return deviceConfig @@ -64,8 +60,8 @@ func (c *StreamConfig) SetCaptureDeviceByName(mctx *malgo.Context, name string) return false, nil } -func stream(ctx context.Context, abortChan chan error, config StreamConfig, deviceCallbacks malgo.DeviceCallbacks) error { - deviceConfig := config.asDeviceConfig(malgo.Capture) +func stream(ctx context.Context, abortChan chan error, config StreamConfig, deviceType malgo.DeviceType, deviceCallbacks malgo.DeviceCallbacks) error { + deviceConfig := config.asDeviceConfig(deviceType) device, err := malgo.InitDevice(config.MalgoContext, deviceConfig, deviceCallbacks) if err != nil { return err @@ -121,12 +117,10 @@ func ListCaptureDevices() error { } // Capture records incoming samples into the provided writer. -// The function initializes a capture device in the default context using -// provide stream configuration. -// Capturing will commence writing the samples to the writer until either the -// writer returns an error, or the context signals done. +// The function initializes a capture device in the default context using the +// provided stream configuration. +// XXX: Capture, Duplex and Playback are mutually exclusive, only use one at a time func Capture(ctx context.Context, w io.Writer, config StreamConfig) error { - config.DeviceType = malgo.Capture abortChan := make(chan error) defer close(abortChan) aborted := false @@ -137,24 +131,25 @@ func Capture(ctx context.Context, w io.Writer, config StreamConfig) error { return } - _, err := w.Write(inputSamples) - if err != nil { - aborted = true - abortChan <- err + if len(inputSamples) > 0 { + _, err := w.Write(inputSamples) + if err != nil { + aborted = true + abortChan <- err + } } }, } - return stream(ctx, abortChan, config, deviceCallbacks) + return stream(ctx, abortChan, config, malgo.Capture, deviceCallbacks) } -// Playback streams samples from a reader to the sound device. 
-// The function initializes a playback device in the default context using -// provide stream configuration. -// Playback will commence playing the samples provided from the reader until either the -// reader returns an error, or the context signals done. -func Playback(ctx context.Context, r io.Reader, config StreamConfig) error { - config.DeviceType = malgo.Playback +// Duplex streams audio from a reader to the playback device and captures audio +// from the capture device to a writer. +// It initializes a duplex device in the default context using the provided stream configuration. +// It expects both r and w to be non-nil. +// XXX: Capture, Duplex and Playback are mutually exclusive, only use one at a time +func Duplex(ctx context.Context, r io.Reader, w io.Writer, config StreamConfig) error { abortChan := make(chan error) defer close(abortChan) aborted := false @@ -164,20 +159,74 @@ func Playback(ctx context.Context, r io.Reader, config StreamConfig) error { if aborted { return } - if frameCount == 0 { - return + + if len(inputSamples) > 0 { + _, err := w.Write(inputSamples) + if err != nil { + aborted = true + abortChan <- err + return + } } - read, err := io.ReadFull(r, outputSamples) - if read <= 0 { + if len(outputSamples) > 0 { + if frameCount == 0 { + return + } + + read, err := r.Read(outputSamples) if err != nil { + if err == io.EOF { + for i := read; i < len(outputSamples); i++ { + outputSamples[i] = 0 + } + aborted = true + abortChan <- io.EOF + return + } aborted = true abortChan <- err + return } + } + }, + } + + return stream(ctx, abortChan, config, malgo.Duplex, deviceCallbacks) +} + +// Playback streams samples from the provided reader to the playback device. +// The function initializes a playback device in the default context using the +// provided stream configuration. 
+// XXX: Capture, Duplex and Playback are mutually exclusive, only use one at a time +func Playback(ctx context.Context, r io.Reader, config StreamConfig) error { + abortChan := make(chan error) + defer close(abortChan) + aborted := false + + deviceCallbacks := malgo.DeviceCallbacks{ + Data: func(outputSamples, inputSamples []byte, frameCount uint32) { + if aborted { return } + + if len(outputSamples) > 0 { + if frameCount == 0 { + return + } + + read, err := r.Read(outputSamples) + if err != nil { + aborted = true + abortChan <- err + return + } + for i := read; i < len(outputSamples); i++ { + outputSamples[i] = 0 + } + } }, } - return stream(ctx, abortChan, config, deviceCallbacks) + return stream(ctx, abortChan, config, malgo.Playback, deviceCallbacks) } diff --git a/internal/gui/gui.go b/internal/gui/gui.go index d460054..3063892 100644 --- a/internal/gui/gui.go +++ b/internal/gui/gui.go @@ -20,14 +20,17 @@ type Msg interface { type ShowListeningMsg struct{} type ShowSpeechDetectedMsg struct{} type ShowTranscribingMsg struct{} +type ShowGeneratingResponseMsg struct{} type HideMsg struct{} type ShowStoppingMsg struct{} func (m *ShowListeningMsg) IsMsg() bool { return true } func (m *ShowSpeechDetectedMsg) IsMsg() bool { return true } func (m *ShowTranscribingMsg) IsMsg() bool { return true } -func (m *HideMsg) IsMsg() bool { return true } -func (m *ShowStoppingMsg) IsMsg() bool { return true } + +func (m *ShowGeneratingResponseMsg) IsMsg() bool { return true } +func (m *HideMsg) IsMsg() bool { return true } +func (m *ShowStoppingMsg) IsMsg() bool { return true } type GUI struct { a fyne.App @@ -77,6 +80,10 @@ func New(ctx context.Context, showStatus string) *GUI { if showStatus != "" { ui.showStatus("Transcribing...", theme.FileTextIcon()) } + case *ShowGeneratingResponseMsg: + if showStatus != "" { + ui.showStatus("Generating response...", theme.FileAudioIcon()) + } case *HideMsg: if ui.cancelTimer != nil { ui.cancelTimer() diff --git a/listen.go b/listen.go 
index ee38ee7..b996a3f 100644 --- a/listen.go +++ b/listen.go @@ -41,7 +41,7 @@ func newChunkWriter(ctx context.Context, ready chan<- (*bytes.Buffer)) *chunkWri func (rbw *chunkWriter) Write(p []byte) (n int, err error) { now := time.Now() - if now.Sub(rbw.lastSend) >= 500*time.Millisecond { + if now.Sub(rbw.lastSend) >= 250*time.Millisecond { select { case rbw.ready <- rbw.current: break @@ -55,6 +55,50 @@ func (rbw *chunkWriter) Write(p []byte) (n int, err error) { return rbw.current.Write(p) } +type chunkReader struct { + ctx context.Context + chunks <-chan *bytes.Buffer + current *bytes.Buffer +} + +func newChunkReader(ctx context.Context, chunks <-chan *bytes.Buffer) *chunkReader { + return &chunkReader{ + ctx: ctx, + chunks: chunks, + } +} + +func (cr *chunkReader) Read(p []byte) (int, error) { + if len(p) == 0 { + return 0, nil + } + + n := 0 + for len(p) > 0 { + if cr.current == nil || cr.current.Len() == 0 { + select { + case buf := <-cr.chunks: + cr.current = buf + default: + return n, nil + } + } + if cr.current == nil { + return n, nil + } + + nn, err := cr.current.Read(p) + n += nn + p = p[nn:] + if err == io.EOF { + cr.current = nil + continue + } + return n, nil + } + return n, nil +} + func getPrefixedEnv(prefixes []string, name string, fallback string) (val string) { for _, p := range prefixes { var n string @@ -81,19 +125,19 @@ func waitForSessionUpdated(ctx context.Context, conn *openairt.Conn) error { if err != nil { var permanent *openairt.PermanentError if errors.As(err, &permanent) { - log.Println("main: Connection failed: ", err) + log.Println("waitForSessionUpdated: Connection failed: ", err) return err } - log.Println("main: error receiving message, retrying: ", err) + log.Println("waitForSessionUpdated: error receiving message, retrying: ", err) time.Sleep(250 * time.Millisecond) continue } - log.Println("main: received message of type: ", msg.ServerEventType()) + log.Println("waitForSessionUpdated: received message of type: ", 
msg.ServerEventType()) switch msg.ServerEventType() { case openairt.ServerEventTypeError: - log.Println("main: Server error: ", msg.(openairt.ErrorEvent).Error.Message) + log.Println("waitForSessionUpdated: Server error: ", msg.(openairt.ErrorEvent).Error.Message) case openairt.ServerEventTypeConversationCreated: case openairt.ServerEventTypeSessionCreated: fallthrough @@ -114,17 +158,313 @@ func waitForSessionUpdated(ctx context.Context, conn *openairt.Conn) error { } type ListenConfig struct { - PIDPath string - APIKey string - HTTPAPIBase string - WSAPIBase string - Lang string - Model string - Timeout time.Duration - UI *gui.GUI - CaptureDevice string - OutputFile string - Prompt string + PIDPath string + APIKey string + HTTPAPIBase string + WSAPIBase string + Lang string + Model string + Timeout time.Duration + UI *gui.GUI + CaptureDevice string + OutputFile string + Prompt string + Mode string + AssistantModel string + AssistantVoice string +} + +type Listener struct { + ctx context.Context + cancel context.CancelFunc + conn *openairt.Conn + errCh chan error + audioChunks chan *bytes.Buffer + chunkWriter *chunkWriter + config ListenConfig + streamConfig audio.StreamConfig + rtCli *openairt.Client + statePath string + audioPlayChunks chan *bytes.Buffer + playReader *chunkReader +} + +func NewListener(config ListenConfig, streamConfig audio.StreamConfig, rtCli *openairt.Client, statePath string) *Listener { + ctx, cancel := context.WithCancel(context.Background()) + l := &Listener{ + ctx: ctx, + cancel: cancel, + config: config, + streamConfig: streamConfig, + rtCli: rtCli, + statePath: statePath, + errCh: make(chan error, 1), + audioChunks: make(chan *bytes.Buffer, 10), + } + l.chunkWriter = newChunkWriter(l.ctx, l.audioChunks) + l.audioPlayChunks = make(chan *bytes.Buffer, 10) + l.playReader = newChunkReader(l.ctx, l.audioPlayChunks) + + return l +} + +func (l *Listener) Start() error { + initCtx, finishInit := context.WithTimeout(l.ctx, l.config.Timeout) + 
opts := []openairt.ConnectOption{openairt.WithIntent()} + if l.config.Mode == "assistant" && l.config.AssistantModel != "" { + opts = append(opts, openairt.WithModel(l.config.AssistantModel)) + } + conn, err := l.rtCli.Connect(initCtx, opts...) + if err != nil { + log.Println("Listener.Start: realtime connect: ", err) + finishInit() + return err + } + l.conn = conn + log.Println("Listener.Start: Connected to realtime API, waiting for session.created event...") + if err := waitForSessionUpdated(initCtx, l.conn); err != nil { + finishInit() + return err + } + if l.config.Mode == "assistant" { + voice := openairt.VoiceAlloy + if l.config.AssistantVoice != "" { + voice = openairt.Voice(l.config.AssistantVoice) + } + err = l.conn.SendMessage(initCtx, openairt.SessionUpdateEvent{ + EventBase: openairt.EventBase{ + EventID: "Initial update", + }, + Session: openairt.ClientSession{ + Modalities: []openairt.Modality{"text", "audio"}, + Instructions: l.config.Prompt, + Voice: voice, + InputAudioFormat: openairt.AudioFormatPcm16, + OutputAudioFormat: openairt.AudioFormatPcm16, + TurnDetection: &openairt.ClientTurnDetection{ + Type: openairt.ClientTurnDetectionTypeServerVad, + }, + }, + }) + } else { + err = l.conn.SendMessage(initCtx, openairt.TranscriptionSessionUpdateEvent{ + EventBase: openairt.EventBase{ + EventID: "Initial update", + }, + Session: openairt.ClientTranscriptionSession{ + InputAudioTranscription: &openairt.InputAudioTranscription{ + Model: l.config.Model, + Language: l.config.Lang, + Prompt: l.config.Prompt, + }, + TurnDetection: &openairt.ClientTurnDetection{ + Type: openairt.ClientTurnDetectionTypeServerVad, + }, + }, + }) + } + if err != nil { + log.Println("Listener.Start: error sending initial update: ", err) + finishInit() + return err + } + if err := waitForSessionUpdated(initCtx, l.conn); err != nil { + finishInit() + return err + } + finishInit() + log.Println("Listener.Start: Record/Transcribe...") + if err := pid.WriteState(l.statePath, true); 
err != nil { + log.Println("Listener.Start: failed to write recording state: ", err) + } + l.config.UI.Chan <- &gui.ShowListeningMsg{} + + return nil +} + +func (l *Listener) RunAudio() { + if l.config.Mode == "assistant" { + if err := audio.Duplex(l.ctx, l.playReader, l.chunkWriter, l.streamConfig); err != nil { + if errors.Is(err, context.Canceled) || errors.Is(err, io.EOF) { + return + } + l.errCh <- fmt.Errorf("audio duplex: %w", err) + l.cancel() + } + return + } + + if err := audio.Capture(l.ctx, l.chunkWriter, l.streamConfig); err != nil { + if errors.Is(err, context.Canceled) { + return + } + l.errCh <- fmt.Errorf("audio capture: %w", err) + l.cancel() + } +} + +func (l *Listener) SendChunks() { + for { + var cur *bytes.Buffer + select { + case cur = <-l.audioChunks: + case <-l.ctx.Done(): + return + } + log.Printf("Listener.SendChunks: transcribing, %d\n", cur.Len()) + if cur.Len() < 1 { + continue + } + if err := l.conn.SendMessage(l.ctx, openairt.InputAudioBufferAppendEvent{ + EventBase: openairt.EventBase{ + EventID: "TODO", + }, + Audio: base64.StdEncoding.EncodeToString(cur.Bytes()), + }); err != nil { + var permanent *openairt.PermanentError + if errors.As(err, &permanent) { + l.errCh <- fmt.Errorf("Listener.SendChunks: connection failed: %w", err) + l.cancel() + return + } + log.Println("Listener.SendChunks: error sending message: ", err) + continue + } + } +} + +func (l *Listener) ReceiveTranscriptionMessages() { + for { + msg, err := l.conn.ReadMessage(l.ctx) + if err != nil { + var permanent *openairt.PermanentError + if errors.As(err, &permanent) { + log.Println("Listener.ReceiveTranscriptionMessages: Connection failed: ", err) + l.cancel() + return + } + log.Println("Listener.ReceiveTranscriptionMessages: error receiving message, retrying: ", err) + continue + } + log.Println("Listener.ReceiveTranscriptionMessages: receiving message: ", msg.ServerEventType()) + var text string + switch msg.ServerEventType() { + case 
openairt.ServerEventTypeInputAudioBufferSpeechStarted: + l.config.UI.Chan <- &gui.ShowSpeechDetectedMsg{} + case openairt.ServerEventTypeInputAudioBufferSpeechStopped: + l.config.UI.Chan <- &gui.ShowTranscribingMsg{} + case openairt.ServerEventTypeResponseAudioTranscriptDone: + text = msg.(openairt.ResponseAudioTranscriptDoneEvent).Transcript + case openairt.ServerEventTypeConversationItemInputAudioTranscriptionCompleted: + text = msg.(openairt.ConversationItemInputAudioTranscriptionCompletedEvent).Transcript + case openairt.ServerEventTypeError: + log.Println("Listener.ReceiveTranscriptionMessages: server error: ", msg.(openairt.ErrorEvent).Error.Message) + continue + default: + continue + } + if text == "" { + continue + } + l.config.UI.Chan <- &gui.HideMsg{} + log.Println("Listener.ReceiveTranscriptionMessages: received transcribed text: ", text) + if l.config.OutputFile != "" { + f, err := os.OpenFile(l.config.OutputFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + log.Printf("Failed to open output file %s: %v\n", l.config.OutputFile, err) + continue + } + if _, err := fmt.Fprintln(f, text); err != nil { + log.Printf("Failed to write to output file: %v\n", err) + } + if err := f.Close(); err != nil { + log.Printf("Failed to close output file: %v\n", err) + } + continue + } + dotool := exec.CommandContext(l.ctx, "dotool") + stdin, err := dotool.StdinPipe() + if err != nil { + l.errCh <- fmt.Errorf("dotool stdin pipe: %w", err) + l.cancel() + return + } + dotool.Stderr = os.Stderr + if err := dotool.Start(); err != nil { + l.errCh <- fmt.Errorf("dotool start: %w", err) + l.cancel() + return + } + _, err = io.WriteString(stdin, fmt.Sprintf("type %s ", text)) + if err != nil { + l.errCh <- fmt.Errorf("dotool stdin WriteString: %w", err) + l.cancel() + return + } + if err := stdin.Close(); err != nil { + l.errCh <- fmt.Errorf("close dotool stdin: %w", err) + l.cancel() + return + } + if err := dotool.Wait(); err != nil { + if errors.Is(err, 
context.Canceled) { + return + } + l.errCh <- fmt.Errorf("dotool wait: %w", err) + l.cancel() + return + } + } +} + +func (l *Listener) ReceiveAssistantMessages() { + for { + msg, err := l.conn.ReadMessage(l.ctx) + if err != nil { + var permanent *openairt.PermanentError + if errors.As(err, &permanent) { + log.Println("Listener.ReceiveAssistantMessages: Connection failed: ", err) + l.cancel() + return + } + log.Println("Listener.ReceiveAssistantMessages: error receiving message, retrying: ", err) + continue + } + log.Println("Listener.ReceiveAssistantMessages: receiving message: ", msg.ServerEventType()) + switch msg.ServerEventType() { + case openairt.ServerEventTypeInputAudioBufferSpeechStarted: + l.config.UI.Chan <- &gui.ShowSpeechDetectedMsg{} + case openairt.ServerEventTypeInputAudioBufferSpeechStopped: + l.config.UI.Chan <- &gui.ShowGeneratingResponseMsg{} + case openairt.ServerEventTypeResponseAudioDelta: + delta := msg.(openairt.ResponseAudioDeltaEvent) + b, err := base64.StdEncoding.DecodeString(delta.Delta) + if err != nil { + log.Println("Listener.ReceiveAssistantMessages: error decoding audio delta: ", err) + continue + } + select { + case l.audioPlayChunks <- bytes.NewBuffer(b): + default: + log.Println("Listener.ReceiveAssistantMessages: dropped audio chunk") + } + case openairt.ServerEventTypeError: + log.Println("Listener.ReceiveAssistantMessages: server error: ", msg.(openairt.ErrorEvent).Error.Message) + continue + default: + continue + } + + } +} + +func (l *Listener) Stop() { + log.Println("Listener.Stop: finished transcribing") + l.conn.Close() + l.cancel() + if err := pid.WriteState(l.statePath, false); err != nil { + log.Println("Listener.Stop: failed to write idle state: ", err) + } } func listen(config ListenConfig) { @@ -132,7 +472,7 @@ func listen(config ListenConfig) { log.Print("internal/audio: ", message) }) if err != nil { - log.Fatalln("main: ", err) + log.Fatalln("listen: ", err) } defer func() { _ = mctx.Uninit() @@ -171,279 
+511,91 @@ func listen(config ListenConfig) { statePath, err := pid.StatePath() if err != nil { - log.Fatalln("main: failed to get state file path: ", err) + log.Fatalln("listen: failed to get state file path: ", err) } err = pid.Write(config.PIDPath) defer func() { if err := os.Remove(config.PIDPath); err != nil { - log.Println("main: failed to remove PID file: ", err) + log.Println("listen: failed to remove PID file: ", err) } if err := os.Remove(statePath); err != nil && !os.IsNotExist(err) { - log.Println("main: failed to remove state file: ", err) + log.Println("listen: failed to remove state file: ", err) } }() if err := pid.WriteState(statePath, false); err != nil { - log.Println("main: failed to write initial state: ", err) + log.Println("listen: failed to write initial state: ", err) } -Listen: +ForListen: for { - log.Println("main: Waiting for record signal...") + log.Println("listen: Waiting for record signal...") - ctx, cancel := context.WithCancel(context.Background()) - for sig := range sigChan { + var sig os.Signal + for { + sig = <-sigChan switch sig { case syscall.SIGUSR1: + // start case syscall.SIGUSR2: - log.Println("main: Received stop/write signal, but wasn't recording") + log.Println("listen: Received stop/write signal, but wasn't recording") continue case syscall.SIGTERM: - cancel() - break Listen + break ForListen + } + if sig == syscall.SIGUSR1 { + break } - break } - initCtx, finishInit := context.WithTimeout(ctx, config.Timeout) - errCh := make(chan error, 1) - conn, err := rtCli.Connect(initCtx, openairt.WithIntent()) - if err != nil { - log.Println("main: realtime connect: ", err) - finishInit() - cancel() + l := NewListener(config, streamConfig, rtCli, statePath) + if err := l.Start(); err != nil { + l.cancel() continue } - log.Println("main: Connected to realtime API, waiting for session.created event...") - - // It's not required to wait for this, but the server may take time to startup - if err := waitForSessionUpdated(initCtx, 
conn); err != nil { - finishInit() - cancel() - break Listen - } - - err = conn.SendMessage(initCtx, openairt.TranscriptionSessionUpdateEvent{ - EventBase: openairt.EventBase{ - EventID: "Initial update", - }, - Session: openairt.ClientTranscriptionSession{ - InputAudioTranscription: &openairt.InputAudioTranscription{ - Model: config.Model, - Language: config.Lang, - Prompt: config.Prompt, - }, - TurnDetection: &openairt.ClientTurnDetection{ - Type: openairt.ClientTurnDetectionTypeServerVad, - }, - }, - }) - - if err := waitForSessionUpdated(initCtx, conn); err != nil { - finishInit() - cancel() - break Listen - } - - finishInit() - log.Println("main: Record/Transcribe...") - if err := pid.WriteState(statePath, true); err != nil { - log.Println("main: failed to write recording state: ", err) + go l.RunAudio() + go l.SendChunks() + if l.config.Mode == "assistant" { + go l.ReceiveAssistantMessages() + } else { + go l.ReceiveTranscriptionMessages() } - config.UI.Chan <- &gui.ShowListeningMsg{} - - audioChunks := make(chan (*bytes.Buffer), 10) - chunkWriter := newChunkWriter(ctx, audioChunks) - - go func() { - if err := audio.Capture(ctx, chunkWriter, streamConfig); err != nil { - if errors.Is(err, context.Canceled) { - return - } - errCh <- fmt.Errorf("audio capture: %w", err) - cancel() - } - }() - - go func() { - var headerBuf bytes.Buffer - - wavHeader := audio.NewWAVHeader(0) - - for { - headerBuf.Reset() - - var cur *bytes.Buffer - select { - // Received from chunkWriter - case cur = <-audioChunks: - case <-ctx.Done(): - return - } - - log.Printf("main: transcribing, %d\n", cur.Len()) - - wavHeader.ChunkSize = uint32(cur.Len()) - if err := wavHeader.Write(&headerBuf); err != nil { - errCh <- fmt.Errorf("write wav header: %w", err) - cancel() - return - } - - if err := conn.SendMessage(ctx, openairt.InputAudioBufferAppendEvent{ - EventBase: openairt.EventBase{ - EventID: "TODO", - }, - Audio: base64.StdEncoding.EncodeToString(cur.Bytes()), - }); err != nil { - var 
permanent *openairt.PermanentError - if errors.As(err, &permanent) { - errCh <- fmt.Errorf("main: connection failed: %w", err) - cancel() - break - } - log.Println("main: error sending message: ", err) - continue - } - } - }() - - go func() { - for { - msg, err := conn.ReadMessage(ctx) - if err != nil { - var permanent *openairt.PermanentError - if errors.As(err, &permanent) { - log.Println("main: Connection failed: ", err) - cancel() - break - } - log.Println("main: error receiving message, retrying: ", err) - continue - } - - log.Println("main: receiving message: ", msg.ServerEventType()) - - var text string - switch msg.ServerEventType() { - case openairt.ServerEventTypeInputAudioBufferSpeechStarted: - config.UI.Chan <- &gui.ShowSpeechDetectedMsg{} - case openairt.ServerEventTypeInputAudioBufferSpeechStopped: - config.UI.Chan <- &gui.ShowTranscribingMsg{} - case openairt.ServerEventTypeResponseAudioTranscriptDone: - text = msg.(openairt.ResponseAudioTranscriptDoneEvent).Transcript - case openairt.ServerEventTypeConversationItemInputAudioTranscriptionCompleted: - text = msg.(openairt.ConversationItemInputAudioTranscriptionCompletedEvent).Transcript - case openairt.ServerEventTypeError: - log.Println("main: server error: ", msg.(openairt.ErrorEvent).Error.Message) - continue - default: - continue - } - - if text == "" { - continue - } - - config.UI.Chan <- &gui.HideMsg{} - - log.Println("main: received transcribed text: ", text) - - if config.OutputFile != "" { - f, err := os.OpenFile(config.OutputFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - log.Printf("Failed to open output file %s: %v\n", config.OutputFile, err) - continue - } - if _, err := fmt.Fprintln(f, text); err != nil { - log.Printf("Failed to write to output file: %v\n", err) - } - if err := f.Close(); err != nil { - log.Printf("Failed to close output file: %v\n", err) - } - continue - } - - dotool := exec.CommandContext(ctx, "dotool") - stdin, err := dotool.StdinPipe() - if err 
!= nil { - errCh <- fmt.Errorf("dotool stdin pipe: %w", err) - cancel() - return - } - dotool.Stderr = os.Stderr - - if err := dotool.Start(); err != nil { - errCh <- fmt.Errorf("dotool stderr pipe: %w", err) - cancel() - return - } - - _, err = io.WriteString(stdin, fmt.Sprintf("type %s ", text)) - if err != nil { - errCh <- fmt.Errorf("dotool stdin WriteString: %w", err) - cancel() - return - } - - if err := stdin.Close(); err != nil { - errCh <- fmt.Errorf("close dotool stdin: %w", err) - cancel() - return - } - - if err := dotool.Wait(); err != nil { - if errors.Is(err, context.Canceled) { - return - } - errCh <- fmt.Errorf("dotool wait: %w", err) - cancel() - return - } - } - }() - + ForSignal: + for { select { - case <-ctx.Done(): - case sig := <-sigChan: + case sig = <-sigChan: switch sig { case syscall.SIGUSR1: - log.Println("main: received record signal, but already recording") - continue + log.Println("listen: received record signal, but already recording") case syscall.SIGUSR2: // TODO: Do input_audio_buffer.commit and/or wait for final transcription? + break ForSignal case syscall.SIGTERM: - conn.Close() - cancel() - break Listen + l.config.UI.Chan <- &gui.ShowStoppingMsg{} + l.Stop() + break ForListen } + // We check this in case there was an error. 
Otherwise we would sit here waiting + // for a signal to stop listening when we have effectively already stopped listening + case <-l.ctx.Done(): + break ForSignal } - - break } - config.UI.Chan <- &gui.ShowStoppingMsg{} - log.Println("main: finished transcribing") - conn.Close() - cancel() - - // Set state back to idle - if err := pid.WriteState(statePath, false); err != nil { - log.Println("main: failed to write idle state: ", err) - } + l.config.UI.Chan <- &gui.ShowStoppingMsg{} + l.Stop() for { select { - case err := <-errCh: + case err := <-l.errCh: if err != nil && !errors.Is(err, context.Canceled) { - log.Fatalln("main: ", err) + log.Fatalln("listen: ", err) } default: - continue Listen + continue ForListen } } } diff --git a/main.go b/main.go index 99918b3..7d26772 100644 --- a/main.go +++ b/main.go @@ -43,6 +43,7 @@ func main() { --no-show-status don't show when recording has started or stopped --output-file Write transcribed text to file instead of keyboard --prompt Text used to condition model output. Could be previously transcribed text or uncommon words you expect to use + --mode (realtime only, default: transcription) record - Tell existing listener to start recording audio. 
In realtime mode it also begins transcription write - Tell existing listener to stop recording audio and begin transcription if not in realtime mode @@ -59,11 +60,14 @@ Environment variables: VOXINPUT_WS_BASE_URL or OPENAI_WS_BASE_URL - WebSocket API base URL (default: ws://localhost:8080/v1/realtime) VOXINPUT_LANG or LANG - Language code for transcription (default: none) VOXINPUT_TRANSCRIPTION_MODEL or TRANSCRIPTION_MODEL - Transcription model (default: whisper-1) + VOXINPUT_ASSISTANT_MODEL or ASSISTANT_MODEL - Assistant model (default: none) + VOXINPUT_ASSISTANT_VOICE or ASSISTANT_VOICE - Assistant voice (default: alloy) VOXINPUT_TRANSCRIPTION_TIMEOUT or TRANSCRIPTION_TIMEOUT - Transcription timeout (default: 30s) VOXINPUT_SHOW_STATUS or SHOW_STATUS - Show status notifications (yes/no, default: yes) VOXINPUT_CAPTURE_DEVICE - Name of the capture device (default: system default; use 'devices' to list) VOXINPUT_OUTPUT_FILE - File to write transcribed text to (instead of keyboard) VOXINPUT_PROMPT - Text used to condition model output. 
Could be previously transcribed text or uncommon words you expect to use + VOXINPUT_MODE - Realtime mode (transcription|assistant, default: transcription) XDG_RUNTIME_DIR - Directory for PID and state files (required, standard XDG variable)`) return case "ver": @@ -88,12 +92,16 @@ Environment variables: wsApiBase := getOpenaiEnv("WS_BASE_URL", "ws://localhost:8080/v1/realtime") lang := getPrefixedEnv([]string{"VOXINPUT", ""}, "LANG", "") model := getPrefixedEnv([]string{"VOXINPUT", ""}, "TRANSCRIPTION_MODEL", "whisper-1") + assistantModel := getPrefixedEnv([]string{"VOXINPUT", ""}, "ASSISTANT_MODEL", "") + assistantVoice := getPrefixedEnv([]string{"VOXINPUT", ""}, "ASSISTANT_VOICE", "") timeoutStr := getPrefixedEnv([]string{"VOXINPUT", ""}, "TRANSCRIPTION_TIMEOUT", "30s") showStatus := getPrefixedEnv([]string{"VOXINPUT", ""}, "SHOW_STATUS", "yes") captureDeviceName := getPrefixedEnv([]string{"VOXINPUT"}, "CAPTURE_DEVICE", "") prompt := getPrefixedEnv([]string{"VOXINPUT"}, "PROMPT", "") outputFile := getPrefixedEnv([]string{"VOXINPUT"}, "OUTPUT_FILE", "") + mode := getPrefixedEnv([]string{"VOXINPUT"}, "MODE", "transcription") + timeout, err := time.ParseDuration(timeoutStr) if err != nil { log.Println("main: failed to parse timeout", err) @@ -143,23 +151,38 @@ Environment variables: prompt = promptArg } + var modeArg string + for i := 2; i < len(os.Args); i++ { + arg := os.Args[i] + if arg == "--mode" && i+1 < len(os.Args) { + modeArg = os.Args[i+1] + break + } + } + if modeArg != "" { + mode = modeArg + } + if realtime { ctx, cancel := context.WithCancel(context.Background()) ui := gui.New(ctx, showStatus) go func() { listen(ListenConfig{ - PIDPath: pidPath, - APIKey: apiKey, - HTTPAPIBase: httpApiBase, - WSAPIBase: wsApiBase, - Lang: lang, - Model: model, - Timeout: timeout, - UI: ui, - CaptureDevice: captureDeviceName, - OutputFile: outputFile, - Prompt: prompt, + PIDPath: pidPath, + APIKey: apiKey, + HTTPAPIBase: httpApiBase, + WSAPIBase: wsApiBase, + Lang: 
lang, + Model: model, + Timeout: timeout, + UI: ui, + CaptureDevice: captureDeviceName, + OutputFile: outputFile, + Prompt: prompt, + Mode: mode, + AssistantModel: assistantModel, + AssistantVoice: assistantVoice, }) cancel() }() diff --git a/old.go b/old.go index da78b8a..4f72a61 100644 --- a/old.go +++ b/old.go @@ -98,7 +98,8 @@ Listen: errCh := make(chan error, 1) go func() { - if err := audio.Capture(ctx, &buf, streamConfig); err != nil { + err := audio.Capture(ctx, &buf, streamConfig) + if err != nil { errCh <- fmt.Errorf("audio capture: %w", err) } }()