Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,16 @@ public void testFinalPipelineAddsField() throws Exception {
Map<String, Object> source2 = response.getHits().getHits()[1].getSourceAsMap();
return Boolean.TRUE.equals(source1.get("processed")) && Boolean.TRUE.equals(source2.get("processed"));
});

// Verify pipeline execution stats
PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
.getPollingIngestStats();
assertNotNull(stats.getPipelineStats());
assertTrue(stats.getPipelineStats().totalExecutionCount() >= 2);
assertTrue(stats.getPipelineStats().totalExecutionTimeInMillis() >= 0);
assertEquals(0, stats.getPipelineStats().totalFailedCount());
assertEquals(0, stats.getPipelineStats().totalDroppedCount());
assertEquals(0, stats.getPipelineStats().totalTimeoutCount());
}

/**
Expand All @@ -100,6 +110,13 @@ public void testFinalPipelineDropsDocument() throws Exception {
refresh(indexName);
SearchResponse response = client().prepareSearch(indexName).get();
assertThat(response.getHits().getTotalHits().value(), is(0L));

// Verify pipeline drop stats
PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
.getPollingIngestStats();
assertTrue(stats.getPipelineStats().totalExecutionCount() >= 2);
assertTrue(stats.getPipelineStats().totalDroppedCount() >= 2);
assertEquals(0, stats.getPipelineStats().totalFailedCount());
}

/**
Expand All @@ -124,6 +141,13 @@ public void testNoPipelineConfigured() throws Exception {
Map<String, Object> source = response.getHits().getHits()[i].getSourceAsMap();
assertFalse("Document should not have 'processed' field", source.containsKey("processed"));
}

// Verify zero pipeline stats when no pipeline configured
PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
.getPollingIngestStats();
assertEquals(0, stats.getPipelineStats().totalExecutionCount());
assertEquals(0, stats.getPipelineStats().totalDroppedCount());
assertEquals(0, stats.getPipelineStats().totalFailedCount());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,13 @@ protected Map<String, String> commitDataAsMap() {

@Override
public PollingIngestStats pollingIngestStats() {
return streamPoller.getStats();
PollingIngestStats pollerStats = streamPoller.getStats();
// Enrich with pipeline execution metrics from the shared executor
return new PollingIngestStats(
pollerStats.getMessageProcessorStats(),
pollerStats.getConsumerStats(),
pipelineExecutor.getMetrics()
);
}

private void registerDynamicIndexSettingsHandlers() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.common.Nullable;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.index.IndexSettings;
import org.opensearch.ingest.IngestService;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -51,6 +53,13 @@ public class IngestPipelineExecutor {
private final String index;
private volatile String resolvedFinalPipeline;

// Pipeline execution metrics
private final CounterMetric executionCount = new CounterMetric();
private final CounterMetric executionTimeNanos = new CounterMetric();
private final CounterMetric failedCount = new CounterMetric();
private final CounterMetric droppedCount = new CounterMetric();
private final CounterMetric timeoutCount = new CounterMetric();

/**
* Creates an IngestPipelineExecutor for the given index.
* Resolves the final pipeline from index settings and registers a dynamic settings listener.
Expand Down Expand Up @@ -106,6 +115,8 @@ public Map<String, Object> executePipelines(String id, Map<String, Object> sourc
return sourceMap;
}

long startTimeNanos = System.nanoTime();

// Build IndexRequest to carry the document through the pipeline
IndexRequest indexRequest = new IndexRequest(index);
indexRequest.id(id);
Expand Down Expand Up @@ -140,15 +151,23 @@ public Map<String, Object> executePipelines(String id, Map<String, Object> sourc
try {
future.get(PIPELINE_EXECUTION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (TimeoutException e) {
timeoutCount.inc();
failedCount.inc();
throw new RuntimeException("Ingest pipeline execution timed out after [" + PIPELINE_EXECUTION_TIMEOUT_SECONDS + "] seconds", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
failedCount.inc();
throw new RuntimeException("Ingest pipeline execution was interrupted", e);
} catch (ExecutionException e) {
failedCount.inc();
throw new RuntimeException("Ingest pipeline execution failed", e.getCause());
} finally {
executionTimeNanos.inc(System.nanoTime() - startTimeNanos);
executionCount.inc();
}

if (dropped.get()) {
droppedCount.inc();
return null;
}

Expand All @@ -172,4 +191,29 @@ public Map<String, Object> executePipelines(String id, Map<String, Object> sourc

return indexRequest.sourceAsMap();
}

/**
* Returns pipeline execution metrics.
*/
public PipelineMetrics getMetrics() {
return new PipelineMetrics(
executionCount.count(),
TimeUnit.NANOSECONDS.toMillis(executionTimeNanos.count()),
failedCount.count(),
droppedCount.count(),
timeoutCount.count()
);
}

