diff --git a/.gitignore b/.gitignore index 885d5c06..cc500d06 100644 --- a/.gitignore +++ b/.gitignore @@ -8,13 +8,6 @@ test.py # C extensions *.so -src/twinkle_client/dataloader -src/twinkle_client/dataset -src/twinkle_client/model -src/twinkle_client/processor -src/twinkle_client/reward -src/twinkle_client/sampler - # Distribution / packaging .Python build/ diff --git a/client_tools/client_generator.py b/client_tools/client_generator.py index c52d03d9..41f0419e 100644 --- a/client_tools/client_generator.py +++ b/client_tools/client_generator.py @@ -3,6 +3,17 @@ from pathlib import Path from typing import Dict, List, Tuple, Set +AUTO_GEN_WARNING = """# ============================================================================ +# WARNING: AUTO-GENERATED FILE - DO NOT MODIFY MANUALLY! +# ============================================================================ +# This file is automatically generated by client_tools/client_generator.py +# Any manual changes will be overwritten when the generator runs again. +# +# To update this file: +# 1. Modify the source files in src/twinkle/ +# 2. 
Run: python client_tools/client_generator.py +# ============================================================================ +""" def generate_processors(): """Generate client wrappers for all classes with @remote_function methods.""" @@ -326,7 +337,8 @@ def __next__(self): init_params = "self, **kwargs" kwargs_dict = "kwargs" - class_template = f'''{chr(10).join(import_lines)} + class_template = f'''{AUTO_GEN_WARNING} +{chr(10).join(import_lines)} class {class_name}({inheritance}): """Client wrapper for {class_name} that calls server HTTP endpoints.""" @@ -414,7 +426,8 @@ def write_init_files(module_files: Dict, src_client_path: Path) -> None: for source_filename, classes in sorted(source_files.items()) for class_name, _, _, _ in classes ] - init_file.write_text('\n'.join(sorted(init_lines)) + '\n', encoding='utf-8') + init_content = AUTO_GEN_WARNING + '\n'.join(sorted(init_lines)) + '\n' + init_file.write_text(init_content, encoding='utf-8') module_files = scan_modules(src_twinkle_path, module_mapping) write_client_files(module_files, src_client_path, processor_type_mapping) @@ -432,7 +445,7 @@ def generate_models(): client_module_path = src_client_path / 'model' client_module_path.mkdir(parents=True, exist_ok=True) - model_code = '''from typing import Any, Optional, Union, Type, Dict, Literal, List + model_code = AUTO_GEN_WARNING + '''from typing import Any, Optional, Union, Type, Dict, Literal, List import uuid from twinkle_client.http import TWINKLE_SERVER_URL from twinkle_client.http import http_post, heartbeat_manager @@ -693,7 +706,7 @@ def upload_to_hub(self, checkpoint_dir: str, hub_model_id: str, hub_token: Optio # Create/overwrite __init__.py init_file = client_module_path / '__init__.py' - init_content = "from .multi_lora_transformers import MultiLoraTransformersModel\n" + init_content = AUTO_GEN_WARNING + "from .multi_lora_transformers import MultiLoraTransformersModel\n" print(f"Writing {init_file}...") with open(init_file, 'w', encoding='utf-8') as 
f: f.write(init_content) @@ -710,7 +723,7 @@ def generate_samplers(): client_module_path = src_client_path / 'sampler' client_module_path.mkdir(parents=True, exist_ok=True) - sampler_code = '''from typing import Any, Optional, List, Dict, Union + sampler_code = AUTO_GEN_WARNING + '''from typing import Any, Optional, List, Dict, Union import uuid from twinkle_client.http import TWINKLE_SERVER_URL from twinkle_client.http import http_post, heartbeat_manager @@ -837,7 +850,7 @@ def set_template(self, template_cls: str, adapter_name: str = '', **kwargs): # Create/overwrite __init__.py init_file = client_module_path / '__init__.py' - init_content = "from .vllm_sampler import VLLMSampler\n" + init_content = AUTO_GEN_WARNING + "from .vllm_sampler import VLLMSampler\n" print(f"Writing {init_file}...") with open(init_file, 'w', encoding='utf-8') as f: f.write(init_content) diff --git a/cookbook/legacy/client/tinker/megatron/lora.py b/cookbook/client/tinker/megatron/lora.py similarity index 59% rename from cookbook/legacy/client/tinker/megatron/lora.py rename to cookbook/client/tinker/megatron/lora.py index 50d105fb..91815bb7 100644 --- a/cookbook/legacy/client/tinker/megatron/lora.py +++ b/cookbook/client/tinker/megatron/lora.py @@ -1,19 +1,33 @@ -#%% +# Tinker-Compatible Client - Megatron LoRA Training & Sampling Example +# +# This script demonstrates end-to-end LoRA fine-tuning and inference using the +# Tinker-compatible client API with a Megatron backend. +# It covers: connecting to the server, preparing data manually with tokenizers, +# running a training loop, saving checkpoints, and sampling from the model. +# The server must be running first (see server.py and server_config.yaml). + from twinkle_client import init_tinker_compat_client + +# Step 1: Initialize the Tinker-compatible client to communicate with the server. 
service_client = init_tinker_compat_client(base_url='http://localhost:8000') +# Step 2: List models available on the server to verify the connection print("Available models:") for item in service_client.get_server_capabilities().supported_models: print("- " + item.model_name) -#%% +# Step 3: Create a REST client for querying training runs and checkpoints. +# This is useful for inspecting previous training sessions or resuming training. rest_client = service_client.create_rest_client() future = rest_client.list_training_runs(limit=50) response = future.result() + +# You can resume from a twinkle:// path. Example: # resume_path = "twinkle://20260131_170251-Qwen_Qwen2_5-0_5B-Instruct-7275126c/weights/pig-latin-lora-epoch-1" resume_path = "" + print(f"Found {len(response.training_runs)} training runs") for tr in response.training_runs: print(tr.model_dump_json(indent=2)) @@ -21,9 +35,11 @@ chpts = rest_client.list_checkpoints(tr.training_run_id).result() for chpt in chpts.checkpoints: print(" " + chpt.model_dump_json(indent=2)) - # resume_path = chpt.tinker_path # Just get the last one for demo purposes + # Uncomment the line below to resume from the last checkpoint: + # resume_path = chpt.tinker_path -#%% +# Step 4: Create or resume a training client. +# If resume_path is set, it restores both model weights and optimizer state. base_model = "Qwen/Qwen2.5-0.5B-Instruct" if not resume_path: training_client = service_client.create_lora_training_client( @@ -32,8 +48,10 @@ else: training_client = service_client.create_training_client_from_state_with_optimizer(path=resume_path) -#%% -# Create some training examples +# Step 5: Prepare training data manually +# +# This example teaches the model to translate English into Pig Latin. +# Each example has an "input" (English phrase) and "output" (Pig Latin). 
examples = [ {"input": "banana split", "output": "anana-bay plit-say"}, {"input": "quantum physics", "output": "uantum-qay ysics-phay"}, @@ -44,82 +62,97 @@ {"input": "coding wizard", "output": "oding-cay izard-way"}, ] -# Convert examples into the format expected by the training client from tinker import types from modelscope import AutoTokenizer -# Get the tokenizer from the training client -# tokenizer = training_client.get_tokenizer() # NOTE: network call huggingface + +# Load the tokenizer locally (avoids a network call to HuggingFace) tokenizer = AutoTokenizer.from_pretrained(base_model, trust_remote_code=True) def process_example(example: dict, tokenizer) -> types.Datum: - # Format the input with Input/Output template - # For most real use cases, you'll want to use a renderer / chat template, - # (see later docs) but here, we'll keep it simple. + """Convert a raw example dict into a Datum suitable for the training API. + + The Datum contains: + - model_input: the token IDs fed into the LLM + - loss_fn_inputs: target tokens and per-token weights (0 = ignore, 1 = train) + """ + # Build a simple prompt template prompt = f"English: {example['input']}\nPig Latin:" + # Tokenize the prompt; weights=0 means the loss ignores these tokens prompt_tokens = tokenizer.encode(prompt, add_special_tokens=True) prompt_weights = [0] * len(prompt_tokens) - # Add a space before the output string, and finish with double newline + + # Tokenize the completion; weights=1 means the loss is computed on these tokens completion_tokens = tokenizer.encode(f" {example['output']}\n\n", add_special_tokens=False) completion_weights = [1] * len(completion_tokens) + # Concatenate prompt + completion tokens = prompt_tokens + completion_tokens weights = prompt_weights + completion_weights + # Shift by one: input is tokens[:-1], target is tokens[1:] (next-token prediction) input_tokens = tokens[:-1] - target_tokens = tokens[1:] # We're predicting the next token, so targets need to be shifted. 
+ target_tokens = tokens[1:] weights = weights[1:] - # A datum is a single training example for the loss function. - # It has model_input, which is the input sequence that'll be passed into the LLM, - # loss_fn_inputs, which is a dictionary of extra inputs used by the loss function. return types.Datum( model_input=types.ModelInput.from_ints(tokens=input_tokens), loss_fn_inputs=dict(weights=weights, target_tokens=target_tokens) ) +# Process all examples into Datum objects processed_examples = [process_example(ex, tokenizer) for ex in examples] -# Visualize the first example for debugging purposes +# Visualize the first example to verify tokenization and weight alignment datum0 = processed_examples[0] print(f"{'Input':<20} {'Target':<20} {'Weight':<10}") print("-" * 50) for i, (inp, tgt, wgt) in enumerate(zip(datum0.model_input.to_ints(), datum0.loss_fn_inputs['target_tokens'].tolist(), datum0.loss_fn_inputs['weights'].tolist())): print(f"{repr(tokenizer.decode([inp])):<20} {repr(tokenizer.decode([tgt])):<20} {wgt:<10}") -#%% +# Step 6: Run the training loop +# +# For each epoch, iterate over multiple batches: +# - forward_backward: sends data to the server, computes loss & gradients +# - optim_step: updates model weights using Adam optimizer import numpy as np for epoch in range(2): for batch in range(5): - + # Send training data and get back logprobs (asynchronous futures) fwdbwd_future = training_client.forward_backward(processed_examples, "cross_entropy") optim_future = training_client.optim_step(types.AdamParams(learning_rate=1e-4)) - # Wait for the results + # Wait for results from the server fwdbwd_result = fwdbwd_future.result() optim_result = optim_future.result() - # fwdbwd_result contains the logprobs of all the tokens we put in. Now we can compute the weighted - # average log loss per token. 
+ # Compute the weighted average log-loss per token for monitoring print(f"Epoch {epoch}, Batch {batch}: ", end="") logprobs = np.concatenate([output['logprobs'].tolist() for output in fwdbwd_result.loss_fn_outputs]) weights = np.concatenate([example.loss_fn_inputs['weights'].tolist() for example in processed_examples]) print(f"Loss per token: {-np.dot(logprobs, weights) / weights.sum():.4f}") + # Save checkpoint (model weights + optimizer state) after each epoch save_future = training_client.save_state(f"pig-latin-lora-epoch-{epoch}") save_result = save_future.result() print(f"Saved checkpoint for epoch {epoch} to {save_result.path}") -#%% -# First, create a sampling client. We need to transfer weights +# Step 7: Sample from the trained model +# +# Save the current weights and create a sampling client to generate text. sampling_client = training_client.save_weights_and_get_sampling_client(name='pig-latin-model') -# Now, we can sample from the model. +# Prepare a prompt and sampling parameters prompt = types.ModelInput.from_ints(tokenizer.encode("English: coffee break\nPig Latin:")) -params = types.SamplingParams(max_tokens=20, temperature=0.0, stop=["\n"]) # Greedy sampling +params = types.SamplingParams( + max_tokens=20, # Maximum number of tokens to generate + temperature=0.0, # Greedy sampling (deterministic) + stop=["\n"] # Stop at newline +) + +# Generate 8 completions and print the results future = sampling_client.sample(prompt=prompt, sampling_params=params, num_samples=8) result = future.result() print("Responses:") for i, seq in enumerate(result.sequences): print(f"{i}: {repr(tokenizer.decode(seq.tokens))}") -# %% diff --git a/cookbook/client/tinker/megatron/server.py b/cookbook/client/tinker/megatron/server.py new file mode 100644 index 00000000..c625aa99 --- /dev/null +++ b/cookbook/client/tinker/megatron/server.py @@ -0,0 +1,21 @@ +# Twinkle Server Launcher - Tinker-Compatible Megatron Backend +# +# This script starts the Twinkle server with 
Tinker-compatible API support +# using the Megatron model backend. +# It reads the server_config.yaml in the same directory for all +# configuration (model, deployment settings, etc.). +# Run this script BEFORE running the client training script (lora.py). + +import os + +# Enable Ray debug mode for verbose logging during development +os.environ['RAY_DEBUG'] = '1' + +from twinkle.server import launch_server + +# Resolve the path to server_config.yaml relative to this script's location +file_dir = os.path.abspath(os.path.dirname(__file__)) +config_path = os.path.join(file_dir, 'server_config.yaml') + +# Launch the Twinkle server — this call blocks until the server is shut down +launch_server(config_path=config_path) \ No newline at end of file diff --git a/cookbook/client/tinker/megatron/server_config.yaml b/cookbook/client/tinker/megatron/server_config.yaml new file mode 100644 index 00000000..4e8a2c35 --- /dev/null +++ b/cookbook/client/tinker/megatron/server_config.yaml @@ -0,0 +1,58 @@ +# Twinkle Server Configuration - Tinker-Compatible Megatron Backend + +# Server protocol type: "tinker" enables the Tinker-compatible API +server_type: tinker + +# proxy_location: determines where the HTTP proxy runs. +# "EveryNode" means each Ray node runs its own proxy (good for multi-node). +proxy_location: EveryNode + +# HTTP listener settings +http_options: + host: 0.0.0.0 # Listen on all network interfaces + port: 8000 # Port number for the server + +# Applications: each entry defines a service component deployed on the server +applications: + + # 1. TinkerCompatServer - The central API server + # Handles client connections, training run tracking, checkpoint listing. 
+ - name: server + route_prefix: /api/v1 # API endpoint prefix (Tinker-compatible) + import_path: server # Python module to import + args: + + deployments: + - name: TinkerCompatServer + autoscaling_config: + min_replicas: 1 # Minimum number of replicas + max_replicas: 1 # Maximum number of replicas + target_ongoing_requests: 128 # Target concurrent requests per replica + ray_actor_options: + num_cpus: 0.1 # CPU resources allocated to this actor + + # 2. Model Service - Hosts the base model for training (Megatron backend) + # This is the actual model worker that performs forward/backward passes. + - name: models-Qwen2.5-0.5B-Instruct + route_prefix: /api/v1/model/Qwen/Qwen2.5-0.5B-Instruct # REST path for this model + import_path: model + args: + use_megatron: true # Use Megatron-LM backend (not HuggingFace) + model_id: "ms://Qwen/Qwen2.5-0.5B-Instruct" # ModelScope model identifier to load + nproc_per_node: 2 # Number of GPU processes per node + device_group: # Logical device group for this model + name: model + ranks: [0, 1] # GPU rank indices to use + device_type: cuda + device_mesh: # Distributed training mesh configuration + device_type: cuda + mesh: [0, 1] # Device indices in the mesh + mesh_dim_names: ['dp'] # Mesh dimension names: 'dp' = data parallel + deployments: + - name: ModelManagement + autoscaling_config: + min_replicas: 1 + max_replicas: 1 + target_ongoing_requests: 16 + ray_actor_options: + num_cpus: 0.1 diff --git a/cookbook/legacy/client/tinker/transformer/lora.py b/cookbook/client/tinker/transformer/lora.py similarity index 56% rename from cookbook/legacy/client/tinker/transformer/lora.py rename to cookbook/client/tinker/transformer/lora.py index 0f1a096b..f9ded26a 100644 --- a/cookbook/legacy/client/tinker/transformer/lora.py +++ b/cookbook/client/tinker/transformer/lora.py @@ -1,25 +1,44 @@ -#%% +# Tinker-Compatible Client - Transformers LoRA Training Example +# +# This script demonstrates end-to-end LoRA fine-tuning using the Tinker- +# 
compatible client API (an alternative client protocol for the Twinkle server). +# It covers: connecting to the server, preparing data manually with tokenizers, +# running a training loop, saving checkpoints, and publishing to ModelScope. +# The server must be running first (see server.py and server_config.yaml). + +# Step 1: Load environment variables from a .env file (e.g., API tokens) import dotenv dotenv.load_dotenv('.env') import os from twinkle_client import init_tinker_compat_client + +# Step 2: Initialize the Tinker-compatible client to communicate with the server. +# - base_url: the address of the running server +# - api_key: authentication token (loaded from environment variable) service_client = init_tinker_compat_client(base_url='http://localhost:8000', api_key=os.environ.get('MODELSCOPE_SDK_TOKEN')) +# Step 3: List models available on the server to verify the connection print("Available models:") for item in service_client.get_server_capabilities().supported_models: print("- " + item.model_name) -#%% +# Step 4: Create a REST client for querying training runs and checkpoints. +# This is useful for inspecting previous training sessions or resuming training. rest_client = service_client.create_rest_client() future = rest_client.list_training_runs(limit=50) response = future.result() -# Support resume from twinkle path or model id + +# You can resume from either: +# 1. A twinkle path: "twinkle://...//weights/" +# 2. 
A model id on hub: "/" +# Example: # resume_path = "twinkle://20260131_170251-Qwen_Qwen2_5-0_5B-Instruct-7275126c/weights/pig-latin-lora-epoch-1" # resume_path = "AlexEz/20260205_163645-Qwen_Qwen2_5-7B-Instruct-385d5c17_pig-latin-lora-epoch-1" resume_path = "" + print(f"Found {len(response.training_runs)} training runs") for tr in response.training_runs: print(tr.model_dump_json(indent=2)) @@ -27,9 +46,11 @@ chpts = rest_client.list_checkpoints(tr.training_run_id).result() for chpt in chpts.checkpoints: print(" " + chpt.model_dump_json(indent=2)) - # resume_path = chpt.tinker_path # Just get the last one for demo purposes + # Uncomment the line below to resume from the last checkpoint: + # resume_path = chpt.tinker_path -#%% +# Step 5: Create or resume a training client. +# If resume_path is set, it restores both model weights and optimizer state. base_model = "Qwen/Qwen2.5-0.5B-Instruct" if not resume_path: training_client = service_client.create_lora_training_client( @@ -39,8 +60,10 @@ print("Resuming from " + resume_path) training_client = service_client.create_training_client_from_state_with_optimizer(path=resume_path) -#%% -# Create some training examples +# Step 6: Prepare training data manually +# +# This example teaches the model to translate English into Pig Latin. +# Each example has an "input" (English phrase) and "output" (Pig Latin). 
examples = [ {"input": "banana split", "output": "anana-bay plit-say"}, {"input": "quantum physics", "output": "uantum-qay ysics-phay"}, @@ -51,74 +74,83 @@ {"input": "coding wizard", "output": "oding-cay izard-way"}, ] -# Convert examples into the format expected by the training client from tinker import types from modelscope import AutoTokenizer -# Get the tokenizer from the training client -# tokenizer = training_client.get_tokenizer() # NOTE: network call huggingface + +# Load the tokenizer locally (avoids a network call to HuggingFace) tokenizer = AutoTokenizer.from_pretrained(base_model, trust_remote_code=True) def process_example(example: dict, tokenizer) -> types.Datum: - # Format the input with Input/Output template - # For most real use cases, you'll want to use a renderer / chat template, - # (see later docs) but here, we'll keep it simple. + """Convert a raw example dict into a Datum suitable for the training API. + + The Datum contains: + - model_input: the token IDs fed into the LLM + - loss_fn_inputs: target tokens and per-token weights (0 = ignore, 1 = train) + """ + # Build a simple prompt template prompt = f"English: {example['input']}\nPig Latin:" + # Tokenize the prompt; weights=0 means the loss ignores these tokens prompt_tokens = tokenizer.encode(prompt, add_special_tokens=True) prompt_weights = [0] * len(prompt_tokens) - # Add a space before the output string, and finish with double newline + + # Tokenize the completion; weights=1 means the loss is computed on these tokens completion_tokens = tokenizer.encode(f" {example['output']}\n\n", add_special_tokens=False) completion_weights = [1] * len(completion_tokens) + # Concatenate prompt + completion tokens = prompt_tokens + completion_tokens weights = prompt_weights + completion_weights + # Shift by one: input is tokens[:-1], target is tokens[1:] (next-token prediction) input_tokens = tokens[:-1] - target_tokens = tokens[1:] # We're predicting the next token, so targets need to be shifted. 
+ target_tokens = tokens[1:] weights = weights[1:] - # A datum is a single training example for the loss function. - # It has model_input, which is the input sequence that'll be passed into the LLM, - # loss_fn_inputs, which is a dictionary of extra inputs used by the loss function. return types.Datum( model_input=types.ModelInput.from_ints(tokens=input_tokens), loss_fn_inputs=dict(weights=weights, target_tokens=target_tokens) ) +# Process all examples into Datum objects processed_examples = [process_example(ex, tokenizer) for ex in examples] -# Visualize the first example for debugging purposes +# Visualize the first example to verify tokenization and weight alignment datum0 = processed_examples[0] print(f"{'Input':<20} {'Target':<20} {'Weight':<10}") print("-" * 50) for i, (inp, tgt, wgt) in enumerate(zip(datum0.model_input.to_ints(), datum0.loss_fn_inputs['target_tokens'].tolist(), datum0.loss_fn_inputs['weights'].tolist())): print(f"{repr(tokenizer.decode([inp])):<20} {repr(tokenizer.decode([tgt])):<20} {wgt:<10}") -#%% +# Step 7: Run the training loop +# +# For each epoch, iterate over multiple batches: +# - forward_backward: sends data to the server, computes loss & gradients +# - optim_step: updates model weights using Adam optimizer import numpy as np for epoch in range(2): for batch in range(5): - + # Send training data and get back logprobs (asynchronous futures) fwdbwd_future = training_client.forward_backward(processed_examples, "cross_entropy") optim_future = training_client.optim_step(types.AdamParams(learning_rate=1e-4)) - # Wait for the results + # Wait for results from the server fwdbwd_result = fwdbwd_future.result() optim_result = optim_future.result() - # fwdbwd_result contains the logprobs of all the tokens we put in. Now we can compute the weighted - # average log loss per token. 
+ # Compute the weighted average log-loss per token for monitoring print(f"Epoch {epoch}, Batch {batch}: ", end="") logprobs = np.concatenate([output['logprobs'].tolist() for output in fwdbwd_result.loss_fn_outputs]) weights = np.concatenate([example.loss_fn_inputs['weights'].tolist() for example in processed_examples]) print(f"Loss per token: {-np.dot(logprobs, weights) / weights.sum():.4f}") - # Save the model and optimizer state + # Save checkpoint (model weights + optimizer state) after each epoch save_future = training_client.save_state(f"pig-latin-lora-epoch-{epoch}") save_result = save_future.result() print(f"Saved checkpoint for epoch {epoch} to {save_result.path}") -# NOTE: Need to set your modelscope token as api_key when initializing the service client -# model name is {run_id}_{checkpoint_name} -# rest_client.publish_checkpoint_from_tinker_path(save_result.path).result() -# print("Published checkpoint") +# Step 8: Publish the final checkpoint to ModelScope Hub. +# NOTE: Requires a valid ModelScope token set as api_key when initializing the client. +# The published model name will be: {run_id}_{checkpoint_name} +rest_client.publish_checkpoint_from_tinker_path(save_result.path).result() +print("Published checkpoint") diff --git a/cookbook/client/tinker/transformer/sample.py b/cookbook/client/tinker/transformer/sample.py new file mode 100644 index 00000000..c98c0bc4 --- /dev/null +++ b/cookbook/client/tinker/transformer/sample.py @@ -0,0 +1,43 @@ +# Tinker-Compatible Client - Sampling / Inference Example +# +# This script demonstrates how to use a previously trained LoRA checkpoint +# for text generation (sampling) via the Tinker-compatible client API. +# The server must be running first (see server.py and server_config.yaml). 
+ +from tinker import types +from twinkle_client import init_tinker_compat_client +from modelscope import AutoTokenizer + +# Step 1: Define the base model and connect to the server +base_model = "Qwen/Qwen2.5-0.5B-Instruct" +service_client = init_tinker_compat_client(base_url='http://localhost:8000', api_key="tml-EMPTY_TOKEN") + +# Step 2: Create a sampling client by loading weights from a saved checkpoint. +# The model_path is a twinkle:// URI pointing to a previously saved LoRA checkpoint. +# The server will load the base model and apply the LoRA adapter weights. +sampling_client = service_client.create_sampling_client( + model_path="twinkle://20260130_133245-Qwen_Qwen2_5-0_5B-Instruct-ffebd239/weights/pig-latin-lora-epoch-1", + base_model=base_model) + +# Step 3: Load the tokenizer locally to encode the prompt and decode the results +print(f"Using model {base_model}") +tokenizer = AutoTokenizer.from_pretrained(base_model, trust_remote_code=True) + +# Step 4: Prepare the prompt and sampling parameters +prompt = types.ModelInput.from_ints(tokenizer.encode("English: coffee break\nPig Latin:")) +params = types.SamplingParams( + max_tokens=20, # Maximum number of tokens to generate + temperature=0.0, # Greedy sampling (deterministic, always pick the top token) + stop=["\n"] # Stop generation when a newline character is produced +) + +# Step 5: Send the sampling request to the server. +# num_samples=8 generates 8 independent completions for the same prompt. 
+print("Sampling...") +future = sampling_client.sample(prompt=prompt, sampling_params=params, num_samples=8) +result = future.result() + +# Step 6: Decode and print the generated responses +print("Responses:") +for i, seq in enumerate(result.sequences): + print(f"{i}: {repr(tokenizer.decode(seq.tokens))}") diff --git a/cookbook/legacy/client/tinker/transformer/self_congnition.py b/cookbook/client/tinker/transformer/self_congnition.py similarity index 55% rename from cookbook/legacy/client/tinker/transformer/self_congnition.py rename to cookbook/client/tinker/transformer/self_congnition.py index e4112628..b3a6c5e6 100644 --- a/cookbook/legacy/client/tinker/transformer/self_congnition.py +++ b/cookbook/client/tinker/transformer/self_congnition.py @@ -1,3 +1,12 @@ +# Tinker-Compatible Client - Self-Cognition Training & Evaluation Example +# +# This script demonstrates two workflows using the Tinker-compatible client: +# 1. train(): Fine-tune a model on a self-cognition dataset so it learns +# a custom identity (name, author). +# 2. eval(): Load a trained checkpoint and sample from it to verify +# that the model has learned the custom identity. +# The server must be running first (see server.py and server_config.yaml). 
+ import numpy as np from tqdm import tqdm from tinker import types @@ -8,53 +17,86 @@ from twinkle.server.tinker.common import input_feature_to_datum from modelscope import AutoTokenizer +# The base model to fine-tune / evaluate base_model = "Qwen/Qwen2.5-0.5B-Instruct" + def train(): - # process data + # Step 1: Prepare the dataset + + # Load the self-cognition dataset from ModelScope (first 500 examples) dataset = Dataset(dataset_meta=DatasetMeta('ms://swift/self-cognition', data_slice=range(500))) + + # Apply the chat template matching the base model (max 256 tokens per sample) dataset.set_template('Template', model_id=f'ms://{base_model}', max_length=256) + + # Replace placeholder names with custom model/author identity dataset.map(SelfCognitionProcessor('twinkle模型', 'twinkle团队'), load_from_cache_file=False) + + # Tokenize and encode the dataset into model-ready input features dataset.encode(batched=True, load_from_cache_file=False) + + # Wrap the dataset into a DataLoader that yields batches of size 8 dataloader = DataLoader(dataset=dataset, batch_size=8) - # init service client + # Step 2: Initialize the training client + + # Connect to the Twinkle server running locally service_client = init_tinker_compat_client(base_url='http://localhost:8000') + + # Create a LoRA training client for the base model (rank=16 for the LoRA adapter) training_client = service_client.create_lora_training_client( base_model=base_model, rank=16 ) + # Step 3: Run the training loop + for epoch in range(3): print(f"Epoch {epoch}") for step, batch in tqdm(enumerate(dataloader)): + # Convert each InputFeature into a Datum for the Tinker API input_datum = [input_feature_to_datum(input_feature) for input_feature in batch] + + # Send data to server: forward + backward pass (computes gradients) fwdbwd_future = training_client.forward_backward(input_datum, "cross_entropy") + + # Optimizer step: update model weights with Adam optim_future = 
training_client.optim_step(types.AdamParams(learning_rate=1e-4)) - # Wait for the results + # Wait for both operations to complete fwdbwd_result = fwdbwd_future.result() optim_result = optim_future.result() - # fwdbwd_result contains the logprobs of all the tokens we put in. Now we can compute the weighted + # Compute weighted average log-loss per token for monitoring logprobs = np.concatenate([output['logprobs'].tolist() for output in fwdbwd_result.loss_fn_outputs]) weights = np.concatenate([example.loss_fn_inputs['weights'].tolist() for example in input_datum]) print(f"Loss per token: {-np.dot(logprobs, weights) / weights.sum():.4f}") + # Save a checkpoint after each epoch save_future = training_client.save_state(f"twinkle-lora-{epoch}") save_result = save_future.result() print(f"Saved checkpoint to {save_result.path}") + def eval(): + # Step 1: Load the trained LoRA checkpoint for inference + + # Path to a previously saved LoRA checkpoint (twinkle:// URI) weight_path = "twinkle://20260207_110850-Qwen_Qwen2_5-0_5B-Instruct-ce7e819f/weights/twinkle-lora-2" + # Connect to the server and create a sampling client with the trained weights service_client = init_tinker_compat_client(base_url='http://localhost:8000') sampling_client = service_client.create_sampling_client( model_path=weight_path, base_model=base_model) - + + # Load the tokenizer for encoding the prompt and decoding the output tokenizer = AutoTokenizer.from_pretrained(base_model, trust_remote_code=True) + # Step 2: Prepare the chat prompt + + # Build a multi-turn conversation to test the model's self-cognition inputs = [ { 'role': 'system', @@ -65,22 +107,34 @@ def eval(): 'content': 'what is your name?' } ] + + # Apply the model's chat template to format the conversation input_ids = tokenizer.apply_chat_template( inputs, tokenize=True, - add_generation_prompt=True # usually needed for chat models + add_generation_prompt=True # Adds the assistant prompt prefix ) - # Now, we can sample from the model. 
+ + # Step 3: Generate responses + prompt = types.ModelInput.from_ints(input_ids) - params = types.SamplingParams(max_tokens=50, temperature=0.2, stop=["\n"]) + params = types.SamplingParams( + max_tokens=50, # Maximum tokens to generate + temperature=0.2, # Low temperature for more focused responses + stop=["\n"] # Stop at newline + ) + # Sample 8 independent completions print("Sampling...") future = sampling_client.sample(prompt=prompt, sampling_params=params, num_samples=8) result = future.result() + + # Decode and print each response print("Responses:") for i, seq in enumerate(result.sequences): print(f"{i}: {repr(tokenizer.decode(seq.tokens))}") + if __name__ == "__main__": - # train() - eval() + # train() # Uncomment to run training + eval() # Run evaluation / inference diff --git a/cookbook/client/tinker/transformer/server.py b/cookbook/client/tinker/transformer/server.py new file mode 100644 index 00000000..401065cf --- /dev/null +++ b/cookbook/client/tinker/transformer/server.py @@ -0,0 +1,20 @@ +# Twinkle Server Launcher - Tinker-Compatible Transformers Backend +# +# This script starts the Twinkle server with Tinker-compatible API support. +# It reads the server_config.yaml in the same directory for all +# configuration (model, sampler, deployment settings, etc.). +# Run this script BEFORE running any client scripts (lora.py, sample.py, etc.). 
+ +import os + +# Enable Ray debug mode for verbose logging during development +os.environ['RAY_DEBUG'] = '1' + +from twinkle.server import launch_server + +# Resolve the path to server_config.yaml relative to this script's location +file_dir = os.path.abspath(os.path.dirname(__file__)) +config_path = os.path.join(file_dir, 'server_config.yaml') + +# Launch the Twinkle server — this call blocks until the server is shut down +launch_server(config_path=config_path) \ No newline at end of file diff --git a/cookbook/client/tinker/transformer/server_config.yaml b/cookbook/client/tinker/transformer/server_config.yaml new file mode 100644 index 00000000..617f350b --- /dev/null +++ b/cookbook/client/tinker/transformer/server_config.yaml @@ -0,0 +1,95 @@ +# Twinkle Server Configuration - Tinker-Compatible Transformers Backend + +# Server protocol type: "tinker" enables the Tinker-compatible API +server_type: tinker + +# proxy_location: determines where the HTTP proxy runs. +# "EveryNode" means each Ray node runs its own proxy (good for multi-node). +proxy_location: EveryNode + +# HTTP listener settings +http_options: + host: 0.0.0.0 # Listen on all network interfaces + port: 8000 # Port number for the server + +# Applications: each entry defines a service component deployed on the server +applications: + + # 1. TinkerCompatServer - The central API server + # Handles client connections, training run tracking, checkpoint listing. + - name: server + route_prefix: /api/v1 # API endpoint prefix (Tinker-compatible) + import_path: server # Python module to import + args: + + deployments: + - name: TinkerCompatServer + autoscaling_config: + min_replicas: 1 # Minimum number of replicas + max_replicas: 1 # Maximum number of replicas + target_ongoing_requests: 128 # Target concurrent requests per replica + ray_actor_options: + num_cpus: 0.1 # CPU resources allocated to this actor + + # 2. Model Service (commented out) - Would host the base model for training. 
+ # Uncomment and configure if you need a training model worker. + # - name: models-Qwen2.5-0.5B-Instruct + # route_prefix: /api/v1/model/Qwen/Qwen2.5-0.5B-Instruct + # import_path: model + # args: + # use_megatron: false # Use HuggingFace Transformers backend + # model_id: "ms://Qwen/Qwen2.5-0.5B-Instruct" # ModelScope model identifier + # nproc_per_node: 2 # Number of GPU processes per node + # device_group: + # name: model + # ranks: [0, 1] # GPU rank indices + # device_type: cuda + # device_mesh: + # device_type: cuda + # mesh: [0, 1] + # mesh_dim_names: ['dp'] # 'dp' = data parallel + # queue_config: + # rps_limit: 100 # Max requests per second + # tps_limit: 10000 # Max tokens per second + # adapter_config: + # per_token_adapter_limit: 30 # Max concurrent LoRA adapters + # adapter_timeout: 1800 # Seconds before idle adapter unload + # deployments: + # - name: ModelManagement + # autoscaling_config: + # min_replicas: 1 + # max_replicas: 1 + # target_ongoing_requests: 16 + # ray_actor_options: + # num_cpus: 0.1 + + # 3. Sampler Service - Runs inference / sampling using vLLM engine + # Used for generating text from the model (e.g., evaluating LoRA results). 
+ - name: sampler-Qwen2.5-0.5B-Instruct + route_prefix: /api/v1/sampler/Qwen/Qwen2.5-0.5B-Instruct + import_path: sampler + args: + model_id: "ms://Qwen/Qwen2.5-0.5B-Instruct" # ModelScope model identifier + nproc_per_node: 1 # Number of GPU processes per node + sampler_type: vllm # Inference engine: 'vllm' (fast) or 'torch' (TorchSampler) + engine_args: # vLLM engine-specific settings + max_model_len: 4096 # Maximum sequence length the engine supports + gpu_memory_utilization: 0.5 # Fraction of GPU memory to use (0.0-1.0) + enable_lora: true # Allow loading LoRA adapters during inference + device_group: # Logical device group for the sampler + name: sampler + ranks: [0] # GPU rank indices to use + device_type: cuda + device_mesh: + device_type: cuda + mesh: [0] + mesh_dim_names: ['dp'] + deployments: + - name: SamplerManagement + autoscaling_config: + min_replicas: 1 + max_replicas: 1 + target_ongoing_requests: 16 + ray_actor_options: + num_cpus: 0.1 + num_gpus: 1 # Sampler needs a full GPU for inference diff --git a/cookbook/client/twinkle/megatron/lora.py b/cookbook/client/twinkle/megatron/lora.py new file mode 100644 index 00000000..18f8994a --- /dev/null +++ b/cookbook/client/twinkle/megatron/lora.py @@ -0,0 +1,123 @@ +# Twinkle Client - Megatron LoRA Training Example +# +# This script demonstrates how to fine-tune a language model using LoRA +# through the Twinkle client with a Megatron backend. +# Key difference from the Transformers version: +# - Loss is computed internally by Megatron (no explicit set_loss call) +# - Optimizer and LR scheduler use Megatron's built-in defaults +# The server must be running first (see server.py and server_config.yaml). 
+ +from peft import LoraConfig + +from twinkle import get_device_placement, get_logger +from twinkle.dataset import DatasetMeta +from twinkle_client.dataloader import DataLoader +from twinkle_client.dataset import Dataset +from twinkle_client.model import MultiLoraTransformersModel +from twinkle_client import init_twinkle_client + +logger = get_logger() + +# Step 1: Initialize the Twinkle client to communicate with the remote server. +# - base_url: the address of the running Twinkle server +# - api_key: your authentication token +client = init_twinkle_client(base_url='http://127.0.0.1:8000', api_key='tml-xxxx') + +# Step 2: Query the server for existing training runs and their checkpoints. +# This is useful for resuming a previous training session. +runs = client.list_training_runs() + +resume_path = None +for run in runs: + logger.info(run.model_dump_json(indent=2)) + # List all saved checkpoints for this training run + checkpoints = client.list_checkpoints(run.training_run_id) + + for checkpoint in checkpoints: + logger.info(checkpoint.model_dump_json(indent=2)) + # Uncomment the line below to resume from a specific checkpoint: + # resume_path = checkpoint.twinkle_path + + +def train(): + # Step 3: Prepare the dataset + + # Load the self-cognition dataset from ModelScope + dataset = Dataset(dataset_meta=DatasetMeta('ms://swift/self-cognition')) + + # Apply a chat template so the data matches the model's expected input format + dataset.set_template('Template', model_id='ms://Qwen/Qwen2.5-7B-Instruct', max_length=512) + + # Replace placeholder names in the dataset with custom model/author names + dataset.map('SelfCognitionProcessor', init_args={'model_name': 'twinkle模型', 'model_author': 'twinkle团队'}) + + # Tokenize and encode the dataset into model-ready input features + dataset.encode(batched=True) + + # Wrap the dataset into a DataLoader that yields batches of size 8 + dataloader = DataLoader(dataset=dataset, batch_size=8) + + # Step 4: Configure the model + + # 
Create a multi-LoRA model pointing to the base model on ModelScope + model = MultiLoraTransformersModel(model_id='ms://Qwen/Qwen2.5-7B-Instruct') + + # Define LoRA configuration: apply low-rank adapters to all linear layers + lora_config = LoraConfig( + target_modules='all-linear' + ) + + # Attach the LoRA adapter named 'default' to the model. + # gradient_accumulation_steps=2 means gradients accumulate over 2 micro-batches. + model.add_adapter_to_model('default', lora_config, gradient_accumulation_steps=2) + + # Set the same chat template used during data preprocessing + model.set_template('Template') + + # Set the input processor (pads sequences on the right side) + model.set_processor('InputProcessor', padding_side='right') + + # NOTE: No set_loss() call here — Megatron computes loss internally. + + # Use Megatron's default optimizer with learning rate 1e-4 + model.set_optimizer('default', lr=1e-4) + + # Use Megatron's default LR scheduler with linear decay over 1000 steps + model.set_lr_scheduler('default', lr_decay_steps=1000, max_lr=1e-4) + + # Step 5: Optionally resume from a previous checkpoint + if resume_path: + logger.info(f'Resuming training from {resume_path}') + model.load(resume_path, load_optimizer=True) + + # Step 6: Run the training loop + logger.info(model.get_train_configs()) + + for step, batch in enumerate(dataloader): + # Forward pass + backward pass (computes gradients) + output = model.forward_backward(inputs=batch) + + # Log the loss every 2 steps (aligned with gradient accumulation) + if step % 2 == 0: + logger.info(f'Current is step {step // 2}, loss: {output}') + + # Clip gradients to prevent exploding gradients (max norm = 1.0) + model.clip_grad_norm(1.0) + + # Perform one optimizer step (update model weights) + model.step() + + # Reset gradients to zero for the next iteration + model.zero_grad() + + # Advance the learning rate scheduler by one step + model.lr_step() + + # Save a checkpoint every 8 steps for fault tolerance + if step > 
0 and step % 8 == 0: + logger.info(f'Saving checkpoint at step {step}') + model.save(f'step-{step}', save_optimizer=True) + + +if __name__ == '__main__': + train() diff --git a/cookbook/client/twinkle/megatron/server.py b/cookbook/client/twinkle/megatron/server.py new file mode 100644 index 00000000..433c6309 --- /dev/null +++ b/cookbook/client/twinkle/megatron/server.py @@ -0,0 +1,20 @@ +# Twinkle Server Launcher - Megatron Backend +# +# This script starts the Twinkle server using Ray Serve with Megatron support. +# It reads the server_config.yaml in the same directory for all +# configuration (model, processor, deployment settings, etc.). +# Run this script BEFORE running the client training script (lora.py). + +import os + +# Enable Ray debug mode for verbose logging during development +os.environ['RAY_DEBUG'] = '1' + +from twinkle.server import launch_server + +# Resolve the path to server_config.yaml relative to this script's location +file_dir = os.path.abspath(os.path.dirname(__file__)) +config_path = os.path.join(file_dir, 'server_config.yaml') + +# Launch the Twinkle server — this call blocks until the server is shut down +launch_server(config_path=config_path) \ No newline at end of file diff --git a/cookbook/client/twinkle/megatron/server_config.yaml b/cookbook/client/twinkle/megatron/server_config.yaml new file mode 100644 index 00000000..fa730098 --- /dev/null +++ b/cookbook/client/twinkle/megatron/server_config.yaml @@ -0,0 +1,83 @@ +# Twinkle Server Configuration - Megatron Backend + +# Server protocol type: "twinkle" for the native Twinkle client protocol +server_type: twinkle + +# proxy_location: determines where the HTTP proxy runs. +# "EveryNode" means each Ray node runs its own proxy (good for multi-node). 
+proxy_location: EveryNode + +# HTTP listener settings +http_options: + host: 0.0.0.0 # Listen on all network interfaces + port: 8000 # Port number for the server + +# Applications: each entry defines a service component deployed on the server +applications: + + # 1. TwinkleServer - The central management server + # Handles client connections, training run tracking, checkpoint listing. + - name: server + route_prefix: /server # API endpoint prefix + import_path: server # Python module to import + args: + + deployments: + - name: TwinkleServer + autoscaling_config: + min_replicas: 1 # Minimum number of replicas + max_replicas: 1 # Maximum number of replicas + target_ongoing_requests: 128 # Target concurrent requests per replica + ray_actor_options: + num_cpus: 0.1 # CPU resources allocated to this actor + + # 2. Model Service - Hosts the base model for training (Megatron backend) + # This is the actual model worker that performs forward/backward passes. + - name: models-Qwen2.5-7B-Instruct + route_prefix: /models/Qwen/Qwen2.5-7B-Instruct # REST path for this model + import_path: model + args: + use_megatron: true # Use Megatron-LM backend (not HuggingFace) + model_id: "ms://Qwen/Qwen2.5-0.5B-Instruct" # ModelScope model identifier to load + nproc_per_node: 2 # Number of GPU processes per node + device_group: # Logical device group for this model + name: model + ranks: [0,1] # GPU rank indices to use + device_type: cuda + device_mesh: # Distributed training mesh configuration + device_type: cuda + mesh: [0,1] # Device indices in the mesh + mesh_dim_names: ['dp'] # Mesh dimension names: 'dp' = data parallel + deployments: + - name: ModelManagement + autoscaling_config: + min_replicas: 1 + max_replicas: 1 + target_ongoing_requests: 16 + ray_actor_options: + num_cpus: 0.1 + + # 3. Processor Service - Handles data preprocessing on CPU + # Runs tokenization, template application, and other CPU-bound tasks. 
+ - name: processor + route_prefix: /processors + import_path: processor + args: + nproc_per_node: 2 # Number of processor workers per node + ncpu_proc_per_node: 2 # Number of CPU processes per node + device_group: + name: model + ranks: 2 # CPU rank index + device_type: CPU + device_mesh: + device_type: CPU + mesh: [0,1] + mesh_dim_names: ['dp'] + deployments: + - name: ProcessorManagement + autoscaling_config: + min_replicas: 1 + max_replicas: 1 + target_ongoing_requests: 128 + ray_actor_options: + num_cpus: 0.1 \ No newline at end of file diff --git a/cookbook/legacy/client/twinkle/transformer/lora.py b/cookbook/client/twinkle/transformer/lora.py similarity index 50% rename from cookbook/legacy/client/twinkle/transformer/lora.py rename to cookbook/client/twinkle/transformer/lora.py index f83a18c7..5e2d292a 100644 --- a/cookbook/legacy/client/twinkle/transformer/lora.py +++ b/cookbook/client/twinkle/transformer/lora.py @@ -1,3 +1,10 @@ +# Twinkle Client - Transformers LoRA Training Example +# +# This script demonstrates how to fine-tune a language model using LoRA +# (Low-Rank Adaptation) through the Twinkle client-server architecture. +# The server must be running first (see server.py and server_config.yaml). + +# Step 1: Load environment variables from a .env file (e.g., API tokens) import dotenv dotenv.load_dotenv('.env') @@ -13,65 +20,113 @@ logger = get_logger() +# Step 2: Initialize the Twinkle client to communicate with the remote server. +# - base_url: the address of the running Twinkle server +# - api_key: authentication token (loaded from environment variable) client = init_twinkle_client( base_url='http://127.0.0.1:8000', api_key=os.environ.get('MODELSCOPE_SDK_TOKEN')) -# List all training runs + +# Step 3: Query the server for existing training runs and their checkpoints. +# This is useful for resuming a previous training session. 
runs = client.list_training_runs() resume_path = None for run in runs: logger.info(run.model_dump_json(indent=2)) - # Get checkpoints for a run + # List all saved checkpoints for this training run checkpoints = client.list_checkpoints(run.training_run_id) for checkpoint in checkpoints: logger.info(checkpoint.model_dump_json(indent=2)) + # Uncomment the line below to resume from a specific checkpoint: # resume_path = checkpoint.twinkle_path def train(): + # Step 4: Prepare the dataset + + # Load the self-cognition dataset from ModelScope dataset = Dataset(dataset_meta=DatasetMeta('ms://swift/self-cognition')) + + # Apply a chat template so the data matches the model's expected input format dataset.set_template( 'Template', model_id='ms://Qwen/Qwen2.5-7B-Instruct', max_length=512) + + # Replace placeholder names in the dataset with custom model/author names dataset.map('SelfCognitionProcessor', init_args={ 'model_name': 'twinkle模型', 'model_author': 'twinkle团队'}) + + # Tokenize and encode the dataset into model-ready input features dataset.encode(batched=True) + + # Wrap the dataset into a DataLoader that yields batches of size 8 dataloader = DataLoader(dataset=dataset, batch_size=8) + # Step 5: Configure the model + + # Create a multi-LoRA Transformers model pointing to the base model on ModelScope model = MultiLoraTransformersModel( model_id='ms://Qwen/Qwen2.5-7B-Instruct') + # Define LoRA configuration: apply low-rank adapters to all linear layers lora_config = LoraConfig( target_modules='all-linear' ) + # Attach the LoRA adapter named 'default' to the model. + # gradient_accumulation_steps=2 means gradients are accumulated over 2 micro-batches + # before an optimizer step, effectively doubling the batch size. 
model.add_adapter_to_model( 'default', lora_config, gradient_accumulation_steps=2) + + # Set the same chat template used during data preprocessing model.set_template('Template') + + # Set the input processor (pads sequences on the right side) model.set_processor('InputProcessor', padding_side='right') + + # Use cross-entropy loss for language modeling model.set_loss('CrossEntropyLoss') + + # Use AdamW optimizer with a learning rate of 1e-4 model.set_optimizer('AdamW', lr=1e-4) + + # Use a linear learning rate scheduler (linearly decays the LR) model.set_lr_scheduler('LinearLR') - # Resume training if resume_path is provided + + # Step 6: Optionally resume from a previous checkpoint if resume_path: logger.info(f'Resuming training from {resume_path}') model.load(resume_path, load_optimizer=True) - # Start training + + # Step 7: Run the training loop logger.info(model.get_train_configs()) + for step, batch in enumerate(dataloader): + # Forward pass + backward pass (computes gradients) output = model.forward_backward(inputs=batch) + + # Log the loss every 2 steps (aligned with gradient accumulation) if step % 2 == 0: logger.info(f'Current is step {step // 2}, loss: {output}') + + # Clip gradients to prevent exploding gradients (max norm = 1.0) model.clip_grad_norm(1.0) + + # Perform one optimizer step (update model weights) model.step() + + # Reset gradients to zero for the next iteration model.zero_grad() + + # Advance the learning rate scheduler by one step model.lr_step() - # Save the model + # Step 8: Save the trained checkpoint twinkle_path = model.save(name=f'step-{step}', save_optimizer=True) logger.info(f"Saved checkpoint: {twinkle_path}") - - # Upload the model to ModelScope + + # Step 9: Upload the checkpoint to ModelScope Hub hub_model_id = 'AlexEz/twinkle-self-cognition' model.upload_to_hub( checkpoint_dir=twinkle_path, diff --git a/cookbook/client/twinkle/transformer/server.py b/cookbook/client/twinkle/transformer/server.py new file mode 100644 index 
00000000..92260007 --- /dev/null +++ b/cookbook/client/twinkle/transformer/server.py @@ -0,0 +1,20 @@ +# Twinkle Server Launcher - Transformers Backend +# +# This script starts the Twinkle server using Ray Serve. +# It reads the server_config.yaml in the same directory for all +# configuration (model, processor, deployment settings, etc.). +# Run this script BEFORE running the client training script (lora.py). + +import os + +# Enable Ray debug mode for verbose logging during development +os.environ['RAY_DEBUG'] = '1' + +from twinkle.server import launch_server + +# Resolve the path to server_config.yaml relative to this script's location +file_dir = os.path.abspath(os.path.dirname(__file__)) +config_path = os.path.join(file_dir, 'server_config.yaml') + +# Launch the Twinkle server — this call blocks until the server is shut down +launch_server(config_path=config_path) \ No newline at end of file diff --git a/cookbook/client/twinkle/transformer/server_config.yaml b/cookbook/client/twinkle/transformer/server_config.yaml new file mode 100644 index 00000000..e65c2e7c --- /dev/null +++ b/cookbook/client/twinkle/transformer/server_config.yaml @@ -0,0 +1,86 @@ +# Twinkle Server Configuration - Transformers Backend + +# Server protocol type: "twinkle" for the native Twinkle client protocol +server_type: twinkle + +# proxy_location: determines where the HTTP proxy runs. +# "EveryNode" means each Ray node runs its own proxy (good for multi-node). +proxy_location: EveryNode + +# HTTP listener settings +http_options: + host: 0.0.0.0 # Listen on all network interfaces + port: 8000 # Port number for the server + +# Applications: each entry defines a service component deployed on the server +applications: + + # 1. TwinkleServer - The central management server + # Handles client connections, training run tracking, checkpoint listing. 
+ - name: server + route_prefix: /server # API endpoint prefix + import_path: server # Python module to import + args: + + deployments: + - name: TwinkleServer + autoscaling_config: + min_replicas: 1 # Minimum number of replicas + max_replicas: 1 # Maximum number of replicas + target_ongoing_requests: 128 # Target concurrent requests per replica + ray_actor_options: + num_cpus: 0.1 # CPU resources allocated to this actor + + # 2. Model Service - Hosts the base model for training + # This is the actual model worker that performs forward/backward passes. + - name: models-Qwen2.5-7B-Instruct + route_prefix: /models/Qwen/Qwen2.5-7B-Instruct # REST path for this model + import_path: model + args: + use_megatron: false # Use HuggingFace Transformers (not Megatron) + model_id: "ms://Qwen/Qwen2.5-0.5B-Instruct" # ModelScope model identifier to load + adapter_config: + per_token_adapter_limit: 30 # Max LoRA adapters that can be active simultaneously + adapter_timeout: 1800 # Seconds before an idle adapter is unloaded + nproc_per_node: 2 # Number of GPU processes per node + device_group: # Logical device group for this model + name: model + ranks: [0,1] # GPU rank indices to use + device_type: cuda + device_mesh: # Distributed training mesh configuration + device_type: cuda + mesh: [0,1] # Device indices in the mesh + mesh_dim_names: ['dp'] # Mesh dimension names: 'dp' = data parallel + deployments: + - name: ModelManagement + autoscaling_config: + min_replicas: 1 + max_replicas: 1 + target_ongoing_requests: 16 + ray_actor_options: + num_cpus: 0.1 + + # 3. Processor Service - Handles data preprocessing on CPU + # Runs tokenization, template application, and other CPU-bound tasks. 
+ - name: processor + route_prefix: /processors + import_path: processor + args: + nproc_per_node: 2 # Number of processor workers per node + ncpu_proc_per_node: 2 # Number of CPU processes per node + device_group: + name: model + ranks: 2 # CPU rank index + device_type: CPU + device_mesh: + device_type: CPU + mesh: [0,1] + mesh_dim_names: ['dp'] + deployments: + - name: ProcessorManagement + autoscaling_config: + min_replicas: 1 + max_replicas: 1 + target_ongoing_requests: 128 + ray_actor_options: + num_cpus: 0.1 \ No newline at end of file diff --git a/cookbook/legacy/client/tinker/megatron/server.py b/cookbook/legacy/client/tinker/megatron/server.py deleted file mode 100644 index 1fd179f1..00000000 --- a/cookbook/legacy/client/tinker/megatron/server.py +++ /dev/null @@ -1,9 +0,0 @@ -import os -os.environ['RAY_DEBUG'] = '1' - -from twinkle.server import launch_server - -file_dir = os.path.abspath(os.path.dirname(__file__)) -config_path = os.path.join(file_dir, 'server_config.yaml') - -launch_server(config_path=config_path) \ No newline at end of file diff --git a/cookbook/legacy/client/tinker/megatron/server_config.yaml b/cookbook/legacy/client/tinker/megatron/server_config.yaml deleted file mode 100644 index 3c2a9265..00000000 --- a/cookbook/legacy/client/tinker/megatron/server_config.yaml +++ /dev/null @@ -1,49 +0,0 @@ -server_type: tinker -proxy_location: EveryNode -http_options: - host: 0.0.0.0 - port: 8000 - -applications: - - name: server - route_prefix: /api/v1 - import_path: server - args: - - deployments: - - name: TinkerCompatServer - autoscaling_config: - min_replicas: 1 - max_replicas: 1 - target_ongoing_requests: 128 - ray_actor_options: - num_cpus: 0.1 - logging_config: - log_level: DEBUG - - - name: models-Qwen2.5-0.5B-Instruct - route_prefix: /api/v1/model/Qwen/Qwen2.5-0.5B-Instruct - import_path: model - args: - use_megatron: true - model_id: "ms://Qwen/Qwen2.5-0.5B-Instruct" - nproc_per_node: 2 - device_group: - name: model - ranks: [0, 1] - 
device_type: cuda - device_mesh: - device_type: cuda - mesh: [0, 1] - mesh_dim_names: ['dp'] - deployments: - - name: ModelManagement - autoscaling_config: - min_replicas: 1 - max_replicas: 1 - target_ongoing_requests: 16 - ray_actor_options: - num_cpus: 0.1 - logging_config: - log_level: DEBUG - diff --git a/cookbook/legacy/client/tinker/transformer/sample.py b/cookbook/legacy/client/tinker/transformer/sample.py deleted file mode 100644 index 2c070ba6..00000000 --- a/cookbook/legacy/client/tinker/transformer/sample.py +++ /dev/null @@ -1,27 +0,0 @@ -#%% -from tinker import types -from twinkle_client import init_tinker_compat_client -from modelscope import AutoTokenizer - -base_model = "Qwen/Qwen2.5-0.5B-Instruct" -service_client = init_tinker_compat_client(base_url='http://localhost:8000', api_key="tml-EMPTY_TOKEN") - -sampling_client = service_client.create_sampling_client( - model_path="twinkle://20260130_133245-Qwen_Qwen2_5-0_5B-Instruct-ffebd239/weights/pig-latin-lora-epoch-1", - base_model=base_model) - - -print(f"Using model {base_model}") -tokenizer = AutoTokenizer.from_pretrained(base_model, trust_remote_code=True) -# First, create a sampling client. We need to transfer weights - -# Now, we can sample from the model. 
-prompt = types.ModelInput.from_ints(tokenizer.encode("English: coffee break\nPig Latin:")) -params = types.SamplingParams(max_tokens=20, temperature=0.0, stop=["\n"]) # Greedy sampling - -print("Sampling...") -future = sampling_client.sample(prompt=prompt, sampling_params=params, num_samples=8) -result = future.result() -print("Responses:") -for i, seq in enumerate(result.sequences): - print(f"{i}: {repr(tokenizer.decode(seq.tokens))}") diff --git a/cookbook/legacy/client/tinker/transformer/server.py b/cookbook/legacy/client/tinker/transformer/server.py deleted file mode 100644 index 1fd179f1..00000000 --- a/cookbook/legacy/client/tinker/transformer/server.py +++ /dev/null @@ -1,9 +0,0 @@ -import os -os.environ['RAY_DEBUG'] = '1' - -from twinkle.server import launch_server - -file_dir = os.path.abspath(os.path.dirname(__file__)) -config_path = os.path.join(file_dir, 'server_config.yaml') - -launch_server(config_path=config_path) \ No newline at end of file diff --git a/cookbook/legacy/client/tinker/transformer/server_config.yaml b/cookbook/legacy/client/tinker/transformer/server_config.yaml deleted file mode 100644 index df56f60a..00000000 --- a/cookbook/legacy/client/tinker/transformer/server_config.yaml +++ /dev/null @@ -1,86 +0,0 @@ -server_type: tinker -proxy_location: EveryNode -http_options: - host: 0.0.0.0 - port: 8000 - -applications: - - name: server - route_prefix: /api/v1 - import_path: server - args: - - deployments: - - name: TinkerCompatServer - autoscaling_config: - min_replicas: 1 - max_replicas: 1 - target_ongoing_requests: 128 - ray_actor_options: - num_cpus: 0.1 - logging_config: - log_level: DEBUG - - - name: models-Qwen2.5-0.5B-Instruct - route_prefix: /api/v1/model/Qwen/Qwen2.5-0.5B-Instruct - import_path: model - args: - use_megatron: false - model_id: "ms://Qwen/Qwen2.5-0.5B-Instruct" - nproc_per_node: 2 - device_group: - name: model - ranks: [0, 1] - device_type: cuda - device_mesh: - device_type: cuda - mesh: [0, 1] - mesh_dim_names: 
['dp'] - queue_config: - rps_limit: 100 - tps_limit: 10000 - adapter_config: - per_token_adapter_limit: 30 - adapter_timeout: 1800 - deployments: - - name: ModelManagement - autoscaling_config: - min_replicas: 1 - max_replicas: 1 - target_ongoing_requests: 16 - ray_actor_options: - num_cpus: 0.1 - logging_config: - log_level: DEBUG - - - name: sampler-Qwen2.5-0.5B-Instruct - route_prefix: /api/v1/sampler/Qwen/Qwen2.5-0.5B-Instruct - import_path: sampler - args: - model_id: "ms://Qwen/Qwen2.5-0.5B-Instruct" - nproc_per_node: 1 - sampler_type: vllm # or 'torch' for TorchSampler - engine_args: - max_model_len: 4096 - gpu_memory_utilization: 0.5 - enable_lora: false - device_group: - name: sampler - ranks: [0] - device_type: cuda - device_mesh: - device_type: cuda - mesh: [0] - mesh_dim_names: ['dp'] - deployments: - - name: SamplerManagement - autoscaling_config: - min_replicas: 1 - max_replicas: 1 - target_ongoing_requests: 16 - ray_actor_options: - num_cpus: 0.1 - num_gpus: 1 - logging_config: - log_level: DEBUG - diff --git a/cookbook/legacy/client/twinkle/megatron/lora.py b/cookbook/legacy/client/twinkle/megatron/lora.py deleted file mode 100644 index d3744d09..00000000 --- a/cookbook/legacy/client/twinkle/megatron/lora.py +++ /dev/null @@ -1,63 +0,0 @@ -from peft import LoraConfig - -from twinkle import get_device_placement, get_logger -from twinkle.dataset import DatasetMeta -from twinkle_client.dataloader import DataLoader -from twinkle_client.dataset import Dataset -from twinkle_client.model import MultiLoraTransformersModel -from twinkle_client import init_twinkle_client - -logger = get_logger() - -client = init_twinkle_client(base_url='http://127.0.0.1:8000', api_key='tml-xxxx') -# List all training runs -runs = client.list_training_runs() - -resume_path = None -for run in runs: - logger.info(run.model_dump_json(indent=2)) - # Get checkpoints for a run - checkpoints = client.list_checkpoints(run.training_run_id) - - for checkpoint in checkpoints: - 
logger.info(checkpoint.model_dump_json(indent=2)) - # resume_path = checkpoint.twinkle_path -def train(): - dataset = Dataset(dataset_meta=DatasetMeta('ms://swift/self-cognition')) - dataset.set_template('Template', model_id='ms://Qwen/Qwen2.5-7B-Instruct', max_length=512) - dataset.map('SelfCognitionProcessor', init_args={'model_name': 'twinkle模型', 'model_author': 'twinkle团队'}) - dataset.encode(batched=True) - dataloader = DataLoader(dataset=dataset, batch_size=8) - - model = MultiLoraTransformersModel(model_id='ms://Qwen/Qwen2.5-7B-Instruct') - - lora_config = LoraConfig( - target_modules='all-linear' - ) - - model.add_adapter_to_model('default', lora_config, gradient_accumulation_steps=2) - model.set_template('Template') - model.set_processor('InputProcessor', padding_side='right') - # not set loss for megatron model - model.set_optimizer('default', lr=1e-4) - model.set_lr_scheduler('default', lr_decay_steps=1000, max_lr=1e-4) - # Resume training if resume_path is provided - if resume_path: - logger.info(f'Resuming training from {resume_path}') - model.load(resume_path, load_optimizer=True) - logger.info(model.get_train_configs()) - for step, batch in enumerate(dataloader): - output = model.forward_backward(inputs=batch) - if step % 2 == 0: - logger.info(f'Current is step {step // 2}, loss: {output}') - model.clip_grad_norm(1.0) - model.step() - model.zero_grad() - model.lr_step() - if step > 0 and step % 8 == 0: - logger.info(f'Saving checkpoint at step {step}') - model.save(f'step-{step}', save_optimizer=True) - - -if __name__ == '__main__': - train() diff --git a/cookbook/legacy/client/twinkle/megatron/server.py b/cookbook/legacy/client/twinkle/megatron/server.py deleted file mode 100644 index 1fd179f1..00000000 --- a/cookbook/legacy/client/twinkle/megatron/server.py +++ /dev/null @@ -1,9 +0,0 @@ -import os -os.environ['RAY_DEBUG'] = '1' - -from twinkle.server import launch_server - -file_dir = os.path.abspath(os.path.dirname(__file__)) -config_path = 
os.path.join(file_dir, 'server_config.yaml') - -launch_server(config_path=config_path) \ No newline at end of file diff --git a/cookbook/legacy/client/twinkle/megatron/server_config.yaml b/cookbook/legacy/client/twinkle/megatron/server_config.yaml deleted file mode 100644 index 004c1e86..00000000 --- a/cookbook/legacy/client/twinkle/megatron/server_config.yaml +++ /dev/null @@ -1,69 +0,0 @@ -server_type: twinkle -proxy_location: EveryNode -http_options: - host: 0.0.0.0 - port: 8000 - -applications: - - name: server - route_prefix: /server - import_path: server - args: - - deployments: - - name: TwinkleServer - autoscaling_config: - min_replicas: 1 - max_replicas: 1 - target_ongoing_requests: 128 - ray_actor_options: - num_cpus: 0.1 - logging_config: - log_level: DEBUG - - - name: models-Qwen2.5-7B-Instruct - route_prefix: /models/Qwen/Qwen2.5-7B-Instruct - import_path: model - args: - use_megatron: true - model_id: "ms://Qwen/Qwen2.5-0.5B-Instruct" - nproc_per_node: 2 - device_group: - name: model - ranks: [0,1] - device_type: cuda - device_mesh: - device_type: cuda - mesh: [0,1] - mesh_dim_names: ['dp'] - deployments: - - name: ModelManagement - autoscaling_config: - min_replicas: 1 - max_replicas: 1 - target_ongoing_requests: 16 - ray_actor_options: - num_cpus: 0.1 - - - name: processor - route_prefix: /processors - import_path: processor - args: - nproc_per_node: 2 - ncpu_proc_per_node: 2 - device_group: - name: model - ranks: 2 - device_type: CPU - device_mesh: - device_type: CPU - mesh: [0,1] - mesh_dim_names: ['dp'] - deployments: - - name: ProcessorManagement - autoscaling_config: - min_replicas: 1 - max_replicas: 1 - target_ongoing_requests: 128 - ray_actor_options: - num_cpus: 0.1 \ No newline at end of file diff --git a/cookbook/legacy/client/twinkle/transformer/grpo_lora.py b/cookbook/legacy/client/twinkle/transformer/grpo_lora.py deleted file mode 100644 index c7194944..00000000 --- a/cookbook/legacy/client/twinkle/transformer/grpo_lora.py +++ /dev/null 
@@ -1,65 +0,0 @@ -import numpy as np -from peft import LoraConfig - -import client -from client.dataloader import DataLoader -from client.dataset import Dataset, DatasetMeta -from client.model import TransformersModel -from client.processor import GRPOLossProcessor -from client.reward import MathReward -from client.sampler import VLLMSampler -from client.weight_syncronizer.vanilla_synchronizer import VanillaSynchronizer - - -client.initialize(mode='remote') - - -def create_dataset(): - dataset = Dataset(DatasetMeta('ms://modelscope/competition_math')) - dataset.set_template('Qwen3Template') - dataset.map('CompetitionMathProcessor') - dataset.check(batched=True) - return dataset - - -def train(): - dataloader = DataLoader( - create_dataset, - remote_group='actor', - device_mesh=actor_device_mesh - ) - - engine_args = { - - } - lora_config = LoraConfig( - target_modules=['q_proj', 'k_proj', 'v_proj', 'o_proj'] - ) - - actor_group = ActorGroup( - engine_args, - remote_group='actor', - lora_config=lora_config, - adapter_name='default', - ) - - ref_model = TransformersModel( - model_id='Qwen/Qwen2.5-7B-Instruct', - remote_group='ref', - device_mesh=ref_device_mesh - ) - ref_model.set_processor('InputProcessor') - ref_model.set_template('Qwen3Template') - reward = MathReward() - - print("Device placement:", get_device_placement()) - - for batch in dataloader: - trajectories = actor_group.sample(batch) - old_logits = actor_group.forward(trajectories) - ref_logits = ref_model.forward(trajectories) - trajectories = reward.calculate(trajectories, batch) - actor_group.forward_backward(batch, trajectories, ref_logits, adapter_name='default') - actor_group.step() - actor_group.zero_grad() - actor_group.lr_step() diff --git a/cookbook/legacy/client/twinkle/transformer/server.py b/cookbook/legacy/client/twinkle/transformer/server.py deleted file mode 100644 index 1fd179f1..00000000 --- a/cookbook/legacy/client/twinkle/transformer/server.py +++ /dev/null @@ -1,9 +0,0 @@ -import os 
-os.environ['RAY_DEBUG'] = '1' - -from twinkle.server import launch_server - -file_dir = os.path.abspath(os.path.dirname(__file__)) -config_path = os.path.join(file_dir, 'server_config.yaml') - -launch_server(config_path=config_path) \ No newline at end of file diff --git a/cookbook/legacy/client/twinkle/transformer/server_config.yaml b/cookbook/legacy/client/twinkle/transformer/server_config.yaml deleted file mode 100644 index 7a82bfe2..00000000 --- a/cookbook/legacy/client/twinkle/transformer/server_config.yaml +++ /dev/null @@ -1,72 +0,0 @@ -server_type: twinkle -proxy_location: EveryNode -http_options: - host: 0.0.0.0 - port: 8000 - -applications: - - name: server - route_prefix: /server - import_path: server - args: - - deployments: - - name: TwinkleServer - autoscaling_config: - min_replicas: 1 - max_replicas: 1 - target_ongoing_requests: 128 - ray_actor_options: - num_cpus: 0.1 - logging_config: - log_level: DEBUG - - - name: models-Qwen2.5-7B-Instruct - route_prefix: /models/Qwen/Qwen2.5-7B-Instruct - import_path: model - args: - use_megatron: false - model_id: "ms://Qwen/Qwen2.5-0.5B-Instruct" - adapter_config: - per_token_adapter_limit: 30 - adapter_timeout: 1800 - nproc_per_node: 2 - device_group: - name: model - ranks: [0,1] - device_type: cuda - device_mesh: - device_type: cuda - mesh: [0,1] - mesh_dim_names: ['dp'] - deployments: - - name: ModelManagement - autoscaling_config: - min_replicas: 1 - max_replicas: 1 - target_ongoing_requests: 16 - ray_actor_options: - num_cpus: 0.1 - - - name: processor - route_prefix: /processors - import_path: processor - args: - nproc_per_node: 2 - ncpu_proc_per_node: 2 - device_group: - name: model - ranks: 2 - device_type: CPU - device_mesh: - device_type: CPU - mesh: [0,1] - mesh_dim_names: ['dp'] - deployments: - - name: ProcessorManagement - autoscaling_config: - min_replicas: 1 - max_replicas: 1 - target_ongoing_requests: 128 - ray_actor_options: - num_cpus: 0.1 \ No newline at end of file diff --git 
a/docs/source/index.rst b/docs/source/index.rst index ef9086a2..e6e9b848 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -11,6 +11,7 @@ Twinkle DOCUMENTATION 使用指引/快速开始.md 使用指引/安装.md + 使用指引/服务端和客户端/index.rst 使用指引/NPU的支持.md .. toctree:: diff --git "a/docs/source/\344\275\277\347\224\250\346\214\207\345\274\225/\346\234\215\345\212\241\347\253\257\345\222\214\345\256\242\346\210\267\347\253\257/Tinker\345\205\274\345\256\271\345\256\242\346\210\267\347\253\257.md" "b/docs/source/\344\275\277\347\224\250\346\214\207\345\274\225/\346\234\215\345\212\241\347\253\257\345\222\214\345\256\242\346\210\267\347\253\257/Tinker\345\205\274\345\256\271\345\256\242\346\210\267\347\253\257.md" new file mode 100644 index 00000000..2e25527c --- /dev/null +++ "b/docs/source/\344\275\277\347\224\250\346\214\207\345\274\225/\346\234\215\345\212\241\347\253\257\345\222\214\345\256\242\346\210\267\347\253\257/Tinker\345\205\274\345\256\271\345\256\242\346\210\267\347\253\257.md" @@ -0,0 +1,254 @@ +# Tinker 兼容客户端 + +Tinker 兼容 Client 适用于已有 Tinker 训练代码的场景。通过 `init_tinker_compat_client` 初始化后,会对 Tinker SDK 进行 patch,使其指向 Twinkle Server,**其余代码可直接复用已有的 Tinker 训练代码**。 + +## 初始化 + +```python +from twinkle_client import init_tinker_compat_client + +# 初始化 Tinker 兼容客户端 +# init_tinker_compat_client 会自动 patch Tinker SDK, +# 使其可以连接到 Twinkle Server 而非 Tinker Server +service_client = init_tinker_compat_client( + base_url='http://localhost:8000', # Server 地址 + api_key='your-api-key' # 认证令牌 +) + +# 验证连接:列出 Server 上可用的模型 +for item in service_client.get_server_capabilities().supported_models: + print("- " + item.model_name) +``` + +### init_tinker_compat_client 做了什么? + +调用 `init_tinker_compat_client` 时,会自动执行以下操作: + +1. **Patch Tinker SDK**:绕过 Tinker 的 `tinker://` 前缀校验,使其可以连接到标准 HTTP 地址 +2. **设置请求头**:注入 `X-Ray-Serve-Request-Id` 和 `Twinkle-Authorization` 等必要的认证头 +3. 
**返回 `ServiceClient`**:返回一个标准的 Tinker `ServiceClient` 对象,后续操作与原生 Tinker 完全一致 + +这意味着在初始化之后,**所有已有的 Tinker 训练代码都可以直接使用**,无需任何修改。 + +## 完整训练示例 + +```python +import os +import numpy as np +import dotenv +dotenv.load_dotenv('.env') + +from tinker import types +from modelscope import AutoTokenizer +from twinkle_client import init_tinker_compat_client + +# Step 1: 初始化客户端(会自动 patch Tinker SDK) +service_client = init_tinker_compat_client( + base_url='http://localhost:8000', + api_key=os.environ.get('MODELSCOPE_SDK_TOKEN') +) + +# Step 2: 查询已有训练运行(可选) +rest_client = service_client.create_rest_client() +response = rest_client.list_training_runs(limit=50).result() +print(f"Found {len(response.training_runs)} training runs") + +# Step 3: 创建训练客户端 +base_model = "Qwen/Qwen2.5-0.5B-Instruct" + +# 新建训练会话 +training_client = service_client.create_lora_training_client( + base_model=base_model +) + +# 或从检查点恢复 +# resume_path = "twinkle://run_id/weights/checkpoint_name" +# training_client = service_client.create_training_client_from_state_with_optimizer(path=resume_path) + +# Step 4: 准备训练数据 +examples = [ + {"input": "banana split", "output": "anana-bay plit-say"}, + {"input": "quantum physics", "output": "uantum-qay ysics-phay"}, + {"input": "donut shop", "output": "onut-day op-shay"}, +] + +tokenizer = AutoTokenizer.from_pretrained(base_model, trust_remote_code=True) + +def process_example(example: dict, tokenizer) -> types.Datum: + """将原始样本转为 Tinker API 所需的 Datum 格式。 + + Datum 包含: + - model_input: 输入 token IDs + - loss_fn_inputs: 目标 token 和逐 token 权重(0=忽略, 1=计算损失) + """ + prompt = f"English: {example['input']}\nPig Latin:" + + # 提示部分:weight=0,不参与损失计算 + prompt_tokens = tokenizer.encode(prompt, add_special_tokens=True) + prompt_weights = [0] * len(prompt_tokens) + + # 补全部分:weight=1,参与损失计算 + completion_tokens = tokenizer.encode(f" {example['output']}\n\n", add_special_tokens=False) + completion_weights = [1] * len(completion_tokens) + + # 拼接并构建 next-token prediction 格式 + tokens = 
prompt_tokens + completion_tokens + weights = prompt_weights + completion_weights + + input_tokens = tokens[:-1] + target_tokens = tokens[1:] + weights = weights[1:] + + return types.Datum( + model_input=types.ModelInput.from_ints(tokens=input_tokens), + loss_fn_inputs=dict(weights=weights, target_tokens=target_tokens) + ) + +processed_examples = [process_example(ex, tokenizer) for ex in examples] + +# Step 5: 训练循环 +for epoch in range(2): + for batch in range(5): + # 发送训练数据到 Server:前向 + 反向传播 + fwdbwd_future = training_client.forward_backward(processed_examples, "cross_entropy") + # 优化器更新 + optim_future = training_client.optim_step(types.AdamParams(learning_rate=1e-4)) + + # 等待结果 + fwdbwd_result = fwdbwd_future.result() + optim_result = optim_future.result() + + # 计算加权平均 log-loss + logprobs = np.concatenate([o['logprobs'].tolist() for o in fwdbwd_result.loss_fn_outputs]) + weights = np.concatenate([e.loss_fn_inputs['weights'].tolist() for e in processed_examples]) + print(f"Epoch {epoch}, Batch {batch}: Loss = {-np.dot(logprobs, weights) / weights.sum():.4f}") + + # 每个 epoch 保存检查点 + save_result = training_client.save_state(f"lora-epoch-{epoch}").result() + print(f"Saved checkpoint to {save_result.path}") +``` + +## 使用 Twinkle 数据集组件 + +Tinker 兼容模式也可以利用 Twinkle 的数据集组件来简化数据准备,而不是手动构建 `Datum`: + +```python +from tqdm import tqdm +from tinker import types +from twinkle_client import init_tinker_compat_client +from twinkle.dataloader import DataLoader +from twinkle.dataset import Dataset, DatasetMeta +from twinkle.preprocessor import SelfCognitionProcessor +from twinkle.server.tinker.common import input_feature_to_datum + +base_model = "Qwen/Qwen2.5-0.5B-Instruct" + +# 使用 Twinkle 的 Dataset 组件加载和预处理数据 +dataset = Dataset(dataset_meta=DatasetMeta('ms://swift/self-cognition', data_slice=range(500))) +dataset.set_template('Template', model_id=f'ms://{base_model}', max_length=256) +dataset.map(SelfCognitionProcessor('twinkle模型', 'twinkle团队'), load_from_cache_file=False) 
+dataset.encode(batched=True, load_from_cache_file=False) +dataloader = DataLoader(dataset=dataset, batch_size=8) + +# 初始化 Tinker 兼容客户端 +service_client = init_tinker_compat_client(base_url='http://localhost:8000') +training_client = service_client.create_lora_training_client(base_model=base_model, rank=16) + +# 训练循环:使用 input_feature_to_datum 转换数据格式 +for epoch in range(3): + for step, batch in tqdm(enumerate(dataloader)): + # 将 Twinkle 的 InputFeature 转换为 Tinker 的 Datum + input_datum = [input_feature_to_datum(input_feature) for input_feature in batch] + + fwdbwd_future = training_client.forward_backward(input_datum, "cross_entropy") + optim_future = training_client.optim_step(types.AdamParams(learning_rate=1e-4)) + + fwdbwd_result = fwdbwd_future.result() + optim_result = optim_future.result() + + training_client.save_state(f"twinkle-lora-{epoch}").result() +``` + +## 推理采样 + +Tinker 兼容模式支持推理采样功能(需要 Server 配置了 Sampler 服务)。 + +### 从训练中采样 + +在训练完成后,可以直接从训练客户端创建采样客户端: + +```python +# 保存当前权重并创建采样客户端 +sampling_client = training_client.save_weights_and_get_sampling_client(name='my-model') + +# 准备推理输入 +prompt = types.ModelInput.from_ints(tokenizer.encode("English: coffee break\nPig Latin:")) +params = types.SamplingParams( + max_tokens=20, # 最大生成 token 数 + temperature=0.0, # 贪心采样(确定性输出) + stop=["\n"] # 遇到换行停止 +) + +# 生成多条补全 +result = sampling_client.sample(prompt=prompt, sampling_params=params, num_samples=8).result() + +for i, seq in enumerate(result.sequences): + print(f"{i}: {tokenizer.decode(seq.tokens)}") +``` + +### 从检查点采样 + +也可以加载已保存的检查点进行推理: + +```python +from tinker import types +from modelscope import AutoTokenizer +from twinkle_client import init_tinker_compat_client + +base_model = "Qwen/Qwen2.5-0.5B-Instruct" + +# 初始化客户端 +service_client = init_tinker_compat_client(base_url='http://localhost:8000') + +# 从已保存的检查点创建采样客户端 +sampling_client = service_client.create_sampling_client( + model_path="twinkle://run_id/weights/checkpoint_name", # 检查点的 twinkle:// 路径 + 
base_model=base_model +) + +# 准备推理输入 +tokenizer = AutoTokenizer.from_pretrained(base_model, trust_remote_code=True) + +# 构建多轮对话输入 +inputs = [ + {'role': 'system', 'content': 'You are a helpful assistant.'}, + {'role': 'user', 'content': 'what is your name?'} +] +input_ids = tokenizer.apply_chat_template(inputs, tokenize=True, add_generation_prompt=True) + +prompt = types.ModelInput.from_ints(input_ids) +params = types.SamplingParams( + max_tokens=50, # 最大生成 token 数 + temperature=0.2, # 低温度,更聚焦的回答 + stop=["\n"] # 遇到换行停止 +) + +# 生成多条补全 +result = sampling_client.sample(prompt=prompt, sampling_params=params, num_samples=8).result() + +for i, seq in enumerate(result.sequences): + print(f"{i}: {tokenizer.decode(seq.tokens)}") +``` + +### 发布检查点到 ModelScope Hub + +训练完成后,可以通过 REST client 将检查点发布到 ModelScope Hub: + +```python +rest_client = service_client.create_rest_client() + +# 从 tinker 路径发布检查点 +# 需要在初始化客户端时设置有效的 ModelScope token 作为 api_key +rest_client.publish_checkpoint_from_tinker_path(save_result.path).result() +print("Published checkpoint to ModelScope Hub") +``` diff --git "a/docs/source/\344\275\277\347\224\250\346\214\207\345\274\225/\346\234\215\345\212\241\347\253\257\345\222\214\345\256\242\346\210\267\347\253\257/Twinkle\345\256\242\346\210\267\347\253\257.md" "b/docs/source/\344\275\277\347\224\250\346\214\207\345\274\225/\346\234\215\345\212\241\347\253\257\345\222\214\345\256\242\346\210\267\347\253\257/Twinkle\345\256\242\346\210\267\347\253\257.md" new file mode 100644 index 00000000..4d5734b3 --- /dev/null +++ "b/docs/source/\344\275\277\347\224\250\346\214\207\345\274\225/\346\234\215\345\212\241\347\253\257\345\222\214\345\256\242\346\210\267\347\253\257/Twinkle\345\256\242\346\210\267\347\253\257.md" @@ -0,0 +1,174 @@ +# Twinkle 客户端 + +Twinkle Client 是原生客户端,设计理念是:**将 `from twinkle import` 改为 `from twinkle_client import`,即可将本地训练代码迁移为远端调用,原有训练逻辑无需改动**。 + +## 初始化 + +```python +from twinkle_client import init_twinkle_client + +# 初始化客户端,连接到 Twinkle Server 
+client = init_twinkle_client( + base_url='http://127.0.0.1:8000', # Server 地址 + api_key='your-api-key' # 认证令牌(可通过环境变量 TWINKLE_SERVER_TOKEN 设置) +) +``` + +初始化完成后,`client` 对象(`TwinkleClient`)提供以下管理功能: + +```python +# 健康检查 +client.health_check() + +# 列出当前用户的训练运行 +runs = client.list_training_runs(limit=20) + +# 获取特定训练运行详情 +run = client.get_training_run(run_id='xxx') + +# 列出检查点 +checkpoints = client.list_checkpoints(run_id='xxx') + +# 获取检查点路径(用于恢复训练) +path = client.get_checkpoint_path(run_id='xxx', checkpoint_id='yyy') + +# 获取最新检查点路径 +latest_path = client.get_latest_checkpoint_path(run_id='xxx') +``` + +## 从本地代码迁移到远端 + +迁移非常简单,只需将 import 路径从 `twinkle` 替换为 `twinkle_client`: + +```python +# 本地训练代码(原始) +from twinkle.dataloader import DataLoader +from twinkle.dataset import Dataset +from twinkle.model import MultiLoraTransformersModel + +# 远端训练代码(迁移后) +from twinkle_client.dataloader import DataLoader +from twinkle_client.dataset import Dataset +from twinkle_client.model import MultiLoraTransformersModel +``` + +训练循环、数据处理等逻辑完全不需要修改。 + +## 完整训练示例(Transformers 后端) + +```python +import os +import dotenv +dotenv.load_dotenv('.env') + +from peft import LoraConfig +from twinkle import get_logger +from twinkle.dataset import DatasetMeta + +# 从 twinkle_client import 替代 twinkle,实现远端调用 +from twinkle_client.dataloader import DataLoader +from twinkle_client.dataset import Dataset +from twinkle_client.model import MultiLoraTransformersModel +from twinkle_client import init_twinkle_client + +logger = get_logger() + +# Step 1: 初始化客户端 +client = init_twinkle_client( + base_url='http://127.0.0.1:8000', + api_key=os.environ.get('MODELSCOPE_SDK_TOKEN') +) + +# Step 2: 查询已有训练运行(可选,用于恢复训练) +runs = client.list_training_runs() +resume_path = None +for run in runs: + checkpoints = client.list_checkpoints(run.training_run_id) + for checkpoint in checkpoints: + logger.info(checkpoint.model_dump_json(indent=2)) + # 取消注释以从检查点恢复: + # resume_path = checkpoint.twinkle_path + +# Step 3: 准备数据集 +dataset = 
Dataset(dataset_meta=DatasetMeta('ms://swift/self-cognition')) + +# 设置 chat 模板,使数据匹配模型的输入格式 +dataset.set_template('Template', model_id='ms://Qwen/Qwen2.5-7B-Instruct', max_length=512) + +# 数据预处理:替换占位符为自定义名称 +dataset.map('SelfCognitionProcessor', + init_args={'model_name': 'twinkle模型', 'model_author': 'twinkle团队'}) + +# 编码数据集为模型可用的 token +dataset.encode(batched=True) + +# 创建 DataLoader +dataloader = DataLoader(dataset=dataset, batch_size=8) + +# Step 4: 配置模型 +model = MultiLoraTransformersModel(model_id='ms://Qwen/Qwen2.5-7B-Instruct') + +# 配置 LoRA +lora_config = LoraConfig(target_modules='all-linear') +model.add_adapter_to_model('default', lora_config, gradient_accumulation_steps=2) + +# 设置模板、处理器、损失函数 +model.set_template('Template') +model.set_processor('InputProcessor', padding_side='right') +model.set_loss('CrossEntropyLoss') + +# 设置优化器和学习率调度器 +model.set_optimizer('AdamW', lr=1e-4) +model.set_lr_scheduler('LinearLR') + +# Step 5: 恢复训练(可选) +if resume_path: + logger.info(f'Resuming training from {resume_path}') + model.load(resume_path, load_optimizer=True) + +# Step 6: 训练循环 +for step, batch in enumerate(dataloader): + # 前向传播 + 反向传播 + output = model.forward_backward(inputs=batch) + + if step % 2 == 0: + logger.info(f'Step {step // 2}, loss: {output}') + + # 梯度裁剪 + model.clip_grad_norm(1.0) + + # 优化器更新 + model.step() + + # 梯度清零 + model.zero_grad() + + # 学习率调度 + model.lr_step() + +# Step 7: 保存检查点 +twinkle_path = model.save(name=f'step-{step}', save_optimizer=True) +logger.info(f"Saved checkpoint: {twinkle_path}") + +# Step 8: 上传到 ModelScope Hub(可选) +model.upload_to_hub( + checkpoint_dir=twinkle_path, + hub_model_id='your-username/your-model-name', + async_upload=False +) +``` + +## Megatron 后端的差异 + +使用 Megatron 后端时,客户端代码的主要差异: + +```python +# Megatron 后端不需要显式设置 loss(由 Megatron 内部计算) +# model.set_loss('CrossEntropyLoss') # 不需要 + +# 优化器和 LR 调度器使用 Megatron 内置默认值 +model.set_optimizer('default', lr=1e-4) +model.set_lr_scheduler('default', lr_decay_steps=1000, max_lr=1e-4) 
+``` + +其余数据处理、训练循环、检查点保存等代码完全相同。 diff --git "a/docs/source/\344\275\277\347\224\250\346\214\207\345\274\225/\346\234\215\345\212\241\347\253\257\345\222\214\345\256\242\346\210\267\347\253\257/index.rst" "b/docs/source/\344\275\277\347\224\250\346\214\207\345\274\225/\346\234\215\345\212\241\347\253\257\345\222\214\345\256\242\346\210\267\347\253\257/index.rst" new file mode 100644 index 00000000..6effe8f9 --- /dev/null +++ "b/docs/source/\344\275\277\347\224\250\346\214\207\345\274\225/\346\234\215\345\212\241\347\253\257\345\222\214\345\256\242\346\210\267\347\253\257/index.rst" @@ -0,0 +1,9 @@ +服务端和客户端 +=============== +.. toctree:: + :maxdepth: 1 + + 概述.md + 服务端.md + Twinkle客户端.md + Tinker兼容客户端.md diff --git "a/docs/source/\344\275\277\347\224\250\346\214\207\345\274\225/\346\234\215\345\212\241\347\253\257\345\222\214\345\256\242\346\210\267\347\253\257/\346\234\215\345\212\241\347\253\257.md" "b/docs/source/\344\275\277\347\224\250\346\214\207\345\274\225/\346\234\215\345\212\241\347\253\257\345\222\214\345\256\242\346\210\267\347\253\257/\346\234\215\345\212\241\347\253\257.md" new file mode 100644 index 00000000..4d56229a --- /dev/null +++ "b/docs/source/\344\275\277\347\224\250\346\214\207\345\274\225/\346\234\215\345\212\241\347\253\257\345\222\214\345\256\242\346\210\267\347\253\257/\346\234\215\345\212\241\347\253\257.md" @@ -0,0 +1,276 @@ +# 服务端(Server) + +## 启动方式 + +Server 统一通过 `launch_server` 函数或 CLI 命令启动,配合 YAML 配置文件。 + +### 方式一:Python 脚本启动 + +```python +# server.py +import os +from twinkle.server import launch_server + +# 获取配置文件路径(与脚本同目录的 server_config.yaml) +file_dir = os.path.abspath(os.path.dirname(__file__)) +config_path = os.path.join(file_dir, 'server_config.yaml') + +# 启动服务,此调用将阻塞直到服务关闭 +launch_server(config_path=config_path) +``` + +### 方式二:命令行启动 + +```bash +# 启动 Twinkle 原生 Server +python -m twinkle.server --config server_config.yaml + +# 启动 Tinker 兼容 Server +python -m twinkle.server --config server_config.yaml --server-type tinker +``` + 
+CLI 支持的参数: + +| 参数 | 说明 | 默认值 | +|------|------|-------| +| `-c, --config` | YAML 配置文件路径(必须) | — | +| `-t, --server-type` | Server 模式:`twinkle` 或 `tinker` | `twinkle` | +| `--namespace` | Ray 命名空间 | tinker 模式默认 `twinkle_cluster` | +| `--no-wait` | 不阻塞等待(守护模式) | `False` | +| `--log-level` | 日志级别 | `INFO` | + +## YAML 配置详解 + +配置文件定义了 Server 的完整部署方案,包括 HTTP 监听、应用组件和资源分配。 + +### Twinkle Server + Transformers 后端 + +```yaml +# server_config.yaml — Twinkle 原生协议 + Transformers 后端 + +# 协议类型:twinkle 原生协议 +server_type: twinkle + +# HTTP 代理位置:EveryNode 表示每个 Ray 节点运行一个代理(多节点场景推荐) +proxy_location: EveryNode + +# HTTP 监听配置 +http_options: + host: 0.0.0.0 # 监听所有网络接口 + port: 8000 # 服务端口号 + +# 应用列表:每个条目定义一个部署在 Server 上的服务组件 +applications: + + # 1. TwinkleServer:中央管理服务 + # 负责处理客户端连接、训练运行跟踪、检查点管理等 + - name: server + route_prefix: /server # API 路径前缀 + import_path: server # 内置组件标识 + args: # 无额外参数 + deployments: + - name: TwinkleServer + autoscaling_config: + min_replicas: 1 # 最小副本数 + max_replicas: 1 # 最大副本数 + target_ongoing_requests: 128 # 每副本目标并发请求数 + ray_actor_options: + num_cpus: 0.1 # 此 Actor 分配的 CPU 资源 + + # 2. Model 服务:承载基座模型 + # 执行前向传播、反向传播等训练计算 + - name: models-Qwen2.5-7B-Instruct + route_prefix: /models/Qwen/Qwen2.5-7B-Instruct # 模型的 REST 路径 + import_path: model + args: + use_megatron: false # 使用 Transformers 后端 + model_id: "ms://Qwen/Qwen2.5-7B-Instruct" # ModelScope 模型标识 + adapter_config: # LoRA 适配器配置 + per_token_adapter_limit: 30 # 同时可激活的最大 LoRA 数量 + adapter_timeout: 1800 # 空闲适配器超时卸载时间(秒) + nproc_per_node: 2 # 每节点 GPU 进程数 + device_group: # 逻辑设备组 + name: model + ranks: [0, 1] # 使用的 GPU 卡号 + device_type: cuda + device_mesh: # 分布式训练网格 + device_type: cuda + mesh: [0, 1] # 网格中的设备索引 + mesh_dim_names: ['dp'] # 网格维度:dp=数据并行 + deployments: + - name: ModelManagement + autoscaling_config: + min_replicas: 1 + max_replicas: 1 + target_ongoing_requests: 16 + ray_actor_options: + num_cpus: 0.1 + + # 3. 
Processor 服务:数据预处理 + # 在 CPU 上执行 tokenization、模板转换等预处理任务 + - name: processor + route_prefix: /processors + import_path: processor + args: + nproc_per_node: 2 # 每节点处理器 worker 数 + ncpu_proc_per_node: 2 # 每节点 CPU 进程数 + device_group: + name: model + ranks: 2 + device_type: CPU + device_mesh: + device_type: CPU + mesh: [0, 1] + mesh_dim_names: ['dp'] + deployments: + - name: ProcessorManagement + autoscaling_config: + min_replicas: 1 + max_replicas: 1 + target_ongoing_requests: 128 + ray_actor_options: + num_cpus: 0.1 +``` + +### Twinkle Server + Megatron 后端 + +与 Transformers 后端的区别仅在 Model 服务的 `use_megatron` 参数: + +```yaml + # Model 服务 — Megatron 后端 + - name: models-Qwen2.5-7B-Instruct + route_prefix: /models/Qwen/Qwen2.5-7B-Instruct + import_path: model + args: + use_megatron: true # 使用 Megatron-LM 后端 + model_id: "ms://Qwen/Qwen2.5-7B-Instruct" + nproc_per_node: 2 + device_group: + name: model + ranks: [0, 1] + device_type: cuda + device_mesh: + device_type: cuda + mesh: [0, 1] + mesh_dim_names: ['dp'] +``` + +> **注意**:Megatron 后端不需要 `adapter_config`(LoRA 适配器管理由 Megatron 内部处理)。 + +### Tinker 兼容 Server 配置 + +Tinker 兼容模式的主要区别: +- `server_type` 设为 `tinker` +- `route_prefix` 使用 `/api/v1` 前缀(Tinker 协议规范) +- 可额外配置 Sampler 服务(用于推理采样) + +```yaml +# server_config.yaml — Tinker 兼容协议 + +server_type: tinker + +proxy_location: EveryNode + +http_options: + host: 0.0.0.0 + port: 8000 + +applications: + + # 1. TinkerCompatServer:中央 API 服务 + - name: server + route_prefix: /api/v1 # Tinker 协议 API 前缀 + import_path: server + args: + deployments: + - name: TinkerCompatServer + autoscaling_config: + min_replicas: 1 + max_replicas: 1 + target_ongoing_requests: 128 + ray_actor_options: + num_cpus: 0.1 + + # 2. 
Model 服务(Megatron 后端示例) + - name: models-Qwen2.5-0.5B-Instruct + route_prefix: /api/v1/model/Qwen/Qwen2.5-0.5B-Instruct + import_path: model + args: + use_megatron: true + model_id: "ms://Qwen/Qwen2.5-0.5B-Instruct" + nproc_per_node: 2 + device_group: + name: model + ranks: [0, 1] + device_type: cuda + device_mesh: + device_type: cuda + mesh: [0, 1] + mesh_dim_names: ['dp'] + deployments: + - name: ModelManagement + autoscaling_config: + min_replicas: 1 + max_replicas: 1 + target_ongoing_requests: 16 + ray_actor_options: + num_cpus: 0.1 + + # 3. Sampler 服务(可选,用于推理采样) + - name: sampler-Qwen2.5-0.5B-Instruct + route_prefix: /api/v1/sampler/Qwen/Qwen2.5-0.5B-Instruct + import_path: sampler + args: + model_id: "ms://Qwen/Qwen2.5-0.5B-Instruct" + nproc_per_node: 1 + sampler_type: vllm # 推理引擎:vllm(高性能)或 torch + engine_args: # vLLM 引擎参数 + max_model_len: 4096 # 最大序列长度 + gpu_memory_utilization: 0.5 # GPU 显存使用比例 + enable_lora: true # 支持推理时加载 LoRA + device_group: + name: sampler + ranks: [0] + device_type: cuda + device_mesh: + device_type: cuda + mesh: [0] + mesh_dim_names: ['dp'] + deployments: + - name: SamplerManagement + autoscaling_config: + min_replicas: 1 + max_replicas: 1 + target_ongoing_requests: 16 + ray_actor_options: + num_cpus: 0.1 + num_gpus: 1 # Sampler 需要独立 GPU +``` + +## 配置项说明 + +### 应用组件(import_path) + +| import_path | Twinkle 模式 | Tinker 模式 | 说明 | +|-------------|-------------|------------|------| +| `server` | ✅ | ✅ | 中央管理服务,处理训练运行和检查点 | +| `model` | ✅ | ✅ | 模型服务,承载基座模型进行训练 | +| `processor` | ✅ | ❌ | 数据预处理服务(仅 Twinkle 模式,Tinker 模式需在本地处理) | +| `sampler` | ✅ | ✅ | 推理采样服务 | + +### device_group 与 device_mesh + +- **device_group**:定义逻辑设备组,指定使用哪些 GPU 卡 +- **device_mesh**:定义分布式训练网格,控制并行策略 + +```yaml +device_group: + name: model # 设备组名称 + ranks: [0, 1] # GPU 卡号列表 + device_type: cuda # 设备类型:cuda / CPU + +device_mesh: + device_type: cuda + mesh: [0, 1] # 网格中的设备索引 + mesh_dim_names: ['dp'] # 维度名称,常用:dp(数据并行), tp(张量并行), pp(流水线并行) +``` diff --git 
"a/docs/source/\344\275\277\347\224\250\346\214\207\345\274\225/\346\234\215\345\212\241\347\253\257\345\222\214\345\256\242\346\210\267\347\253\257/\346\246\202\350\277\260.md" "b/docs/source/\344\275\277\347\224\250\346\214\207\345\274\225/\346\234\215\345\212\241\347\253\257\345\222\214\345\256\242\346\210\267\347\253\257/\346\246\202\350\277\260.md" new file mode 100644 index 00000000..89540bc3 --- /dev/null +++ "b/docs/source/\344\275\277\347\224\250\346\214\207\345\274\225/\346\234\215\345\212\241\347\253\257\345\222\214\345\256\242\346\210\267\347\253\257/\346\246\202\350\277\260.md" @@ -0,0 +1,97 @@ +# 服务端和客户端 + +Twinkle提供了完整的HTTP Server/Client架构,支持将模型部署为服务,并通过客户端远程调用完成训练、推理等任务。这种架构将**模型承载(Server端)**和**训练逻辑(Client端)**解耦,使得多个用户可以共享同一个基座模型进行训练。 + +## 核心概念 + +- **Server 端**:基于 Ray Serve 部署,承载模型权重和推理/训练计算。Server 负责管理模型加载、前向/反向传播、权重保存、采样推理等。 +- **Client 端**:在本地运行,负责数据准备、训练循环编排、超参配置等。Client 通过 HTTP 与 Server 通信,发送数据和指令。 + +### 两种 Server 模式 + +Twinkle Server 支持两种协议模式: + +| 模式 | server_type | 说明 | +|------|------------|------| +| **Twinkle Server** | `twinkle` | 原生 Twinkle 协议,搭配 `twinkle_client` 使用,API 更简洁 | +| **Tinker 兼容 Server** | `tinker` | 兼容 Tinker 协议,搭配 `init_tinker_compat_client` 使用,可复用已有 Tinker 训练代码 | + +### 两种模型后端 + +无论哪种 Server 模式,模型加载均支持两种后端: + +| 后端 | use_megatron | 说明 | +|------|-------------|------| +| **Transformers** | `false` | 基于 HuggingFace Transformers,适用于大多数场景 | +| **Megatron** | `true` | 基于 Megatron-LM,适用于超大规模模型训练,支持更高效的并行策略 | + +### 两种 Client 模式 + +| Client | 初始化方式 | 说明 | +|--------|---------|------| +| **Twinkle Client** | `init_twinkle_client` | 原生客户端,将 `from twinkle import` 改为 `from twinkle_client import` 即可将本地训练代码迁移为远端调用 | +| **Tinker 兼容 Client** | `init_tinker_compat_client` | 对 Tinker SDK 进行 patch,使已有 Tinker 训练代码可直接复用 | + +## 如何选择 + +### Server 模式选择 + +| 场景 | 推荐 | +|------|------| +| 全新项目,使用 Twinkle 体系 | Twinkle Server (`server_type: twinkle`) | +| 已有 Tinker 训练代码,希望迁移到 Twinkle | Tinker 兼容 Server (`server_type: tinker`) | +| 需要推理采样功能 | 
Tinker 兼容 Server(内置 Sampler 支持) | + +### Client 模式选择 + +| 场景 | 推荐 | +|------|------| +| 已有 Twinkle 本地训练代码,希望改为远端 | Twinkle Client — 仅需改 import 路径 | +| 已有 Tinker 训练代码,希望复用 | Tinker 兼容 Client — 仅需初始化 patch | +| 全新项目 | Twinkle Client — API 更简洁 | + +### 模型后端选择 + +| 场景 | 推荐 | +|------|------| +| 7B/14B 等中小规模模型 | Transformers 后端 | +| 超大规模模型,需要高级并行策略 | Megatron 后端 | +| 快速实验和原型验证 | Transformers 后端 | + +## Cookbook 参考 + +完整的可运行示例位于 `cookbook/client/` 目录: + +``` +cookbook/client/ +├── twinkle/ # Twinkle 原生协议示例 +│ ├── transformer/ # Transformers 后端 +│ │ ├── server.py # 启动脚本 +│ │ ├── server_config.yaml # 配置文件 +│ │ └── lora.py # LoRA 训练客户端 +│ └── megatron/ # Megatron 后端 +│ ├── server.py +│ ├── server_config.yaml +│ └── lora.py +└── tinker/ # Tinker 兼容协议示例 + ├── transformer/ # Transformers 后端 + │ ├── server.py + │ ├── server_config.yaml + │ ├── lora.py # LoRA 训练 + │ ├── sample.py # 推理采样 + │ └── self_congnition.py # 自我认知训练+评估 + └── megatron/ # Megatron 后端 + ├── server.py + ├── server_config.yaml + └── lora.py +``` + +运行步骤: + +```bash +# 1. 先启动 Server +python cookbook/client/twinkle/transformer/server.py + +# 2. 
在另一个终端运行 Client +python cookbook/client/twinkle/transformer/lora.py +``` diff --git a/src/twinkle/sampler/vllm_engine.py b/src/twinkle/sampler/vllm_engine.py index 422efa45..a9d0bcbd 100644 --- a/src/twinkle/sampler/vllm_engine.py +++ b/src/twinkle/sampler/vllm_engine.py @@ -371,7 +371,7 @@ async def _get_or_load_lora(self, lora_path: str, user_id: Optional[str] = None) if user_id is None: user_id = 'default' - + # Check if already loaded for this user if user_id in self._user_lora_ids: lora_int_id = self._user_lora_ids[user_id] diff --git a/src/twinkle_client/dataloader/__init__.py b/src/twinkle_client/dataloader/__init__.py new file mode 100644 index 00000000..341d0b77 --- /dev/null +++ b/src/twinkle_client/dataloader/__init__.py @@ -0,0 +1,11 @@ +# ============================================================================ +# WARNING: AUTO-GENERATED FILE - DO NOT MODIFY MANUALLY! +# ============================================================================ +# This file is automatically generated by client_tools/client_generator.py +# Any manual changes will be overwritten when the generator runs again. +# +# To update this file: +# 1. Modify the source files in src/twinkle/ +# 2. Run: python client_tools/client_generator.py +# ============================================================================ +from .dataloader import DataLoader diff --git a/src/twinkle_client/dataloader/dataloader.py b/src/twinkle_client/dataloader/dataloader.py new file mode 100644 index 00000000..f43b87fd --- /dev/null +++ b/src/twinkle_client/dataloader/dataloader.py @@ -0,0 +1,94 @@ +# ============================================================================ +# WARNING: AUTO-GENERATED FILE - DO NOT MODIFY MANUALLY! +# ============================================================================ +# This file is automatically generated by client_tools/client_generator.py +# Any manual changes will be overwritten when the generator runs again. +# +# To update this file: +# 1. 
Modify the source files in src/twinkle/ +# 2. Run: python client_tools/client_generator.py +# ============================================================================ + +from typing import Callable, Optional, Type, Union +from twinkle_client.http import TWINKLE_SERVER_URL +from twinkle_client.http import http_post, heartbeat_manager +from twinkle import DeviceMesh +from twinkle.dataset import Dataset +from twinkle.processor import InputProcessor + +class DataLoader(object): + """Client wrapper for DataLoader that calls server HTTP endpoints.""" + + def __init__(self, dataset: Union[Dataset, Callable], device_mesh: Optional[DeviceMesh] = None, **kwargs): + from twinkle_client.http import get_base_url + self.server_url = get_base_url() + + response = http_post( + url=f'{self.server_url}/processors/create', + json_data={ + 'processor_type': 'dataloader', + 'class_type': 'DataLoader', + **{'dataset': dataset, 'device_mesh': device_mesh}, **kwargs + } + ) + response.raise_for_status() + self.processor_id = response.json()['processor_id'] + heartbeat_manager.register_processor(self.processor_id) + + def __del__(self): + try: + heartbeat_manager.unregister_processor(self.processor_id) + except: + pass + + + def __len__(self): + response = http_post( + url=f'{self.server_url}/processors/call', + json_data={ + 'processor_id': self.processor_id, + 'function': '__len__', + **{}, + } + ) + response.raise_for_status() + return response.json()["result"] + + + def set_processor(self, processor_cls: Union[Type[InputProcessor], str, InputProcessor, Callable], **kwargs): + response = http_post( + url=f'{self.server_url}/processors/call', + json_data={ + 'processor_id': self.processor_id, + 'function': 'set_processor', + **{'processor_cls': processor_cls}, + **kwargs + } + ) + response.raise_for_status() + return response.json()["result"] + + + def __iter__(self): + response = http_post( + url=f'{self.server_url}/processors/call', + json_data={ + 'processor_id': 
self.processor_id, + 'function': '__iter__', + **{}, + } + ) + response.raise_for_status() + return self + + def __next__(self): + response = http_post( + url=f'{self.server_url}/processors/call', + json_data={ + 'processor_id': self.processor_id, + 'function': '__next__', + } + ) + response.raise_for_status() + return response.json()["result"] + \ No newline at end of file diff --git a/src/twinkle_client/dataset/__init__.py b/src/twinkle_client/dataset/__init__.py new file mode 100644 index 00000000..ad90b90a --- /dev/null +++ b/src/twinkle_client/dataset/__init__.py @@ -0,0 +1,15 @@ +# ============================================================================ +# WARNING: AUTO-GENERATED FILE - DO NOT MODIFY MANUALLY! +# ============================================================================ +# This file is automatically generated by client_tools/client_generator.py +# Any manual changes will be overwritten when the generator runs again. +# +# To update this file: +# 1. Modify the source files in src/twinkle/ +# 2. Run: python client_tools/client_generator.py +# ============================================================================ +from .base import Dataset +from .iterable_dataset import IterableDataset +from .iterable_packing_dataset import IterablePackingDataset +from .lazy_dataset import LazyDataset +from .packing_dataset import PackingDataset diff --git a/src/twinkle_client/dataset/base.py b/src/twinkle_client/dataset/base.py new file mode 100644 index 00000000..8417af98 --- /dev/null +++ b/src/twinkle_client/dataset/base.py @@ -0,0 +1,168 @@ +# ============================================================================ +# WARNING: AUTO-GENERATED FILE - DO NOT MODIFY MANUALLY! +# ============================================================================ +# This file is automatically generated by client_tools/client_generator.py +# Any manual changes will be overwritten when the generator runs again. +# +# To update this file: +# 1. 
Modify the source files in src/twinkle/ +# 2. Run: python client_tools/client_generator.py +# ============================================================================ + +from typing import Any, Callable, Dict, Type, Union +from twinkle_client.http import TWINKLE_SERVER_URL +from twinkle_client.http import http_post, heartbeat_manager +from twinkle.dataset import Dataset +from twinkle.dataset import DatasetMeta +from twinkle.preprocessor import DataFilter +from twinkle.preprocessor import Preprocessor +from twinkle.template import Template + +class Dataset(object): + """Client wrapper for Dataset that calls server HTTP endpoints.""" + + def __init__(self, dataset_meta: DatasetMeta, **kwargs): + from twinkle_client.http import get_base_url + self.server_url = get_base_url() + + response = http_post( + url=f'{self.server_url}/processors/create', + json_data={ + 'processor_type': 'dataset', + 'class_type': 'Dataset', + **{'dataset_meta': dataset_meta}, **kwargs + } + ) + response.raise_for_status() + self.processor_id = response.json()['processor_id'] + heartbeat_manager.register_processor(self.processor_id) + + def __del__(self): + try: + heartbeat_manager.unregister_processor(self.processor_id) + except: + pass + + + def set_template(self, template_func: Union[Template, Type[Template], str], **kwargs): + response = http_post( + url=f'{self.server_url}/processors/call', + json_data={ + 'processor_id': self.processor_id, + 'function': 'set_template', + **{'template_func': template_func}, + **kwargs + } + ) + response.raise_for_status() + return response.json()["result"] + + + def encode(self, **kwargs): + response = http_post( + url=f'{self.server_url}/processors/call', + json_data={ + 'processor_id': self.processor_id, + 'function': 'encode', + **{}, + **kwargs + } + ) + response.raise_for_status() + return response.json()["result"] + + + def check(self, **kwargs): + response = http_post( + url=f'{self.server_url}/processors/call', + json_data={ + 'processor_id': 
self.processor_id, + 'function': 'check', + **{}, + **kwargs + } + ) + response.raise_for_status() + return response.json()["result"] + + + def map(self, preprocess_func: Union[Preprocessor, Callable, str, Type[Preprocessor]], dataset_meta: DatasetMeta = None, init_args: Dict[str, Any] = None, **kwargs): + response = http_post( + url=f'{self.server_url}/processors/call', + json_data={ + 'processor_id': self.processor_id, + 'function': 'map', + **{'preprocess_func': preprocess_func, 'dataset_meta': dataset_meta, 'init_args': init_args}, + **kwargs + } + ) + response.raise_for_status() + return response.json()["result"] + + + def filter(self, filter_func: Union[Callable, str, Type[DataFilter], DataFilter], dataset_meta: DatasetMeta = None, init_args: Dict[str, Any] = None, **kwargs): + response = http_post( + url=f'{self.server_url}/processors/call', + json_data={ + 'processor_id': self.processor_id, + 'function': 'filter', + **{'filter_func': filter_func, 'dataset_meta': dataset_meta, 'init_args': init_args}, + **kwargs + } + ) + response.raise_for_status() + return response.json()["result"] + + + def add_dataset(self, dataset_meta: DatasetMeta, **kwargs): + response = http_post( + url=f'{self.server_url}/processors/call', + json_data={ + 'processor_id': self.processor_id, + 'function': 'add_dataset', + **{'dataset_meta': dataset_meta}, + **kwargs + } + ) + response.raise_for_status() + return response.json()["result"] + + + def mix_dataset(self, interleave = True): + response = http_post( + url=f'{self.server_url}/processors/call', + json_data={ + 'processor_id': self.processor_id, + 'function': 'mix_dataset', + **{'interleave': interleave}, + } + ) + response.raise_for_status() + return response.json()["result"] + + + def __getitem__(self, idx): + response = http_post( + url=f'{self.server_url}/processors/call', + json_data={ + 'processor_id': self.processor_id, + 'function': '__getitem__', + **{'idx': idx}, + } + ) + response.raise_for_status() + return 
response.json()["result"] + + + def __len__(self): + response = http_post( + url=f'{self.server_url}/processors/call', + json_data={ + 'processor_id': self.processor_id, + 'function': '__len__', + **{}, + } + ) + response.raise_for_status() + return response.json()["result"] + \ No newline at end of file diff --git a/src/twinkle_client/dataset/iterable_dataset.py b/src/twinkle_client/dataset/iterable_dataset.py new file mode 100644 index 00000000..2edad39a --- /dev/null +++ b/src/twinkle_client/dataset/iterable_dataset.py @@ -0,0 +1,106 @@ +# ============================================================================ +# WARNING: AUTO-GENERATED FILE - DO NOT MODIFY MANUALLY! +# ============================================================================ +# This file is automatically generated by client_tools/client_generator.py +# Any manual changes will be overwritten when the generator runs again. +# +# To update this file: +# 1. Modify the source files in src/twinkle/ +# 2. Run: python client_tools/client_generator.py +# ============================================================================ + +from twinkle_client.http import TWINKLE_SERVER_URL +from twinkle_client.http import http_post, heartbeat_manager +from twinkle.dataset import Dataset +from twinkle.dataset import DatasetMeta +from torch.utils.data import IterableDataset + +class IterableDataset(IterableDataset): + """Client wrapper for IterableDataset that calls server HTTP endpoints.""" + + def __init__(self, dataset_meta: DatasetMeta, **kwargs): + from twinkle_client.http import get_base_url + self.server_url = get_base_url() + + response = http_post( + url=f'{self.server_url}/processors/create', + json_data={ + 'processor_type': 'dataset', + 'class_type': 'IterableDataset', + **{'dataset_meta': dataset_meta}, **kwargs + } + ) + response.raise_for_status() + self.processor_id = response.json()['processor_id'] + heartbeat_manager.register_processor(self.processor_id) + + def __del__(self): + try: + 
heartbeat_manager.unregister_processor(self.processor_id) + except: + pass + + + def add_dataset(self, dataset_meta: DatasetMeta, **kwargs): + response = http_post( + url=f'{self.server_url}/processors/call', + json_data={ + 'processor_id': self.processor_id, + 'function': 'add_dataset', + **{'dataset_meta': dataset_meta}, + **kwargs + } + ) + response.raise_for_status() + return response.json()["result"] + + + def __len__(self): + response = http_post( + url=f'{self.server_url}/processors/call', + json_data={ + 'processor_id': self.processor_id, + 'function': '__len__', + **{}, + } + ) + response.raise_for_status() + return response.json()["result"] + + + def __getitem__(self, idx): + response = http_post( + url=f'{self.server_url}/processors/call', + json_data={ + 'processor_id': self.processor_id, + 'function': '__getitem__', + **{'idx': idx}, + } + ) + response.raise_for_status() + return response.json()["result"] + + + def __iter__(self): + response = http_post( + url=f'{self.server_url}/processors/call', + json_data={ + 'processor_id': self.processor_id, + 'function': '__iter__', + **{}, + } + ) + response.raise_for_status() + return self + + def __next__(self): + response = http_post( + url=f'{self.server_url}/processors/call', + json_data={ + 'processor_id': self.processor_id, + 'function': '__next__', + } + ) + response.raise_for_status() + return response.json()["result"] + \ No newline at end of file diff --git a/src/twinkle_client/dataset/iterable_packing_dataset.py b/src/twinkle_client/dataset/iterable_packing_dataset.py new file mode 100644 index 00000000..de2d7509 --- /dev/null +++ b/src/twinkle_client/dataset/iterable_packing_dataset.py @@ -0,0 +1,95 @@ +# ============================================================================ +# WARNING: AUTO-GENERATED FILE - DO NOT MODIFY MANUALLY! 
+# ============================================================================ +# This file is automatically generated by client_tools/client_generator.py +# Any manual changes will be overwritten when the generator runs again. +# +# To update this file: +# 1. Modify the source files in src/twinkle/ +# 2. Run: python client_tools/client_generator.py +# ============================================================================ + +from typing import Type, Union +from twinkle_client.http import TWINKLE_SERVER_URL +from twinkle_client.http import http_post, heartbeat_manager +from twinkle.dataset import Dataset +from twinkle.dataset import DatasetMeta +from twinkle.template import Template +from torch.utils.data import IterableDataset + +class IterablePackingDataset(IterableDataset): + """Client wrapper for IterablePackingDataset that calls server HTTP endpoints.""" + + def __init__(self, dataset_meta: DatasetMeta, packing_interval: int = 128, packing_num_proc: int = 1, cyclic: bool = False, **kwargs): + from twinkle_client.http import get_base_url + self.server_url = get_base_url() + + response = http_post( + url=f'{self.server_url}/processors/create', + json_data={ + 'processor_type': 'dataset', + 'class_type': 'IterablePackingDataset', + **{'dataset_meta': dataset_meta, 'packing_interval': packing_interval, 'packing_num_proc': packing_num_proc, 'cyclic': cyclic}, **kwargs + } + ) + response.raise_for_status() + self.processor_id = response.json()['processor_id'] + heartbeat_manager.register_processor(self.processor_id) + + def __del__(self): + try: + heartbeat_manager.unregister_processor(self.processor_id) + except: + pass + + + def set_template(self, template_cls: Union[Type[Template], str, Template], **kwargs): + response = http_post( + url=f'{self.server_url}/processors/call', + json_data={ + 'processor_id': self.processor_id, + 'function': 'set_template', + **{'template_cls': template_cls}, + **kwargs + } + ) + response.raise_for_status() + return 
response.json()["result"] + + + def pack_dataset(self): + response = http_post( + url=f'{self.server_url}/processors/call', + json_data={ + 'processor_id': self.processor_id, + 'function': 'pack_dataset', + **{}, + } + ) + response.raise_for_status() + return response.json()["result"] + + + def __iter__(self): + response = http_post( + url=f'{self.server_url}/processors/call', + json_data={ + 'processor_id': self.processor_id, + 'function': '__iter__', + **{}, + } + ) + response.raise_for_status() + return self + + def __next__(self): + response = http_post( + url=f'{self.server_url}/processors/call', + json_data={ + 'processor_id': self.processor_id, + 'function': '__next__', + } + ) + response.raise_for_status() + return response.json()["result"] + \ No newline at end of file diff --git a/src/twinkle_client/dataset/lazy_dataset.py b/src/twinkle_client/dataset/lazy_dataset.py new file mode 100644 index 00000000..b6238942 --- /dev/null +++ b/src/twinkle_client/dataset/lazy_dataset.py @@ -0,0 +1,96 @@ +# ============================================================================ +# WARNING: AUTO-GENERATED FILE - DO NOT MODIFY MANUALLY! +# ============================================================================ +# This file is automatically generated by client_tools/client_generator.py +# Any manual changes will be overwritten when the generator runs again. +# +# To update this file: +# 1. Modify the source files in src/twinkle/ +# 2. 
Run: python client_tools/client_generator.py +# ============================================================================ + +from twinkle_client.http import TWINKLE_SERVER_URL +from twinkle_client.http import http_post, heartbeat_manager +from twinkle.dataset import Dataset +from twinkle.dataset import DatasetMeta +from .base import Dataset + +class LazyDataset(Dataset): + """Client wrapper for LazyDataset that calls server HTTP endpoints.""" + + def __init__(self, dataset_meta: DatasetMeta, **kwargs): + from twinkle_client.http import get_base_url + self.server_url = get_base_url() + + response = http_post( + url=f'{self.server_url}/processors/create', + json_data={ + 'processor_type': 'dataset', + 'class_type': 'LazyDataset', + **{'dataset_meta': dataset_meta}, **kwargs + } + ) + response.raise_for_status() + self.processor_id = response.json()['processor_id'] + heartbeat_manager.register_processor(self.processor_id) + + def __del__(self): + try: + heartbeat_manager.unregister_processor(self.processor_id) + except: + pass + + + def encode(self, **kwargs): + response = http_post( + url=f'{self.server_url}/processors/call', + json_data={ + 'processor_id': self.processor_id, + 'function': 'encode', + **{}, + **kwargs + } + ) + response.raise_for_status() + return response.json()["result"] + + + def check(self, **kwargs): + response = http_post( + url=f'{self.server_url}/processors/call', + json_data={ + 'processor_id': self.processor_id, + 'function': 'check', + **{}, + **kwargs + } + ) + response.raise_for_status() + return response.json()["result"] + + + def __getitem__(self, idx): + response = http_post( + url=f'{self.server_url}/processors/call', + json_data={ + 'processor_id': self.processor_id, + 'function': '__getitem__', + **{'idx': idx}, + } + ) + response.raise_for_status() + return response.json()["result"] + + + def __len__(self): + response = http_post( + url=f'{self.server_url}/processors/call', + json_data={ + 'processor_id': self.processor_id, + 
'function': '__len__', + **{}, + } + ) + response.raise_for_status() + return response.json()["result"] + \ No newline at end of file diff --git a/src/twinkle_client/dataset/packing_dataset.py b/src/twinkle_client/dataset/packing_dataset.py new file mode 100644 index 00000000..23102dd5 --- /dev/null +++ b/src/twinkle_client/dataset/packing_dataset.py @@ -0,0 +1,81 @@ +# ============================================================================ +# WARNING: AUTO-GENERATED FILE - DO NOT MODIFY MANUALLY! +# ============================================================================ +# This file is automatically generated by client_tools/client_generator.py +# Any manual changes will be overwritten when the generator runs again. +# +# To update this file: +# 1. Modify the source files in src/twinkle/ +# 2. Run: python client_tools/client_generator.py +# ============================================================================ + +from twinkle_client.http import TWINKLE_SERVER_URL +from twinkle_client.http import http_post, heartbeat_manager +from twinkle.dataset import Dataset +from twinkle.dataset import DatasetMeta +from .base import Dataset + +class PackingDataset(Dataset): + """Client wrapper for PackingDataset that calls server HTTP endpoints.""" + + def __init__(self, dataset_meta: DatasetMeta, packing_num_proc: int = 1, **kwargs): + from twinkle_client.http import get_base_url + self.server_url = get_base_url() + + response = http_post( + url=f'{self.server_url}/processors/create', + json_data={ + 'processor_type': 'dataset', + 'class_type': 'PackingDataset', + **{'dataset_meta': dataset_meta, 'packing_num_proc': packing_num_proc}, **kwargs + } + ) + response.raise_for_status() + self.processor_id = response.json()['processor_id'] + heartbeat_manager.register_processor(self.processor_id) + + def __del__(self): + try: + heartbeat_manager.unregister_processor(self.processor_id) + except: + pass + + + def pack_dataset(self): + response = http_post( + 
url=f'{self.server_url}/processors/call', + json_data={ + 'processor_id': self.processor_id, + 'function': 'pack_dataset', + **{}, + } + ) + response.raise_for_status() + return response.json()["result"] + + + def __getitem__(self, index): + response = http_post( + url=f'{self.server_url}/processors/call', + json_data={ + 'processor_id': self.processor_id, + 'function': '__getitem__', + **{'index': index}, + } + ) + response.raise_for_status() + return response.json()["result"] + + + def __len__(self): + response = http_post( + url=f'{self.server_url}/processors/call', + json_data={ + 'processor_id': self.processor_id, + 'function': '__len__', + **{}, + } + ) + response.raise_for_status() + return response.json()["result"] + \ No newline at end of file diff --git a/src/twinkle_client/model/__init__.py b/src/twinkle_client/model/__init__.py new file mode 100644 index 00000000..507cc4cb --- /dev/null +++ b/src/twinkle_client/model/__init__.py @@ -0,0 +1,11 @@ +# ============================================================================ +# WARNING: AUTO-GENERATED FILE - DO NOT MODIFY MANUALLY! +# ============================================================================ +# This file is automatically generated by client_tools/client_generator.py +# Any manual changes will be overwritten when the generator runs again. +# +# To update this file: +# 1. Modify the source files in src/twinkle/ +# 2. 
Run: python client_tools/client_generator.py +# ============================================================================ +from .multi_lora_transformers import MultiLoraTransformersModel diff --git a/src/twinkle_client/model/multi_lora_transformers.py b/src/twinkle_client/model/multi_lora_transformers.py new file mode 100644 index 00000000..c0762ded --- /dev/null +++ b/src/twinkle_client/model/multi_lora_transformers.py @@ -0,0 +1,261 @@ +# ============================================================================ +# WARNING: AUTO-GENERATED FILE - DO NOT MODIFY MANUALLY! +# ============================================================================ +# This file is automatically generated by client_tools/client_generator.py +# Any manual changes will be overwritten when the generator runs again. +# +# To update this file: +# 1. Modify the source files in src/twinkle/ +# 2. Run: python client_tools/client_generator.py +# ============================================================================ +from typing import Any, Optional, Union, Type, Dict, Literal, List +import uuid +from twinkle_client.http import TWINKLE_SERVER_URL +from twinkle_client.http import http_post, heartbeat_manager +from twinkle import DeviceMesh +from twinkle.data_format import InputFeature, Trajectory + + +class MultiLoraTransformersModel: + """Client wrapper for TwinkleModel that calls server HTTP endpoints. + + This client manages adapters and sends training/inference requests to the model server. + Each adapter has its own lifecycle managed through automatic heartbeats. 
+ """ + + def __init__(self, model_id: str, **kwargs): + """Initialize model client.""" + from twinkle_client.http import get_base_url + self.server_url = get_base_url() + + self.model_id = model_id + if '://' in model_id: + model_id = model_id.split('://')[1] + self.server_url = f'{self.server_url}/models/{model_id}' + self.adapter_name = None + response = http_post( + url=f'{self.server_url}/create', + ) + response.raise_for_status() + + def _send_adapter_heartbeat(self): + """Internal method to send adapter heartbeat.""" + response = http_post( + url=f'{self.server_url}/heartbeat', + json_data={'adapter_name': self.adapter_name} + ) + response.raise_for_status() + + def add_adapter_to_model(self, adapter_name: str, config: Dict[str, Any], **kwargs): + """Add a new adapter to the model and start automatic heartbeat.""" + response = http_post( + url=f'{self.server_url}/add_adapter_to_model', + json_data={'adapter_name': adapter_name, 'config': config, **kwargs} + ) + response.raise_for_status() + + # Register adapter for automatic heartbeat after successful creation + self.adapter_name = adapter_name + heartbeat_manager.register_adapter( + self.adapter_name, + self._send_adapter_heartbeat + ) + + def __del__(self): + """Cleanup: unregister adapter from heartbeat manager.""" + try: + heartbeat_manager.unregister_adapter(self.adapter_name) + except: + pass + + def forward(self, inputs: Any, **kwargs): + """Execute forward pass on the model.""" + response = http_post( + url=f'{self.server_url}/forward', + json_data={'inputs': inputs, 'adapter_name': self.adapter_name, **kwargs} + ) + response.raise_for_status() + return response.json()['result'] + + def forward_only(self, inputs: Any, **kwargs): + """Execute forward pass without gradient computation.""" + response = http_post( + url=f'{self.server_url}/forward_only', + json_data={'inputs': inputs, 'adapter_name': self.adapter_name, **kwargs} + ) + response.raise_for_status() + return response.json()['result'] + + def 
calculate_loss(self, **kwargs): + """Calculate loss from model outputs.""" + response = http_post( + url=f'{self.server_url}/calculate_loss', + json_data={'adapter_name': self.adapter_name, **kwargs} + ) + response.raise_for_status() + return response.json()['result'] + + def get_train_configs(self, **kwargs): + """Get training configs""" + response = http_post( + url=f'{self.server_url}/get_train_configs', + json_data={'adapter_name': self.adapter_name, **kwargs} + ) + response.raise_for_status() + return response.json()['result'] + + def backward(self, **kwargs): + """Execute backward pass.""" + response = http_post( + url=f'{self.server_url}/backward', + json_data={'adapter_name': self.adapter_name, **kwargs} + ) + response.raise_for_status() + return response.json()['result'] + + def forward_backward(self, inputs: Any, **kwargs): + """Execute combined forward and backward pass.""" + response = http_post( + url=f'{self.server_url}/forward_backward', + json_data={'inputs': inputs, 'adapter_name': self.adapter_name, **kwargs} + ) + response.raise_for_status() + return response.json()['result'] + + def step(self, **kwargs): + """Execute optimizer step.""" + response = http_post( + url=f'{self.server_url}/step', + json_data={'adapter_name': self.adapter_name, **kwargs} + ) + response.raise_for_status() + return response.json()['result'] + + def zero_grad(self, **kwargs): + """Zero out gradients.""" + response = http_post( + url=f'{self.server_url}/zero_grad', + json_data={'adapter_name': self.adapter_name, **kwargs} + ) + response.raise_for_status() + return response.json()['result'] + + def lr_step(self, **kwargs): + """Execute learning rate scheduler step.""" + response = http_post( + url=f'{self.server_url}/lr_step', + json_data={'adapter_name': self.adapter_name, **kwargs} + ) + response.raise_for_status() + return response.json()['result'] + + def set_loss(self, loss_cls: str, **kwargs): + """Set the loss function.""" + response = http_post( + 
url=f'{self.server_url}/set_loss', + json_data={'loss_cls': loss_cls, 'adapter_name': self.adapter_name, **kwargs} + ) + response.raise_for_status() + return response.json()['result'] + + def clip_grad_norm(self, max_grad_norm: float=1.0, norm_type=2, **kwargs): + """Set the loss function.""" + response = http_post( + url=f'{self.server_url}/clip_grad_norm', + json_data={'max_grad_norm': max_grad_norm, 'norm_type': norm_type, 'adapter_name': self.adapter_name, **kwargs} + ) + response.raise_for_status() + return response.json()['result'] + + def set_optimizer(self, optimizer_cls: str, **kwargs): + """Set the optimizer.""" + response = http_post( + url=f'{self.server_url}/set_optimizer', + json_data={'optimizer_cls': optimizer_cls, 'adapter_name': self.adapter_name, **kwargs} + ) + response.raise_for_status() + return response.json()['result'] + + def set_lr_scheduler(self, scheduler_cls: str, **kwargs): + """Set the learning rate scheduler.""" + response = http_post( + url=f'{self.server_url}/set_lr_scheduler', + json_data={'scheduler_cls': scheduler_cls, 'adapter_name': self.adapter_name, **kwargs} + ) + response.raise_for_status() + return response.json()['result'] + + def save(self, name: str, **kwargs): + """Save model checkpoint.""" + response = http_post( + url=f'{self.server_url}/save', + json_data={'name': name, 'adapter_name': self.adapter_name, **kwargs} + ) + response.raise_for_status() + return response.json()['result'] + + def load(self, name: str, **kwargs): + """Load model checkpoint.""" + response = http_post( + url=f'{self.server_url}/load', + json_data={'name': name, 'adapter_name': self.adapter_name, **kwargs} + ) + response.raise_for_status() + return response.json()['result'] + + def set_template(self, template_cls: str, **kwargs): + """Set the template for data processing.""" + response = http_post( + url=f'{self.server_url}/set_template', + json_data={'template_cls': template_cls, 'adapter_name': self.adapter_name, 'model_id': self.model_id, 
**kwargs} + ) + response.raise_for_status() + return response.json()['result'] + + def set_processor(self, processor_cls: str, **kwargs): + """Set the input processor.""" + response = http_post( + url=f'{self.server_url}/set_processor', + json_data={'processor_cls': processor_cls, 'adapter_name': self.adapter_name, **kwargs} + ) + response.raise_for_status() + return response.json()['result'] + + def calculate_metric(self, is_training: bool = True, **kwargs): + """Calculate metrics from model outputs.""" + response = http_post( + url=f'{self.server_url}/calculate_metric', + json_data={'is_training': is_training, 'adapter_name': self.adapter_name, **kwargs} + ) + response.raise_for_status() + return response.json()['result'] + + def get_state_dict(self, **kwargs): + """Get model state dictionary.""" + response = http_post( + url=f'{self.server_url}/get_state_dict', + json_data={'adapter_name': self.adapter_name, **kwargs} + ) + response.raise_for_status() + return response.json()['result'] + + def upload_to_hub(self, checkpoint_dir: str, hub_model_id: str, hub_token: Optional[str] = None, async_upload: bool = True): + """Upload model checkpoint to hub. + + Args: + checkpoint_dir: The directory path of the checkpoint to upload. + hub_model_id: The hub model id. + hub_token: The hub token (optional). + async_upload: Whether to use async upload (default: True). 
+ """ + response = http_post( + url=f'{self.server_url}/upload_to_hub', + json_data={ + 'checkpoint_dir': checkpoint_dir, + 'hub_model_id': hub_model_id, + 'hub_token': hub_token, + 'async_upload': async_upload + } + ) + response.raise_for_status() + return response.json() diff --git a/src/twinkle_client/processor/__init__.py b/src/twinkle_client/processor/__init__.py new file mode 100644 index 00000000..1f8acd8f --- /dev/null +++ b/src/twinkle_client/processor/__init__.py @@ -0,0 +1,11 @@ +# ============================================================================ +# WARNING: AUTO-GENERATED FILE - DO NOT MODIFY MANUALLY! +# ============================================================================ +# This file is automatically generated by client_tools/client_generator.py +# Any manual changes will be overwritten when the generator runs again. +# +# To update this file: +# 1. Modify the source files in src/twinkle/ +# 2. Run: python client_tools/client_generator.py +# ============================================================================ +from .base import InputProcessor diff --git a/src/twinkle_client/processor/base.py b/src/twinkle_client/processor/base.py new file mode 100644 index 00000000..6de80530 --- /dev/null +++ b/src/twinkle_client/processor/base.py @@ -0,0 +1,56 @@ +# ============================================================================ +# WARNING: AUTO-GENERATED FILE - DO NOT MODIFY MANUALLY! +# ============================================================================ +# This file is automatically generated by client_tools/client_generator.py +# Any manual changes will be overwritten when the generator runs again. +# +# To update this file: +# 1. Modify the source files in src/twinkle/ +# 2. 
Run: python client_tools/client_generator.py +# ============================================================================ + +from typing import List, Literal, Optional, Union +from twinkle_client.http import TWINKLE_SERVER_URL +from twinkle_client.http import http_post, heartbeat_manager +from twinkle import DeviceMesh +from twinkle.data_format import InputFeature + +class InputProcessor(object): + """Client wrapper for InputProcessor that calls server HTTP endpoints.""" + + def __init__(self, device_mesh: Optional[DeviceMesh] = None, padding_free: bool = False, framework: Literal['transformers', 'megatron'] = 'transformers', **kwargs): + from twinkle_client.http import get_base_url + self.server_url = get_base_url() + + response = http_post( + url=f'{self.server_url}/processors/create', + json_data={ + 'processor_type': 'processor', + 'class_type': 'InputProcessor', + **{'device_mesh': device_mesh, 'padding_free': padding_free, 'framework': framework}, **kwargs + } + ) + response.raise_for_status() + self.processor_id = response.json()['processor_id'] + heartbeat_manager.register_processor(self.processor_id) + + def __del__(self): + try: + heartbeat_manager.unregister_processor(self.processor_id) + except: + pass + + + def __call__(self, inputs: Union[InputFeature, List[InputFeature]], **kwargs): + response = http_post( + url=f'{self.server_url}/processors/call', + json_data={ + 'processor_id': self.processor_id, + 'function': '__call__', + **{'inputs': inputs}, + **kwargs + } + ) + response.raise_for_status() + return response.json()["result"] + \ No newline at end of file diff --git a/src/twinkle_client/reward/__init__.py b/src/twinkle_client/reward/__init__.py new file mode 100644 index 00000000..e632b263 --- /dev/null +++ b/src/twinkle_client/reward/__init__.py @@ -0,0 +1,11 @@ +# ============================================================================ +# WARNING: AUTO-GENERATED FILE - DO NOT MODIFY MANUALLY! 
+# ============================================================================ +# This file is automatically generated by client_tools/client_generator.py +# Any manual changes will be overwritten when the generator runs again. +# +# To update this file: +# 1. Modify the source files in src/twinkle/ +# 2. Run: python client_tools/client_generator.py +# ============================================================================ +from .math_reward import MathReward diff --git a/src/twinkle_client/reward/math_reward.py b/src/twinkle_client/reward/math_reward.py new file mode 100644 index 00000000..4e523d7d --- /dev/null +++ b/src/twinkle_client/reward/math_reward.py @@ -0,0 +1,54 @@ +# ============================================================================ +# WARNING: AUTO-GENERATED FILE - DO NOT MODIFY MANUALLY! +# ============================================================================ +# This file is automatically generated by client_tools/client_generator.py +# Any manual changes will be overwritten when the generator runs again. +# +# To update this file: +# 1. Modify the source files in src/twinkle/ +# 2. 
class MathReward(object):
    """Client wrapper for MathReward that calls server HTTP endpoints.

    Registers a remote ``MathReward`` processor on construction and proxies
    ``__call__`` to it; the server-side instance is kept alive via the
    heartbeat manager until this wrapper is garbage collected.

    NOTE(review): this file is auto-generated; apply the same fixes to
    client_tools/client_generator.py or they will be overwritten.
    """

    def __init__(self, ground_truth_key: str = 'solution'):
        """Create the remote reward processor.

        Args:
            ground_truth_key: Key under which the ground truth is looked up
                server-side (default ``'solution'``).

        Raises:
            requests.HTTPError: If the server rejects the create request.
        """
        from twinkle_client.http import get_base_url
        self.server_url = get_base_url()

        response = http_post(
            url=f'{self.server_url}/processors/create',
            json_data={
                'processor_type': 'reward',
                'class_type': 'MathReward',
                # Direct key instead of the generated `**{...}` unpack.
                'ground_truth_key': ground_truth_key
            }
        )
        response.raise_for_status()
        self.processor_id = response.json()['processor_id']
        heartbeat_manager.register_processor(self.processor_id)

    def __del__(self):
        """Best-effort release of the server-side reward processor."""
        # getattr: __init__ may have raised before processor_id was assigned.
        processor_id = getattr(self, 'processor_id', None)
        if processor_id is None:
            return
        try:
            heartbeat_manager.unregister_processor(processor_id)
        except Exception:
            # Never raise from __del__ (was a bare `except:`, which also
            # swallowed SystemExit/KeyboardInterrupt).
            pass

    def __call__(self, trajectories: List[Trajectory], ground_truths: List[Trajectory]):
        """Score *trajectories* against *ground_truths* on the server.

        Args:
            trajectories: Trajectories to score.
            ground_truths: Reference trajectories carrying the ground truth.

        Returns:
            The ``result`` field of the server's JSON response.

        Raises:
            requests.HTTPError: If the server returns an error status.
        """
        response = http_post(
            url=f'{self.server_url}/processors/call',
            json_data={
                'processor_id': self.processor_id,
                'function': '__call__',
                'trajectories': trajectories,
                'ground_truths': ground_truths,
            }
        )
        response.raise_for_status()
        return response.json()["result"]
class VLLMSampler(Sampler):
    """Client wrapper for VLLMSampler that calls server HTTP endpoints.

    This client manages sampling operations and adapter synchronization with
    the sampler server. Each adapter has its own lifecycle managed through
    automatic heartbeats.

    NOTE(review): this file is auto-generated; apply the same fixes to
    client_tools/client_generator.py or they will be overwritten.
    """

    def __init__(self, model_id: str, **kwargs):
        """Create the sampler instance on the server.

        Args:
            model_id: Model identifier; an optional ``<scheme>://`` prefix is
                stripped before the id is embedded into the endpoint URL.
            **kwargs: Creation options forwarded verbatim to the server.

        Raises:
            requests.HTTPError: If the server rejects the create request.
        """
        from twinkle_client.http import get_base_url
        self.server_url = get_base_url()

        self.adapter_name = None
        if '://' in model_id:
            model_id = model_id.split('://')[1]
        self.server_url = f'{self.server_url}/models/{model_id}'
        response = http_post(
            url=f'{self.server_url}/create',
            json_data=kwargs
        )
        response.raise_for_status()
        # BUG FIX: the generated code ended with `return response.json()`,
        # which makes Python raise "TypeError: __init__() should return None"
        # on every successful construction.  __init__ must return None;
        # success is already signalled by raise_for_status() above.

    def _send_adapter_heartbeat(self):
        """Send a heartbeat for the registered adapter (no-op if none)."""
        if not self.adapter_name:
            return
        response = http_post(
            url=f'{self.server_url}/heartbeat',
            json_data={'adapter_name': self.adapter_name}
        )
        response.raise_for_status()

    def add_adapter_to_sampler(self, adapter_name: str, config: PeftConfig, **kwargs):
        """Add a new adapter to the sampler and start automatic heartbeat.

        Args:
            adapter_name: Name under which the adapter is registered.
            config: Adapter config; a ``PeftConfig`` is serialized via its
                ``__dict__`` before being sent.
            **kwargs: Extra options forwarded to the server.

        Returns:
            The decoded JSON response from the server.
        """
        if isinstance(config, PeftConfig):
            config = config.__dict__
        response = http_post(
            url=f'{self.server_url}/add_adapter_to_sampler',
            json_data={'adapter_name': adapter_name, 'config': config, **kwargs}
        )
        response.raise_for_status()

        # Register for automatic heartbeat only after the server confirmed
        # creation, so we never heartbeat a non-existent adapter.
        self.adapter_name = adapter_name
        heartbeat_manager.register_adapter(
            self.adapter_name,
            self._send_adapter_heartbeat
        )

        return response.json()

    def __del__(self):
        """Cleanup: unregister adapter from heartbeat manager."""
        try:
            # getattr: __init__ may have raised before adapter_name existed.
            adapter_name = getattr(self, 'adapter_name', None)
            if adapter_name:
                heartbeat_manager.unregister_adapter(adapter_name)
        except Exception:
            # Never propagate from __del__; the heartbeat manager may already
            # be gone during interpreter shutdown.  (Was a bare `except:`.)
            pass

    def sample(
        self,
        inputs: Union[List[Trajectory], List[InputFeature]],
        sampling_params: Optional[Dict[str, Any]] = None,
        adapter_name: str = ''
    ) -> SampleResponse:
        """Sample from the model.

        Args:
            inputs: List of Trajectory or InputFeature to sample from.
            sampling_params: Sampling parameters dict.
            adapter_name: Adapter name.

        Returns:
            SampleResponse with sampled sequences.
        """
        response = http_post(
            url=f'{self.server_url}/sample',
            json_data={
                'inputs': inputs,
                'sampling_params': sampling_params,
                'adapter_name': adapter_name
            }
        )
        response.raise_for_status()
        return response.json()

    def sync_weights(self, state_dict: Dict[str, Any], adapter_name: str = ''):
        """Synchronize weights to the sampler.

        Falls back to the adapter registered via ``add_adapter_to_sampler``
        when *adapter_name* is empty.
        """
        adapter = adapter_name or self.adapter_name
        response = http_post(
            url=f'{self.server_url}/sync_weights',
            json_data={'state_dict': state_dict, 'adapter_name': adapter}
        )
        response.raise_for_status()
        return response.json()

    def set_template(self, template_cls: str, adapter_name: str = '', **kwargs):
        """Set the template for encoding trajectories."""
        response = http_post(
            url=f'{self.server_url}/set_template',
            json_data={'template_cls': template_cls, 'adapter_name': adapter_name, **kwargs}
        )
        response.raise_for_status()
        return response.json()