Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ filegroup(
srcs = [
"src/dataflow/core/execution/csv.cc",
"src/dataflow/core/execution/csv.h",
"src/dataflow/core/execution/nanoarrow_ipc_codec.cc",
"src/dataflow/core/execution/nanoarrow_ipc_codec.h",
"src/dataflow/core/execution/source_materialization.cc",
"src/dataflow/core/execution/source_materialization.h",
"src/dataflow/core/execution/table.cc",
"src/dataflow/core/execution/table.h",
"src/dataflow/core/execution/value.cc",
Expand Down Expand Up @@ -106,6 +110,8 @@ cc_library(
"src/dataflow/core/execution/value.cc",
"src/dataflow/core/execution/table.cc",
"src/dataflow/core/execution/csv.cc",
"src/dataflow/core/execution/nanoarrow_ipc_codec.cc",
"src/dataflow/core/execution/source_materialization.cc",
"src/dataflow/core/contract/catalog/catalog.cc",
"src/dataflow/core/contract/api/dataframe.cc",
"src/dataflow/core/contract/api/session.cc",
Expand All @@ -129,6 +135,8 @@ cc_library(
"src/dataflow/core/execution/value.h",
"src/dataflow/core/execution/table.h",
"src/dataflow/core/execution/csv.h",
"src/dataflow/core/execution/nanoarrow_ipc_codec.h",
"src/dataflow/core/execution/source_materialization.h",
"src/dataflow/core/contract/catalog/catalog.h",
"src/dataflow/core/contract/api/dataframe.h",
"src/dataflow/core/contract/api/session.h",
Expand All @@ -153,6 +161,7 @@ cc_library(
"src/dataflow/core/contract/source_sink_abi.h",
"src/dataflow/core/execution/stream/stream.h",
],
deps = ["@nanoarrow_src//:nanoarrow"],
visibility = ["//visibility:public"],
)

Expand All @@ -165,6 +174,7 @@ test_suite(
":stream_runtime_test",
":stream_strategy_explain_test",
":vector_runtime_test",
":source_materialization_test",
],
)

Expand Down Expand Up @@ -407,3 +417,9 @@ cc_test(
":dataflow_core",
],
)

# C++ unit test for the source materialization cache
# (src/dataflow/core/execution/source_materialization.*).
cc_test(
name = "source_materialization_test",
srcs = ["src/dataflow/tests/source_materialization_test.cc"],
deps = [":dataflow_core"],
)
56 changes: 56 additions & 0 deletions MODULE.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,62 @@ bazel_dep(name = "rules_python", version = "1.7.0")
local_python_configure = use_repo_rule("//tools:python_configure.bzl", "local_python_configure")
local_python_configure(name = "velaria_local_python")

# use_repo_rule lets MODULE.bazel invoke the standard http_archive repository
# rule from bazel_tools directly (bzlmod-style ad-hoc repository).
http_archive = use_repo_rule("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")

# Vendored apache/arrow-nanoarrow 0.8.0 built from source. Upstream ships only
# a CMake build, so a BUILD file is injected via build_file_content: the
# genrule expands nanoarrow_config.h.in with sed the way CMake's
# configure_file would, and the cc_library compiles the nanoarrow common + IPC
# sources together with the bundled flatcc runtime it depends on.
http_archive(
name = "nanoarrow_src",
build_file_content = """load("@rules_cc//cc:defs.bzl", "cc_library")

genrule(
name = "nanoarrow_config_header",
srcs = ["src/nanoarrow/nanoarrow_config.h.in"],
outs = ["src/nanoarrow/nanoarrow_config.h"],
cmd = ("sed " +
"-e 's/@NANOARROW_VERSION_MAJOR@/0/g' " +
"-e 's/@NANOARROW_VERSION_MINOR@/8/g' " +
"-e 's/@NANOARROW_VERSION_PATCH@/0/g' " +
"-e 's/@NANOARROW_VERSION@/0.8.0/g' " +
"-e 's/@NANOARROW_NAMESPACE_DEFINE@//g' " +
"$< >$@"),
)

cc_library(
name = "nanoarrow",
srcs = [
"src/nanoarrow/common/array.c",
"src/nanoarrow/common/array_stream.c",
"src/nanoarrow/common/schema.c",
"src/nanoarrow/common/utils.c",
"src/nanoarrow/ipc/codecs.c",
"src/nanoarrow/ipc/decoder.c",
"src/nanoarrow/ipc/encoder.c",
"src/nanoarrow/ipc/reader.c",
"src/nanoarrow/ipc/writer.c",
"thirdparty/flatcc/src/runtime/builder.c",
"thirdparty/flatcc/src/runtime/emitter.c",
"thirdparty/flatcc/src/runtime/refmap.c",
"thirdparty/flatcc/src/runtime/verifier.c",
],
hdrs = glob([
"src/nanoarrow/**/*.h",
"thirdparty/flatcc/include/**/*.h",
]) + [":nanoarrow_config_header"],
includes = [
"src",
"thirdparty/flatcc/include",
],
linkstatic = True,
visibility = ["//visibility:public"],
)
""",
# NOTE(review): sha256 must match the codeload tarball for this exact tag —
# re-verify the checksum if the URL or version is ever bumped.
sha256 = "1c5136edf5c1e9cd8c47c4e31dfe07d0e09c25f27be20c8d6e78a0f4a4ed3fae",
strip_prefix = "arrow-nanoarrow-apache-arrow-nanoarrow-0.8.0",
type = "tar.gz",
urls = [
"https://codeload.github.com/apache/arrow-nanoarrow/tar.gz/refs/tags/apache-arrow-nanoarrow-0.8.0",
],
)

python = use_extension("@rules_python//python/extensions:python.bzl", "python")
python.toolchain(python_version = "3.12")
use_repo(python, "python_3_12")
Expand Down
10 changes: 10 additions & 0 deletions python_api/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,16 @@ py_test(
],
)

