From 3186da44e2294884ead021532458b0c352b635e2 Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Tue, 17 Jun 2025 15:52:08 -0700 Subject: [PATCH] ISSUE-22371: Pulsar authentication for Client authentication using private_key_jwt method --- .../server/src/assemble/LICENSE.bin.txt | 6 +- .../shell/src/assemble/LICENSE.bin.txt | 2 + pom.xml | 35 +- pulsar-broker-auth-oidc/pom.xml | 6 - pulsar-broker-common/pom.xml | 7 +- .../AuthenticationProviderToken.java | 9 + .../utils/auth/tokens/TokensCliUtils.java | 22 +- .../utils/auth/tokens/TokensCliUtilsTest.java | 6 +- .../client/impl/crypto/MessageCryptoBc.java | 2 +- pulsar-client/pom.xml | 23 + .../auth/oauth2/AuthenticationOAuth2.java | 4 + .../auth/oauth2/ClientCredentialsFlow.java | 38 +- .../impl/auth/oauth2/JwtBearerFlow.java | 142 +++++ .../client/impl/auth/oauth2/KeyFile.java | 36 ++ .../protocol/ClientCredentialsExchanger.java | 12 + .../protocol/JwtBearerExchangeRequest.java | 67 +++ .../oauth2/protocol/PrivateKeyReader.java | 486 ++++++++++++++++++ .../auth/oauth2/protocol/TokenClient.java | 59 ++- ...thenticationOAuth2JwtBearerConfigTest.java | 109 ++++ .../AuthenticationOAuth2JwtBearerTest.java | 106 ++++ .../protocol/JwtBearerGeneratorTest.java | 43 ++ .../auth/oauth2/protocol/TokenClientTest.java | 31 ++ 22 files changed, 1162 insertions(+), 89 deletions(-) create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/JwtBearerFlow.java create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/JwtBearerExchangeRequest.java create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/PrivateKeyReader.java create mode 100644 pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2JwtBearerConfigTest.java create mode 100644 pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2JwtBearerTest.java create mode 100644 pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/JwtBearerGeneratorTest.java diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index c929378d0b7e6..9c502fd9bd017 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -490,9 +490,9 @@ The Apache Software License, Version 2.0 * OpenHFT - net.openhft-zero-allocation-hashing-0.16.jar * Java JSON WebTokens - - io.jsonwebtoken-jjwt-api-0.11.1.jar - - io.jsonwebtoken-jjwt-impl-0.11.1.jar - - io.jsonwebtoken-jjwt-jackson-0.11.1.jar + - io.jsonwebtoken-jjwt-api-0.11.5.jar + - io.jsonwebtoken-jjwt-impl-0.11.5.jar + - io.jsonwebtoken-jjwt-jackson-0.11.5.jar * JCTools - Java Concurrency Tools for the JVM - org.jctools-jctools-core-2.1.2.jar * Vertx diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt b/distribution/shell/src/assemble/LICENSE.bin.txt index 47c57e1df5d5b..bca4dc72429ce 100644 --- a/distribution/shell/src/assemble/LICENSE.bin.txt +++ b/distribution/shell/src/assemble/LICENSE.bin.txt @@ -413,6 +413,8 @@ The Apache Software License, Version 2.0 - websocket-api-9.4.57.v20241219.jar - websocket-client-9.4.57.v20241219.jar - websocket-common-9.4.57.v20241219.jar + * Java JSON WebTokens + - jjwt-api-0.11.5.jar * SnakeYaml -- snakeyaml-2.0.jar * Google Error Prone Annotations - error_prone_annotations-2.38.0.jar * Javassist -- javassist-3.25.0-GA.jar diff --git a/pom.xml b/pom.xml index a61ca8e67fd6d..e0f83bde01743 100644 --- a/pom.xml +++ b/pom.xml @@ -243,7 +243,7 @@ flexible messaging model and an intuitive client API. 8.0.33 1.15.16.Final - 0.11.1 + 0.11.5 0.28.0 3.4.0 3.6.2 @@ -311,6 +311,7 @@ flexible messaging model and an intuitive client API. 1.20.4 + 2.1.11 3.4.0 2.2 @@ -1203,17 +1204,7 @@ flexible messaging model and an intuitive client API. io.jsonwebtoken - jjwt-api - ${jsonwebtoken.version} - - - io.jsonwebtoken - jjwt-impl - ${jsonwebtoken.version} - - - io.jsonwebtoken - jjwt-jackson + jjwt ${jsonwebtoken.version} @@ -1727,6 +1718,26 @@ flexible messaging model and an intuitive client API. opentelemetry-semconv ${opentelemetry.semconv.version} + + + + io.jsonwebtoken + jjwt-api + ${jsonwebtoken.version} + + + io.jsonwebtoken + jjwt-impl + ${jsonwebtoken.version} + runtime + + + io.jsonwebtoken + jjwt-jackson + ${jsonwebtoken.version} + runtime + + diff --git a/pulsar-broker-auth-oidc/pom.xml b/pulsar-broker-auth-oidc/pom.xml index be5e94eb9f685..2befd827353d8 100644 --- a/pulsar-broker-auth-oidc/pom.xml +++ b/pulsar-broker-auth-oidc/pom.xml @@ -34,10 +34,6 @@ Open ID Connect authentication plugin for broker Pulsar Broker Auth OIDC - - 0.11.5 - - @@ -106,13 +102,11 @@ io.jsonwebtoken jjwt-api - ${jsonwebtoken.version} test io.jsonwebtoken jjwt-impl - ${jsonwebtoken.version} test diff --git a/pulsar-broker-common/pom.xml b/pulsar-broker-common/pom.xml index f018720b5e587..a0ec84f4e9394 100644 --- a/pulsar-broker-common/pom.xml +++ b/pulsar-broker-common/pom.xml @@ -65,14 +65,19 @@ jakarta.ws.rs-api + + io.jsonwebtoken + jjwt-api + io.jsonwebtoken jjwt-impl + runtime - io.jsonwebtoken jjwt-jackson + runtime diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java index 74bc85ad3ffc3..ae13a276d1316 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java @@ -35,6 +35,7 @@ import java.security.Key; import java.util.Date; import java.util.List; +import java.util.Set; import javax.naming.AuthenticationException; import javax.net.ssl.SSLSession; import javax.servlet.http.HttpServletRequest; @@ -246,6 +247,14 @@ private Jwt authenticateToken(final String token) throws Authenticati throw new AuthenticationException("Audiences in token: [" + String.join(", ", audiences) + "] not contains this broker: " + audience); } + } else if (object instanceof Set) { + Set audiences = (Set) object; + // audience not contains this broker, throw exception. + if (!audiences.contains(audience)) { + incrementFailureMetric(ErrorCode.INVALID_AUDIENCES); + throw new AuthenticationException("Audiences in token: [" + + String.join(", ", audiences) + "] not contains this broker: " + audience); + } } else if (object instanceof String) { if (!object.equals(audience)) { incrementFailureMetric(ErrorCode.INVALID_AUDIENCES); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/utils/auth/tokens/TokensCliUtils.java b/pulsar-broker/src/main/java/org/apache/pulsar/utils/auth/tokens/TokensCliUtils.java index 6f71860164638..9519a85a01524 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/utils/auth/tokens/TokensCliUtils.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/utils/auth/tokens/TokensCliUtils.java @@ -20,7 +20,8 @@ import com.google.common.annotations.VisibleForTesting; import io.jsonwebtoken.Claims; -import io.jsonwebtoken.Jwt; +import io.jsonwebtoken.Jws; +import io.jsonwebtoken.JwtException; import io.jsonwebtoken.Jwts; import io.jsonwebtoken.SignatureAlgorithm; import io.jsonwebtoken.io.Decoders; @@ -290,13 +291,20 @@ public Integer call() throws Exception { validationKey = AuthTokenUtils.decodeSecretKey(encodedKey); } - // Validate the token - Jwt jwt = Jwts.parserBuilder() - .setSigningKey(validationKey) - .build() - .parseClaimsJws(token); + try { + // Validate the token + Jws jwt = Jwts.parserBuilder() + .setSigningKey(validationKey) + .build() + .parseClaimsJws(token); + + System.out.println(jwt.getBody()); + + } catch (JwtException e) { + System.err.println("JWT Signature verification failed: " + e.getMessage()); + return -1; + } - System.out.println(jwt.getBody()); return 0; } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/utils/auth/tokens/TokensCliUtilsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/utils/auth/tokens/TokensCliUtilsTest.java index 005b160887ec6..33a4bdfb79372 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/utils/auth/tokens/TokensCliUtilsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/utils/auth/tokens/TokensCliUtilsTest.java @@ -73,12 +73,12 @@ public void testCreateToken() { }; new TokensCliUtils().execute(command); - String token = baoStream.toString(); + String token = baoStream.toString().trim(); Jwt jwt = Jwts.parserBuilder() .setSigningKey(Decoders.BASE64.decode(secretKey)) .build() - .parseClaimsJws(token); + .parse(token); JwsHeader header = (JwsHeader) jwt.getHeader(); String keyId = header.getKeyId(); @@ -108,7 +108,7 @@ public void commandCreateToken_WhenCreatingATokenWithExpiryTime_ShouldHaveTheDes }; new TokensCliUtils().execute(command); - String token = baoStream.toString(); + String token = baoStream.toString().trim(); Instant start = (new Date().toInstant().plus(expireAsSec - 5, ChronoUnit.SECONDS)); Instant stop = (new Date().toInstant().plus(expireAsSec + 5, ChronoUnit.SECONDS)); diff --git a/pulsar-client-messagecrypto-bc/src/main/java/org/apache/pulsar/client/impl/crypto/MessageCryptoBc.java b/pulsar-client-messagecrypto-bc/src/main/java/org/apache/pulsar/client/impl/crypto/MessageCryptoBc.java index 62cc00cd61ec3..3bfd37a30859d 100644 --- a/pulsar-client-messagecrypto-bc/src/main/java/org/apache/pulsar/client/impl/crypto/MessageCryptoBc.java +++ b/pulsar-client-messagecrypto-bc/src/main/java/org/apache/pulsar/client/impl/crypto/MessageCryptoBc.java @@ -238,7 +238,7 @@ public static PublicKey loadPublicKey(byte[] keyBytes) throws Exception { return publicKey; } - private PrivateKey loadPrivateKey(byte[] keyBytes) throws Exception { + public static PrivateKey loadPrivateKey(byte[] keyBytes) throws Exception { Reader keyReader = new StringReader(new String(keyBytes)); PrivateKey privateKey = null; diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml index e51a1a418780b..781cc06f4115c 100644 --- a/pulsar-client/pom.xml +++ b/pulsar-client/pom.xml @@ -191,6 +191,22 @@ true + + + io.jsonwebtoken + jjwt-api + + + io.jsonwebtoken + jjwt-impl + runtime + + + io.jsonwebtoken + jjwt-jackson + runtime + + ${project.groupId} @@ -212,6 +228,13 @@ test + + no.nav.security + mock-oauth2-server + ${mock-oauth2-server.version} + test + + org.roaringbitmap RoaringBitmap diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java index 694e4681d97f2..ed733cde0ba44 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java @@ -41,6 +41,7 @@ public class AuthenticationOAuth2 implements Authentication, EncodedAuthenticati public static final String CONFIG_PARAM_TYPE = "type"; public static final String TYPE_CLIENT_CREDENTIALS = "client_credentials"; + public static final String TYPE_JWT_BEARER = "jwt_bearer"; public static final String AUTH_METHOD_NAME = "token"; public static final double EXPIRY_ADJUSTMENT = 0.9; private static final long serialVersionUID = 1L; @@ -80,6 +81,9 @@ public void configure(String encodedAuthParamString) { case TYPE_CLIENT_CREDENTIALS: this.flow = ClientCredentialsFlow.fromParameters(params); break; + case TYPE_JWT_BEARER: + this.flow = JwtBearerFlow.fromParameters(params); + break; default: throw new IllegalArgumentException("Unsupported authentication type: " + type); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java index ef10f1afdb63b..68785b069944f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java @@ -19,17 +19,10 @@ package org.apache.pulsar.client.impl.auth.oauth2; import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.Reader; -import java.net.URISyntaxException; import java.net.URL; -import java.net.URLConnection; -import java.nio.charset.StandardCharsets; import java.util.Map; import lombok.Builder; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.io.IOUtils; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.auth.oauth2.protocol.ClientCredentialsExchangeRequest; import org.apache.pulsar.client.impl.auth.oauth2.protocol.ClientCredentialsExchanger; @@ -81,7 +74,7 @@ public TokenResult authenticate() throws PulsarClientException { // read the private key from storage KeyFile keyFile; try { - keyFile = loadPrivateKey(this.privateKey); + keyFile = KeyFile.loadPrivateKey(this.privateKey); } catch (IOException e) { throw new PulsarClientException.AuthenticationException("Unable to read private key: " + e.getMessage()); } @@ -133,33 +126,4 @@ public static ClientCredentialsFlow fromParameters(Map params) { .build(); } - /** - * Loads the private key from the given URL. - * @param privateKeyURL - * @return - * @throws IOException - */ - private static KeyFile loadPrivateKey(String privateKeyURL) throws IOException { - try { - URLConnection urlConnection = new org.apache.pulsar.client.api.url.URL(privateKeyURL).openConnection(); - try { - String protocol = urlConnection.getURL().getProtocol(); - String contentType = urlConnection.getContentType(); - if ("data".equals(protocol) && !"application/json".equals(contentType)) { - throw new IllegalArgumentException( - "Unsupported media type or encoding format: " + urlConnection.getContentType()); - } - KeyFile privateKey; - try (Reader r = new InputStreamReader((InputStream) urlConnection.getContent(), - StandardCharsets.UTF_8)) { - privateKey = KeyFile.fromJson(r); - } - return privateKey; - } finally { - IOUtils.close(urlConnection); - } - } catch (URISyntaxException | InstantiationException | IllegalAccessException e) { - throw new IOException("Invalid privateKey format", e); - } - } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/JwtBearerFlow.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/JwtBearerFlow.java new file mode 100644 index 0000000000000..0752a9b76a4a0 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/JwtBearerFlow.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.auth.oauth2; + +import java.io.IOException; +import java.net.URL; +import java.util.Map; +import lombok.Builder; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.auth.oauth2.protocol.ClientCredentialsExchanger; +import org.apache.pulsar.client.impl.auth.oauth2.protocol.JwtBearerExchangeRequest; +import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenClient; +import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenExchangeException; +import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult; + +@Slf4j +public class JwtBearerFlow extends FlowBase { + + public static final String SIGNATURE_ALGORITHM = "RS256"; + + public static final String CONFIG_PARAM_SIGNATURE_ALGORITHM = "signatureAlgorithm"; + + public static final String CONFIG_PARAM_ISSUER_URL = "issuerUrl"; + public static final String CONFIG_PARAM_AUDIENCE = "audience"; + public static final String CONFIG_PARAM_KEY_FILE = "privateKey"; + public static final String CONFIG_PARAM_TOKEN_TTL = "tokenTTlMillis"; + + private static final long serialVersionUID = 1L; + + private final String audience; + private final String privateKey; + private final String signatureAlgorithm; + private final long ttlMillis; + + private transient ClientCredentialsExchanger exchanger; + + private boolean initialized = false; + + public static JwtBearerFlow fromParameters(Map params) { + URL issuerUrl = parseParameterUrl(params, CONFIG_PARAM_ISSUER_URL); + String privateKeyUrl = parseParameterString(params, CONFIG_PARAM_KEY_FILE); + // These are optional parameters, so we only perform a get + String audience = params.get(CONFIG_PARAM_AUDIENCE); + String signatureAlgorithm = params.getOrDefault(CONFIG_PARAM_SIGNATURE_ALGORITHM, SIGNATURE_ALGORITHM); + long ttlMillis = Long.parseLong(params.getOrDefault(CONFIG_PARAM_TOKEN_TTL, "3600000")); + + return JwtBearerFlow.builder() + .issuerUrl(issuerUrl) + .audience(audience) + .privateKey(privateKeyUrl) + .signatureAlgorithm(signatureAlgorithm) + .ttlMillis(ttlMillis) + .build(); + } + + @Builder + public JwtBearerFlow(URL issuerUrl, + String audience, + String privateKey, + String signatureAlgorithm, + long ttlMillis) { + super(issuerUrl); + this.audience = audience; + this.privateKey = privateKey; + this.signatureAlgorithm = signatureAlgorithm; + this.ttlMillis = ttlMillis; + } + + @Override + public void initialize() throws PulsarClientException { + super.initialize(); + assert this.metadata != null; + + URL tokenUrl = this.metadata.getTokenEndpoint(); + this.exchanger = new TokenClient(tokenUrl); + initialized = true; + } + + @Override + public TokenResult authenticate() throws PulsarClientException { + // read the private key from storage + KeyFile keyFile; + try { + keyFile = KeyFile.loadPrivateKey(this.privateKey); + } catch (IOException e) { + throw new PulsarClientException.AuthenticationException("Unable to read private key: " + e.getMessage()); + } + + // request an access token using client credentials + final String jwtAssertion; + try { + jwtAssertion = JwtBearerExchangeRequest.generateJWT( + keyFile.getClientId(), + this.audience, + keyFile.getClientSecret(), + this.signatureAlgorithm, + ttlMillis); + } catch (Exception e) { + throw new PulsarClientException("Failed to generate JWT", e); + } + TokenResult tr; + if (!initialized) { + initialize(); + } + + try { + JwtBearerExchangeRequest req = JwtBearerExchangeRequest.builder() + .assertion(jwtAssertion) + .build(); + tr = this.exchanger.exchangeClientCredentials(req); + } catch (TokenExchangeException | IOException e) { + throw new PulsarClientException.AuthenticationException("Unable to obtain an access token: " + + e.getMessage()); + } + + return tr; + } + + @Override + public void close() throws Exception { + if (exchanger != null) { + exchanger.close(); + } + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/KeyFile.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/KeyFile.java index da9eea947060d..4d9ea0c8ae578 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/KeyFile.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/KeyFile.java @@ -21,9 +21,15 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; import java.io.Reader; +import java.net.URISyntaxException; +import java.net.URLConnection; +import java.nio.charset.StandardCharsets; import lombok.Data; import lombok.NoArgsConstructor; +import org.apache.commons.io.IOUtils; import org.apache.pulsar.common.util.ObjectMapperFactory; @@ -61,4 +67,34 @@ public static KeyFile fromJson(String value) throws IOException { public static KeyFile fromJson(Reader value) throws IOException { return ObjectMapperFactory.getMapper().reader().readValue(value, KeyFile.class); } + + /** + * Loads the private key from the given URL. + * @param privateKeyURL + * @return + * @throws IOException + */ + public static KeyFile loadPrivateKey(String privateKeyURL) throws IOException { + try { + URLConnection urlConnection = new org.apache.pulsar.client.api.url.URL(privateKeyURL).openConnection(); + try { + String protocol = urlConnection.getURL().getProtocol(); + String contentType = urlConnection.getContentType(); + if ("data".equals(protocol) && !"application/json".equals(contentType)) { + throw new IllegalArgumentException( + "Unsupported media type or encoding format: " + urlConnection.getContentType()); + } + KeyFile privateKey; + try (Reader r = new InputStreamReader((InputStream) urlConnection.getContent(), + StandardCharsets.UTF_8)) { + privateKey = KeyFile.fromJson(r); + } + return privateKey; + } finally { + IOUtils.close(urlConnection); + } + } catch (URISyntaxException | InstantiationException | IllegalAccessException e) { + throw new IOException("Invalid privateKey format", e); + } + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchanger.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchanger.java index 0b95abb4c60b7..537c816e1cecd 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchanger.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchanger.java @@ -33,4 +33,16 @@ public interface ClientCredentialsExchanger extends AutoCloseable { */ TokenResult exchangeClientCredentials(ClientCredentialsExchangeRequest req) throws TokenExchangeException, IOException; + + /** + * Requests an exchange of client credentials for a JWT Bearer token. + * @param req the request details. + * @return an access token. + * @throws TokenExchangeException if the OAuth server returned a detailed error. + * @throws IOException if a general IO error occurred. + */ + default TokenResult exchangeClientCredentials(JwtBearerExchangeRequest req) + throws TokenExchangeException, IOException { + throw new UnsupportedOperationException("JWT Bearer exchange is not supported by this exchanger"); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/JwtBearerExchangeRequest.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/JwtBearerExchangeRequest.java new file mode 100644 index 0000000000000..ad424d50f406d --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/JwtBearerExchangeRequest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.auth.oauth2.protocol; + +import com.fasterxml.jackson.annotation.JsonProperty; +import io.jsonwebtoken.JwtBuilder; +import io.jsonwebtoken.Jwts; +import io.jsonwebtoken.SignatureAlgorithm; +import java.net.URI; +import java.security.PrivateKey; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Date; +import lombok.Builder; +import lombok.Data; + +/** + * A token request based on the exchange of JWT Bearer Token. + * + * @see OAuth 2.0 RFC 7523, section 2.2 + */ +@Data +@Builder +public class JwtBearerExchangeRequest { + + @JsonProperty("assertion") + private String assertion; + + public static String generateJWT(String clientId, + String audience, + String privateKeyStringUrl, + String signatureAlgorithm, + long ttlMillis) + throws Exception { + URI privateKeyStringUri = URI.create(privateKeyStringUrl); + PrivateKey privateKey = PrivateKeyReader.getPrivateKey(privateKeyStringUri); + Instant now = Instant.now(); + + // per https://developer.okta.com/docs/guides/build-self-signed-jwt/java/main/#gather-claims-information + // issuer and subject must be the same as client_id + JwtBuilder builder = Jwts.builder() + .setAudience(audience) + .setIssuedAt(Date.from(now)) + .setExpiration(Date.from(now.plus(ttlMillis, ChronoUnit.MILLIS))) + .setIssuer(clientId) + .setSubject(clientId) + .signWith(privateKey, SignatureAlgorithm.forName(signatureAlgorithm)); + + return builder.compact(); + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/PrivateKeyReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/PrivateKeyReader.java new file mode 100644 index 0000000000000..be28ea20cf8dd --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/PrivateKeyReader.java @@ -0,0 +1,486 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.auth.oauth2.protocol; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.math.BigInteger; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URLConnection; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.security.KeyFactory; +import java.security.NoSuchAlgorithmException; +import java.security.PrivateKey; +import java.security.spec.EncodedKeySpec; +import java.security.spec.InvalidKeySpecException; +import java.security.spec.PKCS8EncodedKeySpec; +import java.security.spec.RSAPrivateCrtKeySpec; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.codec.binary.Base64; + +/** + * Class for reading RSA private key from PEM file. It uses + * the JMeter FileServer to find the file. So the file should + * be located in the same directory as the test plan if the + * path is relative. + *

+ *

There is a cache so each file is only read once. If file + * is changed, it will not take effect until the program + * restarts. + *

+ *

It can read PEM files with PKCS#8 or PKCS#1 encodings. + * It doesn't support encrypted PEM files. + *

+ * "borrowed" from https://github.com/groovenauts/jmeter_oauth_plugin/blob/master/jmeter/ + * src/main/java/org/apache/jmeter/protocol/oauth/sampler/PrivateKeyReader.java + * with some modifications: + * - not tied to key specified as a file path + * - minus extra dependencies from jmeter + * - minus key caching + */ +@Slf4j +public class PrivateKeyReader { + + // Private key file using PKCS #1 encoding + public static final String P1_BEGIN_MARKER = "-----BEGIN RSA PRIVATE KEY"; //$NON-NLS-1$ + public static final String P1_END_MARKER = "-----END RSA PRIVATE KEY"; //$NON-NLS-1$ + + // Private key file using PKCS #8 encoding + public static final String P8_BEGIN_MARKER = "-----BEGIN PRIVATE KEY"; //$NON-NLS-1$ + public static final String P8_END_MARKER = "-----END PRIVATE KEY"; //$NON-NLS-1$ + + /** + * Read the PEM file and return the key. + * + * @return + * @throws IOException + */ + public static PrivateKey getPrivateKey(URI keyUri) + throws IOException, URISyntaxException, InstantiationException, IllegalAccessException { + + String line; + + KeyFactory factory; + try { + factory = KeyFactory.getInstance("RSA"); //$NON-NLS-1$ + } catch (NoSuchAlgorithmException e) { + throw new IOException("JCE error: " + e.getMessage()); //$NON-NLS-1$ + } + + URLConnection urlConnection = new org.apache.pulsar.client.api.url.URL(keyUri.toURL().toString()) + .openConnection(); + + try (BufferedReader reader = new BufferedReader(new InputStreamReader((InputStream) urlConnection.getContent(), + StandardCharsets.UTF_8))) { + + while ((line = reader.readLine()) != null) { + if (line.contains(P1_BEGIN_MARKER)) { + byte[] keyBytes = readKeyMaterial(P1_END_MARKER, reader); + RSAPrivateCrtKeySpec keySpec = getRSAKeySpec(keyBytes); + + try { + return factory.generatePrivate(keySpec); + } catch (InvalidKeySpecException e) { + throw new IOException("Invalid PKCS#1 PEM file: " + e.getMessage()); //$NON-NLS-1$ + } + } + + if (line.contains(P8_BEGIN_MARKER)) { + byte[] keyBytes = readKeyMaterial(P8_END_MARKER, reader); + EncodedKeySpec keySpec = new PKCS8EncodedKeySpec(keyBytes); + + try { + return factory.generatePrivate(keySpec); + } catch (InvalidKeySpecException e) { + throw new IOException("Invalid PKCS#8 PEM file: " + e.getMessage()); //$NON-NLS-1$ + } + } + + } + } + throw new IOException("Invalid PEM file: no begin marker"); //$NON-NLS-1$ + } + + + /** + * Read the PEM file and convert it into binary DER stream. + * + * @return + * @throws IOException + */ + private static byte[] readKeyMaterial(String endMarker, BufferedReader reader) throws IOException { + String line; + StringBuilder buf = new StringBuilder(); + + while ((line = reader.readLine()) != null) { + if (line.contains(endMarker)) { + + return new Base64().decode(buf.toString().getBytes(StandardCharsets.UTF_8)); + } + + buf.append(line.trim()); + } + + throw new IOException("Invalid PEM file: No end marker"); //$NON-NLS-1$ + } + + /** + * Convert PKCS#1 encoded private key into RSAPrivateCrtKeySpec. + *

+ *

The ASN.1 syntax for the private key with CRT is + * + *

+     * --
+     * -- Representation of RSA private key with information for the CRT algorithm.
+     * --
+     * RSAPrivateKey ::= SEQUENCE {
+     *   version           Version,
+     *   modulus           INTEGER,  -- n
+     *   publicExponent    INTEGER,  -- e
+     *   privateExponent   INTEGER,  -- d
+     *   prime1            INTEGER,  -- p
+     *   prime2            INTEGER,  -- q
+     *   exponent1         INTEGER,  -- d mod (p-1)
+     *   exponent2         INTEGER,  -- d mod (q-1)
+     *   coefficient       INTEGER,  -- (inverse of q) mod p
+     *   otherPrimeInfos   OtherPrimeInfos OPTIONAL
+     * }
+     * 
+ * + * @param keyBytes PKCS#1 encoded key + * @return KeySpec + * @throws IOException + */ + + private static RSAPrivateCrtKeySpec getRSAKeySpec(byte[] keyBytes) throws IOException { + + DerParser parser = new DerParser(keyBytes); + + Asn1Object sequence = parser.read(); + if (sequence.getType() != DerParser.SEQUENCE) { + throw new IOException("Invalid DER: not a sequence"); //$NON-NLS-1$ + } + + // Parse inside the sequence + parser = sequence.getParser(); + + parser.read(); // Skip version + BigInteger modulus = parser.read().getInteger(); + BigInteger publicExp = parser.read().getInteger(); + BigInteger privateExp = parser.read().getInteger(); + BigInteger prime1 = parser.read().getInteger(); + BigInteger prime2 = parser.read().getInteger(); + BigInteger exp1 = parser.read().getInteger(); + BigInteger exp2 = parser.read().getInteger(); + BigInteger crtCoef = parser.read().getInteger(); + + RSAPrivateCrtKeySpec keySpec = new RSAPrivateCrtKeySpec( + modulus, publicExp, privateExp, prime1, prime2, + exp1, exp2, crtCoef); + + return keySpec; + } +} + +/** + * A bare-minimum ASN.1 DER decoder, just having enough functions to + * decode PKCS#1 private keys. Especially, it doesn't handle explicitly + * tagged types with an outer tag. + *

+ *

This parser can only handle one layer. To parse nested constructs, + * get a new parser for each layer using Asn1Object.getParser(). + *

+ *

There are many DER decoders in JRE but using them will tie this + * program to a specific JCE/JVM. + */ +class DerParser { + + // Classes + public static final int UNIVERSAL = 0x00; + public static final int APPLICATION = 0x40; + public static final int CONTEXT = 0x80; + public static final int PRIVATE = 0xC0; + + // Constructed Flag + public static final int CONSTRUCTED = 0x20; + + // Tag and data types + public static final int ANY = 0x00; + public static final int BOOLEAN = 0x01; + public static final int INTEGER = 0x02; + public static final int BIT_STRING = 0x03; + public static final int OCTET_STRING = 0x04; + public static final int NULL = 0x05; + public static final int OBJECT_IDENTIFIER = 0x06; + public static final int REAL = 0x09; + public static final int ENUMERATED = 0x0a; + public static final int RELATIVE_OID = 0x0d; + + public static final int SEQUENCE = 0x10; + public static final int SET = 0x11; + + public static final int NUMERIC_STRING = 0x12; + public static final int PRINTABLE_STRING = 0x13; + public static final int T61_STRING = 0x14; + public static final int VIDEOTEX_STRING = 0x15; + public static final int IA5_STRING = 0x16; + public static final int GRAPHIC_STRING = 0x19; + public static final int ISO646_STRING = 0x1A; + public static final int GENERAL_STRING = 0x1B; + + public static final int UTF8_STRING = 0x0C; + public static final int UNIVERSAL_STRING = 0x1C; + public static final int BMP_STRING = 0x1E; + + public static final int UTC_TIME = 0x17; + public static final int GENERALIZED_TIME = 0x18; + + protected InputStream in; + + /** + * Create a new DER decoder from an input stream. + * + * @param in The DER encoded stream + */ + public DerParser(InputStream in) throws IOException { + this.in = in; + } + + /** + * Create a new DER decoder from a byte array. + * + * @param bytes encoded bytes + * @throws IOException + */ + public DerParser(byte[] bytes) throws IOException { + this(new ByteArrayInputStream(bytes)); + } + + /** + * Read next object. If it's constructed, the value holds + * encoded content and it should be parsed by a new + * parser from Asn1Object.getParser. + * + * @return A object + * @throws IOException + */ + public Asn1Object read() throws IOException { + int tag = in.read(); + + if (tag == -1) { + throw new IOException("Invalid DER: stream too short, missing tag"); //$NON-NLS-1$ + } + + int length = getLength(); + + byte[] value = new byte[length]; + int n = in.read(value); + if (n < length) { + throw new IOException("Invalid DER: stream too short, missing value"); //$NON-NLS-1$ + } + + Asn1Object o = new Asn1Object(tag, length, value); + + return o; + } + + /** + * Decode the length of the field. Can only support length + * encoding up to 4 octets. + *

+ *

In BER/DER encoding, length can be encoded in 2 forms, + *

    + *
  • Short form. One octet. Bit 8 has value "0" and bits 7-1 + * give the length. + *
  • Long form. Two to 127 octets (only 4 is supported here). + * Bit 8 of first octet has value "1" and bits 7-1 give the + * number of additional length octets. Second and following + * octets give the length, base 256, most significant digit first. + *
+ * + * @return The length as integer + * @throws IOException + */ + private int getLength() throws IOException { + + int i = in.read(); + if (i == -1) { + throw new IOException("Invalid DER: length missing"); //$NON-NLS-1$ + } + + // A single byte short length + if ((i & ~0x7F) == 0) { + return i; + } + + int num = i & 0x7F; + + // We can't handle length longer than 4 bytes + if (i >= 0xFF || num > 4) { + throw new IOException("Invalid DER: length field too big (" //$NON-NLS-1$ + + i + ")"); //$NON-NLS-1$ + } + + byte[] bytes = new byte[num]; + int n = in.read(bytes); + if (n < num) { + throw new IOException("Invalid DER: length too short"); //$NON-NLS-1$ + } + + return new BigInteger(1, bytes).intValue(); + } + +} + + +/** + * An ASN.1 TLV. The object is not parsed. It can + * only handle integers and strings. + */ +class Asn1Object { + + protected final int type; + protected final int length; + protected final byte[] value; + protected final int tag; + + /** + * Construct a ASN.1 TLV. The TLV could be either a + * constructed or primitive entity. + *

+ *

The first byte in DER encoding is made of following fields, + *

+     * -------------------------------------------------
+     * |Bit 8|Bit 7|Bit 6|Bit 5|Bit 4|Bit 3|Bit 2|Bit 1|
+     * -------------------------------------------------
+     * |  Class    | CF  |     +      Type             |
+     * -------------------------------------------------
+     * 
+ *
    + *
  • Class: Universal, Application, Context or Private + *
  • CF: Constructed flag. If 1, the field is constructed. + *
  • Type: This is actually called tag in ASN.1. It + * indicates data type (Integer, String) or a construct + * (sequence, choice, set). + *
+ * + * @param tag Tag or Identifier + * @param length Length of the field + * @param value Encoded octet string for the field. + */ + public Asn1Object(int tag, int length, byte[] value) { + this.tag = tag; + this.type = tag & 0x1F; + this.length = length; + this.value = value; + } + + public int getType() { + return type; + } + + public int getLength() { + return length; + } + + public byte[] getValue() { + return value; + } + + public boolean isConstructed() { + return (tag & DerParser.CONSTRUCTED) == DerParser.CONSTRUCTED; + } + + /** + * For constructed field, return a parser for its content. + * + * @return A parser for the construct. + * @throws IOException + */ + public DerParser getParser() throws IOException { + if (!isConstructed()) { + throw new IOException("Invalid DER: can't parse primitive entity"); //$NON-NLS-1$ + } + + return new DerParser(value); + } + + /** + * Get the value as integer. + * + * @return BigInteger + * @throws IOException + */ + public BigInteger getInteger() throws IOException { + if (type != DerParser.INTEGER) { + throw new IOException("Invalid DER: object is not integer"); //$NON-NLS-1$ + } + + return new BigInteger(value); + } + + /** + * Get value as string. Most strings are treated + * as Latin-1. + * + * @return Java string + * @throws IOException + */ + public String getString() throws IOException { + Charset charset; + + switch (type) { + + // Not all are Latin-1 but it's the closest thing + case DerParser.NUMERIC_STRING: + case DerParser.PRINTABLE_STRING: + case DerParser.VIDEOTEX_STRING: + case DerParser.IA5_STRING: + case DerParser.GRAPHIC_STRING: + case DerParser.ISO646_STRING: + case DerParser.GENERAL_STRING: + // "ISO-8859-1"; //$NON-NLS-1$ + charset = StandardCharsets.ISO_8859_1; + break; + + case DerParser.BMP_STRING: + // "UTF-16BE"; //$NON-NLS-1$ + charset = StandardCharsets.UTF_16BE; + break; + + case DerParser.UTF8_STRING: + // "UTF-8"; //$NON-NLS-1$ + charset = StandardCharsets.UTF_8; + break; + + case DerParser.UNIVERSAL_STRING: + throw new IOException("Invalid DER: can't handle UCS-4 string"); //$NON-NLS-1$ + + default: + throw new IOException("Invalid DER: object is not a string"); //$NON-NLS-1$ + } + + return new String(value, charset); + } +} \ No newline at end of file diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java index f4e4c770e67fc..9af2dc6f08103 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java @@ -89,12 +89,31 @@ String buildClientCredentialsBody(ClientCredentialsExchangeRequest req) { if (!StringUtils.isBlank(req.getScope())) { bodyMap.put("scope", req.getScope()); } + return encodeUrlParams(bodyMap); + } + + String buildClientCredentialsBody(JwtBearerExchangeRequest req) { + Map bodyMap = new TreeMap<>(); + // https://datatracker.ietf.org/doc/html/rfc7523#section-2.2 + bodyMap.put("grant_type", "client_credentials"); + bodyMap.put("client_assertion_type", "urn:ietf:params:oauth:client-assertion-type:jwt-bearer"); + bodyMap.put("client_assertion", req.getAssertion()); + return encodeUrlParams(bodyMap); + } + + private static String encodeUrlParams(Map bodyMap) { return bodyMap.entrySet().stream() .map(e -> URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8) + '=' + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8)) .collect(Collectors.joining("&")); } + public TokenResult exchangeClientCredentials(JwtBearerExchangeRequest req) + throws TokenExchangeException, IOException { + String body = buildClientCredentialsBody(req); + return exchangeClientCredentials(body); + } + /** * Performs a token exchange using client credentials. * @param req the client credentials request details. @@ -104,9 +123,12 @@ String buildClientCredentialsBody(ClientCredentialsExchangeRequest req) { public TokenResult exchangeClientCredentials(ClientCredentialsExchangeRequest req) throws TokenExchangeException, IOException { String body = buildClientCredentialsBody(req); + return exchangeClientCredentials(body); + } + private TokenResult exchangeClientCredentials(String body) + throws TokenExchangeException, IOException { try { - Response res = httpClient.preparePost(tokenUrl.toString()) .setHeader("Accept", "application/json") .setHeader("Content-Type", "application/x-www-form-urlencoded") @@ -115,25 +137,24 @@ public TokenResult exchangeClientCredentials(ClientCredentialsExchangeRequest re .get(); switch (res.getStatusCode()) { - case 200: - return ObjectMapperFactory.getMapper().reader().readValue(res.getResponseBodyAsBytes(), - TokenResult.class); - - case 400: // Bad request - case 401: // Unauthorized - throw new TokenExchangeException( - ObjectMapperFactory.getMapper().reader().readValue(res.getResponseBodyAsBytes(), - TokenError.class)); - - default: - throw new IOException( - "Failed to perform HTTP request. res: " + res.getStatusCode() + " " + res.getStatusText()); + case 200: + return ObjectMapperFactory.getMapper().reader().readValue(res.getResponseBodyAsBytes(), + TokenResult.class); + case 400: // Bad request + case 401: // Unauthorized + throw new TokenExchangeException( + ObjectMapperFactory.getMapper().reader().readValue(res.getResponseBodyAsBytes(), + TokenError.class)); + default: + throw new IOException( + "Failed to perform HTTP request. res: " + res.getStatusCode() + " " + res.getStatusText()); } - - - - } catch (InterruptedException | ExecutionException e1) { - throw new IOException(e1); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new IOException(ie); + } catch (ExecutionException ee) { + throw new IOException(ee); } } + } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2JwtBearerConfigTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2JwtBearerConfigTest.java new file mode 100644 index 0000000000000..540cbafd81784 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2JwtBearerConfigTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.auth.oauth2; + +import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.time.Instant; +import java.time.ZoneOffset; +import java.util.HashMap; +import java.util.Map; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * Tests {@link AuthenticationOAuth2 with JwtBearerFlow}. + */ +public class AuthenticationOAuth2JwtBearerConfigTest { + private static final ObjectMapper mapper = new ObjectMapper(); + + private AuthenticationOAuth2 auth; + + @BeforeMethod + public void before() { + MockClock clock = new MockClock(Instant.EPOCH, ZoneOffset.UTC); + Flow flow = mock(JwtBearerFlow.class); + this.auth = new AuthenticationOAuth2(flow, clock); + } + + private static Map getMinimalAuthConfig() { + Map params = new HashMap<>(); + params.put("type", "jwt_bearer"); + return params; + } + + private Map getMinimalWorkingAuthConfig() { + Map params = getMinimalAuthConfig(); + params.put(JwtBearerFlow.CONFIG_PARAM_ISSUER_URL, "http://localhost/.well-known/openid-configuration"); + params.put(JwtBearerFlow.CONFIG_PARAM_AUDIENCE, "http://localhost"); + params.put(JwtBearerFlow.CONFIG_PARAM_KEY_FILE, "data:base64,e30="); // empty private key + + return params; + } + + private static void configureAuth(AuthenticationOAuth2 auth, Map params) throws Exception { + String authParams = mapper.writeValueAsString(params); + auth.configure(authParams); + } + + @Test + public void testGetAuthMethodName() throws Exception { + Map params = getMinimalWorkingAuthConfig(); + configureAuth(auth, params); + + assertNotNull(this.auth.flow); + assertEquals(this.auth.getAuthMethodName(), "token"); + } + + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = ".*Unsupported.*") + public void testUnknownType() throws Exception { + Map params = getMinimalAuthConfig(); + params.put("type", "garbage"); + configureAuth(auth, params); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = ".*Required.*") + public void testMissingParams() throws Exception { + Map params = getMinimalAuthConfig(); + configureAuth(auth, params); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = ".*Required.*") + public void testConfigureRequired() throws Exception { + Map params = getMinimalAuthConfig(); + configureAuth(auth, params); + + this.auth.configure("{}"); + } + + @Test + public void testConfigureWithOptionalParams() throws Exception { + Map params = getMinimalWorkingAuthConfig(); + params.put(JwtBearerFlow.CONFIG_PARAM_TOKEN_TTL, "600000"); + params.put(JwtBearerFlow.CONFIG_PARAM_SIGNATURE_ALGORITHM, "RS256"); + configureAuth(auth, params); + + assertNotNull(this.auth.flow); + assertEquals(this.auth.getAuthMethodName(), "token"); + } + +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2JwtBearerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2JwtBearerTest.java new file mode 100644 index 0000000000000..219720495dd55 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2JwtBearerTest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.auth.oauth2; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; +import no.nav.security.mock.oauth2.MockOAuth2Server; +import org.apache.commons.io.FileUtils; +import org.apache.pulsar.client.api.AuthenticationDataProvider; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * Tests {@link AuthenticationOAuth2}. + */ +public class AuthenticationOAuth2JwtBearerTest { + static final String RSA_PRIVATE_KEY = "./src/test/resources/crypto_rsa_private.key"; + + private static final ObjectMapper mapper = new ObjectMapper(); + + private AuthenticationOAuth2 auth; + + MockOAuth2Server server; + String wellKnownUrl; + + @BeforeMethod + public void before() throws Exception { + server = new MockOAuth2Server(); + server.start(); + wellKnownUrl = server.wellKnownUrl("default").toString(); + this.auth = new AuthenticationOAuth2(); + } + + @AfterMethod + public void after() { + if (server != null) { + server.shutdown(); + } + } + + private static Map getWorkingAuthConfig(String wellKnownUrl) throws Exception { + Map params = new HashMap<>(); + params.put("type", "jwt_bearer"); + params.put(JwtBearerFlow.CONFIG_PARAM_ISSUER_URL, wellKnownUrl); + params.put(JwtBearerFlow.CONFIG_PARAM_AUDIENCE, "http://localhost"); + String privateKey = Paths.get(RSA_PRIVATE_KEY).toUri().toURL().toString(); + + KeyFile kf = new KeyFile(); + kf.setType("jwt_bearer"); + kf.setIssuerUrl(wellKnownUrl); + kf.setClientId("test-client-id"); + kf.setClientEmail("test-client-email@noop.com"); + kf.setClientSecret(privateKey); + + Path filePath = Files.createTempFile("keyfile", ".json"); + filePath.toFile().deleteOnExit(); + FileUtils.writeStringToFile(filePath.toFile(), kf.toJson(), StandardCharsets.UTF_8); + + params.put(JwtBearerFlow.CONFIG_PARAM_KEY_FILE, filePath.toUri().toURL().toString()); + + return params; + } + + private static void configureAuth(AuthenticationOAuth2 auth, Map params) throws Exception { + String authParams = mapper.writeValueAsString(params); + auth.configure(authParams); + } + + @Test + public void testE2e() throws Exception { + Map params = getWorkingAuthConfig(wellKnownUrl); + configureAuth(auth, params); + + assertNotNull(this.auth.flow); + assertEquals(this.auth.getAuthMethodName(), "token"); + + AuthenticationDataProvider data; + data = this.auth.getAuthData(); + assertNotNull(data.getCommandData()); + } + +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/JwtBearerGeneratorTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/JwtBearerGeneratorTest.java new file mode 100644 index 0000000000000..58b90f777142e --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/JwtBearerGeneratorTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.auth.oauth2.protocol; + +import java.nio.file.Paths; +import junit.framework.TestCase; +import org.testng.annotations.Test; + +@Test(groups = "broker-api") +public class JwtBearerGeneratorTest extends TestCase { + + final String rsaPrivateKey = "./src/test/resources/crypto_rsa_private.key"; + + @Test + public void testGenerateJwt() throws Exception { + + String privateKey = Paths.get(rsaPrivateKey).toUri().toURL().toString(); + + String jwt = JwtBearerExchangeRequest.generateJWT("client-id", + "audience", + privateKey, + "RS256", + 30000); + assertNotNull(jwt); + } + +} \ No newline at end of file diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClientTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClientTest.java index 131427682076e..a072874e75917 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClientTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClientTest.java @@ -99,4 +99,35 @@ public void exchangeClientCredentialsSuccessWithoutOptionalClientCredentialsTest TokenResult tr = tokenClient.exchangeClientCredentials(request); assertNotNull(tr); } + + @Test + @SuppressWarnings("unchecked") + public void exchangeJwtBearerTest() throws + IOException, TokenExchangeException, ExecutionException, InterruptedException { + DefaultAsyncHttpClient defaultAsyncHttpClient = mock(DefaultAsyncHttpClient.class); + URL url = new URL("http://localhost"); + TokenClient tokenClient = new TokenClient(url, defaultAsyncHttpClient); + JwtBearerExchangeRequest request = JwtBearerExchangeRequest.builder() + .assertion("client-assertion-test") + .build(); + String body = tokenClient.buildClientCredentialsBody(request); + BoundRequestBuilder boundRequestBuilder = mock(BoundRequestBuilder.class); + Response response = mock(Response.class); + ListenableFuture listenableFuture = mock(ListenableFuture.class); + when(defaultAsyncHttpClient.preparePost(url.toString())).thenReturn(boundRequestBuilder); + when(boundRequestBuilder.setHeader("Accept", "application/json")).thenReturn(boundRequestBuilder); + when(boundRequestBuilder.setHeader("Content-Type", "application/x-www-form-urlencoded")) + .thenReturn(boundRequestBuilder); + when(boundRequestBuilder.setBody(body)).thenReturn(boundRequestBuilder); + when(boundRequestBuilder.execute()).thenReturn(listenableFuture); + when(listenableFuture.get()).thenReturn(response); + when(response.getStatusCode()).thenReturn(200); + TokenResult tokenResult = new TokenResult(); + tokenResult.setAccessToken("test-access-token"); + tokenResult.setIdToken("test-id"); + when(response.getResponseBodyAsBytes()).thenReturn(new Gson().toJson(tokenResult).getBytes()); + TokenResult tr = tokenClient.exchangeClientCredentials(request); + assertNotNull(tr); + } + }