diff --git a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestPipelineFromKafkaIT.java b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestPipelineFromKafkaIT.java index b78d0ef98bfc8..51adc1ccfa378 100644 --- a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestPipelineFromKafkaIT.java +++ b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestPipelineFromKafkaIT.java @@ -77,6 +77,16 @@ public void testFinalPipelineAddsField() throws Exception { Map source2 = response.getHits().getHits()[1].getSourceAsMap(); return Boolean.TRUE.equals(source1.get("processed")) && Boolean.TRUE.equals(source2.get("processed")); }); + + // Verify pipeline execution stats + PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0] + .getPollingIngestStats(); + assertNotNull(stats.getPipelineStats()); + assertTrue(stats.getPipelineStats().totalExecutionCount() >= 2); + assertTrue(stats.getPipelineStats().totalExecutionTimeInMillis() >= 0); + assertEquals(0, stats.getPipelineStats().totalFailedCount()); + assertEquals(0, stats.getPipelineStats().totalDroppedCount()); + assertEquals(0, stats.getPipelineStats().totalTimeoutCount()); } /** @@ -100,6 +110,13 @@ public void testFinalPipelineDropsDocument() throws Exception { refresh(indexName); SearchResponse response = client().prepareSearch(indexName).get(); assertThat(response.getHits().getTotalHits().value(), is(0L)); + + // Verify pipeline drop stats + PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0] + .getPollingIngestStats(); + assertTrue(stats.getPipelineStats().totalExecutionCount() >= 2); + assertTrue(stats.getPipelineStats().totalDroppedCount() >= 2); + assertEquals(0, stats.getPipelineStats().totalFailedCount()); } /** @@ -124,6 +141,13 @@ public void testNoPipelineConfigured() throws Exception { Map source = response.getHits().getHits()[i].getSourceAsMap(); assertFalse("Document should not have 'processed' field", source.containsKey("processed")); } + + // Verify zero pipeline stats when no pipeline configured + PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0] + .getPollingIngestStats(); + assertEquals(0, stats.getPipelineStats().totalExecutionCount()); + assertEquals(0, stats.getPipelineStats().totalDroppedCount()); + assertEquals(0, stats.getPipelineStats().totalFailedCount()); } /** diff --git a/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java b/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java index ab8aeef204013..c05885f4a5783 100644 --- a/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java @@ -525,7 +525,13 @@ protected Map commitDataAsMap() { @Override public PollingIngestStats pollingIngestStats() { - return streamPoller.getStats(); + PollingIngestStats pollerStats = streamPoller.getStats(); + // Enrich with pipeline execution metrics from the shared executor + return new PollingIngestStats( + pollerStats.getMessageProcessorStats(), + pollerStats.getConsumerStats(), + pipelineExecutor.getMetrics() + ); } private void registerDynamicIndexSettingsHandlers() { diff --git a/server/src/main/java/org/opensearch/indices/pollingingest/IngestPipelineExecutor.java b/server/src/main/java/org/opensearch/indices/pollingingest/IngestPipelineExecutor.java index ba79f59cf944b..4619f9ee4090a 100644 --- a/server/src/main/java/org/opensearch/indices/pollingingest/IngestPipelineExecutor.java +++ b/server/src/main/java/org/opensearch/indices/pollingingest/IngestPipelineExecutor.java @@ -12,6 +12,8 @@ import org.apache.logging.log4j.Logger; import org.opensearch.action.index.IndexRequest; import org.opensearch.common.Nullable; +import org.opensearch.common.annotation.PublicApi; +import org.opensearch.common.metrics.CounterMetric; import org.opensearch.index.IndexSettings; import org.opensearch.ingest.IngestService; import org.opensearch.threadpool.ThreadPool; @@ -51,6 +53,13 @@ public class IngestPipelineExecutor { private final String index; private volatile String resolvedFinalPipeline; + // Pipeline execution metrics + private final CounterMetric executionCount = new CounterMetric(); + private final CounterMetric executionTimeNanos = new CounterMetric(); + private final CounterMetric failedCount = new CounterMetric(); + private final CounterMetric droppedCount = new CounterMetric(); + private final CounterMetric timeoutCount = new CounterMetric(); + /** * Creates an IngestPipelineExecutor for the given index. * Resolves the final pipeline from index settings and registers a dynamic settings listener. @@ -106,6 +115,8 @@ public Map executePipelines(String id, Map sourc return sourceMap; } + long startTimeNanos = System.nanoTime(); + // Build IndexRequest to carry the document through the pipeline IndexRequest indexRequest = new IndexRequest(index); indexRequest.id(id); @@ -140,15 +151,23 @@ public Map executePipelines(String id, Map sourc try { future.get(PIPELINE_EXECUTION_TIMEOUT_SECONDS, TimeUnit.SECONDS); } catch (TimeoutException e) { + timeoutCount.inc(); + failedCount.inc(); throw new RuntimeException("Ingest pipeline execution timed out after [" + PIPELINE_EXECUTION_TIMEOUT_SECONDS + "] seconds", e); } catch (InterruptedException e) { Thread.currentThread().interrupt(); + failedCount.inc(); throw new RuntimeException("Ingest pipeline execution was interrupted", e); } catch (ExecutionException e) { + failedCount.inc(); throw new RuntimeException("Ingest pipeline execution failed", e.getCause()); + } finally { + executionTimeNanos.inc(System.nanoTime() - startTimeNanos); + executionCount.inc(); } if (dropped.get()) { + droppedCount.inc(); return null; } @@ -172,4 +191,29 @@ public Map executePipelines(String id, Map sourc return indexRequest.sourceAsMap(); } + + /** + * Returns pipeline execution metrics. + */ + public PipelineMetrics getMetrics() { + return new PipelineMetrics( + executionCount.count(), + TimeUnit.NANOSECONDS.toMillis(executionTimeNanos.count()), + failedCount.count(), + droppedCount.count(), + timeoutCount.count() + ); + } + + /** + * Pipeline execution metrics for pull-based ingestion. + */ + @PublicApi(since = "3.6.0") + public record PipelineMetrics( + long totalExecutionCount, + long totalExecutionTimeInMillis, + long totalFailedCount, + long totalDroppedCount, + long totalTimeoutCount + ) {} } diff --git a/server/src/main/java/org/opensearch/indices/pollingingest/PollingIngestStats.java b/server/src/main/java/org/opensearch/indices/pollingingest/PollingIngestStats.java index ce3b2fe21aa47..a3ccb6c150540 100644 --- a/server/src/main/java/org/opensearch/indices/pollingingest/PollingIngestStats.java +++ b/server/src/main/java/org/opensearch/indices/pollingingest/PollingIngestStats.java @@ -26,10 +26,34 @@ public class PollingIngestStats implements Writeable, ToXContentFragment { private final MessageProcessorStats messageProcessorStats; private final ConsumerStats consumerStats; + private final PipelineStats pipelineStats; public PollingIngestStats(MessageProcessorStats messageProcessorStats, ConsumerStats consumerStats) { + this(messageProcessorStats, consumerStats, new PipelineStats(0, 0, 0, 0, 0)); + } + + public PollingIngestStats( + MessageProcessorStats messageProcessorStats, + ConsumerStats consumerStats, + IngestPipelineExecutor.PipelineMetrics pipelineMetrics + ) { + this( + messageProcessorStats, + consumerStats, + new PipelineStats( + pipelineMetrics.totalExecutionCount(), + pipelineMetrics.totalExecutionTimeInMillis(), + pipelineMetrics.totalFailedCount(), + pipelineMetrics.totalDroppedCount(), + pipelineMetrics.totalTimeoutCount() + ) + ); + } + + public PollingIngestStats(MessageProcessorStats messageProcessorStats, ConsumerStats consumerStats, PipelineStats pipelineStats) { this.messageProcessorStats = messageProcessorStats; this.consumerStats = consumerStats; + this.pipelineStats = pipelineStats; } public PollingIngestStats(StreamInput in) throws IOException { @@ -68,6 +92,18 @@ public PollingIngestStats(StreamInput in) throws IOException { totalDuplicateMessageSkippedCount, pointerBasedLag ); + + if (in.getVersion().onOrAfter(Version.V_3_6_0)) { + this.pipelineStats = new PipelineStats( + in.readLong(), + in.readLong(), + in.readLong(), + in.readLong(), + in.readLong() + ); + } else { + this.pipelineStats = new PipelineStats(0, 0, 0, 0, 0); + } } @Override @@ -88,6 +124,14 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_3_4_0)) { out.writeLong(consumerStats.pointerBasedLag); } + + if (out.getVersion().onOrAfter(Version.V_3_6_0)) { + out.writeLong(pipelineStats.totalExecutionCount); + out.writeLong(pipelineStats.totalExecutionTimeInMillis); + out.writeLong(pipelineStats.totalFailedCount); + out.writeLong(pipelineStats.totalDroppedCount); + out.writeLong(pipelineStats.totalTimeoutCount); + } } @Override @@ -110,6 +154,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field("lag_in_millis", consumerStats.lagInMillis); builder.field("pointer_based_lag", consumerStats.pointerBasedLag); builder.endObject(); + builder.startObject("pipeline_stats"); + builder.field("total_execution_count", pipelineStats.totalExecutionCount); + builder.field("total_execution_time_in_millis", pipelineStats.totalExecutionTimeInMillis); + builder.field("total_failed_count", pipelineStats.totalFailedCount); + builder.field("total_dropped_count", pipelineStats.totalDroppedCount); + builder.field("total_timeout_count", pipelineStats.totalTimeoutCount); + builder.endObject(); builder.endObject(); return builder; } @@ -122,17 +173,23 @@ public ConsumerStats getConsumerStats() { return consumerStats; } + public PipelineStats getPipelineStats() { + return pipelineStats; + } + @Override public boolean equals(Object o) { if (this == o) return true; if (!(o instanceof PollingIngestStats)) return false; PollingIngestStats that = (PollingIngestStats) o; - return Objects.equals(messageProcessorStats, that.messageProcessorStats) && Objects.equals(consumerStats, that.consumerStats); + return Objects.equals(messageProcessorStats, that.messageProcessorStats) + && Objects.equals(consumerStats, that.consumerStats) + && Objects.equals(pipelineStats, that.pipelineStats); } @Override public int hashCode() { - return Objects.hash(messageProcessorStats, consumerStats); + return Objects.hash(messageProcessorStats, consumerStats, pipelineStats); } /** @@ -153,6 +210,14 @@ public record ConsumerStats(long totalPolledCount, long lagInMillis, long totalC long totalPollerMessageDroppedCount, long totalDuplicateMessageSkippedCount, long pointerBasedLag) { } + /** + * Stats for pipeline execution in pull-based ingestion. + */ + @PublicApi(since = "3.6.0") + public record PipelineStats(long totalExecutionCount, long totalExecutionTimeInMillis, long totalFailedCount, long totalDroppedCount, + long totalTimeoutCount) { + } + /** * Builder for {@link PollingIngestStats} */ @@ -171,6 +236,11 @@ public static class Builder { private long totalPollerMessageDroppedCount; private long totalDuplicateMessageSkippedCount; private long pointerBasedLag; + private long pipelineExecutionCount; + private long pipelineExecutionTimeInMillis; + private long pipelineFailedCount; + private long pipelineDroppedCount; + private long pipelineTimeoutCount; public Builder() {} @@ -243,6 +313,15 @@ public Builder setPointerBasedLag(long pointerBasedLag) { return this; } + public Builder setPipelineStats(IngestPipelineExecutor.PipelineMetrics metrics) { + this.pipelineExecutionCount = metrics.totalExecutionCount(); + this.pipelineExecutionTimeInMillis = metrics.totalExecutionTimeInMillis(); + this.pipelineFailedCount = metrics.totalFailedCount(); + this.pipelineDroppedCount = metrics.totalDroppedCount(); + this.pipelineTimeoutCount = metrics.totalTimeoutCount(); + return this; + } + public PollingIngestStats build() { MessageProcessorStats messageProcessorStats = new MessageProcessorStats( totalProcessedCount, @@ -261,7 +340,14 @@ public PollingIngestStats build() { totalDuplicateMessageSkippedCount, pointerBasedLag ); - return new PollingIngestStats(messageProcessorStats, consumerStats); + PipelineStats pipelineStats = new PipelineStats( + pipelineExecutionCount, + pipelineExecutionTimeInMillis, + pipelineFailedCount, + pipelineDroppedCount, + pipelineTimeoutCount + ); + return new PollingIngestStats(messageProcessorStats, consumerStats, pipelineStats); } } diff --git a/server/src/test/java/org/opensearch/indices/pollingingest/IngestPipelineExecutorTests.java b/server/src/test/java/org/opensearch/indices/pollingingest/IngestPipelineExecutorTests.java index 61ea51c74fcc6..1642ddce39cd4 100644 --- a/server/src/test/java/org/opensearch/indices/pollingingest/IngestPipelineExecutorTests.java +++ b/server/src/test/java/org/opensearch/indices/pollingingest/IngestPipelineExecutorTests.java @@ -58,6 +58,11 @@ public void testExecutePipelines_NoPipeline_ReturnsSourceUnchanged() throws Exce assertSame(source, result); verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), anyString()); + + // No pipeline configured — metrics should be zero + IngestPipelineExecutor.PipelineMetrics metrics = executor.getMetrics(); + assertEquals(0, metrics.totalExecutionCount()); + assertEquals(0, metrics.totalFailedCount()); } // --- Execution: pipeline transforms source --- @@ -73,6 +78,13 @@ public void testExecutePipelines_TransformsSource() throws Exception { assertNotNull(result); verify(ingestService).executeBulkRequest(anyInt(), any(), any(), any(), any(), anyString()); + + // Verify metrics + IngestPipelineExecutor.PipelineMetrics metrics = executor.getMetrics(); + assertEquals(1, metrics.totalExecutionCount()); + assertTrue(metrics.totalExecutionTimeInMillis() >= 0); + assertEquals(0, metrics.totalFailedCount()); + assertEquals(0, metrics.totalDroppedCount()); } // --- Execution: pipeline drops document --- @@ -94,9 +106,13 @@ public void testExecutePipelines_DropsDocument() throws Exception { Map result = executor.executePipelines("1", source); assertNull(result); - } - // --- Execution: pipeline failure --- + // Verify drop metric + IngestPipelineExecutor.PipelineMetrics metrics = executor.getMetrics(); + assertEquals(1, metrics.totalExecutionCount()); + assertEquals(1, metrics.totalDroppedCount()); + assertEquals(0, metrics.totalFailedCount()); + } public void testExecutePipelines_Failure() { doAnswer(invocation -> { @@ -115,6 +131,12 @@ public void testExecutePipelines_Failure() { Exception e = expectThrows(RuntimeException.class, () -> executor.executePipelines("1", source)); assertTrue(e.getMessage().contains("Ingest pipeline execution failed")); assertTrue(e.getCause().getMessage().contains("processor failed")); + + // Verify failure metric + IngestPipelineExecutor.PipelineMetrics metrics = executor.getMetrics(); + assertEquals(1, metrics.totalExecutionCount()); + assertEquals(1, metrics.totalFailedCount()); + assertEquals(0, metrics.totalDroppedCount()); } public void testExecutePipelines_CompletionException() {