diff --git a/tensorflow/core/data/service/BUILD b/tensorflow/core/data/service/BUILD
index baad904a1097f1..c52b4485509bd2 100644
--- a/tensorflow/core/data/service/BUILD
+++ b/tensorflow/core/data/service/BUILD
@@ -295,6 +295,7 @@ cc_library(
     ],
     deps = [
         ":cache_utils",
+        ":local_decision_utils",
        ":common",
         ":common_proto_cc",
         ":credentials_factory",
@@ -938,6 +939,21 @@ cc_library(
     ],
 )
 
+cc_library(
+    name = "local_decision_utils",
+    srcs = ["easl/local_decision_utils.cc"],
+    hdrs = [
+        "easl/local_decision_utils.h",
+    ],
+    deps = [
+        ":common_proto_cc",
+        ":dispatcher_state",
+        ":metadata_store",
+        "//tensorflow/core:lib",
+        "@com_google_absl//absl/strings",
+    ],
+)
+
 cc_library(
     name = "cache_model",
     srcs = ["easl/cache_model.cc"],
diff --git a/tensorflow/core/data/service/dispatcher.proto b/tensorflow/core/data/service/dispatcher.proto
index 51018181ac1829..aef0de68c248e1 100644
--- a/tensorflow/core/data/service/dispatcher.proto
+++ b/tensorflow/core/data/service/dispatcher.proto
@@ -119,7 +119,7 @@ message JobKey {
   int64 job_name_index = 2;
 }
 
-// Next tag: 10
+// Next tag: 11
 message GetOrCreateJobRequest {
   reserved 2, 3, 4;
   // The id of the dataset to create a job for.
@@ -136,6 +136,9 @@ message GetOrCreateJobRequest {
   }
   // Specifies which workers the client of this job reads from.
   TargetWorkers target_workers = 9;
+
+  // EASL: addresses of tf.data workers running on the same host as the client.
+  repeated string local_workers = 10;
 }
 
 // Next tag: 2
@@ -186,7 +189,7 @@ message ClientHeartbeatRequest {
   double avg_inter_arrival_time = 6;
 }
 
-// Next tag: 4
+// Next tag: 5
 message ClientHeartbeatResponse {
   // A list of all tasks that the client should read from.
   repeated TaskInfo task_info = 1;
@@ -196,6 +199,8 @@ message ClientHeartbeatResponse {
   }
   // Whether the job has finished.
   bool job_finished = 2;
+  // EASL: whether the job targets local workers (based on the last epoch's metrics).
+  bool num_worker_local_target = 4;
 }
 
 // Next tag: 3
diff --git a/tensorflow/core/data/service/dispatcher_client.cc b/tensorflow/core/data/service/dispatcher_client.cc
index 328769b488d6b4..7492d66d8ab295 100644
--- a/tensorflow/core/data/service/dispatcher_client.cc
+++ b/tensorflow/core/data/service/dispatcher_client.cc
@@ -137,7 +137,9 @@ Status DataServiceDispatcherClient::GetOrCreateJob(
     int64_t dataset_id, const ProcessingModeDef& processing_mode,
     const absl::optional<JobKey>& job_key, absl::optional<int64_t> num_consumers,
     TargetWorkers target_workers,
-    int64_t& job_client_id) {
+    int64_t& job_client_id,
+    std::vector<std::string> local_workers
+    ) {
   TF_RETURN_IF_ERROR(EnsureInitialized());
   GetOrCreateJobRequest req;
   req.set_dataset_id(dataset_id);
@@ -149,6 +151,13 @@
     req.set_num_consumers(num_consumers.value());
   }
   req.set_target_workers(target_workers);
+
+  *req.mutable_local_workers() = {local_workers.begin(), local_workers.end()};
+
+  for (auto worker: local_workers) {
+    VLOG(1) << "EASL-MUYU (Client: GetOrCreateJob) local_workers: " << worker;
+  }
+
   GetOrCreateJobResponse resp;
   grpc::ClientContext client_ctx;
   grpc::Status status = stub_->GetOrCreateJob(&client_ctx, req, &resp);
@@ -159,6 +168,7 @@
         status);
   }
   job_client_id = resp.job_client_id();
+
   return Status::OK();
 }
 
diff --git a/tensorflow/core/data/service/dispatcher_client.h b/tensorflow/core/data/service/dispatcher_client.h
index 847e61a5bc696e..ff53d159bbed9e 100644
--- a/tensorflow/core/data/service/dispatcher_client.h
+++ b/tensorflow/core/data/service/dispatcher_client.h
@@ -77,7 +77,8 @@ class DataServiceDispatcherClient : public DataServiceClientBase {
                         const ProcessingModeDef& processing_mode,
                         const absl::optional<JobKey>& job_key,
                         absl::optional<int64_t> num_consumers,
-                        TargetWorkers target_workers, int64_t& job_client_id);
+                        TargetWorkers target_workers, int64_t& job_client_id,
+                        std::vector<std::string> local_workers);
 
   // Releases a job client id, indicating that the id will no longer be used to
   // read from the job.
diff --git a/tensorflow/core/data/service/dispatcher_impl.cc b/tensorflow/core/data/service/dispatcher_impl.cc
index 19938a8a5f8375..4768e589bc276b 100644
--- a/tensorflow/core/data/service/dispatcher_impl.cc
+++ b/tensorflow/core/data/service/dispatcher_impl.cc
@@ -44,6 +44,7 @@ limitations under the License.
 #include "tensorflow/core/data/service/worker.grpc.pb.h"
 #include "tensorflow/core/data/service/easl/cache_utils.h"
 #include "tensorflow/core/data/service/easl/metadata_store.h"
+#include "tensorflow/core/data/service/easl/local_decision_utils.h"
 #include "tensorflow/core/data/standalone.h"
 #include "tensorflow/core/framework/dataset.h"
 #include "tensorflow/core/framework/graph.pb.h"
@@ -880,7 +881,6 @@ Status DataServiceDispatcherImpl::CreateJob(
   int64 dataset_id = request.dataset_id();
 
   // EASL - Caching decision: should the job compute, write or read from cache?
-  int64 worker_count;
   std::string job_type;
   std::shared_ptr<const Dataset> dataset;
   TF_RETURN_IF_ERROR(state_.DatasetFromId(dataset_id, dataset));
@@ -890,16 +890,53 @@ Status DataServiceDispatcherImpl::CreateJob(
   service::easl::cache_utils::DetermineJobType(
       config_, cache_state_, metadata_store_, dataset_fingerprint,
       compute_dataset_key, job_id, job_type);
-  VLOG(0) << "EASL - Caching decision for dataset_key "
+  VLOG(0) << "EASL - Caching decision for dataset_key "
           << compute_dataset_key << ": " << job_type;
 
-  // Infer the worker count for this job and job type
+  // EASL: collect the client's local workers from the request.
+  absl::flat_hash_set<std::string> local_workers;
+  local_workers.insert(request.local_workers().cbegin(),
+                       request.local_workers().cend());
+
+  int64 num_worker_remote_target, num_worker_local_target;
   int64 total_workers = state_.ListWorkers().size();
-  TF_RETURN_IF_ERROR(service::easl::cache_utils::DetermineElasticity(job_type,
-    config_, metadata_store_, compute_dataset_key, total_workers, worker_count));
-  VLOG(0) << "EASL - Scalability decision for dataset_key "
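+  // Pick the worker targets according to the configured scaling policy:
+  //   1 - original autoscaling ("prior to paper"), optionally diverting one
+  //       worker to the client host when DecideIfLocal deems it worthwhile;
+  //   2 - use every available worker, remote and local;
+  //   3 - grid search over (remote, local) worker counts across epochs;
+  //   otherwise - newer autoscaling heuristic (still a placeholder).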
+  if(config_.scaling_policy() == 1) { // Old autoscaling prior to paper
+    // Infer the worker count for this job and job type
+    int64 worker_count;
+    TF_RETURN_IF_ERROR(service::easl::cache_utils::DetermineElasticity(job_type,
+      config_, metadata_store_, compute_dataset_key, total_workers, worker_count));
+    VLOG(0) << "EASL - Scalability decision for dataset_key "
          << compute_dataset_key << ": " << worker_count;
+    bool want_to_use_local_workers;  // Should this job read from local workers to save network bandwidth?
+    TF_RETURN_IF_ERROR(service::easl::local_decision::DecideIfLocal(
+      config_, metadata_store_, compute_dataset_key, want_to_use_local_workers
+    ));
+
+    if(want_to_use_local_workers && local_workers.size() >= 1) {
+      num_worker_remote_target = worker_count - 1;
+      num_worker_local_target = 1;
+    } else {
+      num_worker_remote_target = worker_count;
+      num_worker_local_target = 0;
+    }
+  } else if(config_.scaling_policy() == 2) { // Use all available workers
+    num_worker_remote_target = total_workers - local_workers.size();
+    num_worker_local_target = local_workers.size();
+  } else if(config_.scaling_policy() == 3) { // Grid search over local and remote workers
+    TF_RETURN_IF_ERROR(service::easl::local_decision::DecideTargetWorkersGridSearch(
+      config_, metadata_store_, compute_dataset_key,
+      total_workers - local_workers.size(), local_workers.size(),
+      num_worker_remote_target, num_worker_local_target
+    ));
+  } else { // New paper autoscaling
+    TF_RETURN_IF_ERROR(service::easl::local_decision::DecideTargetWorkersAutoscaling(
+      config_, metadata_store_, compute_dataset_key,
+      total_workers - local_workers.size(), local_workers.size(),
+      num_worker_remote_target, num_worker_local_target
+    ));
+  }
+
   // EASL add job entry to metadata store
   std::string dataset_key = service::easl::cache_utils::DatasetKey(
       dataset->dataset_id, dataset->fingerprint, job_type);
@@ -919,8 +956,16 @@ Status DataServiceDispatcherImpl::CreateJob(
   create_job->set_dataset_id(request.dataset_id());
   *create_job->mutable_processing_mode_def() = request.processing_mode_def();
   create_job->set_job_type(job_type);
-  create_job->set_worker_count(worker_count);
+  create_job->set_num_worker_remote_target(num_worker_remote_target);
   create_job->set_num_split_providers(num_split_providers);
+  create_job->set_num_worker_local_target(num_worker_local_target);
+  *create_job->mutable_local_workers() =
+      {local_workers.begin(), local_workers.end()};
+
+  for (auto worker: local_workers) {
+    VLOG(2) << "EASL-MUYU (CreateJob) local_workers: " << worker;
+  }
+
   if (request.has_job_key()) {
     NamedJobKeyDef* key = create_job->mutable_named_job_key();
     key->set_name(request.job_key().job_name());
@@ -933,6 +978,11 @@ Status DataServiceDispatcherImpl::CreateJob(
   create_job->set_target_workers(request.target_workers());
   TF_RETURN_IF_ERROR(Apply(update));
   TF_RETURN_IF_ERROR(state_.JobFromId(job_id, job));
+
+  for (auto worker: job->local_workers) {
+    VLOG(2) << "EASL-MUYU (CreateJob-after) local_workers: " << worker;
+  }
+
   return Status::OK();
 }
 
@@ -983,11 +1033,12 @@ Status DataServiceDispatcherImpl::CreateTasksForJob(
     std::vector<std::shared_ptr<const Task>>& tasks)
     TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
   std::vector<std::shared_ptr<const Worker>> workers = state_.ReserveWorkers(
-      job->job_id, job->worker_count);
-  if (workers.size() < job->worker_count){
+      job->job_id, job->num_worker_remote_target, job->num_worker_local_target,
+      job->local_workers);
+  if (workers.size() <
+      job->num_worker_remote_target + job->num_worker_local_target) {
     VLOG(0) << "EASL - Not enough workers for job. Elasticity policy requires "
-            << job->worker_count << " but got " << workers.size();
+            << job->num_worker_remote_target << " remote and "
+            << job->num_worker_local_target << " local but got "
+            << workers.size();
   }
   tasks.clear();
   tasks.reserve(workers.size());
@@ -1217,6 +1268,8 @@ Status DataServiceDispatcherImpl::ClientHeartbeat(
       task_info->set_starting_round(task->starting_round);
     }
     response->set_job_finished(job->finished);
+    response->set_num_worker_local_target(job->num_worker_local_target);
+
     VLOG(4) << "Found " << response->task_info_size()
             << " tasks for job client id " << request->job_client_id();
     return Status::OK();
diff --git a/tensorflow/core/data/service/dispatcher_state.cc b/tensorflow/core/data/service/dispatcher_state.cc
index d718f67e9b59dc..06e8c22a146419 100644
--- a/tensorflow/core/data/service/dispatcher_state.cc
+++ b/tensorflow/core/data/service/dispatcher_state.cc
@@ -133,7 +133,13 @@ void DispatcherState::CreateJob(const CreateJobUpdate& create_job) {
       create_job.processing_mode_def(), create_job.num_split_providers(),
       named_job_key, num_consumers, create_job.target_workers(),
-      create_job.job_type(), create_job.worker_count());
+      create_job.job_type(), create_job.num_worker_remote_target(),
+      create_job.num_worker_local_target());
+
+  for (auto worker: create_job.local_workers()) {
+    VLOG(1) << "EASL-MUYU (DispatcherState::CreateJob): worker " << worker;
+    job->local_workers.insert(worker);
+  }
 
   DCHECK(!jobs_.contains(job_id));
   jobs_[job_id] = job;
 
@@ -368,30 +374,53 @@ DispatcherState::ListAvailableWorkers() const {
   return workers;
 }
 
+// Reserves available workers for a particular job. If the total number of
+// requested workers (remote + local) is lower than or equal to 0, or exceeds
+// the number of available workers, all available workers are reserved.
 std::vector<std::shared_ptr<const DispatcherState::Worker>>
 DispatcherState::ReserveWorkers(
-    int64 job_id, int64 target_num_workers) {
-  // DCHECK(num_workers <= avail_workers_.size());
-
-  // If the number of required workers is below those available, we just assign
-  // as many as there are available at this epoch's scheduling time.
avail_workers_.size() - : target_num_workers; + int64 job_id, int64 num_worker_remote_target, + int64 num_worker_local_target, + const absl::flat_hash_set local_workers) { + int64 num_worker_target = num_worker_remote_target + num_worker_local_target; + if(num_worker_target <= 0 || num_worker_target > avail_workers_.size()) { + num_worker_remote_target = avail_workers_.size(); + num_worker_local_target = avail_workers_.size(); + } + std::vector> workers; - workers.reserve(num_workers); - VLOG(0) << "(ReserveWorkers) User got " << num_workers << " workers from " - << "target " << target_num_workers << " workers"; + workers.reserve(avail_workers_.size()); + VLOG(0) << "DSL (ReserveWorkers) Available remote: " << avail_workers_.size() << "\n" + << "Available local: " << local_workers.size() << "\n" + << "Target remote: " << num_worker_remote_target << "\n" + << "Target local: " << num_worker_local_target << "\n"; + for (auto it = avail_workers_.begin(); it != avail_workers_.end(); ) { - num_workers--; + //bool is_local = std::count(it->second->tags.begin(), it->second->tags.end(), "COLOCATED"); // Tag based + bool is_local = local_workers.count(it->first); + if (is_local) { + VLOG(0) << "EASL-DSL (ReserveWorkers) Worker_L: " << it->first; + if (num_worker_local_target <= 0) { // No additional local workers needed + it++; + continue; + } else { + num_worker_local_target--; + } + } else { + VLOG(0) << "EASL-DSL (ReserveWorkers) Worker_R: " << it->first; + if (num_worker_remote_target <= 0) { // No additional remote workers needed + it++; + continue; + } else { + num_worker_remote_target--; + } + } workers.push_back(it->second); VLOG(0) << "(ReserveWorkers) Assigning worker at address " << it->second->address << " to job " << job_id; workers_by_job_[job_id].push_back(it->second); jobs_by_worker_[it->second->address][job_id] = jobs_[job_id]; avail_workers_.erase(it++); - if (num_workers == 0) - break; } VLOG(0) << "(ReserveWorkers) Number of workers for job " << job_id << " is: " << workers_by_job_[job_id].size(); @@ -411,7 +440,8 @@ void DispatcherState::ReassignFreeWorkers() { // Get a job in need of workers std::shared_ptr job = job_iter->second; int64 num_assigned_workers = workers_by_job_[job->job_id].size(); - while (job->finished || num_assigned_workers == job->worker_count){ + while (job->finished || num_assigned_workers == + job->num_worker_remote_target + job->num_worker_local_target){ job_iter++; if(job_iter == jobs_.end()){ // Went through all jobs, can return @@ -423,10 +453,12 @@ void DispatcherState::ReassignFreeWorkers() { // Assign one worker to the job workers_by_job_[job->job_id].push_back(it->second); jobs_by_worker_[it->second->address][job->job_id] = jobs_[job->job_id]; - avail_workers_.erase(it); + VLOG(0) << "EASL - (ReassignFreeWorkers) Reassigned worker " << it->second->address << " to job " << job->job_id; + + avail_workers_.erase(it++); } } diff --git a/tensorflow/core/data/service/dispatcher_state.h b/tensorflow/core/data/service/dispatcher_state.h index 6f1427872a1910..23a08bed77e618 100644 --- a/tensorflow/core/data/service/dispatcher_state.h +++ b/tensorflow/core/data/service/dispatcher_state.h @@ -150,15 +150,21 @@ class DispatcherState { absl::optional num_consumers, TargetWorkers target_workers, const std::string& job_type, - int64 worker_count) + int64 num_worker_remote_target, + int64 num_worker_local_target, + absl::flat_hash_set local_workers = {} + ) : job_id(job_id), dataset_id(dataset_id), processing_mode(processing_mode), named_job_key(named_job_key), 
          num_consumers(num_consumers),
           job_type(job_type),
-          worker_count(worker_count),
-          target_workers(target_workers) {
+          num_worker_remote_target(num_worker_remote_target),
+          target_workers(target_workers),
+          num_worker_local_target(num_worker_local_target),
+          local_workers(local_workers)
+    {
       if (IsDynamicShard(processing_mode)) {
         distributed_epoch_state = DistributedEpochState(num_split_providers);
       }
@@ -189,7 +195,11 @@ class DispatcherState {
     bool garbage_collected = false;
     // EASL
     const std::string job_type;
-    const int64 worker_count;
+    const int64 num_worker_remote_target;
+    // EASL: number of workers running on the client host that this job should use.
+    const int64 num_worker_local_target;
+    // EASL: addresses of the workers running on the client host.
+    absl::flat_hash_set<std::string> local_workers;
   };
 
   struct Task {
@@ -243,7 +253,10 @@ class DispatcherState {
   // is lower than or equal to 0, then the reserved number of workers is equal
   // to all the available workers.
   std::vector<std::shared_ptr<const Worker>> ReserveWorkers(int64 job_id,
-      int64 num_workers = 0);
+      int64 num_worker_remote_target = 0,
+      int64 num_worker_local_target = 0,
+      const absl::flat_hash_set<std::string> local_workers = {}
+      );
 
   // Returns the next available job id.
   int64_t NextAvailableJobId() const;
diff --git a/tensorflow/core/data/service/easl/local_decision_utils.cc b/tensorflow/core/data/service/easl/local_decision_utils.cc
new file mode 100644
index 00000000000000..88a926f691287c
--- /dev/null
+++ b/tensorflow/core/data/service/easl/local_decision_utils.cc
@@ -0,0 +1,140 @@
+//
+// Created by Muyu Li on 16.11.21.
+//
+
+#include "local_decision_utils.h"
+
+namespace tensorflow {
+namespace data {
+namespace service {
+namespace easl {
+namespace local_decision {
+
+Status DecideIfLocal(
+    const experimental::DispatcherConfig& dispatcher_config,
+    const ::tensorflow::data::easl::MetadataStore& metadata_store,
+    const std::string& dataset_key,
+    bool& want_to_use_local_workers) {
+  using NodeMetrics = ::tensorflow::data::easl::NodeMetrics;
+  using ModelMetrics = ::tensorflow::data::easl::ModelMetrics;
+
+  // Check if we have any metrics for this dataset
+  std::shared_ptr<::tensorflow::data::easl::InputPipelineMetrics> job_metrics;
+  Status s = metadata_store.GetInputPipelineMetricsByDatasetKey(
+      dataset_key, job_metrics);
+
+  // We do not yet have metrics for this dataset --> optimistically use local workers
+  if(errors::IsNotFound(s)) {
+    VLOG(0) << "DSL (DecideIfLocal) No metrics found for dataset, "
+               "will use local workers (optimistic)!";
+    want_to_use_local_workers = true;
+    return Status::OK();
+  } else if (!s.ok()) {
+    VLOG(0) << "DSL (DecideIfLocal) Another error has been thrown: " << s;
+    return s;
+  }
+
+  // Pipeline stats: last TF node metrics
+  std::shared_ptr<NodeMetrics> last_tf_node_metrics;
+  s = metadata_store.GetLastNodeMetricsByDatasetKey(dataset_key,
+                                                    last_tf_node_metrics);
+  if (!s.ok()) {
+    VLOG(0) << "DSL (DecideIfLocal) Failed to get the last TF node metrics";
+    return s;
+  }
+
+  int64_t total_bytes_produced = 0, total_num_elements = 0;
+  for (const auto& e : last_tf_node_metrics->metrics_) {
+    const auto& node_metrics = e.second;
+    total_bytes_produced += node_metrics->bytes_produced();
+    total_num_elements += node_metrics->num_elements();
+  }
+
+  double avg_bytes_per_element =
+      (double) total_bytes_produced / total_num_elements;
+  VLOG(0) << "DSL (DecideIfLocal) Total bytes produced: " << total_bytes_produced << "\n"
+          << "Total num elements: " << total_num_elements << "\n"
+          << "Avg bytes produced per element: " << avg_bytes_per_element << "\n"
+          << "Decision Threshold: "
+          << dispatcher_config.avg_bytes_per_element_local_thres() << "\n";
+
+  if (avg_bytes_per_element > dispatcher_config.avg_bytes_per_element_local_thres()) {
+    want_to_use_local_workers = true;
+    VLOG(0) << "DSL (DecideIfLocal) Using local workers "
+               "(avg. bytes per element > threshold)";
+  } else {
+    want_to_use_local_workers = false;
+    VLOG(0) << "DSL (DecideIfLocal) NOT using local workers "
+               "(avg. bytes per element <= threshold)";
+  }
+
+  return Status::OK();
+}
+
+std::vector<std::time_t> records;
+
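+// `records` holds one timestamp per DecideTargetWorkersGridSearch call (i.e.
+// per job creation), so consecutive differences approximate epoch durations.
+// grid_search pairs the i-th measured epoch time with the i-th (remote, local)
+// combination: it keeps proposing untried combinations until every one has a
+// measurement, then reuses the combination with the smallest epoch time.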
+void grid_search(int64 num_worker_remote_avail, int64 num_worker_local_avail,
+                 int64& num_worker_remote_target, int64& num_worker_local_target) {
+  std::vector<std::pair<int64, int64>> test_set;
+  for(int64 n_r = 0; n_r <= num_worker_remote_avail; n_r++) {
+    for(int64 n_l = 0; n_l <= num_worker_local_avail; n_l++) {
+      if(n_r + n_l <= 0) {
+        continue;
+      }
+      test_set.emplace_back(n_r, n_l);
+    }
+  }
+
+  std::vector<double> epoch_times;
+  for(int i = 1; i < records.size(); i++) {
+    epoch_times.push_back(records[i] - records[i-1]);
+  }
+
+  int index;
+  if(epoch_times.size() < test_set.size()) {
+    index = epoch_times.size();
+  } else {
+    index = std::min_element(epoch_times.begin(),
+                             epoch_times.begin() + test_set.size())
+            - epoch_times.begin();
+  }
+
+  auto p = test_set[index];
+  num_worker_remote_target = p.first;
+  num_worker_local_target = p.second;
+}
+
+Status DecideTargetWorkersGridSearch(
+    const experimental::DispatcherConfig& dispatcher_config,
+    const ::tensorflow::data::easl::MetadataStore& metadata_store,
+    const std::string& dataset_key,
+    int64 num_worker_remote_avail,
+    int64 num_worker_local_avail,
+    int64& num_worker_remote_target,
+    int64& num_worker_local_target) {
+  std::time_t t = std::time(nullptr);
+  records.push_back(t);
+  grid_search(num_worker_remote_avail, num_worker_local_avail,
+              num_worker_remote_target, num_worker_local_target);
+  VLOG(0) << "DSL (DecideTargetWorkers) Available remote: " << num_worker_remote_avail << "\n"
+          << "Available local: " << num_worker_local_avail << "\n"
+          << "Decided remote: " << num_worker_remote_target << "\n"
+          << "Decided local: " << num_worker_local_target << "\n";
+  return Status::OK();
+}
+
+// TODO: Implement this based on EASL 2.7 autoscaling
+Status DecideTargetWorkersAutoscaling(
+    const experimental::DispatcherConfig& dispatcher_config,
+    const ::tensorflow::data::easl::MetadataStore& metadata_store,
+    const std::string& dataset_key,
+    int64 num_worker_remote_avail,
+    int64 num_worker_local_avail,
+    int64& num_worker_remote_target,
+    int64& num_worker_local_target) {
+  num_worker_remote_target = num_worker_remote_avail / 2;
+  num_worker_local_target = num_worker_local_avail / 2;
+
+  VLOG(0) << "DSL (DecideTargetWorkers) Available remote: " << num_worker_remote_avail << "\n"
+          << "Available local: " << num_worker_local_avail << "\n"
+          << "Decided remote: " << num_worker_remote_target << "\n"
+          << "Decided local: " << num_worker_local_target << "\n";
+  return Status::OK();
+}
+
+} // namespace local_decision
+} // namespace easl
+} // namespace service
+} // namespace data
+} // namespace tensorflow
diff --git a/tensorflow/core/data/service/easl/local_decision_utils.h b/tensorflow/core/data/service/easl/local_decision_utils.h
new file mode 100644
index 00000000000000..3d4633967820eb
--- /dev/null
+++ b/tensorflow/core/data/service/easl/local_decision_utils.h
@@ -0,0 +1,52 @@
+//
+// Created by Muyu Li on 16.11.21.
+//
+
+#ifndef ML_INPUT_DATA_SERVICE_LOCAL_DECISION_UTILS_H
+#define ML_INPUT_DATA_SERVICE_LOCAL_DECISION_UTILS_H
+
+#include <string>
+
+#include "tensorflow/core/platform/default/integral_types.h"
+#include "tensorflow/core/data/service/easl/metadata_store.h"
+#include "tensorflow/core/lib/core/status.h"
+#include "tensorflow/core/data/service/common.pb.h"
+#include "tensorflow/core/data/service/dispatcher_state.h"
+#include "tensorflow/core/protobuf/service_config.pb.h"
+
+namespace tensorflow {
+namespace data {
+namespace service {
+namespace easl {
+namespace local_decision {
+
+Status DecideIfLocal(
+    const experimental::DispatcherConfig& dispatcher_config,
+    const ::tensorflow::data::easl::MetadataStore& metadata_store,
+    const std::string& dataset_key,
+    bool& want_to_use_local_workers);
+
+Status DecideTargetWorkersGridSearch(
+    const experimental::DispatcherConfig& dispatcher_config,
+    const ::tensorflow::data::easl::MetadataStore& metadata_store,
+    const std::string& dataset_key,
+    int64 num_worker_remote_avail,
+    int64 num_worker_local_avail,
+    int64& num_worker_remote_target,
+    int64& num_worker_local_target);
+
+Status DecideTargetWorkersAutoscaling(
+    const experimental::DispatcherConfig& dispatcher_config,
+    const ::tensorflow::data::easl::MetadataStore& metadata_store,
+    const std::string& dataset_key,
+    int64 num_worker_remote_avail,
+    int64 num_worker_local_avail,
+    int64& num_worker_remote_target,
+    int64& num_worker_local_target);
+
+} // namespace local_decision
+} // namespace easl
+} // namespace service
+} // namespace data
+} // namespace tensorflow
+
+#endif //ML_INPUT_DATA_SERVICE_LOCAL_DECISION_UTILS_H
diff --git a/tensorflow/core/data/service/journal.proto b/tensorflow/core/data/service/journal.proto
index 47f1c07c07ea45..cc8ff71d567e0a 100644
--- a/tensorflow/core/data/service/journal.proto
+++ b/tensorflow/core/data/service/journal.proto
@@ -47,7 +47,7 @@ message NamedJobKeyDef {
   int64 index = 2;
 }
 
-// Next tag: 11
+// Next tag: 17
 message CreateJobUpdate {
   reserved 3, 5, 6;
   int64 job_id = 1;
@@ -65,7 +65,9 @@ message CreateJobUpdate {
   TargetWorkers target_workers = 10;
   // EASL
   string job_type = 13;  // i.e read, write, cache
-  int64 worker_count = 14;  // determined by elasticity policy
+  int64 num_worker_remote_target = 14;  // decided between epochs
+  int64 num_worker_local_target = 15;   // decided between epochs
+  repeated string local_workers = 16;
 }
 
 // Next tag: 5
diff --git a/tensorflow/core/data/service/worker_impl.cc b/tensorflow/core/data/service/worker_impl.cc
index ad08c2e7987706..21d05651c8edd8 100644
--- a/tensorflow/core/data/service/worker_impl.cc
+++ b/tensorflow/core/data/service/worker_impl.cc
@@ -650,5 +650,18 @@ void LocalWorkers::Remove(absl::string_view worker_address) {
   local_workers_->erase(worker_address);
 }
 
+std::vector<std::string> LocalWorkers::GetList() {
+  tf_shared_lock l(mu_);  // Guard local_workers_ while reading the map.
+  string local_workers_string = "";
+  std::vector<std::string> local_workers;
+  for (auto it = local_workers_->begin(); it != local_workers_->end(); ++it) {
+    local_workers.push_back(it->first);
+    local_workers_string += it->first + "; ";
+  }
+
+  VLOG(1) << "EASL-MUYU: Check List of Local Workers: " << local_workers_string;
+
+  return local_workers;
+}
+
 }  // namespace data
 }  // namespace tensorflow
diff --git a/tensorflow/core/data/service/worker_impl.h b/tensorflow/core/data/service/worker_impl.h
index 717e35d10fec8d..fec8450296aec7 100644
--- a/tensorflow/core/data/service/worker_impl.h
+++ b/tensorflow/core/data/service/worker_impl.h
@@ -174,6 +174,9 @@ class LocalWorkers {
   // at the address.
   static void Remove(absl::string_view worker_address);
 
+  // Returns the addresses of all local workers created in this process.
+  static std::vector<std::string> GetList();
+
  private:
   using AddressToWorkerMap =
      absl::flat_hash_map<std::string, std::shared_ptr<DataServiceWorkerImpl>>;
diff --git a/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc b/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc
index 75a0edf59be3ea..384d740e42b2ee 100644
--- a/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc
+++ b/tensorflow/core/kernels/data/experimental/data_service_dataset_op.cc
@@ -21,6 +21,7 @@ limitations under the License.
 #include <string>
 #include <utility>
 #include <vector>
+#include <fstream>
 
 #include "absl/algorithm/container.h"
 #include "absl/container/flat_hash_map.h"
@@ -326,7 +327,9 @@ class DataServiceDatasetOp::Dataset : public DatasetBase {
           return dispatcher_->GetOrCreateJob(
               dataset()->dataset_id_, dataset()->processing_mode_, key,
               dataset()->num_consumers_, dataset()->target_workers_,
-              job_client_id_);
+              job_client_id_,
+              LocalWorkers::GetList()
+              );
         },
         /*description=*/
         strings::StrCat("get or create job with dispatcher at ",
@@ -791,13 +794,14 @@ class DataServiceDatasetOp::Dataset : public DatasetBase {
           }
         }
       }
+
       for (auto& task : resp.task_info()) {
         auto it = task_id_to_task.find(task.task_id());
         if (it == task_id_to_task.end()) {
           continue;
         }
-        if (!ShouldReadFromTask(task)) {
-          VLOG(3) << "Skipping untargeted worker task " << task.task_id();
+        if (!ShouldReadFromTask(task, resp.num_worker_local_target())) {
+          VLOG(0) << "Skipping untargeted worker task " << task.task_id();
           should_finish_job_ = false;
           continue;
         }
@@ -810,17 +814,30 @@ class DataServiceDatasetOp::Dataset : public DatasetBase {
       }
     }
 
-    bool ShouldReadFromTask(const TaskInfo& task) const
+    bool ShouldReadFromTask(const TaskInfo& task, const int num_worker_local_target) const
        TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
       if (StrictRoundRobin()) {
         return true;
       }
 
-      const bool is_local_task =
-          (LocalWorkers::Get(task.worker_address()) != nullptr);
-      if (dataset()->target_workers_ == TARGET_WORKERS_LOCAL &&
-          !is_local_task) {
-        return false;
+      const bool is_local_task = (LocalWorkers::Get(task.worker_address()) != nullptr);
+
+      if (dataset()->target_workers_ == TARGET_WORKERS_LOCAL) {
+        VLOG(1) << "EASL-MUYU: TARGET_WORKERS_LOCAL is set";
+        if (is_local_task) {
+          VLOG(1) << "EASL-MUYU (TARGET_WORKERS_LOCAL): worker address ("
+                  << task.worker_address() << ") is local; reading from it.";
+        } else {
+          VLOG(1) << "EASL-MUYU (TARGET_WORKERS_LOCAL): worker address ("
+                  << task.worker_address() << ") is NOT local; skipping it.";
+          return false;
+        }
+      } else {
+        VLOG(1) << "EASL-MUYU: TARGET_WORKERS_LOCAL is not set; nothing to do";
       }
 
       // Cross-TF/TPU host reads may cause resource contention on the TF/TPU
@@ -1127,15 +1144,17 @@ class DataServiceDatasetOp::Dataset : public DatasetBase {
     Status TryGetElement(const Task& task, GetElementResult& result) {
       GetElementRequest req;
-      mutex_lock l(mu_); // EASL - added cause tasks can be used by multiple threads
-      req.set_task_id(task.info.task_id());
-      req.set_skipped_previous_round(task.skipped_previous_round);
-      absl::optional<int64_t> round_index;
-      if (StrictRoundRobin()) {
-        round_index = task.round;
-        req.set_consumer_index(dataset()->consumer_index_.value());
-        req.set_round_index(task.round);
-        req.set_allow_skip(true);
+      {
+        mutex_lock l(mu_);  // EASL - added because tasks can be used by multiple threads
+        req.set_task_id(task.info.task_id());
+        req.set_skipped_previous_round(task.skipped_previous_round);
+        absl::optional<int64_t> round_index;
+        if (StrictRoundRobin()) {
+          round_index = task.round;
+          req.set_consumer_index(dataset()->consumer_index_.value());
+          req.set_round_index(task.round);
+          req.set_allow_skip(true);
+        }
       }
       return task.worker->GetElement(req, result);
     }
@@ -1162,11 +1181,29 @@ class DataServiceDatasetOp::Dataset : public DatasetBase {
        }
 
        if (enqueue_result && !result.end_of_sequence) {
+          uint64 current_micro_timestamp = Env::Default()->NowMicros();
+          std::string data_source = task.info.worker_address();
+          bool if_local = false;
+          int result_size = result.element.size();
          if (local_tasks_.contains(task.info.worker_address())) {
+            if_local = true;
            local_results_buffer_.push(std::move(result));
          } else {
            results_.push(std::move(result));
          }
+
+          const char* log_location = std::getenv("EASL_MUYU_WORKER_METRICS");
+          if (log_location) {
+            std::ofstream file(log_location, std::ios_base::app);
+
+            file << current_micro_timestamp << ","
+                 << data_source << ","
+                 << if_local << ","
+                 << result_size << "\n";
+
+            file.flush();
+            file.clear();
+          }
        }
        get_next_cv_.notify_all();
      }
diff --git a/tensorflow/core/protobuf/service_config.proto b/tensorflow/core/protobuf/service_config.proto
index a3deccf495fbd2..65fb41e901b1e4 100644
--- a/tensorflow/core/protobuf/service_config.proto
+++ b/tensorflow/core/protobuf/service_config.proto
@@ -59,6 +59,8 @@ message DispatcherConfig {
   // The interval at which the dispatcher should dump log files.
   int64 log_dumps_interval_ms = 14;
+  // EASL: average bytes per element above which local workers are preferred.
+  int64 avg_bytes_per_element_local_thres = 17;
 }
 
 // Configuration for a tf.data service WorkerServer.
diff --git a/tensorflow/python/data/experimental/service/server_lib.py b/tensorflow/python/data/experimental/service/server_lib.py
index 7ba729b98c217e..522c4fc00c62eb 100644
--- a/tensorflow/python/data/experimental/service/server_lib.py
+++ b/tensorflow/python/data/experimental/service/server_lib.py
@@ -46,7 +46,7 @@ class DispatcherConfig(
         "port", "protocol", "work_dir", "fault_tolerant_mode", "worker_addresses",
         "job_gc_check_interval_ms", "job_gc_timeout_ms", "cache_policy",
         "cache_format", "cache_compression", "cache_ops_parallelism", "cache_path",
-        "scaling_policy", "log_dir", "log_dumps_interval_ms"
+        "scaling_policy", "log_dir", "log_dumps_interval_ms", "avg_bytes_per_element_local_thres"
     ])):
   """Configuration class for tf.data service dispatchers.
 
@@ -109,7 +109,9 @@ def __new__(cls,
               cache_path="./outputs",
               scaling_policy=1,
               log_dir="",
-              log_dumps_interval_ms=None):
+              log_dumps_interval_ms=None,
+              avg_bytes_per_element_local_thres=1024 * 1024 * 30  # 30 MB
+              ):
     if protocol is None:
       protocol = _pywrap_utils.TF_DATA_DefaultProtocol()
     job_gc_check_interval_ms = _get_time_or_placeholder(
@@ -124,7 +126,7 @@ def __new__(cls,
         fault_tolerant_mode, worker_addresses, job_gc_check_interval_ms,
         job_gc_timeout_ms, cache_policy, cache_format, cache_compression,
         cache_ops_parallelism, cache_path, scaling_policy,
-        log_dir, log_dumps_interval_ms)
+        log_dir, log_dumps_interval_ms, avg_bytes_per_element_local_thres)
 
 
 @tf_export("data.experimental.service.DispatchServer", v1=[])
@@ -202,7 +204,9 @@ def __init__(self, config=None, start=True):
         cache_path=config.cache_path,
         scaling_policy=config.scaling_policy,
         log_dir=config.log_dir,
-        log_dumps_interval_ms=config.log_dumps_interval_ms)
+        log_dumps_interval_ms=config.log_dumps_interval_ms,
+        avg_bytes_per_element_local_thres=config.avg_bytes_per_element_local_thres,
+        )
     self._server = _pywrap_server_lib.TF_DATA_NewDispatchServer(
         config_proto.SerializeToString())
     if start:
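
A minimal usage sketch of the new dispatcher knobs added by this patch. This is not part of the patch itself; it assumes the fork's Python surface otherwise matches upstream tf.data service (DispatchServer, WorkerServer, and the dispatcher.target string), with the worker started on the training host so that LocalWorkers::GetList() reports it to GetOrCreateJob:

    import tensorflow as tf

    # Dispatcher: scaling_policy=1 keeps the old autoscaler; the local-worker
    # decision kicks in once the average element exceeds ~10 MB.
    dispatcher = tf.data.experimental.service.DispatchServer(
        tf.data.experimental.service.DispatcherConfig(
            port=5050,
            scaling_policy=1,
            avg_bytes_per_element_local_thres=10 * 1024 * 1024))

    # Worker colocated with the training client; it registers itself in
    # LocalWorkers, so the client reports it as a local worker when creating a job.
    worker = tf.data.experimental.service.WorkerServer(
        tf.data.experimental.service.WorkerConfig(
            dispatcher_address=dispatcher.target.split("://")[1]))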