From f856b96b31a76ccefaad18fd9c8b430e01eaeec1 Mon Sep 17 00:00:00 2001 From: Muyu Li Date: Thu, 18 Nov 2021 15:40:43 +0100 Subject: [PATCH 01/19] Add check threshold mechanism in easl logic Currently something going wrong with the produce_data value --- tensorflow/core/data/service/BUILD | 16 ++++ .../core/data/service/dispatcher_impl.cc | 7 ++ .../data/service/easl/local_decision_utils.cc | 75 +++++++++++++++++++ .../data/service/easl/local_decision_utils.h | 34 +++++++++ tensorflow/core/protobuf/service_config.proto | 2 + .../data/experimental/service/server_lib.py | 12 ++- 6 files changed, 142 insertions(+), 4 deletions(-) create mode 100644 tensorflow/core/data/service/easl/local_decision_utils.cc create mode 100644 tensorflow/core/data/service/easl/local_decision_utils.h diff --git a/tensorflow/core/data/service/BUILD b/tensorflow/core/data/service/BUILD index baad904a1097f1..c52b4485509bd2 100644 --- a/tensorflow/core/data/service/BUILD +++ b/tensorflow/core/data/service/BUILD @@ -295,6 +295,7 @@ cc_library( ], deps = [ ":cache_utils", + ":local_decision_utils", ":common", ":common_proto_cc", ":credentials_factory", @@ -938,6 +939,21 @@ cc_library( ], ) +cc_library( + name = "local_decision_utils", + srcs = ["easl/local_decision_utils.cc"], + hdrs = [ + "easl/local_decision_utils.h", + ], + deps = [ + ":common_proto_cc", + ":dispatcher_state", + ":metadata_store", + "//tensorflow/core:lib", + "@com_google_absl//absl/strings", + ], +) + cc_library( name = "cache_model", srcs = ["easl/cache_model.cc"], diff --git a/tensorflow/core/data/service/dispatcher_impl.cc b/tensorflow/core/data/service/dispatcher_impl.cc index 19938a8a5f8375..94af527c865e5d 100644 --- a/tensorflow/core/data/service/dispatcher_impl.cc +++ b/tensorflow/core/data/service/dispatcher_impl.cc @@ -44,6 +44,7 @@ limitations under the License. #include "tensorflow/core/data/service/worker.grpc.pb.h" #include "tensorflow/core/data/service/easl/cache_utils.h" #include "tensorflow/core/data/service/easl/metadata_store.h" +#include "tensorflow/core/data/service/easl/local_decision_utils.h" #include "tensorflow/core/data/standalone.h" #include "tensorflow/core/framework/dataset.h" #include "tensorflow/core/framework/graph.pb.h" @@ -900,6 +901,12 @@ Status DataServiceDispatcherImpl::CreateJob( VLOG(0) << "EASL - Scalability decision for dataset_key " << compute_dataset_key << ": " << worker_count; + bool if_use_local_workers = false; + TF_RETURN_IF_ERROR(service::easl::local_decision::DecideIfLocal( + config_, metadata_store_, compute_dataset_key, if_use_local_workers + )); + VLOG(0) << "EASL-MUYU (CreateJob) - Check Local Worker Policy: " << if_use_local_workers; + // EASL add job entry to metadata store std::string dataset_key = service::easl::cache_utils::DatasetKey( dataset->dataset_id, dataset->fingerprint, job_type); diff --git a/tensorflow/core/data/service/easl/local_decision_utils.cc b/tensorflow/core/data/service/easl/local_decision_utils.cc new file mode 100644 index 00000000000000..4bd77d5354f87e --- /dev/null +++ b/tensorflow/core/data/service/easl/local_decision_utils.cc @@ -0,0 +1,75 @@ +// +// Created by Muyu Li on 16.11.21. +// + +#include "local_decision_utils.h" + +namespace tensorflow { +namespace data { +namespace service { +namespace easl { +namespace local_decision { + +Status DecideIfLocal( + const experimental::DispatcherConfig& dispatcher_config, + const ::tensorflow::data::easl::MetadataStore& metadata_store, + const std::string& dataset_key, + bool& if_local) { + using NodeMetrics = ::tensorflow::data::easl::NodeMetrics; + using ModelMetrics = ::tensorflow::data::easl::ModelMetrics; + + // Check if we have any metrics for this dataset + std::shared_ptr job_metrics; + Status s = metadata_store.GetInputPipelineMetricsByDatasetKey( + dataset_key, job_metrics); + + // We do not yet have the metrics for this dataset --> use 1 worker + if(errors::IsNotFound(s)) { + VLOG(0) << "(DecideIfLocal) No metrics found for dataset, will use normal mode"; + if_local = false; + return Status::OK(); + } else if (!s.ok()) { + VLOG(0) << "(DecideIfLocal) Another error has been thrown: " << s; + return s; + } + + // Pipeline stats: last TF node metrics + std::shared_ptr last_tf_node_metrics; + s = metadata_store.GetLastTFNodeMetricsByDatasetKey( + dataset_key, last_tf_node_metrics); + if (!s.ok()) { + VLOG(0) << "(DecideIfLocal) Failed to get the last TF node metrics"; + return s; + } + + int64_t total_bytes_produced = 0, total_num_elements = 0; + for (std::pair> e : + last_tf_node_metrics->metrics_) { + std::shared_ptr node_metrics = e.second; + total_bytes_produced += node_metrics->bytes_produced(); + total_num_elements += node_metrics->num_elements(); + } + + double avg_bytes_per_element = (double)total_bytes_produced / total_num_elements; + VLOG(0) << "(DecideIfLocal) Total bytes produced: " << total_bytes_produced << "\n" + << "Total num elements: " << total_num_elements << "\n" + << "Avg bytes produced per element: " << avg_bytes_per_element << "\n" + << "Decision Threshold: " << dispatcher_config.avg_bytes_per_element_local_thres() << "\n"; + + if (avg_bytes_per_element > dispatcher_config.avg_bytes_per_element_local_thres()) { + if_local = true; + VLOG(0) << "(DecideIfLocal-decision) using local workers\n"; + } + else { + if_local = false; + VLOG(0) << "(DecideIfLocal-decision) using default worker set "; + } + + return Status::OK(); +} + +} // namespace local_decision +} // namespace easl +} // namespace service +} // namespace data +} // namespace tensorflow \ No newline at end of file diff --git a/tensorflow/core/data/service/easl/local_decision_utils.h b/tensorflow/core/data/service/easl/local_decision_utils.h new file mode 100644 index 00000000000000..e40d44b19cecfd --- /dev/null +++ b/tensorflow/core/data/service/easl/local_decision_utils.h @@ -0,0 +1,34 @@ +// +// Created by Muyu Li on 16.11.21. +// + +#ifndef ML_INPUT_DATA_SERVICE_LOCAL_DECISION_UTILS_H +#define ML_INPUT_DATA_SERVICE_LOCAL_DECISION_UTILS_H + +#include +#include "tensorflow/core/platform/default/integral_types.h" +#include "tensorflow/core/data/service/easl/metadata_store.h" +#include "tensorflow/core/lib/core/status.h" +#include "tensorflow/core/data/service/common.pb.h" +#include "tensorflow/core/data/service/dispatcher_state.h" +#include "tensorflow/core/protobuf/service_config.pb.h" + +namespace tensorflow { +namespace data { +namespace service { +namespace easl { +namespace local_decision { + +Status DecideIfLocal( + const experimental::DispatcherConfig& dispatcher_config, + const ::tensorflow::data::easl::MetadataStore& metadata_store, + const std::string& dataset_key, + bool& if_local); + +} // namespace local_decision +} // namespace easl +} // namespace service +} // namespace data +} // namespace tensorflow + +#endif //ML_INPUT_DATA_SERVICE_LOCAL_DECISION_UTILS_H diff --git a/tensorflow/core/protobuf/service_config.proto b/tensorflow/core/protobuf/service_config.proto index a3deccf495fbd2..65fb41e901b1e4 100644 --- a/tensorflow/core/protobuf/service_config.proto +++ b/tensorflow/core/protobuf/service_config.proto @@ -59,6 +59,8 @@ message DispatcherConfig { // The interval at which the dispatcher should dump log files. int64 log_dumps_interval_ms = 14; + // MUYU's modification + int64 avg_bytes_per_element_local_thres = 17; } // Configuration for a tf.data service WorkerServer. diff --git a/tensorflow/python/data/experimental/service/server_lib.py b/tensorflow/python/data/experimental/service/server_lib.py index 7ba729b98c217e..f14cf38eab3629 100644 --- a/tensorflow/python/data/experimental/service/server_lib.py +++ b/tensorflow/python/data/experimental/service/server_lib.py @@ -46,7 +46,7 @@ class DispatcherConfig( "port", "protocol", "work_dir", "fault_tolerant_mode", "worker_addresses", "job_gc_check_interval_ms", "job_gc_timeout_ms", "cache_policy", "cache_format", "cache_compression", "cache_ops_parallelism", "cache_path", - "scaling_policy", "log_dir", "log_dumps_interval_ms" + "scaling_policy", "log_dir", "log_dumps_interval_ms", "avg_bytes_per_element_local_thres" ])): """Configuration class for tf.data service dispatchers. @@ -109,7 +109,9 @@ def __new__(cls, cache_path="./outputs", scaling_policy=1, log_dir="", - log_dumps_interval_ms=None): + log_dumps_interval_ms=None, + avg_bytes_per_element_local_thres=1000 + ): if protocol is None: protocol = _pywrap_utils.TF_DATA_DefaultProtocol() job_gc_check_interval_ms = _get_time_or_placeholder( @@ -124,7 +126,7 @@ def __new__(cls, fault_tolerant_mode, worker_addresses, job_gc_check_interval_ms, job_gc_timeout_ms, cache_policy, cache_format, cache_compression, cache_ops_parallelism, cache_path, scaling_policy, - log_dir, log_dumps_interval_ms) + log_dir, log_dumps_interval_ms, avg_bytes_per_element_local_thres) @tf_export("data.experimental.service.DispatchServer", v1=[]) @@ -202,7 +204,9 @@ def __init__(self, config=None, start=True): cache_path=config.cache_path, scaling_policy=config.scaling_policy, log_dir=config.log_dir, - log_dumps_interval_ms=config.log_dumps_interval_ms) + log_dumps_interval_ms=config.log_dumps_interval_ms, + avg_bytes_per_element_local_thres=config.avg_bytes_per_element_local_thres, + ) self._server = _pywrap_server_lib.TF_DATA_NewDispatchServer( config_proto.SerializeToString()) if start: From f9425e9c584cda92c8954acc099ed189d5228fc2 Mon Sep 17 00:00:00 2001 From: Muyu Li Date: Thu, 2 Dec 2021 07:38:44 +0100 Subject: [PATCH 02/19] Control local selection from client side --- tensorflow/core/data/service/dispatcher.proto | 2 + .../core/data/service/dispatcher_impl.cc | 10 ++++ .../core/data/service/dispatcher_state.cc | 3 +- .../core/data/service/dispatcher_state.h | 8 +++- tensorflow/core/data/service/journal.proto | 1 + .../experimental/data_service_dataset_op.cc | 48 +++++++++++++++++-- 6 files changed, 64 insertions(+), 8 deletions(-) diff --git a/tensorflow/core/data/service/dispatcher.proto b/tensorflow/core/data/service/dispatcher.proto index 51018181ac1829..46ab8e0638a325 100644 --- a/tensorflow/core/data/service/dispatcher.proto +++ b/tensorflow/core/data/service/dispatcher.proto @@ -196,6 +196,8 @@ message ClientHeartbeatResponse { } // Whether the job has finished. bool job_finished = 2; + // EASL: to check whether we should use local workers (based on last epoch's metrics) + bool if_use_local_workers = 4; } // Next tag: 3 diff --git a/tensorflow/core/data/service/dispatcher_impl.cc b/tensorflow/core/data/service/dispatcher_impl.cc index 94af527c865e5d..62604fde09abe6 100644 --- a/tensorflow/core/data/service/dispatcher_impl.cc +++ b/tensorflow/core/data/service/dispatcher_impl.cc @@ -928,6 +928,9 @@ Status DataServiceDispatcherImpl::CreateJob( create_job->set_job_type(job_type); create_job->set_worker_count(worker_count); create_job->set_num_split_providers(num_split_providers); + + create_job->set_if_use_local_workers(if_use_local_workers); + if (request.has_job_key()) { NamedJobKeyDef* key = create_job->mutable_named_job_key(); key->set_name(request.job_key().job_name()); @@ -940,6 +943,10 @@ Status DataServiceDispatcherImpl::CreateJob( create_job->set_target_workers(request.target_workers()); TF_RETURN_IF_ERROR(Apply(update)); TF_RETURN_IF_ERROR(state_.JobFromId(job_id, job)); + + VLOG(1) << "EASL-MUYU(DataServiceDispatcherImpl::CreateJob) if_use_local_workers flag is set: " + << job->if_use_local_workers; + return Status::OK(); } @@ -1224,6 +1231,9 @@ Status DataServiceDispatcherImpl::ClientHeartbeat( task_info->set_starting_round(task->starting_round); } response->set_job_finished(job->finished); + + response->set_if_use_local_workers(job->if_use_local_workers); + VLOG(4) << "Found " << response->task_info_size() << " tasks for job client id " << request->job_client_id(); return Status::OK(); diff --git a/tensorflow/core/data/service/dispatcher_state.cc b/tensorflow/core/data/service/dispatcher_state.cc index d718f67e9b59dc..f2c4bbbf8abaf3 100644 --- a/tensorflow/core/data/service/dispatcher_state.cc +++ b/tensorflow/core/data/service/dispatcher_state.cc @@ -133,7 +133,8 @@ void DispatcherState::CreateJob(const CreateJobUpdate& create_job) { create_job.processing_mode_def(), create_job.num_split_providers(), named_job_key, num_consumers, create_job.target_workers(), - create_job.job_type(), create_job.worker_count()); + create_job.job_type(), create_job.worker_count(), + create_job.if_use_local_workers()); DCHECK(!jobs_.contains(job_id)); jobs_[job_id] = job; diff --git a/tensorflow/core/data/service/dispatcher_state.h b/tensorflow/core/data/service/dispatcher_state.h index 6f1427872a1910..b74fe49a22bea8 100644 --- a/tensorflow/core/data/service/dispatcher_state.h +++ b/tensorflow/core/data/service/dispatcher_state.h @@ -150,7 +150,8 @@ class DispatcherState { absl::optional num_consumers, TargetWorkers target_workers, const std::string& job_type, - int64 worker_count) + int64 worker_count, + bool if_use_local_workers = false) : job_id(job_id), dataset_id(dataset_id), processing_mode(processing_mode), @@ -158,7 +159,8 @@ class DispatcherState { num_consumers(num_consumers), job_type(job_type), worker_count(worker_count), - target_workers(target_workers) { + target_workers(target_workers), + if_use_local_workers(if_use_local_workers){ if (IsDynamicShard(processing_mode)) { distributed_epoch_state = DistributedEpochState(num_split_providers); } @@ -190,6 +192,8 @@ class DispatcherState { // EASL const std::string job_type; const int64 worker_count; + // EASL: indicate whether this job should be processed locally + const bool if_use_local_workers; }; struct Task { diff --git a/tensorflow/core/data/service/journal.proto b/tensorflow/core/data/service/journal.proto index 47f1c07c07ea45..f80bb89901e815 100644 --- a/tensorflow/core/data/service/journal.proto +++ b/tensorflow/core/data/service/journal.proto @@ -66,6 +66,7 @@ message CreateJobUpdate { // EASL string job_type = 13; // i.e read, write, cache int64 worker_count = 14; // determined by elasticity policy + bool if_use_local_workers = 15; // decided between epochs } // Next tag: 5 diff --git a/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc b/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc index 75a0edf59be3ea..31ac623c7a1bcf 100644 --- a/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc +++ b/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc @@ -796,7 +796,7 @@ class DataServiceDatasetOp::Dataset : public DatasetBase { if (it == task_id_to_task.end()) { continue; } - if (!ShouldReadFromTask(task)) { + if (!ShouldReadFromTask(task, resp.if_use_local_workers())) { VLOG(3) << "Skipping untargeted worker task " << task.task_id(); should_finish_job_ = false; continue; @@ -810,7 +810,7 @@ class DataServiceDatasetOp::Dataset : public DatasetBase { } } - bool ShouldReadFromTask(const TaskInfo& task) const + bool ShouldReadFromTask(const TaskInfo& task, const bool if_use_local_workers) const TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) { if (StrictRoundRobin()) { return true; @@ -818,9 +818,47 @@ class DataServiceDatasetOp::Dataset : public DatasetBase { const bool is_local_task = (LocalWorkers::Get(task.worker_address()) != nullptr); - if (dataset()->target_workers_ == TARGET_WORKERS_LOCAL && - !is_local_task) { - return false; + + if (if_use_local_workers) { + VLOG(1) << "EASL-MUYU: IF_USE_LOCAL_WORKERS is true"; + if (is_local_task) { + VLOG(1) << "EASL-MUYU(IF_USE_LOCAL_WORKERS): the worker address is: (" + << task.worker_address() << + "), which is local. CHOOSE IT!!"; + } + else { + VLOG(1) << "EASL-MUYU(IF_USE_LOCAL_WORKERS): the worker address is: (" + << task.worker_address() << + "), which is NOT local. DON'T CHOOSE IT!!"; + return false; + } + } + else { + VLOG(1) << "EASL-MUYU: IF_USE_LOCAL_WORKERS is false, do nothing"; + } + +// if (dataset()->target_workers_ == TARGET_WORKERS_LOCAL && +// !is_local_task) { +// VLOG(1) << "EASL-MUYU: TARGET_WORKERS_LOCAL is set. " +// return false; +// } + + if (dataset()->target_workers_ == TARGET_WORKERS_LOCAL) { + VLOG(1) << "EASL-MUYU: TARGET_WORKERS_TAG is set"; + if (is_local_task) { + VLOG(1) << "EASL-MUYU(TARGET_WORKERS_TAG): the worker address is: (" + << task.worker_address() << + "), which is local. CHOOSE IT!!"; + } + else { + VLOG(1) << "EASL-MUYU(TARGET_WORKERS_TAG): the worker address is: (" + << task.worker_address() << + "), which is NOT local. DON'T CHOOSE IT!!"; + return false; + } + } + else { + VLOG(1) << "EASL-MUYU: TARGET_WORKERS_TAG is not set, do nothing"; } // Cross-TF/TPU host reads may cause resource contention on the TF/TPU From 662adab5bce47cf9b6c12153f71349054248d529 Mon Sep 17 00:00:00 2001 From: Muyu Li Date: Thu, 2 Dec 2021 15:37:42 +0100 Subject: [PATCH 03/19] Move local worker selection mechanism to dispatcher --- tensorflow/core/data/service/dispatcher.proto | 5 +++- .../core/data/service/dispatcher_client.cc | 12 +++++++- .../core/data/service/dispatcher_client.h | 3 +- .../core/data/service/dispatcher_impl.cc | 18 +++++++++++- .../core/data/service/dispatcher_state.cc | 28 ++++++++++++++++++- .../core/data/service/dispatcher_state.h | 15 ++++++++-- .../data/service/easl/local_decision_utils.cc | 8 ++++-- tensorflow/core/data/service/journal.proto | 3 +- tensorflow/core/data/service/worker_impl.cc | 13 +++++++++ tensorflow/core/data/service/worker_impl.h | 3 ++ .../experimental/data_service_dataset_op.cc | 4 ++- 11 files changed, 100 insertions(+), 12 deletions(-) diff --git a/tensorflow/core/data/service/dispatcher.proto b/tensorflow/core/data/service/dispatcher.proto index 46ab8e0638a325..532df381e8142d 100644 --- a/tensorflow/core/data/service/dispatcher.proto +++ b/tensorflow/core/data/service/dispatcher.proto @@ -119,7 +119,7 @@ message JobKey { int64 job_name_index = 2; } -// Next tag: 10 +// Next tag: 11 message GetOrCreateJobRequest { reserved 2, 3, 4; // The id of the dataset to create a job for. @@ -136,6 +136,9 @@ message GetOrCreateJobRequest { } // Specifies which workers the client of this job reads from. TargetWorkers target_workers = 9; + + // MUYU's changes + repeated string local_workers = 10; } // Next tag: 2 diff --git a/tensorflow/core/data/service/dispatcher_client.cc b/tensorflow/core/data/service/dispatcher_client.cc index 328769b488d6b4..7492d66d8ab295 100644 --- a/tensorflow/core/data/service/dispatcher_client.cc +++ b/tensorflow/core/data/service/dispatcher_client.cc @@ -137,7 +137,9 @@ Status DataServiceDispatcherClient::GetOrCreateJob( int64_t dataset_id, const ProcessingModeDef& processing_mode, const absl::optional& job_key, absl::optional num_consumers, TargetWorkers target_workers, - int64_t& job_client_id) { + int64_t& job_client_id, + std::vector local_workers + ) { TF_RETURN_IF_ERROR(EnsureInitialized()); GetOrCreateJobRequest req; req.set_dataset_id(dataset_id); @@ -149,6 +151,13 @@ Status DataServiceDispatcherClient::GetOrCreateJob( req.set_num_consumers(num_consumers.value()); } req.set_target_workers(target_workers); + + *req.mutable_local_workers() = {local_workers.begin(), local_workers.end()}; + + for (auto worker: local_workers) { + VLOG(1) << "EASL-MUYU (Client: GetOrCreateJob) local_workers: " << worker; + } + GetOrCreateJobResponse resp; grpc::ClientContext client_ctx; grpc::Status status = stub_->GetOrCreateJob(&client_ctx, req, &resp); @@ -159,6 +168,7 @@ Status DataServiceDispatcherClient::GetOrCreateJob( status); } job_client_id = resp.job_client_id(); + return Status::OK(); } diff --git a/tensorflow/core/data/service/dispatcher_client.h b/tensorflow/core/data/service/dispatcher_client.h index 847e61a5bc696e..ff53d159bbed9e 100644 --- a/tensorflow/core/data/service/dispatcher_client.h +++ b/tensorflow/core/data/service/dispatcher_client.h @@ -77,7 +77,8 @@ class DataServiceDispatcherClient : public DataServiceClientBase { const ProcessingModeDef& processing_mode, const absl::optional& job_key, absl::optional num_consumers, - TargetWorkers target_workers, int64_t& job_client_id); + TargetWorkers target_workers, int64_t& job_client_id, + std::vector local_workers); // Releases a job client id, indicating that the id will no longer be used to // read from the job. diff --git a/tensorflow/core/data/service/dispatcher_impl.cc b/tensorflow/core/data/service/dispatcher_impl.cc index 62604fde09abe6..cf27a774b8507b 100644 --- a/tensorflow/core/data/service/dispatcher_impl.cc +++ b/tensorflow/core/data/service/dispatcher_impl.cc @@ -894,6 +894,12 @@ Status DataServiceDispatcherImpl::CreateJob( VLOG(0) << "EASL - Caching decision for dataset_key " << compute_dataset_key << ": " << job_type; + + // MUYU, firstly check the local_workers from the client + absl::flat_hash_set local_workers; + local_workers.insert(request.local_workers().cbegin(), + request.local_workers().cend()); + // Infer the worker count for this job and job type int64 total_workers = state_.ListWorkers().size(); TF_RETURN_IF_ERROR(service::easl::cache_utils::DetermineElasticity(job_type, @@ -930,6 +936,12 @@ Status DataServiceDispatcherImpl::CreateJob( create_job->set_num_split_providers(num_split_providers); create_job->set_if_use_local_workers(if_use_local_workers); + *create_job->mutable_local_workers() = + {local_workers.begin(), local_workers.end()}; + + for (auto worker: local_workers) { + VLOG(1) << "EASL-MUYU (CreateJob) local_workers: " << worker; + } if (request.has_job_key()) { NamedJobKeyDef* key = create_job->mutable_named_job_key(); @@ -947,6 +959,10 @@ Status DataServiceDispatcherImpl::CreateJob( VLOG(1) << "EASL-MUYU(DataServiceDispatcherImpl::CreateJob) if_use_local_workers flag is set: " << job->if_use_local_workers; + for (auto worker: job->local_workers) { + VLOG(1) << "EASL-MUYU (CreateJob-after) local_workers: " << worker; + } + return Status::OK(); } @@ -997,7 +1013,7 @@ Status DataServiceDispatcherImpl::CreateTasksForJob( std::vector>& tasks) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) { std::vector> workers = state_.ReserveWorkers( - job->job_id, job->worker_count); + job->job_id, job->worker_count, job->if_use_local_workers, job->local_workers); if (workers.size() < job->worker_count){ VLOG(0) << "EASL - Not enough workers for job. Elasticity policy requires " diff --git a/tensorflow/core/data/service/dispatcher_state.cc b/tensorflow/core/data/service/dispatcher_state.cc index f2c4bbbf8abaf3..e703ab18c173a2 100644 --- a/tensorflow/core/data/service/dispatcher_state.cc +++ b/tensorflow/core/data/service/dispatcher_state.cc @@ -136,6 +136,11 @@ void DispatcherState::CreateJob(const CreateJobUpdate& create_job) { create_job.job_type(), create_job.worker_count(), create_job.if_use_local_workers()); + for (auto worker: create_job.local_workers()) { + VLOG(1) << "EASL-MUYU (DispatcherState::CreateJob): worker " << worker; + job->local_workers.insert(worker); + } + DCHECK(!jobs_.contains(job_id)); jobs_[job_id] = job; tasks_by_job_[job_id] = std::vector>(); @@ -371,7 +376,10 @@ DispatcherState::ListAvailableWorkers() const { std::vector> DispatcherState::ReserveWorkers( - int64 job_id, int64 target_num_workers) { + int64 job_id, int64 target_num_workers, + // MUYU's modification + bool if_use_local_workers, + const absl::flat_hash_set local_workers) { // DCHECK(num_workers <= avail_workers_.size()); // If the number of required workers is below those available, we just assign @@ -383,7 +391,25 @@ DispatcherState::ReserveWorkers( workers.reserve(num_workers); VLOG(0) << "(ReserveWorkers) User got " << num_workers << " workers from " << "target " << target_num_workers << " workers"; + VLOG(0) << "(ReserveWorkers) IF_USE_LOCAL_WORKERS is set to " << if_use_local_workers; + + for (auto worker: local_workers) { + VLOG(1) << "EASL-MUYU (ReserveWorkers) local_workers: " << worker; + } + for (auto it = avail_workers_.begin(); it != avail_workers_.end(); ) { + if (if_use_local_workers && local_workers.count(it->first) == 0) { + VLOG(0) << "EASL-MUYU (ReserveWorkers): local_worker mode is set " + "And we can't find the worker: " << it->first << + " in the job local worker list"; + it++; + continue; + } + else if (if_use_local_workers) { + VLOG(0) << "EASL-MUYU (ReserveWorkers): local_worker mode is set " + "And we find the worker: " << it->first << + " in the job local worker list"; + } num_workers--; workers.push_back(it->second); VLOG(0) << "(ReserveWorkers) Assigning worker at address " diff --git a/tensorflow/core/data/service/dispatcher_state.h b/tensorflow/core/data/service/dispatcher_state.h index b74fe49a22bea8..0ca26d11749c3b 100644 --- a/tensorflow/core/data/service/dispatcher_state.h +++ b/tensorflow/core/data/service/dispatcher_state.h @@ -151,7 +151,9 @@ class DispatcherState { TargetWorkers target_workers, const std::string& job_type, int64 worker_count, - bool if_use_local_workers = false) + bool if_use_local_workers = false, + absl::flat_hash_set local_workers = {} + ) : job_id(job_id), dataset_id(dataset_id), processing_mode(processing_mode), @@ -160,7 +162,9 @@ class DispatcherState { job_type(job_type), worker_count(worker_count), target_workers(target_workers), - if_use_local_workers(if_use_local_workers){ + if_use_local_workers(if_use_local_workers), + local_workers(local_workers) + { if (IsDynamicShard(processing_mode)) { distributed_epoch_state = DistributedEpochState(num_split_providers); } @@ -194,6 +198,8 @@ class DispatcherState { const int64 worker_count; // EASL: indicate whether this job should be processed locally const bool if_use_local_workers; + // EASL: list of local workers in the client + absl::flat_hash_set local_workers; }; struct Task { @@ -247,7 +253,10 @@ class DispatcherState { // is lower than or equal to 0, then the reserved number of workers is equal // to all the available workers. std::vector> ReserveWorkers(int64 job_id, - int64 num_workers = 0); + int64 num_workers = 0, + bool if_use_local_workers = false, + const absl::flat_hash_set local_workers = {} + ); // Returns the next available job id. int64_t NextAvailableJobId() const; diff --git a/tensorflow/core/data/service/easl/local_decision_utils.cc b/tensorflow/core/data/service/easl/local_decision_utils.cc index 4bd77d5354f87e..970d8727a7d43c 100644 --- a/tensorflow/core/data/service/easl/local_decision_utils.cc +++ b/tensorflow/core/data/service/easl/local_decision_utils.cc @@ -35,8 +35,12 @@ Status DecideIfLocal( // Pipeline stats: last TF node metrics std::shared_ptr last_tf_node_metrics; - s = metadata_store.GetLastTFNodeMetricsByDatasetKey( - dataset_key, last_tf_node_metrics); + +// s = metadata_store.GetLastTFNodeMetricsByDatasetKey( +// dataset_key, last_tf_node_metrics); + s = metadata_store.GetLastNodeMetricsByDatasetKey( + dataset_key, last_tf_node_metrics + ); if (!s.ok()) { VLOG(0) << "(DecideIfLocal) Failed to get the last TF node metrics"; return s; diff --git a/tensorflow/core/data/service/journal.proto b/tensorflow/core/data/service/journal.proto index f80bb89901e815..2092bc613eea13 100644 --- a/tensorflow/core/data/service/journal.proto +++ b/tensorflow/core/data/service/journal.proto @@ -47,7 +47,7 @@ message NamedJobKeyDef { int64 index = 2; } -// Next tag: 11 +// Next tag: 17 message CreateJobUpdate { reserved 3, 5, 6; int64 job_id = 1; @@ -67,6 +67,7 @@ message CreateJobUpdate { string job_type = 13; // i.e read, write, cache int64 worker_count = 14; // determined by elasticity policy bool if_use_local_workers = 15; // decided between epochs + repeated string local_workers = 16; } // Next tag: 5 diff --git a/tensorflow/core/data/service/worker_impl.cc b/tensorflow/core/data/service/worker_impl.cc index ad08c2e7987706..21d05651c8edd8 100644 --- a/tensorflow/core/data/service/worker_impl.cc +++ b/tensorflow/core/data/service/worker_impl.cc @@ -650,5 +650,18 @@ void LocalWorkers::Remove(absl::string_view worker_address) { local_workers_->erase(worker_address); } +std::vector LocalWorkers::GetList() { + string local_workers_string = ""; + std::vector local_workers; + for (auto it = local_workers_->begin(); it != local_workers_->end(); ++it) { + local_workers.push_back(it->first); + local_workers_string += it->first + "; "; + } + + VLOG(1) << "EASL-MUYU: Check List of Local Workers: " << local_workers_string; + + return local_workers; +} + } // namespace data } // namespace tensorflow diff --git a/tensorflow/core/data/service/worker_impl.h b/tensorflow/core/data/service/worker_impl.h index 717e35d10fec8d..fec8450296aec7 100644 --- a/tensorflow/core/data/service/worker_impl.h +++ b/tensorflow/core/data/service/worker_impl.h @@ -174,6 +174,9 @@ class LocalWorkers { // at the address. static void Remove(absl::string_view worker_address); + // Get a list of local workers created in process + static std::vector GetList(); + private: using AddressToWorkerMap = absl::flat_hash_map>; diff --git a/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc b/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc index 31ac623c7a1bcf..b3c7e4e7cfa5a2 100644 --- a/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc +++ b/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc @@ -326,7 +326,9 @@ class DataServiceDatasetOp::Dataset : public DatasetBase { return dispatcher_->GetOrCreateJob( dataset()->dataset_id_, dataset()->processing_mode_, key, dataset()->num_consumers_, dataset()->target_workers_, - job_client_id_); + job_client_id_, + LocalWorkers::GetList() + ); }, /*description=*/ strings::StrCat("get or create job with dispatcher at ", From a364e1667eefad674435a331b3370741ca666ac7 Mon Sep 17 00:00:00 2001 From: Muyu Li Date: Sun, 5 Dec 2021 19:11:25 +0100 Subject: [PATCH 04/19] Set judge_local_threshold to a large value to diable the logic of local worker --- tensorflow/python/data/experimental/service/server_lib.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tensorflow/python/data/experimental/service/server_lib.py b/tensorflow/python/data/experimental/service/server_lib.py index f14cf38eab3629..149fa4e155e6cf 100644 --- a/tensorflow/python/data/experimental/service/server_lib.py +++ b/tensorflow/python/data/experimental/service/server_lib.py @@ -110,7 +110,7 @@ def __new__(cls, scaling_policy=1, log_dir="", log_dumps_interval_ms=None, - avg_bytes_per_element_local_thres=1000 + avg_bytes_per_element_local_thres=1000000000 ): if protocol is None: protocol = _pywrap_utils.TF_DATA_DefaultProtocol() From 1017b7fff664a96d9df116807b22ede71cf2f426 Mon Sep 17 00:00:00 2001 From: Muyu Li Date: Sun, 19 Dec 2021 17:55:22 +0100 Subject: [PATCH 05/19] Add log worker metric --- .../experimental/data_service_dataset_op.cc | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc b/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc index b3c7e4e7cfa5a2..3c028f131a7888 100644 --- a/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc +++ b/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc @@ -21,6 +21,7 @@ limitations under the License. #include #include #include +#include #include "absl/algorithm/container.h" #include "absl/container/flat_hash_map.h" @@ -1202,11 +1203,29 @@ class DataServiceDatasetOp::Dataset : public DatasetBase { } if (enqueue_result && !result.end_of_sequence) { + uint64 current_micro_timestamp = Env::Default()->NowMicros(); + std::string data_source = task.info.worker_address(); + bool if_local = false; + int result_size = result.element.size(); if (local_tasks_.contains(task.info.worker_address())) { + if_local = true; local_results_buffer_.push(std::move(result)); } else { results_.push(std::move(result)); } + + const std::string log_location = std::getenv("EASL_MUYU_WORKER_METRICS"); + if (log_location && log_location.size() >= 3) { + std::ofstream file(log_location, std::ios_base::app); + + file << current_micro_timestamp << "," + << data_source << "," + << if_local << "," + << result_size << "\n"; + + file.flush(); + file.clear(); + } } get_next_cv_.notify_all(); } From cd98905ce26735c137bdba860449ce3d0c75cd41 Mon Sep 17 00:00:00 2001 From: Muyu Li Date: Thu, 30 Dec 2021 14:05:42 +0100 Subject: [PATCH 06/19] Fix the env variable issue --- tensorflow/core/data/service/dispatcher_state.cc | 11 +++++++++++ .../data/experimental/data_service_dataset_op.cc | 7 +++---- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/tensorflow/core/data/service/dispatcher_state.cc b/tensorflow/core/data/service/dispatcher_state.cc index e703ab18c173a2..6515a20fef522a 100644 --- a/tensorflow/core/data/service/dispatcher_state.cc +++ b/tensorflow/core/data/service/dispatcher_state.cc @@ -397,6 +397,17 @@ DispatcherState::ReserveWorkers( VLOG(1) << "EASL-MUYU (ReserveWorkers) local_workers: " << worker; } + int num_local_workers_available = 0; + for (auto it = avail_workers_.begin(); it != avail_workers_.end(); it++) { + if (local_workers.count(it->first)) + num_local_workers_available++; + } + if (if_use_local_workers && num_local_workers_available == 0) { + VLOG(0) << "EASL-MUYU (ReserveWorkers): local worker mode is set " + "but no local worker is available, change to default mode"; + if_use_local_workers = false; + } + for (auto it = avail_workers_.begin(); it != avail_workers_.end(); ) { if (if_use_local_workers && local_workers.count(it->first) == 0) { VLOG(0) << "EASL-MUYU (ReserveWorkers): local_worker mode is set " diff --git a/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc b/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc index 3c028f131a7888..5920e536c1384a 100644 --- a/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc +++ b/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc @@ -832,8 +832,7 @@ class DataServiceDatasetOp::Dataset : public DatasetBase { else { VLOG(1) << "EASL-MUYU(IF_USE_LOCAL_WORKERS): the worker address is: (" << task.worker_address() << - "), which is NOT local. DON'T CHOOSE IT!!"; - return false; + "), which is NOT local. BUT Anyways choosing it!!"; } } else { @@ -1214,8 +1213,8 @@ class DataServiceDatasetOp::Dataset : public DatasetBase { results_.push(std::move(result)); } - const std::string log_location = std::getenv("EASL_MUYU_WORKER_METRICS"); - if (log_location && log_location.size() >= 3) { + const char* log_location = std::getenv("EASL_MUYU_WORKER_METRICS"); + if (log_location) { std::ofstream file(log_location, std::ios_base::app); file << current_micro_timestamp << "," From 35dbf37876b943ca19020619e40add105470704e Mon Sep 17 00:00:00 2001 From: Muyu Li Date: Thu, 30 Dec 2021 14:06:07 +0100 Subject: [PATCH 07/19] Fix re-assign worker bug --- tensorflow/core/data/service/dispatcher_state.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tensorflow/core/data/service/dispatcher_state.cc b/tensorflow/core/data/service/dispatcher_state.cc index 6515a20fef522a..a8442fe783a3be 100644 --- a/tensorflow/core/data/service/dispatcher_state.cc +++ b/tensorflow/core/data/service/dispatcher_state.cc @@ -461,10 +461,12 @@ void DispatcherState::ReassignFreeWorkers() { // Assign one worker to the job workers_by_job_[job->job_id].push_back(it->second); jobs_by_worker_[it->second->address][job->job_id] = jobs_[job->job_id]; - avail_workers_.erase(it); + VLOG(0) << "EASL - (ReassignFreeWorkers) Reassigned worker " << it->second->address << " to job " << job->job_id; + + avail_workers_.erase(it++); } } From 07d487a53bdcc7f98dcd245cae61ee9e85c54fe1 Mon Sep 17 00:00:00 2001 From: MrPerkins Date: Wed, 19 Jan 2022 19:04:36 +0100 Subject: [PATCH 08/19] Started work on local workers policy: - Manually activating local worker policy - Starting assignment with 1 local worker --- .../core/data/service/dispatcher_impl.cc | 12 +++--- .../core/data/service/dispatcher_state.cc | 39 +++++++++++++------ 2 files changed, 34 insertions(+), 17 deletions(-) diff --git a/tensorflow/core/data/service/dispatcher_impl.cc b/tensorflow/core/data/service/dispatcher_impl.cc index cf27a774b8507b..a5ed7cea83096c 100644 --- a/tensorflow/core/data/service/dispatcher_impl.cc +++ b/tensorflow/core/data/service/dispatcher_impl.cc @@ -907,11 +907,13 @@ Status DataServiceDispatcherImpl::CreateJob( VLOG(0) << "EASL - Scalability decision for dataset_key " << compute_dataset_key << ": " << worker_count; - bool if_use_local_workers = false; - TF_RETURN_IF_ERROR(service::easl::local_decision::DecideIfLocal( - config_, metadata_store_, compute_dataset_key, if_use_local_workers - )); - VLOG(0) << "EASL-MUYU (CreateJob) - Check Local Worker Policy: " << if_use_local_workers; + bool if_use_local_workers = true; + VLOG(0) << "EASL-DSL (CreateJob) - Local Worker Policy was manually set to " << if_use_local_workers; + +// TF_RETURN_IF_ERROR(service::easl::local_decision::DecideIfLocal( +// config_, metadata_store_, compute_dataset_key, if_use_local_workers +// )); +// VLOG(0) << "EASL-MUYU (CreateJob) - Check Local Worker Policy: " << if_use_local_workers; // EASL add job entry to metadata store std::string dataset_key = service::easl::cache_utils::DatasetKey( diff --git a/tensorflow/core/data/service/dispatcher_state.cc b/tensorflow/core/data/service/dispatcher_state.cc index a8442fe783a3be..104237243289a6 100644 --- a/tensorflow/core/data/service/dispatcher_state.cc +++ b/tensorflow/core/data/service/dispatcher_state.cc @@ -408,19 +408,34 @@ DispatcherState::ReserveWorkers( if_use_local_workers = false; } + if (if_use_local_workers) { + VLOG(0) << "EASL-DSL (ReserveWorkers): if_use_local_workers is true, so we will first assign a local worker."; + bool found_local_worker=false; + for (auto it = avail_workers_.begin(); it != avail_workers_.end(); ) { + if (local_workers.count(it->first) != 0) { + VLOG(0) << "EASL-DSL (ReserveWorkers): we found the worker " + << it->first << + " (from list avail_workers_) in the job's local_workers list"; + + num_workers--; + workers.push_back(it->second); + VLOG(0) << "EASL-DSL (ReserveWorkers) Assigning local worker at address " + << it->second->address << " to job " << job_id; + workers_by_job_[job_id].push_back(it->second); + jobs_by_worker_[it->second->address][job_id] = jobs_[job_id]; + avail_workers_.erase(it++); + + found_local_worker=true; + break; + } + it++; + } + if(!found_local_worker) { + VLOG(0) << "EASL-DSL (ReserveWorkers): we tried but failed to find a local worker to assign."; + } + } + for (auto it = avail_workers_.begin(); it != avail_workers_.end(); ) { - if (if_use_local_workers && local_workers.count(it->first) == 0) { - VLOG(0) << "EASL-MUYU (ReserveWorkers): local_worker mode is set " - "And we can't find the worker: " << it->first << - " in the job local worker list"; - it++; - continue; - } - else if (if_use_local_workers) { - VLOG(0) << "EASL-MUYU (ReserveWorkers): local_worker mode is set " - "And we find the worker: " << it->first << - " in the job local worker list"; - } num_workers--; workers.push_back(it->second); VLOG(0) << "(ReserveWorkers) Assigning worker at address " From 4ae2d53f13d20fb6d49afeccbb8d03915b21b5ae Mon Sep 17 00:00:00 2001 From: Theodor Amariucai <32778667+amariucaitheodor@users.noreply.github.com> Date: Thu, 27 Jan 2022 11:36:37 +0100 Subject: [PATCH 09/19] Fix the client serial GetElement requests bug --- .../experimental/data_service_dataset_op.cc | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc b/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc index 5920e536c1384a..3f87fbf532aaf9 100644 --- a/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc +++ b/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc @@ -1167,15 +1167,17 @@ class DataServiceDatasetOp::Dataset : public DatasetBase { Status TryGetElement(const Task& task, GetElementResult& result) { GetElementRequest req; - mutex_lock l(mu_); // EASL - added cause tasks can be used by multiple threads - req.set_task_id(task.info.task_id()); - req.set_skipped_previous_round(task.skipped_previous_round); - absl::optional round_index; - if (StrictRoundRobin()) { - round_index = task.round; - req.set_consumer_index(dataset()->consumer_index_.value()); - req.set_round_index(task.round); - req.set_allow_skip(true); + { + mutex_lock l(mu_); // EASL - added cause tasks can be used by multiple threads + req.set_task_id(task.info.task_id()); + req.set_skipped_previous_round(task.skipped_previous_round); + absl::optional round_index; + if (StrictRoundRobin()) { + round_index = task.round; + req.set_consumer_index(dataset()->consumer_index_.value()); + req.set_round_index(task.round); + req.set_allow_skip(true); + } } return task.worker->GetElement(req, result); } From 1b90158d4687466964ed063da1ad9836d6fde8a0 Mon Sep 17 00:00:00 2001 From: jialchen Date: Thu, 27 Jan 2022 11:10:53 +0000 Subject: [PATCH 10/19] parametrize local and remote worker numbers --- .../core/data/service/dispatcher_impl.cc | 5 +- .../core/data/service/dispatcher_state.cc | 179 ++++++++++++------ .../core/data/service/dispatcher_state.h | 4 +- .../experimental/data_service_dataset_op.cc | 20 +- 4 files changed, 137 insertions(+), 71 deletions(-) diff --git a/tensorflow/core/data/service/dispatcher_impl.cc b/tensorflow/core/data/service/dispatcher_impl.cc index a5ed7cea83096c..a1ea6c75829439 100644 --- a/tensorflow/core/data/service/dispatcher_impl.cc +++ b/tensorflow/core/data/service/dispatcher_impl.cc @@ -904,8 +904,9 @@ Status DataServiceDispatcherImpl::CreateJob( int64 total_workers = state_.ListWorkers().size(); TF_RETURN_IF_ERROR(service::easl::cache_utils::DetermineElasticity(job_type, config_, metadata_store_, compute_dataset_key, total_workers, worker_count)); - VLOG(0) << "EASL - Scalability decision for dataset_key " - << compute_dataset_key << ": " << worker_count; + worker_count = 2; + VLOG(0) << "EASL-DSL - Scalability decision for dataset_key " + << compute_dataset_key << " was manually set to " << worker_count; bool if_use_local_workers = true; VLOG(0) << "EASL-DSL (CreateJob) - Local Worker Policy was manually set to " << if_use_local_workers; diff --git a/tensorflow/core/data/service/dispatcher_state.cc b/tensorflow/core/data/service/dispatcher_state.cc index 104237243289a6..32045aee999b65 100644 --- a/tensorflow/core/data/service/dispatcher_state.cc +++ b/tensorflow/core/data/service/dispatcher_state.cc @@ -376,75 +376,138 @@ DispatcherState::ListAvailableWorkers() const { std::vector> DispatcherState::ReserveWorkers( - int64 job_id, int64 target_num_workers, - // MUYU's modification - bool if_use_local_workers, + int64 job_id, int64 num_worker_remote_target, + int64 num_worker_local_target, const absl::flat_hash_set local_workers) { - // DCHECK(num_workers <= avail_workers_.size()); - // If the number of required workers is below those available, we just assign - // as many as there are available at this epoch's scheduling time. - int64 num_workers = target_num_workers <= 0 - || target_num_workers > avail_workers_.size() ? avail_workers_.size() - : target_num_workers; std::vector> workers; - workers.reserve(num_workers); - VLOG(0) << "(ReserveWorkers) User got " << num_workers << " workers from " - << "target " << target_num_workers << " workers"; - VLOG(0) << "(ReserveWorkers) IF_USE_LOCAL_WORKERS is set to " << if_use_local_workers; - - for (auto worker: local_workers) { - VLOG(1) << "EASL-MUYU (ReserveWorkers) local_workers: " << worker; - } - - int num_local_workers_available = 0; - for (auto it = avail_workers_.begin(); it != avail_workers_.end(); it++) { - if (local_workers.count(it->first)) - num_local_workers_available++; - } - if (if_use_local_workers && num_local_workers_available == 0) { - VLOG(0) << "EASL-MUYU (ReserveWorkers): local worker mode is set " - "but no local worker is available, change to default mode"; - if_use_local_workers = false; - } - - if (if_use_local_workers) { - VLOG(0) << "EASL-DSL (ReserveWorkers): if_use_local_workers is true, so we will first assign a local worker."; - bool found_local_worker=false; - for (auto it = avail_workers_.begin(); it != avail_workers_.end(); ) { - if (local_workers.count(it->first) != 0) { - VLOG(0) << "EASL-DSL (ReserveWorkers): we found the worker " - << it->first << - " (from list avail_workers_) in the job's local_workers list"; - - num_workers--; - workers.push_back(it->second); - VLOG(0) << "EASL-DSL (ReserveWorkers) Assigning local worker at address " - << it->second->address << " to job " << job_id; - workers_by_job_[job_id].push_back(it->second); - jobs_by_worker_[it->second->address][job_id] = jobs_[job_id]; - avail_workers_.erase(it++); - - found_local_worker=true; - break; - } - it++; - } - if(!found_local_worker) { - VLOG(0) << "EASL-DSL (ReserveWorkers): we tried but failed to find a local worker to assign."; - } - } + workers.reserve(avail_workers_.size()); + VLOG(0) << "(ReserveWorkers) num_worker_total_avail=" << avail_workers_.size() + << " num_worker_local_avail=" << local_workers.size() + << " num_worker_remote_target=" << num_worker_remote_target + << " num_worker_local_target=" << num_worker_local_target; + +// // DCHECK(num_workers <= avail_workers_.size()); +// +// // If the number of required workers is below those available, we just assign +// // as many as there are available at this epoch's scheduling time. +// int64 num_workers = target_num_workers <= 0 +// || target_num_workers > avail_workers_.size() ? avail_workers_.size() +// : target_num_workers; +// std::vector> workers; +// workers.reserve(num_workers); +// VLOG(0) << "(ReserveWorkers) User got " << num_workers << " workers from " +// << "target " << target_num_workers << " workers"; +// VLOG(0) << "(ReserveWorkers) IF_USE_LOCAL_WORKERS is set to " << if_use_local_workers; +// +// for (auto worker: local_workers) { +// VLOG(1) << "EASL-MUYU (ReserveWorkers) local_workers: " << worker; +// } +// +// int num_local_workers_available = 0; +// for (auto it = avail_workers_.begin(); it != avail_workers_.end(); it++) { +// if (local_workers.count(it->first)) +// num_local_workers_available++; +// } +// if (if_use_local_workers && num_local_workers_available == 0) { +// VLOG(0) << "EASL-MUYU (ReserveWorkers): local worker mode is set " +// "but no local worker is available, change to default mode"; +// if_use_local_workers = false; +// } +// +// if (if_use_local_workers) { +// VLOG(0) << "EASL-DSL (ReserveWorkers): if_use_local_workers is true, so we will first assign a local worker."; +// bool found_local_worker=false; +// for (auto it = avail_workers_.begin(); it != avail_workers_.end(); ) { +// if (local_workers.count(it->first) != 0) { +// VLOG(0) << "EASL-DSL (ReserveWorkers): we found the worker " +// << it->first << +// " (from list avail_workers_) in the job's local_workers list"; +// +// num_workers--; +// workers.push_back(it->second); +// VLOG(0) << "EASL-DSL (ReserveWorkers) Assigning local worker at address " +// << it->second->address << " to job " << job_id; +// workers_by_job_[job_id].push_back(it->second); +// jobs_by_worker_[it->second->address][job_id] = jobs_[job_id]; +// avail_workers_.erase(it++); +// +// found_local_worker=true; +// break; +// } +// it++; +// } +// if(!found_local_worker) { +// VLOG(0) << "EASL-DSL (ReserveWorkers): we tried but failed to find a local worker to assign."; +// } +// } + +// int num_local_workers_available = 0; +// for (auto it = avail_workers_.begin(); it != avail_workers_.end(); it++) { +// if (local_workers.count(it->first)) +// num_local_workers_available++; +// } +// if (if_use_local_workers && num_local_workers_available == 0) { +// VLOG(0) << "EASL-MUYU (ReserveWorkers): local worker mode is set " +// "but no local worker is available, change to default mode"; +// if_use_local_workers = false; +// } +// +// if (if_use_local_workers) { +// VLOG(0) << "EASL-DSL (ReserveWorkers): if_use_local_workers is true, so we will first assign a local worker."; +// bool found_local_worker=false; +// for (auto it = avail_workers_.begin(); it != avail_workers_.end(); ) { +// if (local_workers.count(it->first) != 0) { +// VLOG(0) << "EASL-DSL (ReserveWorkers): we found the worker " +// << it->first << +// " (from list avail_workers_) in the job's local_workers list"; +// +// num_workers--; +// workers.push_back(it->second); +// VLOG(0) << "EASL-DSL (ReserveWorkers) Assigning local worker at address " +// << it->second->address << " to job " << job_id; +// workers_by_job_[job_id].push_back(it->second); +// jobs_by_worker_[it->second->address][job_id] = jobs_[job_id]; +// avail_workers_.erase(it++); +// +// found_local_worker=true; +// break; +// } +// it++; +// } +// if(!found_local_worker) { +// VLOG(0) << "EASL-DSL (ReserveWorkers): we tried but failed to find a local worker to assign."; +// } +// } + for (auto it = avail_workers_.begin(); it != avail_workers_.end(); ) { - num_workers--; + bool is_local; + // is_local = std::count(it->second->tags.begin(), it->second->tags.end(), "COLOCATED"); // Tag based + is_local = local_workers.count(it->first); + if (is_local) { + VLOG(1) << "EASL-DSL (ReserveWorkers) Worker_L: " << it->first; + if (num_worker_local_target <= 0) { + it++; + continue; + } else { + num_worker_local_target--; + } + } else { + VLOG(1) << "EASL-DSL (ReserveWorkers) Worker_R: " << it->first; + if (num_worker_remote_target <= 0) { + it++; + continue; + } else { + num_worker_remote_target--; + } + } workers.push_back(it->second); VLOG(0) << "(ReserveWorkers) Assigning worker at address " << it->second->address << " to job " << job_id; workers_by_job_[job_id].push_back(it->second); jobs_by_worker_[it->second->address][job_id] = jobs_[job_id]; avail_workers_.erase(it++); - if (num_workers == 0) - break; } VLOG(0) << "(ReserveWorkers) Number of workers for job " << job_id << " is: " << workers_by_job_[job_id].size(); diff --git a/tensorflow/core/data/service/dispatcher_state.h b/tensorflow/core/data/service/dispatcher_state.h index 0ca26d11749c3b..b66220b3de4d8e 100644 --- a/tensorflow/core/data/service/dispatcher_state.h +++ b/tensorflow/core/data/service/dispatcher_state.h @@ -253,8 +253,8 @@ class DispatcherState { // is lower than or equal to 0, then the reserved number of workers is equal // to all the available workers. std::vector> ReserveWorkers(int64 job_id, - int64 num_workers = 0, - bool if_use_local_workers = false, + int64 num_worker_remote_target = 0, + int64 num_worker_local_target = 0, const absl::flat_hash_set local_workers = {} ); diff --git a/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc b/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc index 5920e536c1384a..481215cec8e96a 100644 --- a/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc +++ b/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc @@ -1167,15 +1167,17 @@ class DataServiceDatasetOp::Dataset : public DatasetBase { Status TryGetElement(const Task& task, GetElementResult& result) { GetElementRequest req; - mutex_lock l(mu_); // EASL - added cause tasks can be used by multiple threads - req.set_task_id(task.info.task_id()); - req.set_skipped_previous_round(task.skipped_previous_round); - absl::optional round_index; - if (StrictRoundRobin()) { - round_index = task.round; - req.set_consumer_index(dataset()->consumer_index_.value()); - req.set_round_index(task.round); - req.set_allow_skip(true); + { + mutex_lock l(mu_); // EASL - added cause tasks can be used by multiple threads + req.set_task_id(task.info.task_id()); + req.set_skipped_previous_round(task.skipped_previous_round); + absl::optional round_index; + if (StrictRoundRobin()) { + round_index = task.round; + req.set_consumer_index(dataset()->consumer_index_.value()); + req.set_round_index(task.round); + req.set_allow_skip(true); + } } return task.worker->GetElement(req, result); } From 1fd1a4619e7dfa0665c8472f73c6833156af6fea Mon Sep 17 00:00:00 2001 From: jialchen Date: Thu, 27 Jan 2022 12:05:11 +0000 Subject: [PATCH 11/19] parametrize local and remote worker numbers --- tensorflow/core/data/service/dispatcher_state.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tensorflow/core/data/service/dispatcher_state.cc b/tensorflow/core/data/service/dispatcher_state.cc index 32045aee999b65..28e191182d00f8 100644 --- a/tensorflow/core/data/service/dispatcher_state.cc +++ b/tensorflow/core/data/service/dispatcher_state.cc @@ -133,8 +133,8 @@ void DispatcherState::CreateJob(const CreateJobUpdate& create_job) { create_job.processing_mode_def(), create_job.num_split_providers(), named_job_key, num_consumers, create_job.target_workers(), - create_job.job_type(), create_job.worker_count(), - create_job.if_use_local_workers()); + create_job.job_type(), create_job.num_worker_remote_target(), + create_job.num_worker_local_target()); for (auto worker: create_job.local_workers()) { VLOG(1) << "EASL-MUYU (DispatcherState::CreateJob): worker " << worker; From f8a09838fe1c4cf38166557a2b7bddf1454786d3 Mon Sep 17 00:00:00 2001 From: jialchen Date: Thu, 27 Jan 2022 12:20:37 +0000 Subject: [PATCH 12/19] debug --- tensorflow/core/data/service/easl/local_decision_utils.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tensorflow/core/data/service/easl/local_decision_utils.cc b/tensorflow/core/data/service/easl/local_decision_utils.cc index fc8e22a7c6e57e..ba7c131089f572 100644 --- a/tensorflow/core/data/service/easl/local_decision_utils.cc +++ b/tensorflow/core/data/service/easl/local_decision_utils.cc @@ -82,6 +82,8 @@ Status DecideTargetWorkers( int64& num_worker_local_target) { num_worker_remote_target = num_worker_remote_avail / 2; num_worker_local_target = num_worker_local_avail / 2; + VLOG(1) << num_worker_remote_avail << ' ' << num_worker_local_avail + << ' ' << num_worker_remote_target << ' ' << num_worker_local_target; return Status::OK(); } From 9284b8b6f867c8c63b65fdc6df112a6d10557297 Mon Sep 17 00:00:00 2001 From: jialchen Date: Thu, 27 Jan 2022 14:31:18 +0000 Subject: [PATCH 13/19] parametrize local and remote worker numbers --- .../core/data/service/dispatcher_impl.cc | 46 +++++++++---------- .../core/data/service/dispatcher_state.cc | 3 +- .../core/data/service/dispatcher_state.h | 12 ++--- .../data/service/easl/local_decision_utils.cc | 2 +- 4 files changed, 32 insertions(+), 31 deletions(-) diff --git a/tensorflow/core/data/service/dispatcher_impl.cc b/tensorflow/core/data/service/dispatcher_impl.cc index 8ae088355efb08..45a8b0a2162c1d 100644 --- a/tensorflow/core/data/service/dispatcher_impl.cc +++ b/tensorflow/core/data/service/dispatcher_impl.cc @@ -888,28 +888,28 @@ Status DataServiceDispatcherImpl::CreateJob( int64 dataset_fingerprint = dataset->fingerprint; std::string compute_dataset_key = DatasetKey(dataset_id, dataset_fingerprint); -// service::easl::cache_utils::DetermineJobType( -// config_, cache_state_, metadata_store_, dataset_fingerprint, -// compute_dataset_key, job_id, job_type); -// VLOG(0) << "EASL - Caching decision for dataset_key " -// << compute_dataset_key << ": " << job_type; -// -// -// // MUYU, firstly check the local_workers from the client + service::easl::cache_utils::DetermineJobType( + config_, cache_state_, metadata_store_, dataset_fingerprint, + compute_dataset_key, job_id, job_type); + VLOG(0) << "EASL - Caching decision for dataset_key " + << compute_dataset_key << ": " << job_type; + + + // MUYU, firstly check the local_workers from the client absl::flat_hash_set local_workers; local_workers.insert(request.local_workers().cbegin(), request.local_workers().cend()); // Infer the worker count for this job and job type int64 total_workers = state_.ListWorkers().size(); -// TF_RETURN_IF_ERROR(service::easl::cache_utils::DetermineElasticity(job_type, -// config_, metadata_store_, compute_dataset_key, total_workers, worker_count)); -// VLOG(0) << "EASL - Scalability decision for dataset_key " -// << compute_dataset_key << ": " << worker_count; -// + TF_RETURN_IF_ERROR(service::easl::cache_utils::DetermineElasticity(job_type, + config_, metadata_store_, compute_dataset_key, total_workers, worker_count)); + VLOG(0) << "EASL - Scalability decision for dataset_key " + << compute_dataset_key << ": " << worker_count; + // bool if_use_local_workers = true; // VLOG(0) << "EASL-DSL (CreateJob) - Local Worker Policy was manually set to " << if_use_local_workers; - +// // TF_RETURN_IF_ERROR(service::easl::local_decision::DecideIfLocal( // config_, metadata_store_, compute_dataset_key, if_use_local_workers // )); @@ -965,8 +965,8 @@ Status DataServiceDispatcherImpl::CreateJob( TF_RETURN_IF_ERROR(Apply(update)); TF_RETURN_IF_ERROR(state_.JobFromId(job_id, job)); - VLOG(1) << "EASL-MUYU(DataServiceDispatcherImpl::CreateJob) if_use_local_workers flag is set: " - << job->if_use_local_workers; +// VLOG(1) << "EASL-MUYU(DataServiceDispatcherImpl::CreateJob) if_use_local_workers flag is set: " +// << job->if_use_local_workers; for (auto worker: job->local_workers) { VLOG(1) << "EASL-MUYU (CreateJob-after) local_workers: " << worker; @@ -1022,12 +1022,12 @@ Status DataServiceDispatcherImpl::CreateTasksForJob( std::vector>& tasks) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) { std::vector> workers = state_.ReserveWorkers( - job->job_id, job->worker_count, job->if_use_local_workers, job->local_workers); - if (workers.size() < job->worker_count){ - VLOG(0) - << "EASL - Not enough workers for job. Elasticity policy requires " - << job->worker_count << " but got " << workers.size(); - } + job->job_id, job->num_worker_remote_target, job->num_worker_local_target, job->local_workers); +// if (workers.size() < job->num_worker_remote_target){ +// VLOG(0) +// << "EASL - Not enough workers for job. Elasticity policy requires " +// << job->num_worker_remote_target << " but got " << workers.size(); +// } tasks.clear(); tasks.reserve(workers.size()); for (auto& worker : workers) { @@ -1257,7 +1257,7 @@ Status DataServiceDispatcherImpl::ClientHeartbeat( } response->set_job_finished(job->finished); - response->set_if_use_local_workers(job->if_use_local_workers); + response->set_if_use_local_workers(job->num_worker_local_target); VLOG(4) << "Found " << response->task_info_size() << " tasks for job client id " << request->job_client_id(); diff --git a/tensorflow/core/data/service/dispatcher_state.cc b/tensorflow/core/data/service/dispatcher_state.cc index 28e191182d00f8..59732ab45a54f4 100644 --- a/tensorflow/core/data/service/dispatcher_state.cc +++ b/tensorflow/core/data/service/dispatcher_state.cc @@ -527,7 +527,8 @@ void DispatcherState::ReassignFreeWorkers() { // Get a job in need of workers std::shared_ptr job = job_iter->second; int64 num_assigned_workers = workers_by_job_[job->job_id].size(); - while (job->finished || num_assigned_workers == job->worker_count){ + while (job->finished || num_assigned_workers == + job->num_worker_remote_target + job->num_worker_local_target){ job_iter++; if(job_iter == jobs_.end()){ // Went through all jobs, can return diff --git a/tensorflow/core/data/service/dispatcher_state.h b/tensorflow/core/data/service/dispatcher_state.h index b66220b3de4d8e..23a08bed77e618 100644 --- a/tensorflow/core/data/service/dispatcher_state.h +++ b/tensorflow/core/data/service/dispatcher_state.h @@ -150,8 +150,8 @@ class DispatcherState { absl::optional num_consumers, TargetWorkers target_workers, const std::string& job_type, - int64 worker_count, - bool if_use_local_workers = false, + int64 num_worker_remote_target, + int64 num_worker_local_target, absl::flat_hash_set local_workers = {} ) : job_id(job_id), @@ -160,9 +160,9 @@ class DispatcherState { named_job_key(named_job_key), num_consumers(num_consumers), job_type(job_type), - worker_count(worker_count), + num_worker_remote_target(num_worker_remote_target), target_workers(target_workers), - if_use_local_workers(if_use_local_workers), + num_worker_local_target(num_worker_local_target), local_workers(local_workers) { if (IsDynamicShard(processing_mode)) { @@ -195,9 +195,9 @@ class DispatcherState { bool garbage_collected = false; // EASL const std::string job_type; - const int64 worker_count; + const int64 num_worker_remote_target; // EASL: indicate whether this job should be processed locally - const bool if_use_local_workers; + const int64 num_worker_local_target; // EASL: list of local workers in the client absl::flat_hash_set local_workers; }; diff --git a/tensorflow/core/data/service/easl/local_decision_utils.cc b/tensorflow/core/data/service/easl/local_decision_utils.cc index ba7c131089f572..16cbf0290fbe48 100644 --- a/tensorflow/core/data/service/easl/local_decision_utils.cc +++ b/tensorflow/core/data/service/easl/local_decision_utils.cc @@ -82,7 +82,7 @@ Status DecideTargetWorkers( int64& num_worker_local_target) { num_worker_remote_target = num_worker_remote_avail / 2; num_worker_local_target = num_worker_local_avail / 2; - VLOG(1) << num_worker_remote_avail << ' ' << num_worker_local_avail + VLOG(1) << "DSL (DecideTargetWorkers) " << num_worker_remote_avail << ' ' << num_worker_local_avail << ' ' << num_worker_remote_target << ' ' << num_worker_local_target; return Status::OK(); } From 51d5b843fed0f831e1f6440c64b3f4f19f90cd37 Mon Sep 17 00:00:00 2001 From: jialchen Date: Thu, 27 Jan 2022 17:11:49 +0000 Subject: [PATCH 14/19] grid search --- .../core/data/service/dispatcher_impl.cc | 18 ++++++--- .../data/service/easl/local_decision_utils.cc | 40 +++++++++++++++++-- 2 files changed, 49 insertions(+), 9 deletions(-) diff --git a/tensorflow/core/data/service/dispatcher_impl.cc b/tensorflow/core/data/service/dispatcher_impl.cc index 45a8b0a2162c1d..29aff7236ee513 100644 --- a/tensorflow/core/data/service/dispatcher_impl.cc +++ b/tensorflow/core/data/service/dispatcher_impl.cc @@ -916,11 +916,19 @@ Status DataServiceDispatcherImpl::CreateJob( // VLOG(0) << "EASL-MUYU (CreateJob) - Check Local Worker Policy: " << if_use_local_workers; int64 num_worker_remote_target, num_worker_local_target; - TF_RETURN_IF_ERROR(service::easl::local_decision::DecideTargetWorkers( - config_, metadata_store_, compute_dataset_key, - total_workers - local_workers.size(), local_workers.size(), - num_worker_remote_target, num_worker_local_target - )); + if(config_.scaling_policy() == 1) { + num_worker_remote_target = worker_count; + num_worker_local_target = 0; + } else if(config_.scaling_policy() == 2) { + num_worker_remote_target = total_workers - local_workers.size(); + num_worker_local_target = local_workers.size(); + } else { + TF_RETURN_IF_ERROR(service::easl::local_decision::DecideTargetWorkers( + config_, metadata_store_, compute_dataset_key, + total_workers - local_workers.size(), local_workers.size(), + num_worker_remote_target, num_worker_local_target + )); + } // EASL add job entry to metadata store std::string dataset_key = service::easl::cache_utils::DatasetKey( diff --git a/tensorflow/core/data/service/easl/local_decision_utils.cc b/tensorflow/core/data/service/easl/local_decision_utils.cc index 16cbf0290fbe48..4f09dd55726e15 100644 --- a/tensorflow/core/data/service/easl/local_decision_utils.cc +++ b/tensorflow/core/data/service/easl/local_decision_utils.cc @@ -72,6 +72,34 @@ Status DecideIfLocal( return Status::OK(); } +std::vector records; + +void grid_search(int64 num_worker_remote_avail, int64 num_worker_local_avail, + int64& num_worker_remote_target, int64& num_worker_local_target) { + std::vector> test_set = std::vector>(); + for(int64 n_r = 0; n_r <= num_worker_remote_avail; n_r++) { + for(int64 n_l = 0; n_l <= num_worker_local_avail; n_l++) { + if(n_r + n_l <= 0) { + continue; + } + test_set.emplace_back(n_r, n_l); + } + } + std::vector epoch_times; + for(int i = 1; i < records.size(); i++) { + epoch_times.push_back(records[i] - records[i-1]); + } + int index; + if(epoch_times.size() < test_set.size()) { + index = epoch_times.size(); + } else { + index = std::min_element(epoch_times.begin(), epoch_times.begin() + test_set.size()) - epoch_times.begin(); + } + auto p = test_set[index]; + num_worker_remote_target = p.first; + num_worker_local_target = p.second; +} + Status DecideTargetWorkers( const experimental::DispatcherConfig& dispatcher_config, const ::tensorflow::data::easl::MetadataStore& metadata_store, @@ -80,10 +108,14 @@ Status DecideTargetWorkers( int64 num_worker_local_avail, int64& num_worker_remote_target, int64& num_worker_local_target) { - num_worker_remote_target = num_worker_remote_avail / 2; - num_worker_local_target = num_worker_local_avail / 2; - VLOG(1) << "DSL (DecideTargetWorkers) " << num_worker_remote_avail << ' ' << num_worker_local_avail - << ' ' << num_worker_remote_target << ' ' << num_worker_local_target; + std::time_t t = std::time(nullptr); + records.push_back(t); + grid_search(num_worker_remote_avail, num_worker_local_avail, num_worker_remote_target, num_worker_local_target); + VLOG(1) << "DSL (DecideTargetWorkers)" + << " num_worker_remote_avail " << num_worker_remote_avail + << " num_worker_local_avail " << num_worker_local_avail + << " num_worker_remote_target " << num_worker_remote_target + << " num_worker_local_target " << num_worker_local_target; return Status::OK(); } From a8bceb263cefd4d4154f25ed167e8cf001122f47 Mon Sep 17 00:00:00 2001 From: amariucaitheodor Date: Thu, 27 Jan 2022 20:31:30 +0100 Subject: [PATCH 15/19] Made use of local computation decision-making, refactored --- tensorflow/core/data/service/dispatcher.proto | 4 +- .../core/data/service/dispatcher_impl.cc | 23 ++-- .../core/data/service/dispatcher_state.cc | 106 +----------------- .../data/service/easl/local_decision_utils.cc | 48 ++++---- .../data/service/easl/local_decision_utils.h | 2 +- 5 files changed, 45 insertions(+), 138 deletions(-) diff --git a/tensorflow/core/data/service/dispatcher.proto b/tensorflow/core/data/service/dispatcher.proto index 532df381e8142d..aef0de68c248e1 100644 --- a/tensorflow/core/data/service/dispatcher.proto +++ b/tensorflow/core/data/service/dispatcher.proto @@ -189,7 +189,7 @@ message ClientHeartbeatRequest { double avg_inter_arrival_time = 6; } -// Next tag: 4 +// Next tag: 5 message ClientHeartbeatResponse { // A list of all tasks that the client should read from. repeated TaskInfo task_info = 1; @@ -200,7 +200,7 @@ message ClientHeartbeatResponse { // Whether the job has finished. bool job_finished = 2; // EASL: to check whether we should use local workers (based on last epoch's metrics) - bool if_use_local_workers = 4; + bool num_worker_local_target = 4; } // Next tag: 3 diff --git a/tensorflow/core/data/service/dispatcher_impl.cc b/tensorflow/core/data/service/dispatcher_impl.cc index 45a8b0a2162c1d..2a532d4f2aa024 100644 --- a/tensorflow/core/data/service/dispatcher_impl.cc +++ b/tensorflow/core/data/service/dispatcher_impl.cc @@ -907,14 +907,6 @@ Status DataServiceDispatcherImpl::CreateJob( VLOG(0) << "EASL - Scalability decision for dataset_key " << compute_dataset_key << ": " << worker_count; -// bool if_use_local_workers = true; -// VLOG(0) << "EASL-DSL (CreateJob) - Local Worker Policy was manually set to " << if_use_local_workers; -// -// TF_RETURN_IF_ERROR(service::easl::local_decision::DecideIfLocal( -// config_, metadata_store_, compute_dataset_key, if_use_local_workers -// )); -// VLOG(0) << "EASL-MUYU (CreateJob) - Check Local Worker Policy: " << if_use_local_workers; - int64 num_worker_remote_target, num_worker_local_target; TF_RETURN_IF_ERROR(service::easl::local_decision::DecideTargetWorkers( config_, metadata_store_, compute_dataset_key, @@ -943,7 +935,6 @@ Status DataServiceDispatcherImpl::CreateJob( create_job->set_job_type(job_type); create_job->set_num_worker_remote_target(num_worker_remote_target); create_job->set_num_split_providers(num_split_providers); - create_job->set_num_worker_local_target(num_worker_local_target); *create_job->mutable_local_workers() = {local_workers.begin(), local_workers.end()}; @@ -1023,11 +1014,12 @@ Status DataServiceDispatcherImpl::CreateTasksForJob( TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) { std::vector> workers = state_.ReserveWorkers( job->job_id, job->num_worker_remote_target, job->num_worker_local_target, job->local_workers); -// if (workers.size() < job->num_worker_remote_target){ -// VLOG(0) -// << "EASL - Not enough workers for job. Elasticity policy requires " -// << job->num_worker_remote_target << " but got " << workers.size(); -// } + if (workers.size() < job->num_worker_remote_target + job->num_worker_local_target){ + VLOG(0) + << "EASL - Not enough workers for job. Elasticity policy requires " + << job->num_worker_remote_target << " remote and " << job->num_worker_local_target + << " local but got " << workers.size(); + } tasks.clear(); tasks.reserve(workers.size()); for (auto& worker : workers) { @@ -1256,8 +1248,7 @@ Status DataServiceDispatcherImpl::ClientHeartbeat( task_info->set_starting_round(task->starting_round); } response->set_job_finished(job->finished); - - response->set_if_use_local_workers(job->num_worker_local_target); + response->set_num_worker_local_target(job->num_worker_local_target); VLOG(4) << "Found " << response->task_info_size() << " tasks for job client id " << request->job_client_id(); diff --git a/tensorflow/core/data/service/dispatcher_state.cc b/tensorflow/core/data/service/dispatcher_state.cc index 59732ab45a54f4..156f3acb6a19c1 100644 --- a/tensorflow/core/data/service/dispatcher_state.cc +++ b/tensorflow/core/data/service/dispatcher_state.cc @@ -382,104 +382,10 @@ DispatcherState::ReserveWorkers( std::vector> workers; workers.reserve(avail_workers_.size()); - VLOG(0) << "(ReserveWorkers) num_worker_total_avail=" << avail_workers_.size() - << " num_worker_local_avail=" << local_workers.size() - << " num_worker_remote_target=" << num_worker_remote_target - << " num_worker_local_target=" << num_worker_local_target; - -// // DCHECK(num_workers <= avail_workers_.size()); -// -// // If the number of required workers is below those available, we just assign -// // as many as there are available at this epoch's scheduling time. -// int64 num_workers = target_num_workers <= 0 -// || target_num_workers > avail_workers_.size() ? avail_workers_.size() -// : target_num_workers; -// std::vector> workers; -// workers.reserve(num_workers); -// VLOG(0) << "(ReserveWorkers) User got " << num_workers << " workers from " -// << "target " << target_num_workers << " workers"; -// VLOG(0) << "(ReserveWorkers) IF_USE_LOCAL_WORKERS is set to " << if_use_local_workers; -// -// for (auto worker: local_workers) { -// VLOG(1) << "EASL-MUYU (ReserveWorkers) local_workers: " << worker; -// } -// -// int num_local_workers_available = 0; -// for (auto it = avail_workers_.begin(); it != avail_workers_.end(); it++) { -// if (local_workers.count(it->first)) -// num_local_workers_available++; -// } -// if (if_use_local_workers && num_local_workers_available == 0) { -// VLOG(0) << "EASL-MUYU (ReserveWorkers): local worker mode is set " -// "but no local worker is available, change to default mode"; -// if_use_local_workers = false; -// } -// -// if (if_use_local_workers) { -// VLOG(0) << "EASL-DSL (ReserveWorkers): if_use_local_workers is true, so we will first assign a local worker."; -// bool found_local_worker=false; -// for (auto it = avail_workers_.begin(); it != avail_workers_.end(); ) { -// if (local_workers.count(it->first) != 0) { -// VLOG(0) << "EASL-DSL (ReserveWorkers): we found the worker " -// << it->first << -// " (from list avail_workers_) in the job's local_workers list"; -// -// num_workers--; -// workers.push_back(it->second); -// VLOG(0) << "EASL-DSL (ReserveWorkers) Assigning local worker at address " -// << it->second->address << " to job " << job_id; -// workers_by_job_[job_id].push_back(it->second); -// jobs_by_worker_[it->second->address][job_id] = jobs_[job_id]; -// avail_workers_.erase(it++); -// -// found_local_worker=true; -// break; -// } -// it++; -// } -// if(!found_local_worker) { -// VLOG(0) << "EASL-DSL (ReserveWorkers): we tried but failed to find a local worker to assign."; -// } -// } - -// int num_local_workers_available = 0; -// for (auto it = avail_workers_.begin(); it != avail_workers_.end(); it++) { -// if (local_workers.count(it->first)) -// num_local_workers_available++; -// } -// if (if_use_local_workers && num_local_workers_available == 0) { -// VLOG(0) << "EASL-MUYU (ReserveWorkers): local worker mode is set " -// "but no local worker is available, change to default mode"; -// if_use_local_workers = false; -// } -// -// if (if_use_local_workers) { -// VLOG(0) << "EASL-DSL (ReserveWorkers): if_use_local_workers is true, so we will first assign a local worker."; -// bool found_local_worker=false; -// for (auto it = avail_workers_.begin(); it != avail_workers_.end(); ) { -// if (local_workers.count(it->first) != 0) { -// VLOG(0) << "EASL-DSL (ReserveWorkers): we found the worker " -// << it->first << -// " (from list avail_workers_) in the job's local_workers list"; -// -// num_workers--; -// workers.push_back(it->second); -// VLOG(0) << "EASL-DSL (ReserveWorkers) Assigning local worker at address " -// << it->second->address << " to job " << job_id; -// workers_by_job_[job_id].push_back(it->second); -// jobs_by_worker_[it->second->address][job_id] = jobs_[job_id]; -// avail_workers_.erase(it++); -// -// found_local_worker=true; -// break; -// } -// it++; -// } -// if(!found_local_worker) { -// VLOG(0) << "EASL-DSL (ReserveWorkers): we tried but failed to find a local worker to assign."; -// } -// } - + VLOG(0) << "DSL (ReserveWorkers) Available remote: " << avail_workers_.size() << "\n" + << "Available local: " << local_workers.size() << "\n" + << "Target remote: " << num_worker_remote_target << "\n" + << "Target local: " << num_worker_local_target << "\n"; for (auto it = avail_workers_.begin(); it != avail_workers_.end(); ) { bool is_local; @@ -487,7 +393,7 @@ DispatcherState::ReserveWorkers( is_local = local_workers.count(it->first); if (is_local) { VLOG(1) << "EASL-DSL (ReserveWorkers) Worker_L: " << it->first; - if (num_worker_local_target <= 0) { + if (num_worker_local_target <= 0) { // No additional local workers needed it++; continue; } else { @@ -495,7 +401,7 @@ DispatcherState::ReserveWorkers( } } else { VLOG(1) << "EASL-DSL (ReserveWorkers) Worker_R: " << it->first; - if (num_worker_remote_target <= 0) { + if (num_worker_remote_target <= 0) { // No additional remote workers needed it++; continue; } else { diff --git a/tensorflow/core/data/service/easl/local_decision_utils.cc b/tensorflow/core/data/service/easl/local_decision_utils.cc index 16cbf0290fbe48..9ffdb0f43ebf92 100644 --- a/tensorflow/core/data/service/easl/local_decision_utils.cc +++ b/tensorflow/core/data/service/easl/local_decision_utils.cc @@ -14,7 +14,7 @@ Status DecideIfLocal( const experimental::DispatcherConfig& dispatcher_config, const ::tensorflow::data::easl::MetadataStore& metadata_store, const std::string& dataset_key, - bool& if_local) { + bool& using_local_workers) { using NodeMetrics = ::tensorflow::data::easl::NodeMetrics; using ModelMetrics = ::tensorflow::data::easl::ModelMetrics; @@ -25,24 +25,20 @@ Status DecideIfLocal( // We do not yet have the metrics for this dataset --> use 1 worker if(errors::IsNotFound(s)) { - VLOG(0) << "(DecideIfLocal) No metrics found for dataset, will use normal mode"; - if_local = false; + VLOG(0) << "DSL (DecideIfLocal) No metrics found for dataset, will use normal mode"; + using_local_workers = false; return Status::OK(); } else if (!s.ok()) { - VLOG(0) << "(DecideIfLocal) Another error has been thrown: " << s; + VLOG(0) << "DSL (DecideIfLocal) Another error has been thrown: " << s; return s; } // Pipeline stats: last TF node metrics std::shared_ptr last_tf_node_metrics; -// s = metadata_store.GetLastTFNodeMetricsByDatasetKey( -// dataset_key, last_tf_node_metrics); - s = metadata_store.GetLastNodeMetricsByDatasetKey( - dataset_key, last_tf_node_metrics - ); + s = metadata_store.GetLastNodeMetricsByDatasetKey(dataset_key, last_tf_node_metrics); if (!s.ok()) { - VLOG(0) << "(DecideIfLocal) Failed to get the last TF node metrics"; + VLOG(0) << "DSL (DecideIfLocal) Failed to get the last TF node metrics"; return s; } @@ -55,18 +51,18 @@ Status DecideIfLocal( } double avg_bytes_per_element = (double)total_bytes_produced / total_num_elements; - VLOG(0) << "(DecideIfLocal) Total bytes produced: " << total_bytes_produced << "\n" + VLOG(0) << "DSL (DecideIfLocal) Total bytes produced: " << total_bytes_produced << "\n" << "Total num elements: " << total_num_elements << "\n" << "Avg bytes produced per element: " << avg_bytes_per_element << "\n" << "Decision Threshold: " << dispatcher_config.avg_bytes_per_element_local_thres() << "\n"; if (avg_bytes_per_element > dispatcher_config.avg_bytes_per_element_local_thres()) { - if_local = true; - VLOG(0) << "(DecideIfLocal-decision) using local workers\n"; + using_local_workers = true; + VLOG(0) << "DSL (DecideIfLocal) Using local workers! (because avg. bytes per element > threshold) \n"; } else { - if_local = false; - VLOG(0) << "(DecideIfLocal-decision) using default worker set "; + using_local_workers = false; + VLOG(0) << "DSL (DecideIfLocal) NOT using local workers! (because avg. bytes per element < threshold) \n"; } return Status::OK(); @@ -80,10 +76,24 @@ Status DecideTargetWorkers( int64 num_worker_local_avail, int64& num_worker_remote_target, int64& num_worker_local_target) { - num_worker_remote_target = num_worker_remote_avail / 2; - num_worker_local_target = num_worker_local_avail / 2; - VLOG(1) << "DSL (DecideTargetWorkers) " << num_worker_remote_avail << ' ' << num_worker_local_avail - << ' ' << num_worker_remote_target << ' ' << num_worker_local_target; + bool using_local_workers; + TF_RETURN_IF_ERROR(service::easl::local_decision::DecideIfLocal( + dispatcher_config, metadata_store, dataset_key, using_local_workers + )); + + if(using_local_workers) { + num_worker_remote_target = num_worker_remote_avail / 2; + num_worker_local_target = num_worker_local_avail / 2; + } else { + num_worker_remote_target = num_worker_remote_avail / 2; + num_worker_local_target = 0; + } + + VLOG(0) << "DSL (DecideTargetWorkers) Available remote: " << num_worker_remote_avail << "\n" + << "Available local: " << num_worker_local_avail << "\n" + << "Decided remote: " << num_worker_remote_target << "\n" + << "Decided local: " << num_worker_local_target << "\n"; + return Status::OK(); } diff --git a/tensorflow/core/data/service/easl/local_decision_utils.h b/tensorflow/core/data/service/easl/local_decision_utils.h index b20a980b19957a..d37efcb8d6883e 100644 --- a/tensorflow/core/data/service/easl/local_decision_utils.h +++ b/tensorflow/core/data/service/easl/local_decision_utils.h @@ -23,7 +23,7 @@ Status DecideIfLocal( const experimental::DispatcherConfig& dispatcher_config, const ::tensorflow::data::easl::MetadataStore& metadata_store, const std::string& dataset_key, - bool& if_local); + bool& using_local_workers); Status DecideTargetWorkers( const experimental::DispatcherConfig& dispatcher_config, From 52f59a114c4eec2f9727e284447627024f2063a1 Mon Sep 17 00:00:00 2001 From: amariucaitheodor Date: Fri, 28 Jan 2022 12:42:06 +0100 Subject: [PATCH 16/19] Fixed bugs: - Lowered avg throughput per element for local workers to 0.1 GB (0.3GB/element for RetinaNet) - num_workers=0 edge case in reserve workers --- .../core/data/service/dispatcher_impl.cc | 19 ++++++++----------- .../core/data/service/dispatcher_state.cc | 9 +++++---- .../data/experimental/service/server_lib.py | 2 +- 3 files changed, 14 insertions(+), 16 deletions(-) diff --git a/tensorflow/core/data/service/dispatcher_impl.cc b/tensorflow/core/data/service/dispatcher_impl.cc index e17a96b5ead0cb..75a0252605df83 100644 --- a/tensorflow/core/data/service/dispatcher_impl.cc +++ b/tensorflow/core/data/service/dispatcher_impl.cc @@ -908,11 +908,11 @@ Status DataServiceDispatcherImpl::CreateJob( << compute_dataset_key << ": " << worker_count; int64 num_worker_remote_target, num_worker_local_target; - if(config_.scaling_policy() == 1) { - bool want_to_use_local_workers; - TF_RETURN_IF_ERROR(service::easl::local_decision::DecideIfLocal( - config_, metadata_store_, compute_dataset_key, want_to_use_local_workers - )); // Do we have enough throughput to decide to use local workers to save network bandwidth? + if(config_.scaling_policy() == 1) { // Old autoscaling prior to paper + bool want_to_use_local_workers; // Do we have enough throughput to decide to use local workers to save network bandwidth? + TF_RETURN_IF_ERROR(service::easl::local_decision::DecideIfLocal( + config_, metadata_store_, compute_dataset_key, want_to_use_local_workers + )); if(want_to_use_local_workers && local_workers.size() >= 1) { num_worker_remote_target = worker_count - 1; @@ -921,16 +921,16 @@ Status DataServiceDispatcherImpl::CreateJob( num_worker_remote_target = worker_count; num_worker_local_target = 0; } - } else if(config_.scaling_policy() == 2) { + } else if(config_.scaling_policy() == 2) { // Use all available workers num_worker_remote_target = total_workers - local_workers.size(); num_worker_local_target = local_workers.size(); - } else if(config_.scaling_policy() == 3) { + } else if(config_.scaling_policy() == 3) { // Grid search over local and remote workers TF_RETURN_IF_ERROR(service::easl::local_decision::DecideTargetWorkersGridSearch( config_, metadata_store_, compute_dataset_key, total_workers - local_workers.size(), local_workers.size(), num_worker_remote_target, num_worker_local_target )); - } else { + } else { // New paper autoscaling TF_RETURN_IF_ERROR(service::easl::local_decision::DecideTargetWorkersAutoscaling( config_, metadata_store_, compute_dataset_key, total_workers - local_workers.size(), local_workers.size(), @@ -980,9 +980,6 @@ Status DataServiceDispatcherImpl::CreateJob( TF_RETURN_IF_ERROR(Apply(update)); TF_RETURN_IF_ERROR(state_.JobFromId(job_id, job)); -// VLOG(1) << "EASL-MUYU(DataServiceDispatcherImpl::CreateJob) if_use_local_workers flag is set: " -// << job->if_use_local_workers; - for (auto worker: job->local_workers) { VLOG(1) << "EASL-MUYU (CreateJob-after) local_workers: " << worker; } diff --git a/tensorflow/core/data/service/dispatcher_state.cc b/tensorflow/core/data/service/dispatcher_state.cc index 76316525fdf63e..06e8c22a146419 100644 --- a/tensorflow/core/data/service/dispatcher_state.cc +++ b/tensorflow/core/data/service/dispatcher_state.cc @@ -382,10 +382,11 @@ DispatcherState::ReserveWorkers( int64 job_id, int64 num_worker_remote_target, int64 num_worker_local_target, const absl::flat_hash_set local_workers) { - num_worker_remote_target = num_worker_remote_target <= 0 || num_worker_remote_target > avail_workers_.size() ? avail_workers_.size() - : num_worker_remote_target; - num_worker_local_target = num_worker_local_target <= 0 || num_worker_local_target > avail_workers_.size() ? avail_workers_.size() - : num_worker_local_target; + int64 num_worker_target = num_worker_remote_target + num_worker_local_target; + if(num_worker_target <= 0 || num_worker_target > avail_workers_.size()) { + num_worker_remote_target = avail_workers_.size(); + num_worker_local_target = avail_workers_.size(); + } std::vector> workers; workers.reserve(avail_workers_.size()); diff --git a/tensorflow/python/data/experimental/service/server_lib.py b/tensorflow/python/data/experimental/service/server_lib.py index 149fa4e155e6cf..caec9c307d065e 100644 --- a/tensorflow/python/data/experimental/service/server_lib.py +++ b/tensorflow/python/data/experimental/service/server_lib.py @@ -110,7 +110,7 @@ def __new__(cls, scaling_policy=1, log_dir="", log_dumps_interval_ms=None, - avg_bytes_per_element_local_thres=1000000000 + avg_bytes_per_element_local_thres=100000000 # 0.1 GB ): if protocol is None: protocol = _pywrap_utils.TF_DATA_DefaultProtocol() From f79a4254a2b018f619000d4d1723f8481804226f Mon Sep 17 00:00:00 2001 From: Theodor Amariucai Date: Thu, 3 Feb 2022 11:44:18 +0100 Subject: [PATCH 17/19] Moved scalability decision inside of scaling policy 1 --- .../core/data/service/dispatcher_impl.cc | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/tensorflow/core/data/service/dispatcher_impl.cc b/tensorflow/core/data/service/dispatcher_impl.cc index 75a0252605df83..5e382808276648 100644 --- a/tensorflow/core/data/service/dispatcher_impl.cc +++ b/tensorflow/core/data/service/dispatcher_impl.cc @@ -881,7 +881,6 @@ Status DataServiceDispatcherImpl::CreateJob( int64 dataset_id = request.dataset_id(); // EASL - Caching decision: should the job compute, write or read from cache? - int64 worker_count; std::string job_type; std::shared_ptr dataset; TF_RETURN_IF_ERROR(state_.DatasetFromId(dataset_id, dataset)); @@ -894,21 +893,21 @@ Status DataServiceDispatcherImpl::CreateJob( VLOG(0) << "EASL - Caching decision for dataset_key " << compute_dataset_key << ": " << job_type; - // MUYU, firstly check the local_workers from the client absl::flat_hash_set local_workers; local_workers.insert(request.local_workers().cbegin(), request.local_workers().cend()); - // Infer the worker count for this job and job type - int64 total_workers = state_.ListWorkers().size(); - TF_RETURN_IF_ERROR(service::easl::cache_utils::DetermineElasticity(job_type, - config_, metadata_store_, compute_dataset_key, total_workers, worker_count)); - VLOG(0) << "EASL - Scalability decision for dataset_key " - << compute_dataset_key << ": " << worker_count; - int64 num_worker_remote_target, num_worker_local_target; if(config_.scaling_policy() == 1) { // Old autoscaling prior to paper + // Infer the worker count for this job and job type + int64 worker_count; + int64 total_workers = state_.ListWorkers().size(); + TF_RETURN_IF_ERROR(service::easl::cache_utils::DetermineElasticity(job_type, + config_, metadata_store_, compute_dataset_key, total_workers, worker_count)); + VLOG(0) << "EASL - Scalability decision for dataset_key " + << compute_dataset_key << ": " << worker_count; + bool want_to_use_local_workers; // Do we have enough throughput to decide to use local workers to save network bandwidth? TF_RETURN_IF_ERROR(service::easl::local_decision::DecideIfLocal( config_, metadata_store_, compute_dataset_key, want_to_use_local_workers @@ -964,7 +963,7 @@ Status DataServiceDispatcherImpl::CreateJob( {local_workers.begin(), local_workers.end()}; for (auto worker: local_workers) { - VLOG(1) << "EASL-MUYU (CreateJob) local_workers: " << worker; + VLOG(2) << "EASL-MUYU (CreateJob) local_workers: " << worker; } if (request.has_job_key()) { @@ -981,7 +980,7 @@ Status DataServiceDispatcherImpl::CreateJob( TF_RETURN_IF_ERROR(state_.JobFromId(job_id, job)); for (auto worker: job->local_workers) { - VLOG(1) << "EASL-MUYU (CreateJob-after) local_workers: " << worker; + VLOG(2) << "EASL-MUYU (CreateJob-after) local_workers: " << worker; } return Status::OK(); From 8e5f425b90e02a804a7bacfcc2893eb856642481 Mon Sep 17 00:00:00 2001 From: Julia <19995777+4PiR2@users.noreply.github.com> Date: Wed, 16 Feb 2022 13:54:00 +0100 Subject: [PATCH 18/19] bug fix --- tensorflow/core/data/service/dispatcher_impl.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tensorflow/core/data/service/dispatcher_impl.cc b/tensorflow/core/data/service/dispatcher_impl.cc index 5e382808276648..4768e589bc276b 100644 --- a/tensorflow/core/data/service/dispatcher_impl.cc +++ b/tensorflow/core/data/service/dispatcher_impl.cc @@ -899,10 +899,10 @@ Status DataServiceDispatcherImpl::CreateJob( request.local_workers().cend()); int64 num_worker_remote_target, num_worker_local_target; + int64 total_workers = state_.ListWorkers().size(); if(config_.scaling_policy() == 1) { // Old autoscaling prior to paper // Infer the worker count for this job and job type int64 worker_count; - int64 total_workers = state_.ListWorkers().size(); TF_RETURN_IF_ERROR(service::easl::cache_utils::DetermineElasticity(job_type, config_, metadata_store_, compute_dataset_key, total_workers, worker_count)); VLOG(0) << "EASL - Scalability decision for dataset_key " From e93a0ece3f9bc381d7e1ea47604fea425259413f Mon Sep 17 00:00:00 2001 From: Theodor Amariucai <32778667+amariucaitheodor@users.noreply.github.com> Date: Wed, 16 Feb 2022 21:28:01 +0200 Subject: [PATCH 19/19] Lowered local worker threshold to 30MB from 100MB --- tensorflow/python/data/experimental/service/server_lib.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tensorflow/python/data/experimental/service/server_lib.py b/tensorflow/python/data/experimental/service/server_lib.py index caec9c307d065e..522c4fc00c62eb 100644 --- a/tensorflow/python/data/experimental/service/server_lib.py +++ b/tensorflow/python/data/experimental/service/server_lib.py @@ -110,7 +110,7 @@ def __new__(cls, scaling_policy=1, log_dir="", log_dumps_interval_ms=None, - avg_bytes_per_element_local_thres=100000000 # 0.1 GB + avg_bytes_per_element_local_thres=1024*1024*30 # 30 MB ): if protocol is None: protocol = _pywrap_utils.TF_DATA_DefaultProtocol()