Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
Expand All @@ -50,6 +52,7 @@
import org.apache.pulsar.common.policies.data.BookieInfo;
import org.apache.pulsar.common.policies.data.BookiesClusterInfo;
import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
import org.apache.pulsar.common.policies.data.ExtBookieInfo;
import org.apache.pulsar.common.policies.data.RawBookieInfo;

@Path("/bookies")
Expand Down Expand Up @@ -131,26 +134,66 @@ public void getBookieRackInfo(@Suspended final AsyncResponse asyncResponse,
})
public void deleteBookieRackInfo(@Suspended final AsyncResponse asyncResponse,
@PathParam("bookie") String bookieAddress) throws Exception {
internalDeleteBookiesRackInfo(asyncResponse, List.of(bookieAddress), false);
}

@DELETE
@Path("/racks-info")
@ApiOperation(
value = "Removed the rack placement information for a batch of bookies in the cluster",
notes = "If the 'deleteAll' parameter is set to true, it will remove the rack placement "
+ "information for all bookies in the cluster, ignoring the 'bookieAddresses' parameter"
)
@ApiResponses(value = {
@ApiResponse(code = 204, message = "Operation successful"),
@ApiResponse(code = 403, message = "Don't have admin permission")
})
public void batchDeleteBookiesRackInfo(@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "List of bookie addresses")
@QueryParam("bookieAddresses") List<String> bookieAddresses,
@ApiParam(value = "Whether to delete all bookies rack info",
defaultValue = "false")
@QueryParam("deleteAll") @DefaultValue("false")
boolean deleteAll) throws Exception {
internalDeleteBookiesRackInfo(asyncResponse, bookieAddresses, deleteAll);
}

private void internalDeleteBookiesRackInfo(final AsyncResponse asyncResponse,
List<String> bookieAddresses,
boolean deleteAll) throws Exception {
validateSuperUserAccess();

getPulsarResources().getBookieResources()
.update(optionalBookiesRackConfiguration -> {
BookiesRackConfiguration brc = optionalBookiesRackConfiguration
.orElseGet(() -> new BookiesRackConfiguration());
.orElseGet(BookiesRackConfiguration::new);

if (!brc.removeBookie(bookieAddress)) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Bookie rack placement configuration not found: " + bookieAddress));
if (deleteAll) {
brc.clear();
} else {
for (String bookieAddress : bookieAddresses) {
if (!brc.bookieExists(bookieAddress)) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Bookie rack placement configuration not found: " + bookieAddress));
}
}
for (String bookieAddress : bookieAddresses) {
brc.removeBookie(bookieAddress);
}
}

return brc;
}).thenAccept(__ -> {
log.info("Removed {} from rack mapping info", bookieAddress);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(ex -> {
asyncResponse.resume(ex);
return null;
});
if (deleteAll) {
log.info("Removed all bookies rack mapping info");
} else {
log.info("Removed {} from rack mapping info", bookieAddresses);
}
asyncResponse.resume(Response.noContent().build());
}).exceptionally(ex -> {
asyncResponse.resume(ex);
return null;
});
}

@POST
Expand All @@ -168,40 +211,75 @@ public void updateBookieRackInfo(@Suspended final AsyncResponse asyncResponse,
@QueryParam("group") String group,
@ApiParam(value = "The bookie info", required = true)
BookieInfo bookieInfo) throws Exception {
validateSuperUserAccess();
internalBatchUpdateBookiesRackInfo(asyncResponse, List.of(
ExtBookieInfo.builder()
.address(bookieAddress)
.group(group)
.rack(bookieInfo.getRack())
.hostname(bookieInfo.getHostname())
.build())
);
}

if (group == null) {
throw new RestException(Status.PRECONDITION_FAILED, "Bookie 'group' parameters is missing");
}
@POST
@Path("/racks-info")
@ApiOperation(value = "Updates the rack placement information for a batch of bookies in the cluster")
@ApiResponses(value = {
@ApiResponse(code = 204, message = "Operation successful"),
@ApiResponse(code = 403, message = "Don't have admin permission")}
)
public void batchUpdateBookiesRackInfo(@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "List of bookie info", required = true)
List<ExtBookieInfo> extBookieInfos) throws Exception {
internalBatchUpdateBookiesRackInfo(asyncResponse, extBookieInfos);
}

