-
Notifications
You must be signed in to change notification settings - Fork 29
Adding NDSH Q13 impl #706
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Adding NDSH Q13 impl #706
Changes from all commits
30a05c7
c0b3a94
d8ba508
d17800c
d253e87
4dccab5
b2149de
ae0cafa
68b756f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -351,6 +351,143 @@ streaming::Node inner_join_shuffle( | |||||
| co_await ch_out->drain(ctx->executor()); | ||||||
| } | ||||||
|
|
||||||
| /** | ||||||
| * @brief Left join a table chunk against a build hash table, returning a message of the | ||||||
| * result. | ||||||
| * | ||||||
| * The output table holds every column of `left_chunk` (key columns included), gathered | ||||||
| * by the probe-side join indices, followed by the columns of `build_carrier`, gathered | ||||||
| * by the build-side join indices with out-of-bounds indices nullified. The join and | ||||||
| * both gathers are issued on the stream of `left_chunk`, which is first made to wait | ||||||
| * on `build_event`. | ||||||
| * | ||||||
| * @param ctx Streaming context (provides the device memory resource for the results) | ||||||
| * @param left_chunk Chunk to join (probe side) | ||||||
| * @param sequence Sequence number of the output | ||||||
| * @param joiner hash_join object representing the build table | ||||||
| * @param build_carrier Columns from the build-side table to be included in the output | ||||||
| * @param left_on Key column indices in `left_chunk` | ||||||
| * @param build_stream Stream the `joiner` will be deallocated on | ||||||
| * @param build_event Event recording the creation of the `joiner`; it is dereferenced | ||||||
| * unconditionally, so it must not be null | ||||||
| * | ||||||
| * @return Message of `TableChunk` containing the result of the left join; the chunk is | ||||||
| * created on the stream of `left_chunk` | ||||||
| */ | ||||||
| streaming::Message left_join_chunk( | ||||||
| std::shared_ptr<streaming::Context> ctx, | ||||||
| streaming::TableChunk&& left_chunk, | ||||||
| std::uint64_t sequence, | ||||||
| cudf::hash_join& joiner, | ||||||
| cudf::table_view build_carrier, | ||||||
| std::vector<cudf::size_type> left_on, | ||||||
| rmm::cuda_stream_view build_stream, | ||||||
| CudaEvent* build_event | ||||||
| ) { | ||||||
| CudaEvent event; | ||||||
| auto chunk_stream = left_chunk.stream(); | ||||||
| build_event->stream_wait(chunk_stream); | ||||||
| auto probe_table = left_chunk.table_view(); | ||||||
| auto probe_keys = probe_table.select(left_on); | ||||||
| auto [probe_match, build_match] = | ||||||
| joiner.left_join(probe_keys, std::nullopt, chunk_stream, ctx->br()->device_mr()); | ||||||
|
|
||||||
| cudf::column_view build_indices = // right | ||||||
| cudf::device_span<cudf::size_type const>(*build_match); | ||||||
| cudf::column_view probe_indices = // left | ||||||
| cudf::device_span<cudf::size_type const>(*probe_match); | ||||||
| // build_carrier is valid on build_stream, but chunk_stream is | ||||||
| // waiting for build_stream work to be done, so running this on | ||||||
| // chunk_stream is fine. | ||||||
|
|
||||||
| // For LEFT join, keep all columns from the probe (left) table including keys, | ||||||
| // since they're always valid (unlike right-side keys which may be NULL). | ||||||
| auto result_columns = cudf::gather( | ||||||
| probe_table, | ||||||
| probe_indices, | ||||||
| cudf::out_of_bounds_policy::DONT_CHECK, | ||||||
| chunk_stream, | ||||||
| ctx->br()->device_mr() | ||||||
| ) | ||||||
| ->release(); | ||||||
|
|
||||||
| // Left join build indices can hold sentinel values (INT_MIN), which are OOB; NULLIFY turns those rows into nulls. | ||||||
| std::ranges::move( | ||||||
| cudf::gather( | ||||||
| build_carrier, | ||||||
| build_indices, | ||||||
| cudf::out_of_bounds_policy::NULLIFY, | ||||||
| chunk_stream, | ||||||
| ctx->br()->device_mr() | ||||||
| ) | ||||||
| ->release(), | ||||||
| std::back_inserter(result_columns) | ||||||
| ); | ||||||
| // Deallocation of the join indices will happen on build_stream, so add stream dep | ||||||
| // This also ensures deallocation of the hash_join object waits for completion. | ||||||
| cuda_stream_join(build_stream, chunk_stream, &event); | ||||||
| return streaming::to_message( | ||||||
| sequence, | ||||||
| std::make_unique<streaming::TableChunk>( | ||||||
| std::make_unique<cudf::table>(std::move(result_columns)), chunk_stream | ||||||
| ) | ||||||
| ); | ||||||
| } | ||||||
|
|
||||||
| streaming::Node left_join_shuffle( | ||||||
| std::shared_ptr<streaming::Context> ctx, | ||||||
| std::shared_ptr<streaming::Channel> left, | ||||||
| std::shared_ptr<streaming::Channel> right, | ||||||
| std::shared_ptr<streaming::Channel> ch_out, | ||||||
| std::vector<cudf::size_type> left_on, | ||||||
| std::vector<cudf::size_type> right_on | ||||||
| ) { | ||||||
| streaming::ShutdownAtExit c{left, right, ch_out}; | ||||||
| ctx->comm()->logger().print("Left shuffle join"); | ||||||
| co_await ctx->executor()->schedule(); | ||||||
| CudaEvent build_event; | ||||||
| while (true) { | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| // Requirement: both shuffles must emit partitions in the same order, because left and right messages are paired by arrival order | ||||||
| auto left_msg = co_await left->receive(); | ||||||
| auto right_msg = co_await right->receive(); | ||||||
| if (left_msg.empty()) { | ||||||
| RAPIDSMPF_EXPECTS( | ||||||
| right_msg.empty(), "Left does not have same number of partitions as right" | ||||||
| ); | ||||||
| break; | ||||||
| } | ||||||
| RAPIDSMPF_EXPECTS( | ||||||
| left_msg.sequence_number() == right_msg.sequence_number(), | ||||||
| "Mismatching sequence numbers" | ||||||
| ); | ||||||
|
|
||||||
| // Use right as the build table: left_join_chunk probes with the left chunk, and a left join keeps every probe-side row | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a reason for this? Or just an implementation detail for now? |
||||||
| auto build_chunk = | ||||||
| co_await right_msg.release<streaming::TableChunk>().make_available(ctx); | ||||||
| auto build_stream = build_chunk.stream(); | ||||||
| auto joiner = cudf::hash_join( | ||||||
| build_chunk.table_view().select(right_on), | ||||||
| cudf::null_equality::UNEQUAL, | ||||||
| build_stream | ||||||
| ); | ||||||
| build_event.record(build_stream); | ||||||
|
|
||||||
| // Drop key columns from the build table; only its non-key columns are carried into the output. | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||||||
| std::vector<cudf::size_type> to_keep; | ||||||
| std::ranges::copy_if( | ||||||
| std::ranges::iota_view(0, build_chunk.table_view().num_columns()), | ||||||
| std::back_inserter(to_keep), | ||||||
| [&](auto i) { return std::ranges::find(right_on, i) == right_on.end(); } | ||||||
| ); | ||||||
| auto build_carrier = build_chunk.table_view().select(to_keep); | ||||||
|
|
||||||
| auto sequence = left_msg.sequence_number(); | ||||||
| co_await ch_out->send(left_join_chunk( | ||||||
| ctx, | ||||||
| co_await left_msg.release<streaming::TableChunk>().make_available(ctx), | ||||||
| sequence, | ||||||
| joiner, | ||||||
| build_carrier, | ||||||
| left_on, | ||||||
| build_stream, | ||||||
| &build_event | ||||||
| )); | ||||||
| } | ||||||
| co_await ch_out->drain(ctx->executor()); | ||||||
| } | ||||||
|
|
||||||
| streaming::Node shuffle( | ||||||
| std::shared_ptr<streaming::Context> ctx, | ||||||
| std::shared_ptr<streaming::Channel> ch_in, | ||||||
|
|
||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should just need to add q13 to line 41 (
set(RAPIDSMPFNDSH_QUERIES)