diff --git a/.devcontainer/.env b/.devcontainer/.env
new file mode 100644
index 0000000..4ef9382
--- /dev/null
+++ b/.devcontainer/.env
@@ -0,0 +1,4 @@
+http_proxy=http://10.232.14.15:8118
+https_proxy=http://10.232.14.15:8118
+HF_ENDPOINT=https://hf-mirror.com
+HF_TOKEN=
\ No newline at end of file
diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile
new file mode 100644
index 0000000..a3e9b2c
--- /dev/null
+++ b/.devcontainer/Dockerfile
@@ -0,0 +1,36 @@
+FROM ghcr.io/shoppal-ai/llm-base
+
+RUN apt update && apt install -y zsh curl git sudo wget libsndfile1 ffmpeg
+
+# # install python environment
+# RUN apt install -y python3.8-dev python3-pip
+# RUN update-alternatives --install /usr/bin/python python /usr/bin/python3.10 1
+RUN apt install -y python3.10-venv
+
+ARG USERNAME=vscode
+ARG USER_UID=1000
+ARG USER_GID=$USER_UID
+
+RUN groupadd --gid $USER_GID $USERNAME \
+    && useradd --uid $USER_UID --gid $USER_GID -m $USERNAME
+
+RUN usermod -aG sudo $USERNAME
+RUN echo 'vscode ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers
+
+USER $USERNAME
+
+RUN cd ~ && wget https://raw.githubusercontent.com/ohmyzsh/ohmyzsh/master/tools/install.sh && sh install.sh
+
+RUN python3 -m venv ~/venv
+RUN echo "source ~/venv/bin/activate" >> ~/.zshrc
+
+# Set Python path in the virtual environment.
+RUN echo "export PYTHONPATH=\$PYTHONPATH:/workspace" >> ~/.zshrc
+RUN /bin/zsh ~/.zshrc
+
+# Setup HF_ENDPOINT and PYANNOTE_CACHE
+ENV HF_ENDPOINT=https://hf-mirror.com
+ENV PYANNOTE_CACHE=/data0/cache/pyannote/
+
+ENV DEBIAN_FRONTEND=dialog
+
diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json
new file mode 100644
index 0000000..e52b57f
--- /dev/null
+++ b/.devcontainer/devcontainer.json
@@ -0,0 +1,56 @@
+// For format details, see https://aka.ms/devcontainer.json. For config options, see the
+// README at: https://github.com/devcontainers/templates/tree/main/src/postgres
+{
+    "name": "VoiceStreamAI Demo",
+    "dockerComposeFile": "docker-compose.yml",
+    "service": "app",
+    "workspaceMount": "source=~/VoiceStreamAI,target=/workspace,type=bind,consistency=cached",
+    "workspaceFolder": "/workspace",
+    "customizations": {
+        "vscode": {
+            "settings": {
+                "terminal.integrated.defaultProfile.linux": "zsh",
+                "terminal.integrated.profiles.linux": {
+                    "zsh": {
+                        "path": "/bin/zsh"
+                    }
+                }
+            },
+            "extensions": [
+                "GitHub.copilot",
+                "GitHub.copilot-labs",
+                "GitHub.vscode-pull-request-github",
+                "ms-python.python",
+                "ms-python.vscode-pylance",
+                "ms-python.pylint",
+                "ms-python.isort",
+                "ms-python.black-formatter",
+                "matangover.mypy",
+                "ms-toolsai.jupyter",
+                "ms-toolsai.jupyter-keymap",
+                "ms-toolsai.vscode-jupyter-slideshow",
+                "eamodio.gitlens",
+                "github.vscode-github-actions"
+            ]
+        }
+    },
+    // Features to add to the dev container. More info: https://containers.dev/features.
+    // "features": {},
+
+    // Use 'forwardPorts' to make a list of ports inside the container available locally.
+    // This can be used to network with other containers or the host.
+    // "forwardPorts": [8080],
+
+    // Run Args to use GPU
+    // "runArgs": ["--gpus", "all"],
+    // Use 'postCreateCommand' to run commands after the container is created.
+    "postCreateCommand": "./.devcontainer/postCreateCommand.sh"
+
+    // Configure tool-specific properties.
+    // "customizations": {},
+
+
+    // Uncomment to connect as root instead. More info: https://aka.ms/dev-containers-non-root.
+    // "remoteUser": "root",
+}
+
diff --git a/.devcontainer/docker-compose.yml b/.devcontainer/docker-compose.yml
new file mode 100644
index 0000000..5fc7fe7
--- /dev/null
+++ b/.devcontainer/docker-compose.yml
@@ -0,0 +1,23 @@
+version: '3.8'
+
+services:
+  app:
+    build:
+      context: .
+      args:
+        http_proxy: ${http_proxy}
+        https_proxy: ${https_proxy}
+      dockerfile: Dockerfile
+    volumes:
+      - ~/VoiceStreamAI:/workspace:cached
+      - /data0:/data0:cached
+    user: vscode
+    command: sleep infinity
+    deploy:
+      resources:
+        reservations:
+          devices:
+            - capabilities: [gpu]
+    environment:
+      - http_proxy=${http_proxy}
+      - https_proxy=${https_proxy}
\ No newline at end of file
diff --git a/.devcontainer/postCreateCommand.sh b/.devcontainer/postCreateCommand.sh
new file mode 100755
index 0000000..7109689
--- /dev/null
+++ b/.devcontainer/postCreateCommand.sh
@@ -0,0 +1,6 @@
+#!/bin/bash
+source /home/vscode/venv/bin/activate
+pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple
+pip install -r requirements.txt
+pip install -r requirements-dev.txt
+sudo chown vscode:vscode /workspace
\ No newline at end of file
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..4da0aa2
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,6 @@
+.DS_STORE
+SHOPPAL_README.md
+audio_files/*
+__pycache__/
+*.ipynb
+.ipynb_checkpoints
\ No newline at end of file
diff --git a/requirements-dev.txt b/requirements-dev.txt
new file mode 100644
index 0000000..cad3105
--- /dev/null
+++ b/requirements-dev.txt
@@ -0,0 +1,9 @@
+ffmpeg
+pyannote.core
+pyannote.audio
+torchvision
+transformers
+websockets
+jupyter
+datasets
+openai
\ No newline at end of file
diff --git a/server.py b/server.py
index 6f3a7b0..1737bdf 100644
--- a/server.py
+++ b/server.py
@@ -1,10 +1,3 @@
-"""
-VoiceStreamAI Server: Real-time audio transcription using self-hosted Whisper and WebSocket
-
-Contributors:
-- Alessandro Saccoia - alessandro.saccoia@gmail.com
-"""
-
 import asyncio
 import websockets
 import uuid
@@ -12,173 +5,175 @@ import wave
 import os
 import time
+import torch
 import logging
-
+import sys
+import time
+import random
 from transformers import pipeline
 from pyannote.core import Segment
 from pyannote.audio import Model
 from pyannote.audio.pipelines import VoiceActivityDetection
+from utils.log import configure_logging
+import numpy as np
+import io
+from utils.llm import chat
+import soundfile as sf
 
-HOST = 'localhost'
-PORT = 8765
+logger = configure_logging()
+
+
+HOST = '0.0.0.0'
+PORT = os.environ.get("SERVER_PORT", random.randint(10000, 11000))
 SAMPLING_RATE = 16000
 AUDIO_CHANNELS = 1
-SAMPLES_WIDTH = 2 # int16
-DEBUG = True
-VAD_AUTH_TOKEN = "FILL ME" # get your key here -> https://huggingface.co/pyannote/segmentation
+SAMPLES_WIDTH = 2  # int16
+VAD_AUTH_TOKEN = os.environ.get(
+    "HF_TOKEN"
+)  # get your key here -> https://huggingface.co/pyannote/segmentation
 
 DEFAULT_CLIENT_CONFIG = {
-    "language" : None, # multilingual
-    "chunk_length_seconds" : 5,
-    "chunk_offset_seconds" : 1
+    "language": None,  # multilingual
+    "chunk_length_seconds": 5,
+    "chunk_offset_seconds": 1,
 }
-
-audio_dir = "audio_files"
-os.makedirs(audio_dir, exist_ok=True)
+device = torch.device("cuda", 1)
+
+# Transcribed chunks are still saved under this directory below, so make sure it exists.
+os.makedirs("audio_files", exist_ok=True)
 
 ## ---------- INSTANTIATES VAD --------
 model = Model.from_pretrained("pyannote/segmentation", use_auth_token=VAD_AUTH_TOKEN)
-vad_pipeline = VoiceActivityDetection(segmentation=model)
-vad_pipeline.instantiate({"onset": 0.5, "offset": 0.5, "min_duration_on": 0.3, "min_duration_off": 0.3})
+vad_pipeline = VoiceActivityDetection(segmentation=model, device=device)
+vad_pipeline.instantiate(
+    {"onset": 0.5, "offset": 0.5, "min_duration_on": 0.3, "min_duration_off": 0.3}
+)
 
 ## ---------- INSTANTIATES SPEECH --------
-recognition_pipeline = pipeline("automatic-speech-recognition", model="openai/whisper-large-v3")
+# recognition_pipeline = pipeline("automatic-speech-recognition", model="openai/whisper-large-v3")
+recognition_pipeline = pipeline(
+    "automatic-speech-recognition", model="openai/whisper-large-v2", device=device
+)
 
 connected_clients = {}
 client_buffers = {}
 client_temp_buffers = {}
 client_configs = {}
 
-# Counter for each client to keep track of file numbers
-file_counters = {}
-
-
-
-async def transcribe_and_send(client_id, websocket, new_audio_data):
-    global file_counters
-
-    if DEBUG: print(f"Client ID {client_id}: new_audio_data length in seconds at transcribe_and_send: {float(len(new_audio_data)) / float(SAMPLING_RATE * SAMPLES_WIDTH)}")
-
-    # Initialize temporary buffer for new clients
-    if client_id not in client_temp_buffers:
-        client_temp_buffers[client_id] = bytearray()
-
-    if DEBUG: print(f"Client ID {client_id}: client_temp_buffers[client_id] length in seconds at transcribe_and_send: {float(len(client_temp_buffers[client_id])) / float(SAMPLING_RATE * SAMPLES_WIDTH)}")
-
-    # Add new audio data to the temporary buffer
-    old_audio_data = bytes(client_temp_buffers[client_id])
-
-    if DEBUG: print(f"Client ID {client_id}: old_audio_data length in seconds at transcribe_and_send: {float(len(old_audio_data)) / float(SAMPLING_RATE * SAMPLES_WIDTH)}")
-
+recv_time = {}
+file_count = 0
-    audio_data = old_audio_data + new_audio_data
-
-    if DEBUG: print(f"Client ID {client_id}: audio_data length in seconds at transcribe_and_send: {float(len(audio_data)) / float(SAMPLING_RATE * SAMPLES_WIDTH)}")
-
-    # Initialize file counter for new clients
-    if client_id not in file_counters:
-        file_counters[client_id] = 0
-
-    # File path
-    file_name = f"{audio_dir}/{client_id}_{file_counters[client_id]}.wav"
-
-    if DEBUG: print(f"Client ID {client_id}: Filename : {file_name}")
-
-    file_counters[client_id] += 1
+
+async def transcribe_and_send(client_id, websocket):
+    global file_count
+    if client_id in client_temp_buffers:
+        client_temp_buffers[client_id] = client_temp_buffers[client_id] + client_buffers[client_id]
+    else:
+        client_temp_buffers[client_id] = client_buffers[client_id]
 
-    # Save the audio data
-    with wave.open(file_name, 'wb') as wav_file:
-        wav_file.setnchannels(AUDIO_CHANNELS)
-        wav_file.setsampwidth(SAMPLES_WIDTH)
-        wav_file.setframerate(SAMPLING_RATE)
-        wav_file.writeframes(audio_data)
+    cur_data = client_temp_buffers[client_id]
+    duration = float(len(cur_data)) / (SAMPLES_WIDTH * SAMPLING_RATE)
 
-    # Measure VAD time
+    # VAD inference
+    numpy_audio = np.frombuffer(cur_data, dtype=np.int16)
+    tensor_audio = torch.tensor(numpy_audio, dtype=torch.float32).view(1, -1)
     start_time_vad = time.time()
-    result = vad_pipeline(file_name)
+    vad_result = vad_pipeline({"waveform": tensor_audio, "sample_rate": SAMPLING_RATE})
     vad_time = time.time() - start_time_vad
+    logger.info(f"Client ID {client_id}: VAD infer time: {vad_time:.2f}, VAD segments: {len(vad_result)}, current audio length: {duration:.2f}s")
 
-    # Logging after VAD
-    if DEBUG: print(f"Client ID {client_id}: VAD result segments count: {len(result)}")
-    print(f"Client ID {client_id}: VAD inference time: {vad_time:.2f}")
-
-    if len(result) == 0: # this should happen just if there's no old audio data
-        os.remove(file_name)
-        client_temp_buffers[client_id].clear()
+    if len(vad_result) == 0:
+        logger.info("drop this segment due to no voice activity found")
+        client_temp_buffers[client_id] = bytearray()
         return
+
+    end = 0
+    for segment in vad_result.itersegments():
+        # if segment.start - end > client_configs[client_id]['chunk_offset_seconds']:
+        #     # ASR pipeline
+        #     cut_point = int(end * (SAMPLES_WIDTH * SAMPLING_RATE))
+        #     cur_numpy = np.frombuffer(cur_data[:cut_point], dtype=np.int16)
+        #     asr_result = recognition_pipeline(cur_numpy)
+        #     client_buffers[client_id] = client_buffers[client_id][cut_point:]
+        #     if asr_result["text"]:
+        #         question = asr_result['text']
+        #         answer = chat(question)
+        #         await websocket.send(f"Q: {question} A: {answer}")
+        #     return
+        # else:
+        end = segment.end
+    if duration - end > client_configs[client_id]['chunk_offset_seconds']:
+        cut_point = int(end * SAMPLING_RATE) * SAMPLES_WIDTH
+        logger.info(f"buffer size: {len(cur_data)}, cut_point: {cut_point}")
+        # Whisper's feature extractor expects float32 audio normalized to [-1, 1], so scale the int16 PCM.
+        cur_numpy = np.frombuffer(cur_data[:cut_point], dtype=np.int16).astype(np.float32) / 32768.0
+        asr_result = recognition_pipeline({"sampling_rate": 16000, "raw": cur_numpy})
+        client_temp_buffers[client_id] = cur_data[cut_point:]
+        if asr_result["text"]:
+            file_count += 1
+            question = asr_result['text'] + f"...|{time.time()-recv_time[client_id]:.3f}s|"
+            await websocket.send(question)
+            file_name = os.path.join('audio_files', f"{question}_{file_count}.wav")
+            with wave.open(file_name, 'wb') as wav_file:
+                wav_file.setnchannels(AUDIO_CHANNELS)
+                wav_file.setsampwidth(SAMPLES_WIDTH)
+                wav_file.setframerate(SAMPLING_RATE)
+                wav_file.writeframes(cur_data[:cut_point])
+            answer = chat(asr_result['text']) + f"...|{time.time()-recv_time[client_id]:.3f}s|"
+            await websocket.send(answer)
+        return
-    # Get last recognized segment
-    last_segment = None
-    for segment in result.itersegments():
-        last_segment = segment
-    if DEBUG: print(f"Client ID {client_id}: VAD last Segment end : {last_segment.end}")
-
-    # if the voice ends before chunk_offset_seconds process it all
-    if last_segment.end < (len(audio_data) / (SAMPLES_WIDTH * SAMPLING_RATE)) - int(client_configs[client_id]['chunk_offset_seconds']):
-        start_time_transcription = time.time()
-
-        if client_configs[client_id]['language'] is not None:
-            result = recognition_pipeline(file_name, generate_kwargs={"language": client_configs[client_id]['language']})
-        else:
-            result = recognition_pipeline(file_name)
-
-        transcription_time = time.time() - start_time_transcription
-        if DEBUG: print(f"Transcription Time: {transcription_time:.2f} seconds")
-
-        print(f"Client ID {client_id}: Transcribed : {result['text']}")
-
-        if result['text']:
-            await websocket.send(result['text'])
-        client_temp_buffers[client_id].clear() # Clear temp buffer after processing
-    else:
-        client_temp_buffers[client_id].clear()
-        client_temp_buffers[client_id].extend(audio_data)
-        if DEBUG: print(f"Skipping because {last_segment.end} falls after {(len(audio_data) / (SAMPLES_WIDTH * SAMPLING_RATE)) - int(client_configs[client_id]['chunk_offset_seconds'])}")
-
-    os.remove(file_name) # in the end always delete the created file
 
 async def receive_audio(websocket, path):
+    global recv_time
+    logger.info(f"websocket type: {websocket}")
     client_id = str(uuid.uuid4())
     connected_clients[client_id] = websocket
     client_buffers[client_id] = bytearray()
     client_configs[client_id] = DEFAULT_CLIENT_CONFIG
-
-    print(f"Client {client_id} connected")
+
+    logger.info(f"Client {client_id} connected")
     try:
         async for message in websocket:
             if isinstance(message, bytes):
                 client_buffers[client_id].extend(message)
+                recv_time[client_id] = time.time()
             elif isinstance(message, str):
-                config = json.loads(message)
-                if config.get('type') == 'config':
-                    client_configs[client_id] = config['data']
-                    print(f"Config for {client_id}: {client_configs[client_id]}")
-                continue
+                # config = json.loads(message)
+                # if config.get("type") == "config":
+                #     client_configs[client_id] = config["data"]
+                #     logger.info(f"Config for {client_id}: {client_configs[client_id]}")
+                continue
             else:
-                print(f"Unexpected message type from {client_id}")
+                logger.info(f"Unexpected message type from {client_id}")
 
             # Process audio when enough data is received
-            if len(client_buffers[client_id]) > int(client_configs[client_id]['chunk_length_seconds']) * SAMPLING_RATE * SAMPLES_WIDTH:
-                if DEBUG: print(f"Client ID {client_id}: receive_audio calling transcribe_and_send with length: {len(client_buffers[client_id])}")
-                await transcribe_and_send(client_id, websocket, client_buffers[client_id])
+            config_buf_size = (
+                float(client_configs[client_id]["chunk_length_seconds"])
+                * SAMPLING_RATE
+                * SAMPLES_WIDTH
+            )
+            if len(client_buffers[client_id]) > config_buf_size:
+                logger.info(
+                    f"Client ID {client_id}: receive_audio calling transcribe_and_send with length: {len(client_buffers[client_id])}, max length: {config_buf_size}"
+                )
+                await transcribe_and_send(
+                    client_id, websocket
+                )
                 client_buffers[client_id].clear()
+
     except websockets.ConnectionClosed as e:
-        print(f"Connection with {client_id} closed: {e}")
+        logger.info(f"Connection with {client_id} closed: {e}")
     finally:
         del connected_clients[client_id]
         del client_buffers[client_id]
 
+
 async def main():
     async with websockets.serve(receive_audio, HOST, PORT):
-        print(f"WebSocket server started on ws://{HOST}:{PORT}")
+        logger.info(f"WebSocket server started on ws://{HOST}:{PORT}")
         await asyncio.Future()
 
+
 if __name__ == "__main__":
     asyncio.run(main())
diff --git a/test.py b/test.py
new file mode 100644
index 0000000..b32718f
--- /dev/null
+++ b/test.py
@@ -0,0 +1,44 @@
+import openai
+
+import os
+
+api_key = "fake_key"
+api_base = "http://vllm:8000/v1/"
+client = openai.Client(api_key=api_key, base_url=api_base)
+
+model_name = "/data0/model_output/shoppal-test/dreampal"
+
+def predict(message, history, system_prompt):
+    history_openai_format = []
+    history_openai_format.append({"role": "system", "content": system_prompt})
+    for human, assistant in history:
+        history_openai_format.append({"role": "user", "content": human})
+        history_openai_format.append({"role": "assistant", "content": assistant})
+    history_openai_format.append({"role": "user", "content": message})
+
+    response = client.chat.completions.create(
+        model=model_name,
+        messages=history_openai_format,
+        # response_format={ "type": "json_object" },
+        stream=True
+    )
+
+    partial_message = ""
+    for chunk in response:
+        if chunk.choices[0].delta.content and len(chunk.choices[0].delta.content) != 0:
+            partial_message = partial_message + chunk.choices[0].delta.content
+            yield partial_message
+
+system_prompt = """
+You are now a dream interpretation expert. Please analyze the description of the dream that I input.
+"""
+
+response = client.chat.completions.create(
+    model=model_name,
+    messages=[{"role": "system", "content": system_prompt},
+              {"role": "user", "content": "hello"}],
+    # response_format={ "type": "json_object" },
+    # stream=False
+)
+
+# print(response.choices[0].message)
\ No newline at end of file
diff --git a/utils/llm.py b/utils/llm.py
new file mode 100644
index 0000000..441075f
--- /dev/null
+++ b/utils/llm.py
@@ -0,0 +1,20 @@
+import openai
+import os
+
+
+client = openai.Client(api_key="fake_key", base_url="http://10.232.14.16:8000/v1/")
+
+def chat(text):
+    response = client.chat.completions.create(
+        model="/data0/models/huggingface/meta-llama/Llama-2-7b-chat-hf/",
+        messages=[{"role": "system", "content": "you are a useful agent and try to answer each question within 15 words"},
+                  {"role": "user", "content": text}],
+        # response_format={ "type": "json_object" },
+        # stream=False
+    )
+
+    return response.choices[0].message.content
+
+if __name__ == '__main__':
+    ret = chat("hello")
+    print(ret)
\ No newline at end of file
diff --git a/utils/log.py b/utils/log.py
new file mode 100644
index 0000000..4260590
--- /dev/null
+++ b/utils/log.py
@@ -0,0 +1,36 @@
+import logging
+
+def configure_logging():
+    # Create a logger
+    logger = logging.getLogger(__name__)
+
+    # Set the logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
+    logger.setLevel(logging.INFO)
+
+    # Create a formatter
+    formatter = logging.Formatter('%(asctime)s - %(filename)s:%(lineno)s - %(levelname)s - %(message)s')
+
+    # Create a console handler and set the formatter
+    console_handler = logging.StreamHandler()
+    console_handler.setFormatter(formatter)
+
+    # Add the console handler to the logger
+    logger.addHandler(console_handler)
+
+    # Optionally, add a file handler to log to a file
+    # file_handler = logging.FileHandler('logfile.log')
+    # file_handler.setFormatter(formatter)
+    # logger.addHandler(file_handler)
+
+    return logger
+
+if __name__ == '__main__':
+    # Configure logging
+    logger = configure_logging()
+
+    # Example log messages
+    logger.debug('This is a debug message')
+    logger.info('This is an info message')
+    logger.warning('This is a warning message')
+    logger.error('This is an error message')
+    logger.critical('This is a critical message')
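
Note (not part of the diff): the new server.py expects clients to stream raw 16 kHz, 16-bit, mono PCM as binary WebSocket messages, and it replies with plain-text transcript and chat-answer messages. The sketch below is a minimal illustration of that protocol under stated assumptions; the file name sample.wav, the port 10000, and the chunking and timeout values are made up for the example and are not defined anywhere in this change.

# client_sketch.py: illustration only, not included in this diff.
# Assumes the server is listening on ws://localhost:10000 and that
# sample.wav is a 16 kHz, 16-bit, mono PCM WAV file.
import asyncio
import wave

import websockets


async def stream_file(path="sample.wav", uri="ws://localhost:10000"):
    async with websockets.connect(uri) as ws:
        with wave.open(path, "rb") as wav_file:
            chunk_frames = 16000  # about one second of audio per message
            while True:
                frames = wav_file.readframes(chunk_frames)
                if not frames:
                    break
                # Binary frames are appended to the server-side buffer;
                # the server transcribes once roughly chunk_length_seconds accumulate.
                await ws.send(frames)
                await asyncio.sleep(1)
        # Print whatever transcripts/answers the server has pushed back.
        try:
            while True:
                print(await asyncio.wait_for(ws.recv(), timeout=10))
        except asyncio.TimeoutError:
            pass


if __name__ == "__main__":
    asyncio.run(stream_file())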