From d25f3d00bf9568c68a42156aea64e76e75926abb Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 26 Feb 2026 17:52:30 +0200 Subject: [PATCH] [improve][broker] Optimize AsyncTokenBucket overflow solution further to reduce fallback to BigInteger --- .../pulsar/broker/qos/AsyncTokenBucket.java | 130 ++++++++++++------ .../broker/qos/AsyncTokenBucketTest.java | 2 + 2 files changed, 93 insertions(+), 39 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java index f7fc0031ccd74..37b7d0905221b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java @@ -103,6 +103,11 @@ public abstract class AsyncTokenBucket { * which has a complex solution to prevent the CAS loop content problem. */ private final LongAdder pendingConsumedTokens = new LongAdder(); + /** + * Cached pre-reduced rate parameters. Invalidated whenever {@link #getRate()} or + * {@link #getRatePeriodNanos()} returns a different value (relevant for dynamic-rate buckets). + */ + private volatile RateParameters rateParameters; protected AsyncTokenBucket(MonotonicClock clockSource, long addTokensResolutionNanos) { this.clockSource = clockSource; @@ -122,6 +127,18 @@ public static DynamicRateAsyncTokenBucketBuilder builderForDynamicRate() { protected abstract long getTargetAmountOfTokensAfterThrottling(); + private RateParameters resolveRateParameters() { + long rate = getRate(); + long ratePeriodNanos = getRatePeriodNanos(); + RateParameters current = rateParameters; + if (current != null && current.rate == rate && current.ratePeriodNanos == ratePeriodNanos) { + return current; + } + RateParameters updated = new RateParameters(rate, ratePeriodNanos); + rateParameters = updated; + return updated; + } + /** * Consumes tokens and possibly updates the token balance. New tokens are calculated if the last new token * calculation occurred more than addTokensResolutionNanos nanoseconds ago. When new tokens are added, the @@ -203,13 +220,12 @@ private long calculateNewTokensSinceLastUpdate(long currentNanos) { newTokens = 0; } else { long durationNanos = currentNanos - previousLastNanos + REMAINDER_NANOS_UPDATER.getAndSet(this, 0); - long currentRate = getRate(); - long currentRatePeriodNanos = getRatePeriodNanos(); + RateParameters rp = resolveRateParameters(); // new tokens is the amount of tokens that are created in the duration since the last update // with the configured rate - newTokens = safeMulDivFloor(durationNanos, currentRate, currentRatePeriodNanos); + newTokens = rp.calculateTokens(durationNanos); // carry forward the remainder nanos so that the rounding error is eliminated - long consumedNanos = safeMulDivFloor(newTokens, currentRatePeriodNanos, currentRate); + long consumedNanos = rp.calculateDuration(newTokens); long remainderNanos = durationNanos >= consumedNanos ? durationNanos - consumedNanos : 0; if (remainderNanos > 0) { REMAINDER_NANOS_UPDATER.addAndGet(this, remainderNanos); @@ -277,41 +293,8 @@ public long calculateThrottlingDuration(long requiredTokens) { } catch (ArithmeticException e) { needTokens = Long.MAX_VALUE; } - return safeMulDivFloor(needTokens, getRatePeriodNanos(), getRate()); - } - - private static long safeMulDivFloor(long multiplicand, long multiplier, long divisor) { - if (multiplicand < 0 || multiplier < 0) { - throw new IllegalArgumentException("multiplicand and multiplier must be >= 0"); - } - if (divisor <= 0) { - throw new IllegalArgumentException("divisor must be > 0"); - } - if (multiplicand == 0 || multiplier == 0) { - return 0; - } - // Fast path - // Check if multiplication fits in a 64-bit value - // Math.multiplyHigh is intrinsified by the JVM (single mulq/mul instruction), - // avoiding the cost of a division-based overflow check. - // It returns the upper 64 bits of the full 128-bit multiplication result. - // When the result is 0, the product fits in 64 bits. - if (Math.multiplyHigh(multiplicand, multiplier) == 0) { - long product = multiplicand * multiplier; - if (product >= 0) { - // product fits in signed 64-bit - return product / divisor; - } - // product is in [2^63, 2^64): fits unsigned but not signed - long result = Long.divideUnsigned(product, divisor); - // cap at Long.MAX_VALUE if result itself overflows signed long - return result >= 0 ? result : Long.MAX_VALUE; - } - // Fallback to BigInteger division - BigInteger result = BigInteger.valueOf(multiplicand) - .multiply(BigInteger.valueOf(multiplier)) - .divide(BigInteger.valueOf(divisor)); - return result.bitLength() < Long.SIZE ? result.longValue() : Long.MAX_VALUE; + RateParameters rp = resolveRateParameters(); + return rp.calculateDuration(needTokens); } /** @@ -342,4 +325,73 @@ public boolean containsTokens() { return tokens() > 0; } + /** + * Holds pre-computed rate parameters where {@code rate} and {@code ratePeriodNanos} have been + * divided by their highest common power of ten. This reduction keeps the operands smaller and + * avoids overflow in {@link #safeMulDivFloor(long, long, long)} without changing the result of + * any integer floor-division (dividing numerator and denominator by the same factor preserves + * the quotient). The instance is cached and reused as long as the rate and period are unchanged. + */ + static final class RateParameters { + final long rate; + final long ratePeriodNanos; + final long reducedRate; + final long reducedRatePeriod; + + RateParameters(long rate, long ratePeriodNanos) { + this.rate = rate; + this.ratePeriodNanos = ratePeriodNanos; + long r = rate; + long p = ratePeriodNanos; + while (r % 10 == 0 && p % 10 == 0) { + r /= 10; + p /= 10; + } + this.reducedRate = r; + this.reducedRatePeriod = p; + } + + public long calculateTokens(long durationNanos) { + return safeMulDivFloor(durationNanos, reducedRate, reducedRatePeriod); + } + + public long calculateDuration(long tokens) { + return safeMulDivFloor(tokens, reducedRatePeriod, reducedRate); + } + + private static long safeMulDivFloor(long multiplicand, long multiplier, long divisor) { + if (multiplicand < 0 || multiplier < 0) { + throw new IllegalArgumentException("multiplicand and multiplier must be >= 0"); + } + if (divisor <= 0) { + throw new IllegalArgumentException("divisor must be > 0"); + } + if (multiplicand == 0 || multiplier == 0) { + return 0; + } + // Fast path + // Check if multiplication fits in a 64-bit value + // Math.multiplyHigh is intrinsified by the JVM (single mulq/mul instruction), + // avoiding the cost of a division-based overflow check. + // It returns the upper 64 bits of the full 128-bit multiplication result. + // When the result is 0, the product fits in 64 bits. + if (Math.multiplyHigh(multiplicand, multiplier) == 0) { + long product = multiplicand * multiplier; + if (product >= 0) { + // product fits in signed 64-bit + return product / divisor; + } + // product is in [2^63, 2^64): fits unsigned but not signed + long result = Long.divideUnsigned(product, divisor); + // cap at Long.MAX_VALUE if result itself overflows signed long + return result >= 0 ? result : Long.MAX_VALUE; + } + // Fallback to BigInteger division + BigInteger result = BigInteger.valueOf(multiplicand) + .multiply(BigInteger.valueOf(multiplier)) + .divide(BigInteger.valueOf(divisor)); + return result.bitLength() < Long.SIZE ? result.longValue() : Long.MAX_VALUE; + } + } + } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/AsyncTokenBucketTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/AsyncTokenBucketTest.java index f709cb6544886..8f47b948acaff 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/AsyncTokenBucketTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/AsyncTokenBucketTest.java @@ -205,6 +205,8 @@ public Object[][] largeRates() { {1_000_000_000L}, {1_500_000_000L}, {2_000_000_000L}, + {100_000_000_000L}, + {Long.MAX_VALUE / 1_000_000_000L * 1_000_000_000L}, {Long.MAX_VALUE / 100L}, {Long.MAX_VALUE / 10L}, {Long.MAX_VALUE / 9L},