diff --git a/README.md b/README.md
index d9dc6ea8..0f5d470e 100644
--- a/README.md
+++ b/README.md
@@ -1309,6 +1309,14 @@ template.delete()
 ```
 
 ## Examples
+
+This repository provides both a **full end-to-end example walkthrough** of using the CLI for real-world training and inference workloads and **standalone** example notebooks for individual features.
+
+### End-to-End Walkthrough
+[End-to-End Walkthrough Example](https://github.com/aws/sagemaker-hyperpod-cli/blob/main/examples/end_to_end_walkthrough/README.md)
+
+### Standalone Examples
+
 #### Cluster Management Example Notebooks
 
 [CLI Cluster Management Example](https://github.com/aws/sagemaker-hyperpod-cli/blob/main/examples/cluster_management/cluster_creation_init_experience.ipynb)
diff --git a/doc/examples.md b/doc/examples.md
index a27b218f..18d4b392 100644
--- a/doc/examples.md
+++ b/doc/examples.md
@@ -1,6 +1,19 @@
 (examples)=
-# Example Notebooks
+# End-to-End Example and Notebooks
+
+## End-to-End Walkthrough Example
+
+::::{grid} 1
+:gutter: 3
+
+:::{grid-item-card} End-to-End Walkthrough Example
+:link: https://github.com/aws/sagemaker-hyperpod-cli/blob/main/examples/end_to_end_walkthrough/README.md
+:class-card: sd-border-primary
+
+**End-to-End Walkthrough Example** An end-to-end walkthrough covering cluster setup, training, inference, and Spaces deployment, as well as task governance usage.
+:::
+::::
 
 ## Cluster Management Example Notebooks
diff --git a/examples/end_to_end_walkthrough/00-getting-started/00-setup.md b/examples/end_to_end_walkthrough/00-getting-started/00-setup.md
new file mode 100644
index 00000000..5ddb99e5
--- /dev/null
+++ b/examples/end_to_end_walkthrough/00-getting-started/00-setup.md
@@ -0,0 +1,249 @@
+# Getting Started - HyperPod CLI End-to-End Walkthrough
+
+This page walks you through the configuration of your HyperPod cluster and ensures that it supports all examples in this walkthrough.
+
+## Prerequisites
+
+This end-to-end example assumes that a HyperPod cluster is already set up in your AWS account. This can be done through the AWS console UI; setup instructions can be found in the [AWS Documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-hyperpod-eks-operate-console-ui-create-cluster.html). Alternatively, [01-(optional)-cluster-creation.md](./01-(optional)-cluster-creation.md) contains instructions on how to create a new HyperPod cluster using the CLI. In both cases, the cluster should have the following minimum specifications to run the examples in this walkthrough. Please ensure that you have the required service quota available:
+- General compute instance group with CPU-based nodes for operator pods
+  - Recommended instances: 2 `ml.t3.2xlarge` instances
+- GPU worker instance group for worker pods
+  - Recommended instances: 3 `ml.g5.12xlarge` instances
+- FSx for Lustre filesystem linked to the cluster. The examples consume very little storage, so the minimum FSx filesystem size of `1,200 GiB` is sufficient.
+- HyperPod Inference Operator installation. This can be enabled at cluster creation time. For manual instructions, please check the corresponding [AWS documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-hyperpod-model-deployment-setup.html).
+
+Please ensure that the following tools are available in your local environment to execute the examples:
+- A local terminal with access to `python` version `>=3.10`.
+- [`aws`](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html) CLI version `>=2` +- [`kubectl`](https://kubernetes.io/docs/tasks/tools/) compatible to your EKS cluster k8s version + +## Installing the SageMaker HyperPod CLI + +You can install the SageMaker HyperPod CLI package from PyPi using `pip`. We recommended creating a clean virtual environment before installing the package. + +```bash +# Optional: Create a new virtual environment in the current directory and activate it +python -m venv .venv +source .venv/bin/activate + +# Install the CLI package +pip install sagemaker-hyperpod +``` +This will make the HyperPod CLI available through the `hyp` command in your terminal. +Additionally, the Python SDK can be imported as `sagemaker.hyperpod`. +You can show the available commands and verify that the installation completed successfully by +running the following: +```bash +hyp --help +``` + +Show the current CLI version and template versions. +```bash +hyp --version +``` + +### Connecting to an existing cluster + +List your existing cluster(s) by running the following command. The `--region` flag is optional, if it is not provided, the default region of your `aws` CLI configuration will be used. +```bash +hyp list-cluster --region +``` + +This will show you an output, similar to the following if you already have an existing cluster: +```bash +[ + { + "Cluster": "hp-eks-test-cluster", + "Instances": [ + { + "InstanceType": "ml.t3.xlarge", + "TotalNodes": 2, + "AcceleratorDevicesAvailable": "N/A", + "NodeHealthStatus=Schedulable": 2, + "DeepHealthCheckStatus=Passed": "N/A" + }, + { + "InstanceType": "ml.g5.12xlarge", + "TotalNodes": 2, + "AcceleratorDevicesAvailable": 8, + "NodeHealthStatus=Schedulable": 2, + "DeepHealthCheckStatus=Passed": "N/A" + } + ] + }, +] +``` + +Set the cluster context to a specific cluster to be used with other CLI commands, e.g. for submitting training jobs: +```bash +hyp set-cluster-context --cluster-name --region +``` + +The above command modifies your `.kube/config`. Thus this cluster context will also be used +by general kubernetes tools such as `kubectl` and `k9s`. + +You can retrieve the current cluster context by running: +```bash +hyp get-cluster-context +``` + +## Installing required HyperPod Add-ons + +For the examples in this repository, three Hyperpod add-ons are required: (1) HyperPod Training Operator, (2) HyperPod Task Governance and (3) SageMaker Spaces. + +### HyperPod Training Operator + +The HyperPod training operator helps you accelerate generative AI model development by efficiently managing distributed training across large GPU clusters. The easiest way to install the training operator is through the HyperPod cluster UI in the AWS console: +- Navigate to your HyperPod cluster in the SageMaker part of the AWS console +- In the Dashboard, scroll down to find the Amazon SageMaker HyperPod training operator installation option +- Click on **Install** + +![Training Operator Installation](../images/hpto_install.png) + +Alternatively, the training operator add-on can be installed via the AWS CLI. Further information on the setup can be found in the corresponding [AWS documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-eks-operator-install.html). + +The training operator does not require additional configuration. 
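+
+If you want to confirm that the operator is running before moving on, a quick `kubectl` check is sufficient. This is an optional sanity check, not a required step; the exact namespace and pod names depend on how the add-on was installed in your cluster, so adjust the filter if nothing matches.
+```bash
+# Optional sanity check: look for the training operator pods across all namespaces.
+# The namespace and pod names depend on your installation, so adjust the grep filter if needed.
+kubectl get pods --all-namespaces | grep -i "training-operator"
+```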
+ +### HyperPod Task Governance + +SageMaker HyperPod task governance is a robust management system designed to streamline resource allocation and ensure efficient utilization of compute resources across teams and projects for your Amazon EKS clusters. This provides administrators with the capability to set: +- Priority levels for various tasks +- Compute allocation for each team +- How each team lends and borrows idle compute +- If a team can preempt their own tasks + +The task governance utilizes `kueue` to implement these functionalities. + +#### Installation +The easiest way to install the task governance add-on is through the HyperPod cluster UI in the AWS console: +- Navigate to your HyperPod cluster in the SageMaker part of the AWS console +- In the Dashboard, scroll down to find the Amazon SageMaker HyperPod Task Governance installation option +- Click on **Install** + +![Task Governance Installation](../images/task_governance_install.png) + +Alternatively, the task governance add-on can be installed via the AWS CLI. Further information on the setup can be found in the corresponding [AWS documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-hyperpod-eks-operate-console-ui-governance.html). + +#### Configuration +For the examples in this repository, we need to configure two aspects of task governance: **(1) Cluster scheduler policy** and **(2) Team compute allocations**. Lastly, we need to adjust the FSx filesystem access accordingly. +While both can be configured through the AWS console, in the "Policies" tab of your HyperPod cluster, we will configure them through CLI commands for easy reproducability. + +Setup the required environment variable +```bash +HYPERPOD_CLUSTER_ARN=$(aws sagemaker list-clusters --name-contains | jq -r '.ClusterSummaries[0].ClusterArn') +``` + +**Cluster Scheduler Policy** + +To update how tasks are prioritized and how idle compute is allocated, apply a Cluster Scheduler Policy using the following configuration: +```bash +aws sagemaker \ + create-cluster-scheduler-config \ + --name "example-cluster-scheduler-config" \ + --cluster-arn $HYPERPOD_CLUSTER_ARN \ + --scheduler-config "PriorityClasses=[{Name=inference,Weight=90},{Name=experimentation,Weight=80},{Name=fine-tuning,Weight=50},{Name=training,Weight=70}],FairShare=Enabled" +``` +To verify creation, run: +```bash +aws sagemaker list-cluster-scheduler-configs --name-contains example-cluster-scheduler-config +``` +You should see `Status: Created`. +Sometimes if we try to create the Scheduler Config before Task Governance is completely installed, it will lead to `CreateFailed`. To resolve, delete the scheduler config. After deleting, please recreate the policy using the aforementioned command. +```bash +SCHEDULER_CONFIG_ID=$(aws sagemaker list-cluster-scheduler-configs --name-contains example-cluster-scheduler-config | jq -r '.ClusterSchedulerConfigSummaries[0].ClusterSchedulerConfigId') + +aws sagemaker delete-cluster-scheduler-config --cluster-scheduler-config-id $SCHEDULER_CONFIG_ID +``` + +This CLI command will output two values: `CreateSchedulerConfigArn` and `ClusterSchedulerConfigId`. 
The `create-cluster-scheduler-config` call generates a cluster policy with fair sharing enabled and the following priority classes, which can be viewed by navigating to your HyperPod cluster in the SageMaker part of the AWS console and selecting the **"Policies"** tab:
+
+![Cluster Scheduler Policy](../images/cluster-policy-example.png)
+
+**Team compute allocations**
+
+Each team requires a Compute Allocation to manage its compute capacity. Both teams will have 1 instance allocated, a fair-share weight of 0, and 100% borrowing capability. Please adapt the `InstanceType` accordingly if you are using different instance types.
+
+```bash
+aws sagemaker \
+    create-compute-quota \
+    --name "Team-A" \
+    --cluster-arn $HYPERPOD_CLUSTER_ARN \
+    --compute-quota-config "ComputeQuotaResources=[{InstanceType=ml.g5.12xlarge,Count=1}],ResourceSharingConfig={Strategy=LendAndBorrow,BorrowLimit=100},PreemptTeamTasks=LowerPriority" \
+    --activation-state "Enabled" \
+    --compute-quota-target "TeamName=team-a,FairShareWeight=0"
+```
+
+```bash
+aws sagemaker \
+    create-compute-quota \
+    --name "Team-B" \
+    --cluster-arn $HYPERPOD_CLUSTER_ARN \
+    --compute-quota-config "ComputeQuotaResources=[{InstanceType=ml.g5.12xlarge,Count=1}],ResourceSharingConfig={Strategy=LendAndBorrow,BorrowLimit=100},PreemptTeamTasks=LowerPriority" \
+    --activation-state "Enabled" \
+    --compute-quota-target "TeamName=team-b,FairShareWeight=0"
+```
+
+This will create the team-specific compute quotas as well as the corresponding Kubernetes namespaces in the cluster, following the HyperPod naming scheme. In this example, `hyperpod-ns-team-a` and `hyperpod-ns-team-b` are created. This can again be viewed in the **"Policies"** tab of your HyperPod cluster in the AWS console:
+
+![Team Quotas](../images/team-quotas.png)
+
+#### FSx for Lustre Configuration
+
+In this section you will link your cluster's FSx for Lustre filesystem to an S3 bucket via a Data Repository Association and enable the newly created Task Governance team namespaces to access the filesystem.
+
+**Setting up the Data Repository Association (DRA)**
+
+While the FSx filesystem is typically created automatically at cluster creation as described in the [AWS Documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-hyperpod-eks-operate-console-ui-create-cluster.html), we additionally need to link it to an S3 bucket so we can easily transfer the training data and scripts to it. This is done through an FSx Data Repository Association (DRA); more information is available in the [AWS Documentation](https://docs.aws.amazon.com/fsx/latest/LustreGuide/create-dra-linked-data-repo.html).
+
+For this, you will need an S3 bucket that will be synced with your FSx for Lustre filesystem. This can be an existing bucket or a newly created one. Once you have selected an S3 bucket, navigate to your FSx filesystem in the AWS console and select the **Data Repository** tab. Then click on **Create data repository association**.
+
+![FSx DRA](../images/dra-console.png)
+
+In the following form, enter the name of your S3 bucket in **Data repository path** and choose the root path as the **File system path**. All other options can be left with their defaults. Finally, click on **Create**. The DRA will take a few minutes to be created, and you can monitor its status in the AWS console on your FSx filesystem's page.
+
+![FSx S3](../images/dra-s3.png)
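+
+If you prefer to script this step instead of using the console, the DRA can also be created with the AWS CLI. The snippet below is a minimal sketch: the bucket name and filesystem ID are placeholders, and the import/export policies are example values, so please verify the options against the current `aws fsx` CLI documentation before running it.
+```bash
+# Sketch only: replace the filesystem ID and bucket name with your own values.
+aws fsx create-data-repository-association \
+    --file-system-id "YOUR_FSX_FILESYSTEM_ID" \
+    --file-system-path / \
+    --data-repository-path s3://YOUR_BUCKET_NAME/ \
+    --s3 "AutoImportPolicy={Events=[NEW,CHANGED,DELETED]},AutoExportPolicy={Events=[NEW,CHANGED,DELETED]}"
+
+# Check the association status until its lifecycle state becomes AVAILABLE.
+aws fsx describe-data-repository-associations
+```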
+
+**Making FSx available to Task Governance namespaces**
+
+The cluster creation workflow automatically makes the FSx filesystem available as a `PersistentVolumeClaim` in the cluster's `default` namespace. To run the team-specific examples, we need to make it available to the newly created namespaces as well.
+
+Retrieve the details of your current FSx filesystem by navigating to Amazon FSx in the AWS console and clicking on the FSx for Lustre filesystem belonging to your HyperPod cluster. Then, set up the following environment variables with the respective values:
+```bash
+export FSX_FILESYSTEM_ID="PLEASE_FILL_IN"
+export FSX_DNS_NAME="PLEASE_FILL_IN"
+export FSX_MOUNT_NAME="PLEASE_FILL_IN"
+export FSX_CAPACITY_GIB="1200" # Change if required
+
+export TEAM_A_NAMESPACE="hyperpod-ns-team-a"
+export TEAM_B_NAMESPACE="hyperpod-ns-team-b"
+```
+
+Use `envsubst` to replace the placeholders in the provided template with your environment variables:
+```bash
+envsubst < ./00-getting-started/kubernetes/fsx-task-governance-template.yaml > fsx-pv-pvc.yaml
+```
+
+Apply the manifest to your cluster:
+```bash
+kubectl apply -f fsx-pv-pvc.yaml
+```
+
+This will create both `PersistentVolumes` and `PersistentVolumeClaims` for your FSx filesystem in the newly created namespaces.
+
+### SageMaker Spaces
+SageMaker Spaces allows AI developers to run their interactive machine learning workloads directly on the HyperPod EKS cluster through IDEs and notebooks.
+
+The easiest way to install the Spaces add-on is through the HyperPod cluster UI in the AWS console:
+- Navigate to your HyperPod cluster in the SageMaker part of the AWS console
+- In the Dashboard of your HyperPod cluster, select the **IDE and Notebooks** tab
+- Click on **Quick Install**
+
+![Spaces add-on installation](../images/spaces-install.png)
+
+The installation will take a few minutes. After successful installation, you can see the status in the console UI:
+![Spaces add-on installation success](../images/spaces-success.png)
+
+No further setup is required at this point; the actual code editor space will be set up in the example [04-spaces/00-create-code-editor-space.md](../04-spaces/00-create-code-editor-space.md).
+
+Further information on the setup and usage of SageMaker Spaces on HyperPod can be found in the corresponding [AWS documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-hyperpod-eks-cluster-ide.html).
\ No newline at end of file
diff --git a/examples/end_to_end_walkthrough/00-getting-started/01-(optional)-cluster-creation.md b/examples/end_to_end_walkthrough/00-getting-started/01-(optional)-cluster-creation.md
new file mode 100644
index 00000000..fcd01ac8
--- /dev/null
+++ b/examples/end_to_end_walkthrough/00-getting-started/01-(optional)-cluster-creation.md
@@ -0,0 +1,42 @@
+# Creating a HyperPod cluster using the HyperPod CLI - HyperPod CLI End-to-End Walkthrough
+HyperPod clusters can generally be created using the console UI.
+While this provides a convenient and user-friendly way for cluster creation,
+the HyperPod CLI also provides cluster creation functionality through a configuration
+file-based, repeatable interface.
+
+Initialize a HyperPod cluster stack configuration in a new directory by running
+the following. Note that this will only create the configuration files and not yet
+start the actual cluster creation.
+``` +mkdir cluster-stack && cd cluster-stack + +hyp init cluster-stack +``` + +This will create three files in the new directory: +- `cfn_params.jinja` - CloudFormation template for the HyperPod cluster stack +- `config.yaml` - Configuration file that contains the values for the CloudFormation +- `README.md` - Usage instructions for this functionality + +The configuration parameters can be either modified directly in the `config.yaml` or via +the CLI by executing `hyp configure -- ` which provides +additional validation. + +For example, you can configure the stack to use a specific prefix for the names of the resources to be created +as well as a specific kubernetes version by running the following. +``` +hyp configure --resource-name-prefix my-cli-cluster +hyp configure --kubernetes-version 1.33 +``` + +Validate the values in `config.yaml` by running: +``` +hyp validate +``` + +Finally, submit the cluster creation stack to CloudFormation by running: +``` +hyp create +``` + +The final, submitted CloudFormation template will be stored for reference in `./run//k8s.yaml`. \ No newline at end of file diff --git a/examples/end_to_end_walkthrough/00-getting-started/README.MD b/examples/end_to_end_walkthrough/00-getting-started/README.MD new file mode 100644 index 00000000..e53090bc --- /dev/null +++ b/examples/end_to_end_walkthrough/00-getting-started/README.MD @@ -0,0 +1,6 @@ +# Cluster and Environment Setup - Amazon SageMaker HyperPod CLI Examples + +This folder contains the following files: + +- [00-setup.md](00-setup.md) - Instructions on how to install the CLI and setting up prerequisites you should have in your AWS account for running the examples. +- [01-(optional)-cluster-creation.md](01-(optional)-cluster-creation.md) - Optional instructions on how to create a new HyperPod cluster using the CLI. Alternatively an existing cluster can be used for the examples, or one can be created using the console UI. 
\ No newline at end of file diff --git a/examples/end_to_end_walkthrough/00-getting-started/kubernetes/fsx-task-governance-template.yaml b/examples/end_to_end_walkthrough/00-getting-started/kubernetes/fsx-task-governance-template.yaml new file mode 100644 index 00000000..11faa9b4 --- /dev/null +++ b/examples/end_to_end_walkthrough/00-getting-started/kubernetes/fsx-task-governance-template.yaml @@ -0,0 +1,69 @@ +apiVersion: v1 +kind: PersistentVolume +metadata: + name: fsx-pv-${TEAM_A_NAMESPACE} +spec: + capacity: + storage: ${FSX_CAPACITY_GIB}Gi + volumeMode: Filesystem + accessModes: + - ReadWriteMany + mountOptions: + - flock + persistentVolumeReclaimPolicy: Retain + storageClassName: "" # static PV (no dynamic provisioning) + csi: + driver: fsx.csi.aws.com + volumeHandle: ${FSX_FILESYSTEM_ID} + volumeAttributes: + dnsname: ${FSX_DNS_NAME} + mountname: ${FSX_MOUNT_NAME} +--- +apiVersion: v1 +kind: PersistentVolume +metadata: + name: fsx-pv-${TEAM_B_NAMESPACE} +spec: + capacity: + storage: ${FSX_CAPACITY_GIB}Gi + volumeMode: Filesystem + accessModes: + - ReadWriteMany + mountOptions: + - flock + persistentVolumeReclaimPolicy: Retain + storageClassName: "" # static PV (no dynamic provisioning) + csi: + driver: fsx.csi.aws.com + volumeHandle: ${FSX_FILESYSTEM_ID} + volumeAttributes: + dnsname: ${FSX_DNS_NAME} + mountname: ${FSX_MOUNT_NAME} +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: fsx-pvc-${TEAM_A_NAMESPACE} + namespace: ${TEAM_A_NAMESPACE} +spec: + accessModes: + - ReadWriteMany + storageClassName: "" # must match PV + resources: + requests: + storage: ${FSX_CAPACITY_GIB}Gi + volumeName: fsx-pv-${TEAM_A_NAMESPACE} +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: fsx-pvc-${TEAM_B_NAMESPACE} + namespace: ${TEAM_B_NAMESPACE} +spec: + accessModes: + - ReadWriteMany + storageClassName: "" # must match PV + resources: + requests: + storage: ${FSX_CAPACITY_GIB}Gi + volumeName: fsx-pv-${TEAM_B_NAMESPACE} diff --git a/examples/end_to_end_walkthrough/01-training-job-submission/00-pytorch-training-job.md b/examples/end_to_end_walkthrough/01-training-job-submission/00-pytorch-training-job.md new file mode 100644 index 00000000..4383bb50 --- /dev/null +++ b/examples/end_to_end_walkthrough/01-training-job-submission/00-pytorch-training-job.md @@ -0,0 +1,247 @@ +# Submitting a PyTorch Training Job - HyperPod CLI End-to-End Walkthrough + +This example shows how to fine-tune a **Qwen3 4B Thinking** model using PyTorch FSDP and QLora on your HyperPod cluster. + +In the following you will: +- Create a sample dataset for training and upload it to Amazon S3. +- Upload the training script and it's configuration to Amazon S3. +- Create a training docker image and push it to Amazon ECR. +- Submit the job to your cluster using the CLI and monitor it's progress. + +This example assumes that you completed the setup instructions in [00-getting-started/00-setup.md](../00-getting-started/00-setup.md). + +## Training Dataset + +To create the training dataset for this example, please run the following script, +which will download and pre-process the [`interstellarninja/hermes_reasoning_tool_use`](https://huggingface.co/datasets/interstellarninja/hermes_reasoning_tool_use) dataset +to the current local directory under `./data/`. +```bash +pip install -r ./scripts/requirements.txt +python ./scripts/create-training-dataset.py +``` + +Lastly, upload the dataset to the Amazon S3 bucket, which is connected to your HyperPod cluster's FSx for Lustre Filesystem +through DRA. 
+```bash +S3_BUCKET_NAME="PLEASE_FILL_IN" +S3_PREFIX="qwen-cli-example" +S3_PATH="s3://$S3_BUCKET_NAME/$S3_PREFIX" + +aws s3 sync ./data/ $S3_PATH/dataset +``` + +Verify that the upload was successful by running: +```bash +aws s3 ls $S3_PATH --recursive +``` + +## Training Script and Configuration +The training script is already pre-configured with default values in `./training_scripts/args.yaml` +which you do not need to adapt. + +You can optionally add configurations for **MLFlow tracking**. To utilize MLFlow tracking, open `./training_scripts/args.yaml` +in an editor, e.g. `vscode`, and configure the following two fields: +```yaml +mlflow_uri: "" # MLflow tracking server URI +mlflow_experiment_name: "" # MLflow experiment name +``` + +Upload the training script and configuration to S3 using the following command: +```bash +aws s3 sync ./training_scripts/ $S3_PATH/scripts/ +``` + +## Training Docker Image + +The example uses the `pytorch-training:2.8.0-gpu-py312-cu129-ubuntu22.04-ec2` docker image, +provided by AWS as a base image. This example provides a Dockerfile that extends this image +with the specific requirements of the training job, the training python script as well as the HyperPod Elastic Agent. + +Please note that the docker images are multiple gigabytes in size, thus the following process will take some time depending on your network speed. Alternatively, this can be executed in an EC2 instance or SageMaker AI Studio instance. + +Create a new ECR if required and login to it. +```bash +AWS_REGION="PLEASE_FILL_IN" +AWS_ACCOUNT_ID="PLEASE_FILL_IN" + +DOCKER_IMAGE_TAG="pytorch2.8-cu129" +ECR_NAME="qwen3-finetuning" + +ECR_URI=$AWS_ACCOUNT_ID.dkr.ecr.$AWS_REGION.amazonaws.com/$ECR_NAME + +aws ecr create-repository --repository-name $ECR_NAME --region $AWS_REGION 2>&1 | grep -v "RepositoryAlreadyExistsException" || echo "Repository already exists or created successfully" + +aws ecr get-login-password --region $AWS_REGION | docker login --username AWS --password-stdin $ECR_URI +aws ecr get-login-password --region us-west-2 | docker login --username AWS --password-stdin 763104351884.dkr.ecr.us-west-2.amazonaws.com +``` + +Build the image and push it to ECR: +```bash +cd docker + +docker build --platform linux/amd64 -t $ECR_URI:$DOCKER_IMAGE_TAG . +docker push $ECR_URI:$DOCKER_IMAGE_TAG + +cd .. +``` + +## Submit and monitor the training job (💻) + +You can submit the job to your HyperPod cluster by running the following CLI command. This example +assumes that your cluster is configured with at least 2 `ml.g5.12xlarge` instances. Please adapt the command accordingly +if you are using different instances. 
+```bash +JOB_NAME=qwen3-4b-thinking-2507-fsdp + +hyp create hyp-pytorch-job \ + --job-name $JOB_NAME \ + --image $AWS_ACCOUNT_ID.dkr.ecr.$AWS_REGION.amazonaws.com/$ECR_NAME:$DOCKER_IMAGE_TAG \ + --command "[hyperpodrun, --nnodes=2:2, --nproc_per_node=4, /data/$S3_PREFIX/scripts/train.py]" \ + --args "[--config, /data/$S3_PREFIX/scripts/args.yaml]" \ + --environment '{"LOGLEVEL": "INFO", "PYTORCH_CUDA_ALLOC_CONF": "expandable_segments:True", "NCCL_DEBUG": "INFO", "NCCL_SOCKET_IFNAME": "^lo", "TORCH_NCCL_ASYNC_ERROR_HANDLING": "1", "FI_PROVIDER": "efa", "FI_EFA_FORK_SAFE": "1", "NCCL_PROTO": "simple"}' \ + --pull-policy "IfNotPresent" \ + --instance-type "ml.g5.12xlarge" \ + --node-count 2 \ + --tasks-per-node 4 \ + --deep-health-check-passed-nodes-only false \ + --max-retry 100 \ + --volume name=shmem,type=hostPath,mount_path=/dev/shm,path=/dev/shm,read_only=false \ + --volume name=local,type=hostPath,mount_path=/local,path=/mnt/k8s-disks/0,read_only=false \ + --volume name=fsx-volume,type=pvc,mount_path=/data,claim_name=fsx-claim,read_only=false \ + --namespace default +``` + +After the job got submitted successfully, you can list the jobs running on the cluster and monitor their status +using the following command: +```bash +hyp list hyp-pytorch-job +``` + +Describe the job details by running: +```bash +hyp describe hyp-pytorch-job --job-name $JOB_NAME +``` + +List the pods of the job by running: +```bash +hyp list-pods hyp-pytorch-job --job-name $JOB_NAME +``` + +Get the logs for the job, from a specific pod: +```bash +hyp get-logs hyp-pytorch-job --job-name $JOB_NAME --pod-name $JOB_NAME-pod-0 +``` + +Cancel and delete the job to free up the cluster resources by running: +```bash +hyp delete hyp-pytorch-job --job-name $JOB_NAME +``` + +## (Optional) Submit the training job by creating a customizable template (💻) + +Alternatively to creating a training job via the `hyp create hyp-pytorch-job` command +above, the HyperPod CLI also enables a configuration file-based workflow that allows +for easy reproducability as well as further customization options as the Kubernetes +manifest template is directly exposed to the user. + +Initialize a HyperPod pytorch training job configuration in a new directory by running: +```bash +mkdir pytorch-job-config && cd pytorch-job-config + +hyp init hyp-pytorch-job +``` + +This will create three files in the new directory: +- `k8s.jinja` - Kubernetes template for a `HyperPodPyTorchJob` resource +- `config.yaml` - Configuration file that contains the values for the Kubernetes template +- `README.md` - Usage instructions for this functionality + +The configuration parameters can be either modified directly in the `config.yaml` or via +the CLI by executing `hyp configure -- ` which provides +additional validation. 
+ +To reproduce the earlier training job example, run the following commands: +```bash +hyp configure --job-name $JOB_NAME +hyp configure --image $AWS_ACCOUNT_ID.dkr.ecr.$AWS_REGION.amazonaws.com/$ECR_NAME:$DOCKER_IMAGE_TAG +hyp configure --command "[hyperpodrun, --nnodes=2:2, --nproc_per_node=4, /data/$S3_PREFIX/scripts/train.py]" +hyp configure --args "[--config, /data/$S3_PREFIX/scripts/args.yaml]" +hyp configure --environment '{"LOGLEVEL": "INFO", "PYTORCH_CUDA_ALLOC_CONF": "expandable_segments:True", "NCCL_DEBUG": "INFO", "NCCL_SOCKET_IFNAME": "^lo", "TORCH_NCCL_ASYNC_ERROR_HANDLING": "1", "FI_PROVIDER": "efa", "FI_EFA_FORK_SAFE": "1", "NCCL_PROTO": "simple"}' +hyp configure --pull-policy IfNotPresent +hyp configure --instance-type ml.g5.12xlarge +hyp configure --node-count 2 +hyp configure --tasks-per-node 4 +hyp configure --deep-health-check-passed-nodes-only false +hyp configure --max-retry 100 +hyp configure --volume '[{"name":"shmem","type":"hostPath","mount_path":"/dev/shm","path":"/dev/shm","read_only":false},{"name":"local","type":"hostPath","mount_path":"/local","path":"/mnt/k8s-disks/0","read_only":false},{"name":"fsx-volume","type":"pvc","mount_path":"/data","claim_name":"fsx-claim","read_only":false}]' +hyp configure --namespace default +``` + +If specific Kubernetes entities or configuration entries need to be set that are not supported as CLI arguments, +the template in `k8s.jinja` can be modified in addition. This can be useful if you for example require additional label definitions in `metadata:` or other settings to comply with enterprise policies. + +View the following files in an editor of your choice to see the configuration before submitting: +``` +./k8s.jinja +./config.yaml +``` + +Validate the values in `config.yaml` by running: +```bash +hyp validate +``` + +Submit the job to the cluster by running: +```bash +hyp create +``` + +The final, submitted Kubernetes manifest will be stored for reference in `./run//k8s.yaml` which you can again view in your favorite editor. + +## (Optional) Simulate an instance failure (💻) + +To validate HyperPod’s fault-tolerance behavior, you can emulate an instance reboot +(e.g., due to a GPU failure) by marking one of the nodes in your cluster as +`UnschedulablePendingReboot`. This will trigger HyperPod to detect the fault and, +if a spare instance is available, restart the job on a healthy node. + +Run the following command to see the pods that are part of the current training job and the node that they are running on: +```bash +kubectl get pods -o wide +``` + +This will output something similar to the following: +```bash +NAME READY STATUS RESTARTS AGE IP NODE +qwen3-4b-thinking-2507-fsdp-pod-0 1/1 Running 0 3m53s 10.4.224.228 hyperpod-i-00b865d0114cabc20 +qwen3-4b-thinking-2507-fsdp-pod-1 1/1 Running 0 3m53s 10.4.142.65 hyperpod-i-0974a98e3fc0da67b +``` + +Mark a node as pending reboot: +```bash +NODE=hyperpod-i-00b865d0114cabc20 # Replace this with one of your worker node IDs + +kubectl label node "$NODE" \ + sagemaker.amazonaws.com/node-health-status=UnschedulablePendingReboot \ + --overwrite=true +``` + +Check the pod and node status again. 
The pod on the faulty node will restarted on the spare node: +```bash +kubectl get pods -o wide +``` + +Describe the job to verify that the fault has been detected and that recovery is in progress: +``` +hyp describe hyp-pytorch-job --job-name $JOB_NAME +``` + +Optionally, run a command inside one of the job’s pods (for example, to inspect GPU state): +``` +hyp exec hyp-pytorch-job --job-name $JOB_NAME --pod $JOB_NAME-pod-0 -- nvidia-smi +``` + +Lastly, cancel and delete the job to free up the cluster resources by running: +```bash +hyp delete hyp-pytorch-job --job-name $JOB_NAME +``` diff --git a/examples/end_to_end_walkthrough/01-training-job-submission/01-pytorch-training-job-sdk.ipynb b/examples/end_to_end_walkthrough/01-training-job-submission/01-pytorch-training-job-sdk.ipynb new file mode 100644 index 00000000..cf8eb02c --- /dev/null +++ b/examples/end_to_end_walkthrough/01-training-job-submission/01-pytorch-training-job-sdk.ipynb @@ -0,0 +1,248 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "d0704685", + "metadata": {}, + "source": [ + "# Submitting a PyTorch Training Job - HyperPod CLI End-to-End Walkthrough\n", + "\n", + "This example shows how to fine-tune a **Qwen3 4B Thinking** model using PyTorch FSDP and QLora on your HyperPod cluster.\n", + "\n", + "This example assumes that you completed the **Setup instructions** in [00-getting-started/00-setup.md](../00-getting-started/00-setup.md) as well as the **Training Dataset** and **Training Docker Image** steps in [00-pytorch-training-job.md](00-pytorch-training-job.md)." + ] + }, + { + "cell_type": "markdown", + "id": "ece29d2a", + "metadata": { + "vscode": { + "languageId": "plaintext" + } + }, + "source": [ + "### Import necessary classes" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "09e23d2f", + "metadata": {}, + "outputs": [], + "source": [ + "from sagemaker.hyperpod.training import (\n", + " HyperPodPytorchJob,\n", + " Containers,\n", + " ReplicaSpec,\n", + " Resources,\n", + " RunPolicy,\n", + " Spec,\n", + " Template,\n", + " Volumes,\n", + " VolumeMounts,\n", + ")\n", + "from sagemaker.hyperpod.common.config import Metadata" + ] + }, + { + "cell_type": "markdown", + "id": "00703dfd", + "metadata": {}, + "source": [ + "### Define the environment variables\n", + "\n", + "Please use the values according to the steps executed in [00-pytorch-training-job.md](00-pytorch-training-job.md)." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "081c1821", + "metadata": {}, + "outputs": [], + "source": [ + "# Configuration variables\n", + "AWS_REGION = \"PLEASE_FILL_IN\"\n", + "AWS_ACCOUNT_ID = \"PLEASE_FILL_IN\"\n", + "\n", + "S3_PREFIX = \"qwen-cli-example\"\n", + "ECR_NAME = \"qwen3-finetuning\"\n", + "DOCKER_IMAGE_TAG = \"pytorch2.8-cu129\"\n", + "JOB_NAME = \"qwen3-4b-thinking-2507-fsdp\"\n", + "\n", + "IMAGE_URI = f\"{AWS_ACCOUNT_ID}.dkr.ecr.{AWS_REGION}.amazonaws.com/{ECR_NAME}:{DOCKER_IMAGE_TAG}\"" + ] + }, + { + "cell_type": "markdown", + "id": "7f277ec8", + "metadata": {}, + "source": [ + "### Define the training job" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fd3c01d1", + "metadata": {}, + "outputs": [], + "source": [ + "pytorch_job = HyperPodPytorchJob(\n", + " metadata=Metadata(name=JOB_NAME, namespace=\"default\"),\n", + " nproc_per_node=\"4\",\n", + " replica_specs=[\n", + " ReplicaSpec(\n", + " name=\"pod\",\n", + " replicas=2,\n", + " template=Template(\n", + " spec=Spec(\n", + " containers=[\n", + " Containers(\n", + " name=\"training-container\",\n", + " image=IMAGE_URI,\n", + " image_pull_policy=\"IfNotPresent\",\n", + " command=[\"hyperpodrun\", \"--nnodes=2:2\", \"--nproc_per_node=4\", f\"/data/{S3_PREFIX}/scripts/train.py\"],\n", + " args=[\"--config\", f\"/data/{S3_PREFIX}/scripts/args.yaml\"],\n", + " env=[\n", + " {\"name\": \"LOGLEVEL\", \"value\": \"INFO\"},\n", + " {\"name\": \"PYTORCH_CUDA_ALLOC_CONF\", \"value\": \"expandable_segments:True\"},\n", + " {\"name\": \"NCCL_DEBUG\", \"value\": \"INFO\"},\n", + " {\"name\": \"NCCL_SOCKET_IFNAME\", \"value\": \"^lo\"},\n", + " {\"name\": \"TORCH_NCCL_ASYNC_ERROR_HANDLING\", \"value\": \"1\"},\n", + " {\"name\": \"FI_PROVIDER\", \"value\": \"efa\"},\n", + " {\"name\": \"FI_EFA_FORK_SAFE\", \"value\": \"1\"},\n", + " {\"name\": \"NCCL_PROTO\", \"value\": \"simple\"},\n", + " ],\n", + " resources=Resources(\n", + " requests={\"nvidia.com/gpu\": \"4\"},\n", + " limits={\"nvidia.com/gpu\": \"4\"},\n", + " ),\n", + " volume_mounts=[\n", + " VolumeMounts(name=\"shmem\", mount_path=\"/dev/shm\"),\n", + " VolumeMounts(name=\"local\", mount_path=\"/local\"),\n", + " VolumeMounts(name=\"fsx-volume\", mount_path=\"/data\"),\n", + " ],\n", + " )\n", + " ],\n", + " volumes=[\n", + " Volumes(name=\"shmem\", host_path={\"path\": \"/dev/shm\"}),\n", + " Volumes(name=\"local\", host_path={\"path\": \"/mnt/k8s-disks/0\"}),\n", + " Volumes(name=\"fsx-volume\", persistent_volume_claim={\"claim_name\": \"fsx-claim\"}),\n", + " ]\n", + " )\n", + " ),\n", + " )\n", + " ],\n", + " run_policy=RunPolicy(\n", + " clean_pod_policy=\"None\",\n", + " job_max_retry_count=100\n", + " ),\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "9a136da1", + "metadata": {}, + "source": [ + "### Submit the training job to the HyperPod cluster" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0b3c3a67", + "metadata": {}, + "outputs": [], + "source": [ + "pytorch_job.create()" + ] + }, + { + "cell_type": "markdown", + "id": "7069549d", + "metadata": {}, + "source": [ + "### Monitor the job status" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "522aeadb", + "metadata": {}, + "outputs": [], + "source": [ + "print(\"List all jobs:\")\n", + "print(HyperPodPytorchJob.list())\n", + "\n", + "print(\"\\nRefresh job and check status:\")\n", + "pytorch_job.refresh()\n", + "print(pytorch_job.status)" + ] + }, + { + "cell_type": "markdown", + "id": 
"49bcacb8", + "metadata": {}, + "source": [ + "### List pods and show the training logs" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "43b1c723", + "metadata": {}, + "outputs": [], + "source": [ + "print(\"List all pods:\")\n", + "print(pytorch_job.list_pods())\n", + "\n", + "print(\"\\nLogs from pod-0:\")\n", + "print(pytorch_job.get_logs_from_pod(f\"{JOB_NAME}-pod-0\"))" + ] + }, + { + "cell_type": "markdown", + "id": "4db357b4", + "metadata": {}, + "source": [ + "### Delete the Job" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "53ffeb7b", + "metadata": {}, + "outputs": [], + "source": [ + "pytorch_job.delete()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.13.3" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/examples/end_to_end_walkthrough/01-training-job-submission/README.MD b/examples/end_to_end_walkthrough/01-training-job-submission/README.MD new file mode 100644 index 00000000..f23919b9 --- /dev/null +++ b/examples/end_to_end_walkthrough/01-training-job-submission/README.MD @@ -0,0 +1,5 @@ +# PyTorch Training Job - HyperPod CLI End-to-End Walkthrough + +This folder contains the following files: +- [00-pytorch-training-job.md](00-pytorch-training-job.md) - Instructions on how to create and submit a **Qwen3 4B Lora** fine-tuning job to the HyperPod cluster through the HyperPod CLI. Additionally, an example for instance failure recovery. +- [01-pytorch-training-job-sdk.ipynb](01-pytorch-training-job-sdk.ipynb) - Instructions on how to to utilize the HyperPod Python SDK to create and submit the equivalent job to the HyperPod cluster. 
\ No newline at end of file diff --git a/examples/end_to_end_walkthrough/01-training-job-submission/docker/Dockerfile b/examples/end_to_end_walkthrough/01-training-job-submission/docker/Dockerfile new file mode 100644 index 00000000..b5a01938 --- /dev/null +++ b/examples/end_to_end_walkthrough/01-training-job-submission/docker/Dockerfile @@ -0,0 +1,13 @@ +FROM 763104351884.dkr.ecr.us-west-2.amazonaws.com/pytorch-training:2.8.0-gpu-py312-cu129-ubuntu22.04-ec2 + +# Set working directory +WORKDIR /workspace + +# Copy requirements file +COPY ./requirements.txt /workspace/requirements.txt + +# Install Python dependencies +RUN pip install --no-cache-dir -r /workspace/requirements.txt + +# Set default command +CMD ["/bin/bash"] diff --git a/examples/end_to_end_walkthrough/01-training-job-submission/docker/requirements.txt b/examples/end_to_end_walkthrough/01-training-job-submission/docker/requirements.txt new file mode 100644 index 00000000..57599a38 --- /dev/null +++ b/examples/end_to_end_walkthrough/01-training-job-submission/docker/requirements.txt @@ -0,0 +1,26 @@ +python-etcd +fsspec==2024.12.0 +transformers==4.55.0 +peft==0.17.0 +accelerate==1.10.0 +bitsandbytes==0.46.1 +datasets==4.0.0 +deepspeed==0.17.5 +evaluate==0.4.5 +huggingface_hub[hf_transfer] +kernels>=0.9.0 +mlflow +safetensors>=0.6.2 +sagemaker==2.251.1 +sagemaker-mlflow==0.1.0 +sentencepiece==0.2.0 +scikit-learn==1.7.1 +tokenizers>=0.21.4 +triton +trl==0.21.0 +psutil +py7zr +nvidia-ml-py +wandb +git+https://github.com/triton-lang/triton.git@main#subdirectory=python/triton_kernels +hyperpod-elastic-agent \ No newline at end of file diff --git a/examples/end_to_end_walkthrough/01-training-job-submission/scripts/create-training-dataset.py b/examples/end_to_end_walkthrough/01-training-job-submission/scripts/create-training-dataset.py new file mode 100644 index 00000000..a2811b2b --- /dev/null +++ b/examples/end_to_end_walkthrough/01-training-job-submission/scripts/create-training-dataset.py @@ -0,0 +1,188 @@ +import json +import re +import textwrap +import pandas as pd +from sklearn.model_selection import train_test_split +from transformers import AutoTokenizer +from datasets import load_dataset, Dataset, DatasetDict + +def extract_think_content(text): + match = re.search(r"(.*?)", text, re.DOTALL) + return match.group(1).strip() if match else None + + +def extract_tool_call_content(text): + match = re.search(r"(.*?)", text, re.DOTALL) + return match.group(1).strip() if match else None + + +def extract_tool_call_response(text): + match = re.search(r"(.*?)", text, re.DOTALL) + return match.group(1).strip() if match else None + + +def extract_content_parts(text): + think_content = extract_think_content(text) + tool_call_content = extract_tool_call_content(text) + + # Remove both tags and their content to get the rest + rest_content = re.sub(r".*?", "", text, flags=re.DOTALL) + rest_content = re.sub( + r".*?", "", rest_content, flags=re.DOTALL + ) + rest_content = rest_content.strip() if rest_content.strip() else None + + return think_content, tool_call_content, rest_content + + +def remove_empty_think_tags(text): + return re.sub(r"\s*", "", text) + + +def validate_messages(conversations): + has_tool_call = False + for message in conversations: + if message["from"] == "gpt": + tool_call_content = extract_tool_call_content(message["value"]) + if tool_call_content: + try: + for line in tool_call_content.strip().split("\n"): + if line.strip(): + json.loads(line.strip()) + has_tool_call = True + except json.JSONDecodeError: + return False 
+ else: + has_tool_call = False + elif message["from"] == "tool": + if not has_tool_call: + return False + return True + +def prepare_dataset(sample, tokenizer): + + if not validate_messages(sample["conversations"]): + return {"text": None} + + messages = [] + + system_text = f""" + You are a deep thinking AI, you may use extremely long chains of thought to deeply consider the problem and deliberate with yourself via systematic reasoning processes to help come to a correct solution prior to answering. + You should enclose your thoughts and internal monologue inside tags, and then provide your solution or response to the problem. + You are a function calling AI model. You are provided with function signatures within XML tags. You may call one or more functions to assist with the user query. + If available tools are not relevant in assisting with user query, just respond in natural conversational language. Don't make assumptions about what values to plug into functions. + After calling & executing the functions, you will be provided with function results within XML tags. + " + {json.loads(sample["tools"])}, + + """ + + system_text = textwrap.dedent(system_text).strip() + + messages.append({"role": "system", "content": system_text}) + + for message in sample["conversations"]: + if message["from"] == "human": + messages.append({"role": "user", "content": message["value"]}) + elif message["from"] == "gpt": + think_content, tool_call_content, rest_content = extract_content_parts( + message["value"] + ) + + assistant_msg = { + "role": "assistant", + "content": "", # Always provide content field + } + + if think_content: + assistant_msg["reasoning_content"] = think_content + + if tool_call_content: + assistant_msg["tool_calls"] = [ + { + "type": "function", + "function": json.loads(tool_call_content), + } + ] + + if rest_content: + assistant_msg["content"] = rest_content + + messages.append(assistant_msg) + elif message["from"] == "tool": + tool_response_text = extract_tool_call_response(message["value"]) + # Handle the case where it's a string representation of a dict + if tool_response_text.startswith("{'") and tool_response_text.endswith( + "'}" + ): + # Convert Python dict string to JSON + tool_response_text = tool_response_text.replace("'", '"') + + try: + tool = json.loads(tool_response_text) + content = tool.get( + "result", tool_response_text + ) # Use result if available, otherwise raw text + except: + content = tool_response_text # Use raw text if JSON parsing fails + + messages.append({"role": "tool", "content": str(content)}) + + if messages[-1]["role"] != "assistant": + messages = messages[:-1] + + sample["text"] = tokenizer.apply_chat_template( + messages, + tokenize=False, + enable_thinking=True, + ) + + return sample + + +if __name__ == "__main__": + print("[INFO] Loading dataset…") + model_id = "Qwen/Qwen3-4B-Thinking-2507" + dataset_id = "interstellarninja/hermes_reasoning_tool_use" + + dataset = load_dataset(dataset_id, split="train") + print(f"[INFO] Dataset loaded. 
Total samples: {len(dataset)}") + + print("[INFO] Converting to DataFrame…") + df = pd.DataFrame(dataset) + + print("[INFO] Splitting into train/validation sets…") + train, val = train_test_split(df, test_size=0.1, random_state=42) + print(f"[INFO] Train samples: {len(train)}, Val samples: {len(val)}") + + print("[INFO] Creating HuggingFace Dataset objects…") + train_dataset = Dataset.from_pandas(train) + val_dataset = Dataset.from_pandas(val) + + print("[INFO] Loading tokenizer…") + tokenizer = AutoTokenizer.from_pretrained(model_id) + + dataset = DatasetDict({"train": train_dataset, "val": val_dataset}) + + print("[INFO] Preparing train dataset…") + prepared_train_dataset = ( + dataset["train"] + .map(lambda x: prepare_dataset(x, tokenizer), + remove_columns=list(train_dataset.features)) + .filter(lambda x: x["text"] is not None) + ) + print(f"[INFO] Prepared train dataset size: {len(prepared_train_dataset)}") + + print("[INFO] Preparing validation dataset…") + prepared_val_dataset = ( + dataset["val"] + .map(lambda x: prepare_dataset(x, tokenizer), + remove_columns=list(val_dataset.features)) + .filter(lambda x: x["text"] is not None) + ) + print(f"[INFO] Prepared val dataset size: {len(prepared_val_dataset)}") + + print("[INFO] Saving datasets to JSON…") + prepared_train_dataset.to_json("./data/train/dataset.json", orient="records") + prepared_val_dataset.to_json("./data/val/dataset.json", orient="records") + print("[INFO] All done!") \ No newline at end of file diff --git a/examples/end_to_end_walkthrough/01-training-job-submission/scripts/requirements.txt b/examples/end_to_end_walkthrough/01-training-job-submission/scripts/requirements.txt new file mode 100644 index 00000000..c0896519 --- /dev/null +++ b/examples/end_to_end_walkthrough/01-training-job-submission/scripts/requirements.txt @@ -0,0 +1,4 @@ +datasets +transformers +pandas +scikit-learn \ No newline at end of file diff --git a/examples/end_to_end_walkthrough/01-training-job-submission/training_scripts/args.yaml b/examples/end_to_end_walkthrough/01-training-job-submission/training_scripts/args.yaml new file mode 100644 index 00000000..1a809c1d --- /dev/null +++ b/examples/end_to_end_walkthrough/01-training-job-submission/training_scripts/args.yaml @@ -0,0 +1,49 @@ +model_id: "Qwen/Qwen3-4B-Thinking-2507" # Hugging Face model id +mlflow_uri: "" # MLflow tracking server URI +mlflow_experiment_name: "" # MLflow experiment name +# sagemaker specific parameters +output_dir: "/data/qwen-cli-example/model/" # path to where SageMaker will upload the model +train_dataset_path: "/data/qwen-cli-example/dataset/train/" # path to train dataset in FSx +val_dataset_path: "/data/qwen-cli-example/dataset/val/" # path to where test dataset in FSx saves +checkpoint_dir: "/data/qwen-cli-example/checkpoints/" # path to where FSx saves the checkpoints +token: "" # Hugging Face API token +merge_weights: true # merge weights in the base model +# training parameters +use_checkpoints: true +apply_truncation: true # apply truncation to datasets +use_snapshot_download: false # Use snapshot_download to download the model +attn_implementation: "flash_attention_2" # attention implementation type +lr_scheduler_type: "cosine" # learning rate scheduler type +learning_rate: 2e-5 # learning rate scheduler +num_train_epochs: 1000000 # number of training epochs +per_device_train_batch_size: 2 # batch size per device during training +per_device_eval_batch_size: 1 # batch size for evaluation +gradient_accumulation_steps: 4 # number of steps before performing a 
backward/update pass +gradient_checkpointing: true # use gradient checkpointing +torch_dtype: "bfloat16" # float precision type +bf16: true # use bfloat16 precision +tf32: true # use tf32 precision +ignore_data_skip: true # skip data loading errors +logging_strategy: "steps" # logging strategy +logging_steps: 1 # log every N steps +log_on_each_node: false # disable logging on each node +ddp_find_unused_parameters: false # DDP unused parameter detection +save_total_limit: 1 # maximum number of checkpoints to keep +save_steps: 8 # Save checkpoint every this many steps +warmup_steps: 3 # number of warmup steps +weight_decay: 0.01 # weight decay coefficient +eval_strategy: "steps" # Add evaluation +eval_steps: 17 # Evaluate every ~half epoch +fsdp: "full_shard auto_wrap offload" # FSDP sharding strategy +fsdp_config: # FSDP configuration options + backward_prefetch: "backward_pre" # prefetch parameters during backward pass + cpu_ram_efficient_loading: true # enable CPU RAM efficient model loading + offload_params: false # offload parameters to CPU + forward_prefetch: false # disable forward prefetch + use_orig_params: false # use original parameter names +# LoRA parameters +load_in_4bit: true # enable 4-bit quantization +use_mxfp4: false # use MXFP4 quantization +lora_r: 16 # LoRA rank +lora_alpha: 32 # LoRA alpha parameter +lora_dropout: 0.1 # LoRA dropout rate diff --git a/examples/end_to_end_walkthrough/01-training-job-submission/training_scripts/train.py b/examples/end_to_end_walkthrough/01-training-job-submission/training_scripts/train.py new file mode 100644 index 00000000..f490a851 --- /dev/null +++ b/examples/end_to_end_walkthrough/01-training-job-submission/training_scripts/train.py @@ -0,0 +1,732 @@ +from accelerate import Accelerator +from dataclasses import dataclass, field +from datasets import Dataset, load_dataset +import datetime +from huggingface_hub import snapshot_download +import logging +import mlflow +from mlflow.models import infer_signature +import os +from peft import ( + AutoPeftModelForCausalLM, + LoraConfig, + get_peft_model, +) +import torch +import torch.distributed as dist +from transformers import ( + AutoModelForCausalLM, + AutoTokenizer, + BitsAndBytesConfig, + EarlyStoppingCallback, + Mxfp4Config, + Trainer, + TrainingArguments, + set_seed, +) +from trl import TrlParser +import transformers +from transformers.trainer_utils import get_last_checkpoint +from transformers.integrations import WandbCallback +import contextlib +from typing import Any, Dict, List, Optional, Tuple +import wandb +from distutils.util import strtobool + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", +) +logger = logging.getLogger(__name__) + + +@dataclass +class ScriptArguments: + """Arguments for the script execution.""" + + apply_truncation: Optional[bool] = field( + default=False, metadata={"help": "Whether to apply truncation"} + ) + attn_implementation: Optional[str] = field( + default="flash_attention_2", metadata={"help": "Attention implementation"} + ) + checkpoint_dir: str = field(default=None, metadata={"help": "Checkpoint directory"}) + use_checkpoints: bool = field( + default=False, metadata={"help": "Whether to use checkpointing"} + ) + early_stopping: bool = field( + default=False, metadata={"help": "Whether to use early stopping"} + ) + load_in_4bit: bool = field( + default=True, metadata={"help": "Load model in 4-bit quantization"} + ) + lora_r: Optional[int] = field(default=8, 
metadata={"help": "lora_r"}) + lora_alpha: Optional[int] = field(default=16, metadata={"help": "lora_alpha"}) + lora_dropout: Optional[float] = field( + default=0.1, metadata={"help": "lora_dropout"} + ) + max_length: Optional[int] = field( + default=None, metadata={"help": "max_length used for truncation"} + ) + merge_weights: Optional[bool] = field( + default=False, metadata={"help": "Merge adapter with base model"} + ) + mlflow_uri: Optional[str] = field( + default=None, metadata={"help": "MLflow tracking ARN"} + ) + mlflow_experiment_name: Optional[str] = field( + default=None, metadata={"help": "MLflow experiment name"} + ) + model_id: str = field( + default=None, metadata={"help": "Model ID to use for SFT training"} + ) + text_field: str = field( + default="text", + metadata={ + "help": "Name of the text field in dataset (e.g., 'text', 'content', 'message')" + }, + ) + token: str = field(default=None, metadata={"help": "Hugging Face API token"}) + train_dataset_path: Optional[str] = field( + default=None, metadata={"help": "Path to the training dataset"} + ) + use_mxfp4: bool = field( + default=False, + metadata={"help": "Use MXFP4 quantization instead of BitsAndBytes"}, + ) + use_peft: bool = field(default=True, metadata={"help": "Use PEFT for training"}) + use_snapshot_download: bool = field( + default=True, + metadata={"help": "Use snapshot download instead of Hugging Face Hub"}, + ) + val_dataset_path: Optional[str] = field( + default=None, metadata={"help": "Path to the val dataset"} + ) + wandb_token: str = field(default="", metadata={"help": "Wandb API token"}) + wandb_project: str = field( + default="project", metadata={"help": "Wandb project name"} + ) + target_modules: Optional[List[str]] = field( + default=None, metadata={"help": "Target modules for LoRA"} + ) + torch_dtype: Optional[str] = field( + default="auto", + metadata={"help": "Torch dtype (auto, bfloat16, float16, float32)"}, + ) + + +class ModelConfigBuilder: + """Centralized model configuration builder to eliminate duplicate logic.""" + + def __init__(self, script_args: ScriptArguments, training_args: TrainingArguments): + self.script_args = script_args + self.training_args = training_args + self._torch_dtype = None + self._quantization_config = None + self._use_deepspeed = None + self._use_fsdp = None + + @property + def torch_dtype(self) -> torch.dtype: + """Get torch dtype with single source of truth.""" + if self._torch_dtype is None: + if self.script_args.torch_dtype in ["auto", None]: + self._torch_dtype = ( + torch.bfloat16 if self.training_args.bf16 else torch.float32 + ) + else: + self._torch_dtype = getattr(torch, self.script_args.torch_dtype) + return self._torch_dtype + + @property + def use_deepspeed(self) -> bool: + """Check if DeepSpeed is enabled.""" + if self._use_deepspeed is None: + self._use_deepspeed = strtobool( + os.environ.get("ACCELERATE_USE_DEEPSPEED", "false") + ) + return self._use_deepspeed + + @property + def use_fsdp(self) -> bool: + """Check if FSDP is enabled.""" + if self._use_fsdp is None: + self._use_fsdp = strtobool(os.environ.get("ACCELERATE_USE_FSDP", "false")) + return self._use_fsdp + + @property + def quantization_config(self) -> Optional[Any]: + """Get quantization configuration.""" + if self._quantization_config is None and self.script_args.load_in_4bit: + if self.script_args.use_mxfp4: + self._quantization_config = Mxfp4Config(dequantize=True) + logger.info("Using MXFP4 quantization") + else: + self._quantization_config = BitsAndBytesConfig( + load_in_4bit=True, + 
bnb_4bit_use_double_quant=True, + bnb_4bit_quant_type="nf4", + bnb_4bit_compute_dtype=self.torch_dtype, + bnb_4bit_quant_storage=self.torch_dtype, + ) + logger.info("Using BitsAndBytes quantization") + return self._quantization_config + + def build_model_kwargs(self) -> Dict[str, Any]: + """Build complete model loading arguments.""" + model_kwargs = { + "attn_implementation": self.script_args.attn_implementation, + "torch_dtype": self.torch_dtype, + "use_cache": not self.training_args.gradient_checkpointing, + "trust_remote_code": True, + "cache_dir": "/tmp/.cache", + } + + # Set low_cpu_mem_usage based on DeepSpeed usage + if not self.use_deepspeed: + model_kwargs["low_cpu_mem_usage"] = True + + # Add quantization config if enabled + if self.quantization_config is not None: + model_kwargs["quantization_config"] = self.quantization_config + + return model_kwargs + + def build_trainer_kwargs(self) -> Dict[str, Any]: + """Build trainer-specific configuration.""" + trainer_kwargs = {} + + if self.use_fsdp or (self.training_args.fsdp and self.training_args.fsdp != ""): + logger.info("Using FSDP configuration") + if self.training_args.gradient_checkpointing_kwargs is None: + trainer_kwargs["gradient_checkpointing_kwargs"] = { + "use_reentrant": False + } + elif self.use_deepspeed: + logger.info("Using DeepSpeed configuration") + else: + logger.info("Using DDP configuration") + if self.training_args.gradient_checkpointing_kwargs is None: + trainer_kwargs["gradient_checkpointing_kwargs"] = { + "use_reentrant": False + } + + return trainer_kwargs + + +class CustomWandbCallback(WandbCallback): + """Custom Wandb callback that logs metrics for all GPUs.""" + + def on_log(self, args, state, control, model=None, logs=None, **kwargs): + if state.is_world_process_zero and logs: + logs = {f"gpu_{i}_{k}": v for i in range(8) for k, v in logs.items()} + super().on_log(args, state, control, model, logs, **kwargs) + + +@contextlib.contextmanager +def gpu_memory_manager(): + """Context manager for GPU memory cleanup.""" + try: + if torch.cuda.is_available(): + torch.cuda.empty_cache() + yield + finally: + if torch.cuda.is_available(): + torch.cuda.empty_cache() + logger.info( + f"GPU memory freed: {torch.cuda.memory_allocated() / 1e9:.2f}GB allocated" + ) + + +@contextlib.contextmanager +def model_lifecycle(model_name: str): + """Context manager for model loading/cleanup lifecycle.""" + model = None + try: + logger.info(f"Loading model: {model_name}") + yield model + except Exception as e: + logger.error(f"Error in model lifecycle for {model_name}: {e}") + raise + finally: + if model is not None: + logger.info(f"Cleaning up model: {model_name}") + del model + if torch.cuda.is_available(): + torch.cuda.empty_cache() + + +def download_model(model_name): + print("Downloading model ", model_name) + os.makedirs("/tmp/tmp_folder", exist_ok=True) + snapshot_download(repo_id=model_name, local_dir="/tmp/tmp_folder") + print(f"Model {model_name} downloaded under /tmp/tmp_folder") + + +def set_custom_env(env_vars: Dict[str, str]) -> None: + """Set custom environment variables.""" + if not isinstance(env_vars, dict): + raise TypeError("env_vars must be a dictionary") + + for key, value in env_vars.items(): + if not isinstance(key, str) or not isinstance(value, str): + raise ValueError("All keys and values in env_vars must be strings") + + os.environ.update(env_vars) + print("Updated environment variables:") + for key, value in env_vars.items(): + print(f" {key}: {value}") + + +def is_mlflow_enabled(script_args: 
ScriptArguments) -> bool: + """Check if MLflow is enabled based on script arguments.""" + return ( + script_args.mlflow_uri is not None + and script_args.mlflow_experiment_name is not None + and script_args.mlflow_uri != "" + and script_args.mlflow_experiment_name != "" + ) + + +def setup_mlflow(script_args: ScriptArguments) -> None: + """Set up MLflow tracking.""" + if not is_mlflow_enabled(script_args): + return + + logger.info("Initializing MLflow") + mlflow.enable_system_metrics_logging() + mlflow.autolog() + mlflow.set_tracking_uri(script_args.mlflow_uri) + mlflow.set_experiment(script_args.mlflow_experiment_name) + + current_datetime = datetime.datetime.now() + formatted_datetime = current_datetime.strftime("%Y-%m-%d-%H-%M") + set_custom_env( + { + "MLFLOW_RUN_NAME": f"Fine-tuning-{formatted_datetime}", + "MLFLOW_EXPERIMENT_NAME": script_args.mlflow_experiment_name, + } + ) + + +def setup_wandb(script_args: ScriptArguments) -> None: + """Set up Weights & Biases tracking.""" + if script_args.wandb_token and script_args.wandb_token != "": + logger.info("Initializing Wandb") + set_custom_env({"WANDB_API_KEY": script_args.wandb_token}) + wandb.init(project=script_args.wandb_project) + return [CustomWandbCallback()] + else: + set_custom_env({"WANDB_DISABLED": "true"}) + return None + + +def load_model( + config_builder: ModelConfigBuilder, script_args: ScriptArguments +) -> AutoModelForCausalLM: + """Load model using centralized configuration.""" + model_kwargs = config_builder.build_model_kwargs() + + try: + model = AutoModelForCausalLM.from_pretrained( + script_args.model_id, **model_kwargs + ) + + # Apply gradient checkpointing configuration + if config_builder.training_args.gradient_checkpointing: + model.gradient_checkpointing_enable( + gradient_checkpointing_kwargs={"use_reentrant": False} + ) + + return model + except Exception as e: + logger.error(f"Error loading model {script_args.model_id}: {e}") + raise + + +def load_tokenizer(script_args: ScriptArguments) -> AutoTokenizer: + """Load tokenizer.""" + try: + tokenizer = AutoTokenizer.from_pretrained(script_args.model_id) + if tokenizer.pad_token is None: + tokenizer.pad_token = tokenizer.eos_token + return tokenizer + except Exception as e: + logger.error(f"Error loading tokenizer {script_args.model_id}: {e}") + raise + + +def apply_lora_config( + model: AutoModelForCausalLM, script_args: ScriptArguments +) -> AutoModelForCausalLM: + """Apply LoRA configuration to the model.""" + config = LoraConfig( + r=script_args.lora_r, + lora_alpha=script_args.lora_alpha, + target_modules=( + "all-linear" + if script_args.target_modules is None + else script_args.target_modules + ), + lora_dropout=script_args.lora_dropout, + bias="none", + task_type="CAUSAL_LM", + ) + return get_peft_model(model, config) + + +def setup_trainer( + model: AutoModelForCausalLM, + tokenizer: AutoTokenizer, + train_ds: Dataset, + config_builder: ModelConfigBuilder, + test_ds: Optional[Dataset] = None, + callbacks: Optional[List] = None, +) -> Trainer: + """Set up the Trainer using centralized configuration.""" + trainer_kwargs = config_builder.build_trainer_kwargs() + + # Update training_args with trainer configs + for key, value in trainer_kwargs.items(): + setattr(config_builder.training_args, key, value) + + # Set report_to based on enabled tracking services + report_to = [] + if os.environ.get("WANDB_DISABLED", "false").lower() != "true": + report_to.append("wandb") + if is_mlflow_enabled(config_builder.script_args): + report_to.append("mlflow") + 
config_builder.training_args.report_to = report_to + + return Trainer( + model=model, + train_dataset=train_ds, + eval_dataset=test_ds if test_ds is not None else None, + args=config_builder.training_args, + callbacks=callbacks, + data_collator=transformers.DataCollatorForLanguageModeling( + tokenizer, mlm=False + ), + ) + + +def save_model( + trainer: Trainer, + model: AutoModelForCausalLM, + tokenizer: AutoTokenizer, + script_args: ScriptArguments, + training_args: TrainingArguments, + accelerator: Accelerator, + mlflow_enabled: bool, + final_output_dir: str, +) -> None: + """Save the trained model.""" + if trainer.is_fsdp_enabled: + trainer.accelerator.state.fsdp_plugin.set_state_dict_type("FULL_STATE_DICT") + + if script_args.use_peft and script_args.merge_weights: + temp_dir = "/tmp/model" + trainer.model.save_pretrained(temp_dir, safe_serialization=False) + + if accelerator.is_main_process: + # Use context manager for proper cleanup + with gpu_memory_manager(): + # Clean up trainer and model before loading merged model + del model, trainer + + # Load and merge model + model = AutoPeftModelForCausalLM.from_pretrained( + temp_dir, + torch_dtype=torch.float16, + low_cpu_mem_usage=True, + trust_remote_code=True, + ) + model = model.merge_and_unload() + + # Save merged model to final output directory + model.save_pretrained( + final_output_dir, safe_serialization=True, max_shard_size="2GB" + ) + + # Save tokenizer and register merged model + tokenizer.save_pretrained(final_output_dir) + + if mlflow_enabled: + register_model_in_mlflow(model, tokenizer, script_args) + else: + # Save final model to final output directory + trainer.model.save_pretrained(final_output_dir, safe_serialization=True) + + if accelerator.is_main_process: + tokenizer.save_pretrained(final_output_dir) + + if mlflow_enabled: + register_model_in_mlflow(trainer.model, tokenizer, script_args) + + +def register_model_in_mlflow( + model: AutoModelForCausalLM, tokenizer: AutoTokenizer, script_args: ScriptArguments +) -> None: + """Register the model in MLflow.""" + logger.info(f"MLflow model registration under {script_args.mlflow_experiment_name}") + + try: + params = {"top_p": 0.9, "temperature": 0.2, "max_new_tokens": 1024 * 4} + signature = infer_signature("inputs", "generated_text", params=params) + + mlflow.transformers.log_model( + transformers_model={"model": model, "tokenizer": tokenizer}, + signature=signature, + name="model", + task="text-generation", + registered_model_name=f"model-{os.environ.get('MLFLOW_RUN_NAME', '').split('Fine-tuning-')[-1]}", + ) + except Exception as e: + logger.error(f"Error registering model in MLflow: {e}") + raise + + +def calculate_optimal_max_length( + tokenizer: AutoTokenizer, + dataset: Dataset, + script_args: ScriptArguments, + sample_size: int = 1000, + percentile: float = 0.95, +) -> int: + """Calculate optimal max_length by tokenizing a sample of the dataset.""" + sample_indices = torch.randperm(len(dataset))[: min(sample_size, len(dataset))] + sample_data = dataset.select(sample_indices) + + token_lengths = [] + for sample in sample_data: + tokens = tokenizer(sample[script_args.text_field], add_special_tokens=True)[ + "input_ids" + ] + token_lengths.append(len(tokens)) + + avg_length = sum(token_lengths) / len(token_lengths) + max_length = int(sorted(token_lengths)[int(percentile * len(token_lengths))]) + + logger.info(f"Analyzed {len(token_lengths)} samples") + logger.info(f"Average token length: {avg_length:.1f}") + logger.info(f"{percentile*100}th percentile token length: 
{max_length}") + + return max_length + + +def prepare_dataset( + tokenizer: AutoTokenizer, + script_args: ScriptArguments, + train_ds: Dataset, + test_ds: Optional[Dataset] = None, +): + """Prepare the dataset for training with optimal tokenization.""" + if script_args.apply_truncation: + if script_args.max_length is None: + max_length = calculate_optimal_max_length(tokenizer, train_ds, script_args) + else: + max_length = script_args.max_length + else: + max_length = None + + logger.info(f"Using max_length: {max_length}") + logger.info(f"Truncation enabled: {script_args.apply_truncation}") + + lm_train_dataset = train_ds.map( + lambda sample: tokenizer( + sample[script_args.text_field], + padding=False, + truncation=script_args.apply_truncation, + max_length=max_length if script_args.apply_truncation else None, + ), + remove_columns=list(train_ds.features), + batched=True, + batch_size=1000, + ) + + if test_ds is not None: + lm_test_dataset = test_ds.map( + lambda sample: tokenizer( + sample[script_args.text_field], + padding=False, + truncation=script_args.apply_truncation, + max_length=max_length if script_args.apply_truncation else None, + ), + remove_columns=list(test_ds.features), + batched=True, + batch_size=1000, + ) + logger.info(f"Total number of test samples: {len(lm_test_dataset)}") + else: + lm_test_dataset = None + + logger.info(f"Total number of train samples: {len(lm_train_dataset)}") + return lm_train_dataset, lm_test_dataset + + +def train(script_args, training_args, train_ds, test_ds): + """Train the model using centralized configuration.""" + set_seed(training_args.seed) + + # Create centralized config builder + config_builder = ModelConfigBuilder(script_args, training_args) + mlflow_enabled = is_mlflow_enabled(script_args) + + if script_args.token is not None: + os.environ.update({"HF_TOKEN": script_args.token}) + if dist.is_initialized(): + logger.info("Waiting for all processes after setting HF token") + dist.barrier() + + if script_args.use_snapshot_download: + download_model(script_args.model_id) + if dist.is_initialized(): + logger.info("Waiting for all processes after model download") + dist.barrier() + script_args.model_id = "/tmp/tmp_folder" + + # Load model and tokenizer using centralized config + model = load_model(config_builder, script_args) + tokenizer = load_tokenizer(script_args) + + train_ds, test_ds = prepare_dataset(tokenizer, script_args, train_ds, test_ds) + + if script_args.use_peft: + model = apply_lora_config(model, script_args) + + callbacks = setup_wandb(script_args) + if script_args.early_stopping: + if callbacks is None: + callbacks = [] + callbacks.append(EarlyStoppingCallback(early_stopping_patience=3)) + + training_args.load_best_model_at_end = True + training_args.metric_for_best_model = "eval_loss" + training_args.greater_is_better = False + trainer = setup_trainer( + model, tokenizer, train_ds, config_builder, test_ds, callbacks + ) + + if trainer.accelerator.is_main_process: + trainer.model.print_trainable_parameters() + + if script_args.checkpoint_dir is not None: + os.makedirs(script_args.checkpoint_dir, exist_ok=True) + + original_output_dir = training_args.output_dir + training_args.output_dir = script_args.checkpoint_dir + else: + original_output_dir = training_args.output_dir + + # Start training + if mlflow_enabled: + logger.info(f"MLflow tracking under {script_args.mlflow_experiment_name}") + mlflow.set_system_metrics_node_id( + f"node_{trainer.accelerator.process_index // torch.cuda.device_count()}" + ) + if 
trainer.accelerator.is_main_process: + mlflow.start_run(run_name=os.environ.get("MLFLOW_RUN_NAME", None)) + mlflow.log_params( + { + "total_gpus": trainer.accelerator.num_processes, + "nodes": trainer.accelerator.num_processes + // torch.cuda.device_count(), + "gpus_per_node": torch.cuda.device_count(), + } + ) + try: + train_dataset_mlflow = mlflow.data.from_pandas( + train_ds.to_pandas(), name="train_dataset" + ) + mlflow.log_input(train_dataset_mlflow, context="train") + except Exception as e: + logger.warning(f"Failed to log dataset to MLflow: {e}") + + if ( + get_last_checkpoint(script_args.checkpoint_dir) is not None + and script_args.use_checkpoints + ): + train_result = trainer.train(resume_from_checkpoint=True) + else: + train_result = trainer.train() + + metrics = train_result.metrics + metrics["train_samples"] = len(train_ds) + trainer.log_metrics("train", metrics) + trainer.save_metrics("train", metrics) + trainer.save_state() + + save_model( + trainer, + model, + tokenizer, + script_args, + training_args, + trainer.accelerator, + mlflow_enabled, + original_output_dir, + ) + trainer.accelerator.wait_for_everyone() + + +def load_datasets(script_args: ScriptArguments) -> Tuple[Dataset, Optional[Dataset]]: + """Load training and test datasets.""" + try: + logger.info(f"Loading training dataset from {script_args.train_dataset_path}") + + if script_args.train_dataset_path.endswith( + ".jsonl" + ) or script_args.train_dataset_path.endswith(".json"): + train_ds = load_dataset( + "json", data_files=script_args.train_dataset_path, split="train" + ) + else: + train_ds = load_dataset( + "json", + data_files=os.path.join(script_args.train_dataset_path, "dataset.json"), + split="train", + ) + + test_ds = None + if script_args.val_dataset_path: + logger.info(f"Loading test dataset from {script_args.val_dataset_path}") + if script_args.val_dataset_path.endswith( + ".jsonl" + ) or script_args.val_dataset_path.endswith(".json"): + test_ds = load_dataset( + "json", data_files=script_args.val_dataset_path, split="train" + ) + else: + test_ds = load_dataset( + "json", + data_files=os.path.join( + script_args.val_dataset_path, "dataset.json" + ), + split="train", + ) + + return train_ds, test_ds + except Exception as e: + logger.error(f"Error loading datasets: {e}") + raise + + +def main() -> None: + """Main function to parse arguments and start training.""" + parser = TrlParser((ScriptArguments, TrainingArguments)) + script_args, training_args = parser.parse_args_and_config() + + set_custom_env({"HF_HUB_ENABLE_HF_TRANSFER": "1"}) + setup_mlflow(script_args) + + train_ds, test_ds = load_datasets(script_args) + train(script_args, training_args, train_ds, test_ds) + + +if __name__ == "__main__": + try: + main() + except Exception as e: + logger.error(f"Training failed: {e}", exc_info=True) + raise diff --git a/examples/end_to_end_walkthrough/02-inference-deployment/00-jumpstart-endpoint.md b/examples/end_to_end_walkthrough/02-inference-deployment/00-jumpstart-endpoint.md new file mode 100644 index 00000000..7c694b91 --- /dev/null +++ b/examples/end_to_end_walkthrough/02-inference-deployment/00-jumpstart-endpoint.md @@ -0,0 +1,162 @@ +# Deploying a model from SageMaker JumpStart for Inference - HyperPod CLI End-to-End Walkthrough + +This example shows how to deploy an **open-weights LLM from SageMaker JumpStart** for inference on your HyperPod Cluster. 
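+
+All steps on this page use the `hyp` CLI interactively. If you prefer to script the flow instead, the following optional sketch drives the same commands shown further below from Python via `subprocess`. This is a minimal sketch, not part of the walkthrough itself; the endpoint name, model ID, and S3 certificate URI are the example values used later on this page and are placeholders you must adapt.
+```python
+# Optional sketch: run the JumpStart deployment flow from Python by shelling
+# out to the same hyp CLI commands used in this walkthrough.
+import json
+import subprocess
+
+ENDPOINT_NAME = "test-js-endpoint"              # example value from this page
+MODEL_ID = "deepseek-llm-r1-distill-qwen-1-5b"  # example value from this page
+S3_CERT_URI = "s3://PLEASE_FILL_IN"             # bucket your inference operator role can access
+
+
+def hyp(*args: str) -> str:
+    """Run a hyp CLI command and return its stdout."""
+    return subprocess.run(["hyp", *args], capture_output=True, text=True, check=True).stdout
+
+
+# Create the endpoint (same flags as the CLI example below)
+hyp(
+    "create", "hyp-jumpstart-endpoint",
+    "--model-id", MODEL_ID,
+    "--instance-type", "ml.g5.12xlarge",
+    "--endpoint-name", ENDPOINT_NAME,
+    "--tls-certificate-output-s3-uri", S3_CERT_URI,
+    "--namespace", "default",
+)
+
+# Check the deployment status, then send a test request once it is running
+print(hyp("describe", "hyp-jumpstart-endpoint", "--name", ENDPOINT_NAME))
+
+body = {
+    "messages": [{"role": "user", "content": "What is the capital of USA?"}],
+    "temperature": 0.1, "top_p": 0.95, "max_tokens": 512,
+}
+print(hyp("invoke", "hyp-jumpstart-endpoint", "--endpoint-name", ENDPOINT_NAME, "--body", json.dumps(body)))
+```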
+ +In the following you will: +- Deploy a DeepSeek LLM model for inference +- Monitor and manage the endpoint deployment +- Test the endpoint with sample requests +- Clean up resources when finished + +This example assumes that you completed the setup instructions in [00-getting-started/00-setup.md](../00-getting-started/00-setup.md). + +## Deploy the JumpStart Endpoint (💻) + +Check the available command options for creating a JumpStart endpoint: +```bash +hyp create hyp-jumpstart-endpoint --help +``` + +Set up environment variables for the deployment. Please replace the `S3_CERT_URI` with an S3 bucket URI that your cluster inference operator role can access. If you used the console UI to create the cluster, the inference operator role and a corresponding bucket will be created automatically and the bucket will be named `sagemaker-HP_CLUSTER_NAME-RANDOMID-tls-RANDOMID`. + +```bash +# Please use the S3 URI, including the prefix s3:// +S3_CERT_URI="s3://PLEASE_FILL_IN" + +ENDPOINT_NAME="test-js-endpoint" +MODEL_ID="deepseek-llm-r1-distill-qwen-1-5b" +``` + +(Optional) Attempt to create an endpoint with an instance type that is incompatible with the JumpStart model to see the validation features of the HyperPod CLI: +```bash +hyp create hyp-jumpstart-endpoint \ + --model-id $MODEL_ID \ + --instance-type ml.t3.xlarge \ + --endpoint-name $ENDPOINT_NAME \ + --tls-certificate-output-s3-uri $S3_CERT_URI \ + --namespace default +``` + +Create the endpoint with a properly sized and available instance type: +```bash +hyp create hyp-jumpstart-endpoint \ + --model-id $MODEL_ID \ + --instance-type ml.g5.12xlarge \ + --endpoint-name $ENDPOINT_NAME \ + --tls-certificate-output-s3-uri $S3_CERT_URI \ + --namespace default +``` + +## Monitor the Endpoint Deployment (💻) + +List all JumpStart endpoints and check their status: +```bash +hyp list hyp-jumpstart-endpoint +``` + +Check the inference operator logs to see the deployment progress +and check for any potential issues. +```bash +hyp get-operator-logs hyp-custom-endpoint --since-hours 1 +``` + +Get detailed information about the specific endpoint: +```bash +hyp describe hyp-jumpstart-endpoint --name $ENDPOINT_NAME +``` + +List the pods associated with the endpoint: +```bash +hyp list-pods hyp-jumpstart-endpoint --endpoint-name $ENDPOINT_NAME +``` + +Check the logs for a specific pod (replace with actual pod name): +```bash +POD_NAME="test-js-endpoint-76588dc9-tzf89" + +hyp get-logs hyp-jumpstart-endpoint --pod-name $POD_NAME +``` + +You can additionally view both the Application Load Balancer and SageMaker Inference Endpoint creation in the AWS console +by navigating to: +- Amazon SageMaker AI -> Deployments & Inference -> Endpoints +- Amazon EC2 -> Load Balancing -> Load Balancers + +![SageMaker Endpoint](../images/inference-endpoint.png) +![EC2 ALB](../images/inference-endpoint-alb.png) + +## Test the Endpoint (💻) + +Once the endpoint is running, test it with a sample inference request: +```bash +hyp invoke hyp-jumpstart-endpoint \ + --endpoint-name $ENDPOINT_NAME \ + --body '{ + "messages": [ + { + "role": "system", + "content": "You are a helpful AI assistant that can answer questions and provide information. Give brief answers." + }, + { + "role": "user", + "content": "What is the capital of USA?" 
+      }
+    ],
+    "temperature": 0.1,
+    "top_p": 0.95,
+    "max_tokens": 512
+  }'
+```
+
+When you're finished with the endpoint, delete the endpoint to free up the resources:
+```
+hyp delete hyp-jumpstart-endpoint --name $ENDPOINT_NAME
+```
+
+## (Optional) Deploy the model by creating a customizable template
+As an alternative to creating a JumpStart model deployment via the `hyp create hyp-jumpstart-endpoint` command
+above, the HyperPod CLI also enables a configuration file-based workflow that allows
+for easy reproducibility as well as further customization options, since the Kubernetes
+template is directly exposed to the user.
+
+Initialize a JumpStart model deployment configuration in a new directory by running:
+```bash
+mkdir jumpstart-model-config && cd jumpstart-model-config
+
+hyp init hyp-jumpstart-endpoint
+```
+
+This will create three files in the new directory:
+- `k8s.jinja` - Kubernetes template for a `JumpStartModel` resource
+- `config.yaml` - Configuration file that contains the values for the Kubernetes template
+- `README.md` - Usage instructions for this functionality
+
+The configuration parameters can either be modified directly in the `config.yaml` or via
+the CLI by executing `hyp configure -- `, which provides
+additional validation.
+
+To reproduce the earlier JumpStart CLI example, run the following commands:
+```
+hyp configure --endpoint-name $ENDPOINT_NAME
+hyp configure --model-id $MODEL_ID
+hyp configure --tls-certificate-output-s3-uri $S3_CERT_URI
+hyp configure --instance-type ml.g5.12xlarge
+```
+
+Validate the values in `config.yaml` by running:
+```bash
+hyp validate
+```
+
+Submit the JumpStart deployment to the cluster by running:
+```bash
+hyp create
+```
+
+The final, submitted Kubernetes manifest will be stored for reference in `./run//k8s.yaml`.
+You can then test the endpoint invocation in the same way as for the endpoint created in the previous example.
+
+When you're finished with the endpoint, delete the endpoint to free up the resources:
+```
+hyp delete hyp-jumpstart-endpoint --name $ENDPOINT_NAME
+```
\ No newline at end of file
diff --git a/examples/end_to_end_walkthrough/02-inference-deployment/01-custom-model-endpoint.md b/examples/end_to_end_walkthrough/02-inference-deployment/01-custom-model-endpoint.md
new file mode 100644
index 00000000..ed46d7cf
--- /dev/null
+++ b/examples/end_to_end_walkthrough/02-inference-deployment/01-custom-model-endpoint.md
@@ -0,0 +1,239 @@
+# Deploying a custom model for Inference - HyperPod CLI End-to-End Walkthrough
+
+This example shows how to deploy a **custom LLM with weights stored on S3** for inference on your HyperPod Cluster.
+
+In the following you will:
+- Deploy a TinyLlama model for inference
+- Monitor and manage the endpoint deployment
+- Test the endpoint with sample requests
+- Clean up resources when finished
+
+This example assumes that you completed the setup instructions in [00-getting-started/00-setup.md](../00-getting-started/00-setup.md).
+
+While this example shows the workflow for deploying a model that is stored on S3,
+the workflow is equivalent for a model stored on FSx. More information is available in the corresponding [AWS documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-hyperpod-model-deployment-deploy-ftm.html).
+
+## Prepare a model for deployment
+
+For this example, you are going to deploy a TinyLlama 1.1B Chat v1.0
+model for inference on your HyperPod cluster.
+
+First, set up the environment variables and download the model checkpoint from Hugging Face.
Then, transfer it to an S3 bucket. Please replace the variables `S3_MODEL_BUCKET_NAME`, `S3_CERT_URI` and `S3_MODEL_REGION` with an S3 bucket that your cluster inference operator role can access. If you used the console UI to create the cluster, the inference operator role and a corresponding bucket will be created automatically and the bucket will be named `sagemaker-HP_CLUSTER_NAME-RANDOMID-tls-RANDOMID`. +```bash +S3_MODEL_BUCKET_NAME="PLEASE_FILL_IN" +S3_MODEL_REGION="PLEASE_FILL_IN" + +S3_CERT_URI="s3://PLEASE_FILL_IN" + +INSTANCE_TYPE=ml.g5.12xlarge + +MODEL_NAME="tinyllama" +HF_MODEL_ID="TinyLlama/TinyLlama-1.1B-Chat-v1.0" +CHECKPOINT_DIR="tinyllama-1.1b-chat" +S3_MODEL_URI="s3://$S3_MODEL_BUCKET_NAME/$CHECKPOINT_DIR" +ENDPOINT_NAME="test-custom-endpoint" + +pip install huggingface-hub + +hf download $HF_MODEL_ID --local-dir $CHECKPOINT_DIR + +aws s3 sync $CHECKPOINT_DIR $S3_MODEL_URI +``` + +## Deploy the custom model (💻) + +Check the available command options for creating a custom model endpoint: +```bash +hyp create hyp-custom-endpoint --help +``` + +Create the endpoint: +``` +hyp create hyp-custom-endpoint \ + --endpoint-name $ENDPOINT_NAME \ + --model-name $MODEL_NAME \ + --model-source-type s3 \ + --s3-bucket-name $S3_MODEL_BUCKET_NAME \ + --s3-region $S3_MODEL_REGION \ + --model-location $CHECKPOINT_DIR \ + --instance-type $INSTANCE_TYPE \ + --image-uri 763104351884.dkr.ecr.us-west-2.amazonaws.com/djl-inference:0.33.0-lmi15.0.0-cu128 \ + --container-port 8080 \ + --model-volume-mount-name modelmount \ + --tls-certificate-output-s3-uri $S3_CERT_URI \ + --namespace default +``` + +## Monitor the Endpoint Deployment (💻) + +List all custom endpoints and check their status: +```bash +hyp list hyp-custom-endpoint +``` + +Check the inference operator logs to see the deployment progress +and check for any potential issues. +```bash +hyp get-operator-logs hyp-custom-endpoint --since-hours 0.5 +``` + +Get detailed information about the specific endpoint: +```bash +hyp describe hyp-custom-endpoint --name $ENDPOINT_NAME +``` + +List the pods associated with the endpoint: +```bash +hyp list-pods hyp-custom-endpoint --endpoint-name $ENDPOINT_NAME +``` + +Check the logs for a specific pod (replace with actual pod name): +```bash +POD_NAME="test-custom-endpoint-76588dc9-tzf89" + +hyp get-logs hyp-custom-endpoint --pod-name $POD_NAME +``` + +## Test the Endpoint (💻) + +Once the endpoint is running, test it with a sample inference request: +```bash +hyp invoke hyp-custom-endpoint \ + --endpoint-name $ENDPOINT_NAME \ + --body '{ + "messages": [ + { + "role": "system", + "content": "You are a helpful AI assistant that can answer questions and provide information. Give brief answers." + }, + { + "role": "user", + "content": "What is the capital of USA?" + } + ], + "temperature": 0.1, + "top_p": 0.95, + "max_tokens": 512 + }' +``` + +When you're finished with the endpoint, delete the endpoint to free up the resources: +``` +hyp delete hyp-custom-endpoint --name $ENDPOINT_NAME +``` + +## (Optional) Deploy the model by creating a customizable template and setup autoscaling (💻) +Alternatively to creating a custom model deployment via the `hyp create hyp-custom-endpoint` command +above, the HyperPod CLI also enables a configuration file-based workflow that allows +for easy reproducability as well as further customization options as the Kubernetes +template is directly exposed to the user. 
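+
+Because this workflow involves one `hyp configure` call per parameter (the full list is shown below), you can optionally drive those calls from a small loop. The following is a minimal sketch, not part of the walkthrough: the flag names mirror the configure commands listed further down, the values are placeholders, and it must be run from the directory created by `hyp init hyp-custom-endpoint` in the next step.
+```python
+# Optional sketch: issue the repeated `hyp configure` calls from a loop
+# instead of typing each one by hand. Values are placeholders.
+import subprocess
+
+settings = {
+    "endpoint-name": "test-custom-endpoint-init-scale",
+    "model-name": "tinyllama",
+    "model-source-type": "s3",
+    "s3-bucket-name": "PLEASE_FILL_IN",
+    "s3-region": "PLEASE_FILL_IN",
+    "model-location": "tinyllama-1.1b-chat",
+    "instance-type": "ml.g5.12xlarge",
+    "image-uri": "763104351884.dkr.ecr.us-west-2.amazonaws.com/djl-inference:0.33.0-lmi15.0.0-cu128",
+    "container-port": "8080",
+    "model-volume-mount-name": "modelmount",
+    "tls-certificate-output-s3-uri": "s3://PLEASE_FILL_IN",
+}
+
+for flag, value in settings.items():
+    # Each call updates config.yaml, with the CLI validating the value
+    subprocess.run(["hyp", "configure", f"--{flag}", value], check=True)
+
+# Validate the assembled config.yaml, then submit the deployment
+subprocess.run(["hyp", "validate"], check=True)
+subprocess.run(["hyp", "create"], check=True)
+```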
+ +Initialize a custom model deployment configuration in a new directory by running: +```bash +mkdir custom-model-config && cd custom-model-config + +hyp init hyp-custom-endpoint +``` + +This will create three files in the new directory: +- `k8s.jinja` - Kubernetes template for a `InferenceEndpointConfig` resource +- `config.yaml` - Configuration file that contains the values for the Kubernetes template +- `README.md` - Usage instructions for this functionality + +The configuration parameters can be either modified directly in the `config.yaml` or via +the CLI by executing `hyp configure -- ` which provides +additional validation. + +With the following commands, you can deploy the same model as in the previous example, with an additional autoscaling configuration that will automatically scale the model to multiple pods, according to the number of requests to the SageMaker endpoint. The number of requests is logged to and accessed through CloudWatch. + +In the next example, you will trigger this scaling functionality to see it in action. A detailed explanation of the autoscaling parameters can be found in the [AWS Documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-hyperpod-model-deployment-autoscaling.html). +``` +ENDPOINT_NAME=test-custom-endpoint-init-scale + +hyp configure --endpoint-name "$ENDPOINT_NAME" +hyp configure --model-name "$MODEL_NAME" +hyp configure --model-source-type "s3" +hyp configure --s3-bucket-name "$S3_MODEL_BUCKET_NAME" +hyp configure --s3-region "$S3_MODEL_REGION" +hyp configure --model-location "$CHECKPOINT_DIR" +hyp configure --instance-type "$INSTANCE_TYPE" +hyp configure --image-uri "763104351884.dkr.ecr.us-west-2.amazonaws.com/djl-inference:0.33.0-lmi15.0.0-cu128" +hyp configure --container-port "8080" +hyp configure --model-volume-mount-name "modelmount" +hyp configure --tls-certificate-output-s3-uri "$S3_CERT_URI" +hyp configure --metrics-enabled "True" +hyp configure --dimensions "{\"EndpointName\": \"$ENDPOINT_NAME\", \"VariantName\": \"AllTraffic\"}" +hyp configure --metric-collection-period "5" +hyp configure --metric-collection-start-time "120" +hyp configure --metric-name "Invocations" +hyp configure --metric-stat "Sum" +hyp configure --metric-type "Average" +hyp configure --cloud-watch-trigger-name "SageMaker-Invocations" +hyp configure --cloud-watch-trigger-namespace "AWS/SageMaker" +hyp configure --target-value "2" +hyp configure --resources-limits '{"nvidia.com/gpu": 1}' +hyp configure --resources-requests '{"nvidia.com/gpu": 1}' +``` + +View the following files in an editor of your choice to see the configuration before submitting: +``` +./k8s.jinja +./config.yaml +``` + +Validate the values in `config.yaml` by running: +```bash +hyp validate +``` + +Submit the custom model deployment to the cluster by running: +```bash +hyp create +``` + +The final, submitted Kubernetes manifest will be stored for reference in `./run//k8s.yaml`. + +You can consequently test the endpoint invokation in the same way as the one created in the previous example. + +## (Optional) Test the autoscaling behavior of the endpoint (💻) + +This repository provides a script to trigger the defined autoscaling behaviour of the endpoint +by running `hyp invoke` in several parallel threads. 
+ +Run the script by executing the following command which will run for 30 seconds: +```bash +python ../scripts/invoke_endpoint_autoscale.py --command hyp-custom-endpoint --endpoint $ENDPOINT_NAME +``` + +After a few minutes, the autoscaling functionality will create additional pods to handle the request volume. + +You can watch the Kubernetes Horizontal Pod Autoscaler metrics by running the following command: +```bash +kubectl get hpa + +# To update continuously you can run the following, assuming the 'watch' command is available on your machine +watch -n 5 kubectl get hpa +``` +The target metric will show the increased requests, triggering the addition of more replicas: +``` +NAME REFERENCE TARGETS MINPODS MAXPODS REPLICAS +keda-hpa-test-custom-endpoint-init-scale-scaled-object Deployment/test-custom-endpoint-init-scale 10/2 (avg) 1 4 1 + +NAME REFERENCE TARGETS MINPODS MAXPODS REPLICAS +keda-hpa-test-custom-endpoint-init-scale-scaled-object Deployment/test-custom-endpoint-init-scale 2/2 (avg) 1 4 4 +``` + + + +You can also view the pods by running: +```bash +hyp list-pods hyp-custom-endpoint --endpoint-name $ENDPOINT_NAME +``` + +Additionally, the ALB will show the targets under the **Resource Map** tab: +![Cluster Scheduler Policy](../images/alb-resource-map.png) + +When you're finished with the endpoint, delete the endpoint to free up the resources: +```bash +hyp delete hyp-custom-endpoint --name $ENDPOINT_NAME +``` \ No newline at end of file diff --git a/examples/end_to_end_walkthrough/02-inference-deployment/README.MD b/examples/end_to_end_walkthrough/02-inference-deployment/README.MD new file mode 100644 index 00000000..d5338e22 --- /dev/null +++ b/examples/end_to_end_walkthrough/02-inference-deployment/README.MD @@ -0,0 +1,5 @@ +# Inference Deployment - HyperPod CLI End-to-End Walkthrough + +This folder contains the following files: +- [00-jumpstart-endpoint.md](00-jumpstart-endpoint.md) - Instructions on how to deploy models available on SageMaker JumpStart to the HyperPod cluster. +- [01-custom-model-endpoint.md](01-custom-model-endpoint.md) - Instructions on how to deploy a custom model from an S3 bucket (TinyLlama) to the HyperPod cluster. \ No newline at end of file diff --git a/examples/end_to_end_walkthrough/02-inference-deployment/scripts/invoke_endpoint_autoscale.py b/examples/end_to_end_walkthrough/02-inference-deployment/scripts/invoke_endpoint_autoscale.py new file mode 100644 index 00000000..f38de33f --- /dev/null +++ b/examples/end_to_end_walkthrough/02-inference-deployment/scripts/invoke_endpoint_autoscale.py @@ -0,0 +1,46 @@ +import subprocess +import time +import shlex +import argparse +from concurrent.futures import ThreadPoolExecutor + +def call_hyp(command_name, endpoint_name): + command = [ + "hyp", "invoke", shlex.quote(command_name), + "--endpoint-name", shlex.quote(endpoint_name), + "--body", '{"messages":[{"role":"system","content":"You are a helpful AI assistant that can answer questions and provide information. 
You must include your reasoning activities."}, {"role": "user", "content": "What is the capital of USA?"}], "temperature": 0.1, "top_p": 0.95, "max_tokens": 512}' + ] + result = subprocess.run(command, capture_output=True, text=True) + return result.stdout + + +def run_parallel_calls(command_name, endpoint_name, executions_per_second=5, duration_seconds=10): + interval = 1.0 / executions_per_second + with ThreadPoolExecutor(max_workers=executions_per_second) as executor: + futures = [] + start_time = time.time() + while time.time() - start_time < duration_seconds: + futures.append(executor.submit(call_hyp, command_name, endpoint_name)) + time.sleep(interval) + for future in futures: + print(future.result()) + + +def main(): + parser = argparse.ArgumentParser(description="Parallel hyp invoke tester") + parser.add_argument("--command", required=True, help="Command name passed to 'hyp invoke'") + parser.add_argument("--endpoint", required=True, help="Endpoint name for --endpoint-name") + parser.add_argument("--eps", type=int, default=5, help="Executions per second") + parser.add_argument("--duration", type=int, default=20, help="Duration in seconds") + args = parser.parse_args() + + run_parallel_calls( + command_name=args.command, + endpoint_name=args.endpoint, + executions_per_second=args.eps, + duration_seconds=args.duration + ) + + +if __name__ == "__main__": + main() diff --git a/examples/end_to_end_walkthrough/03-task-governance/00-task-governance.md b/examples/end_to_end_walkthrough/03-task-governance/00-task-governance.md new file mode 100644 index 00000000..3322412c --- /dev/null +++ b/examples/end_to_end_walkthrough/03-task-governance/00-task-governance.md @@ -0,0 +1,219 @@ +# Utilizing HyperPod Task Governance - HyperPod CLI End-to-End Walkthrough + +SageMaker HyperPod task governance is a management system designed to streamline resource allocation and ensure efficient utilization of compute resources across teams and projects for your Amazon EKS clusters, providing administrators with the capability to set priority levels for various tasks, compute allocation for each team, how each team lends and borrows idle compute, and if a team preempts their own tasks. + +This section will guide you through an end to end example of using task governance. For this example, we use a cluster of at least 2 `ml.g5.12xlarge` instances split by two teams. This assumes that you completed the setup instructions in [00-getting-started/00-setup.md](../00-getting-started/00-setup.md) and the training dataset and docker image creation instructions in [01-training-job-submission/00-pytorch-training-job.md](../01-training-job-submission/00-pytorch-training-job.md). + +Please setup the following environment variables corresponding to your account. `S3_BUCKET_NAME` should be the bucket corresponding to your FSx filesystem, which contains the training data and scripts, as described in [01-training-job-submission/00-pytorch-training-job.md](../01-training-job-submission/00-pytorch-training-job.md). +```bash +AWS_REGION="PLEASE_FILL_IN" +AWS_ACCOUNT_ID="PLEASE_FILL_IN" +S3_BUCKET_NAME="PLEASE_FILL_IN" + +DOCKER_IMAGE_TAG="pytorch2.8-cu129" +ECR_NAME="qwen3-finetuning" +S3_PREFIX="qwen-cli-example" +``` +## Borrowing idle compute from other teams (💻) + +**Scenario**: Team A submits a training job that **requires 2 instances** (8 GPUs) but only has **1 instance allocated**. 
HyperPod task governance allows Team A to **borrow 1 instance** from Team B's idle capacity because we have allowed Borrowing and Lending when setting up the team quotas in [00-getting-started/00-setup.md](../00-getting-started/00-setup.md).
+
+The following command submits the 2 instance, 8 GPU job to the namespace and task queue of Team A:
+```
+JOB_NAME=team-a-qwen3-training-prio
+TEAM_NAMESPACE=hyperpod-ns-team-a
+TEAM_QUEUE=$TEAM_NAMESPACE-localqueue
+TEAM_FSX_CLAIM=fsx-pvc-hyperpod-ns-team-a
+JOB_PRIO=training-priority
+
+hyp create hyp-pytorch-job \
+    --job-name $JOB_NAME \
+    --namespace $TEAM_NAMESPACE \
+    --queue-name $TEAM_QUEUE \
+    --priority $JOB_PRIO \
+    --image $AWS_ACCOUNT_ID.dkr.ecr.$AWS_REGION.amazonaws.com/$ECR_NAME:$DOCKER_IMAGE_TAG \
+    --command "[hyperpodrun, --nnodes=2:2, --nproc_per_node=4, /data/$S3_PREFIX/scripts/train.py]" \
+    --args "[--config, /data/$S3_PREFIX/scripts/args.yaml]" \
+    --environment '{"LOGLEVEL": "INFO", "PYTORCH_CUDA_ALLOC_CONF": "expandable_segments:True", "NCCL_DEBUG": "INFO", "NCCL_SOCKET_IFNAME": "^lo", "TORCH_NCCL_ASYNC_ERROR_HANDLING": "1", "FI_PROVIDER": "efa", "FI_EFA_FORK_SAFE": "1", "NCCL_PROTO": "simple"}' \
+    --pull-policy "IfNotPresent" \
+    --instance-type ml.g5.12xlarge \
+    --node-count 2 \
+    --tasks-per-node 4 \
+    --deep-health-check-passed-nodes-only false \
+    --max-retry 100 \
+    --volume name=shmem,type=hostPath,mount_path=/dev/shm,path=/dev/shm,read_only=false \
+    --volume name=local,type=hostPath,mount_path=/local,path=/mnt/k8s-disks/0,read_only=false \
+    --volume name=fsx-volume,type=pvc,mount_path=/data,claim_name=$TEAM_FSX_CLAIM,read_only=false
+```
+
+After the job has been submitted successfully, you can list the jobs currently running on the cluster in the team's namespace and monitor their status
+using the following command:
+```bash
+hyp list hyp-pytorch-job -n $TEAM_NAMESPACE
+```
+
+Describe the job details by running:
+```bash
+hyp describe hyp-pytorch-job --job-name $JOB_NAME -n $TEAM_NAMESPACE
+```
+
+You can verify that the job is currently borrowing compute by running the following to show the status
+of Team A's `clusterqueue`:
+```bash
+kubectl get clusterqueue hyperpod-ns-team-a-clusterqueue -o jsonpath='{.status.flavorsUsage[0]}'
+```
+
+The output will look similar to the following, showing that the job is borrowing CPU, memory, and GPU capacity from Team B:
+```json
+{
+  "name": "ml.g5.12xlarge",
+  "resources": [
+    {
+      "borrowed": "0",
+      "name": "aws.amazon.com/neurondevice",
+      "total": "0"
+    },
+    {
+      "borrowed": "40",
+      "name": "cpu",
+      "total": "88"
+    },
+    {
+      "borrowed": "136Gi",
+      "name": "memory",
+      "total": "328Gi"
+    },
+    {
+      "borrowed": "4",
+      "name": "nvidia.com/gpu",
+      "total": "8"
+    },
+    {
+      "borrowed": "0",
+      "name": "vpc.amazonaws.com/efa",
+      "total": "0"
+    }
+  ]
+}
+```
+
+## Reclaiming guaranteed compute (💻)
+
+**Scenario**: Team B needs to reclaim its compute resources. By submitting a job requiring **1 instance**, Team B's job is **prioritized** as one of the instances currently in use belongs to Team B, and Team A's job from the previous scenario is **suspended** due to resource unavailability.
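+
+To make the suspension and reclaiming visible while you work through this and the following scenario, you can optionally poll both teams' job lists in a second terminal. The following is a small sketch that only wraps the `hyp list hyp-pytorch-job` command used on this page; adjust the interval and iteration count as needed.
+```python
+# Optional sketch: poll both team namespaces so you can watch job status
+# transitions (e.g. Team A's job moving to Suspended) while working through
+# the scenarios on this page.
+import subprocess
+import time
+
+NAMESPACES = ["hyperpod-ns-team-a", "hyperpod-ns-team-b"]
+
+for _ in range(20):  # roughly 10 minutes at a 30 second interval
+    for namespace in NAMESPACES:
+        listing = subprocess.run(
+            ["hyp", "list", "hyp-pytorch-job", "-n", namespace],
+            capture_output=True, text=True,
+        ).stdout
+        print(f"--- {namespace} ---\n{listing}")
+    time.sleep(30)
+```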
+ +The following command submits the 1 instance, 4 GPU job to the namespace and task queue of Team B: +``` +JOB_NAME=team-b-qwen3-training-prio +TEAM_NAMESPACE=hyperpod-ns-team-b +TEAM_QUEUE=$TEAM_NAMESPACE-localqueue +TEAM_FSX_CLAIM=fsx-pvc-hyperpod-ns-team-b +JOB_PRIO=training-priority + +hyp create hyp-pytorch-job \ + --job-name $JOB_NAME \ + --namespace $TEAM_NAMESPACE \ + --queue-name $TEAM_QUEUE \ + --priority $JOB_PRIO \ + --image $AWS_ACCOUNT_ID.dkr.ecr.$AWS_REGION.amazonaws.com/$ECR_NAME:$DOCKER_IMAGE_TAG \ + --command "[hyperpodrun, --nnodes=2:2, --nproc_per_node=4, /data/$S3_PREFIX/scripts/train.py]" \ + --args "[--config, /data/$S3_PREFIX/scripts/args.yaml]" \ + --environment '{"LOGLEVEL": "INFO", "PYTORCH_CUDA_ALLOC_CONF": "expandable_segments:True", "NCCL_DEBUG": "INFO", "NCCL_SOCKET_IFNAME": "^lo", "TORCH_NCCL_ASYNC_ERROR_HANDLING": "1", "FI_PROVIDER": "efa", "FI_EFA_FORK_SAFE": "1", "NCCL_PROTO": "simple"}' \ + --pull-policy "IfNotPresent" \ + --instance-type ml.g5.12xlarge \ + --node-count 1 \ + --tasks-per-node 4 \ + --deep-health-check-passed-nodes-only false \ + --max-retry 100 \ + --volume name=shmem,type=hostPath,mount_path=/dev/shm,path=/dev/shm,read_only=false \ + --volume name=local,type=hostPath,mount_path=/local,path=/mnt/k8s-disks/0,read_only=false \ + --volume name=fsx-volume,type=pvc,mount_path=/data,claim_name=$TEAM_FSX_CLAIM,read_only=false +``` + +After the job got submitted successfully, you can list the jobs currently running on the cluster in the team's namespace and monitor their status +using the following command: +```bash +hyp list hyp-pytorch-job -n hyperpod-ns-team-a +hyp list hyp-pytorch-job -n hyperpod-ns-team-b +``` + +This will show you that Team A's job was suspended, and Team B's job is now running: +``` + +NAME NAMESPACE STATUS AGE +-------------------------------------------------------------------------------- +team-a-qwen3-training-prio hyperpod-ns-team-a Suspended N/A + +NAME NAMESPACE STATUS AGE +-------------------------------------------------------------------------------- +team-b-qwen3-training-prio hyperpod-ns-team-b Running 2m +``` + +Describe the job details by running: +```bash +hyp describe hyp-pytorch-job --job-name $JOB_NAME -n $TEAM_NAMESPACE +``` + +You can additionally see the task status in the AWS console UI by navigating to your HyperPod cluster +and viewing the **Tasks** tab and selecting **Pytorch - HyperPod Training Operator** in the dropdown menu: +![Currently running tasks](../images/task-suspension-console.png) + +## Preempt a lower priority task (💻) + +Finally, the following example will show preemption by a higher priority task. + +**Scenario**: Team B needs to deploy another training job with higher priority for a paper deadline. For this, the team submits a training job requiring **2 instances** with **experimentation priority** which is a higher priority level than the currently running job's **training priority** as configured in [00-getting-started/00-setup.md](../00-getting-started/00-setup.md). Team B's running training job is thus **suspended** to allow the experimentation job to utilize the resources. 
+
+The following command submits the 2 instance, 8 GPU job to the namespace and task queue of Team B:
+```
+JOB_NAME=team-b-qwen3-experimentation-prio
+TEAM_NAMESPACE=hyperpod-ns-team-b
+TEAM_QUEUE=$TEAM_NAMESPACE-localqueue
+TEAM_FSX_CLAIM=fsx-pvc-hyperpod-ns-team-b
+JOB_PRIO=experimentation-priority
+
+hyp create hyp-pytorch-job \
+    --job-name $JOB_NAME \
+    --namespace $TEAM_NAMESPACE \
+    --queue-name $TEAM_QUEUE \
+    --priority $JOB_PRIO \
+    --image $AWS_ACCOUNT_ID.dkr.ecr.$AWS_REGION.amazonaws.com/$ECR_NAME:$DOCKER_IMAGE_TAG \
+    --command "[hyperpodrun, --nnodes=2:2, --nproc_per_node=4, /data/$S3_PREFIX/scripts/train.py]" \
+    --args "[--config, /data/$S3_PREFIX/scripts/args.yaml]" \
+    --environment '{"LOGLEVEL": "INFO", "PYTORCH_CUDA_ALLOC_CONF": "expandable_segments:True", "NCCL_DEBUG": "INFO", "NCCL_SOCKET_IFNAME": "^lo", "TORCH_NCCL_ASYNC_ERROR_HANDLING": "1", "FI_PROVIDER": "efa", "FI_EFA_FORK_SAFE": "1", "NCCL_PROTO": "simple"}' \
+    --pull-policy "IfNotPresent" \
+    --instance-type ml.g5.12xlarge \
+    --node-count 2 \
+    --tasks-per-node 4 \
+    --deep-health-check-passed-nodes-only false \
+    --max-retry 100 \
+    --volume name=shmem,type=hostPath,mount_path=/dev/shm,path=/dev/shm,read_only=false \
+    --volume name=local,type=hostPath,mount_path=/local,path=/mnt/k8s-disks/0,read_only=false \
+    --volume name=fsx-volume,type=pvc,mount_path=/data,claim_name=$TEAM_FSX_CLAIM,read_only=false
+```
+
+After the job has been submitted successfully, you can list the jobs currently running on the cluster in both teams' namespaces and monitor their status
+using the following commands:
+```bash
+hyp list hyp-pytorch-job -n hyperpod-ns-team-a
+hyp list hyp-pytorch-job -n hyperpod-ns-team-b
+```
+
+This will show you that Team B's initial job was suspended, and Team B's experimentation job is now running. Team A's job stays suspended:
+```
+
+NAME                          NAMESPACE            STATUS      AGE
+--------------------------------------------------------------------------------
+team-a-qwen3-training-prio    hyperpod-ns-team-a   Suspended   N/A
+
+NAME                          NAMESPACE            STATUS      AGE
+--------------------------------------------------------------------------------
+team-b-qwen3-exp-prio         hyperpod-ns-team-b   Running     2m
+team-b-qwen3-training-prio    hyperpod-ns-team-b   Suspended   N/A
+```
+
+Finally, delete all the submitted jobs to free up the resources:
+```
+hyp delete hyp-pytorch-job --job-name team-a-qwen3-training-prio -n hyperpod-ns-team-a
+hyp delete hyp-pytorch-job --job-name team-b-qwen3-training-prio -n hyperpod-ns-team-b
+hyp delete hyp-pytorch-job --job-name team-b-qwen3-experimentation-prio -n hyperpod-ns-team-b
+```
\ No newline at end of file
diff --git a/examples/end_to_end_walkthrough/03-task-governance/README.MD b/examples/end_to_end_walkthrough/03-task-governance/README.MD
new file mode 100644
index 00000000..02d875d3
--- /dev/null
+++ b/examples/end_to_end_walkthrough/03-task-governance/README.MD
@@ -0,0 +1,4 @@
+# Task Governance - HyperPod CLI End-to-End Walkthrough
+
+This folder contains the following files:
+- [00-task-governance.md](00-task-governance.md) - Instructions on multiple scenarios for task governance, including borrowing idle compute, reclaiming guaranteed compute as well as preempting lower priority tasks.
\ No newline at end of file diff --git a/examples/end_to_end_walkthrough/04-spaces/00-create-space.md b/examples/end_to_end_walkthrough/04-spaces/00-create-space.md new file mode 100644 index 00000000..432d5603 --- /dev/null +++ b/examples/end_to_end_walkthrough/04-spaces/00-create-space.md @@ -0,0 +1,152 @@ +# SageMaker Spaces for IDEs and Notebooks - HyperPod CLI End-to-End Walkthrough + +SageMaker Spaces is a functionality which allows AI developers to run their interactive machine learning workloads directly on HyperPod EKS clusters through IDEs and notebooks. + +Spaces can be created and managed using the HyperPod CLI. In this example you will create a space and connect your Visual Studio Code IDE to it. Then, we'll access the space through the web UI. + +This example assumes that you completed the setup instructions in [00-getting-started/00-setup.md](../00-getting-started/00-setup.md). + +Define a name for the new space. + +```bash +SPACE_NAME=my-jupyterlab-space +``` + +Create the space on a CPU instance and mount your FSx for Lustre Filesystem to it by executing the following command: +```bash +hyp create hyp-space --name $SPACE_NAME \ +--display-name $SPACE_NAME --node-selector '{"node.kubernetes.io/instance-type":"ml.t3.2xlarge"}' \ +--volume name=fsx-data,mountPath=/fsx,persistentVolumeClaimName=fsx-claim +``` + +If successful, this will show the following output: +``` +Space 'my-jupyterlab-space' created successfully in namespace 'default' +``` + +You can list the spaces in your HyperPod cluster by running the following command: +```bash +hyp list hyp-space +``` + +This will show your newly created space. After a few minutes, the `AVAILABLE` status will change to `True`: +``` +NAME NAMESPACE AVAILABLE PROGRESSING DEGRADED +------------------ ----------- ----------- ------------- ---------- +my-jupyterlab-space default True False False +``` + +You can show your space's configuration by running the following command. 
+```bash +hyp describe hyp-space --name $SPACE_NAME +``` + +This will show a configuration similar to the following: +``` +apiVersion: workspace.jupyter.org/v1alpha1 +kind: Workspace +metadata: + annotations: + workspace.jupyter.org/created-by: **** + workspace.jupyter.org/last-updated-by: system:serviceaccount:jupyter-k8s-system:jupyter-k8s-controller-manager + creationTimestamp: '2025-11-24T18:43:53Z' + finalizers: + - workspace.jupyter.org/workspace-protection + generation: 1 + labels: + workspace.jupyter.org/access-strategy-name: hyperpod-access-strategy + workspace.jupyter.org/access-strategy-namespace: jupyter-k8s-system + workspace.jupyter.org/template-name: sagemaker-jupyter-template + workspace.jupyter.org/template-namespace: default + name: my-jupyterlab-space + namespace: default + resourceVersion: '353398' + uid: 68ad0a20-8eb1-4c2f-81fd-cf0590bbb743 +spec: + accessStrategy: + name: hyperpod-access-strategy + namespace: jupyter-k8s-system + accessType: Public + appType: jupyterlab + containerConfig: + command: + - /opt/amazon/sagemaker/workspace/bin/entrypoint-workspace-jupyterlab + displayName: my-jupyterlab-space + image: public.ecr.aws/sagemaker/sagemaker-distribution:latest-cpu + nodeSelector: + node.kubernetes.io/instance-type: ml.t3.2xlarge + ownershipType: Public + podSecurityContext: + fsGroup: 1000 + resources: + limits: + cpu: '2' + memory: 8Gi + requests: + cpu: '2' + memory: 8Gi + serviceAccountName: default + storage: + mountPath: /home/sagemaker-user + size: 5Gi + storageClassName: sagemaker-spaces-default-storage-class + templateRef: + name: sagemaker-jupyter-template + volumes: + - mountPath: /fsx + name: fsx-data + persistentVolumeClaimName: fsx-claim +status: + accessResourceSelector: workspace.jupyter.org/workspace-name=my-jupyterlab-space + accessResources: + - apiVersion: traefik.io/v1alpha1 + kind: IngressRoute + name: authorized-route-my-jupyterlab-space + namespace: default + - apiVersion: traefik.io/v1alpha1 + kind: IngressRoute + name: unauthorized-route-my-jupyterlab-space + namespace: default + conditions: + - lastTransitionTime: '2025-11-24T18:44:06Z' + message: Workspace is ready + reason: ResourcesReady + status: 'True' + type: Available + - lastTransitionTime: '2025-11-24T18:44:06Z' + message: Workspace is ready + reason: ResourcesReady + status: 'False' + type: Progressing + - lastTransitionTime: '2025-11-24T18:44:06Z' + message: No errors detected + reason: NoError + status: 'False' + type: Degraded + - lastTransitionTime: '2025-11-24T18:44:06Z' + message: Workspace is running + reason: DesiredStateRunning + status: 'False' + type: Stopped + deploymentName: workspace-my-jupyterlab-space + serviceName: workspace-my-jupyterlab-space-service + +``` + +Let's connect your VSCode IDE to your newly created space, run the following command: +```bash +hyp create hyp-space-access --name $SPACE_NAME --connection-type vscode-remote +``` + +This will print a `JSON`-formatted output to the console that contains a `SpaceConnectionUrl`. Copy this URL to your browser and allow it to open VSCode. This will establish an SSH connection to VSCode server in your HyperPod space. You can now use the space and interactively run workloads on your HyperPod cluster through this IDE. 
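+
+If you want to obtain the connection URL programmatically (for example to log it or hand it to another tool), a small sketch like the following could parse it out of the command output. It assumes the `JSON` document is printed to stdout exactly as described above; `SPACE_NAME` is the space created earlier on this page.
+```python
+# Optional sketch: request VS Code access for the space and extract the
+# SpaceConnectionUrl from the JSON the CLI prints.
+import json
+import subprocess
+
+SPACE_NAME = "my-jupyterlab-space"  # the space created above
+
+result = subprocess.run(
+    ["hyp", "create", "hyp-space-access", "--name", SPACE_NAME,
+     "--connection-type", "vscode-remote"],
+    capture_output=True, text=True, check=True,
+)
+
+connection = json.loads(result.stdout)
+print(connection["SpaceConnectionUrl"])  # open this URL in your browser
+```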
+
+![VSCode connected](../images/vscode-connected.png)
+
+Lastly, generate a JupyterLab web UI URL (if the web UI has been enabled in your environment):
+```bash
+hyp create hyp-space-access --name $SPACE_NAME --connection-type web-ui
+```
+
+This will print a `JSON`-formatted output to the console that contains a `SpaceConnectionUrl`. Copy this URL to your browser (or click on it) to open JupyterLab.
+
+![Web UI](../images/jupyterlab.png)
\ No newline at end of file
diff --git a/examples/end_to_end_walkthrough/04-spaces/README.md b/examples/end_to_end_walkthrough/04-spaces/README.md
new file mode 100644
index 00000000..adfdd38b
--- /dev/null
+++ b/examples/end_to_end_walkthrough/04-spaces/README.md
@@ -0,0 +1,6 @@
+# SageMaker Spaces on HyperPod - Amazon SageMaker HyperPod CLI and SDK Examples
+
+This folder contains the following files:
+
+- [**Spaces**](./)
+  - [00-create-space.md](./00-create-space.md) - Instructions on how to set up the SageMaker Spaces functionality for hosting IDEs and notebooks on the HyperPod cluster.
diff --git a/examples/end_to_end_walkthrough/README.md b/examples/end_to_end_walkthrough/README.md
new file mode 100644
index 00000000..2d6f5d6e
--- /dev/null
+++ b/examples/end_to_end_walkthrough/README.md
@@ -0,0 +1,18 @@
+# End-to-End Walkthrough - Amazon SageMaker HyperPod CLI and SDK
+This folder contains a full, end-to-end walkthrough of the HyperPod CLI and SDK functionalities for cluster management, training job submission, inference deployments, task governance, as well as spaces (IDE) deployment.
+
+A recording of the full walkthrough as part of re:Invent 2025 session 371 is available on [YouTube](https://www.youtube.com/watch?v=6hrqologUZE).
+
+- [**Getting Started**](./00-getting-started/)
+  - [00-setup.md](./00-getting-started/00-setup.md) - Instructions on how to install the CLI and set up the prerequisites you should have in your AWS account for running the examples.
+  - [01-(optional)-cluster-creation.md](./00-getting-started/01-(optional)-cluster-creation.md) - Optional instructions on how to create a new HyperPod cluster using the CLI. Alternatively, an existing cluster can be used for the examples, or one can be created using the console UI.
+- [**Training Job Submission**](./01-training-job-submission/)
+  - [00-pytorch-training-job.md](./01-training-job-submission/00-pytorch-training-job.md) - Instructions on how to create and submit a Qwen3 4B LoRA fine-tuning job to the HyperPod cluster through the HyperPod CLI. Additionally, it includes an example of instance failure recovery.
+  - [01-pytorch-training-job-sdk.ipynb](./01-training-job-submission/01-pytorch-training-job-sdk.ipynb) - Instructions on how to utilize the HyperPod Python SDK to create and submit the equivalent job to the HyperPod cluster.
+- [**Inference Deployment**](./02-inference-deployment/)
+  - [00-jumpstart-endpoint.md](./02-inference-deployment/00-jumpstart-endpoint.md) - Instructions on how to deploy models available on SageMaker JumpStart to the HyperPod cluster.
+  - [01-custom-model-endpoint.md](./02-inference-deployment/01-custom-model-endpoint.md) - Instructions on how to deploy a custom model from an S3 bucket (TinyLlama) to the HyperPod cluster and how to utilize the autoscaling functionality.
+- [**Task Governance**](./03-task-governance/) + - [00-task-governance.md](./03-task-governance/00-task-governance.md) - Instructions on multiple scenarios for task governance, including borrowing idle compute, reclaiming guaranteed compute as well as preempting lower priority tasks. +- [**Spaces**](./04-spaces/) + - [00-create-space.md](./04-spaces/00-create-space.md) - Instructions on how to set up the SageMaker Spaces functionality for hosting IDEs and notebooks on the HyperPod cluster. diff --git a/examples/end_to_end_walkthrough/images/alb-resource-map.png b/examples/end_to_end_walkthrough/images/alb-resource-map.png new file mode 100644 index 00000000..bf688340 Binary files /dev/null and b/examples/end_to_end_walkthrough/images/alb-resource-map.png differ diff --git a/examples/end_to_end_walkthrough/images/cluster-policy-example.png b/examples/end_to_end_walkthrough/images/cluster-policy-example.png new file mode 100644 index 00000000..a8fa714b Binary files /dev/null and b/examples/end_to_end_walkthrough/images/cluster-policy-example.png differ diff --git a/examples/end_to_end_walkthrough/images/dra-console.png b/examples/end_to_end_walkthrough/images/dra-console.png new file mode 100644 index 00000000..30b9b15a Binary files /dev/null and b/examples/end_to_end_walkthrough/images/dra-console.png differ diff --git a/examples/end_to_end_walkthrough/images/dra-s3.png b/examples/end_to_end_walkthrough/images/dra-s3.png new file mode 100644 index 00000000..cba87e36 Binary files /dev/null and b/examples/end_to_end_walkthrough/images/dra-s3.png differ diff --git a/examples/end_to_end_walkthrough/images/hpto_install.png b/examples/end_to_end_walkthrough/images/hpto_install.png new file mode 100644 index 00000000..fec75508 Binary files /dev/null and b/examples/end_to_end_walkthrough/images/hpto_install.png differ diff --git a/examples/end_to_end_walkthrough/images/inference-endpoint-alb.png b/examples/end_to_end_walkthrough/images/inference-endpoint-alb.png new file mode 100644 index 00000000..8e274b04 Binary files /dev/null and b/examples/end_to_end_walkthrough/images/inference-endpoint-alb.png differ diff --git a/examples/end_to_end_walkthrough/images/inference-endpoint.png b/examples/end_to_end_walkthrough/images/inference-endpoint.png new file mode 100644 index 00000000..5243e2a3 Binary files /dev/null and b/examples/end_to_end_walkthrough/images/inference-endpoint.png differ diff --git a/examples/end_to_end_walkthrough/images/jupyterlab.png b/examples/end_to_end_walkthrough/images/jupyterlab.png new file mode 100644 index 00000000..13eefc76 Binary files /dev/null and b/examples/end_to_end_walkthrough/images/jupyterlab.png differ diff --git a/examples/end_to_end_walkthrough/images/spaces-install.png b/examples/end_to_end_walkthrough/images/spaces-install.png new file mode 100644 index 00000000..0737f997 Binary files /dev/null and b/examples/end_to_end_walkthrough/images/spaces-install.png differ diff --git a/examples/end_to_end_walkthrough/images/spaces-success.png b/examples/end_to_end_walkthrough/images/spaces-success.png new file mode 100644 index 00000000..6e7b6644 Binary files /dev/null and b/examples/end_to_end_walkthrough/images/spaces-success.png differ diff --git a/examples/end_to_end_walkthrough/images/task-suspension-console.png b/examples/end_to_end_walkthrough/images/task-suspension-console.png new file mode 100644 index 00000000..04d4f0db Binary files /dev/null and b/examples/end_to_end_walkthrough/images/task-suspension-console.png differ diff --git 
a/examples/end_to_end_walkthrough/images/task_governance_install.png b/examples/end_to_end_walkthrough/images/task_governance_install.png new file mode 100644 index 00000000..075ce8c0 Binary files /dev/null and b/examples/end_to_end_walkthrough/images/task_governance_install.png differ diff --git a/examples/end_to_end_walkthrough/images/team-quotas.png b/examples/end_to_end_walkthrough/images/team-quotas.png new file mode 100644 index 00000000..893866b5 Binary files /dev/null and b/examples/end_to_end_walkthrough/images/team-quotas.png differ diff --git a/examples/end_to_end_walkthrough/images/vscode-connected.png b/examples/end_to_end_walkthrough/images/vscode-connected.png new file mode 100644 index 00000000..26adec30 Binary files /dev/null and b/examples/end_to_end_walkthrough/images/vscode-connected.png differ