Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions chimerapy/pipelines/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def register_nodes_metadata():
"chimerapy.pipelines.yolov8.multi_vid_pose:YoloV8Node",
"chimerapy.pipelines.yolov8.multi_save:MultiSaveNode",
"chimerapy.pipelines.yolov8.display:DisplayNode",
"chimerapy.pipelines.speech_to_text.deepgram_node:DeepgramNode",
],
}

Expand Down
Empty file.
97 changes: 97 additions & 0 deletions chimerapy/pipelines/speech_to_text/deepgram_node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
from typing import Any, Dict, Optional

from deepgram import Deepgram

import chimerapy.engine as cpe
from chimerapy.orchestrator import step_node


@step_node(name="CPPipelines_DeepgramNode")
class DeepgramNode(cpe.Node):
    """A node which transcribes live audio using Deepgram.

    One live transcription connection is opened lazily per upstream source
    (keyed by the name under which the source appears in ``data_chunks``).
    Every response received on a connection is stored through the node's
    ``save_tabular`` / ``save_json`` helpers.

    Parameters
    ----------
    api_key : str
        The Deepgram API key
    name : str, optional (default: "DeepgramNode")
        The name of the node
    chunk_key : str, optional (default: "audio_chunk")
        The key of the audio chunk in the data chunk
    deepgram_options : Dict[str, Any], optional (default: None)
        Options to pass to the Deepgram client(deepgram.transcription.live)
    """

    def __init__(
        self,
        api_key: str,
        name: str = "DeepgramNode",
        chunk_key: str = "audio_chunk",
        deepgram_options: Optional[Dict[str, Any]] = None,
    ):
        super().__init__(name=name)
        self.api_key = api_key
        # The client is created in ``setup`` rather than here.
        self.deepgram_client: Optional[Deepgram] = None
        # Source name -> live transcription connection.
        self.transcribers: Dict[str, Any] = {}
        self.chunk_key = chunk_key
        self.deepgram_options = deepgram_options or {}

    async def setup(self) -> None:
        """Setup the Deepgram client."""
        self.deepgram_client = Deepgram(self.api_key)

    async def step(
        self, data_chunks: Dict[str, cpe.DataChunk]
    ) -> Optional[cpe.DataChunk]:
        """Forward every incoming audio chunk to its live transcription.

        Parameters
        ----------
        data_chunks : Dict[str, cpe.DataChunk]
            Incoming data chunks keyed by source name. The audio payload is
            read from ``data_chunk.get(self.chunk_key)["value"]``.

        Returns
        -------
        None
            This node does not emit a data chunk.
        """
        for name, data_chunk in data_chunks.items():
            await self._create_transcription(name)

            transcriber = self.transcribers.get(name)
            if transcriber is None:
                # Opening the connection failed (already logged by
                # _create_transcription). Drop this chunk instead of raising
                # KeyError; the connection is attempted again next step.
                continue

            audio_chunk = data_chunk.get(self.chunk_key)["value"]
            transcriber.send(audio_chunk)

        return None

    async def _create_transcription(self, name) -> None:
        """Create a transcription for the given name, if none exists yet.

        On failure the error is logged and no entry is added to
        ``self.transcribers``; callers must handle the missing entry.
        """
        if name in self.transcribers:
            return

        try:
            transcriber = await self.deepgram_client.transcription.live(
                self.deepgram_options
            )
        except Exception as e:
            # Broad on purpose: any failure to connect is reported and the
            # node keeps running.
            self.logger.error(
                f"Failed to create transcription for {name}: {e}"
            )
            return

        self.transcribers[name] = transcriber

        # Report connection events through the node logger, like every other
        # message emitted by this class.
        transcriber.registerHandler(
            transcriber.event.CLOSE,
            lambda c: self.logger.info(f"Connection closed with code {c}."),
        )
        transcriber.registerHandler(
            transcriber.event.ERROR,
            lambda e: self.logger.error(f"Error: {e}"),
        )
        transcriber.registerHandler(
            transcriber.event.TRANSCRIPT_RECEIVED,
            lambda t: self._save_transcript(name, t),
        )
        self.logger.info(f"Created transcription for {name}")

    def _save_transcript(self, name, response) -> None:
        """Save one Deepgram response.

        A summary row (transcript, confidence, start, end) is written with
        ``save_tabular`` when the response carries those fields; the raw
        response is always written with ``save_json``.
        """
        try:
            best = response["channel"]["alternatives"][0]
            transcript_data = {
                "transcript": best["transcript"],
                "conf": best["confidence"],
                "start": response["start"],
                "end": response["start"] + response["duration"],
            }
        except (KeyError, IndexError, TypeError):
            # NOTE(review): presumably non-transcript messages (e.g. stream
            # metadata) can reach this handler - confirm against the SDK.
            # Such a message has no row to tabulate but is still archived.
            transcript_data = None

        if transcript_data is not None:
            self.save_tabular(name, transcript_data)
        self.save_json(f"{name}-deepgram-responses", response)

    async def teardown(self) -> None:
        """Finish all transcriptions.

        A failure while finishing one connection is logged and does not
        prevent the remaining connections from being finished.
        """
        for name, transcriber in self.transcribers.items():
            try:
                await transcriber.finish()
            except Exception as e:
                self.logger.error(
                    f"Failed to finish transcription for {name}: {e}"
                )
62 changes: 62 additions & 0 deletions configs/speech_to_text/stt_deepgram.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
{
"mode": "record",
"workers": {
"manager_ip": "129.59.104.153",
"manager_port": 9001,
"instances": [
{
"name": "local",
"id": "local",
"description": "local worker for the MMLA pipeline for Speech to Text with deepgram"
}
]
},
"nodes": [
{
"registry_name": "CPPipelines_AudioNode",
"name": "local-audio",
"kwargs": {
"backend": "pvrecorder",
"input_device_id": 3,
"audio_format": "INT16",
"sample_rate": "RATE_44100",
"chunk_size": "CHUNK_512",
"save_name": "local-audio",
"chunk_key": "audio_chunk"
},
"package": "chimerapy-pipelines"
},
{
"registry_name": "CPPipelines_DeepgramNode",
"name": "stt-deepgram",
"kwargs": {
"name": "stt-deepgram",
"api_key": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
"chunk_key": "audio_chunk",
"deepgram_options": {
"encoding": "linear16",
"channels": 1,
"sample_rate": 16000,
"language": "en-US"
}
},
"package": "chimerapy-pipelines"
}
],
"adj": [
[
"local-audio",
"stt-deepgram"
]
],
"manager_config": {
"logdir": "cp-logs",
"port": 9001
},
"mappings": {
"local": [
"local-audio",
"stt-deepgram"
]
}
}
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ dependencies = [
'chimerapy-orchestrator',
'chimerapy-engine',
'pyaudio',
'pvrecorder'
'pvrecorder',
'deepgram-sdk'
]

[project.optional-dependencies]
Expand Down