diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java index d0e36ecb48cd6..72c72cf164e66 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java @@ -684,10 +684,11 @@ public void deleteStatefulSet() throws InterruptedException { .numRetries(KubernetesRuntimeFactory.numRetries * 2) .sleepBetweenInvocationsMs(KubernetesRuntimeFactory.sleepBetweenRetriesMs * 2) .supplier(() -> { + Map validLabels = getLabels(instanceConfig.getFunctionDetails()); String labels = String.format("tenant=%s,namespace=%s,name=%s", - instanceConfig.getFunctionDetails().getTenant(), - instanceConfig.getFunctionDetails().getNamespace(), - instanceConfig.getFunctionDetails().getName()); + validLabels.get("tenant"), + validLabels.get("namespace"), + validLabels.get("name")); V1PodList response; try { diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java index 31f82adfe8c26..7fa279bc1d253 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java @@ -24,16 +24,32 @@ import com.google.gson.JsonObject; import com.google.protobuf.util.JsonFormat; import io.kubernetes.client.custom.Quantity; +import io.kubernetes.client.openapi.ApiException; +import io.kubernetes.client.openapi.apis.AppsV1Api; +import io.kubernetes.client.openapi.apis.CoreV1Api; import io.kubernetes.client.openapi.models.V1Container; +import io.kubernetes.client.openapi.models.V1PodList; import io.kubernetes.client.openapi.models.V1PodSpec; import io.kubernetes.client.openapi.models.V1PodTemplateSpec; import io.kubernetes.client.openapi.models.V1ResourceRequirements; import io.kubernetes.client.openapi.models.V1Service; import io.kubernetes.client.openapi.models.V1StatefulSet; import io.kubernetes.client.openapi.models.V1Toleration; +import java.lang.reflect.Type; +import java.math.BigDecimal; +import java.net.HttpURLConnection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import okhttp3.Call; +import okhttp3.Response; import org.apache.commons.lang.StringUtils; -import org.apache.commons.lang3.SystemUtils; import org.apache.commons.lang3.JavaVersion; +import org.apache.commons.lang3.SystemUtils; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.instance.InstanceConfig; @@ -49,27 +65,26 @@ import org.apache.pulsar.functions.worker.ConnectorsManager; import org.apache.pulsar.functions.worker.FunctionsManager; import org.apache.pulsar.functions.worker.WorkerConfig; +import org.mockito.ArgumentMatcher; import org.mockito.MockedStatic; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; - -import java.lang.reflect.Type; -import java.math.BigDecimal; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.function.Consumer; -import java.util.stream.Collectors; - import static org.apache.pulsar.functions.runtime.RuntimeUtils.FUNCTIONS_INSTANCE_CLASSPATH; import static org.apache.pulsar.functions.utils.FunctionCommon.roundDecimal; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; @@ -1290,4 +1305,51 @@ private void assertMetricsPortConfigured(Map functionRuntimeFact .contains("--metrics_port 0")); } } + + @Test + public void testDeleteStatefulSetWithTranslatedKubernetesLabelChars() throws Exception { + InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false); + config.setFunctionDetails(createFunctionDetails(FunctionDetails.Runtime.JAVA, false, + (fb) -> fb.setTenant("c:tenant").setNamespace("c:ns").setName("c:fn"))); + + CoreV1Api coreApi = mock(CoreV1Api.class); + AppsV1Api appsApi = mock(AppsV1Api.class); + + Call successfulCall = mock(Call.class); + Response okResponse = mock(Response.class); + when(okResponse.code()).thenReturn(HttpURLConnection.HTTP_OK); + when(okResponse.isSuccessful()).thenReturn(true); + when(okResponse.message()).thenReturn(""); + when(successfulCall.execute()).thenReturn(okResponse); + + final String expectedFunctionNamePrefix = String.format("pf-%s-%s-%s", "c-tenant", "c-ns", "c-fn"); + + factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0); + factory.setCoreClient(coreApi); + factory.setAppsClient(appsApi); + + ArgumentMatcher hasTranslatedFunctionName = (String t) -> t.startsWith(expectedFunctionNamePrefix); + + when(appsApi.deleteNamespacedStatefulSetCall( + argThat(hasTranslatedFunctionName), + anyString(), isNull(), isNull(), anyInt(), isNull(), anyString(), any(), isNull())).thenReturn(successfulCall); + + ApiException notFoundException = mock(ApiException.class); + when(notFoundException.getCode()).thenReturn(HttpURLConnection.HTTP_NOT_FOUND); + when(appsApi.readNamespacedStatefulSet( + argThat(hasTranslatedFunctionName), anyString(), isNull())).thenThrow(notFoundException); + + V1PodList podList = mock(V1PodList.class); + when(podList.getItems()).thenReturn(Collections.emptyList()); + + String expectedLabels = String.format("tenant=%s,namespace=%s,name=%s", "c-tenant", "c-ns", "c-fn"); + + when(coreApi.listNamespacedPod(anyString(), isNull(), isNull(), isNull(), isNull(), + eq(expectedLabels), isNull(), isNull(), isNull(), isNull(), isNull())).thenReturn(podList); + KubernetesRuntime kr = factory.createContainer(config, "/test/code", "code.yml", "/test/transforms", "transform.yml", Long.MIN_VALUE); + kr.deleteStatefulSet(); + + verify(coreApi).listNamespacedPod(anyString(), isNull(), isNull(), isNull(), isNull(), + eq(expectedLabels), isNull(), isNull(), isNull(), isNull(), isNull()); + } }