allLedgerDirs = Lists.newArrayList();
+ allLedgerDirs.addAll(Arrays.asList(ledgerDirectories));
+ if (indexDirectories != ledgerDirectories) {
+ allLedgerDirs.addAll(Arrays.asList(indexDirectories));
+ }
- try {
- Bookie.checkEnvironmentWithStorageExpansion(conf, zk,
+ try {
+ Bookie.checkEnvironmentWithStorageExpansion(conf, rm,
Lists.newArrayList(journalDirectories), allLedgerDirs);
- } catch (BookieException | IOException e) {
- LOG.error(
+ } catch (BookieException e) {
+ LOG.error(
"Exception while updating cookie for storage expansion", e);
- return -1;
+ return -1;
+ }
+ return 0;
+ } finally {
+ rm.close();
}
- return 0;
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStatus.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStatus.java
new file mode 100644
index 00000000000..560868aa349
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStatus.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.bookkeeper.util.BookKeeperConstants.BOOKIE_STATUS_FILENAME;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The status object represents the current status of a bookie instance.
+ */
+public class BookieStatus {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BookieStatus.class);
+
+    static final int CURRENT_STATUS_LAYOUT_VERSION = 1;
+
+    /** Modes a bookie status can record. */
+    enum BookieMode {
+        READ_ONLY,
+        READ_WRITE;
+    }
+
+    // Initial update time; any non-negative timestamp read from disk compares as newer.
+    private static final long INVALID_UPDATE_TIME = -1;
+
+    private int layoutVersion;
+    private long lastUpdateTime;
+    private BookieMode bookieMode;
+
+    /** Creates a READ_WRITE status with the current layout version and no update time. */
+    BookieStatus() {
+        this.bookieMode = BookieMode.READ_WRITE;
+        this.layoutVersion = CURRENT_STATUS_LAYOUT_VERSION;
+        this.lastUpdateTime = INVALID_UPDATE_TIME;
+    }
+
+    private synchronized BookieMode getBookieMode() {
+        return bookieMode;
+    }
+
+    /** @return true if the bookie mode is READ_WRITE. */
+    public synchronized boolean isInWritable() {
+        return bookieMode.equals(BookieMode.READ_WRITE);
+    }
+
+    /** @return true if the mode changed to READ_WRITE, false if it already was. */
+    synchronized boolean setToWritableMode() {
+        if (!bookieMode.equals(BookieMode.READ_WRITE)) {
+            bookieMode = BookieMode.READ_WRITE;
+            this.lastUpdateTime = System.currentTimeMillis();
+            return true;
+        }
+        return false;
+    }
+
+    synchronized boolean isInReadOnlyMode() {
+        return bookieMode.equals(BookieMode.READ_ONLY);
+    }
+
+    /** @return true if the mode changed to READ_ONLY, false if it already was. */
+    synchronized boolean setToReadOnlyMode() {
+        if (!bookieMode.equals(BookieMode.READ_ONLY)) {
+            bookieMode = BookieMode.READ_ONLY;
+            this.lastUpdateTime = System.currentTimeMillis();
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Writes this status to the status file in each directory, on a best-effort basis.
+     * A failure on one directory is logged and does not stop the remaining writes.
+     *
+     * @param directories list of directories to write to
+     */
+    synchronized void writeToDirectories(List<File> directories) {
+        boolean success = false;
+        for (File dir : directories) {
+            try {
+                File statusFile = new File(dir, BOOKIE_STATUS_FILENAME);
+                writeToFile(statusFile, toString());
+                success = true;
+            } catch (IOException e) {
+                // Hand the exception to the logger so the cause of the failure is not lost.
+                LOG.warn("IOException while trying to write bookie status to directory {}."
+                    + " This is fine if not all directories are failed.", dir, e);
+            }
+        }
+        if (success) {
+            LOG.info("Successfully persist bookie status {}", this.bookieMode);
+        } else {
+            LOG.warn("Failed to persist bookie status {}", this.bookieMode);
+        }
+    }
+
+    /**
+     * Writes content to the file, creating the file if it does not exist.
+     *
+     * @param file file that you want to write to
+     * @param body content to write
+     * @throws IOException if the file cannot be opened, written or closed
+     */
+    private static void writeToFile(File file, String body) throws IOException {
+        // Both resources are closed on every path, even when closing the writer throws.
+        try (FileOutputStream fos = new FileOutputStream(file);
+             BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fos, UTF_8))) {
+            bw.write(body);
+        }
+    }
+
+    /**
+     * Reads the status files and adopts the status with the greatest update time.
+     * A status file that is missing or unreadable is skipped in favour of the next one.
+     *
+     * @param directories list of directories that store the status file
+     */
+    void readFromDirectories(List<File> directories) {
+        boolean success = false;
+        for (File dir : directories) {
+            File statusFile = new File(dir, BOOKIE_STATUS_FILENAME);
+            try {
+                BookieStatus status = readFromFile(statusFile);
+                if (null != status) {
+                    // Lock this instance: it is the object being mutated and the monitor
+                    // the other accessors use. The parsed status is local to this call.
+                    synchronized (this) {
+                        if (status.lastUpdateTime > this.lastUpdateTime) {
+                            this.lastUpdateTime = status.lastUpdateTime;
+                            this.layoutVersion = status.layoutVersion;
+                            this.bookieMode = status.bookieMode;
+                            success = true;
+                        }
+                    }
+                }
+            } catch (IOException e) {
+                LOG.warn("IOException while trying to read bookie status from directory {}."
+                    + " This is fine if not all directories failed.", dir, e);
+            } catch (IllegalArgumentException e) {
+                LOG.warn("IllegalArgumentException while trying to read bookie status from directory {}."
+                    + " This is fine if not all directories failed.", dir, e);
+            }
+        }
+        if (success) {
+            LOG.info("Successfully retrieve bookie status {} from disks.", getBookieMode());
+        } else {
+            LOG.warn("Failed to retrieve bookie status from disks."
+                + " Fall back to current or default bookie status: {}", getBookieMode());
+        }
+    }
+
+    /**
+     * Reads the bookie status from a single file.
+     *
+     * @param file file to read from
+     * @return the parsed status, or null if the file is missing or holds no recognised status
+     * @throws IOException if reading the file fails
+     * @throws IllegalArgumentException if a field of the status line is malformed
+     */
+    private BookieStatus readFromFile(File file)
+            throws IOException, IllegalArgumentException {
+        if (!file.exists()) {
+            return null;
+        }
+
+        try (BufferedReader reader = new BufferedReader(
+                new InputStreamReader(new FileInputStream(file), UTF_8))) {
+            return parse(reader);
+        }
+    }
+
+    /**
+     * Parses a status from the first line of the reader, formatted as
+     * {@code layoutVersion,bookieMode,lastUpdateTime}.
+     *
+     * @param reader source of the status line
+     * @return the parsed status, or null if the line is empty or its layout is not recognised
+     * @throws IOException if reading the line fails
+     * @throws IllegalArgumentException if the version, mode or update time is malformed
+     */
+    public BookieStatus parse(BufferedReader reader)
+            throws IOException, IllegalArgumentException {
+        String line = reader.readLine();
+        if (line == null || line.trim().isEmpty()) {
+            LOG.debug("Empty line when parsing bookie status");
+            return null;
+        }
+        String[] parts = line.split(",");
+        if (parts.length == 0) {
+            LOG.debug("Error in parsing bookie status: {}", line);
+            return null;
+        }
+        BookieStatus status = new BookieStatus();
+        status.layoutVersion = Integer.parseInt(parts[0].trim());
+        if (status.layoutVersion == 1 && parts.length == 3) {
+            // Trim the mode like the numeric fields so stray whitespace is tolerated.
+            status.bookieMode = BookieMode.valueOf(parts[1].trim());
+            status.lastUpdateTime = Long.parseLong(parts[2].trim());
+            return status;
+        }
+        return null;
+    }
+
+    /**
+     * Serializes as {@code layoutVersion,bookieMode,timestamp} followed by a newline.
+     * The timestamp is the time of this call, not the stored last update time.
+     */
+    @Override
+    public String toString() {
+        return CURRENT_STATUS_LAYOUT_VERSION + "," + getBookieMode() + ","
+            + System.currentTimeMillis() + "\n";
+    }
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Cookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Cookie.java
index ec7793c0295..e2d4f1281f8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Cookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Cookie.java
@@ -37,28 +37,23 @@
import java.io.StringReader;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
-import java.util.List;
import java.util.Set;
-import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException;
+import org.apache.bookkeeper.bookie.BookieException.UnknownBookieIdException;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.discover.RegistrationManager;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.DataFormats.CookieFormat;
import org.apache.bookkeeper.util.BookKeeperConstants;
-import org.apache.bookkeeper.util.ZkUtils;
import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* When a bookie starts for the first time it generates a cookie, and stores
- * the cookie in zookeeper as well as in the each of the local filesystem
+ * the cookie in the registration manager as well as in each of the local filesystem
* directories it uses. This cookie is used to ensure that for the life of the
* bookie, its configuration stays the same. If any of the bookie directories
* becomes unavailable, the bookie becomes unavailable. If the bookie changes
@@ -233,79 +228,61 @@ public void writeToDirectory(File directory) throws IOException {
}
/**
- * Writes cookie details to ZooKeeper.
+ * Writes cookie details to registration manager.
*
- * @param zk ZooKeeper instance
+ * @param rm registration manager
* @param conf configuration
* @param version version
- * @throws KeeperException
- * @throws InterruptedException
- * @throws UnknownHostException
+ * @throws BookieException if writing the cookie fails.
*/
- public void writeToZooKeeper(ZooKeeper zk, ServerConfiguration conf, Version version)
- throws KeeperException, InterruptedException, UnknownHostException {
- List zkAcls = ZkUtils.getACLs(conf);
- String bookieCookiePath = conf.getZkLedgersRootPath() + "/"
- + BookKeeperConstants.COOKIE_NODE;
- String zkPath = getZkPath(conf);
- byte[] data = toString().getBytes(UTF_8);
- if (Version.NEW == version) {
- if (zk.exists(bookieCookiePath, false) == null) {
- try {
- zk.create(bookieCookiePath, new byte[0],
- zkAcls, CreateMode.PERSISTENT);
- } catch (KeeperException.NodeExistsException nne) {
- LOG.info("More than one bookie tried to create {} at once. Safe to ignore",
- bookieCookiePath);
- }
- }
- zk.create(zkPath, data,
- zkAcls, CreateMode.PERSISTENT);
- } else {
- if (!(version instanceof LongVersion)) {
- throw new IllegalArgumentException("Invalid version type, expected ZkVersion type");
- }
- zk.setData(zkPath, data, (int) ((LongVersion) version).getLongVersion());
+ public void writeToRegistrationManager(RegistrationManager rm, ServerConfiguration conf, Version version)
+ throws BookieException {
+ BookieSocketAddress address = null;
+ try {
+ address = Bookie.getBookieAddress(conf);
+ } catch (UnknownHostException e) {
+ throw new UnknownBookieIdException(e);
}
+ byte[] data = toString().getBytes(UTF_8);
+ rm.writeCookie(address.toString(), new Versioned<>(data, version));
}
/**
- * Deletes cookie from ZooKeeper and sets znode version to DEFAULT_COOKIE_ZNODE_VERSION.
+ * Deletes cookie from registration manager.
*
- * @param zk ZooKeeper instance
+ * @param rm registration manager
* @param conf configuration
- * @param version zookeeper version
- * @throws KeeperException
- * @throws InterruptedException
- * @throws UnknownHostException
+ * @param version cookie version
+ * @throws BookieException if deleting the cookie fails.
*/
- public void deleteFromZooKeeper(ZooKeeper zk, ServerConfiguration conf, Version version) throws KeeperException,
- InterruptedException, UnknownHostException {
- BookieSocketAddress address = Bookie.getBookieAddress(conf);
- deleteFromZooKeeper(zk, conf, address, version);
+ public void deleteFromRegistrationManager(RegistrationManager rm,
+ ServerConfiguration conf,
+ Version version) throws BookieException {
+ BookieSocketAddress address = null;
+ try {
+ address = Bookie.getBookieAddress(conf);
+ } catch (UnknownHostException e) {
+ throw new UnknownBookieIdException(e);
+ }
+ deleteFromRegistrationManager(rm, address, version);
}
/**
- * Delete cookie from zookeeper.
+ * Delete cookie from registration manager.
*
- * @param zk zookeeper client
- * @param conf configuration instance
+ * @param rm registration manager
* @param address bookie address
* @param version cookie version
- * @throws KeeperException
- * @throws InterruptedException
- * @throws UnknownHostException
+ * @throws BookieException if deleting the cookie fails.
*/
- public void deleteFromZooKeeper(ZooKeeper zk, AbstractConfiguration conf,
- BookieSocketAddress address, Version version)
- throws KeeperException, InterruptedException, UnknownHostException {
+ public void deleteFromRegistrationManager(RegistrationManager rm,
+ BookieSocketAddress address,
+ Version version) throws BookieException {
if (!(version instanceof LongVersion)) {
throw new IllegalArgumentException("Invalid version type, expected ZkVersion type");
}
- String zkPath = getZkPath(conf, address);
- zk.delete(zkPath, (int) ((LongVersion) version).getLongVersion());
- LOG.info("Removed cookie from {} for bookie {}.", conf.getZkLedgersRootPath(), address);
+ rm.removeCookie(address.toString(), version);
}
/**
@@ -326,48 +303,44 @@ static Builder generateCookie(ServerConfiguration conf)
}
/**
- * Read cookie from ZooKeeper.
+ * Read cookie from registration manager.
*
- * @param zk ZooKeeper instance
+ * @param rm registration manager
* @param conf configuration
* @return versioned cookie object
- * @throws KeeperException
- * @throws InterruptedException
- * @throws IOException
- * @throws UnknownHostException
+ * @throws BookieException if reading the cookie fails
*/
- public static Versioned readFromZooKeeper(ZooKeeper zk, ServerConfiguration conf)
- throws KeeperException, InterruptedException, IOException, UnknownHostException {
- return readFromZooKeeper(zk, conf, Bookie.getBookieAddress(conf));
+ public static Versioned readFromRegistrationManager(RegistrationManager rm, ServerConfiguration conf)
+ throws BookieException {
+ try {
+ return readFromRegistrationManager(rm, Bookie.getBookieAddress(conf));
+ } catch (UnknownHostException e) {
+ throw new UnknownBookieIdException(e);
+ }
}
/**
- * Read cookie from zookeeper for a given bookie address.
+ * Read cookie from registration manager for a given bookie address.
*
- * @param zk zookeeper client
- * @param conf configuration instance
+ * @param rm registration manager
* @param address bookie address
* @return versioned cookie object
- * @throws KeeperException
- * @throws InterruptedException
- * @throws IOException
- * @throws UnknownHostException
+ * @throws BookieException if reading the cookie fails
*/
- public static Versioned readFromZooKeeper(ZooKeeper zk, AbstractConfiguration conf, BookieSocketAddress address)
- throws KeeperException, InterruptedException, IOException, UnknownHostException {
- String zkPath = getZkPath(conf, address);
-
- Stat stat = zk.exists(zkPath, false);
- byte[] data = zk.getData(zkPath, false, stat);
- BufferedReader reader = new BufferedReader(new StringReader(new String(data, UTF_8)));
+ public static Versioned readFromRegistrationManager(RegistrationManager rm,
+ BookieSocketAddress address) throws BookieException {
+ Versioned cookieData = rm.readCookie(address.toString());
+ BufferedReader reader = new BufferedReader(new StringReader(new String(cookieData.getValue(), UTF_8)));
try {
- Builder builder = parse(reader);
- Cookie cookie = builder.build();
- // sets stat version from ZooKeeper
- LongVersion version = new LongVersion(stat.getVersion());
- return new Versioned(cookie, version);
- } finally {
- reader.close();
+ try {
+ Builder builder = parse(reader);
+ Cookie cookie = builder.build();
+ return new Versioned(cookie, cookieData.getVersion());
+ } finally {
+ reader.close();
+ }
+ } catch (IOException ioe) {
+ throw new InvalidCookieException(ioe);
}
}
@@ -389,30 +362,6 @@ public static Cookie readFromDirectory(File directory) throws IOException {
}
}
- /**
- * Returns cookie path in zookeeper.
- *
- * @param conf configuration
- * @return cookie zk path
- * @throws UnknownHostException
- */
- static String getZkPath(ServerConfiguration conf)
- throws UnknownHostException {
- return getZkPath(conf, Bookie.getBookieAddress(conf));
- }
-
- /**
- * Return cookie path for a given bookie address.
- *
- * @param conf configuration
- * @param address bookie address
- * @return cookie path for bookie
- */
- static String getZkPath(AbstractConfiguration conf, BookieSocketAddress address) {
- String bookieCookiePath = conf.getZkLedgersRootPath() + "/" + BookKeeperConstants.COOKIE_NODE;
- return bookieCookiePath + "/" + address;
- }
-
/**
* Check whether the 'bookieHost' was created using a hostname or an IP
* address. Represent as 'hostname/IPaddress' if the InetSocketAddress was
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileSystemUpgrade.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileSystemUpgrade.java
index 045f8b2efbd..4158abddaf8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileSystemUpgrade.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileSystemUpgrade.java
@@ -36,20 +36,19 @@
import java.util.NoSuchElementException;
import java.util.Scanner;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.discover.RegistrationManager;
+import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.bookkeeper.util.HardLink;
+import org.apache.bookkeeper.util.ReflectionUtils;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
-import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.io.FileUtils;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -128,22 +127,15 @@ private static int detectPreviousVersion(File directory) throws IOException {
}
}
- private static ZooKeeper newZookeeper(final ServerConfiguration conf)
+ private static RegistrationManager newRegistrationManager(final ServerConfiguration conf)
throws BookieException.UpgradeException {
+
try {
- int zkTimeout = conf.getZkTimeout();
- return ZooKeeperClient.newBuilder()
- .connectString(conf.getZkServers())
- .sessionTimeoutMs(zkTimeout)
- .operationRetryPolicy(
- new BoundExponentialBackoffRetryPolicy(zkTimeout, zkTimeout, Integer.MAX_VALUE))
- .build();
- } catch (InterruptedException ie) {
- throw new BookieException.UpgradeException(ie);
- } catch (IOException ioe) {
- throw new BookieException.UpgradeException(ioe);
- } catch (KeeperException ke) {
- throw new BookieException.UpgradeException(ke);
+ Class extends RegistrationManager> rmClass = conf.getRegistrationManagerClass();
+ RegistrationManager rm = ReflectionUtils.newInstance(rmClass);
+ return rm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
+ } catch (Exception e) {
+ throw new BookieException.UpgradeException(e);
}
}
@@ -177,7 +169,7 @@ public static void upgrade(ServerConfiguration conf)
throws BookieException.UpgradeException, InterruptedException {
LOG.info("Upgrading...");
- ZooKeeper zk = newZookeeper(conf);
+ RegistrationManager rm = newRegistrationManager(conf);
try {
Map deferredMoves = new HashMap();
Cookie.Builder cookieBuilder = Cookie.generateCookie(conf);
@@ -229,15 +221,15 @@ public boolean accept(File dir, String name) {
}
try {
- c.writeToZooKeeper(zk, conf, Version.NEW);
- } catch (KeeperException ke) {
- LOG.error("Error writing cookie to zookeeper");
+ c.writeToRegistrationManager(rm, conf, Version.NEW);
+ } catch (BookieException ke) {
+ LOG.error("Error writing cookie to registration manager");
throw new BookieException.UpgradeException(ke);
}
} catch (IOException ioe) {
throw new BookieException.UpgradeException(ioe);
} finally {
- zk.close();
+ rm.close();
}
LOG.info("Done");
}
@@ -283,7 +275,7 @@ public static void finalizeUpgrade(ServerConfiguration conf)
public static void rollback(ServerConfiguration conf)
throws BookieException.UpgradeException, InterruptedException {
LOG.info("Rolling back upgrade...");
- ZooKeeper zk = newZookeeper(conf);
+ RegistrationManager rm = newRegistrationManager(conf);
try {
for (File d : getAllDirectories(conf)) {
LOG.info("Rolling back {}", d);
@@ -305,17 +297,14 @@ public static void rollback(ServerConfiguration conf)
}
}
try {
- Versioned cookie = Cookie.readFromZooKeeper(zk, conf);
- cookie.getValue().deleteFromZooKeeper(zk, conf, cookie.getVersion());
- } catch (KeeperException ke) {
- LOG.error("Error deleting cookie from ZooKeeper");
+ Versioned cookie = Cookie.readFromRegistrationManager(rm, conf);
+ cookie.getValue().deleteFromRegistrationManager(rm, conf, cookie.getVersion());
+ } catch (BookieException ke) {
+ LOG.error("Error deleting cookie from Registration Manager");
throw new BookieException.UpgradeException(ke);
- } catch (IOException ioe) {
- LOG.error("I/O Error deleting cookie from ZooKeeper");
- throw new BookieException.UpgradeException(ioe);
}
} finally {
- zk.close();
+ rm.close();
}
LOG.info("Done");
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
index fe18d5ddf4b..6bb8e5db262 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
@@ -29,6 +29,7 @@
import java.util.Set;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
+import org.apache.bookkeeper.client.DistributionSchedule;
import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -88,14 +89,18 @@ public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize,
}
@Override
- public List reorderReadSequence(ArrayList ensemble, List writeSet,
- Map bookieFailureHistory) {
+ public DistributionSchedule.WriteSet reorderReadSequence(
+ ArrayList ensemble,
+ Map bookieFailureHistory,
+ DistributionSchedule.WriteSet writeSet) {
return null;
}
@Override
- public List reorderReadLACSequence(ArrayList ensemble, List writeSet,
- Map bookieFailureHistory) {
+ public DistributionSchedule.WriteSet reorderReadLACSequence(
+ ArrayList ensemble,
+ Map bookieFailureHistory,
+ DistributionSchedule.WriteSet writeSet) {
return null;
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java
index 75361209af6..3970a1d6c99 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java
@@ -43,13 +43,13 @@ public ReadOnlyBookie(ServerConfiguration conf, StatsLogger statsLogger)
throws IOException, KeeperException, InterruptedException, BookieException {
super(conf, statsLogger);
if (conf.isReadOnlyModeEnabled()) {
- readOnly.set(true);
+ forceReadOnly.set(true);
} else {
String err = "Try to init ReadOnly Bookie, while ReadOnly mode is not enabled";
LOG.error(err);
throw new IOException(err);
}
- LOG.info("Running bookie in readonly mode.");
+ LOG.info("Running bookie in force readonly mode.");
}
@Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
index 413c4874546..ca001a462f1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SyncThread.java
@@ -24,7 +24,9 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
+import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
@@ -61,6 +63,10 @@ class SyncThread {
final LedgerDirsListener dirsListener;
final CheckpointSource checkpointSource;
+    private final Object suspensionLock = new Object(); // monitor the scheduled task waits on while suspended
+    private boolean suspended = false; // read by the scheduled task while holding suspensionLock
+    private volatile boolean disableCheckpoint = false; // volatile: set by disableCheckpoint() without a lock, read on the executor thread
+
public SyncThread(ServerConfiguration conf,
LedgerDirsListener dirsListener,
LedgerStorage ledgerStorage,
@@ -79,25 +85,41 @@ public SyncThread(ServerConfiguration conf,
void start() {
executor.scheduleAtFixedRate(new Runnable() {
- public void run() {
- try {
- synchronized (suspensionLock) {
- while (suspended) {
- try {
- suspensionLock.wait();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- continue;
- }
+ public void run() {
+ try {
+ synchronized (suspensionLock) {
+ while (suspended) {
+ try {
+ suspensionLock.wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ continue;
}
}
+ }
+ if (!disableCheckpoint) {
checkpoint(checkpointSource.newCheckpoint());
- } catch (Throwable t) {
- LOG.error("Exception in SyncThread", t);
- dirsListener.fatalError();
}
+ } catch (Throwable t) {
+ LOG.error("Exception in SyncThread", t);
+ dirsListener.fatalError();
+ }
+ }
+ }, flushInterval, flushInterval, TimeUnit.MILLISECONDS);
+ }
+
+ public Future requestFlush() {
+ return executor.submit(new Callable() {
+ @Override
+ public Void call() throws Exception {
+ try {
+ flush();
+ } catch (Throwable t) {
+ LOG.error("Exception flushing ledgers ", t);
}
- }, flushInterval, flushInterval, TimeUnit.MILLISECONDS);
+ return null;
+ }
+ });
}
private void flush() {
@@ -113,6 +135,11 @@ private void flush() {
return;
}
+ if (disableCheckpoint) {
+ return;
+ }
+
+ LOG.info("Flush ledger storage at checkpoint {}.", checkpoint);
try {
checkpointSource.checkpointComplete(checkpoint, false);
} catch (IOException e) {
@@ -142,9 +169,6 @@ public void checkpoint(Checkpoint checkpoint) {
}
}
- private Object suspensionLock = new Object();
- private boolean suspended = false;
-
/**
* Suspend sync thread. (for testing)
*/
@@ -166,18 +190,15 @@ public void resumeSync() {
}
}
+ @VisibleForTesting
+ public void disableCheckpoint() {
+ disableCheckpoint = true;
+ }
+
// shutdown sync thread
void shutdown() throws InterruptedException {
LOG.info("Shutting down SyncThread");
- executor.submit(new Runnable() {
- public void run() {
- try {
- flush();
- } catch (Throwable t) {
- LOG.error("Exception flushing ledgers at shutdown", t);
- }
- }
- });
+ requestFlush();
executor.shutdown();
long start = MathUtils.now();
while (!executor.awaitTermination(5, TimeUnit.MINUTES)) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 8716c3a2ffb..8e148129e33 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -40,8 +40,9 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
-import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
import org.apache.bookkeeper.client.AsyncCallback.IsClosedCallback;
+import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
+import org.apache.bookkeeper.client.BKException.ZKException;
import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
import org.apache.bookkeeper.client.SyncCallbackUtils.SyncCreateAdvCallback;
import org.apache.bookkeeper.client.SyncCallbackUtils.SyncCreateCallback;
@@ -51,7 +52,10 @@
import org.apache.bookkeeper.client.api.CreateBuilder;
import org.apache.bookkeeper.client.api.DeleteBuilder;
import org.apache.bookkeeper.client.api.OpenBuilder;
+import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.discover.RegistrationClient;
+import org.apache.bookkeeper.discover.ZKRegistrationClient;
import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.bookkeeper.feature.SettableFeatureProvider;
@@ -64,13 +68,11 @@
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.bookkeeper.util.ReflectionUtils;
import org.apache.bookkeeper.util.SafeRunnable;
-import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
-import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.lang.SystemUtils;
import org.apache.zookeeper.KeeperException;
@@ -94,7 +96,7 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
static final Logger LOG = LoggerFactory.getLogger(BookKeeper.class);
- final ZooKeeper zk;
+ final RegistrationClient regClient;
final EventLoopGroup eventLoopGroup;
// The stats logger for this client.
@@ -115,9 +117,6 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
// whether the event loop group is one we created, or is owned by whoever
// instantiated us
boolean ownEventLoopGroup = false;
- // whether the zk handle is one we created, or is owned by whoever
- // instantiated us
- boolean ownZKHandle = false;
final BookieClient bookieClient;
final BookieWatcher bookieWatcher;
@@ -287,7 +286,7 @@ public Builder featureProvider(FeatureProvider featureProvider) {
return this;
}
- public BookKeeper build() throws IOException, InterruptedException, KeeperException {
+ public BookKeeper build() throws IOException, InterruptedException, BKException {
Preconditions.checkNotNull(statsLogger, "No stats logger provided");
return new BookKeeper(conf, zk, eventLoopGroup, statsLogger, dnsResolver, requestTimer, featureProvider);
}
@@ -305,13 +304,13 @@ public static Builder forConfig(final ClientConfiguration conf) {
* A list of one of more servers on which zookeeper is running. The
* client assumes that the running bookies have been registered with
* zookeeper under the path
- * {@link BookieWatcher#bookieRegistrationPath}
+ * {@link AbstractConfiguration#getZkAvailableBookiesPath()}
* @throws IOException
* @throws InterruptedException
* @throws KeeperException
*/
public BookKeeper(String servers) throws IOException, InterruptedException,
- KeeperException {
+ BKException {
this(new ClientConfiguration().setZkServers(servers));
}
@@ -327,13 +326,17 @@ public BookKeeper(String servers) throws IOException, InterruptedException,
* @throws KeeperException
*/
public BookKeeper(final ClientConfiguration conf)
- throws IOException, InterruptedException, KeeperException {
+ throws IOException, InterruptedException, BKException {
this(conf, null, null, NullStatsLogger.INSTANCE,
null, null, null);
}
- private static ZooKeeper validateZooKeeper(ZooKeeper zk) throws NullPointerException {
+ private static ZooKeeper validateZooKeeper(ZooKeeper zk) throws NullPointerException, IOException {
Preconditions.checkNotNull(zk, "No zookeeper instance provided");
+ if (!zk.getState().isConnected()) {
+ LOG.error("Unconnected zookeeper handle passed to bookkeeper");
+ throw new IOException(KeeperException.create(KeeperException.Code.CONNECTIONLOSS));
+ }
return zk;
}
@@ -358,8 +361,7 @@ private static EventLoopGroup validateEventLoopGroup(EventLoopGroup eventLoopGro
* @throws KeeperException
*/
public BookKeeper(ClientConfiguration conf, ZooKeeper zk)
- throws IOException, InterruptedException, KeeperException {
-
+ throws IOException, InterruptedException, BKException {
this(conf, validateZooKeeper(zk), null, NullStatsLogger.INSTANCE, null, null, null);
}
@@ -381,7 +383,7 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk)
* @throws KeeperException if the passed zk handle is not connected
*/
public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLoopGroup)
- throws IOException, InterruptedException, KeeperException {
+ throws IOException, InterruptedException, BKException {
this(conf, validateZooKeeper(zk), validateEventLoopGroup(eventLoopGroup), NullStatsLogger.INSTANCE,
null, null, null);
}
@@ -396,28 +398,49 @@ private BookKeeper(ClientConfiguration conf,
DNSToSwitchMapping dnsResolver,
HashedWheelTimer requestTimer,
FeatureProvider featureProvider)
- throws IOException, InterruptedException, KeeperException {
+ throws IOException, InterruptedException, BKException {
this.conf = conf;
this.delayEnsembleChange = conf.getDelayEnsembleChange();
this.reorderReadSequence = conf.isReorderReadSequenceEnabled();
- // initialize zookeeper client
- if (zkc == null) {
- this.zk = ZooKeeperClient.newBuilder()
- .connectString(conf.getZkServers())
- .sessionTimeoutMs(conf.getZkTimeout())
- .operationRetryPolicy(new BoundExponentialBackoffRetryPolicy(conf.getZkTimeout(),
- conf.getZkTimeout(), 0))
- .statsLogger(statsLogger)
- .build();
- this.ownZKHandle = true;
+ // initialize resources
+ ThreadFactoryBuilder tfb = new ThreadFactoryBuilder().setNameFormat(
+ "BookKeeperClientScheduler-%d");
+ this.scheduler = Executors
+ .newSingleThreadScheduledExecutor(tfb.build());
+ this.mainWorkerPool = OrderedSafeExecutor.newBuilder()
+ .name("BookKeeperClientWorker")
+ .numThreads(conf.getNumWorkerThreads())
+ .statsLogger(statsLogger)
+ .traceTaskExecution(conf.getEnableTaskExecutionStats())
+ .traceTaskWarnTimeMicroSec(conf.getTaskExecutionWarnTimeMicros())
+ .build();
+
+ // initialize stats logger
+ this.statsLogger = statsLogger.scope(BookKeeperClientStats.CLIENT_SCOPE);
+ initOpLoggers(this.statsLogger);
+
+ // initialize feature provider
+ if (null == featureProvider) {
+ this.featureProvider = SettableFeatureProvider.DISABLE_ALL;
} else {
- if (!zkc.getState().isConnected()) {
- LOG.error("Unconnected zookeeper handle passed to bookkeeper");
- throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS);
- }
- this.zk = zkc;
- this.ownZKHandle = false;
+ this.featureProvider = featureProvider;
+ }
+ this.disableEnsembleChangeFeature =
+ this.featureProvider.getFeature(conf.getDisableEnsembleChangeFeatureName());
+
+ // initialize registration client
+ try {
+ Class<? extends RegistrationClient> regClientCls = conf.getRegistrationClientClass();
+ this.regClient = ReflectionUtils.newInstance(regClientCls);
+ this.regClient.initialize(
+ conf,
+ scheduler,
+ statsLogger,
+ java.util.Optional.ofNullable(zkc));
+ } catch (ConfigurationException ce) {
+ LOG.error("Failed to initialize registration client", ce);
+ throw new IOException("Failed to initialize registration client", ce);
}
// initialize event loop group
@@ -440,25 +463,6 @@ private BookKeeper(ClientConfiguration conf,
this.ownTimer = false;
}
- if (null == featureProvider) {
- this.featureProvider = SettableFeatureProvider.DISABLE_ALL;
- } else {
- this.featureProvider = featureProvider;
- }
-
- // get features
- this.disableEnsembleChangeFeature = this.featureProvider.getFeature(conf.getDisableEnsembleChangeFeatureName());
-
- // initialize scheduler
- ThreadFactoryBuilder tfb = new ThreadFactoryBuilder().setNameFormat(
- "BookKeeperClientScheduler-%d");
- this.scheduler = Executors
- .newSingleThreadScheduledExecutor(tfb.build());
-
- // initialize stats logger
- this.statsLogger = statsLogger.scope(BookKeeperClientStats.CLIENT_SCOPE);
- initOpLoggers(this.statsLogger);
-
// initialize the ensemble placement
this.placementPolicy = initializeEnsemblePlacementPolicy(conf,
dnsResolver, this.requestTimer, this.featureProvider, this.statsLogger);
@@ -482,18 +486,11 @@ private BookKeeper(ClientConfiguration conf,
} else {
this.readLACSpeculativeRequestPolicy = Optional.absent();
}
- // initialize main worker pool
- this.mainWorkerPool = OrderedSafeExecutor.newBuilder()
- .name("BookKeeperClientWorker")
- .numThreads(conf.getNumWorkerThreads())
- .statsLogger(statsLogger)
- .traceTaskExecution(conf.getEnableTaskExecutionStats())
- .traceTaskWarnTimeMicroSec(conf.getTaskExecutionWarnTimeMicros())
- .build();
+
// initialize bookie client
this.bookieClient = new BookieClient(conf, this.eventLoopGroup, this.mainWorkerPool, statsLogger);
- this.bookieWatcher = new BookieWatcher(conf, this.scheduler, this.placementPolicy, this);
+ this.bookieWatcher = new BookieWatcher(conf, this.placementPolicy, regClient);
if (conf.getDiskWeightBasedPlacementEnabled()) {
LOG.info("Weighted ledger placement enabled");
ThreadFactoryBuilder tFBuilder = new ThreadFactoryBuilder()
@@ -510,7 +507,12 @@ private BookKeeper(ClientConfiguration conf,
}
// initialize ledger manager
- this.ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, this.zk);
+ try {
+ this.ledgerManagerFactory =
+ LedgerManagerFactory.newLedgerManagerFactory(conf, ((ZKRegistrationClient) regClient).getZk());
+ } catch (KeeperException ke) {
+ throw new ZKException();
+ }
this.ledgerManager = new CleanupLedgerManager(ledgerManagerFactory.newLedgerManager());
this.ledgerIdGenerator = ledgerManagerFactory.newLedgerIdGenerator();
this.explicitLacInterval = conf.getExplictLacInterval();
@@ -638,7 +640,7 @@ public static DigestType fromApiDigestType(org.apache.bookkeeper.client.api.Dige
}
ZooKeeper getZkHandle() {
- return zk;
+ return ((ZKRegistrationClient) regClient).getZk();
}
protected ClientConfiguration getConf() {
@@ -1371,9 +1373,7 @@ public void close() throws BKException, InterruptedException {
if (ownEventLoopGroup) {
eventLoopGroup.shutdownGracefully();
}
- if (ownZKHandle) {
- zk.close();
- }
+ this.regClient.close();
}
private final void initOpLoggers(StatsLogger stats) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index a0d2fe2f309..aae36b872bd 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -22,10 +22,13 @@
import static com.google.common.base.Charsets.UTF_8;
import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AbstractFuture;
import java.io.IOException;
-import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Enumeration;
@@ -34,11 +37,12 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.NoSuchElementException;
+import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
-import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
@@ -50,8 +54,9 @@
import org.apache.bookkeeper.client.SyncCallbackUtils.SyncReadCallback;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
+import org.apache.bookkeeper.discover.RegistrationClient.RegistrationListener;
import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
@@ -64,16 +69,15 @@
import org.apache.bookkeeper.replication.ReplicationException.BKAuditException;
import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.bookkeeper.util.IOUtils;
+import org.apache.bookkeeper.util.ZkUtils;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.ZKUtil;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
@@ -84,13 +88,9 @@
* Admin client for BookKeeper clusters
*/
public class BookKeeperAdmin implements AutoCloseable {
- private final static Logger LOG = LoggerFactory.getLogger(BookKeeperAdmin.class);
- // ZK client instance
- private ZooKeeper zk;
- private final boolean ownsZK;
- // ZK ledgers related String constants
- private final String bookiesPath;
+ private final static Logger LOG = LoggerFactory.getLogger(BookKeeperAdmin.class);
+ private final static Logger VERBOSE = LoggerFactory.getLogger("verbose");
// BookKeeper client instance
private BookKeeper bkc;
@@ -132,7 +132,7 @@ public class BookKeeperAdmin implements AutoCloseable {
* Throws this exception if there is an error instantiating the
* BookKeeper client.
*/
- public BookKeeperAdmin(String zkServers) throws IOException, InterruptedException, KeeperException {
+ public BookKeeperAdmin(String zkServers) throws IOException, InterruptedException, BKException {
this(new ClientConfiguration().setZkServers(zkServers));
}
@@ -154,18 +154,9 @@ public BookKeeperAdmin(String zkServers) throws IOException, InterruptedExceptio
* Throws this exception if there is an error instantiating the
* BookKeeper client.
*/
- public BookKeeperAdmin(ClientConfiguration conf) throws IOException, InterruptedException, KeeperException {
- // Create the ZooKeeper client instance
- zk = ZooKeeperClient.newBuilder()
- .connectString(conf.getZkServers())
- .sessionTimeoutMs(conf.getZkTimeout())
- .build();
- ownsZK = true;
-
- // Create the bookie path
- bookiesPath = conf.getZkAvailableBookiesPath();
+ public BookKeeperAdmin(ClientConfiguration conf) throws IOException, InterruptedException, BKException {
// Create the BookKeeper client instance
- bkc = new BookKeeper(conf, zk);
+ bkc = new BookKeeper(conf);
ownsBK = true;
this.lfr = new LedgerFragmentReplicator(bkc, NullStatsLogger.INSTANCE);
this.mFactory = bkc.ledgerManagerFactory;
@@ -183,9 +174,6 @@ public BookKeeperAdmin(ClientConfiguration conf) throws IOException, Interrupted
public BookKeeperAdmin(final BookKeeper bkc, StatsLogger statsLogger) {
this.bkc = bkc;
ownsBK = false;
- this.zk = bkc.zk;
- ownsZK = false;
- this.bookiesPath = bkc.getConf().getZkAvailableBookiesPath();
this.lfr = new LedgerFragmentReplicator(bkc, statsLogger);
this.mFactory = bkc.ledgerManagerFactory;
}
@@ -206,18 +194,6 @@ public void close() throws InterruptedException, BKException {
if (ownsBK) {
bkc.close();
}
- if (ownsZK) {
- zk.close();
- }
- }
-
- /**
- * Get {@link org.apache.zookeeper.ZooKeeper} used by bookkeeper admin client.
- *
- * @return zookeeper client used by bookkeeper admin client
- */
- public ZooKeeper getZooKeeper() {
- return zk;
}
/**
@@ -236,17 +212,8 @@ public Collection getAvailableBookies()
* @return a collection of bookie addresses
* @throws BKException if there are issues trying to read the list.
*/
- public Collection getReadOnlyBookiesSync() throws BKException {
- return bkc.bookieWatcher.getReadOnlyBookiesSync();
- }
-
- /**
- * Get a list of readonly bookies asynchronously (may be slightly out of date).
- *
- * @return a collection of bookie addresses
- */
- public Collection getReadOnlyBookiesAsync() {
- return bkc.bookieWatcher.getReadOnlyBookiesAsync();
+ public Collection<BookieSocketAddress> getReadOnlyBookies() throws BKException {
+ return bkc.bookieWatcher.getReadOnlyBookies();
}
/**
@@ -256,9 +223,9 @@ public Collection getReadOnlyBookiesAsync() {
*
* @param listener the listener to notify
*/
- public void notifyBookiesChanged(final BookiesListener listener)
+ public void watchWritableBookiesChanged(final RegistrationListener listener)
throws BKException {
- bkc.bookieWatcher.notifyBookiesChanged(listener);
+ bkc.regClient.watchWritableBookies(listener);
}
/**
@@ -268,9 +235,9 @@ public void notifyBookiesChanged(final BookiesListener listener)
*
* @param listener the listener to notify
*/
- public void notifyReadOnlyBookiesChanged(final BookiesListener listener)
+ public void watchReadOnlyBookiesChanged(final RegistrationListener listener)
throws BKException {
- bkc.bookieWatcher.notifyReadOnlyBookiesChanged(listener);
+ bkc.regClient.watchReadOnlyBookies(listener);
}
/**
@@ -495,9 +462,20 @@ public SyncObject() {
*/
public void recoverBookieData(final BookieSocketAddress bookieSrc, final BookieSocketAddress bookieDest)
throws InterruptedException, BKException {
+ Set<BookieSocketAddress> bookiesSrc = Sets.newHashSet(bookieSrc);
+ recoverBookieData(bookiesSrc);
+ }
+
+ public void recoverBookieData(final Set<BookieSocketAddress> bookiesSrc)
+ throws InterruptedException, BKException {
+ recoverBookieData(bookiesSrc, false, false);
+ }
+
+ public void recoverBookieData(final Set<BookieSocketAddress> bookiesSrc, boolean dryrun, boolean skipOpenLedgers)
+ throws InterruptedException, BKException {
SyncObject sync = new SyncObject();
// Call the async method to recover bookie data.
- asyncRecoverBookieData(bookieSrc, bookieDest, new RecoverCallback() {
+ asyncRecoverBookieData(bookiesSrc, dryrun, skipOpenLedgers, new RecoverCallback() {
@Override
public void recoverComplete(int rc, Object ctx) {
LOG.info("Recover bookie operation completed with rc: " + rc);
@@ -520,7 +498,7 @@ public void recoverComplete(int rc, Object ctx) {
throw BKException.create(sync.rc);
}
}
-
+
/**
* Async method to rebuild and recover the ledger fragments data that was
* stored on the source bookie. That bookie could have failed completely and
@@ -535,89 +513,26 @@ public void recoverComplete(int rc, Object ctx) {
* @param bookieSrc
* Source bookie that had a failure. We want to replicate the
* ledger fragments that were stored there.
- * @param bookieDest
- * Optional destination bookie that if passed, we will copy all
- * of the ledger fragments from the source bookie over to it.
* @param cb
* RecoverCallback to invoke once all of the data on the dead
* bookie has been recovered and replicated.
* @param context
* Context for the RecoverCallback to call.
*/
- public void asyncRecoverBookieData(final BookieSocketAddress bookieSrc, final BookieSocketAddress bookieDest,
+ public void asyncRecoverBookieData(final BookieSocketAddress bookieSrc,
final RecoverCallback cb, final Object context) {
- // Sync ZK to make sure we're reading the latest bookie data.
- zk.sync(bookiesPath, new AsyncCallback.VoidCallback() {
- @Override
- public void processResult(int rc, String path, Object ctx) {
- if (rc != Code.OK.intValue()) {
- LOG.error("ZK error syncing: ", KeeperException.create(KeeperException.Code.get(rc), path));
- cb.recoverComplete(BKException.Code.ZKException, context);
- return;
- }
- getAvailableBookies(bookieSrc, bookieDest, cb, context);
- }
+ Set<BookieSocketAddress> bookiesSrc = Sets.newHashSet(bookieSrc);
+ asyncRecoverBookieData(bookiesSrc, cb, context);
+ }
- }, null);
+ public void asyncRecoverBookieData(final Set<BookieSocketAddress> bookieSrc,
+ final RecoverCallback cb, final Object context) {
+ asyncRecoverBookieData(bookieSrc, false, false, cb, context);
}
- /**
- * This method asynchronously gets the set of available Bookies that the
- * dead input bookie's data will be copied over into. If the user passed in
- * a specific destination bookie, then just use that one. Otherwise, we'll
- * randomly pick one of the other available bookies to use for each ledger
- * fragment we are replicating.
- *
- * @param bookieSrc
- * Source bookie that had a failure. We want to replicate the
- * ledger fragments that were stored there.
- * @param bookieDest
- * Optional destination bookie that if passed, we will copy all
- * of the ledger fragments from the source bookie over to it.
- * @param cb
- * RecoverCallback to invoke once all of the data on the dead
- * bookie has been recovered and replicated.
- * @param context
- * Context for the RecoverCallback to call.
- */
- private void getAvailableBookies(final BookieSocketAddress bookieSrc, final BookieSocketAddress bookieDest,
- final RecoverCallback cb, final Object context) {
- final List availableBookies = new LinkedList();
- if (bookieDest != null) {
- availableBookies.add(bookieDest);
- // Now poll ZK to get the active ledgers
- getActiveLedgers(bookieSrc, bookieDest, cb, context, availableBookies);
- } else {
- zk.getChildren(bookiesPath, null, new AsyncCallback.ChildrenCallback() {
- @Override
- public void processResult(int rc, String path, Object ctx, List children) {
- if (rc != Code.OK.intValue()) {
- LOG.error("ZK error getting bookie nodes: ", KeeperException.create(KeeperException.Code
- .get(rc), path));
- cb.recoverComplete(BKException.Code.ZKException, context);
- return;
- }
- for (String bookieNode : children) {
- if (BookKeeperConstants.READONLY
- .equals(bookieNode)) {
- // exclude the readonly node from available bookies.
- continue;
- }
- BookieSocketAddress addr;
- try {
- addr = new BookieSocketAddress(bookieNode);
- } catch (UnknownHostException nhe) {
- LOG.error("Bookie Node retrieved from ZK has invalid name format: " + bookieNode);
- cb.recoverComplete(BKException.Code.ZKException, context);
- return;
- }
- availableBookies.add(addr);
- }
- // Now poll ZK to get the active ledgers
- getActiveLedgers(bookieSrc, null, cb, context, availableBookies);
- }
- }, null);
- }
+ public void asyncRecoverBookieData(final Set<BookieSocketAddress> bookieSrc, boolean dryrun,
+ final boolean skipOpenLedgers, final RecoverCallback cb, final Object context) {
+ getActiveLedgers(bookieSrc, dryrun, skipOpenLedgers, cb, context);
}
/**
@@ -626,25 +541,21 @@ public void processResult(int rc, String path, Object ctx, List children
* determine if any of the ledger fragments for it were stored at the dead
* input bookie.
*
- * @param bookieSrc
- * Source bookie that had a failure. We want to replicate the
+ * @param bookiesSrc
+ * Source bookies that had a failure. We want to replicate the
* ledger fragments that were stored there.
- * @param bookieDest
- * Optional destination bookie that if passed, we will copy all
- * of the ledger fragments from the source bookie over to it.
+ * @param dryrun
+ * dryrun the recover procedure.
+ * @param skipOpenLedgers
+ * Skip recovering open ledgers.
* @param cb
* RecoverCallback to invoke once all of the data on the dead
* bookie has been recovered and replicated.
* @param context
* Context for the RecoverCallback to call.
- * @param availableBookies
- * List of Bookie Servers that are available to use for
- * replicating data on the failed bookie. This could contain a
- * single bookie server if the user explicitly chose a bookie
- * server to replicate data to.
*/
- private void getActiveLedgers(final BookieSocketAddress bookieSrc, final BookieSocketAddress bookieDest,
- final RecoverCallback cb, final Object context, final List availableBookies) {
+ private void getActiveLedgers(final Set<BookieSocketAddress> bookiesSrc, final boolean dryrun,
+ final boolean skipOpenLedgers, final RecoverCallback cb, final Object context) {
// Wrapper class around the RecoverCallback so it can be used
// as the final VoidCallback to process ledgers
class RecoverCallbackWrapper implements AsyncCallback.VoidCallback {
@@ -663,7 +574,7 @@ public void processResult(int rc, String path, Object ctx) {
Processor ledgerProcessor = new Processor() {
@Override
public void process(Long ledgerId, AsyncCallback.VoidCallback iterCallback) {
- recoverLedger(bookieSrc, ledgerId, iterCallback, availableBookies);
+ recoverLedger(bookiesSrc, ledgerId, dryrun, skipOpenLedgers, iterCallback);
}
};
bkc.getLedgerManager().asyncProcessLedgers(
@@ -671,42 +582,25 @@ ledgerProcessor, new RecoverCallbackWrapper(cb),
context, BKException.Code.OK, BKException.Code.LedgerRecoveryException);
}
- /**
- * Get a new random bookie, but ensure that it isn't one that is already
- * in the ensemble for the ledger.
- */
- private BookieSocketAddress getNewBookie(final List bookiesAlreadyInEnsemble,
- final List availableBookies)
- throws BKException.BKNotEnoughBookiesException {
- ArrayList candidates = new ArrayList();
- candidates.addAll(availableBookies);
- candidates.removeAll(bookiesAlreadyInEnsemble);
- if (candidates.size() == 0) {
- throw new BKException.BKNotEnoughBookiesException();
- }
- return candidates.get(rand.nextInt(candidates.size()));
- }
-
/**
* This method asynchronously recovers a given ledger if any of the ledger
* entries were stored on the failed bookie.
*
- * @param bookieSrc
- * Source bookie that had a failure. We want to replicate the
+ * @param bookiesSrc
+ * Source bookies that had a failure. We want to replicate the
* ledger fragments that were stored there.
* @param lId
* Ledger id we want to recover.
- * @param ledgerIterCb
+ * @param dryrun
+ * printing the recovery plan without actually recovering bookies
+ * @param skipOpenLedgers
+ * Skip recovering open ledgers.
+ * @param finalLedgerIterCb
* IterationCallback to invoke once we've recovered the current
* ledger.
- * @param availableBookies
- * List of Bookie Servers that are available to use for
- * replicating data on the failed bookie. This could contain a
- * single bookie server if the user explicitly chose a bookie
- * server to replicate data to.
*/
- private void recoverLedger(final BookieSocketAddress bookieSrc, final long lId,
- final AsyncCallback.VoidCallback ledgerIterCb, final List availableBookies) {
+ private void recoverLedger(final Set<BookieSocketAddress> bookiesSrc, final long lId, final boolean dryrun,
+ final boolean skipOpenLedgers, final AsyncCallback.VoidCallback finalLedgerIterCb) {
if (LOG.isDebugEnabled()) {
LOG.debug("Recovering ledger : {}", lId);
}
@@ -714,43 +608,73 @@ private void recoverLedger(final BookieSocketAddress bookieSrc, final long lId,
asyncOpenLedgerNoRecovery(lId, new OpenCallback() {
@Override
public void openComplete(int rc, final LedgerHandle lh, Object ctx) {
- if (rc != Code.OK.intValue()) {
+ if (rc != BKException.Code.OK) {
LOG.error("BK error opening ledger: " + lId, BKException.create(rc));
- ledgerIterCb.processResult(rc, null, null);
+ finalLedgerIterCb.processResult(rc, null, null);
return;
}
LedgerMetadata lm = lh.getLedgerMetadata();
- if (!lm.isClosed() &&
- lm.getEnsembles().size() > 0) {
- Long lastKey = lm.getEnsembles().lastKey();
- ArrayList lastEnsemble = lm.getEnsembles().get(lastKey);
- // the original write has not removed faulty bookie from
- // current ledger ensemble. to avoid data loss issue in
- // the case of concurrent updates to the ensemble composition,
- // the recovery tool should first close the ledger
- if (lastEnsemble.contains(bookieSrc)) {
- // close opened non recovery ledger handle
+ if (skipOpenLedgers && !lm.isClosed() && !lm.isInRecovery()) {
+ LOG.info("Skip recovering open ledger {}.", lId);
+ try {
+ lh.close();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ } catch (BKException bke) {
+ LOG.warn("Error on closing ledger handle for {}.", lId);
+ }
+ finalLedgerIterCb.processResult(BKException.Code.OK, null, null);
+ return;
+ }
+
+ final boolean fenceRequired = !lm.isClosed() && containBookiesInLastEnsemble(lm, bookiesSrc);
+ // the original write has not removed faulty bookie from
+ // current ledger ensemble. to avoid data loss issue in
+ // the case of concurrent updates to the ensemble composition,
+ // the recovery tool should first close the ledger
+ if (!dryrun && fenceRequired) {
+ // close opened non recovery ledger handle
+ try {
+ lh.close();
+ } catch (Exception ie) {
+ LOG.warn("Error closing non recovery ledger handle for ledger " + lId, ie);
+ }
+ asyncOpenLedger(lId, new OpenCallback() {
+ @Override
+ public void openComplete(int newrc, final LedgerHandle newlh, Object newctx) {
+ if (newrc != BKException.Code.OK) {
+ LOG.error("BK error close ledger: " + lId, BKException.create(newrc));
+ finalLedgerIterCb.processResult(newrc, null, null);
+ return;
+ }
+ bkc.mainWorkerPool.submit(() -> {
+ // do recovery
+ recoverLedger(bookiesSrc, lId, dryrun, skipOpenLedgers, finalLedgerIterCb);
+ });
+ }
+ }, null);
+ return;
+ }
+
+ final AsyncCallback.VoidCallback ledgerIterCb = new AsyncCallback.VoidCallback() {
+ @Override
+ public void processResult(int rc, String path, Object ctx) {
+ if (BKException.Code.OK != rc) {
+ LOG.error("Failed to recover ledger {} : {}", lId, rc);
+ } else {
+ LOG.info("Recovered ledger {}.", lId);
+ }
try {
lh.close();
- } catch (Exception ie) {
- LOG.warn("Error closing non recovery ledger handle for ledger " + lId, ie);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ } catch (BKException bke) {
+ LOG.warn("Error on cloing ledger handle for {}.", lId);
}
- asyncOpenLedger(lId, new OpenCallback() {
- @Override
- public void openComplete(int newrc, final LedgerHandle newlh, Object newctx) {
- if (newrc != Code.OK.intValue()) {
- LOG.error("BK error close ledger: " + lId, BKException.create(newrc));
- ledgerIterCb.processResult(newrc, null, null);
- return;
- }
- // do recovery
- recoverLedger(bookieSrc, lId, ledgerIterCb, availableBookies);
- }
- }, null);
- return;
+ finalLedgerIterCb.processResult(rc, path, ctx);
}
- }
+ };
/*
* This List stores the ledger fragments to recover indexed by
@@ -773,7 +697,7 @@ public void openComplete(int newrc, final LedgerHandle newlh, Object newctx) {
if (curEntryId != null)
ledgerFragmentsRange.put(curEntryId, entry.getKey() - 1);
curEntryId = entry.getKey();
- if (entry.getValue().contains(bookieSrc)) {
+ if (containBookies(entry.getValue(), bookiesSrc)) {
/*
* Current ledger fragment has entries stored on the
* dead bookie so we'll need to recover them.
@@ -797,6 +721,10 @@ public void openComplete(int newrc, final LedgerHandle newlh, Object newctx) {
return;
}
+ if (dryrun) {
+ VERBOSE.info("Recovered ledger {} : {}", lId, (fenceRequired ? "[fence required]" : ""));
+ }
+
/*
* Multicallback for ledger. Once all fragments for the ledger have been recovered
* trigger the ledgerIterCb
@@ -810,47 +738,69 @@ public void openComplete(int newrc, final LedgerHandle newlh, Object newctx) {
*/
for (final Long startEntryId : ledgerFragmentsToRecover) {
Long endEntryId = ledgerFragmentsRange.get(startEntryId);
- BookieSocketAddress newBookie = null;
+ ArrayList<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsembles().get(startEntryId);
+ // Get bookies to replace
+ Map<Integer, BookieSocketAddress> targetBookieAddresses;
try {
- newBookie = getNewBookie(lh.getLedgerMetadata().getEnsembles().get(startEntryId),
- availableBookies);
- } catch (BKException.BKNotEnoughBookiesException bke) {
- ledgerFragmentsMcb.processResult(BKException.Code.NotEnoughBookiesException,
- null, null);
+ targetBookieAddresses = getReplacementBookies(lh, ensemble, bookiesSrc);
+ } catch (BKException.BKNotEnoughBookiesException e) {
+ if (!dryrun) {
+ ledgerFragmentsMcb.processResult(BKException.Code.NotEnoughBookiesException, null, null);
+ } else {
+ VERBOSE.info(" Fragment [{} - {}] : {}", startEntryId, endEntryId,
+ BKException.getMessage(BKException.Code.NotEnoughBookiesException));
+ }
continue;
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("Replicating fragment from [" + startEntryId
- + "," + endEntryId + "] of ledger " + lh.getId()
- + " to " + newBookie);
- }
- try {
- LedgerFragmentReplicator.SingleFragmentCallback cb = new LedgerFragmentReplicator.SingleFragmentCallback(
- ledgerFragmentsMcb, lh, startEntryId, bookieSrc, newBookie);
- ArrayList currentEnsemble = lh.getLedgerMetadata().getEnsemble(
- startEntryId);
- int bookieIndex = -1;
- if (null != currentEnsemble) {
- for (int i = 0; i < currentEnsemble.size(); i++) {
- if (currentEnsemble.get(i).equals(bookieSrc)) {
- bookieIndex = i;
- break;
- }
- }
+ if (dryrun) {
+ ArrayList<BookieSocketAddress> newEnsemble =
+ replaceBookiesInEnsemble(ensemble, targetBookieAddresses);
+ VERBOSE.info(" Fragment [{} - {}] : ", startEntryId, endEntryId);
+ VERBOSE.info(" old ensemble : {}", formatEnsemble(ensemble, bookiesSrc, '*'));
+ VERBOSE.info(" new ensemble : {}", formatEnsemble(newEnsemble, bookiesSrc, '*'));
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Replicating fragment from [{}, {}] of ledger {} to {}",
+ startEntryId, endEntryId, lh.getId(), targetBookieAddresses);
+ }
+ try {
+ LedgerFragmentReplicator.SingleFragmentCallback cb = new LedgerFragmentReplicator.SingleFragmentCallback(
+ ledgerFragmentsMcb, lh, startEntryId, getReplacementBookiesMap(ensemble, targetBookieAddresses));
+ LedgerFragment ledgerFragment = new LedgerFragment(lh,
+ startEntryId, endEntryId, targetBookieAddresses.keySet());
+ asyncRecoverLedgerFragment(lh, ledgerFragment, cb, Sets.newHashSet(targetBookieAddresses.values()));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
}
- LedgerFragment ledgerFragment = new LedgerFragment(lh,
- startEntryId, endEntryId, bookieIndex);
- asyncRecoverLedgerFragment(lh, ledgerFragment, cb, newBookie);
- } catch(InterruptedException e) {
- Thread.currentThread().interrupt();
- return;
}
}
+ if (dryrun) {
+ ledgerIterCb.processResult(BKException.Code.OK, null, null);
+ }
}
}, null);
}
+ static String formatEnsemble(ArrayList<BookieSocketAddress> ensemble, Set<BookieSocketAddress> bookiesSrc, char marker) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("[");
+ for (int i = 0; i < ensemble.size(); i++) {
+ sb.append(ensemble.get(i));
+ if (bookiesSrc.contains(ensemble.get(i))) {
+ sb.append(marker);
+ } else {
+ sb.append(' ');
+ }
+ if (i != ensemble.size() - 1) {
+ sb.append(", ");
+ }
+ }
+ sb.append("]");
+ return sb.toString();
+ }
+
/**
* This method asynchronously recovers a ledger fragment which is a
* contiguous portion of a ledger that was stored in an ensemble that
@@ -863,16 +813,81 @@ public void openComplete(int newrc, final LedgerHandle newlh, Object newctx) {
* @param ledgerFragmentMcb
* - MultiCallback to invoke once we've recovered the current
* ledger fragment.
- * @param newBookie
- * - New bookie we want to use to recover and replicate the
+ * @param newBookies
+ * - New bookies we want to use to recover and replicate the
* ledger entries that were stored on the failed bookie.
*/
private void asyncRecoverLedgerFragment(final LedgerHandle lh,
final LedgerFragment ledgerFragment,
final AsyncCallback.VoidCallback ledgerFragmentMcb,
- final BookieSocketAddress newBookie)
- throws InterruptedException {
- lfr.replicate(lh, ledgerFragment, ledgerFragmentMcb, newBookie);
+ final Set newBookies) throws InterruptedException {
+ lfr.replicate(lh, ledgerFragment, ledgerFragmentMcb, newBookies);
+ }
+
+ private Map getReplacementBookies(
+ LedgerHandle lh,
+ List ensemble,
+ Set bookiesToRereplicate)
+ throws BKException.BKNotEnoughBookiesException {
+ Set bookieIndexesToRereplicate = Sets.newHashSet();
+ for (int bookieIndex = 0; bookieIndex < ensemble.size(); bookieIndex++) {
+ BookieSocketAddress bookieInEnsemble = ensemble.get(bookieIndex);
+ if (bookiesToRereplicate.contains(bookieInEnsemble)) {
+ bookieIndexesToRereplicate.add(bookieIndex);
+ }
+ }
+ return getReplacementBookiesByIndexes(
+ lh, ensemble, bookieIndexesToRereplicate, Optional.of(bookiesToRereplicate));
+ }
+
+ private Map getReplacementBookiesByIndexes(
+ LedgerHandle lh,
+ List ensemble,
+ Set bookieIndexesToRereplicate,
+ Optional> excludedBookies)
+ throws BKException.BKNotEnoughBookiesException {
+ // target bookies to replicate
+ Map targetBookieAddresses =
+ Maps.newHashMapWithExpectedSize(bookieIndexesToRereplicate.size());
+ // bookies to exclude for ensemble allocation
+ Set bookiesToExclude = Sets.newHashSet();
+ if (excludedBookies.isPresent()) {
+ bookiesToExclude.addAll(excludedBookies.get());
+ }
+
+ // excluding bookies that need to be replicated
+ for (Integer bookieIndex : bookieIndexesToRereplicate) {
+ BookieSocketAddress bookie = ensemble.get(bookieIndex);
+ bookiesToExclude.add(bookie);
+ }
+
+ // allocate bookies
+ for (Integer bookieIndex : bookieIndexesToRereplicate) {
+ BookieSocketAddress oldBookie = ensemble.get(bookieIndex);
+ BookieSocketAddress newBookie =
+ bkc.getPlacementPolicy().replaceBookie(
+ lh.getLedgerMetadata().getEnsembleSize(),
+ lh.getLedgerMetadata().getWriteQuorumSize(),
+ lh.getLedgerMetadata().getAckQuorumSize(),
+ lh.getLedgerMetadata().getCustomMetadata(),
+ ensemble,
+ oldBookie,
+ bookiesToExclude);
+ targetBookieAddresses.put(bookieIndex, newBookie);
+ bookiesToExclude.add(newBookie);
+ }
+
+ return targetBookieAddresses;
+ }
+
+ private ArrayList replaceBookiesInEnsemble(
+ List ensemble,
+ Map replacedBookies) {
+ ArrayList newEnsemble = Lists.newArrayList(ensemble);
+ for (Map.Entry entry : replacedBookies.entrySet()) {
+ newEnsemble.set(entry.getKey(), entry.getValue());
+ }
+ return newEnsemble;
}
/**
@@ -882,20 +897,32 @@ private void asyncRecoverLedgerFragment(final LedgerHandle lh,
* - ledgerHandle
* @param ledgerFragment
* - LedgerFragment to replicate
- * @param targetBookieAddress
- * - target Bookie, to where entries should be replicated.
*/
public void replicateLedgerFragment(LedgerHandle lh,
+ final LedgerFragment ledgerFragment)
+ throws InterruptedException, BKException {
+ Optional> excludedBookies = Optional.empty();
+ Map targetBookieAddresses =
+ getReplacementBookiesByIndexes(lh, ledgerFragment.getEnsemble(),
+ ledgerFragment.getBookiesIndexes(), excludedBookies);
+ replicateLedgerFragment(lh, ledgerFragment, targetBookieAddresses);
+ }
+
+ private void replicateLedgerFragment(LedgerHandle lh,
final LedgerFragment ledgerFragment,
- final BookieSocketAddress targetBookieAddress)
+ final Map targetBookieAddresses)
throws InterruptedException, BKException {
CompletableFuture result = new CompletableFuture<>();
ResultCallBack resultCallBack = new ResultCallBack(result);
- SingleFragmentCallback cb = new SingleFragmentCallback(resultCallBack,
- lh, ledgerFragment.getFirstEntryId(), ledgerFragment
- .getAddress(), targetBookieAddress);
+ SingleFragmentCallback cb = new SingleFragmentCallback(
+ resultCallBack,
+ lh,
+ ledgerFragment.getFirstEntryId(),
+ getReplacementBookiesMap(ledgerFragment, targetBookieAddresses));
- asyncRecoverLedgerFragment(lh, ledgerFragment, cb, targetBookieAddress);
+ Set targetBookieSet = Sets.newHashSet();
+ targetBookieSet.addAll(targetBookieAddresses.values());
+ asyncRecoverLedgerFragment(lh, ledgerFragment, cb, targetBookieSet);
try {
SyncCallbackUtils.waitForResult(result);
@@ -903,6 +930,52 @@ public void replicateLedgerFragment(LedgerHandle lh,
throw BKException.create(bkc.getReturnRc(err.getCode()));
}
}
+
+ private static Map getReplacementBookiesMap(
+ ArrayList ensemble,
+ Map targetBookieAddresses) {
+ Map bookiesMap =
+ new HashMap();
+ for (Map.Entry entry : targetBookieAddresses.entrySet()) {
+ BookieSocketAddress oldBookie = ensemble.get(entry.getKey());
+ BookieSocketAddress newBookie = entry.getValue();
+ bookiesMap.put(oldBookie, newBookie);
+ }
+ return bookiesMap;
+ }
+
+ private static Map getReplacementBookiesMap(
+ LedgerFragment ledgerFragment,
+ Map targetBookieAddresses) {
+ Map bookiesMap =
+ new HashMap();
+ for (Integer bookieIndex : ledgerFragment.getBookiesIndexes()) {
+ BookieSocketAddress oldBookie = ledgerFragment.getAddress(bookieIndex);
+ BookieSocketAddress newBookie = targetBookieAddresses.get(bookieIndex);
+ bookiesMap.put(oldBookie, newBookie);
+ }
+ return bookiesMap;
+ }
+
+ private static boolean containBookiesInLastEnsemble(LedgerMetadata lm,
+ Set bookies) {
+ if (lm.getEnsembles().size() <= 0) {
+ return false;
+ }
+ Long lastKey = lm.getEnsembles().lastKey();
+ ArrayList lastEnsemble = lm.getEnsembles().get(lastKey);
+ return containBookies(lastEnsemble, bookies);
+ }
+
+ private static boolean containBookies(ArrayList ensemble,
+ Set bookies) {
+ for (BookieSocketAddress bookie : ensemble) {
+ if (bookies.contains(bookie)) {
+ return true;
+ }
+ }
+ return false;
+ }
/** This is the class for getting the replication result */
static class ResultCallBack implements AsyncCallback.VoidCallback {
@@ -954,6 +1027,15 @@ public static boolean format(ClientConfiguration conf,
zkAcls, CreateMode.PERSISTENT);
}
+ // create the readonly bookies node if it does not exist
+ if (null == zkc.exists(conf.getZkAvailableBookiesPath() + "/" + READONLY, false)) {
+ zkc.create(
+ conf.getZkAvailableBookiesPath() + "/" + READONLY,
+ new byte[0],
+ zkAcls,
+ CreateMode.PERSISTENT);
+ }
+
// If old data was there then confirm with admin.
if (ledgerRootExists) {
boolean confirm = false;
@@ -1162,7 +1244,8 @@ public void triggerAudit()
throw new UnavailableException("Autorecovery is disabled. So giving up!");
}
- BookieSocketAddress auditorId = AuditorElector.getCurrentAuditor(new ServerConfiguration(bkc.conf), zk);
+ BookieSocketAddress auditorId =
+ AuditorElector.getCurrentAuditor(new ServerConfiguration(bkc.conf), bkc.getZkHandle());
if (auditorId == null) {
LOG.error("No auditor elected, though Autorecovery is enabled. So giving up.");
throw new UnavailableException("No auditor elected, though Autorecovery is enabled. So giving up.");
@@ -1196,7 +1279,7 @@ public void triggerAudit()
public void decommissionBookie(BookieSocketAddress bookieAddress)
throws CompatibilityException, UnavailableException, KeeperException, InterruptedException, IOException,
BKAuditException, TimeoutException, BKException {
- if (getAvailableBookies().contains(bookieAddress) || getReadOnlyBookiesAsync().contains(bookieAddress)) {
+ if (getAvailableBookies().contains(bookieAddress) || getReadOnlyBookies().contains(bookieAddress)) {
LOG.error("Bookie: {} is not shutdown yet", bookieAddress);
throw BKException.create(BKException.Code.IllegalOpException);
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieInfoReader.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieInfoReader.java
index 70fe2bd9923..2ea5bef6895 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieInfoReader.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieInfoReader.java
@@ -29,14 +29,13 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.collections4.CollectionUtils;
import org.apache.bookkeeper.client.WeightedRandomSelection.WeightedObject;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
import org.apache.bookkeeper.proto.BookkeeperProtocol;
+import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -234,6 +233,7 @@ public boolean completeUnlessQueued() {
}
public void start() {
+ this.bk.regClient.watchWritableBookies(bookies -> availableBookiesChanged(bookies.getValue()));
scheduler.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
@@ -402,7 +402,7 @@ Map getBookieInfo() throws BKException, Interru
Collection bookies;
bookies = bk.bookieWatcher.getBookies();
- bookies.addAll(bk.bookieWatcher.getReadOnlyBookiesAsync());
+ bookies.addAll(bk.bookieWatcher.getReadOnlyBookies());
totalSent.set(bookies.size());
for (BookieSocketAddress b : bookies) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
index b11271f23c9..2bba1d602d4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
@@ -17,43 +17,26 @@
*/
package org.apache.bookkeeper.client;
-import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
-
-import java.io.IOException;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-
+import java.util.function.Function;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException.BKInterruptedException;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
+import org.apache.bookkeeper.client.BKException.MetaStoreException;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.discover.RegistrationClient;
import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.apache.bookkeeper.util.BookKeeperConstants;
-import org.apache.bookkeeper.util.SafeRunnable;
-import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-
-import java.util.Map;
-import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.zookeeper.data.ACL;
/**
* This class is responsible for maintaining a consistent view of what bookies
@@ -62,157 +45,73 @@
* replacement
*
*/
-class BookieWatcher implements Watcher, ChildrenCallback {
- static final Logger logger = LoggerFactory.getLogger(BookieWatcher.class);
-
- public static int ZK_CONNECT_BACKOFF_SEC = 1;
- private static final Set EMPTY_SET = new HashSet();
-
- // Bookie registration path in ZK
- private final String bookieRegistrationPath;
+@Slf4j
+class BookieWatcher {
+
+ private static final Function EXCEPTION_FUNC = cause -> {
+ if (cause instanceof BKException) {
+ log.error("Failed to get bookie list : ", cause);
+ return (BKException) cause;
+ } else if (cause instanceof InterruptedException) {
+ log.error("Interrupted reading bookie list : ", cause);
+ return new BKInterruptedException();
+ } else {
+ return new MetaStoreException();
+ }
+ };
- final BookKeeper bk;
- final ScheduledExecutorService scheduler;
- final EnsemblePlacementPolicy placementPolicy;
+ private final ClientConfiguration conf;
+ private final RegistrationClient registrationClient;
+ private final EnsemblePlacementPolicy placementPolicy;
// Bookies that will not be preferred to be chosen in a new ensemble
final Cache quarantinedBookies;
- SafeRunnable reReadTask = new SafeRunnable() {
- @Override
- public void safeRun() {
- readBookies();
- }
- };
- private ReadOnlyBookieWatcher readOnlyBookieWatcher;
+ private volatile Set writableBookies = Collections.emptySet();
+ private volatile Set readOnlyBookies = Collections.emptySet();
public BookieWatcher(ClientConfiguration conf,
- ScheduledExecutorService scheduler,
EnsemblePlacementPolicy placementPolicy,
- BookKeeper bk) throws KeeperException, InterruptedException {
- this.bk = bk;
- // ZK bookie registration path
- this.bookieRegistrationPath = conf.getZkAvailableBookiesPath();
- this.scheduler = scheduler;
+ RegistrationClient registrationClient) {
+ this.conf = conf;
this.placementPolicy = placementPolicy;
- readOnlyBookieWatcher = new ReadOnlyBookieWatcher(conf, bk);
+ this.registrationClient = registrationClient;
this.quarantinedBookies = CacheBuilder.newBuilder()
.expireAfterWrite(conf.getBookieQuarantineTimeSeconds(), TimeUnit.SECONDS)
.removalListener(new RemovalListener() {
@Override
public void onRemoval(RemovalNotification bookie) {
- logger.info("Bookie {} is no longer quarantined", bookie.getKey());
+ log.info("Bookie {} is no longer quarantined", bookie.getKey());
}
}).build();
}
- void notifyBookiesChanged(final BookiesListener listener) throws BKException {
+ public Set getBookies() throws BKException {
try {
- bk.getZkHandle().getChildren(this.bookieRegistrationPath,
- new Watcher() {
- public void process(WatchedEvent event) {
- // listen children changed event from ZooKeeper
- if (event.getType() == EventType.NodeChildrenChanged) {
- listener.availableBookiesChanged();
- }
- }
- });
- } catch (KeeperException ke) {
- logger.error("Error registering watcher with zookeeper", ke);
- throw new BKException.ZKException();
- } catch (InterruptedException ie) {
+ return FutureUtils.result(registrationClient.getWritableBookies(), EXCEPTION_FUNC).getValue();
+ } catch (BKInterruptedException ie) {
Thread.currentThread().interrupt();
- logger.error("Interrupted registering watcher with zookeeper", ie);
- throw new BKException.BKInterruptedException();
+ throw ie;
}
}
- void notifyReadOnlyBookiesChanged(final BookiesListener listener) throws BKException {
- readOnlyBookieWatcher.notifyBookiesChanged(listener);
- }
-
- public Collection getReadOnlyBookiesSync() throws BKException {
- try {
- String znode = this.bookieRegistrationPath + "/" + BookKeeperConstants.READONLY;
- List children = bk.getZkHandle().getChildren(znode, false);
- return convertToBookieAddresses(children);
- } catch (KeeperException ke) {
- logger.error("Failed to get read only bookie list : ", ke);
- throw new BKException.ZKException();
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- logger.error("Interrupted reading read only bookie list", ie);
- throw new BKException.BKInterruptedException();
- }
- }
-
- public Collection getBookies() throws BKException {
+ public Set getReadOnlyBookies() throws BKException {
try {
- List children = bk.getZkHandle().getChildren(this.bookieRegistrationPath, false);
- children.remove(BookKeeperConstants.READONLY);
- return convertToBookieAddresses(children);
- } catch (KeeperException ke) {
- logger.error("Failed to get bookie list : ", ke);
- throw new BKException.ZKException();
- } catch (InterruptedException ie) {
+ return FutureUtils.result(registrationClient.getReadOnlyBookies(), EXCEPTION_FUNC).getValue();
+ } catch (BKInterruptedException ie) {
Thread.currentThread().interrupt();
- logger.error("Interrupted reading bookie list", ie);
- throw new BKException.BKInterruptedException();
+ throw ie;
}
}
- Collection getReadOnlyBookiesAsync() {
- return new HashSet(readOnlyBookieWatcher.getReadOnlyBookies());
- }
-
- public void readBookies() {
- readBookies(this);
- }
-
- public void readBookies(ChildrenCallback callback) {
- bk.getZkHandle().getChildren(this.bookieRegistrationPath, this, callback, null);
- }
-
- @Override
- public void process(WatchedEvent event) {
- readBookies();
- }
-
- @Override
- public void processResult(int rc, String path, Object ctx, List children) {
-
- if (rc != KeeperException.Code.OK.intValue()) {
- //logger.error("Error while reading bookies", KeeperException.create(Code.get(rc), path));
- // try the read after a second again
- try {
- scheduler.schedule(reReadTask, ZK_CONNECT_BACKOFF_SEC, TimeUnit.SECONDS);
- } catch (RejectedExecutionException ree) {
- logger.warn("Failed to schedule reading bookies task : ", ree);
- }
- return;
- }
-
- // Just exclude the 'readonly' znode to exclude r-o bookies from
- // available nodes list.
- children.remove(BookKeeperConstants.READONLY);
-
- HashSet newBookieAddrs = convertToBookieAddresses(children);
-
+ // this callback is already executed outside of the zookeeper thread
+ private synchronized void processWritableBookiesChanged(Set newBookieAddrs) {
// Update watcher outside ZK callback thread, to avoid deadlock in case some other
// component is trying to do a blocking ZK operation
- bk.getMainWorkerPool().submitOrdered(path, safeRun(() -> {
- synchronized (BookieWatcher.this) {
- Set readonlyBookies = readOnlyBookieWatcher.getReadOnlyBookies();
- placementPolicy.onClusterChanged(newBookieAddrs, readonlyBookies);
- if (bk.conf.getDiskWeightBasedPlacementEnabled()) {
- // start collecting bookieInfo for the newly joined bookies, if any
- bk.bookieInfoReader.availableBookiesChanged(newBookieAddrs);
- }
- }
- }));
-
+ this.writableBookies = newBookieAddrs;
+ placementPolicy.onClusterChanged(newBookieAddrs, readOnlyBookies);
// we don't need to close clients here, because:
// a. the dead bookies will be removed from topology, which will not be used in new ensemble.
// b. the read sequence will be reordered based on znode availability, so most of the reads
@@ -229,45 +128,27 @@ public void processResult(int rc, String path, Object ctx, List children
// }
}
- private static HashSet convertToBookieAddresses(List children) {
- // Read the bookie addresses into a set for efficient lookup
- HashSet newBookieAddrs = new HashSet();
- for (String bookieAddrString : children) {
- BookieSocketAddress bookieAddr;
- try {
- bookieAddr = new BookieSocketAddress(bookieAddrString);
- } catch (IOException e) {
- logger.error("Could not parse bookie address: " + bookieAddrString + ", ignoring this bookie");
- continue;
- }
- newBookieAddrs.add(bookieAddr);
- }
- return newBookieAddrs;
+ private synchronized void processReadOnlyBookiesChanged(Set readOnlyBookies) {
+ this.readOnlyBookies = readOnlyBookies;
+ placementPolicy.onClusterChanged(writableBookies, readOnlyBookies);
}
/**
* Blocks until bookies are read from zookeeper, used in the {@link BookKeeper} constructor.
- * @throws InterruptedException
- * @throws KeeperException
+ *
+ * @throws BKException when failed to read bookies
*/
- public void readBookiesBlocking() throws InterruptedException, KeeperException {
- // Read readonly bookies first
- readOnlyBookieWatcher.readROBookiesBlocking();
-
- final LinkedBlockingQueue queue = new LinkedBlockingQueue();
- readBookies(new ChildrenCallback() {
- public void processResult(int rc, String path, Object ctx, List children) {
- bk.getMainWorkerPool().submitOrdered(path, safeRun(() -> {
- BookieWatcher.this.processResult(rc, path, ctx, children);
- queue.add(rc);
- }));
- }
- });
- int rc = queue.take();
+ public void readBookiesBlocking() throws BKException {
+ this.registrationClient.watchReadOnlyBookies(bookies -> processReadOnlyBookiesChanged(bookies.getValue()));
+ this.registrationClient.watchWritableBookies(bookies -> processWritableBookiesChanged(bookies.getValue()));
- if (rc != KeeperException.Code.OK.intValue()) {
- throw KeeperException.create(Code.get(rc));
+ try {
+ readOnlyBookies = getReadOnlyBookies();
+ } catch (Exception e) {
+ log.error("Failed getReadOnlyBookies: ", e);
}
+
+ writableBookies = getBookies();
}
/**
@@ -289,10 +170,11 @@ public ArrayList newEnsemble(int ensembleSize, int writeQuo
writeQuorumSize, ackQuorumSize, customMetadata, new HashSet(
quarantinedBookies.asMap().keySet()));
} catch (BKNotEnoughBookiesException e) {
- if (logger.isDebugEnabled()) {
- logger.debug("Not enough healthy bookies available, using quarantined bookies");
+ if (log.isDebugEnabled()) {
+ log.debug("Not enough healthy bookies available, using quarantined bookies");
}
- return placementPolicy.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, EMPTY_SET);
+ return placementPolicy.newEnsemble(
+ ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata, Collections.emptySet());
}
}
@@ -318,8 +200,8 @@ public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize,
return placementPolicy.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata,
existingAndQuarantinedBookies, addr, excludeBookies);
} catch (BKNotEnoughBookiesException e) {
- if (logger.isDebugEnabled()) {
- logger.debug("Not enough healthy bookies available, using quarantined bookies");
+ if (log.isDebugEnabled()) {
+ log.debug("Not enough healthy bookies available, using quarantined bookies");
}
return placementPolicy.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata,
new HashSet(existingBookies), addr, excludeBookies);
@@ -334,112 +216,8 @@ public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize,
public void quarantineBookie(BookieSocketAddress bookie) {
if (quarantinedBookies.getIfPresent(bookie) == null) {
quarantinedBookies.put(bookie, Boolean.TRUE);
- logger.warn("Bookie {} has been quarantined because of read/write errors.", bookie);
+ log.warn("Bookie {} has been quarantined because of read/write errors.", bookie);
}
}
- /**
- * Watcher implementation to watch the readonly bookies under
- * <available>/readonly
- */
- private static class ReadOnlyBookieWatcher implements Watcher, ChildrenCallback {
-
- private final static Logger LOG = LoggerFactory.getLogger(ReadOnlyBookieWatcher.class);
- private volatile HashSet readOnlyBookies = new HashSet();
- private BookKeeper bk;
- private String readOnlyBookieRegPath;
-
- public ReadOnlyBookieWatcher(ClientConfiguration conf, BookKeeper bk) throws KeeperException,
- InterruptedException {
- this.bk = bk;
- readOnlyBookieRegPath = conf.getZkAvailableBookiesPath() + "/"
- + BookKeeperConstants.READONLY;
- if (null == bk.getZkHandle().exists(readOnlyBookieRegPath, false)) {
- try {
- List zkAcls = ZkUtils.getACLs(conf);
- bk.getZkHandle().create(readOnlyBookieRegPath, new byte[0], zkAcls,
- CreateMode.PERSISTENT);
- } catch (NodeExistsException e) {
- // this node is just now created by someone.
- }
- }
- }
-
- @Override
- public void process(WatchedEvent event) {
- readROBookies();
- }
-
- // read the readonly bookies in blocking fashion. Used only for first
- // time.
- void readROBookiesBlocking() throws InterruptedException, KeeperException {
-
- final LinkedBlockingQueue queue = new LinkedBlockingQueue();
- readROBookies(new ChildrenCallback() {
- public void processResult(int rc, String path, Object ctx, List children) {
- try {
- ReadOnlyBookieWatcher.this.processResult(rc, path, ctx, children);
- queue.put(rc);
- } catch (InterruptedException e) {
- logger.error("Interruped when trying to read readonly bookies in a blocking fashion");
- throw new RuntimeException(e);
- }
- }
- });
- int rc = queue.take();
-
- if (rc != KeeperException.Code.OK.intValue()) {
- throw KeeperException.create(Code.get(rc));
- }
- }
-
- void notifyBookiesChanged(final BookiesListener listener) throws BKException {
- try {
- List children = bk.getZkHandle().getChildren(this.readOnlyBookieRegPath, new Watcher() {
- public void process(WatchedEvent event) {
- // listen children changed event from ZooKeeper
- if (event.getType() == EventType.NodeChildrenChanged) {
- listener.availableBookiesChanged();
- }
- }
- });
-
- // Update the list of read-only bookies
- HashSet newReadOnlyBookies = convertToBookieAddresses(children);
- readOnlyBookies = newReadOnlyBookies;
- } catch (KeeperException ke) {
- logger.error("Error registering watcher with zookeeper", ke);
- throw new BKException.ZKException();
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- logger.error("Interrupted registering watcher with zookeeper", ie);
- throw new BKException.BKInterruptedException();
- }
- }
-
- // Read children and register watcher for readonly bookies path
- void readROBookies(ChildrenCallback callback) {
- bk.getZkHandle().getChildren(this.readOnlyBookieRegPath, this, callback, null);
- }
-
- void readROBookies() {
- readROBookies(this);
- }
-
- @Override
- public void processResult(int rc, String path, Object ctx, List children) {
- if (rc != Code.OK.intValue()) {
- LOG.error("Not able to read readonly bookies : ", KeeperException.create(Code.get(rc)));
- return;
- }
-
- HashSet newReadOnlyBookies = convertToBookieAddresses(children);
- readOnlyBookies = newReadOnlyBookies;
- }
-
- // returns the readonly bookies
- public HashSet getReadOnlyBookies() {
- return readOnlyBookies;
- }
- }
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
index 14a8a0ca6eb..ffb229d6bcb 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
@@ -146,21 +146,20 @@ public Set onClusterChanged(Set writab
}
@Override
- public List reorderReadSequence(ArrayList ensemble, List writeSet, Map bookieFailureHistory) {
+ public DistributionSchedule.WriteSet reorderReadSequence(
+ ArrayList ensemble,
+ Map bookieFailureHistory,
+ DistributionSchedule.WriteSet writeSet) {
return writeSet;
}
@Override
- public List reorderReadLACSequence(ArrayList ensemble, List writeSet, Map bookieFailureHistory) {
- List retList = new ArrayList(writeSet);
- if (retList.size() < ensemble.size()) {
- for (int i = 0; i < ensemble.size(); i++) {
- if (!retList.contains(i)) {
- retList.add(i);
- }
- }
- }
- return retList;
+ public DistributionSchedule.WriteSet reorderReadLACSequence(
+ ArrayList ensemble,
+ Map bookieFailureHistory,
+ DistributionSchedule.WriteSet writeSet) {
+ writeSet.addMissingIndices(ensemble.size());
+ return writeSet;
}
@Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
index c0d78e9ccc5..6db2b6c2b99 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
@@ -19,7 +19,6 @@
import org.apache.bookkeeper.net.BookieSocketAddress;
-import java.util.List;
import java.util.Map;
/**
@@ -32,12 +31,108 @@
* to.
*/
-interface DistributionSchedule {
+public interface DistributionSchedule {
+
+ /**
+ * A write set represents the set of bookies to which
+ * a request will be written.
+ * The set consists of a list of indices which can be
+ * used to lookup the bookie in the ensemble.
+ */
+ public interface WriteSet {
+ /**
+ * The number of indexes in the write set.
+ */
+ public int size();
+
+ /**
+ * Whether the set contains the given index.
+ */
+ public boolean contains(int i);
+
+ /**
+ * Get the index at index i.
+ */
+ public int get(int i);
+
+ /**
+ * Set the bookie index at position i.
+ * @return the previous value at that position.
+ */
+ public int set(int i, int index);
+
+ /**
+ * Sort the indices
+ */
+ public void sort();
+
+ /**
+ * Position of the specified bookie index in this write set,
+ * or -1 if not found.
+ */
+ public int indexOf(int index);
+
+ /**
+ * If we want a write set to cover all bookies in an ensemble
+ * of size X, then all of the indices from 0..X must exist in the
+ * write set. This method appends those which are missing to the
+ * end of the write set.
+ */
+ public void addMissingIndices(int maxIndex);
+
+ /**
+ * Move an index from one position to another,
+ * shifting the other indices accordingly.
+ */
+ public void moveAndShift(int from, int to);
+
+ /**
+ * Recycle write set object when not in use.
+ */
+ public void recycle();
+
+ /**
+ * Make a deep copy of this write set.
+ */
+ public WriteSet copy();
+ }
+
+ public static WriteSet NULL_WRITE_SET = new WriteSet() {
+ @Override
+ public int size() { return 0; }
+ @Override
+ public boolean contains(int i) { return false; }
+ @Override
+ public int get(int i) {
+ throw new ArrayIndexOutOfBoundsException();
+ }
+ @Override
+ public int set(int i, int index) {
+ throw new ArrayIndexOutOfBoundsException();
+ }
+ @Override
+ public void sort() {}
+ @Override
+ public int indexOf(int index) { return -1; }
+ @Override
+ public void addMissingIndices(int maxIndex) {
+ throw new ArrayIndexOutOfBoundsException();
+ }
+ @Override
+ public void moveAndShift(int from, int to) {
+ throw new ArrayIndexOutOfBoundsException();
+ }
+ @Override
+ public void recycle() {}
+ @Override
+ public WriteSet copy() { return this; }
+ };
/**
* return the set of bookie indices to send the message to
*/
- public List getWriteSet(long entryId);
+ public WriteSet getWriteSet(long entryId);
+
/**
* An ack set represents the set of bookies from which
@@ -75,6 +170,11 @@ public interface AckSet {
* Used for reissuing write requests.
*/
public boolean removeBookieAndCheck(int bookie);
+
+ /**
+ * Recycle this ack set when it is no longer in use.
+ */
+ public void recycle();
}
/**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
index 8a9b86a3653..2301a9544f5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
@@ -294,16 +294,19 @@ public BookieSocketAddress replaceBookie(int ensembleSize,
*
* @param ensemble
* Ensemble to read entries.
- * @param writeSet
- * Write quorum to read entries.
* @param bookieFailureHistory
* Observed failures on the bookies
- * @return read sequence of bookies
+ * @param writeSet
+ * Write quorum to read entries. This will be modified, rather than
+ * allocating a new WriteSet.
+ * @return The read sequence. This will be the same object as the passed in
+ * writeSet.
* @since 4.5
*/
- public List reorderReadSequence(ArrayList ensemble,
- List writeSet,
- Map bookieFailureHistory);
+ public DistributionSchedule.WriteSet reorderReadSequence(
+ ArrayList ensemble,
+ Map bookieFailureHistory,
+ DistributionSchedule.WriteSet writeSet);
/**
@@ -311,16 +314,19 @@ public List reorderReadSequence(ArrayList ensemble
*
* @param ensemble
* Ensemble to read entries.
- * @param writeSet
- * Write quorum to read entries.
* @param bookieFailureHistory
* Observed failures on the bookies
- * @return read sequence of bookies
+ * @param writeSet
+ * Write quorum to read entries. This will be modified, rather than
+ * allocating a new WriteSet.
+ * @return The read sequence. This will be the same object as the passed in
+ * writeSet.
* @since 4.5
*/
- public List reorderReadLACSequence(ArrayList ensemble,
- List writeSet,
- Map bookieFailureHistory);
+ public DistributionSchedule.WriteSet reorderReadLACSequence(
+ ArrayList ensemble,
+ Map bookieFailureHistory,
+ DistributionSchedule.WriteSet writeSet);
/**
* Send the bookie info details.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
index a90a3662710..0eebf3ec077 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
@@ -20,15 +20,14 @@
package org.apache.bookkeeper.client;
import io.netty.buffer.ByteBuf;
-
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
@@ -81,39 +80,125 @@ public void readEntryComplete(int rc, long ledgerId, long entryId,
}
}
+ /**
+ * This will collect the bad bookies inside a ledger fragment.
+ */
+ private static class LedgerFragmentCallback implements GenericCallback {
+
+ private final LedgerFragment fragment;
+ private final int bookieIndex;
+ // bookie index -> return code
+ private final Map badBookies;
+ private final AtomicInteger numBookies;
+ private final GenericCallback cb;
+
+ LedgerFragmentCallback(LedgerFragment lf,
+ int bookieIndex,
+ GenericCallback cb,
+ Map badBookies,
+ AtomicInteger numBookies) {
+ this.fragment = lf;
+ this.bookieIndex = bookieIndex;
+ this.cb = cb;
+ this.badBookies = badBookies;
+ this.numBookies = numBookies;
+ }
+
+ @Override
+ public void operationComplete(int rc, LedgerFragment lf) {
+ if (BKException.Code.OK != rc) {
+ synchronized (badBookies) {
+ badBookies.put(bookieIndex, rc);
+ }
+ }
+ if (numBookies.decrementAndGet() == 0) {
+ if (badBookies.isEmpty()) {
+ cb.operationComplete(BKException.Code.OK, fragment);
+ } else {
+ int rcToReturn = BKException.Code.NoBookieAvailableException;
+ for (Map.Entry entry : badBookies.entrySet()) {
+ rcToReturn = entry.getValue();
+ if (entry.getValue() == BKException.Code.ClientClosedException) {
+ break;
+ }
+ }
+ cb.operationComplete(rcToReturn,
+ fragment.subset(badBookies.keySet()));
+ }
+ }
+ }
+ }
+
public LedgerChecker(BookKeeper bkc) {
bookieClient = bkc.getBookieClient();
}
+ /**
+ * Verify a ledger fragment to collect bad bookies
+ *
+ * @param fragment
+ * fragment to verify
+ * @param cb
+ * callback
+ * @throws InvalidFragmentException
+ */
+ private void verifyLedgerFragment(LedgerFragment fragment,
+ GenericCallback cb)
+ throws InvalidFragmentException, BKException {
+ Set bookiesToCheck = fragment.getBookiesIndexes();
+ if (bookiesToCheck.isEmpty()) {
+ cb.operationComplete(BKException.Code.OK, fragment);
+ return;
+ }
+
+ AtomicInteger numBookies = new AtomicInteger(bookiesToCheck.size());
+ Map badBookies = new HashMap();
+ for (Integer bookieIndex : bookiesToCheck) {
+ LedgerFragmentCallback lfCb = new LedgerFragmentCallback(
+ fragment, bookieIndex, cb, badBookies, numBookies);
+ verifyLedgerFragment(fragment, bookieIndex, lfCb);
+ }
+ }
+
+ /**
+ * Verify a bookie inside a ledger fragment.
+ *
+ * @param fragment
+ * ledger fragment
+ * @param bookieIndex
+ * bookie index in the fragment
+ * @param cb
+ * callback
+ * @throws InvalidFragmentException
+ */
private void verifyLedgerFragment(LedgerFragment fragment,
- GenericCallback cb) throws InvalidFragmentException {
- long firstStored = fragment.getFirstStoredEntryId();
- long lastStored = fragment.getLastStoredEntryId();
-
- // because of this if block, even if the bookie of the fragment is
- // down, it considers Fragment is available/not-bad if firstStored
- // and lastStored are LedgerHandle.INVALID_ENTRY_ID.
- // So same logic is used in BookieShell.DecommissionBookieCmd.areEntriesOfSegmentStoredInTheBookie
- // if any change is made here, then the changes should be in BookieShell also
+ int bookieIndex,
+ GenericCallback cb)
+ throws InvalidFragmentException {
+ long firstStored = fragment.getFirstStoredEntryId(bookieIndex);
+ long lastStored = fragment.getLastStoredEntryId(bookieIndex);
+
+ BookieSocketAddress bookie = fragment.getAddress(bookieIndex);
+ if (null == bookie) {
+ throw new InvalidFragmentException();
+ }
+
if (firstStored == LedgerHandle.INVALID_ENTRY_ID) {
+ // this fragment is not on this bookie
if (lastStored != LedgerHandle.INVALID_ENTRY_ID) {
throw new InvalidFragmentException();
}
cb.operationComplete(BKException.Code.OK, fragment);
- return;
- }
- if (firstStored == lastStored) {
+ } else if (firstStored == lastStored) {
ReadManyEntriesCallback manycb = new ReadManyEntriesCallback(1,
fragment, cb);
- bookieClient.readEntry(fragment.getAddress(), fragment
+ bookieClient.readEntry(bookie, fragment
.getLedgerId(), firstStored, manycb, null);
} else {
ReadManyEntriesCallback manycb = new ReadManyEntriesCallback(2,
fragment, cb);
- bookieClient.readEntry(fragment.getAddress(), fragment
- .getLedgerId(), firstStored, manycb, null);
- bookieClient.readEntry(fragment.getAddress(), fragment
- .getLedgerId(), lastStored, manycb, null);
+ bookieClient.readEntry(bookie, fragment.getLedgerId(), firstStored, manycb, null);
+ bookieClient.readEntry(bookie, fragment.getLedgerId(), lastStored, manycb, null);
}
}
@@ -181,7 +266,7 @@ public void operationComplete(int rc, LedgerFragment result) {
* Check that all the fragments in the passed in ledger, and report those
* which are missing.
*/
- public void checkLedger(LedgerHandle lh,
+ public void checkLedger(final LedgerHandle lh,
final GenericCallback> cb) {
// build a set of all fragment replicas
final Set fragments = new HashSet();
@@ -191,66 +276,73 @@ public void checkLedger(LedgerHandle lh,
for (Map.Entry> e : lh
.getLedgerMetadata().getEnsembles().entrySet()) {
if (curEntryId != null) {
+ Set bookieIndexes = new HashSet();
for (int i = 0; i < curEnsemble.size(); i++) {
- fragments.add(new LedgerFragment(lh, curEntryId,
- e.getKey() - 1, i));
+ bookieIndexes.add(i);
}
+ fragments.add(new LedgerFragment(lh, curEntryId,
+ e.getKey() - 1, bookieIndexes));
}
curEntryId = e.getKey();
curEnsemble = e.getValue();
}
- /* Checking the last fragment of the ledger can be complicated in some cases.
+
+
+
+ /* Checking the last segment of the ledger can be complicated in some cases.
* In the case that the ledger is closed, we can just check the fragments of
- * the ledger as normal, except in the case that no entry was ever written,
- * to the ledger, in which case we check no fragments.
+ * the segment as normal even if no data has ever been written to it.
* In the case that the ledger is open, but enough entries have been written,
- * for lastAddConfirmed to be set above the start entry of the fragment, we
+ * for lastAddConfirmed to be set above the start entry of the segment, we
* can also check as normal.
- * However, if lastAddConfirmed cannot be trusted, such as when it's lower than
- * the first entry id, or not set at all, we cannot be sure if there has been
- * data written to the fragment. For this reason, we have to send a read request
+ * However, if the ledger is open, sometimes lastAddConfirmed cannot be trusted,
+ * such as when it's lower than the first entry id, or not set at all,
+ * we cannot be sure if there has been data written to the segment.
+ * For this reason, we have to send a read request
* to the bookies which should have the first entry. If they respond with
* NoSuchEntry we can assume it was never written. If they respond with anything
* else, we must assume the entry has been written, so we run the check.
*/
- if (curEntryId != null && !(lh.getLedgerMetadata().isClosed() && lh.getLastAddConfirmed() < curEntryId)) {
+ if (curEntryId != null) {
long lastEntry = lh.getLastAddConfirmed();
- if (lastEntry < curEntryId) {
+ if (!lh.isClosed() && lastEntry < curEntryId) {
lastEntry = curEntryId;
}
- final Set finalFragments = new HashSet();
+ Set bookieIndexes = new HashSet();
for (int i = 0; i < curEnsemble.size(); i++) {
- finalFragments.add(new LedgerFragment(lh, curEntryId,
- lastEntry, i));
+ bookieIndexes.add(i);
}
+ final LedgerFragment lastLedgerFragment = new LedgerFragment(lh, curEntryId,
+ lastEntry, bookieIndexes);
- // Check for the case that no last confirmed entry has
- // been set.
+ // Check for the case that no last confirmed entry has been set
if (curEntryId == lastEntry) {
final long entryToRead = curEntryId;
- EntryExistsCallback eecb
+ final EntryExistsCallback eecb
= new EntryExistsCallback(lh.getLedgerMetadata().getWriteQuorumSize(),
new GenericCallback() {
public void operationComplete(int rc, Boolean result) {
if (result) {
- fragments.addAll(finalFragments);
+ fragments.add(lastLedgerFragment);
}
checkFragments(fragments, cb);
}
});
- for (int bi : lh.getDistributionSchedule().getWriteSet(entryToRead)) {
- BookieSocketAddress addr = curEnsemble.get(bi);
- bookieClient.readEntry(addr, lh.getId(),
- entryToRead, eecb, null);
+ DistributionSchedule.WriteSet writeSet
+ = lh.getDistributionSchedule().getWriteSet(entryToRead);
+ for (int i = 0; i < writeSet.size(); i++) {
+ BookieSocketAddress addr = curEnsemble.get(writeSet.get(i));
+ bookieClient.readEntry(addr, lh.getId(), entryToRead, eecb, null);
}
+ writeSet.recycle();
return;
} else {
- fragments.addAll(finalFragments);
+ fragments.add(lastLedgerFragment);
}
}
@@ -275,7 +367,10 @@ private void checkFragments(Set fragments,
LOG.error("Invalid fragment found : {}", r);
allFragmentsCb.operationComplete(
BKException.Code.IncorrectParameterException, r);
+ } catch (BKException e) {
+ LOG.error("BKException when checking fragment : {}", r, e);
}
}
}
+
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
index e12f77f583f..3f02355a66c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
@@ -20,19 +20,20 @@
package org.apache.bookkeeper.client;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.SortedMap;
-
import org.apache.bookkeeper.net.BookieSocketAddress;
/**
- * Represents the entries of a segment of a ledger which are stored on a single
- * bookie in the segments bookie ensemble.
- *
+ * Represents the entries of a segment of a ledger which are stored on a subset of
+ * the bookies in the segment's bookie ensemble.
+ *
* Used for checking and recovery
*/
public class LedgerFragment {
- private final int bookieIndex;
+ private final Set bookieIndexes;
private final List ensemble;
private final long firstEntryId;
private final long lastKnownEntryId;
@@ -40,12 +41,14 @@ public class LedgerFragment {
private final DistributionSchedule schedule;
private final boolean isLedgerClosed;
- LedgerFragment(LedgerHandle lh, long firstEntryId,
- long lastKnownEntryId, int bookieIndex) {
+ LedgerFragment(LedgerHandle lh,
+ long firstEntryId,
+ long lastKnownEntryId,
+ Set bookieIndexes) {
this.ledgerId = lh.getId();
this.firstEntryId = firstEntryId;
this.lastKnownEntryId = lastKnownEntryId;
- this.bookieIndex = bookieIndex;
+ this.bookieIndexes = bookieIndexes;
this.ensemble = lh.getLedgerMetadata().getEnsemble(firstEntryId);
this.schedule = lh.getDistributionSchedule();
SortedMap> ensembles = lh
@@ -54,6 +57,27 @@ public class LedgerFragment {
|| !ensemble.equals(ensembles.get(ensembles.lastKey()));
}
+ LedgerFragment(LedgerFragment lf, Set subset) {
+ this.ledgerId = lf.ledgerId;
+ this.firstEntryId = lf.firstEntryId;
+ this.lastKnownEntryId = lf.lastKnownEntryId;
+ this.bookieIndexes = subset;
+ this.ensemble = lf.ensemble;
+ this.schedule = lf.schedule;
+ this.isLedgerClosed = lf.isLedgerClosed;
+ }
+
+ /**
+ * Return a ledger fragment that contains only the given subset of bookies.
+ *
+ * @param subset
+ * subset of bookies.
+ * @return ledger fragment that contains only the given subset of bookies
+ */
+ public LedgerFragment subset(Set subset) {
+ return new LedgerFragment(this, subset);
+ }
+
/**
* Returns true, if and only if the ledger fragment will never be modified
* by any of the clients in future, otherwise false. i.e,
@@ -83,23 +107,51 @@ long getLastKnownEntryId() {
/**
* Gets the failedBookie address
*/
- public BookieSocketAddress getAddress() {
+ public BookieSocketAddress getAddress(int bookieIndex) {
return ensemble.get(bookieIndex);
}
-
+
+ public Set getAddresses() {
+ Set addresses = new HashSet();
+ for (int bookieIndex : bookieIndexes) {
+ addresses.add(ensemble.get(bookieIndex));
+ }
+ return addresses;
+ }
+
/**
* Gets the failedBookie index
*/
- public int getBookiesIndex() {
- return bookieIndex;
+ public Set getBookiesIndexes() {
+ return bookieIndexes;
}
/**
- * Gets the first stored entry id of the fragment in failed bookie.
- *
+ * Gets the first stored entry id of the fragment in failed bookies.
+ *
* @return entryId
*/
public long getFirstStoredEntryId() {
+ Long firstEntry = null;
+ for (int bookieIndex : bookieIndexes) {
+ Long firstStoredEntryForBookie = getFirstStoredEntryId(bookieIndex);
+ if (null == firstEntry) {
+ firstEntry = firstStoredEntryForBookie;
+ } else if (null != firstStoredEntryForBookie) {
+ firstEntry = Math.min(firstEntry, firstStoredEntryForBookie);
+ }
+ }
+ return null == firstEntry ? LedgerHandle.INVALID_ENTRY_ID : firstEntry;
+ }
+
+ /**
+ * Get the first stored entry id of the fragment in the given failed bookie.
+ *
+ * @param bookieIndex
+ * the bookie index in the ensemble.
+ * @return first stored entry id on the bookie.
+ */
+ public Long getFirstStoredEntryId(int bookieIndex) {
long firstEntry = firstEntryId;
for (int i = 0; i < ensemble.size() && firstEntry <= lastKnownEntryId; i++) {
@@ -114,10 +166,30 @@ public long getFirstStoredEntryId() {
/**
* Gets the last stored entry id of the fragment in failed bookie.
- *
+ *
* @return entryId
*/
public long getLastStoredEntryId() {
+ Long lastEntry = null;
+ for (int bookieIndex : bookieIndexes) {
+ Long lastStoredEntryIdForBookie = getLastStoredEntryId(bookieIndex);
+ if (null == lastEntry) {
+ lastEntry = lastStoredEntryIdForBookie;
+ } else if (null != lastStoredEntryIdForBookie) {
+ lastEntry = Math.max(lastEntry, lastStoredEntryIdForBookie);
+ }
+ }
+ return null == lastEntry ? LedgerHandle.INVALID_ENTRY_ID : lastEntry;
+ }
+
+ /**
+ * Get the last stored entry id of the fragment in the given failed bookie.
+ *
+ * @param bookieIndex
+ * the bookie index in the ensemble.
+ * @return last stored entry id on the bookie.
+ */
+ public Long getLastStoredEntryId(int bookieIndex) {
long lastEntry = lastKnownEntryId;
for (int i = 0; i < ensemble.size() && lastEntry >= firstEntryId; i++) {
if (schedule.hasEntry(lastEntry, bookieIndex)) {
@@ -131,7 +203,7 @@ public long getLastStoredEntryId() {
/**
* Gets the ensemble of fragment
- *
+ *
* @return the ensemble for the segment which this fragment is a part of
*/
public List getEnsemble() {
@@ -143,6 +215,6 @@ public String toString() {
return String.format("Fragment(LedgerID: %d, FirstEntryID: %d[%d], "
+ "LastKnownEntryID: %d[%d], Host: %s, Closed: %s)", ledgerId, firstEntryId,
getFirstStoredEntryId(), lastKnownEntryId, getLastStoredEntryId(),
- getAddress(), isLedgerClosed);
+ getAddresses(), isLedgerClosed);
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
index 172f9ec5cd7..95c48d5981b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
@@ -19,17 +19,20 @@
*/
package org.apache.bookkeeper.client;
+import static org.apache.bookkeeper.client.LedgerHandle.INVALID_ENTRY_ID;
+
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
-
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
-
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieProtocol;
@@ -79,7 +82,7 @@ public LedgerFragmentReplicator(BookKeeper bkc) {
private void replicateFragmentInternal(final LedgerHandle lh,
final LedgerFragment lf,
final AsyncCallback.VoidCallback ledgerFragmentMcb,
- final BookieSocketAddress newBookie) throws InterruptedException {
+ final Set newBookies) throws InterruptedException {
if (!lf.isClosed()) {
LOG.error("Trying to replicate an unclosed fragment;"
+ " This is not safe {}", lf);
@@ -94,13 +97,13 @@ private void replicateFragmentInternal(final LedgerHandle lh,
* Ideally this should never happen if bookie failure is taken care
* of properly. Nothing we can do though in this case.
*/
- LOG.warn("Dead bookie (" + lf.getAddress()
+ LOG.warn("Dead bookie (" + lf.getAddresses()
+ ") is still part of the current"
+ " active ensemble for ledgerId: " + lh.getId());
ledgerFragmentMcb.processResult(BKException.Code.OK, null, null);
return;
}
- if (startEntryId > endEntryId) {
+ if (startEntryId > endEntryId || endEntryId <= INVALID_ENTRY_ID) {
// for open ledger which there is no entry, the start entry id is 0,
// the end entry id is -1.
// we can return immediately to trigger forward read
@@ -126,7 +129,7 @@ private void replicateFragmentInternal(final LedgerHandle lh,
BKException.Code.LedgerRecoveryException);
for (final Long entryId : entriesToReplicate) {
recoverLedgerFragmentEntry(entryId, lh, ledgerFragmentEntryMcb,
- newBookie);
+ newBookies);
}
}
@@ -146,27 +149,27 @@ private void replicateFragmentInternal(final LedgerHandle lh,
* @param ledgerFragmentMcb
* MultiCallback to invoke once we've recovered the current
* ledger fragment.
- * @param targetBookieAddress
- * New bookie we want to use to recover and replicate the ledger
+ * @param targetBookieAddresses
+ * New bookies we want to use to recover and replicate the ledger
* entries that were stored on the failed bookie.
*/
void replicate(final LedgerHandle lh, final LedgerFragment lf,
final AsyncCallback.VoidCallback ledgerFragmentMcb,
- final BookieSocketAddress targetBookieAddress)
+ final Set targetBookieAddresses)
throws InterruptedException {
Set partionedFragments = splitIntoSubFragments(lh, lf,
bkc.getConf().getRereplicationEntryBatchSize());
- LOG.info("Fragment :" + lf + " is split into sub fragments :"
- + partionedFragments);
+ LOG.info("Replicating fragment {} in {} sub fragments.",
+ lf, partionedFragments.size());
replicateNextBatch(lh, partionedFragments.iterator(),
- ledgerFragmentMcb, targetBookieAddress);
+ ledgerFragmentMcb, targetBookieAddresses);
}
/** Replicate the batched entry fragments one after other */
private void replicateNextBatch(final LedgerHandle lh,
final Iterator fragments,
final AsyncCallback.VoidCallback ledgerFragmentMcb,
- final BookieSocketAddress targetBookieAddress) {
+ final Set targetBookieAddresses) {
if (fragments.hasNext()) {
try {
replicateFragmentInternal(lh, fragments.next(),
@@ -179,11 +182,11 @@ public void processResult(int rc, String v, Object ctx) {
} else {
replicateNextBatch(lh, fragments,
ledgerFragmentMcb,
- targetBookieAddress);
+ targetBookieAddresses);
}
}
- }, targetBookieAddress);
+ }, targetBookieAddresses);
} catch (InterruptedException e) {
ledgerFragmentMcb.processResult(
BKException.Code.InterruptedException, null, null);
@@ -224,7 +227,7 @@ static Set splitIntoSubFragments(LedgerHandle lh,
for (int i = 0; i < splitsWithFullEntries; i++) {
fragmentSplitLastEntry = (firstEntryId + rereplicationEntryBatchSize) - 1;
fragments.add(new LedgerFragment(lh, firstEntryId,
- fragmentSplitLastEntry, ledgerFragment.getBookiesIndex()));
+ fragmentSplitLastEntry, ledgerFragment.getBookiesIndexes()));
firstEntryId = fragmentSplitLastEntry + 1;
}
@@ -233,7 +236,7 @@ static Set splitIntoSubFragments(LedgerHandle lh,
if (lastSplitWithPartialEntries > 0) {
fragments.add(new LedgerFragment(lh, firstEntryId, firstEntryId
+ lastSplitWithPartialEntries - 1, ledgerFragment
- .getBookiesIndex()));
+ .getBookiesIndexes()));
}
return fragments;
}
@@ -242,7 +245,7 @@ static Set splitIntoSubFragments(LedgerHandle lh,
* This method asynchronously recovers a specific ledger entry by reading
* the values via the BookKeeper Client (which would read it from the other
* replicas) and then writing it to the chosen new bookie.
- *
+ *
* @param entryId
* Ledger Entry ID to recover.
* @param lh
@@ -250,14 +253,41 @@ static Set splitIntoSubFragments(LedgerHandle lh,
* @param ledgerFragmentEntryMcb
* MultiCallback to invoke once we've recovered the current
* ledger entry.
- * @param newBookie
- * New bookie we want to use to recover and replicate the ledger
+ * @param newBookies
+ * New bookies we want to use to recover and replicate the ledger
* entries that were stored on the failed bookie.
*/
private void recoverLedgerFragmentEntry(final Long entryId,
final LedgerHandle lh,
final AsyncCallback.VoidCallback ledgerFragmentEntryMcb,
- final BookieSocketAddress newBookie) throws InterruptedException {
+ final Set newBookies) throws InterruptedException {
+ final AtomicInteger numCompleted = new AtomicInteger(0);
+ final AtomicBoolean completed = new AtomicBoolean(false);
+ final WriteCallback multiWriteCallback = new WriteCallback() {
+ @Override
+ public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) {
+ if (rc != BKException.Code.OK) {
+ LOG.error("BK error writing entry for ledgerId: {}, entryId: {}, bookie: {}",
+ new Object[] { ledgerId, entryId, addr, BKException.create(rc) });
+ if (completed.compareAndSet(false, true)) {
+ ledgerFragmentEntryMcb.processResult(rc, null, null);
+ }
+ } else {
+ numEntriesWritten.inc();
+ if (ctx instanceof Long) {
+ numBytesWritten.registerSuccessfulValue((Long) ctx);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Success writing ledger id {}, entry id {} to a new bookie {}!",
+ new Object[] { ledgerId, entryId, addr });
+ }
+ if (numCompleted.incrementAndGet() == newBookies.size() &&
+ completed.compareAndSet(false, true)) {
+ ledgerFragmentEntryMcb.processResult(rc, null, null);
+ }
+ }
+ }
+ };
/*
* Read the ledger entry using the LedgerHandle. This will allow us to
* read the entry from one of the other replicated bookies other than
@@ -286,42 +316,16 @@ public void readComplete(int rc, LedgerHandle lh,
.computeDigestAndPackageForSending(entryId,
lh.getLastAddConfirmed(), entry.getLength(),
Unpooled.wrappedBuffer(data, 0, data.length));
- bkc.getBookieClient().addEntry(newBookie, lh.getId(),
- lh.getLedgerKey(), entryId, toSend,
- new WriteCallback() {
- @Override
- public void writeComplete(int rc, long ledgerId,
- long entryId, BookieSocketAddress addr,
- Object ctx) {
- if (rc != BKException.Code.OK) {
- LOG.error(
- "BK error writing entry for ledgerId: "
- + ledgerId + ", entryId: "
- + entryId + ", bookie: "
- + addr, BKException
- .create(rc));
- } else {
- numEntriesWritten.inc();
- numBytesWritten.registerSuccessfulValue(dataLength);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Success writing ledger id "
- + ledgerId + ", entry id "
- + entryId + " to a new bookie "
- + addr + "!");
- }
- }
- /*
- * Pass the return code result up the chain with
- * the parent callback.
- */
- ledgerFragmentEntryMcb.processResult(rc, null,
- null);
- }
- }, null, BookieProtocol.FLAG_RECOVERY_ADD);
+ for (BookieSocketAddress newBookie : newBookies) {
+ bkc.getBookieClient().addEntry(newBookie, lh.getId(),
+ lh.getLedgerKey(), entryId, toSend.retainedSlice(),
+ multiWriteCallback, dataLength, BookieProtocol.FLAG_RECOVERY_ADD);
+ }
+ toSend.release();
}
}, null);
}
-
+
/**
* Callback for recovery of a single ledger fragment. Once the fragment has
* had all entries replicated, update the ensemble in zookeeper. Once
@@ -332,17 +336,15 @@ static class SingleFragmentCallback implements AsyncCallback.VoidCallback {
final AsyncCallback.VoidCallback ledgerFragmentsMcb;
final LedgerHandle lh;
final long fragmentStartId;
- final BookieSocketAddress oldBookie;
- final BookieSocketAddress newBookie;
+ final Map oldBookie2NewBookie;
SingleFragmentCallback(AsyncCallback.VoidCallback ledgerFragmentsMcb,
LedgerHandle lh, long fragmentStartId,
- BookieSocketAddress oldBookie, BookieSocketAddress newBookie) {
+ Map oldBookie2NewBookie) {
this.ledgerFragmentsMcb = ledgerFragmentsMcb;
this.lh = lh;
this.fragmentStartId = fragmentStartId;
- this.newBookie = newBookie;
- this.oldBookie = oldBookie;
+ this.oldBookie2NewBookie = oldBookie2NewBookie;
}
@Override
@@ -353,39 +355,32 @@ public void processResult(int rc, String path, Object ctx) {
ledgerFragmentsMcb.processResult(rc, null, null);
return;
}
- updateEnsembleInfo(ledgerFragmentsMcb, fragmentStartId, lh,
- oldBookie, newBookie);
+ updateEnsembleInfo(ledgerFragmentsMcb, fragmentStartId, lh, oldBookie2NewBookie);
}
}
/** Updates the ensemble with newBookie and notify the ensembleUpdatedCb */
private static void updateEnsembleInfo(
AsyncCallback.VoidCallback ensembleUpdatedCb, long fragmentStartId,
- LedgerHandle lh, BookieSocketAddress oldBookie,
- BookieSocketAddress newBookie) {
+ LedgerHandle lh, Map oldBookie2NewBookie) {
/*
* Update the ledger metadata's ensemble info to point to the new
* bookie.
*/
ArrayList ensemble = lh.getLedgerMetadata()
.getEnsembles().get(fragmentStartId);
- int deadBookieIndex = ensemble.indexOf(oldBookie);
-
- /*
- * An update to the ensemble info might happen after re-reading ledger metadata.
- * Such an update might reflect a change to the ensemble membership such that
- * it might not be necessary to replace the bookie.
- */
- if (deadBookieIndex >= 0) {
- ensemble.remove(deadBookieIndex);
- ensemble.add(deadBookieIndex, newBookie);
- lh.writeLedgerConfig(new UpdateEnsembleCb(ensembleUpdatedCb,
- fragmentStartId, lh, oldBookie, newBookie));
- }
- else {
- LOG.warn("Bookie {} doesn't exist in ensemble {} anymore.", oldBookie, ensemble);
- ensembleUpdatedCb.processResult(BKException.Code.UnexpectedConditionException, null, null);
+ for (Map.Entry entry : oldBookie2NewBookie.entrySet()) {
+ int deadBookieIndex = ensemble.indexOf(entry.getKey());
+ // The ensemble update might happen after re-reading the ledger metadata, so the ensemble might have
+ // already changed. If so, skip replacing any bookie that no longer exists in the ensemble.
+ if (deadBookieIndex >= 0) {
+ ensemble.set(deadBookieIndex, entry.getValue());
+ } else {
+ LOG.info("Bookie {} doesn't exist in ensemble {} anymore.", entry.getKey(), ensemble);
+ }
}
+ lh.writeLedgerConfig(new UpdateEnsembleCb(ensembleUpdatedCb,
+ fragmentStartId, lh, oldBookie2NewBookie));
}
/**
@@ -397,17 +392,15 @@ private static class UpdateEnsembleCb implements GenericCallback