From d3e8e2c94e8f9ab9fd671915b45c511bc1d62776 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Fri, 17 Feb 2023 00:30:27 +0800 Subject: [PATCH 1/5] [fix][authentication] Store the original authentication data (#19519) Signed-off-by: Zixuan Liu (cherry picked from commit 2d90089dfa2a6af99cfe257bd1f2b3d24166303a) --- .../pulsar/broker/service/ServerCnx.java | 4 +- .../MockAlwaysExpiredAuthenticationState.java | 45 ++---------- .../MockMutableAuthenticationProvider.java | 32 +++++++++ .../auth/MockMutableAuthenticationState.java | 70 +++++++++++++++++++ .../pulsar/broker/service/ServerCnxTest.java | 50 ++++++++++++- 5 files changed, 159 insertions(+), 42 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMutableAuthenticationProvider.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMutableAuthenticationState.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 1048ed4bbb497..74776bb96d499 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -655,13 +655,15 @@ private void doAuthentication(AuthData clientData, String newAuthRole = authState.getAuthRole(); // Refresh the auth data. - this.authenticationData = authState.getAuthDataSource(); if (log.isDebugEnabled()) { log.debug("[{}] Auth data refreshed for role={}", remoteAddress, this.authRole); } if (!useOriginalAuthState) { this.authRole = newAuthRole; + this.authenticationData = authState.getAuthDataSource(); + } else { + this.originalAuthData = authState.getAuthDataSource(); } if (log.isDebugEnabled()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAlwaysExpiredAuthenticationState.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAlwaysExpiredAuthenticationState.java index 3fc7e321b9d08..08d291b849a5b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAlwaysExpiredAuthenticationState.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAlwaysExpiredAuthenticationState.java @@ -18,56 +18,21 @@ */ package org.apache.pulsar.broker.auth; -import org.apache.pulsar.broker.authentication.AuthenticationDataCommand; -import org.apache.pulsar.broker.authentication.AuthenticationDataSource; -import org.apache.pulsar.broker.authentication.AuthenticationState; -import org.apache.pulsar.common.api.AuthData; - -import javax.naming.AuthenticationException; - -import static java.nio.charset.StandardCharsets.UTF_8; +import org.apache.pulsar.broker.authentication.AuthenticationProvider; /** * Class to use when verifying the behavior around expired authentication data because it will always return * true when isExpired is called. */ -public class MockAlwaysExpiredAuthenticationState implements AuthenticationState { - final MockAlwaysExpiredAuthenticationProvider provider; - AuthenticationDataSource authenticationDataSource; - volatile String authRole; - - MockAlwaysExpiredAuthenticationState(MockAlwaysExpiredAuthenticationProvider provider) { - this.provider = provider; - } - - - @Override - public String getAuthRole() throws AuthenticationException { - if (authRole == null) { - throw new AuthenticationException("Must authenticate first."); - } - return authRole; - } +public class MockAlwaysExpiredAuthenticationState extends MockMutableAuthenticationState { - @Override - public AuthData authenticate(AuthData authData) throws AuthenticationException { - authenticationDataSource = new AuthenticationDataCommand(new String(authData.getBytes(), UTF_8)); - authRole = provider.authenticate(authenticationDataSource); - return null; - } - - @Override - public AuthenticationDataSource getAuthDataSource() { - return authenticationDataSource; - } - - @Override - public boolean isComplete() { - return authRole != null; + MockAlwaysExpiredAuthenticationState(AuthenticationProvider provider) { + super(provider); } @Override public boolean isExpired() { return true; } + } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMutableAuthenticationProvider.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMutableAuthenticationProvider.java new file mode 100644 index 0000000000000..01d56f891e27e --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMutableAuthenticationProvider.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.auth; + +import org.apache.pulsar.broker.authentication.AuthenticationState; +import org.apache.pulsar.common.api.AuthData; +import javax.net.ssl.SSLSession; +import java.net.SocketAddress; + +public class MockMutableAuthenticationProvider extends MockAuthenticationProvider { + public AuthenticationState newAuthState(AuthData authData, + SocketAddress remoteAddress, + SSLSession sslSession) { + return new MockMutableAuthenticationState(this); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMutableAuthenticationState.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMutableAuthenticationState.java new file mode 100644 index 0000000000000..cb97d26ca2b26 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMutableAuthenticationState.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.auth; + +import static java.nio.charset.StandardCharsets.UTF_8; +import javax.naming.AuthenticationException; +import org.apache.pulsar.broker.authentication.AuthenticationDataCommand; +import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.authentication.AuthenticationProvider; +import org.apache.pulsar.broker.authentication.AuthenticationState; +import org.apache.pulsar.common.api.AuthData; + +// MockMutableAuthenticationState always update the authentication data source and auth role. +public class MockMutableAuthenticationState implements AuthenticationState { + + final AuthenticationProvider provider; + AuthenticationDataSource authenticationDataSource; + volatile String authRole; + + MockMutableAuthenticationState(AuthenticationProvider provider) { + this.provider = provider; + } + + @Override + public String getAuthRole() throws AuthenticationException { + if (authRole == null) { + throw new AuthenticationException("Must authenticate first."); + } + return authRole; + } + + @Override + public AuthData authenticate(AuthData authData) throws AuthenticationException { + authenticationDataSource = new AuthenticationDataCommand(new String(authData.getBytes(), UTF_8)); + authRole = provider.authenticate(authenticationDataSource); + return null; + } + + @Override + public AuthenticationDataSource getAuthDataSource() { + return authenticationDataSource; + } + + @Override + public boolean isComplete() { + return authRole != null; + } + + @Override + public boolean isExpired() { + return false; + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 09428db2b1cb2..4a4628bece44f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -87,6 +87,7 @@ import org.apache.pulsar.broker.TransactionMetadataStoreService; import org.apache.pulsar.broker.auth.MockAlwaysExpiredAuthenticationProvider; import org.apache.pulsar.broker.auth.MockAuthorizationProvider; +import org.apache.pulsar.broker.auth.MockMutableAuthenticationProvider; import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription; import org.apache.pulsar.broker.auth.MockAuthenticationProvider; import org.apache.pulsar.broker.auth.MockMultiStageAuthenticationProvider; @@ -123,7 +124,6 @@ import org.apache.pulsar.common.api.proto.CommandProducerSuccess; import org.apache.pulsar.common.api.proto.CommandSendError; import org.apache.pulsar.common.api.proto.CommandSendReceipt; -import org.apache.pulsar.common.api.proto.CommandSubscribe; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.CommandSuccess; @@ -1368,6 +1368,54 @@ public void testVerifyAuthRoleAndAuthDataFromDirectConnectionBroker() throws Exc })); } + @Test + public void testRefreshOriginalPrincipalWithAuthDataForwardedFromProxy() throws Exception { + AuthenticationService authenticationService = mock(AuthenticationService.class); + AuthenticationProvider authenticationProvider = new MockMutableAuthenticationProvider(); + String authMethodName = authenticationProvider.getAuthMethodName(); + when(brokerService.getAuthenticationService()).thenReturn(authenticationService); + when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider); + svcConfig.setAuthenticationEnabled(true); + svcConfig.setAuthenticateOriginalAuthData(true); + svcConfig.setProxyRoles(Collections.singleton("pass.proxy")); + + resetChannel(); + assertTrue(channel.isActive()); + assertEquals(serverCnx.getState(), State.Start); + + String proxyRole = "pass.proxy"; + String clientRole = "pass.client"; + ByteBuf connect = Commands.newConnect(authMethodName, proxyRole, "test", "localhost", + clientRole, clientRole, authMethodName); + channel.writeInbound(connect); + Object connectResponse = getResponse(); + assertTrue(connectResponse instanceof CommandConnected); + assertEquals(serverCnx.getOriginalAuthData().getCommandData(), clientRole); + assertEquals(serverCnx.getOriginalAuthState().getAuthRole(), clientRole); + assertEquals(serverCnx.getOriginalPrincipal(), clientRole); + assertEquals(serverCnx.getAuthData().getCommandData(), proxyRole); + assertEquals(serverCnx.getAuthRole(), proxyRole); + assertEquals(serverCnx.getAuthState().getAuthRole(), proxyRole); + + // Request refreshing the original auth. + // Expected: + // 1. Original role and original data equals to "pass.RefreshOriginAuthData". + // 2. The broker disconnects the client, because the new role doesn't equal the old role. + String newClientRole = "pass.RefreshOriginAuthData"; + ByteBuf refreshAuth = Commands.newAuthResponse(authMethodName, + AuthData.of(newClientRole.getBytes(StandardCharsets.UTF_8)), 0, "test"); + channel.writeInbound(refreshAuth); + + assertEquals(serverCnx.getOriginalAuthData().getCommandData(), newClientRole); + assertEquals(serverCnx.getOriginalAuthState().getAuthRole(), newClientRole); + assertEquals(serverCnx.getAuthData().getCommandData(), proxyRole); + assertEquals(serverCnx.getAuthRole(), proxyRole); + assertEquals(serverCnx.getAuthState().getAuthRole(), proxyRole); + + assertFalse(channel.isOpen()); + assertFalse(channel.isActive()); + } + @Test(timeOut = 30000) public void testProducerCommand() throws Exception { resetChannel(); From 82a489d395219140b5b31bc4e6119e766651e89b Mon Sep 17 00:00:00 2001 From: rangao Date: Thu, 7 Dec 2023 11:49:00 +0800 Subject: [PATCH 2/5] address comments --- .../pulsar/broker/service/ServerCnx.java | 30 +++++++++---------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 74776bb96d499..8aa7edacf70f2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -642,10 +642,6 @@ private void doAuthentication(AuthData clientData, String authRole = useOriginalAuthState ? originalPrincipal : this.authRole; AuthData brokerData = authState.authenticate(clientData); - if (log.isDebugEnabled()) { - log.debug("Authenticate using original auth state : {}, role = {}", useOriginalAuthState, authRole); - } - if (authState.isComplete()) { // Authentication has completed. It was either: // 1. the 1st time the authentication process was done, in which case we'll send @@ -659,18 +655,7 @@ private void doAuthentication(AuthData clientData, log.debug("[{}] Auth data refreshed for role={}", remoteAddress, this.authRole); } - if (!useOriginalAuthState) { - this.authRole = newAuthRole; - this.authenticationData = authState.getAuthDataSource(); - } else { - this.originalAuthData = authState.getAuthDataSource(); - } - - if (log.isDebugEnabled()) { - log.debug("[{}] Client successfully authenticated with {} role {} and originalPrincipal {}", - remoteAddress, authMethod, this.authRole, originalPrincipal); - } - + AuthenticationDataSource newAuthDataSource = authState.getAuthDataSource(); if (state != State.Connected) { // First time authentication is done if (service.isAuthenticationEnabled()) { @@ -685,10 +670,23 @@ private void doAuthentication(AuthData clientData, return; } } + if (!useOriginalAuthState) { + this.authRole = newAuthRole; + this.authenticationData = newAuthDataSource; + } maybeScheduleAuthenticationCredentialsRefresh(); } completeConnect(clientProtocolVersion, clientVersion); + if (log.isDebugEnabled()) { + log.debug("[{}] Client successfully authenticated with {} role {} and originalPrincipal {}", + remoteAddress, authMethod, this.authRole, originalPrincipal); + } } else { + if (!useOriginalAuthState) { + this.authenticationData = newAuthDataSource; + } else { + this.originalAuthData = newAuthDataSource; + } // If the connection was already ready, it means we're doing a refresh if (!StringUtils.isEmpty(authRole)) { if (!authRole.equals(newAuthRole)) { From f2d9a0a9133a721b71622ff6769d7a1f2f48c7f1 Mon Sep 17 00:00:00 2001 From: rangao Date: Thu, 7 Dec 2023 11:52:38 +0800 Subject: [PATCH 3/5] fix --- .../java/org/apache/pulsar/broker/service/ServerCnx.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 8aa7edacf70f2..4f90ee9397b47 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -642,6 +642,10 @@ private void doAuthentication(AuthData clientData, String authRole = useOriginalAuthState ? originalPrincipal : this.authRole; AuthData brokerData = authState.authenticate(clientData); + if (log.isDebugEnabled()) { + log.debug("Authenticate using original auth state : {}, role = {}", useOriginalAuthState, authRole); + } + if (authState.isComplete()) { // Authentication has completed. It was either: // 1. the 1st time the authentication process was done, in which case we'll send @@ -650,11 +654,6 @@ private void doAuthentication(AuthData clientData, String newAuthRole = authState.getAuthRole(); - // Refresh the auth data. - if (log.isDebugEnabled()) { - log.debug("[{}] Auth data refreshed for role={}", remoteAddress, this.authRole); - } - AuthenticationDataSource newAuthDataSource = authState.getAuthDataSource(); if (state != State.Connected) { // First time authentication is done From e09d0c25116b74d2a1497609a4f34e3b6a37ff6b Mon Sep 17 00:00:00 2001 From: rangao Date: Fri, 8 Dec 2023 09:05:44 +0800 Subject: [PATCH 4/5] remove non-existing and uesless methods --- .../org/apache/pulsar/broker/service/ReplicatorTestBase.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java index 1aa3c2c66046b..824af6b71e317 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java @@ -248,13 +248,8 @@ protected void setup() throws Exception { .brokerServiceUrl(pulsar4.getBrokerServiceUrl()) .brokerServiceUrlTls(pulsar4.getBrokerServiceUrlTls()) .brokerClientTlsEnabled(true) - .brokerClientCertificateFilePath(clientCertFilePath) - .brokerClientKeyFilePath(clientKeyFilePath) .brokerClientTrustCertsFilePath(caCertFilePath) .brokerClientTlsEnabledWithKeyStore(tlsWithKeyStore) - .brokerClientTlsKeyStore(clientKeyStorePath) - .brokerClientTlsKeyStorePassword(keyStorePassword) - .brokerClientTlsKeyStoreType(keyStoreType) .brokerClientTlsTrustStore(clientTrustStorePath) .brokerClientTlsTrustStorePassword(keyStorePassword) .brokerClientTlsTrustStoreType(keyStoreType) From a6ca305f211e655623aec7537fbb1e8c63f0492c Mon Sep 17 00:00:00 2001 From: rangao Date: Fri, 8 Dec 2023 09:36:34 +0800 Subject: [PATCH 5/5] fix --- .../java/org/apache/pulsar/broker/service/ServerCnx.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 4f90ee9397b47..6d84e22f162a1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -658,6 +658,10 @@ private void doAuthentication(AuthData clientData, if (state != State.Connected) { // First time authentication is done if (service.isAuthenticationEnabled()) { + if (!useOriginalAuthState) { + this.authRole = newAuthRole; + this.authenticationData = newAuthDataSource; + } if (service.isAuthorizationEnabled()) { if (!service.getAuthorizationService() .isValidOriginalPrincipal(this.authRole, originalPrincipal, remoteAddress, false)) { @@ -669,10 +673,6 @@ private void doAuthentication(AuthData clientData, return; } } - if (!useOriginalAuthState) { - this.authRole = newAuthRole; - this.authenticationData = newAuthDataSource; - } maybeScheduleAuthenticationCredentialsRefresh(); } completeConnect(clientProtocolVersion, clientVersion);