# End-to-end Python test for the source materialization cache, exercised
# through the public velaria.Session API.
py_test(
name = "source_materialization_test",
srcs = ["tests/test_source_materialization.py"],
main = "tests/test_source_materialization.py",
imports = ["."],
deps = [
":velaria_py_pkg",
],
)

py_test(
name = "bitable_stream_source_test",
srcs = ["tests/test_bitable_stream_source.py"],
Expand Down
90 changes: 90 additions & 0 deletions python_api/tests/test_source_materialization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import os
import pathlib
import tempfile
import time
import unittest
from contextlib import contextmanager

import velaria


@contextmanager
def _temporary_env(updates):
previous = {key: os.environ.get(key) for key in updates}
try:
for key, value in updates.items():
os.environ[key] = value
yield
finally:
for key, value in previous.items():
if value is None:
os.environ.pop(key, None)
else:
os.environ[key] = value


class SourceMaterializationTest(unittest.TestCase):
@staticmethod
def _build_large_csv() -> str:
lines = ["id,name,amount"]
for index in range(1, 20001):
lines.append(f"{index},user_{index},{index * 1.25:.2f}")
return "\n".join(lines) + "\n"

def _assert_sql_after_materialization_is_reused(self, format_name: str, expected_filename: str):
with tempfile.TemporaryDirectory(prefix="velaria-source-materialization-") as tmp:
root = pathlib.Path(tmp)
csv_path = root / "input.csv"
cache_dir = root / "cache"
csv_path.write_text(self._build_large_csv(), encoding="utf-8")

with _temporary_env(
{
"VELARIA_MATERIALIZATION_DIR": str(cache_dir),
"VELARIA_MATERIALIZATION_FORMAT": format_name,
}
):
first_session = velaria.Session()
first_df = first_session.read_csv(str(csv_path))
first_session.create_temp_view("source_input", first_df)
first = first_session.sql(
"SELECT COUNT(*) AS row_count, SUM(amount) AS total_amount "
"FROM source_input "
"WHERE id > 15000"
).to_rows()
data_files = list(cache_dir.rglob(expected_filename))
self.assertEqual(len(data_files), 1)
meta_files = list(cache_dir.rglob("meta.txt"))
self.assertEqual(len(meta_files), 1)
first_mtime = data_files[0].stat().st_mtime_ns

time.sleep(0.05)
second_session = velaria.Session()
second_df = second_session.read_csv(str(csv_path))
second_session.create_temp_view("source_input", second_df)
second = second_session.sql(
"SELECT COUNT(*) AS row_count, SUM(amount) AS total_amount "
"FROM source_input "
"WHERE id > 15000"
).to_rows()
second_mtime = data_files[0].stat().st_mtime_ns

self.assertEqual(first["schema"], ["row_count", "total_amount"])
self.assertEqual(
first["rows"],
[
[5000, 109378125.0],
],
)
self.assertEqual(second, first)
self.assertEqual(first_mtime, second_mtime)

def test_binary_row_batch_materialization_reuses_cached_file(self):
self._assert_sql_after_materialization_is_reused("binary_row_batch", "table.bin")

def test_nanoarrow_ipc_materialization_reuses_cached_file(self):
self._assert_sql_after_materialization_is_reused("nanoarrow_ipc", "table.nanoarrow")


# Allow running this test module directly (outside the Bazel py_test wrapper).
if __name__ == "__main__":
    unittest.main()
22 changes: 21 additions & 1 deletion src/dataflow/core/contract/api/session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <vector>

#include "src/dataflow/core/execution/csv.h"
#include "src/dataflow/core/execution/source_materialization.h"
#include "src/dataflow/ai/plugin_runtime.h"

namespace dataflow {
Expand Down Expand Up @@ -73,7 +74,26 @@ DataflowSession& DataflowSession::builder() {
}

DataFrame DataflowSession::read_csv(const std::string& path, char delimiter) {
  // Fingerprint the source (absolute path + format + parse options) so that
  // repeated reads of an unchanged CSV can be served from the on-disk
  // materialization cache instead of re-parsing the file.
  std::ostringstream options;
  options << "delimiter=" << delimiter;
  const auto fingerprint = capture_file_source_fingerprint(path, "csv", options.str());
  const SourceMaterializationStore store(default_source_materialization_root(),
                                         default_source_materialization_data_format());
  // Cache hit: decode the previously materialized table.
  if (const auto entry = store.lookup(fingerprint); entry.has_value()) {
    try {
      return DataFrame(store.load(*entry));
    } catch (...) {
      // Fall through to the CSV source path on cache corruption or decode errors.
    }
  }

  // Cache miss (or unreadable cache): parse the CSV and best-effort persist it.
  Table table = load_csv(fingerprint.abs_path, delimiter);
  try {
    store.save(fingerprint, table);
  } catch (...) {
    // CSV remains the source of truth; cache write failures must not change read semantics.
  }
  return DataFrame(std::move(table));
}

StreamingDataFrame DataflowSession::readStream(std::shared_ptr<StreamSource> source) {
Expand Down
Loading
Loading