From 1303ce1fd3c508070ff62740df07dcbda6760140 Mon Sep 17 00:00:00 2001 From: kimmishi Date: Mon, 26 Jun 2023 16:42:37 +0800 Subject: [PATCH 1/6] runned fmoe --- deepspeed/moe/layer.py | 172 ++++++++++++++++++++++++++++++++++++ deepspeed/runtime/engine.py | 10 ++- 2 files changed, 178 insertions(+), 4 deletions(-) diff --git a/deepspeed/moe/layer.py b/deepspeed/moe/layer.py index 89fe2bb46c3c..2f62af48caa5 100644 --- a/deepspeed/moe/layer.py +++ b/deepspeed/moe/layer.py @@ -122,3 +122,175 @@ def forward(self, hidden_states, used_token=None): coef = torch.nn.functional.softmax(coef, dim=-1) output = output * coef[..., 0:1] + output_mlp * coef[..., 1:] return output, self.deepspeed_moe.l_aux, self.deepspeed_moe.exp_counts + +import fmoe +from fmoe import FMoE +from fmoe.linear import FMoELinear +import copy + +class FMoEMLPLinear(FMoELinear): + def __init__(self, num_expert: int, fc:torch.nn.Module, rank: int = 0): + in_feat = fc.weight.shape[1] + out_feat = fc.weight.shape[0] + bias=hasattr(fc, 'bias') + super().__init__(num_expert, in_feat, out_feat, bias, rank) + + self.weight.data.copy_(fc.weight.data) + if bias: + self.bias.data.copy_(fc.bias.data) + +class MlpExpert(torch.nn.Module): + def __init__(self, mlp, ep_rank=0, num_expert=1): + super().__init__() + # self.fc1 = FMoEMLPLinear(num_expert, mlp.fc1, rank=ep_rank) + # self.fc2 = FMoEMLPLinear(num_expert, mlp.fc2, rank=ep_rank) + self.fc1 = copy.deepcopy(mlp.fc1) + self.fc2 = copy.deepcopy(mlp.fc2) + + self.act = copy.deepcopy(mlp.act) + self.drop1 = copy.deepcopy(mlp.drop1) + self.drop2 = copy.deepcopy(mlp.drop2) + + def forward(self, x, cnt): + # print("rank: ", torch.distributed.get_rank(), "cnt:", cnt) + # x = self.fc1(x, cnt) + x = self.fc1(x) + x = self.act(x) + x = self.drop1(x) + # x = self.fc2(x, cnt) + x = self.fc2(x) + x = self.drop2(x) + return x + + +class VitFMoE(FMoE): + def __init__( + self, experts, num_expert=1, d_model=1, top_k=1, moe_group=None, expert_kwargs={} + ): + world_size = torch.distributed.get_world_size(moe_group) + super().__init__( + num_expert=num_expert, + d_model=d_model, + world_size=world_size, + mp_group=None, + top_k=top_k, + moe_group=moe_group, + # gate = fmoe.gates.GShardGate, + ) + # expert_kwargs['group'] = moe_group + self.experts = experts + self.experts_fused = True + self.num_experts = num_expert + + def expert_fn(self, inp, fwd_cnt): + # import pdb;pdb.set_trace() + # print("rank:", torch.distributed.get_rank(), "fwd_cnt:", fwd_cnt) + return self.experts(inp) + + def _set_ep_group(self, group): + self.moe_group = group + + def forward(self, inp: torch.Tensor): + r""" + This module wraps up the FMoE module with reshape, residual and layer + normalization. + """ + original_shape = inp.shape + inp = inp.reshape(-1, self.d_model) + output = super().forward(inp) + return output.reshape(original_shape) + +class FmoeMoE(MoE): + def __init__(self, + hidden_size, + expert, + num_experts=1, + ep_size=1, + k=1, + capacity_factor=1., + eval_capacity_factor=1., + min_capacity=4, + use_residual=False, + noisy_gate_policy: typing.Optional[str] = None, + drop_tokens: bool = True, + use_rts=True, + use_tutel: bool = False, + enable_expert_tensor_parallelism: bool = False): + + super(MoE, self).__init__() + + self.use_residual = use_residual + self.enable_expert_tensor_parallelism = enable_expert_tensor_parallelism + assert num_experts % ep_size == 0, f"Number of experts ({num_experts}) should be divisible by expert parallel size ({ep_size})" + self.ep_size = ep_size + self.expert_group_name = f"ep_size_{self.ep_size}" + self.num_experts = num_experts + self.num_local_experts = num_experts // self.ep_size + + log_dist( + f'Creating MoE layer with num_experts: {num_experts} | num_local_experts: {self.num_local_experts} | expert_parallel_size: {self.ep_size}', + [0]) + + assert noisy_gate_policy is None or noisy_gate_policy in ['None', 'Jitter', 'RSample'], \ + 'Unsupported noisy_gate_policy: ' + noisy_gate_policy + + ep_group = groups._get_expert_parallel_group(self.expert_group_name) + # ep_rank=torch.distributed.get_rank(ep_group) + experts = Experts(expert, self.num_local_experts, self.expert_group_name) + # fmoe_linear_hacked_expert = experts = MlpExpert(expert,ep_rank=ep_rank,num_expert=1) + # for name, param in experts.named_parameters(): + # param.allreduce = False + # param.group_name = self.expert_group_name + + self.deepspeed_moe = VitFMoE(experts, d_model=hidden_size, + moe_group = ep_group, + ) + + # if self.use_residual: + # self.mlp = expert + # # coefficient is used for weighted sum of the output of expert and mlp + # self.coefficient = torch.nn.Linear(hidden_size, 2) + + def set_deepspeed_parallelism(self): + self._create_process_groups() + + def _create_process_groups(self): + # Create process group for a layer if needed + if self.expert_group_name not in groups._get_expert_parallel_group_dict(): + print(f"No existing process group found, creating a new group named: {self.expert_group_name}") + if (groups.mpu is None) or (not self.enable_expert_tensor_parallelism): + # Condition 1 - no groups.mpu means no tensor parallelism + # Condition 2 - disabling expert tensor parallelism on purpose + groups._create_expert_and_data_parallel(self.ep_size) + else: + # expert tensor parallelism is enabled + groups._create_expert_data_and_model_parallel(self.ep_size, mpu=groups.mpu) + # Set the group handle for the MOELayer (deepspeed_moe) object + self.deepspeed_moe._set_ep_group(groups._get_expert_parallel_group(self.expert_group_name)) + + def forward(self, hidden_states, used_token=None): + """ MoE forward + + Arguments: + hidden_states (Tensor): input to the layer + used_token (Tensor, optional): default: None, mask only used tokens + + Returns: + A tuple including output, gate loss, and expert count. + + * output (Tensor): output of the model + + * l_aux (Tensor): gate loss value + + * exp_counts (int): expert count + """ + output = self.deepspeed_moe(hidden_states) + # if self.use_residual: + # # Residual MoE + # output_mlp = self.mlp(hidden_states) + # if type(output_mlp) is tuple: + # output_mlp = output_mlp[0] # Ignore the bias term for now + # coef = self.coefficient(hidden_states) + # coef = torch.nn.functional.softmax(coef, dim=-1) + # output = output * coef[..., 0:1] + output_mlp * coef[..., 1:] + return output, None # self.deepspeed_moe.l_aux, self.deepspeed_moe.exp_counts diff --git a/deepspeed/runtime/engine.py b/deepspeed/runtime/engine.py index e953938c06a4..41db7536479c 100644 --- a/deepspeed/runtime/engine.py +++ b/deepspeed/runtime/engine.py @@ -84,7 +84,7 @@ from .utils import get_ma_status from ..ops.adam import FusedAdam from ..moe.sharded_moe import TopKGate, MOELayer -from ..moe.layer import MoE +from ..moe.layer import MoE,FmoeMoE from ..moe.utils import is_moe_param from ..git_version_info import version @@ -96,6 +96,8 @@ from deepspeed.runtime.config import DtypeEnum +from fmoe import FMoE + # Set to torch's distributed package or deepspeed.comm based inside DeepSpeedEngine init dist = None @@ -223,8 +225,8 @@ def __init__( self.gas_boundary_ctr = 0 self.dist_backend = get_accelerator().communication_backend_name() self.has_moe_layers = False - self.num_experts = [] - self.gate_modules = [] + self.num_experts = [] # for load and save checkpoint + self.gate_modules = [] # for time profile and print self.moe_layers = [] self._step_applied = False self._global_grad_norm = None @@ -1052,7 +1054,7 @@ def _configure_distributed_model(self, model): # MoE related initialization for _, module in self.module.named_modules(): - if isinstance(module, MoE): + if isinstance(module, MoE) or isinstance(module, FMoE) or isinstance(module, FmoeMoE): self.has_moe_layers = True self.num_experts.append(module.num_experts) From 51db57e9d2e102469f31180d6d9d4c288bd98ee9 Mon Sep 17 00:00:00 2001 From: kimmishi Date: Mon, 26 Jun 2023 17:53:15 +0800 Subject: [PATCH 2/6] update moe impl, supports fmoe --- deepspeed/moe/layer.py | 232 +++++++++--------------------------- deepspeed/runtime/engine.py | 4 +- 2 files changed, 59 insertions(+), 177 deletions(-) diff --git a/deepspeed/moe/layer.py b/deepspeed/moe/layer.py index 2f62af48caa5..f81442f2f979 100644 --- a/deepspeed/moe/layer.py +++ b/deepspeed/moe/layer.py @@ -12,156 +12,7 @@ from .experts import Experts import typing - -class MoE(torch.nn.Module): - """Initialize an MoE layer. - - Arguments: - hidden_size (int): the hidden dimension of the model, importantly this is also the input and output dimension. - expert (torch.nn.Module): the torch module that defines the expert (e.g., MLP, torch.linear). - num_experts (int, optional): default=1, the total number of experts per layer. - ep_size (int, optional): default=1, number of ranks in the expert parallel world or group. - k (int, optional): default=1, top-k gating value, only supports k=1 or k=2. - capacity_factor (float, optional): default=1.0, the capacity of the expert at training time. - eval_capacity_factor (float, optional): default=1.0, the capacity of the expert at eval time. - min_capacity (int, optional): default=4, the minimum capacity per expert regardless of the capacity_factor. - use_residual (bool, optional): default=False, make this MoE layer a Residual MoE (https://arxiv.org/abs/2201.05596) layer. - noisy_gate_policy (str, optional): default=None, noisy gate policy, valid options are 'Jitter', 'RSample' or 'None'. - drop_tokens (bool, optional): default=True, whether to drop tokens - (setting to False is equivalent to infinite capacity). - use_rts (bool, optional): default=True, whether to use Random Token Selection. - use_tutel (bool, optional): default=False, whether to use Tutel optimizations (if installed). - enable_expert_tensor_parallelism (bool, optional): default=False, whether to use tensor parallelism for experts - """ - - def __init__(self, - hidden_size, - expert, - num_experts=1, - ep_size=1, - k=1, - capacity_factor=1., - eval_capacity_factor=1., - min_capacity=4, - use_residual=False, - noisy_gate_policy: typing.Optional[str] = None, - drop_tokens: bool = True, - use_rts=True, - use_tutel: bool = False, - enable_expert_tensor_parallelism: bool = False): - - super(MoE, self).__init__() - - self.use_residual = use_residual - self.enable_expert_tensor_parallelism = enable_expert_tensor_parallelism - assert num_experts % ep_size == 0, f"Number of experts ({num_experts}) should be divisible by expert parallel size ({ep_size})" - self.ep_size = ep_size - self.expert_group_name = f"ep_size_{self.ep_size}" - self.num_experts = num_experts - self.num_local_experts = num_experts // self.ep_size - - log_dist( - f'Creating MoE layer with num_experts: {num_experts} | num_local_experts: {self.num_local_experts} | expert_parallel_size: {self.ep_size}', - [0]) - - assert noisy_gate_policy is None or noisy_gate_policy in ['None', 'Jitter', 'RSample'], \ - 'Unsupported noisy_gate_policy: ' + noisy_gate_policy - - experts = Experts(expert, self.num_local_experts, self.expert_group_name) - self.deepspeed_moe = MOELayer(TopKGate(hidden_size, num_experts, k, capacity_factor, eval_capacity_factor, - min_capacity, noisy_gate_policy, drop_tokens, use_rts), - experts, - self.expert_group_name, - self.ep_size, - self.num_local_experts, - use_tutel=use_tutel) - if self.use_residual: - self.mlp = expert - # coefficient is used for weighted sum of the output of expert and mlp - self.coefficient = torch.nn.Linear(hidden_size, 2) - - def set_deepspeed_parallelism(self): - self._create_process_groups() - - def _create_process_groups(self): - # Create process group for a layer if needed - if self.expert_group_name not in groups._get_expert_parallel_group_dict(): - print(f"No existing process group found, creating a new group named: {self.expert_group_name}") - if (groups.mpu is None) or (not self.enable_expert_tensor_parallelism): - # Condition 1 - no groups.mpu means no tensor parallelism - # Condition 2 - disabling expert tensor parallelism on purpose - groups._create_expert_and_data_parallel(self.ep_size) - else: - # expert tensor parallelism is enabled - groups._create_expert_data_and_model_parallel(self.ep_size, mpu=groups.mpu) - # Set the group handle for the MOELayer (deepspeed_moe) object - self.deepspeed_moe._set_ep_group(groups._get_expert_parallel_group(self.expert_group_name)) - - def forward(self, hidden_states, used_token=None): - """ MoE forward - - Arguments: - hidden_states (Tensor): input to the layer - used_token (Tensor, optional): default: None, mask only used tokens - - Returns: - A tuple including output, gate loss, and expert count. - - * output (Tensor): output of the model - - * l_aux (Tensor): gate loss value - - * exp_counts (int): expert count - """ - output = self.deepspeed_moe(hidden_states, used_token) - if self.use_residual: - # Residual MoE - output_mlp = self.mlp(hidden_states) - if type(output_mlp) is tuple: - output_mlp = output_mlp[0] # Ignore the bias term for now - coef = self.coefficient(hidden_states) - coef = torch.nn.functional.softmax(coef, dim=-1) - output = output * coef[..., 0:1] + output_mlp * coef[..., 1:] - return output, self.deepspeed_moe.l_aux, self.deepspeed_moe.exp_counts - -import fmoe from fmoe import FMoE -from fmoe.linear import FMoELinear -import copy - -class FMoEMLPLinear(FMoELinear): - def __init__(self, num_expert: int, fc:torch.nn.Module, rank: int = 0): - in_feat = fc.weight.shape[1] - out_feat = fc.weight.shape[0] - bias=hasattr(fc, 'bias') - super().__init__(num_expert, in_feat, out_feat, bias, rank) - - self.weight.data.copy_(fc.weight.data) - if bias: - self.bias.data.copy_(fc.bias.data) - -class MlpExpert(torch.nn.Module): - def __init__(self, mlp, ep_rank=0, num_expert=1): - super().__init__() - # self.fc1 = FMoEMLPLinear(num_expert, mlp.fc1, rank=ep_rank) - # self.fc2 = FMoEMLPLinear(num_expert, mlp.fc2, rank=ep_rank) - self.fc1 = copy.deepcopy(mlp.fc1) - self.fc2 = copy.deepcopy(mlp.fc2) - - self.act = copy.deepcopy(mlp.act) - self.drop1 = copy.deepcopy(mlp.drop1) - self.drop2 = copy.deepcopy(mlp.drop2) - - def forward(self, x, cnt): - # print("rank: ", torch.distributed.get_rank(), "cnt:", cnt) - # x = self.fc1(x, cnt) - x = self.fc1(x) - x = self.act(x) - x = self.drop1(x) - # x = self.fc2(x, cnt) - x = self.fc2(x) - x = self.drop2(x) - return x - class VitFMoE(FMoE): def __init__( @@ -200,7 +51,26 @@ def forward(self, inp: torch.Tensor): output = super().forward(inp) return output.reshape(original_shape) -class FmoeMoE(MoE): +class MoE(torch.nn.Module): + """Initialize an MoE layer. + + Arguments: + hidden_size (int): the hidden dimension of the model, importantly this is also the input and output dimension. + expert (torch.nn.Module): the torch module that defines the expert (e.g., MLP, torch.linear). + num_experts (int, optional): default=1, the total number of experts per layer. + ep_size (int, optional): default=1, number of ranks in the expert parallel world or group. + k (int, optional): default=1, top-k gating value, only supports k=1 or k=2. + capacity_factor (float, optional): default=1.0, the capacity of the expert at training time. + eval_capacity_factor (float, optional): default=1.0, the capacity of the expert at eval time. + min_capacity (int, optional): default=4, the minimum capacity per expert regardless of the capacity_factor. + use_residual (bool, optional): default=False, make this MoE layer a Residual MoE (https://arxiv.org/abs/2201.05596) layer. + noisy_gate_policy (str, optional): default=None, noisy gate policy, valid options are 'Jitter', 'RSample' or 'None'. + drop_tokens (bool, optional): default=True, whether to drop tokens - (setting to False is equivalent to infinite capacity). + use_rts (bool, optional): default=True, whether to use Random Token Selection. + use_tutel (bool, optional): default=False, whether to use Tutel optimizations (if installed). + enable_expert_tensor_parallelism (bool, optional): default=False, whether to use tensor parallelism for experts + """ + def __init__(self, hidden_size, expert, @@ -215,7 +85,8 @@ def __init__(self, drop_tokens: bool = True, use_rts=True, use_tutel: bool = False, - enable_expert_tensor_parallelism: bool = False): + enable_expert_tensor_parallelism: bool = False, + use_fmoe: bool = True): super(MoE, self).__init__() @@ -226,6 +97,7 @@ def __init__(self, self.expert_group_name = f"ep_size_{self.ep_size}" self.num_experts = num_experts self.num_local_experts = num_experts // self.ep_size + self.use_fmoe = use_fmoe log_dist( f'Creating MoE layer with num_experts: {num_experts} | num_local_experts: {self.num_local_experts} | expert_parallel_size: {self.ep_size}', @@ -234,22 +106,25 @@ def __init__(self, assert noisy_gate_policy is None or noisy_gate_policy in ['None', 'Jitter', 'RSample'], \ 'Unsupported noisy_gate_policy: ' + noisy_gate_policy - ep_group = groups._get_expert_parallel_group(self.expert_group_name) - # ep_rank=torch.distributed.get_rank(ep_group) experts = Experts(expert, self.num_local_experts, self.expert_group_name) - # fmoe_linear_hacked_expert = experts = MlpExpert(expert,ep_rank=ep_rank,num_expert=1) - # for name, param in experts.named_parameters(): - # param.allreduce = False - # param.group_name = self.expert_group_name + if not use_fmoe: + self.deepspeed_moe = MOELayer(TopKGate(hidden_size, num_experts, k, capacity_factor, eval_capacity_factor, + min_capacity, noisy_gate_policy, drop_tokens, use_rts), + experts, + self.expert_group_name, + self.ep_size, + self.num_local_experts, + use_tutel=use_tutel) + else: + # Note: need to setup groups for moe before creating MOE layers + ep_group = groups._get_expert_parallel_group(self.expert_group_name) + self.deepspeed_moe = VitFMoE(experts, d_model=hidden_size, + moe_group = ep_group) - self.deepspeed_moe = VitFMoE(experts, d_model=hidden_size, - moe_group = ep_group, - ) - - # if self.use_residual: - # self.mlp = expert - # # coefficient is used for weighted sum of the output of expert and mlp - # self.coefficient = torch.nn.Linear(hidden_size, 2) + if self.use_residual: + self.mlp = expert + # coefficient is used for weighted sum of the output of expert and mlp + self.coefficient = torch.nn.Linear(hidden_size, 2) def set_deepspeed_parallelism(self): self._create_process_groups() @@ -284,13 +159,20 @@ def forward(self, hidden_states, used_token=None): * exp_counts (int): expert count """ - output = self.deepspeed_moe(hidden_states) - # if self.use_residual: - # # Residual MoE - # output_mlp = self.mlp(hidden_states) - # if type(output_mlp) is tuple: - # output_mlp = output_mlp[0] # Ignore the bias term for now - # coef = self.coefficient(hidden_states) - # coef = torch.nn.functional.softmax(coef, dim=-1) - # output = output * coef[..., 0:1] + output_mlp * coef[..., 1:] - return output, None # self.deepspeed_moe.l_aux, self.deepspeed_moe.exp_counts + if self.use_fmoe: + output = self.deepspeed_moe(hidden_states) + else: + output = self.deepspeed_moe(hidden_states, used_token) + + if self.use_residual: + # Residual MoE + output_mlp = self.mlp(hidden_states) + if type(output_mlp) is tuple: + output_mlp = output_mlp[0] # Ignore the bias term for now + coef = self.coefficient(hidden_states) + coef = torch.nn.functional.softmax(coef, dim=-1) + output = output * coef[..., 0:1] + output_mlp * coef[..., 1:] + if self.use_fmoe: + return output,None,None # todo: fix this + else: + return output, self.deepspeed_moe.l_aux, self.deepspeed_moe.exp_counts diff --git a/deepspeed/runtime/engine.py b/deepspeed/runtime/engine.py index 41db7536479c..504da680ba64 100644 --- a/deepspeed/runtime/engine.py +++ b/deepspeed/runtime/engine.py @@ -84,7 +84,7 @@ from .utils import get_ma_status from ..ops.adam import FusedAdam from ..moe.sharded_moe import TopKGate, MOELayer -from ..moe.layer import MoE,FmoeMoE +from ..moe.layer import MoE from ..moe.utils import is_moe_param from ..git_version_info import version @@ -1054,7 +1054,7 @@ def _configure_distributed_model(self, model): # MoE related initialization for _, module in self.module.named_modules(): - if isinstance(module, MoE) or isinstance(module, FMoE) or isinstance(module, FmoeMoE): + if isinstance(module, MoE) or isinstance(module, FMoE): self.has_moe_layers = True self.num_experts.append(module.num_experts) From 0103aa14c4ad65b82e158f7ce279831c69721dde Mon Sep 17 00:00:00 2001 From: kimmishi Date: Wed, 28 Jun 2023 10:40:57 +0800 Subject: [PATCH 3/6] update fmoe expert def --- deepspeed/moe/experts.py | 4 ++++ deepspeed/moe/layer.py | 39 ++++++++++++++++++++++++++++++++----- deepspeed/runtime/engine.py | 5 +++-- 3 files changed, 41 insertions(+), 7 deletions(-) diff --git a/deepspeed/moe/experts.py b/deepspeed/moe/experts.py index 8cadb0c387fa..e3f7a794d308 100644 --- a/deepspeed/moe/experts.py +++ b/deepspeed/moe/experts.py @@ -33,3 +33,7 @@ def forward(self, inputs): expert_output = torch.cat(expert_outputs, dim=1) return expert_output + + def forward_single(self, idx, inputs): + expert_output = self.deepspeed_experts[idx](inputs) + return expert_output \ No newline at end of file diff --git a/deepspeed/moe/layer.py b/deepspeed/moe/layer.py index f81442f2f979..5e79df7634a8 100644 --- a/deepspeed/moe/layer.py +++ b/deepspeed/moe/layer.py @@ -30,13 +30,41 @@ def __init__( ) # expert_kwargs['group'] = moe_group self.experts = experts - self.experts_fused = True + self.experts_fused = False self.num_experts = num_expert - def expert_fn(self, inp, fwd_cnt): - # import pdb;pdb.set_trace() - # print("rank:", torch.distributed.get_rank(), "fwd_cnt:", fwd_cnt) - return self.experts(inp) + # def expert_fn(self, inp, fwd_cnt): + # # import pdb;pdb.set_trace() + # # print("rank:", torch.distributed.get_rank(), "fwd_cnt:", fwd_cnt) + # return self.experts(inp) + + def expert_fn(self, inp, fwd_expert_count): + r""" + The default expert function which either calls the experts as a whole + or as separate experts. + """ + if self.experts_fused: + return self.experts(inp, fwd_expert_count) + if isinstance(fwd_expert_count, torch.Tensor): + fwd_expert_count_cpu = fwd_expert_count.cpu().numpy() + outputs = [] + base_idx = 0 + for i in range(self.num_expert): + batch_size = fwd_expert_count_cpu[i] + inp_slice = inp[base_idx : base_idx + batch_size] + # outputs.append(self.experts[i](inp_slice, torch.tensor([fwd_expert_count[i]]))) + outputs.append(self.experts.forward_single(i, inp_slice)) + base_idx += batch_size + return torch.cat(outputs, dim=0) + + def expert_fn_single(self, inp, fwd_expert_count, idx): + r""" + forward single expert for smart scheduling. + """ + assert not self.experts_fused, "should not use fused experts" + # output = self.experts[idx](inp) + output = self.experts.forward_single(idx, inp) + return output def _set_ep_group(self, group): self.moe_group = group @@ -162,6 +190,7 @@ def forward(self, hidden_states, used_token=None): if self.use_fmoe: output = self.deepspeed_moe(hidden_states) else: + # import pdb;pdb.set_trace() output = self.deepspeed_moe(hidden_states, used_token) if self.use_residual: diff --git a/deepspeed/runtime/engine.py b/deepspeed/runtime/engine.py index 504da680ba64..7e46d730037a 100644 --- a/deepspeed/runtime/engine.py +++ b/deepspeed/runtime/engine.py @@ -2870,7 +2870,7 @@ def _get_non_moe_state_dict(self, full_state_dict): Get the state dict of the non-moe layers """ for key in list(full_state_dict.keys()): - if 'expert' in key and 'moe.gate.wg.weight' not in key: + if 'expert' in key and 'moe.gate' not in key: full_state_dict.pop(key) return full_state_dict @@ -2897,7 +2897,7 @@ def _save_moe_checkpoint(self, save_dir, tag, client_state={}): # get all moe parameters moe_state_dict = {} for n, p in module.state_dict().items(): - if 'expert' in n and 'moe.gate.wg.weight' not in n: + if 'expert' in n and 'moe.gate' not in n: moe_state_dict[n_module + '.' + n] = p moe_str_prefix = '.deepspeed_moe.experts.deepspeed_experts.' # print(moe_state_dict.keys()) # until now, everything is fine. So the bug happens at next few lines @@ -2942,6 +2942,7 @@ def _save_moe_checkpoint(self, save_dir, tag, client_state={}): return # Save optimizer states. They are different across each exp parallel rank. + # for zero, this is None optimizer_state = { 'optimizer': self.optimizer.state_dict() if self.optimizer and not self.zero_optimization() else None } From 84bade8a194f899836a203d178392c52d664d1b5 Mon Sep 17 00:00:00 2001 From: kimmishi Date: Thu, 29 Jun 2023 10:31:35 +0800 Subject: [PATCH 4/6] add kwargs for gate --- deepspeed/moe/layer.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/deepspeed/moe/layer.py b/deepspeed/moe/layer.py index 5e79df7634a8..e2a43a8aeff3 100644 --- a/deepspeed/moe/layer.py +++ b/deepspeed/moe/layer.py @@ -13,10 +13,10 @@ import typing from fmoe import FMoE - +import fmoe class VitFMoE(FMoE): def __init__( - self, experts, num_expert=1, d_model=1, top_k=1, moe_group=None, expert_kwargs={} + self, experts, num_expert=1, d_model=1, top_k=1, moe_group=None, gate_kwargs={} ): world_size = torch.distributed.get_world_size(moe_group) super().__init__( @@ -26,7 +26,8 @@ def __init__( mp_group=None, top_k=top_k, moe_group=moe_group, - # gate = fmoe.gates.GShardGate, + gate = fmoe.gates.GShardGate, + gate_kwargs = gate_kwargs ) # expert_kwargs['group'] = moe_group self.experts = experts @@ -114,7 +115,7 @@ def __init__(self, use_rts=True, use_tutel: bool = False, enable_expert_tensor_parallelism: bool = False, - use_fmoe: bool = True): + use_fmoe: bool = False): super(MoE, self).__init__() @@ -146,8 +147,9 @@ def __init__(self, else: # Note: need to setup groups for moe before creating MOE layers ep_group = groups._get_expert_parallel_group(self.expert_group_name) - self.deepspeed_moe = VitFMoE(experts, d_model=hidden_size, - moe_group = ep_group) + self.deepspeed_moe = VitFMoE(experts, d_model=hidden_size, top_k=k, + moe_group = ep_group, + gate_kwargs = {'capacity':(capacity_factor, eval_capacity_factor)}) if self.use_residual: self.mlp = expert @@ -202,6 +204,6 @@ def forward(self, hidden_states, used_token=None): coef = torch.nn.functional.softmax(coef, dim=-1) output = output * coef[..., 0:1] + output_mlp * coef[..., 1:] if self.use_fmoe: - return output,None,None # todo: fix this + return output#,None,None # todo: fix this else: - return output, self.deepspeed_moe.l_aux, self.deepspeed_moe.exp_counts + return output#, self.deepspeed_moe.l_aux, self.deepspeed_moe.exp_counts From c675ada5794bd3e4f13e3b1ccc9773de8dc63c30 Mon Sep 17 00:00:00 2001 From: kimmishi Date: Tue, 4 Jul 2023 19:20:46 +0800 Subject: [PATCH 5/6] update fmoe args --- deepspeed/moe/layer.py | 1 + deepspeed/runtime/engine.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/deepspeed/moe/layer.py b/deepspeed/moe/layer.py index e2a43a8aeff3..1aa478d0e070 100644 --- a/deepspeed/moe/layer.py +++ b/deepspeed/moe/layer.py @@ -148,6 +148,7 @@ def __init__(self, # Note: need to setup groups for moe before creating MOE layers ep_group = groups._get_expert_parallel_group(self.expert_group_name) self.deepspeed_moe = VitFMoE(experts, d_model=hidden_size, top_k=k, + num_expert=self.num_local_experts, moe_group = ep_group, gate_kwargs = {'capacity':(capacity_factor, eval_capacity_factor)}) diff --git a/deepspeed/runtime/engine.py b/deepspeed/runtime/engine.py index 7e46d730037a..dda37d381000 100644 --- a/deepspeed/runtime/engine.py +++ b/deepspeed/runtime/engine.py @@ -1054,7 +1054,7 @@ def _configure_distributed_model(self, model): # MoE related initialization for _, module in self.module.named_modules(): - if isinstance(module, MoE) or isinstance(module, FMoE): + if isinstance(module, MoE):# or isinstance(module, FMoE): self.has_moe_layers = True self.num_experts.append(module.num_experts) From f82ef1bea0da8a56677d748b8af95ce4b08137f5 Mon Sep 17 00:00:00 2001 From: kimmishi Date: Thu, 6 Jul 2023 15:39:26 +0800 Subject: [PATCH 6/6] try to use fstermoe,failed --- deepspeed/moe/layer.py | 37 +++++++++++++++++++++---------------- deepspeed/runtime/engine.py | 7 ++++--- 2 files changed, 25 insertions(+), 19 deletions(-) diff --git a/deepspeed/moe/layer.py b/deepspeed/moe/layer.py index 1aa478d0e070..7d6bdc6963e9 100644 --- a/deepspeed/moe/layer.py +++ b/deepspeed/moe/layer.py @@ -14,13 +14,15 @@ from fmoe import FMoE import fmoe +import copy class VitFMoE(FMoE): def __init__( - self, experts, num_expert=1, d_model=1, top_k=1, moe_group=None, gate_kwargs={} + self, expert, d_model=1, top_k=1, num_local_experts=1, expert_group_name=None, moe_group=None, gate_kwargs={} ): + assert expert_group_name is not None, 'expert_group_name should be provided' world_size = torch.distributed.get_world_size(moe_group) super().__init__( - num_expert=num_expert, + num_expert=num_local_experts, d_model=d_model, world_size=world_size, mp_group=None, @@ -29,15 +31,16 @@ def __init__( gate = fmoe.gates.GShardGate, gate_kwargs = gate_kwargs ) - # expert_kwargs['group'] = moe_group - self.experts = experts + + self.experts = torch.nn.ModuleList([copy.deepcopy(expert) for i in range(num_local_experts)]) self.experts_fused = False - self.num_experts = num_expert + self.num_experts = num_local_experts + self.num_local_experts = num_local_experts - # def expert_fn(self, inp, fwd_cnt): - # # import pdb;pdb.set_trace() - # # print("rank:", torch.distributed.get_rank(), "fwd_cnt:", fwd_cnt) - # return self.experts(inp) + for expert in self.experts: + for name, param in expert.named_parameters(): + param.allreduce = False + param.group_name = expert_group_name def expert_fn(self, inp, fwd_expert_count): r""" @@ -50,11 +53,12 @@ def expert_fn(self, inp, fwd_expert_count): fwd_expert_count_cpu = fwd_expert_count.cpu().numpy() outputs = [] base_idx = 0 - for i in range(self.num_expert): + for i in range(self.num_experts): batch_size = fwd_expert_count_cpu[i] inp_slice = inp[base_idx : base_idx + batch_size] # outputs.append(self.experts[i](inp_slice, torch.tensor([fwd_expert_count[i]]))) - outputs.append(self.experts.forward_single(i, inp_slice)) + outputs.append(self.experts[i](inp_slice)) + # outputs.append(self.experts.forward_single(i, inp_slice)) base_idx += batch_size return torch.cat(outputs, dim=0) @@ -63,8 +67,8 @@ def expert_fn_single(self, inp, fwd_expert_count, idx): forward single expert for smart scheduling. """ assert not self.experts_fused, "should not use fused experts" - # output = self.experts[idx](inp) - output = self.experts.forward_single(idx, inp) + output = self.experts[idx](inp) + # output = self.experts.forward_single(idx, inp) return output def _set_ep_group(self, group): @@ -148,9 +152,10 @@ def __init__(self, # Note: need to setup groups for moe before creating MOE layers ep_group = groups._get_expert_parallel_group(self.expert_group_name) self.deepspeed_moe = VitFMoE(experts, d_model=hidden_size, top_k=k, - num_expert=self.num_local_experts, - moe_group = ep_group, - gate_kwargs = {'capacity':(capacity_factor, eval_capacity_factor)}) + num_local_experts=self.num_local_experts, + moe_group = ep_group, + expert_group_name = self.expert_group_name, + gate_kwargs = {'capacity':(capacity_factor, eval_capacity_factor)}) if self.use_residual: self.mlp = expert diff --git a/deepspeed/runtime/engine.py b/deepspeed/runtime/engine.py index dda37d381000..82d7c2c01ed1 100644 --- a/deepspeed/runtime/engine.py +++ b/deepspeed/runtime/engine.py @@ -115,6 +115,7 @@ # Fail silently so we don't spam logs unnecessarily if user isn't using amp APEX_INSTALLED = False +MOE_PARAM_PREFIX = '.deepspeed_moe.experts.' def split_half_float_double_sparse(tensors): device_type = get_accelerator().device_name() @@ -2386,7 +2387,7 @@ def load_moe_state_dict(checkpoint_path, map_location=torch.device('cpu')) # Updating global -> local expert ids - moe_str_prefix = '.deepspeed_moe.experts.deepspeed_experts.' + moe_str_prefix = MOE_PARAM_PREFIX #'.deepspeed_moe.experts.deepspeed_experts.' for key in list(expert_state_dict.keys()): local_key = key.replace(f'{moe_str_prefix}{global_expert_id}', f'{moe_str_prefix}{local_expert_id}') @@ -2408,7 +2409,7 @@ def load_moe_state_dict(checkpoint_path, map_location=torch.device('cpu')) # print(expert_state_dict.keys()) # Updating global -> local expert ids - moe_str_prefix = '.deepspeed_moe.experts.deepspeed_experts.' + moe_str_prefix = MOE_PARAM_PREFIX #'.deepspeed_moe.experts.deepspeed_experts.' for key in list(expert_state_dict.keys()): local_key = key.replace(f'{moe_str_prefix}{global_expert_id}', f'{moe_str_prefix}{local_expert_id}') @@ -2899,7 +2900,7 @@ def _save_moe_checkpoint(self, save_dir, tag, client_state={}): for n, p in module.state_dict().items(): if 'expert' in n and 'moe.gate' not in n: moe_state_dict[n_module + '.' + n] = p - moe_str_prefix = '.deepspeed_moe.experts.deepspeed_experts.' + moe_str_prefix = MOE_PARAM_PREFIX #'.deepspeed_moe.experts.deepspeed_experts.' # print(moe_state_dict.keys()) # until now, everything is fine. So the bug happens at next few lines # Reorder the moe name rank, so that each checkpoint only has one expert experts_state_dict = defaultdict(dict)