diff --git a/.github/workflows/clippy.yml b/.github/workflows/clippy.yml new file mode 100644 index 0000000..e693886 --- /dev/null +++ b/.github/workflows/clippy.yml @@ -0,0 +1,19 @@ +name: rust_checks +on: + - pull_request +jobs: + rust-syntax-style-format-and-integration: + runs-on: ubuntu-latest + env: + CARGO_TERM_COLOR: always + steps: + - uses: actions/checkout@v4 + - name: Install Rust + components + uses: actions-rust-lang/setup-rust-toolchain@v1 + with: + toolchain: 1.91.1 + components: rustfmt,clippy + - name: Run syntax and style tests + run: cargo clippy --all-targets -- -D warnings + - name: Run format test + run: cargo fmt --check diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..56f3d71 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,26 @@ +{ + "[markdown]": { + "editor.defaultFormatter": null + }, + "editor.formatOnPaste": false, + "editor.formatOnSave": true, + "editor.rulers": [ + 100 + ], + "files.autoSave": "off", + "files.insertFinalNewline": true, + "gitlens.showWhatsNewAfterUpgrades": false, + "lldb.consoleMode": "evaluate", + "rust-analyzer.check.command": "clippy", + "rust-analyzer.checkOnSave": true, + "rust-analyzer.runnables.extraTestBinaryArgs": [ + "--nocapture" + ], + "rust-analyzer.rustfmt.extraArgs": [ + "--config", + "max_width=100" + ], + "notebook.formatOnSave.enabled": true, + "notebook.output.scrolling": true, + "python.terminal.activateEnvironment": false +} diff --git a/Cargo.toml b/Cargo.toml index e97d73e..4e9e61a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,14 +1,34 @@ [package] name = "starfix" -version = "0.1.3" -edition = "2024" +version = "0.0.2" +edition = "2021" +description = "Package for hashing Arrow's data structures uniquely for identifying and comparing data efficiently." 
+authors = ["synicix "] +readme = "README.md" +repository = "https://github.com/nauticalab/starfix" +license = "MIT OR Apache-2.0" +keywords = ["arrow", "hashing"] +categories = ["algorithms"] [dependencies] -arrow = { version = "56.0.0", features = ["ffi"] } -arrow-digest = "56.0.0" +arrow = { version = "57.0.0", features = ["ffi"] } +arrow-schema = { version = "57.0.0", features = ["serde"] } +bitvec = "1.0.1" +digest = "0.10.7" +indoc = "2.0.7" + +postcard = "1.1.3" + +serde = "1.0.228" +serde_json = "1.0.145" sha2 = "0.10.9" # automated CFFI + bindings in other languages -uniffi = { version = "0.29.4", features = ["cli", "tokio"] } +uniffi = { version = "0.30.0", features = ["cli", "tokio"] } + +[dev-dependencies] +hex = "0.4.3" +pretty_assertions = "1.4.1" + [[bin]] name = "uniffi-bindgen" @@ -22,3 +42,55 @@ crate-type = ["rlib", "cdylib"] [package.metadata.release] publish = false + + +[lints.clippy] +cargo = "deny" +complexity = "deny" +correctness = "deny" +nursery = "deny" +pedantic = "deny" +perf = "deny" +restriction = "deny" +style = "deny" +suspicious = "deny" + +min_ident_chars = { level = "allow", priority = 127 } # allow for variables that is one char +arbitrary_source_item_ordering = { level = "allow", priority = 127 } # allow arbitrary ordering to keep relevant code nearby +as_conversions = { level = "allow", priority = 127 } # allow casting +blanket_clippy_restriction_lints = { level = "allow", priority = 127 } # allow setting all restrictions so we can omit specific ones +default_numeric_fallback = { level = "allow", priority = 127 } # allow type inferred by numeric literal, detection is buggy +disallowed_script_idents = { level = "allow", priority = 127 } # skip since we use only ascii +exhaustive_enums = { level = "allow", priority = 127 } # remove requirement to label enum as exhaustive +exhaustive_structs = { level = "allow", priority = 127 } # revisit once lib is ready to be used externally +field_scoped_visibility_modifiers = { level = "allow", priority = 127 } # allow field-level visibility modifiers +float_arithmetic = { level = "allow", priority = 127 } # allow float arithmetic +impl_trait_in_params = { level = "allow", priority = 127 } # impl in params ok +implicit_return = { level = "allow", priority = 127 } # missing return ok +iter_over_hash_type = { level = "allow", priority = 127 } # allow iterating over unordered iterables like `HashMap` +little_endian_bytes = { level = "allow", priority = 127 } # allow to_le_bytes / from_le_bytes +missing_docs_in_private_items = { level = "allow", priority = 127 } # missing docs on private ok +missing_inline_in_public_items = { level = "allow", priority = 127 } # let rust compiler determine best inline logic +missing_trait_methods = { level = "allow", priority = 127 } # allow in favor of rustc `implement the missing item` +module_name_repetitions = { level = "allow", priority = 127 } # allow use of module name in type names +multiple_crate_versions = { level = "allow", priority = 127 } # allow since list of exceptions changes frequently from external +multiple_inherent_impl = { level = "allow", priority = 127 } # required in best practice to limit exposure over UniFFI +must_use_candidate = { level = "allow", priority = 127 } # omitting #[must_use] ok +mod_module_files = { level = "allow", priority = 127 } # mod directories ok +non_ascii_literal = { level = "allow", priority = 127 } # non-ascii char in string literal ok +partial_pub_fields = { level = "allow", priority = 127 } # partial struct pub fields ok 
+pattern_type_mismatch = { level = "allow", priority = 127 } # allow in favor of clippy::ref_patterns +print_stderr = { level = "allow", priority = 127 } # stderr prints ok +print_stdout = { level = "allow", priority = 127 } # stdout prints ok +pub_use = { level = "allow", priority = 127 } # ok to structure source into many files but clean up import +pub_with_shorthand = { level = "allow", priority = 127 } # allow use of pub(super) +question_mark_used = { level = "allow", priority = 127 } # allow question operator +self_named_module_files = { level = "allow", priority = 127 } # mod files ok +separated_literal_suffix = { level = "allow", priority = 127 } # literal suffixes should be separated by underscore +single_call_fn = { level = "allow", priority = 127 } # allow functions called only once, which allows better code organization +single_char_lifetime_names = { level = "allow", priority = 127 } # single char lifetimes ok +std_instead_of_alloc = { level = "allow", priority = 127 } # we should use std when possible +std_instead_of_core = { level = "allow", priority = 127 } # we should use std when possible +string_add = { level = "allow", priority = 127 } # simple concat ok +use_debug = { level = "warn", priority = 127 } # debug print +wildcard_enum_match_arm = { level = "allow", priority = 127 } # allow wildcard match arm in enums diff --git a/README.md b/README.md index 9abcf18..e7a9541 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,488 @@ -# Overview -Hashing Lib for Arrow Data Tables using C_Pointers with Arrow_Digest from: https://github.com/kamu-data/arrow-digest +# StarFix: Arrow Data Deterministic Hashing -# Usage -The repo is setup to use dev containers of VSCode. After starting up the container and connecting to it the process to install the rust lib and a python package is: -```maturin develop --uv``` +## Overview -NOTE: After every code edit in rust code, you will need to rerun the command to rebuild it then restart the kernel in the Jupyter Notebook side \ No newline at end of file +StarFix is a cryptographic hashing library for Apache Arrow data tables. It provides a deterministic way to compute unique digests for Arrow data structures, enabling efficient identification and comparison of data regardless of storage order or location. + +The hashing system is built on top of SHA-256 (configurable to other digest algorithms via the `Digest` trait) and uses a hierarchical approach to hash different components of an Arrow table: schema metadata and field values. + +## Core Architecture + +### Main Components + +The hashing system consists of three main hashing levels: + +1. **Schema Digest** - Hash of the table schema (field names, types, and nullability) +2. **Field Digests** - Individual hashes for each field's data +3. **Final Digest** - Combined hash from schema + all field digests + +### DigestBufferType Enum + +The codebase uses a `DigestBufferType` enum to differentiate between nullable and non-nullable fields: + +```rust +enum DigestBufferType { + NonNullable(D), // Just the data digest + Nullable(BitVec, D), // Null bits vector + data digest +} +``` + +This separation is crucial because nullable and non-nullable fields must be hashed differently to ensure data integrity and distinguish between actual nulls and missing data. + +## Hashing Flow + +### Record Batch Hashing + +When hashing a complete `RecordBatch`, the process follows these steps: + +``` +1. 
Create ArrowDigester with schema + ├─ Hash the schema (JSON serialized) + └─ Initialize field digest buffers + └─ Flatten nested struct fields with "/" delimiter + └─ Mark each field as Nullable or NonNullable + +2. Update with record batch data + ├─ For each field: + │ └─ Match on data type and call appropriate hashing function + │ └─ Update both null bits (if nullable) and data digest + └─ Accumulate all digests + +3. Finalize + ├─ Combine schema digest + ├─ Process each field digest in alphabetical order + │ ├─ If nullable: hash (null_bits.len + raw_null_bits + data) + │ └─ If non-nullable: hash data only + └─ Return final digest +``` + +### Direct Array Hashing + +Arrays can also be hashed independently without schema context: + +``` +1. Hash the data type metadata (JSON serialized) +2. Initialize digest buffer based on array nullability +3. Call array_digest_update with appropriate handler +4. Finalize and combine digests +``` + +## Null Bits Handling + +### Why Null Bits Matter + +Null bits are essential to the hashing algorithm because: +- They distinguish between actual null values and valid data +- They enable reliable hashing of nullable vs non-nullable fields +- They preserve data integrity across different representations + +### Null Bits Processing + +For nullable fields, the system maintains a `BitVec` (bitvector) where each bit represents whether a value at that index is valid (`true`) or null (`false`). + +#### Processing Steps: + +1. **If null buffer exists:** + ``` + - Iterate through each element + - Set bit to true if value is valid + - Set bit to false if value is null + - For data digest: only hash valid values + - For null values: hash the NULL_BYTES constant (b"NULL") + ``` + +2. **If no null buffer (all values valid):** + ``` + - Extend bitvector with all true values (one per element) + - Hash all data normally + ``` + +### Finalization of Nullable Fields + +When finalizing a nullable field digest: + +```rust +final_digest.update(null_bits.len().to_le_bytes()); // Size of bitvector +for &word in null_bits.as_raw_slice() { + final_digest.update(word.to_be_bytes()); // Actual null bits +} +final_digest.update(data_digest.finalize()); // Data values +``` + +This ensures the null bit pattern is part of the final hash, making nullable arrays with actual nulls hash differently from arrays without nulls. + + +### Nullable Array with No Null Values + +As demonstrated in the `nullable_vs_non_nullable_array_produces_same_hash` test in `/tests/arrow_digester.rs`: + +When an Arrow array is created with a nullable type but contains no actual null values, Arrow optimizes the internal representation by removing the null buffer. 
This means the **hasher treats the array identically to a non-nullable array, producing the same hash result.** + + +## Supported Data Types + +### Fixed-Size Types + +These types have consistent byte widths and can be hashed directly: + +| Data Type | Size | Handling | +|-----------|------|----------| +| Boolean | Variable | Bit-packed into bytes | +| Int8, UInt8 | 1 byte | Direct buffer hashing | +| Int16, UInt16, Float16 | 2 bytes | Direct buffer hashing | +| Int32, UInt32, Float32, Date32 | 4 bytes | Direct buffer hashing | +| Int64, UInt64, Float64, Date64 | 8 bytes | Direct buffer hashing | +| Decimal32 | 4 bytes | Direct buffer hashing | +| Decimal64 | 8 bytes | Direct buffer hashing | +| Decimal128 | 16 bytes | Direct buffer hashing | +| Decimal256 | 32 bytes | Direct buffer hashing | +| Time32 | 4 bytes | Direct buffer hashing | +| Time64 | 8 bytes | Direct buffer hashing | + +**Hashing Strategy:** +- Get the data buffer from Arrow array +- Account for array offset +- For non-nullable: hash the entire slice directly +- For nullable: iterate element by element, skipping null values + +### Boolean Type + +Booleans receive special handling because Arrow stores them as bit-packed values (1 bit per value): + +```rust +// For non-nullable: +- Extract each boolean value +- Pack into BitVec using MSB0 ordering +- Hash the raw bytes + +// For nullable: +- Handle null bits (as described above) +- Pack only valid boolean values +- Hash the packed bytes +``` + +### Variable-Length Types + +#### Binary Arrays + +Binary data (raw byte sequences) must include length prefixes to prevent collisions: + +``` +For each element: + - Hash: value.len().to_le_bytes() // Length prefix + - Hash: value.as_slice() // Actual data +``` + +**Example collision prevention:** +- Without prefix: `[0x01, 0x02]` + `[0x03]` = `[0x01, 0x02, 0x03]` +- With prefix: `len=2, 0x01, 0x02, len=1, 0x03` (different!) + +#### String Arrays + +Strings are similar to binary but UTF-8 encoded: + +``` +For each element: + - Hash: (value.len() as u32).to_le_bytes() // Length as u32 + - Hash: value.as_bytes() // UTF-8 data +``` + +#### List Arrays + +Lists/Array types recursively hash their nested values: + +``` +For each list element: + - Recursively call array_digest_update + - Use the inner field's data type + - Skip null list entries +``` + +## Schema Handling + +### Schema Flattening + +Nested struct fields are flattened into a single-level map using the `/` delimiter: + +``` +Original schema: + person (struct) + ├─ name (string) + └─ address (struct) + ├─ street (string) + └─ zip (int32) + +Flattened: + person/name + person/address/street + person/address/zip +``` + +### Schema Serialization + +The schema is serialized as a JSON string containing: +- Field names +- Field types (as DataType serialization) +- Nullability flags + +```rust +{ + "address/street": ("string", Utf8), + "address/zip": ("int32", Int32), + "name": ("string", Utf8) +} +``` + +Fields are stored in a `BTreeMap` to ensure **consistent alphabetical ordering**, which is critical for deterministic hashing. + +### Schema Hash Inclusion + +The schema digest is always the first component hashed into the final digest. This ensures that changes to schema structure produce different hashes, preventing false collisions. + +## Collision Prevention + +The hashing algorithm includes multiple safeguards against collisions: + +### 1. 
Length Prefixes (Variable-Length Types) + +Binary and string arrays include length prefixes to prevent merging boundaries: + +``` +Array1: ["ab", "c"] → len=2, "ab", len=1, "c" +Array2: ["a", "bc"] → len=1, "a", len=2, "bc" +Result: Different hashes! ✓ +``` + +### 2. Null Bit Vectors (Nullable Fields) + +Distinguishes between actual nulls and non-nullable fields: + +``` +NonNullable [1, 2, 3] → Only data hash +Nullable [1, 2, 3] → Null bits [true, true, true] + data hash +Result: Different hashes! ✓ +``` + +### 3. Schema Digests + +Encodes all metadata (type information, field names, nullability) into the hash: + +``` +Field "col1" Int32 (non-nullable) ≠ Field "col1" Int32 (nullable) +Result: Different hashes! ✓ +``` + +### 4. Recursive Data Type Hashing + +Complex types like lists recursively hash their components using the full schema information. + +## Data Type Conversion Details + +### Fixed-Size Array Processing + +When hashing fixed-size types, the algorithm: + +1. **Gets the data buffer** - Contains raw bytes for all elements +2. **Accounts for offset** - Arrow arrays can have offsets; these are applied +3. **Handles nullability:** + - **NonNullable**: Hash entire buffer slice directly + - **Nullable with nulls**: Iterate element-by-element, only hashing valid entries + - **Nullable without nulls**: Hash entire buffer slice (simpler path) + +**Example: Int32Array([1, 2, 3])** +``` +Size per element: 4 bytes +Buffer: [0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00] +Hash entire 12 bytes +``` + +**Example: Int32Array([1, null, 3])** +``` +Size per element: 4 bytes +Buffer: [0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00] +Null bits: [true, false, true] + +Process: + 1. Hash null bits [true, false, true] + 2. Hash bytes 0-3 (index 0, valid) + 3. Skip bytes 4-7 (index 1, null) + 4. Hash bytes 8-11 (index 2, valid) +``` + +## Determinism Guarantees + +The hashing algorithm ensures deterministic output because: + +1. **Schema fields are sorted** - BTreeMap maintains alphabetical order +2. **Field order is deterministic** - Always process in alphabetical field name order +3. **Data types are consistent** - Each type uses the same hashing strategy +4. **Byte order is consistent** - Uses little-endian for length prefixes and big-endian for bitvectors +5. **Null handling is predictable** - Same rules applied consistently + +**Implication:** The same data in different storage order or location will always produce the same hash. 
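+
+Note that "storage order" here refers to column order and physical location; row order still matters, because each field's values are fed to its digest sequentially.
+
+Both the collision-prevention and determinism guarantees can be exercised end to end with the public API described under Example Usage below. The following is a minimal sketch (column names and values are illustrative):
+
+```rust
+use std::sync::Arc;
+
+use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
+use arrow_schema::{DataType, Field, Schema};
+use starfix::ArrowDigester;
+
+fn main() {
+    // Length prefixes: the same concatenated bytes with different element
+    // boundaries must not collide.
+    let ab_c = StringArray::from(vec!["ab", "c"]);
+    let a_bc = StringArray::from(vec!["a", "bc"]);
+    assert_ne!(
+        ArrowDigester::hash_array(&ab_c),
+        ArrowDigester::hash_array(&a_bc)
+    );
+
+    // Determinism: the same columns declared in a different order hash
+    // identically, because fields are processed in alphabetical name order.
+    let id: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
+    let name: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c"]));
+
+    let schema_ab = Arc::new(Schema::new(vec![
+        Field::new("id", DataType::Int32, false),
+        Field::new("name", DataType::Utf8, false),
+    ]));
+    let schema_ba = Arc::new(Schema::new(vec![
+        Field::new("name", DataType::Utf8, false),
+        Field::new("id", DataType::Int32, false),
+    ]));
+
+    let batch_ab =
+        RecordBatch::try_new(schema_ab, vec![Arc::clone(&id), Arc::clone(&name)]).unwrap();
+    let batch_ba = RecordBatch::try_new(schema_ba, vec![name, id]).unwrap();
+
+    assert_eq!(
+        ArrowDigester::hash_record_batch(&batch_ab),
+        ArrowDigester::hash_record_batch(&batch_ba)
+    );
+}
+```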
+ +## Performance Considerations + +### Efficient Schema Hashing + +- Schema is hashed only once during initialization +- Uses JSON serialization (fast) rather than alternative formats +- Schema digest is reused for all record batches + +### Incremental Updates + +- Each record batch update accumulates into the same digest buffers +- No need to re-hash previous batches +- Final digest combines all incremental updates + +### Memory Efficiency + +- Null bits use bit-packing (1 bit per value, not 1 byte) +- Streaming approach avoids loading entire dataset into memory +- Field flattening enables hierarchical processing + +### Buffer Slicing + +- Fixed-size arrays hash the raw buffer directly when possible +- Avoids element-by-element iteration for non-nullable arrays +- Significant speedup for large datasets + +## Known Limitations + +The current implementation marks the following data types as `todo!()`: + +- `Null` - Null data type itself +- `Timestamp` - Timestamp variants +- `Duration` - Duration types +- `Interval` - Interval types +- `BinaryView` - Binary view type +- `Utf8View` - UTF-8 view type +- `ListView` - List view type +- `FixedSizeList` - Fixed-size lists +- `LargeListView` - Large list view type +- `Struct` - Struct types (partial support for nested fields) +- `Union` - Union types +- `Dictionary` - Dictionary-encoded types +- `Map` - Map types +- `RunEndEncoded` - Run-end encoded types + +These types will panic if encountered during hashing and should be implemented in future versions. +## SHA-256 Hashing Implementation + +### Overview + +ArrowDigester uses SHA-256 as its default cryptographic hash function, providing a 256-bit (32-byte) digest. The digest algorithm is configurable through the `Digest` trait, allowing alternative implementations, but SHA-256 is the standard choice for production use. + +### Versioning Header + +Every hash produced by ArrowDigester is prefixed with a 3-byte version identifier: + +``` +[Version Byte 0] [Version Byte 1] [Version Byte 2] [SHA-256 Digest (32 bytes)] +``` + +This 3-byte header ensures forward compatibility and enables detection of incompatible hash formats across different library versions. If the hashing algorithm or data format changes in future versions, the version bytes allow consumers to: +- Reject hashes from incompatible versions +- Implement migration or conversion logic +- Maintain a stable hash contract with external systems + +### SHA-256 Digest Process + +The hashing workflow follows this structure: + +``` +1. Initialize SHA-256 digester with version header + └─ Write 3 version bytes + +2. Hash schema component + └─ Update digester with schema JSON + +3. Hash field digests (alphabetical order) + ├─ For each field: + │ ├─ Hash null bits (if nullable) + │ └─ Hash data digest + └─ Accumulate into SHA-256 state + +4. 
Finalize + └─ Return 35-byte result: [3 version bytes] + [32-byte SHA-256 hash] +``` + +### Implementation Details + +- **Hash Algorithm**: SHA-256 (256-bit output) +- **Version Prefix**: 3 bytes (allows 16.7 million versions) +- **Total Output**: 35 bytes (3 version + 32 digest) +- **State Management**: SHA-256 maintains running state across multiple `update()` calls +- **Finalization**: Single call to `finalize()` produces immutable digest + +## Example Usage + +### Hashing a Single Array + +```rust +use arrow::array::Int32Array; +use starfix::ArrowDigester; + +let array = Int32Array::from(vec![Some(1), Some(2), Some(3)]); +let hash = ArrowDigester::hash_array(&array); +println!("Hash: {}", hex::encode(hash)); +``` + +### Hashing a Record Batch + +```rust +use arrow::record_batch::RecordBatch; +use starfix::ArrowDigester; + +let batch = RecordBatch::try_new(...)?; +let hash = ArrowDigester::hash_record_batch(&batch); +println!("Hash: {}", hex::encode(hash)); +``` + +### Streaming Multiple Batches + +```rust +use starfix::ArrowDigester; + +let mut digester = ArrowDigester::new(schema); +digester.update(&batch1); +digester.update(&batch2); +digester.update(&batch3); + +let final_hash = digester.finalize(); +println!("Combined hash: {}", hex::encode(final_hash)); +``` + +## Testing Strategy + +The codebase includes comprehensive tests covering: + +- **Data type coverage** - Tests for each supported data type +- **Nullable handling** - Arrays with and without null values +- **Collision prevention** - Length prefix verification +- **Determinism** - Same data produces same hash +- **Schema metadata** - Different schemas produce different hashes +- **Field ordering** - Different field orders produce same hash (commutative) + +## Implementation Notes + +### About the Delimiter + +The code uses `/` as the delimiter for nested field hierarchies. This was chosen to be URL-safe and visually clear while avoiding common naming conflicts. 
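+
+A minimal sketch of the flattening rule, mirroring the crate's internal `construct_field_name_hierarchy` helper (the free function name here is illustrative):
+
+```rust
+/// Join a parent path and a field name with the `/` delimiter.
+fn flatten(parent: &str, field: &str) -> String {
+    if parent.is_empty() {
+        field.to_owned()
+    } else {
+        format!("{parent}/{field}")
+    }
+}
+
+fn main() {
+    // person (struct) -> address (struct) -> street (string)
+    let address = flatten("person", "address");
+    assert_eq!(flatten(&address, "street"), "person/address/street");
+}
+```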
+
+### About Byte Order
+
+- **Length prefixes**: Little-endian (`to_le_bytes()`) - standard for Arrow
+- **Bitvector words**: Big-endian (`to_be_bytes()`) - matches bitvector convention
+- **Size fields**: Little-endian - consistent with Arrow buffers
+
+### About Bitpacking
+
+Boolean values and null indicators use `BitVec` (Most Significant Bit ordering):
+- Compresses 8 boolean values into 1 byte
+- Reduces hash input size by 8x for boolean arrays
+- Uses MSB0 for consistent bit ordering
+
+---
+
+**For more information, examine the test cases in `tests/arrow_digester.rs`**
diff --git a/cspell.json b/cspell.json
index 3c38ecf..5fe20e7 100644
--- a/cspell.json
+++ b/cspell.json
@@ -3,11 +3,13 @@
     "datatypes",
     "pyarrow",
     "pythonapi",
-    "uniffi"
-  ],
-  "ignoreWords": [
-
+    "uniffi",
+    "uids",
+    "bitvec",
+    "indoc",
+    "starfix"
   ],
+  "ignoreWords": [],
   "useGitignore": false,
   "ignorePaths": [
     "Cargo.lock",
diff --git a/notebooks/Example Python Usage.ipynb b/notebooks/Example Python Usage.ipynb
index ba23c98..19d7794 100644
--- a/notebooks/Example Python Usage.ipynb
+++ b/notebooks/Example Python Usage.ipynb
@@ -10,54 +10,38 @@
      "name": "stdout",
      "output_type": "stream",
      "text": [
-      "Received table with 5 rows and 3 columns\n",
-      "Table SHA-256 hash: e474f034a7d25abfac4941a3239a3d7c56405c84edb866e474056cbe033a9476\n"
+      "000001db154d744ff41a27ec6af4e205842cdf5356be83d39ac0b57e0a7d138774e5ab\n",
+      "000001aedb11d4fb4cabb4d4028e69cf912a0c392227c2a06ac2a2b4bd92cf122f9208\n"
      ]
-    },
-    {
-     "data": {
-      "text/plain": [
-       "'e474f034a7d25abfac4941a3239a3d7c56405c84edb866e474056cbe033a9476'"
-      ]
-     },
-     "execution_count": 2,
-     "metadata": {},
-     "output_type": "execute_result"
     }
    ],
    "source": [
     "import pyarrow as pa\n",
     "import ctypes\n",
-    "import arrow_hasher as ah\n",
-    "\n",
-    "def hash_arrow_table(table: pa.Table):\n",
-    "    # Covert table to record batch first (so we can extract the pointers), since the default behavior is 1 batch, we can just get the first element\n",
-    "    # After that we can extract the PyCapsules\n",
-    "    schema_capsule, array_capsule = table.to_batches()[0].__arrow_c_array__()\n",
+    "import starfix as sf\n",
     "\n",
-    "    # Extract raw pointers from capsules due to uniffi limitations\n",
-    "    PyCapsule_GetPointer = ctypes.pythonapi.PyCapsule_GetPointer\n",
-    "    PyCapsule_GetPointer.argtypes = [ctypes.py_object, ctypes.c_char_p]\n",
-    "    PyCapsule_GetPointer.restype = ctypes.c_void_p\n",
-    "\n",
-    "    return ah.process_arrow_table(PyCapsule_GetPointer(array_capsule, b\"arrow_array\"), PyCapsule_GetPointer(schema_capsule, b\"arrow_schema\"))\n",
    "\n",
    "# Create a simple Arrow table\n",
    "data = {\n",
-    "    'id': [1, 2, 3, 4, 5],\n",
-    "    'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],\n",
-    "    'value': [10.5, 20.3, 15.7, 30.2, 25.8]\n",
+    "    \"id\": [1, 2, 3, 4, 5],\n",
+    "    \"name\": [\"Alice\", \"Bob\", \"Charlie\", \"David\", \"Eve\"],\n",
+    "    \"value\": [10.5, 20.3, 15.7, 30.2, 25.8],\n",
    "}\n",
    "table = pa.table(data)\n",
    "\n",
-    "hash_arrow_table(table)\n",
-    "\n"
+    "# Hash the entire Arrow table\n",
+    "print(sf.hash_record_batch(table).hex())\n",
+    "\n",
+    "# Hash the schema of the Arrow table\n",
+    "print(sf.hash_schema(table.schema).hex())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
-  "id": "9352fee1",
+  "id": "0c10e8c3",
   "metadata": {},
   "outputs": [],
   "source": []
diff --git a/python/starfix/__init__.py
index dca6f2d..b638059 100644
--- a/python/starfix/__init__.py
+++ 
b/python/starfix/__init__.py
@@ -6,7 +6,7 @@
 import pyarrow as pa
 
 
-def hash_arrow_table(table: "pa.Table") -> bytes:
+def hash_record_batch(table: "pa.Table") -> bytes:
     # Covert table to record batch first (so we can extract the pointers), since the default behavior is 1 batch, we can just get the first element
     # After that we can extract the PyCapsules
     schema_capsule, array_capsule = table.to_batches()[0].__arrow_c_array__()
@@ -16,7 +16,52 @@ def hash_arrow_table(table: "pa.Table") -> bytes:
     PyCapsule_GetPointer.argtypes = [ctypes.py_object, ctypes.c_char_p]
     PyCapsule_GetPointer.restype = ctypes.c_void_p
 
-    return sfr.process_arrow_table(
+    return sfr.hash_record_batch(
         PyCapsule_GetPointer(array_capsule, b"arrow_array"),
         PyCapsule_GetPointer(schema_capsule, b"arrow_schema"),
     )
+
+
+def hash_schema(schema: "pa.Schema") -> bytes:
+    schema_capsule = schema.__arrow_c_schema__()
+
+    # Extract raw pointers from capsules due to uniffi limitations
+    PyCapsule_GetPointer = ctypes.pythonapi.PyCapsule_GetPointer
+    PyCapsule_GetPointer.argtypes = [ctypes.py_object, ctypes.c_char_p]
+    PyCapsule_GetPointer.restype = ctypes.c_void_p
+
+    return sfr.hash_schema(
+        PyCapsule_GetPointer(schema_capsule, b"arrow_schema"),
+    )
+
+
+class PyArrowDigester:
+    def __init__(self, schema: "pa.Schema") -> None:
+        schema_capsule = schema.__arrow_c_schema__()
+
+        PyCapsule_GetPointer = ctypes.pythonapi.PyCapsule_GetPointer
+        PyCapsule_GetPointer.argtypes = [ctypes.py_object, ctypes.c_char_p]
+        PyCapsule_GetPointer.restype = ctypes.c_void_p
+
+        schema_ptr = PyCapsule_GetPointer(schema_capsule, b"arrow_schema")
+
+        self._internal = sfr.InternalPyArrowDigester(schema_ptr)
+
+    def update(self, table: "pa.Table") -> None:
+        # Convert the table to a record batch first (so we can extract the pointers); since the default behavior is 1 batch, we can just get the first element
+        # After that we can extract the PyCapsules
+        schema_capsule, array_capsule = table.to_batches()[0].__arrow_c_array__()
+
+        # Extract raw pointers from capsules due to uniffi limitations
+        PyCapsule_GetPointer = ctypes.pythonapi.PyCapsule_GetPointer
+        PyCapsule_GetPointer.argtypes = [ctypes.py_object, ctypes.c_char_p]
+        PyCapsule_GetPointer.restype = ctypes.c_void_p
+
+        self._internal.update(
+            PyCapsule_GetPointer(array_capsule, b"arrow_array"),
+            PyCapsule_GetPointer(schema_capsule, b"arrow_schema"),
+        )
+
+    def finalize(self) -> bytes:
+        return self._internal.finalize()
diff --git a/src/arrow_digester_core.rs b/src/arrow_digester_core.rs
new file mode 100644
index 0000000..556ee3d
--- /dev/null
+++ b/src/arrow_digester_core.rs
@@ -0,0 +1,923 @@
+#![expect(
+    clippy::expect_used,
+    clippy::todo,
+    reason = "First iteration of code, will add proper error handling later. 
Allow for unsupported data types for now"
+)]
+use std::{collections::BTreeMap, iter::repeat_n};
+
+use arrow::{
+    array::{
+        Array, BinaryArray, BooleanArray, GenericBinaryArray, GenericListArray, GenericStringArray,
+        LargeBinaryArray, LargeListArray, LargeStringArray, ListArray, OffsetSizeTrait,
+        RecordBatch, StringArray, StructArray,
+    },
+    datatypes::{DataType, Schema},
+};
+use arrow_schema::Field;
+use bitvec::prelude::*;
+use digest::Digest;
+
+const NULL_BYTES: &[u8] = b"NULL";
+
+const DELIMITER_FOR_NESTED_FIELD: &str = "/";
+
+#[derive(Clone)]
+enum DigestBufferType<D> {
+    NonNullable(D),
+    Nullable(BitVec<u8, Msb0>, D), // Where the first element is for the null bits, while the second is for the actual data
+}
+
+#[derive(Clone)]
+pub struct ArrowDigesterCore<D: Digest> {
+    schema: Schema,
+    schema_digest: Vec<u8>,
+    fields_digest_buffer: BTreeMap<String, DigestBufferType<D>>,
+}
+
+impl<D: Digest> ArrowDigesterCore<D> {
+    /// Create a new instance of `ArrowDigesterCore` with the schema, which will be enforced on each update
+    pub fn new(schema: Schema) -> Self {
+        // Hash the schema first
+        let schema_digest = Self::hash_schema(&schema);
+
+        // Flatten all nested fields into a single map; this allows us to hash each field individually and efficiently
+        let mut fields_digest_buffer = BTreeMap::new();
+        schema.fields.into_iter().for_each(|field| {
+            Self::extract_fields_name(field, "", &mut fields_digest_buffer);
+        });
+
+        // Store everything in the new struct
+        Self {
+            schema,
+            schema_digest,
+            fields_digest_buffer,
+        }
+    }
+
+    /// Hash a record batch and update the internal digests
+    pub fn update(&mut self, record_batch: &RecordBatch) {
+        // Verify the schema matches
+        assert!(
+            *record_batch.schema() == self.schema,
+            "Record batch schema does not match ArrowDigester schema"
+        );
+
+        // Iterate through each field and update its digest
+        self.fields_digest_buffer
+            .iter_mut()
+            .for_each(|(field_name, digest)| {
+                // Determine if the field name is nested
+                let field_name_hierarchy = field_name
+                    .split(DELIMITER_FOR_NESTED_FIELD)
+                    .collect::<Vec<&str>>();
+
+                if field_name_hierarchy.len() == 1 {
+                    Self::array_digest_update(
+                        record_batch
+                            .schema()
+                            .field_with_name(field_name)
+                            .expect("Failed to get field with name")
+                            .data_type(),
+                        record_batch
+                            .column_by_name(field_name)
+                            .expect("Failed to get column by name"),
+                        digest,
+                    );
+                } else {
+                    Self::update_nested_field(
+                        &field_name_hierarchy,
+                        0,
+                        record_batch
+                            .column_by_name(
+                                field_name_hierarchy
+                                    .first()
+                                    .expect("Failed to get field name at idx 0, list is empty!"),
+                            )
+                            .expect("Failed to get column by name")
+                            .as_any()
+                            .downcast_ref::<StructArray>()
+                            .expect("Failed to downcast to StructArray"),
+                        digest,
+                    );
+                }
+            });
+    }
+
+    /// Hash an array directly without needing to create an `ArrowDigester` instance on the user side.
+    /// For a bare array we don't have a schema to hash; however, we do have the field data type,
+    /// so, similar to the schema, we hash the data type to encode the metadata information into the digest
+    ///
+    /// # Panics
+    ///
+    /// This function will panic if JSON serialization of the data type fails. 
+ /// + pub fn hash_array(array: &dyn Array) -> Vec { + let mut final_digest = D::new(); + + let data_type_serialized = serde_json::to_string(&array.data_type()) + .expect("Failed to serialize data type to string"); + + // Update the digest buffer with the array metadata and field data + final_digest.update(data_type_serialized); + + // Now we update it with the actual array data + let mut digest_buffer = if array.is_nullable() { + DigestBufferType::Nullable(BitVec::new(), D::new()) + } else { + DigestBufferType::NonNullable(D::new()) + }; + Self::array_digest_update(array.data_type(), array, &mut digest_buffer); + Self::finalize_digest(&mut final_digest, digest_buffer); + + // Finalize and return the digest + final_digest.finalize().to_vec() + } + + /// Hash record batch directly without needing to create an `ArrowDigester` instance on the user side + pub fn hash_record_batch(record_batch: &RecordBatch) -> Vec { + let mut digester = Self::new(record_batch.schema().as_ref().clone()); + digester.update(record_batch); + digester.finalize() + } + + /// This will consume the `ArrowDigester` and produce the final combined digest where the schema + /// digest is fed in first, followed by each field digest in alphabetical order of field names + pub fn finalize(self) -> Vec { + // Finalize all the sub digest and combine them into a single digest + let mut final_digest = D::new(); + + // digest the schema first + final_digest.update(&self.schema_digest); + + // Then digest each field digest in order + self.fields_digest_buffer + .into_iter() + .for_each(|(_, digest)| Self::finalize_digest(&mut final_digest, digest)); + + final_digest.finalize().to_vec() + } + + #[expect( + clippy::big_endian_bytes, + reason = "Use for bit packing the null_bit_values" + )] + /// Finalize a single field digest into the final digest + /// Helpers to reduce code duplication + fn finalize_digest(final_digest: &mut D, digest: DigestBufferType) { + match digest { + DigestBufferType::NonNullable(data_digest) => { + final_digest.update(data_digest.finalize()); + } + DigestBufferType::Nullable(null_bit_digest, data_digest) => { + final_digest.update(null_bit_digest.len().to_le_bytes()); + for &word in null_bit_digest.as_raw_slice() { + final_digest.update(word.to_be_bytes()); + } + final_digest.update(data_digest.finalize()); + } + } + } + + /// Serialize the schema into a `BTreeMap` for field name and its digest + /// + /// # Panics + /// This function will panic if JSON serialization of the schema fails. 
+ fn serialized_schema(schema: &Schema) -> String { + let fields_digest = schema + .fields + .iter() + .map(|field| (field.name(), (field.to_string(), field.data_type()))) + .collect::>(); + + serde_json::to_string(&fields_digest).expect("Failed to serialize field_digest to bytes") + } + + /// Serialize the schema into a `BTreeMap` for field name and its digest + pub fn hash_schema(schema: &Schema) -> Vec { + // Hash the entire thing to the digest + D::digest(Self::serialized_schema(schema)).to_vec() + } + + /// Recursive function to update nested field digests (structs within structs) + fn update_nested_field( + field_name_hierarchy: &[&str], + current_level: usize, + array: &StructArray, + digest: &mut DigestBufferType, + ) { + let current_level_plus_one = current_level + .checked_add(1) + .expect("Field nesting level overflow"); + + if field_name_hierarchy + .len() + .checked_sub(1) + .expect("field_name_hierarchy underflow") + == current_level_plus_one + { + let array_data = array + .column_by_name( + field_name_hierarchy + .last() + .expect("Failed to get field name at idx 0, list is empty!"), + ) + .expect("Failed to get column by name"); + // Base case, it should be a non-struct field + Self::array_digest_update(array_data.data_type(), array_data.as_ref(), digest); + } else { + // Recursive case, it should be a struct field + let next_array = array + .column_by_name( + field_name_hierarchy + .get(current_level_plus_one) + .expect("Failed to get field name at current level"), + ) + .expect("Failed to get column by name") + .as_any() + .downcast_ref::() + .expect("Failed to downcast to StructArray"); + + Self::update_nested_field( + field_name_hierarchy, + current_level_plus_one, + next_array, + digest, + ); + } + } + + #[expect( + clippy::too_many_lines, + reason = "Comprehensive match on all data types" + )] + fn array_digest_update( + data_type: &DataType, + array: &dyn Array, + digest: &mut DigestBufferType, + ) { + match data_type { + DataType::Null => todo!(), + DataType::Boolean => { + // Bool Array is stored a bit differently, so we can't use the standard fixed buffer approach + let bool_array = array + .as_any() + .downcast_ref::() + .expect("Failed to downcast to BooleanArray"); + + match digest { + DigestBufferType::NonNullable(data_digest) => { + // We want to bit pack the boolean values into bytes for hashing + let mut bit_vec = BitVec::::with_capacity(bool_array.len()); + for i in 0..bool_array.len() { + bit_vec.push(bool_array.value(i)); + } + + data_digest.update(bit_vec.as_raw_slice()); + } + DigestBufferType::Nullable(null_bit_vec, data_digest) => { + // Handle null bits first + Self::handle_null_bits(bool_array, null_bit_vec); + + // Handle the data + let mut bit_vec = BitVec::::with_capacity(bool_array.len()); + for i in 0..bool_array.len() { + // We only want the valid bits, for null we will discard from the hash since that is already capture by null_bits + if bool_array.is_valid(i) { + bit_vec.push(bool_array.value(i)); + } + } + data_digest.update(bit_vec.as_raw_slice()); + } + } + } + DataType::Int8 | DataType::UInt8 => Self::hash_fixed_size_array(array, digest, 1), + DataType::Int16 | DataType::UInt16 | DataType::Float16 => { + Self::hash_fixed_size_array(array, digest, 2); + } + DataType::Int32 + | DataType::UInt32 + | DataType::Float32 + | DataType::Date32 + | DataType::Decimal32(_, _) => { + Self::hash_fixed_size_array(array, digest, 4); + } + DataType::Int64 + | DataType::UInt64 + | DataType::Float64 + | DataType::Date64 + | DataType::Decimal64(_, _) => { 
+ Self::hash_fixed_size_array(array, digest, 8); + } + DataType::Timestamp(_, _) => todo!(), + DataType::Time32(_) => Self::hash_fixed_size_array(array, digest, 4), + DataType::Time64(_) => Self::hash_fixed_size_array(array, digest, 8), + DataType::Duration(_) => todo!(), + DataType::Interval(_) => todo!(), + DataType::Binary => Self::hash_binary_array( + array + .as_any() + .downcast_ref::() + .expect("Failed to downcast to BinaryArray"), + digest, + ), + DataType::FixedSizeBinary(element_size) => { + Self::hash_fixed_size_array(array, digest, *element_size); + } + DataType::LargeBinary => Self::hash_binary_array( + array + .as_any() + .downcast_ref::() + .expect("Failed to downcast to LargeBinaryArray"), + digest, + ), + DataType::BinaryView => todo!(), + DataType::Utf8 => Self::hash_string_array( + array + .as_any() + .downcast_ref::() + .expect("Failed to downcast to StringArray"), + digest, + ), + DataType::LargeUtf8 => Self::hash_string_array( + array + .as_any() + .downcast_ref::() + .expect("Failed to downcast to LargeStringArray"), + digest, + ), + DataType::Utf8View => todo!(), + DataType::List(field) => { + Self::hash_list_array( + array + .as_any() + .downcast_ref::() + .expect("Failed to downcast to ListArray"), + field.data_type(), + digest, + ); + } + DataType::ListView(_) => todo!(), + DataType::FixedSizeList(_, _) => todo!(), + DataType::LargeList(field) => { + Self::hash_list_array( + array + .as_any() + .downcast_ref::() + .expect("Failed to downcast to LargeListArray"), + field.data_type(), + digest, + ); + } + DataType::LargeListView(_) => todo!(), + DataType::Struct(_) => todo!(), + DataType::Union(_, _) => todo!(), + DataType::Dictionary(_, _) => todo!(), + DataType::Decimal128(_, _) => { + Self::hash_fixed_size_array(array, digest, 16); + } + DataType::Decimal256(_, _) => { + Self::hash_fixed_size_array(array, digest, 32); + } + DataType::Map(_, _) => todo!(), + DataType::RunEndEncoded(_, _) => todo!(), + } + } + + #[expect(clippy::cast_sign_loss, reason = "element_size is always positive")] + fn hash_fixed_size_array( + array: &dyn Array, + digest_buffer: &mut DigestBufferType, + element_size: i32, + ) { + let array_data = array.to_data(); + let element_size_usize = element_size as usize; + + // Get the slice with offset accounted for if there is any + let slice = array_data + .buffers() + .first() + .expect("Unable to get first buffer to determine offset") + .as_slice() + .get( + array_data + .offset() + .checked_mul(element_size_usize) + .expect("Offset multiplication overflow").., + ) + .expect("Failed to get buffer slice for FixedSizeBinaryArray"); + + match digest_buffer { + DigestBufferType::NonNullable(data_digest) => { + // No nulls, we can hash the entire buffer directly + data_digest.update(slice); + } + DigestBufferType::Nullable(null_bits, data_digest) => { + // Handle null bits first + Self::handle_null_bits(array, null_bits); + + match array_data.nulls() { + Some(null_buffer) => { + // There are nulls, so we need to incrementally hash each value + for i in 0..array_data.len() { + if null_buffer.is_valid(i) { + let data_pos = i + .checked_mul(element_size_usize) + .expect("Data position multiplication overflow"); + let end_pos = data_pos + .checked_add(element_size_usize) + .expect("End position addition overflow"); + + data_digest.update( + slice + .get(data_pos..end_pos) + .expect("Failed to get data_slice"), + ); + } + } + } + None => { + // No nulls, we can hash the entire buffer directly + data_digest.update(slice); + } + } + } + } + } + + fn 
hash_binary_array<T: OffsetSizeTrait>(
+        array: &GenericBinaryArray<T>,
+        digest: &mut DigestBufferType<D>,
+    ) {
+        match digest {
+            DigestBufferType::NonNullable(data_digest) => {
+                for i in 0..array.len() {
+                    let value = array.value(i);
+                    data_digest.update(value.len().to_le_bytes());
+                    data_digest.update(value);
+                }
+            }
+            DigestBufferType::Nullable(null_bit_vec, data_digest) => {
+                // Deal with the null bits first
+                if let Some(null_buf) = array.nulls() {
+                    // We need to iterate through the null buffer and push it into the null_bit_vec
+                    for i in 0..array.len() {
+                        null_bit_vec.push(null_buf.is_valid(i));
+                    }
+
+                    for i in 0..array.len() {
+                        if null_buf.is_valid(i) {
+                            let value = array.value(i);
+                            data_digest.update(value.len().to_le_bytes());
+                            data_digest.update(value);
+                        } else {
+                            data_digest.update(NULL_BYTES);
+                        }
+                    }
+                } else {
+                    // All valid, therefore we can extend the bit vector with all true values
+                    null_bit_vec.extend(repeat_n(true, array.len()));
+
+                    // Deal with the data
+                    for i in 0..array.len() {
+                        let value = array.value(i);
+                        data_digest.update(value.len().to_le_bytes());
+                        data_digest.update(value);
+                    }
+                }
+            }
+        }
+    }
+
+    #[expect(
+        clippy::cast_possible_truncation,
+        reason = "String lengths from Arrow offsets are bounded"
+    )]
+    fn hash_string_array<T: OffsetSizeTrait>(
+        array: &GenericStringArray<T>,
+        digest: &mut DigestBufferType<D>,
+    ) {
+        match digest {
+            DigestBufferType::NonNullable(data_digest) => {
+                for i in 0..array.len() {
+                    let value = array.value(i);
+                    // Use the same u32 length prefix as the nullable path below, so that
+                    // nullable and non-nullable arrays with identical data hash identically
+                    data_digest.update((value.len() as u32).to_le_bytes());
+                    data_digest.update(value.as_bytes());
+                }
+            }
+            DigestBufferType::Nullable(null_bit_vec, data_digest) => {
+                // Deal with the null bits first
+                Self::handle_null_bits(array, null_bit_vec);
+
+                match array.nulls() {
+                    Some(null_buf) => {
+                        for i in 0..array.len() {
+                            if null_buf.is_valid(i) {
+                                let value = array.value(i);
+                                data_digest.update((value.len() as u32).to_le_bytes());
+                                data_digest.update(value.as_bytes());
+                            } else {
+                                data_digest.update(NULL_BYTES);
+                            }
+                        }
+                    }
+                    None => {
+                        for i in 0..array.len() {
+                            let value = array.value(i);
+                            data_digest.update((value.len() as u32).to_le_bytes());
+                            data_digest.update(value.as_bytes());
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    fn hash_list_array<T: OffsetSizeTrait>(
+        array: &GenericListArray<T>,
+        field_data_type: &DataType,
+        digest: &mut DigestBufferType<D>,
+    ) {
+        match digest {
+            DigestBufferType::NonNullable(_) => {
+                for i in 0..array.len() {
+                    Self::array_digest_update(field_data_type, array.value(i).as_ref(), digest);
+                }
+            }
+            DigestBufferType::Nullable(bit_vec, _) => {
+                // Deal with the null bits first
+                Self::handle_null_bits(array, bit_vec);
+
+                match array.nulls() {
+                    Some(null_buf) => {
+                        for i in 0..array.len() {
+                            if null_buf.is_valid(i) {
+                                Self::array_digest_update(
+                                    field_data_type,
+                                    array.value(i).as_ref(),
+                                    digest,
+                                );
+                            }
+                        }
+                    }
+                    None => {
+                        for i in 0..array.len() {
+                            Self::array_digest_update(
+                                field_data_type,
+                                array.value(i).as_ref(),
+                                digest,
+                            );
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /// Internal recursive function to extract field names from nested structs, effectively flattening the schema.
+    /// The format is `parent/child/grandchild` 
for nested fields and will be stored in `fields_digest_buffer` + fn extract_fields_name( + field: &Field, + parent_field_name: &str, + fields_digest_buffer: &mut BTreeMap>, + ) { + // Check if field is a nested type of struct + if let DataType::Struct(fields) = field.data_type() { + // We will add fields in alphabetical order + fields.into_iter().for_each(|field_inner| { + Self::extract_fields_name( + field_inner, + Self::construct_field_name_hierarchy(parent_field_name, field.name()).as_str(), + fields_digest_buffer, + ); + }); + } else { + // Base case, just add the the combine field name to the map + fields_digest_buffer.insert( + Self::construct_field_name_hierarchy(parent_field_name, field.name()), + if field.is_nullable() { + DigestBufferType::Nullable(BitVec::new(), D::new()) + } else { + DigestBufferType::NonNullable(D::new()) + }, + ); + } + } + + fn construct_field_name_hierarchy(parent_field_name: &str, field_name: &str) -> String { + if parent_field_name.is_empty() { + field_name.to_owned() + } else { + format!("{parent_field_name}{DELIMITER_FOR_NESTED_FIELD}{field_name}") + } + } + + fn handle_null_bits(array: &dyn Array, null_bit_vec: &mut BitVec) { + match array.nulls() { + Some(null_buf) => { + // We would need to iterate through the null buffer and push it into the null_bit_vec + for i in 0..array.len() { + null_bit_vec.push(null_buf.is_valid(i)); + } + } + None => { + // All valid, therefore we can extend the bit vector with all true values + null_bit_vec.extend(repeat_n(true, array.len())); + } + } + } +} + +#[cfg(test)] +mod tests { + #![expect(clippy::unwrap_used, reason = "Okay in test")] + + use std::sync::Arc; + + use arrow::array::{ArrayRef, Int32Array, Int64Array, RecordBatch, StringArray, StructArray}; + use arrow_schema::{DataType, Field, Schema, TimeUnit}; + + use hex::encode; + use indoc::indoc; + use pretty_assertions::assert_eq; + use sha2::Sha256; + + use crate::arrow_digester_core::ArrowDigesterCore; + + #[expect(clippy::too_many_lines, reason = "Comprehensive schema test")] + #[test] + fn schema() { + let schema = Schema::new(vec![ + Field::new("bool", DataType::Boolean, true), + Field::new("int8", DataType::Int8, false), + Field::new("uint8", DataType::UInt8, false), + Field::new("int16", DataType::Int16, false), + Field::new("uint16", DataType::UInt16, false), + Field::new("int32", DataType::Int32, false), + Field::new("uint32", DataType::UInt32, false), + Field::new("int64", DataType::Int64, false), + Field::new("uint64", DataType::UInt64, false), + Field::new("float32", DataType::Float32, false), + Field::new("float64", DataType::Float64, false), + Field::new("date32", DataType::Date32, false), + Field::new("date64", DataType::Date64, false), + Field::new("time32_second", DataType::Time32(TimeUnit::Second), false), + Field::new( + "time32_millis", + DataType::Time32(TimeUnit::Millisecond), + false, + ), + Field::new( + "time64_micro", + DataType::Time64(TimeUnit::Microsecond), + false, + ), + Field::new("time64_nano", DataType::Time64(TimeUnit::Nanosecond), false), + Field::new("binary", DataType::Binary, true), + Field::new("large_binary", DataType::LargeBinary, true), + Field::new("utf8", DataType::Utf8, true), + Field::new("large_utf8", DataType::LargeUtf8, true), + Field::new( + "list", + DataType::List(Box::new(Field::new("item", DataType::Int32, true)).into()), + true, + ), + Field::new( + "large_list", + DataType::LargeList(Box::new(Field::new("item", DataType::Int32, true)).into()), + true, + ), + Field::new("decimal32", DataType::Decimal32(9, 
2), true), + Field::new("decimal64", DataType::Decimal64(18, 3), true), + Field::new("decimal128", DataType::Decimal128(38, 5), true), + ]); + + // Serialize the schema and covert it over to pretty json for comparison + let compact_json: serde_json::Value = + serde_json::from_str(&ArrowDigesterCore::::serialized_schema(&schema)).unwrap(); + let pretty_json = serde_json::to_string_pretty(&compact_json).unwrap(); + + assert_eq!( + pretty_json, + indoc! {r#" +{ + "binary": [ + "Field { \"binary\": nullable Binary }", + "Binary" + ], + "bool": [ + "Field { \"bool\": nullable Boolean }", + "Boolean" + ], + "date32": [ + "Field { \"date32\": Date32 }", + "Date32" + ], + "date64": [ + "Field { \"date64\": Date64 }", + "Date64" + ], + "decimal128": [ + "Field { \"decimal128\": nullable Decimal128(38, 5) }", + { + "Decimal128": [ + 38, + 5 + ] + } + ], + "decimal32": [ + "Field { \"decimal32\": nullable Decimal32(9, 2) }", + { + "Decimal32": [ + 9, + 2 + ] + } + ], + "decimal64": [ + "Field { \"decimal64\": nullable Decimal64(18, 3) }", + { + "Decimal64": [ + 18, + 3 + ] + } + ], + "float32": [ + "Field { \"float32\": Float32 }", + "Float32" + ], + "float64": [ + "Field { \"float64\": Float64 }", + "Float64" + ], + "int16": [ + "Field { \"int16\": Int16 }", + "Int16" + ], + "int32": [ + "Field { \"int32\": Int32 }", + "Int32" + ], + "int64": [ + "Field { \"int64\": Int64 }", + "Int64" + ], + "int8": [ + "Field { \"int8\": Int8 }", + "Int8" + ], + "large_binary": [ + "Field { \"large_binary\": nullable LargeBinary }", + "LargeBinary" + ], + "large_list": [ + "Field { \"large_list\": nullable LargeList(Int32) }", + { + "LargeList": { + "data_type": "Int32", + "dict_id": 0, + "dict_is_ordered": false, + "metadata": {}, + "name": "item", + "nullable": true + } + } + ], + "large_utf8": [ + "Field { \"large_utf8\": nullable LargeUtf8 }", + "LargeUtf8" + ], + "list": [ + "Field { \"list\": nullable List(Int32) }", + { + "List": { + "data_type": "Int32", + "dict_id": 0, + "dict_is_ordered": false, + "metadata": {}, + "name": "item", + "nullable": true + } + } + ], + "time32_millis": [ + "Field { \"time32_millis\": Time32(ms) }", + { + "Time32": "Millisecond" + } + ], + "time32_second": [ + "Field { \"time32_second\": Time32(s) }", + { + "Time32": "Second" + } + ], + "time64_micro": [ + "Field { \"time64_micro\": Time64(µs) }", + { + "Time64": "Microsecond" + } + ], + "time64_nano": [ + "Field { \"time64_nano\": Time64(ns) }", + { + "Time64": "Nanosecond" + } + ], + "uint16": [ + "Field { \"uint16\": UInt16 }", + "UInt16" + ], + "uint32": [ + "Field { \"uint32\": UInt32 }", + "UInt32" + ], + "uint64": [ + "Field { \"uint64\": UInt64 }", + "UInt64" + ], + "uint8": [ + "Field { \"uint8\": UInt8 }", + "UInt8" + ], + "utf8": [ + "Field { \"utf8\": nullable Utf8 }", + "Utf8" + ] +}"#} + ); + } + + #[test] + fn nested_fields() { + // Test nested struct field name extraction + let schema = Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new( + "nested", + DataType::Struct( + vec![ + Field::new("name", DataType::Utf8, true), + Field::new( + "deep", + DataType::Struct( + vec![Field::new("value", DataType::Int64, false)].into(), + ), + false, + ), + ] + .into(), + ), + false, + ), + ]); + + let mut digester = ArrowDigesterCore::::new(schema.clone()); + let field_names: Vec<&String> = digester.fields_digest_buffer.keys().collect(); + + assert_eq!(field_names.len(), 3); + assert!(field_names.contains(&&"id".to_owned())); + assert!(field_names.contains(&&"nested/name".to_owned())); + 
assert!(field_names.contains(&&"nested/deep/value".to_owned()));
+
+        // Test the nested field update by creating a record batch and using the update method
+        let id_array = Arc::new(Int32Array::from(vec![Some(1), Some(2)])) as ArrayRef;
+        let name_array = Arc::new(StringArray::from(vec![Some("Alice"), Some("Bob")])) as ArrayRef;
+        let value_array = Arc::new(Int64Array::from(vec![Some(100), Some(200)])) as ArrayRef;
+
+        let schema_ref = Arc::new(schema);
+
+        let nested_struct = StructArray::from(vec![
+            (
+                Arc::new(Field::new("name", DataType::Utf8, true)),
+                name_array,
+            ),
+            (
+                Arc::new(Field::new(
+                    "deep",
+                    DataType::Struct(vec![Field::new("value", DataType::Int64, false)].into()),
+                    false,
+                )),
+                Arc::new(StructArray::from(vec![(
+                    Arc::new(Field::new("value", DataType::Int64, false)),
+                    value_array,
+                )])) as ArrayRef,
+            ),
+        ]);
+
+        let record_batch = RecordBatch::try_new(
+            Arc::clone(&schema_ref),
+            vec![id_array, Arc::new(nested_struct)],
+        )
+        .unwrap();
+
+        digester.update(&record_batch);
+
+        // Check the digest
+        assert_eq!(
+            encode(digester.finalize()),
+            "36ffc4d4c072ac0d2470dfa12a9dab10eaecd932a25872aca8de173bf51baa15"
+        );
+    }
+}
diff --git a/src/lib.rs b/src/lib.rs
index e1de1e6..b041d3b 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,6 +1,62 @@
-//! Intuitive compute pipeline orchestration with reproducibility, performance, and scalability in
-//! mind.
+//! Crate for hashing Arrow's data structures deterministically, for identifying and comparing data efficiently.
 extern crate uniffi as uniffi_external;
 uniffi_external::setup_scaffolding!();
-mod pyarrow;
+use arrow::array::{Array, RecordBatch};
+use arrow_schema::Schema;
+use sha2::Sha256;
+
+use crate::arrow_digester_core::ArrowDigesterCore;
+
+const VERSION_BYTES: [u8; 3] = [0_u8, 0_u8, 1_u8]; // Version 0.0.1
+
+/// Maps the `arrow_digester_core` functions to a SHA-256 digester + versioning
+#[derive(Clone)]
+pub struct ArrowDigester {
+    digester: ArrowDigesterCore<Sha256>,
+}
+
+impl ArrowDigester {
+    /// Create a new instance of `ArrowDigester` with SHA-256 as the digester and the schema, which will be enforced on each update
+    pub fn new(schema: Schema) -> Self {
+        Self {
+            digester: ArrowDigesterCore::<Sha256>::new(schema),
+        }
+    }
+
+    /// Update the digester with a new `RecordBatch`
+    pub fn update(&mut self, record_batch: &RecordBatch) {
+        self.digester.update(record_batch);
+    }
+
+    /// Consume the digester and finalize the hash computation
+    pub fn finalize(self) -> Vec<u8> {
+        Self::prepend_version_bytes(self.digester.finalize())
+    }
+
+    /// Function to hash an `Array` in one go
+    pub fn hash_array(array: &dyn Array) -> Vec<u8> {
+        Self::prepend_version_bytes(ArrowDigesterCore::<Sha256>::hash_array(array))
+    }
+
+    /// Function to hash a complete `RecordBatch` in one go
+    pub fn hash_record_batch(record_batch: &RecordBatch) -> Vec<u8> {
+        Self::prepend_version_bytes(ArrowDigesterCore::<Sha256>::hash_record_batch(record_batch))
+    }
+
+    /// Function to hash the schema only
+    pub fn hash_schema(schema: &Schema) -> Vec<u8> {
+        Self::prepend_version_bytes(ArrowDigesterCore::<Sha256>::hash_schema(schema))
+    }
+
+    fn prepend_version_bytes(digest: Vec<u8>) -> Vec<u8> {
+        let mut complete_hash = VERSION_BYTES.to_vec();
+        complete_hash.extend(digest);
+        complete_hash
+    }
+}
+
+pub(crate) mod arrow_digester_core;
+pub mod pyarrow;
+
+// TODO: add a test checking that the Int32 digest stays consistent across releases
diff --git a/src/pyarrow.rs b/src/pyarrow.rs
index 90d1b68..03277ba 100644
--- a/src/pyarrow.rs
+++ b/src/pyarrow.rs
@@ -1,35 +1,128 @@
+#![expect(
+    unsafe_code,
+    clippy::expect_used,
+    reason = "Converting raw pointers to Arrow structures"
+)]
+use std::sync::{Arc, Mutex};
+
+use 
crate::ArrowDigester; use arrow::array::{RecordBatch, StructArray}; -use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema, from_ffi}; -use arrow_digest::{RecordDigest, RecordDigestV0}; -use sha2::Sha256; +use arrow::ffi::{from_ffi, FFI_ArrowArray, FFI_ArrowSchema}; +use arrow_schema::Schema; /// Process an Arrow table via C Data Interface /// -/// # Safety +/// # Panics /// The pointers must be valid Arrow C Data Interface structs from Python's pyarrow + #[uniffi::export] -pub fn process_arrow_table(array_ptr: u64, schema_ptr: u64) -> Vec { +pub fn hash_record_batch(array_ptr: u64, schema_ptr: u64) -> Vec { + #[expect( + unsafe_code, + reason = "Need to convert raw pointers to Arrow data structures" + )] + #[expect( + clippy::multiple_unsafe_ops_per_block, + clippy::expect_used, + reason = "Okay since we are doing the same operation of dereferencing pointers, Will add proper errors later" + )] + // SAFETY: + // Need to conduct unsafe operations to convert raw pointers to Arrow data structures let array_data = unsafe { // Construct ArrayData from FFI structures - let ffi_array = FFI_ArrowArray::from_raw(array_ptr as *mut _); - let ffi_schema = FFI_ArrowSchema::from_raw(schema_ptr as *mut _); + let ffi_array = FFI_ArrowArray::from_raw(array_ptr as *mut FFI_ArrowArray); + let ffi_schema = FFI_ArrowSchema::from_raw(schema_ptr as *mut FFI_ArrowSchema); from_ffi(ffi_array, &ffi_schema).expect("Failed to import Arrow array data") }; - // // Convert FFI schema to Arrow Schema - // let schema = - // Schema::try_from(&ffi_schema).expect("Failed to convert FFI schema to Arrow schema"); + // Hash the table + ArrowDigester::hash_record_batch(&RecordBatch::from(StructArray::from(array_data))) +} - // Import array data from FFI +/// Process an Arrow schema via C Data Interface +/// +/// # Panics +/// The pointer must be a valid Arrow schema from Python's pyarrow +#[uniffi::export] +pub fn hash_schema(schema_ptr: u64) -> Vec { + #[expect( + unsafe_code, + reason = "Need to convert raw pointers to Arrow data structures" + )] + // SAFETY: + // Need to conduct unsafe operations to convert raw pointers to Arrow data structures + let schema = unsafe { + let ffi_schema = FFI_ArrowSchema::from_raw(schema_ptr as *mut FFI_ArrowSchema); + Schema::try_from(&ffi_schema).expect("Failed to convert FFI schema to Arrow schema") + }; - // Create StructArray from the array data - let struct_array = StructArray::from(array_data); + // Hash the schema + ArrowDigester::hash_schema(&schema) +} - // Create RecordBatch from StructArray - let record_batch = RecordBatch::from(struct_array); +#[derive(uniffi::Object)] +pub struct InternalPyArrowDigester { + digester: Arc>, +} - // Hash the table - let hash = RecordDigestV0::::digest(&record_batch); - // format!("{:x}", hash) - hash.to_vec() +#[uniffi::export] +impl InternalPyArrowDigester { + /// Create a new instance of `PyArrowDigester` with SHA256 as the digester with the schema which will be enforce through each update + /// + /// # Panics + /// The pointer must be a valid Arrow schema from Python's pyarrow, if failed to convert, it will panic + + #[uniffi::constructor] + pub fn new(schema_ptr: u64) -> Self { + // SAFETY: + // Need to conduct unsafe operations to convert raw pointers to Arrow data structures + let schema = unsafe { + let ffi_schema = FFI_ArrowSchema::from_raw(schema_ptr as *mut FFI_ArrowSchema); + Schema::try_from(&ffi_schema).expect("Failed to convert FFI schema to Arrow schema") + }; + Self { + digester: Arc::new(Mutex::new(ArrowDigester::new(schema))), + } + } + 
+    /// Update the digester with a new `RecordBatch`
+    ///
+    /// # Panics
+    /// Panics if the pointers are not valid Arrow C Data Interface structs from Python's pyarrow
+    pub fn update(&self, array_ptr: u64, schema_ptr: u64) {
+        #[expect(
+            unsafe_code,
+            reason = "Need to convert raw pointers to Arrow data structures"
+        )]
+        #[expect(
+            clippy::multiple_unsafe_ops_per_block,
+            clippy::expect_used,
+            reason = "Okay since both unsafe operations dereference raw pointers; proper error handling will be added later"
+        )]
+        // SAFETY:
+        // Need to conduct unsafe operations to convert raw pointers to Arrow data structures
+        let array_data = unsafe {
+            // Construct ArrayData from FFI structures
+            let ffi_array = FFI_ArrowArray::from_raw(array_ptr as *mut FFI_ArrowArray);
+            let ffi_schema = FFI_ArrowSchema::from_raw(schema_ptr as *mut FFI_ArrowSchema);
+            from_ffi(ffi_array, &ffi_schema).expect("Failed to import Arrow array data")
+        };
+
+        self.digester
+            .lock()
+            .expect("Failed to acquire lock on digester")
+            .update(&RecordBatch::from(StructArray::from(array_data)));
+    }
+
+    /// Finalize the hash computation on a clone of the internal digester state
+    ///
+    /// # Panics
+    /// Panics if the lock on the digester cannot be acquired
+    pub fn finalize(&self) -> Vec<u8> {
+        self.digester
+            .lock()
+            .expect("Failed to acquire lock on digester")
+            .clone()
+            .finalize()
+    }
+}
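A quick orientation before the test suite: the exported functions take raw `u64` addresses because that is what crosses the uniffi boundary; on the Python side they would typically come from pyarrow's C Data Interface export. The sketch below is a hypothetical Rust-side caller, not part of the patch: it assumes arrow-rs's `arrow::ffi::to_ffi` helper, that `hash_record_batch` is reachable at the crate root, and it deliberately leaks the boxed FFI structs since `from_raw` takes ownership of the pointed-to structures.

```rust
use std::sync::Arc;

use arrow::array::{Array, Int32Array, RecordBatch, StructArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::ffi::to_ffi;

fn main() {
    // A tiny single-column batch standing in for data exported by pyarrow.
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))])
        .expect("valid batch");

    // Export through the C Data Interface, exactly as a foreign caller would.
    let (ffi_array, ffi_schema) =
        to_ffi(&StructArray::from(batch).into_data()).expect("FFI export");

    // Box the structs so their addresses stay stable, then hand over the raw
    // addresses; `hash_record_batch` takes ownership via `from_raw`, so the
    // boxes are intentionally leaked here.
    let array_ptr = Box::into_raw(Box::new(ffi_array)) as u64;
    let schema_ptr = Box::into_raw(Box::new(ffi_schema)) as u64;
    let digest = starfix::hash_record_batch(array_ptr, schema_ptr);

    // The test vectors below are 70 hex chars, which suggests a 3-byte version
    // prefix ahead of the 32-byte SHA-256 output.
    println!("{} bytes", digest.len());
}
```

`InternalPyArrowDigester` follows the same pattern for streaming: construct it with a schema pointer, call `update` once per exported batch, then `finalize`.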
diff --git a/tests/arrow_digester.rs b/tests/arrow_digester.rs
new file mode 100644
index 0000000..dcc07de
--- /dev/null
+++ b/tests/arrow_digester.rs
@@ -0,0 +1,579 @@
+#[cfg(test)]
+mod tests {
+    #![expect(clippy::unwrap_used, reason = "Okay in test")]
+    use std::sync::Arc;
+
+    use arrow::{
+        array::{
+            ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array, Decimal32Array,
+            Decimal64Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array,
+            Int8Array, LargeBinaryArray, LargeListArray, LargeStringArray, ListArray, RecordBatch,
+            StringArray, Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
+            Time64NanosecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
+        },
+        datatypes::Int32Type,
+    };
+    use arrow_schema::{DataType, Field, Schema, TimeUnit};
+    use hex::encode;
+    use pretty_assertions::assert_eq;
+
+    use arrow::array::Decimal128Array;
+    use starfix::ArrowDigester;
+
+    #[expect(clippy::too_many_lines, reason = "Comprehensive schema test")]
+    #[test]
+    fn schema() {
+        let schema = Schema::new(vec![
+            Field::new("bool", DataType::Boolean, true),
+            Field::new("int8", DataType::Int8, false),
+            Field::new("uint8", DataType::UInt8, false),
+            Field::new("int16", DataType::Int16, false),
+            Field::new("uint16", DataType::UInt16, false),
+            Field::new("int32", DataType::Int32, false),
+            Field::new("uint32", DataType::UInt32, false),
+            Field::new("int64", DataType::Int64, false),
+            Field::new("uint64", DataType::UInt64, false),
+            Field::new("float32", DataType::Float32, false),
+            Field::new("float64", DataType::Float64, false),
+            Field::new("date32", DataType::Date32, false),
+            Field::new("date64", DataType::Date64, false),
+            Field::new("time32_second", DataType::Time32(TimeUnit::Second), false),
+            Field::new(
+                "time32_millis",
+                DataType::Time32(TimeUnit::Millisecond),
+                false,
+            ),
+            Field::new(
+                "time64_micro",
+                DataType::Time64(TimeUnit::Microsecond),
+                false,
+            ),
+            Field::new("time64_nano", DataType::Time64(TimeUnit::Nanosecond), false),
+            Field::new("binary", DataType::Binary, true),
+            Field::new("large_binary", DataType::LargeBinary, true),
+            Field::new("utf8", DataType::Utf8, true),
+            Field::new("large_utf8", DataType::LargeUtf8, true),
+            Field::new(
+                "list",
+                DataType::List(Box::new(Field::new("item", DataType::Int32, true)).into()),
+                true,
+            ),
+            Field::new(
+                "large_list",
+                DataType::LargeList(Box::new(Field::new("item", DataType::Int32, true)).into()),
+                true,
+            ),
+            Field::new("decimal32", DataType::Decimal32(9, 2), true),
+            Field::new("decimal64", DataType::Decimal64(18, 3), true),
+            Field::new("decimal128", DataType::Decimal128(38, 5), true),
+        ]);
+
+        // Empty table hashing check
+        assert_eq!(
+            encode(ArrowDigester::new(schema.clone()).finalize()),
+            "000001c7bc0a0c84aca684adbec21f8cb481781332fc91a205165a6c74c3a63a80e9b2"
+        );
+
+        let batch = RecordBatch::try_new(
+            Arc::new(schema),
+            vec![
+                Arc::new(BooleanArray::from(vec![Some(true)])),
+                Arc::new(Int8Array::from(vec![1_i8])),
+                Arc::new(UInt8Array::from(vec![1_u8])),
+                Arc::new(Int16Array::from(vec![100_i16])),
+                Arc::new(UInt16Array::from(vec![100_u16])),
+                Arc::new(Int32Array::from(vec![1000_i32])),
+                Arc::new(UInt32Array::from(vec![1000_u32])),
+                Arc::new(Int64Array::from(vec![100_000_i64])),
+                Arc::new(UInt64Array::from(vec![100_000_u64])),
+                Arc::new(Float32Array::from(vec![1.5_f32])),
+                Arc::new(Float64Array::from(vec![1.5_f64])),
+                Arc::new(Date32Array::from(vec![18993_i32])),
+                Arc::new(Date64Array::from(vec![1_640_995_200_000_i64])),
+                Arc::new(Time32SecondArray::from(vec![3600_i32])),
+                Arc::new(Time32MillisecondArray::from(vec![3_600_000_i32])),
+                Arc::new(Time64MicrosecondArray::from(vec![3_600_000_000_i64])),
+                Arc::new(Time64NanosecondArray::from(vec![3_600_000_000_000_i64])),
+                Arc::new(BinaryArray::from(vec![Some(b"data1".as_ref())])),
+                Arc::new(LargeBinaryArray::from(vec![Some(b"large1".as_ref())])),
+                Arc::new(StringArray::from(vec![Some("text1")])),
+                Arc::new(LargeStringArray::from(vec![Some("large_text1")])),
+                Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
+                    Some(vec![Some(1), Some(2)]),
+                ])),
+                Arc::new(LargeListArray::from_iter_primitive::<Int32Type, _, _>(
+                    vec![Some(vec![Some(5), Some(6)])],
+                )),
+                Arc::new(
+                    Decimal32Array::from_iter(vec![Some(12345)])
+                        .with_precision_and_scale(9, 2)
+                        .unwrap(),
+                ),
+                Arc::new(
+                    Decimal64Array::from_iter(vec![Some(123_456_789_012)])
+                        .with_precision_and_scale(18, 3)
+                        .unwrap(),
+                ),
+                Arc::new(
+                    Decimal128Array::from_iter(vec![Some(
+                        123_456_789_012_345_678_901_234_567_890_i128,
+                    )])
+                    .with_precision_and_scale(38, 5)
+                    .unwrap(),
+                ),
+            ],
+        )
+        .unwrap();
+        // Hash the record batch
+        assert_eq!(
+            encode(ArrowDigester::hash_record_batch(&batch)),
+            "000001ac720bed7fb1d696d5626705dc7602d14cfe974a3297cc28c3cb8b8e9a62601a"
+        );
+    }
+
+    #[test]
+    fn boolean_array_hashing() {
+        let bool_array = BooleanArray::from(vec![Some(true), None, Some(false), Some(true)]);
+        let hash = hex::encode(ArrowDigester::hash_array(&bool_array));
+        assert_eq!(
+            hash,
+            "000001f9abeb37d9395f359b48a379f0a8467c572b19ecc6cae9fa85e1bf627a52a8f3"
+        );
+    }
+
+    /// Test int32 array hashing, which really exercises fixed-size element array hashing
+    #[test]
+    fn int32_array_hashing() {
+        let int_array = Int32Array::from(vec![Some(42), None, Some(-7), Some(0)]);
+        let hash = hex::encode(ArrowDigester::hash_array(&int_array));
+        assert_eq!(
+            hash,
+            "00000127f2411e6839eb1e3fe706ac3f01e704c7b46357360fb2ddb8a08ec98e8ba4fa"
+        );
+    }
+
+    /// Test time array hashing
+    #[test]
+    fn time32_array_hashing() {
+        let time_array = Time32SecondArray::from(vec![Some(1000), None, Some(5000), Some(0)]);
+        let hash = hex::encode(ArrowDigester::hash_array(&time_array));
+        assert_eq!(
+            hash,
+            "0000019000b74aa80f685103a8cafc7e113aa8f33ccc0c94ea3713318d2cc2f3436baa"
+        );
+    }
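The time tests on either side of this point pin down domain separation: the digest folds in the logical Arrow type, including its unit, so `Time32(Second)` and `Time32(Millisecond)` over the same integers diverge (see `time_array_different_units_produce_different_hashes` just below). A toy version of the idea, with a made-up tag encoding that is not necessarily starfix's:

```rust
use sha2::{Digest, Sha256};

// Feed a type tag before the values so identical payloads under different
// logical types cannot collide.
fn hash_typed(type_tag: &str, values: &[i32]) -> Vec<u8> {
    let mut hasher = Sha256::new();
    hasher.update((type_tag.len() as u64).to_le_bytes());
    hasher.update(type_tag.as_bytes());
    for v in values {
        hasher.update(v.to_le_bytes());
    }
    hasher.finalize().to_vec()
}

fn main() {
    let seconds = hash_typed("time32[s]", &[1000, 2000]);
    let millis = hash_typed("time32[ms]", &[1000, 2000]);
    assert_ne!(seconds, millis);
}
```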
"0000019000b74aa80f685103a8cafc7e113aa8f33ccc0c94ea3713318d2cc2f3436baa" + ); + } + + #[test] + fn time64_array_hashing() { + let time_array = + Time64MicrosecondArray::from(vec![Some(1_000_000), None, Some(5_000_000), Some(0)]); + let hash = hex::encode(ArrowDigester::hash_array(&time_array)); + assert_eq!( + hash, + "00000195f12143d789f364a3ed52f7300f8f91dc21fbe00c34aed798ca8fd54182dea3" + ); + } + + #[test] + fn time_array_different_units_produce_different_hashes() { + let time32_second = Time32SecondArray::from(vec![Some(1000), Some(2000)]); + let time32_millis = Time32MillisecondArray::from(vec![Some(1000), Some(2000)]); + + let hash_second = hex::encode(ArrowDigester::hash_array(&time32_second)); + let hash_millis = hex::encode(ArrowDigester::hash_array(&time32_millis)); + + assert_ne!(hash_second, hash_millis); + } + + /// Test binary array hashing + #[test] + fn binary_array_hashing() { + let binary_array = BinaryArray::from(vec![ + Some(b"hello".as_ref()), + None, + Some(b"world".as_ref()), + Some(b"".as_ref()), + ]); + let hash = hex::encode(ArrowDigester::hash_array(&binary_array)); + assert_eq!( + hash, + "000001466801efd880d2acecd6c78915b5c2a51476870f9116912834d79de43a000071" + ); + + // Test large binary array with same data to ensure consistency + let large_binary_array = LargeBinaryArray::from(vec![ + Some(b"hello".as_ref()), + None, + Some(b"world".as_ref()), + Some(b"".as_ref()), + ]); + + assert_ne!( + hex::encode(ArrowDigester::hash_array(&large_binary_array)), + hash + ); + } + + // Test binary array collision vulnerability - different partitions should produce different hashes + #[test] + fn binary_array_length_prefix_prevents_collisions() { + // Array 1: [[0x01, 0x02], [0x03]] + let array1 = BinaryArray::from(vec![Some(&[0x01_u8, 0x02_u8][..]), Some(&[0x03_u8][..])]); + + // Array 2: [[0x01], [0x02, 0x03]] + let array2 = BinaryArray::from(vec![Some(&[0x01_u8][..]), Some(&[0x02_u8, 0x03_u8][..])]); + + let hash1 = hex::encode(ArrowDigester::hash_array(&array1)); + let hash2 = hex::encode(ArrowDigester::hash_array(&array2)); + + // Without length prefix, these would collide (both hash to 0x01 0x02 0x03) + // With length prefix, they should produce different hashes + assert_ne!( + hash1, hash2, + "Binary arrays with different partitions should produce different hashes" + ); + } + + // Test string array collision vulnerability - different partitions should produce different hashes + #[test] + fn string_array_length_prefix_prevents_collisions() { + // Array 1: ["ab", "c"] + let array1 = StringArray::from(vec![Some("ab"), Some("c")]); + + // Array 2: ["a", "bc"] + let array2 = StringArray::from(vec![Some("a"), Some("bc")]); + + let hash1 = hex::encode(ArrowDigester::hash_array(&array1)); + let hash2 = hex::encode(ArrowDigester::hash_array(&array2)); + + // Without length prefix, these would collide (both hash to "abc") + // With length prefix, they should produce different hashes + assert_ne!( + hash1, hash2, + "String arrays with different partitions should produce different hashes" + ); + } + + // Test String hashing + #[test] + fn string_array_hashing() { + let string_array = StringArray::from(vec![Some("hello"), None, Some("world"), Some("")]); + let hash = hex::encode(ArrowDigester::hash_array(&string_array)); + assert_eq!( + hash, + "00000114a2d2eaf535b6e78fbf1d58ae93accce424eafd20fa449eff8acefc47903d3d" + ); + + // Test large string array with same data to ensure consistency + let large_string_array = + LargeStringArray::from(vec![Some("hello"), None, Some("world"), 
Some("")]); + + assert_ne!( + hex::encode(ArrowDigester::hash_array(&large_string_array)), + hash + ); + } + + // List array hashing test + #[test] + fn list_array_hashing() { + let list_array = ListArray::from_iter_primitive::(vec![ + Some(vec![Some(1), Some(2), Some(3)]), + None, + Some(vec![Some(4), Some(5)]), + Some(vec![Some(6)]), + ]); + + let hash = hex::encode(ArrowDigester::hash_array(&list_array)); + assert_eq!( + hash, + "000001f654be5f0ef89807feba9483072190b7d26964e535cd7c522706218df9c3c015" + ); + } + + // Test all types of decimal hashing + #[test] + fn decimal_array_hashing() { + // Test Decimal32 (precision 1-9) + let decimal32_array = + Decimal128Array::from_iter(vec![Some(123), None, Some(-456), Some(0)]) + .with_precision_and_scale(9, 2) + .unwrap(); + + assert_eq!( + encode(ArrowDigester::hash_array(&decimal32_array)), + "000001ef29250615f9d6ab34672c3b11dfa2dcda6e8e6164bc55899c13887f17705f5d" + ); + + // Test Decimal64 (precision 10-18) + let decimal64_array = Decimal128Array::from_iter(vec![ + Some(1_234_567_890_123), + None, + Some(-9_876_543_210), + Some(0), + ]) + .with_precision_and_scale(15, 3) + .unwrap(); + assert_eq!( + encode(ArrowDigester::hash_array(&decimal64_array)), + "000001efa4ed72641051233889c07775366cbf2e56eb4b0fcfd46653f5741e81786f08" + ); + + // Test Decimal128 (precision 19-38) + let decimal128_array = Decimal128Array::from_iter(vec![ + Some(123_456_789_012_345_678_901_234_567), + None, + Some(-987_654_321_098_765_432_109_876_543), + Some(0), + ]) + .with_precision_and_scale(38, 5) + .unwrap(); + assert_eq!( + hex::encode(ArrowDigester::hash_array(&decimal128_array)), + "00000155cc4d81a048dbca001ca8581673a5a6c93efd870d358df211a545c2af9b658d" + ); + } + + #[test] + fn commutative_tables() { + let uids = Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)])) as ArrayRef; + let fake_data = Arc::new(BooleanArray::from(vec![ + Some(true), + Some(false), + None, + Some(true), + ])) as ArrayRef; + + // Create two record batches with same data but different order + let batch1 = RecordBatch::try_new( + Arc::new(Schema::new(vec![ + Field::new("uids", DataType::Int32, false), + Field::new("flags", DataType::Boolean, true), + ])), + vec![Arc::clone(&uids), Arc::clone(&fake_data)], + ); + + let batch2 = RecordBatch::try_new( + Arc::new(Schema::new(vec![ + Field::new("flags", DataType::Boolean, true), + Field::new("uids", DataType::Int32, false), + ])), + vec![fake_data, uids], + ); + + // Hash both record batches + let hash1 = format!( + "000001{}", + encode(ArrowDigester::hash_record_batch(batch1.as_ref().unwrap())) + ); + let hash2 = format!( + "000001{}", + encode(ArrowDigester::hash_record_batch(batch2.as_ref().unwrap())) + ); + assert_eq!(hash1, hash2); + } + + #[test] + fn record_batch_hashing() { + let schema = Arc::new(Schema::new(vec![ + Field::new("uids", DataType::Int32, false), + Field::new("flags", DataType::Boolean, true), + ])); + + // Create two record batches with different data to simulate loading at different times + let uids = Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)])) as ArrayRef; + let fake_data = Arc::new(BooleanArray::from(vec![ + Some(true), + Some(false), + None, + Some(true), + ])); + + let batch1 = RecordBatch::try_new(Arc::clone(&schema), vec![uids, fake_data]).unwrap(); + + let uids2 = + Arc::new(Int32Array::from(vec![Some(5), Some(6), Some(7), Some(8)])) as ArrayRef; + let fake_data2 = Arc::new(BooleanArray::from(vec![ + Some(false), + Some(true), + Some(true), + None, + ])); + + let batch2 
+
+    #[test]
+    fn record_batch_hashing() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("uids", DataType::Int32, false),
+            Field::new("flags", DataType::Boolean, true),
+        ]));
+
+        // Create two record batches with different data to simulate loading at different times
+        let uids = Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)])) as ArrayRef;
+        let fake_data = Arc::new(BooleanArray::from(vec![
+            Some(true),
+            Some(false),
+            None,
+            Some(true),
+        ]));
+
+        let batch1 = RecordBatch::try_new(Arc::clone(&schema), vec![uids, fake_data]).unwrap();
+
+        let uids2 =
+            Arc::new(Int32Array::from(vec![Some(5), Some(6), Some(7), Some(8)])) as ArrayRef;
+        let fake_data2 = Arc::new(BooleanArray::from(vec![
+            Some(false),
+            Some(true),
+            Some(true),
+            None,
+        ]));
+
+        let batch2 = RecordBatch::try_new(Arc::clone(&schema), vec![uids2, fake_data2]).unwrap();
+        // Hash both record batches incrementally
+        let mut digester = ArrowDigester::new((*schema).clone());
+        digester.update(&batch1);
+        digester.update(&batch2);
+        assert_eq!(
+            encode(digester.finalize()),
+            "00000137954b3edd169c7a9e65604c191caf6a307940357305d182a5d2168047e9cc51"
+        );
+    }
+
+    #[test]
+    fn nullable_vs_non_nullable_array_produces_same_hash() {
+        let nullable_array = Int32Array::from(vec![Some(1), Some(2), Some(3)]);
+        let non_nullable_array = Int32Array::from(vec![1, 2, 3]);
+
+        let hash_nullable = hex::encode(ArrowDigester::hash_array(&nullable_array));
+        let hash_non_nullable = hex::encode(ArrowDigester::hash_array(&non_nullable_array));
+
+        assert_eq!(
+            hash_nullable, hash_non_nullable,
+            "Nullable and non-nullable arrays with same data should produce same hashes"
+        );
+    }
+
+    #[test]
+    fn empty_nullable_vs_non_nullable_array_produces_same_hash() {
+        let empty_nullable_array: Int32Array = Int32Array::from(vec![] as Vec<Option<i32>>);
+        let empty_non_nullable_array: Int32Array = Int32Array::from(vec![] as Vec<i32>);
+
+        let hash_nullable = hex::encode(ArrowDigester::hash_array(&empty_nullable_array));
+        let hash_non_nullable = hex::encode(ArrowDigester::hash_array(&empty_non_nullable_array));
+
+        // Both are empty; array-level hashing does not fold in nullability metadata,
+        // so the hashes match. This test documents the expected behavior
+        assert_eq!(hash_nullable, hash_non_nullable);
+    }
+
+    #[test]
+    fn nullable_vs_non_nullable_schema_produces_different_hash() {
+        let nullable_schema = Schema::new(vec![
+            Field::new("col1", DataType::Int32, true),
+            Field::new("col2", DataType::Boolean, true),
+        ]);
+        let non_nullable_schema = Schema::new(vec![
+            Field::new("col1", DataType::Int32, false),
+            Field::new("col2", DataType::Boolean, false),
+        ]);
+
+        let hash_nullable = hex::encode(ArrowDigester::new(nullable_schema).finalize());
+        let hash_non_nullable = hex::encode(ArrowDigester::new(non_nullable_schema).finalize());
+
+        assert_ne!(
+            hash_nullable, hash_non_nullable,
+            "Nullable and non-nullable schemas with same data types should produce different hashes"
+        );
+    }
+
+    #[test]
+    fn batches_vs_single_hash_produces_same_result() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("value", DataType::Float64, true),
+        ]));
+
+        // Create two batches with data
+        let batch1 = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 2, 3])),
+                Arc::new(Float64Array::from(vec![1.1, 2.2, 3.3])),
+            ],
+        )
+        .unwrap();
+
+        let batch2 = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![
+                Arc::new(Int32Array::from(vec![4, 5, 6])),
+                Arc::new(Float64Array::from(vec![4.4, 5.5, 6.6])),
+            ],
+        )
+        .unwrap();
+
+        // Hash batches incrementally
+        let mut digester_batches = ArrowDigester::new((*schema).clone());
+        digester_batches.update(&batch1);
+        digester_batches.update(&batch2);
+        let hash_batches = encode(digester_batches.finalize());
+
+        // Hash combined batch all at once
+        let combined_batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6])),
+                Arc::new(Float64Array::from(vec![1.1, 2.2, 3.3, 4.4, 5.5, 6.6])),
+            ],
+        )
+        .unwrap();
+
+        let mut digester_single = ArrowDigester::new((*schema).clone());
+        digester_single.update(&combined_batch);
+        let hash_single = encode(digester_single.finalize());
+
+        assert_eq!(
+            hash_batches, hash_single,
+            "Hashing multiple batches incrementally should produce the same result as hashing one combined batch"
+        );
+    }
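The batches-vs-single tests above and below rely on the streaming property of the underlying hash: two updates are equivalent to one update over the concatenated bytes, so a column split across record batches feeds the same stream, provided the digester serializes values identically across batch boundaries. The property itself, isolated with the sha2 crate:

```rust
use sha2::{Digest, Sha256};

fn main() {
    // Two incremental updates...
    let mut incremental = Sha256::new();
    incremental.update(b"1 2 3 ");
    incremental.update(b"4 5 6");

    // ...equal one update over the concatenated bytes.
    let mut combined = Sha256::new();
    combined.update(b"1 2 3 4 5 6");

    assert_eq!(incremental.finalize(), combined.finalize());
}
```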
combined batch" + ); + } + + #[test] + fn batches_with_nulls_vs_single_hash_produces_same_result() { + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, true), + Field::new("value", DataType::Float64, true), + ])); + + // Create two batches: first all nulls, second with values + let batch1 = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(vec![None, None, None])), + Arc::new(Float64Array::from(vec![None, None, None])), + ], + ) + .unwrap(); + + let batch2 = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])), + Arc::new(Float64Array::from(vec![Some(1.1), Some(2.2), Some(3.3)])), + ], + ) + .unwrap(); + + // Hash batches incrementally + let mut digester_batches = ArrowDigester::new((*schema).clone()); + digester_batches.update(&batch1); + digester_batches.update(&batch2); + let hash_batches = encode(digester_batches.finalize()); + + // Hash combined batch all at once + let combined_batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(vec![ + None, + None, + None, + Some(1), + Some(2), + Some(3), + ])), + Arc::new(Float64Array::from(vec![ + None, + None, + None, + Some(1.1), + Some(2.2), + Some(3.3), + ])), + ], + ) + .unwrap(); + + let mut digester_single = ArrowDigester::new((*schema).clone()); + digester_single.update(&combined_batch); + let hash_single = encode(digester_single.finalize()); + + assert_eq!( + hash_batches, hash_single, + "Hashing batches where first is all nulls should produce same result as combined batch" + ); + } +}