
Conversation

@Zyiqin-Miranda
Member

Summary

Problem: When deduplicating large datasets with a pre-defined hash bucket count, we hit a critical scalability limit: an individual hash bucket could grow beyond the memory capacity of a single Ray worker node, causing Out-Of-Memory (OOM) failures. We also observed severe performance degradation in PyArrow's group_by operation, which would effectively hang when processing buckets containing more than 200 million records.

Solution: The sub hash bucket feature partitions each oversized hash bucket's data into smaller, memory-manageable sub-bucket chunks.
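
For illustration only (not the actual DeltaCAT implementation), one oversized bucket's table can be split into memory-bounded sub-bucket chunks with a secondary modulo over the precomputed identifier hash. The function name and splitting strategy below are assumptions for this sketch; `identifier_columns_hash` is the hash column referenced in the workflow diagram under Changes.

```python
import pyarrow as pa


def split_into_sub_buckets(bucket_table: pa.Table, num_sub_buckets: int) -> list[pa.Table]:
    # Route each row to a sub-bucket by a secondary modulo over its
    # precomputed identifier hash, so all duplicates of a given key still
    # land in the same (now smaller, memory-manageable) chunk.
    hashes = bucket_table["identifier_columns_hash"].to_numpy()
    sub_ids = hashes % num_sub_buckets
    return [
        bucket_table.filter(pa.array(sub_ids == i))
        for i in range(num_sub_buckets)
    ]
```

Each chunk can then be deduplicated on its own instead of pulling the whole bucket into a single worker's memory.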

Rationale

Explain the reasoning behind the changes and their benefits to the project.

Changes

┌─────────────────────────────────────────────────────────────────────────┐
│                    CONVERT TASK (Main Orchestrator)                       │
│                                                                           │
│  1. Receives input files to process                                      │
│  2. Checks if sub-bucketing is needed (memory threshold check)           │
│     └─> Is total_memory > sub_bucket_threshold?                          │
│         ├─> NO: Use regular dedupe_data_files() [Single Process]         │
│         └─> YES: Use sub-bucketing workflow ↓                            │
└─────────────────────────────────────────────────────────────────────────┘
                                    │
                                    ↓
┌─────────────────────────────────────────────────────────────────────────┐
│              STEP 1: HASH BUCKETING (Ray Remote Tasks)                   │
│                                                                           │
│  For each input file:                                                    │
│    ┌──────────────────────────────────────────────────────────┐         │
│    │  hash_bucket.remote(files)                                │         │
│    │                                                           │         │
│    │  1. Download file data                                   │         │
│    │  2. Compute hash(identifier_columns) for each record     │         │
│    │  3. Assign record to bucket: hash % num_hash_buckets     │         │
│    │  4. Store bucketed data in Ray Object Store              │         │
│    │  5. Return object_ref_id (serialized string)             │         │
│    └──────────────────────────────────────────────────────────┘         │
│                                                                           │
│  Output: Dictionary mapping bucket_num → [object_ref_ids]                │
│          {                                                                │
│            0: ["ref_1_bucket_0", "ref_2_bucket_0"],                      │
│            1: ["ref_1_bucket_1", "ref_2_bucket_1"],                      │
│            ...                                                            │
│            N-1: ["ref_1_bucket_N-1", "ref_2_bucket_N-1"]                 │
│          }                                                                │
└─────────────────────────────────────────────────────────────────────────┘
                                    │
                                    ↓
┌─────────────────────────────────────────────────────────────────────────┐
│         STEP 2: PARALLEL DEDUPLICATION (Ray Remote Tasks)                │
│                                                                           │
│  For each bucket (0 to N-1) in parallel:                                │
│    ┌──────────────────────────────────────────────────────────┐         │
│    │  dedupe_sub_hash_bucket.remote(bucket_num)               │         │
│    │                                                           │         │
│    │  1. Retrieve data from Ray Object Store using refs       │         │
│    │  2. Concatenate all data for this bucket                 │         │
│    │  3. Deduplicate within bucket:                           │         │
│    │     • Group by identifier_columns_hash                   │         │
│    │     • Keep record with max(global_record_idx)            │         │
│    │     • Mark others for deletion                           │         │
│    │  4. Write position delete files to S3                    │         │
│    │  5. Return (pos_delete_files, count, memory_used)        │         │
│    └──────────────────────────────────────────────────────────┘         │
│                                                                           │
│  Each bucket processes independently - no cross-bucket dependencies!     │
│  Why? Hash guarantees duplicates are in same bucket                      │
└─────────────────────────────────────────────────────────────────────────┘
                                    │
                                    ↓
┌─────────────────────────────────────────────────────────────────────────┐
│            STEP 3: AGGREGATION (Back to Main Task)                       │
│                                                                           │
│  Collect results from all buckets:                                       │
│    • Aggregate position_delete_files from all buckets                    │
│    • Sum position_delete_record_counts                                   │
│    • Track memory usage statistics                                       │
│    • Return to converter session for Iceberg commit                      │
└─────────────────────────────────────────────────────────────────────────┘
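
To make the three steps above concrete, here is a toy end-to-end sketch. It is illustrative only: the task bodies are simplified (in-memory tables instead of downloaded files, returned duplicate rows instead of S3 position delete files), NUM_HASH_BUCKETS and the dedupe_bucket name are placeholders, and the signatures differ from the real hash_bucket and dedupe_sub_hash_bucket tasks.

```python
import pyarrow as pa
import pyarrow.compute as pc
import ray

NUM_HASH_BUCKETS = 4  # placeholder; the real bucket count is pre-defined


@ray.remote
def hash_bucket(table: pa.Table) -> dict:
    # Step 1 (simplified): assign each record to hash % NUM_HASH_BUCKETS and
    # put every bucket slice into the Ray object store, returning a mapping
    # of bucket_num -> ObjectRef.
    hashes = table["identifier_columns_hash"].to_numpy()
    return {
        b: ray.put(table.filter(pa.array(hashes % NUM_HASH_BUCKETS == b)))
        for b in range(NUM_HASH_BUCKETS)
    }


@ray.remote
def dedupe_bucket(slice_refs: list) -> pa.Table:
    # Step 2 (simplified): concatenate one bucket's slices from every file,
    # keep only the row with the largest global_record_idx per identifier
    # hash, and return the losing rows (the would-be position deletes).
    table = pa.concat_tables(ray.get(list(slice_refs)))
    winners = table.group_by("identifier_columns_hash").aggregate(
        [("global_record_idx", "max")]
    )
    is_winner = pc.is_in(
        table["global_record_idx"], value_set=winners["global_record_idx_max"]
    )
    return table.filter(pc.invert(is_winner))


if __name__ == "__main__":
    ray.init(ignore_reinit_error=True)

    # Stand-ins for downloaded input files; global_record_idx is unique
    # across files, and identifier hash 1 is duplicated between them.
    file_tables = [
        pa.table({"identifier_columns_hash": pa.array([1, 2, 3], pa.uint64()),
                  "global_record_idx": pa.array([0, 1, 2], pa.int64())}),
        pa.table({"identifier_columns_hash": pa.array([1, 4, 5], pa.uint64()),
                  "global_record_idx": pa.array([3, 4, 5], pa.int64())}),
    ]

    # Step 1: bucket every input file in parallel.
    per_file_refs = ray.get([hash_bucket.remote(t) for t in file_tables])

    # Step 2: one independent dedupe task per bucket; duplicates of a key
    # can only ever live in that key's bucket.
    delete_tables = ray.get([
        dedupe_bucket.remote([refs[b] for refs in per_file_refs])
        for b in range(NUM_HASH_BUCKETS)
    ])

    # Step 3: aggregate results for the converter session / Iceberg commit.
    print("total position deletes:", sum(t.num_rows for t in delete_tables))
```

Because the hash routing guarantees that duplicates of a key only ever appear in that key's bucket, each dedupe task runs without any cross-bucket coordination, which is what Step 2 relies on for parallelism.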

Impact

Discuss any potential impacts the changes may have on existing functionalities.

Testing

Describe how the changes have been tested, including both automated and manual testing strategies.
If this is a bugfix, explain how the fix has been tested to ensure the bug is resolved without introducing new issues.

Regression Risk

If this is a bugfix, assess the risk of regression caused by this fix and steps taken to mitigate it.

Checklist

  • Unit tests covering the changes have been added
    • If this is a bugfix, regression tests have been added
  • E2E testing has been performed

Additional Notes

Any additional information or context relevant to this PR.

@pdames
Member

pdames commented Dec 23, 2025

LGTM with the latest code cleanup, and updates to use the pyarrow filesystem for unit tests. Thanks!!

@pdames pdames merged commit 3bac4f6 into ray-project:2.0 Dec 23, 2025
5 checks passed