/**
* Pipeline execution metrics for pull-based ingestion.
*/
@PublicApi(since = "3.6.0")
public record PipelineMetrics(
long totalExecutionCount,
long totalExecutionTimeInMillis,
long totalFailedCount,
long totalDroppedCount,
long totalTimeoutCount
) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,34 @@
public class PollingIngestStats implements Writeable, ToXContentFragment {
private final MessageProcessorStats messageProcessorStats;
private final ConsumerStats consumerStats;
private final PipelineStats pipelineStats;

public PollingIngestStats(MessageProcessorStats messageProcessorStats, ConsumerStats consumerStats) {
this(messageProcessorStats, consumerStats, new PipelineStats(0, 0, 0, 0, 0));
}

public PollingIngestStats(
MessageProcessorStats messageProcessorStats,
ConsumerStats consumerStats,
IngestPipelineExecutor.PipelineMetrics pipelineMetrics
) {
this(
messageProcessorStats,
consumerStats,
new PipelineStats(
pipelineMetrics.totalExecutionCount(),
pipelineMetrics.totalExecutionTimeInMillis(),
pipelineMetrics.totalFailedCount(),
pipelineMetrics.totalDroppedCount(),
pipelineMetrics.totalTimeoutCount()
)
);
}

public PollingIngestStats(MessageProcessorStats messageProcessorStats, ConsumerStats consumerStats, PipelineStats pipelineStats) {
this.messageProcessorStats = messageProcessorStats;
this.consumerStats = consumerStats;
this.pipelineStats = pipelineStats;
}

public PollingIngestStats(StreamInput in) throws IOException {
Expand Down Expand Up @@ -68,6 +92,18 @@ public PollingIngestStats(StreamInput in) throws IOException {
totalDuplicateMessageSkippedCount,
pointerBasedLag
);

if (in.getVersion().onOrAfter(Version.V_3_6_0)) {
this.pipelineStats = new PipelineStats(
in.readLong(),
in.readLong(),
in.readLong(),
in.readLong(),
in.readLong()
);
} else {
this.pipelineStats = new PipelineStats(0, 0, 0, 0, 0);
}
}

@Override
Expand All @@ -88,6 +124,14 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_3_4_0)) {
out.writeLong(consumerStats.pointerBasedLag);
}

if (out.getVersion().onOrAfter(Version.V_3_6_0)) {
out.writeLong(pipelineStats.totalExecutionCount);
out.writeLong(pipelineStats.totalExecutionTimeInMillis);
out.writeLong(pipelineStats.totalFailedCount);
out.writeLong(pipelineStats.totalDroppedCount);
out.writeLong(pipelineStats.totalTimeoutCount);
}
}

@Override
Expand All @@ -110,6 +154,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field("lag_in_millis", consumerStats.lagInMillis);
builder.field("pointer_based_lag", consumerStats.pointerBasedLag);
builder.endObject();
builder.startObject("pipeline_stats");
builder.field("total_execution_count", pipelineStats.totalExecutionCount);
builder.field("total_execution_time_in_millis", pipelineStats.totalExecutionTimeInMillis);
builder.field("total_failed_count", pipelineStats.totalFailedCount);
builder.field("total_dropped_count", pipelineStats.totalDroppedCount);
builder.field("total_timeout_count", pipelineStats.totalTimeoutCount);
builder.endObject();
builder.endObject();
return builder;
}
Expand All @@ -122,17 +173,23 @@ public ConsumerStats getConsumerStats() {
return consumerStats;
}

public PipelineStats getPipelineStats() {
return pipelineStats;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof PollingIngestStats)) return false;
PollingIngestStats that = (PollingIngestStats) o;
return Objects.equals(messageProcessorStats, that.messageProcessorStats) && Objects.equals(consumerStats, that.consumerStats);
return Objects.equals(messageProcessorStats, that.messageProcessorStats)
&& Objects.equals(consumerStats, that.consumerStats)
&& Objects.equals(pipelineStats, that.pipelineStats);
}

@Override
public int hashCode() {
return Objects.hash(messageProcessorStats, consumerStats);
return Objects.hash(messageProcessorStats, consumerStats, pipelineStats);
}

