From fc6ceace5fce753508f659f48073813ed1396b85 Mon Sep 17 00:00:00 2001 From: Braden Olsen Date: Wed, 11 Feb 2026 10:07:46 -0700 Subject: [PATCH 1/6] Enhances event stream with bidirectional handling Implements a new class for improved bidirectional event streaming. Introduces exponential backoff with jitter for reconnection attempts, addressing connection stability issues. Enhances logging for better monitoring and diagnostics. Introduces interface improvements to streamline event processing across multiple streams. Extends existing job queue functionality to incorporate similar reconnection and backoff logic, enhancing its robustness. Relates to stream-testing-updates branch. --- .../gateclients/v2/GateClientAbstract.java | 10 + .../gateclients/v2/GateClientEventStream.java | 575 +++++++++++++----- .../gateclients/v2/GateClientJobQueue.java | 55 +- 3 files changed, 490 insertions(+), 150 deletions(-) diff --git a/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientAbstract.java b/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientAbstract.java index 5af3c42..f63e8f3 100644 --- a/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientAbstract.java +++ b/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientAbstract.java @@ -287,6 +287,16 @@ private ManagedChannel createNewChannel() throws UnconfiguredException { } public abstract void start(); + /** + * Reset gRPC's internal connect backoff on the shared channel. This forces the name resolver to + * immediately retry DNS resolution instead of waiting for its own exponential backoff timer. + */ + protected void resetChannelBackoff() { + ManagedChannel channel = sharedChannel; + if (channel != null && !channel.isShutdown() && !channel.isTerminated()) { + channel.resetConnectBackoff(); + } + } /** Resets the gRPC channel after a connection failure */ protected void resetChannel() { diff --git a/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientEventStream.java b/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientEventStream.java index 2d12d4c..1723046 100644 --- a/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientEventStream.java +++ b/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientEventStream.java @@ -22,186 +22,375 @@ import build.buf.gen.tcnapi.exile.gate.v2.GateServiceGrpc; import com.tcn.exile.config.Config; import com.tcn.exile.gateclients.UnconfiguredException; +import com.tcn.exile.log.LogCategory; +import com.tcn.exile.log.StructuredLogger; import com.tcn.exile.plugin.PluginInterface; import io.grpc.stub.StreamObserver; +import jakarta.annotation.PreDestroy; +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -/** Streams events from Gate using the EventStream API with acknowledgment. */ +/** + * Bidirectional event streaming with immediate acknowledgment using EventStream API. + * + *

The client drives the loop: sends a request (with ACK IDs + event count), server responds with + * events, client processes them and sends the next request with ACKs. This repeats until the + * server's 5-minute context timeout expires. + */ public class GateClientEventStream extends GateClientAbstract { - protected static final org.slf4j.Logger log = - org.slf4j.LoggerFactory.getLogger(GateClientEventStream.class); + private static final StructuredLogger log = new StructuredLogger(GateClientEventStream.class); private static final int BATCH_SIZE = 100; - private static final long REQUEST_TIMEOUT_SECONDS = 60; + private static final long STREAM_TIMEOUT_MINUTES = 5; + private static final long HUNG_CONNECTION_THRESHOLD_SECONDS = 45; - private final PluginInterface plugin; + // Backoff configuration + private static final long BACKOFF_BASE_MS = 2000; + private static final long BACKOFF_MAX_MS = 30000; + private static final double BACKOFF_JITTER = 0.2; - // Track ACKs between polling cycles - events successfully processed - // will be ACKed in the next request + private final PluginInterface plugin; + private final AtomicBoolean establishedForCurrentAttempt = new AtomicBoolean(false); + private final AtomicReference lastMessageTime = new AtomicReference<>(); + + // Connection timing tracking + private final AtomicReference lastDisconnectTime = new AtomicReference<>(); + private final AtomicReference reconnectionStartTime = new AtomicReference<>(); + private final AtomicReference connectionEstablishedTime = new AtomicReference<>(); + private final AtomicLong totalReconnectionAttempts = new AtomicLong(0); + private final AtomicLong successfulReconnections = new AtomicLong(0); + private final AtomicReference lastErrorType = new AtomicReference<>(); + private final AtomicLong consecutiveFailures = new AtomicLong(0); + private final AtomicBoolean isRunning = new AtomicBoolean(false); + private final AtomicLong eventsProcessed = new AtomicLong(0); + private final AtomicLong eventsFailed = new AtomicLong(0); + + // Pending ACKs buffer - survives stream reconnections. + // Event IDs are added here after successful processing and removed only after + // successful send to the server. If a stream breaks before ACKs are sent, + // they carry over to the next connection attempt. private final List pendingAcks = new ArrayList<>(); + // Stream observer for sending requests/ACKs + private final AtomicReference> requestObserverRef = + new AtomicReference<>(); + public GateClientEventStream(String tenant, Config currentConfig, PluginInterface plugin) { super(tenant, currentConfig); this.plugin = plugin; } - @Override - public void start() { - try { - if (isUnconfigured()) { - log.debug("Tenant: {} - Configuration not set, skipping event stream", tenant); - return; - } - if (!plugin.isRunning()) { - log.debug( - "Tenant: {} - Plugin is not running (possibly due to database disconnection), skipping event stream", - tenant); - return; - } - - int totalProcessed = 0; - long cycleStart = System.currentTimeMillis(); + /** Check if the connection is hung (no messages received in threshold time). */ + private void checkForHungConnection() throws HungConnectionException { + Instant lastMsg = lastMessageTime.get(); + if (lastMsg == null) { + lastMsg = connectionEstablishedTime.get(); + } + if (lastMsg != null + && lastMsg.isBefore( + Instant.now().minus(HUNG_CONNECTION_THRESHOLD_SECONDS, ChronoUnit.SECONDS))) { + throw new HungConnectionException( + "No messages received since " + lastMsg + " - connection appears hung"); + } + } - // Copy pending ACKs and clear for this cycle - List acksToSend; - synchronized (pendingAcks) { - acksToSend = new ArrayList<>(pendingAcks); - pendingAcks.clear(); - } + /** Compute backoff delay with jitter based on consecutive failure count. */ + private long computeBackoffMs() { + long failures = consecutiveFailures.get(); + if (failures <= 0) { + return 0; + } + // Exponential: 2s, 4s, 8s, 16s, capped at 30s + long delayMs = BACKOFF_BASE_MS * (1L << Math.min(failures - 1, 10)); + // Add ±20% jitter to avoid thundering herd between streams + double jitter = 1.0 + (ThreadLocalRandom.current().nextDouble() * 2 - 1) * BACKOFF_JITTER; + return Math.min((long) (delayMs * jitter), BACKOFF_MAX_MS); + } - if (!acksToSend.isEmpty()) { - log.debug("Tenant: {} - Sending {} ACKs from previous cycle", tenant, acksToSend.size()); - } + @Override + public void start() { + if (isUnconfigured()) { + log.warn(LogCategory.GRPC, "NOOP", "EventStream is unconfigured, cannot stream events"); + return; + } - // Request events, sending any pending ACKs from previous cycle - EventStreamResponse response = requestEvents(acksToSend); + // Prevent concurrent invocations from the fixed-rate scheduler + if (!isRunning.compareAndSet(false, true)) { + return; + } - if (response == null || response.getEventsCount() == 0) { - log.debug( - "Tenant: {} - Event stream request completed successfully but no events were received", - tenant); + // Exponential backoff: sleep before retrying if we have consecutive failures + long backoffMs = computeBackoffMs(); + if (backoffMs > 0) { + log.debug( + LogCategory.GRPC, + "Backoff", + "Waiting %dms before reconnect attempt (consecutive failures: %d)", + backoffMs, + consecutiveFailures.get()); + try { + Thread.sleep(backoffMs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + isRunning.set(false); return; } + } - log.debug("Tenant: {} - Received {} events from stream", tenant, response.getEventsCount()); - - // Process events and collect successful ACK IDs - List successfulAcks = new ArrayList<>(); - long batchStart = System.currentTimeMillis(); - - for (Event event : response.getEventsList()) { - String eventId = getEventId(event); - if (processEvent(event)) { - successfulAcks.add(eventId); - } - // If processing fails, event is NOT added to ACKs - it will be redelivered - } - - long batchEnd = System.currentTimeMillis(); - totalProcessed = successfulAcks.size(); + try { + log.debug( + LogCategory.GRPC, "Init", "EventStream task started, checking configuration status"); - // Store successful ACKs for next cycle - synchronized (pendingAcks) { - pendingAcks.addAll(successfulAcks); - } + reconnectionStartTime.set(Instant.now()); - // Warn if batch processing is slow - if (totalProcessed > 0) { - long avg = (batchEnd - batchStart) / totalProcessed; - if (avg > 1000) { - log.warn( - "Tenant: {} - Event stream batch completed {} events in {}ms, average time per event: {}ms, this is long", - tenant, - totalProcessed, - batchEnd - batchStart, - avg); - } - } + // Reset gRPC's internal DNS backoff so name resolution is retried immediately + resetChannelBackoff(); - // Log summary - if (totalProcessed > 0) { - long cycleEnd = System.currentTimeMillis(); - log.info( - "Tenant: {} - Event stream cycle completed, processed {} events in {}ms", - tenant, - totalProcessed, - cycleEnd - cycleStart); - } + runStream(); + } catch (HungConnectionException e) { + log.warn(LogCategory.GRPC, "HungConnection", "Event stream appears hung: %s", e.getMessage()); + lastErrorType.set("HungConnection"); + consecutiveFailures.incrementAndGet(); } catch (UnconfiguredException e) { - log.error("Tenant: {} - Error while getting client configuration {}", tenant, e.getMessage()); + log.error(LogCategory.GRPC, "ConfigError", "Configuration error: %s", e.getMessage()); + lastErrorType.set("UnconfiguredException"); + consecutiveFailures.incrementAndGet(); } catch (InterruptedException e) { - log.warn("Tenant: {} - Event stream interrupted", tenant); + log.info(LogCategory.GRPC, "Interrupted", "Event stream interrupted"); Thread.currentThread().interrupt(); } catch (Exception e) { - log.error("Tenant: {} - Unexpected error in event stream", tenant, e); - // Clear pending ACKs on error to avoid ACKing events we may not have processed - synchronized (pendingAcks) { - pendingAcks.clear(); + log.error( + LogCategory.GRPC, + "EventStream", + "Error streaming events from server: %s", + e.getMessage(), + e); + lastErrorType.set(e.getClass().getSimpleName()); + consecutiveFailures.incrementAndGet(); + } finally { + totalReconnectionAttempts.incrementAndGet(); + lastDisconnectTime.set(Instant.now()); + isRunning.set(false); + requestObserverRef.set(null); + if (connectionEstablishedTime.get() != null) { + log.debug(LogCategory.GRPC, "Complete", "Event stream done"); } } } - /** - * Request events using async stub with blocking wait. This handles the server-streaming pattern - * where we send request + StreamObserver. - */ - private EventStreamResponse requestEvents(List ackEventIds) - throws InterruptedException, UnconfiguredException { - - var request = - EventStreamRequest.newBuilder() - .setEventCount(BATCH_SIZE) - .addAllAckEventIds(ackEventIds) - .build(); - + /** Run a single stream session with bidirectional communication. */ + private void runStream() + throws UnconfiguredException, InterruptedException, HungConnectionException { var latch = new CountDownLatch(1); - var responseRef = new AtomicReference(); var errorRef = new AtomicReference(); - - GateServiceGrpc.newStub(getChannel()) - .eventStream( - request, - new StreamObserver() { - @Override - public void onNext(EventStreamResponse response) { - responseRef.set(response); + var firstResponseReceived = new AtomicBoolean(false); + + // Create response handler + var responseObserver = + new StreamObserver() { + @Override + public void onNext(EventStreamResponse response) { + lastMessageTime.set(Instant.now()); + + // Log connected on first server response + if (firstResponseReceived.compareAndSet(false, true)) { + connectionEstablishedTime.set(Instant.now()); + successfulReconnections.incrementAndGet(); + consecutiveFailures.set(0); + lastErrorType.set(null); + + log.info( + LogCategory.GRPC, + "ConnectionEstablished", + "Event stream connection took %s", + Duration.between(reconnectionStartTime.get(), connectionEstablishedTime.get())); + } + + if (response.getEventsCount() == 0) { + log.debug( + LogCategory.GRPC, "EmptyBatch", "Received empty event batch, requesting next"); + // Brief pause to avoid hot-looping when no events are available. + // The server responds instantly to empty polls, so without this + // delay we'd loop every ~40ms burning CPU and network. + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; } - - @Override - public void onError(Throwable t) { - errorRef.set(t); - latch.countDown(); + // Still send next request to keep the loop alive + sendNextRequest(); + return; + } + + log.debug( + LogCategory.GRPC, + "EventsReceived", + "Received %d events from stream", + response.getEventsCount()); + + // Process events and store successful ACK IDs in the pending buffer + long batchStart = System.currentTimeMillis(); + int batchSuccessCount = 0; + + for (Event event : response.getEventsList()) { + String eventId = getEventId(event); + if (processEvent(event)) { + synchronized (pendingAcks) { + pendingAcks.add(eventId); + } + batchSuccessCount++; + eventsProcessed.incrementAndGet(); + } else { + eventsFailed.incrementAndGet(); + log.warn( + LogCategory.GRPC, + "EventNotAcked", + "Event %s NOT acknowledged - will be redelivered", + eventId); } - - @Override - public void onCompleted() { - latch.countDown(); + } + + long batchEnd = System.currentTimeMillis(); + + // Warn if batch processing is slow + if (batchSuccessCount > 0) { + long avg = (batchEnd - batchStart) / batchSuccessCount; + if (avg > 1000) { + log.warn( + LogCategory.GRPC, + "SlowBatch", + "Event batch completed %d events in %dms, avg %dms per event", + batchSuccessCount, + batchEnd - batchStart, + avg); } - }); + } + + // Send next request with ACKs immediately - drains pendingAcks on success + sendNextRequest(); + } + + @Override + public void onError(Throwable t) { + log.warn(LogCategory.GRPC, "StreamError", "Event stream error: %s", t.getMessage()); + errorRef.set(t); + latch.countDown(); + } + + @Override + public void onCompleted() { + log.info(LogCategory.GRPC, "StreamCompleted", "Event stream completed by server"); + latch.countDown(); + } + }; + + // Open bidirectional stream + var requestObserver = GateServiceGrpc.newStub(getChannel()).eventStream(responseObserver); + requestObserverRef.set(requestObserver); + + // Send initial request - include any pending ACKs from a previous broken stream + List carryOverAcks; + synchronized (pendingAcks) { + carryOverAcks = new ArrayList<>(pendingAcks); + } + if (!carryOverAcks.isEmpty()) { + log.info( + LogCategory.GRPC, + "CarryOverAcks", + "Sending %d pending ACKs from previous stream session", + carryOverAcks.size()); + } + log.debug(LogCategory.GRPC, "Init", "Sending initial event stream request..."); + requestObserver.onNext( + EventStreamRequest.newBuilder() + .setEventCount(BATCH_SIZE) + .addAllAckEventIds(carryOverAcks) + .build()); + // Clear pending ACKs only after successful initial send + if (!carryOverAcks.isEmpty()) { + synchronized (pendingAcks) { + pendingAcks.removeAll(carryOverAcks); + } + } + + // Wait for stream to complete, with periodic hung-connection checks + boolean completed = false; + long startTime = System.currentTimeMillis(); + long maxDurationMs = TimeUnit.MINUTES.toMillis(STREAM_TIMEOUT_MINUTES); - // Wait for response with timeout - boolean completed = latch.await(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS); + while (!completed && (System.currentTimeMillis() - startTime) < maxDurationMs) { + completed = latch.await(HUNG_CONNECTION_THRESHOLD_SECONDS, TimeUnit.SECONDS); + if (!completed) { + // Check for hung connection + checkForHungConnection(); + } + } - if (!completed) { - throw new RuntimeException( - "Event stream request timed out after " + REQUEST_TIMEOUT_SECONDS + " seconds"); + // Propagate error if any + var error = errorRef.get(); + if (error != null) { + throw new RuntimeException("Stream error", error); } + } - if (errorRef.get() != null) { - throw new RuntimeException("Event stream error", errorRef.get()); + /** + * Send the next request on the stream, draining pendingAcks into the request. If the send fails, + * the ACKs remain in pendingAcks for the next attempt or reconnection. + */ + private void sendNextRequest() { + var observer = requestObserverRef.get(); + if (observer == null) { + log.warn(LogCategory.GRPC, "RequestFailed", "Cannot send next request - no active observer"); + return; } - var response = responseRef.get(); - if (response == null) { - // Return empty response if none received - return EventStreamResponse.getDefaultInstance(); + // Drain pending ACKs + List acksToSend; + synchronized (pendingAcks) { + acksToSend = new ArrayList<>(pendingAcks); + } + + try { + var request = + EventStreamRequest.newBuilder() + .setEventCount(BATCH_SIZE) + .addAllAckEventIds(acksToSend) + .build(); + observer.onNext(request); + + // Only clear after successful send + if (!acksToSend.isEmpty()) { + synchronized (pendingAcks) { + pendingAcks.removeAll(acksToSend); + } + log.debug( + LogCategory.GRPC, + "AckSent", + "Sent ACK for %d events, requesting next batch", + acksToSend.size()); + } + } catch (Exception e) { + // ACKs stay in pendingAcks - will be retried on next send or reconnection + log.error( + LogCategory.GRPC, + "RequestFailed", + "Failed to send next event stream request (keeping %d pending ACKs): %s", + acksToSend.size(), + e.getMessage()); } - return response; } /** @@ -211,10 +400,19 @@ public void onCompleted() { */ private boolean processEvent(Event event) { try { + if (!plugin.isRunning()) { + log.debug( + LogCategory.GRPC, + "PluginNotRunning", + "Plugin is not running, skipping event processing"); + return false; + } + if (event.hasAgentCall()) { log.debug( - "Tenant: {} - Received agent call event {} - {}", - tenant, + LogCategory.GRPC, + "EventProcessed", + "Received agent call event %d - %s", event.getAgentCall().getCallSid(), event.getAgentCall().getCallType()); plugin.handleAgentCall(event.getAgentCall()); @@ -222,38 +420,46 @@ private boolean processEvent(Event event) { if (event.hasAgentResponse()) { log.debug( - "Tenant: {} - Received agent response event {}", - tenant, + LogCategory.GRPC, + "EventProcessed", + "Received agent response event %s", event.getAgentResponse().getAgentCallResponseSid()); plugin.handleAgentResponse(event.getAgentResponse()); } if (event.hasTelephonyResult()) { log.debug( - "Tenant: {} - Received telephony result event {} - {}", - tenant, + LogCategory.GRPC, + "EventProcessed", + "Received telephony result event %d - %s", event.getTelephonyResult().getCallSid(), event.getTelephonyResult().getCallType()); plugin.handleTelephonyResult(event.getTelephonyResult()); } if (event.hasTask()) { - log.debug("Tenant: {} - Received task event {}", tenant, event.getTask().getTaskSid()); + log.debug( + LogCategory.GRPC, + "EventProcessed", + "Received task event %s", + event.getTask().getTaskSid()); plugin.handleTask(event.getTask()); } if (event.hasTransferInstance()) { log.debug( - "Tenant: {} - Received transfer instance event {}", - tenant, + LogCategory.GRPC, + "EventProcessed", + "Received transfer instance event %s", event.getTransferInstance().getTransferInstanceId()); plugin.handleTransferInstance(event.getTransferInstance()); } if (event.hasCallRecording()) { log.debug( - "Tenant: {} - Received call recording event {}", - tenant, + LogCategory.GRPC, + "EventProcessed", + "Received call recording event %s", event.getCallRecording().getRecordingId()); plugin.handleCallRecording(event.getCallRecording()); } @@ -261,7 +467,12 @@ private boolean processEvent(Event event) { return true; } catch (Exception e) { - log.error("Tenant: {} - Failed to process event: {}", tenant, e.getMessage(), e); + log.error( + LogCategory.GRPC, + "EventProcessingError", + "Failed to process event: %s", + e.getMessage(), + e); return false; // Don't ACK - will be redelivered } } @@ -286,14 +497,88 @@ private String getEventId(Event event) { if (event.hasCallRecording()) { return event.getCallRecording().getRecordingId(); } - log.warn("Tenant: {} - Unknown event type, cannot extract ID", tenant); + log.warn(LogCategory.GRPC, "UnknownEvent", "Unknown event type, cannot extract ID"); return "unknown"; } - /** Get count of pending ACKs (for diagnostics). */ - public int getPendingAckCount() { + public void stop() { + log.info( + LogCategory.GRPC, + "Stopping", + "Stopping GateClientEventStream (total attempts: %d, successful: %d, events: %d/%d)", + totalReconnectionAttempts.get(), + successfulReconnections.get(), + eventsProcessed.get(), + eventsFailed.get()); + + isRunning.set(false); + + // Log any un-sent ACKs so they're visible for debugging redelivery synchronized (pendingAcks) { - return pendingAcks.size(); + if (!pendingAcks.isEmpty()) { + log.warn( + LogCategory.GRPC, + "UnsentAcks", + "Shutting down with %d un-sent ACKs (these events will be redelivered): %s", + pendingAcks.size(), + pendingAcks); + } + } + + // Gracefully close the stream + var observer = requestObserverRef.get(); + if (observer != null) { + try { + observer.onCompleted(); + } catch (Exception e) { + log.debug(LogCategory.GRPC, "CloseError", "Error closing stream: %s", e.getMessage()); + } + } + + shutdown(); + log.info(LogCategory.GRPC, "GateClientEventStreamStopped", "GateClientEventStream stopped"); + } + + @PreDestroy + public void destroy() { + log.info( + LogCategory.GRPC, + "GateClientEventStream@PreDestroyCalled", + "GateClientEventStream @PreDestroy called"); + stop(); + } + + public Map getStreamStatus() { + Instant lastDisconnect = lastDisconnectTime.get(); + Instant connectionEstablished = connectionEstablishedTime.get(); + Instant reconnectStart = reconnectionStartTime.get(); + Instant lastMessage = lastMessageTime.get(); + + Map status = new HashMap<>(); + status.put("isRunning", isRunning.get()); + status.put("totalReconnectionAttempts", totalReconnectionAttempts.get()); + status.put("successfulReconnections", successfulReconnections.get()); + status.put("consecutiveFailures", consecutiveFailures.get()); + status.put("eventsProcessed", eventsProcessed.get()); + status.put("eventsFailed", eventsFailed.get()); + synchronized (pendingAcks) { + status.put("pendingAckCount", pendingAcks.size()); + } + status.put("lastDisconnectTime", lastDisconnect != null ? lastDisconnect.toString() : null); + status.put( + "connectionEstablishedTime", + connectionEstablished != null ? connectionEstablished.toString() : null); + status.put("reconnectionStartTime", reconnectStart != null ? reconnectStart.toString() : null); + status.put("lastErrorType", lastErrorType.get()); + status.put("lastMessageTime", lastMessage != null ? lastMessage.toString() : null); + + return status; + } + + /** Exception for hung connection detection. */ + public static class HungConnectionException extends Exception { + public HungConnectionException(String message) { + super(message); } } } diff --git a/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientJobQueue.java b/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientJobQueue.java index 1d224a5..07d6781 100644 --- a/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientJobQueue.java +++ b/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientJobQueue.java @@ -34,6 +34,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -48,6 +49,11 @@ public class GateClientJobQueue extends GateClientAbstract { private static final long HUNG_CONNECTION_THRESHOLD_SECONDS = 45; private static final String KEEPALIVE_JOB_ID = "keepalive"; + // Backoff configuration + private static final long BACKOFF_BASE_MS = 2000; + private static final long BACKOFF_MAX_MS = 30000; + private static final double BACKOFF_JITTER = 0.2; + private final PluginInterface plugin; private final AtomicBoolean establishedForCurrentAttempt = new AtomicBoolean(false); private final AtomicReference lastMessageTime = new AtomicReference<>(); @@ -87,6 +93,19 @@ private void checkForHungConnection() throws HungConnectionException { } } + /** Compute backoff delay with jitter based on consecutive failure count. */ + private long computeBackoffMs() { + long failures = consecutiveFailures.get(); + if (failures <= 0) { + return 0; + } + // Exponential: 2s, 4s, 8s, 16s, capped at 30s + long delayMs = BACKOFF_BASE_MS * (1L << Math.min(failures - 1, 10)); + // Add ±20% jitter to avoid thundering herd between streams + double jitter = 1.0 + (ThreadLocalRandom.current().nextDouble() * 2 - 1) * BACKOFF_JITTER; + return Math.min((long) (delayMs * jitter), BACKOFF_MAX_MS); + } + @Override public void start() { if (isUnconfigured()) { @@ -94,11 +113,36 @@ public void start() { return; } + // Prevent concurrent invocations from the fixed-rate scheduler + if (!isRunning.compareAndSet(false, true)) { + return; + } + + // Exponential backoff: sleep before retrying if we have consecutive failures + long backoffMs = computeBackoffMs(); + if (backoffMs > 0) { + log.debug( + LogCategory.GRPC, + "Backoff", + "Waiting %dms before reconnect attempt (consecutive failures: %d)", + backoffMs, + consecutiveFailures.get()); + try { + Thread.sleep(backoffMs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + isRunning.set(false); + return; + } + } + try { log.debug(LogCategory.GRPC, "Init", "JobQueue task started, checking configuration status"); reconnectionStartTime.set(Instant.now()); - isRunning.set(true); + + // Reset gRPC's internal DNS backoff so name resolution is retried immediately + resetChannelBackoff(); runStream(); @@ -106,6 +150,7 @@ public void start() { log.warn( LogCategory.GRPC, "HungConnection", "Job queue stream appears hung: %s", e.getMessage()); lastErrorType.set("HungConnection"); + consecutiveFailures.incrementAndGet(); } catch (UnconfiguredException e) { log.error(LogCategory.GRPC, "ConfigError", "Configuration error: %s", e.getMessage()); lastErrorType.set("UnconfiguredException"); @@ -117,15 +162,15 @@ public void start() { log.error( LogCategory.GRPC, "JobQueue", "Error streaming jobs from server: %s", e.getMessage(), e); lastErrorType.set(e.getClass().getSimpleName()); - if (connectionEstablishedTime.get() == null) { - consecutiveFailures.incrementAndGet(); - } + consecutiveFailures.incrementAndGet(); } finally { totalReconnectionAttempts.incrementAndGet(); lastDisconnectTime.set(Instant.now()); isRunning.set(false); requestObserverRef.set(null); - log.debug(LogCategory.GRPC, "Complete", "Job queue done"); + if (connectionEstablishedTime.get() != null) { + log.debug(LogCategory.GRPC, "Complete", "Job queue done"); + } } } From aa52c187d81e68c5ad8a2cdbae6becab67ebd2c3 Mon Sep 17 00:00:00 2001 From: Braden Olsen Date: Thu, 12 Feb 2026 11:41:26 -0700 Subject: [PATCH 2/6] Refactors GateClient architecture for improved stream handling Reorganizes GateClient classes by extracting shared logic into an abstract class, enabling more structured and reusable stream lifecycle management. Introduces additional monitoring and error handling, especially for gRPC connections, including exponential backoff and connection health checks. Updates gRPC library versions to ensure compatibility and bug fixes. Relates to stream-testing-updates --- .../gateclients/v2/GateClientAbstract.java | 297 +++++++++++- .../gateclients/v2/GateClientEventStream.java | 443 ++++++------------ .../gateclients/v2/GateClientJobQueue.java | 242 ++-------- gradle.properties | 4 +- 4 files changed, 460 insertions(+), 526 deletions(-) diff --git a/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientAbstract.java b/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientAbstract.java index f63e8f3..49d4187 100644 --- a/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientAbstract.java +++ b/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientAbstract.java @@ -1,5 +1,5 @@ /* - * (C) 2017-2025 TCN Inc. All rights reserved. + * (C) 2017-2026 TCN Inc. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,21 +25,63 @@ import io.grpc.TlsChannelCredentials; import java.io.ByteArrayInputStream; import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Base class for bidirectional gRPC stream clients. + * + *

Provides shared infrastructure for stream lifecycle management including exponential backoff + * with jitter, hung connection detection, connection tracking, and graceful shutdown. Subclasses + * implement {@link #runStream()} for stream-specific logic and {@link #getStreamName()} for + * logging. + */ public abstract class GateClientAbstract { private static final Logger log = LoggerFactory.getLogger(GateClientAbstract.class); + // Stream lifecycle constants + protected static final long STREAM_TIMEOUT_MINUTES = 5; + protected static final long HUNG_CONNECTION_THRESHOLD_SECONDS = 45; + + // Backoff configuration + private static final long BACKOFF_BASE_MS = 2000; + private static final long BACKOFF_MAX_MS = 30000; + private static final double BACKOFF_JITTER = 0.2; + + // Channel management private ManagedChannel sharedChannel; private final ReentrantLock lock = new ReentrantLock(); + // Simulated disconnect: when set and in the future, getChannel() refuses to + // create a new channel + private final AtomicReference disconnectUntil = new AtomicReference<>(); + private Config currentConfig = null; protected final String tenant; + // Connection tracking — shared across all stream subclasses + protected final AtomicReference lastMessageTime = new AtomicReference<>(); + protected final AtomicReference lastDisconnectTime = new AtomicReference<>(); + protected final AtomicReference reconnectionStartTime = new AtomicReference<>(); + protected final AtomicReference connectionEstablishedTime = new AtomicReference<>(); + protected final AtomicLong totalReconnectionAttempts = new AtomicLong(0); + protected final AtomicLong successfulReconnections = new AtomicLong(0); + protected final AtomicReference lastErrorType = new AtomicReference<>(); + protected final AtomicLong consecutiveFailures = new AtomicLong(0); + protected final AtomicBoolean isRunning = new AtomicBoolean(false); + public GateClientAbstract(String tenant, Config currentConfig) { this.currentConfig = currentConfig; this.tenant = tenant; @@ -54,6 +96,196 @@ public GateClientAbstract(String tenant, Config currentConfig) { "gRPC-Channel-Cleanup")); } + // --------------------------------------------------------------------------- + // Stream lifecycle — template method pattern + // --------------------------------------------------------------------------- + + /** + * Human-readable name for this stream, used in log messages. Override in subclasses that use the + * template {@link #start()} method (e.g. "EventStream", "JobQueue"). + */ + protected String getStreamName() { + return getClass().getSimpleName(); + } + + /** + * Run a single stream session. Called by the template {@link #start()} method. Subclasses that + * use the shared lifecycle must override this. Legacy subclasses that override start() directly + * can ignore it. + */ + protected void runStream() + throws UnconfiguredException, InterruptedException, HungConnectionException { + throw new UnsupportedOperationException( + getStreamName() + " must override runStream() or start()"); + } + + /** + * Template method that handles the full stream lifecycle: backoff, try/catch, error tracking. + * Each invocation represents one connection attempt. + */ + public void start() { + if (isUnconfigured()) { + log.warn("{} is unconfigured, cannot start", getStreamName()); + return; + } + + // Prevent concurrent invocations from the fixed-rate scheduler + if (!isRunning.compareAndSet(false, true)) { + return; + } + + // Exponential backoff: sleep before retrying if we have consecutive failures + long backoffMs = computeBackoffMs(); + if (backoffMs > 0) { + log.debug( + "[{}] Waiting {}ms before reconnect (consecutive failures: {})", + getStreamName(), + backoffMs, + consecutiveFailures.get()); + try { + Thread.sleep(backoffMs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + isRunning.set(false); + return; + } + } + + try { + log.debug("[{}] Starting, checking configuration status", getStreamName()); + reconnectionStartTime.set(Instant.now()); + resetChannelBackoff(); + runStream(); + } catch (HungConnectionException e) { + log.warn("[{}] Connection appears hung: {}", getStreamName(), e.getMessage()); + lastErrorType.set("HungConnection"); + consecutiveFailures.incrementAndGet(); + } catch (UnconfiguredException e) { + log.error("[{}] Configuration error: {}", getStreamName(), e.getMessage()); + lastErrorType.set("UnconfiguredException"); + consecutiveFailures.incrementAndGet(); + } catch (InterruptedException e) { + log.info("[{}] Interrupted", getStreamName()); + Thread.currentThread().interrupt(); + } catch (Exception e) { + log.error("[{}] Error: {}", getStreamName(), e.getMessage(), e); + lastErrorType.set(e.getClass().getSimpleName()); + consecutiveFailures.incrementAndGet(); + } finally { + totalReconnectionAttempts.incrementAndGet(); + lastDisconnectTime.set(Instant.now()); + isRunning.set(false); + onStreamDisconnected(); + log.debug("[{}] Stream session ended", getStreamName()); + } + } + + /** Hook called in the finally block of start(). Subclasses can clear observer refs here. */ + protected void onStreamDisconnected() {} + + // --------------------------------------------------------------------------- + // Shared stream helpers + // --------------------------------------------------------------------------- + + /** Record that the first server response was received and the connection is established. */ + protected void onConnectionEstablished() { + connectionEstablishedTime.set(Instant.now()); + successfulReconnections.incrementAndGet(); + consecutiveFailures.set(0); + lastErrorType.set(null); + + log.info( + "[{}] Connection established (took {})", + getStreamName(), + Duration.between(reconnectionStartTime.get(), connectionEstablishedTime.get())); + } + + /** Check if the connection is hung (no messages received within threshold). */ + protected void checkForHungConnection() throws HungConnectionException { + Instant lastMsg = lastMessageTime.get(); + if (lastMsg == null) { + lastMsg = connectionEstablishedTime.get(); + } + if (lastMsg != null + && lastMsg.isBefore( + Instant.now().minus(HUNG_CONNECTION_THRESHOLD_SECONDS, ChronoUnit.SECONDS))) { + throw new HungConnectionException( + "No messages received since " + lastMsg + " - connection appears hung"); + } + } + + /** + * Wait for a stream latch to complete, periodically checking for hung connections. Returns when + * the latch counts down, the timeout expires, or a hung connection is detected. + */ + protected void awaitStreamWithHungDetection(CountDownLatch latch) + throws InterruptedException, HungConnectionException { + long startTime = System.currentTimeMillis(); + long maxDurationMs = TimeUnit.MINUTES.toMillis(STREAM_TIMEOUT_MINUTES); + + while ((System.currentTimeMillis() - startTime) < maxDurationMs) { + if (latch.await(HUNG_CONNECTION_THRESHOLD_SECONDS, TimeUnit.SECONDS)) { + return; // Stream completed + } + checkForHungConnection(); + } + } + + /** Compute backoff delay with jitter based on consecutive failure count. */ + private long computeBackoffMs() { + long failures = consecutiveFailures.get(); + if (failures <= 0) { + return 0; + } + long delayMs = BACKOFF_BASE_MS * (1L << Math.min(failures - 1, 10)); + double jitter = 1.0 + (ThreadLocalRandom.current().nextDouble() * 2 - 1) * BACKOFF_JITTER; + return Math.min((long) (delayMs * jitter), BACKOFF_MAX_MS); + } + + /** + * Build base stream status map with connection tracking fields. Subclasses should call this and + * add their own stream-specific counters. + */ + protected Map buildStreamStatus() { + Instant lastDisconnect = lastDisconnectTime.get(); + Instant connectionEstablished = connectionEstablishedTime.get(); + Instant reconnectStart = reconnectionStartTime.get(); + Instant lastMessage = lastMessageTime.get(); + + Map status = new HashMap<>(); + status.put("isRunning", isRunning.get()); + status.put("totalReconnectionAttempts", totalReconnectionAttempts.get()); + status.put("successfulReconnections", successfulReconnections.get()); + status.put("consecutiveFailures", consecutiveFailures.get()); + status.put("lastDisconnectTime", lastDisconnect != null ? lastDisconnect.toString() : null); + status.put( + "connectionEstablishedTime", + connectionEstablished != null ? connectionEstablished.toString() : null); + status.put("reconnectionStartTime", reconnectStart != null ? reconnectStart.toString() : null); + status.put("lastErrorType", lastErrorType.get()); + status.put("lastMessageTime", lastMessage != null ? lastMessage.toString() : null); + return status; + } + + // --------------------------------------------------------------------------- + // Graceful shutdown + // --------------------------------------------------------------------------- + + /** Override in subclasses to add stream-specific stop logic (e.g. closing observers). */ + public void stop() { + doStop(); + } + + /** Shared shutdown logic: set running flag, shut down channel. */ + protected void doStop() { + isRunning.set(false); + shutdown(); + } + + // --------------------------------------------------------------------------- + // Configuration and channel management + // --------------------------------------------------------------------------- + public Config getConfig() { return currentConfig; } @@ -119,20 +351,18 @@ protected void shutdown() { tenant, e); channelToShutdown.shutdownNow(); - Thread.currentThread().interrupt(); // Preserve interrupt status + Thread.currentThread().interrupt(); } - // Set static field to null AFTER successful shutdown sharedChannel = null; } else { log.debug("Tenant: {} - Shared channel is already null, shut down, or terminated.", tenant); - // Ensure static field is null if channel is unusable if (sharedChannel != null && (sharedChannel.isShutdown() || sharedChannel.isTerminated())) { sharedChannel = null; } } } finally { - lock.unlock(); // Guaranteed unlock in finally block + lock.unlock(); } } @@ -160,8 +390,25 @@ private void forceShutdownSharedChannel() { } public ManagedChannel getChannel() throws UnconfiguredException { + // Check simulated disconnect window + Instant disconnectDeadline = disconnectUntil.get(); + if (disconnectDeadline != null) { + if (Instant.now().isBefore(disconnectDeadline)) { + long remainingSeconds = + java.time.Duration.between(Instant.now(), disconnectDeadline).getSeconds(); + log.warn( + "Tenant: {} - Simulated disconnect active, {} seconds remaining", + tenant, + remainingSeconds); + throw new UnconfiguredException( + "Simulated network disconnect active for " + remainingSeconds + " more seconds"); + } else { + disconnectUntil.set(null); + log.info("Tenant: {} - Simulated disconnect expired, allowing reconnection", tenant); + } + } + long getChannelStartTime = System.currentTimeMillis(); - // Double-checked locking pattern for thread-safe lazy initialization ManagedChannel localChannel = sharedChannel; if (localChannel == null || localChannel.isShutdown() || localChannel.isTerminated()) { @@ -187,7 +434,6 @@ public ManagedChannel getChannel() throws UnconfiguredException { lockWaitTime); } try { - // Double-check condition inside synchronized block localChannel = sharedChannel; var shutdown = localChannel == null || localChannel.isShutdown(); @@ -279,14 +525,29 @@ private ManagedChannel createNewChannel() throws UnconfiguredException { "TCN Gate client configuration error during channel creation", e); } catch (UnconfiguredException e) { log.error("Tenant: {} - Configuration error during shared channel creation", tenant, e); - throw e; // Re-throw specific unconfigured exception + throw e; } catch (Exception e) { log.error("Tenant: {} - Unexpected error during shared channel creation", tenant, e); throw new UnconfiguredException("Unexpected error configuring TCN Gate client channel", e); } } - public abstract void start(); + /** + * Simulate a network disconnect for the given duration. Kills the current channel and prevents + * reconnection until the duration expires. + */ + public void simulateDisconnect(int durationSeconds) { + log.warn("Tenant: {} - SIMULATING NETWORK DISCONNECT for {} seconds", tenant, durationSeconds); + disconnectUntil.set(Instant.now().plusSeconds(durationSeconds)); + forceShutdownSharedChannel(); + } + + /** Check if a simulated disconnect is currently active. */ + public boolean isDisconnectSimulationActive() { + Instant deadline = disconnectUntil.get(); + return deadline != null && Instant.now().isBefore(deadline); + } + /** * Reset gRPC's internal connect backoff on the shared channel. This forces the name resolver to * immediately retry DNS resolution instead of waiting for its own exponential backoff timer. @@ -298,18 +559,13 @@ protected void resetChannelBackoff() { } } - /** Resets the gRPC channel after a connection failure */ + /** Resets the gRPC channel after a connection failure. */ protected void resetChannel() { log.info("Tenant: {} - Resetting static shared gRPC channel after connection failure.", tenant); shutdown(); } - /** - * Helper method to handle StatusRuntimeException - * - * @param e The exception to handle - * @return true if the exception was handled - */ + /** Handle StatusRuntimeException, resetting channel on UNAVAILABLE. */ protected boolean handleStatusRuntimeException(StatusRuntimeException e) { if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) { log.warn( @@ -320,7 +576,7 @@ protected boolean handleStatusRuntimeException(StatusRuntimeException e) { return false; } - /** Get channel statistics for monitoring */ + /** Get channel statistics for monitoring. */ public Map getChannelStats() { ManagedChannel channel = sharedChannel; if (channel != null) { @@ -332,4 +588,11 @@ public Map getChannelStats() { } return Map.of("channel", "null"); } + + /** Exception for hung connection detection. */ + public static class HungConnectionException extends Exception { + public HungConnectionException(String message) { + super(message); + } + } } diff --git a/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientEventStream.java b/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientEventStream.java index 1723046..39e6011 100644 --- a/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientEventStream.java +++ b/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientEventStream.java @@ -27,22 +27,17 @@ import com.tcn.exile.plugin.PluginInterface; import io.grpc.stub.StreamObserver; import jakarta.annotation.PreDestroy; -import java.time.Duration; import java.time.Instant; -import java.time.temporal.ChronoUnit; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; /** - * Bidirectional event streaming with immediate acknowledgment using EventStream API. + * Bidirectional event streaming with acknowledgment using the EventStream API. * *

The client drives the loop: sends a request (with ACK IDs + event count), server responds with * events, client processes them and sends the next request with ACKs. This repeats until the @@ -52,32 +47,13 @@ public class GateClientEventStream extends GateClientAbstract { private static final StructuredLogger log = new StructuredLogger(GateClientEventStream.class); private static final int BATCH_SIZE = 100; - private static final long STREAM_TIMEOUT_MINUTES = 5; - private static final long HUNG_CONNECTION_THRESHOLD_SECONDS = 45; - - // Backoff configuration - private static final long BACKOFF_BASE_MS = 2000; - private static final long BACKOFF_MAX_MS = 30000; - private static final double BACKOFF_JITTER = 0.2; private final PluginInterface plugin; - private final AtomicBoolean establishedForCurrentAttempt = new AtomicBoolean(false); - private final AtomicReference lastMessageTime = new AtomicReference<>(); - - // Connection timing tracking - private final AtomicReference lastDisconnectTime = new AtomicReference<>(); - private final AtomicReference reconnectionStartTime = new AtomicReference<>(); - private final AtomicReference connectionEstablishedTime = new AtomicReference<>(); - private final AtomicLong totalReconnectionAttempts = new AtomicLong(0); - private final AtomicLong successfulReconnections = new AtomicLong(0); - private final AtomicReference lastErrorType = new AtomicReference<>(); - private final AtomicLong consecutiveFailures = new AtomicLong(0); - private final AtomicBoolean isRunning = new AtomicBoolean(false); private final AtomicLong eventsProcessed = new AtomicLong(0); private final AtomicLong eventsFailed = new AtomicLong(0); - // Pending ACKs buffer - survives stream reconnections. - // Event IDs are added here after successful processing and removed only after + // Pending ACKs buffer — survives stream reconnections. + // Event IDs are added after successful processing and removed only after // successful send to the server. If a stream breaks before ACKs are sent, // they carry over to the next connection attempt. private final List pendingAcks = new ArrayList<>(); @@ -91,131 +67,31 @@ public GateClientEventStream(String tenant, Config currentConfig, PluginInterfac this.plugin = plugin; } - /** Check if the connection is hung (no messages received in threshold time). */ - private void checkForHungConnection() throws HungConnectionException { - Instant lastMsg = lastMessageTime.get(); - if (lastMsg == null) { - lastMsg = connectionEstablishedTime.get(); - } - if (lastMsg != null - && lastMsg.isBefore( - Instant.now().minus(HUNG_CONNECTION_THRESHOLD_SECONDS, ChronoUnit.SECONDS))) { - throw new HungConnectionException( - "No messages received since " + lastMsg + " - connection appears hung"); - } - } - - /** Compute backoff delay with jitter based on consecutive failure count. */ - private long computeBackoffMs() { - long failures = consecutiveFailures.get(); - if (failures <= 0) { - return 0; - } - // Exponential: 2s, 4s, 8s, 16s, capped at 30s - long delayMs = BACKOFF_BASE_MS * (1L << Math.min(failures - 1, 10)); - // Add ±20% jitter to avoid thundering herd between streams - double jitter = 1.0 + (ThreadLocalRandom.current().nextDouble() * 2 - 1) * BACKOFF_JITTER; - return Math.min((long) (delayMs * jitter), BACKOFF_MAX_MS); + @Override + protected String getStreamName() { + return "EventStream"; } @Override - public void start() { - if (isUnconfigured()) { - log.warn(LogCategory.GRPC, "NOOP", "EventStream is unconfigured, cannot stream events"); - return; - } - - // Prevent concurrent invocations from the fixed-rate scheduler - if (!isRunning.compareAndSet(false, true)) { - return; - } - - // Exponential backoff: sleep before retrying if we have consecutive failures - long backoffMs = computeBackoffMs(); - if (backoffMs > 0) { - log.debug( - LogCategory.GRPC, - "Backoff", - "Waiting %dms before reconnect attempt (consecutive failures: %d)", - backoffMs, - consecutiveFailures.get()); - try { - Thread.sleep(backoffMs); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - isRunning.set(false); - return; - } - } - - try { - log.debug( - LogCategory.GRPC, "Init", "EventStream task started, checking configuration status"); - - reconnectionStartTime.set(Instant.now()); - - // Reset gRPC's internal DNS backoff so name resolution is retried immediately - resetChannelBackoff(); - - runStream(); - - } catch (HungConnectionException e) { - log.warn(LogCategory.GRPC, "HungConnection", "Event stream appears hung: %s", e.getMessage()); - lastErrorType.set("HungConnection"); - consecutiveFailures.incrementAndGet(); - } catch (UnconfiguredException e) { - log.error(LogCategory.GRPC, "ConfigError", "Configuration error: %s", e.getMessage()); - lastErrorType.set("UnconfiguredException"); - consecutiveFailures.incrementAndGet(); - } catch (InterruptedException e) { - log.info(LogCategory.GRPC, "Interrupted", "Event stream interrupted"); - Thread.currentThread().interrupt(); - } catch (Exception e) { - log.error( - LogCategory.GRPC, - "EventStream", - "Error streaming events from server: %s", - e.getMessage(), - e); - lastErrorType.set(e.getClass().getSimpleName()); - consecutiveFailures.incrementAndGet(); - } finally { - totalReconnectionAttempts.incrementAndGet(); - lastDisconnectTime.set(Instant.now()); - isRunning.set(false); - requestObserverRef.set(null); - if (connectionEstablishedTime.get() != null) { - log.debug(LogCategory.GRPC, "Complete", "Event stream done"); - } - } + protected void onStreamDisconnected() { + requestObserverRef.set(null); } - /** Run a single stream session with bidirectional communication. */ - private void runStream() + @Override + protected void runStream() throws UnconfiguredException, InterruptedException, HungConnectionException { var latch = new CountDownLatch(1); var errorRef = new AtomicReference(); var firstResponseReceived = new AtomicBoolean(false); - // Create response handler var responseObserver = new StreamObserver() { @Override public void onNext(EventStreamResponse response) { lastMessageTime.set(Instant.now()); - // Log connected on first server response if (firstResponseReceived.compareAndSet(false, true)) { - connectionEstablishedTime.set(Instant.now()); - successfulReconnections.incrementAndGet(); - consecutiveFailures.set(0); - lastErrorType.set(null); - - log.info( - LogCategory.GRPC, - "ConnectionEstablished", - "Event stream connection took %s", - Duration.between(reconnectionStartTime.get(), connectionEstablishedTime.get())); + onConnectionEstablished(); } if (response.getEventsCount() == 0) { @@ -230,7 +106,6 @@ public void onNext(EventStreamResponse response) { Thread.currentThread().interrupt(); return; } - // Still send next request to keep the loop alive sendNextRequest(); return; } @@ -241,45 +116,7 @@ public void onNext(EventStreamResponse response) { "Received %d events from stream", response.getEventsCount()); - // Process events and store successful ACK IDs in the pending buffer - long batchStart = System.currentTimeMillis(); - int batchSuccessCount = 0; - - for (Event event : response.getEventsList()) { - String eventId = getEventId(event); - if (processEvent(event)) { - synchronized (pendingAcks) { - pendingAcks.add(eventId); - } - batchSuccessCount++; - eventsProcessed.incrementAndGet(); - } else { - eventsFailed.incrementAndGet(); - log.warn( - LogCategory.GRPC, - "EventNotAcked", - "Event %s NOT acknowledged - will be redelivered", - eventId); - } - } - - long batchEnd = System.currentTimeMillis(); - - // Warn if batch processing is slow - if (batchSuccessCount > 0) { - long avg = (batchEnd - batchStart) / batchSuccessCount; - if (avg > 1000) { - log.warn( - LogCategory.GRPC, - "SlowBatch", - "Event batch completed %d events in %dms, avg %dms per event", - batchSuccessCount, - batchEnd - batchStart, - avg); - } - } - - // Send next request with ACKs immediately - drains pendingAcks on success + processBatch(response.getEventsList()); sendNextRequest(); } @@ -301,7 +138,7 @@ public void onCompleted() { var requestObserver = GateServiceGrpc.newStub(getChannel()).eventStream(responseObserver); requestObserverRef.set(requestObserver); - // Send initial request - include any pending ACKs from a previous broken stream + // Send initial request — include any pending ACKs from a previous broken stream List carryOverAcks; synchronized (pendingAcks) { carryOverAcks = new ArrayList<>(pendingAcks); @@ -319,33 +156,24 @@ public void onCompleted() { .setEventCount(BATCH_SIZE) .addAllAckEventIds(carryOverAcks) .build()); - // Clear pending ACKs only after successful initial send if (!carryOverAcks.isEmpty()) { synchronized (pendingAcks) { pendingAcks.removeAll(carryOverAcks); } } - // Wait for stream to complete, with periodic hung-connection checks - boolean completed = false; - long startTime = System.currentTimeMillis(); - long maxDurationMs = TimeUnit.MINUTES.toMillis(STREAM_TIMEOUT_MINUTES); - - while (!completed && (System.currentTimeMillis() - startTime) < maxDurationMs) { - completed = latch.await(HUNG_CONNECTION_THRESHOLD_SECONDS, TimeUnit.SECONDS); - if (!completed) { - // Check for hung connection - checkForHungConnection(); - } - } + awaitStreamWithHungDetection(latch); - // Propagate error if any var error = errorRef.get(); if (error != null) { throw new RuntimeException("Stream error", error); } } + // --------------------------------------------------------------------------- + // Request & ACK management + // --------------------------------------------------------------------------- + /** * Send the next request on the stream, draining pendingAcks into the request. If the send fails, * the ACKs remain in pendingAcks for the next attempt or reconnection. @@ -357,7 +185,6 @@ private void sendNextRequest() { return; } - // Drain pending ACKs List acksToSend; synchronized (pendingAcks) { acksToSend = new ArrayList<>(pendingAcks); @@ -371,7 +198,6 @@ private void sendNextRequest() { .build(); observer.onNext(request); - // Only clear after successful send if (!acksToSend.isEmpty()) { synchronized (pendingAcks) { pendingAcks.removeAll(acksToSend); @@ -383,7 +209,6 @@ private void sendNextRequest() { acksToSend.size()); } } catch (Exception e) { - // ACKs stay in pendingAcks - will be retried on next send or reconnection log.error( LogCategory.GRPC, "RequestFailed", @@ -393,10 +218,47 @@ private void sendNextRequest() { } } + // --------------------------------------------------------------------------- + // Event processing + // --------------------------------------------------------------------------- + + /** Process a batch of events, adding successful ACK IDs to the pending buffer. */ + private void processBatch(List events) { + long batchStart = System.currentTimeMillis(); + int successCount = 0; + + for (Event event : events) { + String eventId = getEventId(event); + if (processEvent(event)) { + synchronized (pendingAcks) { + pendingAcks.add(eventId); + } + successCount++; + eventsProcessed.incrementAndGet(); + } else { + eventsFailed.incrementAndGet(); + log.warn( + LogCategory.GRPC, + "EventNotAcked", + "Event %s NOT acknowledged - will be redelivered", + eventId); + } + } + + long elapsed = System.currentTimeMillis() - batchStart; + if (successCount > 0 && (elapsed / successCount) > 1000) { + log.warn( + LogCategory.GRPC, + "SlowBatch", + "Event batch completed %d events in %dms, avg %dms per event", + successCount, + elapsed, + elapsed / successCount); + } + } + /** - * Process a single event. Returns true if successfully processed (should ACK). - * - *

This mirrors the event handling logic from GateClientPollEvents exactly. + * Process a single event by dispatching to the plugin. Returns true if successfully processed. */ private boolean processEvent(Event event) { try { @@ -408,64 +270,64 @@ private boolean processEvent(Event event) { return false; } - if (event.hasAgentCall()) { - log.debug( - LogCategory.GRPC, - "EventProcessed", - "Received agent call event %d - %s", - event.getAgentCall().getCallSid(), - event.getAgentCall().getCallType()); - plugin.handleAgentCall(event.getAgentCall()); - } - - if (event.hasAgentResponse()) { - log.debug( - LogCategory.GRPC, - "EventProcessed", - "Received agent response event %s", - event.getAgentResponse().getAgentCallResponseSid()); - plugin.handleAgentResponse(event.getAgentResponse()); - } - - if (event.hasTelephonyResult()) { - log.debug( - LogCategory.GRPC, - "EventProcessed", - "Received telephony result event %d - %s", - event.getTelephonyResult().getCallSid(), - event.getTelephonyResult().getCallType()); - plugin.handleTelephonyResult(event.getTelephonyResult()); - } - - if (event.hasTask()) { - log.debug( - LogCategory.GRPC, - "EventProcessed", - "Received task event %s", - event.getTask().getTaskSid()); - plugin.handleTask(event.getTask()); - } - - if (event.hasTransferInstance()) { - log.debug( - LogCategory.GRPC, - "EventProcessed", - "Received transfer instance event %s", - event.getTransferInstance().getTransferInstanceId()); - plugin.handleTransferInstance(event.getTransferInstance()); - } - - if (event.hasCallRecording()) { - log.debug( - LogCategory.GRPC, - "EventProcessed", - "Received call recording event %s", - event.getCallRecording().getRecordingId()); - plugin.handleCallRecording(event.getCallRecording()); + switch (event.getEntityCase()) { + case AGENT_CALL: + log.debug( + LogCategory.GRPC, + "EventProcessed", + "Received agent call event %d - %s", + event.getAgentCall().getCallSid(), + event.getAgentCall().getCallType()); + plugin.handleAgentCall(event.getAgentCall()); + break; + case AGENT_RESPONSE: + log.debug( + LogCategory.GRPC, + "EventProcessed", + "Received agent response event %s", + event.getAgentResponse().getAgentCallResponseSid()); + plugin.handleAgentResponse(event.getAgentResponse()); + break; + case TELEPHONY_RESULT: + log.debug( + LogCategory.GRPC, + "EventProcessed", + "Received telephony result event %d - %s", + event.getTelephonyResult().getCallSid(), + event.getTelephonyResult().getCallType()); + plugin.handleTelephonyResult(event.getTelephonyResult()); + break; + case TASK: + log.debug( + LogCategory.GRPC, + "EventProcessed", + "Received task event %s", + event.getTask().getTaskSid()); + plugin.handleTask(event.getTask()); + break; + case TRANSFER_INSTANCE: + log.debug( + LogCategory.GRPC, + "EventProcessed", + "Received transfer instance event %s", + event.getTransferInstance().getTransferInstanceId()); + plugin.handleTransferInstance(event.getTransferInstance()); + break; + case CALL_RECORDING: + log.debug( + LogCategory.GRPC, + "EventProcessed", + "Received call recording event %s", + event.getCallRecording().getRecordingId()); + plugin.handleCallRecording(event.getCallRecording()); + break; + default: + log.warn( + LogCategory.GRPC, "UnknownEvent", "Unknown event type: %s", event.getEntityCase()); + break; } return true; - } catch (Exception e) { log.error( LogCategory.GRPC, @@ -473,34 +335,33 @@ private boolean processEvent(Event event) { "Failed to process event: %s", e.getMessage(), e); - return false; // Don't ACK - will be redelivered + return false; } } - /** Extract event ID for ACK purposes. Each event type has a different primary identifier. */ + /** + * Extract the server-assigned entity ID for ACK purposes. The server sets exile_entity_id on each + * Event from the DB's entity_id, which may be a composite key (e.g. "call_type,call_sid" for + * telephony). We must echo this exact value back for the ACK to match. + */ private String getEventId(Event event) { - if (event.hasAgentCall()) { - return String.valueOf(event.getAgentCall().getAgentCallSid()); - } - if (event.hasTelephonyResult()) { - return String.valueOf(event.getTelephonyResult().getCallSid()); - } - if (event.hasAgentResponse()) { - return String.valueOf(event.getAgentResponse().getAgentCallResponseSid()); - } - if (event.hasTask()) { - return String.valueOf(event.getTask().getTaskSid()); - } - if (event.hasTransferInstance()) { - return event.getTransferInstance().getTransferInstanceId(); - } - if (event.hasCallRecording()) { - return event.getCallRecording().getRecordingId(); + String entityId = event.getExileEntityId(); + if (entityId == null || entityId.isEmpty()) { + log.warn( + LogCategory.GRPC, + "MissingEntityId", + "Event has no exile_entity_id, cannot ACK. Event type: %s", + event.getEntityCase()); + return "unknown"; } - log.warn(LogCategory.GRPC, "UnknownEvent", "Unknown event type, cannot extract ID"); - return "unknown"; + return entityId; } + // --------------------------------------------------------------------------- + // Lifecycle + // --------------------------------------------------------------------------- + + @Override public void stop() { log.info( LogCategory.GRPC, @@ -511,9 +372,6 @@ public void stop() { eventsProcessed.get(), eventsFailed.get()); - isRunning.set(false); - - // Log any un-sent ACKs so they're visible for debugging redelivery synchronized (pendingAcks) { if (!pendingAcks.isEmpty()) { log.warn( @@ -525,7 +383,6 @@ public void stop() { } } - // Gracefully close the stream var observer = requestObserverRef.get(); if (observer != null) { try { @@ -535,50 +392,22 @@ public void stop() { } } - shutdown(); - log.info(LogCategory.GRPC, "GateClientEventStreamStopped", "GateClientEventStream stopped"); + doStop(); + log.info(LogCategory.GRPC, "Stopped", "GateClientEventStream stopped"); } @PreDestroy public void destroy() { - log.info( - LogCategory.GRPC, - "GateClientEventStream@PreDestroyCalled", - "GateClientEventStream @PreDestroy called"); stop(); } public Map getStreamStatus() { - Instant lastDisconnect = lastDisconnectTime.get(); - Instant connectionEstablished = connectionEstablishedTime.get(); - Instant reconnectStart = reconnectionStartTime.get(); - Instant lastMessage = lastMessageTime.get(); - - Map status = new HashMap<>(); - status.put("isRunning", isRunning.get()); - status.put("totalReconnectionAttempts", totalReconnectionAttempts.get()); - status.put("successfulReconnections", successfulReconnections.get()); - status.put("consecutiveFailures", consecutiveFailures.get()); + Map status = buildStreamStatus(); status.put("eventsProcessed", eventsProcessed.get()); status.put("eventsFailed", eventsFailed.get()); synchronized (pendingAcks) { status.put("pendingAckCount", pendingAcks.size()); } - status.put("lastDisconnectTime", lastDisconnect != null ? lastDisconnect.toString() : null); - status.put( - "connectionEstablishedTime", - connectionEstablished != null ? connectionEstablished.toString() : null); - status.put("reconnectionStartTime", reconnectStart != null ? reconnectStart.toString() : null); - status.put("lastErrorType", lastErrorType.get()); - status.put("lastMessageTime", lastMessage != null ? lastMessage.toString() : null); - return status; } - - /** Exception for hung connection detection. */ - public static class HungConnectionException extends Exception { - public HungConnectionException(String message) { - super(message); - } - } } diff --git a/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientJobQueue.java b/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientJobQueue.java index 07d6781..386b15c 100644 --- a/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientJobQueue.java +++ b/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientJobQueue.java @@ -28,45 +28,21 @@ import com.tcn.exile.plugin.PluginInterface; import io.grpc.stub.StreamObserver; import jakarta.annotation.PreDestroy; -import java.time.Duration; import java.time.Instant; -import java.time.temporal.ChronoUnit; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -/** Bidirectional job streaming with acknowledgment using JobQueueStream API. */ +/** Bidirectional job streaming with acknowledgment using the JobQueueStream API. */ public class GateClientJobQueue extends GateClientAbstract { private static final StructuredLogger log = new StructuredLogger(GateClientJobQueue.class); private static final int DEFAULT_TIMEOUT_SECONDS = 300; - - private static final long STREAM_TIMEOUT_MINUTES = 5; - private static final long HUNG_CONNECTION_THRESHOLD_SECONDS = 45; private static final String KEEPALIVE_JOB_ID = "keepalive"; - // Backoff configuration - private static final long BACKOFF_BASE_MS = 2000; - private static final long BACKOFF_MAX_MS = 30000; - private static final double BACKOFF_JITTER = 0.2; - private final PluginInterface plugin; - private final AtomicBoolean establishedForCurrentAttempt = new AtomicBoolean(false); - private final AtomicReference lastMessageTime = new AtomicReference<>(); - - // Connection timing tracking - private final AtomicReference lastDisconnectTime = new AtomicReference<>(); - private final AtomicReference reconnectionStartTime = new AtomicReference<>(); - private final AtomicReference connectionEstablishedTime = new AtomicReference<>(); - private final AtomicLong totalReconnectionAttempts = new AtomicLong(0); - private final AtomicLong successfulReconnections = new AtomicLong(0); - private final AtomicReference lastErrorType = new AtomicReference<>(); - private final AtomicLong consecutiveFailures = new AtomicLong(0); - private final AtomicBoolean isRunning = new AtomicBoolean(false); private final AtomicLong jobsProcessed = new AtomicLong(0); private final AtomicLong jobsFailed = new AtomicLong(0); @@ -79,127 +55,31 @@ public GateClientJobQueue(String tenant, Config currentConfig, PluginInterface p this.plugin = plugin; } - /** Check if the connection is hung (no messages received in threshold time). */ - private void checkForHungConnection() throws HungConnectionException { - Instant lastMsg = lastMessageTime.get(); - if (lastMsg == null) { - lastMsg = connectionEstablishedTime.get(); - } - if (lastMsg != null - && lastMsg.isBefore( - Instant.now().minus(HUNG_CONNECTION_THRESHOLD_SECONDS, ChronoUnit.SECONDS))) { - throw new HungConnectionException( - "No messages received since " + lastMsg + " - connection appears hung"); - } - } - - /** Compute backoff delay with jitter based on consecutive failure count. */ - private long computeBackoffMs() { - long failures = consecutiveFailures.get(); - if (failures <= 0) { - return 0; - } - // Exponential: 2s, 4s, 8s, 16s, capped at 30s - long delayMs = BACKOFF_BASE_MS * (1L << Math.min(failures - 1, 10)); - // Add ±20% jitter to avoid thundering herd between streams - double jitter = 1.0 + (ThreadLocalRandom.current().nextDouble() * 2 - 1) * BACKOFF_JITTER; - return Math.min((long) (delayMs * jitter), BACKOFF_MAX_MS); + @Override + protected String getStreamName() { + return "JobQueue"; } @Override - public void start() { - if (isUnconfigured()) { - log.warn(LogCategory.GRPC, "NOOP", "JobQueue is unconfigured, cannot stream jobs"); - return; - } - - // Prevent concurrent invocations from the fixed-rate scheduler - if (!isRunning.compareAndSet(false, true)) { - return; - } - - // Exponential backoff: sleep before retrying if we have consecutive failures - long backoffMs = computeBackoffMs(); - if (backoffMs > 0) { - log.debug( - LogCategory.GRPC, - "Backoff", - "Waiting %dms before reconnect attempt (consecutive failures: %d)", - backoffMs, - consecutiveFailures.get()); - try { - Thread.sleep(backoffMs); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - isRunning.set(false); - return; - } - } - - try { - log.debug(LogCategory.GRPC, "Init", "JobQueue task started, checking configuration status"); - - reconnectionStartTime.set(Instant.now()); - - // Reset gRPC's internal DNS backoff so name resolution is retried immediately - resetChannelBackoff(); - - runStream(); - - } catch (HungConnectionException e) { - log.warn( - LogCategory.GRPC, "HungConnection", "Job queue stream appears hung: %s", e.getMessage()); - lastErrorType.set("HungConnection"); - consecutiveFailures.incrementAndGet(); - } catch (UnconfiguredException e) { - log.error(LogCategory.GRPC, "ConfigError", "Configuration error: %s", e.getMessage()); - lastErrorType.set("UnconfiguredException"); - consecutiveFailures.incrementAndGet(); - } catch (InterruptedException e) { - log.info(LogCategory.GRPC, "Interrupted", "Job queue stream interrupted"); - Thread.currentThread().interrupt(); - } catch (Exception e) { - log.error( - LogCategory.GRPC, "JobQueue", "Error streaming jobs from server: %s", e.getMessage(), e); - lastErrorType.set(e.getClass().getSimpleName()); - consecutiveFailures.incrementAndGet(); - } finally { - totalReconnectionAttempts.incrementAndGet(); - lastDisconnectTime.set(Instant.now()); - isRunning.set(false); - requestObserverRef.set(null); - if (connectionEstablishedTime.get() != null) { - log.debug(LogCategory.GRPC, "Complete", "Job queue done"); - } - } + protected void onStreamDisconnected() { + requestObserverRef.set(null); } - /** Run a single stream session with bidirectional communication. */ - private void runStream() + @Override + protected void runStream() throws UnconfiguredException, InterruptedException, HungConnectionException { var latch = new CountDownLatch(1); var errorRef = new AtomicReference(); var firstResponseReceived = new AtomicBoolean(false); - // Create response handler var responseObserver = new StreamObserver() { @Override public void onNext(JobQueueStreamResponse response) { lastMessageTime.set(Instant.now()); - // Log connected on first server response if (firstResponseReceived.compareAndSet(false, true)) { - connectionEstablishedTime.set(Instant.now()); - successfulReconnections.incrementAndGet(); - consecutiveFailures.set(0); - lastErrorType.set(null); - - log.info( - LogCategory.GRPC, - "ConnectionEstablished", - "Job queue connection took %s", - Duration.between(reconnectionStartTime.get(), connectionEstablishedTime.get())); + onConnectionEstablished(); } if (!response.hasJob()) { @@ -210,7 +90,7 @@ public void onNext(JobQueueStreamResponse response) { var job = response.getJob(); String jobId = job.getJobId(); - // Handle keepalive - must ACK to register with presence store + // Handle keepalive — must ACK to register with presence store if (KEEPALIVE_JOB_ID.equals(jobId)) { log.debug( LogCategory.GRPC, "Keepalive", "Received keepalive, sending ACK to register"); @@ -225,11 +105,7 @@ public void onNext(JobQueueStreamResponse response) { jobId, getJobType(job)); - // Process the job - boolean success = processJob(job); - - // Send ACK only on success - if not success, job will be redelivered - if (success) { + if (processJob(job)) { sendAck(jobId); jobsProcessed.incrementAndGet(); } else { @@ -264,52 +140,44 @@ public void onCompleted() { log.debug(LogCategory.GRPC, "Init", "Sending initial keepalive to job queue..."); requestObserver.onNext(JobQueueStreamRequest.newBuilder().setJobId(KEEPALIVE_JOB_ID).build()); - // Wait for stream to complete, with periodic hung-connection checks - boolean completed = false; - long startTime = System.currentTimeMillis(); - long maxDurationMs = TimeUnit.MINUTES.toMillis(STREAM_TIMEOUT_MINUTES); - - while (!completed && (System.currentTimeMillis() - startTime) < maxDurationMs) { - completed = latch.await(HUNG_CONNECTION_THRESHOLD_SECONDS, TimeUnit.SECONDS); - if (!completed) { - // Check for hung connection - checkForHungConnection(); - } - } + awaitStreamWithHungDetection(latch); - // Propagate error if any var error = errorRef.get(); if (error != null) { throw new RuntimeException("Stream error", error); } } + // --------------------------------------------------------------------------- + // ACK management + // --------------------------------------------------------------------------- + /** Send acknowledgment for a job. */ private void sendAck(String jobId) { var observer = requestObserverRef.get(); - if (observer != null) { - try { - observer.onNext(JobQueueStreamRequest.newBuilder().setJobId(jobId).build()); - log.debug(LogCategory.GRPC, "AckSent", "Sent ACK for job: %s", jobId); - } catch (Exception e) { - log.error( - LogCategory.GRPC, - "AckFailed", - "Failed to send ACK for job %s: %s", - jobId, - e.getMessage()); - } - } else { + if (observer == null) { log.warn( LogCategory.GRPC, "AckFailed", "Cannot send ACK for job %s - no active observer", jobId); + return; + } + try { + observer.onNext(JobQueueStreamRequest.newBuilder().setJobId(jobId).build()); + log.debug(LogCategory.GRPC, "AckSent", "Sent ACK for job: %s", jobId); + } catch (Exception e) { + log.error( + LogCategory.GRPC, + "AckFailed", + "Failed to send ACK for job %s: %s", + jobId, + e.getMessage()); } } - /** - * Process a job. Returns true if successfully processed. - * - *

This mirrors the job handling logic from GateClientJobStream exactly. - */ + // --------------------------------------------------------------------------- + // Job processing + // --------------------------------------------------------------------------- + + /** Process a job by dispatching to the plugin. Returns true if successfully processed. */ private boolean processJob(StreamJobsResponse value) { long jobStartTime = System.currentTimeMillis(); @@ -443,6 +311,11 @@ private void submitJobError(String jobId, String message) { } } + // --------------------------------------------------------------------------- + // Lifecycle + // --------------------------------------------------------------------------- + + @Override public void stop() { log.info( LogCategory.GRPC, @@ -453,9 +326,6 @@ public void stop() { jobsProcessed.get(), jobsFailed.get()); - isRunning.set(false); - - // Gracefully close the stream var observer = requestObserverRef.get(); if (observer != null) { try { @@ -465,47 +335,19 @@ public void stop() { } } - shutdown(); - log.info(LogCategory.GRPC, "GateClientJobQueueStopped", "GateClientJobQueue stopped"); + doStop(); + log.info(LogCategory.GRPC, "Stopped", "GateClientJobQueue stopped"); } @PreDestroy public void destroy() { - log.info( - LogCategory.GRPC, - "GateClientJobQueue@PreDestroyCalled", - "GateClientJobQueue @PreDestroy called"); stop(); } public Map getStreamStatus() { - Instant lastDisconnect = lastDisconnectTime.get(); - Instant connectionEstablished = connectionEstablishedTime.get(); - Instant reconnectStart = reconnectionStartTime.get(); - Instant lastMessage = lastMessageTime.get(); - - Map status = new HashMap<>(); - status.put("isRunning", isRunning.get()); - status.put("totalReconnectionAttempts", totalReconnectionAttempts.get()); - status.put("successfulReconnections", successfulReconnections.get()); - status.put("consecutiveFailures", consecutiveFailures.get()); + Map status = buildStreamStatus(); status.put("jobsProcessed", jobsProcessed.get()); status.put("jobsFailed", jobsFailed.get()); - status.put("lastDisconnectTime", lastDisconnect != null ? lastDisconnect.toString() : null); - status.put( - "connectionEstablishedTime", - connectionEstablished != null ? connectionEstablished.toString() : null); - status.put("reconnectionStartTime", reconnectStart != null ? reconnectStart.toString() : null); - status.put("lastErrorType", lastErrorType.get()); - status.put("lastMessageTime", lastMessage != null ? lastMessage.toString() : null); - return status; } - - /** Exception for hung connection detection. */ - public static class HungConnectionException extends Exception { - public HungConnectionException(String message) { - super(message); - } - } } diff --git a/gradle.properties b/gradle.properties index 792bcb9..ca67440 100644 --- a/gradle.properties +++ b/gradle.properties @@ -3,8 +3,8 @@ micronautGradlePluginVersion=4.4.4 grpcVersion=1.68.1 protobufVersion=3.25.5 -exileapiProtobufVersion=33.4.0.1.20260114173504.d15b0c747d85 -exileapiGrpcVersion=1.78.0.1.20260114173504.d15b0c747d85 +exileapiProtobufVersion=33.5.0.1.20260211211723.9ac313171527 +exileapiGrpcVersion=1.79.0.1.20260211211723.9ac313171527 org.gradle.jvmargs=-Xmx4G From 52428266502ca843bffac04712f94e39a90f479e Mon Sep 17 00:00:00 2001 From: Braden Olsen Date: Thu, 12 Feb 2026 16:24:08 -0700 Subject: [PATCH 3/6] Updates Java version for compatibility enhancement Upgrades source and target compatibility to Java 21 in build files for improved performance and access to the latest language features. Adds @ExecuteOn(TaskExecutors.BLOCKING) annotation in AgentsController to specify execution context, promoting performance efficiency for I/O-bound operations. Enhances code readability by adjusting comment formatting. Relates to stream-testing-updates --- core/build.gradle | 4 ++-- demo/build.gradle | 4 ++-- .../java/com/tcn/exile/demo/single/AgentsController.java | 9 ++++++--- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/core/build.gradle b/core/build.gradle index ae82c0b..66800ae 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -58,8 +58,8 @@ dependencies { } java { - sourceCompatibility = JavaVersion.toVersion("17") - targetCompatibility = JavaVersion.toVersion("17") + sourceCompatibility = JavaVersion.toVersion("21") + targetCompatibility = JavaVersion.toVersion("21") } graalvmNative.toolchainDetection = false diff --git a/demo/build.gradle b/demo/build.gradle index c96feae..6b430f1 100644 --- a/demo/build.gradle +++ b/demo/build.gradle @@ -70,8 +70,8 @@ application { } java { - sourceCompatibility = JavaVersion.VERSION_17 - targetCompatibility = JavaVersion.VERSION_17 + sourceCompatibility = JavaVersion.VERSION_21 + targetCompatibility = JavaVersion.VERSION_21 } tasks.withType(JavaCompile) { diff --git a/demo/src/main/java/com/tcn/exile/demo/single/AgentsController.java b/demo/src/main/java/com/tcn/exile/demo/single/AgentsController.java index 89a4aea..d626ad8 100644 --- a/demo/src/main/java/com/tcn/exile/demo/single/AgentsController.java +++ b/demo/src/main/java/com/tcn/exile/demo/single/AgentsController.java @@ -30,6 +30,8 @@ import io.micronaut.http.HttpStatus; import io.micronaut.http.MediaType; import io.micronaut.http.annotation.*; +import io.micronaut.scheduling.TaskExecutors; +import io.micronaut.scheduling.annotation.ExecuteOn; import io.swagger.v3.oas.annotations.OpenAPIDefinition; import io.swagger.v3.oas.annotations.tags.Tag; import jakarta.inject.Inject; @@ -43,6 +45,7 @@ import org.slf4j.LoggerFactory; @Controller("/api/agents") +@ExecuteOn(TaskExecutors.BLOCKING) @OpenAPIDefinition(tags = {@Tag(name = "agents")}) public class AgentsController { private static final Logger log = LoggerFactory.getLogger(AgentsController.class); @@ -295,7 +298,7 @@ public AgentStatus getState(@PathVariable String partnerAgentId) throws Unconfig @Tag(name = "agents") public SetAgentStatusResponse setState( @PathVariable String partnerAgentId, - @PathVariable SetAgentState state /*, @Body PauseCodeReason pauseCodeReason*/) + @PathVariable SetAgentState state /* , @Body PauseCodeReason pauseCodeReason */) throws UnconfiguredException { log.debug("setState"); var request = @@ -303,8 +306,8 @@ public SetAgentStatusResponse setState( .setPartnerAgentId(partnerAgentId) .setNewState(build.buf.gen.tcnapi.exile.gate.v2.AgentState.values()[state.getValue()]); // if (pauseCodeReason != null && pauseCodeReason.reason() != null) { - // request.setReason(pauseCodeReason.reason()); - // // request.setPauseCodeReason(pauseCodeReason.reason()); + // request.setReason(pauseCodeReason.reason()); + // // request.setPauseCodeReason(pauseCodeReason.reason()); // } var res = configChangeWatcher.getGateClient().updateAgentStatus(request.build()); return new SetAgentStatusResponse(); From bce704bc845251a4885701a1341ced6bbf2e0034 Mon Sep 17 00:00:00 2001 From: Braden Olsen Date: Tue, 17 Feb 2026 15:09:19 -0700 Subject: [PATCH 4/6] Handles routine stream closure gracefully Introduces a check for routine stream closures using a new method 'isRoutineStreamClosure'. Logs routine server-initiated closures as debug instead of error, reducing noise in logs. Improves error handling by distinguishing between expected and unexpected stream closures. Relates to stream stability improvements. --- .../gateclients/v2/GateClientAbstract.java | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientAbstract.java b/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientAbstract.java index 49d4187..9a30a99 100644 --- a/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientAbstract.java +++ b/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientAbstract.java @@ -168,7 +168,11 @@ public void start() { log.info("[{}] Interrupted", getStreamName()); Thread.currentThread().interrupt(); } catch (Exception e) { - log.error("[{}] Error: {}", getStreamName(), e.getMessage(), e); + if (isRoutineStreamClosure(e)) { + log.debug("[{}] Stream closed by server (routine reconnect)", getStreamName()); + } else { + log.error("[{}] Error: {}", getStreamName(), e.getMessage(), e); + } lastErrorType.set(e.getClass().getSimpleName()); consecutiveFailures.incrementAndGet(); } finally { @@ -595,4 +599,18 @@ public HungConnectionException(String message) { super(message); } } + + private boolean isRoutineStreamClosure(Exception e) { + Throwable cause = e; + while (cause != null) { + if (cause instanceof StatusRuntimeException sre + && sre.getStatus().getCode() == Status.Code.UNAVAILABLE + && sre.getMessage() != null + && sre.getMessage().contains("NO_ERROR")) { + return true; + } + cause = cause.getCause(); + } + return false; + } } From eeda5147f1506e0a6c9f5e0a717412710aa3c383 Mon Sep 17 00:00:00 2001 From: Braden Olsen Date: Wed, 18 Feb 2026 12:33:54 -0700 Subject: [PATCH 5/6] Updates JDK to version 21 and removes disconnect simulation Upgrades JDK setup in GitHub workflows from version 17 to 21 to leverage new features and improvements. Removes obsolete simulated network disconnect functionality from GateClientAbstract, simplifying the codebase and reducing the potential for errors during testing. Streamlines code as part of ongoing stream-testing updates. --- .github/workflows/gradle-publish.yml | 4 +- .github/workflows/gradle.yml | 8 ++-- .../gateclients/v2/GateClientAbstract.java | 38 ------------------- 3 files changed, 6 insertions(+), 44 deletions(-) diff --git a/.github/workflows/gradle-publish.yml b/.github/workflows/gradle-publish.yml index a3bcfa7..a4df842 100644 --- a/.github/workflows/gradle-publish.yml +++ b/.github/workflows/gradle-publish.yml @@ -21,10 +21,10 @@ jobs: steps: - uses: actions/checkout@v4 - - name: Set up JDK 17 + - name: Set up JDK 21 uses: actions/setup-java@v4 with: - java-version: '17' + java-version: '21' distribution: 'temurin' server-id: github # Value of the distributionManagement/repository/id field of the pom.xml settings-path: ${{ github.workspace }} # location for the settings.xml file diff --git a/.github/workflows/gradle.yml b/.github/workflows/gradle.yml index bcb1b24..d60bbc4 100644 --- a/.github/workflows/gradle.yml +++ b/.github/workflows/gradle.yml @@ -22,10 +22,10 @@ jobs: steps: - uses: actions/checkout@v4 - - name: Set up JDK 17 + - name: Set up JDK 21 uses: actions/setup-java@v4 with: - java-version: '17' + java-version: '21' distribution: 'zulu' - name: Gradle Wrapper run: ./gradlew wrapper --gradle-version 8.8 @@ -64,10 +64,10 @@ jobs: steps: - uses: actions/checkout@v4 - - name: Set up JDK 17 + - name: Set up JDK 21 uses: actions/setup-java@v4 with: - java-version: '17' + java-version: '21' distribution: 'temurin' # Generates and submits a dependency graph, enabling Dependabot Alerts for all project dependencies. # See: https://github.com/gradle/actions/blob/main/dependency-submission/README.md diff --git a/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientAbstract.java b/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientAbstract.java index 9a30a99..36dc224 100644 --- a/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientAbstract.java +++ b/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientAbstract.java @@ -64,10 +64,6 @@ public abstract class GateClientAbstract { private ManagedChannel sharedChannel; private final ReentrantLock lock = new ReentrantLock(); - // Simulated disconnect: when set and in the future, getChannel() refuses to - // create a new channel - private final AtomicReference disconnectUntil = new AtomicReference<>(); - private Config currentConfig = null; protected final String tenant; @@ -394,24 +390,6 @@ private void forceShutdownSharedChannel() { } public ManagedChannel getChannel() throws UnconfiguredException { - // Check simulated disconnect window - Instant disconnectDeadline = disconnectUntil.get(); - if (disconnectDeadline != null) { - if (Instant.now().isBefore(disconnectDeadline)) { - long remainingSeconds = - java.time.Duration.between(Instant.now(), disconnectDeadline).getSeconds(); - log.warn( - "Tenant: {} - Simulated disconnect active, {} seconds remaining", - tenant, - remainingSeconds); - throw new UnconfiguredException( - "Simulated network disconnect active for " + remainingSeconds + " more seconds"); - } else { - disconnectUntil.set(null); - log.info("Tenant: {} - Simulated disconnect expired, allowing reconnection", tenant); - } - } - long getChannelStartTime = System.currentTimeMillis(); ManagedChannel localChannel = sharedChannel; if (localChannel == null || localChannel.isShutdown() || localChannel.isTerminated()) { @@ -536,22 +514,6 @@ private ManagedChannel createNewChannel() throws UnconfiguredException { } } - /** - * Simulate a network disconnect for the given duration. Kills the current channel and prevents - * reconnection until the duration expires. - */ - public void simulateDisconnect(int durationSeconds) { - log.warn("Tenant: {} - SIMULATING NETWORK DISCONNECT for {} seconds", tenant, durationSeconds); - disconnectUntil.set(Instant.now().plusSeconds(durationSeconds)); - forceShutdownSharedChannel(); - } - - /** Check if a simulated disconnect is currently active. */ - public boolean isDisconnectSimulationActive() { - Instant deadline = disconnectUntil.get(); - return deadline != null && Instant.now().isBefore(deadline); - } - /** * Reset gRPC's internal connect backoff on the shared channel. This forces the name resolver to * immediately retry DNS resolution instead of waiting for its own exponential backoff timer. From daa8770377038c7cf8bad3969786960265444f15 Mon Sep 17 00:00:00 2001 From: Braden Olsen Date: Thu, 19 Feb 2026 10:16:21 -0700 Subject: [PATCH 6/6] Replaces ExecutorService with virtual thread executor Switches to using a virtual thread per task executor for improved concurrency management and performance. This change enables more efficient handling of concurrent tasks by leveraging lightweight virtual threads. Relates to stream-testing-updates --- .../java/com/tcn/exile/gateclients/v2/GateClientJobStream.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientJobStream.java b/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientJobStream.java index e2d91fe..3ea6f87 100644 --- a/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientJobStream.java +++ b/core/src/main/java/com/tcn/exile/gateclients/v2/GateClientJobStream.java @@ -111,7 +111,7 @@ public void start() { return; } - ExecutorService executorService = Executors.newFixedThreadPool(2); + ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor(); ExecutorCompletionService completionService = new ExecutorCompletionService<>(executorService);