private void internalBatchUpdateBookiesRackInfo(final AsyncResponse asyncResponse,
List<ExtBookieInfo> extBookieInfos) throws Exception {
validateSuperUserAccess();

// validate rack name
int separatorCnt = StringUtils.countMatches(
StringUtils.strip(bookieInfo.getRack(), PATH_SEPARATOR), PATH_SEPARATOR);
boolean isRackEnabled = pulsar().getConfiguration().isBookkeeperClientRackawarePolicyEnabled();
boolean isRegionEnabled = pulsar().getConfiguration().isBookkeeperClientRegionawarePolicyEnabled();
if (isRackEnabled && ((isRegionEnabled && separatorCnt != 1) || (!isRegionEnabled && separatorCnt != 0))) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, "Bookie 'rack' parameter is invalid, "
+ "When `RackawareEnsemblePlacementPolicy` is enabled, the rack name is not allowed to contain "
+ "slash (`/`) except for the beginning and end of the rack name string. "
+ "When `RegionawareEnsemblePlacementPolicy` is enabled, the rack name can only contain "
+ "one slash (`/`) except for the beginning and end of the rack name string."));
return;
for (ExtBookieInfo extBookieInfo : extBookieInfos) {
if (extBookieInfo.getGroup() == null) {
throw new RestException(Status.PRECONDITION_FAILED, "Bookie 'group' parameters is missing");
}

// validate rack name
int separatorCnt = StringUtils.countMatches(
StringUtils.strip(extBookieInfo.getRack(), PATH_SEPARATOR), PATH_SEPARATOR);
if (isRackEnabled && ((isRegionEnabled && separatorCnt != 1) || (!isRegionEnabled && separatorCnt != 0))) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, "Bookie 'rack' parameter is "
+ "invalid, When `RackawareEnsemblePlacementPolicy` is enabled, the rack name is not allowed "
+ "to contain slash (`/`) except for the beginning and end of the rack name string. "
+ "When `RegionawareEnsemblePlacementPolicy` is enabled, the rack name can only contain "
+ "one slash (`/`) except for the beginning and end of the rack name string."));
return;
}
}

