Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions sdks/python/apache_beam/examples/inference/vllm_text_completion.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,20 @@ def parse_known_args(argv):
required=False,
default=None,
help='Chat template to use for chat example.')
parser.add_argument(
'--vllm_server_kwargs',
dest='vllm_server_kwargs',
type=str,
required=False,
default=None,
help='VLLM server kwargs in format key1=value1,key2=value2')
parser.add_argument(
'--use_dynamo',
dest='use_dynamo',
type=bool,
required=False,
default=False,
help='Whether to use dynamo')
return parser.parse_known_args(argv)


Expand All @@ -132,13 +146,24 @@ def run(
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

model_handler = VLLMCompletionsModelHandler(model_name=known_args.model)
vllm_server_kwargs = {}
if known_args.vllm_server_kwargs:
for kv in known_args.vllm_server_kwargs.split(','):
k, v = kv.split('=')
vllm_server_kwargs[k] = v

model_handler = VLLMCompletionsModelHandler(
model_name=known_args.model,
vllm_server_kwargs=vllm_server_kwargs,
use_dynamo=known_args.use_dynamo)
input_examples = COMPLETION_EXAMPLES

if known_args.chat:
model_handler = VLLMChatModelHandler(
model_name=known_args.model,
chat_template_path=known_args.chat_template)
chat_template_path=known_args.chat_template,
vllm_server_kwargs=vllm_server_kwargs,
use_dynamo=known_args.use_dynamo)
input_examples = CHAT_EXAMPLES

pipeline = test_pipeline
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ RUN python3 --version
RUN apt-get install -y curl
RUN curl -sS https://bootstrap.pypa.io/get-pip.py | python3.12 && pip install --upgrade pip

RUN pip install --no-cache-dir -vvv apache-beam[gcp]==2.58.1
RUN pip install openai vllm
RUN pip install --no-cache-dir -vvv apache-beam[gcp]==2.71.0
RUN pip install --no-cache-dir openai vllm ai-dynamo[vllm]

RUN apt install libcairo2-dev pkg-config python3-dev -y
RUN pip install pycairo
RUN pip install --no-cache-dir pycairo

# Copy the Apache Beam worker dependencies from the Beam Python 3.12 SDK image.
COPY --from=apache/beam_python3.12_sdk:2.58.1 /opt/apache/beam /opt/apache/beam
COPY --from=apache/beam_python3.12_sdk:2.71.0 /opt/apache/beam /opt/apache/beam

# Set the entrypoint to Apache Beam SDK worker launcher.
ENTRYPOINT [ "/opt/apache/beam/boot" ]
63 changes: 57 additions & 6 deletions sdks/python/apache_beam/ml/inference/vllm_inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,19 @@ def getAsyncVLLMClient(port) -> AsyncOpenAI:


class _VLLMModelServer():
def __init__(
    self,
    model_name: str,
    vllm_server_kwargs: dict[str, str],
    use_dynamo: bool = False):
  """Manages a locally spawned vLLM server subprocess.

  Args:
    model_name: model identifier passed to the server's --model flag.
    vllm_server_kwargs: extra CLI flags forwarded to the server command,
      as a flag-name -> value mapping (value may be None for flags that
      take no argument).
    use_dynamo: when True, front the engine with NVIDIA Dynamo instead of
      the plain vLLM OpenAI-compatible server. Requires ai-dynamo[vllm]
      to be installed in the runtime environment — TODO confirm the
      worker image provides it.
  """
  self._model_name = model_name
  self._vllm_server_kwargs = vllm_server_kwargs
  self._server_started = False
  self._server_process = None
  # Separate handle for the dynamo engine worker process; stays None
  # when use_dynamo is False.
  self._dynamo_process = None
  self._server_port: int = -1
  # Guards server (re)starts across concurrent inference threads.
  self._server_process_lock = threading.RLock()
  self._use_dynamo = use_dynamo

  # Start eagerly so the handler's load_model() returns a ready server.
  self.start_server()

