Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion examples/rl/environments/math/math_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,24 @@ def compute_score(self, response: str, golden: dict, golden_key: str = "answer")
# Did not format the answer correctly
return self.negative_reward

# def make_prefix(self, problem_key: str = "problem", **kwargs) -> str:
# """Take a string math problem and return the prompt. Supports requesting tagged or boxed answers. Supports chat mode prompts."""
# if self.answer_format == "boxed":
# answer_format = "Please reason step by step and provide your answer between \\boxed{} tags, for example \\boxed{20\\sqrt{3}}."
# elif self.answer_format == "tagged":
# answer_format = "Please reason step by step and provide your answer between <answer> </answer> tags, for example <answer> 20\\sqrt{3} </answer>. Do not include an = sign."
# else:
# raise ValueError(f"Invalid answer format: {self.answer_format}")

# if self.chat_mode:
# prefix = f"""{kwargs[problem_key]}\n{answer_format}"""
# else:
# prefix = f"""A conversation between User and Assistant. The user asks a question, and the Assistant solves it. The assistant first thinks about the reasoning process in the mind and then provides the user with the answer.
# The question will be a word math problem. Show your work in <think> </think> tags.
# {answer_format}
# User: {kwargs[problem_key]}
# {self.assistant_suffix}"""
# return prefix
def make_prefix(self, problem_key: str = "problem", **kwargs) -> str:
"""Take a string math problem and return the prompt. Supports requesting tagged or boxed answers. Supports chat mode prompts."""
if self.answer_format == "boxed":
Expand All @@ -126,6 +144,11 @@ def make_prefix(self, problem_key: str = "problem", **kwargs) -> str:
else:
raise ValueError(f"Invalid answer format: {self.answer_format}")

prefix = f"""{kwargs[problem_key]}\n{answer_format}"""
# prefix = f"""{kwargs[problem_key]}\n{answer_format}"""
prefix = f"""A conversation between User and Assistant. The user asks a question, and the Assistant solves it. The assistant first thinks about the reasoning process in the mind and then provides the user with the answer.
The question will be a word math problem. Show your work in <think> </think> tags.
{answer_format}
User: {kwargs[problem_key]}
Assistant:"""

return prefix
3 changes: 0 additions & 3 deletions examples/rl/model_configs/common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,11 @@ COMMON_OPTIONS="\
--transformer-impl transformer_engine \
--${PRECISION:-bf16} \
--te-rng-tracker \
--rl-offload-optimizer-during-inference \
--inference-dynamic-batching-buffer-size-gb 20 \
--data-parallel-random-init \
--attention-backend flash \
--timing-log-level 1 \
--log-timers-to-tensorboard \
--save-retain-interval 160 \
--inference-dynamic-batching-num-cuda-graphs 1 \
--inference-dynamic-batching-unified-memory-level 1 \
--adam-beta1 0.9 \
--adam-beta2 ${ADAM_BETA2:-0.95} \
Expand Down
72 changes: 34 additions & 38 deletions examples/rl/model_configs/nemotron6_3b_moe.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,39 +9,21 @@ echo "Using Nemotron6 3B MOE model checkpoint"
SCRIPT_PATH="${BASH_SOURCE[0]}"
source $(dirname $SCRIPT_PATH)/common.sh

# In all cases, one can override those values.
# However, running without envs will give you some
# good perf out of the box for established envs.
if [ "$(basename "$ENV_CONFIG")" = "dapo.yaml" ]; then
echo "Using DAPO environment config"
GRPO_CLAMP_EPS_LOWER=${GRPO_CLAMP_EPS_LOWER:-0.2}
GRPO_CLAMP_EPS_UPPER=${GRPO_CLAMP_EPS_UPPER:-0.28}
MAX_INFERENCE_BS=${MAX_INFERENCE_BS:-32}
GRPO_GROUP_SIZE=${GRPO_GROUP_SIZE:-16}
GRPO_PROMPTS_PER_STEP=${GRPO_PROMPTS_PER_STEP:-64}
GRPO_ITERATIONS=${GRPO_ITERATIONS:-1}
GRPO_KL_BETA=${GRPO_KL_BETA:-"0.0"}
TRAINING_BATCH_SIZE=${TRAINING_BATCH_SIZE:-1024}
MICRO_BATCH_SIZE=${MICRO_BATCH_SIZE:-1}
MAX_SEQ_LENGTH=${MAX_SEQ_LENGTH:-11999}
EXIT_INTERVAL=${EXIT_INTERVAL:-20}
CHKPT_SAVE_INTERVAL=${CHKPT_SAVE_INTERVAL:-20}
else
# Some default values if config is unsupported.
echo "Undected environment config, using default values"
GRPO_CLAMP_EPS_LOWER=${GRPO_CLAMP_EPS_LOWER:-0.2}
GRPO_CLAMP_EPS_UPPER=${GRPO_CLAMP_EPS_UPPER:-0.28}
MAX_INFERENCE_BS=${MAX_INFERENCE_BS:-64}
GRPO_GROUP_SIZE=${GRPO_GROUP_SIZE:-2}
GRPO_PROMPTS_PER_STEP=${GRPO_PROMPTS_PER_STEP:-16}
GRPO_ITERATIONS=${GRPO_ITERATIONS:-1}
GRPO_KL_BETA=${GRPO_KL_BETA:-"0.0"}
TRAINING_BATCH_SIZE=${TRAINING_BATCH_SIZE:-32}
MICRO_BATCH_SIZE=${MICRO_BATCH_SIZE:-1}
MAX_SEQ_LENGTH=${MAX_SEQ_LENGTH:-1024}
EXIT_INTERVAL=${EXIT_INTERVAL:-20}
CHKPT_SAVE_INTERVAL=${CHKPT_SAVE_INTERVAL:-20}
fi

