From 4d03761aa96ca56374dae48271660c258493063e Mon Sep 17 00:00:00 2001 From: rjzamora Date: Mon, 8 Dec 2025 06:45:03 -0800 Subject: [PATCH 01/16] Add TPC-H Q18 streaming benchmark with two-phase prefilter optimization --- cpp/benchmarks/streaming/ndsh/CMakeLists.txt | 13 +- cpp/benchmarks/streaming/ndsh/q18.cpp | 1555 ++++++++++++++++++ 2 files changed, 1566 insertions(+), 2 deletions(-) create mode 100644 cpp/benchmarks/streaming/ndsh/q18.cpp diff --git a/cpp/benchmarks/streaming/ndsh/CMakeLists.txt b/cpp/benchmarks/streaming/ndsh/CMakeLists.txt index 6fa4bd27b..9aa363a59 100644 --- a/cpp/benchmarks/streaming/ndsh/CMakeLists.txt +++ b/cpp/benchmarks/streaming/ndsh/CMakeLists.txt @@ -37,8 +37,9 @@ target_link_libraries( ) add_executable(q09 "q09.cpp") +add_executable(q18 "q18.cpp") set_target_properties( - q09 + q09 q18 PROPERTIES RUNTIME_OUTPUT_DIRECTORY "$" CXX_STANDARD 20 CXX_STANDARD_REQUIRED ON @@ -49,10 +50,18 @@ target_compile_options( q09 PRIVATE "$<$:${RAPIDSMPF_CXX_FLAGS}>" "$<$:${RAPIDSMPF_CUDA_FLAGS}>" ) +target_compile_options( + q18 PRIVATE "$<$:${RAPIDSMPF_CXX_FLAGS}>" + "$<$:${RAPIDSMPF_CUDA_FLAGS}>" +) target_link_libraries( q09 PRIVATE rapidsmpfndsh rapidsmpf::rapidsmpf $ $ maybe_asan ) +target_link_libraries( + q18 PRIVATE rapidsmpfndsh rapidsmpf::rapidsmpf $ + $ maybe_asan +) install( TARGETS rapidsmpfndsh COMPONENT benchmarking @@ -60,7 +69,7 @@ install( EXCLUDE_FROM_ALL ) install( - TARGETS q09 + TARGETS q09 q18 COMPONENT benchmarking DESTINATION bin/benchmarks/librapidsmpf EXCLUDE_FROM_ALL diff --git a/cpp/benchmarks/streaming/ndsh/q18.cpp b/cpp/benchmarks/streaming/ndsh/q18.cpp new file mode 100644 index 000000000..54b154ff4 --- /dev/null +++ b/cpp/benchmarks/streaming/ndsh/q18.cpp @@ -0,0 +1,1555 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. 
+ * SPDX-License-Identifier: Apache-2.0 + * + * TPC-H Query 18 - Pre-filter Optimization + * + * This benchmark implements Q18 with a two-phase approach: + * + * Phase 1 (blocking): Compute qualifying orderkeys + * - Read lineitem -> groupby(l_orderkey, sum(l_quantity)) -> filter(sum > 300) + * - All-gather across ranks -> final groupby+filter + * - Result: ~171K qualifying orderkeys at SF3000 (tiny) + * + * Phase 2 (streaming): Pre-filter and join + * - Read lineitem -> semi-join filter -> all-gather (~684K rows) + * - Read orders -> semi-join filter -> all-gather (~171K rows) + * - Local join (no shuffle needed - data is tiny) + * - Join with customer -> groupby -> sort -> write + * + * Benefits: + * - No shuffle needed for lineitem/orders (99.98% data reduction) + * - No fanout node complexity + * + * Disclaimers: + * - The two-phase approach corresponds to "advanced" query optimization + * - It may not be beneficial to re-read the lineitem table from remote storage + */ + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "concatenate.hpp" +#include "join.hpp" +#include "utils.hpp" + +namespace { + +// ============================================================================ +// Utility Functions +// ============================================================================ + +// NOTE: This is added to ndsh::detail in https://github.com/rapidsai/rapidsmpf/pull/710 +std::string get_table_path( + std::string const& input_directory, std::string const& table_name +) { + auto dir = input_directory.empty() ? "." 
: input_directory; + auto file_path = dir + "/" + table_name + ".parquet"; + if (std::filesystem::exists(file_path)) { + return file_path; + } + return dir + "/" + table_name + "/"; +} + +// ============================================================================ +// Table Readers +// ============================================================================ + +rapidsmpf::streaming::Node read_lineitem( + std::shared_ptr ctx, + std::shared_ptr ch_out, + std::size_t num_producers, + cudf::size_type num_rows_per_chunk, + std::string const& input_directory +) { + auto files = rapidsmpf::ndsh::detail::list_parquet_files( + get_table_path(input_directory, "lineitem") + ); + auto options = cudf::io::parquet_reader_options::builder(cudf::io::source_info(files)) + .columns({"l_orderkey", "l_quantity"}) + .build(); + return rapidsmpf::streaming::node::read_parquet( + ctx, ch_out, num_producers, options, num_rows_per_chunk + ); +} + +rapidsmpf::streaming::Node read_orders( + std::shared_ptr ctx, + std::shared_ptr ch_out, + std::size_t num_producers, + cudf::size_type num_rows_per_chunk, + std::string const& input_directory +) { + auto files = rapidsmpf::ndsh::detail::list_parquet_files( + get_table_path(input_directory, "orders") + ); + auto options = + cudf::io::parquet_reader_options::builder(cudf::io::source_info(files)) + .columns({"o_orderkey", "o_custkey", "o_orderdate", "o_totalprice"}) + .build(); + return rapidsmpf::streaming::node::read_parquet( + ctx, ch_out, num_producers, options, num_rows_per_chunk + ); +} + +rapidsmpf::streaming::Node read_customer( + std::shared_ptr ctx, + std::shared_ptr ch_out, + std::size_t num_producers, + cudf::size_type num_rows_per_chunk, + std::string const& input_directory +) { + auto files = rapidsmpf::ndsh::detail::list_parquet_files( + get_table_path(input_directory, "customer") + ); + auto options = cudf::io::parquet_reader_options::builder(cudf::io::source_info(files)) + .columns({"c_custkey", "c_name"}) + .build(); + 
return rapidsmpf::streaming::node::read_parquet( + ctx, ch_out, num_producers, options, num_rows_per_chunk + ); +} + +// ============================================================================ +// Phase 1: Compute Qualifying Orderkeys +// ============================================================================ + +/** + * @brief Stage 1: Chunk-wise groupby (NO filter yet!) + * + * Computes partial aggregates: groupby(l_orderkey, sum(l_quantity)) + * The same orderkey may appear in multiple chunks, so we can't filter here. + */ +rapidsmpf::streaming::Node chunkwise_groupby_lineitem( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out +) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + auto mr = ctx->br()->device_mr(); + std::uint64_t sequence = 0; + std::size_t total_input_rows = 0; + std::size_t total_output_rows = 0; + std::size_t chunk_count = 0; + + while (true) { + auto msg = co_await ch_in->receive(); + if (msg.empty()) { + break; + } + co_await ctx->executor()->schedule(); + auto chunk = rapidsmpf::ndsh::to_device( + ctx, msg.release() + ); + auto chunk_stream = chunk.stream(); + auto table = chunk.table_view(); + total_input_rows += static_cast(table.num_rows()); + chunk_count++; + + // Groupby l_orderkey, sum(l_quantity) - NO FILTER + auto grouper = cudf::groupby::groupby( + table.select({0}), cudf::null_policy::EXCLUDE, cudf::sorted::NO + ); + std::vector> aggs; + aggs.push_back(cudf::make_sum_aggregation()); + std::vector requests; + requests.push_back( + cudf::groupby::aggregation_request(table.column(1), std::move(aggs)) + ); + auto [keys, results] = grouper.aggregate(requests, chunk_stream, mr); + + auto result_columns = keys->release(); + for (auto&& r : results) { + std::ranges::move(r.results, std::back_inserter(result_columns)); + } + auto grouped_table = std::make_unique(std::move(result_columns)); + total_output_rows += static_cast(grouped_table->num_rows()); + + if (grouped_table->num_rows() > 0) { + 
co_await ch_out->send( + rapidsmpf::streaming::to_message( + sequence++, + std::make_unique( + std::move(grouped_table), chunk_stream + ) + ) + ); + } + } + + ctx->comm()->logger().print( + "chunkwise_groupby: rank processed ", + chunk_count, + " chunks, ", + total_input_rows, + " -> ", + total_output_rows, + " rows" + ); + co_await ch_out->drain(ctx->executor()); +} + +/** + * @brief Stage 3: Final groupby + filter (after concatenate + all-gather) + * + * Merges partial aggregates and filters for sum > threshold. + * Input: concatenated partial aggregates (l_orderkey, partial_sum) + * Output: qualifying orderkeys (l_orderkey, total_sum where total_sum > threshold) + */ +rapidsmpf::streaming::Node final_groupby_filter_lineitem( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out, + double quantity_threshold +) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + auto mr = ctx->br()->device_mr(); + + auto msg = co_await ch_in->receive(); + if (msg.empty()) { + ctx->comm()->logger().print("final_groupby_filter: rank received EMPTY input!"); + co_await ch_out->drain(ctx->executor()); + co_return; + } + + auto chunk = + rapidsmpf::ndsh::to_device(ctx, msg.release()); + auto chunk_stream = chunk.stream(); + auto table = chunk.table_view(); + + ctx->comm()->logger().print( + "final_groupby_filter: rank processing ", table.num_rows(), " partial aggregates" + ); + + // Final groupby to merge partial sums for same orderkey + auto grouper = cudf::groupby::groupby( + table.select({0}), cudf::null_policy::EXCLUDE, cudf::sorted::NO + ); + std::vector> aggs; + aggs.push_back(cudf::make_sum_aggregation()); + std::vector requests; + requests.push_back( + cudf::groupby::aggregation_request(table.column(1), std::move(aggs)) + ); + auto [keys, results] = grouper.aggregate(requests, chunk_stream, mr); + + auto result_columns = keys->release(); + for (auto&& r : results) { + std::ranges::move(r.results, std::back_inserter(result_columns)); + } + auto 
merged_table = std::make_unique(std::move(result_columns)); + + ctx->comm()->logger().print( + "final_groupby_filter: rank merged to ", + merged_table->num_rows(), + " unique orderkeys" + ); + + // NOW filter for sum > threshold + auto sum_col = merged_table->view().column(1); + auto threshold_scalar = cudf::make_numeric_scalar( + cudf::data_type(cudf::type_id::FLOAT64), chunk_stream, mr + ); + static_cast*>(threshold_scalar.get()) + ->set_value(quantity_threshold, chunk_stream); + + auto mask = cudf::binary_operation( + sum_col, + *threshold_scalar, + cudf::binary_operator::GREATER, + cudf::data_type(cudf::type_id::BOOL8), + chunk_stream, + mr + ); + + auto filtered_table = + cudf::apply_boolean_mask(merged_table->view(), mask->view(), chunk_stream, mr); + + ctx->comm()->logger().print( + "final_groupby_filter: ", + filtered_table->num_rows(), + " qualifying orderkeys (sum > ", + quantity_threshold, + ")" + ); + + if (filtered_table->num_rows() > 0) { + co_await ch_out->send( + rapidsmpf::streaming::to_message( + 0, + std::make_unique( + std::move(filtered_table), chunk_stream + ) + ) + ); + } + co_await ch_out->drain(ctx->executor()); +} + +/** + * @brief All-gather node for Phase 1. + * + * Collects partial aggregates from all ranks and outputs concatenated result. 
+ */ +rapidsmpf::streaming::Node allgather_partial_aggregates( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out, + rapidsmpf::OpID tag +) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + + auto msg = co_await ch_in->receive(); + + std::unique_ptr result; + rmm::cuda_stream_view stream = cudf::get_default_stream(); + + if (ctx->comm()->nranks() > 1) { + rapidsmpf::streaming::AllGather gatherer{ctx, tag}; + + if (!msg.empty()) { + auto chunk = rapidsmpf::ndsh::to_device( + ctx, msg.release() + ); + stream = chunk.stream(); + auto pack = cudf::pack(chunk.table_view(), stream, ctx->br()->device_mr()); + gatherer.insert( + 0, + {rapidsmpf::PackedData( + std::move(pack.metadata), + ctx->br()->move(std::move(pack.gpu_data), stream) + )} + ); + } + gatherer.insert_finished(); + + auto packed_data = + co_await gatherer.extract_all(rapidsmpf::streaming::AllGather::Ordered::NO); + + result = rapidsmpf::unpack_and_concat( + rapidsmpf::unspill_partitions( + std::move(packed_data), ctx->br(), true, ctx->statistics() + ), + stream, + ctx->br(), + ctx->statistics() + ); + } else { + // Single rank - just pass through + if (!msg.empty()) { + auto chunk = rapidsmpf::ndsh::to_device( + ctx, msg.release() + ); + stream = chunk.stream(); + result = std::make_unique( + chunk.table_view(), stream, ctx->br()->device_mr() + ); + } + } + + ctx->comm()->logger().debug( + "allgather_partial_aggregates: ", result ? result->num_rows() : 0, " rows" + ); + + if (result && result->num_rows() > 0) { + co_await ch_out->send( + rapidsmpf::streaming::to_message( + 0, + std::make_unique( + std::move(result), stream + ) + ) + ); + } + co_await ch_out->drain(ctx->executor()); +} + +/** + * @brief Phase 1: Compute qualifying orderkeys. + * + * Pipeline strategy depends on number of ranks: + * + * Single-rank (all-gather approach): + * 1. Read lineitem → chunkwise_groupby (partial aggregates) + * 2. 
Concatenate → final groupby + filter
 *
 * Multi-rank (shuffle approach - required at scale!):
 *   1. Read lineitem → chunkwise_groupby (partial aggregates)
 *   2. SHUFFLE by orderkey (distributes work across ranks)
 *   3. Per-partition: concatenate → groupby → filter
 *   4. All-gather ONLY the tiny filtered result (~57K orderkeys at SF1000)
 *
 * The shuffle approach is required for multi-rank because:
 * - Each rank produces partial sums for MOST orderkeys (not just 1/N)
 * - All-gathering these partials would be O(orderkeys) per rank = OOM at scale
 * - Shuffle ensures each rank only merges 1/N of the orderkeys
 *
 * @return Table with single column (l_orderkey) of qualifying orders, or nullptr if
 * empty.
 */
// NOTE(review): reconstructed from a tag-stripped patch; "<...>" template
// arguments were destroyed in transit and restored best-effort (in particular
// the OpID underlying type is assumed to be std::uint16_t — TODO confirm).
std::unique_ptr<cudf::table> compute_qualifying_orderkeys(
    std::shared_ptr<rapidsmpf::streaming::Context> ctx,
    cudf::size_type num_rows_per_chunk,
    std::string const& input_directory,
    double quantity_threshold,
    std::uint32_t num_partitions,
    rapidsmpf::OpID base_tag
) {
    bool const single_rank = ctx->comm()->nranks() == 1;
    ctx->comm()->logger().print(
        "Phase 1: Computing qualifying orderkeys",
        single_rank ? " (single-rank: local groupby)" : " (multi-rank: shuffle-based)"
    );

    // Build Phase 1 pipeline
    std::vector<rapidsmpf::streaming::Node> nodes;

    // Stage 1: Read lineitem → chunk-wise groupby (partial aggregates)
    auto lineitem = ctx->create_channel();
    nodes.push_back(read_lineitem(ctx, lineitem, 4, num_rows_per_chunk, input_directory));

    auto partial_aggs = ctx->create_channel();
    nodes.push_back(chunkwise_groupby_lineitem(ctx, lineitem, partial_aggs));

    std::shared_ptr<rapidsmpf::streaming::Channel> to_concat;
    std::shared_ptr<rapidsmpf::streaming::Channel> to_collect;

    if (single_rank) {
        // Single rank: simple local pipeline (no shuffle needed)
        to_concat = partial_aggs;

        auto concatenated = ctx->create_channel();
        nodes.push_back(
            rapidsmpf::ndsh::concatenate(
                ctx, to_concat, concatenated, rapidsmpf::ndsh::ConcatOrder::DONT_CARE
            )
        );

        auto filtered_local = ctx->create_channel();
        nodes.push_back(final_groupby_filter_lineitem(
            ctx, concatenated, filtered_local, quantity_threshold
        ));

        to_collect = filtered_local;
    } else {
        // Multi-rank: SHUFFLE partial aggregates by orderkey (column 0)
        // This distributes work across ranks - required at scale!
        auto partial_aggs_shuffled = ctx->create_channel();
        nodes.push_back(
            rapidsmpf::ndsh::shuffle(
                ctx,
                partial_aggs,
                partial_aggs_shuffled,
                {0},  // l_orderkey
                num_partitions,
                rapidsmpf::OpID{static_cast<std::uint16_t>(base_tag)}
            )
        );
        to_concat = partial_aggs_shuffled;

        // Per-partition: concatenate → groupby → filter
        auto concatenated = ctx->create_channel();
        nodes.push_back(
            rapidsmpf::ndsh::concatenate(
                ctx, to_concat, concatenated, rapidsmpf::ndsh::ConcatOrder::DONT_CARE
            )
        );

        auto filtered_local = ctx->create_channel();
        nodes.push_back(final_groupby_filter_lineitem(
            ctx, concatenated, filtered_local, quantity_threshold
        ));

        // All-gather the TINY filtered result (~57K orderkeys at SF1000)
        auto gathered = ctx->create_channel();
        nodes.push_back(allgather_partial_aggregates(
            ctx,
            filtered_local,
            gathered,
            rapidsmpf::OpID{static_cast<std::uint16_t>(base_tag + 1)}
        ));
        to_collect = gathered;
    }

    // Collect result using pull_from_channel (safe coroutine pattern)
    std::vector<rapidsmpf::streaming::Message> result_messages;
    nodes.push_back(
        rapidsmpf::streaming::node::pull_from_channel(ctx, to_collect, result_messages)
    );

    // Run pipeline (blocking: Phase 2 needs the orderkeys before it can start).
    rapidsmpf::streaming::run_streaming_pipeline(std::move(nodes));

    // Extract result from collected messages
    std::unique_ptr<cudf::table> result;
    if (!result_messages.empty()) {
        auto chunk = rapidsmpf::ndsh::to_device(ctx, result_messages[0].release());
        auto stream = chunk.stream();
        auto table_view = chunk.table_view();

        // Extract just the orderkey column (column 0)
        // The filtered result has (l_orderkey, sum_quantity), we only need l_orderkey
        std::vector<std::unique_ptr<cudf::column>> cols;
        cols.push_back(
            std::make_unique<cudf::column>(
                table_view.column(0), stream, ctx->br()->device_mr()
            )
        );
        stream.synchronize();
        result = std::make_unique<cudf::table>(std::move(cols));
    }

    ctx->comm()->logger().print(
        "Phase 1 complete: ", result ? 
result->num_rows() : 0, " qualifying orderkeys" + ); + + return result; +} + +// ============================================================================ +// Phase 2: Pre-filter Pipeline +// ============================================================================ + +/** + * @brief Pre-filter table by qualifying orderkeys using semi-join. + * + * @param qualifying_orderkeys Table with single column of qualifying l_orderkey values. + * @param key_column_idx Which column in input chunks to match against orderkeys. + */ +rapidsmpf::streaming::Node prefilter_by_orderkeys( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out, + std::shared_ptr qualifying_orderkeys, + cudf::size_type key_column_idx +) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + std::uint64_t sequence = 0; + + // Build filtered_join for semi-join (orderkeys is the "right"/build side) + auto joiner = cudf::filtered_join( + qualifying_orderkeys->view(), + cudf::null_equality::UNEQUAL, + cudf::set_as_build_table::RIGHT, + cudf::get_default_stream() + ); + + std::size_t total_input_rows = 0; + std::size_t total_output_rows = 0; + + while (true) { + auto msg = co_await ch_in->receive(); + if (msg.empty()) { + break; + } + co_await ctx->executor()->schedule(); + auto chunk = rapidsmpf::ndsh::to_device( + ctx, msg.release() + ); + auto chunk_stream = chunk.stream(); + auto table = chunk.table_view(); + total_input_rows += static_cast(table.num_rows()); + + // Semi-join: get indices of matching rows + auto match = joiner.semi_join( + table.select({key_column_idx}), chunk_stream, ctx->br()->device_mr() + ); + + // Gather matching rows - convert device_uvector to column_view + auto indices = cudf::column_view( + cudf::data_type{cudf::type_id::INT32}, + static_cast(match->size()), + match->data(), + nullptr, // null mask + 0 // null count + ); + auto filtered = cudf::gather( + table, indices, cudf::out_of_bounds_policy::DONT_CHECK, chunk_stream + ); + + total_output_rows 
+= static_cast(filtered->num_rows()); + + if (filtered->num_rows() > 0) { + co_await ch_out->send( + rapidsmpf::streaming::to_message( + sequence++, + std::make_unique( + std::move(filtered), chunk_stream + ) + ) + ); + } + } + + ctx->comm()->logger().print( + "prefilter: rank processed ", + total_input_rows, + " -> ", + total_output_rows, + " rows (", + (total_output_rows * 100.0 / std::max(total_input_rows, std::size_t{1})), + "%)" + ); + + co_await ch_out->drain(ctx->executor()); +} + +/** + * @brief All-gather node: collect from ch_in, all-gather across ranks, send to ch_out. + */ +rapidsmpf::streaming::Node allgather_table( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out, + rapidsmpf::OpID tag +) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + + auto msg = co_await ch_in->receive(); + if (msg.empty()) { + co_await ch_out->drain(ctx->executor()); + co_return; + } + + auto chunk = + rapidsmpf::ndsh::to_device(ctx, msg.release()); + auto chunk_stream = chunk.stream(); + auto table = chunk.table_view(); + + ctx->comm()->logger().debug("allgather_table: local has ", table.num_rows(), " rows"); + + std::unique_ptr result; + if (ctx->comm()->nranks() > 1) { + rapidsmpf::streaming::AllGather gatherer{ctx, tag}; + + auto pack = cudf::pack(table, chunk_stream, ctx->br()->device_mr()); + gatherer.insert( + 0, + {rapidsmpf::PackedData( + std::move(pack.metadata), + ctx->br()->move(std::move(pack.gpu_data), chunk_stream) + )} + ); + gatherer.insert_finished(); + + auto packed_data = + co_await gatherer.extract_all(rapidsmpf::streaming::AllGather::Ordered::NO); + + result = rapidsmpf::unpack_and_concat( + rapidsmpf::unspill_partitions( + std::move(packed_data), ctx->br(), true, ctx->statistics() + ), + chunk_stream, + ctx->br(), + ctx->statistics() + ); + } else { + result = + std::make_unique(table, chunk_stream, ctx->br()->device_mr()); + } + + ctx->comm()->logger().debug( + "allgather_table: gathered has ", result->num_rows(), " 
rows" + ); + + co_await ch_out->send( + rapidsmpf::streaming::to_message( + 0, + std::make_unique( + std::move(result), chunk_stream + ) + ) + ); + co_await ch_out->drain(ctx->executor()); +} + +/** + * @brief Local inner join (both inputs are small after pre-filtering). + * + * Receives one chunk from each input, joins them, outputs result. + */ +rapidsmpf::streaming::Node local_inner_join( + std::shared_ptr ctx, + std::shared_ptr ch_left, + std::shared_ptr ch_right, + std::shared_ptr ch_out, + std::vector left_on, + std::vector right_on +) { + rapidsmpf::streaming::ShutdownAtExit c{ch_left, ch_right, ch_out}; + + auto left_msg = co_await ch_left->receive(); + auto right_msg = co_await ch_right->receive(); + + if (left_msg.empty() || right_msg.empty()) { + co_await ch_out->drain(ctx->executor()); + co_return; + } + + auto left_chunk = rapidsmpf::ndsh::to_device( + ctx, left_msg.release() + ); + auto right_chunk = rapidsmpf::ndsh::to_device( + ctx, right_msg.release() + ); + + auto stream = left_chunk.stream(); + auto left_table = left_chunk.table_view(); + auto right_table = right_chunk.table_view(); + + ctx->comm()->logger().print( + "local_inner_join: ", left_table.num_rows(), " x ", right_table.num_rows() + ); + + // Build hash table on right (smaller side typically) + auto hash_table = + cudf::hash_join(right_table.select(right_on), cudf::null_equality::EQUAL, stream); + + auto [left_indices, right_indices] = hash_table.inner_join( + left_table.select(left_on), {}, stream, ctx->br()->device_mr() + ); + + // Gather from both sides using device_span for column_view + cudf::column_view left_col = cudf::device_span(*left_indices); + cudf::column_view right_col = + cudf::device_span(*right_indices); + + auto left_gathered = cudf::gather( + left_table, left_col, cudf::out_of_bounds_policy::DONT_CHECK, stream + ); + auto right_gathered = cudf::gather( + right_table.select({1}), // Only l_quantity from lineitem + right_col, + cudf::out_of_bounds_policy::DONT_CHECK, + 
stream + ); + + // Concatenate columns: all from left + l_quantity from right + auto result_cols = left_gathered->release(); + auto right_cols = right_gathered->release(); + for (auto&& col : right_cols) { + result_cols.push_back(std::move(col)); + } + + auto result = std::make_unique(std::move(result_cols)); + ctx->comm()->logger().print( + "local_inner_join: result has ", result->num_rows(), " rows" + ); + + co_await ch_out->send( + rapidsmpf::streaming::to_message( + 0, + std::make_unique(std::move(result), stream) + ) + ); + co_await ch_out->drain(ctx->executor()); +} + +// ============================================================================ +// Final Processing (reused from q18.cpp) +// ============================================================================ + +rapidsmpf::streaming::Node reorder_columns( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out +) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + std::uint64_t seq = 0; + while (true) { + auto msg = co_await ch_in->receive(); + if (msg.empty()) { + break; + } + co_await ctx->executor()->schedule(); + auto chunk = rapidsmpf::ndsh::to_device( + ctx, msg.release() + ); + auto table = chunk.table_view(); + auto reordered_table = std::make_unique( + table.select({1, 0, 2, 3, 4, 5}), chunk.stream(), ctx->br()->device_mr() + ); + co_await ch_out->send( + rapidsmpf::streaming::to_message( + seq++, + std::make_unique( + std::move(reordered_table), chunk.stream() + ) + ) + ); + } + co_await ch_out->drain(ctx->executor()); +} + +rapidsmpf::streaming::Node chunkwise_groupby_agg( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out +) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + std::uint64_t sequence = 0; + while (true) { + auto msg = co_await ch_in->receive(); + if (msg.empty()) { + break; + } + co_await ctx->executor()->schedule(); + auto chunk = rapidsmpf::ndsh::to_device( + ctx, msg.release() + ); + auto chunk_stream = 
chunk.stream(); + auto table = chunk.table_view(); + + auto grouper = cudf::groupby::groupby( + table.select({0, 1, 2, 3, 4}), cudf::null_policy::EXCLUDE, cudf::sorted::NO + ); + std::vector> aggs; + aggs.push_back(cudf::make_sum_aggregation()); + std::vector requests; + requests.push_back( + cudf::groupby::aggregation_request(table.column(5), std::move(aggs)) + ); + auto [keys, results] = + grouper.aggregate(requests, chunk_stream, ctx->br()->device_mr()); + + auto result_columns = keys->release(); + for (auto&& r : results) { + std::ranges::move(r.results, std::back_inserter(result_columns)); + } + + co_await ch_out->send( + rapidsmpf::streaming::to_message( + sequence++, + std::make_unique( + std::make_unique(std::move(result_columns)), chunk_stream + ) + ) + ); + } + co_await ch_out->drain(ctx->executor()); +} + +rapidsmpf::streaming::Node final_groupby_and_sort( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::shared_ptr ch_out, + rapidsmpf::OpID allgather_tag +) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; + + auto msg = co_await ch_in->receive(); + if (msg.empty()) { + co_await ch_out->drain(ctx->executor()); + co_return; + } + + auto chunk = + rapidsmpf::ndsh::to_device(ctx, msg.release()); + auto stream = chunk.stream(); + auto table = chunk.table_view(); + + // Local groupby + std::unique_ptr local_result; + if (table.num_rows() > 0) { + auto grouper = cudf::groupby::groupby( + table.select({0, 1, 2, 3, 4}), cudf::null_policy::EXCLUDE, cudf::sorted::NO + ); + std::vector> aggs; + aggs.push_back(cudf::make_sum_aggregation()); + std::vector requests; + requests.push_back( + cudf::groupby::aggregation_request(table.column(5), std::move(aggs)) + ); + auto [keys, results] = + grouper.aggregate(requests, stream, ctx->br()->device_mr()); + auto cols = keys->release(); + for (auto&& r : results) { + std::ranges::move(r.results, std::back_inserter(cols)); + } + local_result = std::make_unique(std::move(cols)); + } + + // All-gather if 
multi-rank + std::unique_ptr global_result; + if (ctx->comm()->nranks() > 1 && local_result) { + rapidsmpf::streaming::AllGather gatherer{ctx, allgather_tag}; + auto pack = cudf::pack(local_result->view(), stream, ctx->br()->device_mr()); + gatherer.insert( + 0, + {rapidsmpf::PackedData( + std::move(pack.metadata), + ctx->br()->move(std::move(pack.gpu_data), stream) + )} + ); + gatherer.insert_finished(); + + auto packed_data = + co_await gatherer.extract_all(rapidsmpf::streaming::AllGather::Ordered::NO); + + if (ctx->comm()->rank() == 0) { + auto gathered = rapidsmpf::unpack_and_concat( + rapidsmpf::unspill_partitions( + std::move(packed_data), ctx->br(), true, ctx->statistics() + ), + stream, + ctx->br(), + ctx->statistics() + ); + + // Final groupby + auto grouper = cudf::groupby::groupby( + gathered->view().select({0, 1, 2, 3, 4}), + cudf::null_policy::EXCLUDE, + cudf::sorted::NO + ); + std::vector> aggs; + aggs.push_back(cudf::make_sum_aggregation()); + std::vector requests; + requests.push_back( + cudf::groupby::aggregation_request( + gathered->view().column(5), std::move(aggs) + ) + ); + auto [keys, results] = + grouper.aggregate(requests, stream, ctx->br()->device_mr()); + auto cols = keys->release(); + for (auto&& r : results) { + std::ranges::move(r.results, std::back_inserter(cols)); + } + global_result = std::make_unique(std::move(cols)); + } + } else { + global_result = std::move(local_result); + } + + // Sort and limit (rank 0 only in multi-rank) + if (global_result && (ctx->comm()->nranks() == 1 || ctx->comm()->rank() == 0)) { + auto sorted = cudf::sort_by_key( + global_result->view(), + global_result->view().select({4, 3}), // o_totalprice DESC, o_orderdate ASC + {cudf::order::DESCENDING, cudf::order::ASCENDING}, + {cudf::null_order::AFTER, cudf::null_order::AFTER}, + stream, + ctx->br()->device_mr() + ); + cudf::size_type limit = std::min(100, sorted->num_rows()); + auto limited = cudf::slice(sorted->view(), {0, limit})[0]; + + co_await 
ch_out->send( + rapidsmpf::streaming::to_message( + 0, + std::make_unique( + std::make_unique( + limited, stream, ctx->br()->device_mr() + ), + stream + ) + ) + ); + } + + co_await ch_out->drain(ctx->executor()); +} + +rapidsmpf::streaming::Node write_parquet( + std::shared_ptr ctx, + std::shared_ptr ch_in, + std::string output_path +) { + rapidsmpf::streaming::ShutdownAtExit c{ch_in}; + auto msg = co_await ch_in->receive(); + if (msg.empty()) { + co_return; + } + auto chunk = + rapidsmpf::ndsh::to_device(ctx, msg.release()); + auto sink = cudf::io::sink_info(output_path); + auto builder = cudf::io::parquet_writer_options::builder(sink, chunk.table_view()); + auto metadata = cudf::io::table_input_metadata(chunk.table_view()); + metadata.column_metadata[0].set_name("c_name"); + metadata.column_metadata[1].set_name("c_custkey"); + metadata.column_metadata[2].set_name("o_orderkey"); + metadata.column_metadata[3].set_name("o_orderdate"); + metadata.column_metadata[4].set_name("o_totalprice"); + metadata.column_metadata[5].set_name("col6"); + builder = builder.metadata(metadata); + cudf::io::write_parquet(builder.build(), chunk.stream()); + ctx->comm()->logger().print( + "Wrote ", chunk.table_view().num_rows(), " rows to ", output_path + ); +} + +} // namespace + +// ============================================================================ +// Command Line Parsing +// ============================================================================ + +struct ProgramOptions { + int num_streaming_threads{4}; // 4 threads provides good pipeline parallelism + cudf::size_type num_rows_per_chunk{100'000'000}; + std::uint32_t num_partitions{64}; + bool use_shuffle{false}; // Use shuffle joins in Phase 2 for multi-GPU scaling + std::optional spill_device_limit{std::nullopt}; + std::string output_file; + std::string input_directory; +}; + +ProgramOptions parse_options(int argc, char** argv) { + ProgramOptions options; + + auto print_usage = [&argv]() { + std::cerr + << "Usage: " 
<< argv[0] << " [options]\n" + << "Options:\n" + << " --num-streaming-threads Number of streaming threads (default: 4)\n" + << " --num-rows-per-chunk Number of rows per chunk (default: " + "100000000)\n" + << " --num-partitions Number of shuffle partitions (default: " + "64)\n" + << " --use-shuffle Use shuffle joins in Phase 2 for " + "multi-GPU scaling\n" + << " --spill-device-limit Fractional spill device limit (default: " + "None)\n" + << " --output-file Output file path (required)\n" + << " --input-directory Input directory path (required)\n" + << " --help Show this help message\n"; + }; + + static struct option long_options[] = { + {"num-streaming-threads", required_argument, nullptr, 1}, + {"num-rows-per-chunk", required_argument, nullptr, 2}, + {"num-partitions", required_argument, nullptr, 7}, + {"use-shuffle", no_argument, nullptr, 8}, + {"output-file", required_argument, nullptr, 3}, + {"input-directory", required_argument, nullptr, 4}, + {"help", no_argument, nullptr, 5}, + {"spill-device-limit", required_argument, nullptr, 6}, + {nullptr, 0, nullptr, 0} + }; + + int opt; + int option_index = 0; + bool saw_output_file = false; + bool saw_input_directory = false; + + while ((opt = getopt_long(argc, argv, "", long_options, &option_index)) != -1) { + switch (opt) { + case 1: + options.num_streaming_threads = std::atoi(optarg); + break; + case 2: + options.num_rows_per_chunk = std::atoi(optarg); + break; + case 3: + options.output_file = optarg; + saw_output_file = true; + break; + case 4: + options.input_directory = optarg; + saw_input_directory = true; + break; + case 5: + print_usage(); + std::exit(0); + case 6: + options.spill_device_limit = std::stod(optarg); + break; + case 7: + options.num_partitions = static_cast(std::atoi(optarg)); + break; + case 8: + options.use_shuffle = true; + break; + default: + print_usage(); + std::exit(1); + } + } + + if (!saw_output_file || !saw_input_directory) { + if (!saw_output_file) + std::cerr << "Error: 
--output-file is required\n"; + if (!saw_input_directory) + std::cerr << "Error: --input-directory is required\n"; + print_usage(); + std::exit(1); + } + + return options; +} + +// ============================================================================ +// Main +// ============================================================================ + +int main(int argc, char** argv) { + cudaFree(nullptr); + rapidsmpf::mpi::init(&argc, &argv); + MPI_Comm mpi_comm; + RAPIDSMPF_MPI(MPI_Comm_dup(MPI_COMM_WORLD, &mpi_comm)); + + auto cmd_options = parse_options(argc, argv); + + auto limit_size = rmm::percent_of_free_device_memory( + static_cast(cmd_options.spill_device_limit.value_or(1) * 100) + ); + rmm::mr::cuda_async_memory_resource mr{}; + auto stats_mr = rapidsmpf::RmmResourceAdaptor(&mr); + rmm::device_async_resource_ref mr_ref(stats_mr); + rmm::mr::set_current_device_resource(&stats_mr); + rmm::mr::set_current_device_resource_ref(mr_ref); + + std::unordered_map + memory_available{}; + if (cmd_options.spill_device_limit.has_value()) { + memory_available[rapidsmpf::MemoryType::DEVICE] = rapidsmpf::LimitAvailableMemory{ + &stats_mr, static_cast(limit_size) + }; + } + + auto br = std::make_shared( + stats_mr, std::move(memory_available) + ); + auto envvars = rapidsmpf::config::get_environment_variables(); + envvars["num_streaming_threads"] = std::to_string(cmd_options.num_streaming_threads); + auto options = rapidsmpf::config::Options(envvars); + auto stats = std::make_shared(&stats_mr); + + { + auto comm = rapidsmpf::ucxx::init_using_mpi(mpi_comm, options); + auto progress = + std::make_shared(comm->logger(), stats); + auto ctx = + std::make_shared(options, comm, br, stats); + + comm->logger().print("Q18 Pre-filter Benchmark"); + comm->logger().print( + "Executor has ", ctx->executor()->thread_count(), " threads" + ); + comm->logger().print("Executor has ", ctx->comm()->nranks(), " ranks"); + comm->logger().print( + "Phase 1 mode: ", + ctx->comm()->nranks() > 1 ? 
"shuffle (multi-rank)" : "local (single-rank)", + ", Phase 2 mode: ", + cmd_options.use_shuffle ? "shuffle joins" : "local joins", + ", partitions: ", + cmd_options.num_partitions + ); + + std::string output_path = cmd_options.output_file; + + for (int iteration = 0; iteration < 2; iteration++) { + auto start = std::chrono::steady_clock::now(); + + // ================================================================ + // Phase 1: Compute qualifying orderkeys (blocking) + // Uses shuffle for multi-rank (required at scale to avoid OOM) + // Uses simple local groupby for single-rank + // ================================================================ + std::unique_ptr qualifying_orderkeys = + compute_qualifying_orderkeys( + ctx, + cmd_options.num_rows_per_chunk, + cmd_options.input_directory, + 300.0, // quantity_threshold + cmd_options.num_partitions, + rapidsmpf::OpID{static_cast(100 + iteration * 10)} + ); + auto phase1_end = std::chrono::steady_clock::now(); + + if (!qualifying_orderkeys || qualifying_orderkeys->num_rows() == 0) { + comm->logger().print("No qualifying orderkeys found - empty result"); + continue; + } + + // Share orderkeys across nodes (they're small and identical on all ranks) + auto shared_orderkeys = + std::make_shared(std::move(*qualifying_orderkeys)); + + // ================================================================ + // Phase 2: Build pre-filtered pipeline + // ================================================================ + std::vector nodes; + std::uint32_t num_partitions = cmd_options.num_partitions; + int op_id = 0; + + // Read and pre-filter lineitem + auto lineitem_raw = ctx->create_channel(); + nodes.push_back(read_lineitem( + ctx, + lineitem_raw, + 4, + cmd_options.num_rows_per_chunk, + cmd_options.input_directory + )); + + auto lineitem_filtered = ctx->create_channel(); + nodes.push_back(prefilter_by_orderkeys( + ctx, lineitem_raw, lineitem_filtered, shared_orderkeys, 0 + )); + + // Read and pre-filter orders + auto 
orders_raw = ctx->create_channel(); + nodes.push_back(read_orders( + ctx, + orders_raw, + 4, + cmd_options.num_rows_per_chunk, + cmd_options.input_directory + )); + + auto orders_filtered = ctx->create_channel(); + nodes.push_back(prefilter_by_orderkeys( + ctx, orders_raw, orders_filtered, shared_orderkeys, 0 + )); + + // Read customer + auto customer = ctx->create_channel(); + nodes.push_back(read_customer( + ctx, + customer, + 4, + cmd_options.num_rows_per_chunk, + cmd_options.input_directory + )); + + auto all_joined = ctx->create_channel(); + + bool const single_rank = ctx->comm()->nranks() == 1; + + if (cmd_options.use_shuffle && !single_rank) { + // ============================================================ + // SHUFFLE MODE: Proper parallel scaling (multi-rank only) + // ============================================================ + + // Shuffle filtered lineitem by orderkey + auto lineitem_shuffled = ctx->create_channel(); + nodes.push_back( + rapidsmpf::ndsh::shuffle( + ctx, + lineitem_filtered, + lineitem_shuffled, + {0}, // l_orderkey + num_partitions, + rapidsmpf::OpID{ + static_cast(200 + iteration * 10 + op_id++) + } + ) + ); + + // Shuffle filtered orders by orderkey + auto orders_shuffled = ctx->create_channel(); + nodes.push_back( + rapidsmpf::ndsh::shuffle( + ctx, + orders_filtered, + orders_shuffled, + {0}, // o_orderkey + num_partitions, + rapidsmpf::OpID{ + static_cast(200 + iteration * 10 + op_id++) + } + ) + ); + + // Shuffle-based join: orders x lineitem + auto orders_x_lineitem = ctx->create_channel(); + nodes.push_back( + rapidsmpf::ndsh::inner_join_shuffle( + ctx, + orders_shuffled, + lineitem_shuffled, + orders_x_lineitem, + {0}, // o_orderkey + {0}, // l_orderkey + rapidsmpf::ndsh::KeepKeys::YES + ) + ); // output: o_orderkey, o_custkey, o_orderdate, o_totalprice, + // l_quantity + + // Shuffle orders_x_lineitem by custkey for customer join + auto orders_x_lineitem_shuffled = ctx->create_channel(); + nodes.push_back( + 
rapidsmpf::ndsh::shuffle( + ctx, + orders_x_lineitem, + orders_x_lineitem_shuffled, + {1}, // o_custkey + num_partitions, + rapidsmpf::OpID{ + static_cast(200 + iteration * 10 + op_id++) + } + ) + ); + + // Shuffle customer by custkey + auto customer_shuffled = ctx->create_channel(); + nodes.push_back( + rapidsmpf::ndsh::shuffle( + ctx, + customer, + customer_shuffled, + {0}, // c_custkey + num_partitions, + rapidsmpf::OpID{ + static_cast(200 + iteration * 10 + op_id++) + } + ) + ); + + // Shuffle-based join: customer x orders_x_lineitem + nodes.push_back( + rapidsmpf::ndsh::inner_join_shuffle( + ctx, + customer_shuffled, + orders_x_lineitem_shuffled, + all_joined, + {0}, // c_custkey + {1}, // o_custkey + rapidsmpf::ndsh::KeepKeys::YES + ) + ); + + } else if (cmd_options.use_shuffle && single_rank) { + // ============================================================ + // SINGLE-RANK SHUFFLE MODE: Use local joins (skip shuffle overhead) + // ============================================================ + ctx->comm()->logger().print( + "Phase 2: Single-rank mode - using local joins (skipping shuffle)" + ); + + // Concatenate filtered lineitem + auto lineitem_concat = ctx->create_channel(); + nodes.push_back( + rapidsmpf::ndsh::concatenate( + ctx, + lineitem_filtered, + lineitem_concat, + rapidsmpf::ndsh::ConcatOrder::DONT_CARE + ) + ); + + // Concatenate filtered orders + auto orders_concat = ctx->create_channel(); + nodes.push_back( + rapidsmpf::ndsh::concatenate( + ctx, + orders_filtered, + orders_concat, + rapidsmpf::ndsh::ConcatOrder::DONT_CARE + ) + ); + + // Local join: orders x lineitem (both small after pre-filtering) + auto orders_x_lineitem = ctx->create_channel(); + nodes.push_back(local_inner_join( + ctx, + orders_concat, + lineitem_concat, + orders_x_lineitem, + {0}, // o_orderkey + {0} // l_orderkey + )); + + // Join with customer (broadcast - orders_x_lineitem is small) + nodes.push_back( + rapidsmpf::ndsh::inner_join_broadcast( + ctx, + customer, + 
orders_x_lineitem, + all_joined, + {0}, // c_custkey + {1}, // o_custkey + rapidsmpf::OpID{ + static_cast(200 + iteration * 10 + op_id++) + }, + rapidsmpf::ndsh::KeepKeys::YES + ) + ); + + } else { + // ============================================================ + // ALL-GATHER MODE: Simple but doesn't scale + // ============================================================ + + auto lineitem_concat = ctx->create_channel(); + nodes.push_back( + rapidsmpf::ndsh::concatenate( + ctx, + lineitem_filtered, + lineitem_concat, + rapidsmpf::ndsh::ConcatOrder::DONT_CARE + ) + ); + + auto lineitem_gathered = ctx->create_channel(); + nodes.push_back(allgather_table( + ctx, + lineitem_concat, + lineitem_gathered, + rapidsmpf::OpID{static_cast(200 + iteration)} + )); + + auto orders_concat = ctx->create_channel(); + nodes.push_back( + rapidsmpf::ndsh::concatenate( + ctx, + orders_filtered, + orders_concat, + rapidsmpf::ndsh::ConcatOrder::DONT_CARE + ) + ); + + auto orders_gathered = ctx->create_channel(); + nodes.push_back(allgather_table( + ctx, + orders_concat, + orders_gathered, + rapidsmpf::OpID{static_cast(201 + iteration)} + )); + + // Local join: orders x lineitem (both small after pre-filtering) + auto orders_x_lineitem = ctx->create_channel(); + nodes.push_back(local_inner_join( + ctx, + orders_gathered, + lineitem_gathered, + orders_x_lineitem, + {0}, // o_orderkey + {0} // l_orderkey + )); + + // Join with customer (broadcast - orders_x_lineitem is small) + nodes.push_back( + rapidsmpf::ndsh::inner_join_broadcast( + ctx, + customer, + orders_x_lineitem, + all_joined, + {0}, // c_custkey + {1}, // o_custkey + rapidsmpf::OpID{static_cast(202 + iteration)}, + rapidsmpf::ndsh::KeepKeys::YES + ) + ); + } + + // Reorder columns + auto reordered = ctx->create_channel(); + nodes.push_back(reorder_columns(ctx, all_joined, reordered)); + + // Groupby aggregation + auto groupby_output = ctx->create_channel(); + nodes.push_back(chunkwise_groupby_agg(ctx, reordered, 
groupby_output)); + + auto concat_groupby = ctx->create_channel(); + nodes.push_back( + rapidsmpf::ndsh::concatenate( + ctx, + groupby_output, + concat_groupby, + rapidsmpf::ndsh::ConcatOrder::DONT_CARE + ) + ); + + // Final groupby, all-gather, sort, limit + auto final_output = ctx->create_channel(); + nodes.push_back(final_groupby_and_sort( + ctx, + concat_groupby, + final_output, + rapidsmpf::OpID{ + static_cast(200 + iteration * 10 + op_id++) + } + )); + + // Write output + nodes.push_back(write_parquet(ctx, final_output, output_path)); + + // Run pipeline + auto phase2_build_end = std::chrono::steady_clock::now(); + rapidsmpf::streaming::run_streaming_pipeline(std::move(nodes)); + auto phase2_run_end = std::chrono::steady_clock::now(); + + std::chrono::duration phase1_time = phase1_end - start; + std::chrono::duration phase2_build_time = + phase2_build_end - phase1_end; + std::chrono::duration phase2_run_time = + phase2_run_end - phase2_build_end; + std::chrono::duration total_time = phase2_run_end - start; + + comm->logger().print( + "Iteration ", + iteration, + " Phase 1 (groupby+filter) [s]: ", + phase1_time.count() + ); + comm->logger().print( + "Iteration ", iteration, " Phase 2 build [s]: ", phase2_build_time.count() + ); + comm->logger().print( + "Iteration ", iteration, " Phase 2 run [s]: ", phase2_run_time.count() + ); + comm->logger().print( + "Iteration ", iteration, " TOTAL [s]: ", total_time.count() + ); + comm->logger().print(stats->report()); + + RAPIDSMPF_MPI(MPI_Barrier(mpi_comm)); + } + } + + RAPIDSMPF_MPI(MPI_Comm_free(&mpi_comm)); + RAPIDSMPF_MPI(MPI_Finalize()); + return 0; +} From 4dea5e85c35ecf30b94c985950b31298e60b137c Mon Sep 17 00:00:00 2001 From: rjzamora Date: Mon, 8 Dec 2025 06:52:20 -0800 Subject: [PATCH 02/16] update usage docs --- cpp/benchmarks/streaming/ndsh/q18.cpp | 56 ++++++++++++++++++++------- 1 file changed, 41 insertions(+), 15 deletions(-) diff --git a/cpp/benchmarks/streaming/ndsh/q18.cpp 
b/cpp/benchmarks/streaming/ndsh/q18.cpp index 54b154ff4..533bd0c04 100644 --- a/cpp/benchmarks/streaming/ndsh/q18.cpp +++ b/cpp/benchmarks/streaming/ndsh/q18.cpp @@ -4,26 +4,52 @@ * * TPC-H Query 18 - Pre-filter Optimization * - * This benchmark implements Q18 with a two-phase approach: + * Usage: + * # Single GPU or small scale factors (SF1-SF100): + * q18 --input-directory /path/to/tpch/data --output-file result.parquet * - * Phase 1 (blocking): Compute qualifying orderkeys - * - Read lineitem -> groupby(l_orderkey, sum(l_quantity)) -> filter(sum > 300) - * - All-gather across ranks -> final groupby+filter - * - Result: ~171K qualifying orderkeys at SF3000 (tiny) + * # Multi-GPU with large scale factors (SF1000+): + * mpirun -np 4 q18 --input-directory /path/to/tpch/data \ + * --output-file result.parquet --use-shuffle * - * Phase 2 (streaming): Pre-filter and join - * - Read lineitem -> semi-join filter -> all-gather (~684K rows) - * - Read orders -> semi-join filter -> all-gather (~171K rows) - * - Local join (no shuffle needed - data is tiny) - * - Join with customer -> groupby -> sort -> write + * Key Options: + * --use-shuffle Use shuffle-based distributed joins instead of all-gather. + * REQUIRED for large scale factors (SF1000+) on multi-GPU to + * avoid memory pressure from all-gathering large intermediate + * tables to every rank. * - * Benefits: - * - No shuffle needed for lineitem/orders (99.98% data reduction) - * - No fanout node complexity + * --spill-device-limit + * Fraction of GPU memory before spilling to host (default: 0.8). + * Use lower values (e.g., 0.5) for memory-constrained systems. + * + * Algorithm: + * This benchmark implements Q18 with a two-phase approach that exploits the + * high selectivity of the "sum(l_quantity) > 300" filter (~0.004% of orders). 
+ * + * Phase 1 (blocking): Compute qualifying orderkeys + * - Read lineitem -> groupby(l_orderkey, sum(l_quantity)) -> filter(sum > 300) + * - With --use-shuffle: shuffle by orderkey for parallel aggregation + * - Without --use-shuffle: all-gather partial aggregates (redundant work) + * - Result: ~57 orderkeys at SF1, ~57K at SF1000, ~171K at SF3000 + * + * Phase 2 (streaming): Pre-filter and join + * - Read lineitem/orders -> semi-join filter using qualifying orderkeys + * - With --use-shuffle: shuffle filtered data for parallel joins + * - Without --use-shuffle: all-gather filtered data (works for small results) + * - Join with customer -> groupby -> sort -> write top 100 + * + * When to use --use-shuffle: + * - Multi-GPU runs at SF1000+: The intermediate filtered tables (~228K lineitem + * rows, ~57K orders rows at SF1000) become too large to all-gather efficiently. + * - Memory-constrained systems: Shuffle distributes memory pressure across ranks. + * + * When NOT to use --use-shuffle: + * - Single GPU: No benefit from shuffle overhead. + * - Small scale factors (SF1-SF100): All-gather is faster for tiny results. * * Disclaimers: - * - The two-phase approach corresponds to "advanced" query optimization - * - It may not be beneficial to re-read the lineitem table from remote storage + * - The two-phase approach corresponds to "advanced" query optimization. + * - Re-reading lineitem may not be optimal with slow remote storage. 
*/ #include From 0a129a90be021f5870b2fa4a92826381421e290c Mon Sep 17 00:00:00 2001 From: rjzamora Date: Mon, 8 Dec 2025 07:01:45 -0800 Subject: [PATCH 03/16] cleanup --- cpp/benchmarks/streaming/ndsh/q18.cpp | 142 ++------------------------ 1 file changed, 8 insertions(+), 134 deletions(-) diff --git a/cpp/benchmarks/streaming/ndsh/q18.cpp b/cpp/benchmarks/streaming/ndsh/q18.cpp index 533bd0c04..2fe817684 100644 --- a/cpp/benchmarks/streaming/ndsh/q18.cpp +++ b/cpp/benchmarks/streaming/ndsh/q18.cpp @@ -360,11 +360,11 @@ rapidsmpf::streaming::Node final_groupby_filter_lineitem( } /** - * @brief All-gather node for Phase 1. + * @brief All-gather node: collect from ch_in, all-gather across ranks, send to ch_out. * - * Collects partial aggregates from all ranks and outputs concatenated result. + * For single-rank, this is a simple pass-through. */ -rapidsmpf::streaming::Node allgather_partial_aggregates( +rapidsmpf::streaming::Node allgather_table( std::shared_ptr ctx, std::shared_ptr ch_in, std::shared_ptr ch_out, @@ -421,7 +421,7 @@ rapidsmpf::streaming::Node allgather_partial_aggregates( } ctx->comm()->logger().debug( - "allgather_partial_aggregates: ", result ? result->num_rows() : 0, " rows" + "allgather_table: ", result ? result->num_rows() : 0, " rows gathered" ); if (result && result->num_rows() > 0) { @@ -535,7 +535,7 @@ std::unique_ptr compute_qualifying_orderkeys( // All-gather the TINY filtered result (~57K orderkeys at SF1000) auto gathered = ctx->create_channel(); - nodes.push_back(allgather_partial_aggregates( + nodes.push_back(allgather_table( ctx, filtered_local, gathered, @@ -669,75 +669,6 @@ rapidsmpf::streaming::Node prefilter_by_orderkeys( co_await ch_out->drain(ctx->executor()); } -/** - * @brief All-gather node: collect from ch_in, all-gather across ranks, send to ch_out. 
- */ -rapidsmpf::streaming::Node allgather_table( - std::shared_ptr ctx, - std::shared_ptr ch_in, - std::shared_ptr ch_out, - rapidsmpf::OpID tag -) { - rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; - - auto msg = co_await ch_in->receive(); - if (msg.empty()) { - co_await ch_out->drain(ctx->executor()); - co_return; - } - - auto chunk = - rapidsmpf::ndsh::to_device(ctx, msg.release()); - auto chunk_stream = chunk.stream(); - auto table = chunk.table_view(); - - ctx->comm()->logger().debug("allgather_table: local has ", table.num_rows(), " rows"); - - std::unique_ptr result; - if (ctx->comm()->nranks() > 1) { - rapidsmpf::streaming::AllGather gatherer{ctx, tag}; - - auto pack = cudf::pack(table, chunk_stream, ctx->br()->device_mr()); - gatherer.insert( - 0, - {rapidsmpf::PackedData( - std::move(pack.metadata), - ctx->br()->move(std::move(pack.gpu_data), chunk_stream) - )} - ); - gatherer.insert_finished(); - - auto packed_data = - co_await gatherer.extract_all(rapidsmpf::streaming::AllGather::Ordered::NO); - - result = rapidsmpf::unpack_and_concat( - rapidsmpf::unspill_partitions( - std::move(packed_data), ctx->br(), true, ctx->statistics() - ), - chunk_stream, - ctx->br(), - ctx->statistics() - ); - } else { - result = - std::make_unique(table, chunk_stream, ctx->br()->device_mr()); - } - - ctx->comm()->logger().debug( - "allgather_table: gathered has ", result->num_rows(), " rows" - ); - - co_await ch_out->send( - rapidsmpf::streaming::to_message( - 0, - std::make_unique( - std::move(result), chunk_stream - ) - ) - ); - co_await ch_out->drain(ctx->executor()); -} - /** * @brief Local inner join (both inputs are small after pre-filtering). 
* @@ -1046,7 +977,7 @@ rapidsmpf::streaming::Node write_parquet( metadata.column_metadata[2].set_name("o_orderkey"); metadata.column_metadata[3].set_name("o_orderdate"); metadata.column_metadata[4].set_name("o_totalprice"); - metadata.column_metadata[5].set_name("col6"); + metadata.column_metadata[5].set_name("sum_quantity"); builder = builder.metadata(metadata); cudf::io::write_parquet(builder.build(), chunk.stream()); ctx->comm()->logger().print( @@ -1329,6 +1260,7 @@ int main(int argc, char** argv) { ); // Shuffle-based join: orders x lineitem + // Output: o_orderkey, o_custkey, o_orderdate, o_totalprice, l_quantity auto orders_x_lineitem = ctx->create_channel(); nodes.push_back( rapidsmpf::ndsh::inner_join_shuffle( @@ -1340,8 +1272,7 @@ int main(int argc, char** argv) { {0}, // l_orderkey rapidsmpf::ndsh::KeepKeys::YES ) - ); // output: o_orderkey, o_custkey, o_orderdate, o_totalprice, - // l_quantity + ); // Shuffle orders_x_lineitem by custkey for customer join auto orders_x_lineitem_shuffled = ctx->create_channel(); @@ -1386,63 +1317,6 @@ int main(int argc, char** argv) { ) ); - } else if (cmd_options.use_shuffle && single_rank) { - // ============================================================ - // SINGLE-RANK SHUFFLE MODE: Use local joins (skip shuffle overhead) - // ============================================================ - ctx->comm()->logger().print( - "Phase 2: Single-rank mode - using local joins (skipping shuffle)" - ); - - // Concatenate filtered lineitem - auto lineitem_concat = ctx->create_channel(); - nodes.push_back( - rapidsmpf::ndsh::concatenate( - ctx, - lineitem_filtered, - lineitem_concat, - rapidsmpf::ndsh::ConcatOrder::DONT_CARE - ) - ); - - // Concatenate filtered orders - auto orders_concat = ctx->create_channel(); - nodes.push_back( - rapidsmpf::ndsh::concatenate( - ctx, - orders_filtered, - orders_concat, - rapidsmpf::ndsh::ConcatOrder::DONT_CARE - ) - ); - - // Local join: orders x lineitem (both small after pre-filtering) - 
auto orders_x_lineitem = ctx->create_channel(); - nodes.push_back(local_inner_join( - ctx, - orders_concat, - lineitem_concat, - orders_x_lineitem, - {0}, // o_orderkey - {0} // l_orderkey - )); - - // Join with customer (broadcast - orders_x_lineitem is small) - nodes.push_back( - rapidsmpf::ndsh::inner_join_broadcast( - ctx, - customer, - orders_x_lineitem, - all_joined, - {0}, // c_custkey - {1}, // o_custkey - rapidsmpf::OpID{ - static_cast(200 + iteration * 10 + op_id++) - }, - rapidsmpf::ndsh::KeepKeys::YES - ) - ); - } else { // ============================================================ // ALL-GATHER MODE: Simple but doesn't scale From 7a14f284b8b6ef492d551ee689849c4d64d3944b Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 9 Dec 2025 12:19:36 -0800 Subject: [PATCH 04/16] update cmake and avoid default stream --- cpp/benchmarks/streaming/ndsh/CMakeLists.txt | 26 ++++++++------------ cpp/benchmarks/streaming/ndsh/q18.cpp | 9 +++++-- 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/cpp/benchmarks/streaming/ndsh/CMakeLists.txt b/cpp/benchmarks/streaming/ndsh/CMakeLists.txt index 9aa363a59..ce52ede9f 100644 --- a/cpp/benchmarks/streaming/ndsh/CMakeLists.txt +++ b/cpp/benchmarks/streaming/ndsh/CMakeLists.txt @@ -46,22 +46,16 @@ set_target_properties( CUDA_STANDARD 20 CUDA_STANDARD_REQUIRED ON ) -target_compile_options( - q09 PRIVATE "$<$:${RAPIDSMPF_CXX_FLAGS}>" - "$<$:${RAPIDSMPF_CUDA_FLAGS}>" -) -target_compile_options( - q18 PRIVATE "$<$:${RAPIDSMPF_CXX_FLAGS}>" - "$<$:${RAPIDSMPF_CUDA_FLAGS}>" -) -target_link_libraries( - q09 PRIVATE rapidsmpfndsh rapidsmpf::rapidsmpf $ - $ maybe_asan -) -target_link_libraries( - q18 PRIVATE rapidsmpfndsh rapidsmpf::rapidsmpf $ - $ maybe_asan -) +foreach(tgt IN ITEMS q09 q18) + target_compile_options( + ${tgt} PRIVATE "$<$:${RAPIDSMPF_CXX_FLAGS}>" + "$<$:${RAPIDSMPF_CUDA_FLAGS}>" + ) + target_link_libraries( + ${tgt} PRIVATE rapidsmpfndsh rapidsmpf::rapidsmpf $ + $ maybe_asan + ) +endforeach() install( 
TARGETS rapidsmpfndsh COMPONENT benchmarking diff --git a/cpp/benchmarks/streaming/ndsh/q18.cpp b/cpp/benchmarks/streaming/ndsh/q18.cpp index 2fe817684..c5bc8c5bf 100644 --- a/cpp/benchmarks/streaming/ndsh/q18.cpp +++ b/cpp/benchmarks/streaming/ndsh/q18.cpp @@ -89,6 +89,8 @@ #include #include #include +#include +#include #include #include #include @@ -375,7 +377,7 @@ rapidsmpf::streaming::Node allgather_table( auto msg = co_await ch_in->receive(); std::unique_ptr result; - rmm::cuda_stream_view stream = cudf::get_default_stream(); + rmm::cuda_stream_view stream; if (ctx->comm()->nranks() > 1) { rapidsmpf::streaming::AllGather gatherer{ctx, tag}; @@ -606,7 +608,7 @@ rapidsmpf::streaming::Node prefilter_by_orderkeys( qualifying_orderkeys->view(), cudf::null_equality::UNEQUAL, cudf::set_as_build_table::RIGHT, - cudf::get_default_stream() + ctx->br()->stream_pool().get_stream() ); std::size_t total_input_rows = 0; @@ -699,7 +701,10 @@ rapidsmpf::streaming::Node local_inner_join( ctx, right_msg.release() ); + rapidsmpf::CudaEvent event; auto stream = left_chunk.stream(); + rapidsmpf::cuda_stream_join(stream, right_chunk.stream(), &event); + auto left_table = left_chunk.table_view(); auto right_table = right_chunk.table_view(); From ca1171cdef9cc6fd04ec155b99cd4ad94e7fe5ea Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Wed, 10 Dec 2025 08:55:24 -0800 Subject: [PATCH 05/16] cmake update --- cpp/benchmarks/streaming/ndsh/CMakeLists.txt | 29 ++++++++++---------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/cpp/benchmarks/streaming/ndsh/CMakeLists.txt b/cpp/benchmarks/streaming/ndsh/CMakeLists.txt index ce52ede9f..9ae8fb4e6 100644 --- a/cpp/benchmarks/streaming/ndsh/CMakeLists.txt +++ b/cpp/benchmarks/streaming/ndsh/CMakeLists.txt @@ -36,24 +36,23 @@ target_link_libraries( $ maybe_asan ) -add_executable(q09 "q09.cpp") -add_executable(q18 "q18.cpp") -set_target_properties( - q09 q18 - PROPERTIES RUNTIME_OUTPUT_DIRECTORY "$" - CXX_STANDARD 20 - 
CXX_STANDARD_REQUIRED ON - CUDA_STANDARD 20 - CUDA_STANDARD_REQUIRED ON -) -foreach(tgt IN ITEMS q09 q18) +foreach(query IN ITEMS q09 q18) + add_executable(${query} "${query}.cpp") + set_target_properties( + ${query} + PROPERTIES RUNTIME_OUTPUT_DIRECTORY "$" + CXX_STANDARD 20 + CXX_STANDARD_REQUIRED ON + CUDA_STANDARD 20 + CUDA_STANDARD_REQUIRED ON + ) target_compile_options( - ${tgt} PRIVATE "$<$:${RAPIDSMPF_CXX_FLAGS}>" - "$<$:${RAPIDSMPF_CUDA_FLAGS}>" + ${query} PRIVATE "$<$:${RAPIDSMPF_CXX_FLAGS}>" + "$<$:${RAPIDSMPF_CUDA_FLAGS}>" ) target_link_libraries( - ${tgt} PRIVATE rapidsmpfndsh rapidsmpf::rapidsmpf $ - $ maybe_asan + ${query} PRIVATE rapidsmpfndsh rapidsmpf::rapidsmpf $ + $ maybe_asan ) endforeach() install( From afbc824faa9867791cd31b9be79d283aae04a926 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 11 Dec 2025 06:20:29 -0800 Subject: [PATCH 06/16] add PinnedMemoryResourceDisabled arg --- cpp/benchmarks/streaming/ndsh/q18.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cpp/benchmarks/streaming/ndsh/q18.cpp b/cpp/benchmarks/streaming/ndsh/q18.cpp index c5bc8c5bf..433ca9f1b 100644 --- a/cpp/benchmarks/streaming/ndsh/q18.cpp +++ b/cpp/benchmarks/streaming/ndsh/q18.cpp @@ -1120,7 +1120,9 @@ int main(int argc, char** argv) { } auto br = std::make_shared( - stats_mr, std::move(memory_available) + stats_mr, + rapidsmpf::BufferResource::PinnedMemoryResourceDisabled, + std::move(memory_available) ); auto envvars = rapidsmpf::config::get_environment_variables(); envvars["num_streaming_threads"] = std::to_string(cmd_options.num_streaming_threads); From b74ebfa4342452e6533e3b3eb5604550b0aae785 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 11 Dec 2025 06:26:42 -0800 Subject: [PATCH 07/16] check for unexpected second messages --- cpp/benchmarks/streaming/ndsh/q18.cpp | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/cpp/benchmarks/streaming/ndsh/q18.cpp b/cpp/benchmarks/streaming/ndsh/q18.cpp index 433ca9f1b..167dd4a7a 100644 
--- a/cpp/benchmarks/streaming/ndsh/q18.cpp +++ b/cpp/benchmarks/streaming/ndsh/q18.cpp @@ -286,6 +286,8 @@ rapidsmpf::streaming::Node final_groupby_filter_lineitem( co_await ch_out->drain(ctx->executor()); co_return; } + auto next = co_await ch_in->receive(); + RAPIDSMPF_EXPECTS(next.empty(), "Expecting concatenated input at this point"); auto chunk = rapidsmpf::ndsh::to_device(ctx, msg.release()); @@ -375,6 +377,10 @@ rapidsmpf::streaming::Node allgather_table( rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; auto msg = co_await ch_in->receive(); + if (!msg.empty()) { + auto next = co_await ch_in->receive(); + RAPIDSMPF_EXPECTS(next.empty(), "Unexpected second message."); + } std::unique_ptr result; rmm::cuda_stream_view stream; From 3b7cbfda6c8c5e5039cb4242fb21bb79d666175a Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 11 Dec 2025 06:38:00 -0800 Subject: [PATCH 08/16] remove unnecessary copy --- cpp/benchmarks/streaming/ndsh/q18.cpp | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/cpp/benchmarks/streaming/ndsh/q18.cpp b/cpp/benchmarks/streaming/ndsh/q18.cpp index 167dd4a7a..beaa7fdf4 100644 --- a/cpp/benchmarks/streaming/ndsh/q18.cpp +++ b/cpp/benchmarks/streaming/ndsh/q18.cpp @@ -416,16 +416,15 @@ rapidsmpf::streaming::Node allgather_table( ctx->statistics() ); } else { - // Single rank - just pass through + // Single rank - just forward the message as-is if (!msg.empty()) { - auto chunk = rapidsmpf::ndsh::to_device( - ctx, msg.release() - ); - stream = chunk.stream(); - result = std::make_unique( - chunk.table_view(), stream, ctx->br()->device_mr() - ); + co_await ch_out->send(std::move(msg)); } + ctx->comm()->logger().print( + "allgather_table: single rank finished forwarding message" + ); + co_await ch_out->drain(ctx->executor()); + co_return; } ctx->comm()->logger().debug( From e08f128aa4e554a797206bbd8422dcae29ccc511 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 11 Dec 2025 06:49:40 -0800 Subject: [PATCH 
09/16] simplify op-id tracking --- cpp/benchmarks/streaming/ndsh/q18.cpp | 44 +++++++++++++-------------- 1 file changed, 21 insertions(+), 23 deletions(-) diff --git a/cpp/benchmarks/streaming/ndsh/q18.cpp b/cpp/benchmarks/streaming/ndsh/q18.cpp index beaa7fdf4..b45f105e2 100644 --- a/cpp/benchmarks/streaming/ndsh/q18.cpp +++ b/cpp/benchmarks/streaming/ndsh/q18.cpp @@ -473,7 +473,7 @@ std::unique_ptr compute_qualifying_orderkeys( std::string const& input_directory, double quantity_threshold, std::uint32_t num_partitions, - rapidsmpf::OpID base_tag + rapidsmpf::OpID& base_tag ) { bool const single_rank = ctx->comm()->nranks() == 1; ctx->comm()->logger().print( @@ -522,7 +522,7 @@ std::unique_ptr compute_qualifying_orderkeys( partial_aggs_shuffled, {0}, // l_orderkey num_partitions, - rapidsmpf::OpID{static_cast(base_tag)} + base_tag++ ) ); to_concat = partial_aggs_shuffled; @@ -542,12 +542,7 @@ std::unique_ptr compute_qualifying_orderkeys( // All-gather the TINY filtered result (~57K orderkeys at SF1000) auto gathered = ctx->create_channel(); - nodes.push_back(allgather_table( - ctx, - filtered_local, - gathered, - rapidsmpf::OpID{static_cast(base_tag + 1)} - )); + nodes.push_back(allgather_table(ctx, filtered_local, gathered, base_tag++)); to_collect = gathered; } @@ -1165,6 +1160,9 @@ int main(int argc, char** argv) { // Uses shuffle for multi-rank (required at scale to avoid OOM) // Uses simple local groupby for single-rank // ================================================================ + rapidsmpf::OpID phase1_op_id{ + static_cast(100 + iteration * 10) + }; std::unique_ptr qualifying_orderkeys = compute_qualifying_orderkeys( ctx, @@ -1172,7 +1170,7 @@ int main(int argc, char** argv) { cmd_options.input_directory, 300.0, // quantity_threshold cmd_options.num_partitions, - rapidsmpf::OpID{static_cast(100 + iteration * 10)} + phase1_op_id ); auto phase1_end = std::chrono::steady_clock::now(); @@ -1190,7 +1188,7 @@ int main(int argc, char** argv) { // 
================================================================ std::vector nodes; std::uint32_t num_partitions = cmd_options.num_partitions; - int op_id = 0; + int phase2_op_id = 0; // Read and pre-filter lineitem auto lineitem_raw = ctx->create_channel(); @@ -1250,9 +1248,9 @@ int main(int argc, char** argv) { lineitem_shuffled, {0}, // l_orderkey num_partitions, - rapidsmpf::OpID{ - static_cast(200 + iteration * 10 + op_id++) - } + rapidsmpf::OpID{static_cast( + 200 + iteration * 10 + phase2_op_id++ + )} ) ); @@ -1265,9 +1263,9 @@ int main(int argc, char** argv) { orders_shuffled, {0}, // o_orderkey num_partitions, - rapidsmpf::OpID{ - static_cast(200 + iteration * 10 + op_id++) - } + rapidsmpf::OpID{static_cast( + 200 + iteration * 10 + phase2_op_id++ + )} ) ); @@ -1295,9 +1293,9 @@ int main(int argc, char** argv) { orders_x_lineitem_shuffled, {1}, // o_custkey num_partitions, - rapidsmpf::OpID{ - static_cast(200 + iteration * 10 + op_id++) - } + rapidsmpf::OpID{static_cast( + 200 + iteration * 10 + phase2_op_id++ + )} ) ); @@ -1310,9 +1308,9 @@ int main(int argc, char** argv) { customer_shuffled, {0}, // c_custkey num_partitions, - rapidsmpf::OpID{ - static_cast(200 + iteration * 10 + op_id++) - } + rapidsmpf::OpID{static_cast( + 200 + iteration * 10 + phase2_op_id++ + )} ) ); @@ -1421,7 +1419,7 @@ int main(int argc, char** argv) { concat_groupby, final_output, rapidsmpf::OpID{ - static_cast(200 + iteration * 10 + op_id++) + static_cast(200 + iteration * 10 + phase2_op_id++) } )); From 379e1b74741ccdbdeba241af0d7a97312b3c637b Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 11 Dec 2025 07:00:08 -0800 Subject: [PATCH 10/16] simplifications based on code review --- cpp/benchmarks/streaming/ndsh/q18.cpp | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/cpp/benchmarks/streaming/ndsh/q18.cpp b/cpp/benchmarks/streaming/ndsh/q18.cpp index b45f105e2..1ada22a79 100644 --- a/cpp/benchmarks/streaming/ndsh/q18.cpp +++ 
b/cpp/benchmarks/streaming/ndsh/q18.cpp @@ -491,12 +491,11 @@ std::unique_ptr compute_qualifying_orderkeys( auto partial_aggs = ctx->create_channel(); nodes.push_back(chunkwise_groupby_lineitem(ctx, lineitem, partial_aggs)); - std::shared_ptr to_concat; std::shared_ptr to_collect; if (single_rank) { // Single rank: simple local pipeline (no shuffle needed) - to_concat = partial_aggs; + auto to_concat = partial_aggs; auto concatenated = ctx->create_channel(); nodes.push_back( @@ -505,12 +504,10 @@ std::unique_ptr compute_qualifying_orderkeys( ) ); - auto filtered_local = ctx->create_channel(); + to_collect = ctx->create_channel(); nodes.push_back(final_groupby_filter_lineitem( - ctx, concatenated, filtered_local, quantity_threshold + ctx, concatenated, to_collect, quantity_threshold )); - - to_collect = filtered_local; } else { // Multi-rank: SHUFFLE partial aggregates by orderkey (column 0) // This distributes work across ranks - required at scale! @@ -525,7 +522,7 @@ std::unique_ptr compute_qualifying_orderkeys( base_tag++ ) ); - to_concat = partial_aggs_shuffled; + auto to_concat = partial_aggs_shuffled; // Per-partition: concatenate → groupby → filter auto concatenated = ctx->create_channel(); @@ -697,13 +694,16 @@ rapidsmpf::streaming::Node local_inner_join( auto left_chunk = rapidsmpf::ndsh::to_device( ctx, left_msg.release() ); + auto next = co_await ch_left->receive(); + RAPIDSMPF_EXPECTS(next.empty(), "Unexpected second message from left channel."); auto right_chunk = rapidsmpf::ndsh::to_device( ctx, right_msg.release() ); + next = co_await ch_right->receive(); + RAPIDSMPF_EXPECTS(next.empty(), "Unexpected second message from right channel."); - rapidsmpf::CudaEvent event; auto stream = left_chunk.stream(); - rapidsmpf::cuda_stream_join(stream, right_chunk.stream(), &event); + rapidsmpf::cuda_stream_join(stream, right_chunk.stream()); auto left_table = left_chunk.table_view(); auto right_table = right_chunk.table_view(); @@ -737,10 +737,7 @@ 
rapidsmpf::streaming::Node local_inner_join( // Concatenate columns: all from left + l_quantity from right auto result_cols = left_gathered->release(); - auto right_cols = right_gathered->release(); - for (auto&& col : right_cols) { - result_cols.push_back(std::move(col)); - } + std::ranges::move(right_gathered->release(), std::back_inserter(result_cols)); auto result = std::make_unique(std::move(result_cols)); ctx->comm()->logger().print( From f54c2239e51097137f502ac96bc6f619d494ac01 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 11 Dec 2025 07:08:35 -0800 Subject: [PATCH 11/16] fuse reorder_columns into chunkwise_groupby_agg --- cpp/benchmarks/streaming/ndsh/q18.cpp | 43 +++------------------------ 1 file changed, 4 insertions(+), 39 deletions(-) diff --git a/cpp/benchmarks/streaming/ndsh/q18.cpp b/cpp/benchmarks/streaming/ndsh/q18.cpp index 1ada22a79..c4fff93c1 100644 --- a/cpp/benchmarks/streaming/ndsh/q18.cpp +++ b/cpp/benchmarks/streaming/ndsh/q18.cpp @@ -757,38 +757,6 @@ rapidsmpf::streaming::Node local_inner_join( // Final Processing (reused from q18.cpp) // ============================================================================ -rapidsmpf::streaming::Node reorder_columns( - std::shared_ptr ctx, - std::shared_ptr ch_in, - std::shared_ptr ch_out -) { - rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; - std::uint64_t seq = 0; - while (true) { - auto msg = co_await ch_in->receive(); - if (msg.empty()) { - break; - } - co_await ctx->executor()->schedule(); - auto chunk = rapidsmpf::ndsh::to_device( - ctx, msg.release() - ); - auto table = chunk.table_view(); - auto reordered_table = std::make_unique( - table.select({1, 0, 2, 3, 4, 5}), chunk.stream(), ctx->br()->device_mr() - ); - co_await ch_out->send( - rapidsmpf::streaming::to_message( - seq++, - std::make_unique( - std::move(reordered_table), chunk.stream() - ) - ) - ); - } - co_await ch_out->drain(ctx->executor()); -} - rapidsmpf::streaming::Node chunkwise_groupby_agg( std::shared_ptr ctx, 
std::shared_ptr ch_in, @@ -806,7 +774,8 @@ rapidsmpf::streaming::Node chunkwise_groupby_agg( ctx, msg.release() ); auto chunk_stream = chunk.stream(); - auto table = chunk.table_view(); + // Reorder columns: swap 0 and 1 to get (c_custkey, o_orderkey, ...) + auto table = chunk.table_view().select({1, 0, 2, 3, 4, 5}); auto grouper = cudf::groupby::groupby( table.select({0, 1, 2, 3, 4}), cudf::null_policy::EXCLUDE, cudf::sorted::NO @@ -1391,13 +1360,9 @@ int main(int argc, char** argv) { ); } - // Reorder columns - auto reordered = ctx->create_channel(); - nodes.push_back(reorder_columns(ctx, all_joined, reordered)); - - // Groupby aggregation + // Groupby aggregation (includes column reordering) auto groupby_output = ctx->create_channel(); - nodes.push_back(chunkwise_groupby_agg(ctx, reordered, groupby_output)); + nodes.push_back(chunkwise_groupby_agg(ctx, all_joined, groupby_output)); auto concat_groupby = ctx->create_channel(); nodes.push_back( From a8e08480b3307c9c45510a0c67aa1b4eac445965 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 11 Dec 2025 07:14:08 -0800 Subject: [PATCH 12/16] add more empty-msg checks --- cpp/benchmarks/streaming/ndsh/q18.cpp | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/cpp/benchmarks/streaming/ndsh/q18.cpp b/cpp/benchmarks/streaming/ndsh/q18.cpp index c4fff93c1..b7fa2b41d 100644 --- a/cpp/benchmarks/streaming/ndsh/q18.cpp +++ b/cpp/benchmarks/streaming/ndsh/q18.cpp @@ -285,9 +285,12 @@ rapidsmpf::streaming::Node final_groupby_filter_lineitem( ctx->comm()->logger().print("final_groupby_filter: rank received EMPTY input!"); co_await ch_out->drain(ctx->executor()); co_return; + } else { + auto next = co_await ch_in->receive(); + RAPIDSMPF_EXPECTS( + next.empty(), "final_groupby_filter: Unexpected second message." 
+ ); } - auto next = co_await ch_in->receive(); - RAPIDSMPF_EXPECTS(next.empty(), "Expecting concatenated input at this point"); auto chunk = rapidsmpf::ndsh::to_device(ctx, msg.release()); @@ -379,7 +382,7 @@ rapidsmpf::streaming::Node allgather_table( auto msg = co_await ch_in->receive(); if (!msg.empty()) { auto next = co_await ch_in->receive(); - RAPIDSMPF_EXPECTS(next.empty(), "Unexpected second message."); + RAPIDSMPF_EXPECTS(next.empty(), "allgather_table: Unexpected second message."); } std::unique_ptr result; @@ -818,6 +821,11 @@ rapidsmpf::streaming::Node final_groupby_and_sort( if (msg.empty()) { co_await ch_out->drain(ctx->executor()); co_return; + } else { + auto next = co_await ch_in->receive(); + RAPIDSMPF_EXPECTS( + next.empty(), "final_groupby_and_sort: Unexpected second message." + ); } auto chunk = @@ -937,6 +945,9 @@ rapidsmpf::streaming::Node write_parquet( auto msg = co_await ch_in->receive(); if (msg.empty()) { co_return; + } else { + auto next = co_await ch_in->receive(); + RAPIDSMPF_EXPECTS(next.empty(), "write_parquet: Unexpected second message."); } auto chunk = rapidsmpf::ndsh::to_device(ctx, msg.release()); From 2e9fd361c5bb9f35e308a46bdc82d9379e0338c6 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 11 Dec 2025 07:19:32 -0800 Subject: [PATCH 13/16] release original chunk in final_groupby_and_sort --- cpp/benchmarks/streaming/ndsh/q18.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cpp/benchmarks/streaming/ndsh/q18.cpp b/cpp/benchmarks/streaming/ndsh/q18.cpp index b7fa2b41d..b91d94370 100644 --- a/cpp/benchmarks/streaming/ndsh/q18.cpp +++ b/cpp/benchmarks/streaming/ndsh/q18.cpp @@ -853,6 +853,9 @@ rapidsmpf::streaming::Node final_groupby_and_sort( } local_result = std::make_unique(std::move(cols)); } + { + auto _ = std::move(chunk); + } // Release original input to free GPU memory // All-gather if multi-rank std::unique_ptr global_result; From d911352fafbc15a92688893646dc6eaa26204dfb Mon Sep 17 00:00:00 2001 From: Tom 
Augspurger Date: Wed, 31 Dec 2025 12:14:49 -0800 Subject: [PATCH 14/16] Argument handling --- cpp/benchmarks/streaming/ndsh/q18.cpp | 667 ++++++++++-------------- cpp/benchmarks/streaming/ndsh/utils.cpp | 15 +- cpp/benchmarks/streaming/ndsh/utils.hpp | 5 + 3 files changed, 286 insertions(+), 401 deletions(-) diff --git a/cpp/benchmarks/streaming/ndsh/q18.cpp b/cpp/benchmarks/streaming/ndsh/q18.cpp index b91d94370..23164099c 100644 --- a/cpp/benchmarks/streaming/ndsh/q18.cpp +++ b/cpp/benchmarks/streaming/ndsh/q18.cpp @@ -67,6 +67,7 @@ #include #include #include +#include #include #include #include @@ -976,97 +977,31 @@ rapidsmpf::streaming::Node write_parquet( // Command Line Parsing // ============================================================================ -struct ProgramOptions { - int num_streaming_threads{4}; // 4 threads provides good pipeline parallelism - cudf::size_type num_rows_per_chunk{100'000'000}; +struct Q18Options { std::uint32_t num_partitions{64}; - bool use_shuffle{false}; // Use shuffle joins in Phase 2 for multi-GPU scaling - std::optional spill_device_limit{std::nullopt}; - std::string output_file; - std::string input_directory; }; -ProgramOptions parse_options(int argc, char** argv) { - ProgramOptions options; - - auto print_usage = [&argv]() { - std::cerr - << "Usage: " << argv[0] << " [options]\n" - << "Options:\n" - << " --num-streaming-threads Number of streaming threads (default: 4)\n" - << " --num-rows-per-chunk Number of rows per chunk (default: " - "100000000)\n" - << " --num-partitions Number of shuffle partitions (default: " - "64)\n" - << " --use-shuffle Use shuffle joins in Phase 2 for " - "multi-GPU scaling\n" - << " --spill-device-limit Fractional spill device limit (default: " - "None)\n" - << " --output-file Output file path (required)\n" - << " --input-directory Input directory path (required)\n" - << " --help Show this help message\n"; - }; +Q18Options parse_options(int argc, char** argv) { + Q18Options options; + // 
NOLINTBEGIN(modernize-avoid-c-arrays,cppcoreguidelines-avoid-c-arrays,modernize-use-designated-initializers) + opterr = 0; static struct option long_options[] = { - {"num-streaming-threads", required_argument, nullptr, 1}, - {"num-rows-per-chunk", required_argument, nullptr, 2}, - {"num-partitions", required_argument, nullptr, 7}, - {"use-shuffle", no_argument, nullptr, 8}, - {"output-file", required_argument, nullptr, 3}, - {"input-directory", required_argument, nullptr, 4}, - {"help", no_argument, nullptr, 5}, - {"spill-device-limit", required_argument, nullptr, 6}, - {nullptr, 0, nullptr, 0} + {"num-partitions", required_argument, nullptr, 7}, {nullptr, 0, nullptr, 0} }; + // NOLINTEND(modernize-avoid-c-arrays,cppcoreguidelines-avoid-c-arrays,modernize-use-designated-initializers) int opt; int option_index = 0; - bool saw_output_file = false; - bool saw_input_directory = false; while ((opt = getopt_long(argc, argv, "", long_options, &option_index)) != -1) { switch (opt) { case 1: - options.num_streaming_threads = std::atoi(optarg); - break; - case 2: - options.num_rows_per_chunk = std::atoi(optarg); - break; - case 3: - options.output_file = optarg; - saw_output_file = true; - break; - case 4: - options.input_directory = optarg; - saw_input_directory = true; - break; - case 5: - print_usage(); - std::exit(0); - case 6: - options.spill_device_limit = std::stod(optarg); - break; - case 7: options.num_partitions = static_cast(std::atoi(optarg)); break; - case 8: - options.use_shuffle = true; - break; - default: - print_usage(); - std::exit(1); } } - if (!saw_output_file || !saw_input_directory) { - if (!saw_output_file) - std::cerr << "Error: --output-file is required\n"; - if (!saw_input_directory) - std::cerr << "Error: --input-directory is required\n"; - print_usage(); - std::exit(1); - } - return options; } @@ -1075,367 +1010,307 @@ ProgramOptions parse_options(int argc, char** argv) { // 
============================================================================ int main(int argc, char** argv) { + rapidsmpf::ndsh::FinalizeMPI finalize{}; cudaFree(nullptr); - rapidsmpf::mpi::init(&argc, &argv); - MPI_Comm mpi_comm; - RAPIDSMPF_MPI(MPI_Comm_dup(MPI_COMM_WORLD, &mpi_comm)); - - auto cmd_options = parse_options(argc, argv); - - auto limit_size = rmm::percent_of_free_device_memory( - static_cast(cmd_options.spill_device_limit.value_or(1) * 100) + // work around https://github.com/rapidsai/cudf/issues/20849 + cudf::initialize(); + auto mr = rmm::mr::cuda_async_memory_resource{}; + auto stats_wrapper = rapidsmpf::RmmResourceAdaptor(&mr); + auto arguments = rapidsmpf::ndsh::parse_arguments(argc, argv); + auto q18_arguments = parse_options(argc, argv); + auto ctx = rapidsmpf::ndsh::create_context(arguments, &stats_wrapper); + std::string output_path = arguments.output_file; + + ctx->comm()->logger().print("Q18 Pre-filter Benchmark"); + ctx->comm()->logger().print( + "Executor has ", ctx->executor()->thread_count(), " threads" ); - rmm::mr::cuda_async_memory_resource mr{}; - auto stats_mr = rapidsmpf::RmmResourceAdaptor(&mr); - rmm::device_async_resource_ref mr_ref(stats_mr); - rmm::mr::set_current_device_resource(&stats_mr); - rmm::mr::set_current_device_resource_ref(mr_ref); - - std::unordered_map - memory_available{}; - if (cmd_options.spill_device_limit.has_value()) { - memory_available[rapidsmpf::MemoryType::DEVICE] = rapidsmpf::LimitAvailableMemory{ - &stats_mr, static_cast(limit_size) - }; - } - - auto br = std::make_shared( - stats_mr, - rapidsmpf::BufferResource::PinnedMemoryResourceDisabled, - std::move(memory_available) + ctx->comm()->logger().print("Executor has ", ctx->comm()->nranks(), " ranks"); + ctx->comm()->logger().print( + "Phase 1 mode: ", + ctx->comm()->nranks() > 1 ? "shuffle (multi-rank)" : "local (single-rank)", + ", Phase 2 mode: ", + arguments.use_shuffle_join ? 
"shuffle joins" : "local joins", + ", partitions: ", + // arguments.num_partitions + q18_arguments.num_partitions ); - auto envvars = rapidsmpf::config::get_environment_variables(); - envvars["num_streaming_threads"] = std::to_string(cmd_options.num_streaming_threads); - auto options = rapidsmpf::config::Options(envvars); - auto stats = std::make_shared(&stats_mr); - { - auto comm = rapidsmpf::ucxx::init_using_mpi(mpi_comm, options); - auto progress = - std::make_shared(comm->logger(), stats); - auto ctx = - std::make_shared(options, comm, br, stats); - - comm->logger().print("Q18 Pre-filter Benchmark"); - comm->logger().print( - "Executor has ", ctx->executor()->thread_count(), " threads" - ); - comm->logger().print("Executor has ", ctx->comm()->nranks(), " ranks"); - comm->logger().print( - "Phase 1 mode: ", - ctx->comm()->nranks() > 1 ? "shuffle (multi-rank)" : "local (single-rank)", - ", Phase 2 mode: ", - cmd_options.use_shuffle ? "shuffle joins" : "local joins", - ", partitions: ", - cmd_options.num_partitions + for (int i = 0; i < arguments.num_iterations; i++) { + auto start = std::chrono::steady_clock::now(); + + // ================================================================ + // Phase 1: Compute qualifying orderkeys (blocking) + // Uses shuffle for multi-rank (required at scale to avoid OOM) + // Uses simple local groupby for single-rank + // ================================================================ + rapidsmpf::OpID phase1_op_id{static_cast(100 + i * 10)}; + std::unique_ptr qualifying_orderkeys = compute_qualifying_orderkeys( + ctx, + arguments.num_rows_per_chunk, + arguments.input_directory, + 300.0, // quantity_threshold + q18_arguments.num_partitions, + phase1_op_id ); + auto phase1_end = std::chrono::steady_clock::now(); - std::string output_path = cmd_options.output_file; - - for (int iteration = 0; iteration < 2; iteration++) { - auto start = std::chrono::steady_clock::now(); - - // 
================================================================ - // Phase 1: Compute qualifying orderkeys (blocking) - // Uses shuffle for multi-rank (required at scale to avoid OOM) - // Uses simple local groupby for single-rank - // ================================================================ - rapidsmpf::OpID phase1_op_id{ - static_cast(100 + iteration * 10) - }; - std::unique_ptr qualifying_orderkeys = - compute_qualifying_orderkeys( - ctx, - cmd_options.num_rows_per_chunk, - cmd_options.input_directory, - 300.0, // quantity_threshold - cmd_options.num_partitions, - phase1_op_id - ); - auto phase1_end = std::chrono::steady_clock::now(); - - if (!qualifying_orderkeys || qualifying_orderkeys->num_rows() == 0) { - comm->logger().print("No qualifying orderkeys found - empty result"); - continue; - } + if (!qualifying_orderkeys || qualifying_orderkeys->num_rows() == 0) { + ctx->comm()->logger().print("No qualifying orderkeys found - empty result"); + continue; + } - // Share orderkeys across nodes (they're small and identical on all ranks) - auto shared_orderkeys = - std::make_shared(std::move(*qualifying_orderkeys)); + // Share orderkeys across nodes (they're small and identical on all ranks) + auto shared_orderkeys = + std::make_shared(std::move(*qualifying_orderkeys)); + + // ================================================================ + // Phase 2: Build pre-filtered pipeline + // ================================================================ + std::vector nodes; + std::uint32_t num_partitions = q18_arguments.num_partitions; + int phase2_op_id = 0; + + // Read and pre-filter lineitem + auto lineitem_raw = ctx->create_channel(); + nodes.push_back(read_lineitem( + ctx, lineitem_raw, 4, arguments.num_rows_per_chunk, arguments.input_directory + )); - // ================================================================ - // Phase 2: Build pre-filtered pipeline - // ================================================================ - std::vector nodes; - 
std::uint32_t num_partitions = cmd_options.num_partitions; - int phase2_op_id = 0; + auto lineitem_filtered = ctx->create_channel(); + nodes.push_back(prefilter_by_orderkeys( + ctx, lineitem_raw, lineitem_filtered, shared_orderkeys, 0 + )); - // Read and pre-filter lineitem - auto lineitem_raw = ctx->create_channel(); - nodes.push_back(read_lineitem( - ctx, - lineitem_raw, - 4, - cmd_options.num_rows_per_chunk, - cmd_options.input_directory - )); + // Read and pre-filter orders + auto orders_raw = ctx->create_channel(); + nodes.push_back(read_orders( + ctx, orders_raw, 4, arguments.num_rows_per_chunk, arguments.input_directory + )); - auto lineitem_filtered = ctx->create_channel(); - nodes.push_back(prefilter_by_orderkeys( - ctx, lineitem_raw, lineitem_filtered, shared_orderkeys, 0 - )); + auto orders_filtered = ctx->create_channel(); + nodes.push_back( + prefilter_by_orderkeys(ctx, orders_raw, orders_filtered, shared_orderkeys, 0) + ); - // Read and pre-filter orders - auto orders_raw = ctx->create_channel(); - nodes.push_back(read_orders( - ctx, - orders_raw, - 4, - cmd_options.num_rows_per_chunk, - cmd_options.input_directory - )); + // Read customer + auto customer = ctx->create_channel(); + nodes.push_back(read_customer( + ctx, customer, 4, arguments.num_rows_per_chunk, arguments.input_directory + )); - auto orders_filtered = ctx->create_channel(); - nodes.push_back(prefilter_by_orderkeys( - ctx, orders_raw, orders_filtered, shared_orderkeys, 0 - )); + auto all_joined = ctx->create_channel(); - // Read customer - auto customer = ctx->create_channel(); - nodes.push_back(read_customer( - ctx, - customer, - 4, - cmd_options.num_rows_per_chunk, - cmd_options.input_directory - )); + bool const single_rank = ctx->comm()->nranks() == 1; - auto all_joined = ctx->create_channel(); - - bool const single_rank = ctx->comm()->nranks() == 1; - - if (cmd_options.use_shuffle && !single_rank) { - // ============================================================ - // SHUFFLE 
MODE: Proper parallel scaling (multi-rank only) - // ============================================================ - - // Shuffle filtered lineitem by orderkey - auto lineitem_shuffled = ctx->create_channel(); - nodes.push_back( - rapidsmpf::ndsh::shuffle( - ctx, - lineitem_filtered, - lineitem_shuffled, - {0}, // l_orderkey - num_partitions, - rapidsmpf::OpID{static_cast( - 200 + iteration * 10 + phase2_op_id++ - )} - ) - ); - - // Shuffle filtered orders by orderkey - auto orders_shuffled = ctx->create_channel(); - nodes.push_back( - rapidsmpf::ndsh::shuffle( - ctx, - orders_filtered, - orders_shuffled, - {0}, // o_orderkey - num_partitions, - rapidsmpf::OpID{static_cast( - 200 + iteration * 10 + phase2_op_id++ - )} - ) - ); - - // Shuffle-based join: orders x lineitem - // Output: o_orderkey, o_custkey, o_orderdate, o_totalprice, l_quantity - auto orders_x_lineitem = ctx->create_channel(); - nodes.push_back( - rapidsmpf::ndsh::inner_join_shuffle( - ctx, - orders_shuffled, - lineitem_shuffled, - orders_x_lineitem, - {0}, // o_orderkey - {0}, // l_orderkey - rapidsmpf::ndsh::KeepKeys::YES - ) - ); - - // Shuffle orders_x_lineitem by custkey for customer join - auto orders_x_lineitem_shuffled = ctx->create_channel(); - nodes.push_back( - rapidsmpf::ndsh::shuffle( - ctx, - orders_x_lineitem, - orders_x_lineitem_shuffled, - {1}, // o_custkey - num_partitions, - rapidsmpf::OpID{static_cast( - 200 + iteration * 10 + phase2_op_id++ - )} - ) - ); - - // Shuffle customer by custkey - auto customer_shuffled = ctx->create_channel(); - nodes.push_back( - rapidsmpf::ndsh::shuffle( - ctx, - customer, - customer_shuffled, - {0}, // c_custkey - num_partitions, - rapidsmpf::OpID{static_cast( - 200 + iteration * 10 + phase2_op_id++ - )} - ) - ); - - // Shuffle-based join: customer x orders_x_lineitem - nodes.push_back( - rapidsmpf::ndsh::inner_join_shuffle( - ctx, - customer_shuffled, - orders_x_lineitem_shuffled, - all_joined, - {0}, // c_custkey - {1}, // o_custkey - 
rapidsmpf::ndsh::KeepKeys::YES - ) - ); - - } else { - // ============================================================ - // ALL-GATHER MODE: Simple but doesn't scale - // ============================================================ - - auto lineitem_concat = ctx->create_channel(); - nodes.push_back( - rapidsmpf::ndsh::concatenate( - ctx, - lineitem_filtered, - lineitem_concat, - rapidsmpf::ndsh::ConcatOrder::DONT_CARE - ) - ); + if (arguments.use_shuffle_join && !single_rank) { + // ============================================================ + // SHUFFLE MODE: Proper parallel scaling (multi-rank only) + // ============================================================ - auto lineitem_gathered = ctx->create_channel(); - nodes.push_back(allgather_table( + // Shuffle filtered lineitem by orderkey + auto lineitem_shuffled = ctx->create_channel(); + nodes.push_back( + rapidsmpf::ndsh::shuffle( ctx, - lineitem_concat, - lineitem_gathered, - rapidsmpf::OpID{static_cast(200 + iteration)} - )); - - auto orders_concat = ctx->create_channel(); - nodes.push_back( - rapidsmpf::ndsh::concatenate( - ctx, - orders_filtered, - orders_concat, - rapidsmpf::ndsh::ConcatOrder::DONT_CARE - ) - ); + lineitem_filtered, + lineitem_shuffled, + {0}, // l_orderkey + num_partitions, + rapidsmpf::OpID{ + static_cast(200 + i * 10 + phase2_op_id++) + } + ) + ); - auto orders_gathered = ctx->create_channel(); - nodes.push_back(allgather_table( + // Shuffle filtered orders by orderkey + auto orders_shuffled = ctx->create_channel(); + nodes.push_back( + rapidsmpf::ndsh::shuffle( ctx, - orders_concat, - orders_gathered, - rapidsmpf::OpID{static_cast(201 + iteration)} - )); + orders_filtered, + orders_shuffled, + {0}, // o_orderkey + num_partitions, + rapidsmpf::OpID{ + static_cast(200 + i * 10 + phase2_op_id++) + } + ) + ); - // Local join: orders x lineitem (both small after pre-filtering) - auto orders_x_lineitem = ctx->create_channel(); - nodes.push_back(local_inner_join( + // Shuffle-based join: 
orders x lineitem + // Output: o_orderkey, o_custkey, o_orderdate, o_totalprice, l_quantity + auto orders_x_lineitem = ctx->create_channel(); + nodes.push_back( + rapidsmpf::ndsh::inner_join_shuffle( ctx, - orders_gathered, - lineitem_gathered, + orders_shuffled, + lineitem_shuffled, orders_x_lineitem, {0}, // o_orderkey - {0} // l_orderkey - )); - - // Join with customer (broadcast - orders_x_lineitem is small) - nodes.push_back( - rapidsmpf::ndsh::inner_join_broadcast( - ctx, - customer, - orders_x_lineitem, - all_joined, - {0}, // c_custkey - {1}, // o_custkey - rapidsmpf::OpID{static_cast(202 + iteration)}, - rapidsmpf::ndsh::KeepKeys::YES - ) - ); - } + {0}, // l_orderkey + rapidsmpf::ndsh::KeepKeys::YES + ) + ); + + // Shuffle orders_x_lineitem by custkey for customer join + auto orders_x_lineitem_shuffled = ctx->create_channel(); + nodes.push_back( + rapidsmpf::ndsh::shuffle( + ctx, + orders_x_lineitem, + orders_x_lineitem_shuffled, + {1}, // o_custkey + num_partitions, + rapidsmpf::OpID{ + static_cast(200 + i * 10 + phase2_op_id++) + } + ) + ); + + // Shuffle customer by custkey + auto customer_shuffled = ctx->create_channel(); + nodes.push_back( + rapidsmpf::ndsh::shuffle( + ctx, + customer, + customer_shuffled, + {0}, // c_custkey + num_partitions, + rapidsmpf::OpID{ + static_cast(200 + i * 10 + phase2_op_id++) + } + ) + ); + + // Shuffle-based join: customer x orders_x_lineitem + nodes.push_back( + rapidsmpf::ndsh::inner_join_shuffle( + ctx, + customer_shuffled, + orders_x_lineitem_shuffled, + all_joined, + {0}, // c_custkey + {1}, // o_custkey + rapidsmpf::ndsh::KeepKeys::YES + ) + ); - // Groupby aggregation (includes column reordering) - auto groupby_output = ctx->create_channel(); - nodes.push_back(chunkwise_groupby_agg(ctx, all_joined, groupby_output)); + } else { + // ============================================================ + // ALL-GATHER MODE: Simple but doesn't scale + // ============================================================ - auto 
concat_groupby = ctx->create_channel(); + auto lineitem_concat = ctx->create_channel(); nodes.push_back( rapidsmpf::ndsh::concatenate( ctx, - groupby_output, - concat_groupby, + lineitem_filtered, + lineitem_concat, rapidsmpf::ndsh::ConcatOrder::DONT_CARE ) ); - // Final groupby, all-gather, sort, limit - auto final_output = ctx->create_channel(); - nodes.push_back(final_groupby_and_sort( + auto lineitem_gathered = ctx->create_channel(); + nodes.push_back(allgather_table( ctx, - concat_groupby, - final_output, - rapidsmpf::OpID{ - static_cast(200 + iteration * 10 + phase2_op_id++) - } + lineitem_concat, + lineitem_gathered, + rapidsmpf::OpID{static_cast(200 + i)} )); - // Write output - nodes.push_back(write_parquet(ctx, final_output, output_path)); - - // Run pipeline - auto phase2_build_end = std::chrono::steady_clock::now(); - rapidsmpf::streaming::run_streaming_pipeline(std::move(nodes)); - auto phase2_run_end = std::chrono::steady_clock::now(); - - std::chrono::duration phase1_time = phase1_end - start; - std::chrono::duration phase2_build_time = - phase2_build_end - phase1_end; - std::chrono::duration phase2_run_time = - phase2_run_end - phase2_build_end; - std::chrono::duration total_time = phase2_run_end - start; - - comm->logger().print( - "Iteration ", - iteration, - " Phase 1 (groupby+filter) [s]: ", - phase1_time.count() - ); - comm->logger().print( - "Iteration ", iteration, " Phase 2 build [s]: ", phase2_build_time.count() - ); - comm->logger().print( - "Iteration ", iteration, " Phase 2 run [s]: ", phase2_run_time.count() - ); - comm->logger().print( - "Iteration ", iteration, " TOTAL [s]: ", total_time.count() + auto orders_concat = ctx->create_channel(); + nodes.push_back( + rapidsmpf::ndsh::concatenate( + ctx, + orders_filtered, + orders_concat, + rapidsmpf::ndsh::ConcatOrder::DONT_CARE + ) ); - comm->logger().print(stats->report()); - RAPIDSMPF_MPI(MPI_Barrier(mpi_comm)); + auto orders_gathered = ctx->create_channel(); + 
nodes.push_back(allgather_table( + ctx, + orders_concat, + orders_gathered, + rapidsmpf::OpID{static_cast(201 + i)} + )); + + // Local join: orders x lineitem (both small after pre-filtering) + auto orders_x_lineitem = ctx->create_channel(); + nodes.push_back(local_inner_join( + ctx, + orders_gathered, + lineitem_gathered, + orders_x_lineitem, + {0}, // o_orderkey + {0} // l_orderkey + )); + + // Join with customer (broadcast - orders_x_lineitem is small) + nodes.push_back( + rapidsmpf::ndsh::inner_join_broadcast( + ctx, + customer, + orders_x_lineitem, + all_joined, + {0}, // c_custkey + {1}, // o_custkey + rapidsmpf::OpID{static_cast(202 + i)}, + rapidsmpf::ndsh::KeepKeys::YES + ) + ); } + + // Groupby aggregation (includes column reordering) + auto groupby_output = ctx->create_channel(); + nodes.push_back(chunkwise_groupby_agg(ctx, all_joined, groupby_output)); + + auto concat_groupby = ctx->create_channel(); + nodes.push_back( + rapidsmpf::ndsh::concatenate( + ctx, + groupby_output, + concat_groupby, + rapidsmpf::ndsh::ConcatOrder::DONT_CARE + ) + ); + + // Final groupby, all-gather, sort, limit + auto final_output = ctx->create_channel(); + nodes.push_back(final_groupby_and_sort( + ctx, + concat_groupby, + final_output, + rapidsmpf::OpID{static_cast(200 + i * 10 + phase2_op_id++)} + )); + + // Write output + nodes.push_back(write_parquet(ctx, final_output, output_path)); + + // Run pipeline + auto phase2_build_end = std::chrono::steady_clock::now(); + rapidsmpf::streaming::run_streaming_pipeline(std::move(nodes)); + auto phase2_run_end = std::chrono::steady_clock::now(); + + std::chrono::duration phase1_time = phase1_end - start; + std::chrono::duration phase2_build_time = phase2_build_end - phase1_end; + std::chrono::duration phase2_run_time = phase2_run_end - phase2_build_end; + std::chrono::duration total_time = phase2_run_end - start; + + ctx->comm()->logger().print( + "Iteration ", i, " Phase 1 (groupby+filter) [s]: ", phase1_time.count() + ); + 
ctx->comm()->logger().print( + "Iteration ", i, " Phase 2 build [s]: ", phase2_build_time.count() + ); + ctx->comm()->logger().print( + "Iteration ", i, " Phase 2 run [s]: ", phase2_run_time.count() + ); + ctx->comm()->logger().print("Iteration ", i, " TOTAL [s]: ", total_time.count()); + ctx->comm()->logger().print(ctx->statistics()->report()); } - RAPIDSMPF_MPI(MPI_Comm_free(&mpi_comm)); - RAPIDSMPF_MPI(MPI_Finalize()); return 0; } diff --git a/cpp/benchmarks/streaming/ndsh/utils.cpp b/cpp/benchmarks/streaming/ndsh/utils.cpp index c76671cfd..faab7c8cd 100644 --- a/cpp/benchmarks/streaming/ndsh/utils.cpp +++ b/cpp/benchmarks/streaming/ndsh/utils.cpp @@ -198,6 +198,11 @@ std::shared_ptr create_context( ProgramOptions parse_arguments(int argc, char** argv) { ProgramOptions options; + // Reset getopt state and suppress error messages for unknown options + // This allows query-specific options to be parsed in a second pass + optind = 1; + opterr = 0; + static constexpr std::array(CommType::MAX)> comm_names{"single", "mpi", "ucxx"}; @@ -382,11 +387,8 @@ ProgramOptions parse_arguments(int argc, char** argv) { options.no_pinned_host_memory = true; break; case '?': - if (optopt == 0 && optind > 1) { - std::cerr << "Error: Unknown option '" << argv[optind - 1] << "'\n\n"; - } - print_usage(); - std::exit(1); + // Silently ignore unknown options - they may be query-specific + break; default: print_usage(); std::exit(1); @@ -406,6 +408,9 @@ ProgramOptions parse_arguments(int argc, char** argv) { std::exit(1); } + // Reset optind for potential second-pass parsing by the caller + optind = 1; + return options; } } // namespace rapidsmpf::ndsh diff --git a/cpp/benchmarks/streaming/ndsh/utils.hpp b/cpp/benchmarks/streaming/ndsh/utils.hpp index b00d2f9fe..9c23eb1e5 100644 --- a/cpp/benchmarks/streaming/ndsh/utils.hpp +++ b/cpp/benchmarks/streaming/ndsh/utils.hpp @@ -126,6 +126,11 @@ struct ProgramOptions { /** * @brief Parse commandline arguments * + * Parses common options shared 
across all queries. Unknown options are silently + * ignored, allowing queries to perform a second pass to parse query-specific + * arguments. After this function returns, `optind` is reset to 1 so callers can + * re-parse the arguments with their own option definitions. + * * @param argc Number of arguments * @param argv Arguments * From 9dd047afdd7c680551c95088702a5f80d26439e1 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Fri, 2 Jan 2026 08:33:50 -0800 Subject: [PATCH 15/16] refactor --- cpp/benchmarks/streaming/ndsh/q18.cpp | 285 ++++++-------------------- 1 file changed, 64 insertions(+), 221 deletions(-) diff --git a/cpp/benchmarks/streaming/ndsh/q18.cpp b/cpp/benchmarks/streaming/ndsh/q18.cpp index 23164099c..73fd30a54 100644 --- a/cpp/benchmarks/streaming/ndsh/q18.cpp +++ b/cpp/benchmarks/streaming/ndsh/q18.cpp @@ -1,5 +1,5 @@ /** - * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. + * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. * SPDX-License-Identifier: Apache-2.0 * * TPC-H Query 18 - Pre-filter Optimization @@ -55,8 +55,7 @@ #include #include #include -#include -#include +#include #include #include @@ -90,7 +89,6 @@ #include #include #include -#include #include #include #include @@ -106,27 +104,13 @@ #include #include "concatenate.hpp" +#include "groupby.hpp" #include "join.hpp" +#include "parquet_writer.hpp" #include "utils.hpp" namespace { -// ============================================================================ -// Utility Functions -// ============================================================================ - -// NOTE: This is added to ndsh::detail in https://github.com/rapidsai/rapidsmpf/pull/710 -std::string get_table_path( - std::string const& input_directory, std::string const& table_name -) { - auto dir = input_directory.empty() ? "." 
: input_directory; - auto file_path = dir + "/" + table_name + ".parquet"; - if (std::filesystem::exists(file_path)) { - return file_path; - } - return dir + "/" + table_name + "/"; -} - // ============================================================================ // Table Readers // ============================================================================ @@ -139,7 +123,7 @@ rapidsmpf::streaming::Node read_lineitem( std::string const& input_directory ) { auto files = rapidsmpf::ndsh::detail::list_parquet_files( - get_table_path(input_directory, "lineitem") + rapidsmpf::ndsh::detail::get_table_path(input_directory, "lineitem") ); auto options = cudf::io::parquet_reader_options::builder(cudf::io::source_info(files)) .columns({"l_orderkey", "l_quantity"}) @@ -157,7 +141,7 @@ rapidsmpf::streaming::Node read_orders( std::string const& input_directory ) { auto files = rapidsmpf::ndsh::detail::list_parquet_files( - get_table_path(input_directory, "orders") + rapidsmpf::ndsh::detail::get_table_path(input_directory, "orders") ); auto options = cudf::io::parquet_reader_options::builder(cudf::io::source_info(files)) @@ -176,7 +160,7 @@ rapidsmpf::streaming::Node read_customer( std::string const& input_directory ) { auto files = rapidsmpf::ndsh::detail::list_parquet_files( - get_table_path(input_directory, "customer") + rapidsmpf::ndsh::detail::get_table_path(input_directory, "customer") ); auto options = cudf::io::parquet_reader_options::builder(cudf::io::source_info(files)) .columns({"c_custkey", "c_name"}) @@ -191,78 +175,14 @@ rapidsmpf::streaming::Node read_customer( // ============================================================================ /** - * @brief Stage 1: Chunk-wise groupby (NO filter yet!) - * - * Computes partial aggregates: groupby(l_orderkey, sum(l_quantity)) - * The same orderkey may appear in multiple chunks, so we can't filter here. + * @brief Create groupby requests for lineitem sum(l_quantity) aggregation. 
*/ -rapidsmpf::streaming::Node chunkwise_groupby_lineitem( - std::shared_ptr ctx, - std::shared_ptr ch_in, - std::shared_ptr ch_out -) { - rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; - auto mr = ctx->br()->device_mr(); - std::uint64_t sequence = 0; - std::size_t total_input_rows = 0; - std::size_t total_output_rows = 0; - std::size_t chunk_count = 0; - - while (true) { - auto msg = co_await ch_in->receive(); - if (msg.empty()) { - break; - } - co_await ctx->executor()->schedule(); - auto chunk = rapidsmpf::ndsh::to_device( - ctx, msg.release() - ); - auto chunk_stream = chunk.stream(); - auto table = chunk.table_view(); - total_input_rows += static_cast(table.num_rows()); - chunk_count++; - - // Groupby l_orderkey, sum(l_quantity) - NO FILTER - auto grouper = cudf::groupby::groupby( - table.select({0}), cudf::null_policy::EXCLUDE, cudf::sorted::NO - ); - std::vector> aggs; - aggs.push_back(cudf::make_sum_aggregation()); - std::vector requests; - requests.push_back( - cudf::groupby::aggregation_request(table.column(1), std::move(aggs)) - ); - auto [keys, results] = grouper.aggregate(requests, chunk_stream, mr); - - auto result_columns = keys->release(); - for (auto&& r : results) { - std::ranges::move(r.results, std::back_inserter(result_columns)); - } - auto grouped_table = std::make_unique(std::move(result_columns)); - total_output_rows += static_cast(grouped_table->num_rows()); - - if (grouped_table->num_rows() > 0) { - co_await ch_out->send( - rapidsmpf::streaming::to_message( - sequence++, - std::make_unique( - std::move(grouped_table), chunk_stream - ) - ) - ); - } - } - - ctx->comm()->logger().print( - "chunkwise_groupby: rank processed ", - chunk_count, - " chunks, ", - total_input_rows, - " -> ", - total_output_rows, - " rows" - ); - co_await ch_out->drain(ctx->executor()); +std::vector lineitem_groupby_requests() { + std::vector requests; + std::vector()>> aggs; + aggs.emplace_back(cudf::make_sum_aggregation); + requests.emplace_back(1, 
std::move(aggs)); // column 1 = l_quantity + return requests; } /** @@ -367,87 +287,6 @@ rapidsmpf::streaming::Node final_groupby_filter_lineitem( co_await ch_out->drain(ctx->executor()); } -/** - * @brief All-gather node: collect from ch_in, all-gather across ranks, send to ch_out. - * - * For single-rank, this is a simple pass-through. - */ -rapidsmpf::streaming::Node allgather_table( - std::shared_ptr ctx, - std::shared_ptr ch_in, - std::shared_ptr ch_out, - rapidsmpf::OpID tag -) { - rapidsmpf::streaming::ShutdownAtExit c{ch_in, ch_out}; - - auto msg = co_await ch_in->receive(); - if (!msg.empty()) { - auto next = co_await ch_in->receive(); - RAPIDSMPF_EXPECTS(next.empty(), "allgather_table: Unexpected second message."); - } - - std::unique_ptr result; - rmm::cuda_stream_view stream; - - if (ctx->comm()->nranks() > 1) { - rapidsmpf::streaming::AllGather gatherer{ctx, tag}; - - if (!msg.empty()) { - auto chunk = rapidsmpf::ndsh::to_device( - ctx, msg.release() - ); - stream = chunk.stream(); - auto pack = cudf::pack(chunk.table_view(), stream, ctx->br()->device_mr()); - gatherer.insert( - 0, - {rapidsmpf::PackedData( - std::move(pack.metadata), - ctx->br()->move(std::move(pack.gpu_data), stream) - )} - ); - } - gatherer.insert_finished(); - - auto packed_data = - co_await gatherer.extract_all(rapidsmpf::streaming::AllGather::Ordered::NO); - - result = rapidsmpf::unpack_and_concat( - rapidsmpf::unspill_partitions( - std::move(packed_data), ctx->br(), true, ctx->statistics() - ), - stream, - ctx->br(), - ctx->statistics() - ); - } else { - // Single rank - just forward the message as-is - if (!msg.empty()) { - co_await ch_out->send(std::move(msg)); - } - ctx->comm()->logger().print( - "allgather_table: single rank finished forwarding message" - ); - co_await ch_out->drain(ctx->executor()); - co_return; - } - - ctx->comm()->logger().debug( - "allgather_table: ", result ? 
result->num_rows() : 0, " rows gathered" - ); - - if (result && result->num_rows() > 0) { - co_await ch_out->send( - rapidsmpf::streaming::to_message( - 0, - std::make_unique( - std::move(result), stream - ) - ) - ); - } - co_await ch_out->drain(ctx->executor()); -} - /** * @brief Phase 1: Compute qualifying orderkeys. * @@ -493,7 +332,16 @@ std::unique_ptr compute_qualifying_orderkeys( nodes.push_back(read_lineitem(ctx, lineitem, 4, num_rows_per_chunk, input_directory)); auto partial_aggs = ctx->create_channel(); - nodes.push_back(chunkwise_groupby_lineitem(ctx, lineitem, partial_aggs)); + nodes.push_back( + rapidsmpf::ndsh::chunkwise_group_by( + ctx, + lineitem, + partial_aggs, + {0}, // l_orderkey + lineitem_groupby_requests(), + cudf::null_policy::EXCLUDE + ) + ); std::shared_ptr to_collect; @@ -543,7 +391,15 @@ std::unique_ptr compute_qualifying_orderkeys( // All-gather the TINY filtered result (~57K orderkeys at SF1000) auto gathered = ctx->create_channel(); - nodes.push_back(allgather_table(ctx, filtered_local, gathered, base_tag++)); + nodes.push_back( + rapidsmpf::ndsh::broadcast( + ctx, + filtered_local, + gathered, + base_tag++, + rapidsmpf::streaming::AllGather::Ordered::NO + ) + ); to_collect = gathered; } @@ -940,37 +796,6 @@ rapidsmpf::streaming::Node final_groupby_and_sort( co_await ch_out->drain(ctx->executor()); } -rapidsmpf::streaming::Node write_parquet( - std::shared_ptr ctx, - std::shared_ptr ch_in, - std::string output_path -) { - rapidsmpf::streaming::ShutdownAtExit c{ch_in}; - auto msg = co_await ch_in->receive(); - if (msg.empty()) { - co_return; - } else { - auto next = co_await ch_in->receive(); - RAPIDSMPF_EXPECTS(next.empty(), "write_parquet: Unexpected second message."); - } - auto chunk = - rapidsmpf::ndsh::to_device(ctx, msg.release()); - auto sink = cudf::io::sink_info(output_path); - auto builder = cudf::io::parquet_writer_options::builder(sink, chunk.table_view()); - auto metadata = 
cudf::io::table_input_metadata(chunk.table_view()); - metadata.column_metadata[0].set_name("c_name"); - metadata.column_metadata[1].set_name("c_custkey"); - metadata.column_metadata[2].set_name("o_orderkey"); - metadata.column_metadata[3].set_name("o_orderdate"); - metadata.column_metadata[4].set_name("o_totalprice"); - metadata.column_metadata[5].set_name("sum_quantity"); - builder = builder.metadata(metadata); - cudf::io::write_parquet(builder.build(), chunk.stream()); - ctx->comm()->logger().print( - "Wrote ", chunk.table_view().num_rows(), " rows to ", output_path - ); -} - } // namespace // ============================================================================ @@ -1212,12 +1037,15 @@ int main(int argc, char** argv) { ); auto lineitem_gathered = ctx->create_channel(); - nodes.push_back(allgather_table( - ctx, - lineitem_concat, - lineitem_gathered, - rapidsmpf::OpID{static_cast(200 + i)} - )); + nodes.push_back( + rapidsmpf::ndsh::broadcast( + ctx, + lineitem_concat, + lineitem_gathered, + rapidsmpf::OpID{static_cast(200 + i)}, + rapidsmpf::streaming::AllGather::Ordered::NO + ) + ); auto orders_concat = ctx->create_channel(); nodes.push_back( @@ -1230,12 +1058,15 @@ int main(int argc, char** argv) { ); auto orders_gathered = ctx->create_channel(); - nodes.push_back(allgather_table( - ctx, - orders_concat, - orders_gathered, - rapidsmpf::OpID{static_cast(201 + i)} - )); + nodes.push_back( + rapidsmpf::ndsh::broadcast( + ctx, + orders_concat, + orders_gathered, + rapidsmpf::OpID{static_cast(201 + i)}, + rapidsmpf::streaming::AllGather::Ordered::NO + ) + ); // Local join: orders x lineitem (both small after pre-filtering) auto orders_x_lineitem = ctx->create_channel(); @@ -1287,7 +1118,19 @@ int main(int argc, char** argv) { )); // Write output - nodes.push_back(write_parquet(ctx, final_output, output_path)); + nodes.push_back( + rapidsmpf::ndsh::write_parquet( + ctx, + final_output, + cudf::io::sink_info{output_path}, + {"c_name", + "c_custkey", + 
"o_orderkey", + "o_orderdate", + "o_totalprice", + "sum_quantity"} + ) + ); // Run pipeline auto phase2_build_end = std::chrono::steady_clock::now(); From cad5d1cf213ceb8a6e2872f952ad2329bcf9f498 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Sun, 4 Jan 2026 05:54:39 -0800 Subject: [PATCH 16/16] Options parsing --- cpp/benchmarks/streaming/ndsh/q18.cpp | 57 ++++++---------- cpp/benchmarks/streaming/ndsh/utils.cpp | 86 ++++++++++++++++++++++--- cpp/benchmarks/streaming/ndsh/utils.hpp | 27 ++++++-- 3 files changed, 119 insertions(+), 51 deletions(-) diff --git a/cpp/benchmarks/streaming/ndsh/q18.cpp b/cpp/benchmarks/streaming/ndsh/q18.cpp index 73fd30a54..737616242 100644 --- a/cpp/benchmarks/streaming/ndsh/q18.cpp +++ b/cpp/benchmarks/streaming/ndsh/q18.cpp @@ -60,7 +60,6 @@ #include #include -#include #include #include @@ -798,42 +797,15 @@ rapidsmpf::streaming::Node final_groupby_and_sort( } // namespace -// ============================================================================ -// Command Line Parsing -// ============================================================================ - -struct Q18Options { - std::uint32_t num_partitions{64}; -}; - -Q18Options parse_options(int argc, char** argv) { - Q18Options options; - // NOLINTBEGIN(modernize-avoid-c-arrays,cppcoreguidelines-avoid-c-arrays,modernize-use-designated-initializers) - opterr = 0; - - static struct option long_options[] = { - {"num-partitions", required_argument, nullptr, 7}, {nullptr, 0, nullptr, 0} - }; - // NOLINTEND(modernize-avoid-c-arrays,cppcoreguidelines-avoid-c-arrays,modernize-use-designated-initializers) - - int opt; - int option_index = 0; - - while ((opt = getopt_long(argc, argv, "", long_options, &option_index)) != -1) { - switch (opt) { - case 1: - options.num_partitions = static_cast(std::atoi(optarg)); - break; - } - } - - return options; -} - // ============================================================================ // Main // 
============================================================================ +namespace { +/// Known query-specific options for Q18 +std::vector const Q18_QUERY_OPTIONS = {"num-partitions"}; +} // namespace + int main(int argc, char** argv) { rapidsmpf::ndsh::FinalizeMPI finalize{}; cudaFree(nullptr); @@ -842,7 +814,18 @@ int main(int argc, char** argv) { auto mr = rmm::mr::cuda_async_memory_resource{}; auto stats_wrapper = rapidsmpf::RmmResourceAdaptor(&mr); auto arguments = rapidsmpf::ndsh::parse_arguments(argc, argv); - auto q18_arguments = parse_options(argc, argv); + + // Validate query-specific options + rapidsmpf::ndsh::validate_query_options(arguments, Q18_QUERY_OPTIONS, "Q18"); + + // Extract Q18-specific options + std::uint32_t num_partitions = 64; + if (auto it = arguments.query_options.find("num-partitions"); + it != arguments.query_options.end()) + { + num_partitions = static_cast(std::stoul(it->second)); + } + auto ctx = rapidsmpf::ndsh::create_context(arguments, &stats_wrapper); std::string output_path = arguments.output_file; @@ -857,8 +840,7 @@ int main(int argc, char** argv) { ", Phase 2 mode: ", arguments.use_shuffle_join ? 
"shuffle joins" : "local joins", ", partitions: ", - // arguments.num_partitions - q18_arguments.num_partitions + num_partitions ); for (int i = 0; i < arguments.num_iterations; i++) { @@ -875,7 +857,7 @@ int main(int argc, char** argv) { arguments.num_rows_per_chunk, arguments.input_directory, 300.0, // quantity_threshold - q18_arguments.num_partitions, + num_partitions, phase1_op_id ); auto phase1_end = std::chrono::steady_clock::now(); @@ -893,7 +875,6 @@ int main(int argc, char** argv) { // Phase 2: Build pre-filtered pipeline // ================================================================ std::vector nodes; - std::uint32_t num_partitions = q18_arguments.num_partitions; int phase2_op_id = 0; // Read and pre-filter lineitem diff --git a/cpp/benchmarks/streaming/ndsh/utils.cpp b/cpp/benchmarks/streaming/ndsh/utils.cpp index faab7c8cd..1bd7d28ee 100644 --- a/cpp/benchmarks/streaming/ndsh/utils.cpp +++ b/cpp/benchmarks/streaming/ndsh/utils.cpp @@ -1,5 +1,5 @@ /** - * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. + * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. * SPDX-License-Identifier: Apache-2.0 */ @@ -11,6 +11,7 @@ #include #include #include +#include #include #include @@ -198,10 +199,9 @@ std::shared_ptr create_context( ProgramOptions parse_arguments(int argc, char** argv) { ProgramOptions options; - // Reset getopt state and suppress error messages for unknown options - // This allows query-specific options to be parsed in a second pass + // Reset getopt state. Enable error messages for unknown options. optind = 1; - opterr = 0; + opterr = 1; static constexpr std::array(CommType::MAX)> comm_names{"single", "mpi", "ucxx"}; @@ -241,6 +241,10 @@ ProgramOptions parse_arguments(int argc, char** argv) { << (options.use_shuffle_join ? 
"true" : "false") << ")\n" << " --output-file Output file path (required)\n" << " --input-directory Input directory path (required)\n" + << " --query-options Query-specific options as key=value " + "pairs\n" + << " (e.g., --query-options " + "num-partitions=64)\n" << " --help Show this help message\n"; }; @@ -258,6 +262,7 @@ ProgramOptions parse_arguments(int argc, char** argv) { {"comm-type", required_argument, nullptr, 10}, {"periodic-spill", required_argument, nullptr, 11}, {"no-pinned-host-memory", no_argument, nullptr, 12}, + {"query-options", required_argument, nullptr, 13}, {nullptr, 0, nullptr, 0} }; // NOLINTEND(modernize-avoid-c-arrays,cppcoreguidelines-avoid-c-arrays,modernize-use-designated-initializers) @@ -386,9 +391,34 @@ ProgramOptions parse_arguments(int argc, char** argv) { case 12: options.no_pinned_host_memory = true; break; + case 13: + { + // Parse query-options: expects key=value format + // Can be specified multiple times or as space-separated within one arg + std::string arg = optarg; + auto pos = arg.find('='); + if (pos == std::string::npos || pos == 0 || pos == arg.size() - 1) { + std::cerr + << "Error: Invalid --query-options format: '" << arg << "'\n" + << "Expected format: key=value (e.g., num-partitions=64)\n\n"; + print_usage(); + std::exit(1); + } + std::string key = arg.substr(0, pos); + std::string value = arg.substr(pos + 1); + if (options.query_options.contains(key)) { + std::cerr << "Error: Duplicate query option: '" << key << "'\n\n"; + print_usage(); + std::exit(1); + } + options.query_options[key] = value; + break; + } case '?': - // Silently ignore unknown options - they may be query-specific - break; + // Unknown option - getopt already printed an error message + std::cerr << "\n"; + print_usage(); + std::exit(1); default: print_usage(); std::exit(1); @@ -408,9 +438,49 @@ ProgramOptions parse_arguments(int argc, char** argv) { std::exit(1); } - // Reset optind for potential second-pass parsing by the caller - optind = 1; 
+ // Check for unexpected positional arguments + if (optind < argc) { + std::cerr << "Error: Unexpected positional argument(s):"; + for (int i = optind; i < argc; ++i) { + std::cerr << " '" << argv[i] << "'"; + } + std::cerr << "\n\n"; + print_usage(); + std::exit(1); + } return options; } + +void validate_query_options( + ProgramOptions const& options, + std::vector const& known_keys, + std::string const& query_name +) { + std::unordered_set known_set(known_keys.begin(), known_keys.end()); + std::vector unknown_keys; + + for (auto const& [key, value] : options.query_options) { + if (!known_set.contains(key)) { + unknown_keys.push_back(key); + } + } + + if (!unknown_keys.empty()) { + std::cerr << "Error: Unknown query option(s) for " << query_name << ":"; + for (auto const& key : unknown_keys) { + std::cerr << " '" << key << "'"; + } + std::cerr << "\n\nValid query options for " << query_name << ":"; + if (known_keys.empty()) { + std::cerr << " (none)"; + } else { + for (auto const& key : known_keys) { + std::cerr << "\n " << key; + } + } + std::cerr << "\n"; + std::exit(1); + } +} } // namespace rapidsmpf::ndsh diff --git a/cpp/benchmarks/streaming/ndsh/utils.hpp b/cpp/benchmarks/streaming/ndsh/utils.hpp index 9c23eb1e5..f92a6526f 100644 --- a/cpp/benchmarks/streaming/ndsh/utils.hpp +++ b/cpp/benchmarks/streaming/ndsh/utils.hpp @@ -1,5 +1,5 @@ /** - * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. + * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. * SPDX-License-Identifier: Apache-2.0 */ @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -121,15 +122,15 @@ struct ProgramOptions { bool use_shuffle_join = false; ///< Use shuffle join for "big" joins? std::string output_file; ///< File to write output to std::string input_directory; ///< Directory containing input files. 
+ std::unordered_map + query_options; ///< Query-specific key=value options }; /** * @brief Parse commandline arguments * - * Parses common options shared across all queries. Unknown options are silently - * ignored, allowing queries to perform a second pass to parse query-specific - * arguments. After this function returns, `optind` is reset to 1 so callers can - * re-parse the arguments with their own option definitions. + * Parses common options shared across all queries. Unknown options cause an error. + * Query-specific options should be passed via --query-options as key=value pairs. * * @param argc Number of arguments * @param argv Arguments @@ -138,6 +139,22 @@ struct ProgramOptions { */ ProgramOptions parse_arguments(int argc, char** argv); +/** + * @brief Validate that all query options have been consumed + * + * Checks that the provided known_keys contains all keys in query_options. + * Prints an error and exits if unknown query options are found. + * + * @param options ProgramOptions containing query_options + * @param known_keys Set of valid option keys for this query + * @param query_name Name of the query (for error messages) + */ +void validate_query_options( + ProgramOptions const& options, + std::vector const& known_keys, + std::string const& query_name +); + /** * @brief Create a streaming execution context for a query. *