From 7fdf72b0df9f92c724a1634b1f16c069c673ce07 Mon Sep 17 00:00:00 2001 From: sudhir Date: Sat, 25 Oct 2025 10:32:21 +0800 Subject: [PATCH 1/2] Documentation: RayDP Architecture - Ray & Spark Interaction --- ... Architecture - Ray & Spark Interaction.md | 441 ++++++++++++++++++ 1 file changed, 441 insertions(+) create mode 100644 RayDP Architecture - Ray & Spark Interaction.md diff --git a/RayDP Architecture - Ray & Spark Interaction.md b/RayDP Architecture - Ray & Spark Interaction.md new file mode 100644 index 00000000..76243e02 --- /dev/null +++ b/RayDP Architecture - Ray & Spark Interaction.md @@ -0,0 +1,441 @@ + + +## RayDP Internal Architecture Explained + +RayDP bridges Apache Spark and Ray by running Spark executors as Ray actors while enabling efficient data conversion between Spark DataFrames and Ray Datasets through Ray's shared object store.[^1][^2][^3] + +### High-Level Architecture + +**Core Design Principles:** + +RayDP implements a unified compute substrate where Spark runs on top of Ray rather than managing its own cluster:[^2][^3][^1] + +1. **Ray as Resource Manager**: Ray replaces Spark's traditional resource manager (YARN/Mesos/Kubernetes). Ray's scheduler allocates resources and manages Spark executor lifecycles.[^3][^1] +2. **Spark Executors as Ray Actors**: Each Spark executor runs inside a Ray actor, which provides isolation, resource guarantees, and statefulness.[^1][^2][^3] +3. 
**Dual Communication Protocols**: + - Spark executors communicate with each other using **Spark's internal RPC protocol** for shuffle, broadcast, and task coordination[^1] + - Data interchange between Spark and Ray uses **Ray's object store** for zero-copy efficiency[^3][^1] + +### Detailed Component Architecture + +``` +┌─────────────────────────────────────────────────────────────┐ +│ Ray Cluster │ +│ ┌───────────────────────────────────────────────────────┐ │ +│ │ Ray Global Control Store (GCS) │ │ +│ │ - Tracks all actors (including Spark executors) │ │ +│ │ - Manages object locations │ │ +│ │ - Coordinates resource allocation │ │ +│ └───────────────────────────────────────────────────────┘ │ +│ │ +│ ┌──────────────────────────────────────────────────────┐ │ +│ │ Driver Process (Python) │ │ +│ │ ├─ PySpark Driver (manages Spark jobs) │ │ +│ │ └─ Ray Driver (submits Ray tasks/actors) │ │ +│ └──────────────────────────────────────────────────────┘ │ +│ │ +│ ┌────────────────┐ ┌────────────────┐ ┌──────────────┐ │ +│ │ Ray Node 1 │ │ Ray Node 2 │ │ Ray Node N │ │ +│ │ │ │ │ │ │ │ +│ │ ┌────────────┐ │ │ ┌────────────┐ │ │┌───────────┐│ │ +│ │ │Ray Actor 1 │ │ │ │Ray Actor 2 │ │ ││Ray Actor N││ │ +│ │ │ │ │ │ │ │ │ ││ ││ │ +│ │ │ Spark │ │ │ │ Spark │ │ ││ Spark ││ │ +│ │ │ Executor │ │ │ │ Executor │ │ ││ Executor ││ │ +│ │ │ │ │ │ │ │ │ ││ ││ │ +│ │ │ - JVM │ │ │ │ - JVM │ │ ││ - JVM ││ │ +│ │ │ - Cores │ │ │ │ - Cores │ │ ││ - Cores ││ │ +│ │ │ - Memory │ │ │ │ - Memory │ │ ││ - Memory ││ │ +│ │ └────────────┘ │ │ └────────────┘ │ │└───────────┘│ │ +│ │ ↕ │ │ ↕ │ │ ↕ │ │ +│ │ Object Store │ │ Object Store │ │ Object Store│ │ +│ │ (Shared Mem) │ │ (Shared Mem) │ │(Shared Mem) │ │ +│ └────────────────┘ └────────────────┘ └──────────────┘ │ +│ ↕ Spark RPC (shuffle/broadcast) ↕ │ +└─────────────────────────────────────────────────────────────┘ +``` + + +### How Spark Executors Run as Ray Actors + +When you call `raydp.init_spark()`, the following sequence 
occurs:[^2][^3][^1] + +**Step 1: Initialize Ray Cluster** + +```python +ray.init(address='auto') +``` + +- Connects to existing Ray cluster or starts a local one +- Ray GCS becomes active, ready to manage resources + +**Step 2: Create Spark on Ray** + +```python +spark = raydp.init_spark( + app_name='RayDP Example', + num_executors=2, + executor_cores=2, + executor_memory='4GB' +) +``` + +**Internal Flow:** + +1. **Driver Setup**: RayDP starts a Spark driver in the current Python process +2. **Resource Reservation**: RayDP calculates total resources needed: + - 2 executors × 2 cores = 4 CPUs + - 2 executors × 4GB = 8GB memory +3. **Actor Creation**: For each executor, RayDP creates a Ray actor: + +```python +@ray.remote(num_cpus=2, memory=4*1024**3) +class SparkExecutorActor: + def __init__(self): + # Start JVM with Spark executor code + self.jvm = start_jvm_with_spark_executor(...) + + def execute_task(self, task_data): + # Spark tasks execute in this JVM + return self.jvm.run_task(task_data) +``` + +4. **Ray Scheduling**: Ray's scheduler places these actors on nodes with available resources +5. **Registration**: Each Spark executor actor registers with the Spark driver using Spark's RPC protocol +6. 
**Ready State**: Spark cluster is now running on Ray; Spark driver sees N executors available
+
+**Key Characteristics:**
+
+- **Actor Lifecycle**: Spark executors persist as long-lived Ray actors, maintaining state across tasks[^2]
+- **Resource Isolation**: Ray guarantees each actor gets its requested CPU/memory allocation
+- **Fault Tolerance**: If an actor (executor) fails, Ray can restart it; Spark driver reschedules failed tasks[^1]
+- **Communication**: Executors communicate via Spark's shuffle service for data exchange during joins/aggregations[^1]
+
+
+### Spark DataFrame to Ray Dataset Conversion
+
+The conversion from Spark DataFrame to Ray Dataset is the most critical data interchange operation in RayDP.[^4][^3][^1]
+
+#### Conversion Mechanism: `ray.data.from_spark(df)`
+
+**High-Level Flow:**
+
+```python
+# User code
+df = spark.range(0, 1000)
+ray_ds = ray.data.from_spark(df)
+```
+
+**Internal Implementation (Default Non-Fault-Tolerant Mode):**
+
+The conversion uses Spark's `mapPartitions` combined with `ray.put()` to transfer data partition-by-partition into Ray's object store (simplified sketch):[^5][^4][^1]
+
+```python
+def from_spark(spark_df):
+    # Step 1: Convert Spark DataFrame partitions to Arrow format
+    # and put each partition into Ray's object store
+
+    def partition_to_ray_object(partition_iterator):
+        """Runs on each Spark executor (Ray actor)"""
+        # Collect partition rows
+        rows = list(partition_iterator)
+
+        # Convert rows to an Arrow Table (columnar format);
+        # from_pylist expects one dict per row
+        arrow_table = pyarrow.Table.from_pylist([row.asDict() for row in rows])
+
+        # Put into Ray's object store
+        # This stores data in local node's shared memory
+        object_ref = ray.put(arrow_table)
+
+        # Return ObjectRef (not the data itself)
+        yield object_ref
+
+    # Step 2: Apply function to all partitions
+    # This executes in Spark executors (Ray actors)
+    object_refs_rdd = spark_df.rdd.mapPartitions(partition_to_ray_object)
+
+    # Step 3: Collect all ObjectRefs to driver
+    object_refs = object_refs_rdd.collect()
+ + # Step 4: Create Ray Dataset from ObjectRefs + # Ray Dataset now references data stored in object stores + return ray.data.from_arrow_refs(object_refs) +``` + +**Detailed Step-by-Step Flow:** + +Let's trace an example with 1000 rows partitioned into 4 Spark partitions: + +``` +Initial State: +Spark DataFrame (1000 rows, 4 partitions) +├─ Partition 0: rows 0-249 (on Ray Actor/Executor 1) +├─ Partition 1: rows 250-499 (on Ray Actor/Executor 2) +├─ Partition 2: rows 500-749 (on Ray Actor/Executor 3) +└─ Partition 3: rows 750-999 (on Ray Actor/Executor 4) +``` + +**Conversion Process:** + +**Phase 1: Partition-Level Conversion (Parallel on Executors)** + +*On Ray Node 1 (Executor 1 - Ray Actor):* + +``` +1. mapPartitions function receives partition 0 iterator +2. Collect rows 0-249 into memory (in JVM) +3. Convert to PyArrow Table: + - Serialization: Java objects → Arrow columnar format + - Uses Arrow IPC format for zero-copy between JVM and Python +4. Call ray.put(arrow_table): + - Arrow table is serialized using Arrow's format + - Data is written to Ray's Object Store (shared memory on Node 1) + - Object Store returns: ObjectRef(abc123) +5. 
Return ObjectRef(abc123) to Spark +``` + +*On Ray Node 2 (Executor 2):* + +``` +Same process for partition 1: + - Rows 250-499 → Arrow Table → ray.put() + - Returns: ObjectRef(def456) +``` + +*Similar for Nodes 3 and 4...* + +**Phase 2: Collect ObjectRefs to Driver** + +``` +Driver Process: +object_refs = [ + ObjectRef(abc123), # Points to data on Node 1's object store + ObjectRef(def456), # Points to data on Node 2's object store + ObjectRef(ghi789), # Points to data on Node 3's object store + ObjectRef(jkl012) # Points to data on Node 4's object store +] +``` + +**Important**: At this stage, **no actual data is transferred**—only lightweight ObjectRefs (essentially pointers) are collected to the driver.[^6][^7] + +**Phase 3: Create Ray Dataset** + +```python +ray_ds = ray.data.from_arrow_refs(object_refs) +``` + +Ray Dataset is constructed as a lazy, distributed collection of Arrow table references: + +``` +Ray Dataset Structure: +├─ Block 0: ObjectRef(abc123) @ Node 1 Object Store +├─ Block 1: ObjectRef(def456) @ Node 2 Object Store +├─ Block 2: ObjectRef(ghi789) @ Node 3 Object Store +└─ Block 3: ObjectRef(jkl012) @ Node 4 Object Store +``` + + +### Data Storage and Zero-Copy Mechanics + +**Where Data Lives After Conversion:** + +``` +┌─────────────────────────────────────────────────────────┐ +│ Ray Node 1 │ +│ ┌──────────────────────────────────────────────────┐ │ +│ │ Ray Actor (Spark Executor 1) │ │ +│ │ - JVM Memory: Spark partition 0 data (original) │ │ +│ └──────────────────────────────────────────────────┘ │ +│ ┌──────────────────────────────────────────────────┐ │ +│ │ Ray Object Store (Shared Memory) │ │ +│ │ ObjectRef(abc123): │ │ +│ │ Arrow Table (rows 0-249) │ │ +│ │ Format: Apache Arrow IPC │ │ +│ │ Size: ~X MB │ │ +│ └──────────────────────────────────────────────────┘ │ +└─────────────────────────────────────────────────────────┘ +``` + +**Zero-Copy Properties:** + +1. 
**Within Same Node**: When Ray tasks on Node 1 access `ObjectRef(abc123)`, they memory-map the Arrow table directly from shared memory—no serialization or copying.[^8][^9][^6] +2. **Cross-Node Access**: If a Ray task on Node 2 needs `ObjectRef(abc123)`: + - Ray automatically replicates the object from Node 1's object store to Node 2's object store + - After replication, access is zero-copy within Node 2[^7][^10] +3. **Arrow Format Advantage**: Arrow's columnar format is self-describing and language-agnostic, enabling zero-copy reads across Python, C++, and Java without deserialization overhead.[^9][^6] + +### Ray Dataset to Spark DataFrame Conversion + +The reverse conversion (`ds.to_spark(spark)`) follows a different pattern:[^11][^12] + +**Method 1: Direct Conversion (when supported)** + +```python +ray_ds = ray.data.from_items([{"id": i} for i in range(1000)]) +spark_df = ray_ds.to_spark(spark) +``` + +**Internal Flow:** + +```python +def to_spark(ray_dataset, spark_session): + # Step 1: Write Ray Dataset to intermediate storage + # (either in-memory or temp files) + + # Option A: For small datasets - use Spark's createDataFrame + collected_data = ray.get(ray_dataset._block_refs) + arrow_tables = [block for block in collected_data] + + # Convert Arrow to Pandas (zero-copy where possible) + pandas_dfs = [table.to_pandas() for table in arrow_tables] + + # Spark createDataFrame from Pandas + return spark_session.createDataFrame(pd.concat(pandas_dfs)) +``` + +**Method 2: Intermediate Storage (for large datasets)**[^12][^11] + +```python +# Write Ray Dataset to Parquet/temp storage +ray_ds.write_parquet("/tmp/ray_output") + +# Read into Spark from storage +spark_df = spark.read.parquet("/tmp/ray_output") +``` + +This avoids driver memory bottlenecks for large datasets.[^11][^12] + +### Data Format and Serialization + +**Apache Arrow as Interchange Format:** + +RayDP relies heavily on Apache Arrow for efficient data interchange:[^13][^6][^9] + +- **Columnar Memory 
Layout**: Arrow stores data in contiguous memory columns, optimized for analytics +- **Zero-Copy Deserialization**: Arrow data can be read without parsing—processes map memory directly[^6][^9] +- **Language Interoperability**: Arrow format is understood by Java (Spark), Python (Ray/Pandas), and C++[^13][^6] +- **Shared Memory Friendly**: Arrow's immutable buffers are perfect for Ray's object store[^6] + +**Serialization Path:** + +``` +Spark DataFrame (JVM) + ↓ (Arrow IPC) +Arrow Table (Python/Spark Executor) + ↓ (ray.put - Arrow serialization) +Ray Object Store (Shared Memory) + ↓ (zero-copy memory map) +Ray Dataset Blocks (Arrow format) + ↓ (consumed by Ray Train/XGBoost) +ML Framework (PyTorch/TensorFlow tensors) +``` + + +### Performance Characteristics + +**Advantages of RayDP's Design:** + +1. **Minimal Data Movement**: Data stays in Arrow format from Spark through Ray to ML frameworks[^14][^6] +2. **Locality-Aware**: Data partitions remain on the same nodes where Spark executors (Ray actors) ran[^14][^2] +3. **Parallel Transfer**: All partitions convert to Ray format simultaneously across executors[^1] +4. **Memory Efficiency**: Object store uses shared memory; multiple Ray tasks can read the same data without copies[^6] + +**Limitations:** + +1. **Not Fault-Tolerant by Default**: Objects created with `ray.put()` are not recoverable—if a node fails, data is lost[^4][^1] +2. **Memory Pressure**: Large DataFrames can fill object store; Ray spills to disk but adds latency[^1] +3. 
**Dependency on Spark**: Ray Dataset becomes invalid after `raydp.stop_spark()` unless using `cleanup_data=False`[^3][^1] + +### Fault-Tolerant Conversion (Experimental) + +RayDP offers an experimental fault-tolerant mode using `from_spark_recoverable()`:[^1] + +```python +spark = raydp.init_spark(..., fault_tolerance_mode=True) +df = spark.range(100000) +ds = raydp.spark.from_spark_recoverable(df) +``` + +**How It Works:** + +- Persists Spark DataFrame to Spark's storage (memory/disk) with configurable `storage_level` +- Conversion creates Ray tasks (not just `ray.put`) that can re-read from Spark if data is lost +- Ray maintains lineage: if object store loses data, Ray re-executes the conversion task from persisted Spark data[^1] + +This adds overhead but enables recovery from node failures during ML training.[^1] + +### Summary + +RayDP's architecture elegantly unifies Spark and Ray by treating Spark executors as Ray actors and using Ray's object store as a high-performance data interchange layer. The `mapPartitions` + `ray.put()` mechanism combined with Apache Arrow's zero-copy format enables efficient, parallel data conversion with minimal overhead, making it practical to build end-to-end ETL-to-ML pipelines in a single Python program.[^2][^3][^6][^1] +[^15][^16][^17][^18][^19][^20][^21][^22][^23][^24][^25][^26][^27][^28][^29][^30][^31][^32][^33][^34] + +
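The `mapPartitions` + `ray.put()` pattern summarized above can be sketched with a stdlib-only toy (no Spark or Ray required; the dict-based `OBJECT_STORE`, `fake_ray_put`, and the 4×250-row partition layout are illustrative stand-ins, not RayDP internals):

```python
import uuid
from concurrent.futures import ThreadPoolExecutor

# Toy stand-in for Ray's per-node object store: maps ObjectRef -> immutable block.
OBJECT_STORE = {}

def fake_ray_put(block):
    """Store a block and return a lightweight reference, like ray.put()."""
    ref = f"ObjectRef({uuid.uuid4().hex[:8]})"
    OBJECT_STORE[ref] = tuple(block)  # stored immutably, like Ray objects
    return ref

def partition_to_ref(partition):
    """Runs 'on each executor': convert one partition, put it in the store."""
    return fake_ray_put(partition)

# 1000 rows split into 4 partitions, as in the walkthrough above.
rows = list(range(1000))
partitions = [rows[i:i + 250] for i in range(0, 1000, 250)]

# Phase 1: all partitions convert in parallel (the mapPartitions analogue).
with ThreadPoolExecutor(max_workers=4) as pool:
    object_refs = list(pool.map(partition_to_ref, partitions))

# Phase 2: only the small refs reach the "driver", never the row data.
# Phase 3: a "Ray Dataset" is then just this list of block references;
# reading a block is a store lookup, not a fresh data transfer.
print(len(object_refs))                  # 4
print(OBJECT_STORE[object_refs[0]][:3])  # (0, 1, 2)
```

The real conversion differs mainly in that each `ray.put()` writes Arrow data into shared memory on the node where that executor runs, so blocks stay local until a consumer needs them elsewhere.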
+ +[^1]: https://github.com/oap-project/raydp + +[^2]: https://www.samsara.com/blog/building-a-modern-machine-learning-platform-with-ray + +[^3]: https://pypi.org/project/raydp/0.5.0/ + +[^4]: https://pypi.org/project/raydp-nightly/2021.7.21.dev0/ + +[^5]: https://blog.csdn.net/weixin_45681127/article/details/119916961 + +[^6]: https://arrow.apache.org/blog/2017/10/15/fast-python-serialization-with-ray-and-arrow/ + +[^7]: https://stackoverflow.com/questions/58082023/how-exactly-does-ray-share-data-to-workers + +[^8]: https://www.reddit.com/r/datascience/comments/ptmxr1/how_does_the_arrow_package_achieve_zerocopy/ + +[^9]: https://www.datacamp.com/tutorial/apache-arrow + +[^10]: https://sands.kaust.edu.sa/classes/CS345/S19/papers/ray.pdf + +[^11]: https://learn.microsoft.com/en-us/azure/databricks/machine-learning/ray/connect-spark-ray + +[^12]: https://docs.databricks.com/aws/en/machine-learning/ray/connect-spark-ray + +[^13]: https://arrow.apache.org/docs/python/interchange_protocol.html + +[^14]: https://community.intel.com/t5/Blogs/Tech-Innovation/Cloud/Intel-Optimization-for-XGBoost-on-Ray-with-RayDP-Delivers-Better/post/1535578 + +[^15]: https://docs.databricks.com/aws/en/machine-learning/ray/ + +[^16]: https://www.reddit.com/r/dataengineering/comments/1jjzl1u/need_help_optimizing_35tb_pyspark_job_on_ray/ + +[^17]: https://stackoverflow.com/questions/73973111/using-ray-to-transfer-data-from-spark-to-ray-datasets + +[^18]: https://www.thomasjpfan.com/2023/05/accessing-data-from-pythons-dataframe-interchange-protocol/ + +[^19]: https://cloud.google.com/vertex-ai/docs/open-source/ray-on-vertex-ai/run-spark-on-ray + +[^20]: https://www.anyscale.com/blog/data-processing-support-in-ray + +[^21]: https://data-apis.org/dataframe-protocol/latest/purpose_and_scope.html + +[^22]: https://www.shakudo.io/blog/apache-spark-intro-on-shakudo + +[^23]: 
https://speakerdeck.com/anyscale/raydp-build-large-scale-end-to-end-data-analytics-and-ai-pipelines-using-spark-and-ray-carson-wang-intel-data-analytics-software-group + +[^24]: https://www.anyscale.com/events/raydp-build-large-scale-end-to-end-data-analytics-and-ai-pipelines-using-spark-and-ray + +[^25]: https://github.com/ray-project/ray/issues/32313 + +[^26]: https://stackoverflow.com/questions/65509407/apache-arrow-getting-vectors-from-java-in-python-with-zero-copy + +[^27]: https://arxiv.org/abs/2404.03030 + +[^28]: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.mapPartitions.html + +[^29]: https://discourse.julialang.org/t/how-well-apache-arrow-s-zero-copy-methodology-is-supported/59797 + +[^30]: https://spark.apache.org/docs/latest/rdd-programming-guide.html + +[^31]: https://github.com/oap-project/raydp/issues/409 + +[^32]: https://docs.databricks.com/aws/en/machine-learning/ray/spark-ray-overview + +[^33]: https://github.com/ray-project/ray/issues/31455 + +[^34]: https://learn.microsoft.com/en-us/azure/databricks/machine-learning/ray/spark-ray-overview + From 27d89fb2104954d36326ceb1013d7f22ea3b9fad Mon Sep 17 00:00:00 2001 From: sudhir Date: Sat, 25 Oct 2025 15:59:30 +0800 Subject: [PATCH 2/2] Ray Execution and Data Flow for Sample Code --- Ray Execution and Data Flow.md | 350 +++++++++++++++++++++++++++++++++ 1 file changed, 350 insertions(+) create mode 100644 Ray Execution and Data Flow.md diff --git a/Ray Execution and Data Flow.md b/Ray Execution and Data Flow.md new file mode 100644 index 00000000..2f2ea016 --- /dev/null +++ b/Ray Execution and Data Flow.md @@ -0,0 +1,350 @@ +# Ray Execution & Data Flow + +Detailed walkthrough using a concrete example showing internal control and data flow in Ray. 
## Simple Ray Example with Internal Flow Explained
+
+Let's walk through a distributed machine learning inference example built with Ray and trace exactly what happens internally.[^1][^2][^3]
+
+### The Example Code
+
+```python
+import ray
+import pandas as pd
+
+# Initialize Ray
+ray.init()
+
+# Define a simple model
+def load_trained_model():
+    def model(batch: pd.DataFrame) -> pd.DataFrame:
+        # Predict tip based on passenger count
+        predict = batch["passenger_count"] >= 2
+        return pd.DataFrame({"score": predict})
+    return model
+
+# Define an Actor for batch prediction
+@ray.remote
+class BatchPredictor:
+    def __init__(self, model):
+        self.model = model
+        self.prediction_count = 0
+
+    def predict(self, data_path: str):
+        # Read data and make predictions
+        df = pd.read_parquet(data_path)
+        result = self.model(df)
+        self.prediction_count += 1
+        return result, self.prediction_count
+
+# Create model and store in object store
+model = load_trained_model()
+model_ref = ray.put(model)
+
+# Create 3 actor instances
+actors = [BatchPredictor.remote(model_ref) for _ in range(3)]
+
+# Process data files in parallel
+input_files = ["data1.parquet", "data2.parquet", "data3.parquet"]
+futures = [actors[i].predict.remote(input_files[i]) for i in range(3)]
+
+# Get results
+results = ray.get(futures)
+```
+
+
+### Internal Control Flow Step-by-Step
+
+#### **Phase 1: Initialization (`ray.init()`)**
+
+When `ray.init()` is called, several processes start on your machine:[^4][^5][^2]
+
+1. **Raylet process** (one per node): Handles local task scheduling and manages the local object store
+2. **Worker processes** (typically one per CPU core): Execute tasks and actor methods
+3. **Global Control Store (GCS)**: Stores cluster metadata, object locations, and actor registry
+4. 
**Object Store** (one per node): Shared memory region using Apache Arrow/Plasma for zero-copy data sharing[^4][^1] + +**What's stored where:** + +- GCS: Empty initially, will track all actors and objects +- Object Store: Empty +- Workers: Idle, waiting for tasks + + +#### **Phase 2: Model Creation (`ray.put(model)`)** + +```python +model_ref = ray.put(model) +``` + +**Control Flow:** + +1. Driver process serializes the model function using pickle +2. Driver writes serialized model to **local Object Store** shared memory +3. Object Store returns an `ObjectRef` (e.g., `ObjectRef(a1b2c3d4...)`) +4. Object Store registers this object's location with **GCS** + +**Data Storage:** + +- **Object Store (Driver Node)**: Contains the serialized model (~few KB) +- **GCS Metadata**: Records `{object_id: a1b2c3d4, location: driver_node, size: 2KB, ref_count: 1}` +- **Driver Memory**: Holds `model_ref` variable pointing to object + + +#### **Phase 3: Actor Creation (`BatchPredictor.remote(model_ref)`)** + +```python +actors = [BatchPredictor.remote(model_ref) for _ in range(3)] +``` + +**Control Flow for EACH actor:** + +1. Driver submits actor creation request to **Local Scheduler (Raylet)** +2. Local Scheduler checks local resources; if sufficient, assigns to a local worker; otherwise forwards to **Global Scheduler**[^2][^4] +3. Scheduler selects Worker Process \#1 for Actor 0 +4. Worker \#1 receives: + - Actor class definition (`BatchPredictor`) + - Constructor arguments (`model_ref`) +5. Worker \#1 checks if it has `model_ref` locally: + - **NO** → Queries **GCS** for object location + - GCS returns: "driver_node's object store" + - Worker \#1's object store **replicates** model from driver's object store via shared memory (zero-copy on same node)[^6][^1] +6. Worker \#1 deserializes model and calls `__init__(model)` +7. Actor instance is created with: + - `self.model = model` (in Worker \#1's heap memory) + - `self.prediction_count = 0` (in Worker \#1's heap memory) +8. 
Worker \#1 registers actor handle with **GCS** +9. Driver receives `ActorHandle` (e.g., `Actor(BatchPredictor, id=xyz)`) + +**After creating 3 actors:** + +**Actor Distribution:** + +- Worker \#1: Hosts `Actor 0` +- Worker \#2: Hosts `Actor 1` +- Worker \#3: Hosts `Actor 2` + +**Actor State Storage (in each worker's process memory, NOT object store):** + +``` +Worker #1 (Actor 0): + - self.model = (deserialized from object store) + - self.prediction_count = 0 + +Worker #2 (Actor 1): + - self.model = + - self.prediction_count = 0 + +Worker #3 (Actor 2): + - self.model = + - self.prediction_count = 0 +``` + +**GCS Registry:** + +``` +Actor Registry: + - actor_id: xyz_0, location: worker_1, class: BatchPredictor + - actor_id: xyz_1, location: worker_2, class: BatchPredictor + - actor_id: xyz_2, location: worker_3, class: BatchPredictor +``` + +**Object Store:** + +- Still contains the model object (ref_count now = 4: driver + 3 actors) + + +#### **Phase 4: Method Invocation (`predict.remote()`)** + +```python +futures = [actors[i].predict.remote(input_files[i]) for i in range(3)] +``` + +**Control Flow for Actor 0's `predict.remote("data1.parquet")`:** + +1. Driver sends task to **Raylet**: "Execute `predict` method on Actor 0 with argument `'data1.parquet'`" +2. Raylet looks up Actor 0's location in **GCS** → Worker \#1 +3. Raylet schedules task on Worker \#1's task queue (actors process methods serially)[^7][^3][^2] +4. Worker \#1 executes: + +```python +df = pd.read_parquet("data1.parquet") # Loads into Worker #1's heap +result = self.model(df) # Computation in Worker #1's heap +self.prediction_count += 1 # Updates actor state (heap) +return result, self.prediction_count +``` + +5. Worker \#1 checks result size: + - **Small result** (<100KB): Sends directly back to driver's memory + - **Large result** (>100KB): Stores in Worker \#1's **Object Store** and returns `ObjectRef` +6. In this case, let's say result is large → stored in Object Store +7. 
Worker \#1's Object Store: + - Writes result DataFrame to shared memory + - Registers with **GCS**: `{object_id: result_0, location: worker_1, size: 5MB}` +8. Worker \#1 returns `ObjectRef(result_0)` to driver +9. Driver receives the `ObjectRef` (this is the "future") + +**Parallel Execution:** +All 3 actors execute their `predict` methods simultaneously on different workers:[^3] + +``` +Time 0: All 3 actors start predict() in parallel +├─ Worker #1 (Actor 0): predict("data1.parquet") +├─ Worker #2 (Actor 1): predict("data2.parquet") +└─ Worker #3 (Actor 2): predict("data3.parquet") + +Time T: All complete and store results +``` + +**Actor State After Execution:** + +``` +Worker #1 (Actor 0): + - self.model = + - self.prediction_count = 1 ← UPDATED + +Worker #2 (Actor 1): + - self.model = + - self.prediction_count = 1 ← UPDATED + +Worker #3 (Actor 2): + - self.model = + - self.prediction_count = 1 ← UPDATED +``` + +**Object Store Contents:** + +``` +Driver Node Object Store: + - model (original, 2KB) + - result_0 (5MB) - if Actor 0 on same node as driver + - result_1 (5MB) - if Actor 1 on same node as driver + - result_2 (5MB) - if Actor 2 on same node as driver +``` + + +#### **Phase 5: Result Retrieval (`ray.get(futures)`)** + +```python +results = ray.get(futures) +``` + +**Control Flow:** + +1. Driver calls `ray.get([ObjectRef(result_0), ObjectRef(result_1), ObjectRef(result_2)])` +2. For each ObjectRef, driver checks **local Object Store**: + - If result is local → Deserialize via zero-copy memory mapping[^1] + - If result is remote → Query **GCS** for location, then fetch from remote worker's Object Store[^8][^6] +3. Assuming all workers are on same node (single machine): + - Driver's process **memory-maps** each result from Object Store (zero-copy) +4. Driver deserializes the 3 DataFrames into its local Python variables +5. 
Returns list: `[(df1, 1), (df2, 1), (df3, 1)]` + +**Data Transfer Summary:** + +- **Same node**: Zero-copy via shared memory mapping (~200 Mbps effective)[^9] +- **Different nodes**: TCP transfer from remote object store to local object store, then zero-copy locally[^6][^9] + + +### Key Insights on Actor State vs Object Store + +**What's Stored in Actor's Process Memory (Heap):** + +- Actor instance variables (`self.model`, `self.prediction_count`) +- Local variables during method execution +- Thread/async state if using concurrent actors + +**What's Stored in Object Store:** + +- Large task/method return values (>100KB) +- Objects explicitly put with `ray.put()` +- Arguments passed between tasks that need to be shared + +**Critical Distinction:** + +- Actor state is **mutable** and lives in worker's heap +- Object store data is **immutable** and lives in shared memory +- Actors cannot directly access each other's state; they must communicate via object store[^2][^1] + + +### Visual Summary of Data Flow + +``` +┌─────────────────────────────────────────────────────────────┐ +│ Driver Process │ +│ model_ref, actors[0-2], futures[0-2] │ +└────────────┬────────────────────────────────────────────────┘ + │ + ├─── ray.put(model) ───────────────┐ + │ ▼ + │ ┌──────────────────────────┐ + │ │ Object Store (Shared) │ + │ │ • model (2KB) │ + │ │ • result_0 (5MB) │ + │ │ • result_1 (5MB) │ + │ │ • result_2 (5MB) │ + │ └──────────────────────────┘ + │ ▲ + ├─── BatchPredictor.remote() ──────┤ + │ │ + ┌────────┴────────┬─────────────────┬──────┴───────┐ + ▼ ▼ ▼ ▼ +┌─────────┐ ┌─────────┐ ┌─────────┐ ┌──────────┐ +│Worker #1│ │Worker #2│ │Worker #3│ │ GCS │ +│Actor 0 │ │Actor 1 │ │Actor 2 │ │(metadata)│ +│ │ │ │ │ │ └──────────┘ +│ Heap: │ │ Heap: │ │ Heap: │ +│ model │ │ model │ │ model │ +│ count=1│ │ count=1│ │ count=1│ +└─────────┘ └─────────┘ └─────────┘ +``` + +This architecture allows Ray to efficiently distribute computation while managing both stateful actors and 
immutable shared data, providing the foundation for scalable ML and distributed applications.[^3][^1][^2] +[^10][^11][^12][^13][^14][^15][^16][^17][^18][^19][^20][^21][^22] + +
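The actor-state-versus-object-store distinction above can be sketched in plain, single-process Python (illustrative only; `ToyActor`, `put`, and the dict-based store are stand-ins for Ray's worker heap and shared-memory store, not its implementation):

```python
# Toy shared object store: immutable values keyed by small reference strings.
object_store = {}

def put(value, _counter=[0]):
    """Like ray.put(): store an immutable value, hand back a small ref."""
    _counter[0] += 1
    ref = f"ObjectRef({_counter[0]})"
    object_store[ref] = value
    return ref

class ToyActor:
    """Mutable state lives in the actor's own memory, like a worker's heap."""
    def __init__(self, model_ref):
        # Real Ray memory-maps the object from shared memory on the same
        # node; in this toy, dereferencing is just a dictionary lookup.
        self.model = object_store[model_ref]
        self.prediction_count = 0

    def predict(self, batch):
        result = tuple(self.model(x) for x in batch)
        self.prediction_count += 1   # mutable actor state, in the heap
        return put(result)           # large result -> object store ref

model_ref = put(lambda x: x >= 2)    # the ray.put(model) analogue
actors = [ToyActor(model_ref) for _ in range(3)]

batches = [[0, 1, 2], [3, 4], [1, 5]]             # one "file" per actor
futures = [a.predict(b) for a, b in zip(actors, batches)]

results = [object_store[ref] for ref in futures]  # the ray.get() analogue
print(results)  # [(False, False, True), (True, True), (False, True)]
print([a.prediction_count for a in actors])  # [1, 1, 1]
```

Note the asymmetry: `prediction_count` mutates inside each actor, while everything placed in the store is handed out only by immutable reference, which mirrors why Ray actors must exchange data through the object store rather than by touching each other's state.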
+ +[^1]: https://maxpumperla.com/learning_ray/ch_02_ray_core/ + +[^2]: https://rise.cs.berkeley.edu/blog/modern-parallel-and-distributed-python-a-quick-tutorial-on-ray/ + +[^3]: https://www.anyscale.com/blog/model-batch-inference-in-ray-actors-actorpool-and-datasets + +[^4]: http://ray-robert.readthedocs.io/en/latest/tutorial.html + +[^5]: https://blog.devops.dev/mastering-ray-a-beginners-guide-to-distributed-python-workloads-7d4af4ef341f + +[^6]: https://stackoverflow.com/questions/58082023/how-exactly-does-ray-share-data-to-workers + +[^7]: https://ray-project.github.io/q4-2021-docs-hackathon/0.4/ray-examples/ray-crash-course/02-Ray-Actors/ + +[^8]: https://sands.kaust.edu.sa/classes/CS345/S19/papers/ray.pdf + +[^9]: https://www.telesens.co/2022/04/23/data-transfer-speed-comparison-ray-plasma-store-vs-s3/ + +[^10]: https://www.anyscale.com/blog/writing-your-first-distributed-python-application-with-ray + +[^11]: https://www.datacamp.com/tutorial/distributed-processing-using-ray-framework-in-python + +[^12]: https://ray-project.github.io/q4-2021-docs-hackathon/0.2/ray-distributed-compute/getting-started/ + +[^13]: https://ray-project.github.io/q4-2021-docs-hackathon/0.4/ray-core/key-concepts/ + +[^14]: https://domino.ai/blog/ray-tutorial-for-accessing-clusters + +[^15]: https://web.engr.oregonstate.edu/~afern/distributed-AI-labs/lab1/ray_tutorial.py + +[^16]: https://stackoverflow.com/questions/66893318/how-to-clear-objects-from-the-object-store-in-ray + +[^17]: https://colab.research.google.com/github/ray-project/tutorial/blob/master/exercises/colab04-05.ipynb + +[^18]: https://github.com/ray-project/ray/issues/51173 + +[^19]: https://stackoverflow.com/questions/74334814/does-an-instance-of-a-ray-actor-run-in-only-one-process + +[^20]: https://github.com/ray-project/ray/issues/15058 + +[^21]: https://rise.cs.berkeley.edu/blog/ray-tips-for-first-time-users/ + +[^22]: https://docs.datadoghq.com/integrations/ray/ +