Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 32 additions & 6 deletions workflows/pipe-common/pipeline/hpc/autoscaler.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,10 @@ def scale_up(self, resource_demands, max_batch_size):
run_id_queue = Queue()
for instance_demand in instance_demands:
thread = threading.Thread(target=self.scale_up_handler.scale_up,
args=(instance_demand.instance, instance_demand.owner, run_id_queue))
args=(instance_demand.instance,
instance_demand.owner,
run_id_queue,
instance_demand.demand))
thread.setDaemon(True)
thread.start()
threads.append(thread)
Expand Down Expand Up @@ -119,7 +122,7 @@ def _update_last_activity_for_currently_running_jobs(self):

class DoNothingScaleUpHandler:
    """No-op scale up handler.

    Substituted for the real handler when the autoscaler runs in dry-run
    mode: the scale up call is accepted but no worker is ever launched and
    nothing is put on the run id queue.
    """

    def scale_up(self, instance, owner, run_id_queue, demand=None):
        """Intentionally does nothing and returns None."""


Expand All @@ -133,7 +136,8 @@ class GridEngineScaleUpHandler:
def __init__(self, cmd_executor, api, grid_engine, launch_adapter, host_storage, parent_run_id, instance_disk,
instance_image, cmd_template, price_type, region_id, queue, hostlist, owner_param_name,
polling_timeout=_POLL_TIMEOUT, polling_delay=_POLL_DELAY,
ge_polling_timeout=_GE_POLL_TIMEOUT, instance_launch_params=None, clock=Clock()):
ge_polling_timeout=_GE_POLL_TIMEOUT, instance_launch_params=None, clock=Clock(),
transfer_requests_to_pipe=False):
"""
Grid engine scale up handler.

Expand Down Expand Up @@ -177,8 +181,9 @@ def __init__(self, cmd_executor, api, grid_engine, launch_adapter, host_storage,
self.ge_polling_timeout = ge_polling_timeout
self.instance_launch_params = instance_launch_params or {}
self.clock = clock
self.transfer_requests_to_pipe = transfer_requests_to_pipe

def scale_up(self, instance, owner, run_id_queue):
def scale_up(self, instance, owner, run_id_queue, demand=None):
"""
Scales up an additional worker.

