perf(loki/stages): add SyncStage to reduce pipeline goroutines (N+3 → 1 per PodLogs)#6001
Draft
QuentinBisson wants to merge 6 commits intografana:mainfrom
Draft
perf(loki/stages): add SyncStage to reduce pipeline goroutines (N+3 → 1 per PodLogs)#6001QuentinBisson wants to merge 6 commits intografana:mainfrom
QuentinBisson wants to merge 6 commits intografana:mainfrom
Conversation
Adds `json:"..."` tags to every stage configuration struct in loki/process/stages and loki/process/metric. These tags are required to serialize stage configs to/from JSON for use in the PodLogs CRD (see grafana#4738). Notable serialization choices: - `time.Duration` fields (OlderThan, MaxIdle) are tagged `json:"-"` because Go serialises them as nanosecond int64 values, which are not human-readable in Kubernetes YAML. Users can rely on the default value. - `units.Base2Bytes` already implements TextMarshaler/TextUnmarshaler and serialises as "5MiB", so it is exposed normally. No behaviour change — the alloy:"..." tags that drive existing parsing are untouched; json tags are purely additive. Refs: grafana#4738 fix(loki/stages): mark GeoIPConfig.Source as required in JSON Remove omitempty from the Source field JSON tag so the field is never silently dropped when serialising a PodLogs pipeline stage config. Source is a required field (alloy:"source,attr") and omitting it in JSON would leave a nil pointer that causes the stage to do nothing. Refs: grafana#4738 Signed-off-by: QuentinBisson <quentin@giantswarm.io>
Implements grafana#4738: PodLogs resources can now declare `pipelineStages` in their spec to apply log processing stages to every log line collected by that resource, before forwarding to the fanout. Design summary: - `stages.PodLogsStageConfig`: JSON-tagged subset of StageConfig that excludes multiline (lines from different pods interleave), windowsevent, and eventlogmessage (Linux-only context). - One `stages.Pipeline` is created per PodLogs resource with stages. Per-PodLogs Prometheus metrics are namespaced via WrapRegistererWith. - `kubetail.Target` gains an optional `handler loki.EntryHandler`; when set, the tailer routes entries through that handler instead of the global one. `tailerTask.Equals` compares handler pointers so pipeline changes trigger a tailer restart automatically. - Pipeline lifecycle is managed in the reconciler: new pipelines are created before SyncTargets; old/replaced pipelines are stopped only after SyncTargets returns (which waits for stopped tailer goroutines to exit), eliminating any window where a tailer goroutine could write to a dead pipeline channel. - CRD YAML updated with a full OpenAPIV3 schema for all 25 supported stage types. Known limitation (TODO in reconciler.go): Pipeline.Start spawns N+3 goroutines per PodLogs resource. At large scale a synchronous Pipeline.Process path would reduce this to zero extra goroutines. Refs: grafana#4738
Two bugs found in review: activePipelineKeys goroutine leak: the key was pre-populated before reconcilePodLogs was called, so if reconcilePodLogs returned early (invalid relabeling, etc.) the old pipeline was never stopped even after SyncTargets removed all its tailers. Fix: check r.pipelines after reconcilePodLogs returns - only mark the key active when ensurePipeline actually succeeded. Backoff not reset on pipeline path: when a per-target pipeline handler is set, processLogStream bypasses the mutator handler that calls bo.Reset() on each received entry, so the backoff was never reset. Fix: thread an onEntrySent callback through tail→processLogStream; bo.Reset is passed on the pipeline path, a no-op on the default path. Refs: grafana#4738
Pipeline.Start spawns N+3 goroutines per pipeline: one RunWith goroutine per stage, one for the label-extraction init step, and two adapter goroutines. At large scale (hundreds of PodLogs with stages) this adds noticeable overhead.
This commit introduces SyncStage, an optional interface for stages that can process a single entry synchronously without goroutines:
```
ProcessEntry(e Entry) []Entry
```
All stages supported in PodLogsStageConfig implement SyncStage except match (which uses nested sub-pipelines — tracked as a follow-up). stageProcessor gains ProcessEntry for free, covering the 13+ stages that are already Processor-based (regex, logfmt, template, etc.).
Custom implementations are added to: cri, decolorize, drop, geoip,
json, labels, limit, metric, pack, sampling, structured_metadata,
structured_metadata_drop, and truncate.
Pipeline gains:
```
CanProcessSync() bool — true when all stages are SyncStage
ProcessEntry(e loki.Entry) []loki.Entry — synchronous application
StartSync(in, out) — 1-goroutine handler using ProcessEntry
```
ensurePipeline in the PodLogs reconciler now calls StartSync when the pipeline supports it:
```
Before (3 stages): 6 goroutines per PodLogs resource with stages
After (3 stages): 1 goroutine per PodLogs resource with stages
```
For 500 PodLogs with 3-stage pipelines that's 3000 → 500 goroutines. Pipelines containing match fall back to Start (unchanged behaviour).
Refs: grafana#4738
Several issues found in review: PodLogsMatchConfig: use PodLogsStageConfig for nested match stages so that incompatible stage types (multiline, windowsevent, eventlogmessage) are excluded recursively. PodLogsStageConfig.MatchConfig is now *PodLogsMatchConfig instead of *MatchConfig. matcherStage SyncStage: implement ProcessEntry and canProcessSync on matcherStage. Drop actions are always sync-safe; keep actions are sync-safe only when the nested pipeline is also sync-capable. Pipeline. CanProcessSync checks the new syncChecker interface so a match stage with a non-sync inner pipeline does not falsely advertise sync support. cri: extract processLine() shared helper used by both Run and ProcessEntry, eliminating ~60 lines of duplication. truncate: extract applyRules() shared helper; Run is now a one-liner via RunWith; debug logs are present in both paths. structured_metadata: add missing debug log calls to ProcessEntry to match the Run implementation. processEntryFull: add private pipeline method that applies stages to an already-initialized Entry without reinitializing Extracted from labels. Used by ProcessEntry and by matcherStage for nested pipelines. Refs: grafana#4738
… CRD Using json:"-" on DropConfig.OlderThan silently discarded the field during JSON unmarshaling, so a PodLogs user writing older_than: 5m would get no error and no filtering — a silent no-op. Fix: add PodLogsDropConfig with OlderThan string (human-readable, e.g. "5m") and a toDropConfig() conversion that calls time.ParseDuration and returns an error on invalid input. PodLogsStageConfig.DropConfig is now *PodLogsDropConfig instead of *DropConfig, matching the same pattern used for PodLogsMatchConfig. ToStageConfig and ConvertPodLogsStages now return errors so that any invalid field value (malformed duration, etc.) surfaces immediately when the pipeline is built rather than being silently ignored. Also revert the json:"-" workaround on DropConfig.OlderThan now that the CRD path uses its own type. Refs: grafana#4738
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Adds a
SyncStageoptional interface to theloki.processstages package so that pipelines whose stages all support synchronous processing can run in a single goroutine instead of theN+3goroutine chain produced byPipeline.Start.This is particularly important for
loki.source.podlogs(#6000), where each PodLogs CRD withpipelineStagescreates one pipeline. At scale (100–500 PodLogs CRDs), the goroutine savings are significant.Changes
SyncStageinterface +Pipeline.StartSyncSyncStageinterface:ProcessEntry(e Entry) []EntryPipeline.CanProcessSync()returns true when all stages implementSyncStage(deep-checks nested match pipelines viasyncChecker)Pipeline.StartSync()runs the whole pipeline in 1 goroutine instead of N+3Pipeline.processEntryFull()private method applies stages to an already-initialisedEntry(used bymatcherStage)Stages that now implement
SyncStageProcessor-based stages (regex, json, logfmt, labels, template, tenant, timestamp, output, replace, pattern, luhn, label_drop, label_keep, static_labels, docker, …) getProcessEntryfor free viastageProcessorcri:processLine()helper extracted —RunandProcessEntryshare the same implementationtruncate:applyRules()helper extracted —Runsimplified toRunWith(in, m.applyRules)drop:ProcessEntryaddedgeoip:ProcessEntryaddedsampling:ProcessEntryaddedpack:ProcessEntryaddedlimit:ProcessEntryaddedmatch:ProcessEntry+canProcessSync()— keep actions are sync-safe when the nested pipeline is also sync-capable; drop actions are always sync-safeGoroutine comparison (N = number of stages)
Pipeline.StartPipeline.StartSyncTest plan
Pipeline.CanProcessSync()returns false for pipelines containing multilinePipeline.CanProcessSync()returns true for all-sync pipelines including nested matchRefs: #4738
🤖 Generated with Claude Code