Expand All @@ -136,8 +142,37 @@ def start_server(self, retries=3):
# Only add values for commands with value part.
if v is not None:
server_cmd.append(v)
if self._use_dynamo:
# Dynamo requires a different server command
server_cmd = [
sys.executable,
'-m',
'dynamo.frontend',
'--http-port',
'{{PORT}}',
]
self._server_process, self._server_port = start_process(server_cmd)

if self._use_dynamo:
# we need to independently start the openai server and run dynamo.
# We started the server above, so running dynamo is left.
# See https://github.com/ai-dynamo/dynamo?tab=readme-ov-file#run-dynamo
server_cmd = [
sys.executable,
'-m',
'dynamo.vllm',
'--model',
self._model_name,
'--kv-events-config',
'{"enable_kv_cache_events": false}',
]
for k, v in self._vllm_server_kwargs.items():
server_cmd.append(f'--{k}')
# Only add values for commands with value part.
if v is not None:
server_cmd.append(v)
self._dynamo_process, _ = start_process(server_cmd)

self.check_connectivity(retries)

def get_server_port(self) -> int:
Expand All @@ -147,7 +182,7 @@ def get_server_port(self) -> int:

def check_connectivity(self, retries=3):
with getVLLMClient(self._server_port) as client:
while self._server_process.poll() is None:
while self._server_process.poll() is None and (self._dynamo_process is None or self._dynamo_process.poll() is None):
try:
models = client.models.list().data
logging.info('models: %s' % models)
Expand Down Expand Up @@ -175,7 +210,8 @@ class VLLMCompletionsModelHandler(ModelHandler[str,
def __init__(
self,
model_name: str,
vllm_server_kwargs: Optional[dict[str, str]] = None):
vllm_server_kwargs: Optional[dict[str, str]] = None,
use_dynamo: bool = False):
"""Implementation of the ModelHandler interface for vLLM using text as
input.

Expand All @@ -194,13 +230,20 @@ def __init__(
`{'echo': 'true'}` to prepend new messages with the previous message.
For a list of possible kwargs, see
https://docs.vllm.ai/en/latest/serving/openai_compatible_server.html#extra-parameters-for-completions-api
use_dynamo: Whether to use Nvidia Dynamo as the underlying vLLM engine.
Requires installing dynamo in your runtime environment
(`pip install ai-dynamo[vllm]`)
"""
self._model_name = model_name
self._vllm_server_kwargs: dict[str, str] = vllm_server_kwargs or {}
self._env_vars = {}
self._use_dynamo = use_dynamo

def load_model(self) -> _VLLMModelServer:
  """Spawns and returns a local vLLM server configured for this handler."""
  return _VLLMModelServer(
      self._model_name, self._vllm_server_kwargs, self._use_dynamo)

async def _async_run_inference(
self,
Expand Down Expand Up @@ -253,7 +296,8 @@ def __init__(
self,
model_name: str,
chat_template_path: Optional[str] = None,
vllm_server_kwargs: Optional[dict[str, str]] = None):
vllm_server_kwargs: Optional[dict[str, str]] = None,
use_dynamo: bool = False):
""" Implementation of the ModelHandler interface for vLLM using previous
messages as input.

Expand All @@ -277,12 +321,16 @@ def __init__(
`{'echo': 'true'}` to prepend new messages with the previous message.
For a list of possible kwargs, see
https://docs.vllm.ai/en/latest/serving/openai_compatible_server.html#extra-parameters-for-chat-api
use_dynamo: Whether to use Nvidia Dynamo as the underlying vLLM engine.
Requires installing dynamo in your runtime environment
(`pip install ai-dynamo[vllm]`)
"""
self._model_name = model_name
self._vllm_server_kwargs: dict[str, str] = vllm_server_kwargs or {}
self._env_vars = {}
self._chat_template_path = chat_template_path
self._chat_file = f'template-{uuid.uuid4().hex}.jinja'
self._use_dynamo = use_dynamo

def load_model(self) -> _VLLMModelServer:
chat_template_contents = ''
Expand All @@ -295,7 +343,10 @@ def load_model(self) -> _VLLMModelServer:
f.write(chat_template_contents)
self._vllm_server_kwargs['chat_template'] = local_chat_template_path

return _VLLMModelServer(self._model_name, self._vllm_server_kwargs)
return _VLLMModelServer(
self._model_name,
self._vllm_server_kwargs,
self._use_dynamo)

async def _async_run_inference(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ pillow>=8.0.0
transformers>=4.18.0
google-cloud-monitoring>=2.27.0
openai>=1.52.2
ai-dynamo[vllm]>=0.1.1
2 changes: 1 addition & 1 deletion sdks/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ def get_portability_package_data():
'xgboost': ['xgboost>=1.6.0,<2.1.3', 'datatable==1.0.0'],
'tensorflow-hub': ['tensorflow-hub>=0.14.0,<0.16.0'],
'milvus': milvus_dependency,
'vllm': ['openai==1.107.1', 'vllm==0.10.1.1', 'triton==3.3.1']
'vllm': ['openai==1.107.1', 'vllm==0.10.1.1', 'triton==3.3.1', 'ai-dynamo[vllm]==0.1.1']
},
zip_safe=False,
# PyPI package information.
Expand Down
18 changes: 11 additions & 7 deletions sdks/python/test-suites/dataflow/common.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ def vllmTests = tasks.create("vllmTests") {
def testOpts = basicPytestOpts
def argMap = [
"runner": "DataflowRunner",
"machine_type":"n1-standard-4",
"machine_type":"g2-standard-4",
// TODO(https://github.com/apache/beam/issues/22651): Build docker image for VLLM tests during Run time.
// This would also enable to use wheel "--sdk_location" as other tasks, and eliminate distTarBall dependency
// declaration for this project.
Expand All @@ -464,19 +464,23 @@ def vllmTests = tasks.create("vllmTests") {
"sdk_location": files(configurations.distTarBall.files).singleFile,
"project": "apache-beam-testing",
"region": "us-central1",
"model": "facebook/opt-125m",
"model": "Qwen/Qwen3-0.6B",
"output": "gs://apache-beam-ml/outputs/vllm_predictions.txt",
"disk_size_gb": 75
]
def cmdArgs = mapToArgString(argMap)
// Exec one version with and one version without the chat option
// exec {
// executable 'sh'
// args '-c', ". ${envdir}/bin/activate && pip install openai && python -m apache_beam.examples.inference.vllm_text_completion $cmdArgs --experiment='worker_accelerator=type:nvidia-tesla-t4;count:1;install-nvidia-driver:5xx'"
// }
// exec {
// executable 'sh'
// args '-c', ". ${envdir}/bin/activate && pip install openai && python -m apache_beam.examples.inference.vllm_text_completion $cmdArgs --chat true --chat_template 'gs://apache-beam-ml/additional_files/sample_chat_template.jinja' --experiment='worker_accelerator=type:nvidia-tesla-t4;count:1;install-nvidia-driver:5xx'"
// }
exec {
executable 'sh'
args '-c', ". ${envdir}/bin/activate && pip install openai && python -m apache_beam.examples.inference.vllm_text_completion $cmdArgs --experiment='worker_accelerator=type:nvidia-tesla-t4;count:1;install-nvidia-driver:5xx'"
}
exec {
executable 'sh'
args '-c', ". ${envdir}/bin/activate && pip install openai && python -m apache_beam.examples.inference.vllm_text_completion $cmdArgs --chat true --chat_template 'gs://apache-beam-ml/additional_files/sample_chat_template.jinja' --experiment='worker_accelerator=type:nvidia-tesla-t4;count:1;install-nvidia-driver:5xx'"
args '-c', ". ${envdir}/bin/activate && pip install openai && python -m apache_beam.examples.inference.vllm_text_completion $cmdArgs --use_dynamo=T --experiment='worker_accelerator=type:nvidia-l4;count:1;install-nvidia-driver:5xx'"
}
}
}
Expand Down
Loading