Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion .dev_scripts/ci_container_test.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
install_twinkle_with_kernels() {
pip install ".[kernels]" -i https://mirrors.aliyun.com/pypi/simple/ || pip install ".[kernels]"
}

if [ "$MODELSCOPE_SDK_DEBUG" == "True" ]; then
# pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple
git config --global --add safe.directory /twinkle
Expand All @@ -22,11 +26,14 @@ if [ "$MODELSCOPE_SDK_DEBUG" == "True" ]; then
pip uninstall autoawq -y
pip uninstall lmdeploy -y
pip uninstall tensorflow -y
pip install kernels -U
pip install ray==2.48
pip install optimum

# test with install
pip install .
install_twinkle_with_kernels
else
install_twinkle_with_kernels
echo "Running case in release image, run case directly!"
fi
# remove torch_extensions folder to avoid ci hang.
Expand Down
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -352,3 +352,12 @@ foundation for building customizable, enterprise-grade training services.
| Component Type | Component Link | Component Function | Author |
| -------------- | -------------------------------------------------------------------------------------------------------- | --------------------------------------------------------------------------------------- | ------------------- |
| Patch | [qwen3_moe_transformers4_patch](https://www.modelscope.cn/models/twinkle-kit/qwen3_moe_transformers4_patch) | Fixes Qwen3 MoE model hang issue during FSDP2 training, effective for transformers==4.x | ModelScope Official |

## Acknowledgements

This project is maintained and supported by multiple teams under the Workshop organization:

- ModelScope Team
- CMB-Tech Team

Twinkle is built on the shoulders of giants, including [Transformers](https://github.com/huggingface/transformers), [MS-SWIFT](https://github.com/modelscope/swift), [veRL](https://github.com/verl-project/verl), and other excellent projects.
9 changes: 9 additions & 0 deletions README_ZH.md
Original file line number Diff line number Diff line change
Expand Up @@ -331,3 +331,12 @@ for epoch in range(3):
| 组件类型 | 组件链接 | 组件功能 | 作者 |
| -------- | -------------------------------------------------------------------------------------------------- | ----------------------------------------------------------------------------- | ----------------- |
| Patch | [qwen3_moe_transformers4_patch](https://www.modelscope.cn/models/twinkle-kit/qwen3_moe_transformers4_patch) | 修复 Qwen3 MoE 模型在 FSDP2 训练期间挂起的问题,适用于 transformers==4.x | ModelScope 官方 |

## 致谢

本项目由 Workshop 组织下的多个团队共同维护和支持:

- ModelScope官方团队
- 招商银行开源技术团队

Twinkle 的构建基于多个优秀的开源项目,包括 [Transformers](https://github.com/huggingface/transformers)、[MS-SWIFT](https://github.com/modelscope/swift)、[veRL](https://github.com/verl-project/verl) 等。
2 changes: 1 addition & 1 deletion cookbook/client/tinker/lora.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
# - base_url: the address of the running server
# - api_key: authentication token (loaded from environment variable)
service_client = init_tinker_compat_client(
base_url='http://www.modelscope.cn/twinkle', api_key=os.environ.get('MODELSCOPE_SDK_TOKEN'))
base_url='http://www.modelscope.cn/twinkle', api_key=os.environ.get('MODELSCOPE_TOKEN'))

# Step 3: List models available on the server to verify the connection
print('Available models:')
Expand Down
8 changes: 4 additions & 4 deletions cookbook/client/tinker/megatron/server_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ applications:
nproc_per_node: 4 # Number of GPU processes per node
sampler_type: vllm # Inference engine: 'vllm' (fast) or 'torch' (TorchSampler)
engine_args: # vLLM engine-specific settings
max_model_len: 8192 # Maximum sequence length the engine supports
max_model_len: 16000 # Maximum sequence length the engine supports
gpu_memory_utilization: 0.85 # Fraction of GPU memory to use (0.0-1.0)
enable_lora: true # Allow loading LoRA adapters during inference
device_group: # Logical device group for the sampler
Expand All @@ -58,7 +58,7 @@ applications:
dp_size: 4
queue_config:
rps_limit: 20 # Max requests per second
tps_limit: 10000 # Max tokens per second
tps_limit: 16000 # Max tokens per second
deployments:
- name: SamplerManagement
autoscaling_config:
Expand All @@ -80,7 +80,7 @@ applications:
args:
use_megatron: true # Use Megatron-Core backend (not HuggingFace Transformers)
model_id: "ms://Qwen/Qwen3-30B-A3B-Instruct-2507" # ModelScope model identifier
max_length: 10240 # model max length
max_length: 16000 # model max length
max_loras: 5 # model max loras
nproc_per_node: 4 # Number of GPU processes per node
device_group:
Expand All @@ -94,7 +94,7 @@ applications:

queue_config:
rps_limit: 20 # Max requests per second
tps_limit: 10000 # Max tokens per second
tps_limit: 16000 # Max tokens per second
adapter_config:
per_token_adapter_limit: 3 # Max concurrent LoRA adapters
adapter_timeout: 30 # Seconds before idle adapter unload
Expand Down
2 changes: 1 addition & 1 deletion cookbook/client/tinker/sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
base_model = 'Qwen/Qwen3-30B-A3B-Instruct-2507'
service_client = init_tinker_compat_client(
base_url='http://www.modelscope.cn/twinkle',
api_key=os.environ.get('MODELSCOPE_SDK_TOKEN')
api_key=os.environ.get('MODELSCOPE_TOKEN')
)
# Step 2: Create a sampling client by loading weights from a saved checkpoint.
# The model_path is a twinkle:// URI pointing to a previously saved LoRA checkpoint.
Expand Down
2 changes: 1 addition & 1 deletion cookbook/client/tinker/self_congnition.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def train():

# Connect to the Twinkle server running locally
service_client = init_tinker_compat_client(
base_url='http://www.modelscope.cn/twinkle', api_key=os.environ.get('MODELSCOPE_SDK_TOKEN'))
base_url='http://www.modelscope.cn/twinkle', api_key=os.environ.get('MODELSCOPE_TOKEN'))

# Create a LoRA training client for the base model (rank=16 for the LoRA adapter)
training_client = service_client.create_lora_training_client(base_model=base_model, rank=16)
Expand Down
6 changes: 3 additions & 3 deletions cookbook/client/tinker/short_math_grpo.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ def __call__(self, trajectories: List[Trajectory], ground_truths: List[Trajector
return rewards


def create_Math_dataset():
def create_math_dataset():
"""Create Math dataset."""
meta = DatasetMeta(
'ms://modelscope/competition_math',
Expand Down Expand Up @@ -207,7 +207,7 @@ def main():
logger.info('Starting Math GRPO training...')

# Step 1: Prepare dataset and dataloader (client-side)
dataset = create_Math_dataset()
dataset = create_math_dataset()
dataloader = DataLoader(dataset=dataset, batch_size=BATCH_SIZE)
template = Template(model_id=f'ms://{BASE_MODEL}')

Expand All @@ -216,7 +216,7 @@ def main():
# Step 2: Initialize the Tinker-compatible client
logger.info('Connecting to Tinker server...')
service_client = init_tinker_compat_client(
base_url='http://www.modelscope.cn/twinkle', api_key=os.environ.get('MODELSCOPE_SDK_TOKEN'))
base_url='http://www.modelscope.cn/twinkle', api_key=os.environ.get('MODELSCOPE_TOKEN'))

logger.info('Creating LoRA training client...')
# Create a LoRA training client for GRPO
Expand Down
2 changes: 1 addition & 1 deletion cookbook/client/twinkle/grpo.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def train():
# Step 1: Initialize the Twinkle client
client = init_twinkle_client(
base_url='http://127.0.0.1:8000',
api_key=os.environ.get('MODELSCOPE_SDK_TOKEN'),
api_key=os.environ.get('MODELSCOPE_TOKEN'),
)

# Step 2: Prepare dataset and dataloader
Expand Down
2 changes: 1 addition & 1 deletion cookbook/client/twinkle/sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def sample():
# Step 2: Initialize the Twinkle client to communicate with the remote server.
client = init_twinkle_client(
base_url='http://127.0.0.1:8000',
api_key=os.environ.get('MODELSCOPE_SDK_TOKEN'),
api_key=os.environ.get('MODELSCOPE_TOKEN'),
)

# Step 3: Create the sampler client pointing to the model on the server
Expand Down
2 changes: 1 addition & 1 deletion cookbook/client/twinkle/self_congnition.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
# Step 2: Initialize the Twinkle client to communicate with the remote server.
# - base_url: the address of the running Twinkle server
# - api_key: authentication token (loaded from environment variable)
client = init_twinkle_client(base_url='http://127.0.0.1:8000', api_key=os.environ.get('MODELSCOPE_SDK_TOKEN'))
client = init_twinkle_client(base_url='http://127.0.0.1:8000', api_key=os.environ.get('MODELSCOPE_TOKEN'))

# Step 3: Query the server for existing training runs and their checkpoints.
# This is useful for resuming a previous training session.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ from twinkle_client import init_tinker_compat_client
# Step 1: Initialize client (automatically patches Tinker SDK)
service_client = init_tinker_compat_client(
base_url='http://localhost:8000',
api_key=os.environ.get('MODELSCOPE_SDK_TOKEN')
api_key=os.environ.get('MODELSCOPE_TOKEN')
)

# Step 2: Query existing training runs (optional)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ logger = get_logger()
# Step 1: Initialize client
client = init_twinkle_client(
base_url='http://127.0.0.1:8000',
api_key=os.environ.get('MODELSCOPE_SDK_TOKEN')
api_key=os.environ.get('MODELSCOPE_TOKEN')
)

# Step 2: Query existing training runs (optional, for resuming training)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ from twinkle_client import init_tinker_compat_client
# Step 1: 初始化客户端(会自动 patch Tinker SDK)
service_client = init_tinker_compat_client(
base_url='http://localhost:8000',
api_key=os.environ.get('MODELSCOPE_SDK_TOKEN')
api_key=os.environ.get('MODELSCOPE_TOKEN')
)

# Step 2: 查询已有训练运行(可选)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ logger = get_logger()
# Step 1: 初始化客户端
client = init_twinkle_client(
base_url='http://127.0.0.1:8000',
api_key=os.environ.get('MODELSCOPE_SDK_TOKEN')
api_key=os.environ.get('MODELSCOPE_TOKEN')
)

# Step 2: 查询已有训练运行(可选,用于恢复训练)
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ transformers = [
"torch>=2.6.0,<3.0.0",
"torchvision",
]
kernels = ["kernels"]
megatron = ["megatron-core>=0.12.0", "transformer-engine[pytorch]"]
vllm = ["vllm>=0.11"]
ray = ["ray[serve]"]
Expand Down
12 changes: 10 additions & 2 deletions src/twinkle/kernel/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,16 @@ def impl(*args, **kwargs):
from kernels._versions import select_revision_or_version
from kernels.utils import get_kernel
assert repo_id is not None
resolved = select_revision_or_version(repo_id, revision, version)
kernel = get_kernel(repo_id, revision=resolved)
# kernels API changed across versions; use keyword args for modern API
# and fall back to repo_id-only for older variants.
try:
resolved = select_revision_or_version(repo_id, revision=revision, version=version)
except TypeError:
resolved = select_revision_or_version(repo_id)
try:
kernel = get_kernel(repo_id, revision=resolved)
except TypeError:
kernel = get_kernel(repo_id, resolved)
func = getattr(kernel, func_name, None)
if func is None:
raise AttributeError(f'Kernel repo {repo_id} does not export {func_name}.')
Expand Down
83 changes: 59 additions & 24 deletions src/twinkle/model/transformers/strategy/sequence_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ def backward(ctx, *grad_output):
# Split grads back to local sequence chunk.
_grad = grad_output[0]
if sequence_parallel.world_size > 1 and sequence_parallel._sp_group is not None:
# Gather replicates the sequence dimension across SP ranks. Scale once here
# so downstream FSDP avg does not shrink this path by an extra SP factor.
_grad = _grad * sequence_parallel.world_size
_grad = sequence_parallel.split(_grad, dim=ctx.gather_idx, position_ids=ctx.position_ids).contiguous()
return _grad, None, None, None

Expand Down Expand Up @@ -785,7 +788,8 @@ def pad_and_split_inputs(self,
# - In next-token-aligned labels, this appears at labels[b-1]
boundary_starts = (real_position_ids == 0)
prev = torch.zeros_like(boundary_starts, dtype=torch.bool)
prev[..., 1:] = boundary_starts[..., :-1]
# Mask token b-1 when boundary starts at b.
prev[..., :-1] = boundary_starts[..., 1:]
labels = labels.clone()
labels[prev] = -100
# Also avoid any potential wrap-around supervision at the end of the concatenated stream.
Expand Down Expand Up @@ -867,6 +871,7 @@ class SequenceParallelConfig:
ulysses_size: Optional[int] = None
gather_logits: bool = True
loss_reduction: str = 'mean'
compensate_fsdp_avg: bool = False


def _get_ulysses_size(device_mesh, sp_config: Optional[Dict[str, Any]] = None) -> int:
Expand Down Expand Up @@ -969,34 +974,64 @@ def reduce_loss(self, loss: torch.Tensor, labels: Optional[torch.Tensor], ignore
return loss
if labels is None or sequence_parallel._sp_group is None:
return loss
# Compute full-sequence loss in forward, but keep backward local to this rank.
# Compute global loss via autograd-aware all-reduce.
reduction = str(self.sp_config.get('loss_reduction', 'mean')).lower()
if reduction == 'none':
raise ValueError("SequenceParallelStrategy.reduce_loss only supports reduction='sum' or 'mean'. "
'Please aggregate per-token losses before calling reduce_loss.')
num_valid_tokens = (labels != ignore_index).sum().to(loss.device)
compensate_fsdp_avg = bool(self.sp_config.get('compensate_fsdp_avg', False))
compensate_factor = float(self.ulysses_size if compensate_fsdp_avg else 1.0)
sum_metric_scale = float(self.ulysses_size)

class _ReduceSequenceParallelLoss(torch.autograd.Function):

@staticmethod
def forward(ctx, local_mean: torch.Tensor, num_valid_tokens: torch.Tensor) -> torch.Tensor:
local_tokens = num_valid_tokens.detach().clone()
local_sum = local_mean * local_tokens
if local_tokens.item() == 0:
local_sum = torch.nan_to_num(local_sum)
global_sum = local_sum.detach().clone()
dist.all_reduce(global_sum, group=sequence_parallel._sp_group)
global_tokens = num_valid_tokens.detach().clone()
dist.all_reduce(global_tokens, group=sequence_parallel._sp_group)
ctx.save_for_backward(local_tokens, global_tokens)
if global_tokens.item() == 0:
return local_sum
return global_sum / global_tokens

@staticmethod
def backward(ctx, grad_output: torch.Tensor):
local_tokens, global_tokens = ctx.saved_tensors
if global_tokens.item() == 0:
return torch.zeros_like(grad_output), None
# d(global_mean)/d(local_mean) = local_tokens / global_tokens.
grad_local_mean = grad_output * (local_tokens / global_tokens) * compensate_factor
return grad_local_mean, None

class _ReduceSequenceParallelSum(torch.autograd.Function):

@staticmethod
def forward(ctx, local_sum: torch.Tensor) -> torch.Tensor:
ctx.sum_metric_scale = sum_metric_scale
global_sum = local_sum.detach().clone()
dist.all_reduce(global_sum, group=sequence_parallel._sp_group)
# Keep logging/metric value aligned with non-SP sum semantics under
# outer collect='mean' by removing one SP replication factor.
return global_sum / ctx.sum_metric_scale

@staticmethod
def backward(ctx, grad_output: torch.Tensor):
# Keep training gradient scale unchanged; forward-side scaling is for
# logging/metric alignment under outer collect='mean'.
return grad_output

if reduction == 'sum':
local_sum = loss
global_sum = local_sum.detach().clone()
dist.all_reduce(global_sum, group=sequence_parallel._sp_group)
out = global_sum + (local_sum - local_sum.detach())
if sequence_parallel.world_size > 1:
out_metric = out.detach() / sequence_parallel.world_size
return out_metric + (out - out.detach())
return out
# Default to mean reduction.
local_sum = loss * num_valid_tokens
global_sum = local_sum.detach().clone()
dist.all_reduce(global_sum, group=sequence_parallel._sp_group)
global_tokens = num_valid_tokens.detach().clone()
dist.all_reduce(global_tokens, group=sequence_parallel._sp_group)
if global_tokens.item() == 0:
return loss
out = (global_sum + (local_sum - local_sum.detach())) / global_tokens
if sequence_parallel.world_size > 1:
out_metric = out.detach() / sequence_parallel.world_size
return out_metric + (out - out.detach())
return out
return _ReduceSequenceParallelSum.apply(loss)

# Default to mean reduction: `loss` is local mean.
num_valid_tokens = (labels != ignore_index).sum().to(loss.device)
return _ReduceSequenceParallelLoss.apply(loss, num_valid_tokens)

def wrap_model(self, model, optimizer=None):
self.initialize()
Expand Down
15 changes: 10 additions & 5 deletions src/twinkle/model/transformers/transformers.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,15 @@ def _ensure_sp_strategy(self) -> None:
return
from .strategy.sequence_parallel import SequenceParallelStrategy

sp_config = {}
# When data-parallel gradient averaging runs across SP shards (native FSDP or
# accelerate DDP/FSDP paths), compensate SP loss backward to keep gradient scale.
if isinstance(self.strategy, (NativeFSDPStrategy, AccelerateStrategy)) and self.device_mesh is not None:
if (self.device_mesh.ulysses_size or 1) > 1 and (self.device_mesh.data_world_size or 1) > 1:
sp_config['compensate_fsdp_avg'] = True
self.sp_strategy = SequenceParallelStrategy(
self.device_mesh,
{},
sp_config,
model=self.model,
tokenizer_id=self.tokenizer_id,
)
Expand Down Expand Up @@ -434,10 +440,9 @@ def calculate_loss(self, **kwargs):
optimizer_config = self.optimizer_group[adapter_name]
optimizer_config.num_tokens += counts.item()
if self.sp_strategy is not None and 'labels' in inputs:
if 'loss_reduction' not in self.sp_strategy.sp_config:
reduction = getattr(loss_instance, 'reduction', None)
if reduction is not None:
self.sp_strategy.sp_config['loss_reduction'] = str(reduction)
reduction = getattr(loss_instance, 'reduction', None)
if reduction is not None:
self.sp_strategy.sp_config['loss_reduction'] = str(reduction)
loss_value = self.sp_strategy.reduce_loss(loss_value, inputs['labels'])
optimizer_config.loss_value += loss_value
outputs['loss'] = optimizer_config.loss_value
Expand Down
4 changes: 2 additions & 2 deletions src/twinkle/processor/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def __init__(self,
def __call__(self, inputs: Union[InputFeature, List[InputFeature]],
**kwargs) -> Union[InputFeature, List[InputFeature]]:
for pipe in self.process_pipeline:
inputs = pipe(inputs)
inputs = pipe(inputs, **kwargs)
return inputs

def prepare_outputs(self, inputs: List[InputFeature], **kwargs) -> Union[List[InputFeature], InputFeature]:
Expand Down Expand Up @@ -293,7 +293,7 @@ def _any_packing(inputs: List[InputFeature]):
return is_padding_free

@staticmethod
def to_transformers_dict(inputs: List[InputFeature]) -> List[InputFeature]:
def to_transformers_dict(inputs: List[InputFeature], **kwargs) -> List[InputFeature]:
import torch
results = []
for _input in inputs:
Expand Down
Loading
Loading