diff --git a/chimerapy/pipelines/__init__.py b/chimerapy/pipelines/__init__.py
index 5147277..c85a860 100644
--- a/chimerapy/pipelines/__init__.py
+++ b/chimerapy/pipelines/__init__.py
@@ -22,6 +22,7 @@ def register_nodes_metadata():
             "chimerapy.pipelines.yolov8.multi_vid_pose:YoloV8Node",
             "chimerapy.pipelines.yolov8.multi_save:MultiSaveNode",
             "chimerapy.pipelines.yolov8.display:DisplayNode",
+            "chimerapy.pipelines.speech_to_text.deepgram_node:DeepgramNode",
         ],
     }
 
diff --git a/chimerapy/pipelines/speech_to_text/__init__.py b/chimerapy/pipelines/speech_to_text/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/chimerapy/pipelines/speech_to_text/deepgram_node.py b/chimerapy/pipelines/speech_to_text/deepgram_node.py
new file mode 100644
index 0000000..386a8e1
--- /dev/null
+++ b/chimerapy/pipelines/speech_to_text/deepgram_node.py
@@ -0,0 +1,133 @@
+from typing import Any, Dict, Optional
+
+from deepgram import Deepgram
+
+import chimerapy.engine as cpe
+from chimerapy.orchestrator import step_node
+
+
+@step_node(name="CPPipelines_DeepgramNode")
+class DeepgramNode(cpe.Node):
+    """A node which transcribes live audio using Deepgram.
+
+    One live transcription is created lazily for each key seen in the
+    incoming data chunks and reused for later chunks under that key.
+
+    Parameters
+    ----------
+    api_key : str
+        The Deepgram API key
+    name : str, optional (default: "DeepgramNode")
+        The name of the node
+    chunk_key : str, optional (default: "audio_chunk")
+        The key of the audio chunk in the data chunk
+    deepgram_options : Dict[str, Any], optional (default: None)
+        Options to pass to the Deepgram client(deepgram.transcription.live)
+    """
+
+    def __init__(
+        self,
+        api_key: str,
+        name: str = "DeepgramNode",
+        chunk_key: str = "audio_chunk",
+        deepgram_options: Optional[Dict[str, Any]] = None,
+    ):
+        super().__init__(name=name)
+        self.api_key = api_key
+        self.deepgram_client: Optional[Deepgram] = None
+        # Live transcriptions keyed by data chunk name.
+        self.transcribers: Dict[str, Any] = {}
+        self.chunk_key = chunk_key
+        self.deepgram_options = deepgram_options or {}
+
+    async def setup(self) -> None:
+        """Setup the Deepgram client."""
+        self.deepgram_client = Deepgram(self.api_key)
+
+    async def step(
+        self, data_chunks: Dict[str, cpe.DataChunk]
+    ) -> cpe.DataChunk:
+        """Send each incoming audio chunk to its live transcription.
+
+        Nothing is returned from here; transcripts are saved by the
+        ``TRANSCRIPT_RECEIVED`` handler as responses arrive.
+        """
+        for name, data_chunk in data_chunks.items():
+            await self._create_transcription(name)
+
+            transcriber = self.transcribers.get(name)
+            if transcriber is None:
+                # Creation failed (already logged). Skip this chunk
+                # instead of raising KeyError; the next step retries.
+                continue
+
+            audio_chunk = data_chunk.get(self.chunk_key)["value"]
+            transcriber.send(audio_chunk)
+
+    async def _create_transcription(self, name) -> None:
+        """Create a live transcription for ``name`` if none exists.
+
+        On failure the error is logged and nothing is stored under
+        ``name`` in ``self.transcribers``.
+        """
+        if name in self.transcribers:
+            return
+
+        try:
+            transcriber = await self.deepgram_client.transcription.live(
+                self.deepgram_options
+            )
+        except Exception as e:
+            self.logger.error(
+                f"Failed to create transcription for {name}: {e}"
+            )
+            return
+
+        # Report connection events through the node logger, like the
+        # rest of this class, rather than printing to stdout.
+        transcriber.registerHandler(
+            transcriber.event.CLOSE,
+            lambda c: self.logger.info(f"Connection closed with code {c}."),
+        )
+        transcriber.registerHandler(
+            transcriber.event.ERROR,
+            lambda e: self.logger.error(f"Error: {e}"),
+        )
+        transcriber.registerHandler(
+            transcriber.event.TRANSCRIPT_RECEIVED,
+            lambda t: self._save_transcript(name, t),
+        )
+        self.transcribers[name] = transcriber
+        self.logger.info(f"Created transcription for {name}")
+
+    def _save_transcript(self, name, response) -> None:
+        """Save a Deepgram response as a transcript row and as raw JSON.
+
+        The raw response is always saved; the tabular row is only
+        saved when the response contains at least one alternative.
+        """
+        # NOTE(review): this event presumably also delivers messages
+        # without a "channel" entry (e.g. closing metadata), which
+        # previously raised KeyError here - confirm against the SDK.
+        alternatives = (response.get("channel") or {}).get("alternatives")
+        if alternatives:
+            best = alternatives[0]
+            transcript_data = {
+                "transcript": best["transcript"],
+                "conf": best["confidence"],
+                "start": response["start"],
+                "end": response["start"] + response["duration"],
+            }
+            self.save_tabular(name, transcript_data)
+        self.save_json(f"{name}-deepgram-responses", response)
+
+    async def teardown(self) -> None:
+        """Finish all transcriptions."""
+        for name, transcriber in self.transcribers.items():
+            try:
+                await transcriber.finish()
+            except Exception as e:
+                # Keep going so the remaining transcriptions finish.
+                self.logger.error(
+                    f"Failed to finish transcription for {name}: {e}"
+                )
diff --git a/configs/speech_to_text/stt_deepgram.json b/configs/speech_to_text/stt_deepgram.json
new file mode 100644
index 0000000..037ccc5
--- /dev/null
+++ b/configs/speech_to_text/stt_deepgram.json
@@ -0,0 +1,62 @@
+{
+  "mode": "record",
+  "workers": {
+    "manager_ip": "129.59.104.153",
+    "manager_port": 9001,
+    "instances": [
+      {
+        "name": "local",
+        "id": "local",
+        "description": "local worker for the MMLA pipeline for Speech to Text with deepgram"
+      }
+    ]
+  },
+  "nodes": [
+    {
+      "registry_name": "CPPipelines_AudioNode",
+      "name": "local-audio",
+      "kwargs": {
+        "backend": "pvrecorder",
+        "input_device_id": 3,
+        "audio_format": "INT16",
+        "sample_rate": "RATE_44100",
+        "chunk_size": "CHUNK_512",
+        "save_name": "local-audio",
+        "chunk_key": "audio_chunk"
+      },
+      "package": "chimerapy-pipelines"
+    },
+    {
+      "registry_name": "CPPipelines_DeepgramNode",
+      "name": "stt-deepgram",
+      "kwargs": {
+        "name": "stt-deepgram",
+        "api_key": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
+        "chunk_key": "audio_chunk",
+        "deepgram_options": {
+          "encoding": "linear16",
+          "channels": 1,
+          "sample_rate": 16000,
+          "language": "en-US"
+        }
+      },
+      "package": "chimerapy-pipelines"
+    }
+  ],
+  "adj": [
+    [
+      "local-audio",
+      "stt-deepgram"
+    ]
+  ],
+  "manager_config": {
+    "logdir": "cp-logs",
+    "port": 9001
+  },
+  "mappings": {
+    "local": [
+      "local-audio",
+      "stt-deepgram"
+    ]
+  }
+}
diff --git a/pyproject.toml b/pyproject.toml
index c056fde..adb2d28 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -26,7 +26,8 @@ dependencies = [
     'chimerapy-orchestrator',
     'chimerapy-engine',
     'pyaudio',
-    'pvrecorder'
+    'pvrecorder',
+    'deepgram-sdk'
 ]
 
 [project.optional-dependencies]