echo "Undected environment config, using default values"
GRPO_CLAMP_EPS_LOWER=${GRPO_CLAMP_EPS_LOWER:-0.2}
GRPO_CLAMP_EPS_UPPER=${GRPO_CLAMP_EPS_UPPER:-0.28}
MAX_INFERENCE_BS=${MAX_INFERENCE_BS:-64}
GRPO_GROUP_SIZE=${GRPO_GROUP_SIZE:-16}
GRPO_PROMPTS_PER_STEP=${GRPO_PROMPTS_PER_STEP:-64}
GRPO_ITERATIONS=${GRPO_ITERATIONS:-1}
GRPO_KL_BETA=${GRPO_KL_BETA:-"0.0"}
TRAINING_BATCH_SIZE=${TRAINING_BATCH_SIZE:-1024}
MICRO_BATCH_SIZE=${MICRO_BATCH_SIZE:-1}
MAX_SEQ_LENGTH=${MAX_SEQ_LENGTH:-8192}
EXIT_INTERVAL=${EXIT_INTERVAL:-15}
CHKPT_SAVE_INTERVAL=${CHKPT_SAVE_INTERVAL:-20}


ENV_DEPENDENT="\
--micro-batch-size $MICRO_BATCH_SIZE \
Expand All @@ -56,14 +38,19 @@ ENV_DEPENDENT="\

MODEL_OPTIONS="\
--rl-skip-bos-token \
--no-rl-use-sequence-packing \
--rl-use-sequence-packing \
--rl-partial-rollouts \
--rl-offload-optimizer-during-inference \
--moe-pad-experts-for-cuda-graph-inference \
--inference-dynamic-batching-max-tokens 8192 \
--inference-dynamic-batching-num-cuda-graphs 4 \
--inference-dynamic-batching-max-requests 128 \
--inference-dynamic-batching-num-cuda-graphs 2 \
--decode-only-cuda-graphs \
--inference-dynamic-batching-paused-buffer-size-gb 5 \
--inference-dynamic-batching-buffer-size-gb 5 \
--inference-dynamic-batching-unified-memory-level 1 \
--rl-training-cuda-graphs \
--empty-unused-memory-level 0 \
--rl-parallel-generation-tasks 128 \
--inference-dynamic-batching-cuda-graph-mixed-prefill-count 0 \
--cuda-graph-impl local \
--cuda-graph-scope full \
--use-checkpoint-args \
Expand Down Expand Up @@ -118,4 +105,13 @@ MODEL_OPTIONS="\
--lr-warmup-samples 640 \
--lr-warmup-init 0.3e-7 \
--no-load-optim \
--no-load-rng "
--no-load-rng \
--moe-permute-fusion \
--eval-interval 1000 \
--timing-log-level 2 \
"
# --inference-dynamic-batching-max-tokens 8192 \
# --rl-training-cuda-graphs \
# --rl-training-cuda-graphs \
# --empty-unused-memory-level 0 \ # try with the default value (=2)
# --inference-logging-step-interval 100 \
78 changes: 78 additions & 0 deletions megatron/core/inference/engines/dynamic_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
MaxSequenceLengthOverflowError,
TokenOverflowError,
)
from megatron.core.inference.inference_flops import InferenceFLOPsCalculator
from megatron.core.inference.data_parallel_inference_coordinator import (
DataParallelInferenceCoordinator,
)
Expand Down Expand Up @@ -188,6 +189,21 @@ def __init__(self, controller: TextGenerationController, context: DynamicInferen
)
self.cuda_graph_impl = model_config.cuda_graph_impl
self.cuda_graph_scope = model_config.cuda_graph_scope

