Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.RateLimiter;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
Expand Down Expand Up @@ -123,6 +124,7 @@ public class GarbageCollectorThread implements Runnable {

private static final AtomicLong threadNum = new AtomicLong(0);
final AbstractLogCompactor.Throttler throttler;
private final RateLimiter compactionReadByteRateLimiter;

/**
* Create a garbage collector thread.
Expand Down Expand Up @@ -266,6 +268,7 @@ public void removeEntryLog(long logToRemove) {
+ majorCompactionThreshold + ", interval=" + majorCompactionInterval);

lastMinorCompactionTime = lastMajorCompactionTime = System.currentTimeMillis();
compactionReadByteRateLimiter = RateLimiter.create(conf.getCompactionReadRateByBytes());
}

private EntryLogMetadataMap createEntryLogMetadataMap() throws IOException {
Expand Down Expand Up @@ -619,9 +622,10 @@ void doCompactEntryLogs(double threshold, long maxTimeMillis) throws EntryLogMet
meta.getEntryLogId(), meta.getUsage(), threshold);
}

long priorRemainingSize = meta.getRemainingSize();
long compactSize = meta.getTotalSize() - meta.getRemainingSize();
compactionReadByteRateLimiter.acquire((int) (compactSize));
compactEntryLog(meta);
gcStats.getReclaimedSpaceViaCompaction().addCount(meta.getTotalSize() - priorRemainingSize);
gcStats.getReclaimedSpaceViaCompaction().addCount(compactSize);
compactedBuckets[bucketIndex]++;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
protected static final String COMPACTION_RATE = "compactionRate";
protected static final String COMPACTION_RATE_BY_ENTRIES = "compactionRateByEntries";
protected static final String COMPACTION_RATE_BY_BYTES = "compactionRateByBytes";
protected static final String COMPACTION_READ_RATE_BY_BYTES = "compactionReadRateByBytes";

// Gc Parameters
protected static final String GC_WAIT_TIME = "gcWaitTime";
Expand Down Expand Up @@ -2965,6 +2966,27 @@ public ServerConfiguration setCompactionRateByBytes(int rate) {
return this;
}

/**
 * Set the throttle rate, in bytes read per second, applied to compaction reads.
 *
 * <p>This bounds how fast the garbage collector may read entry log data while
 * compacting; the value is consumed by a rate limiter created at GC startup.
 *
 * @param rate maximum compaction read throughput, in bytes per second
 * @return this {@code ServerConfiguration} instance, for chaining
 */
public ServerConfiguration setCompactionReadRateByBytes(int rate) {
    this.setProperty(COMPACTION_READ_RATE_BY_BYTES, rate);
    return this;
}

/**
 * Get the throttle rate, in bytes read per second, applied to compaction reads.
 *
 * <p>Defaults to {@code Integer.MAX_VALUE}, i.e. effectively unthrottled
 * unless explicitly configured.
 *
 * @return maximum compaction read throughput, in bytes per second
 */
public int getCompactionReadRateByBytes() {
    return this.getInt(COMPACTION_READ_RATE_BY_BYTES, Integer.MAX_VALUE);
}

/**
* Should we remove pages from page cache after force write.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1396,6 +1396,7 @@ public void testCancelledCompactionWhenShuttingDown() throws Exception {
restartBookies(c -> {
c.setIsThrottleByBytes(true);
c.setCompactionRateByBytes(ENTRY_SIZE / 1000);
c.setCompactionReadRateByBytes(ENTRY_SIZE / 1000);
c.setMinorCompactionThreshold(0.2f);
c.setMajorCompactionThreshold(0.5f);
return c;
Expand Down
3 changes: 3 additions & 0 deletions conf/bk_server.conf
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,9 @@ ledgerDirectories=/tmp/bk-data
# Set the rate at which compaction will add entries. The unit is bytes added per second.
# compactionRateByBytes=1000000

# Set the rate at which compaction will read entries. The unit is bytes read per second.
# compactionReadRateByBytes=1000000

# Flag to enable/disable transactional compaction. If it is set to true, it will use transactional compaction,
# which it will use new entry log files to store compacted entries during compaction; if it is set to false,
# it will use normal compaction, which it shares same entry log file with normal add operations.
Expand Down
1 change: 1 addition & 0 deletions site3/website/docs/reference/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ The table below lists parameters that you can set to configure bookies. All conf
| isThrottleByBytes | Throttle compaction by bytes or by entries. | false |
| compactionRateByEntries | Set the rate at which compaction will read entries. The unit is adds per second. | 1000 |
| compactionRateByBytes | Set the rate at which compaction will read entries. The unit is bytes added per second. | 1000000 |
| compactionReadRateByBytes | Set the rate at which compaction will read entries. The unit is bytes read per second. | Integer.MAX_VALUE |
| useTransactionalCompaction | Flag to enable/disable transactional compaction. If it is set to true, it will use transactional compaction, which uses<br />new entry log files to store entries after compaction; otherwise, it will use normal compaction, which shares same entry<br />log file with normal add operations.<br /> | false |


Expand Down