feat(loki/podlogs): add pipelineStages processing to PodLogs CRD #6000
Draft
QuentinBisson wants to merge 3 commits into grafana:main
Adds `json:"..."` tags to every stage configuration struct in `loki/process/stages` and `loki/process/metric`. These tags are required to serialize stage configs to/from JSON for use in the PodLogs CRD (see grafana#4738).

Notable serialization choices:

- `time.Duration` fields (`OlderThan`, `MaxIdle`) are tagged `json:"-"` because Go serialises them as nanosecond int64 values, which are not human-readable in Kubernetes YAML. Users can rely on the default value.
- `units.Base2Bytes` already implements `TextMarshaler`/`TextUnmarshaler` and serialises as "5MiB", so it is exposed normally.

No behaviour change: the `alloy:"..."` tags that drive existing parsing are untouched; json tags are purely additive.

Refs: grafana#4738

fix(loki/stages): mark GeoIPConfig.Source as required in JSON

Remove `omitempty` from the `Source` field JSON tag so the field is never silently dropped when serialising a PodLogs pipeline stage config. `Source` is a required field (`alloy:"source,attr"`) and omitting it in JSON would leave a nil pointer that causes the stage to do nothing.

Refs: grafana#4738

Signed-off-by: QuentinBisson <quentin@giantswarm.io>
Implements grafana#4738: PodLogs resources can now declare `pipelineStages` in their spec to apply log processing stages to every log line collected by that resource, before forwarding to the fanout.

Design summary:

- `stages.PodLogsStageConfig`: JSON-tagged subset of `StageConfig` that excludes `multiline` (lines from different pods interleave), `windowsevent`, and `eventlogmessage` (Linux-only context).
- One `stages.Pipeline` is created per PodLogs resource with stages. Per-PodLogs Prometheus metrics are namespaced via `WrapRegistererWith`.
- `kubetail.Target` gains an optional `handler loki.EntryHandler`; when set, the tailer routes entries through that handler instead of the global one. `tailerTask.Equals` compares handler pointers so pipeline changes trigger a tailer restart automatically.
- Pipeline lifecycle is managed in the reconciler: new pipelines are created before `SyncTargets`; old/replaced pipelines are stopped only after `SyncTargets` returns (which waits for stopped tailer goroutines to exit), eliminating any window where a tailer goroutine could write to a dead pipeline channel.
- CRD YAML updated with a full OpenAPIV3 schema for all 25 supported stage types.

Known limitation (TODO in reconciler.go): `Pipeline.Start` spawns N+3 goroutines per PodLogs resource. At large scale a synchronous `Pipeline.Process` path would reduce this to zero extra goroutines.

Refs: grafana#4738
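The lifecycle ordering in the design summary (create before `SyncTargets`, stop only after it returns) can be sketched as follows. Every type and method name here (`pipeline`, `reconciler`, `syncTargets`, `reconcile`) is a hypothetical simplification, not the code in this PR; the `events` slice exists only to make the ordering visible.

```go
package main

import "fmt"

// pipeline is a hypothetical stand-in for a per-PodLogs stage pipeline.
type pipeline struct {
	key     string
	stopped bool
}

// reconciler is a hypothetical, heavily simplified reconciler.
type reconciler struct {
	pipelines map[string]*pipeline
	events    []string // order of operations, recorded for illustration
}

// syncTargets stands in for the call that blocks until stopped tailers exit.
func (r *reconciler) syncTargets() {
	r.events = append(r.events, "sync")
}

// reconcile starts missing pipelines, syncs targets, then stops stale ones.
func (r *reconciler) reconcile(desired []string) {
	next := make(map[string]*pipeline, len(desired))
	for _, key := range desired {
		if p, ok := r.pipelines[key]; ok {
			next[key] = p // unchanged pipeline is reused
			continue
		}
		next[key] = &pipeline{key: key}
		r.events = append(r.events, "start "+key)
	}

	// Tailers are re-pointed (and old ones exit) before anything is stopped.
	r.syncTargets()

	for key, p := range r.pipelines {
		if _, keep := next[key]; !keep {
			p.stopped = true
			r.events = append(r.events, "stop "+key)
		}
	}
	r.pipelines = next
}

func main() {
	r := &reconciler{pipelines: map[string]*pipeline{}}
	r.reconcile([]string{"ns/a"})
	r.reconcile([]string{"ns/b"}) // ns/a is replaced by ns/b
	fmt.Println(r.events)
}
```

Because the stop happens strictly after the sync, no tailer in this sketch can still hold a reference to a pipeline at the moment it is stopped.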
Two bugs found in review:

`activePipelineKeys` goroutine leak: the key was pre-populated before `reconcilePodLogs` was called, so if `reconcilePodLogs` returned early (invalid relabeling, etc.) the old pipeline was never stopped even after `SyncTargets` removed all its tailers. Fix: check `r.pipelines` after `reconcilePodLogs` returns and only mark the key active when `ensurePipeline` actually succeeded.

Backoff not reset on pipeline path: when a per-target pipeline handler is set, `processLogStream` bypasses the mutator handler that calls `bo.Reset()` on each received entry, so the backoff was never reset. Fix: thread an `onEntrySent` callback through `tail` → `processLogStream`; `bo.Reset` is passed on the pipeline path, a no-op on the default path.

Refs: grafana#4738
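The backoff fix can be illustrated with a minimal sketch of the callback threading. The `backoff` type and the signature of `processLogStream` below are assumptions made for illustration; the real function in the PR takes different arguments.

```go
package main

import "fmt"

// backoff is a hypothetical minimal retry counter, not the real backoff type.
type backoff struct{ retries int }

func (b *backoff) Wait()  { b.retries++ }
func (b *backoff) Reset() { b.retries = 0 }

// processLogStream forwards each line to send, then invokes onEntrySent.
// The signature is an assumption; only the callback idea comes from the commit.
func processLogStream(lines []string, send func(string), onEntrySent func()) {
	for _, line := range lines {
		send(line)
		onEntrySent()
	}
}

func main() {
	var got []string
	send := func(l string) { got = append(got, l) }

	// Pipeline path: nothing else resets the backoff, so bo.Reset is passed in.
	bo := &backoff{}
	bo.Wait()
	bo.Wait()
	processLogStream([]string{"a", "b"}, send, bo.Reset)
	fmt.Println(len(got), bo.retries)

	// Default path: the existing handler already resets, so a no-op is passed.
	other := &backoff{retries: 3}
	processLogStream([]string{"c"}, send, func() {})
	fmt.Println(len(got), other.retries)
}
```

Passing the reset as a function value keeps `processLogStream` unaware of which path it is on; the caller decides whether a delivered entry should clear the retry state.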
Summary
Embeds a `loki.process`-style pipeline directly in the `PodLogs` CRD spec so teams can self-service drop/transform logs at collection time, without coupling to a shared `loki.process` component in the global Alloy config.

What's added

- `PodLogsStageConfig`: a JSON-only subset of `StageConfig` that excludes stages incompatible with a shared per-CRD pipeline:
  - `multiline`: lines from different pods interleave, causing incorrect merging across pod boundaries.
  - `windowsevent` / `eventlogmessage`: not applicable to Linux pod logs.
- `PodLogsDropConfig`: mirrors `DropConfig` but with `OlderThan string` (e.g. `"5m"`) instead of `time.Duration` (which serialises as a nanosecond int64 in JSON).
- `PodLogsMatchConfig`: mirrors `MatchConfig` but uses `[]PodLogsStageConfig` for nested stages, so the exclusions apply recursively.
- `PipelineStages []PodLogsStageConfig` field in `PodLogsSpec`.
- Per-PodLogs pipeline lifecycle managed in the reconciler: pipelines are created before `SyncTargets`, stopped only after `SyncTargets` returns (so no tailer goroutine writes to a dead channel), and torn down on component shutdown.
- Optional `loki.EntryHandler` in `kubetail.Target` so tailers route entries to the pipeline rather than the global handler.
- OpenAPI v3 schema for `pipelineStages` in the CRD YAML.

Design notes

- Per-PodLogs Prometheus metrics are namespaced via `prometheus.WrapRegistererWith`.

Test plan

- A PodLogs resource with `pipelineStages` drops/transforms entries correctly
- A PodLogs resource without `pipelineStages` behaves identically to today
- An invalid `olderThan` duration surfaces as a reconcile error, not a silent no-op
- Changing `pipelineStages` on a live CRD restarts the pipeline without leaking goroutines
- `node_filter` works correctly with pipelines

Refs: #4738
🤖 Generated with Claude Code