diff --git a/.github/workflows/pypi.yml b/.github/workflows/pypi.yml index 15c10874..9725b3dd 100644 --- a/.github/workflows/pypi.yml +++ b/.github/workflows/pypi.yml @@ -38,10 +38,11 @@ jobs: uses: actions/setup-python@e9aba2c848f5ebd159c070c61ea2c4e2b122355e # v2.3.4 with: python-version: 3.10.14 - - name: Set up JDK 1.8 + - name: Set up JDK 17 uses: actions/setup-java@b6e674f4b717d7b0ae3baee0fbe79f498905dfde # v1.4.4 with: - java-version: 1.8 + java-version: 17 + distribution: 'corretto' - name: days since the commit date run: | : diff --git a/.github/workflows/pypi_release.yml b/.github/workflows/pypi_release.yml index 6e284b03..38845301 100644 --- a/.github/workflows/pypi_release.yml +++ b/.github/workflows/pypi_release.yml @@ -35,7 +35,7 @@ jobs: name: build wheel and upload release runs-on: ubuntu-latest env: - PYSPARK_VERSION: "3.5.7" + PYSPARK_VERSION: "4.1.0" RAY_VERSION: "2.40.0" steps: - uses: actions/checkout@61b9e3751b92087fd0b06925ba6dd6314e06f089 # master @@ -46,10 +46,11 @@ jobs: uses: actions/setup-python@e9aba2c848f5ebd159c070c61ea2c4e2b122355e # v2.3.4 with: python-version: 3.10.14 - - name: Set up JDK 1.8 + - name: Set up JDK 17 uses: actions/setup-java@b6e674f4b717d7b0ae3baee0fbe79f498905dfde # v1.4.4 with: - java-version: 1.8 + java-version: 17 + distribution: 'corretto' - name: Install extra dependencies for Ubuntu run: | sudo apt-get install -y mpich @@ -65,7 +66,7 @@ jobs: pip install "numpy<1.24" "click<8.3.0" pip install "pydantic<2.0" pip install torch --index-url https://download.pytorch.org/whl/cpu - pip install pyarrow "ray[train,default]==${{ env.RAY_VERSION }}" tqdm pytest tensorflow==2.13.1 tabulate grpcio-tools wget + pip install pyarrow "ray[train,default]==${{ env.RAY_VERSION }}" tqdm pytest tensorflow==2.16.1 tf_keras tabulate grpcio-tools wget pip install "xgboost_ray[default]<=0.1.13" pip install "xgboost<=2.0.3" pip install torchmetrics diff --git a/.github/workflows/ray_nightly_test.yml 
b/.github/workflows/ray_nightly_test.yml index 95b4eb96..6da5888a 100644 --- a/.github/workflows/ray_nightly_test.yml +++ b/.github/workflows/ray_nightly_test.yml @@ -31,8 +31,8 @@ jobs: strategy: matrix: os: [ ubuntu-latest ] - python-version: [3.9, 3.10.14] - spark-version: [3.3.2, 3.4.0, 3.5.0] + python-version: [3.10.14, 3.11] + spark-version: [4.0.0, 4.1.0] runs-on: ${{ matrix.os }} @@ -74,7 +74,7 @@ jobs: run: | python -m pip install --upgrade pip pip install wheel - pip install "numpy<1.24" "click<8.3.0" + pip install "click<8.3.0" SUBVERSION=$(python -c 'import sys; print(sys.version_info[1])') if [ "$(uname -s)" == "Linux" ] then @@ -83,14 +83,14 @@ jobs: pip install torch fi case $PYTHON_VERSION in - 3.9) - pip install "ray[train,default] @ https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-3.0.0.dev0-cp39-cp39-manylinux2014_x86_64.whl" - ;; 3.10.14) pip install "ray[train,default] @ https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-3.0.0.dev0-cp310-cp310-manylinux2014_x86_64.whl" ;; + 3.11) + pip install "ray[train,default] @ https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-3.0.0.dev0-cp311-cp311-manylinux2014_x86_64.whl" + ;; esac - pip install pyarrow tqdm pytest tensorflow==2.13.1 tabulate grpcio-tools wget + pip install pyarrow tqdm pytest tabulate grpcio-tools wget pip install "xgboost_ray[default]<=0.1.13" pip install torchmetrics HOROVOD_WITH_GLOO=1 @@ -107,10 +107,10 @@ jobs: run: | pip install pyspark==${{ matrix.spark-version }} ./build.sh - pip install dist/raydp-*.whl + pip install "$(ls dist/raydp-*.whl)[tensorflow]" - name: Lint run: | - pip install pylint==2.8.3 + pip install pylint==3.2.7 pylint --rcfile=python/pylintrc python/raydp pylint --rcfile=python/pylintrc examples/*.py - name: Test with pytest diff --git a/.github/workflows/raydp.yml b/.github/workflows/raydp.yml index a24746b9..0ad7d4a4 100644 --- a/.github/workflows/raydp.yml +++ b/.github/workflows/raydp.yml @@ -32,8 +32,8 @@ jobs: strategy: matrix: os: 
[ubuntu-latest] - python-version: [3.9, 3.10.14] - spark-version: [3.3.2, 3.4.0, 3.5.0] + python-version: [3.10.14, 3.11] + spark-version: [4.0.0, 4.1.0] ray-version: [2.37.0, 2.40.0, 2.50.0] runs-on: ${{ matrix.os }} @@ -74,8 +74,7 @@ jobs: run: | python -m pip install --upgrade pip pip install wheel - pip install "numpy<1.24" "click<8.3.0" - pip install "pydantic<2.0" + pip install "pydantic<2.0" "click<8.3.0" SUBVERSION=$(python -c 'import sys; print(sys.version_info[1])') if [ "$(uname -s)" == "Linux" ] then @@ -83,7 +82,7 @@ jobs: else pip install torch fi - pip install pyarrow "ray[train,default]==${{ matrix.ray-version }}" tqdm pytest tensorflow==2.13.1 tabulate grpcio-tools wget + pip install pyarrow "ray[train,default]==${{ matrix.ray-version }}" tqdm pytest tabulate grpcio-tools wget pip install "xgboost_ray[default]<=0.1.13" pip install "xgboost<=2.0.3" pip install torchmetrics @@ -98,10 +97,10 @@ jobs: run: | pip install pyspark==${{ matrix.spark-version }} ./build.sh - pip install dist/raydp-*.whl + pip install "$(ls dist/raydp-*.whl)[tensorflow]" - name: Lint run: | - pip install pylint==2.8.3 + pip install pylint==3.2.7 pylint --rcfile=python/pylintrc python/raydp pylint --rcfile=python/pylintrc examples/*.py - name: Test with pytest diff --git a/core/pom.xml b/core/pom.xml index 67d945d7..2ebaab55 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -13,11 +13,13 @@ https://github.com/ray-project/raydp.git - 3.3.3 + 4.0.0 3.2.2 3.3.0 3.4.0 3.5.0 + 4.0.0 + 4.1.0 1.1.10.4 4.1.94.Final 1.10.0 @@ -27,11 +29,11 @@ 2.5.2 UTF-8 UTF-8 - 1.8 - 1.8 - 2.12.15 - 2.18.6 - 2.12 + 17 + 17 + 2.13.12 + 2.18.2 + 2.13 5.10.1 @@ -151,16 +153,19 @@ com.fasterxml.jackson.core jackson-core ${jackson.version} + provided com.fasterxml.jackson.core jackson-databind ${jackson.version} + provided com.fasterxml.jackson.core jackson-annotations ${jackson.version} + provided @@ -168,6 +173,7 @@ com.fasterxml.jackson.module jackson-module-scala_${scala.binary.version} 
${jackson.version} + provided com.google.guava @@ -179,6 +185,7 @@ com.fasterxml.jackson.module jackson-module-jaxb-annotations ${jackson.version} + provided diff --git a/core/raydp-main/pom.xml b/core/raydp-main/pom.xml index 3c791a65..78effa21 100644 --- a/core/raydp-main/pom.xml +++ b/core/raydp-main/pom.xml @@ -134,24 +134,20 @@ com.fasterxml.jackson.core jackson-core - ${jackson.version} com.fasterxml.jackson.core jackson-databind - ${jackson.version} com.fasterxml.jackson.core jackson-annotations - ${jackson.version} com.fasterxml.jackson.module jackson-module-scala_${scala.binary.version} - ${jackson.version} com.google.guava @@ -162,7 +158,6 @@ com.fasterxml.jackson.module jackson-module-jaxb-annotations - ${jackson.version} diff --git a/core/raydp-main/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/raydp-main/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 829517e5..d9e8396a 100644 --- a/core/raydp-main/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/raydp-main/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -24,13 +24,13 @@ import java.security.PrivilegedExceptionAction import java.text.ParseException import java.util.{ServiceLoader, UUID} import java.util.jar.JarInputStream -import javax.ws.rs.core.UriBuilder import scala.annotation.tailrec import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import scala.util.{Properties, Try} +import com.intel.raydp.shims.SparkShimLoader import org.apache.commons.lang3.StringUtils import org.apache.hadoop.conf.{Configuration => HadoopConfiguration} import org.apache.hadoop.fs.{FileSystem, Path} @@ -258,7 +258,10 @@ private[spark] class SparkSubmit extends Logging { } if (clusterManager == KUBERNETES) { - args.master = Utils.checkAndGetK8sMasterUrl(args.master) + val checkedMaster = Utils.checkAndGetK8sMasterUrl(args.master) + SparkShimLoader.getSparkShims + .getCommandLineUtilsBridge + .setSubmitMaster(args, checkedMaster) 
// Make sure KUBERNETES is included in our build if we're trying to use it if (!Utils.classIsLoadable(KUBERNETES_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) { error( @@ -340,7 +343,7 @@ private[spark] class SparkSubmit extends Logging { // update spark config from args args.toSparkConf(Option(sparkConf)) - val hadoopConf = conf.getOrElse(SparkHadoopUtil.newConfiguration(sparkConf)) + val hadoopConf = conf.getOrElse(SparkHadoopUtil.get.newConfiguration(sparkConf)) val targetDir = Utils.createTempDir() // Kerberos is not supported in standalone mode, and keytab support is not yet available @@ -393,8 +396,10 @@ private[spark] class SparkSubmit extends Logging { val archiveLocalFiles = Option(args.archives).map { uris => val resolvedUris = Utils.stringToSeq(uris).map(Utils.resolveURI) val localArchives = downloadFileList( - resolvedUris.map( - UriBuilder.fromUri(_).fragment(null).build().toString).mkString(","), + resolvedUris.map { uri => + new URI(uri.getScheme, + uri.getSchemeSpecificPart, null).toString // decoded SSP: this ctor re-quotes '%' + }.mkString(","), targetDir, sparkConf, hadoopConf) // SPARK-33748: this mimics the behaviour of Yarn cluster mode. If the driver is running @@ -413,8 +418,9 @@ private[spark] class SparkSubmit extends Logging { Utils.unpack(source, dest) // Keep the URIs of local files with the given fragments. 
- UriBuilder.fromUri( - localArchive).fragment(resolvedUri.getFragment).build().toString + new URI(localArchive.getScheme, + localArchive.getSchemeSpecificPart, // decoded SSP: this ctor re-quotes '%' + resolvedUri.getFragment).toString }.mkString(",") }.orNull args.files = filesLocalFiles @@ -986,7 +992,12 @@ private[spark] object InProcessSparkSubmit { } -object SparkSubmit extends CommandLineUtils with Logging { +object SparkSubmit extends Logging { + + var printStream: PrintStream = System.err + // scalastyle:off println + def printMessage(str: String): Unit = printStream.println(str) + // scalastyle:on println // Cluster managers private val YARN = 1 @@ -1019,7 +1030,7 @@ object SparkSubmit extends CommandLineUtils with Logging { private[deploy] val KUBERNETES_CLUSTER_SUBMIT_CLASS = "org.apache.spark.deploy.k8s.submit.KubernetesClientApplication" - override def main(args: Array[String]): Unit = { + def main(args: Array[String]): Unit = { val submit = new SparkSubmit() { self => @@ -1044,7 +1055,8 @@ object SparkSubmit extends CommandLineUtils with Logging { super.doSubmit(args) } catch { case e: SparkUserAppException => - exitFn(e.exitCode) + SparkShimLoader.getSparkShims + .getCommandLineUtilsBridge.callExit(e.exitCode) } } diff --git a/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala b/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala index f4cc823d..f835106a 100644 --- a/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala +++ b/core/raydp-main/src/main/scala/org/apache/spark/deploy/raydp/RayAppMaster.scala @@ -22,15 +22,15 @@ import java.text.SimpleDateFormat import java.util.{Date, Locale} import javax.xml.bind.DatatypeConverter -import scala.collection.JavaConverters._ import scala.collection.mutable.HashMap +import scala.jdk.CollectionConverters._ +import com.fasterxml.jackson.core.JsonFactory +import com.fasterxml.jackson.databind.ObjectMapper import io.ray.api.{ActorHandle, PlacementGroups, Ray} 
import io.ray.api.id.PlacementGroupId import io.ray.api.placementgroup.PlacementGroup import io.ray.runtime.config.RayConfig -import org.json4s._ -import org.json4s.jackson.JsonMethods._ import org.apache.spark.{RayDPException, SecurityManager, SparkConf} import org.apache.spark.executor.RayDPExecutor @@ -39,6 +39,7 @@ import org.apache.spark.raydp.{RayExecutorUtils, SparkOnRayConfigs} import org.apache.spark.rpc._ import org.apache.spark.util.Utils + class RayAppMaster(host: String, port: Int, actorExtraClasspath: String) extends Serializable with Logging { @@ -298,7 +299,7 @@ class RayAppMaster(host: String, .map { case (name, amount) => (name, Double.box(amount)) }.asJava, placementGroup, getNextBundleIndex, - seqAsJavaList(appInfo.desc.command.javaOpts)) + appInfo.desc.command.javaOpts.asJava) appInfo.addPendingRegisterExecutor(executorId, handler, sparkCoresPerExecutor, memory) } @@ -356,11 +357,15 @@ object RayAppMaster extends Serializable { val ACTOR_NAME = "RAY_APP_MASTER" def setProperties(properties: String): Unit = { - implicit val formats: DefaultFormats.type = org.json4s.DefaultFormats - val parsed = parse(properties).extract[Map[String, String]] - parsed.foreach{ case (key, value) => - System.setProperty(key, value) + // Use Jackson ObjectMapper directly to avoid JSON4S version conflicts + val mapper = new ObjectMapper() + val javaMap = mapper.readValue(properties, classOf[java.util.Map[String, Object]]) + val scalaMap = javaMap.asScala.toMap + scalaMap.foreach{ case (key, value) => + // Convert all values to strings since System.setProperty expects String + System.setProperty(key, value.toString) } + // Use the same session dir as the python side RayConfig.create().setSessionDir(System.getProperty("ray.session-dir")) } diff --git a/core/raydp-main/src/main/scala/org/apache/spark/executor/RayDPExecutor.scala b/core/raydp-main/src/main/scala/org/apache/spark/executor/RayDPExecutor.scala index afc8de0b..9ec06431 100644 --- 
a/core/raydp-main/src/main/scala/org/apache/spark/executor/RayDPExecutor.scala +++ b/core/raydp-main/src/main/scala/org/apache/spark/executor/RayDPExecutor.scala @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean import scala.reflect.classTag import com.intel.raydp.shims.SparkShimLoader +import io.ray.api.ActorHandle import io.ray.api.Ray import io.ray.runtime.config.RayConfig import org.apache.arrow.vector.ipc.{ArrowStreamWriter, WriteChannel} @@ -309,11 +310,43 @@ class RayDPExecutor( env.shutdown } - def getRDDPartition( + /** Find another executor (not this one) that currently holds a cached Spark block, if any. */ + private def getCurrentBlockOwnerExecutorId(blockId: BlockId): Option[String] = { + val env = SparkEnv.get + val locs = env.blockManager.master.getLocations(blockId) + if (locs == null) None else locs.map(_.executorId).find(_ != executorId) + } + + /** + * Map a (potentially restarted) Spark executor ID to the Ray actor-name executor ID. + * + * When a RayDP executor actor restarts, it keeps its Ray actor name, but Spark may assign a new + * executor ID. RayAppMaster tracks a mapping (new -> old). We must use the old ID to resolve + * the Ray actor by name. + */ + private def resolveRayActorExecutorId(sparkExecutorId: String): String = { + try { + val appMasterHandle = + Ray.getActor(RayAppMaster.ACTOR_NAME).get.asInstanceOf[ActorHandle[RayAppMaster]] + val restartedExecutors = RayAppMasterUtils.getRestartedExecutors(appMasterHandle) + if (restartedExecutors != null && restartedExecutors.containsKey(sparkExecutorId)) { + restartedExecutors.get(sparkExecutorId) + } else { + sparkExecutorId + } + } catch { + case scala.util.control.NonFatal(_) => // let fatal errors (OOM, interrupts) propagate + // Best-effort: if we cannot query the app master for any reason, fall back to the given ID. 
+ sparkExecutorId + } + } + + private def getRDDPartitionInternal( rddId: Int, partitionId: Int, schemaStr: String, - driverAgentUrl: String): Array[Byte] = { + driverAgentUrl: String, + allowForward: Boolean): Array[Byte] = { while (!started.get) { // wait until executor is started // this might happen if executor restarts @@ -339,6 +372,31 @@ class RayDPExecutor( env.blockManager.get(blockId)(classTag[Array[Byte]]) match { case Some(blockResult) => blockResult.data.asInstanceOf[Iterator[Array[Byte]]] + case None if allowForward => + // The block may have been (re)cached on a different executor after recache. + val ownerOpt = getCurrentBlockOwnerExecutorId(blockId) + ownerOpt match { + case Some(ownerSparkExecutorId) => + val ownerRayExecutorId = resolveRayActorExecutorId(ownerSparkExecutorId) + logWarning( + s"Cached block $blockId not found on executor $executorId after recache. " + + s"Forwarding fetch to executor $ownerSparkExecutorId " + + s"(ray actor id $ownerRayExecutorId).") + val otherHandle = + Ray.getActor("raydp-executor-" + ownerRayExecutorId).get() + .asInstanceOf[ActorHandle[RayDPExecutor]] + // One-hop forward only: call no-forward variant on the target executor and + // return the Arrow IPC bytes directly. + return otherHandle + .task( + (e: RayDPExecutor) => + e.getRDDPartitionNoForward(rddId, partitionId, schemaStr, driverAgentUrl)) + .remote() + .get() + case None => + throw new RayDPException( + s"Still cannot get block $blockId for RDD $rddId after recache!") + } case None => throw new RayDPException("Still cannot get the block after recache!") } @@ -353,4 +411,22 @@ class RayDPExecutor( byteOut.close() result } + + /** Public entry-point used by cross-language calls. Allows forwarding. 
*/ + def getRDDPartition( + rddId: Int, + partitionId: Int, + schemaStr: String, + driverAgentUrl: String): Array[Byte] = { + getRDDPartitionInternal(rddId, partitionId, schemaStr, driverAgentUrl, allowForward = true) + } + + /** Internal one-hop target to prevent forward loops. */ + def getRDDPartitionNoForward( + rddId: Int, + partitionId: Int, + schemaStr: String, + driverAgentUrl: String): Array[Byte] = { + getRDDPartitionInternal(rddId, partitionId, schemaStr, driverAgentUrl, allowForward = false) + } } diff --git a/core/raydp-main/src/main/scala/org/apache/spark/scheduler/cluster/raydp/RayCoarseGrainedSchedulerBackend.scala b/core/raydp-main/src/main/scala/org/apache/spark/scheduler/cluster/raydp/RayCoarseGrainedSchedulerBackend.scala index dce83a78..d4387867 100644 --- a/core/raydp-main/src/main/scala/org/apache/spark/scheduler/cluster/raydp/RayCoarseGrainedSchedulerBackend.scala +++ b/core/raydp-main/src/main/scala/org/apache/spark/scheduler/cluster/raydp/RayCoarseGrainedSchedulerBackend.scala @@ -154,7 +154,10 @@ class RayCoarseGrainedSchedulerBackend( } // Start executors with a few necessary configs for registering with the scheduler - val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf) + val sparkJavaOpts = conf.getAll + .filter { case (k, _) => SparkConf.isExecutorStartupConf(k) } + .map { case (k, v) => s"-D$k=$v" } + .toSeq // add Xmx, it should not be set in java opts, because Spark is not allowed. 
// We also add Xms to ensure the Xmx >= Xms val memoryLimit = Seq(s"-Xms${sc.executorMemory}M", s"-Xmx${sc.executorMemory}M") diff --git a/core/raydp-main/src/main/scala/org/apache/spark/sql/raydp/ObjectStoreWriter.scala b/core/raydp-main/src/main/scala/org/apache/spark/sql/raydp/ObjectStoreWriter.scala index 2d607044..36e86da3 100644 --- a/core/raydp-main/src/main/scala/org/apache/spark/sql/raydp/ObjectStoreWriter.scala +++ b/core/raydp-main/src/main/scala/org/apache/spark/sql/raydp/ObjectStoreWriter.scala @@ -96,9 +96,9 @@ object ObjectStoreWriter { } def toArrowSchema(df: DataFrame): Schema = { - val conf = df.queryExecution.sparkSession.sessionState.conf + val conf = df.sparkSession.sessionState.conf val timeZoneId = conf.getConf(SQLConf.SESSION_LOCAL_TIMEZONE) - SparkShimLoader.getSparkShims.toArrowSchema(df.schema, timeZoneId) + SparkShimLoader.getSparkShims.toArrowSchema(df.schema, timeZoneId, df.sparkSession) } @deprecated @@ -109,10 +109,10 @@ object ObjectStoreWriter { } val uuid = dfToId.getOrElseUpdate(df, UUID.randomUUID()) val queue = ObjectRefHolder.getQueue(uuid) - val rdd = df.toArrowBatchRdd + val rdd = SparkShimLoader.getSparkShims.toArrowBatchRDD(df) rdd.persist(storageLevel) rdd.count() - var executorIds = df.sqlContext.sparkContext.getExecutorIds.toArray + val executorIds = df.sparkSession.sparkContext.getExecutorIds.toArray val numExecutors = executorIds.length val appMasterHandle = Ray.getActor(RayAppMaster.ACTOR_NAME) .get.asInstanceOf[ActorHandle[RayAppMaster]] @@ -169,14 +169,14 @@ object ObjectStoreWriter { "Not yet connected to Ray! Please set fault_tolerant_mode=True when starting RayDP.") } - val rdd = df.toArrowBatchRdd + val rdd = SparkShimLoader.getSparkShims.toArrowBatchRDD(df) rdd.persist(storageLevel) rdd.count() // Keep a strong reference so Spark's ContextCleaner does not GC the cached blocks // before Ray tasks fetch them. 
recoverableRDDs.put(rdd.id, rdd) - var executorIds = df.sqlContext.sparkContext.getExecutorIds.toArray + val executorIds = df.sparkSession.sparkContext.getExecutorIds.toArray val numExecutors = executorIds.length val appMasterHandle = Ray.getActor(RayAppMaster.ACTOR_NAME) .get.asInstanceOf[ActorHandle[RayAppMaster]] diff --git a/core/shims/common/src/main/scala/com/intel/raydp/shims/CommandLineUtilsBridge.scala b/core/shims/common/src/main/scala/com/intel/raydp/shims/CommandLineUtilsBridge.scala new file mode 100644 index 00000000..2d4b1995 --- /dev/null +++ b/core/shims/common/src/main/scala/com/intel/raydp/shims/CommandLineUtilsBridge.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.intel.raydp.shims + +/** + * Bridge trait that delegates CommandLineUtils behavior to + * version-specific helper objects. Each shim module provides + * a SparkSubmitUtils that extends the real Spark CommandLineUtils + * and implements this bridge. 
+ */ +trait CommandLineUtilsBridge { + def callExit(code: Int): Unit + def setSubmitMaster(args: Any, master: String): Unit +} diff --git a/core/shims/common/src/main/scala/com/intel/raydp/shims/SparkShims.scala b/core/shims/common/src/main/scala/com/intel/raydp/shims/SparkShims.scala index 2ca83522..a4e9d22e 100644 --- a/core/shims/common/src/main/scala/com/intel/raydp/shims/SparkShims.scala +++ b/core/shims/common/src/main/scala/com/intel/raydp/shims/SparkShims.scala @@ -21,6 +21,7 @@ import org.apache.arrow.vector.types.pojo.Schema import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.api.java.JavaRDD import org.apache.spark.executor.RayDPExecutorBackendFactory +import org.apache.spark.rdd.RDD import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{DataFrame, SparkSession} @@ -39,5 +40,9 @@ trait SparkShims { def getDummyTaskContext(partitionId: Int, env: SparkEnv): TaskContext - def toArrowSchema(schema : StructType, timeZoneId : String) : Schema + def toArrowSchema(schema : StructType, timeZoneId : String, sparkSession: SparkSession) : Schema + + def toArrowBatchRDD(dataFrame: DataFrame): RDD[Array[Byte]] + + def getCommandLineUtilsBridge: CommandLineUtilsBridge } diff --git a/core/shims/pom.xml b/core/shims/pom.xml index c013538b..75c8d0cc 100644 --- a/core/shims/pom.xml +++ b/core/shims/pom.xml @@ -21,10 +21,12 @@ spark330 spark340 spark350 + spark400 + spark410 - 2.12 + 2.13 4.3.0 3.2.2 diff --git a/core/shims/spark322/pom.xml b/core/shims/spark322/pom.xml index faff6ac5..295b3d73 100644 --- a/core/shims/spark322/pom.xml +++ b/core/shims/spark322/pom.xml @@ -16,8 +16,7 @@ jar - 2.12.15 - 2.13.5 + 2.13.12 diff --git a/core/shims/spark322/src/main/scala/com/intel/raydp/shims/SparkShims.scala b/core/shims/spark322/src/main/scala/com/intel/raydp/shims/SparkShims.scala index 6ea817db..a782ecd0 100644 --- a/core/shims/spark322/src/main/scala/com/intel/raydp/shims/SparkShims.scala +++ 
b/core/shims/spark322/src/main/scala/com/intel/raydp/shims/SparkShims.scala @@ -24,8 +24,10 @@ import org.apache.spark.executor.spark322._ import org.apache.spark.spark322.TaskContextUtils import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.spark322.SparkSqlUtils -import com.intel.raydp.shims.{ShimDescriptor, SparkShims} +import com.intel.raydp.shims.{CommandLineUtilsBridge, ShimDescriptor, SparkShims} import org.apache.arrow.vector.types.pojo.Schema +import org.apache.spark.deploy.spark322.SparkSubmitUtils +import org.apache.spark.rdd.RDD import org.apache.spark.sql.types.StructType class Spark322Shims extends SparkShims { @@ -46,7 +48,18 @@ class Spark322Shims extends SparkShims { TaskContextUtils.getDummyTaskContext(partitionId, env) } - override def toArrowSchema(schema : StructType, timeZoneId : String) : Schema = { - SparkSqlUtils.toArrowSchema(schema = schema, timeZoneId = timeZoneId) + override def toArrowSchema( + schema : StructType, + timeZoneId : String, + sparkSession: SparkSession) : Schema = { + SparkSqlUtils.toArrowSchema(schema = schema, timeZoneId = timeZoneId, session = sparkSession) + } + + override def toArrowBatchRDD(dataFrame: DataFrame): RDD[Array[Byte]] = { + SparkSqlUtils.toArrowRDD(dataFrame, dataFrame.sparkSession) + } + + override def getCommandLineUtilsBridge: CommandLineUtilsBridge = { + SparkSubmitUtils } } diff --git a/core/shims/spark322/src/main/scala/org/apache/spark/deploy/spark322/SparkSubmitUtils.scala b/core/shims/spark322/src/main/scala/org/apache/spark/deploy/spark322/SparkSubmitUtils.scala new file mode 100644 index 00000000..53e1057c --- /dev/null +++ b/core/shims/spark322/src/main/scala/org/apache/spark/deploy/spark322/SparkSubmitUtils.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.spark322 + +import org.apache.spark.deploy.SparkSubmitArguments +import org.apache.spark.util.CommandLineUtils +import com.intel.raydp.shims.CommandLineUtilsBridge + +object SparkSubmitUtils + extends CommandLineUtils with CommandLineUtilsBridge { + override def main(args: Array[String]): Unit = {} + + override def callExit(code: Int): Unit = exitFn(code) + + override def setSubmitMaster(args: Any, master: String): Unit = { + args.asInstanceOf[SparkSubmitArguments].master = master + } +} diff --git a/core/shims/spark322/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala b/core/shims/spark322/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala index be9b409c..609c7112 100644 --- a/core/shims/spark322/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala +++ b/core/shims/spark322/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.spark322 import org.apache.arrow.vector.types.pojo.Schema import org.apache.spark.api.java.JavaRDD +import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession} import org.apache.spark.sql.execution.arrow.ArrowConverters import org.apache.spark.sql.types.StructType @@ -29,7 +30,11 @@ object SparkSqlUtils { ArrowConverters.toDataFrame(rdd, schema, new SQLContext(session)) } - def toArrowSchema(schema : StructType, timeZoneId : 
String) : Schema = { + def toArrowSchema(schema : StructType, timeZoneId : String, session: SparkSession) : Schema = { ArrowUtils.toArrowSchema(schema = schema, timeZoneId = timeZoneId) } + + def toArrowRDD(dataFrame: DataFrame, sparkSession: SparkSession): RDD[Array[Byte]] = { + dataFrame.toArrowBatchRdd + } } diff --git a/core/shims/spark330/pom.xml b/core/shims/spark330/pom.xml index 4443f658..6972fef1 100644 --- a/core/shims/spark330/pom.xml +++ b/core/shims/spark330/pom.xml @@ -16,8 +16,7 @@ jar - 2.12.15 - 2.13.5 + 2.13.12 diff --git a/core/shims/spark330/src/main/scala/com/intel/raydp/shims/SparkShims.scala b/core/shims/spark330/src/main/scala/com/intel/raydp/shims/SparkShims.scala index 4f1a50b5..e4236709 100644 --- a/core/shims/spark330/src/main/scala/com/intel/raydp/shims/SparkShims.scala +++ b/core/shims/spark330/src/main/scala/com/intel/raydp/shims/SparkShims.scala @@ -24,8 +24,10 @@ import org.apache.spark.executor.spark330._ import org.apache.spark.spark330.TaskContextUtils import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.spark330.SparkSqlUtils -import com.intel.raydp.shims.{ShimDescriptor, SparkShims} +import com.intel.raydp.shims.{CommandLineUtilsBridge, ShimDescriptor, SparkShims} import org.apache.arrow.vector.types.pojo.Schema +import org.apache.spark.deploy.spark330.SparkSubmitUtils +import org.apache.spark.rdd.RDD import org.apache.spark.sql.types.StructType class Spark330Shims extends SparkShims { @@ -46,7 +48,22 @@ class Spark330Shims extends SparkShims { TaskContextUtils.getDummyTaskContext(partitionId, env) } - override def toArrowSchema(schema : StructType, timeZoneId : String) : Schema = { - SparkSqlUtils.toArrowSchema(schema = schema, timeZoneId = timeZoneId) + override def toArrowSchema( + schema : StructType, + timeZoneId : String, + sparkSession: SparkSession) : Schema = { + SparkSqlUtils.toArrowSchema( + schema = schema, + timeZoneId = timeZoneId, + sparkSession = sparkSession + ) + } + + override def 
toArrowBatchRDD(dataFrame: DataFrame): RDD[Array[Byte]] = { + SparkSqlUtils.toArrowRDD(dataFrame, dataFrame.sparkSession) + } + + override def getCommandLineUtilsBridge: CommandLineUtilsBridge = { + SparkSubmitUtils } } diff --git a/core/shims/spark330/src/main/scala/org/apache/spark/deploy/spark330/SparkSubmitUtils.scala b/core/shims/spark330/src/main/scala/org/apache/spark/deploy/spark330/SparkSubmitUtils.scala new file mode 100644 index 00000000..55f4c4e1 --- /dev/null +++ b/core/shims/spark330/src/main/scala/org/apache/spark/deploy/spark330/SparkSubmitUtils.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.spark330 + +import org.apache.spark.deploy.SparkSubmitArguments +import org.apache.spark.util.CommandLineUtils +import com.intel.raydp.shims.CommandLineUtilsBridge + +object SparkSubmitUtils + extends CommandLineUtils with CommandLineUtilsBridge { + override def main(args: Array[String]): Unit = {} + + override def callExit(code: Int): Unit = exitFn(code) + + override def setSubmitMaster(args: Any, master: String): Unit = { + args.asInstanceOf[SparkSubmitArguments].master = master + } +} diff --git a/core/shims/spark330/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala b/core/shims/spark330/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala index 162371ad..8c937dcd 100644 --- a/core/shims/spark330/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala +++ b/core/shims/spark330/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.spark330 import org.apache.arrow.vector.types.pojo.Schema import org.apache.spark.api.java.JavaRDD +import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession} import org.apache.spark.sql.execution.arrow.ArrowConverters import org.apache.spark.sql.types.StructType @@ -29,7 +30,11 @@ object SparkSqlUtils { ArrowConverters.toDataFrame(rdd, schema, session) } - def toArrowSchema(schema : StructType, timeZoneId : String) : Schema = { + def toArrowSchema(schema : StructType, timeZoneId : String, sparkSession: SparkSession) : Schema = { ArrowUtils.toArrowSchema(schema = schema, timeZoneId = timeZoneId) } + + def toArrowRDD(dataFrame: DataFrame, sparkSession: SparkSession): RDD[Array[Byte]] = { + dataFrame.toArrowBatchRdd + } } diff --git a/core/shims/spark340/pom.xml b/core/shims/spark340/pom.xml index 1b312747..52af6ed5 100644 --- a/core/shims/spark340/pom.xml +++ b/core/shims/spark340/pom.xml @@ -16,8 +16,7 @@ jar - 2.12.15 - 2.13.5 + 2.13.12 diff --git 
a/core/shims/spark340/src/main/scala/com/intel/raydp/shims/SparkShims.scala b/core/shims/spark340/src/main/scala/com/intel/raydp/shims/SparkShims.scala
index c444373f..65295753 100644
--- a/core/shims/spark340/src/main/scala/com/intel/raydp/shims/SparkShims.scala
+++ b/core/shims/spark340/src/main/scala/com/intel/raydp/shims/SparkShims.scala
@@ -24,8 +24,10 @@ import org.apache.spark.executor.spark340._
 import org.apache.spark.spark340.TaskContextUtils
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.spark340.SparkSqlUtils
-import com.intel.raydp.shims.{ShimDescriptor, SparkShims}
+import com.intel.raydp.shims.{CommandLineUtilsBridge, ShimDescriptor, SparkShims}
 import org.apache.arrow.vector.types.pojo.Schema
+import org.apache.spark.deploy.spark340.SparkSubmitUtils
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.types.StructType
 class Spark340Shims extends SparkShims {
@@ -46,7 +48,22 @@ class Spark340Shims extends SparkShims {
     TaskContextUtils.getDummyTaskContext(partitionId, env)
   }
-  override def toArrowSchema(schema : StructType, timeZoneId : String) : Schema = {
-    SparkSqlUtils.toArrowSchema(schema = schema, timeZoneId = timeZoneId)
+  /** Delegates to `SparkSqlUtils.toArrowSchema` with the same three arguments. */
+  override def toArrowSchema(
+      schema: StructType,
+      timeZoneId: String,
+      sparkSession: SparkSession): Schema =
+    SparkSqlUtils.toArrowSchema(schema, timeZoneId, sparkSession)
+
+  /** Converts via `SparkSqlUtils.toArrowRDD`, passing the DataFrame's own session. */
+  override def toArrowBatchRDD(dataFrame: DataFrame): RDD[Array[Byte]] = {
+    val session = dataFrame.sparkSession
+    SparkSqlUtils.toArrowRDD(dataFrame, session)
+  }
+
+  /** Returns the `SparkSubmitUtils` object imported from the spark340 package. */
+  override def getCommandLineUtilsBridge: CommandLineUtilsBridge = {
+    SparkSubmitUtils
   }
 }
diff --git a/core/shims/spark340/src/main/scala/org/apache/spark/deploy/spark340/SparkSubmitUtils.scala b/core/shims/spark340/src/main/scala/org/apache/spark/deploy/spark340/SparkSubmitUtils.scala
new file mode 100644
index 00000000..8cb64f64
--- /dev/null
+++
b/core/shims/spark340/src/main/scala/org/apache/spark/deploy/spark340/SparkSubmitUtils.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.spark340
+
+import org.apache.spark.deploy.SparkSubmitArguments
+import org.apache.spark.util.CommandLineUtils
+import com.intel.raydp.shims.CommandLineUtilsBridge
+
+/**
+ * Spark 3.4 implementation of [[CommandLineUtilsBridge]] that also extends
+ * Spark's [[CommandLineUtils]].
+ */
+object SparkSubmitUtils
+  extends CommandLineUtils with CommandLineUtilsBridge {
+
+  /** No-op entry point. */
+  override def main(args: Array[String]): Unit = ()
+
+  /** Forwards the exit code to `exitFn`. */
+  override def callExit(code: Int): Unit = {
+    exitFn(code)
+  }
+
+  /**
+   * Stores `Some(master)` in the `maybeMaster` field of the given submit arguments.
+   *
+   * @param args   cast to [[SparkSubmitArguments]]; the cast fails for any other type
+   * @param master value wrapped in `Some` and assigned to the field
+   */
+  override def setSubmitMaster(args: Any, master: String): Unit = {
+    val submitArgs = args.asInstanceOf[SparkSubmitArguments]
+    submitArgs.maybeMaster = Some(master)
+  }
+}
diff --git a/core/shims/spark340/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala b/core/shims/spark340/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala
index eb52d8e7..55bee8dd 100644
--- a/core/shims/spark340/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala
+++ b/core/shims/spark340/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.spark340
 import org.apache.arrow.vector.types.pojo.Schema
 import org.apache.spark.TaskContext
 import
org.apache.spark.api.java.JavaRDD
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}
 import org.apache.spark.sql.execution.arrow.ArrowConverters
 import org.apache.spark.sql.types._
@@ -39,7 +40,11 @@ object SparkSqlUtils {
     session.internalCreateDataFrame(rdd.setName("arrow"), schema)
   }
-  def toArrowSchema(schema : StructType, timeZoneId : String) : Schema = {
+  /**
+   * Converts a Spark schema to an Arrow schema via `ArrowUtils.toArrowSchema`.
+   * `sparkSession` is accepted but not used by this Spark 3.4 variant.
+   */
+  def toArrowSchema(
+      schema: StructType,
+      timeZoneId: String,
+      sparkSession: SparkSession): Schema = {
     ArrowUtils.toArrowSchema(schema = schema, timeZoneId = timeZoneId)
   }
+
+  /** Returns `dataFrame.toArrowBatchRdd`; `sparkSession` is not used here. */
+  def toArrowRDD(dataFrame: DataFrame, sparkSession: SparkSession): RDD[Array[Byte]] =
+    dataFrame.toArrowBatchRdd
 }
diff --git a/core/shims/spark350/pom.xml b/core/shims/spark350/pom.xml
index 2368daa2..0afa3289 100644
--- a/core/shims/spark350/pom.xml
+++ b/core/shims/spark350/pom.xml
@@ -16,8 +16,7 @@
 jar
- 2.12.15
- 2.13.5
+ 2.13.12
diff --git a/core/shims/spark350/src/main/scala/com/intel/raydp/shims/SparkShims.scala b/core/shims/spark350/src/main/scala/com/intel/raydp/shims/SparkShims.scala
index 721d6923..dcde188c 100644
--- a/core/shims/spark350/src/main/scala/com/intel/raydp/shims/SparkShims.scala
+++ b/core/shims/spark350/src/main/scala/com/intel/raydp/shims/SparkShims.scala
@@ -24,8 +24,10 @@ import org.apache.spark.executor.spark350._
 import org.apache.spark.spark350.TaskContextUtils
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.spark350.SparkSqlUtils
-import com.intel.raydp.shims.{ShimDescriptor, SparkShims}
+import com.intel.raydp.shims.{CommandLineUtilsBridge, ShimDescriptor, SparkShims}
 import org.apache.arrow.vector.types.pojo.Schema
+import org.apache.spark.deploy.spark350.SparkSubmitUtils
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.types.StructType
 class Spark350Shims extends SparkShims {
@@ -46,7 +48,21 @@ class Spark350Shims extends SparkShims {
     TaskContextUtils.getDummyTaskContext(partitionId, env)
   }
-  override def toArrowSchema(schema : StructType, timeZoneId : String) : Schema = {
-    SparkSqlUtils.toArrowSchema(schema = schema, timeZoneId = timeZoneId)
+  /** Delegates to `SparkSqlUtils.toArrowSchema` with the same three arguments. */
+  override def toArrowSchema(
+      schema: StructType,
+      timeZoneId: String,
+      sparkSession: SparkSession): Schema =
+    SparkSqlUtils.toArrowSchema(schema, timeZoneId, sparkSession)
+
+  /** Converts via `SparkSqlUtils.toArrowRDD`, passing the DataFrame's own session. */
+  override def toArrowBatchRDD(dataFrame: DataFrame): RDD[Array[Byte]] = {
+    val session = dataFrame.sparkSession
+    SparkSqlUtils.toArrowRDD(dataFrame, session)
+  }
+
+  /** Returns the `SparkSubmitUtils` object imported from the spark350 package. */
+  override def getCommandLineUtilsBridge: CommandLineUtilsBridge = {
+    SparkSubmitUtils
   }
 }
diff --git a/core/shims/spark350/src/main/scala/org/apache/spark/deploy/spark350/SparkSubmitUtils.scala b/core/shims/spark350/src/main/scala/org/apache/spark/deploy/spark350/SparkSubmitUtils.scala
new file mode 100644
index 00000000..1802edbe
--- /dev/null
+++ b/core/shims/spark350/src/main/scala/org/apache/spark/deploy/spark350/SparkSubmitUtils.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.spark350
+
+import org.apache.spark.deploy.SparkSubmitArguments
+import org.apache.spark.util.CommandLineUtils
+import com.intel.raydp.shims.CommandLineUtilsBridge
+
+/**
+ * Spark 3.5 implementation of [[CommandLineUtilsBridge]] that also extends
+ * Spark's [[CommandLineUtils]].
+ */
+object SparkSubmitUtils
+  extends CommandLineUtils with CommandLineUtilsBridge {
+
+  /** No-op entry point. */
+  override def main(args: Array[String]): Unit = ()
+
+  /** Forwards the exit code to `exitFn`. */
+  override def callExit(code: Int): Unit = {
+    exitFn(code)
+  }
+
+  /**
+   * Stores `Some(master)` in the `maybeMaster` field of the given submit arguments.
+   *
+   * @param args   cast to [[SparkSubmitArguments]]; the cast fails for any other type
+   * @param master value wrapped in `Some` and assigned to the field
+   */
+  override def setSubmitMaster(args: Any, master: String): Unit = {
+    val submitArgs = args.asInstanceOf[SparkSubmitArguments]
+    submitArgs.maybeMaster = Some(master)
+  }
+}
diff --git a/core/shims/spark350/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala b/core/shims/spark350/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala
index dfd063f7..a12c4256 100644
--- a/core/shims/spark350/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala
+++ b/core/shims/spark350/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.spark350
 import org.apache.arrow.vector.types.pojo.Schema
 import org.apache.spark.TaskContext
 import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}
 import org.apache.spark.sql.execution.arrow.ArrowConverters
 import org.apache.spark.sql.types._
@@ -39,7 +40,18 @@ object SparkSqlUtils {
     session.internalCreateDataFrame(rdd.setName("arrow"), schema)
   }
-  def toArrowSchema(schema : StructType, timeZoneId : String) : Schema = {
-    ArrowUtils.toArrowSchema(schema = schema, timeZoneId = timeZoneId, errorOnDuplicatedFieldNames = false)
+  /**
+   * Converts a Spark schema to an Arrow schema via `ArrowUtils.toArrowSchema`.
+   * `errorOnDuplicatedFieldNames` is true only when the session's
+   * `pandasStructHandlingMode` equals "legacy".
+   */
+  def toArrowSchema(
+      schema: StructType,
+      timeZoneId: String,
+      sparkSession: SparkSession): Schema = {
+    val structHandlingMode = sparkSession.sessionState.conf.pandasStructHandlingMode
+    ArrowUtils.toArrowSchema(
+      schema = schema,
+      timeZoneId = timeZoneId,
+      errorOnDuplicatedFieldNames = (structHandlingMode == "legacy"))
+  }
+
+  /** Returns `dataFrame.toArrowBatchRdd`; `sparkSession` is not used here. */
+  def toArrowRDD(
+      dataFrame: DataFrame,
+      sparkSession: SparkSession): RDD[Array[Byte]] = {
+    dataFrame.toArrowBatchRdd
   }
 }
diff --git a/core/shims/spark400/pom.xml b/core/shims/spark400/pom.xml
new file mode 100644
index 00000000..fd3f8494
--- /dev/null
+++ b/core/shims/spark400/pom.xml
@@ -0,0 +1,98 @@
+ + + + 4.0.0 + + + com.intel + raydp-shims + 1.7.0-SNAPSHOT + ../pom.xml + + + raydp-shims-spark400 + RayDP Shims for Spark 4.0.0 + jar + + + 2.13.12 + + + + + + org.scalastyle + scalastyle-maven-plugin + + + net.alchim31.maven + scala-maven-plugin + 3.2.2 + + + scala-compile-first + process-resources + + compile + + + + scala-test-compile-first + process-test-resources + + testCompile + + + + + + + + + src/main/resources + + + + + + + com.intel + raydp-shims-common + ${project.version} + compile + + + org.apache.spark + spark-sql_${scala.binary.version} + ${spark400.version} + provided + + + org.apache.spark + spark-core_${scala.binary.version} + ${spark400.version} + provided + + + org.xerial.snappy + snappy-java + + + io.netty + netty-handler + + + + + org.xerial.snappy + snappy-java + ${snappy.version} + + + io.netty + netty-handler + ${netty.version} + + +
diff --git a/core/shims/spark400/src/main/resources/META-INF/services/com.intel.raydp.shims.SparkShimProvider b/core/shims/spark400/src/main/resources/META-INF/services/com.intel.raydp.shims.SparkShimProvider
new file mode 100644
index 00000000..f88bbd7a
--- /dev/null
+++ b/core/shims/spark400/src/main/resources/META-INF/services/com.intel.raydp.shims.SparkShimProvider
@@ -0,0 +1 @@
+com.intel.raydp.shims.spark400.SparkShimProvider
diff --git a/core/shims/spark400/src/main/scala/com/intel/raydp/shims/SparkShimProvider.scala b/core/shims/spark400/src/main/scala/com/intel/raydp/shims/SparkShimProvider.scala
new file mode 100644
index 00000000..70eeef10
--- /dev/null
+++ b/core/shims/spark400/src/main/scala/com/intel/raydp/shims/SparkShimProvider.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.intel.raydp.shims.spark400
+
+import com.intel.raydp.shims.{SparkShims, SparkShimDescriptor}
+
+/** Version descriptors handled by the Spark 4.0 shim: 4.0.0 through 4.0.2. */
+object SparkShimProvider {
+  // Patch numbers of the 4.0 line for which a descriptor is generated.
+  private val SUPPORTED_PATCHES = 0 to 2
+
+  /** One descriptor per supported patch release, in ascending patch order. */
+  val DESCRIPTORS =
+    for (patch <- SUPPORTED_PATCHES) yield SparkShimDescriptor(4, 0, patch)
+
+  /** `toString` form of every entry in [[DESCRIPTORS]]. */
+  val DESCRIPTOR_STRINGS = DESCRIPTORS.map(descriptor => descriptor.toString)
+
+  /** First entry of [[DESCRIPTORS]], i.e. the patch-0 descriptor. */
+  val DESCRIPTOR = DESCRIPTORS.head
+}
+
+/** Provider that creates [[Spark400Shims]] for the versions listed in the companion. */
+class SparkShimProvider extends com.intel.raydp.shims.SparkShimProvider {
+
+  /** Returns a new [[Spark400Shims]] instance on every call. */
+  def createShim: SparkShims = new Spark400Shims()
+
+  /** True when `version` equals one of the companion's `DESCRIPTOR_STRINGS`. */
+  def matches(version: String): Boolean = {
+    val supported = SparkShimProvider.DESCRIPTOR_STRINGS
+    supported.contains(version)
+  }
+}
diff --git a/core/shims/spark400/src/main/scala/com/intel/raydp/shims/SparkShims.scala b/core/shims/spark400/src/main/scala/com/intel/raydp/shims/SparkShims.scala
new file mode 100644
index 00000000..722099b3
--- /dev/null
+++ b/core/shims/spark400/src/main/scala/com/intel/raydp/shims/SparkShims.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.intel.raydp.shims.spark400
+
+import org.apache.arrow.vector.types.pojo.Schema
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.executor.{RayDPExecutorBackendFactory, RayDPSpark400ExecutorBackendFactory}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.spark400.SparkSqlUtils
+import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.spark400.TaskContextUtils
+import com.intel.raydp.shims.{CommandLineUtilsBridge, ShimDescriptor, SparkShims}
+import org.apache.spark.deploy.spark400.SparkSubmitUtils
+import org.apache.spark.rdd.{MapPartitionsRDD, RDD}
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.arrow.ArrowConverters
+
+import scala.reflect.ClassTag
+
+/**
+ * [[SparkShims]] implementation for Spark 4.0. Every method forwards to the
+ * spark400 helper objects imported above.
+ */
+class Spark400Shims extends SparkShims {
+
+  /** Returns `SparkShimProvider.DESCRIPTOR`. */
+  override def getShimDescriptor: ShimDescriptor = {
+    SparkShimProvider.DESCRIPTOR
+  }
+
+  /** Delegates to `SparkSqlUtils.toDataFrame` with the arguments unchanged. */
+  override def toDataFrame(
+      rdd: JavaRDD[Array[Byte]],
+      schema: String,
+      session: SparkSession): DataFrame =
+    SparkSqlUtils.toDataFrame(rdd, schema, session)
+
+  /** Returns a new `RayDPSpark400ExecutorBackendFactory` on every call. */
+  override def getExecutorBackendFactory(): RayDPExecutorBackendFactory =
+    new RayDPSpark400ExecutorBackendFactory()
+
+  /** Delegates to `TaskContextUtils.getDummyTaskContext`. */
+  override def getDummyTaskContext(partitionId: Int, env: SparkEnv): TaskContext =
+    TaskContextUtils.getDummyTaskContext(partitionId, env)
+
+  /** Delegates to `SparkSqlUtils.toArrowSchema` with the same three arguments. */
+  override def toArrowSchema(
+      schema: StructType,
+      timeZoneId: String,
+      sparkSession: SparkSession): Schema =
+    SparkSqlUtils.toArrowSchema(
+      schema = schema,
+      timeZoneId = timeZoneId,
+      sparkSession = sparkSession)
+
+  /** Converts via `SparkSqlUtils.toArrowRDD`, passing the DataFrame's own session. */
+  override def toArrowBatchRDD(dataFrame: DataFrame): RDD[Array[Byte]] = {
+    val session = dataFrame.sparkSession
+    SparkSqlUtils.toArrowRDD(dataFrame, session)
+  }
+
+  /** Returns the `SparkSubmitUtils` object imported from the spark400 package. */
+  override def getCommandLineUtilsBridge: CommandLineUtilsBridge = SparkSubmitUtils
+}
diff --git a/core/shims/spark400/src/main/scala/org/apache/spark/TaskContextUtils.scala b/core/shims/spark400/src/main/scala/org/apache/spark/TaskContextUtils.scala
new file mode 100644
index 00000000..287105cd
--- /dev/null
+++ b/core/shims/spark400/src/main/scala/org/apache/spark/TaskContextUtils.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.spark400
+
+import java.util.Properties
+
+import org.apache.spark.{SparkEnv, TaskContext, TaskContextImpl}
+import org.apache.spark.memory.TaskMemoryManager
+
+object TaskContextUtils {
+
+  /**
+   * Builds a [[TaskContextImpl]] for `partitionId` out of fixed literal values
+   * (zeros and -1024), a new [[TaskMemoryManager]] over `env.memoryManager`,
+   * an empty `Properties` and `env.metricsSystem`.
+   */
+  def getDummyTaskContext(partitionId: Int, env: SparkEnv): TaskContext = {
+    val memoryManager = new TaskMemoryManager(env.memoryManager, 0)
+    val localProperties = new Properties()
+    new TaskContextImpl(
+      0, 0, partitionId, -1024, 0, 0,
+      memoryManager, localProperties, env.metricsSystem)
+  }
+}
diff --git a/core/shims/spark400/src/main/scala/org/apache/spark/deploy/spark400/SparkSubmitUtils.scala b/core/shims/spark400/src/main/scala/org/apache/spark/deploy/spark400/SparkSubmitUtils.scala
new file mode 100644
index 00000000..0f5aac07
--- /dev/null
+++ b/core/shims/spark400/src/main/scala/org/apache/spark/deploy/spark400/SparkSubmitUtils.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.spark400
+
+import org.apache.spark.deploy.SparkSubmitArguments
+import org.apache.spark.util.CommandLineUtils
+import com.intel.raydp.shims.CommandLineUtilsBridge
+
+/**
+ * Spark 4.0 implementation of [[CommandLineUtilsBridge]] that also extends
+ * Spark's [[CommandLineUtils]].
+ */
+object SparkSubmitUtils
+  extends CommandLineUtils with CommandLineUtilsBridge {
+
+  /** No-op entry point. */
+  override def main(args: Array[String]): Unit = ()
+
+  /** Forwards the exit code to `exitFn`. */
+  override def callExit(code: Int): Unit = {
+    exitFn(code)
+  }
+
+  /**
+   * Stores `Some(master)` in the `maybeMaster` field of the given submit arguments.
+   *
+   * @param args   cast to [[SparkSubmitArguments]]; the cast fails for any other type
+   * @param master value wrapped in `Some` and assigned to the field
+   */
+  override def setSubmitMaster(args: Any, master: String): Unit = {
+    val submitArgs = args.asInstanceOf[SparkSubmitArguments]
+    submitArgs.maybeMaster = Some(master)
+  }
+}
diff --git a/core/shims/spark400/src/main/scala/org/apache/spark/executor/RayCoarseGrainedExecutorBackend.scala b/core/shims/spark400/src/main/scala/org/apache/spark/executor/RayCoarseGrainedExecutorBackend.scala
new file mode 100644
index 00000000..2e6b5e25
--- /dev/null
+++ b/core/shims/spark400/src/main/scala/org/apache/spark/executor/RayCoarseGrainedExecutorBackend.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.net.URL
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.resource.ResourceProfile
+import org.apache.spark.rpc.RpcEnv
+
+/**
+ * [[CoarseGrainedExecutorBackend]] subclass that receives the user class path
+ * as a constructor argument and returns it from `getUserClassPath`.
+ *
+ * Every constructor argument except `userClassPath` is forwarded to the parent
+ * constructor unchanged; `userClassPath` is kept by this class only.
+ */
+class RayCoarseGrainedExecutorBackend(
+    rpcEnv: RpcEnv,
+    driverUrl: String,
+    executorId: String,
+    bindAddress: String,
+    hostname: String,
+    cores: Int,
+    userClassPath: Seq[URL],
+    env: SparkEnv,
+    resourcesFileOpt: Option[String],
+    resourceProfile: ResourceProfile)
+  extends CoarseGrainedExecutorBackend(
+    rpcEnv,
+    driverUrl,
+    executorId,
+    bindAddress,
+    hostname,
+    cores,
+    env,
+    resourcesFileOpt,
+    resourceProfile) {
+
+  // Returns the class path supplied at construction time.
+  override def getUserClassPath: Seq[URL] = userClassPath
+
+}
diff --git a/core/shims/spark400/src/main/scala/org/apache/spark/executor/RayDPSpark400ExecutorBackendFactory.scala b/core/shims/spark400/src/main/scala/org/apache/spark/executor/RayDPSpark400ExecutorBackendFactory.scala
new file mode 100644
index 00000000..eed998bd
--- /dev/null
+++ b/core/shims/spark400/src/main/scala/org/apache/spark/executor/RayDPSpark400ExecutorBackendFactory.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.resource.ResourceProfile
+import org.apache.spark.rpc.RpcEnv
+
+import java.net.URL
+
+/** [[RayDPExecutorBackendFactory]] that creates [[RayCoarseGrainedExecutorBackend]] instances. */
+class RayDPSpark400ExecutorBackendFactory
+  extends RayDPExecutorBackendFactory {
+
+  /**
+   * Creates a new [[RayCoarseGrainedExecutorBackend]], handing each argument to
+   * the constructor parameter of the same name.
+   */
+  override def createExecutorBackend(
+      rpcEnv: RpcEnv,
+      driverUrl: String,
+      executorId: String,
+      bindAddress: String,
+      hostname: String,
+      cores: Int,
+      userClassPath: Seq[URL],
+      env: SparkEnv,
+      resourcesFileOpt: Option[String],
+      resourceProfile: ResourceProfile): CoarseGrainedExecutorBackend =
+    new RayCoarseGrainedExecutorBackend(
+      rpcEnv = rpcEnv,
+      driverUrl = driverUrl,
+      executorId = executorId,
+      bindAddress = bindAddress,
+      hostname = hostname,
+      cores = cores,
+      userClassPath = userClassPath,
+      env = env,
+      resourcesFileOpt = resourcesFileOpt,
+      resourceProfile = resourceProfile)
+}
diff --git a/core/shims/spark400/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala b/core/shims/spark400/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala
new file mode 100644
index 00000000..aab0e2fe
--- /dev/null
+++ b/core/shims/spark400/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.spark400
+
+import org.apache.arrow.vector.types.pojo.Schema
+import org.apache.spark.TaskContext
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.classic.ClassicConversions.castToImpl
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.execution.arrow.ArrowConverters
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.ArrowUtils
+
+object SparkSqlUtils {
+
+  /**
+   * Builds a DataFrame from an RDD of Arrow batches held as byte arrays.
+   *
+   * @param arrowBatchRDD RDD whose elements are passed to
+   *                      `ArrowConverters.fromBatchIterator` partition by partition
+   * @param schemaString  JSON schema; parsed with `DataType.fromJson` and cast to [[StructType]]
+   * @param session       supplies the session-local time zone and creates the DataFrame
+   */
+  def toDataFrame(
+      arrowBatchRDD: JavaRDD[Array[Byte]],
+      schemaString: String,
+      session: SparkSession): DataFrame = {
+    val structType = DataType.fromJson(schemaString).asInstanceOf[StructType]
+    val sessionTimeZone = session.sessionState.conf.sessionLocalTimeZone
+    val rowRdd = arrowBatchRDD.rdd.mapPartitions { batches =>
+      val taskContext = TaskContext.get()
+      // Duplicate-field errors and large var types are both fixed to false here.
+      ArrowConverters.fromBatchIterator(
+        arrowBatchIter = batches,
+        schema = structType,
+        timeZoneId = sessionTimeZone,
+        errorOnDuplicatedFieldNames = false,
+        largeVarTypes = false,
+        context = taskContext)
+    }
+    session.internalCreateDataFrame(rowRdd.setName("arrow"), structType)
+  }
+
+  /** Returns `dataFrame.toArrowBatchRdd`; `sparkSession` is not used here. */
+  def toArrowRDD(dataFrame: DataFrame, sparkSession: SparkSession): RDD[Array[Byte]] =
+    dataFrame.toArrowBatchRdd
+
+  /**
+   * Converts a Spark schema to an Arrow schema via `ArrowUtils.toArrowSchema`.
+   * `errorOnDuplicatedFieldNames` is true only when the session's
+   * `pandasStructHandlingMode` equals "legacy"; `largeVarTypes` is read from
+   * the session's `arrowUseLargeVarTypes`.
+   */
+  def toArrowSchema(
+      schema: StructType,
+      timeZoneId: String,
+      sparkSession: SparkSession): Schema = {
+    val sqlConf = sparkSession.sessionState.conf
+    ArrowUtils.toArrowSchema(
+      schema = schema,
+      timeZoneId = timeZoneId,
+      errorOnDuplicatedFieldNames = (sqlConf.pandasStructHandlingMode == "legacy"),
+      largeVarTypes = sqlConf.arrowUseLargeVarTypes)
+  }
+}
diff --git a/core/shims/spark410/pom.xml b/core/shims/spark410/pom.xml
new file mode 100644
index 00000000..df13c9cb
--- /dev/null
+++ b/core/shims/spark410/pom.xml
@@ -0,0 +1,98 @@
+ + + + 4.0.0 + + + com.intel + raydp-shims + 1.7.0-SNAPSHOT + ../pom.xml + + +
raydp-shims-spark410 + RayDP Shims for Spark 4.1.0 + jar + + + 2.13.12 + + + + + + org.scalastyle + scalastyle-maven-plugin + + + net.alchim31.maven + scala-maven-plugin + 3.2.2 + + + scala-compile-first + process-resources + + compile + + + + scala-test-compile-first + process-test-resources + + testCompile + + + + + + + + + src/main/resources + + + + + + + com.intel + raydp-shims-common + ${project.version} + compile + + + org.apache.spark + spark-sql_${scala.binary.version} + ${spark410.version} + provided + + + org.apache.spark + spark-core_${scala.binary.version} + ${spark410.version} + provided + + + org.xerial.snappy + snappy-java + + + io.netty + netty-handler + + + + + org.xerial.snappy + snappy-java + ${snappy.version} + + + io.netty + netty-handler + ${netty.version} + + + diff --git a/core/shims/spark410/src/main/resources/META-INF/services/com.intel.raydp.shims.SparkShimProvider b/core/shims/spark410/src/main/resources/META-INF/services/com.intel.raydp.shims.SparkShimProvider new file mode 100644 index 00000000..38ccea86 --- /dev/null +++ b/core/shims/spark410/src/main/resources/META-INF/services/com.intel.raydp.shims.SparkShimProvider @@ -0,0 +1 @@ +com.intel.raydp.shims.spark410.SparkShimProvider diff --git a/core/shims/spark410/src/main/scala/com/intel/raydp/shims/SparkShimProvider.scala b/core/shims/spark410/src/main/scala/com/intel/raydp/shims/SparkShimProvider.scala new file mode 100644 index 00000000..96d8b7a0 --- /dev/null +++ b/core/shims/spark410/src/main/scala/com/intel/raydp/shims/SparkShimProvider.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.intel.raydp.shims.spark410
+
+import com.intel.raydp.shims.{SparkShims, SparkShimDescriptor}
+
+/** Version descriptors handled by the Spark 4.1 shim: 4.1.0 and 4.1.1. */
+object SparkShimProvider {
+  // Patch numbers of the 4.1 line for which a descriptor is generated.
+  private val SUPPORTED_PATCHES = 0 to 1
+
+  /** One descriptor per supported patch release, in ascending patch order. */
+  val DESCRIPTORS =
+    for (patch <- SUPPORTED_PATCHES) yield SparkShimDescriptor(4, 1, patch)
+
+  /** `toString` form of every entry in [[DESCRIPTORS]]. */
+  val DESCRIPTOR_STRINGS = DESCRIPTORS.map(descriptor => descriptor.toString)
+
+  /** First entry of [[DESCRIPTORS]], i.e. the patch-0 descriptor. */
+  val DESCRIPTOR = DESCRIPTORS.head
+}
+
+/** Provider that creates [[Spark410Shims]] for the versions listed in the companion. */
+class SparkShimProvider extends com.intel.raydp.shims.SparkShimProvider {
+
+  /** Returns a new [[Spark410Shims]] instance on every call. */
+  def createShim: SparkShims = new Spark410Shims()
+
+  /** True when `version` equals one of the companion's `DESCRIPTOR_STRINGS`. */
+  def matches(version: String): Boolean = {
+    val supported = SparkShimProvider.DESCRIPTOR_STRINGS
+    supported.contains(version)
+  }
+}
diff --git a/core/shims/spark410/src/main/scala/com/intel/raydp/shims/SparkShims.scala b/core/shims/spark410/src/main/scala/com/intel/raydp/shims/SparkShims.scala
new file mode 100644
index 00000000..3efb60dc
--- /dev/null
+++ b/core/shims/spark410/src/main/scala/com/intel/raydp/shims/SparkShims.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.intel.raydp.shims.spark410
+
+import org.apache.arrow.vector.types.pojo.Schema
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.executor.{RayDPExecutorBackendFactory, RayDPSpark410ExecutorBackendFactory}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.spark410.SparkSqlUtils
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.spark410.TaskContextUtils
+import com.intel.raydp.shims.{CommandLineUtilsBridge, ShimDescriptor, SparkShims}
+import org.apache.spark.deploy.spark410.SparkSubmitUtils
+import org.apache.spark.rdd.RDD
+
+/**
+ * [[SparkShims]] implementation for Spark 4.1. Every method forwards to the
+ * spark410 helper objects imported above.
+ */
+class Spark410Shims extends SparkShims {
+
+  /** Returns `SparkShimProvider.DESCRIPTOR`. */
+  override def getShimDescriptor: ShimDescriptor = {
+    SparkShimProvider.DESCRIPTOR
+  }
+
+  /** Delegates to `SparkSqlUtils.toDataFrame` with the arguments unchanged. */
+  override def toDataFrame(
+      rdd: JavaRDD[Array[Byte]],
+      schema: String,
+      session: SparkSession): DataFrame =
+    SparkSqlUtils.toDataFrame(rdd, schema, session)
+
+  /** Returns a new `RayDPSpark410ExecutorBackendFactory` on every call. */
+  override def getExecutorBackendFactory(): RayDPExecutorBackendFactory =
+    new RayDPSpark410ExecutorBackendFactory()
+
+  /** Delegates to `TaskContextUtils.getDummyTaskContext`. */
+  override def getDummyTaskContext(partitionId: Int, env: SparkEnv): TaskContext =
+    TaskContextUtils.getDummyTaskContext(partitionId, env)
+
+  /** Delegates to `SparkSqlUtils.toArrowSchema` with the same three arguments. */
+  override def toArrowSchema(
+      schema: StructType,
+      timeZoneId: String,
+      sparkSession: SparkSession): Schema =
+    SparkSqlUtils.toArrowSchema(schema, timeZoneId, sparkSession)
+
+  /** Converts via `SparkSqlUtils.toArrowRDD`, passing the DataFrame's own session. */
+  override def toArrowBatchRDD(dataFrame: DataFrame): RDD[Array[Byte]] = {
+    val session = dataFrame.sparkSession
+    SparkSqlUtils.toArrowRDD(dataFrame, session)
+  }
+
+  /** Returns the `SparkSubmitUtils` object imported from the spark410 package. */
+  override def getCommandLineUtilsBridge: CommandLineUtilsBridge = SparkSubmitUtils
+}
diff --git a/core/shims/spark410/src/main/scala/org/apache/spark/TaskContextUtils.scala b/core/shims/spark410/src/main/scala/org/apache/spark/TaskContextUtils.scala
new file mode 100644
index 00000000..d46b6822
--- /dev/null
+++ b/core/shims/spark410/src/main/scala/org/apache/spark/TaskContextUtils.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.spark.spark410 + +import java.util.Properties + +import org.apache.spark.{SparkEnv, TaskContext, TaskContextImpl} +import org.apache.spark.memory.TaskMemoryManager + +object TaskContextUtils { + def getDummyTaskContext(partitionId: Int, env: SparkEnv): TaskContext = { + new TaskContextImpl(0, 0, partitionId, -1024, 0, 0, + new TaskMemoryManager(env.memoryManager, 0), new Properties(), env.metricsSystem) + } +} diff --git a/core/shims/spark410/src/main/scala/org/apache/spark/deploy/spark410/SparkSubmitUtils.scala b/core/shims/spark410/src/main/scala/org/apache/spark/deploy/spark410/SparkSubmitUtils.scala new file mode 100644 index 00000000..fc559f76 --- /dev/null +++ b/core/shims/spark410/src/main/scala/org/apache/spark/deploy/spark410/SparkSubmitUtils.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.spark410 + +import org.apache.spark.deploy.SparkSubmitArguments +import org.apache.spark.util.CommandLineUtils +import com.intel.raydp.shims.CommandLineUtilsBridge + +object SparkSubmitUtils + extends CommandLineUtils with CommandLineUtilsBridge { + override def main(args: Array[String]): Unit = {} + + override def callExit(code: Int): Unit = exitFn(code, None) + + override def setSubmitMaster(args: Any, master: String): Unit = { + args.asInstanceOf[SparkSubmitArguments].maybeMaster = Some(master) + } +} diff --git a/core/shims/spark410/src/main/scala/org/apache/spark/executor/RayCoarseGrainedExecutorBackend.scala b/core/shims/spark410/src/main/scala/org/apache/spark/executor/RayCoarseGrainedExecutorBackend.scala new file mode 100644 index 00000000..2e6b5e25 --- /dev/null +++ b/core/shims/spark410/src/main/scala/org/apache/spark/executor/RayCoarseGrainedExecutorBackend.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.executor + +import java.net.URL + +import org.apache.spark.SparkEnv +import org.apache.spark.resource.ResourceProfile +import org.apache.spark.rpc.RpcEnv + +class RayCoarseGrainedExecutorBackend( + rpcEnv: RpcEnv, + driverUrl: String, + executorId: String, + bindAddress: String, + hostname: String, + cores: Int, + userClassPath: Seq[URL], + env: SparkEnv, + resourcesFileOpt: Option[String], + resourceProfile: ResourceProfile) + extends CoarseGrainedExecutorBackend( + rpcEnv, + driverUrl, + executorId, + bindAddress, + hostname, + cores, + env, + resourcesFileOpt, + resourceProfile) { + + override def getUserClassPath: Seq[URL] = userClassPath + +} diff --git a/core/shims/spark410/src/main/scala/org/apache/spark/executor/RayDPSpark410ExecutorBackendFactory.scala b/core/shims/spark410/src/main/scala/org/apache/spark/executor/RayDPSpark410ExecutorBackendFactory.scala new file mode 100644 index 00000000..35433182 --- /dev/null +++ b/core/shims/spark410/src/main/scala/org/apache/spark/executor/RayDPSpark410ExecutorBackendFactory.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.executor + +import org.apache.spark.SparkEnv +import org.apache.spark.resource.ResourceProfile +import org.apache.spark.rpc.RpcEnv + +import java.net.URL + +class RayDPSpark410ExecutorBackendFactory + extends RayDPExecutorBackendFactory { + override def createExecutorBackend( + rpcEnv: RpcEnv, + driverUrl: String, + executorId: String, + bindAddress: String, + hostname: String, + cores: Int, + userClassPath: Seq[URL], + env: SparkEnv, + resourcesFileOpt: Option[String], + resourceProfile: ResourceProfile): CoarseGrainedExecutorBackend = { + new RayCoarseGrainedExecutorBackend( + rpcEnv, + driverUrl, + executorId, + bindAddress, + hostname, + cores, + userClassPath, + env, + resourcesFileOpt, + resourceProfile) + } +} diff --git a/core/shims/spark410/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala b/core/shims/spark410/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala new file mode 100644 index 00000000..cdbb0809 --- /dev/null +++ b/core/shims/spark410/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.spark410 + +import org.apache.arrow.vector.types.pojo.Schema +import org.apache.spark.TaskContext +import org.apache.spark.api.java.JavaRDD +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.classic.ClassicConversions.castToImpl +import org.apache.spark.sql.{DataFrame, Row, SparkSession} +import org.apache.spark.sql.execution.arrow.ArrowConverters +import org.apache.spark.sql.types._ +import org.apache.spark.sql.util.ArrowUtils + +object SparkSqlUtils { + def toDataFrame( + arrowBatchRDD: JavaRDD[Array[Byte]], + schemaString: String, + session: SparkSession): DataFrame = { + val schema = DataType.fromJson(schemaString).asInstanceOf[StructType] + val timeZoneId = session.sessionState.conf.sessionLocalTimeZone + val internalRowRdd = arrowBatchRDD.rdd.mapPartitions { iter => + val context = TaskContext.get() + ArrowConverters.fromBatchIterator( + arrowBatchIter = iter, + schema = schema, + timeZoneId = timeZoneId, + errorOnDuplicatedFieldNames = false, + largeVarTypes = false, + context = context) + } + session.internalCreateDataFrame(internalRowRdd.setName("arrow"), schema) + } + + def toArrowRDD(dataFrame: DataFrame, sparkSession: SparkSession): RDD[Array[Byte]] = { + dataFrame.toArrowBatchRdd + } + + def toArrowSchema(schema : StructType, timeZoneId : String, sparkSession: SparkSession) : Schema = { + val errorOnDuplicatedFieldNames = + sparkSession.sessionState.conf.pandasStructHandlingMode == "legacy" + val largeVarTypes = + sparkSession.sessionState.conf.arrowUseLargeVarTypes + + ArrowUtils.toArrowSchema( + schema = schema, + timeZoneId = timeZoneId, + errorOnDuplicatedFieldNames = errorOnDuplicatedFieldNames, + largeVarTypes = largeVarTypes + ) + } +} diff --git a/examples/tensorflow_titanic.ipynb b/examples/tensorflow_titanic.ipynb index e6f6c1e0..4837e5cc 100644 --- a/examples/tensorflow_titanic.ipynb +++ b/examples/tensorflow_titanic.ipynb @@ -15,6 +15,7 @@ "source": [ "import ray\n", "import os\n", + 
"os.environ[\"TF_USE_LEGACY_KERAS\"] = \"1\"\n", "import re\n", "import pandas as pd, numpy as np\n", "\n", diff --git a/python/pylintrc b/python/pylintrc index 907de8cb..103656f9 100644 --- a/python/pylintrc +++ b/python/pylintrc @@ -74,65 +74,33 @@ confidence= # no Warning level messages displayed, use"--disable=all --enable=classes # --disable=W" disable=abstract-method, - apply-builtin, arguments-differ, attribute-defined-outside-init, - backtick, - basestring-builtin, broad-except, - buffer-builtin, - cmp-builtin, - cmp-method, - coerce-builtin, - coerce-method, + broad-exception-raised, + consider-using-dict-items, + consider-using-f-string, + consider-using-from-import, + consider-using-generator, dangerous-default-value, - delslice-method, duplicate-code, - execfile-builtin, - file-builtin, - filter-builtin-not-iterating, fixme, - getslice-method, global-statement, - hex-method, + global-variable-not-assigned, import-error, import-self, - import-star-module-level, - input-builtin, - intern-builtin, invalid-name, locally-disabled, logging-fstring-interpolation, - long-builtin, - long-suffix, - map-builtin-not-iterating, missing-docstring, missing-function-docstring, - metaclass-assignment, - next-method-called, - next-method-defined, - no-absolute-import, no-else-return, no-member, + not-callable, + possibly-used-before-assignment, no-name-in-module, - no-self-use, - nonzero-method, - oct-method, - old-division, - old-ne-operator, - old-octal-literal, - old-raise-syntax, - parameter-unpacking, - print-statement, protected-access, - raising-string, - range-builtin-not-iterating, redefined-outer-name, - reduce-builtin, - reload-builtin, - round-builtin, - setslice-method, - standarderror-builtin, suppressed-message, too-few-public-methods, too-many-ancestors, @@ -143,18 +111,15 @@ disable=abstract-method, too-many-public-methods, too-many-return-statements, too-many-statements, - unichr-builtin, - unicode-builtin, - unpacking-in-except, unused-argument, 
unused-import, + unreachable, + unspecified-encoding, unused-variable, + use-dict-literal, useless-else-on-loop, useless-suppression, - using-cmp-argument, wrong-import-order, - xrange-builtin, - zip-builtin-not-iterating, [REPORTS] @@ -164,12 +129,6 @@ disable=abstract-method, # mypackage.mymodule.MyReporterClass. output-format=text -# Put messages in a separate file for each module / package specified on the -# command line instead of printing them on stdout. Reports (if any) will be -# written in a file name "pylint_global.[txt|html]". This option is deprecated -# and it will be removed in Pylint 2.0. -files-output=no - # Tells whether to display a full report or only the messages reports=no @@ -206,72 +165,39 @@ bad-names=foo,bar,baz,toto,tutu,tata # the name regexes allow several styles. name-group= -# Include a hint for the correct naming format with invalid-name -include-naming-hint=no - # List of decorators that produce properties, such as abc.abstractproperty. Add # to this list to register other decorators that produce valid properties. 
property-classes=abc.abstractproperty function-rgx=[a-z_][a-z0-9_]{2,30}$ -# Naming hint for function names -function-name-hint=[a-z_][a-z0-9_]{2,30}$ - # Regular expression matching correct variable names variable-rgx=[a-z_][a-z0-9_]{2,30}$ -# Naming hint for variable names -variable-name-hint=[a-z_][a-z0-9_]{2,30}$ - # Regular expression matching correct constant names const-rgx=(([A-Za-z_][A-Za-z0-9_]*)|(__.*__))$ -# Naming hint for constant names -const-name-hint=(([A-a-zZ_][A-Za-z0-9_]*)|(__.*__))$ - # Regular expression matching correct attribute names attr-rgx=[a-z_][a-z0-9_]{2,30}$ -# Naming hint for attribute names -attr-name-hint=[a-z_][a-z0-9_]{2,30}$ - # Regular expression matching correct argument names argument-rgx=[a-z_][a-z0-9_]{2,30}$ -# Naming hint for argument names -argument-name-hint=[a-z_][a-z0-9_]{2,30}$ - # Regular expression matching correct class attribute names class-attribute-rgx=([A-Za-z_][A-Za-z0-9_]{2,30}|(__.*__))$ -# Naming hint for class attribute names -class-attribute-name-hint=([A-Za-z_][A-Za-z0-9_]{2,30}|(__.*__))$ - # Regular expression matching correct inline iteration names inlinevar-rgx=[A-Za-z_][A-Za-z0-9_]*$ -# Naming hint for inline iteration names -inlinevar-name-hint=[A-Za-z_][A-Za-z0-9_]*$ - # Regular expression matching correct class names class-rgx=[A-Z_][a-zA-Z0-9]+$ -# Naming hint for class names -class-name-hint=[A-Z_][a-zA-Z0-9]+$ - # Regular expression matching correct module names module-rgx=(([a-z_][a-z0-9_]*)|([A-Z][a-zA-Z0-9]+))$ -# Naming hint for module names -module-name-hint=(([a-z_][a-z0-9_]*)|([A-Z][a-zA-Z0-9]+))$ - # Regular expression matching correct method names method-rgx=[a-z_][a-z0-9_]{2,30}$ -# Naming hint for method names -method-name-hint=[a-z_][a-z0-9_]{2,30}$ - # Regular expression which should only match function or class names that do # not require a docstring. no-docstring-rgx=(__.*__|main|test.*|.*test|.*Test)$ @@ -326,12 +252,6 @@ ignore-long-lines=(?x)( # else. 
single-line-if-stmt=yes -# List of optional constructs for which whitespace checking is disabled. `dict- -# separator` is used to allow tabulation in dicts, etc.: {1 : 1,\n222: 2}. -# `trailing-comma` allows a space between comma and closing bracket: (a, ). -# `empty-line` allows space-only lines. -no-space-check=trailing-comma,dict-separator - # Maximum number of lines in a module max-module-lines=1000 @@ -481,6 +401,5 @@ valid-metaclass-classmethod-first-arg=mcs # Exceptions that will emit a warning when being caught. Defaults to # "Exception" -overgeneral-exceptions=StandardError, - Exception, - BaseException +overgeneral-exceptions=builtins.Exception, + builtins.BaseException diff --git a/python/raydp/tests/conftest.py b/python/raydp/tests/conftest.py index a2a43b6d..8e01615f 100644 --- a/python/raydp/tests/conftest.py +++ b/python/raydp/tests/conftest.py @@ -65,7 +65,8 @@ def jdk17_extra_spark_configs() -> Dict[str, str]: @pytest.fixture(scope="function") def spark_session(request, jdk17_extra_spark_configs): - builder = SparkSession.builder.master("local[2]").appName("RayDP test") + builder = SparkSession.builder.master("local[2]").appName("RayDP test") \ + .config("spark.sql.ansi.enabled", "false") for k, v in jdk17_extra_spark_configs.items(): builder = builder.config(k, v) spark = builder.getOrCreate() @@ -98,6 +99,7 @@ def spark_on_ray_small(request, jdk17_extra_spark_configs): extra_configs = { "spark.driver.host": node_ip, "spark.driver.bindAddress": node_ip, + "spark.sql.ansi.enabled": "false", **jdk17_extra_spark_configs } spark = raydp.init_spark("test", 1, 1, "500M", configs=extra_configs) @@ -126,6 +128,7 @@ def spark_on_ray_2_executors(request, jdk17_extra_spark_configs): extra_configs = { "spark.driver.host": node_ip, "spark.driver.bindAddress": node_ip, + "spark.sql.ansi.enabled": "false", **jdk17_extra_spark_configs } spark = raydp.init_spark("test", 2, 1, "500M", configs=extra_configs) diff --git a/python/raydp/tf/estimator.py 
b/python/raydp/tf/estimator.py index 1eb3bf1d..fdbbcc8d 100644 --- a/python/raydp/tf/estimator.py +++ b/python/raydp/tf/estimator.py @@ -38,6 +38,7 @@ from raydp.spark.interfaces import SparkEstimatorInterface, DF, OPTIONAL_DF from raydp import stop_spark + class TFEstimator(EstimatorInterface, SparkEstimatorInterface): def __init__(self, num_workers: int = 1, diff --git a/python/setup.py b/python/setup.py index 54077281..ed6b05ed 100644 --- a/python/setup.py +++ b/python/setup.py @@ -96,11 +96,11 @@ def run(self): install_requires = [ "numpy", - "pandas >= 1.1.4", + "pandas >= 2.2.0, < 3.0.0", "psutil", "pyarrow >= 4.0.1", "ray >= 2.37.0", - "pyspark >= 3.1.1, <=3.5.7", + "pyspark >= 4.0.0", "protobuf > 3.19.5" ] @@ -131,13 +131,16 @@ def run(self): 'build_proto_modules': CustomBuildPackageProtos, }, install_requires=install_requires, + extras_require={ + "tensorflow": ["tensorflow>=2.15.1,<2.16"], + "tensorflow-gpu": ["tensorflow[and-cuda]>=2.15.1,<2.16"], + }, setup_requires=["grpcio-tools"], - python_requires='>=3.6', + python_requires='>=3.10', classifiers=[ 'License :: OSI Approved :: Apache Software License', - 'Programming Language :: Python :: 3.8', - 'Programming Language :: Python :: 3.9', 'Programming Language :: Python :: 3.10', + 'Programming Language :: Python :: 3.11', ] ) finally: