diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java
index 39f903185a68..edf5a2b517ff 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java
@@ -148,8 +148,11 @@ public enum BackupPhase {
   private List<String> incrBackupFileList;

   /**
-   * New region server log timestamps for table set after distributed log roll key - table name,
-   * value - map of RegionServer hostname -> last log rolled timestamp
+   * New region server log timestamps for table set after distributed log roll. The keys consist of
+   * all tables that are part of the backup chain of the backup root (not just the tables that were
+   * specified when creating the backup, which could be a subset). The value is a map of
+   * RegionServer hostname to the last log-roll timestamp, i.e. the point up to which logs are
+   * included in the backup.
    */
   private Map<TableName, Map<String, Long>> tableSetTimestampMap;

diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
index 9cab455441bb..a3f2e16f5f19 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
@@ -335,26 +335,6 @@ public void finishBackupSession() throws IOException {
     systemTable.finishBackupExclusiveOperation();
   }

-  /**
-   * Read the last backup start code (timestamp) of last successful backup. Will return null if
-   * there is no startcode stored in backup system table or the value is of length 0. These two
-   * cases indicate there is no successful backup completed so far.
-   * @return the timestamp of a last successful backup
-   * @throws IOException exception
-   */
-  public String readBackupStartCode() throws IOException {
-    return systemTable.readBackupStartCode(backupInfo.getBackupRootDir());
-  }
-
-  /**
-   * Write the start code (timestamp) to backup system table. If passed in null, then write 0 byte.
-   * @param startCode start code
-   * @throws IOException exception
-   */
-  public void writeBackupStartCode(Long startCode) throws IOException {
-    systemTable.writeBackupStartCode(startCode, backupInfo.getBackupRootDir());
-  }
-
   /**
    * Get the RS log information after the last log roll from backup system table.
    * @return RS log info
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
index 3b46335a7299..641c5c9613c6 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
@@ -458,48 +458,6 @@ public BackupInfo readBackupInfo(String backupId) throws IOException {
     }
   }

-  /**
-   * Read the last backup start code (timestamp) of last successful backup. Will return null if
-   * there is no start code stored on hbase or the value is of length 0. These two cases indicate
-   * there is no successful backup completed so far.
-   * @param backupRoot directory path to backup destination
-   * @return the timestamp of last successful backup
-   * @throws IOException exception
-   */
-  public String readBackupStartCode(String backupRoot) throws IOException {
-    LOG.trace("read backup start code from backup system table");
-
-    try (Table table = connection.getTable(tableName)) {
-      Get get = createGetForStartCode(backupRoot);
-      Result res = table.get(get);
-      if (res.isEmpty()) {
-        return null;
-      }
-      Cell cell = res.listCells().get(0);
-      byte[] val = CellUtil.cloneValue(cell);
-      if (val.length == 0) {
-        return null;
-      }
-      return new String(val, StandardCharsets.UTF_8);
-    }
-  }
-
-  /**
-   * Write the start code (timestamp) to backup system table. If passed in null, then write 0 byte.
-   * @param startCode start code
-   * @param backupRoot root directory path to backup
-   * @throws IOException exception
-   */
-  public void writeBackupStartCode(Long startCode, String backupRoot) throws IOException {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("write backup start code to backup system table " + startCode);
-    }
-    try (Table table = connection.getTable(tableName)) {
-      Put put = createPutForStartCode(startCode.toString(), backupRoot);
-      table.put(put);
-    }
-  }
-
   /**
    * Exclusive operations are: create, delete, merge
    * @throws IOException if a table operation fails or an active backup exclusive operation is
@@ -1128,29 +1086,6 @@ private BackupInfo resultToBackupInfo(Result res) throws IOException {
     return cellToBackupInfo(cell);
   }

-  /**
-   * Creates Get operation to retrieve start code from backup system table
-   * @return get operation
-   * @throws IOException exception
-   */
-  private Get createGetForStartCode(String rootPath) throws IOException {
-    Get get = new Get(rowkey(START_CODE_ROW, rootPath));
-    get.addFamily(BackupSystemTable.META_FAMILY);
-    get.readVersions(1);
-    return get;
-  }
-
-  /**
-   * Creates Put operation to store start code to backup system table
-   * @return put operation
-   */
-  private Put createPutForStartCode(String startCode, String rootPath) {
-    Put put = new Put(rowkey(START_CODE_ROW, rootPath));
-    put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("startcode"),
-      Bytes.toBytes(startCode));
-    return put;
-  }
-
   /**
    * Creates Get to retrieve incremental backup table set from backup system table
    * @return get operation
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
index 2293fd4f8149..37aaa30d2b2e 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
@@ -133,17 +133,7 @@ public void execute() throws IOException {
     try (Admin admin = conn.getAdmin()) {
       // Begin BACKUP
       beginBackup(backupManager, backupInfo);
-      String savedStartCode;
-      boolean firstBackup;
-      // do snapshot for full table backup
-
-      savedStartCode = backupManager.readBackupStartCode();
-      firstBackup = savedStartCode == null || Long.parseLong(savedStartCode) == 0L;
-      if (firstBackup) {
-        // This is our first backup. Let's put some marker to system table so that we can hold the
-        // logs while we do the backup.
-        backupManager.writeBackupStartCode(0L);
-      }
+
       // We roll log here before we do the snapshot. It is possible there is duplicate data
       // in the log that is already in the snapshot. But if we do it after the snapshot, we
       // could have data loss.
@@ -188,15 +178,11 @@ public void execute() throws IOException {
       Map<TableName, Map<String, Long>> newTableSetTimestampMap =
         backupManager.readLogTimestampMap();
-      backupInfo.setTableSetTimestampMap(newTableSetTimestampMap);
-      Long newStartCode =
-        BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
-      backupManager.writeBackupStartCode(newStartCode);
-
       backupManager
         .deleteBulkLoadedRows(bulkLoadsToDelete.stream().map(BulkLoad::getRowKey).toList());

       // backup complete
+      backupInfo.setTableSetTimestampMap(newTableSetTimestampMap);
       completeBackup(conn, backupInfo, BackupType.FULL, conf);
     } catch (Exception e) {
       failBackup(conn, backupInfo, backupManager, e, "Unexpected BackupException : ",
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java
index 20884edf836e..2d7aeb646db3 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java
@@ -19,6 +19,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
@@ -27,7 +28,6 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
@@ -59,23 +59,11 @@ public IncrementalBackupManager(Connection conn, Configuration conf) throws IOEx
   public Map<String, Long> getIncrBackupLogFileMap() throws IOException {
     List<String> logList;
     Map<String, Long> newTimestamps;
-    Map<String, Long> previousTimestampMins;
+    Map<String, Long> previousTimestampMins =
+      BackupUtils.getRSLogTimestampMins(readLogTimestampMap());

-    String savedStartCode = readBackupStartCode();
-
-    // key: tableName
-    // value:
-    Map<TableName, Map<String, Long>> previousTimestampMap = readLogTimestampMap();
-
-    previousTimestampMins = BackupUtils.getRSLogTimestampMins(previousTimestampMap);
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("StartCode " + savedStartCode + "for backupID " + backupInfo.getBackupId());
-    }
     // get all new log files from .logs and .oldlogs after last TS and before new timestamp
-    if (
-      savedStartCode == null || previousTimestampMins == null || previousTimestampMins.isEmpty()
-    ) {
+    if (previousTimestampMins.isEmpty()) {
       throw new IOException("Cannot read any previous back up timestamps from backup system table. "
         + "In order to create an incremental backup, at least one full backup is needed.");
     }
@@ -85,7 +73,7 @@ public Map<String, Long> getIncrBackupLogFileMap() throws IOException {
     newTimestamps = readRegionServerLastLogRollResult();

-    logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps, conf, savedStartCode);
+    logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps, conf);
     logList = excludeProcV2WALs(logList);
     backupInfo.setIncrBackupFileList(logList);

@@ -113,16 +101,15 @@ private List<String> excludeProcV2WALs(List<String> logList) {
    * @param olderTimestamps the timestamp for each region server of the last backup.
    * @param newestTimestamps the timestamp for each region server that the backup should lead to.
    * @param conf the Hadoop and Hbase configuration
-   * @param savedStartCode the startcode (timestamp) of last successful backup.
   * @return a list of log files to be backed up
   * @throws IOException exception
   */
   private List<String> getLogFilesForNewBackup(Map<String, Long> olderTimestamps,
-    Map<String, Long> newestTimestamps, Configuration conf, String savedStartCode)
-    throws IOException {
+    Map<String, Long> newestTimestamps, Configuration conf) throws IOException {
     LOG.debug("In getLogFilesForNewBackup()\n" + "olderTimestamps: " + olderTimestamps
       + "\n newestTimestamps: " + newestTimestamps);

+    long prevBackupStartTs = Collections.min(olderTimestamps.values());
     Path walRootDir = CommonFSUtils.getWALRootDir(conf);
     Path logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);
     Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
@@ -219,7 +206,7 @@ private List<String> getLogFilesForNewBackup(Map<String, Long> olderTimestamps,
        * our last backup.
        */
       if (oldTimeStamp == null) {
-        if (currentLogTS < Long.parseLong(savedStartCode)) {
+        if (currentLogTS < prevBackupStartTs) {
           // This log file is really old, its region server was before our last backup.
           continue;
         } else {
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index 4fac0ca3c93c..9a8b38fb9abf 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -332,14 +332,10 @@ public void execute() throws IOException, ColumnFamilyMismatchException {
       Map<TableName, Map<String, Long>> newTableSetTimestampMap =
         backupManager.readLogTimestampMap();
-      backupInfo.setTableSetTimestampMap(newTableSetTimestampMap);
-      Long newStartCode =
-        BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
-      backupManager.writeBackupStartCode(newStartCode);
-
       List<BulkLoad> bulkLoads = handleBulkLoad(backupInfo.getTableNames());

       // backup complete
+      backupInfo.setTableSetTimestampMap(newTableSetTimestampMap);
       completeBackup(conn, backupInfo, BackupType.INCREMENTAL, conf);

       List<byte[]> bulkLoadedRows = Lists.transform(bulkLoads, BulkLoad::getRowKey);
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java
index 7c62b7a681d8..971e80e2f83a 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java
@@ -17,12 +17,11 @@
  */
 package org.apache.hadoop.hbase.backup.master;

-import static org.apache.hadoop.hbase.backup.BackupInfo.withState;
-
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -31,7 +30,6 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.BackupInfo;
 import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
 import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
@@ -91,10 +89,11 @@ public void init(Map<String, Object> params) {
    * Calculates the timestamp boundary up to which all backup roots have already included the WAL.
    * I.e. WALs with a lower (= older) or equal timestamp are no longer needed for future incremental
    * backups.
+   * @param backups all completed or running backups to use for the calculation of the boundary
+   * @param tsBuffer a buffer (in ms) to lower the boundary for the default bound
    */
-  private BackupBoundaries serverToPreservationBoundaryTs(BackupSystemTable sysTable)
-    throws IOException {
-    List<BackupInfo> backups = sysTable.getBackupHistory(withState(BackupState.COMPLETE));
+  protected static BackupBoundaries calculatePreservationBoundary(List<BackupInfo> backups,
+    long tsBuffer) {
     if (LOG.isDebugEnabled()) {
       LOG.debug(
         "Cleaning WALs if they are older than the WAL cleanup time-boundary. "
           + backups.stream().map(BackupInfo::getBackupId).sorted().collect(Collectors.joining(", ")));
     }

-    // This map tracks, for every backup root, the most recent created backup (= highest timestamp)
+    // This map tracks, for every backup root, the most recent (= highest timestamp) completed
+    // backup, or if there is no such one, the currently running backup (if any)
     Map<String, BackupInfo> newestBackupPerRootDir = new HashMap<>();
     for (BackupInfo backup : backups) {
       BackupInfo existingEntry = newestBackupPerRootDir.get(backup.getBackupRootDir());
-      if (existingEntry == null || existingEntry.getStartTs() < backup.getStartTs()) {
+      if (existingEntry == null || existingEntry.getState() == BackupState.RUNNING) {
         newestBackupPerRootDir.put(backup.getBackupRootDir(), backup);
       }
     }

@@ -119,24 +119,12 @@ private BackupBoundaries serverToPreservationBoundaryTs(BackupSystemTab
         .collect(Collectors.joining(", ")));
     }

-    BackupBoundaries.BackupBoundariesBuilder builder =
-      BackupBoundaries.builder(getConf().getLong(TS_BUFFER_KEY, TS_BUFFER_DEFAULT));
-    for (BackupInfo backupInfo : newestBackupPerRootDir.values()) {
-      long startCode = Long.parseLong(sysTable.readBackupStartCode(backupInfo.getBackupRootDir()));
-      // Iterate over all tables in the timestamp map, which contains all tables covered in the
-      // backup root, not just the tables included in that specific backup (which could be a subset)
-      for (TableName table : backupInfo.getTableSetTimestampMap().keySet()) {
-        for (Map.Entry<String, Long> entry : backupInfo.getTableSetTimestampMap().get(table)
-          .entrySet()) {
-          builder.addBackupTimestamps(entry.getKey(), entry.getValue(), startCode);
-        }
-      }
-    }
-
+    BackupBoundaries.BackupBoundariesBuilder builder = BackupBoundaries.builder(tsBuffer);
+    newestBackupPerRootDir.values().forEach(builder::update);
     BackupBoundaries boundaries = builder.build();
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Boundaries oldestStartCode: {}", boundaries.getOldestStartCode());
+      LOG.debug("Boundaries defaultBoundary: {}", boundaries.getDefaultBoundary());
       for (Map.Entry<Address, Long> entry : boundaries.getBoundaries().entrySet()) {
         LOG.debug("Server: {}, WAL cleanup boundary: {}", entry.getKey().getHostName(),
           entry.getValue());
@@ -159,10 +147,11 @@ public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
     }

     BackupBoundaries boundaries;
-    try {
-      try (BackupSystemTable sysTable = new BackupSystemTable(conn)) {
-        boundaries = serverToPreservationBoundaryTs(sysTable);
-      }
+    try (BackupSystemTable sysTable = new BackupSystemTable(conn)) {
+      long tsBuffer = getConf().getLong(TS_BUFFER_KEY, TS_BUFFER_DEFAULT);
+      List<BackupInfo> backupHistory = sysTable.getBackupHistory(
+        i -> EnumSet.of(BackupState.COMPLETE, BackupState.RUNNING).contains(i.getState()));
+      boundaries = calculatePreservationBoundary(backupHistory, tsBuffer);
     } catch (IOException ex) {
      LOG.error("Failed to analyse backup history with exception: {}. Retaining all logs",
        ex.getMessage(), ex);
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupBoundaries.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupBoundaries.java
index eb08a0ef5e08..4e0c26d7f0ae 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupBoundaries.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupBoundaries.java
@@ -21,6 +21,8 @@
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupInfo;
 import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -35,22 +37,19 @@
 @InterfaceAudience.Private
 public class BackupBoundaries {
   private static final Logger LOG = LoggerFactory.getLogger(BackupBoundaries.class);
-  private static final BackupBoundaries EMPTY_BOUNDARIES =
-    new BackupBoundaries(Collections.emptyMap(), Long.MAX_VALUE);

   // This map tracks, for every RegionServer, the least recent (= oldest / lowest timestamp)
   // inclusion in any backup. In other words, it is the timestamp boundary up to which all backup
   // roots have included the WAL in their backup.
   private final Map<Address, Long> boundaries;

-  // The minimum WAL roll timestamp from the most recent backup of each backup root, used as a
-  // fallback cleanup boundary for RegionServers without explicit backup boundaries (e.g., servers
-  // that joined after backups began)
-  private final long oldestStartCode;
+  // The fallback cleanup boundary for RegionServers without explicit backup boundaries
+  // (e.g., servers that joined after backups began can be checked against this boundary)
+  private final long defaultBoundary;

-  private BackupBoundaries(Map<Address, Long> boundaries, long oldestStartCode) {
+  private BackupBoundaries(Map<Address, Long> boundaries, long defaultBoundary) {
     this.boundaries = boundaries;
-    this.oldestStartCode = oldestStartCode;
+    this.defaultBoundary = defaultBoundary;
   }

   public boolean isDeletable(Path walLogPath) {
@@ -68,11 +67,11 @@ public boolean isDeletable(Path walLogPath) {
     long pathTs = AbstractFSWALProvider.getTimestamp(walLogPath.getName());

     if (!boundaries.containsKey(address)) {
-      boolean isDeletable = pathTs <= oldestStartCode;
+      boolean isDeletable = pathTs <= defaultBoundary;
       if (LOG.isDebugEnabled()) {
         LOG.debug(
-          "Boundary for {} not found. isDeletable = {} based on oldestStartCode = {} and WAL ts of {}",
-          walLogPath, isDeletable, oldestStartCode, pathTs);
+          "Boundary for {} not found. isDeletable = {} based on defaultBoundary = {} and WAL ts of {}",
+          walLogPath, isDeletable, defaultBoundary, pathTs);
       }
       return isDeletable;
     }
@@ -104,8 +103,8 @@ public Map<Address, Long> getBoundaries() {
     return boundaries;
   }

-  public long getOldestStartCode() {
-    return oldestStartCode;
+  public long getDefaultBoundary() {
+    return defaultBoundary;
   }

   public static BackupBoundariesBuilder builder(long tsCleanupBuffer) {
@@ -116,34 +115,55 @@ public static class BackupBoundariesBuilder {
     private final Map<Address, Long> boundaries = new HashMap<>();
     private final long tsCleanupBuffer;
-    private long oldestStartCode = Long.MAX_VALUE;
+    private long oldestStartTs = Long.MAX_VALUE;

     private BackupBoundariesBuilder(long tsCleanupBuffer) {
       this.tsCleanupBuffer = tsCleanupBuffer;
     }

-    public BackupBoundariesBuilder addBackupTimestamps(String host, long hostLogRollTs,
-      long backupStartCode) {
-      Address address = Address.fromString(host);
-      Long storedTs = boundaries.get(address);
-      if (storedTs == null || hostLogRollTs < storedTs) {
-        boundaries.put(address, hostLogRollTs);
+    /**
+     * Updates the boundaries based on the provided backup info.
+     * @param backupInfo the most recent completed backup info for a backup root, or if there is no
+     *                   such completed backup, the currently running backup.
+     */
+    public void update(BackupInfo backupInfo) {
+      switch (backupInfo.getState()) {
+        case COMPLETE:
+          // If a completed backup exists in the backup root, we want to protect all logs that
+          // have been created since the log-roll that happened for that backup.
+          for (TableName table : backupInfo.getTableSetTimestampMap().keySet()) {
+            for (Map.Entry<String, Long> entry : backupInfo.getTableSetTimestampMap().get(table)
+              .entrySet()) {
+              Address regionServerAddress = Address.fromString(entry.getKey());
+              Long logRollTs = entry.getValue();
+
+              Long storedTs = boundaries.get(regionServerAddress);
+              if (storedTs == null || logRollTs < storedTs) {
+                boundaries.put(regionServerAddress, logRollTs);
+              }
+            }
+          }
+          break;
+        case RUNNING:
+          // If there is NO completed backup in the backup root, there are no persisted log-roll
+          // timestamps available yet. But, we still want to protect all files that have been
+          // created since the start of the currently running backup.
+          oldestStartTs = Math.min(oldestStartTs, backupInfo.getStartTs());
+          break;
+        default:
+          throw new IllegalStateException("Unexpected backupInfo state: " + backupInfo.getState());
       }
-
-      if (oldestStartCode > backupStartCode) {
-        oldestStartCode = backupStartCode;
-      }
-
-      return this;
     }

     public BackupBoundaries build() {
       if (boundaries.isEmpty()) {
-        return EMPTY_BOUNDARIES;
+        long defaultBoundary = oldestStartTs - tsCleanupBuffer;
+        return new BackupBoundaries(Collections.emptyMap(), defaultBoundary);
       }

-      oldestStartCode -= tsCleanupBuffer;
-      return new BackupBoundaries(boundaries, oldestStartCode);
+      long oldestRollTs = Collections.min(boundaries.values());
+      long defaultBoundary = Math.min(oldestRollTs, oldestStartTs) - tsCleanupBuffer;
+      return new BackupBoundaries(boundaries, defaultBoundary);
     }
   }
 }
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
index 033975c42484..e64b6a86fd50 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
@@ -29,6 +29,7 @@
 import java.util.Map.Entry;
 import java.util.TreeSet;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -92,30 +93,8 @@ private BackupUtils() {
    */
   public static Map<String, Long>
     getRSLogTimestampMins(Map<TableName, Map<String, Long>> rsLogTimestampMap) {
-    if (rsLogTimestampMap == null || rsLogTimestampMap.isEmpty()) {
-      return null;
-    }
-
-    HashMap<String, Long> rsLogTimestampMins = new HashMap<>();
-    HashMap<String, HashMap<TableName, Long>> rsLogTimestampMapByRS = new HashMap<>();
-
-    for (Entry<TableName, Map<String, Long>> tableEntry : rsLogTimestampMap.entrySet()) {
-      TableName table = tableEntry.getKey();
-      Map<String, Long> rsLogTimestamp = tableEntry.getValue();
-      for (Entry<String, Long> rsEntry : rsLogTimestamp.entrySet()) {
-        String rs = rsEntry.getKey();
-        Long ts = rsEntry.getValue();
-        rsLogTimestampMapByRS.putIfAbsent(rs, new HashMap<>());
-        rsLogTimestampMapByRS.get(rs).put(table, ts);
-      }
-    }
-
-    for (Entry<String, HashMap<TableName, Long>> entry : rsLogTimestampMapByRS.entrySet()) {
-      String rs = entry.getKey();
-      rsLogTimestampMins.put(rs, BackupUtils.getMinValue(entry.getValue()));
-    }
-
-    return rsLogTimestampMins;
+    return rsLogTimestampMap.values().stream().flatMap(map -> map.entrySet().stream())
+      .collect(Collectors.toMap(Entry::getKey, Entry::getValue, Math::min));
   }

   /**
@@ -343,22 +322,6 @@ public static void checkTargetDir(String backupRootPath, Configuration conf) thr
     }
   }

-  /**
-   * Get the min value for all the Values a map.
-   * @param map map
-   * @return the min value
-   */
-  public static <T> Long getMinValue(Map<T, Long> map) {
-    Long minTimestamp = null;
-    if (map != null) {
-      ArrayList<Long> timestampList = new ArrayList<>(map.values());
-      Collections.sort(timestampList);
-      // The min among all the RS log timestamps will be kept in backup system table table.
-      minTimestamp = timestampList.get(0);
-    }
-    return minTimestamp;
-  }
-
   /**
    * Parses host name:port from archived WAL path
    * @param p path
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
index 68366dcf688a..93ae8cc01550 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
@@ -189,14 +189,11 @@ public void execute() throws IOException {
         Map<TableName, Map<String, Long>> newTableSetTimestampMap =
           backupManager.readLogTimestampMap();
-        Long newStartCode =
-          BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
-        backupManager.writeBackupStartCode(newStartCode);
-
         handleBulkLoad(backupInfo.getTableNames());
         failStageIf(Stage.stage_4);

         // backup complete
+        backupInfo.setTableSetTimestampMap(newTableSetTimestampMap);
         completeBackup(conn, backupInfo, BackupType.INCREMENTAL, conf);

       } catch (Exception e) {
@@ -223,16 +220,7 @@ public void execute() throws IOException {
         // Begin BACKUP
         beginBackup(backupManager, backupInfo);
         failStageIf(Stage.stage_0);
-        String savedStartCode;
-        boolean firstBackup;
         // do snapshot for full table backup
-        savedStartCode = backupManager.readBackupStartCode();
-        firstBackup = savedStartCode == null || Long.parseLong(savedStartCode) == 0L;
-        if (firstBackup) {
-          // This is our first backup. Let's put some marker to system table so that we can hold the
-          // logs while we do the backup.
-          backupManager.writeBackupStartCode(0L);
-        }
         failStageIf(Stage.stage_1);
         // We roll log here before we do the snapshot. It is possible there is duplicate data
         // in the log that is already in the snapshot. But if we do it after the snapshot, we
@@ -273,11 +261,9 @@ public void execute() throws IOException {
         Map<TableName, Map<String, Long>> newTableSetTimestampMap =
           backupManager.readLogTimestampMap();
-        Long newStartCode =
-          BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
-        backupManager.writeBackupStartCode(newStartCode);
         failStageIf(Stage.stage_4);

         // backup complete
+        backupInfo.setTableSetTimestampMap(newTableSetTimestampMap);
         completeBackup(conn, backupInfo, BackupType.FULL, conf);

       } catch (Exception e) {
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupSystemTable.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupSystemTable.java
index 731f5f5b354f..eb88386f1ca7 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupSystemTable.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupSystemTable.java
@@ -107,15 +107,6 @@ public void testUpdateReadDeleteBackupStatus() throws IOException {
     cleanBackupTable();
   }

-  @Test
-  public void testWriteReadBackupStartCode() throws IOException {
-    long code = 100L;
-    table.writeBackupStartCode(code, "root");
-    String readCode = table.readBackupStartCode("root");
-    assertEquals(code, Long.parseLong(readCode));
-    cleanBackupTable();
-  }
-
   private void cleanBackupTable() throws IOException {
     Admin admin = UTIL.getAdmin();
     admin.disableTable(BackupSystemTable.getTableName(conf));
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/master/TestBackupLogCleaner.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/master/TestBackupLogCleaner.java
index 41060347b1db..65dcf8c54158 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/master/TestBackupLogCleaner.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/master/TestBackupLogCleaner.java
@@ -34,11 +34,11 @@
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupInfo;
 import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.backup.TestBackupBase;
 import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
 import org.apache.hadoop.hbase.backup.util.BackupBoundaries;
-import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionInfo;
@@ -225,10 +225,6 @@ public void testDoesNotDeleteWALsFromNewServers() throws Exception {
     List<FileStatus> walsAfterB1 = getListOfWALFiles(TEST_UTIL.getConfiguration());
     LOG.info("WALs after B1: {}", walsAfterB1.size());

-    String startCodeStr = systemTable.readBackupStartCode(backupRoot1.toString());
-    long b1StartCode = Long.parseLong(startCodeStr);
-    LOG.info("B1 startCode: {}", b1StartCode);
-
     // Add a new RegionServer to the cluster
     LOG.info("Adding new RegionServer to cluster");
     rsThread = TEST_UTIL.getMiniHBaseCluster().startRegionServer();
@@ -295,13 +291,15 @@ public void testDoesNotDeleteWALsFromNewServers() throws Exception {

   @Test
   public void testCanDeleteFileWithNewServerWALs() {
-    long backupStartCode = 1000000L;
+    BackupInfo backup = new BackupInfo();
+    backup.setState(BackupInfo.BackupState.COMPLETE);
+    backup.setTableSetTimestampMap(
+      Map.of(TableName.valueOf("table1"), Map.of("server1:60020", 1000000L)));
+    BackupBoundaries boundaries =
+      BackupLogCleaner.calculatePreservationBoundary(List.of(backup), 0L);
+    // Old WAL from before the backup
     Path oldWAL = new Path("/hbase/oldWALs/server1%2C60020%2C12345.500000");
-    String host = BackupUtils.parseHostNameFromLogFile(oldWAL);
-    BackupBoundaries boundaries = BackupBoundaries.builder(0L)
-      .addBackupTimestamps(host, backupStartCode, backupStartCode).build();
-
     assertTrue("WAL older than backup should be deletable",
       BackupLogCleaner.canDeleteFile(boundaries, oldWAL));

@@ -310,12 +308,57 @@ public void testCanDeleteFileWithNewServerWALs() {
     assertTrue("WAL at boundary should be deletable",
       BackupLogCleaner.canDeleteFile(boundaries, boundaryWAL));

-    // WAL from a server that joined AFTER the backup
+    // WAL created after the backup boundary
+    Path newWal = new Path("/hbase/oldWALs/server1%2C60020%2C12345.1500000");
+    assertFalse("WAL newer than backup should not be deletable",
+      BackupLogCleaner.canDeleteFile(boundaries, newWal));
+
+    // WAL from a new server that joined AFTER the backup
     Path newServerWAL = new Path("/hbase/oldWALs/newserver%2C60020%2C99999.1500000");
     assertFalse("WAL from new server (after backup) should NOT be deletable",
       BackupLogCleaner.canDeleteFile(boundaries, newServerWAL));
   }

+  @Test
+  public void testFirstBackupProtectsFiles() {
+    BackupInfo backup = new BackupInfo();
+    backup.setBackupId("backup_1");
+    backup.setState(BackupInfo.BackupState.RUNNING);
+    backup.setStartTs(100L);
+    // Running backups have no TableSetTimestampMap
+
+    BackupBoundaries boundaries =
+      BackupLogCleaner.calculatePreservationBoundary(List.of(backup), 5L);
+
+    // There's only a single backup, and it is still running, so it's a FULL backup.
+    // We expect files preceding the snapshot are deletable, but files after the start are not.
+    // Because this is not region-server-specific, the buffer is taken into account.
+    Path path = new Path("/hbase/oldWALs/server1%2C60020%2C12345.94");
+    assertTrue(BackupLogCleaner.canDeleteFile(boundaries, path));
+    path = new Path("/hbase/oldWALs/server1%2C60020%2C12345.95");
+    assertTrue(BackupLogCleaner.canDeleteFile(boundaries, path));
+    path = new Path("/hbase/oldWALs/server1%2C60020%2C12345.96");
+    assertFalse(BackupLogCleaner.canDeleteFile(boundaries, path));
+
+    // If there is an already completed backup in the same root, only that one matters.
+    // In this case, a region-server-specific timestamp is available, so the buffer is not used.
+    BackupInfo backup2 = new BackupInfo();
+    backup2.setBackupId("backup_2");
+    backup2.setState(BackupInfo.BackupState.COMPLETE);
+    backup2.setStartTs(80L);
+    backup2
+      .setTableSetTimestampMap(Map.of(TableName.valueOf("table1"), Map.of("server1:60020", 90L)));
+
+    boundaries = BackupLogCleaner.calculatePreservationBoundary(List.of(backup, backup2), 5L);
+
+    path = new Path("/hbase/oldWALs/server1%2C60020%2C12345.89");
+    assertTrue(BackupLogCleaner.canDeleteFile(boundaries, path));
+    path = new Path("/hbase/oldWALs/server1%2C60020%2C12345.90");
+    assertTrue(BackupLogCleaner.canDeleteFile(boundaries, path));
+    path = new Path("/hbase/oldWALs/server1%2C60020%2C12345.91");
+    assertFalse(BackupLogCleaner.canDeleteFile(boundaries, path));
+  }
+
   @Test
   public void testCleansUpHMasterWal() {
     Path path = new Path("/hbase/MasterData/WALs/hmaster,60000,1718808578163");
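
Reviewer note (not part of the patch): the snippet below is a minimal, self-contained sketch of the timestamp-merging idea behind the new stream-based BackupUtils.getRSLogTimestampMins shown above. The class name RsLogTimestampMinsSketch and the use of plain String keys instead of TableName are illustrative assumptions made so the example compiles on its own; they are not taken from the patch.

import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical standalone example; the real method lives in BackupUtils and is keyed by TableName.
public class RsLogTimestampMinsSketch {

  // For every RegionServer, keep the smallest (oldest) log-roll timestamp seen across all tables.
  static Map<String, Long> rsLogTimestampMins(Map<String, Map<String, Long>> byTable) {
    return byTable.values().stream().flatMap(perTable -> perTable.entrySet().stream())
      .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Math::min));
  }

  public static void main(String[] args) {
    Map<String, Map<String, Long>> byTable = Map.of(
      "table1", Map.of("rs1:16020", 100L, "rs2:16020", 200L),
      "table2", Map.of("rs1:16020", 150L));
    // Prints the per-server minima, e.g. {rs1:16020=100, rs2:16020=200}: rs1 keeps the minimum of 100 and 150.
    System.out.println(rsLogTimestampMins(byTable));
  }
}

This per-RegionServer minimum is what BackupLogCleaner.calculatePreservationBoundary feeds into BackupBoundaries: WALs older than that boundary (or, for servers without a boundary, older than the default boundary derived from the oldest roll/start timestamp minus the configured buffer) are safe to delete.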