Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,6 @@
*/
package org.apache.iceberg.aws.s3.signer;

import static java.lang.String.format;
import static org.apache.iceberg.rest.RESTCatalogAdapter.castRequest;
import static org.apache.iceberg.rest.RESTCatalogAdapter.castResponse;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.servlet.http.HttpServlet;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import java.io.InputStreamReader;
import java.io.Reader;
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
Expand All @@ -37,23 +26,14 @@
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.iceberg.exceptions.RESTException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.io.CharStreams;
import org.apache.iceberg.rest.RESTUtil;
import org.apache.iceberg.rest.ResourcePaths;
import org.apache.iceberg.rest.responses.ErrorResponse;
import org.apache.iceberg.rest.responses.OAuthTokenResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.iceberg.rest.RemoteSignerServlet;
import org.apache.iceberg.rest.requests.RemoteSignRequest;
import org.apache.iceberg.rest.responses.ImmutableRemoteSignResponse;
import org.apache.iceberg.rest.responses.RemoteSignResponse;
import software.amazon.awssdk.auth.signer.AwsS3V4Signer;
import software.amazon.awssdk.auth.signer.params.AwsS3V4SignerParams;
import software.amazon.awssdk.http.SdkHttpFullRequest;
Expand All @@ -65,113 +45,32 @@
* {@link S3SignerServlet} provides a simple servlet implementation to emulate the server-side
* behavior of signing S3 requests and handling OAuth.
*/
public class S3SignerServlet extends HttpServlet {

private static final Logger LOG = LoggerFactory.getLogger(S3SignerServlet.class);
public class S3SignerServlet extends RemoteSignerServlet {

static final Clock SIGNING_CLOCK = Clock.fixed(Instant.now(), ZoneId.of("UTC"));
static final Set<String> UNSIGNED_HEADERS =
Sets.newHashSet(
Arrays.asList("range", "x-amz-date", "amz-sdk-invocation-id", "amz-sdk-retry"));
private static final String POST = "POST";

private static final Set<SdkHttpMethod> CACHEABLE_METHODS =
Stream.of(SdkHttpMethod.GET, SdkHttpMethod.HEAD).collect(Collectors.toSet());

private final Map<String, String> responseHeaders =
ImmutableMap.of(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType());
private final ObjectMapper mapper;

private List<SignRequestValidator> s3SignRequestValidators = Lists.newArrayList();

/**
* SignRequestValidator is a wrapper class used for validating the contents of the S3SignRequest
* and thus verifying the behavior of the client during testing.
*/
public static class SignRequestValidator {
private final Predicate<S3SignRequest> requestMatcher;
private final Predicate<S3SignRequest> requestExpectation;
private final String assertMessage;

public SignRequestValidator(
Predicate<S3SignRequest> requestExpectation,
Predicate<S3SignRequest> requestMatcher,
String assertMessage) {
this.requestExpectation = requestExpectation;
this.requestMatcher = requestMatcher;
this.assertMessage = assertMessage;
}

void validateRequest(S3SignRequest request) {
if (requestMatcher.test(request)) {
assertThat(requestExpectation.test(request)).as(assertMessage).isTrue();
}
}
}

public S3SignerServlet(ObjectMapper mapper) {
this.mapper = mapper;
}

public S3SignerServlet(ObjectMapper mapper, List<SignRequestValidator> s3SignRequestValidators) {
this.mapper = mapper;
this.s3SignRequestValidators = s3SignRequestValidators;
}

@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response) {
execute(request, response);
}

@Override
protected void doHead(HttpServletRequest request, HttpServletResponse response) {
execute(request, response);
}
/** A fake remote signing endpoint for testing purposes. */
static final String S3_SIGNER_ENDPOINT = "v1/namespaces/ns1/tables/t1/sign/s3";

@Override
protected void doPost(HttpServletRequest request, HttpServletResponse response) {
execute(request, response);
public S3SignerServlet() {
super(S3_SIGNER_ENDPOINT);
}

@Override
protected void doDelete(HttpServletRequest request, HttpServletResponse response) {
execute(request, response);
}

private OAuthTokenResponse handleOAuth(Map<String, String> requestMap) {
String grantType = requestMap.get("grant_type");
switch (grantType) {
case "client_credentials":
return castResponse(
OAuthTokenResponse.class,
OAuthTokenResponse.builder()
.withToken("client-credentials-token:sub=" + requestMap.get("client_id"))
.withIssuedTokenType("urn:ietf:params:oauth:token-type:access_token")
.withTokenType("Bearer")
.setExpirationInSeconds(10000)
.build());

case "urn:ietf:params:oauth:grant-type:token-exchange":
String actor = requestMap.get("actor_token");
String token =
String.format(
"token-exchange-token:sub=%s%s",
requestMap.get("subject_token"), actor != null ? ",act=" + actor : "");
return castResponse(
OAuthTokenResponse.class,
OAuthTokenResponse.builder()
.withToken(token)
.withIssuedTokenType("urn:ietf:params:oauth:token-type:access_token")
.withTokenType("Bearer")
.setExpirationInSeconds(10000)
.build());

default:
throw new UnsupportedOperationException("Unsupported grant_type: " + grantType);
protected void validateSignRequest(RemoteSignRequest request) {
if ("POST".equalsIgnoreCase(request.method()) && request.uri().getQuery().contains("delete")) {
String body = request.body();
Preconditions.checkArgument(
body != null && !body.isEmpty(),
"Sign request for delete objects should have a request body");
}
}

private S3SignResponse signRequest(S3SignRequest request) {
@Override
protected RemoteSignResponse signRequest(RemoteSignRequest request) {
AwsS3V4SignerParams signingParams =
AwsS3V4SignerParams.builder()
.awsCredentials(TestS3RestSigner.CREDENTIALS_PROVIDER.resolveCredentials())
Expand Down Expand Up @@ -207,59 +106,6 @@ private S3SignResponse signRequest(S3SignRequest request) {
Map<String, List<String>> headers = Maps.newHashMap(sign.headers());
headers.putAll(unsignedHeaders);

return ImmutableS3SignResponse.builder().uri(request.uri()).headers(headers).build();
}

protected void execute(HttpServletRequest request, HttpServletResponse response) {
response.setStatus(HttpServletResponse.SC_OK);
responseHeaders.forEach(response::setHeader);

String path = request.getRequestURI().substring(1);
Object requestBody;
try {
// we only need to handle oauth tokens & s3 sign request routes here as those are the only
// requests that are being done by the S3V4RestSignerClient
if (POST.equals(request.getMethod())
&& S3V4RestSignerClient.S3_SIGNER_DEFAULT_ENDPOINT.equals(path)) {
S3SignRequest s3SignRequest =
castRequest(
S3SignRequest.class, mapper.readValue(request.getReader(), S3SignRequest.class));
s3SignRequestValidators.forEach(validator -> validator.validateRequest(s3SignRequest));
S3SignResponse s3SignResponse = signRequest(s3SignRequest);
if (CACHEABLE_METHODS.contains(SdkHttpMethod.fromValue(s3SignRequest.method()))) {
// tell the client this can be cached
response.setHeader(
S3V4RestSignerClient.CACHE_CONTROL, S3V4RestSignerClient.CACHE_CONTROL_PRIVATE);
} else {
response.setHeader(
S3V4RestSignerClient.CACHE_CONTROL, S3V4RestSignerClient.CACHE_CONTROL_NO_CACHE);
}

mapper.writeValue(response.getWriter(), s3SignResponse);
} else if (POST.equals(request.getMethod()) && ResourcePaths.tokens().equals(path)) {
try (Reader reader = new InputStreamReader(request.getInputStream())) {
requestBody = RESTUtil.decodeFormData(CharStreams.toString(reader));
}

OAuthTokenResponse oAuthTokenResponse =
handleOAuth((Map<String, String>) castRequest(Map.class, requestBody));
mapper.writeValue(response.getWriter(), oAuthTokenResponse);
} else {
response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
mapper.writeValue(
response.getWriter(),
ErrorResponse.builder()
.responseCode(400)
.withType("BadRequestException")
.withMessage(format("No route for request: %s %s", request.getMethod(), path))
.build());
}
} catch (RESTException e) {
LOG.error("Error processing REST request", e);
response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
} catch (Exception e) {
LOG.error("Unexpected exception when processing REST request", e);
response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
}
return ImmutableRemoteSignResponse.builder().uri(request.uri()).headers(headers).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.iceberg.aws.s3.MinioUtil;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.rest.RESTCatalogProperties;
import org.apache.iceberg.rest.auth.OAuth2Properties;
import org.apache.iceberg.util.ThreadPools;
import org.eclipse.jetty.server.Server;
Expand Down Expand Up @@ -107,8 +107,10 @@ public static void beforeClass() throws Exception {
ImmutableS3V4RestSignerClient.builder()
.properties(
ImmutableMap.of(
S3V4RestSignerClient.S3_SIGNER_URI,
RESTCatalogProperties.SIGNER_URI,
httpServer.getURI().toString(),
RESTCatalogProperties.SIGNER_ENDPOINT,
S3SignerServlet.S3_SIGNER_ENDPOINT,
OAuth2Properties.CREDENTIAL,
"catalog:12345"))
.build(),
Expand Down Expand Up @@ -182,15 +184,7 @@ public void before() throws Exception {
}

private static Server initHttpServer() throws Exception {
S3SignerServlet.SignRequestValidator deleteObjectsWithBody =
new S3SignerServlet.SignRequestValidator(
(s3SignRequest) ->
"post".equalsIgnoreCase(s3SignRequest.method())
&& s3SignRequest.uri().getQuery().contains("delete"),
(s3SignRequest) -> s3SignRequest.body() != null && !s3SignRequest.body().isEmpty(),
"Sign request for delete objects should have a request body");
S3SignerServlet servlet =
new S3SignerServlet(S3ObjectMapper.mapper(), ImmutableList.of(deleteObjectsWithBody));
S3SignerServlet servlet = new S3SignerServlet();
ServletContextHandler servletContext =
new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
servletContext.addServlet(new ServletHolder(servlet), "/*");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@
import org.apache.iceberg.rest.responses.ErrorResponse;
import org.apache.iceberg.rest.responses.OAuthTokenResponse;

/**
* @deprecated since 1.11.0, will be removed in 1.12.0; use {@code RESTObjectMapper} instead.
*/
@Deprecated
public class S3ObjectMapper {

private static final JsonFactory FACTORY = new JsonFactory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@
import org.apache.iceberg.rest.RESTRequest;
import org.immutables.value.Value;

/**
* @deprecated since 1.11.0, will be removed in 1.12.0; use {@link
* org.apache.iceberg.rest.requests.RemoteSignRequest} instead.
*/
@Deprecated
@Value.Immutable
public interface S3SignRequest extends RESTRequest {
String region();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.JsonUtil;

/**
* @deprecated since 1.11.0, will be removed in 1.12.0; use {@link
* org.apache.iceberg.rest.requests.RemoteSignRequestParser} instead.
*/
@Deprecated
public class S3SignRequestParser {

private static final String REGION = "region";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@
import org.apache.iceberg.rest.RESTResponse;
import org.immutables.value.Value;

/**
* @deprecated since 1.11.0, will be removed in 1.12.0; use {@link
* org.apache.iceberg.rest.responses.RemoteSignResponse} instead.
*/
@Deprecated
@Value.Immutable
public interface S3SignResponse extends RESTResponse {
URI uri();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.JsonUtil;

/**
* @deprecated since 1.11.0, will be removed in 1.12.0; use {@link
* org.apache.iceberg.rest.responses.RemoteSignResponseParser} instead.
*/
@Deprecated
public class S3SignResponseParser {

private static final String URI = "uri";
Expand Down
Loading