From cd756d2044ef20a0c082872dc6caf7a7fc33c4bb Mon Sep 17 00:00:00 2001 From: csthomas1 Date: Tue, 21 Feb 2023 18:55:32 +0000 Subject: [PATCH 1/4] issue#19583 -- Make KubernetesRuntime translate tenant, namespace, and function name characters that are not allowed within Kubernetes labels as part of function removal --- .../functions/runtime/kubernetes/KubernetesRuntime.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java index e206862b68a67..5d462dda6e01b 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java @@ -684,10 +684,11 @@ public void deleteStatefulSet() throws InterruptedException { .numRetries(KubernetesRuntimeFactory.numRetries * 2) .sleepBetweenInvocationsMs(KubernetesRuntimeFactory.sleepBetweenRetriesMs * 2) .supplier(() -> { + Map validLabels = getLabels(instanceConfig.getFunctionDetails()); String labels = String.format("tenant=%s,namespace=%s,name=%s", - instanceConfig.getFunctionDetails().getTenant(), - instanceConfig.getFunctionDetails().getNamespace(), - instanceConfig.getFunctionDetails().getName()); + validLabels.get("tenant"), + validLabels.get("namespace"), + validLabels.get("name")); V1PodList response; try { From 8084c8f462f4848190c6925f77efe6fa685a8c02 Mon Sep 17 00:00:00 2001 From: csthomas1 Date: Fri, 7 Apr 2023 18:50:14 +0000 Subject: [PATCH 2/4] Added KubernetesRuntime for issue#19583 to verify function tenant, namespace, name translation to k8s labels --- .../kubernetes/KubernetesRuntimeTest.java | 67 +++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java index 1e8194eed443a..15bf4f2e8c97b 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java @@ -24,7 +24,11 @@ import com.google.gson.JsonObject; import com.google.protobuf.util.JsonFormat; import io.kubernetes.client.custom.Quantity; +import io.kubernetes.client.openapi.ApiException; +import io.kubernetes.client.openapi.apis.AppsV1Api; +import io.kubernetes.client.openapi.apis.CoreV1Api; import io.kubernetes.client.openapi.models.V1Container; +import io.kubernetes.client.openapi.models.V1PodList; import io.kubernetes.client.openapi.models.V1PodSpec; import io.kubernetes.client.openapi.models.V1PodTemplateSpec; import io.kubernetes.client.openapi.models.V1ResourceRequirements; @@ -58,6 +62,7 @@ import java.lang.reflect.Type; import java.math.BigDecimal; +import java.net.HttpURLConnection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -65,11 +70,23 @@ import java.util.Optional; import java.util.function.Consumer; import java.util.stream.Collectors; +import okhttp3.Call; +import okhttp3.Response; import static org.apache.pulsar.functions.runtime.RuntimeUtils.FUNCTIONS_INSTANCE_CLASSPATH; import static org.apache.pulsar.functions.utils.FunctionCommon.roundDecimal; +import org.mockito.ArgumentMatcher; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; @@ -1233,4 +1250,54 @@ private void assertMetricsPortConfigured(Map functionRuntimeFact .contains("--metrics_port 0")); } } + + @Test + public void testDeleteStatefulSetWithTranslatedKubernetesLabelChars() throws Exception { + InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false); + config.setFunctionDetails(createFunctionDetails(FunctionDetails.Runtime.JAVA, false, (fb) -> { + return fb.setTenant("c:tenant").setNamespace("c:ns").setName("c:fn"); + })); + + CoreV1Api coreApi = mock(CoreV1Api.class); + AppsV1Api appsApi = mock(AppsV1Api.class); + + Call successfulCall = mock(Call.class); + Response okResponse = mock(Response.class); + when(okResponse.code()).thenReturn(HttpURLConnection.HTTP_OK); + when(okResponse.isSuccessful()).thenReturn(true); + when(okResponse.message()).thenReturn(""); + when(successfulCall.execute()).thenReturn(okResponse); + + final String expectedFunctionNamePrefix = String.format("pf-%s-%s-%s", "c-tenant", "c-ns", "c-fn"); + + factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0); + factory.setCoreClient(coreApi); + factory.setAppsClient(appsApi); + + ArgumentMatcher hasTranslatedFunctionName = (String t) -> { + return t.startsWith(expectedFunctionNamePrefix); + }; + + when(appsApi.deleteNamespacedStatefulSetCall( + argThat(hasTranslatedFunctionName), + anyString(), isNull(), isNull(), anyInt(), isNull(), anyString(), any(), isNull())).thenReturn(successfulCall); + + ApiException notFoundException = mock(ApiException.class); + when(notFoundException.getCode()).thenReturn(HttpURLConnection.HTTP_NOT_FOUND); + when(appsApi.readNamespacedStatefulSet(argThat(hasTranslatedFunctionName), anyString(), + isNull(), isNull(), isNull())).thenThrow(notFoundException); + + V1PodList podList = mock(V1PodList.class); + when(podList.getItems()).thenReturn(Collections.emptyList()); + + String expectedLabels = String.format("tenant=%s,namespace=%s,name=%s", "c-tenant", "c-ns", "c-fn"); + + when(coreApi.listNamespacedPod(anyString(), isNull(), isNull(), isNull(), isNull(), + eq(expectedLabels), isNull(), isNull(), isNull(), isNull(), isNull())).thenReturn(podList); + KubernetesRuntime kr = factory.createContainer(config, "/test/code", "code.yml", "/test/transforms", "transform.yml", Long.MIN_VALUE); + kr.deleteStatefulSet(); + + verify(coreApi).listNamespacedPod(anyString(), isNull(), isNull(), isNull(), isNull(), + eq(expectedLabels), isNull(), isNull(), isNull(), isNull(), isNull()); + } } From dfa2e1b8d503d22664813a03ae2fc1bde4b2bf81 Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 13 Jun 2023 08:23:17 +0800 Subject: [PATCH 3/4] fix compile Signed-off-by: tison --- .../kubernetes/KubernetesRuntimeTest.java | 45 +++++++++---------- 1 file changed, 20 insertions(+), 25 deletions(-) diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java index 811954c470222..fdf9f722aef83 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java @@ -35,9 +35,21 @@ import io.kubernetes.client.openapi.models.V1Service; import io.kubernetes.client.openapi.models.V1StatefulSet; import io.kubernetes.client.openapi.models.V1Toleration; +import java.lang.reflect.Type; +import java.math.BigDecimal; +import java.net.HttpURLConnection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import okhttp3.Call; +import okhttp3.Response; import org.apache.commons.lang.StringUtils; -import org.apache.commons.lang3.SystemUtils; import org.apache.commons.lang3.JavaVersion; +import org.apache.commons.lang3.SystemUtils; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceConfig; @@ -53,29 +65,15 @@ import org.apache.pulsar.functions.worker.ConnectorsManager; import org.apache.pulsar.functions.worker.FunctionsManager; import org.apache.pulsar.functions.worker.WorkerConfig; +import org.mockito.ArgumentMatcher; import org.mockito.MockedStatic; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; - -import java.lang.reflect.Type; -import java.math.BigDecimal; -import java.net.HttpURLConnection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.function.Consumer; -import java.util.stream.Collectors; -import okhttp3.Call; -import okhttp3.Response; - import static org.apache.pulsar.functions.runtime.RuntimeUtils.FUNCTIONS_INSTANCE_CLASSPATH; import static org.apache.pulsar.functions.utils.FunctionCommon.roundDecimal; -import org.mockito.ArgumentMatcher; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; @@ -84,9 +82,9 @@ import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; @@ -1311,9 +1309,8 @@ private void assertMetricsPortConfigured(Map functionRuntimeFact @Test public void testDeleteStatefulSetWithTranslatedKubernetesLabelChars() throws Exception { InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false); - config.setFunctionDetails(createFunctionDetails(FunctionDetails.Runtime.JAVA, false, (fb) -> { - return fb.setTenant("c:tenant").setNamespace("c:ns").setName("c:fn"); - })); + config.setFunctionDetails(createFunctionDetails(FunctionDetails.Runtime.JAVA, false, + (fb) -> fb.setTenant("c:tenant").setNamespace("c:ns").setName("c:fn"))); CoreV1Api coreApi = mock(CoreV1Api.class); AppsV1Api appsApi = mock(AppsV1Api.class); @@ -1331,9 +1328,7 @@ public void testDeleteStatefulSetWithTranslatedKubernetesLabelChars() throws Exc factory.setCoreClient(coreApi); factory.setAppsClient(appsApi); - ArgumentMatcher hasTranslatedFunctionName = (String t) -> { - return t.startsWith(expectedFunctionNamePrefix); - }; + ArgumentMatcher hasTranslatedFunctionName = (String t) -> t.startsWith(expectedFunctionNamePrefix); when(appsApi.deleteNamespacedStatefulSetCall( argThat(hasTranslatedFunctionName), @@ -1341,8 +1336,8 @@ public void testDeleteStatefulSetWithTranslatedKubernetesLabelChars() throws Exc ApiException notFoundException = mock(ApiException.class); when(notFoundException.getCode()).thenReturn(HttpURLConnection.HTTP_NOT_FOUND); - when(appsApi.readNamespacedStatefulSet(argThat(hasTranslatedFunctionName), anyString(), - isNull(), isNull(), isNull())).thenThrow(notFoundException); + when(appsApi.readNamespacedStatefulSet( + argThat(hasTranslatedFunctionName), anyString(), anyString())).thenThrow(notFoundException); V1PodList podList = mock(V1PodList.class); when(podList.getItems()).thenReturn(Collections.emptyList()); From 83c57525c0e9d405d2a6a47e89070132595c6c20 Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 13 Jun 2023 08:24:44 +0800 Subject: [PATCH 4/4] fix capture Signed-off-by: tison --- .../functions/runtime/kubernetes/KubernetesRuntimeTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java index fdf9f722aef83..7fa279bc1d253 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java @@ -1337,7 +1337,7 @@ public void testDeleteStatefulSetWithTranslatedKubernetesLabelChars() throws Exc ApiException notFoundException = mock(ApiException.class); when(notFoundException.getCode()).thenReturn(HttpURLConnection.HTTP_NOT_FOUND); when(appsApi.readNamespacedStatefulSet( - argThat(hasTranslatedFunctionName), anyString(), anyString())).thenThrow(notFoundException); + argThat(hasTranslatedFunctionName), anyString(), isNull())).thenThrow(notFoundException); V1PodList podList = mock(V1PodList.class); when(podList.getItems()).thenReturn(Collections.emptyList());