Expand All @@ -191,7 +196,7 @@ def scale_up(self, instance, owner, run_id_queue):
"""
try:
Logger.info('Scaling up additional worker (%s)...' % instance.name)
run_id = self._launch_additional_worker(instance.name, owner)
run_id = self._launch_additional_worker(instance.name, owner, demand)
run_id_queue.put(run_id)
host = self._retrieve_pod_name(run_id)
self.host_storage.add_host(host)
Expand All @@ -210,11 +215,12 @@ def scale_up(self, instance, owner, run_id_queue):
# On the other hand, some jobs may finish between our checks so the program may stuck until host is filled
# with some task.

def _launch_additional_worker(self, instance, owner):
def _launch_additional_worker(self, instance, owner, demand=None):
Logger.info('Launching additional worker (%s)...' % instance)
instance_dynamic_launch_params = {
self.owner_param_name: owner
}
self._update_capacity_block_params_if_required(instance_dynamic_launch_params, demand)
# todo: Use api client here
pipe_run_command = 'pipe run --yes --quiet ' \
'--instance-disk %s ' \
Expand All @@ -237,6 +243,26 @@ def _launch_additional_worker(self, instance, owner):
Logger.info('Additional worker #%s (%s) has been launched.' % (run_id, instance))
return run_id

def _update_capacity_block_params_if_required(self, params, demand=None):
if not self.transfer_requests_to_pipe:
return
if not demand:
return
try:
if demand.cpu and int(demand.cpu) > 0:
params.update({'CP_CAP_REQUESTS_CPU': demand.cpu})
if demand.gpu and int(demand.gpu) > 0:
params.update({'CP_CAP_REQUESTS_GPU': demand.gpu})
if demand.mem and int(demand.mem) > 0:
params.update({'CP_CAP_REQUESTS_RAM': self._gb_to_bytes(demand.mem)})
except Exception as e:
Logger.warn('Failed to pass requests to pipe: %s' % str(e))
Logger.warn(traceback.format_exc())


def _gb_to_bytes(self, value):
return value * (2 ** 30)

def _parameters_str(self, instance_launch_params):
return ' '.join("{} '{}'".format(key, value) for key, value in instance_launch_params.items())

Expand Down
13 changes: 9 additions & 4 deletions workflows/pipe-common/pipeline/hpc/instance/select.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,23 @@ class InstanceSelectionError(RuntimeError):

class InstanceDemand:
    """Execution instance demand: an instance to be launched for an owner.

    The optional demand keeps the originating resource demand so it can be
    forwarded at launch time; it is deliberately excluded from equality
    and repr so comparisons depend only on the instance and its owner.
    """

    def __init__(self, instance, owner, demand=None):
        self.instance = instance
        self.owner = owner
        self.demand = demand

    @property
    def _dict_(self):
        # Comparable view of the state: everything except the demand payload.
        return {k: v for k, v in self.__dict__.items() if k != 'demand'}

    def __eq__(self, other):
        # Defer to the other operand for unrelated types instead of raising
        # AttributeError on a missing _dict_ attribute.
        if not isinstance(other, InstanceDemand):
            return NotImplemented
        return self._dict_ == other._dict_

    def __repr__(self):
        return str(self._dict_)


class GridEngineInstanceSelector:
Expand Down Expand Up @@ -92,7 +97,7 @@ def select(self, demands):
best_demand.mem, best_supply.mem,
best_demand.exc, best_supply.exc,
best_supply.owner))
yield InstanceDemand(best_instance, best_supply.owner)
yield InstanceDemand(best_instance, best_supply.owner, demand=best_demand)

def _apply(self, demands, supply):
remaining_supply = supply
Expand Down
3 changes: 3 additions & 0 deletions workflows/pipe-common/pipeline/hpc/param.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,9 @@ def __init__(self):
self.kube_grid_engine = GridEngineParameter(
name='CP_CAP_KUBE', type=PARAM_BOOL, default=False,
help='Enables Kubernetes grid engine.')
self.transfer_requests_to_pipe = GridEngineParameter(
name='CP_CAP_AUTOSCALE_MAP_REQUESTS_TO_PIPE', type=PARAM_BOOL, default=False,
help='Transfers consumable resources to pipe command parameters.')


class GridEngineParameters(GridEngineParametersGroup):
Expand Down
6 changes: 5 additions & 1 deletion workflows/pipe-common/scripts/autoscale_grid_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ def get_daemon():
queue_gpu_resource_name = params.queue.gpu_resource_name.get()
queue_mem_resource_name = params.queue.mem_resource_name.get()
queue_exc_resource_name = params.queue.exc_resource_name.get()
transfer_requests_to_pipe = params.queue.transfer_requests_to_pipe.get()

host_storage_file = os.path.join(cluster_work_dir, '.autoscaler.%s.storage' % queue_name)
host_storage_static_file = os.path.join(cluster_work_dir, '.autoscaler.%s.static.storage' % queue_name)
Expand Down Expand Up @@ -542,6 +543,8 @@ def get_daemon():
instance_supply.exc)
for instance, instance_supply
in zip(instances, instance_supplies))))
if transfer_requests_to_pipe:
Logger.info('Enabled transferring requests to pipe.')

instance_launch_params = fetch_instance_launch_params(api, launch_adapter, cluster_master_run_id,
instance_inheritable_params,
Expand All @@ -564,7 +567,8 @@ def get_daemon():
polling_delay=scale_up_polling_delay,
polling_timeout=scale_up_polling_timeout,
instance_launch_params=instance_launch_params,
clock=clock)
clock=clock,
transfer_requests_to_pipe=transfer_requests_to_pipe)
if dry_run:
scale_up_handler = DoNothingScaleUpHandler()
scale_up_orchestrator = GridEngineScaleUpOrchestrator(scale_up_handler=scale_up_handler,
Expand Down