From ead72415ecef3c1ff8bafdacfadd50bd46f1f504 Mon Sep 17 00:00:00 2001 From: ruihongzhou Date: Thu, 6 Jun 2024 15:05:10 +0800 Subject: [PATCH 1/2] [improve][broker] Add methods for batch updating and deleting bookie rack information --- .../pulsar/broker/admin/v2/Bookies.java | 140 +++++++++++--- .../pulsar/broker/admin/BookiesApiTest.java | 177 ++++++++++++++++++ .../apache/pulsar/client/admin/Bookies.java | 32 ++++ .../data/BookiesRackConfiguration.java | 9 + .../common/policies/data/ExtBookieInfo.java | 43 +++++ .../policies/data/impl/ExtBookieInfoImpl.java | 86 +++++++++ .../client/admin/internal/BookiesImpl.java | 37 ++++ .../common/util/ObjectMapperFactory.java | 3 + 8 files changed, 496 insertions(+), 31 deletions(-) create mode 100644 pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ExtBookieInfo.java create mode 100644 pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/ExtBookieInfoImpl.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Bookies.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Bookies.java index c7b09ca9b0aa1..85cbcbe2e91c5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Bookies.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Bookies.java @@ -27,7 +27,9 @@ import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.Path; @@ -50,6 +52,7 @@ import org.apache.pulsar.common.policies.data.BookieInfo; import org.apache.pulsar.common.policies.data.BookiesClusterInfo; import org.apache.pulsar.common.policies.data.BookiesRackConfiguration; +import org.apache.pulsar.common.policies.data.ExtBookieInfo; import org.apache.pulsar.common.policies.data.RawBookieInfo; @Path("/bookies") @@ -131,26 +134,66 @@ public void getBookieRackInfo(@Suspended final AsyncResponse asyncResponse, }) public void deleteBookieRackInfo(@Suspended final AsyncResponse asyncResponse, @PathParam("bookie") String bookieAddress) throws Exception { + internalDeleteBookiesRackInfo(asyncResponse, List.of(bookieAddress), false); + } + + @DELETE + @Path("/racks-info") + @ApiOperation( + value = "Removed the rack placement information for a batch of bookies in the cluster", + notes = "If the 'deleteAll' parameter is set to true, it will remove the rack placement " + + "information for all bookies in the cluster, ignoring the 'bookieAddresses' parameter" + ) + @ApiResponses(value = { + @ApiResponse(code = 204, message = "Operation successful"), + @ApiResponse(code = 403, message = "Don't have admin permission") + }) + public void batchDeleteBookiesRackInfo(@Suspended final AsyncResponse asyncResponse, + @ApiParam(value = "List of bookie addresses") + @QueryParam("bookieAddresses") List bookieAddresses, + @ApiParam(value = "Whether to delete all bookies rack info", + defaultValue = "false") + @QueryParam("deleteAll") @DefaultValue("false") + boolean deleteAll) throws Exception { + internalDeleteBookiesRackInfo(asyncResponse, bookieAddresses, deleteAll); + } + + private void internalDeleteBookiesRackInfo(final AsyncResponse asyncResponse, + List bookieAddresses, + boolean deleteAll) throws Exception { validateSuperUserAccess(); getPulsarResources().getBookieResources() .update(optionalBookiesRackConfiguration -> { BookiesRackConfiguration brc = optionalBookiesRackConfiguration - .orElseGet(() -> new BookiesRackConfiguration()); + .orElseGet(BookiesRackConfiguration::new); - if (!brc.removeBookie(bookieAddress)) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, - "Bookie rack placement configuration not found: " + bookieAddress)); + if (deleteAll) { + brc.clear(); + } else { + for (String bookieAddress : bookieAddresses) { + if (!brc.bookieExists(bookieAddress)) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, + "Bookie rack placement configuration not found: " + bookieAddress)); + } + } + for (String bookieAddress : bookieAddresses) { + brc.removeBookie(bookieAddress); } + } return brc; }).thenAccept(__ -> { - log.info("Removed {} from rack mapping info", bookieAddress); - asyncResponse.resume(Response.noContent().build()); - }).exceptionally(ex -> { - asyncResponse.resume(ex); - return null; - }); + if (deleteAll) { + log.info("Removed all bookies rack mapping info"); + } else { + log.info("Removed {} from rack mapping info", bookieAddresses); + } + asyncResponse.resume(Response.noContent().build()); + }).exceptionally(ex -> { + asyncResponse.resume(ex); + return null; + }); } @POST @@ -168,24 +211,51 @@ public void updateBookieRackInfo(@Suspended final AsyncResponse asyncResponse, @QueryParam("group") String group, @ApiParam(value = "The bookie info", required = true) BookieInfo bookieInfo) throws Exception { - validateSuperUserAccess(); + internalBatchUpdateBookiesRackInfo(asyncResponse, List.of( + ExtBookieInfo.builder() + .address(bookieAddress) + .group(group) + .rack(bookieInfo.getRack()) + .hostname(bookieInfo.getHostname()) + .build()) + ); + } - if (group == null) { - throw new RestException(Status.PRECONDITION_FAILED, "Bookie 'group' parameters is missing"); - } + @POST + @Path("/racks-info") + @ApiOperation(value = "Updates the rack placement information for a batch of bookies in the cluster") + @ApiResponses(value = { + @ApiResponse(code = 204, message = "Operation successful"), + @ApiResponse(code = 403, message = "Don't have admin permission")} + ) + public void batchUpdateBookiesRackInfo(@Suspended final AsyncResponse asyncResponse, + @ApiParam(value = "List of bookie info", required = true) + List extBookieInfos) throws Exception { + internalBatchUpdateBookiesRackInfo(asyncResponse, extBookieInfos); + } + + private void internalBatchUpdateBookiesRackInfo(final AsyncResponse asyncResponse, + List extBookieInfos) throws Exception { + validateSuperUserAccess(); - // validate rack name - int separatorCnt = StringUtils.countMatches( - StringUtils.strip(bookieInfo.getRack(), PATH_SEPARATOR), PATH_SEPARATOR); boolean isRackEnabled = pulsar().getConfiguration().isBookkeeperClientRackawarePolicyEnabled(); boolean isRegionEnabled = pulsar().getConfiguration().isBookkeeperClientRegionawarePolicyEnabled(); - if (isRackEnabled && ((isRegionEnabled && separatorCnt != 1) || (!isRegionEnabled && separatorCnt != 0))) { - asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, "Bookie 'rack' parameter is invalid, " - + "When `RackawareEnsemblePlacementPolicy` is enabled, the rack name is not allowed to contain " - + "slash (`/`) except for the beginning and end of the rack name string. " - + "When `RegionawareEnsemblePlacementPolicy` is enabled, the rack name can only contain " - + "one slash (`/`) except for the beginning and end of the rack name string.")); - return; + for (ExtBookieInfo extBookieInfo : extBookieInfos) { + if (extBookieInfo.getGroup() == null) { + throw new RestException(Status.PRECONDITION_FAILED, "Bookie 'group' parameters is missing"); + } + + // validate rack name + int separatorCnt = StringUtils.countMatches( + StringUtils.strip(extBookieInfo.getRack(), PATH_SEPARATOR), PATH_SEPARATOR); + if (isRackEnabled && ((isRegionEnabled && separatorCnt != 1) || (!isRegionEnabled && separatorCnt != 0))) { + asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, "Bookie 'rack' parameter is " + + "invalid, When `RackawareEnsemblePlacementPolicy` is enabled, the rack name is not allowed " + + "to contain slash (`/`) except for the beginning and end of the rack name string. " + + "When `RegionawareEnsemblePlacementPolicy` is enabled, the rack name can only contain " + + "one slash (`/`) except for the beginning and end of the rack name string.")); + return; + } } getPulsarResources().getBookieResources() @@ -193,15 +263,23 @@ public void updateBookieRackInfo(@Suspended final AsyncResponse asyncResponse, BookiesRackConfiguration brc = optionalBookiesRackConfiguration .orElseGet(() -> new BookiesRackConfiguration()); - brc.updateBookie(group, bookieAddress, bookieInfo); + for (ExtBookieInfo extBookieInfo : extBookieInfos) { + brc.updateBookie(extBookieInfo.getGroup(), extBookieInfo.getAddress(), + BookieInfo.builder() + .rack(extBookieInfo.getRack()) + .hostname(extBookieInfo.getHostname()) + .build()); + } return brc; }).thenAccept(__ -> { - log.info("Updated rack mapping info for {}", bookieAddress); - asyncResponse.resume(Response.noContent().build()); - }).exceptionally(ex -> { - asyncResponse.resume(ex); - return null; - }); + String addresses = extBookieInfos.stream() + .map(ExtBookieInfo::getAddress).collect(Collectors.joining(", ")); + log.info("Updated rack mapping info for {}", addresses); + asyncResponse.resume(Response.noContent().build()); + }).exceptionally(ex -> { + asyncResponse.resume(ex); + return null; + }); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BookiesApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BookiesApiTest.java index 1bd1de2130b11..986136c9383e2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BookiesApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BookiesApiTest.java @@ -23,6 +23,7 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import java.util.List; import java.util.Optional; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.ServiceConfiguration; @@ -31,6 +32,7 @@ import org.apache.pulsar.common.policies.data.BookieInfo; import org.apache.pulsar.common.policies.data.BookiesClusterInfo; import org.apache.pulsar.common.policies.data.BookiesRackConfiguration; +import org.apache.pulsar.common.policies.data.ExtBookieInfo; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -191,4 +193,179 @@ public void testBasic() throws Exception { } } + public void testUpdateBookieRackInfo() throws Exception { + BookiesRackConfiguration conf = admin.bookies().getBookiesRackInfo(); + assertTrue(conf.isEmpty()); + + String group = "default"; + String bookie0 = "127.0.0.1:3181"; + BookieInfo newInfo0 = BookieInfo.builder() + .rack("/rack1") + .hostname("127.0.0.1") + .build(); + + // 1. update + admin.bookies().updateBookieRackInfo(bookie0, group, newInfo0); + BookieInfo readInfo0 = admin.bookies().getBookieRackInfo(bookie0); + assertEquals(newInfo0.getRack(), readInfo0.getRack()); + assertEquals(newInfo0.getHostname(), readInfo0.getHostname()); + + + ExtBookieInfo newInfo1 = ExtBookieInfo.builder() + .address("127.0.0.2:3181") + .group("default") + .rack("/rack2") + .hostname("127.0.0.2") + .build(); + ExtBookieInfo newInfo2 = ExtBookieInfo.builder() + .address("127.0.0.3:3181") + .group("default") + .rack("/rack3") + .hostname("127.0.0.3") + .build(); + + // 2. batch update + admin.bookies().batchUpdateBookiesRackInfo(List.of(newInfo1, newInfo2)); + + BookieInfo readInfo1 = admin.bookies().getBookieRackInfo(newInfo1.getAddress()); + assertEquals(newInfo1.getRack(), readInfo1.getRack()); + assertEquals(newInfo1.getHostname(), readInfo1.getHostname()); + + BookieInfo readInfo2 = admin.bookies().getBookieRackInfo(newInfo2.getAddress()); + assertEquals(newInfo2.getRack(), readInfo2.getRack()); + assertEquals(newInfo2.getHostname(), readInfo2.getHostname()); + + conf = admin.bookies().getBookiesRackInfo(); + assertEquals(1, conf.size()); + } + + @Test + public void testDeleteBookieRackInfo() throws Exception { + String bookie0 = "127.0.0.1:3181"; + String bookie1 = "127.0.0.2:3181"; + String bookie2 = "127.0.0.3:3181"; + String bookie3 = "127.0.0.4:3181"; + String group = "default"; + BookieInfo newInfo0 = BookieInfo.builder() + .rack("/rack1") + .hostname("127.0.0.1") + .build(); + BookieInfo newInfo1 = BookieInfo.builder() + .rack("/rack2") + .hostname("127.0.0.2") + .build(); + BookieInfo newInfo2 = BookieInfo.builder() + .rack("/rack3") + .hostname("127.0.0.3") + .build(); + BookieInfo newInfo3 = BookieInfo.builder() + .rack("/rack4") + .hostname("127.0.0.4") + .build(); + admin.bookies().updateBookieRackInfo(bookie0, group, newInfo0); + admin.bookies().updateBookieRackInfo(bookie1, group, newInfo1); + admin.bookies().updateBookieRackInfo(bookie2, group, newInfo2); + admin.bookies().updateBookieRackInfo(bookie3, group, newInfo3); + BookieInfo readInfo0 = admin.bookies().getBookieRackInfo(bookie0); + assertEquals(newInfo0, readInfo0); + BookieInfo readInfo1 = admin.bookies().getBookieRackInfo(bookie1); + assertEquals(newInfo1, readInfo1); + BookieInfo readInfo2 = admin.bookies().getBookieRackInfo(bookie2); + assertEquals(newInfo2, readInfo2); + BookieInfo readInfo3 = admin.bookies().getBookieRackInfo(bookie3); + assertEquals(newInfo3, readInfo3); + + // 1. delete + admin.bookies().deleteBookieRackInfo(bookie0); + try { + admin.bookies().getBookieRackInfo(bookie0); + fail("should not reach here"); + } catch (PulsarAdminException pae) { + assertEquals(404, pae.getStatusCode()); + } + readInfo1 = admin.bookies().getBookieRackInfo(bookie1); + assertEquals(newInfo1, readInfo1); + readInfo2 = admin.bookies().getBookieRackInfo(bookie2); + assertEquals(newInfo2, readInfo2); + readInfo3 = admin.bookies().getBookieRackInfo(bookie3); + assertEquals(newInfo3, readInfo3); + + // 2. batch delete + admin.bookies().batchDeleteBookiesRackInfo(List.of(bookie1, bookie2)); + try { + admin.bookies().getBookieRackInfo(bookie1); + fail("should not reach here"); + } catch (PulsarAdminException pae) { + assertEquals(404, pae.getStatusCode()); + } + try { + admin.bookies().getBookieRackInfo(bookie2); + fail("should not reach here"); + } catch (PulsarAdminException pae) { + assertEquals(404, pae.getStatusCode()); + } + readInfo3 = admin.bookies().getBookieRackInfo(bookie3); + assertEquals(newInfo3, readInfo3); + + // 3. bookie does not exist + try { + admin.bookies().deleteBookieRackInfo(bookie1); + fail(); + } catch (PulsarAdminException e) { + assertEquals(404, e.getStatusCode()); + } + } + + @Test + public void testClearAllBookiesRackInfo() throws Exception { + String bookie0 = "127.0.0.1:3181"; + String bookie1 = "127.0.0.2:3181"; + String bookie2 = "127.0.0.3:3181"; + String group = "default"; + BookieInfo newInfo0 = BookieInfo.builder() + .rack("/rack1") + .hostname("127.0.0.1") + .build(); + BookieInfo newInfo1 = BookieInfo.builder() + .rack("/rack1") + .hostname("127.0.0.2") + .build(); + BookieInfo newInfo2 = BookieInfo.builder() + .rack("/rack1") + .hostname("127.0.0.3") + .build(); + admin.bookies().updateBookieRackInfo(bookie0, group, newInfo0); + admin.bookies().updateBookieRackInfo(bookie1, group, newInfo1); + admin.bookies().updateBookieRackInfo(bookie2, group, newInfo2); + BookieInfo readInfo0 = admin.bookies().getBookieRackInfo(bookie0); + assertEquals(newInfo0, readInfo0); + BookieInfo readInfo1 = admin.bookies().getBookieRackInfo(bookie1); + assertEquals(newInfo1, readInfo1); + BookieInfo readInfo2 = admin.bookies().getBookieRackInfo(bookie2); + assertEquals(newInfo2, readInfo2); + + admin.bookies().clearAllBookiesRackInfo(); + + try { + admin.bookies().getBookieRackInfo(bookie0); + fail("should not reach here"); + } catch (PulsarAdminException pae) { + assertEquals(404, pae.getStatusCode()); + } + try { + admin.bookies().getBookieRackInfo(bookie1); + fail("should not reach here"); + } catch (PulsarAdminException pae) { + assertEquals(404, pae.getStatusCode()); + } + try { + admin.bookies().getBookieRackInfo(bookie2); + fail("should not reach here"); + } catch (PulsarAdminException pae) { + assertEquals(404, pae.getStatusCode()); + } + BookiesRackConfiguration conf = admin.bookies().getBookiesRackInfo(); + assertTrue(conf.isEmpty()); + } + } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Bookies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Bookies.java index 8f4497cdb4435..cddad82723ffd 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Bookies.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Bookies.java @@ -18,10 +18,12 @@ */ package org.apache.pulsar.client.admin; +import java.util.List; import java.util.concurrent.CompletableFuture; import org.apache.pulsar.common.policies.data.BookieInfo; import org.apache.pulsar.common.policies.data.BookiesClusterInfo; import org.apache.pulsar.common.policies.data.BookiesRackConfiguration; +import org.apache.pulsar.common.policies.data.ExtBookieInfo; /** * Admin interface for bookies rack placement management. @@ -68,6 +70,26 @@ public interface Bookies { */ CompletableFuture deleteBookieRackInfoAsync(String bookieAddress); + /** + * Remove rack placement information for a batch of bookies in the cluster. + */ + void batchDeleteBookiesRackInfo(List bookieAddresses) throws PulsarAdminException; + + /** + * Remove rack placement information for a batch of bookies in the cluster asynchronously. + */ + CompletableFuture batchDeleteBookiesRackInfoAsync(List bookieAddresses); + + /** + * Clears the rack placement information for all bookies int the cluster. + */ + void clearAllBookiesRackInfo() throws PulsarAdminException; + + /** + * Clears the rack placement information for all bookies int the cluster asynchronously. + */ + CompletableFuture clearAllBookiesRackInfoAsync(); + /** * Updates the rack placement information for a specific bookie in the cluster. */ @@ -77,4 +99,14 @@ public interface Bookies { * Updates the rack placement information for a specific bookie in the cluster asynchronously. */ CompletableFuture updateBookieRackInfoAsync(String bookieAddress, String group, BookieInfo bookieInfo); + + /** + * Updates the rack placement information for a batch of bookies in the cluster. + */ + void batchUpdateBookiesRackInfo(List extBookieInfos) throws PulsarAdminException; + + /** + * Updates the rack placement information for a batch of bookies in the cluster asynchronously. + */ + CompletableFuture batchUpdateBookiesRackInfoAsync(List extBookieInfos); } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/BookiesRackConfiguration.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/BookiesRackConfiguration.java index 0640263235caf..03e2fe309bc64 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/BookiesRackConfiguration.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/BookiesRackConfiguration.java @@ -61,4 +61,13 @@ public synchronized void updateBookie(String group, String address, BookieInfo b removeBookie(address); computeIfAbsent(group, key -> new TreeMap<>()).put(address, bookieInfo); } + + public synchronized boolean bookieExists(String address) { + for (Map.Entry> entry : entrySet()) { + if (entry.getValue().containsKey(address)) { + return true; + } + } + return false; + } } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ExtBookieInfo.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ExtBookieInfo.java new file mode 100644 index 0000000000000..df084da3b2266 --- /dev/null +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ExtBookieInfo.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.policies.data; + +import org.apache.pulsar.common.policies.data.impl.ExtBookieInfoImpl; + +/** + * Ext bookie information. + */ +public interface ExtBookieInfo { + String getAddress(); + String getGroup(); + String getRack(); + String getHostname(); + + interface Builder { + Builder address(String address); + Builder group(String group); + Builder rack(String rack); + Builder hostname(String hostname); + ExtBookieInfo build(); + } + + static Builder builder() { + return ExtBookieInfoImpl.builder(); + } +} diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/ExtBookieInfoImpl.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/ExtBookieInfoImpl.java new file mode 100644 index 0000000000000..484afd287d847 --- /dev/null +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/ExtBookieInfoImpl.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.policies.data.impl; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.NonNull; +import org.apache.pulsar.common.policies.data.ExtBookieInfo; + +/** + * The ExtBookieInfoImpl class is an extension of the BookieInfoImpl class, + * containing additional detailed information. + *

+ * In addition to the rack and hostname attributes, it also includes + * address and group attributes. + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +public final class ExtBookieInfoImpl implements ExtBookieInfo { + private String address; + private String group; + private String rack; + private String hostname; + + public static ExtBookieInfoImplBuilder builder() { + return new ExtBookieInfoImplBuilder(); + } + + public static class ExtBookieInfoImplBuilder implements ExtBookieInfo.Builder { + private String address; + private String group; + private String rack; + private String hostname; + private static final String PATH_SEPARATOR = "/"; + + public ExtBookieInfoImplBuilder address(String address) { + this.address = address; + return this; + } + + public ExtBookieInfoImplBuilder group(String group) { + this.group = group; + return this; + } + + public ExtBookieInfoImplBuilder rack(String rack) { + this.rack = rack; + return this; + } + + public ExtBookieInfoImplBuilder hostname(String hostname) { + this.hostname = hostname; + return this; + } + + public ExtBookieInfoImpl build() { + checkArgument(rack != null && !rack.isEmpty() && !rack.equals(PATH_SEPARATOR), + "rack name is invalid, it should not be null, empty or '/'"); + return new ExtBookieInfoImpl(address, group, rack, hostname); + } + + public static void checkArgument(boolean expression, @NonNull Object errorMessage) { + if (!expression) { + throw new IllegalArgumentException(String.valueOf(errorMessage)); + } + } + } +} diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BookiesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BookiesImpl.java index 2286fb8c8a381..c7bba241fede1 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BookiesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BookiesImpl.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.admin.internal; +import java.util.List; import java.util.concurrent.CompletableFuture; import javax.ws.rs.client.Entity; import javax.ws.rs.client.WebTarget; @@ -28,6 +29,7 @@ import org.apache.pulsar.common.policies.data.BookieInfo; import org.apache.pulsar.common.policies.data.BookiesClusterInfo; import org.apache.pulsar.common.policies.data.BookiesRackConfiguration; +import org.apache.pulsar.common.policies.data.ExtBookieInfo; public class BookiesImpl extends BaseResource implements Bookies { private final WebTarget adminBookies; @@ -81,6 +83,30 @@ public CompletableFuture deleteBookieRackInfoAsync(String bookieAddress) { return asyncDeleteRequest(path); } + public void batchDeleteBookiesRackInfo(List bookieAddresses) throws PulsarAdminException { + sync(() -> batchDeleteBookiesRackInfoAsync(bookieAddresses)); + } + + @Override + public CompletableFuture batchDeleteBookiesRackInfoAsync(List bookieAddresses) { + WebTarget path = adminBookies.path("racks-info"); + for (String bookieAddress : bookieAddresses) { + path = path.queryParam("bookieAddresses", bookieAddress); + } + return asyncDeleteRequest(path); + } + + @Override + public void clearAllBookiesRackInfo() throws PulsarAdminException { + sync(this::clearAllBookiesRackInfoAsync); + } + + @Override + public CompletableFuture clearAllBookiesRackInfoAsync() { + WebTarget path = adminBookies.path("racks-info").queryParam("deleteAll", "true"); + return asyncDeleteRequest(path); + } + @Override public void updateBookieRackInfo(String bookieAddress, String group, BookieInfo bookieInfo) throws PulsarAdminException { @@ -94,4 +120,15 @@ public CompletableFuture updateBookieRackInfoAsync( return asyncPostRequest(path, Entity.entity(bookieInfo, MediaType.APPLICATION_JSON)); } + @Override + public void batchUpdateBookiesRackInfo(List extBookieInfos) throws PulsarAdminException { + sync(() -> batchUpdateBookiesRackInfoAsync(extBookieInfos)); + } + + @Override + public CompletableFuture batchUpdateBookiesRackInfoAsync(List extBookieInfos) { + WebTarget path = adminBookies.path("racks-info"); + return asyncPostRequest(path, Entity.entity(extBookieInfos, MediaType.APPLICATION_JSON)); + } + } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java index 7b235cfa341d1..add5b67d1ed79 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java @@ -56,6 +56,7 @@ import org.apache.pulsar.common.policies.data.ConsumerStats; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.DispatchRate; +import org.apache.pulsar.common.policies.data.ExtBookieInfo; import org.apache.pulsar.common.policies.data.FailureDomain; import org.apache.pulsar.common.policies.data.FailureDomainImpl; import org.apache.pulsar.common.policies.data.FunctionInstanceStats; @@ -95,6 +96,7 @@ import org.apache.pulsar.common.policies.data.impl.BundlesDataImpl; import org.apache.pulsar.common.policies.data.impl.DelayedDeliveryPoliciesImpl; import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl; +import org.apache.pulsar.common.policies.data.impl.ExtBookieInfoImpl; import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl; import org.apache.pulsar.common.policies.data.stats.NonPersistentPartitionedTopicStatsImpl; import org.apache.pulsar.common.policies.data.stats.NonPersistentPublisherStatsImpl; @@ -236,6 +238,7 @@ private static void setAnnotationsModule(ObjectMapper mapper) { resolver.addMapping(AuthPolicies.class, AuthPoliciesImpl.class); resolver.addMapping(AutoTopicCreationOverride.class, AutoTopicCreationOverrideImpl.class); resolver.addMapping(BookieInfo.class, BookieInfoImpl.class); + resolver.addMapping(ExtBookieInfo.class, ExtBookieInfoImpl.class); resolver.addMapping(BookiesClusterInfo.class, BookiesClusterInfoImpl.class); resolver.addMapping(BrokerInfo.class, BrokerInfoImpl.class); resolver.addMapping(BrokerStatus.class, BrokerStatusImpl.class); From 7b981df942f9f5c52921083d3f17eab0d245d8e9 Mon Sep 17 00:00:00 2001 From: ruihongzhou Date: Thu, 6 Jun 2024 15:14:41 +0800 Subject: [PATCH 2/2] fix --- .../pulsar/broker/admin/v2/Bookies.java | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Bookies.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Bookies.java index 85cbcbe2e91c5..0e0ade4bc3e2e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Bookies.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Bookies.java @@ -168,19 +168,19 @@ private void internalDeleteBookiesRackInfo(final AsyncResponse asyncResponse, BookiesRackConfiguration brc = optionalBookiesRackConfiguration .orElseGet(BookiesRackConfiguration::new); - if (deleteAll) { - brc.clear(); - } else { - for (String bookieAddress : bookieAddresses) { - if (!brc.bookieExists(bookieAddress)) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, - "Bookie rack placement configuration not found: " + bookieAddress)); + if (deleteAll) { + brc.clear(); + } else { + for (String bookieAddress : bookieAddresses) { + if (!brc.bookieExists(bookieAddress)) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, + "Bookie rack placement configuration not found: " + bookieAddress)); + } + } + for (String bookieAddress : bookieAddresses) { + brc.removeBookie(bookieAddress); } } - for (String bookieAddress : bookieAddresses) { - brc.removeBookie(bookieAddress); - } - } return brc; }).thenAccept(__ -> {