# Initialize inference FLOPs calculator and GPU peak for MFU reporting.
self.flops_calculator = None
self.gpu_peak_tflops = 0.0
self.cumulative_inference_flops = 0.0
self.cumulative_inference_time = 0.0
try:
from megatron.training.global_vars import get_args
from megatron.training.gpu_peak_flops import get_gpu_peak_tflops
args = get_args()
self.flops_calculator = InferenceFLOPsCalculator.from_args(args)
self.gpu_peak_tflops = get_gpu_peak_tflops()
except Exception as e:
logging.warning(f"Could not initialize inference FLOPs calculator: {e}")

# Initialize engine.
self.reset()

Expand Down Expand Up @@ -1487,6 +1503,33 @@ async def async_bookkeep(
self.socket_for_receiving_requests.send(payload)
range_pop()

# Compute inference FLOPs for this step.
step_flops_info = None
if self.flops_calculator is not None:
batch_dims = self.context.batch_dimensions
decode_tokens = batch_dims.decode_req_count if batch_dims else 0
prefill_reqs = batch_dims.prefill_req_count if batch_dims else 0
total_tokens = batch_dims.token_count if batch_dims else 0
prefill_tokens = total_tokens - decode_tokens

step_flops_info = self.flops_calculator.compute_step_flops(
decode_tokens=decode_tokens,
prefill_tokens=prefill_tokens,
total_tokens=total_tokens,
active_blocks=context_state["total_active_used_blocks"],
active_reqs=context_state["total_request_count"] - context_state["paused_request_count"],
num_prefill_reqs=prefill_reqs,
)
self.cumulative_inference_flops += step_flops_info['total_flops']
self.cumulative_inference_time += step_time
try:
from megatron.training.mfu_tracker import get_mfu_tracker
get_mfu_tracker().add_inference_flops(
step_flops_info['total_flops'], step_time, tokens=total_tokens
)
except Exception:
pass

# Log KV cache utilization stats to W&B
if context_state["kv_stats"] is not None:
# Prepare metrics dictionary with all stats
Expand All @@ -1499,6 +1542,29 @@ async def async_bookkeep(
'inference/waiting_queue_len': int(len(self.waiting_request_ids)),
'inference/total_requests_dict_size': int(len(self.requests)),
}

batch_dims = self.context.batch_dimensions
total_tokens = batch_dims.token_count if batch_dims else 0
if step_time > 0 and total_tokens > 0:
metrics['inference/tokens_per_sec_per_gpu'] = float(total_tokens / step_time)

if step_flops_info is not None:
step_tflops = step_flops_info['total_flops'] / 1e12
step_throughput = step_tflops / step_time if step_time > 0 else 0
metrics['inference/step_flops_tflop'] = float(step_tflops)
metrics['inference/throughput_tflops_per_gpu'] = float(step_throughput)
metrics['inference/t_avg'] = float(step_flops_info['t_avg'])
metrics['inference/cumulative_flops_tflop'] = float(self.cumulative_inference_flops / 1e12)
if self.gpu_peak_tflops > 0:
mfu = step_throughput / self.gpu_peak_tflops * 100.0
cumulative_throughput = (
(self.cumulative_inference_flops / 1e12) / self.cumulative_inference_time
if self.cumulative_inference_time > 0 else 0
)
cumulative_mfu = cumulative_throughput / self.gpu_peak_tflops * 100.0
metrics['inference/mfu_percent'] = float(mfu)
metrics['inference/cumulative_mfu_percent'] = float(cumulative_mfu)

# Add KV stats with inference/ prefix
# Convert utilization metrics from 0-1 range to 0-100 percentage range for better visualization
for key, value in context_state["kv_stats"].items():
Expand Down Expand Up @@ -1557,6 +1623,18 @@ async def async_bookkeep(
mem["reserved_bytes.all.current"] / (1024**3),
)
)
batch_dims = self.context.batch_dimensions
total_tokens = batch_dims.token_count if batch_dims else 0
if step_time > 0 and total_tokens > 0:
toks_per_sec_per_gpu = total_tokens / step_time
output_str += f" toks/s/GPU: {toks_per_sec_per_gpu:.0f},"
if step_flops_info is not None:
step_tflops = step_flops_info['total_flops'] / 1e12
step_throughput = step_tflops / step_time if step_time > 0 else 0
output_str += f" {step_throughput:.1f} TFLOP/s/GPU"
if self.gpu_peak_tflops > 0:
mfu = step_throughput / self.gpu_peak_tflops * 100.0
output_str += f", MFU: {mfu:.1f}%"
if context_state["is_decode_only"]:
output_str = f"\033[94m{output_str}\033[0m"
logging.info(output_str)
Expand Down
Loading