diff --git a/cookbook/megatron/tp_moe.py b/cookbook/megatron/tp_moe.py
index b66b109f..a13b0e58 100644
--- a/cookbook/megatron/tp_moe.py
+++ b/cookbook/megatron/tp_moe.py
@@ -9,8 +9,8 @@
 from twinkle.model import MegatronModel
 from twinkle.preprocessor import SelfCognitionProcessor
 
-# Construct a device_mesh, tp=pp=cp=ep=2, dp=1
-device_mesh = DeviceMesh.from_sizes(dp_size=1, tp_size=2, pp_size=2, cp_size=2, ep_size=2)
+# Construct a device_mesh, tp=pp=ep=dp=2
+device_mesh = DeviceMesh.from_sizes(dp_size=2, tp_size=2, pp_size=2, ep_size=2, sequence_parallel=True)
 
 # use torchrun mode
 twinkle.initialize(mode='local', global_device_mesh=device_mesh)
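Note on the tp_moe.py change above: with `sequence_parallel=True`, Megatron-style sequence parallelism shards the activations outside the tensor-parallel layers (layernorms, dropout) across the tensor-parallel group, so it only takes effect when tp_size > 1 and the padded sequence length must be divisible by tp_size. A minimal sketch of that constraint in plain Python (an illustration under that Megatron-style assumption, not a twinkle API; the exact rule twinkle enforces may differ):

    # Assumption: sequence parallelism splits activations along the
    # tensor-parallel (TP) group, Megatron-LM style.
    tp_size, seq_len = 2, 4096
    assert seq_len % tp_size == 0, 'pad so every TP rank holds an equal shard'
    tokens_per_rank = seq_len // tp_size  # 2048 activation tokens per TP rank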
diff --git a/cookbook/mm/fsdp2.py b/cookbook/mm/fsdp2.py
index cbe6f50d..4dc50850 100644
--- a/cookbook/mm/fsdp2.py
+++ b/cookbook/mm/fsdp2.py
@@ -89,7 +89,7 @@
             # Print metric
             metric = model.calculate_metric(is_training=True)
             logger.info(f'Current is step {step} of {len(dataloader)}, metric: {metric}')
-        if step > 0 and step % 40 == 0:
+        if step > 0 and step % 200 == 0:
             metrics = eval(model)
             logger.info(f'Eval metric: {metrics}')
             metrics['step'] = step
diff --git a/cookbook/ray/run.sh b/cookbook/ray/run.sh
deleted file mode 100644
index bbf8a400..00000000
--- a/cookbook/ray/run.sh
+++ /dev/null
@@ -1 +0,0 @@
-python3 single_controller.py
diff --git a/cookbook/ray/single_controller.py b/cookbook/ray/single_controller.py
deleted file mode 100644
index edb8d8e6..00000000
--- a/cookbook/ray/single_controller.py
+++ /dev/null
@@ -1,91 +0,0 @@
-import os
-from peft import LoraConfig
-from tqdm import tqdm
-
-import twinkle
-from twinkle import DeviceGroup, DeviceMesh, Platform, get_device_placement, get_logger
-from twinkle.dataloader import DataLoader
-from twinkle.dataset import Dataset, DatasetMeta
-from twinkle.model import TransformersModel
-from twinkle.preprocessor import SelfCognitionProcessor
-
-device_group = [DeviceGroup(
-    name='default',
-    ranks=8,
-    device_type='cuda',
-)]
-
-# Construct a device_mesh, fsdp=4, dp=2
-device_mesh = DeviceMesh.from_sizes(fsdp_size=4, dp_size=2)
-# use ray mode
-twinkle.initialize(mode='ray', groups=device_group, global_device_mesh=device_mesh)
-
-logger = get_logger()
-
-
-def eval(model):
-    # 100 Samples
-    dataset = Dataset(dataset_meta=DatasetMeta('ms://swift/self-cognition', data_slice=range(100)))
-    dataset.set_template('Qwen3_5Template', model_id='ms://Qwen/Qwen3.5-35B-A3B')
-    dataset.map(SelfCognitionProcessor('twinkle大模型', 'ModelScope社区'))
-    dataset.encode()
-    dataloader = DataLoader(dataset=dataset, batch_size=8, min_batch_size=8)
-    for step, batch in tqdm(enumerate(dataloader)):
-        model.forward_only(inputs=batch)
-        model.calculate_loss()
-    metrics = model.calculate_metric(is_training=False)
-    return metrics
-
-
-def train():
-    # 1000 samples
-    dataset = Dataset(dataset_meta=DatasetMeta('ms://swift/self-cognition', data_slice=range(1000)))
-    # Set template to prepare encoding
-    dataset.set_template('Qwen3_5Template', model_id='ms://Qwen/Qwen3.5-4B')
-    # Preprocess the dataset to standard format
-    dataset.map(SelfCognitionProcessor('twinkle大模型', 'ModelScope社区'))
-    # Encode dataset
-    dataset.encode()
-    # Global batch size = 8, for GPUs, so 1 sample per GPU
-    dataloader = DataLoader(dataset=dataset, batch_size=8, min_batch_size=8)
-    # Use a TransformersModel
-    model = TransformersModel(model_id='ms://Qwen/Qwen3.5-4B', remote_group='default')
-
-    lora_config = LoraConfig(r=8, lora_alpha=32, target_modules='all-linear')
-
-    # Add a lora to model, with name `default`
-    # Comment this to use full-parameter training
-    model.add_adapter_to_model('default', lora_config, gradient_accumulation_steps=2)
-    # Add Optimizer for lora `default`
-    model.set_optimizer(optimizer_cls='AdamW', lr=1e-4)
-    # Add LRScheduler for lora `default`
-    model.set_lr_scheduler(
-        scheduler_cls='CosineWarmupScheduler', num_warmup_steps=5, num_training_steps=len(dataloader))
-    logger.info(get_device_placement())
-    # Print the training config
-    logger.info(model.get_train_configs())
-    logger.info(f'Total steps: {len(dataloader)}')
-    loss_metric = 99.0
-    # lora: 18G * 4
-    # full: 50G * 4
-    for step, batch in enumerate(dataloader):
-        # Do forward and backward
-        model.forward_backward(inputs=batch)
-        # Step
-        model.clip_grad_and_step()
-        if step % 20 == 0:
-            # Print metric
-            metric = model.calculate_metric(is_training=True)
-            logger.info(f'Current is step {step} of {len(dataloader)}, metric: {metric}')
-        if step > 0 and step % 40 == 0:
-            metrics = eval(model)
-            logger.info(f'Eval metric: {metrics}')
-            metrics['step'] = step
-            if loss_metric > float(metrics['loss']):
-                model.save(f'checkpoint-{step}')
-                loss_metric = float(metrics['loss'])
-    model.save(f'last-checkpoint')
-
-
-if __name__ == '__main__':
-    train()
diff --git a/cookbook/rl/gkd_off_policy.py b/cookbook/rl/gkd_off_policy.py
index 3315c962..204e90f9 100644
--- a/cookbook/rl/gkd_off_policy.py
+++ b/cookbook/rl/gkd_off_policy.py
@@ -60,8 +60,8 @@
 STUDENT_MODEL_ID = os.environ.get('STUDENT_MODEL_ID', 'ms://Qwen/Qwen3-0.6B')
 TEACHER_MODEL_ID = os.environ.get('TEACHER_MODEL_ID', 'ms://Qwen/Qwen3-8B')
 
-MODEL_GPUS = int(os.environ.get('MODEL_GPUS', 8))
-SAMPLER_GPUS = int(os.environ.get('SAMPLER_GPUS', 8))
+MODEL_GPUS = int(os.environ.get('MODEL_GPUS', 4))
+SAMPLER_GPUS = int(os.environ.get('SAMPLER_GPUS', 4))
 NUM_GPUS = MODEL_GPUS + SAMPLER_GPUS
 
 BATCH_SIZE = int(os.environ.get('BATCH_SIZE', 16))
diff --git a/cookbook/rl/gkd_on_policy.py b/cookbook/rl/gkd_on_policy.py
index f30df2ea..9c792eab 100644
--- a/cookbook/rl/gkd_on_policy.py
+++ b/cookbook/rl/gkd_on_policy.py
@@ -62,7 +62,7 @@
 TEACHER_MODEL_ID = os.environ.get('TEACHER_MODEL_ID', 'ms://Qwen/Qwen3-8B')
 
 MODEL_GPUS = int(os.environ.get('MODEL_GPUS', 4))
-SAMPLER_GPUS = int(os.environ.get('SAMPLER_GPUS', 4))
+SAMPLER_GPUS = int(os.environ.get('SAMPLER_GPUS', 2))
 NUM_GPUS = MODEL_GPUS + 2*SAMPLER_GPUS
 
 MAX_NEW_TOKENS = int(os.environ.get('MAX_NEW_TOKENS', 2048))
diff --git a/cookbook/transformers/fsdp2.sh b/cookbook/transformers/fsdp2.sh
index 46e9f27f..93c531a9 100644
--- a/cookbook/transformers/fsdp2.sh
+++ b/cookbook/transformers/fsdp2.sh
@@ -1 +1 @@
-CUDA_VISIBLE_DEVICES=0,1 torchrun --nproc_per_node=2 fsdp2.py
+CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7 torchrun --nproc_per_node=8 fsdp2.py
diff --git a/cookbook/transformers/fsdp2_moe.py b/cookbook/transformers/fsdp2_moe.py
index 23a53f4a..3ea649d3 100644
--- a/cookbook/transformers/fsdp2_moe.py
+++ b/cookbook/transformers/fsdp2_moe.py
@@ -20,7 +20,7 @@
 def eval(model):
     # 100 Samples
     dataset = Dataset(dataset_meta=DatasetMeta('ms://swift/self-cognition', data_slice=range(100)))
-    dataset.set_template('Qwen3_5Template', model_id='ms://Qwen/Qwen3.5-4B')
+    dataset.set_template('Template', model_id='ms://Qwen/Qwen3-30B-A3B-Instruct-2507')
     dataset.map(SelfCognitionProcessor('twinkle大模型', 'ModelScope社区'))
     dataset.encode()
     dataloader = DataLoader(dataset=dataset, batch_size=4)
@@ -35,7 +35,7 @@ def train():
     # 1000 samples
     dataset = Dataset(dataset_meta=DatasetMeta('ms://swift/self-cognition', data_slice=range(1000)))
     # Set template to prepare encoding
-    dataset.set_template('Qwen3_5Template', model_id='ms://Qwen/Qwen3.5-4B')
+    dataset.set_template('Template', model_id='ms://Qwen/Qwen3-30B-A3B-Instruct-2507')
     # Preprocess the dataset to standard format
     dataset.map(SelfCognitionProcessor('twinkle大模型', 'ModelScope社区'))
     # Encode dataset
@@ -43,7 +43,7 @@ def train():
     dataset.encode()
     # Global batch size = 4, for GPUs, so 1 sample per GPU
     dataloader = DataLoader(dataset=dataset, batch_size=8)
     # Use a TransformersModel, transformer_cls_names_to_wrap=Qwen3MoeSparseMoeBlock to avoid hang of fsdp2
-    model = TransformersModel(model_id='ms://Qwen/Qwen3.5-4B', fsdp_config={'transformer_cls_names_to_wrap':['Qwen3MoeSparseMoeBlock']})
+    model = TransformersModel(model_id='ms://Qwen/Qwen3-30B-A3B-Instruct-2507', fsdp_config={'transformer_cls_names_to_wrap':['Qwen3MoeSparseMoeBlock']})
     # Patch MoE model to fix the hang bug, support transformers==4.*
     model.apply_patch('ms://twinkle-kit/qwen3_moe_transformers4_patch')
     lora_config = LoraConfig(
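The `transformer_cls_names_to_wrap` setting above makes each `Qwen3MoeSparseMoeBlock` its own FSDP2 unit, so every rank unshards and reshards the whole block at once even when the router activates experts unevenly across ranks; sharding experts into separate units is what can leave ranks waiting on mismatched collectives. A rough sketch of the equivalent wrapping with PyTorch's public FSDP2 API (assumes torch >= 2.6, where `fully_shard` is exported from `torch.distributed.fsdp`, and an initialized process group; this is an illustration, not twinkle's implementation):

    import torch.nn as nn
    from torch.distributed.fsdp import fully_shard  # public in torch >= 2.6

    def wrap_moe_blocks(model: nn.Module, cls_names=('Qwen3MoeSparseMoeBlock',)) -> nn.Module:
        # Shard each MoE block as a single unit so expert routing never
        # changes which collectives a rank participates in.
        for module in model.modules():
            if type(module).__name__ in cls_names:
                fully_shard(module)
        fully_shard(model)  # shard the remaining parameters at the root
        return model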
diff --git a/src/twinkle/model/transformers/transformers.py b/src/twinkle/model/transformers/transformers.py
index ab464811..390666d6 100644
--- a/src/twinkle/model/transformers/transformers.py
+++ b/src/twinkle/model/transformers/transformers.py
@@ -476,8 +476,12 @@ def calculate_loss(self, **kwargs):
         optimizer_config = self.optimizer_group[adapter_name]
         loss_instance: Loss = optimizer_config.loss_instance
         assert isinstance(loss_instance, Loss), 'Set a loss_instance before calculating loss'
-        inputs = optimizer_config.train_status.inputs
-        outputs = optimizer_config.train_status.outputs
+        if self.model.training:
+            status = optimizer_config.train_status
+        else:
+            status = optimizer_config.eval_status
+        inputs = status.inputs
+        outputs = status.outputs
         assert inputs is not None and outputs is not None, 'Cannot calculate loss of empty inputs and outputs'
         result = loss_instance(inputs, outputs, **kwargs)
         loss_value = result['loss']
@@ -499,15 +503,15 @@
             # = global_per_token_grad / dp_world_size = avg_per_token_grad
             counts = counts / self.device_mesh.data_world_size
             optimizer_config = self.optimizer_group[adapter_name]
-            optimizer_config.train_status.num_tokens += counts.item()
+            status.num_tokens += counts.item()
         if self.sp_strategy is not None and 'labels' in inputs:
             reduction = getattr(loss_instance, 'reduction', None)
             if reduction is not None:
                 self.sp_strategy.sp_config['loss_reduction'] = str(reduction)
             loss_value = self.sp_strategy.reduce_loss(loss_value, inputs['labels'])
-        optimizer_config.train_status.loss_value += loss_value
-        outputs['loss'] = optimizer_config.train_status.loss_value
-        return optimizer_config.train_status.loss_value.item()
+        status.loss_value += loss_value
+        outputs['loss'] = status.loss_value
+        return status.loss_value.item()
 
     @remote_function()
     def backward(self, **kwargs):
diff --git a/src/twinkle/processor/base.py b/src/twinkle/processor/base.py
index 3269a574..eb182bf5 100644
--- a/src/twinkle/processor/base.py
+++ b/src/twinkle/processor/base.py
@@ -130,7 +130,7 @@ def pad_cp_inputs(input_tensor: torch.Tensor, padding_value: int) -> torch.Tensor:
     if input_tensor is None:
         return input_tensor
 
-    seq_len = input_tensor.shape[1]
+    seq_len = input_tensor.shape[-1]
 
     # Calculate required divisor based on parallelism settings
     if cp_size > 1:
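The `shape[1]` -> `shape[-1]` change makes the context-parallel padding helper work for both batched `[batch, seq]` and unbatched `[seq]` tensors, since the sequence dimension is last in either layout. A standalone sketch of the idea (a hypothetical mini-helper, not the twinkle function):

    import torch
    import torch.nn.functional as F

    def pad_to_multiple(x: torch.Tensor, multiple: int, value: int) -> torch.Tensor:
        pad = (-x.shape[-1]) % multiple  # shape[-1] is the seq dim in both layouts
        return F.pad(x, (0, pad), value=value) if pad else x

    assert pad_to_multiple(torch.zeros(5, dtype=torch.long), 4, 0).shape == (8,)
    assert pad_to_multiple(torch.zeros(2, 5, dtype=torch.long), 4, 0).shape == (2, 8)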
diff --git a/src/twinkle/sampler/vllm_sampler/vllm_engine.py b/src/twinkle/sampler/vllm_sampler/vllm_engine.py
index ce487436..a1b7123e 100644
--- a/src/twinkle/sampler/vllm_sampler/vllm_engine.py
+++ b/src/twinkle/sampler/vllm_sampler/vllm_engine.py
@@ -291,8 +291,8 @@ async def sample(self,
                 continue
 
             # Get logprob for the actual token
-            if i < len(prompt_token_ids):
-                token_id = prompt_token_ids[i]
+            if i < len(result.prompt_token_ids):
+                token_id = result.prompt_token_ids[i]
                 if token_id in lp_dict:
                     lp_obj = lp_dict[token_id]
                     result_prompt_logprobs.append(lp_obj.logprob)
diff --git a/src/twinkle/sampler/vllm_sampler/vllm_sampler.py b/src/twinkle/sampler/vllm_sampler/vllm_sampler.py
index 4c3bc6de..b52aa6f6 100644
--- a/src/twinkle/sampler/vllm_sampler/vllm_sampler.py
+++ b/src/twinkle/sampler/vllm_sampler/vllm_sampler.py
@@ -166,6 +166,9 @@ def encode_trajectory_for_vllm(self,
             add_generation_prompt=add_generation_prompt,
         )[0]
         encoded['prompt'] = prompt['prompt']
+        for key in encoded:
+            if isinstance(encoded[key], np.ndarray):
+                encoded[key] = encoded[key].tolist()
         return encoded
 
     def apply_patch(self, patch_cls: Union[Patch, Type[Patch], str], **kwargs) -> None:
@@ -218,6 +221,7 @@ async def _sample_single(
         sampling_params: SamplingParams,
         lora_request: Optional[Any] = None,
         *,
+        multi_modal_data: Optional[Dict[str, Any]] = None,
         logprobs_only: bool = False,
     ) -> SampleResponse:
         """Sample a single input asynchronously.
@@ -228,20 +232,23 @@ async def _sample_single(
             adapter_path: Optional LoRA adapter path (legacy, prefer lora_request).
             lora_request: Pre-built LoRARequest to attach to the sampling request.
                 Avoids repeated ``_get_or_load_lora`` calls per input.
+            multi_modal_data: The multi-modal data dict.
             logprobs_only: Only return logprobs (no generated tokens).
 
         Returns:
             A SampleResponse object
         """
-        multi_modal_data = self._extract_multi_modal_data(feat)
         response = await self.engine.sample(
-            prompt=feat['prompt'] if 'prompt' in feat else feat['input_ids'],
+            # Prefer input_ids because the prompt text may not contain the
+            # response when vLLM is used sequentially; multi-modal inputs
+            # do not support input_ids
+            prompt=feat['input_ids'] if 'input_ids' in feat and not multi_modal_data else feat['prompt'],
             sampling_params=sampling_params,
             lora_request=lora_request,
             multi_modal_data=multi_modal_data,
             mm_processor_kwargs=feat.get('mm_processor_kwargs'),
         )
-        if 'input_ids' not in feat:
+        if 'input_ids' not in feat or multi_modal_data:
             feat['input_ids'] = response.prompt_token_ids
             feat['labels'] = [-100] * len(response.prompt_token_ids)
         if not logprobs_only:
@@ -325,7 +332,11 @@
             sampling_params.max_tokens = 1
             logprobs_only = True
 
-        if is_trajectory:
+        multi_modal_data_list = []
+        for feat in inputs_list:
+            multi_modal_data_list.append(self._extract_multi_modal_data(feat))
+
+        if is_trajectory or any(multi_modal_data_list):
             template = self.template
             assert template is not None, \
                 'Use set_template to add a template when trying to input Trajectory'
@@ -349,8 +360,9 @@
                     feat,
                     sampling_params,
                     lora_request=lora_request,
+                    multi_modal_data=multi_modal_data,
                     logprobs_only=logprobs_only,
-                ) for feat in encoded_inputs
+                ) for feat, multi_modal_data in zip(encoded_inputs, multi_modal_data_list)
             ]
             return await asyncio.gather(*tasks)
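The ndarray-to-list loop added in `encode_trajectory_for_vllm` exists because template encoders can return numpy arrays, while vLLM prompt dicts expect token ids as plain Python lists of ints. A minimal illustration with dummy data (field names follow the diff):

    import numpy as np

    encoded = {'input_ids': np.array([1, 2, 3]), 'prompt': 'hello'}
    for key in encoded:
        if isinstance(encoded[key], np.ndarray):
            encoded[key] = encoded[key].tolist()  # vLLM wants list[int], not ndarray
    assert encoded == {'input_ids': [1, 2, 3], 'prompt': 'hello'}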
diff --git a/src/twinkle/template/qwen3_5_vl.py b/src/twinkle/template/qwen3_5_vl.py
index a7967777..3a8ffe9a 100644
--- a/src/twinkle/template/qwen3_5_vl.py
+++ b/src/twinkle/template/qwen3_5_vl.py
@@ -26,9 +26,11 @@ def __init__(self, *args, **kwargs):
         self._patch_size: Optional[int] = None
         self._merge_size: Optional[int] = None
         self._init_vision_config()
-        from transformers.models.qwen3_vl import Qwen3VLModel
         with torch.device('meta'):
-            self.dummy_model = Qwen3VLModel(self.config)
+            import transformers
+            model_cls = self.config.architectures[0]
+            model_cls = getattr(transformers, model_cls)
+            self.dummy_model = model_cls(self.config)
         self.rope_index_func = self.get_rope_index()
 
     def get_rope_index(self):
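Replacing the hard-coded `Qwen3VLModel` import with a lookup through `config.architectures` lets the template build its meta-device dummy model for any checkpoint whose architecture class is exported at the top level of `transformers`. Roughly (the model id is an example, not the one from the diff, and fetching the config needs network access):

    import torch
    import transformers
    from transformers import AutoConfig

    config = AutoConfig.from_pretrained('Qwen/Qwen2-VL-2B-Instruct')  # any config with `architectures`
    model_cls = getattr(transformers, config.architectures[0])
    with torch.device('meta'):
        dummy_model = model_cls(config)  # meta device: module structure only, no weights allocated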