@@ -148,8 +148,11 @@ public enum BackupPhase {
private List<String> incrBackupFileList;

/**
* New region server log timestamps for table set after distributed log roll key - table name,
* value - map of RegionServer hostname -> last log rolled timestamp
* New region server log timestamps for table set after distributed log roll. The keys consist of
* all tables that are part of the backup chain of the backup root (not just the tables that were
* specified when creating the backup, which could be a subset). The value is a map of
* RegionServer hostname to the last log-roll timestamp, i.e. the point up to which logs are
* included in the backup.
*/
private Map<TableName, Map<String, Long>> tableSetTimestampMap;

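To make the new javadoc concrete, here is a minimal, self-contained sketch of that structure and of the per-server reduction performed on it (hostnames and timestamps are hypothetical; plain String keys stand in for TableName so the snippet compiles on its own, and the merge loop only approximates what BackupUtils.getRSLogTimestampMins does):

import java.util.HashMap;
import java.util.Map;

public class TableSetTimestampSketch {
  public static void main(String[] args) {
    // table -> (RegionServer hostname -> last log-roll timestamp)
    Map<String, Map<String, Long>> tableSetTimestampMap = new HashMap<>();

    Map<String, Long> table1 = new HashMap<>();
    table1.put("rs1.example.com", 1_700_000_100L);
    table1.put("rs2.example.com", 1_700_000_200L);
    tableSetTimestampMap.put("ns:table1", table1);

    Map<String, Long> table2 = new HashMap<>();
    table2.put("rs1.example.com", 1_700_000_150L);
    tableSetTimestampMap.put("ns:table2", table2);

    // Per RegionServer, the smallest timestamp across all tables is the point
    // up to which every table's WALs are already covered by the backup chain.
    Map<String, Long> perServerMins = new HashMap<>();
    for (Map<String, Long> perServer : tableSetTimestampMap.values()) {
      perServer.forEach((host, ts) -> perServerMins.merge(host, ts, Math::min));
    }
    // e.g. {rs1.example.com=1700000100, rs2.example.com=1700000200}
    System.out.println(perServerMins);
  }
}
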
@@ -335,26 +335,6 @@ public void finishBackupSession() throws IOException {
systemTable.finishBackupExclusiveOperation();
}

/**
* Read the last backup start code (timestamp) of last successful backup. Will return null if
* there is no startcode stored in backup system table or the value is of length 0. These two
* cases indicate there is no successful backup completed so far.
* @return the timestamp of a last successful backup
* @throws IOException exception
*/
public String readBackupStartCode() throws IOException {
return systemTable.readBackupStartCode(backupInfo.getBackupRootDir());
}

/**
* Write the start code (timestamp) to backup system table. If passed in null, then write 0 byte.
* @param startCode start code
* @throws IOException exception
*/
public void writeBackupStartCode(Long startCode) throws IOException {
systemTable.writeBackupStartCode(startCode, backupInfo.getBackupRootDir());
}

/**
* Get the RS log information after the last log roll from backup system table.
* @return RS log info
@@ -458,48 +458,6 @@ public BackupInfo readBackupInfo(String backupId) throws IOException {
}
}

/**
* Read the last backup start code (timestamp) of last successful backup. Will return null if
* there is no start code stored on hbase or the value is of length 0. These two cases indicate
* there is no successful backup completed so far.
* @param backupRoot directory path to backup destination
* @return the timestamp of last successful backup
* @throws IOException exception
*/
public String readBackupStartCode(String backupRoot) throws IOException {
LOG.trace("read backup start code from backup system table");

try (Table table = connection.getTable(tableName)) {
Get get = createGetForStartCode(backupRoot);
Result res = table.get(get);
if (res.isEmpty()) {
return null;
}
Cell cell = res.listCells().get(0);
byte[] val = CellUtil.cloneValue(cell);
if (val.length == 0) {
return null;
}
return new String(val, StandardCharsets.UTF_8);
}
}

/**
* Write the start code (timestamp) to backup system table. If passed in null, then write 0 byte.
* @param startCode start code
* @param backupRoot root directory path to backup
* @throws IOException exception
*/
public void writeBackupStartCode(Long startCode, String backupRoot) throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("write backup start code to backup system table " + startCode);
}
try (Table table = connection.getTable(tableName)) {
Put put = createPutForStartCode(startCode.toString(), backupRoot);
table.put(put);
}
}

/**
* Exclusive operations are: create, delete, merge
* @throws IOException if a table operation fails or an active backup exclusive operation is
@@ -1128,29 +1086,6 @@ private BackupInfo resultToBackupInfo(Result res) throws IOException {
return cellToBackupInfo(cell);
}

/**
* Creates Get operation to retrieve start code from backup system table
* @return get operation
* @throws IOException exception
*/
private Get createGetForStartCode(String rootPath) throws IOException {
Get get = new Get(rowkey(START_CODE_ROW, rootPath));
get.addFamily(BackupSystemTable.META_FAMILY);
get.readVersions(1);
return get;
}

/**
* Creates Put operation to store start code to backup system table
* @return put operation
*/
private Put createPutForStartCode(String startCode, String rootPath) {
Put put = new Put(rowkey(START_CODE_ROW, rootPath));
put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("startcode"),
Bytes.toBytes(startCode));
return put;
}

/**
* Creates Get to retrieve incremental backup table set from backup system table
* @return get operation
@@ -133,17 +133,7 @@ public void execute() throws IOException {
try (Admin admin = conn.getAdmin()) {
// Begin BACKUP
beginBackup(backupManager, backupInfo);
String savedStartCode;
boolean firstBackup;
// do snapshot for full table backup

savedStartCode = backupManager.readBackupStartCode();
firstBackup = savedStartCode == null || Long.parseLong(savedStartCode) == 0L;
if (firstBackup) {
// This is our first backup. Let's put some marker to system table so that we can hold the
// logs while we do the backup.
backupManager.writeBackupStartCode(0L);
}

// We roll log here before we do the snapshot. It is possible there is duplicate data
// in the log that is already in the snapshot. But if we do it after the snapshot, we
// could have data loss.
Expand Down Expand Up @@ -188,15 +178,11 @@ public void execute() throws IOException {
Map<TableName, Map<String, Long>> newTableSetTimestampMap =
backupManager.readLogTimestampMap();

backupInfo.setTableSetTimestampMap(newTableSetTimestampMap);
Long newStartCode =
BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
backupManager.writeBackupStartCode(newStartCode);

backupManager
.deleteBulkLoadedRows(bulkLoadsToDelete.stream().map(BulkLoad::getRowKey).toList());

// backup complete
backupInfo.setTableSetTimestampMap(newTableSetTimestampMap);
completeBackup(conn, backupInfo, BackupType.FULL, conf);
} catch (Exception e) {
failBackup(conn, backupInfo, backupManager, e, "Unexpected BackupException : ",
@@ -19,6 +19,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
@@ -27,7 +28,6 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
@@ -59,23 +59,11 @@ public IncrementalBackupManager(Connection conn, Configuration conf) throws IOException {
public Map<String, Long> getIncrBackupLogFileMap() throws IOException {
List<String> logList;
Map<String, Long> newTimestamps;
Map<String, Long> previousTimestampMins;
Map<String, Long> previousTimestampMins =
BackupUtils.getRSLogTimestampMins(readLogTimestampMap());

String savedStartCode = readBackupStartCode();

// key: tableName
// value: <RegionServer,PreviousTimeStamp>
Map<TableName, Map<String, Long>> previousTimestampMap = readLogTimestampMap();

previousTimestampMins = BackupUtils.getRSLogTimestampMins(previousTimestampMap);

if (LOG.isDebugEnabled()) {
LOG.debug("StartCode " + savedStartCode + "for backupID " + backupInfo.getBackupId());
}
// get all new log files from .logs and .oldlogs after last TS and before new timestamp
if (
savedStartCode == null || previousTimestampMins == null || previousTimestampMins.isEmpty()
) {
if (previousTimestampMins.isEmpty()) {
throw new IOException("Cannot read any previous back up timestamps from backup system table. "
+ "In order to create an incremental backup, at least one full backup is needed.");
}
@@ -85,7 +73,7 @@ public Map<String, Long> getIncrBackupLogFileMap() throws IOException {

newTimestamps = readRegionServerLastLogRollResult();

logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps, conf, savedStartCode);
logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps, conf);
logList = excludeProcV2WALs(logList);
backupInfo.setIncrBackupFileList(logList);

@@ -113,16 +101,15 @@ private List<String> excludeProcV2WALs(List<String> logList) {
* @param olderTimestamps the timestamp for each region server of the last backup.
* @param newestTimestamps the timestamp for each region server that the backup should lead to.
* @param conf the Hadoop and Hbase configuration
* @param savedStartCode the startcode (timestamp) of last successful backup.
* @return a list of log files to be backed up
* @throws IOException exception
*/
private List<String> getLogFilesForNewBackup(Map<String, Long> olderTimestamps,
Map<String, Long> newestTimestamps, Configuration conf, String savedStartCode)
throws IOException {
Map<String, Long> newestTimestamps, Configuration conf) throws IOException {
LOG.debug("In getLogFilesForNewBackup()\n" + "olderTimestamps: " + olderTimestamps
+ "\n newestTimestamps: " + newestTimestamps);

long prevBackupStartTs = Collections.min(olderTimestamps.values());
Path walRootDir = CommonFSUtils.getWALRootDir(conf);
Path logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);
Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
@@ -219,7 +206,7 @@ private List<String> getLogFilesForNewBackup(Map<String, Long> olderTimestamps,
* our last backup.
*/
if (oldTimeStamp == null) {
if (currentLogTS < Long.parseLong(savedStartCode)) {
if (currentLogTS < prevBackupStartTs) {
// This log file is really old, its region server was before our last backup.
continue;
} else {
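
With the start code gone, the oldest per-server timestamp of the previous backup takes over as the lower bound for servers the last backup never saw. A sketch of the selection rule under that assumption (the Wal record and data are hypothetical; the real method walks the .logs and .oldWALs directories and parses each timestamp out of the WAL file name, and the inclusive upper bound at the new roll point is likewise an assumption of the sketch):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class WalSelectionSketch {
  // Hypothetical stand-in for a WAL file: owning server, roll timestamp, path.
  record Wal(String server, long ts, String path) {}

  static List<String> logsForNewBackup(Map<String, Long> olderTimestamps,
      Map<String, Long> newestTimestamps, List<Wal> wals) {
    // Replaces the removed savedStartCode: the oldest last-backup timestamp
    // across all region servers bounds how old a still-needed WAL can be.
    long prevBackupStartTs = Collections.min(olderTimestamps.values());
    List<String> result = new ArrayList<>();
    for (Wal wal : wals) {
      Long oldTs = olderTimestamps.get(wal.server());
      if (oldTs == null) {
        // Server unknown to the last backup (e.g. since decommissioned):
        // keep its WAL unless it predates the previous backup entirely.
        if (wal.ts() >= prevBackupStartTs) {
          result.add(wal.path());
        }
        continue;
      }
      Long newTs = newestTimestamps.get(wal.server());
      // Written after this server's last backup, but not past the new
      // log-roll point.
      if (wal.ts() > oldTs && (newTs == null || wal.ts() <= newTs)) {
        result.add(wal.path());
      }
    }
    return result;
  }

  public static void main(String[] args) {
    System.out.println(logsForNewBackup(
      Map.of("rs1", 100L), Map.of("rs1", 200L),
      List.of(new Wal("rs1", 150, "wal-a"),    // kept: 100 < 150 <= 200
        new Wal("rs1", 90, "wal-b"),           // skipped: already backed up
        new Wal("rs2", 120, "wal-c"))));       // kept: unknown server, ts >= 100
  }
}
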
@@ -332,14 +332,10 @@ public void execute() throws IOException, ColumnFamilyMismatchException {
Map<TableName, Map<String, Long>> newTableSetTimestampMap =
backupManager.readLogTimestampMap();

backupInfo.setTableSetTimestampMap(newTableSetTimestampMap);
Long newStartCode =
BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
backupManager.writeBackupStartCode(newStartCode);

List<BulkLoad> bulkLoads = handleBulkLoad(backupInfo.getTableNames());

// backup complete
backupInfo.setTableSetTimestampMap(newTableSetTimestampMap);
completeBackup(conn, backupInfo, BackupType.INCREMENTAL, conf);

List<byte[]> bulkLoadedRows = Lists.transform(bulkLoads, BulkLoad::getRowKey);
@@ -17,12 +17,11 @@
*/
package org.apache.hadoop.hbase.backup.master;

import static org.apache.hadoop.hbase.backup.BackupInfo.withState;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -31,7 +30,6 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
@@ -91,10 +89,11 @@ public void init(Map<String, Object> params) {
* Calculates the timestamp boundary up to which all backup roots have already included the WAL.
* I.e. WALs with a lower (= older) or equal timestamp are no longer needed for future incremental
* backups.
* @param backups all completed or running backups to use for the calculation of the boundary
* @param tsBuffer a buffer (in ms) to lower the boundary for the default bound
*/
private BackupBoundaries serverToPreservationBoundaryTs(BackupSystemTable sysTable)
throws IOException {
List<BackupInfo> backups = sysTable.getBackupHistory(withState(BackupState.COMPLETE));
protected static BackupBoundaries calculatePreservationBoundary(List<BackupInfo> backups,
long tsBuffer) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Cleaning WALs if they are older than the WAL cleanup time-boundary. "
@@ -103,11 +102,12 @@ private BackupBoundaries serverToPreservationBoundaryTs(BackupSystemTable sysTable)
backups.stream().map(BackupInfo::getBackupId).sorted().collect(Collectors.joining(", ")));
}

// This map tracks, for every backup root, the most recent created backup (= highest timestamp)
// This map tracks, for every backup root, the most recent (= highest timestamp) completed
// backup, or if there is no such one, the currently running backup (if any)
Map<String, BackupInfo> newestBackupPerRootDir = new HashMap<>();
for (BackupInfo backup : backups) {
BackupInfo existingEntry = newestBackupPerRootDir.get(backup.getBackupRootDir());
if (existingEntry == null || existingEntry.getStartTs() < backup.getStartTs()) {
if (existingEntry == null || existingEntry.getState() == BackupState.RUNNING) {
newestBackupPerRootDir.put(backup.getBackupRootDir(), backup);
}
}
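
The loop above folds the whole history into one representative backup per root. A sketch of that reduction in isolation (the Backup record is a hypothetical stand-in for BackupInfo, and, as with the real getBackupHistory result, the input is assumed to be ordered newest-first):

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NewestPerRootSketch {
  enum State { RUNNING, COMPLETE }
  record Backup(String rootDir, long startTs, State state) {}

  static Map<String, Backup> newestPerRoot(List<Backup> newestFirst) {
    Map<String, Backup> result = new HashMap<>();
    for (Backup b : newestFirst) {
      Backup existing = result.get(b.rootDir());
      // The first (= most recent) entry per root wins, except that an older
      // COMPLETE backup displaces a RUNNING one: the boundary should rest on
      // data that actually made it into a finished backup whenever possible.
      if (existing == null || existing.state() == State.RUNNING) {
        result.put(b.rootDir(), b);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // rootA maps to the COMPLETE backup at ts=200, not the RUNNING one at 300.
    System.out.println(newestPerRoot(List.of(
      new Backup("hdfs://backups/rootA", 300, State.RUNNING),
      new Backup("hdfs://backups/rootA", 200, State.COMPLETE),
      new Backup("hdfs://backups/rootA", 100, State.COMPLETE))));
  }
}
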
@@ -119,24 +119,12 @@ private BackupBoundaries serverToPreservationBoundaryTs(BackupSystemTable sysTable)
.collect(Collectors.joining(", ")));
}

BackupBoundaries.BackupBoundariesBuilder builder =
BackupBoundaries.builder(getConf().getLong(TS_BUFFER_KEY, TS_BUFFER_DEFAULT));
for (BackupInfo backupInfo : newestBackupPerRootDir.values()) {
long startCode = Long.parseLong(sysTable.readBackupStartCode(backupInfo.getBackupRootDir()));
// Iterate over all tables in the timestamp map, which contains all tables covered in the
// backup root, not just the tables included in that specific backup (which could be a subset)
for (TableName table : backupInfo.getTableSetTimestampMap().keySet()) {
for (Map.Entry<String, Long> entry : backupInfo.getTableSetTimestampMap().get(table)
.entrySet()) {
builder.addBackupTimestamps(entry.getKey(), entry.getValue(), startCode);
}
}
}

BackupBoundaries.BackupBoundariesBuilder builder = BackupBoundaries.builder(tsBuffer);
newestBackupPerRootDir.values().forEach(builder::update);
BackupBoundaries boundaries = builder.build();

if (LOG.isDebugEnabled()) {
LOG.debug("Boundaries oldestStartCode: {}", boundaries.getOldestStartCode());
LOG.debug("Boundaries defaultBoundary: {}", boundaries.getDefaultBoundary());
for (Map.Entry<Address, Long> entry : boundaries.getBoundaries().entrySet()) {
LOG.debug("Server: {}, WAL cleanup boundary: {}", entry.getKey().getHostName(),
entry.getValue());
@@ -159,10 +147,11 @@ public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
}

BackupBoundaries boundaries;
try {
try (BackupSystemTable sysTable = new BackupSystemTable(conn)) {
boundaries = serverToPreservationBoundaryTs(sysTable);
}
try (BackupSystemTable sysTable = new BackupSystemTable(conn)) {
long tsBuffer = getConf().getLong(TS_BUFFER_KEY, TS_BUFFER_DEFAULT);
List<BackupInfo> backupHistory = sysTable.getBackupHistory(
i -> EnumSet.of(BackupState.COMPLETE, BackupState.RUNNING).contains(i.getState()));
boundaries = calculatePreservationBoundary(backupHistory, tsBuffer);
} catch (IOException ex) {
LOG.error("Failed to analyse backup history with exception: {}. Retaining all logs",
ex.getMessage(), ex);
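
Only COMPLETE and RUNNING backups feed the boundary calculation: a finished backup marks what is already covered, and a running one must keep pinning the WALs it still needs. A sketch of that state filter (enum and record are hypothetical stand-ins; the real predicate is passed to BackupSystemTable.getBackupHistory):

import java.util.EnumSet;
import java.util.List;
import java.util.function.Predicate;

public class HistoryFilterSketch {
  enum State { RUNNING, COMPLETE, FAILED }
  record Backup(String id, State state) {}

  public static void main(String[] args) {
    Predicate<Backup> pinsWals =
      b -> EnumSet.of(State.COMPLETE, State.RUNNING).contains(b.state());

    List<Backup> history = List.of(
      new Backup("backup_1", State.COMPLETE),
      new Backup("backup_2", State.FAILED),   // ignored by the cleaner
      new Backup("backup_3", State.RUNNING));

    // Prints backup_1 and backup_3; backup_2 contributes no boundary.
    history.stream().filter(pinsWals).forEach(b -> System.out.println(b.id()));
  }
}

If reading the history fails, the catch block above logs the error and retains all logs, trading disk space for safety.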