[Data] Add resource based actor autoscaler #748

Open

zzchun wants to merge 1 commit into master from ant-ray-autoscaler

Conversation

@zzchun
Collaborator

@zzchun zzchun commented Jan 16, 2026

Description

This PR implements a resource-based actor autoscaler to improve resource management in Ray Data.

  • Created a new ResourceBasedActorAutoscaler class extending DefaultActorAutoscaler
  • Implemented functionality to automatically calculate actor pool sizes based on job-level resource configurations

Features

  • Resource-Aware Scaling: The new ResourceBasedActorAutoscaler can automatically adjust actor pool sizes based on job-level resource limits
  • Intelligent Resource Distribution: Distributes resources among multiple pools based on each actor pool's resource requirements and weight
  • Multi-Resource Type Support: Supports independent management and allocation of CPU, GPU, and memory resources
  • Pluggable Strategies: Introduces pluggable actor removal strategies, allowing flexible customization of scaling behaviors
  • Weight Calculation: Dynamically derives each pool's weight from its utilization so that resources are allocated more evenly (see the sketch below)

Main Changes

  • Added resource_based_actor_autoscaler.py file containing core autoscaling logic
  • Added pluggable actor removal strategy implementation
  • Added related configuration options in DataContext
  • Added comprehensive unit tests
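
To make the "Intelligent Resource Distribution" and "Weight Calculation" points above more concrete, here is a minimal, self-contained sketch of the weight-based split: job-level CPU is divided among pools in proportion to utilization-derived weights, and GPUs are shared only among pools that actually need them. Pool, split_by_weight, and the example numbers are illustrative stand-ins, not the classes added by this PR (those live in resource_based_actor_autoscaler.py).

from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class Pool:
    name: str
    needs_gpu: bool
    utilization: float  # 0.0-1.0, drives the pool's weight


def split_by_weight(pools: List[Pool], job_cpu: float, job_gpu: float) -> Dict[str, Dict[str, float]]:
    # Weight each pool by its utilization; fall back to an even split.
    total = sum(p.utilization for p in pools)
    weights = {
        p.name: (p.utilization / total if total > 0 else 1.0 / len(pools))
        for p in pools
    }

    # GPUs are only shared among pools that actually need them.
    gpu_pools = [p for p in pools if p.needs_gpu]
    gpu_total = sum(weights[p.name] for p in gpu_pools)

    allocation = {}
    for p in pools:
        cpu = job_cpu * weights[p.name]
        gpu = 0.0
        if p.needs_gpu and gpu_pools:
            gpu_ratio = weights[p.name] / gpu_total if gpu_total > 0 else 1.0 / len(gpu_pools)
            gpu = job_gpu * gpu_ratio
        allocation[p.name] = {"cpu": cpu, "gpu": gpu}
    return allocation


if __name__ == "__main__":
    pools = [
        Pool("map", needs_gpu=False, utilization=0.8),
        Pool("infer", needs_gpu=True, utilization=0.4),
    ]
    # With a job limit of 32 CPUs and 4 GPUs, "map" gets ~21.3 CPUs and no GPUs,
    # while "infer" gets ~10.7 CPUs and all 4 GPUs.
    print(split_by_weight(pools, job_cpu=32, job_gpu=4))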

Signed-off-by: will <zzchun8@gmail.com>

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a resource-based actor autoscaler and a pluggable actor removal strategy, which are significant enhancements for managing resources in Ray Data. The implementation is well-structured and includes comprehensive tests for the new functionality.

My review has identified a couple of areas for improvement:

  • There is some duplicated code in the new ResourceBasedActorAutoscaler that could be refactored for better maintainability.
  • A constant appears to be defined twice in DataContext, which should be corrected.

Overall, this is a solid contribution. Addressing these points will further improve the code quality.

Comment on lines +226 to +228
DEFAULT_ACTOR_MAX_TASKS_IN_FLIGHT_TO_MAX_CONCURRENCY_FACTOR = env_integer(
"RAY_DATA_ACTOR_DEFAULT_MAX_TASKS_IN_FLIGHT_TO_MAX_CONCURRENCY_FACTOR", 2
)


Severity: high

This constant DEFAULT_ACTOR_MAX_TASKS_IN_FLIGHT_TO_MAX_CONCURRENCY_FACTOR appears to be a duplicate. It is already defined on line 217. Please remove this redundant definition.

Comment on lines +157 to +246
    def _calculate_pool_min_resources(
        self,
        actor_pool: "AutoscalingActorPool",
        per_actor_resources: ExecutionResources,
        weight_ratio: float,
        all_pools: list,
        pool_weights: Dict,
        pool_resource_needs: Dict,
    ) -> ExecutionResources:
        """Calculate min resources for a single pool"""
        pool_min_cpu = (
            self._job_min_resources.cpu * weight_ratio
            if per_actor_resources.cpu > 0
            else 0
        )

        # GPU allocation: Only allocate to pools that need GPUs
        if per_actor_resources.gpu > 0:
            gpu_pools = [p for p in all_pools if pool_resource_needs[p].gpu > 0]
            if gpu_pools:
                gpu_total_weight = sum(pool_weights[p] for p in gpu_pools)
                gpu_weight_ratio = (
                    pool_weights[actor_pool] / gpu_total_weight
                    if gpu_total_weight > 0
                    else 1.0 / len(gpu_pools)
                )
                pool_min_gpu = self._job_min_resources.gpu * gpu_weight_ratio
            else:
                pool_min_gpu = 0
        else:
            pool_min_gpu = 0

        # Memory allocation
        pool_min_memory = (
            self._job_min_resources.memory * weight_ratio
            if per_actor_resources.memory > 0
            else 0
        )

        return ExecutionResources(
            cpu=pool_min_cpu,
            gpu=pool_min_gpu,
            memory=pool_min_memory,
        )

    def _calculate_pool_max_resources(
        self,
        actor_pool: "AutoscalingActorPool",
        per_actor_resources: ExecutionResources,
        weight_ratio: float,
        all_pools: list,
        pool_weights: Dict,
        pool_resource_needs: Dict,
    ) -> ExecutionResources:
        """Calculate max resources for a single pool"""
        # CPU allocation
        pool_max_cpu = (
            self._job_max_resources.cpu * weight_ratio
            if per_actor_resources.cpu > 0
            else 0
        )

        # GPU allocation: Only allocate to pools that need GPUs
        if per_actor_resources.gpu > 0:
            gpu_pools = [p for p in all_pools if pool_resource_needs[p].gpu > 0]
            if gpu_pools:
                gpu_total_weight = sum(pool_weights[p] for p in gpu_pools)
                gpu_weight_ratio = (
                    pool_weights[actor_pool] / gpu_total_weight
                    if gpu_total_weight > 0
                    else 1.0 / len(gpu_pools)
                )
                pool_max_gpu = self._job_max_resources.gpu * gpu_weight_ratio
            else:
                pool_max_gpu = 0
        else:
            pool_max_gpu = 0

        # Memory allocation
        pool_max_memory = (
            self._job_max_resources.memory * weight_ratio
            if per_actor_resources.memory > 0
            else 0
        )

        return ExecutionResources(
            cpu=pool_max_cpu,
            gpu=pool_max_gpu,
            memory=pool_max_memory,
        )


Severity: medium

The methods _calculate_pool_min_resources and _calculate_pool_max_resources contain a lot of duplicated logic. To improve maintainability and reduce redundancy, you can extract the common logic into a single helper method that takes the job-level resources (min or max) as an argument.

Here's a suggested refactoring:

    def _calculate_pool_resources(
        self,
        job_resources: ExecutionResources,
        actor_pool: "AutoscalingActorPool",
        per_actor_resources: ExecutionResources,
        weight_ratio: float,
        all_pools: list,
        pool_weights: Dict,
        pool_resource_needs: Dict,
    ) -> ExecutionResources:
        """Calculate resources for a single pool based on job-level resources."""
        pool_cpu = (
            job_resources.cpu * weight_ratio
            if per_actor_resources.cpu > 0
            else 0
        )

        # GPU allocation: Only allocate to pools that need GPUs
        if per_actor_resources.gpu > 0:
            gpu_pools = [p for p in all_pools if pool_resource_needs[p].gpu > 0]
            if gpu_pools:
                gpu_total_weight = sum(pool_weights[p] for p in gpu_pools)
                gpu_weight_ratio = (
                    pool_weights[actor_pool] / gpu_total_weight
                    if gpu_total_weight > 0
                    else 1.0 / len(gpu_pools)
                )
                pool_gpu = job_resources.gpu * gpu_weight_ratio
            else:
                pool_gpu = 0
        else:
            pool_gpu = 0

        # Memory allocation
        pool_memory = (
            job_resources.memory * weight_ratio
            if per_actor_resources.memory > 0
            else 0
        )

        return ExecutionResources(
            cpu=pool_cpu,
            gpu=pool_gpu,
            memory=pool_memory,
        )

    def _calculate_pool_min_resources(
        self,
        actor_pool: "AutoscalingActorPool",
        per_actor_resources: ExecutionResources,
        weight_ratio: float,
        all_pools: list,
        pool_weights: Dict,
        pool_resource_needs: Dict,
    ) -> ExecutionResources:
        """Calculate min resources for a single pool"""
        return self._calculate_pool_resources(
            self._job_min_resources,
            actor_pool,
            per_actor_resources,
            weight_ratio,
            all_pools,
            pool_weights,
            pool_resource_needs,
        )

    def _calculate_pool_max_resources(
        self,
        actor_pool: "AutoscalingActorPool",
        per_actor_resources: ExecutionResources,
        weight_ratio: float,
        all_pools: list,
        pool_weights: Dict,
        pool_resource_needs: Dict,
    ) -> ExecutionResources:
        """Calculate max resources for a single pool"""
        return self._calculate_pool_resources(
            self._job_max_resources,
            actor_pool,
            per_actor_resources,
            weight_ratio,
            all_pools,
            pool_weights,
            pool_resource_needs,
        )

@ffbin ffbin self-requested a review January 16, 2026 11:31
logger = logging.getLogger(__name__)

# Default maximum pool size
DEFAULT_MAX_POOL_SIZE = 100
Collaborator


Why is 100?


logger = logging.getLogger(__name__)

# Default maximum pool size
Collaborator


# Default maximum pool size.

        config: AutoscalingConfig,
    ):
        super().__init__(topology, resource_manager, config=config)
        # job-level resource limits
Collaborator


# Job-level resource limits.

"""

if min_resources is not None and max_resources is not None:
# Check CPU
Collaborator


ditto (same comment-style nit as above)

@github-actions

This pull request has been automatically marked as stale because it has not had
any activity for 14 days. It will be closed in another 14 days if no further activity occurs.
Thank you for your contributions.

You can always ask for help on our discussion forum or Ray's public slack channel.

If you'd like to keep this open, just leave any comment, and the stale label will be removed.

@github-actions github-actions bot added the stale label Jan 30, 2026