getPulsarResources().getBookieResources()
.update(optionalBookiesRackConfiguration -> {
BookiesRackConfiguration brc = optionalBookiesRackConfiguration
.orElseGet(() -> new BookiesRackConfiguration());

brc.updateBookie(group, bookieAddress, bookieInfo);
for (ExtBookieInfo extBookieInfo : extBookieInfos) {
brc.updateBookie(extBookieInfo.getGroup(), extBookieInfo.getAddress(),
BookieInfo.builder()
.rack(extBookieInfo.getRack())
.hostname(extBookieInfo.getHostname())
.build());
}

return brc;
}).thenAccept(__ -> {
log.info("Updated rack mapping info for {}", bookieAddress);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(ex -> {
asyncResponse.resume(ex);
return null;
});
String addresses = extBookieInfos.stream()
.map(ExtBookieInfo::getAddress).collect(Collectors.joining(", "));
log.info("Updated rack mapping info for {}", addresses);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(ex -> {
asyncResponse.resume(ex);
return null;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import java.util.List;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.ServiceConfiguration;
Expand All @@ -31,6 +32,7 @@
import org.apache.pulsar.common.policies.data.BookieInfo;
import org.apache.pulsar.common.policies.data.BookiesClusterInfo;
import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
import org.apache.pulsar.common.policies.data.ExtBookieInfo;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -191,4 +193,179 @@ public void testBasic() throws Exception {
}
}

public void testUpdateBookieRackInfo() throws Exception {
BookiesRackConfiguration conf = admin.bookies().getBookiesRackInfo();
assertTrue(conf.isEmpty());

String group = "default";
String bookie0 = "127.0.0.1:3181";
BookieInfo newInfo0 = BookieInfo.builder()
.rack("/rack1")
.hostname("127.0.0.1")
.build();

// 1. update
admin.bookies().updateBookieRackInfo(bookie0, group, newInfo0);
BookieInfo readInfo0 = admin.bookies().getBookieRackInfo(bookie0);
assertEquals(newInfo0.getRack(), readInfo0.getRack());
assertEquals(newInfo0.getHostname(), readInfo0.getHostname());


ExtBookieInfo newInfo1 = ExtBookieInfo.builder()
.address("127.0.0.2:3181")
.group("default")
.rack("/rack2")
.hostname("127.0.0.2")
.build();
ExtBookieInfo newInfo2 = ExtBookieInfo.builder()
.address("127.0.0.3:3181")
.group("default")
.rack("/rack3")
.hostname("127.0.0.3")
.build();

// 2. batch update
admin.bookies().batchUpdateBookiesRackInfo(List.of(newInfo1, newInfo2));

BookieInfo readInfo1 = admin.bookies().getBookieRackInfo(newInfo1.getAddress());
assertEquals(newInfo1.getRack(), readInfo1.getRack());
assertEquals(newInfo1.getHostname(), readInfo1.getHostname());

BookieInfo readInfo2 = admin.bookies().getBookieRackInfo(newInfo2.getAddress());
assertEquals(newInfo2.getRack(), readInfo2.getRack());
assertEquals(newInfo2.getHostname(), readInfo2.getHostname());

conf = admin.bookies().getBookiesRackInfo();
assertEquals(1, conf.size());
}

@Test
public void testDeleteBookieRackInfo() throws Exception {
String bookie0 = "127.0.0.1:3181";
String bookie1 = "127.0.0.2:3181";
String bookie2 = "127.0.0.3:3181";
String bookie3 = "127.0.0.4:3181";
String group = "default";
BookieInfo newInfo0 = BookieInfo.builder()
.rack("/rack1")
.hostname("127.0.0.1")
.build();
BookieInfo newInfo1 = BookieInfo.builder()
.rack("/rack2")
.hostname("127.0.0.2")
.build();
BookieInfo newInfo2 = BookieInfo.builder()
.rack("/rack3")
.hostname("127.0.0.3")
.build();
BookieInfo newInfo3 = BookieInfo.builder()
.rack("/rack4")
.hostname("127.0.0.4")
.build();
admin.bookies().updateBookieRackInfo(bookie0, group, newInfo0);
admin.bookies().updateBookieRackInfo(bookie1, group, newInfo1);
admin.bookies().updateBookieRackInfo(bookie2, group, newInfo2);
admin.bookies().updateBookieRackInfo(bookie3, group, newInfo3);
BookieInfo readInfo0 = admin.bookies().getBookieRackInfo(bookie0);
assertEquals(newInfo0, readInfo0);
BookieInfo readInfo1 = admin.bookies().getBookieRackInfo(bookie1);
assertEquals(newInfo1, readInfo1);
BookieInfo readInfo2 = admin.bookies().getBookieRackInfo(bookie2);
assertEquals(newInfo2, readInfo2);
BookieInfo readInfo3 = admin.bookies().getBookieRackInfo(bookie3);
assertEquals(newInfo3, readInfo3);

// 1. delete
admin.bookies().deleteBookieRackInfo(bookie0);
try {
admin.bookies().getBookieRackInfo(bookie0);
fail("should not reach here");
} catch (PulsarAdminException pae) {
assertEquals(404, pae.getStatusCode());
}
readInfo1 = admin.bookies().getBookieRackInfo(bookie1);
assertEquals(newInfo1, readInfo1);
readInfo2 = admin.bookies().getBookieRackInfo(bookie2);
assertEquals(newInfo2, readInfo2);
readInfo3 = admin.bookies().getBookieRackInfo(bookie3);
assertEquals(newInfo3, readInfo3);

// 2. batch delete
admin.bookies().batchDeleteBookiesRackInfo(List.of(bookie1, bookie2));
try {
admin.bookies().getBookieRackInfo(bookie1);
fail("should not reach here");
} catch (PulsarAdminException pae) {
assertEquals(404, pae.getStatusCode());
}
try {
admin.bookies().getBookieRackInfo(bookie2);
fail("should not reach here");
} catch (PulsarAdminException pae) {
assertEquals(404, pae.getStatusCode());
}
readInfo3 = admin.bookies().getBookieRackInfo(bookie3);
assertEquals(newInfo3, readInfo3);

// 3. bookie does not exist
try {
admin.bookies().deleteBookieRackInfo(bookie1);
fail();
} catch (PulsarAdminException e) {
assertEquals(404, e.getStatusCode());
}
}

@Test
public void testClearAllBookiesRackInfo() throws Exception {
String bookie0 = "127.0.0.1:3181";
String bookie1 = "127.0.0.2:3181";
String bookie2 = "127.0.0.3:3181";
String group = "default";
BookieInfo newInfo0 = BookieInfo.builder()
.rack("/rack1")
.hostname("127.0.0.1")
.build();
BookieInfo newInfo1 = BookieInfo.builder()
.rack("/rack1")
.hostname("127.0.0.2")
.build();
BookieInfo newInfo2 = BookieInfo.builder()
.rack("/rack1")
.hostname("127.0.0.3")
.build();
admin.bookies().updateBookieRackInfo(bookie0, group, newInfo0);
admin.bookies().updateBookieRackInfo(bookie1, group, newInfo1);
admin.bookies().updateBookieRackInfo(bookie2, group, newInfo2);
BookieInfo readInfo0 = admin.bookies().getBookieRackInfo(bookie0);
assertEquals(newInfo0, readInfo0);
BookieInfo readInfo1 = admin.bookies().getBookieRackInfo(bookie1);
assertEquals(newInfo1, readInfo1);
BookieInfo readInfo2 = admin.bookies().getBookieRackInfo(bookie2);
assertEquals(newInfo2, readInfo2);

admin.bookies().clearAllBookiesRackInfo();

try {
admin.bookies().getBookieRackInfo(bookie0);
fail("should not reach here");
} catch (PulsarAdminException pae) {
assertEquals(404, pae.getStatusCode());
}
try {
admin.bookies().getBookieRackInfo(bookie1);
fail("should not reach here");
} catch (PulsarAdminException pae) {
assertEquals(404, pae.getStatusCode());
}
try {
admin.bookies().getBookieRackInfo(bookie2);
fail("should not reach here");
} catch (PulsarAdminException pae) {
assertEquals(404, pae.getStatusCode());
}
BookiesRackConfiguration conf = admin.bookies().getBookiesRackInfo();
assertTrue(conf.isEmpty());
}

}
Loading