/**
Expand All @@ -153,6 +210,14 @@ public record ConsumerStats(long totalPolledCount, long lagInMillis, long totalC
long totalPollerMessageDroppedCount, long totalDuplicateMessageSkippedCount, long pointerBasedLag) {
}

/**
* Stats for pipeline execution in pull-based ingestion.
*/
@PublicApi(since = "3.6.0")
public record PipelineStats(long totalExecutionCount, long totalExecutionTimeInMillis, long totalFailedCount, long totalDroppedCount,
long totalTimeoutCount) {
}

/**
* Builder for {@link PollingIngestStats}
*/
Expand All @@ -171,6 +236,11 @@ public static class Builder {
private long totalPollerMessageDroppedCount;
private long totalDuplicateMessageSkippedCount;
private long pointerBasedLag;
private long pipelineExecutionCount;
private long pipelineExecutionTimeInMillis;
private long pipelineFailedCount;
private long pipelineDroppedCount;
private long pipelineTimeoutCount;

public Builder() {}

Expand Down Expand Up @@ -243,6 +313,15 @@ public Builder setPointerBasedLag(long pointerBasedLag) {
return this;
}

public Builder setPipelineStats(IngestPipelineExecutor.PipelineMetrics metrics) {
this.pipelineExecutionCount = metrics.totalExecutionCount();
this.pipelineExecutionTimeInMillis = metrics.totalExecutionTimeInMillis();
this.pipelineFailedCount = metrics.totalFailedCount();
this.pipelineDroppedCount = metrics.totalDroppedCount();
this.pipelineTimeoutCount = metrics.totalTimeoutCount();
return this;
}

public PollingIngestStats build() {
MessageProcessorStats messageProcessorStats = new MessageProcessorStats(
totalProcessedCount,
Expand All @@ -261,7 +340,14 @@ public PollingIngestStats build() {
totalDuplicateMessageSkippedCount,
pointerBasedLag
);
return new PollingIngestStats(messageProcessorStats, consumerStats);
PipelineStats pipelineStats = new PipelineStats(
pipelineExecutionCount,
pipelineExecutionTimeInMillis,
pipelineFailedCount,
pipelineDroppedCount,
pipelineTimeoutCount
);
return new PollingIngestStats(messageProcessorStats, consumerStats, pipelineStats);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ public void testExecutePipelines_NoPipeline_ReturnsSourceUnchanged() throws Exce

assertSame(source, result);
verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), anyString());

// No pipeline configured — metrics should be zero
IngestPipelineExecutor.PipelineMetrics metrics = executor.getMetrics();
assertEquals(0, metrics.totalExecutionCount());
assertEquals(0, metrics.totalFailedCount());
}

// --- Execution: pipeline transforms source ---
Expand All @@ -73,6 +78,13 @@ public void testExecutePipelines_TransformsSource() throws Exception {

assertNotNull(result);
verify(ingestService).executeBulkRequest(anyInt(), any(), any(), any(), any(), anyString());

// Verify metrics
IngestPipelineExecutor.PipelineMetrics metrics = executor.getMetrics();
assertEquals(1, metrics.totalExecutionCount());
assertTrue(metrics.totalExecutionTimeInMillis() >= 0);
assertEquals(0, metrics.totalFailedCount());
assertEquals(0, metrics.totalDroppedCount());
}

// --- Execution: pipeline drops document ---
Expand All @@ -94,9 +106,13 @@ public void testExecutePipelines_DropsDocument() throws Exception {
Map<String, Object> result = executor.executePipelines("1", source);

assertNull(result);
}

// --- Execution: pipeline failure ---
// Verify drop metric
IngestPipelineExecutor.PipelineMetrics metrics = executor.getMetrics();
assertEquals(1, metrics.totalExecutionCount());
assertEquals(1, metrics.totalDroppedCount());
assertEquals(0, metrics.totalFailedCount());
}

public void testExecutePipelines_Failure() {
doAnswer(invocation -> {
Expand All @@ -115,6 +131,12 @@ public void testExecutePipelines_Failure() {
Exception e = expectThrows(RuntimeException.class, () -> executor.executePipelines("1", source));
assertTrue(e.getMessage().contains("Ingest pipeline execution failed"));
assertTrue(e.getCause().getMessage().contains("processor failed"));

// Verify failure metric
IngestPipelineExecutor.PipelineMetrics metrics = executor.getMetrics();
assertEquals(1, metrics.totalExecutionCount());
assertEquals(1, metrics.totalFailedCount());
assertEquals(0, metrics.totalDroppedCount());
}

public void testExecutePipelines_CompletionException() {
Expand Down
Loading