From 369bae8982d5cb6378d96f78e6077a374d09828b Mon Sep 17 00:00:00 2001 From: Caleb Evans Date: Fri, 15 Aug 2025 16:18:24 -0600 Subject: [PATCH] feat(readiness): enhance readiness check to include system dependency verification and queue pod failures until ready --- .../health/PatternLibraryReadinessCheck.java | 24 +- .../operator/service/LogParserClient.java | 4 + .../operator/service/PodFailureWatcher.java | 241 +++++++++++++++++- 3 files changed, 262 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/redhat/podmortem/operator/health/PatternLibraryReadinessCheck.java b/src/main/java/com/redhat/podmortem/operator/health/PatternLibraryReadinessCheck.java index d114017..0a5170d 100644 --- a/src/main/java/com/redhat/podmortem/operator/health/PatternLibraryReadinessCheck.java +++ b/src/main/java/com/redhat/podmortem/operator/health/PatternLibraryReadinessCheck.java @@ -10,6 +10,7 @@ import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.List; +import org.eclipse.microprofile.config.inject.ConfigProperty; import org.eclipse.microprofile.health.HealthCheck; import org.eclipse.microprofile.health.HealthCheckResponse; import org.eclipse.microprofile.health.Readiness; @@ -22,7 +23,11 @@ public class PatternLibraryReadinessCheck implements HealthCheck { private static final Logger log = LoggerFactory.getLogger(PatternLibraryReadinessCheck.class); - private static final String PATTERN_CACHE_DIR = "/shared/patterns"; + private static final String DEFAULT_PATTERN_CACHE_DIR = "/shared/patterns"; + + @ConfigProperty(name = "pattern.cache.directory", defaultValue = DEFAULT_PATTERN_CACHE_DIR) + String patternCacheDir; + private static final int MAX_WAIT_MINUTES = 5; @Inject KubernetesClient client; @@ -40,7 +45,20 @@ public HealthCheckResponse call() { return HealthCheckResponse.named("pattern-library-sync").up().build(); } - Path cacheDir = Paths.get(PATTERN_CACHE_DIR); + boolean anyReady = + libraries.stream() + .anyMatch( + lib -> + lib.getStatus() != null + && "Ready" + .equalsIgnoreCase( + lib.getStatus().getPhase())); + if (anyReady) { + log.trace("At least one PatternLibrary reports Ready phase; marking ready"); + return HealthCheckResponse.named("pattern-library-sync").up().build(); + } + + Path cacheDir = Paths.get(patternCacheDir); if (!Files.exists(cacheDir)) { if (startupTime .plus(MAX_WAIT_MINUTES, ChronoUnit.MINUTES) @@ -49,7 +67,7 @@ public HealthCheckResponse call() { "Pattern library sync grace period exceeded (no cache dir), reporting ready anyway"); return HealthCheckResponse.named("pattern-library-sync").up().build(); } - log.debug("Pattern cache directory does not exist: {}", PATTERN_CACHE_DIR); + log.debug("Pattern cache directory does not exist: {}", patternCacheDir); return HealthCheckResponse.named("pattern-library-sync").down().build(); } diff --git a/src/main/java/com/redhat/podmortem/operator/service/LogParserClient.java b/src/main/java/com/redhat/podmortem/operator/service/LogParserClient.java index 4c09ec2..ef0bb03 100644 --- a/src/main/java/com/redhat/podmortem/operator/service/LogParserClient.java +++ b/src/main/java/com/redhat/podmortem/operator/service/LogParserClient.java @@ -46,6 +46,10 @@ public Uni analyzeLog(PodFailureData failureData) { "Received analysis result for pod: {}", failureData.getPod().getMetadata().getName())) .onFailure() + .retry() + .withBackOff(java.time.Duration.ofSeconds(2), java.time.Duration.ofSeconds(30)) + .atMost(5) + .onFailure() .invoke( throwable -> log.error( diff --git a/src/main/java/com/redhat/podmortem/operator/service/PodFailureWatcher.java b/src/main/java/com/redhat/podmortem/operator/service/PodFailureWatcher.java index dc1a273..5f3aa30 100644 --- a/src/main/java/com/redhat/podmortem/operator/service/PodFailureWatcher.java +++ b/src/main/java/com/redhat/podmortem/operator/service/PodFailureWatcher.java @@ -2,6 +2,7 @@ import com.redhat.podmortem.common.model.analysis.AnalysisResult; import com.redhat.podmortem.common.model.kube.aiprovider.AIProvider; +import com.redhat.podmortem.common.model.kube.patternlibrary.PatternLibrary; import com.redhat.podmortem.common.model.kube.podmortem.PodFailureData; import com.redhat.podmortem.common.model.kube.podmortem.Podmortem; import com.redhat.podmortem.common.model.kube.podmortem.PodmortemStatus; @@ -15,6 +16,11 @@ import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.event.Observes; import jakarta.inject.Inject; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; @@ -23,7 +29,9 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.microprofile.config.inject.ConfigProperty; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,6 +63,26 @@ public class PodFailureWatcher { private Set allowedNamespaces = Set.of(); private final List activeWatches = new CopyOnWriteArrayList<>(); + @ConfigProperty(name = "podmortem.processing.startup-delay-seconds", defaultValue = "60") + int startupDelaySeconds; + + @ConfigProperty(name = "quarkus.rest-client.log-parser.url") + Optional logParserBaseUrlProperty; + + @ConfigProperty(name = "quarkus.rest-client.ai-interface.url") + Optional aiInterfaceBaseUrlProperty; + + private final AtomicBoolean systemReady = new AtomicBoolean(false); + private Instant appStartupTime = Instant.now(); + private final HttpClient httpClient = + HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(2)).build(); + + // Queue pod failures until system is ready + private static final int MAX_PENDING_QUEUE_SIZE = 500; + private final ConcurrentLinkedQueue pendingFailureQueue = new ConcurrentLinkedQueue<>(); + private final Set queuedFailureKeys = + java.util.Collections.newSetFromMap(new ConcurrentHashMap<>()); + /** * Initializes the pod failure watcher on application startup. * @@ -64,6 +92,7 @@ public class PodFailureWatcher { */ public void onStartup(@Observes StartupEvent event) { log.info("Starting real-time pod failure watcher"); + appStartupTime = Instant.now(); // Parse configured namespaces (comma-separated) String namespaces = watchNamespacesProperty.orElse(""); if (!namespaces.isBlank()) { @@ -78,6 +107,7 @@ public void onStartup(@Observes StartupEvent event) { log.info("Configured to watch all namespaces (no namespace filter set)"); } startPodWatcher(); + startReadinessGuard(); } /** @@ -112,6 +142,10 @@ public void eventReceived(Action action, Pod pod) { return; } if (hasPodFailed(pod)) { + if (!systemReady.get()) { + enqueuePendingFailure(pod); + return; + } handlePodFailure(pod); } } catch (Exception e) { @@ -136,6 +170,204 @@ public void onClose(WatcherException cause) { }; } + /** + * Starts a background readiness guard. + * + *

This guard periodically checks whether the system is allowed to process pod failures. The + * system becomes ready when all of the following are true: + * + *

    + *
  • The configured startup delay (property: {@code + * podmortem.processing.startup-delay-seconds}) has elapsed + *
  • At least one {@code PatternLibrary} reports phase {@code Ready} (or none exist) + *
  • The Log Parser service {@code /q/health/ready} endpoint returns HTTP 2xx + *
  • The AI Interface {@code /q/health/ready} endpoint returns HTTP 2xx + *
+ * + *

Once ready, any queued pod failure events are processed asynchronously. + */ + private void startReadinessGuard() { + Thread.ofVirtual() + .name("podmortem-readiness-guard") + .start( + () -> { + while (!systemReady.get()) { + try { + boolean ready = checkSystemReady(); + if (ready) { + systemReady.set(true); + log.info( + "System dependencies ready; enabling failure processing"); + processQueuedFailuresAsync(); + break; + } + Thread.sleep(5000); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + break; + } catch (Exception e) { + log.debug("Readiness check failed: {}", e.getMessage()); + try { + Thread.sleep(5000); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + break; + } + } + } + }); + } + + /** + * Enqueues a failed pod event while the system is not yet ready to process failures. + * + *

Uses a bounded FIFO queue to avoid unbounded memory growth. Duplicate pod keys are + * de-duplicated while pending. When the queue is full, the oldest entry is dropped and a warn + * is logged. + * + * @param pod the pod associated with the failure event to buffer + */ + private void enqueuePendingFailure(Pod pod) { + String podKey = pod.getMetadata().getNamespace() + "/" + pod.getMetadata().getName(); + // Avoid duplicate entries + if (queuedFailureKeys.add(podKey)) { + if (pendingFailureQueue.size() >= MAX_PENDING_QUEUE_SIZE) { + String dropped = pendingFailureQueue.poll(); + if (dropped != null) { + queuedFailureKeys.remove(dropped); + log.warn("Pending failure queue full; dropping oldest: {}", dropped); + } + } + pendingFailureQueue.offer(podKey); + log.debug("Queued pod failure event until ready: {}", podKey); + } + } + + /** + * Drains the pending failure queue and processes each entry asynchronously. + * + *

For each queued item, the latest pod object is fetched to confirm the failure state before + * processing; this avoids acting on stale data. + */ + private void processQueuedFailuresAsync() { + Thread.ofVirtual() + .name("podmortem-queued-failure-drain") + .start( + () -> { + String podKey; + while ((podKey = pendingFailureQueue.poll()) != null) { + try { + queuedFailureKeys.remove(podKey); + String[] parts = podKey.split("/", 2); + if (parts.length != 2) { + continue; + } + String ns = parts[0]; + String name = parts[1]; + Pod latest = client.pods().inNamespace(ns).withName(name).get(); + if (latest != null && hasPodFailed(latest)) { + handlePodFailure(latest); + } + } catch (Exception e) { + log.debug( + "Failed processing queued failure {}: {}", + podKey, + e.getMessage()); + } + } + }); + } + + /** + * Evaluates whether the operator may begin processing pod failures. + * + *

Readiness requires that the startup delay has elapsed, pattern libraries are ready (or not + * present), and dependent services (log-parser and AI interface) report ready via their {@code + * /q/health/ready} endpoints. + * + * @return {@code true} if failure processing can start; {@code false} otherwise + */ + private boolean checkSystemReady() { + if (Duration.between(appStartupTime, Instant.now()).getSeconds() < startupDelaySeconds) { + return false; + } + + // Pattern libraries: ready if none defined or any reports phase Ready + try { + List libs = + client.resources(PatternLibrary.class).inAnyNamespace().list().getItems(); + boolean patternsReady = + libs.isEmpty() + || libs.stream() + .anyMatch( + l -> + l.getStatus() != null + && "Ready" + .equalsIgnoreCase( + l.getStatus() + .getPhase())); + if (!patternsReady) { + return false; + } + } catch (Exception e) { + return false; + } + + // log-parser ready + String logParserUrl = + logParserBaseUrlProperty.orElseGet( + () -> System.getenv("QUARKUS_REST_CLIENT_LOG_PARSER_URL")); + if (logParserUrl == null || logParserUrl.isBlank()) { + return false; + } + if (!isServiceReady(logParserUrl)) { + return false; + } + + // ai-interface ready + String aiUrl = + aiInterfaceBaseUrlProperty.orElseGet( + () -> System.getenv("QUARKUS_REST_CLIENT_AI_INTERFACE_URL")); + if (aiUrl == null || aiUrl.isBlank()) { + return false; + } + if (!isServiceReady(aiUrl)) { + return false; + } + + return true; + } + + /** + * Checks whether a dependent service is ready by querying its readiness endpoint. + * + * @param baseUrl the base URL of the service (without trailing path); {@code /q/health/ready} + * will be appended + * @return {@code true} if the service responds with HTTP 2xx, {@code false} otherwise + */ + private boolean isServiceReady(String baseUrl) { + try { + String healthUrl = + baseUrl.endsWith("/") + ? baseUrl + "q/health/ready" + : baseUrl + "/q/health/ready"; + HttpRequest request = + HttpRequest.newBuilder() + .GET() + .uri(URI.create(healthUrl)) + .timeout(Duration.ofSeconds(2)) + .build(); + HttpResponse response = + httpClient.send(request, HttpResponse.BodyHandlers.discarding()); + return response.statusCode() >= 200 && response.statusCode() < 300; + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + return false; + } catch (Exception e) { + return false; + } + } + /** * Determines if a pod has failed by examining container statuses. * @@ -568,17 +800,18 @@ private void restartWatcher() { } activeWatches.clear(); - new Thread( + Thread.ofVirtual() + .name("podmortem-watcher-restart") + .start( () -> { try { - Thread.sleep(5000); // Wait 5 seconds before restart + Thread.sleep(5000); log.info("Restarting pod failure watcher..."); startPodWatcher(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("Watcher restart interrupted", e); } - }) - .start(); + }); } }