Fix executor placement in the same cluster in dynamic client mode #104
…mic client mode Signed-off-by: Sudipto Baral <sudiptobaral.me@gmail.com>
… dynamic client mode Signed-off-by: Sudipto Baral <sudiptobaral.me@gmail.com>
Signed-off-by: Sudipto Baral <sudiptobaral.me@gmail.com>
…luster mode Signed-off-by: Sudipto Baral <sudiptobaral.me@gmail.com>
Signed-off-by: Sudipto Baral <sudiptobaral.me@gmail.com>
…xecutors
Scale-up executors submitted via `validateAndSubmitExecutorJobs` were using `getGangCardinality` (minExecutors) instead of the actual batch count. This caused Armada to wait for pods that would never arrive when the batch size differed from minExecutors. Pass `gangCardinalityOverride=Some(executorCount)` only in the scale-up path so initial submissions still use the mode default. Also split `assertGangJobForDynamic` into two assertions: env vars for initial gang executors and annotations for all executors including scale-up.
Signed-off-by: Sudipto Baral <sudiptobaral.me@gmail.com>
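The override pattern described in this commit can be sketched as follows. This is a minimal illustration, not the PR's actual code; `resolveGangCardinality` and its parameters are hypothetical names standing in for the logic around `gangCardinalityOverride`.

```scala
// Hypothetical sketch: scale-up batches pass their actual batch size as an
// override, while the initial submission keeps the mode default (minExecutors).
def resolveGangCardinality(
    modeDefault: Int,
    gangCardinalityOverride: Option[Int]
): Int =
  gangCardinalityOverride.getOrElse(modeDefault)

// Initial submission: no override, so the mode default is used.
val initial = resolveGangCardinality(modeDefault = 2, gangCardinalityOverride = None)

// Scale-up path: override with the actual batch count so Armada does not
// wait for pods that will never arrive.
val scaleUp = resolveGangCardinality(modeDefault = 2, gangCardinalityOverride = Some(5))
```

The point is that the override is only supplied on the scale-up path, so the initial gang submission behaviour is unchanged.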
…on mode Signed-off-by: Sudipto Baral <sudiptobaral.me@gmail.com>
Skip gang annotations for scale-up batches once gang attributes are captured, add isReadyToAllocateMore guard to prevent scale-up executors from landing on a different cluster, and tune dynamic allocation defaults. Signed-off-by: Sudipto Baral <sudiptobaral.me@gmail.com>
…flag Signed-off-by: Sudipto Baral <sudiptobaral.me@gmail.com>
Rename parameters for clarity, assert gang annotations only on initial batch, and verify scale-up pods have node selectors instead. Signed-off-by: Sudipto Baral <sudiptobaral.me@gmail.com>
- Use CAS as sole guard in captureGangAttributes so only the winning
thread writes to SparkConf
- Replace sys.env.contains("ARMADA_JOB_SET_ID") with
DeploymentModeHelper.isDriverInCluster in isReadyToAllocateMore
and start()
- Check nodeValue.nonEmpty in getGangNodeSelector to prevent empty
node selector values
- Add default for ARMADA_NODE_UNIFORMITY_LABEL in submit script
- DRY cleanup in seedGangAttributesFromEnv
- Add unit tests for isReadyToAllocateMore and assume() guard for
env-dependent test
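The "CAS as sole guard" change in the first bullet can be sketched like this. The class and field names are hypothetical, for illustration only; the real code writes to `SparkConf` rather than local fields.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical sketch of the CAS-as-sole-guard pattern: many executor
// registration threads may race to capture the gang attributes, but only
// the thread that wins the compareAndSet performs the write.
class GangAttributeCapture {
  private val captured = new AtomicBoolean(false)
  @volatile private var labelName: Option[String] = None
  @volatile private var labelValue: Option[String] = None

  // Returns true only for the winning thread. Empty values are rejected,
  // mirroring the nodeValue.nonEmpty check mentioned above.
  def capture(name: String, value: String): Boolean =
    if (value.nonEmpty && captured.compareAndSet(false, true)) {
      labelName = Some(name)
      labelValue = Some(value)
      true
    } else false

  def get: Option[(String, String)] =
    for (n <- labelName; v <- labelValue) yield (n, v)
}
```

Because `compareAndSet(false, true)` succeeds for exactly one caller, no separate lock is needed around the write.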
Signed-off-by: Sudipto Baral <sudiptobaral.me@gmail.com>
Revert modeHelper.isDriverInCluster back to sys.env.contains("ARMADA_JOB_SET_ID")
for cluster mode detection in start() and isReadyToAllocateMore. The env var is
only present in cluster-mode driver pods, making it a reliable runtime indicator.
Signed-off-by: Sudipto Baral <sudiptobaral.me@gmail.com>
```diff
- --conf spark.dynamicAllocation.initialExecutors=1
  --conf spark.dynamicAllocation.minExecutors=2
  --conf spark.dynamicAllocation.maxExecutors=10
+ --conf spark.dynamicAllocation.initialExecutors=2
```
This needs to be at least 2, otherwise Armada won't gang schedule them.
So you don't have to address it in this ticket, but one of our requirements is that dynamic client mode supports 0 min executors so that it puts no load on armada when none is needed.
Pull request overview
This pull request fixes dynamic client mode in Spark-on-Armada by enabling the driver to discover gang node selector attributes at runtime from executors. In dynamic client mode, the driver runs outside the Kubernetes cluster and lacks the ARMADA_GANG_* environment variables that Armada injects into pods, which previously caused scale-up executors to land on arbitrary nodes rather than the same cluster as the initial gang.
Changes:
- Implements runtime gang attribute capture via executor registration RPC interception, storing discovered node selector labels in SparkConf for use in subsequent executor allocations
- Adds entrypoint wrapper that forwards `ARMADA_GANG_*` environment variables as `SPARK_EXECUTOR_ATTRIBUTE_*` so executors can relay them to the driver
- Sets gang cardinality to 0 for scale-up batches while relying on node selectors instead of full gang scheduling to reduce overhead
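The env-var relay in the second bullet amounts to a name-prefix mapping. A minimal sketch, assuming a hypothetical `relayGangEnv` helper (the actual PR does this in a shell entrypoint script, not Scala):

```scala
// Hypothetical sketch: every non-empty ARMADA_GANG_* variable is re-exported
// under the SPARK_EXECUTOR_ATTRIBUTE_ prefix, which Spark forwards to the
// driver as an executor attribute during registration.
def relayGangEnv(env: Map[String, String]): Map[String, String] =
  env.collect {
    case (k, v) if k.startsWith("ARMADA_GANG_") && v.nonEmpty =>
      s"SPARK_EXECUTOR_ATTRIBUTE_$k" -> v
  }
```

Unrelated variables pass through untouched by the relay, so only the gang attributes reach the driver via this channel.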
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| src/main/scala/org/apache/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala | Adds gang attribute capture from executor registration and environment seeding; implements allocation gating via isReadyToAllocateMore |
| src/main/scala/org/apache/spark/scheduler/cluster/armada/ArmadaExecutorAllocator.scala | Gates scale-up allocation until gang attributes are captured in client mode with nodeUniformity |
| src/main/scala/org/apache/spark/deploy/armada/submit/ArmadaClientApplication.scala | Implements gang cardinality override for scale-up batches and gang node selector retrieval from SparkConf |
| src/main/scala/org/apache/spark/deploy/armada/Config.scala | Adds internal config keys for storing captured gang node label name and value |
| docker/armada-entrypoint.sh | New wrapper script that forwards ARMADA_GANG_* env vars as SPARK_EXECUTOR_ATTRIBUTE_* for executor-to-driver communication |
| docker/Dockerfile | Updates entrypoint to use armada-entrypoint.sh wrapper |
| src/test/scala/org/apache/spark/scheduler/cluster/armada/ArmadaClusterManagerBackendSuite.scala | Unit tests for captureGangAttributes, seedGangAttributesFromEnv, and isReadyToAllocateMore |
| src/test/scala/org/apache/spark/deploy/armada/submit/ArmadaClientApplicationSuite.scala | Unit tests for getGangNodeSelector |
| src/test/scala/org/apache/spark/deploy/armada/e2e/E2ETestBuilder.scala | Updates gang assertion logic for dynamic allocation with separate checks for env vars, annotations, and node selectors |
| src/test/scala/org/apache/spark/deploy/armada/e2e/ArmadaSparkE2E.scala | Adds dynamicClient e2e test and updates dynamicCluster test with new assertion parameters |
| scripts/submitArmadaSpark.sh | Adjusts dynamic allocation parameters and adds gang scheduling configuration |
| CLAUDE.md | Updates ArmadaClusterManagerBackend documentation to reflect gang attribute capture feature |
Signed-off-by: Sudipto Baral <sudiptobaral.me@gmail.com>
Pull request overview
Copilot reviewed 12 out of 12 changed files in this pull request and generated 2 comments.
Signed-off-by: Sudipto Baral <sudiptobaral.me@gmail.com>
```json
  "conf.set(\"spark.armada.executor.limit.memory\", \"1Gi\")\n",
  "conf.set(\"spark.armada.executor.request.memory\", \"1Gi\")"
]
```
replaced by a single collapsed line:
```json
"source": "# Spark Configuration\nconf = SparkConf()\nif auth_token:\n    conf.set(\"spark.armada.auth.token\", auth_token)\nif auth_script_path:\n    conf.set(\"spark.armada.auth.script.path\", auth_script_path)\nif not driver_host:\n    raise ValueError(\"SPARK_DRIVER_HOST environment variable is required. \")\nconf.set(\"spark.master\", armada_master)\nconf.set(\"spark.submit.deployMode\", \"client\")\nconf.set(\"spark.app.id\", app_id)\nconf.set(\"spark.app.name\", \"jupyter-spark-pi\")\nconf.set(\"spark.driver.bindAddress\", \"0.0.0.0\")\nconf.set(\"spark.driver.host\", driver_host)\nconf.set(\"spark.driver.port\", driver_port)\nconf.set(\"spark.driver.blockManager.port\", block_manager_port)\nconf.set(\"spark.home\", \"/opt/spark\")\nconf.set(\"spark.armada.container.image\", image_name)\nconf.set(\"spark.armada.queue\", armada_queue)\nconf.set(\"spark.armada.scheduling.namespace\", armada_namespace)\nconf.set(\"spark.armada.eventWatcher.useTls\", event_watcher_use_tls)\nconf.set(\"spark.kubernetes.file.upload.path\", \"/tmp\")\nconf.set(\"spark.kubernetes.executor.disableConfigMap\", \"true\")\nconf.set(\"spark.local.dir\", \"/tmp\")\nconf.set(\"spark.jars\", armada_jar)\n\n# Network timeouts\nconf.set(\"spark.network.timeout\", \"800s\")\nconf.set(\"spark.executor.heartbeatInterval\", \"60s\")\n\n# Resource limits\nconf.set(\"spark.armada.driver.limit.memory\", \"1Gi\")\nconf.set(\"spark.armada.driver.request.memory\", \"1Gi\")\nconf.set(\"spark.armada.executor.limit.memory\", \"1Gi\")\nconf.set(\"spark.armada.executor.request.memory\", \"1Gi\")\n\n# Allocation mode configuration\nif allocation_mode == 'dynamic':\n    # Dynamic allocation - executors scale based on workload\n    # Gang scheduling ensures all executors land on the same Armada cluster\n    conf.set(\"spark.armada.scheduling.nodeUniformity\", node_uniformity_label)\n    conf.set(\"spark.dynamicAllocation.enabled\", \"true\")\n    conf.set(\"spark.dynamicAllocation.minExecutors\", \"2\")\n    conf.set(\"spark.dynamicAllocation.maxExecutors\", \"10\")\n    conf.set(\"spark.dynamicAllocation.initialExecutors\", \"2\")  # Must be >= 2 for gang scheduling\n    conf.set(\"spark.dynamicAllocation.executorIdleTimeout\", \"60s\")\n    conf.set(\"spark.dynamicAllocation.schedulerBacklogTimeout\", \"5s\")\n    print(f\"Using dynamic allocation (min=2, max=10, initial=2)\")\nelse:\n    # Static allocation - fixed number of executors\n    conf.set(\"spark.executor.instances\", \"2\")\n    print(f\"Using static allocation (instances=2)\")"
```
The previous version had a setting per line, making them much easier to read. Is there any reason not to maintain that convention?
In the "how to verify" section, I'm not sure what steps 3, 4, and 5 are telling us to do?
The DeploymentModeHelper is meant to hide a lot of the implementation differences between client and cluster mode. It feels like the latest diffs allow some of that abstraction to leak out of the deploy-mode helper. I've entered Claude's opinion below. What do you think?

The core issue: DeploymentModeHelper has no gang scheduling abstractions at all. The gang attribute capture/propagation logic is spread across three classes that each perform their own deploy-mode branching:

1. ArmadaClusterManagerBackend — the biggest offender
2. ArmadaClientApplication — cross-component coupling
3. ArmadaExecutorAllocator — indirect leakage
4. Config.scala — the glue

What's missing from DeploymentModeHelper: the helper currently has methods like

The pattern of
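One way to picture the consolidation being suggested, as a hedged sketch only (the type and method names below are hypothetical, not the project's actual API): the helper would own the mode-dependent question of whether gang attributes are available, so callers stop checking env vars and conf keys themselves.

```scala
// Hypothetical sketch: deploy-mode branching lives in one place.
sealed trait DeployMode
case object ClientMode extends DeployMode
case object ClusterMode extends DeployMode

class DeploymentModeHelperSketch(
    mode: DeployMode,
    nodeUniformity: Boolean,
    gangCaptured: () => Boolean // e.g. backed by the CAS capture in the backend
) {
  // Cluster mode: the driver pod already has ARMADA_GANG_* env vars.
  // Client mode: attributes only arrive via executor registration, so we
  // must wait for capture when nodeUniformity is in play.
  def gangAttributesAvailable: Boolean = mode match {
    case ClusterMode => true
    case ClientMode  => !nodeUniformity || gangCaptured()
  }
}
```

With something like this, `isReadyToAllocateMore` could delegate to the helper instead of each class branching on mode independently.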
Signed-off-by: Sudipto Baral <sudiptobaral.me@gmail.com>
This is actually how I verified manually. Updated the PR description |
Signed-off-by: Sudipto Baral <sudiptobaral.me@gmail.com>
```scala
.assertGangJobForDynamic(
  "armada-spark",
  3
) // at least 3 executor pods (2 min + 1 scaled) with gang annotations seen
```
I find this comment helpful. Does it need to be removed?
Is it no longer correct?
The name of the assertion is more descriptive now, but yeah, better to have the comment, adding it back.
```diff
 ): Map[String, String] = {
   val modeHelper = DeploymentModeHelper(conf)
-  val gangCardinality = modeHelper.getGangCardinality
+  val gangCardinality = gangCardinalityOverride.getOrElse(modeHelper.getGangCardinality)
```
Shouldn't this be handled in the DeploymentModeHelper as well?
(All the rest of the changes to the DeploymentModeHelper look real good to me, but it seems like this should go as well.)
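The move the reviewer suggests could look something like the sketch below. This is an illustration only; the class and parameter names are hypothetical stand-ins for DeploymentModeHelper.

```scala
// Hypothetical sketch: the helper owns the scale-up override, so call sites
// no longer apply getOrElse themselves.
class DeploymentModeHelperWithOverride(modeDefault: Int) {
  def getGangCardinality(scaleUpBatchSize: Option[Int] = None): Int =
    scaleUpBatchSize.getOrElse(modeDefault)
}
```

Callers on the initial-submission path keep calling `getGangCardinality()` unchanged, while the scale-up path passes its batch size, keeping the deploy-mode default logic in one place.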
```json
"- `dynamic`: Executors scale based on workload (recommended for interactive use)\n",
"\n",
"**Important for dynamic mode:** \n",
"- `initialExecutors` must be >= 2 for gang scheduling to work properly\n",
```
Does an error get generated if initialExecutors is < 2?
Armada only injects ARMADA_GANG_NODE_UNIFORMITY_LABEL_NAME/VALUE env vars when the gang has >= 2 members.
As a result, the scale-up does not work. Should we do some sort of config validation to enforce initialExecutors>=2 for dynamic allocation?
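A fail-fast validation along the lines asked about above could be sketched like this. This is a hedged illustration, not the project's actual validation code; `validateDynamicAllocation` and its `Map`-based config are hypothetical.

```scala
// Hypothetical sketch: reject configs where dynamic allocation is enabled
// with fewer than 2 initial executors, since Armada only injects the
// ARMADA_GANG_* env vars when the gang has >= 2 members.
def validateDynamicAllocation(conf: Map[String, String]): Either[String, Unit] = {
  val dynamic = conf.get("spark.dynamicAllocation.enabled").contains("true")
  val initial = conf
    .get("spark.dynamicAllocation.initialExecutors")
    .flatMap(_.toIntOption)
    .getOrElse(0)
  if (dynamic && initial < 2)
    Left("spark.dynamicAllocation.initialExecutors must be >= 2 for gang scheduling")
  else Right(())
}
```

Surfacing this at submit time would turn a silent scale-up failure into an explicit error message.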
Signed-off-by: Sudipto Baral <sudiptobaral.me@gmail.com>
Force-pushed 7d6ad0f to d71c8c2
Signed-off-by: Sudipto Baral <sudiptobaral.me@gmail.com>
closes G-Research/spark#152
Why: In dynamic client mode, the driver runs outside the Kubernetes cluster and lacks the `ARMADA_GANG_*` environment variables that Armada injects into pods. Without these, scale-up executors have no way to target the same cluster as the initial gang, causing them to land on arbitrary nodes. This change lets the driver learn gang attributes at runtime from the first executor and apply them as node selectors to all subsequent allocations.

Changes:
- `captureGangAttributes` in `ArmadaClusterManagerBackend` to intercept `RegisterExecutor` RPCs and extract gang node selector attributes from the first executor
- `seedGangAttributesFromEnv` for cluster mode, where the driver pod already has `ARMADA_GANG_*` env vars at startup
- Gates `ArmadaExecutorAllocator` via `isReadyToAllocateMore`, blocking until gang attributes are captured in client mode with `nodeUniformity`
- `sys.env.contains("ARMADA_JOB_SET_ID")` for runtime cluster-mode detection in `start()` to prevent double executor submission
- Sets gang cardinality to 0 for scale-up batches in `ArmadaClientApplication`, relying on node selectors instead of full gang scheduling
- `armada-entrypoint.sh` Docker wrapper that forwards `ARMADA_GANG_*` env vars as `SPARK_EXECUTOR_ATTRIBUTE_*` so executors relay them during registration
- `spark.armada.internal.gangNodeLabelName` and `spark.armada.internal.gangNodeLabelValue` to store captured attributes in `SparkConf`

Tests:
- `captureGangAttributes`, `seedGangAttributesFromEnv`, and `isReadyToAllocateMore` in `ArmadaClusterManagerBackendSuite`
- `ArmadaClientApplicationSuite`
- `dynamicCluster` and `dynamicClient` gang scheduling with scale-up assertions

Manual tests:
- `mvn test` — all 139 unit tests pass
- `mvn clean package`, `./scripts/CreateImage.sh`, `./scripts/dev-e2e.sh`