params) {
+ URL issuerUrl = parseParameterUrl(params, CONFIG_PARAM_ISSUER_URL);
+ String privateKeyUrl = parseParameterString(params, CONFIG_PARAM_KEY_FILE);
+ // These are optional parameters, so we only perform a get
+ String audience = params.get(CONFIG_PARAM_AUDIENCE);
+ String signatureAlgorithm = params.getOrDefault(CONFIG_PARAM_SIGNATURE_ALGORITHM, SIGNATURE_ALGORITHM);
+ long ttlMillis = Long.parseLong(params.getOrDefault(CONFIG_PARAM_TOKEN_TTL, "3600000"));
+
+ return JwtBearerFlow.builder()
+ .issuerUrl(issuerUrl)
+ .audience(audience)
+ .privateKey(privateKeyUrl)
+ .signatureAlgorithm(signatureAlgorithm)
+ .ttlMillis(ttlMillis)
+ .build();
+ }
+
+ @Builder
+ public JwtBearerFlow(URL issuerUrl,
+ String audience,
+ String privateKey,
+ String signatureAlgorithm,
+ long ttlMillis) {
+ super(issuerUrl);
+ this.audience = audience;
+ this.privateKey = privateKey;
+ this.signatureAlgorithm = signatureAlgorithm;
+ this.ttlMillis = ttlMillis;
+ }
+
+ @Override
+ public void initialize() throws PulsarClientException {
+ super.initialize();
+ assert this.metadata != null;
+
+ URL tokenUrl = this.metadata.getTokenEndpoint();
+ this.exchanger = new TokenClient(tokenUrl);
+ initialized = true;
+ }
+
+ @Override
+ public TokenResult authenticate() throws PulsarClientException {
+ // read the private key from storage
+ KeyFile keyFile;
+ try {
+ keyFile = KeyFile.loadPrivateKey(this.privateKey);
+ } catch (IOException e) {
+ throw new PulsarClientException.AuthenticationException("Unable to read private key: " + e.getMessage());
+ }
+
+ // request an access token using client credentials
+ final String jwtAssertion;
+ try {
+ jwtAssertion = JwtBearerExchangeRequest.generateJWT(
+ keyFile.getClientId(),
+ this.audience,
+ keyFile.getClientSecret(),
+ this.signatureAlgorithm,
+ ttlMillis);
+ } catch (Exception e) {
+ throw new PulsarClientException("Failed to generate JWT", e);
+ }
+ TokenResult tr;
+ if (!initialized) {
+ initialize();
+ }
+
+ try {
+ JwtBearerExchangeRequest req = JwtBearerExchangeRequest.builder()
+ .assertion(jwtAssertion)
+ .build();
+ tr = this.exchanger.exchangeClientCredentials(req);
+ } catch (TokenExchangeException | IOException e) {
+ throw new PulsarClientException.AuthenticationException("Unable to obtain an access token: "
+ + e.getMessage());
+ }
+
+ return tr;
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (exchanger != null) {
+ exchanger.close();
+ }
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/KeyFile.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/KeyFile.java
index da9eea947060d..4d9ea0c8ae578 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/KeyFile.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/KeyFile.java
@@ -21,9 +21,15 @@
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
import java.io.Reader;
+import java.net.URISyntaxException;
+import java.net.URLConnection;
+import java.nio.charset.StandardCharsets;
import lombok.Data;
import lombok.NoArgsConstructor;
+import org.apache.commons.io.IOUtils;
import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -61,4 +67,34 @@ public static KeyFile fromJson(String value) throws IOException {
public static KeyFile fromJson(Reader value) throws IOException {
return ObjectMapperFactory.getMapper().reader().readValue(value, KeyFile.class);
}
+
+ /**
+ * Loads the private key from the given URL.
+ * @param privateKeyURL
+ * @return
+ * @throws IOException
+ */
+ public static KeyFile loadPrivateKey(String privateKeyURL) throws IOException {
+ try {
+ URLConnection urlConnection = new org.apache.pulsar.client.api.url.URL(privateKeyURL).openConnection();
+ try {
+ String protocol = urlConnection.getURL().getProtocol();
+ String contentType = urlConnection.getContentType();
+ if ("data".equals(protocol) && !"application/json".equals(contentType)) {
+ throw new IllegalArgumentException(
+ "Unsupported media type or encoding format: " + urlConnection.getContentType());
+ }
+ KeyFile privateKey;
+ try (Reader r = new InputStreamReader((InputStream) urlConnection.getContent(),
+ StandardCharsets.UTF_8)) {
+ privateKey = KeyFile.fromJson(r);
+ }
+ return privateKey;
+ } finally {
+ IOUtils.close(urlConnection);
+ }
+ } catch (URISyntaxException | InstantiationException | IllegalAccessException e) {
+ throw new IOException("Invalid privateKey format", e);
+ }
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchanger.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchanger.java
index 0b95abb4c60b7..537c816e1cecd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchanger.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchanger.java
@@ -33,4 +33,16 @@ public interface ClientCredentialsExchanger extends AutoCloseable {
*/
TokenResult exchangeClientCredentials(ClientCredentialsExchangeRequest req)
throws TokenExchangeException, IOException;
+
+ /**
+ * Requests an exchange of client credentials for a JWT Bearer token.
+ * @param req the request details.
+ * @return an access token.
+ * @throws TokenExchangeException if the OAuth server returned a detailed error.
+ * @throws IOException if a general IO error occurred.
+ */
+ default TokenResult exchangeClientCredentials(JwtBearerExchangeRequest req)
+ throws TokenExchangeException, IOException {
+ throw new UnsupportedOperationException("JWT Bearer exchange is not supported by this exchanger");
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/JwtBearerExchangeRequest.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/JwtBearerExchangeRequest.java
new file mode 100644
index 0000000000000..ad424d50f406d
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/JwtBearerExchangeRequest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2.protocol;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.jsonwebtoken.JwtBuilder;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import java.net.URI;
+import java.security.PrivateKey;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Date;
+import lombok.Builder;
+import lombok.Data;
+
+/**
+ * A token request based on the exchange of JWT Bearer Token.
+ *
+ * @see OAuth 2.0 RFC 7523, section 2.2
+ */
+@Data
+@Builder
+public class JwtBearerExchangeRequest {
+
+ @JsonProperty("assertion")
+ private String assertion;
+
+ public static String generateJWT(String clientId,
+ String audience,
+ String privateKeyStringUrl,
+ String signatureAlgorithm,
+ long ttlMillis)
+ throws Exception {
+ URI privateKeyStringUri = URI.create(privateKeyStringUrl);
+ PrivateKey privateKey = PrivateKeyReader.getPrivateKey(privateKeyStringUri);
+ Instant now = Instant.now();
+
+ // per https://developer.okta.com/docs/guides/build-self-signed-jwt/java/main/#gather-claims-information
+ // issuer and subject must be the same as client_id
+ JwtBuilder builder = Jwts.builder()
+ .setAudience(audience)
+ .setIssuedAt(Date.from(now))
+ .setExpiration(Date.from(now.plus(ttlMillis, ChronoUnit.MILLIS)))
+ .setIssuer(clientId)
+ .setSubject(clientId)
+ .signWith(privateKey, SignatureAlgorithm.forName(signatureAlgorithm));
+
+ return builder.compact();
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/PrivateKeyReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/PrivateKeyReader.java
new file mode 100644
index 0000000000000..be28ea20cf8dd
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/PrivateKeyReader.java
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2.protocol;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.math.BigInteger;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URLConnection;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.security.KeyFactory;
+import java.security.NoSuchAlgorithmException;
+import java.security.PrivateKey;
+import java.security.spec.EncodedKeySpec;
+import java.security.spec.InvalidKeySpecException;
+import java.security.spec.PKCS8EncodedKeySpec;
+import java.security.spec.RSAPrivateCrtKeySpec;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.binary.Base64;
+
+/**
+ * Class for reading RSA private key from PEM file. It uses
+ * the JMeter FileServer to find the file. So the file should
+ * be located in the same directory as the test plan if the
+ * path is relative.
+ *
+ *
There is a cache so each file is only read once. If file
+ * is changed, it will not take effect until the program
+ * restarts.
+ *
+ *
It can read PEM files with PKCS#8 or PKCS#1 encodings.
+ * It doesn't support encrypted PEM files.
+ *
+ * "borrowed" from https://github.com/groovenauts/jmeter_oauth_plugin/blob/master/jmeter/
+ * src/main/java/org/apache/jmeter/protocol/oauth/sampler/PrivateKeyReader.java
+ * with some modifications:
+ * - not tied to key specified as a file path
+ * - minus extra dependencies from jmeter
+ * - minus key caching
+ */
+@Slf4j
+public class PrivateKeyReader {
+
+ // Private key file using PKCS #1 encoding
+ public static final String P1_BEGIN_MARKER = "-----BEGIN RSA PRIVATE KEY"; //$NON-NLS-1$
+ public static final String P1_END_MARKER = "-----END RSA PRIVATE KEY"; //$NON-NLS-1$
+
+ // Private key file using PKCS #8 encoding
+ public static final String P8_BEGIN_MARKER = "-----BEGIN PRIVATE KEY"; //$NON-NLS-1$
+ public static final String P8_END_MARKER = "-----END PRIVATE KEY"; //$NON-NLS-1$
+
+ /**
+ * Read the PEM file and return the key.
+ *
+ * @return
+ * @throws IOException
+ */
+ public static PrivateKey getPrivateKey(URI keyUri)
+ throws IOException, URISyntaxException, InstantiationException, IllegalAccessException {
+
+ String line;
+
+ KeyFactory factory;
+ try {
+ factory = KeyFactory.getInstance("RSA"); //$NON-NLS-1$
+ } catch (NoSuchAlgorithmException e) {
+ throw new IOException("JCE error: " + e.getMessage()); //$NON-NLS-1$
+ }
+
+ URLConnection urlConnection = new org.apache.pulsar.client.api.url.URL(keyUri.toURL().toString())
+ .openConnection();
+
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader((InputStream) urlConnection.getContent(),
+ StandardCharsets.UTF_8))) {
+
+ while ((line = reader.readLine()) != null) {
+ if (line.contains(P1_BEGIN_MARKER)) {
+ byte[] keyBytes = readKeyMaterial(P1_END_MARKER, reader);
+ RSAPrivateCrtKeySpec keySpec = getRSAKeySpec(keyBytes);
+
+ try {
+ return factory.generatePrivate(keySpec);
+ } catch (InvalidKeySpecException e) {
+ throw new IOException("Invalid PKCS#1 PEM file: " + e.getMessage()); //$NON-NLS-1$
+ }
+ }
+
+ if (line.contains(P8_BEGIN_MARKER)) {
+ byte[] keyBytes = readKeyMaterial(P8_END_MARKER, reader);
+ EncodedKeySpec keySpec = new PKCS8EncodedKeySpec(keyBytes);
+
+ try {
+ return factory.generatePrivate(keySpec);
+ } catch (InvalidKeySpecException e) {
+ throw new IOException("Invalid PKCS#8 PEM file: " + e.getMessage()); //$NON-NLS-1$
+ }
+ }
+
+ }
+ }
+ throw new IOException("Invalid PEM file: no begin marker"); //$NON-NLS-1$
+ }
+
+
+ /**
+ * Read the PEM file and convert it into binary DER stream.
+ *
+ * @return
+ * @throws IOException
+ */
+ private static byte[] readKeyMaterial(String endMarker, BufferedReader reader) throws IOException {
+ String line;
+ StringBuilder buf = new StringBuilder();
+
+ while ((line = reader.readLine()) != null) {
+ if (line.contains(endMarker)) {
+
+ return new Base64().decode(buf.toString().getBytes(StandardCharsets.UTF_8));
+ }
+
+ buf.append(line.trim());
+ }
+
+ throw new IOException("Invalid PEM file: No end marker"); //$NON-NLS-1$
+ }
+
+ /**
+ * Convert PKCS#1 encoded private key into RSAPrivateCrtKeySpec.
+ *
+ *
The ASN.1 syntax for the private key with CRT is
+ *
+ *
+ * --
+ * -- Representation of RSA private key with information for the CRT algorithm.
+ * --
+ * RSAPrivateKey ::= SEQUENCE {
+ * version Version,
+ * modulus INTEGER, -- n
+ * publicExponent INTEGER, -- e
+ * privateExponent INTEGER, -- d
+ * prime1 INTEGER, -- p
+ * prime2 INTEGER, -- q
+ * exponent1 INTEGER, -- d mod (p-1)
+ * exponent2 INTEGER, -- d mod (q-1)
+ * coefficient INTEGER, -- (inverse of q) mod p
+ * otherPrimeInfos OtherPrimeInfos OPTIONAL
+ * }
+ *
+ *
+ * @param keyBytes PKCS#1 encoded key
+ * @return KeySpec
+ * @throws IOException
+ */
+
+ private static RSAPrivateCrtKeySpec getRSAKeySpec(byte[] keyBytes) throws IOException {
+
+ DerParser parser = new DerParser(keyBytes);
+
+ Asn1Object sequence = parser.read();
+ if (sequence.getType() != DerParser.SEQUENCE) {
+ throw new IOException("Invalid DER: not a sequence"); //$NON-NLS-1$
+ }
+
+ // Parse inside the sequence
+ parser = sequence.getParser();
+
+ parser.read(); // Skip version
+ BigInteger modulus = parser.read().getInteger();
+ BigInteger publicExp = parser.read().getInteger();
+ BigInteger privateExp = parser.read().getInteger();
+ BigInteger prime1 = parser.read().getInteger();
+ BigInteger prime2 = parser.read().getInteger();
+ BigInteger exp1 = parser.read().getInteger();
+ BigInteger exp2 = parser.read().getInteger();
+ BigInteger crtCoef = parser.read().getInteger();
+
+ RSAPrivateCrtKeySpec keySpec = new RSAPrivateCrtKeySpec(
+ modulus, publicExp, privateExp, prime1, prime2,
+ exp1, exp2, crtCoef);
+
+ return keySpec;
+ }
+}
+
+/**
+ * A bare-minimum ASN.1 DER decoder, just having enough functions to
+ * decode PKCS#1 private keys. Especially, it doesn't handle explicitly
+ * tagged types with an outer tag.
+ *
+ *
This parser can only handle one layer. To parse nested constructs,
+ * get a new parser for each layer using Asn1Object.getParser().
+ *
+ *
There are many DER decoders in JRE but using them will tie this
+ * program to a specific JCE/JVM.
+ */
+class DerParser {
+
+ // Classes
+ public static final int UNIVERSAL = 0x00;
+ public static final int APPLICATION = 0x40;
+ public static final int CONTEXT = 0x80;
+ public static final int PRIVATE = 0xC0;
+
+ // Constructed Flag
+ public static final int CONSTRUCTED = 0x20;
+
+ // Tag and data types
+ public static final int ANY = 0x00;
+ public static final int BOOLEAN = 0x01;
+ public static final int INTEGER = 0x02;
+ public static final int BIT_STRING = 0x03;
+ public static final int OCTET_STRING = 0x04;
+ public static final int NULL = 0x05;
+ public static final int OBJECT_IDENTIFIER = 0x06;
+ public static final int REAL = 0x09;
+ public static final int ENUMERATED = 0x0a;
+ public static final int RELATIVE_OID = 0x0d;
+
+ public static final int SEQUENCE = 0x10;
+ public static final int SET = 0x11;
+
+ public static final int NUMERIC_STRING = 0x12;
+ public static final int PRINTABLE_STRING = 0x13;
+ public static final int T61_STRING = 0x14;
+ public static final int VIDEOTEX_STRING = 0x15;
+ public static final int IA5_STRING = 0x16;
+ public static final int GRAPHIC_STRING = 0x19;
+ public static final int ISO646_STRING = 0x1A;
+ public static final int GENERAL_STRING = 0x1B;
+
+ public static final int UTF8_STRING = 0x0C;
+ public static final int UNIVERSAL_STRING = 0x1C;
+ public static final int BMP_STRING = 0x1E;
+
+ public static final int UTC_TIME = 0x17;
+ public static final int GENERALIZED_TIME = 0x18;
+
+ protected InputStream in;
+
+ /**
+ * Create a new DER decoder from an input stream.
+ *
+ * @param in The DER encoded stream
+ */
+ public DerParser(InputStream in) throws IOException {
+ this.in = in;
+ }
+
+ /**
+ * Create a new DER decoder from a byte array.
+ *
+ * @param bytes encoded bytes
+ * @throws IOException
+ */
+ public DerParser(byte[] bytes) throws IOException {
+ this(new ByteArrayInputStream(bytes));
+ }
+
+ /**
+ * Read next object. If it's constructed, the value holds
+ * encoded content and it should be parsed by a new
+ * parser from Asn1Object.getParser.
+ *
+ * @return A object
+ * @throws IOException
+ */
+ public Asn1Object read() throws IOException {
+ int tag = in.read();
+
+ if (tag == -1) {
+ throw new IOException("Invalid DER: stream too short, missing tag"); //$NON-NLS-1$
+ }
+
+ int length = getLength();
+
+ byte[] value = new byte[length];
+ int n = in.read(value);
+ if (n < length) {
+ throw new IOException("Invalid DER: stream too short, missing value"); //$NON-NLS-1$
+ }
+
+ Asn1Object o = new Asn1Object(tag, length, value);
+
+ return o;
+ }
+
+ /**
+ * Decode the length of the field. Can only support length
+ * encoding up to 4 octets.
+ *
+ *
In BER/DER encoding, length can be encoded in 2 forms,
+ *
+ * - Short form. One octet. Bit 8 has value "0" and bits 7-1
+ * give the length.
+ *
- Long form. Two to 127 octets (only 4 is supported here).
+ * Bit 8 of first octet has value "1" and bits 7-1 give the
+ * number of additional length octets. Second and following
+ * octets give the length, base 256, most significant digit first.
+ *
+ *
+ * @return The length as integer
+ * @throws IOException
+ */
+ private int getLength() throws IOException {
+
+ int i = in.read();
+ if (i == -1) {
+ throw new IOException("Invalid DER: length missing"); //$NON-NLS-1$
+ }
+
+ // A single byte short length
+ if ((i & ~0x7F) == 0) {
+ return i;
+ }
+
+ int num = i & 0x7F;
+
+ // We can't handle length longer than 4 bytes
+ if (i >= 0xFF || num > 4) {
+ throw new IOException("Invalid DER: length field too big (" //$NON-NLS-1$
+ + i + ")"); //$NON-NLS-1$
+ }
+
+ byte[] bytes = new byte[num];
+ int n = in.read(bytes);
+ if (n < num) {
+ throw new IOException("Invalid DER: length too short"); //$NON-NLS-1$
+ }
+
+ return new BigInteger(1, bytes).intValue();
+ }
+
+}
+
+
+/**
+ * An ASN.1 TLV. The object is not parsed. It can
+ * only handle integers and strings.
+ */
+class Asn1Object {
+
+ protected final int type;
+ protected final int length;
+ protected final byte[] value;
+ protected final int tag;
+
+ /**
+ * Construct a ASN.1 TLV. The TLV could be either a
+ * constructed or primitive entity.
+ *
+ *
The first byte in DER encoding is made of following fields,
+ *
+ * -------------------------------------------------
+ * |Bit 8|Bit 7|Bit 6|Bit 5|Bit 4|Bit 3|Bit 2|Bit 1|
+ * -------------------------------------------------
+ * | Class | CF | + Type |
+ * -------------------------------------------------
+ *
+ *
+ * - Class: Universal, Application, Context or Private
+ *
- CF: Constructed flag. If 1, the field is constructed.
+ *
- Type: This is actually called tag in ASN.1. It
+ * indicates data type (Integer, String) or a construct
+ * (sequence, choice, set).
+ *
+ *
+ * @param tag Tag or Identifier
+ * @param length Length of the field
+ * @param value Encoded octet string for the field.
+ */
+ public Asn1Object(int tag, int length, byte[] value) {
+ this.tag = tag;
+ this.type = tag & 0x1F;
+ this.length = length;
+ this.value = value;
+ }
+
+ public int getType() {
+ return type;
+ }
+
+ public int getLength() {
+ return length;
+ }
+
+ public byte[] getValue() {
+ return value;
+ }
+
+ public boolean isConstructed() {
+ return (tag & DerParser.CONSTRUCTED) == DerParser.CONSTRUCTED;
+ }
+
+ /**
+ * For constructed field, return a parser for its content.
+ *
+ * @return A parser for the construct.
+ * @throws IOException
+ */
+ public DerParser getParser() throws IOException {
+ if (!isConstructed()) {
+ throw new IOException("Invalid DER: can't parse primitive entity"); //$NON-NLS-1$
+ }
+
+ return new DerParser(value);
+ }
+
+ /**
+ * Get the value as integer.
+ *
+ * @return BigInteger
+ * @throws IOException
+ */
+ public BigInteger getInteger() throws IOException {
+ if (type != DerParser.INTEGER) {
+ throw new IOException("Invalid DER: object is not integer"); //$NON-NLS-1$
+ }
+
+ return new BigInteger(value);
+ }
+
+ /**
+ * Get value as string. Most strings are treated
+ * as Latin-1.
+ *
+ * @return Java string
+ * @throws IOException
+ */
+ public String getString() throws IOException {
+ Charset charset;
+
+ switch (type) {
+
+ // Not all are Latin-1 but it's the closest thing
+ case DerParser.NUMERIC_STRING:
+ case DerParser.PRINTABLE_STRING:
+ case DerParser.VIDEOTEX_STRING:
+ case DerParser.IA5_STRING:
+ case DerParser.GRAPHIC_STRING:
+ case DerParser.ISO646_STRING:
+ case DerParser.GENERAL_STRING:
+ // "ISO-8859-1"; //$NON-NLS-1$
+ charset = StandardCharsets.ISO_8859_1;
+ break;
+
+ case DerParser.BMP_STRING:
+ // "UTF-16BE"; //$NON-NLS-1$
+ charset = StandardCharsets.UTF_16BE;
+ break;
+
+ case DerParser.UTF8_STRING:
+ // "UTF-8"; //$NON-NLS-1$
+ charset = StandardCharsets.UTF_8;
+ break;
+
+ case DerParser.UNIVERSAL_STRING:
+ throw new IOException("Invalid DER: can't handle UCS-4 string"); //$NON-NLS-1$
+
+ default:
+ throw new IOException("Invalid DER: object is not a string"); //$NON-NLS-1$
+ }
+
+ return new String(value, charset);
+ }
+}
\ No newline at end of file
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
index f4e4c770e67fc..9af2dc6f08103 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
@@ -89,12 +89,31 @@ String buildClientCredentialsBody(ClientCredentialsExchangeRequest req) {
if (!StringUtils.isBlank(req.getScope())) {
bodyMap.put("scope", req.getScope());
}
+ return encodeUrlParams(bodyMap);
+ }
+
+ String buildClientCredentialsBody(JwtBearerExchangeRequest req) {
+ Map bodyMap = new TreeMap<>();
+ // https://datatracker.ietf.org/doc/html/rfc7523#section-2.2
+ bodyMap.put("grant_type", "client_credentials");
+ bodyMap.put("client_assertion_type", "urn:ietf:params:oauth:client-assertion-type:jwt-bearer");
+ bodyMap.put("client_assertion", req.getAssertion());
+ return encodeUrlParams(bodyMap);
+ }
+
+ private static String encodeUrlParams(Map bodyMap) {
return bodyMap.entrySet().stream()
.map(e -> URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8) + '='
+ URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
.collect(Collectors.joining("&"));
}
+ public TokenResult exchangeClientCredentials(JwtBearerExchangeRequest req)
+ throws TokenExchangeException, IOException {
+ String body = buildClientCredentialsBody(req);
+ return exchangeClientCredentials(body);
+ }
+
/**
* Performs a token exchange using client credentials.
* @param req the client credentials request details.
@@ -104,9 +123,12 @@ String buildClientCredentialsBody(ClientCredentialsExchangeRequest req) {
public TokenResult exchangeClientCredentials(ClientCredentialsExchangeRequest req)
throws TokenExchangeException, IOException {
String body = buildClientCredentialsBody(req);
+ return exchangeClientCredentials(body);
+ }
+ private TokenResult exchangeClientCredentials(String body)
+ throws TokenExchangeException, IOException {
try {
-
Response res = httpClient.preparePost(tokenUrl.toString())
.setHeader("Accept", "application/json")
.setHeader("Content-Type", "application/x-www-form-urlencoded")
@@ -115,25 +137,24 @@ public TokenResult exchangeClientCredentials(ClientCredentialsExchangeRequest re
.get();
switch (res.getStatusCode()) {
- case 200:
- return ObjectMapperFactory.getMapper().reader().readValue(res.getResponseBodyAsBytes(),
- TokenResult.class);
-
- case 400: // Bad request
- case 401: // Unauthorized
- throw new TokenExchangeException(
- ObjectMapperFactory.getMapper().reader().readValue(res.getResponseBodyAsBytes(),
- TokenError.class));
-
- default:
- throw new IOException(
- "Failed to perform HTTP request. res: " + res.getStatusCode() + " " + res.getStatusText());
+ case 200:
+ return ObjectMapperFactory.getMapper().reader().readValue(res.getResponseBodyAsBytes(),
+ TokenResult.class);
+ case 400: // Bad request
+ case 401: // Unauthorized
+ throw new TokenExchangeException(
+ ObjectMapperFactory.getMapper().reader().readValue(res.getResponseBodyAsBytes(),
+ TokenError.class));
+ default:
+ throw new IOException(
+ "Failed to perform HTTP request. res: " + res.getStatusCode() + " " + res.getStatusText());
}
-
-
-
- } catch (InterruptedException | ExecutionException e1) {
- throw new IOException(e1);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new IOException(ie);
+ } catch (ExecutionException ee) {
+ throw new IOException(ee);
}
}
+
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2JwtBearerConfigTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2JwtBearerConfigTest.java
new file mode 100644
index 0000000000000..540cbafd81784
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2JwtBearerConfigTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.util.HashMap;
+import java.util.Map;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Tests {@link AuthenticationOAuth2 with JwtBearerFlow}.
+ */
+public class AuthenticationOAuth2JwtBearerConfigTest {
+ private static final ObjectMapper mapper = new ObjectMapper();
+
+ private AuthenticationOAuth2 auth;
+
+ @BeforeMethod
+ public void before() {
+ MockClock clock = new MockClock(Instant.EPOCH, ZoneOffset.UTC);
+ Flow flow = mock(JwtBearerFlow.class);
+ this.auth = new AuthenticationOAuth2(flow, clock);
+ }
+
+ private static Map getMinimalAuthConfig() {
+ Map params = new HashMap<>();
+ params.put("type", "jwt_bearer");
+ return params;
+ }
+
+ private Map getMinimalWorkingAuthConfig() {
+ Map params = getMinimalAuthConfig();
+ params.put(JwtBearerFlow.CONFIG_PARAM_ISSUER_URL, "http://localhost/.well-known/openid-configuration");
+ params.put(JwtBearerFlow.CONFIG_PARAM_AUDIENCE, "http://localhost");
+ params.put(JwtBearerFlow.CONFIG_PARAM_KEY_FILE, "data:base64,e30="); // empty private key
+
+ return params;
+ }
+
+ private static void configureAuth(AuthenticationOAuth2 auth, Map params) throws Exception {
+ String authParams = mapper.writeValueAsString(params);
+ auth.configure(authParams);
+ }
+
+ @Test
+ public void testGetAuthMethodName() throws Exception {
+ Map params = getMinimalWorkingAuthConfig();
+ configureAuth(auth, params);
+
+ assertNotNull(this.auth.flow);
+ assertEquals(this.auth.getAuthMethodName(), "token");
+ }
+
+
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = ".*Unsupported.*")
+ public void testUnknownType() throws Exception {
+ Map params = getMinimalAuthConfig();
+ params.put("type", "garbage");
+ configureAuth(auth, params);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = ".*Required.*")
+ public void testMissingParams() throws Exception {
+ Map params = getMinimalAuthConfig();
+ configureAuth(auth, params);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = ".*Required.*")
+ public void testConfigureRequired() throws Exception {
+ Map params = getMinimalAuthConfig();
+ configureAuth(auth, params);
+
+ this.auth.configure("{}");
+ }
+
+ @Test
+ public void testConfigureWithOptionalParams() throws Exception {
+ Map params = getMinimalWorkingAuthConfig();
+ params.put(JwtBearerFlow.CONFIG_PARAM_TOKEN_TTL, "600000");
+ params.put(JwtBearerFlow.CONFIG_PARAM_SIGNATURE_ALGORITHM, "RS256");
+ configureAuth(auth, params);
+
+ assertNotNull(this.auth.flow);
+ assertEquals(this.auth.getAuthMethodName(), "token");
+ }
+
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2JwtBearerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2JwtBearerTest.java
new file mode 100644
index 0000000000000..219720495dd55
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2JwtBearerTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import no.nav.security.mock.oauth2.MockOAuth2Server;
+import org.apache.commons.io.FileUtils;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Tests {@link AuthenticationOAuth2}.
+ */
+public class AuthenticationOAuth2JwtBearerTest {
+ static final String RSA_PRIVATE_KEY = "./src/test/resources/crypto_rsa_private.key";
+
+ private static final ObjectMapper mapper = new ObjectMapper();
+
+ private AuthenticationOAuth2 auth;
+
+ MockOAuth2Server server;
+ String wellKnownUrl;
+
+ @BeforeMethod
+ public void before() throws Exception {
+ server = new MockOAuth2Server();
+ server.start();
+ wellKnownUrl = server.wellKnownUrl("default").toString();
+ this.auth = new AuthenticationOAuth2();
+ }
+
+ @AfterMethod
+ public void after() {
+ if (server != null) {
+ server.shutdown();
+ }
+ }
+
+ private static Map getWorkingAuthConfig(String wellKnownUrl) throws Exception {
+ Map params = new HashMap<>();
+ params.put("type", "jwt_bearer");
+ params.put(JwtBearerFlow.CONFIG_PARAM_ISSUER_URL, wellKnownUrl);
+ params.put(JwtBearerFlow.CONFIG_PARAM_AUDIENCE, "http://localhost");
+ String privateKey = Paths.get(RSA_PRIVATE_KEY).toUri().toURL().toString();
+
+ KeyFile kf = new KeyFile();
+ kf.setType("jwt_bearer");
+ kf.setIssuerUrl(wellKnownUrl);
+ kf.setClientId("test-client-id");
+ kf.setClientEmail("test-client-email@noop.com");
+ kf.setClientSecret(privateKey);
+
+ Path filePath = Files.createTempFile("keyfile", ".json");
+ filePath.toFile().deleteOnExit();
+ FileUtils.writeStringToFile(filePath.toFile(), kf.toJson(), StandardCharsets.UTF_8);
+
+ params.put(JwtBearerFlow.CONFIG_PARAM_KEY_FILE, filePath.toUri().toURL().toString());
+
+ return params;
+ }
+
+ private static void configureAuth(AuthenticationOAuth2 auth, Map params) throws Exception {
+ String authParams = mapper.writeValueAsString(params);
+ auth.configure(authParams);
+ }
+
+ @Test
+ public void testE2e() throws Exception {
+ Map params = getWorkingAuthConfig(wellKnownUrl);
+ configureAuth(auth, params);
+
+ assertNotNull(this.auth.flow);
+ assertEquals(this.auth.getAuthMethodName(), "token");
+
+ AuthenticationDataProvider data;
+ data = this.auth.getAuthData();
+ assertNotNull(data.getCommandData());
+ }
+
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/JwtBearerGeneratorTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/JwtBearerGeneratorTest.java
new file mode 100644
index 0000000000000..58b90f777142e
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/JwtBearerGeneratorTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2.protocol;
+
+import java.nio.file.Paths;
+import junit.framework.TestCase;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-api")
+public class JwtBearerGeneratorTest extends TestCase {
+
+ final String rsaPrivateKey = "./src/test/resources/crypto_rsa_private.key";
+
+ @Test
+ public void testGenerateJwt() throws Exception {
+
+ String privateKey = Paths.get(rsaPrivateKey).toUri().toURL().toString();
+
+ String jwt = JwtBearerExchangeRequest.generateJWT("client-id",
+ "audience",
+ privateKey,
+ "RS256",
+ 30000);
+ assertNotNull(jwt);
+ }
+
+}
\ No newline at end of file
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClientTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClientTest.java
index 131427682076e..a072874e75917 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClientTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClientTest.java
@@ -99,4 +99,35 @@ public void exchangeClientCredentialsSuccessWithoutOptionalClientCredentialsTest
TokenResult tr = tokenClient.exchangeClientCredentials(request);
assertNotNull(tr);
}
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void exchangeJwtBearerTest() throws
+ IOException, TokenExchangeException, ExecutionException, InterruptedException {
+ DefaultAsyncHttpClient defaultAsyncHttpClient = mock(DefaultAsyncHttpClient.class);
+ URL url = new URL("http://localhost");
+ TokenClient tokenClient = new TokenClient(url, defaultAsyncHttpClient);
+ JwtBearerExchangeRequest request = JwtBearerExchangeRequest.builder()
+ .assertion("client-assertion-test")
+ .build();
+ String body = tokenClient.buildClientCredentialsBody(request);
+ BoundRequestBuilder boundRequestBuilder = mock(BoundRequestBuilder.class);
+ Response response = mock(Response.class);
+ ListenableFuture listenableFuture = mock(ListenableFuture.class);
+ when(defaultAsyncHttpClient.preparePost(url.toString())).thenReturn(boundRequestBuilder);
+ when(boundRequestBuilder.setHeader("Accept", "application/json")).thenReturn(boundRequestBuilder);
+ when(boundRequestBuilder.setHeader("Content-Type", "application/x-www-form-urlencoded"))
+ .thenReturn(boundRequestBuilder);
+ when(boundRequestBuilder.setBody(body)).thenReturn(boundRequestBuilder);
+ when(boundRequestBuilder.execute()).thenReturn(listenableFuture);
+ when(listenableFuture.get()).thenReturn(response);
+ when(response.getStatusCode()).thenReturn(200);
+ TokenResult tokenResult = new TokenResult();
+ tokenResult.setAccessToken("test-access-token");
+ tokenResult.setIdToken("test-id");
+ when(response.getResponseBodyAsBytes()).thenReturn(new Gson().toJson(tokenResult).getBytes());
+ TokenResult tr = tokenClient.exchangeClientCredentials(request);
+ assertNotNull(tr);
+ }
+
}