Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 14 additions & 36 deletions .github/workflows/raydp.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,76 +32,58 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest]
python-version: [3.9, 3.10.14]
spark-version: [3.3.2, 3.4.0, 3.5.0]
ray-version: [2.37.0, 2.40.0, 2.50.0]
python-version: ["3.10", "3.12"]
spark-version: [4.1.1]
ray-version: [2.53.0]

runs-on: ${{ matrix.os }}

steps:
- uses: actions/checkout@ee0669bd1cc54295c223e0bb666b733df41de1c5 # v2.7.0
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@e9aba2c848f5ebd159c070c61ea2c4e2b122355e # v2.3.4
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Set up JDK 17
uses: actions/setup-java@v5
uses: actions/setup-java@v4
with:
java-version: 17
distribution: 'corretto'
- name: Install extra dependencies for macOS
if: matrix.os == 'macos-latest'
run: |
brew install pkg-config
brew install libuv libomp mpich
- name: Install extra dependencies for Ubuntu
if: matrix.os == 'ubuntu-latest'
run: |
sudo apt-get install -y mpich
- name: Cache pip - Ubuntu
if: matrix.os == 'ubuntu-latest'
uses: actions/cache@8492260343ad570701412c2f464a5877dc76bace # v2
- name: Cache pip
uses: actions/cache@v4
with:
path: ~/.cache/pip
key: ${{ matrix.os }}-${{ matrix.python-version }}-pip
- name: Cache pip - MacOS
if: matrix.os == 'macos-latest'
uses: actions/cache@8492260343ad570701412c2f464a5877dc76bace # v2
with:
path: ~/Library/Caches/pip
key: ${{ matrix.os }}-${{ matrix.python-version }}-pip
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install wheel
pip install "numpy<1.24" "click<8.3.0"
pip install "pydantic<2.0"
SUBVERSION=$(python -c 'import sys; print(sys.version_info[1])')
pip install wheel build
if [ "$(uname -s)" == "Linux" ]
then
pip install torch --index-url https://download.pytorch.org/whl/cpu
else
pip install torch
fi
pip install pyarrow "ray[train,default]==${{ matrix.ray-version }}" tqdm pytest tensorflow==2.13.1 tabulate grpcio-tools wget
pip install "xgboost_ray[default]<=0.1.13"
pip install "xgboost<=2.0.3"
pip install torchmetrics
pip install "pandas>=2.2,<3" pyarrow "ray[train,default]==${{ matrix.ray-version }}" tqdm pytest tabulate grpcio-tools wget
pip install torchmetrics xgboost
pip install tensorflow-cpu
- name: Cache Maven
uses: actions/cache@8492260343ad570701412c2f464a5877dc76bace # v2
uses: actions/cache@v4
with:
path: ~/.m2
key: ${{ matrix.os }}-m2-${{ hashFiles('core/pom.xml') }}
- name: Build and install
env:
GITHUB_CI: 1
run: |
pip install pyspark==${{ matrix.spark-version }}
./build.sh
pip install dist/raydp-*.whl
- name: Lint
run: |
pip install pylint==2.8.3
pip install pylint==3.3.6
pylint --rcfile=python/pylintrc python/raydp
pylint --rcfile=python/pylintrc examples/*.py
- name: Test with pytest
Expand All @@ -115,8 +97,4 @@ jobs:
python examples/raydp-submit.py
python examples/test_raydp_submit_pyfiles.py
ray stop
python examples/pytorch_nyctaxi.py
python examples/tensorflow_nyctaxi.py
python examples/xgboost_ray_nyctaxi.py
# python examples/raytrain_nyctaxi.py
python examples/data_process.py
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,10 @@ _SUCCESS

.metals/
.bloop/

.venv/

# Generated by python/setup.py during build
python/raydp/jars/__init__.py
python/raydp/bin/__init__.py
python/raydp/bin/raydp-submit
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,13 @@ RayDP supports Ray as a Spark resource manager and runs Spark executors in Ray a

## Installation

You can install latest RayDP using pip. RayDP requires Ray and PySpark. Please also make sure java is installed and JAVA_HOME is set properly.
RayDP 2.0 requires **Java 17**, **Spark 4.1+**, and **Ray 2.53+**.

**Prerequisites:**
1. Install **Java 17**.
2. Set `JAVA_HOME` to your Java 17 installation.

You can install the latest RayDP using pip:

```shell
pip install raydp
Expand Down
7 changes: 1 addition & 6 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,7 @@ fi
# build core part
CORE_DIR="${CURRENT_DIR}/core"
pushd ${CORE_DIR}
if [[ -z $GITHUB_CI ]];
then
mvn clean package -q -DskipTests
else
mvn verify -q
fi
mvn clean package -q -DskipTests
popd # core dir

# build python part
Expand Down
2 changes: 1 addition & 1 deletion core/agent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>com.intel</groupId>
<artifactId>raydp-parent</artifactId>
<version>1.7.0-SNAPSHOT</version>
<version>2.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
46 changes: 30 additions & 16 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,28 @@

<groupId>com.intel</groupId>
<artifactId>raydp-parent</artifactId>
<version>1.7.0-SNAPSHOT</version>
<version>2.0.0-SNAPSHOT</version>
<packaging>pom</packaging>

<name>RayDP Parent Pom</name>
<url>https://github.com/ray-project/raydp.git</url>

<properties>
<spark.version>3.3.3</spark.version>
<spark322.version>3.2.2</spark322.version>
<spark330.version>3.3.0</spark330.version>
<spark340.version>3.4.0</spark340.version>
<spark350.version>3.5.0</spark350.version>
<snappy.version>1.1.10.4</snappy.version>
<netty.version>4.1.94.Final</netty.version>
<commons.text.version>1.10.0</commons.text.version>
<commons.compress.version>1.26.0</commons.compress.version>
<ray.version>2.47.1</ray.version>
<spark.version>4.1.1</spark.version>
<snappy.version>1.1.10.5</snappy.version>
<netty.version>4.1.108.Final</netty.version>
<commons.text.version>1.12.0</commons.text.version>
<commons.compress.version>1.26.1</commons.compress.version>
<rhino.version>1.7.14.1</rhino.version>
<protobuf.version>3.25.5</protobuf.version>
<ivy.version>2.5.2</ivy.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<scala.version>2.12.15</scala.version>
<jackson.version>2.15.0</jackson.version>
<scala.binary.version>2.12</scala.binary.version>
<java.version>17</java.version>
<scala.version>2.13.17</scala.version>
<jackson.version>2.17.0</jackson.version>
<scala.binary.version>2.13</scala.binary.version>
<junit-jupiter.version>5.10.1</junit-jupiter.version>
</properties>

Expand Down Expand Up @@ -144,7 +140,7 @@
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.18.0</version>
<version>3.17.0</version>
</dependency>

<dependency>
Expand Down Expand Up @@ -197,6 +193,24 @@
</dependencyManagement>

<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.scalastyle</groupId>
<artifactId>scalastyle-maven-plugin</artifactId>
<version>1.0.0</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<release>${java.version}</release>
</configuration>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand Down
21 changes: 9 additions & 12 deletions core/raydp-main/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,13 @@
<parent>
<groupId>com.intel</groupId>
<artifactId>raydp-parent</artifactId>
<version>1.7.0-SNAPSHOT</version>
<version>2.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

<artifactId>raydp</artifactId>
<name>raydp</name>

<properties>
<ray.version>2.1.0</ray.version>
</properties>

<repositories>
<repository>
<id>sonatype</id>
Expand Down Expand Up @@ -128,7 +124,12 @@
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.18.0</version>
</dependency>

<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
<version>2.1.1</version>
</dependency>

<dependency>
Expand Down Expand Up @@ -184,10 +185,6 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand All @@ -202,7 +199,7 @@
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.3.3</version>
<version>4.8.1</version>
<executions>
<execution>
<id>scala-compile-first</id>
Expand All @@ -228,7 +225,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.7</version>
<version>3.2.5</version>
</plugin>

<!-- <plugin>-->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,6 @@ private[spark] class SparkSubmit extends Logging {
}

if (clusterManager == KUBERNETES) {
args.master = Utils.checkAndGetK8sMasterUrl(args.master)
// Make sure KUBERNETES is included in our build if we're trying to use it
if (!Utils.classIsLoadable(KUBERNETES_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) {
error(
Expand Down Expand Up @@ -1044,7 +1043,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
super.doSubmit(args)
} catch {
case e: SparkUserAppException =>
exitFn(e.exitCode)
exitFn(e.exitCode, None)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,6 @@ class RayAppMaster(host: String,
s"{ CPU: $rayActorCPU, " +
s"${appInfo.desc.resourceReqsPerExecutor
.map { case (name, amount) => s"$name: $amount" }.mkString(", ")} }..")
// TODO: Support generic fractional logical resources using prefix spark.ray.actor.resource.*

// This will check with dynamic auto scale no additional pending executor actor added more
// than max executors count as this result in executor even running after job completion
Expand All @@ -292,8 +291,6 @@ class RayAppMaster(host: String,
getAppMasterEndpointUrl(),
rayActorCPU,
memory,
// This won't work, Spark expect integer in custom resources,
// please see python test test_spark_on_fractional_custom_resource
appInfo.desc.resourceReqsPerExecutor
.map { case (name, amount) => (name, Double.box(amount)) }.asJava,
placementGroup,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,29 +324,51 @@ class RayDPExecutor(
val env = SparkEnv.get
val context = SparkShimLoader.getSparkShims.getDummyTaskContext(partitionId, env)
TaskContext.setTaskContext(context)
val schema = Schema.fromJSON(schemaStr)
val blockId = BlockId.apply("rdd_" + rddId + "_" + partitionId)
val iterator = env.blockManager.get(blockId)(classTag[Array[Byte]]) match {
case Some(blockResult) =>
blockResult.data.asInstanceOf[Iterator[Array[Byte]]]
case None =>
logWarning("The cached block has been lost. Cache it again via driver agent")
requestRecacheRDD(rddId, driverAgentUrl)
env.blockManager.get(blockId)(classTag[Array[Byte]]) match {
case Some(blockResult) =>
blockResult.data.asInstanceOf[Iterator[Array[Byte]]]
case None =>
throw new RayDPException("Still cannot get the block after recache!")
}
val taskAttemptId = context.taskAttemptId()
env.blockManager.registerTask(taskAttemptId)
try {
val schema = Schema.fromJSON(schemaStr)
val blockId = BlockId.apply("rdd_" + rddId + "_" + partitionId)
val (iterator, blockBytes) = env.blockManager.get(blockId)(classTag[Array[Byte]]) match {
case Some(blockResult) =>
(blockResult.data.asInstanceOf[Iterator[Array[Byte]]], blockResult.bytes)
case None =>
logWarning("The cached block has been lost. Cache it again via driver agent")
requestRecacheRDD(rddId, driverAgentUrl)
env.blockManager.get(blockId)(classTag[Array[Byte]]) match {
case Some(blockResult) =>
(blockResult.data.asInstanceOf[Iterator[Array[Byte]]], blockResult.bytes)
case None =>
throw new RayDPException("Still cannot get the block after recache!")
}
}
// Collect batch byte arrays (references only; data already in memory from BlockManager)
val batches = iterator.toArray

// Serialize schema header and EOS marker into small temp buffers
val schemaBuf = new ByteArrayOutputStream(512)
MessageSerializer.serialize(new WriteChannel(Channels.newChannel(schemaBuf)), schema)
val schemaBytes = schemaBuf.toByteArray

val eosBuf = new ByteArrayOutputStream(16)
ArrowStreamWriter.writeEndOfStream(new WriteChannel(Channels.newChannel(eosBuf)), new IpcOption)
val eosBytes = eosBuf.toByteArray

// Single allocation: copy schema + batches + EOS directly (avoids BAOS + toByteArray double-copy)
val totalSize = schemaBytes.length + batches.map(_.length.toLong).sum + eosBytes.length
val result = new Array[Byte](totalSize.toInt)
var offset = 0
System.arraycopy(schemaBytes, 0, result, offset, schemaBytes.length)
offset += schemaBytes.length
batches.foreach { batch =>
System.arraycopy(batch, 0, result, offset, batch.length)
offset += batch.length
}
System.arraycopy(eosBytes, 0, result, offset, eosBytes.length)
result
} finally {
env.blockManager.releaseAllLocksForTask(taskAttemptId)
TaskContext.unset()
}
val byteOut = new ByteArrayOutputStream()
val writeChannel = new WriteChannel(Channels.newChannel(byteOut))
MessageSerializer.serialize(writeChannel, schema)
iterator.foreach(writeChannel.write)
ArrowStreamWriter.writeEndOfStream(writeChannel, new IpcOption)
val result = byteOut.toByteArray
writeChannel.close
byteOut.close
result
}
}
Loading