diff --git a/core/pom.xml b/core/pom.xml index eda2c780..46256804 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -66,13 +66,6 @@ provided - - net.sf.py4j - py4j - 0.10.9.2 - provided - - org.apache.commons commons-lang3 diff --git a/core/src/main/java/org/apache/spark/deploy/RayAppMasterUtils.java b/core/src/main/java/org/apache/spark/deploy/RayAppMasterUtils.java index 727a1aac..71f6f581 100644 --- a/core/src/main/java/org/apache/spark/deploy/RayAppMasterUtils.java +++ b/core/src/main/java/org/apache/spark/deploy/RayAppMasterUtils.java @@ -24,8 +24,8 @@ public class RayAppMasterUtils { public static ActorHandle createAppMaster( - String cp, List jvmOptions) { - return Ray.actor(RayAppMaster::new, cp).setJvmOptions(jvmOptions).remote(); + List jvmOptions) { + return Ray.actor(RayAppMaster::new).setJvmOptions(jvmOptions).remote(); } public static String getMasterUrl( @@ -35,7 +35,6 @@ public static String getMasterUrl( public static void stopAppMaster( ActorHandle handle) { - handle.task(RayAppMaster::stop).remote().get(); - handle.kill(); + handle.task(RayAppMaster::stop).remote(); } } diff --git a/core/src/main/scala/org/apache/spark/deploy/raydp/AppMasterEntryPoint.scala b/core/src/main/scala/org/apache/spark/deploy/raydp/AppMasterEntryPoint.scala deleted file mode 100644 index 6ab801e5..00000000 --- a/core/src/main/scala/org/apache/spark/deploy/raydp/AppMasterEntryPoint.scala +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.raydp - -import java.io.{DataOutputStream, File, FileOutputStream} -import java.nio.file.Files - -import py4j.GatewayServer - -import org.apache.spark.internal.Logging - -class AppMasterEntryPoint { - private val appMaster: AppMasterJavaBridge = new AppMasterJavaBridge() - - def getAppMasterBridge(): AppMasterJavaBridge = { - appMaster - } -} - -object AppMasterEntryPoint extends Logging { - initializeLogIfNecessary(true) - - def main(args: Array[String]): Unit = { - val server = new GatewayServer(new AppMasterEntryPoint()) - server.start() - val boundPort: Int = server.getListeningPort() - if (boundPort == -1) { - logError(s"${server.getClass} failed to bind; exiting") - System.exit(1) - } else { - logDebug(s"Started PythonGatewayServer on port $boundPort") - } - - - val connectionInfoPath = new File(sys.env("_RAYDP_APPMASTER_CONN_INFO_PATH")) - val tmpPath = Files.createTempFile(connectionInfoPath.getParentFile().toPath(), - "connection", ".info").toFile() - - val dos = new DataOutputStream(new FileOutputStream(tmpPath)) - dos.writeInt(boundPort) - dos.close() - - if (!tmpPath.renameTo(connectionInfoPath)) { - logError(s"Unable to write connection information to $connectionInfoPath.") - System.exit(1) - } - - // Exit on EOF or broken pipe to ensure that this process dies when the Python driver dies: - while (System.in.read() != -1) { - // Do nothing - } - logDebug("Exiting due to broken pipe from Python driver") - System.exit(0) - } -} diff --git a/core/src/main/scala/org/apache/spark/deploy/raydp/AppMasterJavaBridge.scala b/core/src/main/scala/org/apache/spark/deploy/raydp/AppMasterJavaBridge.scala deleted file mode 100644 index b003d298..00000000 --- a/core/src/main/scala/org/apache/spark/deploy/raydp/AppMasterJavaBridge.scala +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.raydp - -import io.ray.api.Ray -import io.ray.runtime.config.RayConfig -import org.json4s._ -import org.json4s.jackson.JsonMethods._ - -class AppMasterJavaBridge { - private var instance: RayAppMaster = null - - def setProperties(properties: String): Unit = { - implicit val formats: DefaultFormats.type = org.json4s.DefaultFormats - val parsed = parse(properties).extract[Map[String, String]] - parsed.foreach{ case (key, value) => - System.setProperty(key, value) - } - // Use the same session dir as the python side - RayConfig.create().setSessionDir(System.getProperty("ray.session-dir")) - } - - def startUpAppMaster(extra_cp: String): Unit = { - if (instance == null) { - // init ray, we should set the config by java properties - Ray.init() - instance = new RayAppMaster(extra_cp) - } - } - - def getMasterUrl(): String = { - if (instance == null) { - throw new RuntimeException("You should create the RayAppMaster instance first") - } - instance.getMasterUrl() - } - - def stop(): Unit = { - if (instance != null) { - instance.stop() - instance = null - } - } -} diff --git a/core/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala b/core/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala index 90fc1641..a8385fec 100644 --- a/core/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala +++ b/core/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala @@ -35,22 +35,15 @@ import org.apache.spark.rpc._ import org.apache.spark.util.ShutdownHookManager import org.apache.spark.util.Utils -class RayAppMaster(host: String, - port: Int, - actor_extra_classpath: String) extends Serializable with Logging { +class RayAppMaster() extends Serializable with Logging { private var endpoint: RpcEndpointRef = _ private var rpcEnv: RpcEnv = _ private val conf: SparkConf = new SparkConf() - init() - - def this() = { - this(RayConfig.create().nodeIp, 0, "") - } + private val host: String = RayConfig.create().nodeIp + private var actorExtraClasspath: String = _ - def this(actor_extra_classpath: String) = { - this(RayConfig.create().nodeIp, 0, actor_extra_classpath) - } + init() def init(): Unit = { Utils.loadDefaultSparkProperties(conf) @@ -59,7 +52,7 @@ class RayAppMaster(host: String, RayAppMaster.ENV_NAME, host, host, - port, + 0, conf, securityMgr, numUsableCores = 0, @@ -68,6 +61,11 @@ class RayAppMaster(host: String, endpoint = rpcEnv.setupEndpoint(RayAppMaster.ENDPOINT_NAME, new RayAppMasterEndpoint(rpcEnv)) } + def setActorClasspath(cp: String): Int = { + actorExtraClasspath = cp + 0 + } + /** * Get the app master endpoint URL. The executor will connect to AppMaster by this URL and * tell the AppMaster that it has started up successful. @@ -84,14 +82,14 @@ class RayAppMaster(host: String, url.replace("spark", "ray") } - def stop(): Int = { + def stop(): Unit = { logInfo("Stopping RayAppMaster") if (rpcEnv != null) { rpcEnv.shutdown() endpoint = null rpcEnv = null } - 0 + Ray.exitActor() } class RayAppMasterEndpoint(override val rpcEnv: RpcEnv) @@ -244,10 +242,10 @@ class RayAppMaster(host: String, s"Found ${javaOpts(i - 1)} while not classpath url in executor java opts") } - javaOpts.updated(i, javaOpts(i) + File.pathSeparator + actor_extra_classpath) + javaOpts.updated(i, javaOpts(i) + File.pathSeparator + actorExtraClasspath) } else { // user has not set, we append the actor extra classpath in the end - javaOpts ++ Seq("-cp", actor_extra_classpath) + javaOpts ++ Seq("-cp", actorExtraClasspath) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/raydp/RayCoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/raydp/RayCoarseGrainedSchedulerBackend.scala index 7bbf0729..5ac916da 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/raydp/RayCoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/raydp/RayCoarseGrainedSchedulerBackend.scala @@ -69,7 +69,7 @@ class RayCoarseGrainedSchedulerBackend( Ray.init() val cp = sys.props("java.class.path") val options = RayExternalShuffleService.getShuffleConf(conf) - masterHandle = RayAppMasterUtils.createAppMaster(cp, options.toBuffer.asJava) + masterHandle = RayAppMasterUtils.createAppMaster(options.toBuffer.asJava) uri = new URI(RayAppMasterUtils.getMasterUrl(masterHandle)) } else { uri = new URI(sparkUrl) diff --git a/python/raydp/__init__.py b/python/raydp/__init__.py index d5d73e5b..8a7546ee 100644 --- a/python/raydp/__init__.py +++ b/python/raydp/__init__.py @@ -16,7 +16,7 @@ # from raydp.context import init_spark, stop_spark - +from raydp.utils import get_code_search_path __version__ = "0.4.0.dev0" -__all__ = ["init_spark", "stop_spark"] +__all__ = ["init_spark", "stop_spark", "get_code_search_path"] diff --git a/python/raydp/context.py b/python/raydp/context.py index 4aaeedca..3772a968 100644 --- a/python/raydp/context.py +++ b/python/raydp/context.py @@ -15,17 +15,18 @@ # limitations under the License. # -import atexit +import atexit, os from contextlib import ContextDecorator from threading import RLock from typing import Dict, Union, Optional import ray +from ray.job_config import JobConfig +import pyspark from pyspark.sql import SparkSession from raydp.spark import SparkCluster -from raydp.utils import parse_memory_size - +from raydp.utils import get_code_search_path, parse_memory_size class _SparkContext(ContextDecorator): """A class used to create the Spark cluster and get the Spark session. @@ -111,8 +112,7 @@ def init_spark(app_name: str, if not ray.is_initialized(): # ray has not initialized, init local - ray.init() - + ray.init(job_config=JobConfig(java_code_search_path=get_code_search_path())) with _spark_context_lock: global _global_spark_context if _global_spark_context is None: diff --git a/python/raydp/spark/ray_cluster.py b/python/raydp/spark/ray_cluster.py index db7ec512..84237863 100644 --- a/python/raydp/spark/ray_cluster.py +++ b/python/raydp/spark/ray_cluster.py @@ -15,34 +15,34 @@ # limitations under the License. # -import glob +import glob, os from typing import Any, Dict import ray from pyspark.sql.session import SparkSession from raydp.services import Cluster -from .ray_cluster_master import RayClusterMaster, RAYDP_CP +from .ray_cluster_master import RayClusterMaster, RAYDP_JARS class SparkCluster(Cluster): def __init__(self, configs): super().__init__(None) - self._app_master_bridge = None + self._app_master = None self._configs = configs self._set_up_master(None, None) self._spark_session: SparkSession = None def _set_up_master(self, resources: Dict[str, float], kwargs: Dict[Any, Any]): # TODO: specify the app master resource - self._app_master_bridge = RayClusterMaster(self._configs) - self._app_master_bridge.start_up() + self._app_master = RayClusterMaster(self._configs) + self._app_master.start_up() def _set_up_worker(self, resources: Dict[str, float], kwargs: Dict[str, str]): raise Exception("Unsupported operation") def get_cluster_url(self) -> str: - return self._app_master_bridge.get_master_url() + return self._app_master.get_master_url() def get_spark_session(self, app_name: str, @@ -58,19 +58,19 @@ def get_spark_session(self, extra_conf["spark.executor.instances"] = str(num_executors) extra_conf["spark.executor.cores"] = str(executor_cores) extra_conf["spark.executor.memory"] = str(executor_memory) - driver_node_ip = ray.services.get_node_ip_address() + driver_node_ip = ray.util.get_node_ip_address() extra_conf["spark.driver.host"] = str(driver_node_ip) extra_conf["spark.driver.bindAddress"] = str(driver_node_ip) try: extra_jars = [extra_conf["spark.jars"]] except KeyError: extra_jars = [] - extra_conf["spark.jars"] = ",".join(glob.glob(RAYDP_CP) + extra_jars) + extra_conf["spark.jars"] = ",".join(glob.glob(RAYDP_JARS) + extra_jars) driver_cp = "spark.driver.extraClassPath" if driver_cp in extra_conf: - extra_conf[driver_cp] = ":".join(glob.glob(RAYDP_CP)) + ":" + extra_conf[driver_cp] + extra_conf[driver_cp] = ":".join(glob.glob(RAYDP_JARS)) + ":" + extra_conf[driver_cp] else: - extra_conf[driver_cp] = ":".join(glob.glob(RAYDP_CP)) + extra_conf[driver_cp] = ":".join(glob.glob(RAYDP_JARS)) spark_builder = SparkSession.builder for k, v in extra_conf.items(): spark_builder.config(k, v) @@ -83,6 +83,6 @@ def stop(self): self._spark_session.stop() self._spark_session = None - if self._app_master_bridge is not None: - self._app_master_bridge.stop() - self._app_master_bridge = None + if self._app_master is not None: + self._app_master.stop() + self._app_master = None diff --git a/python/raydp/spark/ray_cluster_master.py b/python/raydp/spark/ray_cluster_master.py index 65cf8270..f60c4ecb 100644 --- a/python/raydp/spark/ray_cluster_master.py +++ b/python/raydp/spark/ray_cluster_master.py @@ -30,19 +30,15 @@ import pyspark import ray import ray.services -from py4j.java_gateway import JavaGateway, GatewayParameters from raydp.services import ClusterMaster -RAYDP_CP = os.path.abspath(os.path.join(os.path.abspath(__file__), "../../jars/*")) - logger = logging.getLogger(__name__) +RAYDP_JARS = os.path.abspath(os.path.join(os.path.abspath(__file__), "../../jars/*")) class RayClusterMaster(ClusterMaster): def __init__(self, configs): - self._gateway = None - self._app_master_java_bridge = None self._host = None self._started_up = False self._configs = configs @@ -51,21 +47,22 @@ def start_up(self, popen_kwargs=None): if self._started_up: logger.warning("The RayClusterMaster has started already. Do not call it twice") return + appMasterCls = ray.java_actor_class("org.apache.spark.deploy.raydp.RayAppMaster") + self._host = ray.util.get_node_ip_address() + self._instance = appMasterCls.remote() + self._master_url = ray.get(self._instance.getMasterUrl.remote()) extra_classpath = os.pathsep.join(self._prepare_jvm_classpath()) - self._gateway = self._launch_gateway(extra_classpath, popen_kwargs) - self._app_master_java_bridge = self._gateway.entry_point.getAppMasterBridge() - self._set_properties() - self._host = ray._private.services.get_node_ip_address() - self._create_app_master(extra_classpath) + # set classpath for executors + ray.get(self._instance.setActorClasspath.remote(extra_classpath)) self._started_up = True def _prepare_jvm_classpath(self): cp_list = [] # find RayDP core path - cp_list.append(RAYDP_CP) + cp_list.append(RAYDP_JARS) # find ray jar path - ray_cp = os.path.abspath(os.path.join(os.path.dirname(ray.__file__), "jars/*")) - cp_list.append(ray_cp) + ray_jars = os.path.abspath(os.path.join(os.path.dirname(ray.__file__), "jars/*")) + cp_list.append(ray_jars) # find pyspark jars path spark_home = os.path.dirname(pyspark.__file__) spark_jars_dir = os.path.join(spark_home, "jars/*") @@ -73,116 +70,16 @@ def _prepare_jvm_classpath(self): cp_list.extend(spark_jars) return cp_list - def _launch_gateway(self, class_path, popen_kwargs=None): - """ - launch jvm gateway - :param popen_kwargs: Dictionary of kwargs to pass to Popen when spawning - the py4j JVM. This is a developer feature intended for use in - customizing how pyspark interacts with the py4j JVM (e.g., capturing - stdout/stderr). - """ - - command = ["java"] - command.append("-cp") - command.append(class_path) - command.append("org.apache.spark.deploy.raydp.AppMasterEntryPoint") - - # Create a temporary directory where the gateway server should write the connection - # information. - conn_info_dir = tempfile.mkdtemp() - try: - fd, conn_info_file = tempfile.mkstemp(dir=conn_info_dir) - os.close(fd) - os.unlink(conn_info_file) - - env = dict(os.environ) - env["_RAYDP_APPMASTER_CONN_INFO_PATH"] = conn_info_file - - # Launch the Java gateway. - popen_kwargs = {} if popen_kwargs is None else popen_kwargs - # We open a pipe to stdin so that the Java gateway can die when the pipe is broken - popen_kwargs["stdin"] = PIPE - # We always set the necessary environment variables. - popen_kwargs["env"] = env - - # Don't send ctrl-c / SIGINT to the Java gateway: - def preexec_func(): - signal.signal(signal.SIGINT, signal.SIG_IGN) - popen_kwargs["preexec_fn"] = preexec_func - # pylint: disable=R1732 - proc = Popen(command, **popen_kwargs) - - # Wait for the file to appear, or for the process to exit, whichever happens first. - while not proc.poll() and not os.path.isfile(conn_info_file): - time.sleep(0.1) - - if not os.path.isfile(conn_info_file): - raise Exception("Java gateway process exited before sending its port number") - - with open(conn_info_file, "rb") as info: - length = info.read(4) - if not length: - raise EOFError - gateway_port = struct.unpack("!i", length)[0] - - finally: - shutil.rmtree(conn_info_dir) - - gateway = JavaGateway(gateway_parameters=GatewayParameters( - port=gateway_port, auto_convert=True)) - - # Store a reference to the Popen object for use by the caller - # (e.g., in reading stdout/stderr) - gateway.proc = proc - - return gateway - - def _set_properties(self): - assert ray.is_initialized() - options = copy(self._configs) - - node = ray.worker.global_worker.node - - options["ray.run-mode"] = "CLUSTER" - options["ray.node-ip"] = node.node_ip_address - options["ray.address"] = node.redis_address - options["ray.redis.password"] = node.redis_password - options["ray.logging.dir"] = node.get_logs_dir_path() - options["ray.session-dir"] = node.get_session_dir_path() - options["ray.raylet.node-manager-port"] = node.node_manager_port - options["ray.raylet.socket-name"] = node.raylet_socket_name - options["ray.raylet.config.num_workers_per_process_java"] = "1" - options["ray.object-store.socket-name"] = node.plasma_store_socket_name - options["ray.logging.level"] = "INFO" - - # jnius_config.set_option has some bug, we set this options in java side - jvm_properties = json.dumps(options) - self._app_master_java_bridge.setProperties(jvm_properties) - def get_host(self) -> str: assert self._started_up return self._host - def _create_app_master(self, extra_classpath: str): - if self._started_up: - return - self._app_master_java_bridge.startUpAppMaster(extra_classpath) - def get_master_url(self): assert self._started_up - return self._app_master_java_bridge.getMasterUrl() + return self._master_url def stop(self): if not self._started_up: return - - if self._app_master_java_bridge is not None: - self._app_master_java_bridge.stop() - self._app_master_java_bridge = None - - if self._gateway is not None: - self._gateway.shutdown() - self._gateway.proc.terminate() - self._gateway = None - + self._instance.stop.remote() self._started_up = False diff --git a/python/raydp/tests/conftest.py b/python/raydp/tests/conftest.py index 1f835515..be88f5f2 100644 --- a/python/raydp/tests/conftest.py +++ b/python/raydp/tests/conftest.py @@ -20,8 +20,11 @@ import pytest from pyspark.sql import SparkSession import ray +from ray.job_config import JobConfig import raydp +from raydp.utils import get_code_search_path +TEST_CONFIG=JobConfig(code_search_path=get_code_search_path()) def quiet_logger(): py4j_logger = logging.getLogger("py4j") @@ -42,14 +45,16 @@ def spark_session(request): @pytest.fixture(scope="function") def ray_cluster(request): ray.shutdown() - ray.init(num_cpus=4, _redis_password="123", include_dashboard=False) + ray.init(num_cpus=4, _redis_password="123", include_dashboard=False, + job_config=TEST_CONFIG) request.addfinalizer(lambda: ray.shutdown()) @pytest.fixture(scope="function") def spark_on_ray_small(request): ray.shutdown() - ray.init(num_cpus=4, _redis_password="123", include_dashboard=False) + ray.init(num_cpus=4, _redis_password="123", include_dashboard=False, + job_config=TEST_CONFIG) spark = raydp.init_spark("test", 1, 1, "500 M") def stop_all(): diff --git a/python/raydp/utils.py b/python/raydp/utils.py index 328c4c8b..32392cc1 100644 --- a/python/raydp/utils.py +++ b/python/raydp/utils.py @@ -17,19 +17,23 @@ import atexit import math -import re +import re, os import signal from typing import Dict, List, Tuple import numpy as np import psutil +import pyspark MEMORY_SIZE_UNITS = {"K": 2**10, "M": 2**20, "G": 2**30, "T": 2**40} - +SPARK_CP = os.path.join(os.path.dirname(pyspark.__file__), "jars") +RAYDP_CP = os.path.abspath(os.path.join(os.path.abspath(__file__), "../jars")) # we use 4 bytes for block size, this means each block can contain # 4294967296 records BLOCK_SIZE_BIT = 32 +def get_code_search_path() -> List: + return [RAYDP_CP, SPARK_CP] def get_node_address() -> str: """