import os

from peft import LoraConfig

import twinkle
from twinkle import DeviceMesh, get_device_placement, get_logger
from twinkle.dataloader import DataLoader
from twinkle.dataset import Dataset, DatasetMeta
from twinkle.model import MegatronModel
from twinkle.preprocessor import SelfCognitionProcessor

# Configuration for the verified NPU MoE LoRA smoke test; every value below can
# be overridden through the corresponding TWINKLE_* environment variable.
# Expert LoRA currently only supports ETP=1, so we keep TP at 1 here.
MODEL_ID = os.environ.get(
    'TWINKLE_LOCAL_MODEL_DIR',
    'ms://Qwen/Qwen3-30B-A3B-Instruct-2507',
)
DATASET_PATH = os.environ.get(
    'TWINKLE_LOCAL_DATASET_PATH',
    'ms://swift/self-cognition',
)
MAX_STEPS = int(os.environ.get('TWINKLE_MAX_STEPS', '10'))
TRAIN_SAMPLES = int(os.environ.get('TWINKLE_TRAIN_SAMPLE_LIMIT', '80'))
BATCH_SIZE = int(os.environ.get('TWINKLE_BATCH_SIZE', '8'))
DP_SIZE = int(os.environ.get('TWINKLE_DP_SIZE', '8'))
TP_SIZE = int(os.environ.get('TWINKLE_TP_SIZE', '1'))
EP_SIZE = int(os.environ.get('TWINKLE_EP_SIZE', '2'))
PP_SIZE = int(os.environ.get('TWINKLE_PP_SIZE', '1'))
CP_SIZE = int(os.environ.get('TWINKLE_CP_SIZE', '1'))
LR = float(os.environ.get('TWINKLE_LR', '1e-4'))

# 8 cards: dp=8, tp=1, ep=2, pp=1, cp=1
device_mesh = DeviceMesh.from_sizes(
    dp_size=DP_SIZE,
    tp_size=TP_SIZE,
    pp_size=PP_SIZE,
    cp_size=CP_SIZE,
    ep_size=EP_SIZE,
)
twinkle.initialize(mode='local', global_device_mesh=device_mesh)
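# Sanity note: with the defaults above, dp * tp * pp * cp = 8 matches the
# 8 NPU cards, while ep=2 shards the MoE expert weights across two ranks
# (assuming Megatron-style expert parallelism, where EP subdivides the
# data-parallel group rather than adding devices).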

logger = get_logger()


def build_dataloader() -> DataLoader:
    dataset = Dataset(dataset_meta=DatasetMeta(DATASET_PATH, data_slice=range(TRAIN_SAMPLES)))
    dataset.set_template('Template', model_id=MODEL_ID)
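    # Fill the self-cognition template with the model name ('twinkle大模型') and
    # author ('ModelScope社区') before tokenization.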
    dataset.map(SelfCognitionProcessor('twinkle大模型', 'ModelScope社区'))
    dataset.encode()
    return DataLoader(dataset=dataset, batch_size=BATCH_SIZE)


def _to_loss_value(outputs) -> float:
    # Normalize the loss to a plain float: outputs may be a dict or an object
    # with a `.loss` attribute, and the loss itself may be a tensor or a number.
    loss = outputs['loss'] if isinstance(outputs, dict) else outputs.loss
    return float(loss.detach().cpu()) if hasattr(loss, 'detach') else float(loss)


def train():
    dataloader = build_dataloader()

    model = MegatronModel(model_id=MODEL_ID)
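    # r=8 with lora_alpha=32 gives the standard LoRA scaling of alpha/r = 4.
    # 'all-linear' targets every linear projection; for this smoke test that is
    # expected to include the MoE expert layers (hence the ETP=1 note above).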
    lora_config = LoraConfig(r=8, lora_alpha=32, target_modules='all-linear')
    model.add_adapter_to_model('default', lora_config)
    model.set_optimizer(optimizer_cls='default', lr=LR)

    # Keep the scheduler compatible with the shortened smoke run: at least two
    # decay steps so the schedule stays well-formed even for tiny MAX_STEPS.
    lr_decay_steps = max(MAX_STEPS, 2)
    model.set_lr_scheduler(
        scheduler_cls='default',
        lr_warmup_steps=1,
        lr_decay_steps=lr_decay_steps,
    )

    logger.info(get_device_placement())
    logger.info(model.get_train_configs())
    logger.info(
        'MoE LoRA NPU smoke config: '
        f'model_id={MODEL_ID}, dataset={DATASET_PATH}, batch_size={BATCH_SIZE}, '
        f'train_samples={TRAIN_SAMPLES}, max_steps={MAX_STEPS}, '
        f'dp={DP_SIZE}, tp={TP_SIZE}, ep={EP_SIZE}, pp={PP_SIZE}, cp={CP_SIZE}'
    )
    logger.info(f'dataloader_steps={len(dataloader)}')

    for step, batch in enumerate(dataloader):
        outputs = model.forward_backward(inputs=batch)
        model.clip_grad_and_step()
        logger.info(f'step={step} loss={_to_loss_value(outputs)}')
        if step + 1 >= MAX_STEPS:
            break

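    # Save the final state; with the LoRA adapter attached this is expected to
    # persist the adapter weights rather than a full copy of the 30B base model.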
    model.save('last-checkpoint')


if __name__ == '__main__':
    train()