@@ -10,6 +10,7 @@
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.health.HealthCheck;
import org.eclipse.microprofile.health.HealthCheckResponse;
import org.eclipse.microprofile.health.Readiness;
@@ -22,7 +23,11 @@
public class PatternLibraryReadinessCheck implements HealthCheck {

private static final Logger log = LoggerFactory.getLogger(PatternLibraryReadinessCheck.class);
private static final String PATTERN_CACHE_DIR = "/shared/patterns";
private static final String DEFAULT_PATTERN_CACHE_DIR = "/shared/patterns";

@ConfigProperty(name = "pattern.cache.directory", defaultValue = DEFAULT_PATTERN_CACHE_DIR)
String patternCacheDir;

private static final int MAX_WAIT_MINUTES = 5;

@Inject KubernetesClient client;
@@ -40,7 +45,20 @@ public HealthCheckResponse call() {
return HealthCheckResponse.named("pattern-library-sync").up().build();
}

Path cacheDir = Paths.get(PATTERN_CACHE_DIR);
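// Short-circuit: if any PatternLibrary already reports the Ready phase, skip the cache-directory check.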
boolean anyReady =
libraries.stream()
.anyMatch(
lib ->
lib.getStatus() != null
&& "Ready"
.equalsIgnoreCase(
lib.getStatus().getPhase()));
if (anyReady) {
log.trace("At least one PatternLibrary reports Ready phase; marking ready");
return HealthCheckResponse.named("pattern-library-sync").up().build();
}

Path cacheDir = Paths.get(patternCacheDir);
if (!Files.exists(cacheDir)) {
if (startupTime
.plus(MAX_WAIT_MINUTES, ChronoUnit.MINUTES)
@@ -49,7 +67,7 @@ public HealthCheckResponse call() {
"Pattern library sync grace period exceeded (no cache dir), reporting ready anyway");
return HealthCheckResponse.named("pattern-library-sync").up().build();
}
log.debug("Pattern cache directory does not exist: {}", PATTERN_CACHE_DIR);
log.debug("Pattern cache directory does not exist: {}", patternCacheDir);
return HealthCheckResponse.named("pattern-library-sync").down().build();
}

@@ -46,6 +46,10 @@ public Uni<AnalysisResult> analyzeLog(PodFailureData failureData) {
"Received analysis result for pod: {}",
failureData.getPod().getMetadata().getName()))
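// Retry transient analysis failures with exponential backoff (2s initial, capped at 30s), giving up after 5 attempts.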
.onFailure()
.retry()
.withBackOff(java.time.Duration.ofSeconds(2), java.time.Duration.ofSeconds(30))
.atMost(5)
.onFailure()
.invoke(
throwable ->
log.error(
@@ -2,6 +2,7 @@

import com.redhat.podmortem.common.model.analysis.AnalysisResult;
import com.redhat.podmortem.common.model.kube.aiprovider.AIProvider;
import com.redhat.podmortem.common.model.kube.patternlibrary.PatternLibrary;
import com.redhat.podmortem.common.model.kube.podmortem.PodFailureData;
import com.redhat.podmortem.common.model.kube.podmortem.Podmortem;
import com.redhat.podmortem.common.model.kube.podmortem.PodmortemStatus;
@@ -15,6 +16,11 @@
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
@@ -23,7 +29,9 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,6 +63,26 @@ public class PodFailureWatcher {
private Set<String> allowedNamespaces = Set.of();
private final List<Watch> activeWatches = new CopyOnWriteArrayList<>();

@ConfigProperty(name = "podmortem.processing.startup-delay-seconds", defaultValue = "60")
int startupDelaySeconds;
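// Via standard MicroProfile Config mapping this can also be set with the PODMORTEM_PROCESSING_STARTUP_DELAY_SECONDS environment variable.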

@ConfigProperty(name = "quarkus.rest-client.log-parser.url")
Optional<String> logParserBaseUrlProperty;

@ConfigProperty(name = "quarkus.rest-client.ai-interface.url")
Optional<String> aiInterfaceBaseUrlProperty;

private final AtomicBoolean systemReady = new AtomicBoolean(false);
private Instant appStartupTime = Instant.now();
private final HttpClient httpClient =
HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(2)).build();

// Queue pod failures until system is ready
private static final int MAX_PENDING_QUEUE_SIZE = 500;
private final ConcurrentLinkedQueue<String> pendingFailureQueue = new ConcurrentLinkedQueue<>();
private final Set<String> queuedFailureKeys =
java.util.Collections.newSetFromMap(new ConcurrentHashMap<>());

/**
* Initializes the pod failure watcher on application startup.
*
@@ -64,6 +92,7 @@ public class PodFailureWatcher {
*/
public void onStartup(@Observes StartupEvent event) {
log.info("Starting real-time pod failure watcher");
appStartupTime = Instant.now();
// Parse configured namespaces (comma-separated)
String namespaces = watchNamespacesProperty.orElse("");
if (!namespaces.isBlank()) {
@@ -78,6 +107,7 @@ public void onStartup(@Observes StartupEvent event) {
log.info("Configured to watch all namespaces (no namespace filter set)");
}
startPodWatcher();
startReadinessGuard();
}

/**
@@ -112,6 +142,10 @@ public void eventReceived(Action action, Pod pod) {
return;
}
if (hasPodFailed(pod)) {
if (!systemReady.get()) {
enqueuePendingFailure(pod);
return;
}
handlePodFailure(pod);
}
} catch (Exception e) {
@@ -136,6 +170,204 @@ public void onClose(WatcherException cause) {
};
}

/**
* Starts a background readiness guard.
*
* <p>This guard periodically checks whether the system is allowed to process pod failures. The
* system becomes ready when all of the following are true:
*
* <ul>
* <li>The configured startup delay (property: {@code
* podmortem.processing.startup-delay-seconds}) has elapsed
* <li>At least one {@code PatternLibrary} reports phase {@code Ready} (or none exist)
* <li>The Log Parser service {@code /q/health/ready} endpoint returns HTTP 2xx
* <li>The AI Interface {@code /q/health/ready} endpoint returns HTTP 2xx
* </ul>
*
* <p>Once ready, any queued pod failure events are processed asynchronously.
*/
private void startReadinessGuard() {
Thread.ofVirtual()
.name("podmortem-readiness-guard")
.start(
() -> {
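// Poll dependencies every 5 seconds until the system becomes ready.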
while (!systemReady.get()) {
try {
boolean ready = checkSystemReady();
if (ready) {
systemReady.set(true);
log.info(
"System dependencies ready; enabling failure processing");
processQueuedFailuresAsync();
break;
}
Thread.sleep(5000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
log.debug("Readiness check failed: {}", e.getMessage());
try {
Thread.sleep(5000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
});
}

/**
* Enqueues a failed pod event while the system is not yet ready to process failures.
*
* <p>Uses a bounded FIFO queue to avoid unbounded memory growth. Pod keys that are already
* pending are not enqueued again. When the queue is full, the oldest entry is dropped and a
* warning is logged.
*
* @param pod the pod associated with the failure event to buffer
*/
private void enqueuePendingFailure(Pod pod) {
String podKey = pod.getMetadata().getNamespace() + "/" + pod.getMetadata().getName();
// Avoid duplicate entries
if (queuedFailureKeys.add(podKey)) {
if (pendingFailureQueue.size() >= MAX_PENDING_QUEUE_SIZE) {
String dropped = pendingFailureQueue.poll();
if (dropped != null) {
queuedFailureKeys.remove(dropped);
log.warn("Pending failure queue full; dropping oldest: {}", dropped);
}
}
pendingFailureQueue.offer(podKey);
log.debug("Queued pod failure event until ready: {}", podKey);
}
}

/**
* Drains the pending failure queue and processes each entry asynchronously.
*
* <p>For each queued item, the latest pod object is fetched to confirm the failure state before
* processing; this avoids acting on stale data.
*/
private void processQueuedFailuresAsync() {
Thread.ofVirtual()
.name("podmortem-queued-failure-drain")
.start(
() -> {
String podKey;
while ((podKey = pendingFailureQueue.poll()) != null) {
try {
queuedFailureKeys.remove(podKey);
String[] parts = podKey.split("/", 2);
if (parts.length != 2) {
continue;
}
String ns = parts[0];
String name = parts[1];
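// Re-fetch the pod and confirm it is still in a failed state before processing, to avoid acting on stale data.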
Pod latest = client.pods().inNamespace(ns).withName(name).get();
if (latest != null && hasPodFailed(latest)) {
handlePodFailure(latest);
}
} catch (Exception e) {
log.debug(
"Failed processing queued failure {}: {}",
podKey,
e.getMessage());
}
}
});
}

/**
* Evaluates whether the operator may begin processing pod failures.
*
* <p>Readiness requires that the startup delay has elapsed, pattern libraries are ready (or not
* present), and dependent services (log-parser and AI interface) report ready via their {@code
* /q/health/ready} endpoints.
*
* @return {@code true} if failure processing can start; {@code false} otherwise
*/
private boolean checkSystemReady() {
if (Duration.between(appStartupTime, Instant.now()).getSeconds() < startupDelaySeconds) {
return false;
}

// Pattern libraries: ready if none defined or any reports phase Ready
try {
List<PatternLibrary> libs =
client.resources(PatternLibrary.class).inAnyNamespace().list().getItems();
boolean patternsReady =
libs.isEmpty()
|| libs.stream()
.anyMatch(
l ->
l.getStatus() != null
&& "Ready"
.equalsIgnoreCase(
l.getStatus()
.getPhase()));
if (!patternsReady) {
return false;
}
} catch (Exception e) {
return false;
}

// log-parser ready
String logParserUrl =
logParserBaseUrlProperty.orElseGet(
() -> System.getenv("QUARKUS_REST_CLIENT_LOG_PARSER_URL"));
if (logParserUrl == null || logParserUrl.isBlank()) {
return false;
}
if (!isServiceReady(logParserUrl)) {
return false;
}

// ai-interface ready
String aiUrl =
aiInterfaceBaseUrlProperty.orElseGet(
() -> System.getenv("QUARKUS_REST_CLIENT_AI_INTERFACE_URL"));
if (aiUrl == null || aiUrl.isBlank()) {
return false;
}
if (!isServiceReady(aiUrl)) {
return false;
}

return true;
}

/**
* Checks whether a dependent service is ready by querying its readiness endpoint.
*
* @param baseUrl the base URL of the service (without trailing path); {@code /q/health/ready}
* will be appended
* @return {@code true} if the service responds with HTTP 2xx, {@code false} otherwise
*/
private boolean isServiceReady(String baseUrl) {
try {
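// Append the readiness path without producing a double slash.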
String healthUrl =
baseUrl.endsWith("/")
? baseUrl + "q/health/ready"
: baseUrl + "/q/health/ready";
HttpRequest request =
HttpRequest.newBuilder()
.GET()
.uri(URI.create(healthUrl))
.timeout(Duration.ofSeconds(2))
.build();
HttpResponse<Void> response =
httpClient.send(request, HttpResponse.BodyHandlers.discarding());
return response.statusCode() >= 200 && response.statusCode() < 300;
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
return false;
} catch (Exception e) {
return false;
}
}

/**
* Determines if a pod has failed by examining container statuses.
*
@@ -568,17 +800,18 @@ private void restartWatcher() {
}
activeWatches.clear();

new Thread(
Thread.ofVirtual()
.name("podmortem-watcher-restart")
.start(
() -> {
try {
Thread.sleep(5000); // Wait 5 seconds before restart
Thread.sleep(5000);
log.info("Restarting pod failure watcher...");
startPodWatcher();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Watcher restart interrupted", e);
}
})
.start();
});
}
}