[CI] Add E2E consistency tests with comprehensive data type coverage #35

mpb159753 wants to merge 10 commits into Ascend:main from
Conversation
CLA Signature Pass: mpb159753, thanks for your pull request. All authors of the commits have signed the CLA. 👍
ed40bba to 845b85c (force-push)
| """ | ||
| E2E Lifecycle Consistency Tests for TransferQueue. | ||
|
|
||
| Implements all 5 scenarios from consistency_validation_plan.md: |
Please clean up the unnecessary AI-generated doc reference.
sys.path.append(str(parent_dir))

from transfer_queue import (  # noqa: E402
    SimpleStorageUnit,
Please import it from the lower layers; the top-level namespace was cleaned up recently.
Pull request overview

Adds a new end-to-end lifecycle consistency test module for TransferQueue, exercising read/write consistency across complex data types and key lifecycle operations (production/consumption/custom meta/reset/clear).

Changes:
- Introduces an E2E test suite covering 5 lifecycle consistency scenarios.
- Adds helpers to generate complex mixed-type TensorDict payloads and verify round-tripped correctness.
- Validates partition lifecycle operations (custom meta persistence, reset consumption, and clear partition).
custom_metadata = {}
for i in range(batch_size):
    custom_metadata[meta.global_indexes[i]] = {
        "score": float(i) / 10.0,
        "label": f"label_{i}",
        "tags": [f"tag_{i}_a", f"tag_{i}_b"],
    }
meta.update_custom_meta(custom_metadata)

# 3. Upload Custom Metadata
client.set_custom_meta(meta)

# 4. Retrieve Metadata and Verify Custom Meta
retrieved_meta = poll_for_meta(client, partition_id, fields, batch_size, task_name, mode="force_fetch")
assert retrieved_meta is not None, "Failed to retrieve metadata"

# Verify custom metadata content
retrieved_custom = retrieved_meta.get_all_custom_meta()
for global_idx, expected_meta in custom_metadata.items():
    assert global_idx in retrieved_custom, f"Missing custom_meta for index {global_idx}"
    actual = retrieved_custom[global_idx]
    assert actual["score"] == expected_meta["score"], f"Score mismatch at index {global_idx}"
    assert actual["label"] == expected_meta["label"], f"Label mismatch at index {global_idx}"
    assert actual["tags"] == expected_meta["tags"], f"Tags mismatch at index {global_idx}"
BatchMeta.update_custom_meta() expects a list of per-sample dicts (length == batch size, aligned with meta.global_indexes), but the test passes a dict keyed by global_index. This will raise at runtime (dict indexing with 0..N-1) and also get_all_custom_meta() returns a list, not a dict keyed by global_index. Please build a custom_meta_list in batch order and compare against the returned list (or add a helper that returns a global_index->meta mapping and use that consistently).
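A minimal sketch of that suggestion, assuming get_all_custom_meta() returns the per-sample dicts as a list in batch order (aligned with meta.global_indexes):

```python
# Build custom meta as a list in batch order, one dict per sample.
custom_meta_list = [
    {
        "score": float(i) / 10.0,
        "label": f"label_{i}",
        "tags": [f"tag_{i}_a", f"tag_{i}_b"],
    }
    for i in range(batch_size)
]
meta.update_custom_meta(custom_meta_list)
client.set_custom_meta(meta)

# Retrieve and compare positionally against the same batch order.
retrieved_meta = poll_for_meta(client, partition_id, fields, batch_size, task_name, mode="force_fetch")
retrieved_list = retrieved_meta.get_all_custom_meta()  # assumed: a list aligned with global_indexes
for pos, expected in enumerate(custom_meta_list):
    assert retrieved_list[pos] == expected, f"Custom meta mismatch at batch position {pos}"
```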
# 6. Verify region 0-9: original Put A values
original_data_0_9 = generate_complex_data(list(range(0, 10)))
assert torch.allclose(full_data["tensor_f32"][:10], original_data_0_9["tensor_f32"]), (
    "Region 0-9 tensor_f32 should match original Put A"
)

# 7. Verify region 10-29: updated values (using offset indices 1010-1029)
updated_expected = generate_complex_data([i + 1000 for i in range(10, 30)])
assert torch.allclose(full_data["tensor_f32"][10:30], updated_expected["tensor_f32"]), (
    "Region 10-29 tensor_f32 should match updated values"
)

# 8. Verify region 30-39: original Put B values
original_data_30_39 = generate_complex_data(list(range(30, 40)))
assert torch.allclose(full_data["tensor_f32"][30:40], original_data_30_39["tensor_f32"]), (
This test assumes force_fetch returns metadata/data in a deterministic order that matches the original insertion order (e.g., slicing full_data["tensor_f32"][:10] to represent indices 0-9). In the current implementation, force_fetch uses IndexManager.get_indexes_for_partition() which returns a list built from a set, so ordering is not guaranteed. Please sort/reorder the returned BatchMeta by global_indexes (or build an index->row mapping) before doing positional slicing assertions.
Suggested change:

# The order of rows returned by force_fetch is not guaranteed.
# Sort by global_indexes so that position i corresponds to global index i.
sorted_order = np.argsort(full_meta.global_indexes)
tensor_f32_sorted = full_data["tensor_f32"][sorted_order]

# 6. Verify region 0-9: original Put A values
original_data_0_9 = generate_complex_data(list(range(0, 10)))
assert torch.allclose(tensor_f32_sorted[:10], original_data_0_9["tensor_f32"]), (
    "Region 0-9 tensor_f32 should match original Put A"
)

# 7. Verify region 10-29: updated values (using offset indices 1010-1029)
updated_expected = generate_complex_data([i + 1000 for i in range(10, 30)])
assert torch.allclose(tensor_f32_sorted[10:30], updated_expected["tensor_f32"]), (
    "Region 10-29 tensor_f32 should match updated values"
)

# 8. Verify region 30-39: original Put B values
original_data_30_39 = generate_complex_data(list(range(30, 40)))
assert torch.allclose(tensor_f32_sorted[30:40], original_data_30_39["tensor_f32"]), (
# 1. Put Data
data = generate_complex_data(list(range(batch_size)))
client.put(data=data, partition_id=partition_id)

# 2. Verify Data Exists - production status should be True
is_ready = client.check_production_status(data_fields=fields, partition_id=partition_id)
assert is_ready, "Data should be ready after put"

# 3. Get Data to confirm accessibility
meta = poll_for_meta(client, partition_id, fields, batch_size, task_name, mode="force_fetch")
assert meta is not None and meta.size == batch_size, "Failed to poll metadata"

# 4. Verify partition exists before clear
partition_list_before = client.get_partition_list()
assert partition_id in partition_list_before, "Partition should exist before clear"

# 5. Clear Partition
client.clear_partition(partition_id)

# 6. Verify partition is removed from list
partition_list_after = client.get_partition_list()
assert partition_id not in partition_list_after, "Partition should be removed after clear"

# 7. Verify Production Status returns False for cleared partition
is_ready_after_clear = client.check_production_status(data_fields=fields, partition_id=partition_id)
assert not is_ready_after_clear, "Production status should be False after clear"
test_clear_partition doesn’t have a try/finally cleanup. If an assertion fails before client.clear_partition(partition_id) runs, the partition can leak into later tests in this module (shared e2e_client) and cause cascading failures. Please wrap the body in try/finally and call clear_partition in the finally block.
Suggested change:

try:
    # 1. Put Data
    data = generate_complex_data(list(range(batch_size)))
    client.put(data=data, partition_id=partition_id)

    # 2. Verify Data Exists - production status should be True
    is_ready = client.check_production_status(data_fields=fields, partition_id=partition_id)
    assert is_ready, "Data should be ready after put"

    # 3. Get Data to confirm accessibility
    meta = poll_for_meta(client, partition_id, fields, batch_size, task_name, mode="force_fetch")
    assert meta is not None and meta.size == batch_size, "Failed to poll metadata"

    # 4. Verify partition exists before clear
    partition_list_before = client.get_partition_list()
    assert partition_id in partition_list_before, "Partition should exist before clear"

    # 5. Clear Partition
    client.clear_partition(partition_id)

    # 6. Verify partition is removed from list
    partition_list_after = client.get_partition_list()
    assert partition_id not in partition_list_after, "Partition should be removed after clear"

    # 7. Verify Production Status returns False for cleared partition
    is_ready_after_clear = client.check_production_status(data_fields=fields, partition_id=partition_id)
    assert not is_ready_after_clear, "Production status should be False after clear"
finally:
    # Ensure partition is cleared even if assertions above fail
    try:
        client.clear_partition(partition_id)
    except Exception:
        # Best-effort cleanup; ignore errors during teardown
        pass
| """Create a client with 2 storage units for lifecycle testing.""" | ||
| controller_actor = TransferQueueController.options( | ||
| name="lifecycle_controller", | ||
| get_if_exists=True, | ||
| ).remote(polling_mode=True) | ||
| controller_info = ray.get(controller_actor.get_zmq_server_info.remote()) | ||
|
|
||
| # Two storage units to ensure sharding | ||
| storage_actor_1 = SimpleStorageUnit.options( | ||
| name="lifecycle_storage_1", | ||
| get_if_exists=True, | ||
| ).remote(storage_unit_size=10000) | ||
| storage_info_1 = ray.get(storage_actor_1.get_zmq_server_info.remote()) | ||
|
|
||
| storage_actor_2 = SimpleStorageUnit.options( | ||
| name="lifecycle_storage_2", | ||
| get_if_exists=True, | ||
| ).remote(storage_unit_size=10000) | ||
| storage_info_2 = ray.get(storage_actor_2.get_zmq_server_info.remote()) | ||
|
|
||
| client = TransferQueueClient( | ||
| client_id="lifecycle_test_client", | ||
| controller_info=controller_info, | ||
| ) | ||
|
|
The initialization can now be simplified via transfer_queue.init(config). That would also exercise the initialization path in the E2E tests.
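A heavily hedged sketch of that simplification; only transfer_queue.init(config) itself is taken from the comment above, and the config fields below are hypothetical placeholders for illustration:

```python
import transfer_queue

# Hypothetical config contents; the real schema is whatever transfer_queue.init(config) expects.
config = {
    "num_controllers": 1,
    "num_storage_units": 2,      # keep two storage units to preserve the sharding coverage of this fixture
    "storage_unit_size": 10000,
}

# Assumed to create the controller and storage actors in one call, replacing the
# manual TransferQueueController / SimpleStorageUnit setup shown above.
transfer_queue.init(config)
```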
# Nested Tensor (Strided) - fallback to jagged if not supported
try:
    strided_tensors = [torch.full((3, 4), float(i)) for i in indices]
    nested_strided = torch.nested.nested_tensor(strided_tensors, layout=torch.strided)
except (TypeError, RuntimeError):
    strided_tensors = [torch.full((3, 4), float(i)) for i in indices]
    nested_strided = torch.nested.as_nested_tensor(strided_tensors, layout=torch.jagged)
We need to make sure this is actually strided; there should be no silent fallback.
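A minimal sketch of the no-fallback version, constructing the strided nested tensor directly and failing loudly if the layout is not the one the test intends to cover:

```python
# Build the strided nested tensor without a jagged fallback.
strided_tensors = [torch.full((3, 4), float(i)) for i in indices]
nested_strided = torch.nested.nested_tensor(strided_tensors, layout=torch.strided)

# Fail loudly instead of silently switching layouts.
assert nested_strided.layout == torch.strided, (
    f"Expected a strided nested tensor, got layout {nested_strided.layout}"
)
```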
| """Verify nested tensors element by element.""" | ||
| if len(retrieved.unbind()) != len(expected.unbind()): | ||
| return False | ||
| for r, e in zip(retrieved.unbind(), expected.unbind(), strict=False): |
|
|
||
|
|
||
| def verify_list_equal(retrieved, expected) -> bool: | ||
| """Verify list content, handling possible Tensor conversion.""" |
Why can it become a Tensor? The data type should not change.
TensorDict automatically converts Python list[int] / list[float] to torch.Tensor at construction time. For list[str] / list[dict], TensorDict stores them as NonTensorData with no type change.
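A small illustration of that behavior, assuming the conversion rules described in the reply above:

```python
import torch
from tensordict import TensorDict

td = TensorDict(
    {
        "ints": [1, 2, 3],        # numeric list: expected to be converted to a torch.Tensor
        "strs": ["a", "b", "c"],  # string list: expected to be kept as non-tensor data
    },
    batch_size=[3],
)

print(type(td["ints"]))  # torch.Tensor, per the explanation above
print(type(td["strs"]))  # a non-tensor container (NonTensorData/NonTensorStack), not a Tensor
```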
# =============================================================================
# Scenario Two: Cross-Partition & Complex Update
# =============================================================================
def test_cross_partition_complex_update(e2e_client):
The name is a little misleading: the "partition" here is not the same concept as a partition in TransferQueue.
Consider changing it to "shard"?
1. Round 1 Put: Indices 0-9, only Set_A fields -> Check production(Set_A)=True, production(Set_B)=False
2. Round 2 Put: Indices 0-9, complete Set_B fields -> Check production(Set_A+Set_B)=True
These descriptions are not easy to understand.
retrieved_data = client.get_data(meta)
assert retrieved_data.batch_size[0] == batch_size, "Retrieved data batch_size mismatch"

# 4. Post-Consumption Status Check - should be True
is_consumed_after = client.check_consumption_status(task_name=task_name, partition_id=partition_id)
assert is_consumed_after, "Data should be consumed after get_data"
Is this logic correct? get_data will not set the consumption label; get_meta will.
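A hedged sketch of how the check might look if consumption is marked by get_meta rather than get_data; the exact get_meta signature (in particular task_name) is an assumption based on the helpers used elsewhere in this test and on the PR description:

```python
# Fetch metadata in normal (consuming) mode rather than force_fetch, then read the data.
meta = client.get_meta(
    data_fields=fields,
    batch_size=batch_size,
    partition_id=partition_id,
    task_name=task_name,  # assumed parameter, mirroring poll_for_meta's arguments
)
retrieved_data = client.get_data(meta)

# The consumption label is expected to have been set by get_meta above.
is_consumed_after = client.check_consumption_status(task_name=task_name, partition_id=partition_id)
assert is_consumed_after, "Data should be marked consumed after get_meta"
```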
partition_list_after = client.get_partition_list()
assert partition_id not in partition_list_after, "Partition should be removed after clear"

# 7. Verify Production Status returns False for cleared partition
We also need to check the consumption status here.
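A minimal sketch of that additional assertion, reusing the APIs already exercised in this test:

```python
# Consumption status should also report not-consumed for a cleared partition.
is_consumed_after_clear = client.check_consumption_status(task_name=task_name, partition_id=partition_id)
assert not is_consumed_after_clear, "Consumption status should be False after clear"
```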
assert not is_ready_after_clear, "Production status should be False after clear"


if __name__ == "__main__":
Please also add additional tests for our high-level KV API provided in interface.py~
Great work!
- Added tests/test_e2e_consistency.py covering core types, multi-round puts, and slicing.
- Updated terminology to Standard/Complex groups.
- Verified cross-batch put scenarios.
- Fixed linter errors.

Signed-off-by: 看我72遍 <m.pb@msn.com>
…anup
- Add runtime assertion in generate_complex_data to ensure field_values keys exactly match DEFAULT_FIELDS, preventing silent field mismatches
- Build TensorDict via dict comprehension keyed by DEFAULT_FIELDS order
- Extract inline reorder logic into reusable _reorder_tensordict helper
- Remove redundant section separator comments (=== banners)
- Add missing assertions for tensor_bf16 and list_obj in core consistency
- Rename test_cross_partition_complex_update -> test_cross_shard_complex_update
- Improve verify_list_equal docstring with TensorDict conversion note

Signed-off-by: 看我72遍 <m.pb@msn.com>
845b85c to c447faf (force-push)
Pull request overview
Copilot reviewed 1 out of 1 changed files in this pull request and generated 5 comments.
assert verify_list_equal(retrieved_data["list_str"], original_data["list_str"]), "list_str mismatch"
assert verify_list_equal(retrieved_data["list_obj"], original_data["list_obj"]), "list_obj mismatch"
list_str and list_obj are written as non-tensor fields, but SimpleStorageManager.get_data() will materialize non-tensor batches as NonTensorStack (not torch.Tensor / Python list). verify_list_equal() only converts torch.Tensor via .tolist(), so these assertions will compare NonTensorStack to the original Python lists and likely fail. Consider switching these checks to verify_non_tensor_data() (or normalizing both sides via .tolist() when available).
Suggested change:

assert verify_non_tensor_data(retrieved_data["list_str"], original_data["list_str"]), "list_str mismatch"
assert verify_non_tensor_data(retrieved_data["list_obj"], original_data["list_obj"]), "list_obj mismatch"
# 7. Verify NumPy Arrays
assert np.allclose(retrieved_data["np_array"], original_data["np_array"]), "np_array mismatch"
assert np.array_equal(retrieved_data["np_obj"], original_data["np_obj"]), "np_obj mismatch"
np_obj is an object-dtype NumPy array; per-sample reads/writes through SimpleStorage will round-trip it as a non-tensor batch (typically NonTensorStack of Python objects), not necessarily a NumPy array with the same dtype/shape. Using np.array_equal(retrieved_data["np_obj"], original_data["np_obj"]) is therefore unlikely to be a valid equality check. Normalize the retrieved value (e.g., .tolist()) and compare to original_data["np_obj"].tolist(), or store per-sample object arrays explicitly and assert against that representation.
Suggested change:

original_np_obj_list = original_data["np_obj"].tolist()
retrieved_np_obj = retrieved_data["np_obj"]
if hasattr(retrieved_np_obj, "tolist"):
    retrieved_np_obj_list = retrieved_np_obj.tolist()
else:
    retrieved_np_obj_list = list(retrieved_np_obj)
assert retrieved_np_obj_list == original_np_obj_list, "np_obj mismatch"
def _reorder_tensordict(td: TensorDict, order: list[int]) -> TensorDict:
    """Reorder a TensorDict by the given index order.

    Handles regular tensors, nested/jagged tensors, lists, and other indexable types.
    """
    reordered = {}
    for key in td.keys():
        field = td[key]
        if hasattr(field, "unbind"):
            items = field.unbind(0)
            reordered_items = [items[i] for i in order]
            try:
                reordered[key] = torch.stack(reordered_items)
            except RuntimeError:
                reordered[key] = torch.nested.as_nested_tensor(reordered_items, layout=field.layout)
        elif isinstance(field, list):
            reordered[key] = [field[i] for i in order]
        else:
            reordered[key] = field[torch.tensor(order)]
    return TensorDict(reordered, batch_size=td.batch_size)
_reorder_tensordict() treats any field with .unbind() as tensor-like and tries torch.stack(reordered_items). For non-tensor batches returned as NonTensorStack, .unbind() exists but the unbound items are not plain torch.Tensors, so torch.stack(...) can raise TypeError (not caught here). This can break the cross-shard test when reordering is needed. Handle NonTensorStack explicitly (e.g., index it directly or reorder via .tolist()), and/or broaden the exception handling to include TypeError.
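A hedged sketch of one way to harden the helper along those lines, reordering non-tensor batches via plain list indexing instead of torch.stack:

```python
def _reorder_tensordict(td: TensorDict, order: list[int]) -> TensorDict:
    """Reorder a TensorDict by the given index order, including non-tensor batches."""
    reordered = {}
    index = torch.tensor(order)
    for key in td.keys():
        field = td[key]
        if isinstance(field, torch.Tensor):
            if field.is_nested:
                # Rebuild nested tensors in the requested order, keeping the original layout.
                items = field.unbind(0)
                reordered[key] = torch.nested.as_nested_tensor([items[i] for i in order], layout=field.layout)
            else:
                reordered[key] = field[index]
        elif hasattr(field, "tolist"):
            # e.g. NonTensorStack: materialize to a plain Python list and reorder positionally.
            items = field.tolist()
            reordered[key] = [items[i] for i in order]
        else:
            reordered[key] = [field[i] for i in order]
    return TensorDict(reordered, batch_size=td.batch_size)
```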
# 9. Verify new fields exist in update region
extended_fields = base_fields + ["new_extra_tensor", "new_extra_non_tensor"]
update_region_meta = poll_for_meta(
    client, partition_id, extended_fields, 20, "update_region_task", mode="force_fetch"
)
if update_region_meta is not None and update_region_meta.size > 0:
    update_region_data = client.get_data(update_region_meta)
    assert "new_extra_tensor" in update_region_data.keys(), "new_extra_tensor should exist"
    assert "new_extra_non_tensor" in update_region_data.keys(), "new_extra_non_tensor should exist"
In the cross-shard update test, the “verify new fields exist in update region” step doesn’t actually ensure the returned batch corresponds to the updated global index range (10–29), and "new_extra_tensor" in update_region_data.keys() will always be true because those fields were requested. To make this meaningful, assert the returned global_indexes are within the update range (or select the exact updated samples) and validate the retrieved values for the new fields for those samples.
Suggested change:

# 9. Verify new fields exist in update region and have correct values
extended_fields = base_fields + ["new_extra_tensor", "new_extra_non_tensor"]
update_region_meta = poll_for_meta(
    client, partition_id, extended_fields, 20, "update_region_task", mode="force_fetch"
)
if update_region_meta is not None and update_region_meta.size > 0:
    # Ensure we are actually reading from the updated global index range (10-29)
    assert all(
        10 <= gi < 30 for gi in update_region_meta.global_indexes
    ), f"Fetched indexes {list(update_region_meta.global_indexes)} are not within the update region 10-29"
    update_region_data = client.get_data(update_region_meta)

    # New fields should exist in the retrieved data
    assert "new_extra_tensor" in update_region_data.keys(), "new_extra_tensor should exist in update region data"
    assert "new_extra_non_tensor" in update_region_data.keys(), "new_extra_non_tensor should exist in update region data"

    # Map fetched global indexes back to positions within the update_data tensor dict
    fetched_global_indexes = list(update_region_meta.global_indexes)
    relative_update_positions = [idx_update.index(gi) for gi in fetched_global_indexes]

    # Validate tensor field values for the updated samples
    expected_new_tensor = update_data["new_extra_tensor"][relative_update_positions]
    actual_new_tensor = update_region_data["new_extra_tensor"]
    assert torch.allclose(
        actual_new_tensor, expected_new_tensor
    ), "new_extra_tensor values in update region do not match updated data"

    # Validate non-tensor field values for the updated samples
    expected_new_non_tensor = [
        update_data["new_extra_non_tensor"][i] for i in relative_update_positions
    ]
    actual_new_non_tensor = list(update_region_data["new_extra_non_tensor"])
    assert (
        actual_new_non_tensor == expected_new_non_tensor
    ), "new_extra_non_tensor values in update region do not match updated data"
if not ray.is_initialized():
    ray.init(ignore_reinit_error=True)
yield
if ray.is_initialized():
The module-level Ray fixture always calls ray.shutdown() at teardown, even if Ray was already initialized by another test/fixture in the same worker. This can cause cross-module interference. Consider tracking whether this fixture performed the init and only shutting down in that case, and also setting a dedicated namespace (as done in other E2E tests) to avoid actor-name collisions.
Suggested change:

did_init = False
if not ray.is_initialized():
    ray.init(
        ignore_reinit_error=True,
        namespace="transfer_queue_e2e_lifecycle_consistency",
    )
    did_init = True
yield
if did_init and ray.is_initialized():
2eaf7dc to 0ac69a2 (force-push)
CLA Signature Guide: @mpb159753, thanks for your pull request. The following commit(s) are not associated with a signed Contributor License Agreement (CLA).
To sign the CLA, click here. To check if your email is configured correctly, refer to the FAQs. Once you've signed the CLA or updated your email, please comment
Closed as already merged in Gitcode: https://gitcode.com/Ascend/TransferQueue/pull/25

Test Case Design
Same as https://gitcode.com/Ascend/TransferQueue/pull/25
Only for review
This PR introduces a new E2E consistency test suite tests/test_e2e_consistency.py to comprehensively verify the read/write consistency of TransferQueue in complex data scenarios. The tests cover the following three core scenarios:

- Core Data Types (test_consistency_core_types)
- Multi-round Put & Field Merge (test_consistency_multi_round_put_get)
- Slicing and Field Subsetting (test_consistency_slicing_and_subset), exercising force_fetch mode.

Source Code Modifications

Client API Update: Modified TransferQueueClient.get_meta in transfer_queue/client.py. Exposed the mode parameter (default="fetch") to support force_fetch and insert modes in the synchronous client, aligning capabilities with async_get_meta.

The get_meta update allows the synchronous client to perform specialized operations like force_fetch (used in inspection tests) and insert (used in test data allocation), which were previously only available in the async API.

Discussion

(transfer_queue/client.py:L1044): get_meta strictly follows the parameter order of async_get_meta, with mode located in the middle (data_fields, batch_size, partition_id, mode, ...). Should we move mode to the end of the parameter list to improve external interface compatibility, or prioritize consistency between the sync and async signatures?
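A hedged usage sketch of the updated synchronous API described above; the parameter order follows this description, while the field list and partition id are placeholders:

```python
# Synchronous metadata fetch using the newly exposed mode parameter.
meta = client.get_meta(
    data_fields=["tensor_f32"],  # placeholder field list
    batch_size=10,
    partition_id=partition_id,   # placeholder partition id
    mode="force_fetch",          # default is "fetch"; "insert" is used for test data allocation
)
data = client.get_data(meta)
```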