From a5251b87df19c84b7e7d611f09e772b1206417ee Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Tue, 2 Dec 2025 16:23:29 -0500 Subject: [PATCH 1/5] [WIP] Add vllm Dynamo support --- .../ml/inference/vllm_inference.py | 38 ++++++++++++++++--- .../ml/inference/vllm_tests_requirements.txt | 1 + 2 files changed, 33 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/ml/inference/vllm_inference.py b/sdks/python/apache_beam/ml/inference/vllm_inference.py index bdbee9e51fd5..6462152629c3 100644 --- a/sdks/python/apache_beam/ml/inference/vllm_inference.py +++ b/sdks/python/apache_beam/ml/inference/vllm_inference.py @@ -109,13 +109,20 @@ def getAsyncVLLMClient(port) -> AsyncOpenAI: class _VLLMModelServer(): - def __init__(self, model_name: str, vllm_server_kwargs: dict[str, str]): + def __init__( + self, + model_name: str, + vllm_server_kwargs: dict[str, str], + vllm_executable: Optional[str] = None): self._model_name = model_name self._vllm_server_kwargs = vllm_server_kwargs self._server_started = False self._server_process = None self._server_port: int = -1 self._server_process_lock = threading.RLock() + self._vllm_executable = 'vllm.entrypoints.openai.api_server' + if vllm_executable is not None: + self._vllm_executable = vllm_executable self.start_server() @@ -125,7 +132,7 @@ def start_server(self, retries=3): server_cmd = [ sys.executable, '-m', - 'vllm.entrypoints.openai.api_server', + self._vllm_executable, '--model', self._model_name, '--port', @@ -175,7 +182,8 @@ class VLLMCompletionsModelHandler(ModelHandler[str, def __init__( self, model_name: str, - vllm_server_kwargs: Optional[dict[str, str]] = None): + vllm_server_kwargs: Optional[dict[str, str]] = None, + use_dynamo: bool = False): """Implementation of the ModelHandler interface for vLLM using text as input. @@ -194,13 +202,22 @@ def __init__( `{'echo': 'true'}` to prepend new messages with the previous message. For a list of possible kwargs, see https://docs.vllm.ai/en/latest/serving/openai_compatible_server.html#extra-parameters-for-completions-api + use_dynamo: Whether to use Nvidia Dynamo as the underlying vLLM engine. + Requires installing dynamo in your runtime environment + (`pip install ai-dynamo[vllm]`) """ self._model_name = model_name self._vllm_server_kwargs: dict[str, str] = vllm_server_kwargs or {} self._env_vars = {} + self._vllm_executable = None + if use_dynamo: + self._vllm_executable = 'dynamo.vllm' def load_model(self) -> _VLLMModelServer: - return _VLLMModelServer(self._model_name, self._vllm_server_kwargs) + return _VLLMModelServer( + self._model_name, + self._vllm_server_kwargs, + self._vllm_executable) async def _async_run_inference( self, @@ -253,7 +270,8 @@ def __init__( self, model_name: str, chat_template_path: Optional[str] = None, - vllm_server_kwargs: Optional[dict[str, str]] = None): + vllm_server_kwargs: Optional[dict[str, str]] = None, + use_dynamo: bool = False): """ Implementation of the ModelHandler interface for vLLM using previous messages as input. @@ -277,12 +295,17 @@ def __init__( `{'echo': 'true'}` to prepend new messages with the previous message. For a list of possible kwargs, see https://docs.vllm.ai/en/latest/serving/openai_compatible_server.html#extra-parameters-for-chat-api + use_dynamo: Whether to use Nvidia Dynamo as the underlying vLLM engine. + Requires installing dynamo in your runtime environment + (`pip install ai-dynamo[vllm]`) """ self._model_name = model_name self._vllm_server_kwargs: dict[str, str] = vllm_server_kwargs or {} self._env_vars = {} self._chat_template_path = chat_template_path self._chat_file = f'template-{uuid.uuid4().hex}.jinja' + if use_dynamo: + self._vllm_executable = 'dynamo.vllm' def load_model(self) -> _VLLMModelServer: chat_template_contents = '' @@ -295,7 +318,10 @@ def load_model(self) -> _VLLMModelServer: f.write(chat_template_contents) self._vllm_server_kwargs['chat_template'] = local_chat_template_path - return _VLLMModelServer(self._model_name, self._vllm_server_kwargs) + return _VLLMModelServer( + self._model_name, + self._vllm_server_kwargs, + self._vllm_executable) async def _async_run_inference( self, diff --git a/sdks/python/apache_beam/ml/inference/vllm_tests_requirements.txt b/sdks/python/apache_beam/ml/inference/vllm_tests_requirements.txt index 0f8c6a6a673d..cd969734230f 100644 --- a/sdks/python/apache_beam/ml/inference/vllm_tests_requirements.txt +++ b/sdks/python/apache_beam/ml/inference/vllm_tests_requirements.txt @@ -20,3 +20,4 @@ pillow>=8.0.0 transformers>=4.18.0 google-cloud-monitoring>=2.27.0 openai>=1.52.2 +ai-dynamo[vllm]>=0.1.1 From ef2d83eb5ec5c4369c5cd14fc0f6912be954e7ea Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Thu, 5 Feb 2026 10:21:13 -0500 Subject: [PATCH 2/5] Add integration test (currently failing) --- .../inference/vllm_text_completion.py | 29 +++++++++++++++++-- .../test_resources/vllm.dockerfile.old | 8 ++--- sdks/python/setup.py | 2 +- .../python/test-suites/dataflow/common.gradle | 4 +++ 4 files changed, 36 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/examples/inference/vllm_text_completion.py b/sdks/python/apache_beam/examples/inference/vllm_text_completion.py index 2708c0f3d1a1..64c60dbddcb1 100644 --- a/sdks/python/apache_beam/examples/inference/vllm_text_completion.py +++ b/sdks/python/apache_beam/examples/inference/vllm_text_completion.py @@ -112,6 +112,20 @@ def parse_known_args(argv): required=False, default=None, help='Chat template to use for chat example.') + parser.add_argument( + '--vllm_server_kwargs', + dest='vllm_server_kwargs', + type=str, + required=False, + default=None, + help='VLLM server kwargs in format key1=value1,key2=value2') + parser.add_argument( + '--use_dynamo', + dest='use_dynamo', + type=bool, + required=False, + default=False, + help='Whether to use dynamo') return parser.parse_known_args(argv) @@ -132,13 +146,24 @@ def run( pipeline_options = PipelineOptions(pipeline_args) pipeline_options.view_as(SetupOptions).save_main_session = save_main_session - model_handler = VLLMCompletionsModelHandler(model_name=known_args.model) + vllm_server_kwargs = {} + if known_args.vllm_server_kwargs: + for kv in known_args.vllm_server_kwargs.split(','): + k, v = kv.split('=') + vllm_server_kwargs[k] = v + + model_handler = VLLMCompletionsModelHandler( + model_name=known_args.model, + vllm_server_kwargs=vllm_server_kwargs, + use_dynamo=known_args.use_dynamo) input_examples = COMPLETION_EXAMPLES if known_args.chat: model_handler = VLLMChatModelHandler( model_name=known_args.model, - chat_template_path=known_args.chat_template) + chat_template_path=known_args.chat_template, + vllm_server_kwargs=vllm_server_kwargs, + use_dynamo=known_args.use_dynamo) input_examples = CHAT_EXAMPLES pipeline = test_pipeline diff --git a/sdks/python/apache_beam/ml/inference/test_resources/vllm.dockerfile.old b/sdks/python/apache_beam/ml/inference/test_resources/vllm.dockerfile.old index b9c99e49e02f..869d87290982 100644 --- a/sdks/python/apache_beam/ml/inference/test_resources/vllm.dockerfile.old +++ b/sdks/python/apache_beam/ml/inference/test_resources/vllm.dockerfile.old @@ -34,14 +34,14 @@ RUN python3 --version RUN apt-get install -y curl RUN curl -sS https://bootstrap.pypa.io/get-pip.py | python3.12 && pip install --upgrade pip -RUN pip install --no-cache-dir -vvv apache-beam[gcp]==2.58.1 -RUN pip install openai vllm +RUN pip install --no-cache-dir -vvv apache-beam[gcp]==2.71.0 +RUN pip install --no-cache-dir openai vllm ai-dynamo[vllm] RUN apt install libcairo2-dev pkg-config python3-dev -y -RUN pip install pycairo +RUN pip install --no-cache-dir pycairo # Copy the Apache Beam worker dependencies from the Beam Python 3.12 SDK image. -COPY --from=apache/beam_python3.12_sdk:2.58.1 /opt/apache/beam /opt/apache/beam +COPY --from=apache/beam_python3.12_sdk:2.71.0 /opt/apache/beam /opt/apache/beam # Set the entrypoint to Apache Beam SDK worker launcher. ENTRYPOINT [ "/opt/apache/beam/boot" ] \ No newline at end of file diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 96ba006a259c..83de4550b75f 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -609,7 +609,7 @@ def get_portability_package_data(): 'xgboost': ['xgboost>=1.6.0,<2.1.3', 'datatable==1.0.0'], 'tensorflow-hub': ['tensorflow-hub>=0.14.0,<0.16.0'], 'milvus': milvus_dependency, - 'vllm': ['openai==1.107.1', 'vllm==0.10.1.1', 'triton==3.3.1'] + 'vllm': ['openai==1.107.1', 'vllm==0.10.1.1', 'triton==3.3.1', 'ai-dynamo[vllm]==0.1.1'] }, zip_safe=False, # PyPI package information. diff --git a/sdks/python/test-suites/dataflow/common.gradle b/sdks/python/test-suites/dataflow/common.gradle index 6a0777bd667c..919515f15662 100644 --- a/sdks/python/test-suites/dataflow/common.gradle +++ b/sdks/python/test-suites/dataflow/common.gradle @@ -478,6 +478,10 @@ def vllmTests = tasks.create("vllmTests") { executable 'sh' args '-c', ". ${envdir}/bin/activate && pip install openai && python -m apache_beam.examples.inference.vllm_text_completion $cmdArgs --chat true --chat_template 'gs://apache-beam-ml/additional_files/sample_chat_template.jinja' --experiment='worker_accelerator=type:nvidia-tesla-t4;count:1;install-nvidia-driver:5xx'" } + exec { + executable 'sh' + args '-c', ". ${envdir}/bin/activate && pip install openai && python -m apache_beam.examples.inference.vllm_text_completion $cmdArgs --use_dynamo=T --experiment='worker_accelerator=type:nvidia-tesla-t4;count:1;install-nvidia-driver:5xx'" + } } } From 85fd5b305e28fd18132f7ad72e796bbb69aa6f9e Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Wed, 11 Feb 2026 10:33:57 -0500 Subject: [PATCH 3/5] Update commands to start server --- .../ml/inference/vllm_inference.py | 49 ++++++++++++++----- 1 file changed, 36 insertions(+), 13 deletions(-) diff --git a/sdks/python/apache_beam/ml/inference/vllm_inference.py b/sdks/python/apache_beam/ml/inference/vllm_inference.py index 6462152629c3..f35357095a2f 100644 --- a/sdks/python/apache_beam/ml/inference/vllm_inference.py +++ b/sdks/python/apache_beam/ml/inference/vllm_inference.py @@ -113,16 +113,15 @@ def __init__( self, model_name: str, vllm_server_kwargs: dict[str, str], - vllm_executable: Optional[str] = None): + use_dynamo: bool = False): self._model_name = model_name self._vllm_server_kwargs = vllm_server_kwargs self._server_started = False self._server_process = None + self._dynamo_process = None self._server_port: int = -1 self._server_process_lock = threading.RLock() - self._vllm_executable = 'vllm.entrypoints.openai.api_server' - if vllm_executable is not None: - self._vllm_executable = vllm_executable + self._use_dynamo = use_dynamo self.start_server() @@ -132,7 +131,7 @@ def start_server(self, retries=3): server_cmd = [ sys.executable, '-m', - self._vllm_executable, + 'vllm.entrypoints.openai.api_server', '--model', self._model_name, '--port', @@ -143,8 +142,35 @@ def start_server(self, retries=3): # Only add values for commands with value part. if v is not None: server_cmd.append(v) + if self._use_dynamo: + # Dynamo requires a different server command + server_cmd = [ + sys.executable, + '-m', + 'dynamo.frontend', + '--http-port', + '{{PORT}}', + ] self._server_process, self._server_port = start_process(server_cmd) + if self._use_dynamo: + # we need to independently start the openai server and run dynamo. + # We started the server above, so running dynamo is left. + # See https://github.com/ai-dynamo/dynamo?tab=readme-ov-file#run-dynamo + server_cmd = [ + sys.executable, + '-m', + 'dynamo.vllm', + '--model', + self._model_name, + ] + for k, v in self._vllm_server_kwargs.items(): + server_cmd.append(f'--{k}') + # Only add values for commands with value part. + if v is not None: + server_cmd.append(v) + self._dynamo_process, _ = start_process(server_cmd) + self.check_connectivity(retries) def get_server_port(self) -> int: @@ -154,7 +180,7 @@ def get_server_port(self) -> int: def check_connectivity(self, retries=3): with getVLLMClient(self._server_port) as client: - while self._server_process.poll() is None: + while self._server_process.poll() is None and (self._dynamo_process is None or self._dynamo_process.poll() is None): try: models = client.models.list().data logging.info('models: %s' % models) @@ -209,15 +235,13 @@ def __init__( self._model_name = model_name self._vllm_server_kwargs: dict[str, str] = vllm_server_kwargs or {} self._env_vars = {} - self._vllm_executable = None - if use_dynamo: - self._vllm_executable = 'dynamo.vllm' + self._use_dynamo = use_dynamo def load_model(self) -> _VLLMModelServer: return _VLLMModelServer( self._model_name, self._vllm_server_kwargs, - self._vllm_executable) + self._use_dynamo) async def _async_run_inference( self, @@ -304,8 +328,7 @@ def __init__( self._env_vars = {} self._chat_template_path = chat_template_path self._chat_file = f'template-{uuid.uuid4().hex}.jinja' - if use_dynamo: - self._vllm_executable = 'dynamo.vllm' + self._use_dynamo = use_dynamo def load_model(self) -> _VLLMModelServer: chat_template_contents = '' @@ -321,7 +344,7 @@ def load_model(self) -> _VLLMModelServer: return _VLLMModelServer( self._model_name, self._vllm_server_kwargs, - self._vllm_executable) + self._use_dynamo) async def _async_run_inference( self, From cbdad3a0ebea9d492adf12401e51b1917d25adfb Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Wed, 11 Feb 2026 11:24:01 -0500 Subject: [PATCH 4/5] Update commands to start server --- sdks/python/apache_beam/ml/inference/vllm_inference.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdks/python/apache_beam/ml/inference/vllm_inference.py b/sdks/python/apache_beam/ml/inference/vllm_inference.py index f35357095a2f..83bf1749fd3d 100644 --- a/sdks/python/apache_beam/ml/inference/vllm_inference.py +++ b/sdks/python/apache_beam/ml/inference/vllm_inference.py @@ -163,6 +163,8 @@ def start_server(self, retries=3): 'dynamo.vllm', '--model', self._model_name, + '--kv-events-config', + '{"enable_kv_cache_events": false}', ] for k, v in self._vllm_server_kwargs.items(): server_cmd.append(f'--{k}') From bdc80df1755c23454c03f2125886192ab0349d9f Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Wed, 18 Feb 2026 09:31:17 -0500 Subject: [PATCH 5/5] wip --- .../python/test-suites/dataflow/common.gradle | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/sdks/python/test-suites/dataflow/common.gradle b/sdks/python/test-suites/dataflow/common.gradle index 919515f15662..32c95f168785 100644 --- a/sdks/python/test-suites/dataflow/common.gradle +++ b/sdks/python/test-suites/dataflow/common.gradle @@ -455,7 +455,7 @@ def vllmTests = tasks.create("vllmTests") { def testOpts = basicPytestOpts def argMap = [ "runner": "DataflowRunner", - "machine_type":"n1-standard-4", + "machine_type":"g2-standard-4", // TODO(https://github.com/apache/beam/issues/22651): Build docker image for VLLM tests during Run time. // This would also enable to use wheel "--sdk_location" as other tasks, and eliminate distTarBall dependency // declaration for this project. @@ -464,23 +464,23 @@ def vllmTests = tasks.create("vllmTests") { "sdk_location": files(configurations.distTarBall.files).singleFile, "project": "apache-beam-testing", "region": "us-central1", - "model": "facebook/opt-125m", + "model": "Qwen/Qwen3-0.6B", "output": "gs://apache-beam-ml/outputs/vllm_predictions.txt", "disk_size_gb": 75 ] def cmdArgs = mapToArgString(argMap) // Exec one version with and one version without the chat option + // exec { + // executable 'sh' + // args '-c', ". ${envdir}/bin/activate && pip install openai && python -m apache_beam.examples.inference.vllm_text_completion $cmdArgs --experiment='worker_accelerator=type:nvidia-tesla-t4;count:1;install-nvidia-driver:5xx'" + // } + // exec { + // executable 'sh' + // args '-c', ". ${envdir}/bin/activate && pip install openai && python -m apache_beam.examples.inference.vllm_text_completion $cmdArgs --chat true --chat_template 'gs://apache-beam-ml/additional_files/sample_chat_template.jinja' --experiment='worker_accelerator=type:nvidia-tesla-t4;count:1;install-nvidia-driver:5xx'" + // } exec { executable 'sh' - args '-c', ". ${envdir}/bin/activate && pip install openai && python -m apache_beam.examples.inference.vllm_text_completion $cmdArgs --experiment='worker_accelerator=type:nvidia-tesla-t4;count:1;install-nvidia-driver:5xx'" - } - exec { - executable 'sh' - args '-c', ". ${envdir}/bin/activate && pip install openai && python -m apache_beam.examples.inference.vllm_text_completion $cmdArgs --chat true --chat_template 'gs://apache-beam-ml/additional_files/sample_chat_template.jinja' --experiment='worker_accelerator=type:nvidia-tesla-t4;count:1;install-nvidia-driver:5xx'" - } - exec { - executable 'sh' - args '-c', ". ${envdir}/bin/activate && pip install openai && python -m apache_beam.examples.inference.vllm_text_completion $cmdArgs --use_dynamo=T --experiment='worker_accelerator=type:nvidia-tesla-t4;count:1;install-nvidia-driver:5xx'" + args '-c', ". ${envdir}/bin/activate && pip install openai && python -m apache_beam.examples.inference.vllm_text_completion $cmdArgs --use_dynamo=T --experiment='worker_accelerator=type:nvidia-l4;count:1;install-nvidia-driver:5xx'" } } }