-
Notifications
You must be signed in to change notification settings - Fork 22
Open
Description
The source code is:
import random
import lance_namespace as ln
import ray
from ray.data.datatype import DataType
from ray.data.expressions import udf
from ray.data.expressions import col
from lance_ray import read_lance, write_lance
# Reproducer: read a Lance table through a Hive2 namespace, derive an extra
# column, and prepare the widened dataset for writing back to the same table.

# Initialize a local Ray cluster
ray.init(
# Request 4 CPUs for this Ray instance.
num_cpus=4,
# Passed so that a repeated ray.init() call is tolerated (e.g. when re-running
# the script in the same interpreter).
ignore_reinit_error=True,
# The dashboard is not needed for this reproducer.
include_dashboard=False
)
print("Ray 本地集群初始化成功")
# Root location and Hive Metastore endpoint handed to the namespace below.
warehouse_dir = "hdfs://test-ns/user/platform_rd_two/zhb"
hms_address = "thrift://hivemetastoretest-001:9083"
# NOTE(review): lance_hive_table is assigned but never used in the visible
# script; the table is addressed via table_id lists instead.
lance_hive_table = "lance.lance_test_database.test_overwrite_table1"
# Register the Hive2 namespace implementation under the name "hive2", then
# connect to it using the metastore URI and warehouse root defined above.
ln.register_namespace_impl("hive2", "lance_namespace_impls.hive2.Hive2Namespace")
namespace = ln.connect("hive2", {"uri": hms_address,
"root": warehouse_dir})
# Load the existing table as a dataset; table_id is given as
# [database, table].
ds = read_lance(
namespace=namespace,
table_id=["lance_test_database", "test_overwrite_table1"]
)
# Print the schema of the dataset.
print(ds.schema())
# Print the data.
# NOTE(review): Dataset.show() presumably prints rows itself and returns None,
# in which case this outer print() additionally emits "None" — confirm.
print(ds.show())
# Add a new column "id_2" computed as col("id") * 2. This is the column the
# reported OSError lists under unexpected=[id_2].
ds_with_id_2=ds.with_column("id_2", col("id") * 2)
print(ds_with_id_2.show())
write_lance(
ds_with_id_2,
namespace=namespace,
table_id=["lance_test_database", "test_overwrite_table1"],
mode="overwrite"
)

The exception is:
Running 0: 0.00 row [00:00, ? row/s]
2026-01-29 17:13:43,772 ERROR streaming_executor_state.py:625 -- An exception was raised from a task of operator "Project->Write". Dataset execution will now abort. To ignore this exception and continue, set DataContext.max_errored_blocks.
Traceback (most recent call last):
File "/data/miniforge3/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 593, in process_completed_tasks
bytes_read = task.on_data_ready(
File "/data/miniforge3/lib/python3.10/site-packages/ray/data/_internal/execution/interfaces/physical_operator.py", line 199, in on_data_ready
raise ex from None
File "/data/miniforge3/lib/python3.10/site-packages/ray/data/_internal/execution/interfaces/physical_operator.py", line 194, in on_data_ready
ray.get(self._pending_block_ref)
File "/data/miniforge3/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 22, in auto_init_wrapper
return fn(*args, **kwargs)
File "/data/miniforge3/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 104, in wrapper
return func(*args, **kwargs)
File "/data/miniforge3/lib/python3.10/site-packages/ray/_private/worker.py", line 2972, in get
values, debugger_breakpoint = worker.get_objects(
File "/data/miniforge3/lib/python3.10/site-packages/ray/_private/worker.py", line 1031, in get_objects
raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(OSError): ray::Project->Write() (pid=21872, ip=172.17.0.6)
for b_out in map_transformer.apply_transform(block_iter, ctx):
File "/data/miniforge3/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 102, in __call__
yield from self._post_process(results)
File "/data/miniforge3/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 412, in _apply_transform
yield from self._block_fn(blocks, ctx)
File "/data/miniforge3/lib/python3.10/site-packages/ray/data/_internal/planner/plan_write_op.py", line 50, in fn
block_accessors = [BlockAccessor.for_block(block) for block in blocks]
File "/data/miniforge3/lib/python3.10/site-packages/ray/data/_internal/planner/plan_write_op.py", line 50, in <listcomp>
block_accessors = [BlockAccessor.for_block(block) for block in blocks]
File "/data/miniforge3/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 102, in __call__
yield from self._post_process(results)
File "/data/miniforge3/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 412, in _apply_transform
yield from self._block_fn(blocks, ctx)
File "/data/miniforge3/lib/python3.10/site-packages/ray/data/_internal/planner/plan_write_op.py", line 32, in fn
ctx.kwargs["_datasink_write_return"] = datasink_or_legacy_datasource.write(
File "/data/miniforge3/lib/python3.10/site-packages/lance_ray/datasink.py", line 255, in write
fragments_and_schema = write_fragment(
File "/data/miniforge3/lib/python3.10/site-packages/lance_ray/fragment.py", line 82, in write_fragment
fragments = call_with_retry(
File "/data/miniforge3/lib/python3.10/site-packages/ray/_common/retry.py", line 62, in call_with_retry
raise e from None
File "/data/miniforge3/lib/python3.10/site-packages/ray/_common/retry.py", line 40, in call_with_retry
return f(*args, **kwargs)
File "/data/miniforge3/lib/python3.10/site-packages/lance_ray/fragment.py", line 83, in <lambda>
lambda: write_fragments(
File "/data/miniforge3/lib/python3.10/site-packages/lance/fragment.py", line 1031, in write_fragments
return function(
OSError: Append with different schema: fields did not match, missing=[], unexpected=[id_2], location: /data/git/lance/rust/lance-core/src/datatypes/schema.rs:159:27

Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels