diff --git a/.github/workflows/gradle-publish.yml b/.github/workflows/gradle-publish.yml index a3bcfa7..a4df842 100644 --- a/.github/workflows/gradle-publish.yml +++ b/.github/workflows/gradle-publish.yml @@ -21,10 +21,10 @@ jobs: steps: - uses: actions/checkout@v4 - - name: Set up JDK 17 + - name: Set up JDK 21 uses: actions/setup-java@v4 with: - java-version: '17' + java-version: '21' distribution: 'temurin' server-id: github # Value of the distributionManagement/repository/id field of the pom.xml settings-path: ${{ github.workspace }} # location for the settings.xml file diff --git a/.github/workflows/gradle.yml b/.github/workflows/gradle.yml index bcb1b24..d60bbc4 100644 --- a/.github/workflows/gradle.yml +++ b/.github/workflows/gradle.yml @@ -22,10 +22,10 @@ jobs: steps: - uses: actions/checkout@v4 - - name: Set up JDK 17 + - name: Set up JDK 21 uses: actions/setup-java@v4 with: - java-version: '17' + java-version: '21' distribution: 'zulu' - name: Gradle Wrapper run: ./gradlew wrapper --gradle-version 8.8 @@ -64,10 +64,10 @@ jobs: steps: - uses: actions/checkout@v4 - - name: Set up JDK 17 + - name: Set up JDK 21 uses: actions/setup-java@v4 with: - java-version: '17' + java-version: '21' distribution: 'temurin' # Generates and submits a dependency graph, enabling Dependabot Alerts for all project dependencies. # See: https://github.com/gradle/actions/blob/main/dependency-submission/README.md diff --git a/core/build.gradle b/core/build.gradle index ae82c0b..66800ae 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -58,8 +58,8 @@ dependencies { } java { - sourceCompatibility = JavaVersion.toVersion("17") - targetCompatibility = JavaVersion.toVersion("17") + sourceCompatibility = JavaVersion.toVersion("21") + targetCompatibility = JavaVersion.toVersion("21") } graalvmNative.toolchainDetection = false diff --git a/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientAbstract.java b/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientAbstract.java index 5af3c42..36dc224 100644 --- a/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientAbstract.java +++ b/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientAbstract.java @@ -1,5 +1,5 @@ /* - * (C) 2017-2025 TCN Inc. All rights reserved. + * (C) 2017-2026 TCN Inc. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,21 +25,59 @@ import io.grpc.TlsChannelCredentials; import java.io.ByteArrayInputStream; import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Base class for bidirectional gRPC stream clients. + * + *

Provides shared infrastructure for stream lifecycle management including exponential backoff + * with jitter, hung connection detection, connection tracking, and graceful shutdown. Subclasses + * implement {@link #runStream()} for stream-specific logic and {@link #getStreamName()} for + * logging. + */ public abstract class GateClientAbstract { private static final Logger log = LoggerFactory.getLogger(GateClientAbstract.class); + // Stream lifecycle constants + protected static final long STREAM_TIMEOUT_MINUTES = 5; + protected static final long HUNG_CONNECTION_THRESHOLD_SECONDS = 45; + + // Backoff configuration + private static final long BACKOFF_BASE_MS = 2000; + private static final long BACKOFF_MAX_MS = 30000; + private static final double BACKOFF_JITTER = 0.2; + + // Channel management private ManagedChannel sharedChannel; private final ReentrantLock lock = new ReentrantLock(); private Config currentConfig = null; protected final String tenant; + // Connection tracking — shared across all stream subclasses + protected final AtomicReference lastMessageTime = new AtomicReference<>(); + protected final AtomicReference lastDisconnectTime = new AtomicReference<>(); + protected final AtomicReference reconnectionStartTime = new AtomicReference<>(); + protected final AtomicReference connectionEstablishedTime = new AtomicReference<>(); + protected final AtomicLong totalReconnectionAttempts = new AtomicLong(0); + protected final AtomicLong successfulReconnections = new AtomicLong(0); + protected final AtomicReference lastErrorType = new AtomicReference<>(); + protected final AtomicLong consecutiveFailures = new AtomicLong(0); + protected final AtomicBoolean isRunning = new AtomicBoolean(false); + public GateClientAbstract(String tenant, Config currentConfig) { this.currentConfig = currentConfig; this.tenant = tenant; @@ -54,6 +92,200 @@ public GateClientAbstract(String tenant, Config currentConfig) { "gRPC-Channel-Cleanup")); } + // --------------------------------------------------------------------------- + // Stream lifecycle — template method pattern + // --------------------------------------------------------------------------- + + /** + * Human-readable name for this stream, used in log messages. Override in subclasses that use the + * template {@link #start()} method (e.g. "EventStream", "JobQueue"). + */ + protected String getStreamName() { + return getClass().getSimpleName(); + } + + /** + * Run a single stream session. Called by the template {@link #start()} method. Subclasses that + * use the shared lifecycle must override this. Legacy subclasses that override start() directly + * can ignore it. + */ + protected void runStream() + throws UnconfiguredException, InterruptedException, HungConnectionException { + throw new UnsupportedOperationException( + getStreamName() + " must override runStream() or start()"); + } + + /** + * Template method that handles the full stream lifecycle: backoff, try/catch, error tracking. + * Each invocation represents one connection attempt. + */ + public void start() { + if (isUnconfigured()) { + log.warn("{} is unconfigured, cannot start", getStreamName()); + return; + } + + // Prevent concurrent invocations from the fixed-rate scheduler + if (!isRunning.compareAndSet(false, true)) { + return; + } + + // Exponential backoff: sleep before retrying if we have consecutive failures + long backoffMs = computeBackoffMs(); + if (backoffMs > 0) { + log.debug( + "[{}] Waiting {}ms before reconnect (consecutive failures: {})", + getStreamName(), + backoffMs, + consecutiveFailures.get()); + try { + Thread.sleep(backoffMs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + isRunning.set(false); + return; + } + } + + try { + log.debug("[{}] Starting, checking configuration status", getStreamName()); + reconnectionStartTime.set(Instant.now()); + resetChannelBackoff(); + runStream(); + } catch (HungConnectionException e) { + log.warn("[{}] Connection appears hung: {}", getStreamName(), e.getMessage()); + lastErrorType.set("HungConnection"); + consecutiveFailures.incrementAndGet(); + } catch (UnconfiguredException e) { + log.error("[{}] Configuration error: {}", getStreamName(), e.getMessage()); + lastErrorType.set("UnconfiguredException"); + consecutiveFailures.incrementAndGet(); + } catch (InterruptedException e) { + log.info("[{}] Interrupted", getStreamName()); + Thread.currentThread().interrupt(); + } catch (Exception e) { + if (isRoutineStreamClosure(e)) { + log.debug("[{}] Stream closed by server (routine reconnect)", getStreamName()); + } else { + log.error("[{}] Error: {}", getStreamName(), e.getMessage(), e); + } + lastErrorType.set(e.getClass().getSimpleName()); + consecutiveFailures.incrementAndGet(); + } finally { + totalReconnectionAttempts.incrementAndGet(); + lastDisconnectTime.set(Instant.now()); + isRunning.set(false); + onStreamDisconnected(); + log.debug("[{}] Stream session ended", getStreamName()); + } + } + + /** Hook called in the finally block of start(). Subclasses can clear observer refs here. */ + protected void onStreamDisconnected() {} + + // --------------------------------------------------------------------------- + // Shared stream helpers + // --------------------------------------------------------------------------- + + /** Record that the first server response was received and the connection is established. */ + protected void onConnectionEstablished() { + connectionEstablishedTime.set(Instant.now()); + successfulReconnections.incrementAndGet(); + consecutiveFailures.set(0); + lastErrorType.set(null); + + log.info( + "[{}] Connection established (took {})", + getStreamName(), + Duration.between(reconnectionStartTime.get(), connectionEstablishedTime.get())); + } + + /** Check if the connection is hung (no messages received within threshold). */ + protected void checkForHungConnection() throws HungConnectionException { + Instant lastMsg = lastMessageTime.get(); + if (lastMsg == null) { + lastMsg = connectionEstablishedTime.get(); + } + if (lastMsg != null + && lastMsg.isBefore( + Instant.now().minus(HUNG_CONNECTION_THRESHOLD_SECONDS, ChronoUnit.SECONDS))) { + throw new HungConnectionException( + "No messages received since " + lastMsg + " - connection appears hung"); + } + } + + /** + * Wait for a stream latch to complete, periodically checking for hung connections. Returns when + * the latch counts down, the timeout expires, or a hung connection is detected. + */ + protected void awaitStreamWithHungDetection(CountDownLatch latch) + throws InterruptedException, HungConnectionException { + long startTime = System.currentTimeMillis(); + long maxDurationMs = TimeUnit.MINUTES.toMillis(STREAM_TIMEOUT_MINUTES); + + while ((System.currentTimeMillis() - startTime) < maxDurationMs) { + if (latch.await(HUNG_CONNECTION_THRESHOLD_SECONDS, TimeUnit.SECONDS)) { + return; // Stream completed + } + checkForHungConnection(); + } + } + + /** Compute backoff delay with jitter based on consecutive failure count. */ + private long computeBackoffMs() { + long failures = consecutiveFailures.get(); + if (failures <= 0) { + return 0; + } + long delayMs = BACKOFF_BASE_MS * (1L << Math.min(failures - 1, 10)); + double jitter = 1.0 + (ThreadLocalRandom.current().nextDouble() * 2 - 1) * BACKOFF_JITTER; + return Math.min((long) (delayMs * jitter), BACKOFF_MAX_MS); + } + + /** + * Build base stream status map with connection tracking fields. Subclasses should call this and + * add their own stream-specific counters. + */ + protected Map buildStreamStatus() { + Instant lastDisconnect = lastDisconnectTime.get(); + Instant connectionEstablished = connectionEstablishedTime.get(); + Instant reconnectStart = reconnectionStartTime.get(); + Instant lastMessage = lastMessageTime.get(); + + Map status = new HashMap<>(); + status.put("isRunning", isRunning.get()); + status.put("totalReconnectionAttempts", totalReconnectionAttempts.get()); + status.put("successfulReconnections", successfulReconnections.get()); + status.put("consecutiveFailures", consecutiveFailures.get()); + status.put("lastDisconnectTime", lastDisconnect != null ? lastDisconnect.toString() : null); + status.put( + "connectionEstablishedTime", + connectionEstablished != null ? connectionEstablished.toString() : null); + status.put("reconnectionStartTime", reconnectStart != null ? reconnectStart.toString() : null); + status.put("lastErrorType", lastErrorType.get()); + status.put("lastMessageTime", lastMessage != null ? lastMessage.toString() : null); + return status; + } + + // --------------------------------------------------------------------------- + // Graceful shutdown + // --------------------------------------------------------------------------- + + /** Override in subclasses to add stream-specific stop logic (e.g. closing observers). */ + public void stop() { + doStop(); + } + + /** Shared shutdown logic: set running flag, shut down channel. */ + protected void doStop() { + isRunning.set(false); + shutdown(); + } + + // --------------------------------------------------------------------------- + // Configuration and channel management + // --------------------------------------------------------------------------- + public Config getConfig() { return currentConfig; } @@ -119,20 +351,18 @@ protected void shutdown() { tenant, e); channelToShutdown.shutdownNow(); - Thread.currentThread().interrupt(); // Preserve interrupt status + Thread.currentThread().interrupt(); } - // Set static field to null AFTER successful shutdown sharedChannel = null; } else { log.debug("Tenant: {} - Shared channel is already null, shut down, or terminated.", tenant); - // Ensure static field is null if channel is unusable if (sharedChannel != null && (sharedChannel.isShutdown() || sharedChannel.isTerminated())) { sharedChannel = null; } } } finally { - lock.unlock(); // Guaranteed unlock in finally block + lock.unlock(); } } @@ -161,7 +391,6 @@ private void forceShutdownSharedChannel() { public ManagedChannel getChannel() throws UnconfiguredException { long getChannelStartTime = System.currentTimeMillis(); - // Double-checked locking pattern for thread-safe lazy initialization ManagedChannel localChannel = sharedChannel; if (localChannel == null || localChannel.isShutdown() || localChannel.isTerminated()) { @@ -187,7 +416,6 @@ public ManagedChannel getChannel() throws UnconfiguredException { lockWaitTime); } try { - // Double-check condition inside synchronized block localChannel = sharedChannel; var shutdown = localChannel == null || localChannel.isShutdown(); @@ -279,27 +507,31 @@ private ManagedChannel createNewChannel() throws UnconfiguredException { "TCN Gate client configuration error during channel creation", e); } catch (UnconfiguredException e) { log.error("Tenant: {} - Configuration error during shared channel creation", tenant, e); - throw e; // Re-throw specific unconfigured exception + throw e; } catch (Exception e) { log.error("Tenant: {} - Unexpected error during shared channel creation", tenant, e); throw new UnconfiguredException("Unexpected error configuring TCN Gate client channel", e); } } - public abstract void start(); + /** + * Reset gRPC's internal connect backoff on the shared channel. This forces the name resolver to + * immediately retry DNS resolution instead of waiting for its own exponential backoff timer. + */ + protected void resetChannelBackoff() { + ManagedChannel channel = sharedChannel; + if (channel != null && !channel.isShutdown() && !channel.isTerminated()) { + channel.resetConnectBackoff(); + } + } - /** Resets the gRPC channel after a connection failure */ + /** Resets the gRPC channel after a connection failure. */ protected void resetChannel() { log.info("Tenant: {} - Resetting static shared gRPC channel after connection failure.", tenant); shutdown(); } - /** - * Helper method to handle StatusRuntimeException - * - * @param e The exception to handle - * @return true if the exception was handled - */ + /** Handle StatusRuntimeException, resetting channel on UNAVAILABLE. */ protected boolean handleStatusRuntimeException(StatusRuntimeException e) { if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) { log.warn( @@ -310,7 +542,7 @@ protected boolean handleStatusRuntimeException(StatusRuntimeException e) { return false; } - /** Get channel statistics for monitoring */ + /** Get channel statistics for monitoring. */ public Map getChannelStats() { ManagedChannel channel = sharedChannel; if (channel != null) { @@ -322,4 +554,25 @@ public Map getChannelStats() { } return Map.of("channel", "null"); } + + /** Exception for hung connection detection. */ + public static class HungConnectionException extends Exception { + public HungConnectionException(String message) { + super(message); + } + } + + private boolean isRoutineStreamClosure(Exception e) { + Throwable cause = e; + while (cause != null) { + if (cause instanceof StatusRuntimeException sre + && sre.getStatus().getCode() == Status.Code.UNAVAILABLE + && sre.getMessage() != null + && sre.getMessage().contains("NO_ERROR")) { + return true; + } + cause = cause.getCause(); + } + return false; + } } diff --git a/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientEventStream.java b/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientEventStream.java index 2d12d4c..39e6011 100644 --- a/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientEventStream.java +++ b/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientEventStream.java @@ -22,278 +22,392 @@ import build.buf.gen.tcnapi.exile.gate.v2.GateServiceGrpc; import com.tcn.exile.config.Config; import com.tcn.exile.gateclients.UnconfiguredException; +import com.tcn.exile.log.LogCategory; +import com.tcn.exile.log.StructuredLogger; import com.tcn.exile.plugin.PluginInterface; import io.grpc.stub.StreamObserver; +import jakarta.annotation.PreDestroy; +import java.time.Instant; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -/** Streams events from Gate using the EventStream API with acknowledgment. */ +/** + * Bidirectional event streaming with acknowledgment using the EventStream API. + * + *

The client drives the loop: sends a request (with ACK IDs + event count), server responds with + * events, client processes them and sends the next request with ACKs. This repeats until the + * server's 5-minute context timeout expires. + */ public class GateClientEventStream extends GateClientAbstract { - protected static final org.slf4j.Logger log = - org.slf4j.LoggerFactory.getLogger(GateClientEventStream.class); + private static final StructuredLogger log = new StructuredLogger(GateClientEventStream.class); private static final int BATCH_SIZE = 100; - private static final long REQUEST_TIMEOUT_SECONDS = 60; private final PluginInterface plugin; + private final AtomicLong eventsProcessed = new AtomicLong(0); + private final AtomicLong eventsFailed = new AtomicLong(0); - // Track ACKs between polling cycles - events successfully processed - // will be ACKed in the next request + // Pending ACKs buffer — survives stream reconnections. + // Event IDs are added after successful processing and removed only after + // successful send to the server. If a stream breaks before ACKs are sent, + // they carry over to the next connection attempt. private final List pendingAcks = new ArrayList<>(); + // Stream observer for sending requests/ACKs + private final AtomicReference> requestObserverRef = + new AtomicReference<>(); + public GateClientEventStream(String tenant, Config currentConfig, PluginInterface plugin) { super(tenant, currentConfig); this.plugin = plugin; } @Override - public void start() { - try { - if (isUnconfigured()) { - log.debug("Tenant: {} - Configuration not set, skipping event stream", tenant); - return; - } - if (!plugin.isRunning()) { - log.debug( - "Tenant: {} - Plugin is not running (possibly due to database disconnection), skipping event stream", - tenant); - return; - } + protected String getStreamName() { + return "EventStream"; + } - int totalProcessed = 0; - long cycleStart = System.currentTimeMillis(); + @Override + protected void onStreamDisconnected() { + requestObserverRef.set(null); + } - // Copy pending ACKs and clear for this cycle - List acksToSend; + @Override + protected void runStream() + throws UnconfiguredException, InterruptedException, HungConnectionException { + var latch = new CountDownLatch(1); + var errorRef = new AtomicReference(); + var firstResponseReceived = new AtomicBoolean(false); + + var responseObserver = + new StreamObserver() { + @Override + public void onNext(EventStreamResponse response) { + lastMessageTime.set(Instant.now()); + + if (firstResponseReceived.compareAndSet(false, true)) { + onConnectionEstablished(); + } + + if (response.getEventsCount() == 0) { + log.debug( + LogCategory.GRPC, "EmptyBatch", "Received empty event batch, requesting next"); + // Brief pause to avoid hot-looping when no events are available. + // The server responds instantly to empty polls, so without this + // delay we'd loop every ~40ms burning CPU and network. + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + sendNextRequest(); + return; + } + + log.debug( + LogCategory.GRPC, + "EventsReceived", + "Received %d events from stream", + response.getEventsCount()); + + processBatch(response.getEventsList()); + sendNextRequest(); + } + + @Override + public void onError(Throwable t) { + log.warn(LogCategory.GRPC, "StreamError", "Event stream error: %s", t.getMessage()); + errorRef.set(t); + latch.countDown(); + } + + @Override + public void onCompleted() { + log.info(LogCategory.GRPC, "StreamCompleted", "Event stream completed by server"); + latch.countDown(); + } + }; + + // Open bidirectional stream + var requestObserver = GateServiceGrpc.newStub(getChannel()).eventStream(responseObserver); + requestObserverRef.set(requestObserver); + + // Send initial request — include any pending ACKs from a previous broken stream + List carryOverAcks; + synchronized (pendingAcks) { + carryOverAcks = new ArrayList<>(pendingAcks); + } + if (!carryOverAcks.isEmpty()) { + log.info( + LogCategory.GRPC, + "CarryOverAcks", + "Sending %d pending ACKs from previous stream session", + carryOverAcks.size()); + } + log.debug(LogCategory.GRPC, "Init", "Sending initial event stream request..."); + requestObserver.onNext( + EventStreamRequest.newBuilder() + .setEventCount(BATCH_SIZE) + .addAllAckEventIds(carryOverAcks) + .build()); + if (!carryOverAcks.isEmpty()) { synchronized (pendingAcks) { - acksToSend = new ArrayList<>(pendingAcks); - pendingAcks.clear(); - } - - if (!acksToSend.isEmpty()) { - log.debug("Tenant: {} - Sending {} ACKs from previous cycle", tenant, acksToSend.size()); + pendingAcks.removeAll(carryOverAcks); } + } - // Request events, sending any pending ACKs from previous cycle - EventStreamResponse response = requestEvents(acksToSend); + awaitStreamWithHungDetection(latch); - if (response == null || response.getEventsCount() == 0) { - log.debug( - "Tenant: {} - Event stream request completed successfully but no events were received", - tenant); - return; - } - - log.debug("Tenant: {} - Received {} events from stream", tenant, response.getEventsCount()); + var error = errorRef.get(); + if (error != null) { + throw new RuntimeException("Stream error", error); + } + } - // Process events and collect successful ACK IDs - List successfulAcks = new ArrayList<>(); - long batchStart = System.currentTimeMillis(); + // --------------------------------------------------------------------------- + // Request & ACK management + // --------------------------------------------------------------------------- - for (Event event : response.getEventsList()) { - String eventId = getEventId(event); - if (processEvent(event)) { - successfulAcks.add(eventId); - } - // If processing fails, event is NOT added to ACKs - it will be redelivered - } + /** + * Send the next request on the stream, draining pendingAcks into the request. If the send fails, + * the ACKs remain in pendingAcks for the next attempt or reconnection. + */ + private void sendNextRequest() { + var observer = requestObserverRef.get(); + if (observer == null) { + log.warn(LogCategory.GRPC, "RequestFailed", "Cannot send next request - no active observer"); + return; + } - long batchEnd = System.currentTimeMillis(); - totalProcessed = successfulAcks.size(); + List acksToSend; + synchronized (pendingAcks) { + acksToSend = new ArrayList<>(pendingAcks); + } - // Store successful ACKs for next cycle - synchronized (pendingAcks) { - pendingAcks.addAll(successfulAcks); - } + try { + var request = + EventStreamRequest.newBuilder() + .setEventCount(BATCH_SIZE) + .addAllAckEventIds(acksToSend) + .build(); + observer.onNext(request); - // Warn if batch processing is slow - if (totalProcessed > 0) { - long avg = (batchEnd - batchStart) / totalProcessed; - if (avg > 1000) { - log.warn( - "Tenant: {} - Event stream batch completed {} events in {}ms, average time per event: {}ms, this is long", - tenant, - totalProcessed, - batchEnd - batchStart, - avg); + if (!acksToSend.isEmpty()) { + synchronized (pendingAcks) { + pendingAcks.removeAll(acksToSend); } + log.debug( + LogCategory.GRPC, + "AckSent", + "Sent ACK for %d events, requesting next batch", + acksToSend.size()); } - - // Log summary - if (totalProcessed > 0) { - long cycleEnd = System.currentTimeMillis(); - log.info( - "Tenant: {} - Event stream cycle completed, processed {} events in {}ms", - tenant, - totalProcessed, - cycleEnd - cycleStart); - } - - } catch (UnconfiguredException e) { - log.error("Tenant: {} - Error while getting client configuration {}", tenant, e.getMessage()); - } catch (InterruptedException e) { - log.warn("Tenant: {} - Event stream interrupted", tenant); - Thread.currentThread().interrupt(); } catch (Exception e) { - log.error("Tenant: {} - Unexpected error in event stream", tenant, e); - // Clear pending ACKs on error to avoid ACKing events we may not have processed - synchronized (pendingAcks) { - pendingAcks.clear(); - } + log.error( + LogCategory.GRPC, + "RequestFailed", + "Failed to send next event stream request (keeping %d pending ACKs): %s", + acksToSend.size(), + e.getMessage()); } } - /** - * Request events using async stub with blocking wait. This handles the server-streaming pattern - * where we send request + StreamObserver. - */ - private EventStreamResponse requestEvents(List ackEventIds) - throws InterruptedException, UnconfiguredException { + // --------------------------------------------------------------------------- + // Event processing + // --------------------------------------------------------------------------- - var request = - EventStreamRequest.newBuilder() - .setEventCount(BATCH_SIZE) - .addAllAckEventIds(ackEventIds) - .build(); - - var latch = new CountDownLatch(1); - var responseRef = new AtomicReference(); - var errorRef = new AtomicReference(); - - GateServiceGrpc.newStub(getChannel()) - .eventStream( - request, - new StreamObserver() { - @Override - public void onNext(EventStreamResponse response) { - responseRef.set(response); - } - - @Override - public void onError(Throwable t) { - errorRef.set(t); - latch.countDown(); - } - - @Override - public void onCompleted() { - latch.countDown(); - } - }); - - // Wait for response with timeout - boolean completed = latch.await(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS); + /** Process a batch of events, adding successful ACK IDs to the pending buffer. */ + private void processBatch(List events) { + long batchStart = System.currentTimeMillis(); + int successCount = 0; - if (!completed) { - throw new RuntimeException( - "Event stream request timed out after " + REQUEST_TIMEOUT_SECONDS + " seconds"); - } - - if (errorRef.get() != null) { - throw new RuntimeException("Event stream error", errorRef.get()); + for (Event event : events) { + String eventId = getEventId(event); + if (processEvent(event)) { + synchronized (pendingAcks) { + pendingAcks.add(eventId); + } + successCount++; + eventsProcessed.incrementAndGet(); + } else { + eventsFailed.incrementAndGet(); + log.warn( + LogCategory.GRPC, + "EventNotAcked", + "Event %s NOT acknowledged - will be redelivered", + eventId); + } } - var response = responseRef.get(); - if (response == null) { - // Return empty response if none received - return EventStreamResponse.getDefaultInstance(); + long elapsed = System.currentTimeMillis() - batchStart; + if (successCount > 0 && (elapsed / successCount) > 1000) { + log.warn( + LogCategory.GRPC, + "SlowBatch", + "Event batch completed %d events in %dms, avg %dms per event", + successCount, + elapsed, + elapsed / successCount); } - return response; } /** - * Process a single event. Returns true if successfully processed (should ACK). - * - *

This mirrors the event handling logic from GateClientPollEvents exactly. + * Process a single event by dispatching to the plugin. Returns true if successfully processed. */ private boolean processEvent(Event event) { try { - if (event.hasAgentCall()) { - log.debug( - "Tenant: {} - Received agent call event {} - {}", - tenant, - event.getAgentCall().getCallSid(), - event.getAgentCall().getCallType()); - plugin.handleAgentCall(event.getAgentCall()); - } - - if (event.hasAgentResponse()) { - log.debug( - "Tenant: {} - Received agent response event {}", - tenant, - event.getAgentResponse().getAgentCallResponseSid()); - plugin.handleAgentResponse(event.getAgentResponse()); - } - - if (event.hasTelephonyResult()) { - log.debug( - "Tenant: {} - Received telephony result event {} - {}", - tenant, - event.getTelephonyResult().getCallSid(), - event.getTelephonyResult().getCallType()); - plugin.handleTelephonyResult(event.getTelephonyResult()); - } - - if (event.hasTask()) { - log.debug("Tenant: {} - Received task event {}", tenant, event.getTask().getTaskSid()); - plugin.handleTask(event.getTask()); - } - - if (event.hasTransferInstance()) { + if (!plugin.isRunning()) { log.debug( - "Tenant: {} - Received transfer instance event {}", - tenant, - event.getTransferInstance().getTransferInstanceId()); - plugin.handleTransferInstance(event.getTransferInstance()); + LogCategory.GRPC, + "PluginNotRunning", + "Plugin is not running, skipping event processing"); + return false; } - if (event.hasCallRecording()) { - log.debug( - "Tenant: {} - Received call recording event {}", - tenant, - event.getCallRecording().getRecordingId()); - plugin.handleCallRecording(event.getCallRecording()); + switch (event.getEntityCase()) { + case AGENT_CALL: + log.debug( + LogCategory.GRPC, + "EventProcessed", + "Received agent call event %d - %s", + event.getAgentCall().getCallSid(), + event.getAgentCall().getCallType()); + plugin.handleAgentCall(event.getAgentCall()); + break; + case AGENT_RESPONSE: + log.debug( + LogCategory.GRPC, + "EventProcessed", + "Received agent response event %s", + event.getAgentResponse().getAgentCallResponseSid()); + plugin.handleAgentResponse(event.getAgentResponse()); + break; + case TELEPHONY_RESULT: + log.debug( + LogCategory.GRPC, + "EventProcessed", + "Received telephony result event %d - %s", + event.getTelephonyResult().getCallSid(), + event.getTelephonyResult().getCallType()); + plugin.handleTelephonyResult(event.getTelephonyResult()); + break; + case TASK: + log.debug( + LogCategory.GRPC, + "EventProcessed", + "Received task event %s", + event.getTask().getTaskSid()); + plugin.handleTask(event.getTask()); + break; + case TRANSFER_INSTANCE: + log.debug( + LogCategory.GRPC, + "EventProcessed", + "Received transfer instance event %s", + event.getTransferInstance().getTransferInstanceId()); + plugin.handleTransferInstance(event.getTransferInstance()); + break; + case CALL_RECORDING: + log.debug( + LogCategory.GRPC, + "EventProcessed", + "Received call recording event %s", + event.getCallRecording().getRecordingId()); + plugin.handleCallRecording(event.getCallRecording()); + break; + default: + log.warn( + LogCategory.GRPC, "UnknownEvent", "Unknown event type: %s", event.getEntityCase()); + break; } return true; - } catch (Exception e) { - log.error("Tenant: {} - Failed to process event: {}", tenant, e.getMessage(), e); - return false; // Don't ACK - will be redelivered + log.error( + LogCategory.GRPC, + "EventProcessingError", + "Failed to process event: %s", + e.getMessage(), + e); + return false; } } - /** Extract event ID for ACK purposes. Each event type has a different primary identifier. */ + /** + * Extract the server-assigned entity ID for ACK purposes. The server sets exile_entity_id on each + * Event from the DB's entity_id, which may be a composite key (e.g. "call_type,call_sid" for + * telephony). We must echo this exact value back for the ACK to match. + */ private String getEventId(Event event) { - if (event.hasAgentCall()) { - return String.valueOf(event.getAgentCall().getAgentCallSid()); - } - if (event.hasTelephonyResult()) { - return String.valueOf(event.getTelephonyResult().getCallSid()); - } - if (event.hasAgentResponse()) { - return String.valueOf(event.getAgentResponse().getAgentCallResponseSid()); - } - if (event.hasTask()) { - return String.valueOf(event.getTask().getTaskSid()); + String entityId = event.getExileEntityId(); + if (entityId == null || entityId.isEmpty()) { + log.warn( + LogCategory.GRPC, + "MissingEntityId", + "Event has no exile_entity_id, cannot ACK. Event type: %s", + event.getEntityCase()); + return "unknown"; } - if (event.hasTransferInstance()) { - return event.getTransferInstance().getTransferInstanceId(); + return entityId; + } + + // --------------------------------------------------------------------------- + // Lifecycle + // --------------------------------------------------------------------------- + + @Override + public void stop() { + log.info( + LogCategory.GRPC, + "Stopping", + "Stopping GateClientEventStream (total attempts: %d, successful: %d, events: %d/%d)", + totalReconnectionAttempts.get(), + successfulReconnections.get(), + eventsProcessed.get(), + eventsFailed.get()); + + synchronized (pendingAcks) { + if (!pendingAcks.isEmpty()) { + log.warn( + LogCategory.GRPC, + "UnsentAcks", + "Shutting down with %d un-sent ACKs (these events will be redelivered): %s", + pendingAcks.size(), + pendingAcks); + } } - if (event.hasCallRecording()) { - return event.getCallRecording().getRecordingId(); + + var observer = requestObserverRef.get(); + if (observer != null) { + try { + observer.onCompleted(); + } catch (Exception e) { + log.debug(LogCategory.GRPC, "CloseError", "Error closing stream: %s", e.getMessage()); + } } - log.warn("Tenant: {} - Unknown event type, cannot extract ID", tenant); - return "unknown"; + + doStop(); + log.info(LogCategory.GRPC, "Stopped", "GateClientEventStream stopped"); + } + + @PreDestroy + public void destroy() { + stop(); } - /** Get count of pending ACKs (for diagnostics). */ - public int getPendingAckCount() { + public Map getStreamStatus() { + Map status = buildStreamStatus(); + status.put("eventsProcessed", eventsProcessed.get()); + status.put("eventsFailed", eventsFailed.get()); synchronized (pendingAcks) { - return pendingAcks.size(); + status.put("pendingAckCount", pendingAcks.size()); } + return status; } } diff --git a/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientJobQueue.java b/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientJobQueue.java index 1d224a5..386b15c 100644 --- a/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientJobQueue.java +++ b/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientJobQueue.java @@ -28,10 +28,7 @@ import com.tcn.exile.plugin.PluginInterface; import io.grpc.stub.StreamObserver; import jakarta.annotation.PreDestroy; -import java.time.Duration; import java.time.Instant; -import java.time.temporal.ChronoUnit; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -39,28 +36,13 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -/** Bidirectional job streaming with acknowledgment using JobQueueStream API. */ +/** Bidirectional job streaming with acknowledgment using the JobQueueStream API. */ public class GateClientJobQueue extends GateClientAbstract { private static final StructuredLogger log = new StructuredLogger(GateClientJobQueue.class); private static final int DEFAULT_TIMEOUT_SECONDS = 300; - - private static final long STREAM_TIMEOUT_MINUTES = 5; - private static final long HUNG_CONNECTION_THRESHOLD_SECONDS = 45; private static final String KEEPALIVE_JOB_ID = "keepalive"; private final PluginInterface plugin; - private final AtomicBoolean establishedForCurrentAttempt = new AtomicBoolean(false); - private final AtomicReference lastMessageTime = new AtomicReference<>(); - - // Connection timing tracking - private final AtomicReference lastDisconnectTime = new AtomicReference<>(); - private final AtomicReference reconnectionStartTime = new AtomicReference<>(); - private final AtomicReference connectionEstablishedTime = new AtomicReference<>(); - private final AtomicLong totalReconnectionAttempts = new AtomicLong(0); - private final AtomicLong successfulReconnections = new AtomicLong(0); - private final AtomicReference lastErrorType = new AtomicReference<>(); - private final AtomicLong consecutiveFailures = new AtomicLong(0); - private final AtomicBoolean isRunning = new AtomicBoolean(false); private final AtomicLong jobsProcessed = new AtomicLong(0); private final AtomicLong jobsFailed = new AtomicLong(0); @@ -73,88 +55,31 @@ public GateClientJobQueue(String tenant, Config currentConfig, PluginInterface p this.plugin = plugin; } - /** Check if the connection is hung (no messages received in threshold time). */ - private void checkForHungConnection() throws HungConnectionException { - Instant lastMsg = lastMessageTime.get(); - if (lastMsg == null) { - lastMsg = connectionEstablishedTime.get(); - } - if (lastMsg != null - && lastMsg.isBefore( - Instant.now().minus(HUNG_CONNECTION_THRESHOLD_SECONDS, ChronoUnit.SECONDS))) { - throw new HungConnectionException( - "No messages received since " + lastMsg + " - connection appears hung"); - } + @Override + protected String getStreamName() { + return "JobQueue"; } @Override - public void start() { - if (isUnconfigured()) { - log.warn(LogCategory.GRPC, "NOOP", "JobQueue is unconfigured, cannot stream jobs"); - return; - } - - try { - log.debug(LogCategory.GRPC, "Init", "JobQueue task started, checking configuration status"); - - reconnectionStartTime.set(Instant.now()); - isRunning.set(true); - - runStream(); - - } catch (HungConnectionException e) { - log.warn( - LogCategory.GRPC, "HungConnection", "Job queue stream appears hung: %s", e.getMessage()); - lastErrorType.set("HungConnection"); - } catch (UnconfiguredException e) { - log.error(LogCategory.GRPC, "ConfigError", "Configuration error: %s", e.getMessage()); - lastErrorType.set("UnconfiguredException"); - consecutiveFailures.incrementAndGet(); - } catch (InterruptedException e) { - log.info(LogCategory.GRPC, "Interrupted", "Job queue stream interrupted"); - Thread.currentThread().interrupt(); - } catch (Exception e) { - log.error( - LogCategory.GRPC, "JobQueue", "Error streaming jobs from server: %s", e.getMessage(), e); - lastErrorType.set(e.getClass().getSimpleName()); - if (connectionEstablishedTime.get() == null) { - consecutiveFailures.incrementAndGet(); - } - } finally { - totalReconnectionAttempts.incrementAndGet(); - lastDisconnectTime.set(Instant.now()); - isRunning.set(false); - requestObserverRef.set(null); - log.debug(LogCategory.GRPC, "Complete", "Job queue done"); - } + protected void onStreamDisconnected() { + requestObserverRef.set(null); } - /** Run a single stream session with bidirectional communication. */ - private void runStream() + @Override + protected void runStream() throws UnconfiguredException, InterruptedException, HungConnectionException { var latch = new CountDownLatch(1); var errorRef = new AtomicReference(); var firstResponseReceived = new AtomicBoolean(false); - // Create response handler var responseObserver = new StreamObserver() { @Override public void onNext(JobQueueStreamResponse response) { lastMessageTime.set(Instant.now()); - // Log connected on first server response if (firstResponseReceived.compareAndSet(false, true)) { - connectionEstablishedTime.set(Instant.now()); - successfulReconnections.incrementAndGet(); - consecutiveFailures.set(0); - lastErrorType.set(null); - - log.info( - LogCategory.GRPC, - "ConnectionEstablished", - "Job queue connection took %s", - Duration.between(reconnectionStartTime.get(), connectionEstablishedTime.get())); + onConnectionEstablished(); } if (!response.hasJob()) { @@ -165,7 +90,7 @@ public void onNext(JobQueueStreamResponse response) { var job = response.getJob(); String jobId = job.getJobId(); - // Handle keepalive - must ACK to register with presence store + // Handle keepalive — must ACK to register with presence store if (KEEPALIVE_JOB_ID.equals(jobId)) { log.debug( LogCategory.GRPC, "Keepalive", "Received keepalive, sending ACK to register"); @@ -180,11 +105,7 @@ public void onNext(JobQueueStreamResponse response) { jobId, getJobType(job)); - // Process the job - boolean success = processJob(job); - - // Send ACK only on success - if not success, job will be redelivered - if (success) { + if (processJob(job)) { sendAck(jobId); jobsProcessed.incrementAndGet(); } else { @@ -219,52 +140,44 @@ public void onCompleted() { log.debug(LogCategory.GRPC, "Init", "Sending initial keepalive to job queue..."); requestObserver.onNext(JobQueueStreamRequest.newBuilder().setJobId(KEEPALIVE_JOB_ID).build()); - // Wait for stream to complete, with periodic hung-connection checks - boolean completed = false; - long startTime = System.currentTimeMillis(); - long maxDurationMs = TimeUnit.MINUTES.toMillis(STREAM_TIMEOUT_MINUTES); - - while (!completed && (System.currentTimeMillis() - startTime) < maxDurationMs) { - completed = latch.await(HUNG_CONNECTION_THRESHOLD_SECONDS, TimeUnit.SECONDS); - if (!completed) { - // Check for hung connection - checkForHungConnection(); - } - } + awaitStreamWithHungDetection(latch); - // Propagate error if any var error = errorRef.get(); if (error != null) { throw new RuntimeException("Stream error", error); } } + // --------------------------------------------------------------------------- + // ACK management + // --------------------------------------------------------------------------- + /** Send acknowledgment for a job. */ private void sendAck(String jobId) { var observer = requestObserverRef.get(); - if (observer != null) { - try { - observer.onNext(JobQueueStreamRequest.newBuilder().setJobId(jobId).build()); - log.debug(LogCategory.GRPC, "AckSent", "Sent ACK for job: %s", jobId); - } catch (Exception e) { - log.error( - LogCategory.GRPC, - "AckFailed", - "Failed to send ACK for job %s: %s", - jobId, - e.getMessage()); - } - } else { + if (observer == null) { log.warn( LogCategory.GRPC, "AckFailed", "Cannot send ACK for job %s - no active observer", jobId); + return; + } + try { + observer.onNext(JobQueueStreamRequest.newBuilder().setJobId(jobId).build()); + log.debug(LogCategory.GRPC, "AckSent", "Sent ACK for job: %s", jobId); + } catch (Exception e) { + log.error( + LogCategory.GRPC, + "AckFailed", + "Failed to send ACK for job %s: %s", + jobId, + e.getMessage()); } } - /** - * Process a job. Returns true if successfully processed. - * - *

This mirrors the job handling logic from GateClientJobStream exactly. - */ + // --------------------------------------------------------------------------- + // Job processing + // --------------------------------------------------------------------------- + + /** Process a job by dispatching to the plugin. Returns true if successfully processed. */ private boolean processJob(StreamJobsResponse value) { long jobStartTime = System.currentTimeMillis(); @@ -398,6 +311,11 @@ private void submitJobError(String jobId, String message) { } } + // --------------------------------------------------------------------------- + // Lifecycle + // --------------------------------------------------------------------------- + + @Override public void stop() { log.info( LogCategory.GRPC, @@ -408,9 +326,6 @@ public void stop() { jobsProcessed.get(), jobsFailed.get()); - isRunning.set(false); - - // Gracefully close the stream var observer = requestObserverRef.get(); if (observer != null) { try { @@ -420,47 +335,19 @@ public void stop() { } } - shutdown(); - log.info(LogCategory.GRPC, "GateClientJobQueueStopped", "GateClientJobQueue stopped"); + doStop(); + log.info(LogCategory.GRPC, "Stopped", "GateClientJobQueue stopped"); } @PreDestroy public void destroy() { - log.info( - LogCategory.GRPC, - "GateClientJobQueue@PreDestroyCalled", - "GateClientJobQueue @PreDestroy called"); stop(); } public Map getStreamStatus() { - Instant lastDisconnect = lastDisconnectTime.get(); - Instant connectionEstablished = connectionEstablishedTime.get(); - Instant reconnectStart = reconnectionStartTime.get(); - Instant lastMessage = lastMessageTime.get(); - - Map status = new HashMap<>(); - status.put("isRunning", isRunning.get()); - status.put("totalReconnectionAttempts", totalReconnectionAttempts.get()); - status.put("successfulReconnections", successfulReconnections.get()); - status.put("consecutiveFailures", consecutiveFailures.get()); + Map status = buildStreamStatus(); status.put("jobsProcessed", jobsProcessed.get()); status.put("jobsFailed", jobsFailed.get()); - status.put("lastDisconnectTime", lastDisconnect != null ? lastDisconnect.toString() : null); - status.put( - "connectionEstablishedTime", - connectionEstablished != null ? connectionEstablished.toString() : null); - status.put("reconnectionStartTime", reconnectStart != null ? reconnectStart.toString() : null); - status.put("lastErrorType", lastErrorType.get()); - status.put("lastMessageTime", lastMessage != null ? lastMessage.toString() : null); - return status; } - - /** Exception for hung connection detection. */ - public static class HungConnectionException extends Exception { - public HungConnectionException(String message) { - super(message); - } - } } diff --git a/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientJobStream.java b/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientJobStream.java index e2d91fe..3ea6f87 100644 --- a/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientJobStream.java +++ b/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientJobStream.java @@ -111,7 +111,7 @@ public void start() { return; } - ExecutorService executorService = Executors.newFixedThreadPool(2); + ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor(); ExecutorCompletionService completionService = new ExecutorCompletionService<>(executorService); diff --git a/demo/build.gradle b/demo/build.gradle index c96feae..6b430f1 100644 --- a/demo/build.gradle +++ b/demo/build.gradle @@ -70,8 +70,8 @@ application { } java { - sourceCompatibility = JavaVersion.VERSION_17 - targetCompatibility = JavaVersion.VERSION_17 + sourceCompatibility = JavaVersion.VERSION_21 + targetCompatibility = JavaVersion.VERSION_21 } tasks.withType(JavaCompile) { diff --git a/demo/src/main/java/com/tcn/exile/demo/single/AgentsController.java b/demo/src/main/java/com/tcn/exile/demo/single/AgentsController.java index 89a4aea..d626ad8 100644 --- a/demo/src/main/java/com/tcn/exile/demo/single/AgentsController.java +++ b/demo/src/main/java/com/tcn/exile/demo/single/AgentsController.java @@ -30,6 +30,8 @@ import io.micronaut.http.HttpStatus; import io.micronaut.http.MediaType; import io.micronaut.http.annotation.*; +import io.micronaut.scheduling.TaskExecutors; +import io.micronaut.scheduling.annotation.ExecuteOn; import io.swagger.v3.oas.annotations.OpenAPIDefinition; import io.swagger.v3.oas.annotations.tags.Tag; import jakarta.inject.Inject; @@ -43,6 +45,7 @@ import org.slf4j.LoggerFactory; @Controller("/api/agents") +@ExecuteOn(TaskExecutors.BLOCKING) @OpenAPIDefinition(tags = {@Tag(name = "agents")}) public class AgentsController { private static final Logger log = LoggerFactory.getLogger(AgentsController.class); @@ -295,7 +298,7 @@ public AgentStatus getState(@PathVariable String partnerAgentId) throws Unconfig @Tag(name = "agents") public SetAgentStatusResponse setState( @PathVariable String partnerAgentId, - @PathVariable SetAgentState state /*, @Body PauseCodeReason pauseCodeReason*/) + @PathVariable SetAgentState state /* , @Body PauseCodeReason pauseCodeReason */) throws UnconfiguredException { log.debug("setState"); var request = @@ -303,8 +306,8 @@ public SetAgentStatusResponse setState( .setPartnerAgentId(partnerAgentId) .setNewState(build.buf.gen.tcnapi.exile.gate.v2.AgentState.values()[state.getValue()]); // if (pauseCodeReason != null && pauseCodeReason.reason() != null) { - // request.setReason(pauseCodeReason.reason()); - // // request.setPauseCodeReason(pauseCodeReason.reason()); + // request.setReason(pauseCodeReason.reason()); + // // request.setPauseCodeReason(pauseCodeReason.reason()); // } var res = configChangeWatcher.getGateClient().updateAgentStatus(request.build()); return new SetAgentStatusResponse(); diff --git a/gradle.properties b/gradle.properties index 792bcb9..ca67440 100644 --- a/gradle.properties +++ b/gradle.properties @@ -3,8 +3,8 @@ micronautGradlePluginVersion=4.4.4 grpcVersion=1.68.1 protobufVersion=3.25.5 -exileapiProtobufVersion=33.4.0.1.20260114173504.d15b0c747d85 -exileapiGrpcVersion=1.78.0.1.20260114173504.d15b0c747d85 +exileapiProtobufVersion=33.5.0.1.20260211211723.9ac313171527 +exileapiGrpcVersion=1.79.0.1.20260211211723.9ac313171527 org.gradle.jvmargs=-Xmx4G