From c696e49a1cb782adab1d18dd3be65665340b32df Mon Sep 17 00:00:00 2001 From: Zhi Lin Date: Thu, 17 Jun 2021 13:30:14 +0000 Subject: [PATCH 1/5] remove py4j init commit --- .../spark/deploy/RayAppMasterUtils.java | 4 +- .../deploy/raydp/AppMasterEntryPoint.scala | 70 ------------------- .../deploy/raydp/AppMasterJavaBridge.scala | 59 ---------------- .../spark/deploy/raydp/RayAppMaster.scala | 16 ++--- .../RayCoarseGrainedSchedulerBackend.scala | 2 +- python/raydp/context.py | 10 +-- python/raydp/spark/__init__.py | 4 +- python/raydp/spark/ray_cluster.py | 19 ++--- python/raydp/spark/ray_cluster_master.py | 38 +++++----- 9 files changed, 43 insertions(+), 179 deletions(-) delete mode 100644 core/src/main/scala/org/apache/spark/deploy/raydp/AppMasterEntryPoint.scala delete mode 100644 core/src/main/scala/org/apache/spark/deploy/raydp/AppMasterJavaBridge.scala diff --git a/core/src/main/java/org/apache/spark/deploy/RayAppMasterUtils.java b/core/src/main/java/org/apache/spark/deploy/RayAppMasterUtils.java index f363b073..b3883c8e 100644 --- a/core/src/main/java/org/apache/spark/deploy/RayAppMasterUtils.java +++ b/core/src/main/java/org/apache/spark/deploy/RayAppMasterUtils.java @@ -22,8 +22,8 @@ public class RayAppMasterUtils { public static ActorHandle createAppMaster( - String cp, String jvmOptions) { - return Ray.actor(RayAppMaster::new, cp).setJvmOptions(jvmOptions).remote(); + String jvmOptions) { + return Ray.actor(RayAppMaster::new).setJvmOptions(jvmOptions).remote(); } public static String getMasterUrl( diff --git a/core/src/main/scala/org/apache/spark/deploy/raydp/AppMasterEntryPoint.scala b/core/src/main/scala/org/apache/spark/deploy/raydp/AppMasterEntryPoint.scala deleted file mode 100644 index 6ab801e5..00000000 --- a/core/src/main/scala/org/apache/spark/deploy/raydp/AppMasterEntryPoint.scala +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.raydp - -import java.io.{DataOutputStream, File, FileOutputStream} -import java.nio.file.Files - -import py4j.GatewayServer - -import org.apache.spark.internal.Logging - -class AppMasterEntryPoint { - private val appMaster: AppMasterJavaBridge = new AppMasterJavaBridge() - - def getAppMasterBridge(): AppMasterJavaBridge = { - appMaster - } -} - -object AppMasterEntryPoint extends Logging { - initializeLogIfNecessary(true) - - def main(args: Array[String]): Unit = { - val server = new GatewayServer(new AppMasterEntryPoint()) - server.start() - val boundPort: Int = server.getListeningPort() - if (boundPort == -1) { - logError(s"${server.getClass} failed to bind; exiting") - System.exit(1) - } else { - logDebug(s"Started PythonGatewayServer on port $boundPort") - } - - - val connectionInfoPath = new File(sys.env("_RAYDP_APPMASTER_CONN_INFO_PATH")) - val tmpPath = Files.createTempFile(connectionInfoPath.getParentFile().toPath(), - "connection", ".info").toFile() - - val dos = new DataOutputStream(new FileOutputStream(tmpPath)) - dos.writeInt(boundPort) - dos.close() - - if (!tmpPath.renameTo(connectionInfoPath)) { - logError(s"Unable to write connection information to $connectionInfoPath.") - System.exit(1) - } - - // Exit on EOF or broken pipe to ensure that this process dies when the Python driver dies: - while (System.in.read() != -1) { - // Do nothing - } - logDebug("Exiting due to broken pipe from Python driver") - System.exit(0) - } -} diff --git a/core/src/main/scala/org/apache/spark/deploy/raydp/AppMasterJavaBridge.scala b/core/src/main/scala/org/apache/spark/deploy/raydp/AppMasterJavaBridge.scala deleted file mode 100644 index b003d298..00000000 --- a/core/src/main/scala/org/apache/spark/deploy/raydp/AppMasterJavaBridge.scala +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.raydp - -import io.ray.api.Ray -import io.ray.runtime.config.RayConfig -import org.json4s._ -import org.json4s.jackson.JsonMethods._ - -class AppMasterJavaBridge { - private var instance: RayAppMaster = null - - def setProperties(properties: String): Unit = { - implicit val formats: DefaultFormats.type = org.json4s.DefaultFormats - val parsed = parse(properties).extract[Map[String, String]] - parsed.foreach{ case (key, value) => - System.setProperty(key, value) - } - // Use the same session dir as the python side - RayConfig.create().setSessionDir(System.getProperty("ray.session-dir")) - } - - def startUpAppMaster(extra_cp: String): Unit = { - if (instance == null) { - // init ray, we should set the config by java properties - Ray.init() - instance = new RayAppMaster(extra_cp) - } - } - - def getMasterUrl(): String = { - if (instance == null) { - throw new RuntimeException("You should create the RayAppMaster instance first") - } - instance.getMasterUrl() - } - - def stop(): Unit = { - if (instance != null) { - instance.stop() - instance = null - } - } -} diff --git a/core/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala b/core/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala index dd1a4c31..6fb3fcaf 100644 --- a/core/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala +++ b/core/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala @@ -35,23 +35,15 @@ import org.apache.spark.rpc._ import org.apache.spark.util.ShutdownHookManager import org.apache.spark.util.Utils -class RayAppMaster(host: String, - port: Int, - actor_extra_classpath: String) extends Serializable with Logging { +class RayAppMaster() extends Serializable with Logging { private var endpoint: RpcEndpointRef = _ private var rpcEnv: RpcEnv = _ private val conf: SparkConf = new SparkConf() + private val host: String = RayConfig.create().nodeIp + private val actor_extra_classpath: String = "" init() - def this() = { - this(RayConfig.create().nodeIp, 0, "") - } - - def this(actor_extra_classpath: String) = { - this(RayConfig.create().nodeIp, 0, actor_extra_classpath) - } - def init(): Unit = { Utils.loadDefaultSparkProperties(conf) val securityMgr = new SecurityManager(conf) @@ -59,7 +51,7 @@ class RayAppMaster(host: String, RayAppMaster.ENV_NAME, host, host, - port, + 0, conf, securityMgr, numUsableCores = 0, diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/raydp/RayCoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/raydp/RayCoarseGrainedSchedulerBackend.scala index 40514e50..089bbc95 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/raydp/RayCoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/raydp/RayCoarseGrainedSchedulerBackend.scala @@ -68,7 +68,7 @@ class RayCoarseGrainedSchedulerBackend( Ray.init() val cp = sys.props("java.class.path") val options = RayExternalShuffleService.getShuffleConf(conf) - masterHandle = RayAppMasterUtils.createAppMaster(cp, options) + masterHandle = RayAppMasterUtils.createAppMaster(options) uri = new URI(RayAppMasterUtils.getMasterUrl(masterHandle)) } else { uri = new URI(sparkUrl) diff --git a/python/raydp/context.py b/python/raydp/context.py index 4aaeedca..9b288153 100644 --- a/python/raydp/context.py +++ b/python/raydp/context.py @@ -15,17 +15,20 @@ # limitations under the License. # -import atexit +import atexit, os from contextlib import ContextDecorator from threading import RLock from typing import Dict, Union, Optional import ray +from ray.job_config import JobConfig +import pyspark from pyspark.sql import SparkSession -from raydp.spark import SparkCluster +from raydp.spark import SparkCluster, RAYDP_CP from raydp.utils import parse_memory_size +SPARK_CP = os.path.join(os.path.dirname(pyspark.__file__), "jars/*") class _SparkContext(ContextDecorator): """A class used to create the Spark cluster and get the Spark session. @@ -111,8 +114,7 @@ def init_spark(app_name: str, if not ray.is_initialized(): # ray has not initialized, init local - ray.init() - + ray.init(job_config=JobConfig(java_code_search_path=[RAYDP_CP, SPARK_CP])) with _spark_context_lock: global _global_spark_context if _global_spark_context is None: diff --git a/python/raydp/spark/__init__.py b/python/raydp/spark/__init__.py index 81b64494..951b7e62 100644 --- a/python/raydp/spark/__init__.py +++ b/python/raydp/spark/__init__.py @@ -17,6 +17,6 @@ from .dataset import RayMLDataset from .interfaces import SparkEstimatorInterface -from .ray_cluster import SparkCluster +from .ray_cluster import SparkCluster, RAYDP_CP -__all__ = ["RayMLDataset", "SparkCluster", "SparkEstimatorInterface"] +__all__ = ["RayMLDataset", "SparkCluster", "SparkEstimatorInterface", "RAYDP_CP"] diff --git a/python/raydp/spark/ray_cluster.py b/python/raydp/spark/ray_cluster.py index db7ec512..d4ee12e9 100644 --- a/python/raydp/spark/ray_cluster.py +++ b/python/raydp/spark/ray_cluster.py @@ -15,34 +15,35 @@ # limitations under the License. # -import glob +import glob, os from typing import Any, Dict import ray from pyspark.sql.session import SparkSession from raydp.services import Cluster -from .ray_cluster_master import RayClusterMaster, RAYDP_CP +from .ray_cluster_master import RayClusterMaster +RAYDP_CP = os.path.abspath(os.path.join(os.path.abspath(__file__), "../../jars/*")) class SparkCluster(Cluster): def __init__(self, configs): super().__init__(None) - self._app_master_bridge = None + self._app_master = None self._configs = configs self._set_up_master(None, None) self._spark_session: SparkSession = None def _set_up_master(self, resources: Dict[str, float], kwargs: Dict[Any, Any]): # TODO: specify the app master resource - self._app_master_bridge = RayClusterMaster(self._configs) - self._app_master_bridge.start_up() + self._app_master = RayClusterMaster(self._configs) + self._app_master.start_up() def _set_up_worker(self, resources: Dict[str, float], kwargs: Dict[str, str]): raise Exception("Unsupported operation") def get_cluster_url(self) -> str: - return self._app_master_bridge.get_master_url() + return self._app_master.get_master_url() def get_spark_session(self, app_name: str, @@ -83,6 +84,6 @@ def stop(self): self._spark_session.stop() self._spark_session = None - if self._app_master_bridge is not None: - self._app_master_bridge.stop() - self._app_master_bridge = None + if self._app_master is not None: + self._app_master.stop() + self._app_master = None diff --git a/python/raydp/spark/ray_cluster_master.py b/python/raydp/spark/ray_cluster_master.py index 65cf8270..ac214c68 100644 --- a/python/raydp/spark/ray_cluster_master.py +++ b/python/raydp/spark/ray_cluster_master.py @@ -30,19 +30,14 @@ import pyspark import ray import ray.services -from py4j.java_gateway import JavaGateway, GatewayParameters from raydp.services import ClusterMaster -RAYDP_CP = os.path.abspath(os.path.join(os.path.abspath(__file__), "../../jars/*")) - logger = logging.getLogger(__name__) class RayClusterMaster(ClusterMaster): def __init__(self, configs): - self._gateway = None - self._app_master_java_bridge = None self._host = None self._started_up = False self._configs = configs @@ -51,18 +46,21 @@ def start_up(self, popen_kwargs=None): if self._started_up: logger.warning("The RayClusterMaster has started already. Do not call it twice") return - extra_classpath = os.pathsep.join(self._prepare_jvm_classpath()) - self._gateway = self._launch_gateway(extra_classpath, popen_kwargs) - self._app_master_java_bridge = self._gateway.entry_point.getAppMasterBridge() - self._set_properties() - self._host = ray._private.services.get_node_ip_address() - self._create_app_master(extra_classpath) + appMasterCls = ray.java_actor_class("org.apache.spark.deploy.raydp.RayAppMaster") + self._instance = appMasterCls.remote() + self._master_url = ray.get(self._instance.getMasterUrl.remote()) + # extra_classpath = os.pathsep.join(self._prepare_jvm_classpath()) + # self._gateway = self._launch_gateway(extra_classpath, popen_kwargs) + # self._app_master_java_bridge = self._gateway.entry_point.getAppMasterBridge() + # self._set_properties() + # self._host = ray._private.services.get_node_ip_address() + # self._create_app_master(extra_classpath) self._started_up = True def _prepare_jvm_classpath(self): cp_list = [] # find RayDP core path - cp_list.append(RAYDP_CP) + # cp_list.append(RAYDP_CP) # find ray jar path ray_cp = os.path.abspath(os.path.join(os.path.dirname(ray.__file__), "jars/*")) cp_list.append(ray_cp) @@ -170,19 +168,19 @@ def _create_app_master(self, extra_classpath: str): def get_master_url(self): assert self._started_up - return self._app_master_java_bridge.getMasterUrl() + return self._master_url def stop(self): if not self._started_up: return - if self._app_master_java_bridge is not None: - self._app_master_java_bridge.stop() - self._app_master_java_bridge = None + # if self._app_master_java_bridge is not None: + # self._app_master_java_bridge.stop() + # self._app_master_java_bridge = None - if self._gateway is not None: - self._gateway.shutdown() - self._gateway.proc.terminate() - self._gateway = None + # if self._gateway is not None: + # self._gateway.shutdown() + # self._gateway.proc.terminate() + # self._gateway = None self._started_up = False From 9d3053d349f48a2c375766e39a4242135ee3fe8e Mon Sep 17 00:00:00 2001 From: Zhi Lin Date: Fri, 18 Jun 2021 11:23:09 +0000 Subject: [PATCH 2/5] fix --- python/raydp/context.py | 8 +++----- python/raydp/spark/__init__.py | 4 ++-- python/raydp/spark/ray_cluster.py | 8 ++++---- python/raydp/utils.py | 8 ++++++-- 4 files changed, 15 insertions(+), 13 deletions(-) diff --git a/python/raydp/context.py b/python/raydp/context.py index 9b288153..3772a968 100644 --- a/python/raydp/context.py +++ b/python/raydp/context.py @@ -25,10 +25,8 @@ import pyspark from pyspark.sql import SparkSession -from raydp.spark import SparkCluster, RAYDP_CP -from raydp.utils import parse_memory_size - -SPARK_CP = os.path.join(os.path.dirname(pyspark.__file__), "jars/*") +from raydp.spark import SparkCluster +from raydp.utils import get_code_search_path, parse_memory_size class _SparkContext(ContextDecorator): """A class used to create the Spark cluster and get the Spark session. @@ -114,7 +112,7 @@ def init_spark(app_name: str, if not ray.is_initialized(): # ray has not initialized, init local - ray.init(job_config=JobConfig(java_code_search_path=[RAYDP_CP, SPARK_CP])) + ray.init(job_config=JobConfig(java_code_search_path=get_code_search_path())) with _spark_context_lock: global _global_spark_context if _global_spark_context is None: diff --git a/python/raydp/spark/__init__.py b/python/raydp/spark/__init__.py index 951b7e62..81b64494 100644 --- a/python/raydp/spark/__init__.py +++ b/python/raydp/spark/__init__.py @@ -17,6 +17,6 @@ from .dataset import RayMLDataset from .interfaces import SparkEstimatorInterface -from .ray_cluster import SparkCluster, RAYDP_CP +from .ray_cluster import SparkCluster -__all__ = ["RayMLDataset", "SparkCluster", "SparkEstimatorInterface", "RAYDP_CP"] +__all__ = ["RayMLDataset", "SparkCluster", "SparkEstimatorInterface"] diff --git a/python/raydp/spark/ray_cluster.py b/python/raydp/spark/ray_cluster.py index d4ee12e9..5e956222 100644 --- a/python/raydp/spark/ray_cluster.py +++ b/python/raydp/spark/ray_cluster.py @@ -24,7 +24,7 @@ from raydp.services import Cluster from .ray_cluster_master import RayClusterMaster -RAYDP_CP = os.path.abspath(os.path.join(os.path.abspath(__file__), "../../jars/*")) +RAYDP_JARS = os.path.abspath(os.path.join(os.path.abspath(__file__), "../../jars/*")) class SparkCluster(Cluster): def __init__(self, configs): @@ -66,12 +66,12 @@ def get_spark_session(self, extra_jars = [extra_conf["spark.jars"]] except KeyError: extra_jars = [] - extra_conf["spark.jars"] = ",".join(glob.glob(RAYDP_CP) + extra_jars) + extra_conf["spark.jars"] = ",".join(glob.glob(RAYDP_JARS) + extra_jars) driver_cp = "spark.driver.extraClassPath" if driver_cp in extra_conf: - extra_conf[driver_cp] = ":".join(glob.glob(RAYDP_CP)) + ":" + extra_conf[driver_cp] + extra_conf[driver_cp] = ":".join(glob.glob(RAYDP_JARS)) + ":" + extra_conf[driver_cp] else: - extra_conf[driver_cp] = ":".join(glob.glob(RAYDP_CP)) + extra_conf[driver_cp] = ":".join(glob.glob(RAYDP_JARS)) spark_builder = SparkSession.builder for k, v in extra_conf.items(): spark_builder.config(k, v) diff --git a/python/raydp/utils.py b/python/raydp/utils.py index 328c4c8b..32392cc1 100644 --- a/python/raydp/utils.py +++ b/python/raydp/utils.py @@ -17,19 +17,23 @@ import atexit import math -import re +import re, os import signal from typing import Dict, List, Tuple import numpy as np import psutil +import pyspark MEMORY_SIZE_UNITS = {"K": 2**10, "M": 2**20, "G": 2**30, "T": 2**40} - +SPARK_CP = os.path.join(os.path.dirname(pyspark.__file__), "jars") +RAYDP_CP = os.path.abspath(os.path.join(os.path.abspath(__file__), "../jars")) # we use 4 bytes for block size, this means each block can contain # 4294967296 records BLOCK_SIZE_BIT = 32 +def get_code_search_path() -> List: + return [RAYDP_CP, SPARK_CP] def get_node_address() -> str: """ From 94de3b68cb3b6b31f93c1455b7daf65823a54304 Mon Sep 17 00:00:00 2001 From: Zhi Lin Date: Fri, 18 Jun 2021 14:57:55 +0000 Subject: [PATCH 3/5] merge 1.4.0 --- .github/workflows/raydp.yml | 2 +- core/pom.xml | 2 +- .../apache/spark/deploy/ExternalShuffleServiceUtils.java | 4 +++- .../java/org/apache/spark/deploy/RayAppMasterUtils.java | 4 +++- .../java/org/apache/spark/raydp/RayExecutorUtils.java | 3 ++- .../org/apache/spark/deploy/raydp/RayAppMaster.scala | 5 ++--- .../spark/deploy/raydp/RayExternalShuffleService.scala | 4 ++-- .../cluster/raydp/RayCoarseGrainedSchedulerBackend.scala | 3 ++- python/raydp/__init__.py | 4 ++-- python/raydp/tests/conftest.py | 9 +++++++-- python/setup.py | 2 +- 11 files changed, 26 insertions(+), 16 deletions(-) diff --git a/.github/workflows/raydp.yml b/.github/workflows/raydp.yml index 0efe1e94..dc7b762e 100644 --- a/.github/workflows/raydp.yml +++ b/.github/workflows/raydp.yml @@ -69,7 +69,7 @@ jobs: else pip install torch fi - pip install pytest koalas tensorflow tabulate ray[tune]==1.3.0 xgboost_ray grpcio-tools + pip install pytest koalas tensorflow tabulate ray[tune]==1.4.0 xgboost_ray grpcio-tools HOROVOD_WITH_GLOO=1 HOROVOD_WITH_PYTORCH=1 pip install horovod[pytorch,ray] diff --git a/core/pom.xml b/core/pom.xml index 1fb17857..eda2c780 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ 2.12.2 2.31 2.12 - 1.3.0 + 1.4.0 diff --git a/core/src/main/java/org/apache/spark/deploy/ExternalShuffleServiceUtils.java b/core/src/main/java/org/apache/spark/deploy/ExternalShuffleServiceUtils.java index 1be21ad9..ecaa747e 100644 --- a/core/src/main/java/org/apache/spark/deploy/ExternalShuffleServiceUtils.java +++ b/core/src/main/java/org/apache/spark/deploy/ExternalShuffleServiceUtils.java @@ -17,12 +17,14 @@ package org.apache.spark.deploy.raydp; +import java.util.List; + import io.ray.api.ActorHandle; import io.ray.api.Ray; public class ExternalShuffleServiceUtils { public static ActorHandle createShuffleService( - String node, String options) { + String node, List options) { return Ray.actor(RayExternalShuffleService::new) .setResource("node:" + node, 0.01) .setJvmOptions(options).remote(); diff --git a/core/src/main/java/org/apache/spark/deploy/RayAppMasterUtils.java b/core/src/main/java/org/apache/spark/deploy/RayAppMasterUtils.java index b3883c8e..e6d3af71 100644 --- a/core/src/main/java/org/apache/spark/deploy/RayAppMasterUtils.java +++ b/core/src/main/java/org/apache/spark/deploy/RayAppMasterUtils.java @@ -17,12 +17,14 @@ package org.apache.spark.deploy.raydp; +import java.util.List; + import io.ray.api.ActorHandle; import io.ray.api.Ray; public class RayAppMasterUtils { public static ActorHandle createAppMaster( - String jvmOptions) { + List jvmOptions) { return Ray.actor(RayAppMaster::new).setJvmOptions(jvmOptions).remote(); } diff --git a/core/src/main/java/org/apache/spark/raydp/RayExecutorUtils.java b/core/src/main/java/org/apache/spark/raydp/RayExecutorUtils.java index 37397ff2..dfd31401 100644 --- a/core/src/main/java/org/apache/spark/raydp/RayExecutorUtils.java +++ b/core/src/main/java/org/apache/spark/raydp/RayExecutorUtils.java @@ -21,6 +21,7 @@ import io.ray.api.Ray; import io.ray.api.call.ActorCreator; import java.util.Map; +import java.util.List; import org.apache.spark.executor.RayCoarseGrainedExecutorBackend; public class RayExecutorUtils { @@ -39,7 +40,7 @@ public static ActorHandle createExecutorActor( int cores, int memoryInMB, Map resources, - String javaOpts) { + List javaOpts) { ActorCreator creator = Ray.actor( RayCoarseGrainedExecutorBackend::new, executorId, appMasterURL); creator.setJvmOptions(javaOpts); diff --git a/core/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala b/core/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala index 6fb3fcaf..81f565e1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala +++ b/core/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala @@ -127,7 +127,7 @@ class RayAppMaster() extends Serializable with Logging { if (!nodesWithShuffleService.contains(executorIp)) { logInfo(s"Starting shuffle service on ${executorIp}") val service = ExternalShuffleServiceUtils.createShuffleService( - executorIp, shuffleServiceOptions) + executorIp, shuffleServiceOptions.toBuffer.asJava) ExternalShuffleServiceUtils.startShuffleService(service) nodesWithShuffleService(executorIp) = service } @@ -211,12 +211,11 @@ class RayAppMaster() extends Serializable with Logging { val cores = appInfo.desc.coresPerExecutor.getOrElse(1) val memory = appInfo.desc.memoryPerExecutorMB val executorId = s"${appInfo.getNextExecutorId()}" - val javaOpts = appInfo.desc.command.javaOpts.mkString(" ") val handler = RayExecutorUtils.createExecutorActor( executorId, getAppMasterEndpointUrl(), cores, memory, appInfo.desc.resourceReqsPerExecutor.map(pair => (pair._1, Double.box(pair._2))).asJava, - javaOpts) + seqAsJavaList(appInfo.desc.command.javaOpts)) appInfo.addPendingRegisterExecutor(executorId, handler, cores, memory) } diff --git a/core/src/main/scala/org/apache/spark/deploy/raydp/RayExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/raydp/RayExternalShuffleService.scala index e127bbb4..1566c584 100644 --- a/core/src/main/scala/org/apache/spark/deploy/raydp/RayExternalShuffleService.scala +++ b/core/src/main/scala/org/apache/spark/deploy/raydp/RayExternalShuffleService.scala @@ -39,7 +39,7 @@ class RayExternalShuffleService() extends Logging { } object RayExternalShuffleService { - def getShuffleConf(conf: SparkConf): String = { + def getShuffleConf(conf: SparkConf): Array[String] = { // all conf needed by external shuffle service var shuffleConf = conf.getAll.filter { case (k, v) => k.startsWith("spark.shuffle") @@ -52,6 +52,6 @@ object RayExternalShuffleService { shuffleConf = shuffleConf :+ "-D" + localDirKey + "=" + conf.get(localDirKey) } - shuffleConf.mkString(" ") + shuffleConf } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/raydp/RayCoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/raydp/RayCoarseGrainedSchedulerBackend.scala index 089bbc95..5ac916da 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/raydp/RayCoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/raydp/RayCoarseGrainedSchedulerBackend.scala @@ -21,6 +21,7 @@ import java.net.URI import java.util.concurrent.Semaphore import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} +import scala.collection.JavaConverters._ import scala.collection.mutable.HashMap import scala.concurrent.Future @@ -68,7 +69,7 @@ class RayCoarseGrainedSchedulerBackend( Ray.init() val cp = sys.props("java.class.path") val options = RayExternalShuffleService.getShuffleConf(conf) - masterHandle = RayAppMasterUtils.createAppMaster(options) + masterHandle = RayAppMasterUtils.createAppMaster(options.toBuffer.asJava) uri = new URI(RayAppMasterUtils.getMasterUrl(masterHandle)) } else { uri = new URI(sparkUrl) diff --git a/python/raydp/__init__.py b/python/raydp/__init__.py index d5d73e5b..8a7546ee 100644 --- a/python/raydp/__init__.py +++ b/python/raydp/__init__.py @@ -16,7 +16,7 @@ # from raydp.context import init_spark, stop_spark - +from raydp.utils import get_code_search_path __version__ = "0.4.0.dev0" -__all__ = ["init_spark", "stop_spark"] +__all__ = ["init_spark", "stop_spark", "get_code_search_path"] diff --git a/python/raydp/tests/conftest.py b/python/raydp/tests/conftest.py index 1f835515..be88f5f2 100644 --- a/python/raydp/tests/conftest.py +++ b/python/raydp/tests/conftest.py @@ -20,8 +20,11 @@ import pytest from pyspark.sql import SparkSession import ray +from ray.job_config import JobConfig import raydp +from raydp.utils import get_code_search_path +TEST_CONFIG=JobConfig(code_search_path=get_code_search_path()) def quiet_logger(): py4j_logger = logging.getLogger("py4j") @@ -42,14 +45,16 @@ def spark_session(request): @pytest.fixture(scope="function") def ray_cluster(request): ray.shutdown() - ray.init(num_cpus=4, _redis_password="123", include_dashboard=False) + ray.init(num_cpus=4, _redis_password="123", include_dashboard=False, + job_config=TEST_CONFIG) request.addfinalizer(lambda: ray.shutdown()) @pytest.fixture(scope="function") def spark_on_ray_small(request): ray.shutdown() - ray.init(num_cpus=4, _redis_password="123", include_dashboard=False) + ray.init(num_cpus=4, _redis_password="123", include_dashboard=False, + job_config=TEST_CONFIG) spark = raydp.init_spark("test", 1, 1, "500 M") def stop_all(): diff --git a/python/setup.py b/python/setup.py index e924925c..76c0a7e9 100644 --- a/python/setup.py +++ b/python/setup.py @@ -95,7 +95,7 @@ def run(self): "pandas == 1.1.4", "psutil", "pyarrow >= 0.10", - "ray == 1.3.0", + "ray == 1.4.0", "pyspark >= 3.0.0, < 3.1.0" ] From dd73c891f96cd9795c66c7884c29f3c39be62aac Mon Sep 17 00:00:00 2001 From: Zhi Lin Date: Mon, 21 Jun 2021 14:30:41 +0000 Subject: [PATCH 4/5] remove dead code; set classpath --- .../spark/deploy/RayAppMasterUtils.java | 3 +- .../spark/deploy/raydp/RayAppMaster.scala | 16 ++- python/raydp/spark/ray_cluster.py | 5 +- python/raydp/spark/ray_cluster_master.py | 119 ++---------------- 4 files changed, 23 insertions(+), 120 deletions(-) diff --git a/core/src/main/java/org/apache/spark/deploy/RayAppMasterUtils.java b/core/src/main/java/org/apache/spark/deploy/RayAppMasterUtils.java index e6d3af71..71f6f581 100644 --- a/core/src/main/java/org/apache/spark/deploy/RayAppMasterUtils.java +++ b/core/src/main/java/org/apache/spark/deploy/RayAppMasterUtils.java @@ -35,7 +35,6 @@ public static String getMasterUrl( public static void stopAppMaster( ActorHandle handle) { - handle.task(RayAppMaster::stop).remote().get(); - handle.kill(); + handle.task(RayAppMaster::stop).remote(); } } diff --git a/core/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala b/core/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala index 81f565e1..a8385fec 100644 --- a/core/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala +++ b/core/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala @@ -41,7 +41,8 @@ class RayAppMaster() extends Serializable with Logging { private val conf: SparkConf = new SparkConf() private val host: String = RayConfig.create().nodeIp - private val actor_extra_classpath: String = "" + private var actorExtraClasspath: String = _ + init() def init(): Unit = { @@ -60,6 +61,11 @@ class RayAppMaster() extends Serializable with Logging { endpoint = rpcEnv.setupEndpoint(RayAppMaster.ENDPOINT_NAME, new RayAppMasterEndpoint(rpcEnv)) } + def setActorClasspath(cp: String): Int = { + actorExtraClasspath = cp + 0 + } + /** * Get the app master endpoint URL. The executor will connect to AppMaster by this URL and * tell the AppMaster that it has started up successful. @@ -76,14 +82,14 @@ class RayAppMaster() extends Serializable with Logging { url.replace("spark", "ray") } - def stop(): Int = { + def stop(): Unit = { logInfo("Stopping RayAppMaster") if (rpcEnv != null) { rpcEnv.shutdown() endpoint = null rpcEnv = null } - 0 + Ray.exitActor() } class RayAppMasterEndpoint(override val rpcEnv: RpcEnv) @@ -236,10 +242,10 @@ class RayAppMaster() extends Serializable with Logging { s"Found ${javaOpts(i - 1)} while not classpath url in executor java opts") } - javaOpts.updated(i, javaOpts(i) + File.pathSeparator + actor_extra_classpath) + javaOpts.updated(i, javaOpts(i) + File.pathSeparator + actorExtraClasspath) } else { // user has not set, we append the actor extra classpath in the end - javaOpts ++ Seq("-cp", actor_extra_classpath) + javaOpts ++ Seq("-cp", actorExtraClasspath) } } diff --git a/python/raydp/spark/ray_cluster.py b/python/raydp/spark/ray_cluster.py index 5e956222..84237863 100644 --- a/python/raydp/spark/ray_cluster.py +++ b/python/raydp/spark/ray_cluster.py @@ -22,9 +22,8 @@ from pyspark.sql.session import SparkSession from raydp.services import Cluster -from .ray_cluster_master import RayClusterMaster +from .ray_cluster_master import RayClusterMaster, RAYDP_JARS -RAYDP_JARS = os.path.abspath(os.path.join(os.path.abspath(__file__), "../../jars/*")) class SparkCluster(Cluster): def __init__(self, configs): @@ -59,7 +58,7 @@ def get_spark_session(self, extra_conf["spark.executor.instances"] = str(num_executors) extra_conf["spark.executor.cores"] = str(executor_cores) extra_conf["spark.executor.memory"] = str(executor_memory) - driver_node_ip = ray.services.get_node_ip_address() + driver_node_ip = ray.util.get_node_ip_address() extra_conf["spark.driver.host"] = str(driver_node_ip) extra_conf["spark.driver.bindAddress"] = str(driver_node_ip) try: diff --git a/python/raydp/spark/ray_cluster_master.py b/python/raydp/spark/ray_cluster_master.py index ac214c68..f60c4ecb 100644 --- a/python/raydp/spark/ray_cluster_master.py +++ b/python/raydp/spark/ray_cluster_master.py @@ -35,6 +35,7 @@ logger = logging.getLogger(__name__) +RAYDP_JARS = os.path.abspath(os.path.join(os.path.abspath(__file__), "../../jars/*")) class RayClusterMaster(ClusterMaster): def __init__(self, configs): @@ -47,23 +48,21 @@ def start_up(self, popen_kwargs=None): logger.warning("The RayClusterMaster has started already. Do not call it twice") return appMasterCls = ray.java_actor_class("org.apache.spark.deploy.raydp.RayAppMaster") + self._host = ray.util.get_node_ip_address() self._instance = appMasterCls.remote() self._master_url = ray.get(self._instance.getMasterUrl.remote()) - # extra_classpath = os.pathsep.join(self._prepare_jvm_classpath()) - # self._gateway = self._launch_gateway(extra_classpath, popen_kwargs) - # self._app_master_java_bridge = self._gateway.entry_point.getAppMasterBridge() - # self._set_properties() - # self._host = ray._private.services.get_node_ip_address() - # self._create_app_master(extra_classpath) + extra_classpath = os.pathsep.join(self._prepare_jvm_classpath()) + # set classpath for executors + ray.get(self._instance.setActorClasspath.remote(extra_classpath)) self._started_up = True def _prepare_jvm_classpath(self): cp_list = [] # find RayDP core path - # cp_list.append(RAYDP_CP) + cp_list.append(RAYDP_JARS) # find ray jar path - ray_cp = os.path.abspath(os.path.join(os.path.dirname(ray.__file__), "jars/*")) - cp_list.append(ray_cp) + ray_jars = os.path.abspath(os.path.join(os.path.dirname(ray.__file__), "jars/*")) + cp_list.append(ray_jars) # find pyspark jars path spark_home = os.path.dirname(pyspark.__file__) spark_jars_dir = os.path.join(spark_home, "jars/*") @@ -71,101 +70,10 @@ def _prepare_jvm_classpath(self): cp_list.extend(spark_jars) return cp_list - def _launch_gateway(self, class_path, popen_kwargs=None): - """ - launch jvm gateway - :param popen_kwargs: Dictionary of kwargs to pass to Popen when spawning - the py4j JVM. This is a developer feature intended for use in - customizing how pyspark interacts with the py4j JVM (e.g., capturing - stdout/stderr). - """ - - command = ["java"] - command.append("-cp") - command.append(class_path) - command.append("org.apache.spark.deploy.raydp.AppMasterEntryPoint") - - # Create a temporary directory where the gateway server should write the connection - # information. - conn_info_dir = tempfile.mkdtemp() - try: - fd, conn_info_file = tempfile.mkstemp(dir=conn_info_dir) - os.close(fd) - os.unlink(conn_info_file) - - env = dict(os.environ) - env["_RAYDP_APPMASTER_CONN_INFO_PATH"] = conn_info_file - - # Launch the Java gateway. - popen_kwargs = {} if popen_kwargs is None else popen_kwargs - # We open a pipe to stdin so that the Java gateway can die when the pipe is broken - popen_kwargs["stdin"] = PIPE - # We always set the necessary environment variables. - popen_kwargs["env"] = env - - # Don't send ctrl-c / SIGINT to the Java gateway: - def preexec_func(): - signal.signal(signal.SIGINT, signal.SIG_IGN) - popen_kwargs["preexec_fn"] = preexec_func - # pylint: disable=R1732 - proc = Popen(command, **popen_kwargs) - - # Wait for the file to appear, or for the process to exit, whichever happens first. - while not proc.poll() and not os.path.isfile(conn_info_file): - time.sleep(0.1) - - if not os.path.isfile(conn_info_file): - raise Exception("Java gateway process exited before sending its port number") - - with open(conn_info_file, "rb") as info: - length = info.read(4) - if not length: - raise EOFError - gateway_port = struct.unpack("!i", length)[0] - - finally: - shutil.rmtree(conn_info_dir) - - gateway = JavaGateway(gateway_parameters=GatewayParameters( - port=gateway_port, auto_convert=True)) - - # Store a reference to the Popen object for use by the caller - # (e.g., in reading stdout/stderr) - gateway.proc = proc - - return gateway - - def _set_properties(self): - assert ray.is_initialized() - options = copy(self._configs) - - node = ray.worker.global_worker.node - - options["ray.run-mode"] = "CLUSTER" - options["ray.node-ip"] = node.node_ip_address - options["ray.address"] = node.redis_address - options["ray.redis.password"] = node.redis_password - options["ray.logging.dir"] = node.get_logs_dir_path() - options["ray.session-dir"] = node.get_session_dir_path() - options["ray.raylet.node-manager-port"] = node.node_manager_port - options["ray.raylet.socket-name"] = node.raylet_socket_name - options["ray.raylet.config.num_workers_per_process_java"] = "1" - options["ray.object-store.socket-name"] = node.plasma_store_socket_name - options["ray.logging.level"] = "INFO" - - # jnius_config.set_option has some bug, we set this options in java side - jvm_properties = json.dumps(options) - self._app_master_java_bridge.setProperties(jvm_properties) - def get_host(self) -> str: assert self._started_up return self._host - def _create_app_master(self, extra_classpath: str): - if self._started_up: - return - self._app_master_java_bridge.startUpAppMaster(extra_classpath) - def get_master_url(self): assert self._started_up return self._master_url @@ -173,14 +81,5 @@ def get_master_url(self): def stop(self): if not self._started_up: return - - # if self._app_master_java_bridge is not None: - # self._app_master_java_bridge.stop() - # self._app_master_java_bridge = None - - # if self._gateway is not None: - # self._gateway.shutdown() - # self._gateway.proc.terminate() - # self._gateway = None - + self._instance.stop.remote() self._started_up = False From 4017b2a648c2d575c8d5b40e948b9288a958aef3 Mon Sep 17 00:00:00 2001 From: Zhi Lin Date: Mon, 21 Jun 2021 15:35:23 +0000 Subject: [PATCH 5/5] remove py4j from pom --- core/pom.xml | 7 ------- 1 file changed, 7 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index eda2c780..46256804 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -66,13 +66,6 @@ provided - - net.sf.py4j - py4j - 0.10.9.2 - provided - - org.apache.commons commons-lang3