diff --git a/BUILD.bazel b/BUILD.bazel index 66db720..54b878d 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -20,6 +20,10 @@ filegroup( srcs = [ "src/dataflow/core/execution/csv.cc", "src/dataflow/core/execution/csv.h", + "src/dataflow/core/execution/nanoarrow_ipc_codec.cc", + "src/dataflow/core/execution/nanoarrow_ipc_codec.h", + "src/dataflow/core/execution/source_materialization.cc", + "src/dataflow/core/execution/source_materialization.h", "src/dataflow/core/execution/table.cc", "src/dataflow/core/execution/table.h", "src/dataflow/core/execution/value.cc", @@ -106,6 +110,8 @@ cc_library( "src/dataflow/core/execution/value.cc", "src/dataflow/core/execution/table.cc", "src/dataflow/core/execution/csv.cc", + "src/dataflow/core/execution/nanoarrow_ipc_codec.cc", + "src/dataflow/core/execution/source_materialization.cc", "src/dataflow/core/contract/catalog/catalog.cc", "src/dataflow/core/contract/api/dataframe.cc", "src/dataflow/core/contract/api/session.cc", @@ -129,6 +135,8 @@ cc_library( "src/dataflow/core/execution/value.h", "src/dataflow/core/execution/table.h", "src/dataflow/core/execution/csv.h", + "src/dataflow/core/execution/nanoarrow_ipc_codec.h", + "src/dataflow/core/execution/source_materialization.h", "src/dataflow/core/contract/catalog/catalog.h", "src/dataflow/core/contract/api/dataframe.h", "src/dataflow/core/contract/api/session.h", @@ -153,6 +161,7 @@ cc_library( "src/dataflow/core/contract/source_sink_abi.h", "src/dataflow/core/execution/stream/stream.h", ], + deps = ["@nanoarrow_src//:nanoarrow"], visibility = ["//visibility:public"], ) @@ -165,6 +174,7 @@ test_suite( ":stream_runtime_test", ":stream_strategy_explain_test", ":vector_runtime_test", + ":source_materialization_test", ], ) @@ -407,3 +417,9 @@ cc_test( ":dataflow_core", ], ) + +cc_test( + name = "source_materialization_test", + srcs = ["src/dataflow/tests/source_materialization_test.cc"], + deps = [":dataflow_core"], +) diff --git a/MODULE.bazel b/MODULE.bazel index f904df0..bfc84ce 
100644 --- a/MODULE.bazel +++ b/MODULE.bazel @@ -6,6 +6,62 @@ bazel_dep(name = "rules_python", version = "1.7.0") local_python_configure = use_repo_rule("//tools:python_configure.bzl", "local_python_configure") local_python_configure(name = "velaria_local_python") +http_archive = use_repo_rule("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive") + +http_archive( + name = "nanoarrow_src", + build_file_content = """load("@rules_cc//cc:defs.bzl", "cc_library") + +genrule( + name = "nanoarrow_config_header", + srcs = ["src/nanoarrow/nanoarrow_config.h.in"], + outs = ["src/nanoarrow/nanoarrow_config.h"], + cmd = ("sed " + + "-e 's/@NANOARROW_VERSION_MAJOR@/0/g' " + + "-e 's/@NANOARROW_VERSION_MINOR@/8/g' " + + "-e 's/@NANOARROW_VERSION_PATCH@/0/g' " + + "-e 's/@NANOARROW_VERSION@/0.8.0/g' " + + "-e 's/@NANOARROW_NAMESPACE_DEFINE@//g' " + + "$< >$@"), +) + +cc_library( + name = "nanoarrow", + srcs = [ + "src/nanoarrow/common/array.c", + "src/nanoarrow/common/array_stream.c", + "src/nanoarrow/common/schema.c", + "src/nanoarrow/common/utils.c", + "src/nanoarrow/ipc/codecs.c", + "src/nanoarrow/ipc/decoder.c", + "src/nanoarrow/ipc/encoder.c", + "src/nanoarrow/ipc/reader.c", + "src/nanoarrow/ipc/writer.c", + "thirdparty/flatcc/src/runtime/builder.c", + "thirdparty/flatcc/src/runtime/emitter.c", + "thirdparty/flatcc/src/runtime/refmap.c", + "thirdparty/flatcc/src/runtime/verifier.c", + ], + hdrs = glob([ + "src/nanoarrow/**/*.h", + "thirdparty/flatcc/include/**/*.h", + ]) + [":nanoarrow_config_header"], + includes = [ + "src", + "thirdparty/flatcc/include", + ], + linkstatic = True, + visibility = ["//visibility:public"], +) +""", + sha256 = "1c5136edf5c1e9cd8c47c4e31dfe07d0e09c25f27be20c8d6e78a0f4a4ed3fae", + strip_prefix = "arrow-nanoarrow-apache-arrow-nanoarrow-0.8.0", + type = "tar.gz", + urls = [ + "https://codeload.github.com/apache/arrow-nanoarrow/tar.gz/refs/tags/apache-arrow-nanoarrow-0.8.0", + ], +) + python = 
use_extension("@rules_python//python/extensions:python.bzl", "python") python.toolchain(python_version = "3.12") use_repo(python, "python_3_12") diff --git a/python_api/BUILD.bazel b/python_api/BUILD.bazel index a21947e..2fec7f9 100644 --- a/python_api/BUILD.bazel +++ b/python_api/BUILD.bazel @@ -199,6 +199,16 @@ py_test( ], ) +py_test( + name = "source_materialization_test", + srcs = ["tests/test_source_materialization.py"], + main = "tests/test_source_materialization.py", + imports = ["."], + deps = [ + ":velaria_py_pkg", + ], +) + py_test( name = "bitable_stream_source_test", srcs = ["tests/test_bitable_stream_source.py"], diff --git a/python_api/tests/test_source_materialization.py b/python_api/tests/test_source_materialization.py new file mode 100644 index 0000000..75d0483 --- /dev/null +++ b/python_api/tests/test_source_materialization.py @@ -0,0 +1,90 @@ +import os +import pathlib +import tempfile +import time +import unittest +from contextlib import contextmanager + +import velaria + + +@contextmanager +def _temporary_env(updates): + previous = {key: os.environ.get(key) for key in updates} + try: + for key, value in updates.items(): + os.environ[key] = value + yield + finally: + for key, value in previous.items(): + if value is None: + os.environ.pop(key, None) + else: + os.environ[key] = value + + +class SourceMaterializationTest(unittest.TestCase): + @staticmethod + def _build_large_csv() -> str: + lines = ["id,name,amount"] + for index in range(1, 20001): + lines.append(f"{index},user_{index},{index * 1.25:.2f}") + return "\n".join(lines) + "\n" + + def _assert_sql_after_materialization_is_reused(self, format_name: str, expected_filename: str): + with tempfile.TemporaryDirectory(prefix="velaria-source-materialization-") as tmp: + root = pathlib.Path(tmp) + csv_path = root / "input.csv" + cache_dir = root / "cache" + csv_path.write_text(self._build_large_csv(), encoding="utf-8") + + with _temporary_env( + { + "VELARIA_MATERIALIZATION_DIR": str(cache_dir), 
+ "VELARIA_MATERIALIZATION_FORMAT": format_name, + } + ): + first_session = velaria.Session() + first_df = first_session.read_csv(str(csv_path)) + first_session.create_temp_view("source_input", first_df) + first = first_session.sql( + "SELECT COUNT(*) AS row_count, SUM(amount) AS total_amount " + "FROM source_input " + "WHERE id > 15000" + ).to_rows() + data_files = list(cache_dir.rglob(expected_filename)) + self.assertEqual(len(data_files), 1) + meta_files = list(cache_dir.rglob("meta.txt")) + self.assertEqual(len(meta_files), 1) + first_mtime = data_files[0].stat().st_mtime_ns + + time.sleep(0.05) + second_session = velaria.Session() + second_df = second_session.read_csv(str(csv_path)) + second_session.create_temp_view("source_input", second_df) + second = second_session.sql( + "SELECT COUNT(*) AS row_count, SUM(amount) AS total_amount " + "FROM source_input " + "WHERE id > 15000" + ).to_rows() + second_mtime = data_files[0].stat().st_mtime_ns + + self.assertEqual(first["schema"], ["row_count", "total_amount"]) + self.assertEqual( + first["rows"], + [ + [5000, 109378125.0], + ], + ) + self.assertEqual(second, first) + self.assertEqual(first_mtime, second_mtime) + + def test_binary_row_batch_materialization_reuses_cached_file(self): + self._assert_sql_after_materialization_is_reused("binary_row_batch", "table.bin") + + def test_nanoarrow_ipc_materialization_reuses_cached_file(self): + self._assert_sql_after_materialization_is_reused("nanoarrow_ipc", "table.nanoarrow") + + +if __name__ == "__main__": + unittest.main() diff --git a/src/dataflow/core/contract/api/session.cc b/src/dataflow/core/contract/api/session.cc index 6eb79db..bd6d8ac 100644 --- a/src/dataflow/core/contract/api/session.cc +++ b/src/dataflow/core/contract/api/session.cc @@ -12,6 +12,7 @@ #include #include "src/dataflow/core/execution/csv.h" +#include "src/dataflow/core/execution/source_materialization.h" #include "src/dataflow/ai/plugin_runtime.h" namespace dataflow { @@ -73,7 +74,26 @@ 
/// Reads a CSV file into a DataFrame, using the on-disk source
/// materialization cache as an accelerator when possible.
///
/// The CSV file is always the source of truth. Every cache-related step
/// (fingerprinting the file, resolving the cache root, reading metadata,
/// decoding cached data, writing a new cache entry) is best-effort: if any
/// of them fails, the file is simply parsed with `load_csv` and whatever
/// `load_csv` throws is what the caller sees.
///
/// @param path       Path of the CSV file (relative or absolute).
/// @param delimiter  Field delimiter forwarded to `load_csv`; it is also part
///                   of the cache key so different delimiters do not collide.
/// @return A DataFrame holding the file contents.
DataFrame DataflowSession::read_csv(const std::string& path, char delimiter) {
  // Falls back to the caller-supplied path when fingerprinting fails, so the
  // error for a missing/unreadable file comes from load_csv itself.
  std::string csv_path = path;
  std::optional<MaterializedSourceFingerprint> fingerprint;
  std::optional<SourceMaterializationStore> store;

  try {
    std::ostringstream options;
    options << "delimiter=" << delimiter;
    fingerprint = capture_file_source_fingerprint(path, "csv", options.str());
    csv_path = fingerprint->abs_path;
    store.emplace(default_source_materialization_root(),
                  default_source_materialization_data_format());
    if (const auto entry = store->lookup(*fingerprint); entry.has_value()) {
      return DataFrame(store->load(*entry));
    }
  } catch (...) {
    // Fall through to the CSV source path on any cache-side failure:
    // unreadable/corrupt metadata, decode errors, or an unusable cache root.
  }

  Table table = load_csv(csv_path, delimiter);
  if (fingerprint.has_value() && store.has_value()) {
    try {
      store->save(*fingerprint, table);
    } catch (...) {
      // CSV remains the source of truth; cache write failures must not change read semantics.
    }
  }
  return DataFrame(std::move(table));
}
~ArrowArrayGuard() { + if (value.release != nullptr) { + value.release(&value); + } + } + ArrowArray value; +}; + +struct ArrowArrayViewGuard { + ArrowArrayViewGuard() { + ArrowArrayViewInitFromType(&value, NANOARROW_TYPE_UNINITIALIZED); + } + ~ArrowArrayViewGuard() { ArrowArrayViewReset(&value); } + ArrowArrayView value; +}; + +struct ArrowArrayStreamGuard { + ArrowArrayStreamGuard() { std::memset(&value, 0, sizeof(value)); } + ~ArrowArrayStreamGuard() { + if (value.release != nullptr) { + value.release(&value); + } + } + ArrowArrayStream value; +}; + +struct ArrowIpcInputStreamGuard { + ArrowIpcInputStreamGuard() { std::memset(&value, 0, sizeof(value)); } + ~ArrowIpcInputStreamGuard() { + if (value.release != nullptr) { + value.release(&value); + } + } + ArrowIpcInputStream value; +}; + +struct ArrowIpcOutputStreamGuard { + ArrowIpcOutputStreamGuard() { std::memset(&value, 0, sizeof(value)); } + ~ArrowIpcOutputStreamGuard() { + if (value.release != nullptr) { + value.release(&value); + } + } + ArrowIpcOutputStream value; +}; + +struct ArrowIpcWriterGuard { + ArrowIpcWriterGuard() { std::memset(&value, 0, sizeof(value)); } + ~ArrowIpcWriterGuard() { ArrowIpcWriterReset(&value); } + ArrowIpcWriter value; +}; + +void throw_nanoarrow_status(ArrowErrorCode code, ArrowError* error, + const std::string& context) { + if (code == NANOARROW_OK) { + return; + } + std::string message = context; + if (error != nullptr && ArrowErrorMessage(error)[0] != '\0') { + message += ": "; + message += ArrowErrorMessage(error); + } else { + message += ": nanoarrow error code "; + message += std::to_string(static_cast(code)); + } + throw std::runtime_error(message); +} + +void throw_stream_status(ArrowErrorCode code, ArrowArrayStream* stream, + const std::string& context) { + if (code == NANOARROW_OK) { + return; + } + std::string message = context; + const char* last_error = + (stream != nullptr && stream->get_last_error != nullptr) ? 
/// Opens `path` with the given stdio `mode`.
///
/// @return The open stream; the caller owns it and is responsible for
///         closing it.
/// @throws std::runtime_error if std::fopen fails.
std::FILE* open_file_or_throw(const std::string& path, const char* mode) {
  if (std::FILE* handle = std::fopen(path.c_str(), mode)) {
    return handle;
  }
  throw std::runtime_error("cannot open nanoarrow ipc file: " + path);
}
candidate.kind == ColumnKind::Int64)) { + layout.kind = ColumnKind::Double; + continue; + } + + throw std::runtime_error("nanoarrow materialization requires stable column types"); + } + + if (!seen_non_null) { + layout.kind = ColumnKind::String; + } + return layout; +} + +ArrowType arrow_type_for_layout(const ColumnLayout& layout) { + switch (layout.kind) { + case ColumnKind::Int64: + return NANOARROW_TYPE_INT64; + case ColumnKind::Double: + return NANOARROW_TYPE_DOUBLE; + case ColumnKind::String: + return NANOARROW_TYPE_STRING; + case ColumnKind::FixedVector: + return NANOARROW_TYPE_FIXED_SIZE_LIST; + } + return NANOARROW_TYPE_STRING; +} + +ArrowStringView to_string_view(const std::string& value) { + ArrowStringView out; + out.data = value.data(); + out.size_bytes = static_cast(value.size()); + return out; +} + +void build_schema(const Table& table, const std::vector& layouts, + ArrowSchema* schema) { + ArrowSchemaInit(schema); + throw_nanoarrow_status( + ArrowSchemaSetTypeStruct(schema, static_cast(table.schema.fields.size())), + nullptr, "set struct schema type"); + + for (std::size_t i = 0; i < table.schema.fields.size(); ++i) { + ArrowSchema* child = schema->children[i]; + throw_nanoarrow_status(ArrowSchemaSetName(child, table.schema.fields[i].c_str()), nullptr, + "set child schema name"); + if (layouts[i].kind == ColumnKind::FixedVector) { + throw_nanoarrow_status( + ArrowSchemaSetTypeFixedSize(child, NANOARROW_TYPE_FIXED_SIZE_LIST, + layouts[i].fixed_vector_size), + nullptr, "set fixed-size-list schema type"); + throw_nanoarrow_status(ArrowSchemaSetName(child->children[0], "item"), nullptr, + "set fixed-size-list child schema name"); + throw_nanoarrow_status(ArrowSchemaSetType(child->children[0], NANOARROW_TYPE_FLOAT), + nullptr, "set fixed-size-list child schema type"); + } else { + throw_nanoarrow_status(ArrowSchemaSetType(child, arrow_type_for_layout(layouts[i])), + nullptr, "set child schema type"); + } + } +} + +void append_row_value(ArrowArray* child, 
const ColumnLayout& layout, const Value& value) { + if (value.isNull()) { + throw_nanoarrow_status(ArrowArrayAppendNull(child, 1), nullptr, + "append null nanoarrow value"); + return; + } + + switch (layout.kind) { + case ColumnKind::Int64: + throw_nanoarrow_status(ArrowArrayAppendInt(child, value.asInt64()), nullptr, + "append int64 nanoarrow value"); + return; + case ColumnKind::Double: + throw_nanoarrow_status(ArrowArrayAppendDouble(child, value.asDouble()), nullptr, + "append double nanoarrow value"); + return; + case ColumnKind::String: + throw_nanoarrow_status(ArrowArrayAppendString(child, to_string_view(value.asString())), + nullptr, "append string nanoarrow value"); + return; + case ColumnKind::FixedVector: { + const auto& vec = value.asFixedVector(); + if (static_cast(vec.size()) != layout.fixed_vector_size) { + throw std::runtime_error("nanoarrow materialization vector dimension mismatch"); + } + ArrowArray* values = child->children[0]; + for (float item : vec) { + throw_nanoarrow_status(ArrowArrayAppendDouble(values, static_cast(item)), + nullptr, "append fixed vector item"); + } + throw_nanoarrow_status(ArrowArrayFinishElement(child), nullptr, + "finish fixed vector element"); + return; + } + } +} + +Table table_from_nanoarrow_batch(const ArrowSchema* schema, const ArrowArray* array) { + ArrowError error; + std::memset(&error, 0, sizeof(error)); + + ArrowArrayViewGuard root_view; + throw_nanoarrow_status(ArrowArrayViewInitFromSchema(&root_view.value, schema, &error), &error, + "init nanoarrow array view"); + throw_nanoarrow_status(ArrowArrayViewSetArray(&root_view.value, array, &error), &error, + "bind nanoarrow array view"); + + Table table; + table.schema.fields.reserve(static_cast(schema->n_children)); + for (int64_t i = 0; i < schema->n_children; ++i) { + table.schema.fields.emplace_back(schema->children[i]->name); + table.schema.index[table.schema.fields.back()] = static_cast(i); + } + + std::vector 
child_schema_views(static_cast(schema->n_children)); + for (int64_t i = 0; i < schema->n_children; ++i) { + throw_nanoarrow_status( + ArrowSchemaViewInit(&child_schema_views[static_cast(i)], + schema->children[i], &error), + &error, "init child nanoarrow schema view"); + } + + table.rows.reserve(static_cast(array->length)); + for (int64_t row = 0; row < array->length; ++row) { + Row out_row; + out_row.reserve(static_cast(schema->n_children)); + for (int64_t col = 0; col < schema->n_children; ++col) { + const auto* child_view = root_view.value.children[col]; + if (ArrowArrayViewIsNull(child_view, row)) { + out_row.emplace_back(); + continue; + } + + const auto& child_schema_view = child_schema_views[static_cast(col)]; + switch (child_schema_view.storage_type) { + case NANOARROW_TYPE_INT64: + out_row.emplace_back(ArrowArrayViewGetIntUnsafe(child_view, row)); + break; + case NANOARROW_TYPE_DOUBLE: + out_row.emplace_back(ArrowArrayViewGetDoubleUnsafe(child_view, row)); + break; + case NANOARROW_TYPE_STRING: { + const auto string_view = ArrowArrayViewGetStringUnsafe(child_view, row); + out_row.emplace_back( + std::string(string_view.data, static_cast(string_view.size_bytes))); + break; + } + case NANOARROW_TYPE_FIXED_SIZE_LIST: { + std::vector vec; + vec.reserve(static_cast(child_schema_view.fixed_size)); + const auto* values_view = child_view->children[0]; + const int64_t base = row * static_cast(child_schema_view.fixed_size); + for (int32_t i = 0; i < child_schema_view.fixed_size; ++i) { + vec.push_back( + static_cast(ArrowArrayViewGetDoubleUnsafe(values_view, base + i))); + } + out_row.emplace_back(std::move(vec)); + break; + } + default: + throw std::runtime_error("unsupported nanoarrow column type in materialization"); + } + } + table.rows.push_back(std::move(out_row)); + } + + return table; +} + +} // namespace + +void save_nanoarrow_ipc_table(const Table& table, const std::string& path) { + ArrowError error; + std::memset(&error, 0, sizeof(error)); + + 
std::vector layouts; + layouts.reserve(table.schema.fields.size()); + for (std::size_t i = 0; i < table.schema.fields.size(); ++i) { + layouts.push_back(infer_column_layout(table, i)); + } + + ArrowSchemaGuard schema; + build_schema(table, layouts, &schema.value); + + ArrowArrayGuard array; + throw_nanoarrow_status(ArrowArrayInitFromSchema(&array.value, &schema.value, &error), &error, + "init nanoarrow array from schema"); + throw_nanoarrow_status(ArrowArrayStartAppending(&array.value), nullptr, + "start appending nanoarrow array"); + + for (const auto& row : table.rows) { + if (row.size() < table.schema.fields.size()) { + throw std::runtime_error("row width is smaller than schema width"); + } + for (std::size_t i = 0; i < table.schema.fields.size(); ++i) { + append_row_value(array.value.children[i], layouts[i], row[i]); + } + throw_nanoarrow_status(ArrowArrayFinishElement(&array.value), nullptr, + "finish struct row"); + } + throw_nanoarrow_status(ArrowArrayFinishBuildingDefault(&array.value, &error), &error, + "finish nanoarrow array"); + + FilePtr file(open_file_or_throw(path, "wb")); + ArrowIpcOutputStreamGuard output_stream; + throw_nanoarrow_status(ArrowIpcOutputStreamInitFile(&output_stream.value, file.get(), 1), + nullptr, "init nanoarrow output stream"); + file.release(); + + ArrowIpcWriterGuard writer; + throw_nanoarrow_status(ArrowIpcWriterInit(&writer.value, &output_stream.value), nullptr, + "init nanoarrow writer"); + + ArrowArrayViewGuard array_view; + throw_nanoarrow_status(ArrowArrayViewInitFromSchema(&array_view.value, &schema.value, &error), + &error, "init nanoarrow array view"); + throw_nanoarrow_status(ArrowArrayViewSetArray(&array_view.value, &array.value, &error), &error, + "set nanoarrow array view"); + + throw_nanoarrow_status(ArrowIpcWriterWriteSchema(&writer.value, &schema.value, &error), &error, + "write nanoarrow schema"); + throw_nanoarrow_status( + ArrowIpcWriterWriteArrayView(&writer.value, &array_view.value, &error), &error, + 
"write nanoarrow record batch"); + throw_nanoarrow_status(ArrowIpcWriterWriteArrayView(&writer.value, nullptr, &error), &error, + "write nanoarrow end-of-stream"); +} + +Table load_nanoarrow_ipc_table(const std::string& path) { + FilePtr file(open_file_or_throw(path, "rb")); + ArrowIpcInputStreamGuard input_stream; + throw_nanoarrow_status(ArrowIpcInputStreamInitFile(&input_stream.value, file.get(), 1), + nullptr, "init nanoarrow input stream"); + file.release(); + + ArrowIpcArrayStreamReaderOptions options; + std::memset(&options, 0, sizeof(options)); + options.field_index = -1; + options.use_shared_buffers = 0; + + ArrowArrayStreamGuard stream; + throw_nanoarrow_status( + ArrowIpcArrayStreamReaderInit(&stream.value, &input_stream.value, &options), nullptr, + "init nanoarrow ipc reader"); + + ArrowSchemaGuard schema; + throw_stream_status(stream.value.get_schema(&stream.value, &schema.value), &stream.value, + "read nanoarrow schema"); + + Table table; + while (true) { + ArrowArrayGuard array; + throw_stream_status(stream.value.get_next(&stream.value, &array.value), &stream.value, + "read nanoarrow record batch"); + if (array.value.release == nullptr) { + break; + } + + Table batch = table_from_nanoarrow_batch(&schema.value, &array.value); + if (table.schema.fields.empty()) { + table.schema = batch.schema; + } + table.rows.insert(table.rows.end(), std::make_move_iterator(batch.rows.begin()), + std::make_move_iterator(batch.rows.end())); + } + + return table; +} + +} // namespace dataflow diff --git a/src/dataflow/core/execution/nanoarrow_ipc_codec.h b/src/dataflow/core/execution/nanoarrow_ipc_codec.h new file mode 100644 index 0000000..1c01b51 --- /dev/null +++ b/src/dataflow/core/execution/nanoarrow_ipc_codec.h @@ -0,0 +1,12 @@ +#pragma once + +#include + +#include "src/dataflow/core/execution/table.h" + +namespace dataflow { + +Table load_nanoarrow_ipc_table(const std::string& path); +void save_nanoarrow_ipc_table(const Table& table, const std::string& path); + 
/// Computes the 64-bit FNV-1a hash of `input`.
///
/// Uses the standard FNV-1a parameters (offset basis 0xcbf29ce484222325,
/// prime 0x100000001b3), so results match other FNV-1a implementations.
/// NOTE: the previous offset basis literal was missing its last digit; ids
/// derived from this hash therefore differ from ones produced with that
/// truncated constant.
///
/// @param input  Bytes to hash; embedded NUL characters are hashed like any
///               other byte.
/// @return The 64-bit hash value.
std::uint64_t fnv1a64(const std::string& input) {
  constexpr std::uint64_t kOffsetBasis = 14695981039346656037ULL;  // 0xcbf29ce484222325
  constexpr std::uint64_t kPrime = 1099511628211ULL;               // 0x100000001b3

  std::uint64_t hash = kOffsetBasis;
  // Iterate as unsigned char so bytes >= 0x80 are not sign-extended.
  for (const unsigned char ch : input) {
    hash ^= static_cast<std::uint64_t>(ch);
    hash *= kPrime;
  }
  return hash;
}
MaterializationDataFormat::BinaryRowBatch: + return "table.bin"; + case MaterializationDataFormat::NanoArrowIpc: + return "table.nanoarrow"; + } + return "table.bin"; +} + +std::optional read_entry(const std::string& metadata_path, + const std::string& root, + const std::string& source_id) { + std::ifstream in(metadata_path); + if (!in.is_open()) { + return std::nullopt; + } + + SourceMaterializationEntry entry; + entry.metadata_path = metadata_path; + + std::string line; + while (std::getline(in, line)) { + const auto pos = line.find('='); + if (pos == std::string::npos) { + continue; + } + const auto key = line.substr(0, pos); + const auto value = line.substr(pos + 1); + if (key == "source_id") { + entry.source_id = value; + } else if (key == "data_format") { + if (const auto parsed = parse_materialization_data_format(value)) { + entry.data_format = *parsed; + } else { + return std::nullopt; + } + } else if (key == "source_format") { + entry.fingerprint.source_format = value; + } else if (key == "source_options") { + entry.fingerprint.source_options = value; + } else if (key == "abs_path") { + entry.fingerprint.abs_path = value; + } else if (key == "file_size") { + entry.fingerprint.file_size = static_cast(std::stoull(value)); + } else if (key == "mtime_ns") { + entry.fingerprint.mtime_ns = static_cast(std::stoull(value)); + } + } + + if (entry.source_id.empty() || entry.fingerprint.source_format.empty() || + entry.fingerprint.abs_path.empty()) { + return std::nullopt; + } + if (entry.source_id != source_id) { + return std::nullopt; + } + entry.data_path = + (std::filesystem::path(root) / source_id / data_file_name(entry.data_format)).string(); + return entry; +} + +void write_atomic_text(const std::string& path, const std::string& content) { + namespace fs = std::filesystem; + const fs::path target(path); + const fs::path tmp = target.string() + ".tmp"; + { + std::ofstream out(tmp); + if (!out.is_open()) { + throw std::runtime_error("cannot open metadata temp 
/// Writes `payload` to `path` via a sibling "<path>.tmp" file that is
/// renamed over the target once it has been written completely.
///
/// The temp file is flushed and closed before the rename and the stream
/// state is checked afterwards, so a short write (for example a full disk)
/// is reported instead of publishing a truncated file. The temp file is
/// removed when writing or publishing fails.
///
/// @param path     Destination file path.
/// @param payload  Bytes to write; may be empty.
/// @throws std::runtime_error if the temp file cannot be opened or written,
///         or cannot be renamed onto `path`.
void write_atomic_binary(const std::string& path, const std::vector<std::uint8_t>& payload) {
  namespace fs = std::filesystem;
  const fs::path target(path);
  const fs::path tmp = target.string() + ".tmp";
  {
    std::ofstream out(tmp, std::ios::binary);
    if (!out.is_open()) {
      throw std::runtime_error("cannot open materialized data temp file: " + tmp.string());
    }
    if (!payload.empty()) {
      out.write(reinterpret_cast<const char*>(payload.data()),
                static_cast<std::streamsize>(payload.size()));
    }
    // close() flushes buffered bytes; errors from that flush only show up in
    // the stream state afterwards, so check after closing rather than before.
    out.close();
    if (out.fail()) {
      std::error_code cleanup_ec;
      fs::remove(tmp, cleanup_ec);
      throw std::runtime_error("cannot write materialized data temp file: " + tmp.string());
    }
  }
  std::error_code ec;
  fs::rename(tmp, target, ec);
  if (ec) {
    // Some platforms refuse to rename onto an existing file: drop the old
    // target and retry once.
    fs::remove(target, ec);
    ec.clear();
    fs::rename(tmp, target, ec);
    if (ec) {
      std::error_code cleanup_ec;
      fs::remove(tmp, cleanup_ec);
      throw std::runtime_error("cannot atomically publish materialized data file: " + path);
    }
  }
}
payload; + codec.serialize(table, &payload); + write_atomic_binary(path, payload); + return; + } + case MaterializationDataFormat::NanoArrowIpc: + save_nanoarrow_ipc_table(table, path); + return; + } +} + +Table deserialize_materialized_table(MaterializationDataFormat format, + const std::string& path) { + switch (format) { + case MaterializationDataFormat::BinaryRowBatch: { + const auto payload = read_binary_file(path); + BinaryRowBatchCodec codec; + return codec.deserialize(payload); + } + case MaterializationDataFormat::NanoArrowIpc: + return load_nanoarrow_ipc_table(path); + } + throw std::runtime_error("unsupported materialization data format"); +} + +} // namespace + +std::string default_source_materialization_root() { + if (const char* env = std::getenv(kRootEnv)) { + if (env[0] != '\0') { + return env; + } + } + return (std::filesystem::temp_directory_path() / "velaria_source_materialization_cache") + .string(); +} + +MaterializationDataFormat default_source_materialization_data_format() { + if (const char* env = std::getenv(kDataFormatEnv)) { + if (env[0] != '\0') { + if (const auto parsed = parse_materialization_data_format(env)) { + return *parsed; + } + } + } + return MaterializationDataFormat::BinaryRowBatch; +} + +std::string materialization_data_format_name(MaterializationDataFormat format) { + switch (format) { + case MaterializationDataFormat::BinaryRowBatch: + return "binary_row_batch"; + case MaterializationDataFormat::NanoArrowIpc: + return "nanoarrow_ipc"; + } + return "binary_row_batch"; +} + +bool materialization_data_format_is_available(MaterializationDataFormat format) { + switch (format) { + case MaterializationDataFormat::BinaryRowBatch: + return true; + case MaterializationDataFormat::NanoArrowIpc: + return true; + } + return false; +} + +MaterializedSourceFingerprint capture_file_source_fingerprint(const std::string& path, + std::string source_format, + std::string source_options) { + namespace fs = std::filesystem; + const auto 
abs_path = fs::absolute(path).string(); + struct stat info {}; + if (::stat(abs_path.c_str(), &info) != 0) { + throw std::runtime_error("cannot stat source file: " + abs_path); + } + + MaterializedSourceFingerprint fingerprint; + fingerprint.source_format = std::move(source_format); + fingerprint.source_options = std::move(source_options); + fingerprint.abs_path = abs_path; + fingerprint.file_size = static_cast(info.st_size); + fingerprint.mtime_ns = stat_mtime_ns(info); + return fingerprint; +} + +SourceMaterializationStore::SourceMaterializationStore(std::string root, + MaterializationDataFormat data_format) + : root_(std::move(root)), data_format_(data_format) {} + +std::string SourceMaterializationStore::source_id( + const MaterializedSourceFingerprint& fingerprint) const { + std::string key = fingerprint.source_format; + key.push_back('\0'); + key += fingerprint.source_options; + key.push_back('\0'); + key += fingerprint.abs_path; + return hex64(fnv1a64(key)); +} + +std::string SourceMaterializationStore::metadata_path(const std::string& source_id) const { + return (std::filesystem::path(root_) / source_id / kMetadataFile).string(); +} + +std::string SourceMaterializationStore::data_path(const std::string& source_id, + MaterializationDataFormat data_format) const { + return (std::filesystem::path(root_) / source_id / data_file_name(data_format)).string(); +} + +std::optional SourceMaterializationStore::lookup( + const MaterializedSourceFingerprint& fingerprint) const { + const auto id = source_id(fingerprint); + auto entry = read_entry(metadata_path(id), root_, id); + if (!entry.has_value()) { + return std::nullopt; + } + if (entry->source_id != id || entry->data_format != data_format_) { + return std::nullopt; + } + if (entry->fingerprint.source_format != fingerprint.source_format || + entry->fingerprint.source_options != fingerprint.source_options || + entry->fingerprint.abs_path != fingerprint.abs_path || + entry->fingerprint.file_size != 
fingerprint.file_size || + entry->fingerprint.mtime_ns != fingerprint.mtime_ns) { + return std::nullopt; + } + if (!std::filesystem::exists(entry->data_path)) { + return std::nullopt; + } + return entry; +} + +void SourceMaterializationStore::save(const MaterializedSourceFingerprint& fingerprint, + const Table& table) const { + namespace fs = std::filesystem; + const auto id = source_id(fingerprint); + const fs::path dir = fs::path(root_) / id; + std::error_code ec; + fs::create_directories(dir, ec); + + serialize_materialized_table(data_format_, table, data_path(id, data_format_)); + + std::ostringstream metadata; + metadata << "source_id=" << id << "\n"; + metadata << "data_format=" << materialization_data_format_name(data_format_) << "\n"; + metadata << "source_format=" << fingerprint.source_format << "\n"; + metadata << "source_options=" << fingerprint.source_options << "\n"; + metadata << "abs_path=" << fingerprint.abs_path << "\n"; + metadata << "file_size=" << fingerprint.file_size << "\n"; + metadata << "mtime_ns=" << fingerprint.mtime_ns << "\n"; + write_atomic_text(metadata_path(id), metadata.str()); +} + +Table SourceMaterializationStore::load(const SourceMaterializationEntry& entry) const { + return deserialize_materialized_table(entry.data_format, entry.data_path); +} + +} // namespace dataflow diff --git a/src/dataflow/core/execution/source_materialization.h b/src/dataflow/core/execution/source_materialization.h new file mode 100644 index 0000000..3335fe5 --- /dev/null +++ b/src/dataflow/core/execution/source_materialization.h @@ -0,0 +1,64 @@ +#pragma once + +#include +#include +#include + +#include "src/dataflow/core/execution/table.h" + +namespace dataflow { + +enum class MaterializationDataFormat { + BinaryRowBatch = 0, + NanoArrowIpc = 1, +}; + +struct MaterializedSourceFingerprint { + std::string source_format; + std::string source_options; + std::string abs_path; + std::uint64_t file_size = 0; + std::uint64_t mtime_ns = 0; +}; + +struct 
SourceMaterializationEntry { + std::string source_id; + MaterializationDataFormat data_format = MaterializationDataFormat::BinaryRowBatch; + MaterializedSourceFingerprint fingerprint; + std::string metadata_path; + std::string data_path; +}; + +std::string default_source_materialization_root(); +MaterializationDataFormat default_source_materialization_data_format(); +std::string materialization_data_format_name(MaterializationDataFormat format); +bool materialization_data_format_is_available(MaterializationDataFormat format); +MaterializedSourceFingerprint capture_file_source_fingerprint(const std::string& path, + std::string source_format, + std::string source_options); + +class SourceMaterializationStore { + public: + explicit SourceMaterializationStore( + std::string root, + MaterializationDataFormat data_format = default_source_materialization_data_format()); + + std::optional lookup( + const MaterializedSourceFingerprint& fingerprint) const; + void save(const MaterializedSourceFingerprint& fingerprint, const Table& table) const; + Table load(const SourceMaterializationEntry& entry) const; + + MaterializationDataFormat data_format() const { return data_format_; } + std::string source_id(const MaterializedSourceFingerprint& fingerprint) const; + std::string metadata_path(const std::string& source_id) const; + std::string data_path(const std::string& source_id) const { + return data_path(source_id, data_format_); + } + std::string data_path(const std::string& source_id, MaterializationDataFormat data_format) const; + + private: + std::string root_; + MaterializationDataFormat data_format_ = MaterializationDataFormat::BinaryRowBatch; +}; + +} // namespace dataflow diff --git a/src/dataflow/tests/source_materialization_test.cc b/src/dataflow/tests/source_materialization_test.cc new file mode 100644 index 0000000..94bc01b --- /dev/null +++ b/src/dataflow/tests/source_materialization_test.cc @@ -0,0 +1,244 @@ +#include +#include +#include +#include +#include 
+#include +#include +#include +#include +#include + +#include "src/dataflow/core/contract/api/session.h" +#include "src/dataflow/core/execution/nanoarrow_ipc_codec.h" +#include "src/dataflow/core/execution/source_materialization.h" +#include "src/dataflow/core/execution/stream/binary_row_batch.h" + +namespace { + +void expect(bool cond, const std::string& msg) { + if (!cond) { + throw std::runtime_error(msg); + } +} + +void expect_table_value(const dataflow::Table& table, std::size_t row, std::size_t column, + const dataflow::Value& expected, const std::string& msg) { + expect(row < table.rows.size(), msg + ": row out of range"); + expect(column < table.rows[row].size(), msg + ": column out of range"); + const auto& actual = table.rows[row][column]; + if (expected.isNull()) { + expect(actual.isNull(), msg + ": expected null but got " + actual.toString()); + return; + } + expect(actual == expected, + msg + ": expected " + expected.toString() + ", got " + actual.toString()); +} + +std::string make_temp_dir(const std::string& pattern) { + std::string dir = "/tmp/" + pattern + "-XXXXXX"; + std::vector buffer(dir.begin(), dir.end()); + buffer.push_back('\0'); + char* out = mkdtemp(buffer.data()); + if (out == nullptr) { + throw std::runtime_error("mkdtemp failed"); + } + return std::string(out); +} + +std::string make_temp_file(const std::string& pattern) { + std::string path = "/tmp/" + pattern + "-XXXXXX"; + std::vector buffer(path.begin(), path.end()); + buffer.push_back('\0'); + const int fd = mkstemp(buffer.data()); + if (fd == -1) { + throw std::runtime_error("mkstemp failed"); + } + close(fd); + return std::string(buffer.data()); +} + +void write_csv(const std::string& path, const std::string& body) { + std::ofstream out(path); + if (!out.is_open()) { + throw std::runtime_error("cannot write csv file"); + } + out << body; +} + +void expect_roundtrip_sample_table(const dataflow::Table& table, const std::string& label) { + expect(table.schema.fields.size() == 5, 
label + ": schema width mismatch"); + expect(table.rows.size() == 2, label + ": row count mismatch"); + expect_table_value(table, 0, 0, dataflow::Value(int64_t(1)), label + ": int mismatch"); + expect_table_value(table, 0, 1, dataflow::Value(3.5), label + ": double mismatch"); + expect_table_value(table, 0, 2, dataflow::Value("alice"), label + ": string mismatch"); + expect_table_value(table, 0, 3, dataflow::Value(std::vector{1.0f, 0.5f}), + label + ": vector mismatch"); + expect_table_value(table, 0, 4, dataflow::Value(), label + ": null mismatch"); +} + +} // namespace + +int main() { + try { + const auto cache_root = make_temp_dir("velaria-source-materialization-cache"); + expect(setenv("VELARIA_MATERIALIZATION_DIR", cache_root.c_str(), 1) == 0, + "failed to set materialization dir env"); + expect(setenv("VELARIA_MATERIALIZATION_FORMAT", "binary_row_batch", 1) == 0, + "failed to set materialization format env"); + + expect(dataflow::default_source_materialization_data_format() == + dataflow::MaterializationDataFormat::BinaryRowBatch, + "default materialization data format mismatch"); + expect(dataflow::materialization_data_format_name( + dataflow::MaterializationDataFormat::BinaryRowBatch) == "binary_row_batch", + "binary data format name mismatch"); + expect(dataflow::materialization_data_format_name( + dataflow::MaterializationDataFormat::NanoArrowIpc) == "nanoarrow_ipc", + "nanoarrow data format name mismatch"); + expect(dataflow::materialization_data_format_is_available( + dataflow::MaterializationDataFormat::BinaryRowBatch), + "binary data format should be available"); + expect(dataflow::materialization_data_format_is_available( + dataflow::MaterializationDataFormat::NanoArrowIpc), + "nanoarrow data format should be available"); + + const auto binary_path = make_temp_file("velaria-binary-materialized"); + const auto nanoarrow_path = make_temp_file("velaria-nanoarrow-materialized"); + dataflow::Table roundtrip_source; + roundtrip_source.schema = 
dataflow::Schema({"id", "score", "name", "embedding", "note"}); + roundtrip_source.rows = { + {dataflow::Value(int64_t(1)), dataflow::Value(3.5), dataflow::Value("alice"), + dataflow::Value(std::vector{1.0f, 0.5f}), dataflow::Value()}, + {dataflow::Value(int64_t(2)), dataflow::Value(), dataflow::Value("bob"), + dataflow::Value(std::vector{0.0f, 2.0f}), dataflow::Value("kept")}, + }; + dataflow::BinaryRowBatchCodec codec; + std::vector payload; + codec.serialize(roundtrip_source, &payload); + { + std::ofstream out(binary_path, std::ios::binary); + expect(out.is_open(), "cannot open binary roundtrip file"); + out.write(reinterpret_cast(payload.data()), + static_cast(payload.size())); + expect(out.good(), "cannot write binary roundtrip file"); + } + std::ifstream in(binary_path, std::ios::binary); + expect(in.is_open(), "cannot reopen binary roundtrip file"); + std::vector loaded_payload((std::istreambuf_iterator(in)), + std::istreambuf_iterator()); + const auto roundtrip_loaded = codec.deserialize(loaded_payload); + expect(roundtrip_loaded.schema.fields == roundtrip_source.schema.fields, + "binary roundtrip schema mismatch"); + expect_roundtrip_sample_table(roundtrip_loaded, "binary roundtrip"); + + dataflow::save_nanoarrow_ipc_table(roundtrip_source, nanoarrow_path); + const auto nanoarrow_loaded = dataflow::load_nanoarrow_ipc_table(nanoarrow_path); + expect(nanoarrow_loaded.schema.fields == roundtrip_source.schema.fields, + "nanoarrow roundtrip schema mismatch"); + expect_roundtrip_sample_table(nanoarrow_loaded, "nanoarrow roundtrip"); + + const auto csv_path = make_temp_file("velaria-materialized-source"); + write_csv(csv_path, "id,score,name\n1,10,alice\n2,20,bob\n"); + + auto& session = dataflow::DataflowSession::builder(); + const auto first = session.read_csv(csv_path).toTable(); + expect(first.rows.size() == 2, "first csv read row count mismatch"); + expect_table_value(first, 1, 2, dataflow::Value("bob"), "first csv read content mismatch"); + + const auto 
first_fingerprint = + dataflow::capture_file_source_fingerprint(csv_path, "csv", "delimiter=,"); + dataflow::SourceMaterializationStore store( + dataflow::default_source_materialization_root(), + dataflow::MaterializationDataFormat::BinaryRowBatch); + const auto first_entry = store.lookup(first_fingerprint); + expect(first_entry.has_value(), "source materialization entry should exist after first read"); + expect(first_entry->data_format == dataflow::MaterializationDataFormat::BinaryRowBatch, + "source materialization data format mismatch"); + expect(first_entry->fingerprint.source_format == "csv", + "source materialization source format mismatch"); + expect(first_entry->fingerprint.source_options == "delimiter=,", + "source materialization source options mismatch"); + expect(first_entry->data_path.size() >= 4 && + first_entry->data_path.substr(first_entry->data_path.size() - 4) == ".bin", + "binary materialization file suffix mismatch"); + const auto first_cache_mtime = + std::filesystem::last_write_time(first_entry->data_path); + + std::this_thread::sleep_for(std::chrono::milliseconds(25)); + const auto second = session.read_csv(csv_path).toTable(); + expect(second.rows.size() == 2, "second csv read row count mismatch"); + expect_table_value(second, 0, 1, dataflow::Value(int64_t(10)), + "second csv read content mismatch"); + const auto second_entry = + store.lookup(dataflow::capture_file_source_fingerprint(csv_path, "csv", "delimiter=,")); + expect(second_entry.has_value(), "source materialization entry should still exist"); + const auto second_cache_mtime = + std::filesystem::last_write_time(second_entry->data_path); + expect(first_cache_mtime == second_cache_mtime, + "cache hit should not rewrite binary materialization file"); + + std::this_thread::sleep_for(std::chrono::seconds(1)); + write_csv(csv_path, "id,score,name\n1,10,alice\n2,99,bob\n3,30,charlie\n"); + const auto updated = session.read_csv(csv_path).toTable(); + expect(updated.rows.size() == 3, 
"updated csv read row count mismatch"); + expect_table_value(updated, 1, 1, dataflow::Value(int64_t(99)), + "updated csv read should reflect source changes"); + const auto updated_entry = + store.lookup(dataflow::capture_file_source_fingerprint(csv_path, "csv", "delimiter=,")); + expect(updated_entry.has_value(), "updated source materialization entry should exist"); + const auto updated_cache_mtime = + std::filesystem::last_write_time(updated_entry->data_path); + expect(updated_cache_mtime != second_cache_mtime, + "cache invalidation should rewrite binary materialization file"); + + const auto nanoarrow_cache_root = make_temp_dir("velaria-source-materialization-nanoarrow"); + expect(setenv("VELARIA_MATERIALIZATION_DIR", nanoarrow_cache_root.c_str(), 1) == 0, + "failed to reset materialization dir env"); + expect(setenv("VELARIA_MATERIALIZATION_FORMAT", "nanoarrow_ipc", 1) == 0, + "failed to switch materialization format env"); + + const auto nanoarrow_csv_path = make_temp_file("velaria-nanoarrow-source"); + write_csv(nanoarrow_csv_path, "id,score,name\n1,10,alice\n2,20,bob\n"); + const auto nanoarrow_first = session.read_csv(nanoarrow_csv_path).toTable(); + expect(nanoarrow_first.rows.size() == 2, "nanoarrow csv read row count mismatch"); + expect_table_value(nanoarrow_first, 1, 2, dataflow::Value("bob"), + "nanoarrow csv read content mismatch"); + + dataflow::SourceMaterializationStore nanoarrow_store( + dataflow::default_source_materialization_root(), + dataflow::MaterializationDataFormat::NanoArrowIpc); + const auto nanoarrow_fingerprint = + dataflow::capture_file_source_fingerprint(nanoarrow_csv_path, "csv", "delimiter=,"); + const auto nanoarrow_entry = nanoarrow_store.lookup(nanoarrow_fingerprint); + expect(nanoarrow_entry.has_value(), "nanoarrow source materialization entry should exist"); + expect(nanoarrow_entry->data_format == dataflow::MaterializationDataFormat::NanoArrowIpc, + "nanoarrow source materialization data format mismatch"); + 
expect(nanoarrow_entry->data_path.size() >= 10 && + nanoarrow_entry->data_path.substr(nanoarrow_entry->data_path.size() - 10) == + ".nanoarrow", + "nanoarrow materialization file suffix mismatch"); + const auto nanoarrow_first_cache_mtime = + std::filesystem::last_write_time(nanoarrow_entry->data_path); + + std::this_thread::sleep_for(std::chrono::milliseconds(25)); + const auto nanoarrow_second = session.read_csv(nanoarrow_csv_path).toTable(); + expect(nanoarrow_second.rows.size() == 2, "nanoarrow second csv read row count mismatch"); + expect_table_value(nanoarrow_second, 0, 1, dataflow::Value(int64_t(10)), + "nanoarrow second csv read content mismatch"); + const auto nanoarrow_second_entry = nanoarrow_store.lookup( + dataflow::capture_file_source_fingerprint(nanoarrow_csv_path, "csv", "delimiter=,")); + expect(nanoarrow_second_entry.has_value(), + "nanoarrow source materialization entry should still exist"); + const auto nanoarrow_second_cache_mtime = + std::filesystem::last_write_time(nanoarrow_second_entry->data_path); + expect(nanoarrow_first_cache_mtime == nanoarrow_second_cache_mtime, + "nanoarrow cache hit should not rewrite materialization file"); + + std::cout << "[test] source materialization binary+nanoarrow cache ok" << std::endl; + return 0; + } catch (const std::exception& ex) { + std::cerr << "[test] source materialization cache failed: " << ex.what() << std::endl; + return 1; + } +}