diff --git a/velox/exec/StreamingAggregation.cpp b/velox/exec/StreamingAggregation.cpp index f08729e676e3..8733d6e484fb 100644 --- a/velox/exec/StreamingAggregation.cpp +++ b/velox/exec/StreamingAggregation.cpp @@ -210,10 +210,9 @@ void StreamingAggregation::assignGroups() { auto numInput = input_->size(); inputGroups_.resize(numInput); - // Look for the end of the last group. vector_size_t index = 0; - if (prevInput_) { + if (prevInput_ && numGroups_ > 0) { auto prevIndex = prevInput_->size() - 1; auto* prevGroup = groups_[numGroups_ - 1]; for (; index < numInput; ++index) { @@ -334,10 +333,9 @@ RowVectorPtr StreamingAggregation::getOutput() { evaluateAggregates(); RowVectorPtr output; - - if (numGroups_ > minOutputBatchSize_) { + if (numGroups_ >= minOutputBatchSize_) { output = createOutput( - std::min(numGroups_ - 1, static_cast(maxOutputBatchSize_))); + std::min(numGroups_, static_cast(maxOutputBatchSize_))); } prevInput_ = input_; diff --git a/velox/experimental/stateful/GroupWindowAggregator.cpp b/velox/experimental/stateful/GroupWindowAggregator.cpp index 2e03df5b99c7..42ff24bb85f4 100644 --- a/velox/experimental/stateful/GroupWindowAggregator.cpp +++ b/velox/experimental/stateful/GroupWindowAggregator.cpp @@ -24,7 +24,7 @@ GroupWindowAggregator::GroupWindowAggregator( std::vector> targets, std::unique_ptr keySelector, std::unique_ptr sliceAssigner, - long allowedLateness, + int64_t allowedLateness, bool produceUpdates, int rowtimeIndex, bool isEventTime, @@ -82,12 +82,12 @@ void GroupWindowAggregator::getOutput() { } // 1. Partition input by key - std::map keyToData = keySelector_->partition(input_); + std::map keyToData = keySelector_->partition(input_); for (const auto& [key, keyedData] : keyToData) { // 2. Set the current key in the context windowContext_->setCurrentKey(key); // 3. Partition the keyed data by rowtime or processing time - std::map timestampToData = sliceAssigner_->assignSliceEnd(keyedData); + std::map timestampToData = sliceAssigner_->assignSliceEnd(keyedData); for (const auto& [timestamp, data] : timestampToData) { // 4. Assign data to window std::vector windows = @@ -143,11 +143,11 @@ void GroupWindowAggregator::close() { } void GroupWindowAggregator::registerCleanupTimer(uint32_t key, TimeWindow window) { - long cleanupTime = + int64_t cleanupTime = TimeWindowUtil::toEpochMillsForTimer( TimeWindowUtil::cleanupTime(window.maxTimestamp(), allowedLateness_, isEventTime_), shiftTimeZone_); - if (cleanupTime == LONG_MAX) { + if (cleanupTime == INT64_MAX) { // don't set a GC timer for "end of time" return; } @@ -190,17 +190,17 @@ void GroupWindowAggregator::WindowTriggerContext::open() { } bool GroupWindowAggregator::WindowTriggerContext::onElement( - uint32_t key, RowVectorPtr row, long timestamp, TimeWindow window) { + uint32_t key, RowVectorPtr row, int64_t timestamp, TimeWindow window) { return trigger_->onElement(key, row, timestamp, window); } bool GroupWindowAggregator::WindowTriggerContext::onProcessingTime( - TimeWindow window, long time) { + TimeWindow window, int64_t time) { return trigger_->onProcessingTime(window, time); } bool GroupWindowAggregator::WindowTriggerContext::onEventTime( - TimeWindow window, long time) { + TimeWindow window, int64_t time) { return trigger_->onEventTime(window, time); } @@ -209,31 +209,31 @@ void GroupWindowAggregator::WindowTriggerContext::onMerge( trigger_->onMerge(key, window, nullptr); } -long GroupWindowAggregator::WindowTriggerContext::getCurrentProcessingTime() { +int64_t GroupWindowAggregator::WindowTriggerContext::getCurrentProcessingTime() { return internalTimerService_->currentProcessingTime(); } -long GroupWindowAggregator::WindowTriggerContext::getCurrentWatermark() { +int64_t GroupWindowAggregator::WindowTriggerContext::getCurrentWatermark() { return internalTimerService_->currentWatermark(); } void GroupWindowAggregator::WindowTriggerContext::registerProcessingTimeTimer( - uint32_t key, TimeWindow window, long time) { + uint32_t key, TimeWindow window, int64_t time) { internalTimerService_->registerProcessingTimeTimer(key, window, time); } void GroupWindowAggregator::WindowTriggerContext::registerEventTimeTimer( - uint32_t key, TimeWindow window, long time) { + uint32_t key, TimeWindow window, int64_t time) { internalTimerService_->registerEventTimeTimer(key, window, time); } void GroupWindowAggregator::WindowTriggerContext::deleteProcessingTimeTimer( - uint32_t key, TimeWindow window, long time) { + uint32_t key, TimeWindow window, int64_t time) { internalTimerService_->deleteProcessingTimeTimer(key, window, time); } void GroupWindowAggregator::WindowTriggerContext::deleteEventTimeTimer( - uint32_t key, TimeWindow window, long time) { + uint32_t key, TimeWindow window, int64_t time) { internalTimerService_->deleteEventTimeTimer(key, window, time); } @@ -265,7 +265,7 @@ WindowContext::WindowContext( std::shared_ptr stateHandler, int shiftTimeZone, bool isEventTime, - long allowedLateness) + int64_t allowedLateness) : windowAggregator_(windowAggregator), windowState_(std::move(windowState)), timerService_(std::move(timerService)), @@ -280,11 +280,11 @@ StatePtr WindowContext::getPartitionedState(StateDescriptor& stateDescriptor) { return stateHandler_->getGroupMapState(stateDescriptor); } -long WindowContext::currentProcessingTime() { +int64_t WindowContext::currentProcessingTime() { return timerService_->currentProcessingTime(); } -long WindowContext::currentWatermark() { +int64_t WindowContext::currentWatermark() { return timerService_->currentWatermark(); } @@ -316,11 +316,11 @@ void WindowContext::clearTrigger(TimeWindow window) { } void WindowContext::deleteCleanupTimer(TimeWindow window) { - long cleanupTime = + int64_t cleanupTime = TimeWindowUtil::toEpochMillsForTimer( TimeWindowUtil::cleanupTime(window.maxTimestamp(), allowedLateness_, isEventTime_), shiftTimeZone_); - if (cleanupTime == LONG_MAX) { + if (cleanupTime == INT64_MAX) { // no need to clean up because we didn't set one return; } diff --git a/velox/experimental/stateful/GroupWindowAggregator.h b/velox/experimental/stateful/GroupWindowAggregator.h index a262a7708d69..f0e4c8010378 100644 --- a/velox/experimental/stateful/GroupWindowAggregator.h +++ b/velox/experimental/stateful/GroupWindowAggregator.h @@ -21,7 +21,6 @@ #include "velox/experimental/stateful/TimerHeapInternalTimer.h" #include "velox/experimental/stateful/Triggerable.h" #include "velox/experimental/stateful/window/SliceAssigner.h" -#include "velox/experimental/stateful/window/WindowBuffer.h" #include "velox/experimental/stateful/window/WindowProcessFunction.h" #include "velox/experimental/stateful/window/WindowTrigger.h" diff --git a/velox/experimental/stateful/InternalPriorityQueue.h b/velox/experimental/stateful/InternalPriorityQueue.h index 44a1e7bbb95e..39bc91a32a9e 100644 --- a/velox/experimental/stateful/InternalPriorityQueue.h +++ b/velox/experimental/stateful/InternalPriorityQueue.h @@ -16,7 +16,6 @@ #pragma once #include -#include namespace facebook::velox::stateful { @@ -24,68 +23,233 @@ namespace facebook::velox::stateful { template class InternalPriorityQueue { public: - virtual void add(T toAdd) = 0; + virtual bool add(const T& toAdd) = 0; + + virtual bool add(T&& toAdd) = 0; virtual T poll() = 0; - virtual T peek() = 0; + virtual T& peek() = 0; virtual void clear() = 0; + + virtual bool empty() const = 0; + + virtual bool remove(const T& toRemove) = 0; + + virtual size_t size() const = 0; + + virtual bool contains(const T& value) const = 0; }; -// This class is relevent to flink HeapPriorityQueue. -// TODO: need to make it equal to flink -template +template < + typename T, + typename Compare = std::less, + typename Hash = std::hash, + typename EqualTo = std::equal_to> class HeapPriorityQueue : public InternalPriorityQueue { public: - HeapPriorityQueue() { - queue_.resize(1024); // Initial capacity, can be adjusted + explicit HeapPriorityQueue(const Compare& comp = Compare()) + : comparator_(comp) {} + + /// Constructs a priority queue with elements from the range [first, last). + template + HeapPriorityQueue(InputIt first, InputIt last, const Compare& comp = Compare()) + : comparator_(comp), heap_(first, last) { + buildHeap(); } - void add(T toAdd) override { - // Implementation for adding to the priority queue set - queue_[size_] = toAdd; - size_++; - if (size_ >= queue_.size()) { - size_ = 0; - } + /// Returns the top element without removing it. + /// Throws if the queue is empty. + T& peek() override { + VELOX_CHECK(!empty(), "Cannot peek from an empty priority queue"); + return heap_[0]; } + /// Removes and returns the top element. + /// Throws if the queue is empty. T poll() override { - // Implementation for polling from the priority queue set - size_--; - if (size_ < 0) { - size_ = queue_.size() - 1; + VELOX_CHECK(!empty(), "Cannot poll from an empty priority queu"); + return removeAt(0); + } + + /// Adds an element to the queue. + bool add(const T& value) override { + addImpl(value); + return true; + } + + /// Adds an element to the queue (move version). + bool add(T&& value) override { + addImpl(std::move(value)); + return true; + } + + /// Removes the first occurrence of the specified element from the queue. + /// Returns true if the element was found and removed, false otherwise. + bool remove(const T& value) override { + auto it = valueToIndex_.find(value); + if (it == valueToIndex_.end()) { + return false; } - return queue_[size_]; + removeAt(it->second); + return true; } - T peek() override { - int index = size_ - 1; - if (index < 0) { - index = queue_.size() - 1; + /// Adds all elements from the range [first, last) to the queue. + template + void addAll(InputIt first, InputIt last) { + for (auto it = first; it != last; ++it) { + add(*it); } - return queue_[index]; } + /// Adds all elements from the container to the queue. + template + void addAll(const Container& container) { + addAll(container.begin(), container.end()); + } + + /// Returns the number of elements in the queue. + size_t size() const override { + return heap_.size(); + } + + /// Returns true if the queue is empty. + bool empty() const override { + return heap_.empty(); + } + + /// Removes all elements from the queue. void clear() override { - queue_.clear(); - size_ = 0; - } - - void remove(T toRemove) { - // Implementation for removing an element from the priority queue set - auto it = std::find(queue_.begin(), queue_.end(), toRemove); - if (it != queue_.end()) { - *it = queue_[size_ - 1]; // Replace with the last element - size_--; - if (size_ < 0) { - size_ = queue_.size() - 1; + heap_.clear(); + valueToIndex_.clear(); + } + + /// Returns true if the queue contains the specified element. + bool contains(const T& value) const { + return valueToIndex_.find(value) != valueToIndex_.end(); + } + + private: + Compare comparator_; + std::vector heap_; + std::unordered_map valueToIndex_; + + void addImpl(const T& value) { + // Check if value already exists + if (valueToIndex_.find(value) != valueToIndex_.end()) { + // Update existing element + size_t index = valueToIndex_[value]; + heap_[index] = value; + // Re-heapify from this position + percolateUp(index); + percolateDown(index); + } else { + // Add new element + size_t index = heap_.size(); + heap_.push_back(value); + valueToIndex_[value] = index; + percolateUp(index); + } + } + + void addImpl(T&& value) { + // Check if value already exists + auto it = valueToIndex_.find(value); + if (it != valueToIndex_.end()) { + // Update existing element + size_t index = it->second; + heap_[index] = std::move(value); + // Re-heapify from this position + percolateUp(index); + percolateDown(index); + } else { + // Add new element + size_t index = heap_.size(); + heap_.push_back(std::move(value)); + valueToIndex_[heap_.back()] = index; + percolateUp(index); + } + } + + T removeAt(size_t index) { + if (index >= heap_.size()) { + VELOX_FAIL("Logical error, the removed index {} is greater than heap size {}", index, heap_.size()); + } + T t = std::move(heap_[index]); + // Remove from valueToIndex_ + valueToIndex_.erase(t); + + if (index == heap_.size() - 1) { + // Removing the last element + heap_.pop_back(); + return t; + } + + // Move last element to the removed position + T last = std::move(heap_.back()); + heap_.pop_back(); + heap_[index] = std::move(last); + valueToIndex_[heap_[index]] = index; + + // Re-heapify + percolateUp(index); + percolateDown(index); + return t; + } + + void percolateUp(size_t index) { + while (index > 0) { + size_t parent = (index - 1) / 2; + if (!comparator_(heap_[index], heap_[parent])) { + break; } + swapElements(index, parent); + index = parent; + } + } + + void percolateDown(size_t index) { + while (true) { + size_t left = 2 * index + 1; + size_t right = 2 * index + 2; + size_t smallest = index; + + if (left < heap_.size() && comparator_(heap_[left], heap_[smallest])) { + smallest = left; + } + if (right < heap_.size() && comparator_(heap_[right], heap_[smallest])) { + smallest = right; + } + + if (smallest == index) { + break; + } + + swapElements(index, smallest); + index = smallest; + } + } + + void swapElements(size_t i, size_t j) { + std::swap(heap_[i], heap_[j]); + valueToIndex_[heap_[i]] = i; + valueToIndex_[heap_[j]] = j; + } + + void buildHeap() { + // Build index map + valueToIndex_.clear(); + for (size_t i = 0; i < heap_.size(); ++i) { + valueToIndex_[heap_[i]] = i; + } + + // Build heap from the bottom up + for (int i = static_cast(heap_.size()) / 2 - 1; i >= 0; --i) { + percolateDown(static_cast(i)); } } - private: - std::vector queue_; - int size_ = 0; }; + } // namespace facebook::velox::stateful diff --git a/velox/experimental/stateful/InternalTimerService.h b/velox/experimental/stateful/InternalTimerService.h index 362307b951d1..33f04cebe2ae 100644 --- a/velox/experimental/stateful/InternalTimerService.h +++ b/velox/experimental/stateful/InternalTimerService.h @@ -15,9 +15,14 @@ */ #pragma once +#include +#include #include #include #include +#include +#include +#include namespace facebook::velox::stateful { @@ -26,39 +31,52 @@ template class InternalTimerService { public: InternalTimerService(Triggerable* triggerable) - : triggerable_(triggerable) {} + : triggerable_(triggerable), processingTimeService_(std::make_shared()) {} - void registerEventTimeTimer(K key, N ns, long time) { + void registerEventTimeTimer(K key, N ns, int64_t time) { eventTimeTimersQueue_.add(std::make_shared>(time, key, ns)); } - void deleteEventTimeTimer(K key, N ns, long time) { + void deleteEventTimeTimer(K key, N ns, int64_t time) { eventTimeTimersQueue_.remove(std::make_shared>(time, key, ns)); } - void registerProcessingTimeTimer(K key, N ns, long time) { + void registerProcessingTimeTimer(K key, N ns, int64_t time) { + const std::shared_ptr>& oldHead = processingTimeTimersQueue_.empty() ? nullptr : processingTimeTimersQueue_.peek(); + if (processingTimeTimersQueue_.add(std::make_shared>(time, key, ns))) { + int64_t nextTriggerTime = oldHead != nullptr ? oldHead->timestamp() : std::numeric_limits::max() ; + if (time < nextTriggerTime) { + if (nextTimer_.has_value()) { + processingTimeService_->cancel(nextTimer_.value()); + } + nextTimer_ = processingTimeService_->registerTimer(time, ProcessingTimerTask(time, [&](int64_t processingTime) { + onProcessingTime(processingTime); + })); + } + } } - void deleteProcessingTimeTimer(K key, N ns, long time) { - eventTimeTimersQueue_.remove(std::make_shared>(time, key, ns)); + void deleteProcessingTimeTimer(K key, N ns, int64_t time) { + processingTimeTimersQueue_.remove(std::make_shared>(time, key, ns)); } - long currentWatermark() { + int64_t currentWatermark() { // TODO: Implement watermark logic if needed. - if (eventTimeTimersQueue_.peek() != nullptr) { - return eventTimeTimersQueue_.peek()->timestamp(); + const std::shared_ptr>& timer = eventTimeTimersQueue_.empty() ? nullptr : eventTimeTimersQueue_.peek(); + if (timer != nullptr) { + return timer->timestamp(); } return 0; // or some other default value } - long currentProcessingTime() { + int64_t currentProcessingTime() { // TODO: Implement processing time logic if needed. return 0; // or some other default value } - void advanceWatermark(long time) { - while (eventTimeTimersQueue_.peek() != nullptr && - eventTimeTimersQueue_.peek()->timestamp() <= time) { + void advanceWatermark(int64_t time) { + const std::shared_ptr>& timer = eventTimeTimersQueue_.empty() ? nullptr : eventTimeTimersQueue_.peek(); + while (timer != nullptr && timer->timestamp() <= time) { auto timer = eventTimeTimersQueue_.poll(); triggerable_->onEventTime(timer); } @@ -66,11 +84,49 @@ class InternalTimerService { void close() { eventTimeTimersQueue_.clear(); + processingTimeTimersQueue_.clear(); + processingTimeService_->close(); } private: + void onProcessingTime(int64_t time) { + std::string taskName = ""; + if (nextTimer_.has_value()) { + taskName = nextTimer_.value(); + } + nextTimer_ = std::nullopt; + std::shared_ptr> timer = nullptr; + bool triggerOnProcessingTime = true; + const std::shared_ptr mtx = triggerable_->getMutex(); + if (mtx) { + std::lock_guard lock(*mtx); + while (triggerOnProcessingTime && !processingTimeTimersQueue_.empty()) { + timer = processingTimeTimersQueue_.peek(); + if (!timer || timer->timestamp() > time) { + triggerOnProcessingTime = false; + continue; + } + processingTimeTimersQueue_.poll(); + triggerable_->onProcessingTime(timer); + timer = nullptr; + } + if (!taskName.empty()) { + processingTimeService_->unregister(taskName); + } + } + + if (timer != nullptr && !nextTimer_.has_value()) { + nextTimer_ = processingTimeService_->registerTimer(timer->timestamp(), ProcessingTimerTask(timer->timestamp(), [&](int64_t processingTime) { + onProcessingTime(processingTime); + })); + } + } + Triggerable* triggerable_; - HeapPriorityQueue>> eventTimeTimersQueue_; + std::optional nextTimer_; + std::shared_ptr processingTimeService_; + HeapPriorityQueue>, HeapTimerComparator, HeapTimerHasher, HeapTimerEquals> eventTimeTimersQueue_; + HeapPriorityQueue>, HeapTimerComparator, HeapTimerHasher, HeapTimerEquals> processingTimeTimersQueue_; }; } // namespace facebook::velox::stateful diff --git a/velox/experimental/stateful/KeySelector.cpp b/velox/experimental/stateful/KeySelector.cpp index 9f81f30cd8db..ce49dcaaefae 100644 --- a/velox/experimental/stateful/KeySelector.cpp +++ b/velox/experimental/stateful/KeySelector.cpp @@ -26,9 +26,9 @@ KeySelector::KeySelector( numPartitions_(numPartitions) { } -std::map KeySelector::partition(const RowVectorPtr& input) { +std::map KeySelector::partition(const RowVectorPtr& input) { if (numPartitions_ == 1) { - return std::map{{0, input}}; + return std::map{{0, input}}; } prepareForInput(input); @@ -39,7 +39,7 @@ std::map KeySelector::partition(const RowVectorPtr& inpu // TODO: this is a optimization, as the RowVector may have be partitioned in // local aggregation, so need not to partition again in global agg, but need // to verify whether the judge condition is enough. - return std::map{{*part, input}}; + return std::map{{*part, input}}; } const auto numInput = input->size(); std::map numOfKeys; @@ -66,7 +66,7 @@ std::map KeySelector::partition(const RowVectorPtr& inpu numOfKeys[partition] = index + 1; } - std::map results; + std::map results; for (auto & [key, partitionSize] : numOfKeys) { auto partitionData = wrapChildren(input, partitionSize, keyToIndexBuffers[key]); results[key] = partitionData; diff --git a/velox/experimental/stateful/KeySelector.h b/velox/experimental/stateful/KeySelector.h index baf34967c29a..02f34ca2220f 100644 --- a/velox/experimental/stateful/KeySelector.h +++ b/velox/experimental/stateful/KeySelector.h @@ -22,16 +22,16 @@ namespace facebook::velox::stateful { -/// This class is relevent to flink KeySelector. +/// This class is relevant to flink KeySelector. /// It can partition the RowVector according to the key fields. class KeySelector { public: KeySelector( std::unique_ptr partitionFunction, memory::MemoryPool* pool, - int numPartitions = INT_MAX); + int numPartitions = 1024); - std::map partition(const RowVectorPtr& input); + std::map partition(const RowVectorPtr& input); private: void prepareForInput(const RowVectorPtr& input); diff --git a/velox/experimental/stateful/LocalWindowAggregator.cpp b/velox/experimental/stateful/LocalWindowAggregator.cpp index f99ac04ab015..bd64dabfbb14 100644 --- a/velox/experimental/stateful/LocalWindowAggregator.cpp +++ b/velox/experimental/stateful/LocalWindowAggregator.cpp @@ -23,7 +23,7 @@ LocalWindowAggregator::LocalWindowAggregator( std::vector> targets, std::unique_ptr keySelector, std::unique_ptr sliceAssigner, - const long windowInterval, + const int64_t windowInterval, const bool useDayLightSaving, RowTypePtr outputType) : StatefulOperator(std::move(op), std::move(targets)), @@ -45,10 +45,9 @@ void LocalWindowAggregator::getOutput() { return; } - std::map keyToData = keySelector_->partition(input_); + std::map keyToData = keySelector_->partition(input_); for (const auto& [key, data] : keyToData) { - std::map sliceEndToData = - sliceAssigner_->partition(data); + std::map sliceEndToData = sliceAssigner_->partition(data); for (const auto& [sliceEnd, data] : sliceEndToData) { // TODO: addElement may have data output. auto windowData = data; @@ -59,7 +58,7 @@ void LocalWindowAggregator::getOutput() { input_.reset(); } -void LocalWindowAggregator::processWatermarkInternal(long timestamp) { +void LocalWindowAggregator::processWatermarkInternal(int64_t timestamp) { if (timestamp > currentWatermark_) { currentWatermark_ = timestamp; if (currentWatermark_ >= nextTriggerWatermark_) { diff --git a/velox/experimental/stateful/ProcessingTimeService.h b/velox/experimental/stateful/ProcessingTimeService.h new file mode 100644 index 000000000000..88df1c0ca4bd --- /dev/null +++ b/velox/experimental/stateful/ProcessingTimeService.h @@ -0,0 +1,111 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "velox/experimental/stateful/window/TimeWindowUtil.h" +#include +#include +#include +#include +#include +#include +#include + +namespace facebook::velox::stateful { + +using ProcessingTimeCallback = std::function; + +class ProcessingTimerTask { +public: + ProcessingTimerTask( + int64_t time, + ProcessingTimeCallback callback) + : time_(time), callback_(callback) {} + + void operator()() const { + callback_(time_); + } +private: + int64_t time_; + ProcessingTimeCallback callback_; +}; + +class ProcessingTimeService { +public: + int64_t getCurrentProcessingTime() { + return TimeWindowUtil::getCurrentProcessingTime(); + } + virtual std::optional registerTimer(int64_t timestamp, ProcessingTimerTask target) = 0; + virtual void cancel(const std::string& task) {} + virtual void close() {} + + void unregister(const std::string& task) { + auto it = std::find(registry.begin(), registry.end(), task); + if (it != registry.end()) { + registry.erase(it); + } + } + + std::string generateTimerTaskName(int64_t timestamp) { + boost::uuids::random_generator generator; + boost::uuids::uuid uuid = generator(); + return "proc_time_task_" + std::to_string(timestamp) + "-" + to_string(uuid); + } +protected: + std::set registry; +}; + +class SystemProcessingTimeService : public ProcessingTimeService { +public: + SystemProcessingTimeService() : ProcessingTimeService() { + executor_ = std::make_shared(); + executor_->start(); + } + + std::optional registerTimer(int64_t timestamp, ProcessingTimerTask task) override { + int64_t currentTimestamp = getCurrentProcessingTime(); + int64_t delay = 0; + if (timestamp >= currentTimestamp) { + delay = timestamp - currentTimestamp; + } + std::string taskName = generateTimerTaskName(timestamp); + if (delay > 0) { + executor_->addFunctionOnce(task, taskName, std::chrono::microseconds(delay * 1000)); + registry.emplace(taskName); + } else { + task(); + } + return std::make_optional(taskName); + } + + void cancel(const std::string& task) override { + auto it = std::find(registry.begin(), registry.end(), task); + if (it != registry.end()) { + executor_->cancelFunction(task); + registry.erase(it); + } + } + + void close() override { + if (executor_) { + executor_->shutdown(); + } + registry.clear(); + } +private: + std::shared_ptr executor_; +}; +} diff --git a/velox/experimental/stateful/RuntimeContext.h b/velox/experimental/stateful/RuntimeContext.h index af4c8471a954..0af7722d8156 100644 --- a/velox/experimental/stateful/RuntimeContext.h +++ b/velox/experimental/stateful/RuntimeContext.h @@ -32,17 +32,17 @@ class RuntimeContext { return keyedStateBackend_->getOrCreateMapState(stateDescriptor); } - std::shared_ptr> getListState( + std::shared_ptr> getListState( StateDescriptor& stateDescriptor) { return keyedStateBackend_->getOrCreateListState(stateDescriptor); } - std::shared_ptr> getValueState( + std::shared_ptr> getValueState( StateDescriptor& stateDescriptor) { return keyedStateBackend_->getOrCreateValueState(stateDescriptor); } - std::shared_ptr> createTimerService( + std::shared_ptr> createTimerService( Triggerable* triggerable) { return keyedStateBackend_->createTimerService(triggerable); } diff --git a/velox/experimental/stateful/StatefulOperator.cpp b/velox/experimental/stateful/StatefulOperator.cpp index 4c1990cb43a2..06a332012d4b 100644 --- a/velox/experimental/stateful/StatefulOperator.cpp +++ b/velox/experimental/stateful/StatefulOperator.cpp @@ -17,8 +17,6 @@ #include "velox/experimental/stateful/StatefulTask.h" #include "velox/experimental/stateful/StreamElement.h" -#include - namespace facebook::velox::stateful { void StatefulOperator::initialize() { diff --git a/velox/experimental/stateful/StatefulPlanNode.cpp b/velox/experimental/stateful/StatefulPlanNode.cpp index 59a11c7c2690..e708c7fc5af0 100644 --- a/velox/experimental/stateful/StatefulPlanNode.cpp +++ b/velox/experimental/stateful/StatefulPlanNode.cpp @@ -248,7 +248,10 @@ folly::dynamic StreamWindowAggregationNode::serialize() const { obj["offset"] = offset_; obj["windowType"] = windowType_; obj["outputType"] = outputType_->serialize(); + obj["isEventTime"] = isEventTime_; obj["rowtimeIndex"] = rowtimeIndex_; + obj["windowStartIndex"] = windowStartIndex_; + obj["windowEndIndex"] = windowEndIndex_; return obj; } @@ -290,7 +293,10 @@ core::PlanNodePtr StreamWindowAggregationNode::create(const folly::dynamic& obj, obj["offset"].asInt(), obj["windowType"].asInt(), outputType, - obj["rowtimeIndex"].asInt()); + obj["isEventTime"].asBool(), + obj["rowtimeIndex"].asInt(), + obj["windowStartIndex"].asInt(), + obj["windowEndIndex"].asInt()); } const std::vector& GroupWindowAggsHandlerNode::sources() const { diff --git a/velox/experimental/stateful/StatefulPlanNode.h b/velox/experimental/stateful/StatefulPlanNode.h index 0948bfadec76..67915209dac6 100644 --- a/velox/experimental/stateful/StatefulPlanNode.h +++ b/velox/experimental/stateful/StatefulPlanNode.h @@ -333,7 +333,10 @@ class StreamWindowAggregationNode : public core::PlanNode { long offset, int windowType, const RowTypePtr& outputType, - int rowtimeIndex) : + bool isEventTime, + int rowtimeIndex, + int windowStartIndex, + int windowEndIndex) : PlanNode(id), aggregation_(std::move(aggregationNode)), localAgg_(std::move(localAgg)), @@ -347,7 +350,10 @@ class StreamWindowAggregationNode : public core::PlanNode { offset_(offset), windowType_(windowType), outputType_(std::move(outputType)), - rowtimeIndex_(rowtimeIndex) {} + isEventTime_(isEventTime), + rowtimeIndex_(rowtimeIndex), + windowStartIndex_(windowStartIndex), + windowEndIndex_(windowEndIndex) {} const RowTypePtr& outputType() const override { return outputType_; @@ -397,10 +403,22 @@ class StreamWindowAggregationNode : public core::PlanNode { return windowType_; } + bool isEventTime() const { + return isEventTime_; + } + int rowtimeIndex() const { return rowtimeIndex_; } + int windowStartIndex() const { + return windowStartIndex_; + } + + int windowEndIndex() const { + return windowEndIndex_; + } + const std::vector& sources() const override; std::string_view name() const override { @@ -426,7 +444,10 @@ class StreamWindowAggregationNode : public core::PlanNode { long offset_; int windowType_; const RowTypePtr outputType_; + bool isEventTime_; int rowtimeIndex_; + int windowStartIndex_; + int windowEndIndex_; }; class GroupWindowAggsHandlerNode : public core::PlanNode { diff --git a/velox/experimental/stateful/StatefulPlanner.cpp b/velox/experimental/stateful/StatefulPlanner.cpp index 8409a1b3b505..8dd2e9db5a65 100644 --- a/velox/experimental/stateful/StatefulPlanner.cpp +++ b/velox/experimental/stateful/StatefulPlanner.cpp @@ -172,23 +172,26 @@ StatefulOperatorPtr StatefulPlanner::nodeToStatefulOperator( windowAggNode->useDayLightSaving(), windowAggNode->outputType()); } else { - auto localAggregator = nodeToOperator(windowAggNode->localAgg(), ctx); + auto localAggregator = windowAggNode->isEventTime() ? nodeToOperator(windowAggNode->localAgg(), ctx) : nullptr; std::unique_ptr globalSliceAssigner = std::make_unique( std::move(sliceAssigner), windowAggNode->size(), windowAggNode->step(), windowAggNode->offset(), - windowAggNode->windowType(), + Window::getType(windowAggNode->windowType()), windowAggNode->rowtimeIndex()); return std::make_unique( - std::move(localAggregator), + windowAggNode->isEventTime() ? std::move(localAggregator) : nullptr, std::move(op), std::move(targets), std::move(keySelector), std::move(globalSliceAssigner), windowAggNode->windowInterval(), - windowAggNode->useDayLightSaving()); + windowAggNode->useDayLightSaving(), + windowAggNode->isEventTime(), + windowAggNode->windowStartIndex(), + windowAggNode->windowEndIndex()); } } else if ( auto windowAggNode = @@ -207,7 +210,7 @@ StatefulOperatorPtr StatefulPlanner::nodeToStatefulOperator( 0, 0, 0, - windowAggNode->windowType(), + Window::getType(windowAggNode->windowType()), windowAggNode->rowtimeIndex()); return std::make_unique( std::unique_ptr(dynamic_cast(op.release())), @@ -316,7 +319,7 @@ std::unique_ptr StatefulPlanner::nodeToOperator( if (aggregationNode->isPreGrouped()) { return std::make_unique(opId.fetch_add(1), ctx, aggregationNode); } else { - return std::make_unique(opId.fetch_add(1), ctx, aggregationNode); + return std::make_unique(opId.fetch_add(1), ctx, aggregationNode); } } else if ( auto expandNode = diff --git a/velox/experimental/stateful/StreamJoin.cpp b/velox/experimental/stateful/StreamJoin.cpp index 15e683752eb8..9345842e1bbc 100644 --- a/velox/experimental/stateful/StreamJoin.cpp +++ b/velox/experimental/stateful/StreamJoin.cpp @@ -15,9 +15,6 @@ */ #include "velox/experimental/stateful/StreamJoin.h" #include "velox/experimental/stateful/join/JoinRecordStateViews.h" -#include "velox/expression/Expr.h" - -#include namespace facebook::velox::stateful { diff --git a/velox/experimental/stateful/StreamKeyedOperator.cpp b/velox/experimental/stateful/StreamKeyedOperator.cpp index 0e685c558940..4c6d1e09091a 100644 --- a/velox/experimental/stateful/StreamKeyedOperator.cpp +++ b/velox/experimental/stateful/StreamKeyedOperator.cpp @@ -15,8 +15,6 @@ */ #include "velox/experimental/stateful/StreamKeyedOperator.h" -#include - namespace facebook::velox::stateful { StreamKeyedOperator::StreamKeyedOperator( diff --git a/velox/experimental/stateful/TimerHeapInternalTimer.h b/velox/experimental/stateful/TimerHeapInternalTimer.h index e78d7e3c23c5..11d99ec812d7 100644 --- a/velox/experimental/stateful/TimerHeapInternalTimer.h +++ b/velox/experimental/stateful/TimerHeapInternalTimer.h @@ -15,19 +15,22 @@ */ #pragma once +#include "velox/common/base/BitUtil.h" +#include + namespace facebook::velox::stateful { // This class is relevent to flink InternalTimerServiceImpl. template -class TimerHeapInternalTimer { +class TimerHeapInternalTimer { public: - TimerHeapInternalTimer(long timestamp, K key, N ns) + TimerHeapInternalTimer(int64_t timestamp, K key, N ns) : timestamp_(timestamp), key_(key), ns_(ns), keyGroupIndex_(0) {} - long timestamp() { + int64_t timestamp() { return timestamp_; } @@ -49,10 +52,31 @@ class TimerHeapInternalTimer { ns_ == other.ns_; } private: - long timestamp_; + int64_t timestamp_; K key_; N ns_; int keyGroupIndex_; }; +template +struct HeapTimerHasher { + size_t operator() (const std::shared_ptr>& a) const { + return bits::hashMix(a->timestamp(), a->key()); + } +}; + +template +struct HeapTimerEquals { + bool operator() (const std::shared_ptr>& a, const std::shared_ptr>& b) const { + return *a == *b; + } +}; + +template +struct HeapTimerComparator { + bool operator() (const std::shared_ptr>& a, const std::shared_ptr> b) const { + return a->timestamp() < b->timestamp(); + } +}; + } // namespace facebook::velox::stateful diff --git a/velox/experimental/stateful/Triggerable.h b/velox/experimental/stateful/Triggerable.h index 0d3d1eb41dd4..427d29f8c354 100644 --- a/velox/experimental/stateful/Triggerable.h +++ b/velox/experimental/stateful/Triggerable.h @@ -16,9 +16,9 @@ #pragma once #include "velox/experimental/stateful/TimerHeapInternalTimer.h" -#include "velox/vector/ComplexVector.h" #include +#include namespace facebook::velox::stateful { @@ -26,8 +26,19 @@ namespace facebook::velox::stateful { template class Triggerable { public: + Triggerable() { + mtx_ = std::make_shared(); + } virtual void onEventTime( std::shared_ptr> timer) = 0; + + virtual void onProcessingTime( + std::shared_ptr> timer) {} + + const std::shared_ptr getMutex() { return mtx_; } + +protected: + std::shared_ptr mtx_; }; } // namespace facebook::velox::stateful diff --git a/velox/experimental/stateful/WatermarkAssigner.cpp b/velox/experimental/stateful/WatermarkAssigner.cpp index cb38df791f4c..a2c8691dc651 100644 --- a/velox/experimental/stateful/WatermarkAssigner.cpp +++ b/velox/experimental/stateful/WatermarkAssigner.cpp @@ -15,8 +15,6 @@ */ #include "velox/experimental/stateful/WatermarkAssigner.h" -#include - namespace facebook::velox::stateful { WatermarkAssigner::WatermarkAssigner( diff --git a/velox/experimental/stateful/WatermarkAssigner.h b/velox/experimental/stateful/WatermarkAssigner.h index 9623ab4a38df..e9478ee40655 100644 --- a/velox/experimental/stateful/WatermarkAssigner.h +++ b/velox/experimental/stateful/WatermarkAssigner.h @@ -15,9 +15,7 @@ */ #pragma once -#include "velox/exec/FilterProject.h" #include "velox/experimental/stateful/StatefulOperator.h" -#include "velox/experimental/stateful/StatefulPlanNode.h" namespace facebook::velox::stateful { diff --git a/velox/experimental/stateful/WindowAggregator.cpp b/velox/experimental/stateful/WindowAggregator.cpp index 99a14ef65ba0..d05b2e4f809e 100644 --- a/velox/experimental/stateful/WindowAggregator.cpp +++ b/velox/experimental/stateful/WindowAggregator.cpp @@ -13,33 +13,45 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include +#include +#include +#include "velox/type/Type.h" +#include "velox/vector/ComplexVector.h" #include "velox/experimental/stateful/WindowAggregator.h" +#include "velox/experimental/stateful/TimerHeapInternalTimer.h" #include "velox/experimental/stateful/window/TimeWindowUtil.h" -#include - namespace facebook::velox::stateful { WindowAggregator::WindowAggregator( - std::unique_ptr localAggerator, - std::unique_ptr globalAggerator, + std::unique_ptr localAggregator, + std::unique_ptr globalAggregator, std::vector> targets, std::unique_ptr keySelector, std::unique_ptr sliceAssigner, const long windowInterval, - const bool useDayLightSaving) - : StatefulOperator(std::move(globalAggerator), std::move(targets)), - localAggerator_(std::move(localAggerator)), + const bool useDayLightSaving, + const bool isEventTime, + const int windowStartIndex, + const int windowEndIndex) + : StatefulOperator(std::move(globalAggregator), std::move(targets)), Triggerable(), + localAggregator_(std::move(localAggregator)), keySelector_(std::move(keySelector)), sliceAssigner_(std::move(sliceAssigner)), windowInterval_(windowInterval), - useDayLightSaving_(useDayLightSaving) { - windowBuffer_ = std::make_shared(); -} + useDayLightSaving_(useDayLightSaving), + isEventTime_(isEventTime), + windowStartIndex_(windowStartIndex), + windowEndIndex_(windowEndIndex) { + windowBuffer_ = std::make_shared(); + } void WindowAggregator::initialize() { StatefulOperator::initialize(); - localAggerator_->initialize(); + if (localAggregator_) { + localAggregator_->initialize(); + } StateDescriptor stateDesc("window-aggs"); windowState_ = stateHandler()->getValueState(stateDesc); @@ -56,18 +68,15 @@ void WindowAggregator::getOutput() { return; } - std::map keyToData = keySelector_->partition(input_); + std::map keyToData = keySelector_->partition(input_); for (const auto& [key, data] : keyToData) { - std::map sliceEndToData = - sliceAssigner_->assignSliceEnd(data); + std::map sliceEndToData = sliceAssigner_->assignSliceEnd(data); for (const auto& [sliceEnd, data] : sliceEndToData) { auto windowData = data; - if (!isEventTime) { - // TODO: support processing time - //windowTimerService_->registerProcessingTimeWindowTimer(sliceEnd, sliceEnd - 1); + if (!isEventTime_) { + windowTimerService_->registerProcessingTimeTimer(key, sliceEnd, sliceEnd); } - - if (isEventTime && TimeWindowUtil::isWindowFired(sliceEnd, currentProgress_, shiftTimeZone_)) { + if (isEventTime_ && TimeWindowUtil::isWindowFired(sliceEnd, currentProgress_, shiftTimeZone_)) { // the assigned slice has been triggered, which means current element is late, // but maybe not need to drop long lastWindowEnd = sliceAssigner_->getLastWindowEnd(sliceEnd); @@ -88,16 +97,16 @@ void WindowAggregator::getOutput() { } } else { // the assigned slice hasn't been triggered, accumulate into the assigned slice + std::lock_guard lock(*mtx_); windowBuffer_->addElement(key, sliceEnd, windowData); } } } - input_.reset(); } void WindowAggregator::processWatermarkInternal(long timestamp) { - if (isEventTime && timestamp > currentProgress_) { + if (isEventTime_ && timestamp > currentProgress_) { currentProgress_ = timestamp; if (currentProgress_ >= nextTriggerWatermark_) { // we only need to call advanceProgress() when current watermark may trigger window @@ -107,8 +116,8 @@ void WindowAggregator::processWatermarkInternal(long timestamp) { continue; } // TODO: agg should output no matter how many rows in datas. - localAggerator_->addInput(TimeWindowUtil::mergeVectors(datas, op()->pool())); - RowVectorPtr localAcc = localAggerator_->getOutput(); + localAggregator_->addInput(TimeWindowUtil::mergeVectors(datas, op()->pool())); + RowVectorPtr localAcc = localAggregator_->getOutput(); auto stateAcc = windowState_->value(windowKey.key(), windowKey.window()); std::list allDatas; if (!localAcc && !stateAcc) { @@ -135,10 +144,122 @@ void WindowAggregator::processWatermarkInternal(long timestamp) { } } +/// Add window_start / window_end timestamp to output +RowVectorPtr addWindowTimestampToOutput( + const RowVectorPtr& output, + const std::string& fieldName, + const int64_t fieldValue, + const int fieldIndex) { + auto createTimestampVector = [&]( + const Timestamp& val, + const size_t size, + velox::memory::MemoryPool* pool) -> VectorPtr { + const TypePtr windowStartType = std::make_shared(); + VectorPtr windowStartVec = BaseVector::create(windowStartType, output->size(), output->pool()); + FlatVector* timestampVector = windowStartVec->asFlatVector(); + for (size_t i = 0; i < size; ++i) { + timestampVector->set(i, val); + } + return windowStartVec; + }; + const TypePtr& outputType = output->type(); + const RowTypePtr& outputRowType = std::dynamic_pointer_cast(outputType); + const std::vector& outputFieldNames = outputRowType->names(); + const std::vector& outputFieldTypes = outputRowType->children(); + const std::vector& outputFields = output->children(); + std::vector newOutputFieldNames; + std::vector newOutputFieldTypes; + std::vector newOutputFields; + VectorPtr windowStartVec = createTimestampVector(Timestamp::fromMillis(fieldValue), output->size(), output->pool()); + for (int i = 0; i < fieldIndex; ++i) { + newOutputFieldTypes.emplace_back(outputFieldTypes[i]); + newOutputFieldNames.emplace_back(outputFieldNames[i]); + newOutputFields.emplace_back(outputFields[i]); + } + newOutputFieldTypes.emplace_back(std::make_shared()); + newOutputFieldNames.emplace_back(fieldName); + newOutputFields.emplace_back(windowStartVec); + for (int i = fieldIndex + 1; i < output->childrenSize() + 1; ++i) { + newOutputFieldTypes.emplace_back(outputFieldTypes[i-1]); + newOutputFieldNames.emplace_back(outputFieldNames[i-1]); + newOutputFields.emplace_back(outputFields[i-1]); + } + auto newOutputRowType = std::make_shared(std::move(newOutputFieldNames), std::move(newOutputFieldTypes)); + return std::make_shared(output->pool(), + newOutputRowType, + output->nulls(), + output->size(), + newOutputFields, + output->getNullCount() + ); +} + +void WindowAggregator::onTimer(std::shared_ptr> timer) { + fireWindow(timer->key(), timer->timestamp(), timer->ns()); + clearWindow(timer->key(), timer->timestamp(), timer->ns()); +} + +template +void WindowAggregator::fireWindow(K key, long timerTimestamp, long windowEnd) { + RowVectorPtr output = windowState_->value(key, windowEnd); + if (!output) { + LOG(INFO) << "No output found for key: " << key << ", window end: " << windowEnd; + return; + } else { + if (windowStartIndex_ >= 0) { + output = addWindowTimestampToOutput( + output, + "window_start", + windowEnd - windowInterval_, + windowStartIndex_); + } + if (windowEndIndex_ >= 0) { + output = addWindowTimestampToOutput( + output, + "window_end", + windowEnd, + windowEndIndex_); + } + } + if (output) { + pushOutput(output); + } +} + +template +void WindowAggregator::clearWindow(K key, long timerTimestamp, long windowEnd) { + windowState_->remove(key, windowEnd); +} + void WindowAggregator::onEventTime(std::shared_ptr> timer) { - auto output = windowState_->value(timer->key(), timer->ns()); - windowState_->remove(timer->key(), timer->ns()); - pushOutput(output); + onTimer(timer); +} + +void WindowAggregator::onProcessingTime(std::shared_ptr> timer) { + if (timer->timestamp() > lastTriggeredProcessingTime_) { + lastTriggeredProcessingTime_ = timer->timestamp(); + auto windowKeyToData = windowBuffer_->advanceProgress(timer->timestamp()); + for (const auto& [windowKey, datas] : windowKeyToData) { + if (datas.empty()) { + continue; + } + std::list allDatas(datas.begin(), datas.end()); + auto stateAcc = windowState_->value(windowKey.key(), windowKey.window()); + if (stateAcc) { + allDatas.push_back(stateAcc); + } + RowVectorPtr opInput = TimeWindowUtil::mergeVectors(allDatas, op()->pool()); + op()->addInput(opInput); + auto newAcc = op()->getOutput(); + if (newAcc) { + windowState_->update(windowKey.key(), windowKey.window(), newAcc); + } + } + if (!windowKeyToData.empty()) { + windowBuffer_->clear(); + } + } + onTimer(timer); } long WindowAggregator::sliceStateMergeTarget(long sliceToMerge) { @@ -147,10 +268,13 @@ long WindowAggregator::sliceStateMergeTarget(long sliceToMerge) { } void WindowAggregator::close() { - processWatermarkInternal(INT_MAX); + processWatermarkInternal(std::numeric_limits::max()); StatefulOperator::close(); - localAggerator_->close(); + if (localAggregator_) { + localAggregator_->close(); + } input_.reset(); + windowTimerService_->close(); windowBuffer_->clear(); windowState_->clear(); currentProgress_ = 0; diff --git a/velox/experimental/stateful/WindowAggregator.h b/velox/experimental/stateful/WindowAggregator.h index 28ed705b0abc..eb364bcdd840 100644 --- a/velox/experimental/stateful/WindowAggregator.h +++ b/velox/experimental/stateful/WindowAggregator.h @@ -15,6 +15,10 @@ */ #pragma once +#include +#include +#include "velox/exec/Driver.h" +#include "velox/exec/Operator.h" #include "velox/experimental/stateful/InternalTimerService.h" #include "velox/experimental/stateful/KeySelector.h" #include "velox/experimental/stateful/StatefulOperator.h" @@ -30,13 +34,16 @@ namespace facebook::velox::stateful { class WindowAggregator : public StatefulOperator, public Triggerable { public: WindowAggregator( - std::unique_ptr localAggerator, - std::unique_ptr globalAggerator, + std::unique_ptr localAggregator, + std::unique_ptr globalAggregator, std::vector> targets, std::unique_ptr keySelector, std::unique_ptr sliceAssigner, const long windowInterval, - const bool useDayLightSaving); + const bool useDayLightSaving, + const bool isEventTime, + const int windowStartIndex, + const int windowEndIndex); void initialize() override; @@ -52,24 +59,38 @@ class WindowAggregator : public StatefulOperator, public Triggerable> timer) override; + void onProcessingTime(std::shared_ptr> timer) override; + private: void processWatermarkInternal(long timestamp) override; long sliceStateMergeTarget(long sliceToMerge); - std::unique_ptr localAggerator_; + void onTimer(std::shared_ptr> timer); + + template + void fireWindow(K key, long timerTImestamp, long windowEnd); + + template + void clearWindow(K key, long timerTimestamp, long windowEnd); + + std::unique_ptr localAggregator_; std::unique_ptr keySelector_; std::unique_ptr sliceAssigner_; WindowBufferPtr windowBuffer_; const long windowInterval_; const bool useDayLightSaving_; const int shiftTimeZone_ = 0; // TODO: support time zone shift - const bool isEventTime = true; // TODO: support processing time + const bool isEventTime_ = true; + const int windowStartIndex_ = -1; + const int windowEndIndex_ = -1; RowVectorPtr input_; long currentProgress_ = 0; long nextTriggerWatermark_ = 0; + long lastTriggeredProcessingTime_ = 0; std::shared_ptr> windowState_; std::shared_ptr> windowTimerService_; }; + } // namespace facebook::velox::stateful diff --git a/velox/experimental/stateful/state/HashMapStateBackend.h b/velox/experimental/stateful/state/HashMapStateBackend.h index 49bbb4d35ddb..dd60b4290c26 100644 --- a/velox/experimental/stateful/state/HashMapStateBackend.h +++ b/velox/experimental/stateful/state/HashMapStateBackend.h @@ -15,13 +15,12 @@ */ #pragma once -#include "velox/common/serialization/Serializable.h" #include "velox/experimental/stateful/state/KeyedStateBackend.h" #include "velox/experimental/stateful/state/StateBackend.h" namespace facebook::velox::stateful { -// This class is relevent to flink org.apache.flink.runtime.state.hashmap.HashMapStateBackend. +// This class is relevant to flink org.apache.flink.runtime.state.hashmap.HashMapStateBackend. class HashMapStateBackend : public StateBackend { public: std::string getName() const { diff --git a/velox/experimental/stateful/state/HeapKeyedStateBackend.h b/velox/experimental/stateful/state/HeapKeyedStateBackend.h index b488bb650fac..b0933f388a17 100644 --- a/velox/experimental/stateful/state/HeapKeyedStateBackend.h +++ b/velox/experimental/stateful/state/HeapKeyedStateBackend.h @@ -15,7 +15,6 @@ */ #pragma once -#include "velox/common/serialization/Serializable.h" #include "velox/experimental/stateful/state/KeyedStateBackend.h" namespace facebook::velox::stateful { diff --git a/velox/experimental/stateful/state/HeapMapState.h b/velox/experimental/stateful/state/HeapMapState.h index b2c5a0982f2d..603741d0eb7a 100644 --- a/velox/experimental/stateful/state/HeapMapState.h +++ b/velox/experimental/stateful/state/HeapMapState.h @@ -20,7 +20,7 @@ namespace facebook::velox::stateful { -// This class is relevent to flink HeapMapState. +// This class is relevant to flink HeapMapState. template class HeapMapState : public MapState { public: diff --git a/velox/experimental/stateful/state/StateEntry.h b/velox/experimental/stateful/state/InternalKeyContext.h similarity index 51% rename from velox/experimental/stateful/state/StateEntry.h rename to velox/experimental/stateful/state/InternalKeyContext.h index ddd6b310d799..69c0c50aac20 100644 --- a/velox/experimental/stateful/state/StateEntry.h +++ b/velox/experimental/stateful/state/InternalKeyContext.h @@ -17,34 +17,19 @@ namespace facebook::velox::stateful { -/** - * This class is relevent to flink org.apache.flink.runtime.state.heap.StateMap. - * remove namespace first. It is a simplified implementation. - * @param type of key - * @param type of namespace - * @param type of state - */ -template -class StateEntry { - public: - StateEntry(K key, N ns, S state) - : key_(std::move(key)), namespace_(ns), state_(std::move(state)) {} - - K getKey() { - return key_; - } +template +class InternalKeyContext { +public: + K getCurrentKey() { + return currentKey_; + } - N getNamespace() { - return namespace_; - } + void setCurrentKey(K key) { + currentKey_ = key; + } - S getState() { - return state_; - } - - private: - K key_; - N namespace_; - S state_; +private: + K currentKey_; }; -} // namespace facebook::velox::stateful + +} // namespace facebook::velox::stateful \ No newline at end of file diff --git a/velox/experimental/stateful/state/KeyedStateBackend.h b/velox/experimental/stateful/state/KeyedStateBackend.h index 3baec96ac38f..7a3ed2f5bd4a 100644 --- a/velox/experimental/stateful/state/KeyedStateBackend.h +++ b/velox/experimental/stateful/state/KeyedStateBackend.h @@ -19,9 +19,11 @@ #include "velox/experimental/stateful/state/CheckpointListener.h" #include "velox/experimental/stateful/state/State.h" #include "velox/experimental/stateful/state/StateDescriptor.h" +#include "velox/experimental/stateful/state/InternalKeyContext.h" #include "velox/experimental/stateful/InternalTimerService.h" #include "velox/experimental/stateful/window/Window.h" #include "velox/vector/ComplexVector.h" +#include "velox/common/memory/MemoryPool.h" namespace facebook::velox::stateful { @@ -52,6 +54,18 @@ class KeyedStateBackend : public Snapshotable, public CheckpointListener { virtual std::shared_ptr> createGroupWindowAggTimerService(Triggerable* triggerable) = 0; + + void setCurrentKey(const uint32_t key) { + keyContext_->setCurrentKey(key); + } + + const uint32_t getCurrentKey() { + return keyContext_->getCurrentKey(); + } + +private: + std::shared_ptr> keyContext_; + velox::memory::MemoryPool* memoryPool; }; using KeyedStateBackendPtr = std::shared_ptr; diff --git a/velox/experimental/stateful/state/State.h b/velox/experimental/stateful/state/State.h index 4b3635e11546..34fe10cc0ecf 100644 --- a/velox/experimental/stateful/state/State.h +++ b/velox/experimental/stateful/state/State.h @@ -57,7 +57,7 @@ class ListState : public State { // virtual void update(std::list& values) = 0; }; -// This class is relevent to flink org.apache.flink.api.common.ListState. +// This class is relevent to flink org.apache.flink.api.common.ValueState. template class ValueState : public State { public: diff --git a/velox/experimental/stateful/state/StateDescriptor.h b/velox/experimental/stateful/state/StateDescriptor.h index a7306e5f7c4b..25423669e4ca 100644 --- a/velox/experimental/stateful/state/StateDescriptor.h +++ b/velox/experimental/stateful/state/StateDescriptor.h @@ -15,12 +15,11 @@ */ #pragma once -//#include "velox/common/serialization/Serializable.h" #include namespace facebook::velox::stateful { -// This class is relevent to flink org.apache.flink.api.common.StateDescriptor. +// This class is relevant to flink org.apache.flink.api.common.StateDescriptor. class StateDescriptor { public: StateDescriptor(const std::string& name) : name_(name) {} diff --git a/velox/experimental/stateful/state/StateMap.h b/velox/experimental/stateful/state/StateMap.h index 3c8e8ddef626..92962126fb40 100644 --- a/velox/experimental/stateful/state/StateMap.h +++ b/velox/experimental/stateful/state/StateMap.h @@ -15,14 +15,45 @@ */ #pragma once -#include "velox/experimental/stateful/state/StateEntry.h" +#include "velox/common/base/BitUtil.h" +#include "velox/experimental/stateful/window/Window.h" +#include "velox/experimental/stateful/utils/MathUtils.h" +#include #include +#include +#include namespace facebook::velox::stateful { +template +struct StateMapEntry { + + K key_; + N namespace_; + S state_; + uint64_t hash_; + std::shared_ptr> next_; + int32_t entryVersion_; + int32_t stateVersion_; + + StateMapEntry(K key, N ns, S state, uint64_t hash, std::shared_ptr>& next, int32_t entryVersion, int32_t stateVersion) + : key_(key), namespace_(ns), state_(state), hash_(hash), next_(next), entryVersion_(entryVersion), stateVersion_(stateVersion) {} + + StateMapEntry(StateMapEntry other, int entryVersion) + : StateMapEntry(other.key_, other.namespace_, other.state_, other.hash_, other.next_, entryVersion, other.stateVersion_) {} + + bool operator==(const StateMapEntry& entry) const { + return entry.key_ == key_ && entry.namespace_ == namespace_ && entry.state_ == state_; + } + + uint64_t hashCode() { + return bits::hashMix(bits::hashMix(key_, namespace_), state_); + } +}; + /** - * This class is relevent to flink org.apache.flink.runtime.state.heap.StateMap. + * This class is relevant to flink org.apache.flink.runtime.state.heap.StateMap. * remove namespace first. * TODO: It is a simplified implementation, not equal to flink. * @param type of key @@ -31,38 +62,275 @@ namespace facebook::velox::stateful { */ template class StateMap { + +#define MIN_TRANSFERRED_PER_INCREMENTAL_REHASH 4 +#define MINIMUM_CAPACITY 4 +#define MAXIMUM_CAPACITY 1 << 30 +#define DEFAULT_CAPACITY 128 +#define MAX_ARRAY_SIZE std::numeric_limits::max() - 8 + public: - StateMap() { - tempTable_.resize(1024); + StateMap() : StateMap(DEFAULT_CAPACITY) {} + StateMap(int32_t capacity) { + primaryTable_ = empty_; + incrementalRehashTable_ = empty_; + highestRequiredSnapshotVersion_ = 0; + stateMapVersion_ = 0; + primaryTableSize_ = 0; + incrementalRehashTableSize_ = 0; + modCount_ = 0; + + if (capacity < 0) { + threshold_ = -1; + return; + } + + if (capacity < MINIMUM_CAPACITY) { + capacity = MINIMUM_CAPACITY; + } else if (capacity > MAXIMUM_CAPACITY) { + capacity = MAXIMUM_CAPACITY; + } else { + capacity = roundUpToPowerOfTwo(capacity); + } + + primaryTable_ = makeTable(capacity); } S get(K key, N ns) { - auto hash = (std::hash{}(key) + std::hash{}(ns)) % tempTable_.size(); - if (tempTable_[hash] != nullptr - && tempTable_[hash]->getKey() == key - && tempTable_[hash]->getNamespace() == ns) { - return tempTable_[hash]->getState(); + uint64_t hash = computeHashForOperationAndDoIncrementalRehash(key, ns); + int32_t requiredVersion = highestRequiredSnapshotVersion_; + std::vector>>& tab = selectActiveTable(hash); + uint64_t index = hash & (tab.size() - 1); + for (auto e = tab[index]; e != nullptr; e = e->next_) { + if (e->hash_ == hash && e->key_ == key && e->namespace_ == ns) { + if (e->stateVersion_ < requiredVersion) { + if (e->entryVersion_ < requiredVersion) { + e = handleChainedEntryCopyOnWrite(tab, hash & (tab.size() - 1), e); + } + e->stateVersion_ = stateMapVersion_; + /// TODO: use type serializer to copy a state + // e->state_ = + } + return e->state_; + } } - return S(); // Return default state if not found + return nullptr; } void put(K key, N ns, S state) { - // TODO: add enlarge logic. - auto hash = (std::hash{}(key) + std::hash{}(ns)) % tempTable_.size(); - tempTable_[hash] = std::make_shared>(key, ns, state); + std::shared_ptr> e = putEntry(key, ns); + e->state_ = state; + e->stateVersion_ = stateMapVersion_; } - void remove(K key, N ns) { - auto hash = (std::hash{}(key) + std::hash{}(ns)) % tempTable_.size(); - if (tempTable_[hash] != nullptr - && tempTable_[hash]->getKey() == key - && tempTable_[hash]->getNamespace() == ns) { - tempTable_[hash] = nullptr; // Remove the entry + bool containsKey(K key, N ns) { + uint64_t hash = computeHashForOperationAndDoIncrementalRehash(key, ns); + std::vector>>& tab = selectActiveTable(hash); + uint64_t index = hash & (tab.size() - 1); + for (auto e = tab[index]; e != nullptr; e = e->next_) { + if (e->hash_ == hash && e->namespace_ == ns && e->key_ == key) { + return true; + } } + return false; + } + + void remove(K key, N ns) { + removeEntry(key, ns); + } + + size_t size() { + return primaryTableSize_ + incrementalRehashTableSize_; } private: - // TODO: use map temporarily, not equal to flink. - std::vector>> tempTable_; + std::vector>> primaryTable_; + std::vector>> incrementalRehashTable_; + std::vector>> empty_; + int32_t highestRequiredSnapshotVersion_; + int32_t stateMapVersion_; + int32_t rehashIndex_; + uint64_t primaryTableSize_; + uint64_t incrementalRehashTableSize_; + uint64_t modCount_; + uint64_t threshold_; + N lastNamespace_; + std::set snapshotVersions_; + + std::shared_ptr> putEntry(K key, N ns) { + uint64_t hash = computeHashForOperationAndDoIncrementalRehash(key, ns); + std::vector>>& tab = selectActiveTable(hash); + uint64_t index = hash & (tab.size() - 1); + for (auto e = tab[index]; e != nullptr; e = e->next_) { + if (e->hash_ == hash && e->key_ == key && e->namespace_ == ns) { + if (e->entryVersion_ < highestRequiredSnapshotVersion_) { + e = handleChainedEntryCopyOnWrite(tab, index, e); + } + return e; + } + } + ++modCount_; + if (size() > threshold_) { + doubleCapacity(); + } + return addNewStateMapEntry(tab, key,ns, hash); + } + + std::shared_ptr> removeEntry(K key, N ns) { + uint64_t hash = computeHashForOperationAndDoIncrementalRehash(key, ns); + std::vector>>& tab = selectActiveTable(hash); + uint64_t index = hash & (tab.size() - 1); + for (std::shared_ptr> e = tab[index], prev = nullptr; e != nullptr; prev = e, e = e->next_) { + if (e->hash_ == hash && e->key_ == key && e->namespace_ == ns) { + if (prev == nullptr) { + tab[index] = e->next_; + } else { + if (prev->entryVersion_ < highestRequiredSnapshotVersion_) { + prev = handleChainedEntryCopyOnWrite(tab, index, prev); + } + prev->next_ = e->next_; + } + ++modCount_; + if (tab == primaryTable_) { + --primaryTableSize_; + } else { + --incrementalRehashTableSize_; + } + return e; + } + } + return nullptr; + } + + std::shared_ptr> addNewStateMapEntry(std::vector>>& table, K key, N& ns, uint64_t hash) { + if (ns == lastNamespace_) { + ns = lastNamespace_; + } else { + lastNamespace_ = ns; + } + + uint64_t index = hash & (table.size() - 1); + std::shared_ptr> newEntry = std::make_shared>(key, ns, nullptr, hash, table[index], stateMapVersion_, stateMapVersion_); + table[index] = newEntry; + if (table == primaryTable_) { + ++primaryTableSize_; + } else { + ++incrementalRehashTableSize_; + } + return newEntry; + } + + std::shared_ptr> handleChainedEntryCopyOnWrite( + std::vector>>& tab, uint64_t mapIdx, std::shared_ptr>& untilEntry) { + int32_t required = highestRequiredSnapshotVersion_; + std::shared_ptr> current = tab[mapIdx]; + std::shared_ptr> copy = nullptr; + if (current->entryVersion_ < required) { + copy = std::make_shared>(*current, stateMapVersion_); + tab[mapIdx] = copy; + } else { + copy = current; + } + + while (current != untilEntry) { + current = current->next_; + if (current->entryVersion_ < required) { + copy->next_ = std::make_shared>(*current, stateMapVersion_); + copy = copy->next_; + } else { + copy = current; + } + } + return copy; + } + + std::vector>>& selectActiveTable(uint64_t hashCode) { + return (hashCode & (primaryTable_.size() - 1)) >= rehashIndex_ ? primaryTable_ : incrementalRehashTable_; + } + + uint64_t compositeHash(K key, N ns) { + if constexpr (std::is_same::value) { + return bits::hashMix(key, ns.hashCode()); + } else { + return bits::hashMix(key, ns); + } + } + + void incrementalRehash() { + std::vector>>& oldMap = primaryTable_; + std::vector>>& newMap = incrementalRehashTable_; + + size_t oldCapacity = oldMap.size(); + size_t newMask = newMap.size() - 1; + int32_t requiredVersion = highestRequiredSnapshotVersion_; + int32_t rhIdx = rehashIndex_; + int32_t transferred = 0; + + while (transferred < MIN_TRANSFERRED_PER_INCREMENTAL_REHASH) { + std::shared_ptr> e = oldMap[rhIdx]; + while (e != nullptr) { + if (e->entryVersion_ < requiredVersion) { + e = std::make_shared>(*e, stateMapVersion_); + } + std::shared_ptr> n = e->next_; + uint64_t pos = e->hash_ & newMask; + e->next_ = newMap[pos]; + newMap[pos] = e; + e = n; + ++transferred; + } + + oldMap[rhIdx] = nullptr; + if (++rhIdx == oldCapacity) { + primaryTable_ = newMap; + incrementalRehashTable_ = empty_; + primaryTableSize_ += incrementalRehashTableSize_; + incrementalRehashTableSize_ = 0; + rehashIndex_ = 0; + return ; + } + } + + primaryTableSize_ -= transferred; + incrementalRehashTableSize_ += transferred; + rehashIndex_ = rhIdx; + } + + bool isRehashing() { + return empty_ != incrementalRehashTable_; + } + + uint64_t computeHashForOperationAndDoIncrementalRehash(K key, N ns) { + if (isRehashing()) { + incrementalRehash(); + } + return compositeHash(key, ns); + } + + void doubleCapacity() { + VELOX_CHECK(!isRehashing(), "There is already a rehash in progress."); + std::vector>>& oldMap = primaryTable_; + int oldCapacity = oldMap.size(); + if (oldCapacity == MAXIMUM_CAPACITY) { + return; + } + incrementalRehashTable_ = makeTable(oldCapacity * 2); + } + + std::vector>> makeTable(uint64_t newCapacity) { + if (newCapacity < MAXIMUM_CAPACITY) { + threshold_ = (newCapacity >> 1) + (newCapacity >> 2); + } else { + if (size() > MAX_ARRAY_SIZE) { + VELOX_FAIL("Maximum capacity of CopyOnWriteStateMap is reached and the job cannot continue."); + } else { + LOG(WARNING) << "Maximum capacity of 2^30 in StateMap reached. Cannot increase hash map size."; + threshold_ = MAX_ARRAY_SIZE; + } + } + std::vector>> newMap; + newMap.resize(newCapacity); + return newMap; + } }; } // namespace facebook::velox::stateful diff --git a/velox/experimental/stateful/state/StateTable.h b/velox/experimental/stateful/state/StateTable.h index a9479034d84e..5a4924f0525b 100644 --- a/velox/experimental/stateful/state/StateTable.h +++ b/velox/experimental/stateful/state/StateTable.h @@ -15,10 +15,7 @@ */ #pragma once -#include "velox/common/base/Exceptions.h" #include "velox/experimental/stateful/state/StateMap.h" -#include -#include #include namespace facebook::velox::stateful { diff --git a/velox/experimental/stateful/state/StreamOperatorStateHandler.h b/velox/experimental/stateful/state/StreamOperatorStateHandler.h index 67eb2165402f..62fff9660af3 100644 --- a/velox/experimental/stateful/state/StreamOperatorStateHandler.h +++ b/velox/experimental/stateful/state/StreamOperatorStateHandler.h @@ -85,8 +85,12 @@ class StreamOperatorStateHandler { // TODO: should make it using template std::shared_ptr> createGroupWindowAggTimerService( Triggerable* triggerable) { - return keyedStateBackend_->createGroupWindowAggTimerService(triggerable); -} + return keyedStateBackend_->createGroupWindowAggTimerService(triggerable); + } + + void setCurrentKey(uint32_t key) { + keyedStateBackend_->setCurrentKey(key); + } private: int operatorId_; diff --git a/velox/experimental/stateful/utils/MathUtils.h b/velox/experimental/stateful/utils/MathUtils.h new file mode 100644 index 000000000000..1f6db6989fa7 --- /dev/null +++ b/velox/experimental/stateful/utils/MathUtils.h @@ -0,0 +1,32 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +namespace facebook::velox::stateful { + + static int roundUpToPowerOfTwo(int32_t x) { + x = x - 1; + x |= x >> 1; + x |= x >> 2; + x |= x >> 4; + x |= x >> 8; + x |= x >> 16; + return x + 1; + } + +} \ No newline at end of file diff --git a/velox/experimental/stateful/window/GroupWindowAssigner.cpp b/velox/experimental/stateful/window/GroupWindowAssigner.cpp index c5c5c40a177b..27aa04507aa5 100644 --- a/velox/experimental/stateful/window/GroupWindowAssigner.cpp +++ b/velox/experimental/stateful/window/GroupWindowAssigner.cpp @@ -18,12 +18,12 @@ namespace facebook::velox::stateful { SessionWindowAssigner::SessionWindowAssigner( - long gap, + int64_t gap, bool isEventTime) : gap_(gap), isEventTime_(isEventTime) { } -std::vector SessionWindowAssigner::assignWindows(RowVectorPtr element, long timestamp) { +std::vector SessionWindowAssigner::assignWindows(RowVectorPtr element, int64_t timestamp) { return {TimeWindow(timestamp, timestamp + gap_)}; } diff --git a/velox/experimental/stateful/window/GroupWindowAssigner.h b/velox/experimental/stateful/window/GroupWindowAssigner.h index fd7b573d4b56..4672c3470e23 100644 --- a/velox/experimental/stateful/window/GroupWindowAssigner.h +++ b/velox/experimental/stateful/window/GroupWindowAssigner.h @@ -27,7 +27,7 @@ namespace facebook::velox::stateful { template>> class GroupWindowAssigner { public: - virtual std::vector assignWindows(RowVectorPtr element, long timestamp) = 0; + virtual std::vector assignWindows(RowVectorPtr element, int64_t timestamp) = 0; virtual bool isEventTime() = 0; }; @@ -42,9 +42,9 @@ using GroupWindowAssignerPtr = std::shared_ptr>; class SessionWindowAssigner : public MergingWindowAssigner { public: - SessionWindowAssigner(long gap, bool isEventTime); + SessionWindowAssigner(int64_t gap, bool isEventTime); - std::vector assignWindows(RowVectorPtr element, long timestamp) override; + std::vector assignWindows(RowVectorPtr element, int64_t timestamp) override; void mergeWindows( TimeWindow newWindow, std::set& sortedWindows, MergeResultCollector& callback); @@ -57,7 +57,7 @@ class SessionWindowAssigner : public MergingWindowAssigner { TimeWindow mergeWindow( const TimeWindow& curWindow, const TimeWindow& other, std::set& mergedWindow); - long gap_; + int64_t gap_; bool isEventTime_; }; diff --git a/velox/experimental/stateful/window/MergingWindowSet.cpp b/velox/experimental/stateful/window/MergingWindowSet.cpp index 77215c9c5fec..69736405a678 100644 --- a/velox/experimental/stateful/window/MergingWindowSet.cpp +++ b/velox/experimental/stateful/window/MergingWindowSet.cpp @@ -123,7 +123,7 @@ void MergingWindowSet::close() { MergingFunction::MergingFunction( std::shared_ptr accMergingConsumer, std::shared_ptr> ctx, - long allowedLateness, + int64_t allowedLateness, bool isEventTime) : accMergingConsumer_(std::move(accMergingConsumer)), ctx_(ctx), @@ -137,7 +137,7 @@ void MergingFunction::merge( TimeWindow stateWindowResult, std::vector& stateWindowsToBeMerged) { - long mergeResultMaxTs = + int64_t mergeResultMaxTs = TimeWindowUtil::toEpochMillsForTimer(mergeResult.maxTimestamp(), ctx_->getShiftTimeZone()); VELOX_CHECK(!(isEventTime_ && mergeResultMaxTs + allowedLateness_ <= ctx_->currentWatermark()), diff --git a/velox/experimental/stateful/window/MergingWindowSet.h b/velox/experimental/stateful/window/MergingWindowSet.h index 3326ea9561d8..f34988ecc74b 100644 --- a/velox/experimental/stateful/window/MergingWindowSet.h +++ b/velox/experimental/stateful/window/MergingWindowSet.h @@ -63,7 +63,7 @@ class MergingFunction { MergingFunction( std::shared_ptr accMergingConsumer, std::shared_ptr> ctx, - long allowedLateness, + int64_t allowedLateness, bool isEventTime); void merge( @@ -75,7 +75,7 @@ class MergingFunction { private: std::shared_ptr accMergingConsumer_; std::shared_ptr> ctx_; - long allowedLateness_; + int64_t allowedLateness_; bool isEventTime_; }; diff --git a/velox/experimental/stateful/window/SliceAssigner.cpp b/velox/experimental/stateful/window/SliceAssigner.cpp index c831a1fc5585..418f604adad1 100644 --- a/velox/experimental/stateful/window/SliceAssigner.cpp +++ b/velox/experimental/stateful/window/SliceAssigner.cpp @@ -14,19 +14,17 @@ * limitations under the License. */ #include "velox/experimental/stateful/window/SliceAssigner.h" - -#include +#include "velox/experimental/stateful/window/TimeWindowUtil.h" #include -#include namespace facebook::velox::stateful { SliceAssigner::SliceAssigner( std::unique_ptr keySelector, - long size, - long step, - long offset, - int windowType, + int64_t size, + int64_t step, + int64_t offset, + WindowType windowType, int rowtimeIndex) : keySelector_(std::move(keySelector)), size_(size), @@ -38,29 +36,32 @@ SliceAssigner::SliceAssigner( sliceSize_ = std::gcd(size, step); } -std::map SliceAssigner::assignSliceEnd(const RowVectorPtr& input) { +std::map SliceAssigner::assignSliceEnd(const RowVectorPtr& input) { if (rowtimeIndex_ < 0) { - // TODO: using Processing Time Service - auto now = std::chrono::system_clock::now(); - long timestamp_ms = std::chrono::duration_cast( - now.time_since_epoch()).count(); - return {{timestamp_ms, input}}; + int64_t timestampMs = TimeWindowUtil::getCurrentProcessingTime(); + if (windowType_ == WindowType::TUMBLE) { + int64_t utcTimestamp = TimeWindowUtil::toEpochMillsForTimer(timestampMs, 0); + int64_t windowStart = stateful::TimeWindowUtil::getWindowStartWithOffset(utcTimestamp, offset_, size_); + return {{windowStart + size_, input}}; + } else { + return {{timestampMs, input}}; + } } return keySelector_->partition(input); } -long SliceAssigner::getLastWindowEnd(long sliceEnd) { - if (windowType_ == 0) { // Hopping window +int64_t SliceAssigner::getLastWindowEnd(int64_t sliceEnd) { + if (windowType_ == WindowType::HOP) { // Hopping window return sliceEnd - sliceSize_ + size_; } return sliceEnd; } -long SliceAssigner::getWindowStart(long windowEnd) { +int64_t SliceAssigner::getWindowStart(int64_t windowEnd) { return windowEnd - size_; } -long SliceAssigner::getSliceEndInterval() { +int64_t SliceAssigner::getSliceEndInterval() { return sliceSize_; } diff --git a/velox/experimental/stateful/window/SliceAssigner.h b/velox/experimental/stateful/window/SliceAssigner.h index a46bba22575e..83ede6f3f16e 100644 --- a/velox/experimental/stateful/window/SliceAssigner.h +++ b/velox/experimental/stateful/window/SliceAssigner.h @@ -16,6 +16,7 @@ #pragma once #include "velox/experimental/stateful/KeySelector.h" +#include "velox/experimental/stateful/window/Window.h" namespace facebook::velox::stateful { @@ -24,30 +25,30 @@ class SliceAssigner { public: SliceAssigner( std::unique_ptr keySelector, - long size, - long step, - long offset, - int windowType, + int64_t size, + int64_t step, + int64_t offset, + WindowType windowType, int rowtimeIndex); - std::map assignSliceEnd(const RowVectorPtr& input); + std::map assignSliceEnd(const RowVectorPtr& input); - long getLastWindowEnd(long sliceEnd); + int64_t getLastWindowEnd(int64_t sliceEnd); - long getWindowStart(long windowEnd); + int64_t getWindowStart(int64_t windowEnd); // Iterable expiredSlices(long windowEnd); - long getSliceEndInterval(); + int64_t getSliceEndInterval(); private: const std::unique_ptr keySelector_; - const long size_; - const long step_; - const long offset_; - const int windowType_; // 0: hopping window, 1: slide window - long sliceSize_; + const int64_t size_; + const int64_t step_; + const int64_t offset_; + const WindowType windowType_; + int64_t sliceSize_; int rowtimeIndex_; }; diff --git a/velox/experimental/stateful/window/TimeWindowUtil.cpp b/velox/experimental/stateful/window/TimeWindowUtil.cpp index d6e1a25b7713..342c5bf58ca8 100644 --- a/velox/experimental/stateful/window/TimeWindowUtil.cpp +++ b/velox/experimental/stateful/window/TimeWindowUtil.cpp @@ -20,25 +20,25 @@ namespace facebook::velox::stateful { // static -long TimeWindowUtil::getNextTriggerWatermark( - long currentWatermark, - long interval, +int64_t TimeWindowUtil::getNextTriggerWatermark( + int64_t currentWatermark, + int64_t interval, int shiftTimezone, bool useDayLightSaving) { - if (currentWatermark == LONG_MAX) { + if (currentWatermark == INT64_MAX) { return currentWatermark; } - long triggerWatermark; + int64_t triggerWatermark; // consider the DST timezone if (useDayLightSaving) { // TODO: support time zone - //long utcWindowStart = + //int64_t utcWindowStart = // getWindowStartWithOffset( // toUtcTimestampMills(currentWatermark, shiftTimezone), 0L, interval); //triggerWatermark = toEpochMillsForTimer(utcWindowStart + interval - 1, shiftTimezone); } else { - long start = getWindowStartWithOffset(currentWatermark, 0L, interval); + int64_t start = getWindowStartWithOffset(currentWatermark, 0L, interval); triggerWatermark = start + interval - 1; } @@ -50,8 +50,8 @@ long TimeWindowUtil::getNextTriggerWatermark( } // static -long TimeWindowUtil::getWindowStartWithOffset(long timestamp, long offset, long windowSize) { - long remainder = (timestamp - offset) % windowSize; +int64_t TimeWindowUtil::getWindowStartWithOffset(int64_t timestamp, int64_t offset, int64_t windowSize) { + int64_t remainder = (timestamp - offset) % windowSize; // handle both positive and negative cases if (remainder < 0) { return timestamp - (remainder + windowSize); @@ -60,29 +60,34 @@ long TimeWindowUtil::getWindowStartWithOffset(long timestamp, long offset, long } } +int64_t TimeWindowUtil::getCurrentProcessingTime() { + auto now = std::chrono::system_clock::now(); + return std::chrono::duration_cast(now.time_since_epoch()).count(); +} + // static bool TimeWindowUtil::isWindowFired( - long windowEnd, long currentProgress, int shiftTimeZone) { - if (windowEnd == LONG_MAX) { + int64_t windowEnd, int64_t currentProgress, int shiftTimeZone) { + if (windowEnd == INT64_MAX) { return false; } // TODO: support time zone - long windowTriggerTime = toEpochMillsForTimer(windowEnd - 1, shiftTimeZone); + int64_t windowTriggerTime = toEpochMillsForTimer(windowEnd - 1, shiftTimeZone); return currentProgress >= windowTriggerTime; } // static -long TimeWindowUtil::cleanupTime(long maxTimestamp, long allowedLateness, bool isEventTime) { +int64_t TimeWindowUtil::cleanupTime(int64_t maxTimestamp, int64_t allowedLateness, bool isEventTime) { if (isEventTime) { - long cleanupTime = std::max(0L, maxTimestamp + allowedLateness); - return cleanupTime >= maxTimestamp ? cleanupTime : LONG_MAX; + int64_t cleanupTime = std::max(0L, maxTimestamp + allowedLateness); + return cleanupTime >= maxTimestamp ? cleanupTime : INT64_MAX; } else { return std::max(0L, maxTimestamp); } } // static -long TimeWindowUtil::toEpochMillsForTimer(long timestamp, int shiftTimeZone) { +int64_t TimeWindowUtil::toEpochMillsForTimer(int64_t timestamp, int shiftTimeZone) { // TODO: support time zone return timestamp; } diff --git a/velox/experimental/stateful/window/TimeWindowUtil.h b/velox/experimental/stateful/window/TimeWindowUtil.h index 54ef2e69383b..2e5cf8ccd435 100644 --- a/velox/experimental/stateful/window/TimeWindowUtil.h +++ b/velox/experimental/stateful/window/TimeWindowUtil.h @@ -17,32 +17,32 @@ #include "velox/vector/ComplexVector.h" #include -#include -#include namespace facebook::velox::stateful { // This class is relevent to flink TimeWindowUitl. class TimeWindowUtil { public: - static long getNextTriggerWatermark( - long currentWatermark, - long interval, + static int64_t getNextTriggerWatermark( + int64_t currentWatermark, + int64_t interval, int shiftTimezone, bool useDayLightSaving); - static long getWindowStartWithOffset(long timestamp, long offset, long windowSize); + static int64_t getWindowStartWithOffset(int64_t timestamp, int64_t offset, int64_t windowSize); static bool isWindowFired( - long windowEnd, long currentProgress, int shiftTimeZone); + int64_t windowEnd, int64_t currentProgress, int shiftTimeZone); static RowVectorPtr mergeVectors( const std::list& vectors, memory::MemoryPool* pool); - static long toEpochMillsForTimer(long timestamp, int shiftTimeZone); + static int64_t toEpochMillsForTimer(int64_t timestamp, int shiftTimeZone); - static long cleanupTime(long maxTimestamp, long allowedLateness_, bool isEventTime); + static int64_t cleanupTime(int64_t maxTimestamp, int64_t allowedLateness_, bool isEventTime); + + static int64_t getCurrentProcessingTime(); }; diff --git a/velox/experimental/stateful/window/Window.h b/velox/experimental/stateful/window/Window.h index 1ca7d8027e9b..6f96e8060e87 100644 --- a/velox/experimental/stateful/window/Window.h +++ b/velox/experimental/stateful/window/Window.h @@ -15,19 +15,36 @@ */ #pragma once +#include "velox/common/base/BitUtil.h" +#include "velox/common/base/Exceptions.h" #include #include namespace facebook::velox::stateful { +enum class WindowType : int { + HOP = 0, + TUMBLE, + SESSION, + CUMULATIVE +}; + // This class is relevent to flink WindowBuffer. class Window { public: - virtual long maxTimestamp() = 0; + virtual int64_t maxTimestamp() = 0; virtual bool operator<(const Window& other) const = 0; virtual std::string toString() const = 0; + + static WindowType getType(const int32_t t) { + if (t >= 0 && t <= 3) { + return static_cast(t); + } else { + VELOX_FAIL("Window type value {} is illegal, it is not between 0 and 3", t); + } + } }; class TimeWindow : public Window { @@ -35,18 +52,18 @@ class TimeWindow : public Window { TimeWindow() : start_(-1), end_(-1) {} - TimeWindow(long start, long end) + TimeWindow(int64_t start, int64_t end) : start_(start), end_(end) {} - long maxTimestamp() override { + int64_t maxTimestamp() override { return end_ - 1; } - long start() const { + int64_t start() const { return start_; } - long end() const { + int64_t end() const { return end_; } @@ -78,9 +95,13 @@ class TimeWindow : public Window { return start_ >= 0 && end_ >= start_; } + uint64_t hashCode() { + return bits::hashMix(end_, start_); + } + private: - long start_; - long end_; + int64_t start_; + int64_t end_; }; } // namespace facebook::velox::stateful @@ -90,7 +111,7 @@ template<> struct hash { size_t operator()(const facebook::velox::stateful::TimeWindow& w) const { // TODO: verify it. - return hash()(w.start()) ^ (hash()(w.end()) << 1); + return hash()(w.start()) ^ (hash()(w.end()) << 1); } }; } \ No newline at end of file diff --git a/velox/experimental/stateful/window/WindowBuffer.cpp b/velox/experimental/stateful/window/WindowBuffer.cpp index 7713d368a50c..50bbcead02a5 100644 --- a/velox/experimental/stateful/window/WindowBuffer.cpp +++ b/velox/experimental/stateful/window/WindowBuffer.cpp @@ -20,7 +20,7 @@ namespace facebook::velox::stateful { -void RecordsWindowBuffer::addElement(uint32_t key, long sliceEnd, RowVectorPtr& element) { +void RecordsWindowBuffer::addElement(uint32_t key, int64_t sliceEnd, RowVectorPtr& element) { minSliceEnd_ = std::min(sliceEnd, minSliceEnd_); WindowKey windowKey(key, sliceEnd); auto it = buffer_.find(windowKey); @@ -35,7 +35,7 @@ void RecordsWindowBuffer::addElement(uint32_t key, long sliceEnd, RowVectorPtr& } } -std::unordered_map>& RecordsWindowBuffer::advanceProgress(long progress) { +std::unordered_map>& RecordsWindowBuffer::advanceProgress(int64_t progress) { if (TimeWindowUtil::isWindowFired(minSliceEnd_, progress, shiftTimeZone_)) { // there should be some window to be fired, flush buffer to state first return buffer_; diff --git a/velox/experimental/stateful/window/WindowBuffer.h b/velox/experimental/stateful/window/WindowBuffer.h index 20bfd7d5725e..cfe0bd6f61e5 100644 --- a/velox/experimental/stateful/window/WindowBuffer.h +++ b/velox/experimental/stateful/window/WindowBuffer.h @@ -18,7 +18,7 @@ #include "velox/experimental/stateful/window/WindowKey.h" #include "velox/vector/ComplexVector.h" #include -#include +#include namespace facebook::velox::stateful { @@ -27,9 +27,9 @@ class WindowBuffer { public: // TODO: we use hash key of RowVector as key, but flink use RowVector as key. // This is not equal to flink, should check it. - virtual void addElement(uint32_t key, long window, RowVectorPtr& element) = 0; + virtual void addElement(uint32_t key, int64_t window, RowVectorPtr& element) = 0; - virtual std::unordered_map>& advanceProgress(long progress) = 0; + virtual std::unordered_map>& advanceProgress(int64_t progress) = 0; virtual void clear() = 0; }; @@ -39,13 +39,13 @@ using WindowBufferPtr = std::shared_ptr; // This class is relevent to flink RecordsWindowBuffer. class RecordsWindowBuffer : public WindowBuffer { public: - void addElement(uint32_t key, long sliceEnd, RowVectorPtr& element) override; + void addElement(uint32_t key, int64_t sliceEnd, RowVectorPtr& element) override; - std::unordered_map>& advanceProgress(long progress) override; + std::unordered_map>& advanceProgress(int64_t progress) override; void clear() override { buffer_.clear(); - minSliceEnd_ = LONG_MAX; + minSliceEnd_ = INT64_MAX; } private: @@ -53,7 +53,7 @@ class RecordsWindowBuffer : public WindowBuffer { std::unordered_map> buffer_; // This is used to return empty map when no window is fired. std::unordered_map> empty_; - long minSliceEnd_ = LONG_MAX; + int64_t minSliceEnd_ = INT64_MAX; int shiftTimeZone_ = 0; // TODO: support time zone shift }; diff --git a/velox/experimental/stateful/window/WindowKey.h b/velox/experimental/stateful/window/WindowKey.h index 8245c17abee4..b6f22cef1655 100644 --- a/velox/experimental/stateful/window/WindowKey.h +++ b/velox/experimental/stateful/window/WindowKey.h @@ -24,10 +24,10 @@ namespace facebook::velox::stateful { // This class is relevent to flink WindowBuffer. class WindowKey { public: - WindowKey(uint32_t key, long window) + WindowKey(uint32_t key, int64_t window) : key_(key), window_(window) {} - long window() const { + int64_t window() const { return window_; } @@ -41,7 +41,7 @@ class WindowKey { private: uint32_t key_; - long window_; + int64_t window_; }; } // namespace facebook::velox::stateful @@ -51,7 +51,7 @@ template<> struct hash { size_t operator()(const facebook::velox::stateful::WindowKey& key) const { // TODO: RowVector should have a hash function. - return std::hash()(key.window()) ^ std::hash()(key.key()); + return std::hash()(key.window()) ^ std::hash()(key.key()); } }; } diff --git a/velox/experimental/stateful/window/WindowPartitionFunction.cpp b/velox/experimental/stateful/window/WindowPartitionFunction.cpp index 1a74bce58d07..75ee06fa4593 100644 --- a/velox/experimental/stateful/window/WindowPartitionFunction.cpp +++ b/velox/experimental/stateful/window/WindowPartitionFunction.cpp @@ -15,7 +15,7 @@ */ #include "velox/experimental/stateful/window/WindowPartitionFunction.h" #include "velox/experimental/stateful/window/TimeWindowUtil.h" -#include "velox/vector/FlatVector.h" +#include "velox/experimental/stateful/window/Window.h" #include @@ -24,9 +24,9 @@ namespace facebook::velox::stateful { WindowPartitionFunction::WindowPartitionFunction( const RowTypePtr& inputType, const column_index_t rowtimeIndex, - long size, - long step, - long offset, + int64_t size, + int64_t step, + int64_t offset, int windowType) : inputType_(std::move(inputType)), rowtimeIndex_(rowtimeIndex), @@ -57,11 +57,11 @@ std::optional WindowPartitionFunction::partition( for (auto i = 0; i < size; ++i) { auto child = input.childAt(rowtimeIndex_); auto ts = child->as>()->valueAt(i); - long timestamp = ts.getSeconds() * 1'000 + ts.getNanos() / 1'000'000; - if (windowType_ == 0) { // Hopping window - long start = TimeWindowUtil::getWindowStartWithOffset(timestamp, offset_, sliceSize_); + int64_t timestamp = ts.getSeconds() * 1'000 + ts.getNanos() / 1'000'000; + if (Window::getType(windowType_) == WindowType::HOP) { // Hopping window + int64_t start = TimeWindowUtil::getWindowStartWithOffset(timestamp, offset_, sliceSize_); partitions[i] = start + sliceSize_; - } else if (windowType_ == 1) { // Windowed Slice Assigner + } else if (Window::getType(windowType_) == WindowType::TUMBLE) { // Windowed Slice Assigner partitions[i] = timestamp; } else { VELOX_UNSUPPORTED("Unsupported window type: {}", windowType_); @@ -71,7 +71,7 @@ std::optional WindowPartitionFunction::partition( return std::nullopt; } -long getTimestamp( +int64_t getTimestamp( const RowVectorPtr& input, RowTypePtr inputType, const column_index_t rowtimeIndex ) { diff --git a/velox/experimental/stateful/window/WindowPartitionFunction.h b/velox/experimental/stateful/window/WindowPartitionFunction.h index e85de1e408bd..0feb56747a1d 100644 --- a/velox/experimental/stateful/window/WindowPartitionFunction.h +++ b/velox/experimental/stateful/window/WindowPartitionFunction.h @@ -27,9 +27,9 @@ class WindowPartitionFunction : public core::PartitionFunction { WindowPartitionFunction( const RowTypePtr& inputType, const column_index_t rowtimeIndex, - long size, - long step, - long offset, + int64_t size, + int64_t step, + int64_t offset, int windowType); std::optional partition( @@ -39,11 +39,11 @@ class WindowPartitionFunction : public core::PartitionFunction { private: RowTypePtr inputType_; column_index_t rowtimeIndex_; - long size_; - long step_; - long offset_; + int64_t size_; + int64_t step_; + int64_t offset_; int windowType_; - long sliceSize_; + int64_t sliceSize_; }; class StreamWindowPartitionFunctionSpec : public core::PartitionFunctionSpec { @@ -51,9 +51,9 @@ class StreamWindowPartitionFunctionSpec : public core::PartitionFunctionSpec { StreamWindowPartitionFunctionSpec( const RowTypePtr& inputType, column_index_t rowtimeIndex, - long size, - long step, - long offset, + int64_t size, + int64_t step, + int64_t offset, int windowType) : inputType_(std::move(inputType)), rowtimeIndex_(rowtimeIndex), @@ -77,9 +77,9 @@ class StreamWindowPartitionFunctionSpec : public core::PartitionFunctionSpec { private: RowTypePtr inputType_; column_index_t rowtimeIndex_; - long size_; - long step_; - long offset_; + int64_t size_; + int64_t step_; + int64_t offset_; int windowType_; }; diff --git a/velox/experimental/stateful/window/WindowProcessFunction.cpp b/velox/experimental/stateful/window/WindowProcessFunction.cpp index 8034978477ad..210a692add00 100644 --- a/velox/experimental/stateful/window/WindowProcessFunction.cpp +++ b/velox/experimental/stateful/window/WindowProcessFunction.cpp @@ -21,7 +21,7 @@ namespace facebook::velox::stateful { MergingWindowProcessFunction::MergingWindowProcessFunction( std::shared_ptr windowAssigner, exec::Operator* windowAggregator, - long allowedLateness) + int64_t allowedLateness) : WindowProcessFunction(windowAssigner, windowAggregator, allowedLateness) { } @@ -43,7 +43,7 @@ void MergingWindowProcessFunction::open(std::shared_ptr MergingWindowProcessFunction::assignStateNamespace( - uint32_t key, RowVectorPtr inputRow, long timestamp) { + uint32_t key, RowVectorPtr inputRow, int64_t timestamp) { std::vector elementWindows = windowAssigner_->assignWindows(inputRow, timestamp); mergingWindows_->initializeCache(key); reuseActualWindows_.clear(); @@ -68,11 +68,11 @@ std::vector MergingWindowProcessFunction::assignStateNamespace( } std::vector MergingWindowProcessFunction::assignActualWindows( - RowVectorPtr inputRow, long timestamp) { + RowVectorPtr inputRow, int64_t timestamp) { return reuseActualWindows_; } -void MergingWindowProcessFunction::cleanWindowIfNeeded(TimeWindow window, long currentTime) { +void MergingWindowProcessFunction::cleanWindowIfNeeded(TimeWindow window, int64_t currentTime) { if (isCleanupTime(window, currentTime)) { ctx_->clearTrigger(window); TimeWindow stateWindow = mergingWindows_->getStateWindow(ctx_->currentKey(), window); diff --git a/velox/experimental/stateful/window/WindowProcessFunction.h b/velox/experimental/stateful/window/WindowProcessFunction.h index f1ac8f7bb585..1789c3c47302 100644 --- a/velox/experimental/stateful/window/WindowProcessFunction.h +++ b/velox/experimental/stateful/window/WindowProcessFunction.h @@ -23,7 +23,6 @@ #include "velox/vector/ComplexVector.h" #include -#include #include namespace facebook::velox::stateful { @@ -36,9 +35,9 @@ class FunctionContext { virtual uint32_t currentKey() = 0; - virtual long currentProcessingTime() = 0; + virtual int64_t currentProcessingTime() = 0; - virtual long currentWatermark() = 0; + virtual int64_t currentWatermark() = 0; virtual int getShiftTimeZone() = 0; @@ -64,7 +63,7 @@ class WindowProcessFunction { WindowProcessFunction( GroupWindowAssignerPtr windowAssigner, exec::Operator* windowAggregator, - long allowedLateness) + int64_t allowedLateness) : windowAssigner_(windowAssigner), windowAggregator_(dynamic_cast(windowAggregator)), allowedLateness_(allowedLateness) {} @@ -73,14 +72,14 @@ class WindowProcessFunction { ctx_ = std::move(ctx); } - virtual std::vector assignStateNamespace(uint32_t key, RowVectorPtr inputRow, long timestamp) = 0; + virtual std::vector assignStateNamespace(uint32_t key, RowVectorPtr inputRow, int64_t timestamp) = 0; - virtual std::vector assignActualWindows(RowVectorPtr inputRow, long timestamp) = 0; + virtual std::vector assignActualWindows(RowVectorPtr inputRow, int64_t timestamp) = 0; // TODO: implement it when necessary. virtual void prepareAggregateAccumulatorForEmit(uint32_t key, W window) = 0; - virtual void cleanWindowIfNeeded(W window, long currentTime) = 0; + virtual void cleanWindowIfNeeded(W window, int64_t currentTime) = 0; // TODO: implement it when necessary. virtual void close() {} @@ -97,7 +96,7 @@ class WindowProcessFunction { <= ctx_->currentWatermark())); } - bool isCleanupTime(W window, long time) { + bool isCleanupTime(W window, int64_t time) { return time == TimeWindowUtil::toEpochMillsForTimer( TimeWindowUtil::cleanupTime( window.maxTimestamp(), allowedLateness_, windowAssigner_->isEventTime()), @@ -107,7 +106,7 @@ class WindowProcessFunction { GroupWindowAssignerPtr windowAssigner_; // TODO: windowAggregator may need state GroupWindowAggsHandler* windowAggregator_; - long allowedLateness_; + int64_t allowedLateness_; std::shared_ptr> ctx_; }; @@ -121,17 +120,17 @@ class MergingWindowProcessFunction : public WindowProcessFunction { MergingWindowProcessFunction( std::shared_ptr windowAssigner, exec::Operator* windowAggregator, - long allowedLateness); + int64_t allowedLateness); void open(std::shared_ptr> ctx) override; std::vector - assignStateNamespace(uint32_t key, RowVectorPtr inputRow, long timestamp) override; + assignStateNamespace(uint32_t key, RowVectorPtr inputRow, int64_t timestamp) override; std::vector - assignActualWindows(RowVectorPtr inputRow, long timestamp) override; + assignActualWindows(RowVectorPtr inputRow, int64_t timestamp) override; - void cleanWindowIfNeeded(TimeWindow window, long currentTime) override; + void cleanWindowIfNeeded(TimeWindow window, int64_t currentTime) override; void prepareAggregateAccumulatorForEmit(uint32_t key, TimeWindow window) override; diff --git a/velox/experimental/stateful/window/WindowTrigger.cpp b/velox/experimental/stateful/window/WindowTrigger.cpp index 738b3c162d79..537c865e817c 100644 --- a/velox/experimental/stateful/window/WindowTrigger.cpp +++ b/velox/experimental/stateful/window/WindowTrigger.cpp @@ -18,7 +18,7 @@ namespace facebook::velox::stateful { -long WindowTrigger::triggerTime(TimeWindow window) { +int64_t WindowTrigger::triggerTime(TimeWindow window) { return TimeWindowUtil::toEpochMillsForTimer(window.maxTimestamp(), ctx_->getShiftTimeZone()); } @@ -26,7 +26,7 @@ void AfterEndOfWindow::open(std::shared_ptr ctx) { ctx_ = ctx; } -bool AfterEndOfWindow::onElement(uint32_t key, RowVectorPtr element, long timestamp, TimeWindow window) { +bool AfterEndOfWindow::onElement(uint32_t key, RowVectorPtr element, int64_t timestamp, TimeWindow window) { if (triggerTime(window) <= ctx_->getCurrentWatermark()) { // if the watermark is already past the window fire immediately return true; @@ -36,11 +36,11 @@ bool AfterEndOfWindow::onElement(uint32_t key, RowVectorPtr element, long timest } } -bool AfterEndOfWindow::onProcessingTime(TimeWindow window, long time) { +bool AfterEndOfWindow::onProcessingTime(TimeWindow window, int64_t time) { return false; } -bool AfterEndOfWindow::onEventTime(TimeWindow window, long time) { +bool AfterEndOfWindow::onEventTime(TimeWindow window, int64_t time) { return time == triggerTime(window); } diff --git a/velox/experimental/stateful/window/WindowTrigger.h b/velox/experimental/stateful/window/WindowTrigger.h index 8cfb7e2c3631..acaabe9422b8 100644 --- a/velox/experimental/stateful/window/WindowTrigger.h +++ b/velox/experimental/stateful/window/WindowTrigger.h @@ -29,11 +29,11 @@ class WindowTrigger { public: virtual void open(std::shared_ptr ctx) = 0; - virtual bool onElement(uint32_t key, RowVectorPtr element, long timestamp, TimeWindow window) = 0; + virtual bool onElement(uint32_t key, RowVectorPtr element, int64_t timestamp, TimeWindow window) = 0; - virtual bool onProcessingTime(TimeWindow window, long time) = 0; + virtual bool onProcessingTime(TimeWindow window, int64_t time) = 0; - virtual bool onEventTime(TimeWindow window, long time) = 0; + virtual bool onEventTime(TimeWindow window, int64_t time) = 0; virtual bool canMerge() { return false; @@ -45,7 +45,7 @@ class WindowTrigger { virtual void clear(uint32_t key, TimeWindow window) = 0; - virtual long triggerTime(TimeWindow window); + virtual int64_t triggerTime(TimeWindow window); protected: std::shared_ptr ctx_; @@ -55,11 +55,11 @@ class AfterEndOfWindow : public WindowTrigger { public: void open(std::shared_ptr ctx); - bool onElement(uint32_t key, RowVectorPtr element, long timestamp, TimeWindow window) override; + bool onElement(uint32_t key, RowVectorPtr element, int64_t timestamp, TimeWindow window) override; - bool onProcessingTime(TimeWindow window, long time) override; + bool onProcessingTime(TimeWindow window, int64_t time) override; - bool onEventTime(TimeWindow window, long time) override ; + bool onEventTime(TimeWindow window, int64_t time) override ; void clear(uint32_t key, TimeWindow window) override; @@ -72,28 +72,28 @@ class TriggerContext : public std::enable_shared_from_this{ public: virtual void open() = 0; - virtual bool onElement(uint32_t key, RowVectorPtr row, long timestamp, TimeWindow window) = 0; + virtual bool onElement(uint32_t key, RowVectorPtr row, int64_t timestamp, TimeWindow window) = 0; - virtual bool onProcessingTime(TimeWindow window, long time) = 0; + virtual bool onProcessingTime(TimeWindow window, int64_t time) = 0; - virtual bool onEventTime(TimeWindow window, long time) = 0; + virtual bool onEventTime(TimeWindow window, int64_t time) = 0; virtual void onMerge(uint32_t key, TimeWindow window) = 0; - virtual long getCurrentProcessingTime() = 0; + virtual int64_t getCurrentProcessingTime() = 0; - virtual long getCurrentWatermark() = 0; + virtual int64_t getCurrentWatermark() = 0; // TODO: support it // MetricGroup getMetricGroup(); - virtual void registerProcessingTimeTimer(uint32_t key, TimeWindow window, long time) = 0; + virtual void registerProcessingTimeTimer(uint32_t key, TimeWindow window, int64_t time) = 0; - virtual void registerEventTimeTimer(uint32_t key, TimeWindow window, long time) = 0; + virtual void registerEventTimeTimer(uint32_t key, TimeWindow window, int64_t time) = 0; - virtual void deleteProcessingTimeTimer(uint32_t key, TimeWindow window, long time) = 0; + virtual void deleteProcessingTimeTimer(uint32_t key, TimeWindow window, int64_t time) = 0; - virtual void deleteEventTimeTimer(uint32_t key, TimeWindow window, long time) = 0; + virtual void deleteEventTimeTimer(uint32_t key, TimeWindow window, int64_t time) = 0; virtual int getShiftTimeZone() = 0;