Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions velox/exec/StreamingAggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,10 +210,9 @@ void StreamingAggregation::assignGroups() {
auto numInput = input_->size();

inputGroups_.resize(numInput);

// Look for the end of the last group.
vector_size_t index = 0;
if (prevInput_) {
if (prevInput_ && numGroups_ > 0) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为何要加这个条件?

auto prevIndex = prevInput_->size() - 1;
auto* prevGroup = groups_[numGroups_ - 1];
for (; index < numInput; ++index) {
Expand Down Expand Up @@ -334,10 +333,9 @@ RowVectorPtr StreamingAggregation::getOutput() {
evaluateAggregates();

RowVectorPtr output;

if (numGroups_ > minOutputBatchSize_) {
if (numGroups_ >= minOutputBatchSize_) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里有必要改?

Copy link
Author

@KevinyhZou KevinyhZou Dec 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

对,velox 里面minOutputBatchSize 最小只能取1, numGroups > minOutputBatchSize 意味着当只有一条数据的时候,是不输出的,这不符合流计算的场景。

output = createOutput(
std::min(numGroups_ - 1, static_cast<size_t>(maxOutputBatchSize_)));
std::min(numGroups_, static_cast<size_t>(maxOutputBatchSize_)));
}

prevInput_ = input_;
Expand Down
38 changes: 19 additions & 19 deletions velox/experimental/stateful/GroupWindowAggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ GroupWindowAggregator::GroupWindowAggregator(
std::vector<std::unique_ptr<StatefulOperator>> targets,
std::unique_ptr<KeySelector> keySelector,
std::unique_ptr<SliceAssigner> sliceAssigner,
long allowedLateness,
int64_t allowedLateness,
bool produceUpdates,
int rowtimeIndex,
bool isEventTime,
Expand Down Expand Up @@ -82,12 +82,12 @@ void GroupWindowAggregator::getOutput() {
}

// 1. Partition input by key
std::map<uint32_t, RowVectorPtr> keyToData = keySelector_->partition(input_);
std::map<int64_t, RowVectorPtr> keyToData = keySelector_->partition(input_);
for (const auto& [key, keyedData] : keyToData) {
// 2. Set the current key in the context
windowContext_->setCurrentKey(key);
// 3. Partition the keyed data by rowtime or processing time
std::map<uint32_t, RowVectorPtr> timestampToData = sliceAssigner_->assignSliceEnd(keyedData);
std::map<int64_t, RowVectorPtr> timestampToData = sliceAssigner_->assignSliceEnd(keyedData);
for (const auto& [timestamp, data] : timestampToData) {
// 4. Assign data to window
std::vector<TimeWindow> windows =
Expand Down Expand Up @@ -143,11 +143,11 @@ void GroupWindowAggregator::close() {
}

void GroupWindowAggregator::registerCleanupTimer(uint32_t key, TimeWindow window) {
long cleanupTime =
int64_t cleanupTime =
TimeWindowUtil::toEpochMillsForTimer(
TimeWindowUtil::cleanupTime(window.maxTimestamp(), allowedLateness_, isEventTime_),
shiftTimeZone_);
if (cleanupTime == LONG_MAX) {
if (cleanupTime == INT64_MAX) {
// don't set a GC timer for "end of time"
return;
}
Expand Down Expand Up @@ -190,17 +190,17 @@ void GroupWindowAggregator::WindowTriggerContext::open() {
}

bool GroupWindowAggregator::WindowTriggerContext::onElement(
uint32_t key, RowVectorPtr row, long timestamp, TimeWindow window) {
uint32_t key, RowVectorPtr row, int64_t timestamp, TimeWindow window) {
return trigger_->onElement(key, row, timestamp, window);
}

bool GroupWindowAggregator::WindowTriggerContext::onProcessingTime(
TimeWindow window, long time) {
TimeWindow window, int64_t time) {
return trigger_->onProcessingTime(window, time);
}

bool GroupWindowAggregator::WindowTriggerContext::onEventTime(
TimeWindow window, long time) {
TimeWindow window, int64_t time) {
return trigger_->onEventTime(window, time);
}

Expand All @@ -209,31 +209,31 @@ void GroupWindowAggregator::WindowTriggerContext::onMerge(
trigger_->onMerge(key, window, nullptr);
}

long GroupWindowAggregator::WindowTriggerContext::getCurrentProcessingTime() {
int64_t GroupWindowAggregator::WindowTriggerContext::getCurrentProcessingTime() {
return internalTimerService_->currentProcessingTime();
}

long GroupWindowAggregator::WindowTriggerContext::getCurrentWatermark() {
int64_t GroupWindowAggregator::WindowTriggerContext::getCurrentWatermark() {
return internalTimerService_->currentWatermark();
}

void GroupWindowAggregator::WindowTriggerContext::registerProcessingTimeTimer(
uint32_t key, TimeWindow window, long time) {
uint32_t key, TimeWindow window, int64_t time) {
internalTimerService_->registerProcessingTimeTimer(key, window, time);
}

void GroupWindowAggregator::WindowTriggerContext::registerEventTimeTimer(
uint32_t key, TimeWindow window, long time) {
uint32_t key, TimeWindow window, int64_t time) {
internalTimerService_->registerEventTimeTimer(key, window, time);
}

void GroupWindowAggregator::WindowTriggerContext::deleteProcessingTimeTimer(
uint32_t key, TimeWindow window, long time) {
uint32_t key, TimeWindow window, int64_t time) {
internalTimerService_->deleteProcessingTimeTimer(key, window, time);
}

void GroupWindowAggregator::WindowTriggerContext::deleteEventTimeTimer(
uint32_t key, TimeWindow window, long time) {
uint32_t key, TimeWindow window, int64_t time) {
internalTimerService_->deleteEventTimeTimer(key, window, time);
}

Expand Down Expand Up @@ -265,7 +265,7 @@ WindowContext::WindowContext(
std::shared_ptr<StreamOperatorStateHandler> stateHandler,
int shiftTimeZone,
bool isEventTime,
long allowedLateness)
int64_t allowedLateness)
: windowAggregator_(windowAggregator),
windowState_(std::move(windowState)),
timerService_(std::move(timerService)),
Expand All @@ -280,11 +280,11 @@ StatePtr WindowContext::getPartitionedState(StateDescriptor& stateDescriptor) {
return stateHandler_->getGroupMapState(stateDescriptor);
}

long WindowContext::currentProcessingTime() {
int64_t WindowContext::currentProcessingTime() {
return timerService_->currentProcessingTime();
}

long WindowContext::currentWatermark() {
int64_t WindowContext::currentWatermark() {
return timerService_->currentWatermark();
}

Expand Down Expand Up @@ -316,11 +316,11 @@ void WindowContext::clearTrigger(TimeWindow window) {
}

void WindowContext::deleteCleanupTimer(TimeWindow window) {
long cleanupTime =
int64_t cleanupTime =
TimeWindowUtil::toEpochMillsForTimer(
TimeWindowUtil::cleanupTime(window.maxTimestamp(), allowedLateness_, isEventTime_),
shiftTimeZone_);
if (cleanupTime == LONG_MAX) {
if (cleanupTime == INT64_MAX) {
// no need to clean up because we didn't set one
return;
}
Expand Down
1 change: 0 additions & 1 deletion velox/experimental/stateful/GroupWindowAggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include "velox/experimental/stateful/TimerHeapInternalTimer.h"
#include "velox/experimental/stateful/Triggerable.h"
#include "velox/experimental/stateful/window/SliceAssigner.h"
#include "velox/experimental/stateful/window/WindowBuffer.h"
#include "velox/experimental/stateful/window/WindowProcessFunction.h"
#include "velox/experimental/stateful/window/WindowTrigger.h"

Expand Down
Loading