From 91267bcd75fbfe2569906583ce6fbb5a4b87dcb4 Mon Sep 17 00:00:00 2001 From: Andrii Chubatiuk Date: Sun, 11 Jan 2026 16:55:26 +0200 Subject: [PATCH] chore: decouple scraping logic from vmagent code --- api/operator/v1beta1/common_scrapeparams.go | 75 +++ api/operator/v1beta1/vmagent_types.go | 93 +-- api/operator/v1beta1/zz_generated.deepcopy.go | 5 + config/examples/vmagent-full.yaml | 2 +- docs/api.md | 1 + docs/resources/vmagent.md | 8 +- .../operator/factory/build/defaults.go | 3 + .../factory/vmagent/collect_scrapes.go | 182 ------ .../operator/factory/vmagent/nodescrape.go | 18 +- .../factory/vmagent/nodescrape_test.go | 4 +- .../operator/factory/vmagent/objects.go | 570 ++++++++++++++++++ ...ollect_scrapes_test.go => objects_test.go} | 33 +- .../operator/factory/vmagent/podscrape.go | 34 +- .../factory/vmagent/podscrape_test.go | 4 +- .../operator/factory/vmagent/probe.go | 20 +- .../operator/factory/vmagent/probe_test.go | 4 +- .../operator/factory/vmagent/scrapeconfig.go | 10 +- .../factory/vmagent/scrapeconfig_test.go | 3 +- .../operator/factory/vmagent/servicescrape.go | 20 +- .../factory/vmagent/servicescrape_test.go | 7 +- .../operator/factory/vmagent/staticscrape.go | 12 +- .../factory/vmagent/staticscrape_test.go | 3 +- .../operator/factory/vmagent/vmagent.go | 16 +- .../factory/vmagent/vmagent_scrapeconfig.go | 487 +-------------- .../operator/factory/vmagent/vmagent_test.go | 34 +- .../operator/factory/vmdistributed/vmagent.go | 3 +- .../controller/operator/vmagent_controller.go | 2 +- .../operator/vmnodescrape_controller.go | 12 +- .../operator/vmpodscrape_controller.go | 7 +- .../controller/operator/vmprobe_controller.go | 10 +- .../operator/vmscrapeconfig_controller.go | 10 +- .../operator/vmservicescrape_controller.go | 10 +- .../operator/vmstaticscrape_controller.go | 10 +- test/e2e/vmagent_test.go | 2 +- 34 files changed, 871 insertions(+), 843 deletions(-) delete mode 100644 internal/controller/operator/factory/vmagent/collect_scrapes.go create mode 100644 internal/controller/operator/factory/vmagent/objects.go rename internal/controller/operator/factory/vmagent/{collect_scrapes_test.go => objects_test.go} (94%) diff --git a/api/operator/v1beta1/common_scrapeparams.go b/api/operator/v1beta1/common_scrapeparams.go index 0e4374b9d..82637f270 100644 --- a/api/operator/v1beta1/common_scrapeparams.go +++ b/api/operator/v1beta1/common_scrapeparams.go @@ -8,6 +8,8 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/client" ) // AttachMetadata configures metadata attachment @@ -539,6 +541,11 @@ type CommonScrapeParams struct { // it doesn't affect metrics ingested directly by push API's // +optional ExternalLabels map[string]string `json:"externalLabels,omitempty"` + // IngestOnlyMode switches vmagent into unmanaged mode + // it disables any config generation for scraping + // Currently it prevents vmagent from managing tls and auth options for remote write + // +optional + IngestOnlyMode *bool `json:"ingestOnlyMode,omitempty"` // EnableKubernetesAPISelectors instructs vmagent to use CRD scrape objects spec.selectors for // Kubernetes API list and watch requests. // https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#list-and-watch-filtering @@ -548,8 +555,58 @@ type CommonScrapeParams struct { CommonScrapeSecurityEnforcements `json:",inline,omitempty"` } +func (cr *CommonScrapeParams) externalLabelName() string { + // Use "prometheus" external label name by default if field is missing. + // in case of migration from prometheus to vmagent, it helps to have same labels + // Do not add external label if field is set to empty string. + if cr.ExternalLabelName != nil { + return *cr.ExternalLabelName + } + if cr.VMAgentExternalLabelName != nil { + return *cr.VMAgentExternalLabelName + } + return "prometheus" +} + +func (cr *CommonScrapeParams) externalLabels(defaultLabelValue string) map[string]string { + m := map[string]string{} + + prometheusExternalLabelName := cr.externalLabelName() + if len(prometheusExternalLabelName) > 0 { + m[prometheusExternalLabelName] = defaultLabelValue + } + + for n, v := range cr.ExternalLabels { + m[n] = v + } + return m +} + +// ScrapeSelectors gets object and namespace sepectors +func (cr *CommonScrapeParams) ScrapeSelectors(scrape client.Object) (*metav1.LabelSelector, *metav1.LabelSelector) { + switch s := scrape.(type) { + case *VMNodeScrape: + return cr.NodeScrapeSelector, cr.NodeScrapeNamespaceSelector + case *VMServiceScrape: + return cr.ServiceScrapeSelector, cr.ServiceScrapeNamespaceSelector + case *VMPodScrape: + return cr.PodScrapeSelector, cr.PodScrapeNamespaceSelector + case *VMProbe: + return cr.ProbeSelector, cr.ProbeNamespaceSelector + case *VMStaticScrape: + return cr.StaticScrapeSelector, cr.StaticScrapeNamespaceSelector + case *VMScrapeConfig: + return cr.ScrapeConfigSelector, cr.ScrapeConfigNamespaceSelector + default: + panic(fmt.Sprintf("BUG: scrape kind %T is not supported", s)) + } +} + // isUnmanaged checks if object should managed any config objects func (cr *CommonScrapeParams) isUnmanaged() bool { + if ptr.Deref(cr.IngestOnlyMode, false) { + return true + } return !cr.SelectAllByDefault && cr.NodeScrapeSelector == nil && cr.NodeScrapeNamespaceSelector == nil && cr.ServiceScrapeSelector == nil && cr.ServiceScrapeNamespaceSelector == nil && @@ -561,36 +618,54 @@ func (cr *CommonScrapeParams) isUnmanaged() bool { // isNodeScrapeUnmanaged checks if scraping agent should managed any VMNodeScrape objects func (cr *CommonScrapeParams) isNodeScrapeUnmanaged() bool { + if ptr.Deref(cr.IngestOnlyMode, false) { + return true + } return !cr.SelectAllByDefault && cr.NodeScrapeSelector == nil && cr.NodeScrapeNamespaceSelector == nil } // isServiceScrapeUnmanaged checks if scraping agent should managed any VMServiceScrape objects func (cr *CommonScrapeParams) isServiceScrapeUnmanaged() bool { + if ptr.Deref(cr.IngestOnlyMode, false) { + return true + } return !cr.SelectAllByDefault && cr.ServiceScrapeSelector == nil && cr.ServiceScrapeNamespaceSelector == nil } // isUnmanaged checks if scraping agent should managed any VMPodScrape objects func (cr *CommonScrapeParams) isPodScrapeUnmanaged() bool { + if ptr.Deref(cr.IngestOnlyMode, false) { + return true + } return !cr.SelectAllByDefault && cr.PodScrapeSelector == nil && cr.PodScrapeNamespaceSelector == nil } // isProbeUnmanaged checks if scraping agent should managed any VMProbe objects func (cr *CommonScrapeParams) isProbeUnmanaged() bool { + if ptr.Deref(cr.IngestOnlyMode, false) { + return true + } return !cr.SelectAllByDefault && cr.ProbeSelector == nil && cr.ProbeNamespaceSelector == nil } // isStaticScrapeUnmanaged checks if scraping agent should managed any VMStaticScrape objects func (cr *CommonScrapeParams) isStaticScrapeUnmanaged() bool { + if ptr.Deref(cr.IngestOnlyMode, false) { + return true + } return !cr.SelectAllByDefault && cr.StaticScrapeSelector == nil && cr.StaticScrapeNamespaceSelector == nil } // isScrapeConfigUnmanaged checks if scraping agent should managed any VMScrapeConfig objects func (cr *CommonScrapeParams) isScrapeConfigUnmanaged() bool { + if ptr.Deref(cr.IngestOnlyMode, false) { + return true + } return !cr.SelectAllByDefault && cr.ScrapeConfigSelector == nil && cr.ScrapeConfigNamespaceSelector == nil } diff --git a/api/operator/v1beta1/vmagent_types.go b/api/operator/v1beta1/vmagent_types.go index 5aa1374aa..5f7c261ac 100644 --- a/api/operator/v1beta1/vmagent_types.go +++ b/api/operator/v1beta1/vmagent_types.go @@ -34,14 +34,12 @@ type VMAgentSpec struct { // +optional // +kubebuilder:validation:Enum=default;json LogFormat string `json:"logFormat,omitempty"` - // APIServerConfig allows specifying a host and auth methods to access apiserver. // If left empty, VMAgent is assumed to run inside of the cluster // and will discover API servers automatically and use the pod's CA certificate // and bearer token file at /var/run/secrets/kubernetes.io/serviceaccount/. // +optional APIServerConfig *APIServerConfig `json:"apiServerConfig,omitempty"` - // RemoteWrite list of victoria metrics /some other remote write system // for vm it must looks like: http://victoria-metrics-single:8428/api/v1/write // or for cluster different url @@ -105,11 +103,6 @@ type VMAgentSpec struct { // ClaimTemplates allows adding additional VolumeClaimTemplates for VMAgent in StatefulMode ClaimTemplates []corev1.PersistentVolumeClaim `json:"claimTemplates,omitempty"` - // IngestOnlyMode switches vmagent into unmanaged mode - // it disables any config generation for scraping - // Currently it prevents vmagent from managing tls and auth options for remote write - // +optional - IngestOnlyMode bool `json:"ingestOnlyMode,omitempty"` // License allows to configure license key to be used for enterprise features. // Using license key is supported starting from VictoriaMetrics v1.94.0. @@ -218,6 +211,11 @@ func (cr *VMAgent) GetShardCount() int { return *cr.Spec.ShardCount } +// ExternalLabels returns external labels for scraping +func (cr *VMAgent) ExternalLabels() map[string]string { + return cr.Spec.externalLabels(fmt.Sprintf("%s/%s", cr.Namespace, cr.Name)) +} + // GetReloadURL implements reloadable interface func (cr *VMAgent) GetReloadURL(host string) string { return BuildLocalURL(reloadAuthKey, host, cr.Spec.Port, reloadPath, cr.Spec.ExtraArgs) @@ -567,7 +565,7 @@ func (cr *VMAgent) IsOwnsServiceAccount() bool { } func (cr *VMAgent) GetClusterRoleName() string { - return fmt.Sprintf("monitoring:%s:vmagent-%s", cr.Namespace, cr.Name) + return fmt.Sprintf("monitoring:%s:%s", cr.Namespace, cr.PrefixedName()) } // AsURL - returns url for http access @@ -612,67 +610,36 @@ func (*VMAgent) ProbeNeedLiveness() bool { return true } -// IsUnmanaged checks if object should managed any config objects -func (cr *VMAgent) IsUnmanaged() bool { - // fast path - if cr.Spec.IngestOnlyMode { - return true - } - return cr.Spec.isUnmanaged() -} - -// IsNodeScrapeUnmanaged checks if vmagent should managed any VMNodeScrape objects -func (cr *VMAgent) IsNodeScrapeUnmanaged() bool { - // fast path - if cr.Spec.IngestOnlyMode { - return true - } - return cr.Spec.isNodeScrapeUnmanaged() +// ScrapeSelectors gets object and namespace sepectors +func (cr *VMAgent) ScrapeSelectors(scrape client.Object) (*metav1.LabelSelector, *metav1.LabelSelector) { + return cr.Spec.ScrapeSelectors(scrape) } -// IsServiceScrapeUnmanaged checks if vmagent should managed any VMServiceScrape objects -func (cr *VMAgent) IsServiceScrapeUnmanaged() bool { - // fast path - if cr.Spec.IngestOnlyMode { - return true - } - return cr.Spec.isServiceScrapeUnmanaged() -} - -// IsUnmanaged checks if vmagent should managed any VMPodScrape objects -func (cr *VMAgent) IsPodScrapeUnmanaged() bool { - // fast path - if cr.Spec.IngestOnlyMode { +// IsUnmanaged checks if object should managed any config objects +func (cr *VMAgent) IsUnmanaged(scrape client.Object) bool { + if !cr.DeletionTimestamp.IsZero() || cr.Spec.ParsingError != "" { return true } - return cr.Spec.isPodScrapeUnmanaged() -} - -// IsProbeUnmanaged checks if vmagent should managed any VMProbe objects -func (cr *VMAgent) IsProbeUnmanaged() bool { - // fast path - if cr.Spec.IngestOnlyMode { - return true + if scrape == nil { + return cr.Spec.isUnmanaged() + } + switch s := scrape.(type) { + case *VMNodeScrape: + return cr.Spec.DaemonSetMode || cr.Spec.isNodeScrapeUnmanaged() + case *VMServiceScrape: + return cr.Spec.DaemonSetMode || cr.Spec.isServiceScrapeUnmanaged() + case *VMPodScrape: + return cr.Spec.isPodScrapeUnmanaged() + case *VMProbe: + return cr.Spec.DaemonSetMode || cr.Spec.isProbeUnmanaged() + case *VMStaticScrape: + return cr.Spec.DaemonSetMode || cr.Spec.isStaticScrapeUnmanaged() + case *VMScrapeConfig: + return cr.Spec.DaemonSetMode || cr.Spec.isScrapeConfigUnmanaged() + default: + panic(fmt.Sprintf("BUG: scrape kind %T is not supported", s)) } - return cr.Spec.isProbeUnmanaged() -} -// IsStaticScrapeUnmanaged checks if vmagent should managed any VMStaticScrape objects -func (cr *VMAgent) IsStaticScrapeUnmanaged() bool { - // fast path - if cr.Spec.IngestOnlyMode { - return true - } - return cr.Spec.isStaticScrapeUnmanaged() -} - -// IsScrapeConfigUnmanaged checks if vmagent should managed any VMScrapeConfig objects -func (cr *VMAgent) IsScrapeConfigUnmanaged() bool { - // fast path - if cr.Spec.IngestOnlyMode { - return true - } - return cr.Spec.isScrapeConfigUnmanaged() } // LastAppliedSpecAsPatch return last applied cluster spec as patch annotation diff --git a/api/operator/v1beta1/zz_generated.deepcopy.go b/api/operator/v1beta1/zz_generated.deepcopy.go index 7f1d3651d..ea66d4eda 100644 --- a/api/operator/v1beta1/zz_generated.deepcopy.go +++ b/api/operator/v1beta1/zz_generated.deepcopy.go @@ -801,6 +801,11 @@ func (in *CommonScrapeParams) DeepCopyInto(out *CommonScrapeParams) { (*out)[key] = val } } + if in.IngestOnlyMode != nil { + in, out := &in.IngestOnlyMode, &out.IngestOnlyMode + *out = new(bool) + **out = **in + } out.CommonScrapeSecurityEnforcements = in.CommonScrapeSecurityEnforcements } diff --git a/config/examples/vmagent-full.yaml b/config/examples/vmagent-full.yaml index 9cfde39ec..ea4d061ea 100644 --- a/config/examples/vmagent-full.yaml +++ b/config/examples/vmagent-full.yaml @@ -83,7 +83,7 @@ spec: repository: victoriametrics/vmagent tag: v1.46.0 pullPolicy: IfNotPresent - vmAgentExternalLabelName: vmagent + externalLabelName: vmagent scrapeInterval: 30s externalLabels: key: value diff --git a/docs/api.md b/docs/api.md index 21dc40fb6..7a791a71f 100644 --- a/docs/api.md +++ b/docs/api.md @@ -1757,6 +1757,7 @@ Appears in: [VMAgentSpec](#vmagentspec) | globalScrapeMetricRelabelConfigs#
_[RelabelConfig](#relabelconfig) array_ | _(Optional)_
GlobalScrapeMetricRelabelConfigs is a global metric relabel configuration, which is applied to each scrape job. | | globalScrapeRelabelConfigs#
_[RelabelConfig](#relabelconfig) array_ | _(Optional)_
GlobalScrapeRelabelConfigs is a global relabel configuration, which is applied to each samples of each scrape job during service discovery. | | ignoreNamespaceSelectors#
_boolean_ | _(Optional)_
IgnoreNamespaceSelectors if set to true will ignore NamespaceSelector settings from
scrape objects, and they will only discover endpoints
within their current namespace. Defaults to false. | +| ingestOnlyMode#
_boolean_ | _(Optional)_
IngestOnlyMode switches vmagent into unmanaged mode
it disables any config generation for scraping
Currently it prevents vmagent from managing tls and auth options for remote write | | inlineScrapeConfig#
_string_ | _(Optional)_
InlineScrapeConfig As scrape configs are appended, the user is responsible to make sure it
is valid. Note that using this feature may expose the possibility to
break upgrades of VMAgent. It is advised to review VMAgent release
notes to ensure that no incompatible scrape configs are going to break
VMAgent after the upgrade.
it should be defined as single yaml file.
inlineScrapeConfig: \|
- job_name: "prometheus"
static_configs:
- targets: ["localhost:9090"] | | maxScrapeInterval#
_string_ | _(Required)_
MaxScrapeInterval allows limiting maximum scrape interval for VMServiceScrape, VMPodScrape and other scrapes
If interval is higher than defined limit, `maxScrapeInterval` will be used. | | minScrapeInterval#
_string_ | _(Required)_
MinScrapeInterval allows limiting minimal scrape interval for VMServiceScrape, VMPodScrape and other scrapes
If interval is lower than defined limit, `minScrapeInterval` will be used. | diff --git a/docs/resources/vmagent.md b/docs/resources/vmagent.md index 07fce71e5..af74c7449 100644 --- a/docs/resources/vmagent.md +++ b/docs/resources/vmagent.md @@ -193,7 +193,7 @@ metadata: spec: # ... selectAllByDefault: true - vmAgentExternalLabelName: vmagent_ha + externalLabelName: vmagent_ha remoteWrite: - url: "http://vmsingle-example.default.svc:8428/api/v1/write" scrapeInterval: 30s @@ -225,7 +225,7 @@ metadata: spec: # ... selectAllByDefault: true - vmAgentExternalLabelName: vmagent_ha + externalLabelName: vmagent_ha remoteWrite: - url: "http://vmsingle-example.default.svc:8428/api/v1/write" scrapeInterval: 30s @@ -259,7 +259,7 @@ metadata: spec: # ... selectAllByDefault: true - vmAgentExternalLabelName: vmagent_ha + externalLabelName: vmagent_ha remoteWrite: - url: "http://vmsingle-example.default.svc:8428/api/v1/write" # Replication: @@ -785,7 +785,7 @@ spec: scrapeTimeout: 10s externalLabels: cluster: my-cluster - vmAgentExternalLabelName: example + externalLabelName: example remoteWrite: - url: "http://vmsingle-example.default.svc:8428/api/v1/write" inlineRelabelConfig: diff --git a/internal/controller/operator/factory/build/defaults.go b/internal/controller/operator/factory/build/defaults.go index 3fe338372..f7dfef965 100644 --- a/internal/controller/operator/factory/build/defaults.go +++ b/internal/controller/operator/factory/build/defaults.go @@ -212,6 +212,9 @@ func addVMAgentDefaults(objI any) { cv := config.ApplicationDefaults(c.VMAgentDefault) addDefaultsToCommonParams(&cr.Spec.CommonDefaultableParams, cr.Spec.License, &cv) addDefaultsToConfigReloader(&cr.Spec.CommonConfigReloaderParams, ptr.Deref(cr.Spec.UseDefaultResources, false)) + if cr.Spec.IngestOnlyMode == nil { + cr.Spec.IngestOnlyMode = ptr.To(false) + } } func addVLAgentDefaults(objI any) { diff --git a/internal/controller/operator/factory/vmagent/collect_scrapes.go b/internal/controller/operator/factory/vmagent/collect_scrapes.go deleted file mode 100644 index 51819e359..000000000 --- a/internal/controller/operator/factory/vmagent/collect_scrapes.go +++ /dev/null @@ -1,182 +0,0 @@ -package vmagent - -import ( - "context" - "fmt" - - "sigs.k8s.io/controller-runtime/pkg/client" - - vmv1beta1 "github.com/VictoriaMetrics/operator/api/operator/v1beta1" - "github.com/VictoriaMetrics/operator/internal/config" - "github.com/VictoriaMetrics/operator/internal/controller/operator/factory/k8stools" - "github.com/VictoriaMetrics/operator/internal/controller/operator/factory/logger" -) - -func selectScrapeConfigs(ctx context.Context, cr *vmv1beta1.VMAgent, rclient client.Client) ([]*vmv1beta1.VMScrapeConfig, []string, error) { - if cr.Spec.DaemonSetMode { - return nil, nil, nil - } - - var selectedConfigs []*vmv1beta1.VMScrapeConfig - var nsn []string - opts := &k8stools.SelectorOpts{ - SelectAll: cr.Spec.SelectAllByDefault, - NamespaceSelector: cr.Spec.ScrapeConfigNamespaceSelector, - ObjectSelector: cr.Spec.ScrapeConfigSelector, - DefaultNamespace: cr.Namespace, - } - if err := k8stools.VisitSelected(ctx, rclient, opts, func(list *vmv1beta1.VMScrapeConfigList) { - for i := range list.Items { - item := &list.Items[i] - if !item.DeletionTimestamp.IsZero() { - continue - } - selectedConfigs = append(selectedConfigs, item) - nsn = append(nsn, fmt.Sprintf("%s/%s", item.Namespace, item.Name)) - } - }); err != nil { - return nil, nil, err - } - return selectedConfigs, nsn, nil -} - -func selectPodScrapes(ctx context.Context, cr *vmv1beta1.VMAgent, rclient client.Client) ([]*vmv1beta1.VMPodScrape, []string, error) { - var selectedConfigs []*vmv1beta1.VMPodScrape - var nsn []string - opts := &k8stools.SelectorOpts{ - SelectAll: cr.Spec.SelectAllByDefault, - NamespaceSelector: cr.Spec.PodScrapeNamespaceSelector, - ObjectSelector: cr.Spec.PodScrapeSelector, - DefaultNamespace: cr.Namespace, - } - if err := k8stools.VisitSelected(ctx, rclient, opts, func(list *vmv1beta1.VMPodScrapeList) { - for i := range list.Items { - item := &list.Items[i] - if !item.DeletionTimestamp.IsZero() { - continue - } - selectedConfigs = append(selectedConfigs, item) - nsn = append(nsn, fmt.Sprintf("%s/%s", item.Namespace, item.Name)) - } - }); err != nil { - return nil, nil, err - } - return selectedConfigs, nsn, nil -} - -func selectProbes(ctx context.Context, cr *vmv1beta1.VMAgent, rclient client.Client) ([]*vmv1beta1.VMProbe, []string, error) { - if cr.Spec.DaemonSetMode { - return nil, nil, nil - } - var selectedConfigs []*vmv1beta1.VMProbe - var nsn []string - opts := &k8stools.SelectorOpts{ - SelectAll: cr.Spec.SelectAllByDefault, - NamespaceSelector: cr.Spec.ProbeNamespaceSelector, - ObjectSelector: cr.Spec.ProbeSelector, - DefaultNamespace: cr.Namespace, - } - if err := k8stools.VisitSelected(ctx, rclient, opts, func(list *vmv1beta1.VMProbeList) { - for i := range list.Items { - item := &list.Items[i] - if !item.DeletionTimestamp.IsZero() { - continue - } - selectedConfigs = append(selectedConfigs, item) - nsn = append(nsn, fmt.Sprintf("%s/%s", item.Namespace, item.Name)) - } - }); err != nil { - return nil, nil, err - } - return selectedConfigs, nsn, nil -} - -func selectNodeScrapes(ctx context.Context, cr *vmv1beta1.VMAgent, rclient client.Client) ([]*vmv1beta1.VMNodeScrape, []string, error) { - if cr.Spec.DaemonSetMode { - return nil, nil, nil - } - if !config.IsClusterWideAccessAllowed() && cr.IsOwnsServiceAccount() { - logger.WithContext(ctx).Info("cannot use VMNodeScrape at operator in single namespace mode with default permissions." + - " Create ServiceAccount for VMAgent manually if needed. Skipping config generation for it") - return nil, nil, nil - } - - var selectedConfigs []*vmv1beta1.VMNodeScrape - var nsn []string - opts := &k8stools.SelectorOpts{ - SelectAll: cr.Spec.SelectAllByDefault, - NamespaceSelector: cr.Spec.NodeScrapeNamespaceSelector, - ObjectSelector: cr.Spec.NodeScrapeSelector, - DefaultNamespace: cr.Namespace, - } - if err := k8stools.VisitSelected(ctx, rclient, opts, func(list *vmv1beta1.VMNodeScrapeList) { - for i := range list.Items { - item := &list.Items[i] - if !item.DeletionTimestamp.IsZero() { - continue - } - selectedConfigs = append(selectedConfigs, item) - nsn = append(nsn, fmt.Sprintf("%s/%s", item.Namespace, item.Name)) - - } - }); err != nil { - return nil, nil, err - } - return selectedConfigs, nsn, nil -} - -func selectStaticScrapes(ctx context.Context, cr *vmv1beta1.VMAgent, rclient client.Client) ([]*vmv1beta1.VMStaticScrape, []string, error) { - if cr.Spec.DaemonSetMode { - return nil, nil, nil - } - var selectedConfigs []*vmv1beta1.VMStaticScrape - var nsn []string - opts := &k8stools.SelectorOpts{ - SelectAll: cr.Spec.SelectAllByDefault, - NamespaceSelector: cr.Spec.StaticScrapeNamespaceSelector, - ObjectSelector: cr.Spec.StaticScrapeSelector, - DefaultNamespace: cr.Namespace, - } - if err := k8stools.VisitSelected(ctx, rclient, opts, func(list *vmv1beta1.VMStaticScrapeList) { - for i := range list.Items { - item := &list.Items[i] - if !item.DeletionTimestamp.IsZero() { - continue - } - selectedConfigs = append(selectedConfigs, item) - nsn = append(nsn, fmt.Sprintf("%s/%s", item.Namespace, item.Name)) - } - }); err != nil { - return nil, nil, err - } - return selectedConfigs, nsn, nil -} - -func selectServiceScrapes(ctx context.Context, cr *vmv1beta1.VMAgent, rclient client.Client) ([]*vmv1beta1.VMServiceScrape, []string, error) { - if cr.Spec.DaemonSetMode { - return nil, nil, nil - } - - var selectedConfigs []*vmv1beta1.VMServiceScrape - var nsn []string - opts := &k8stools.SelectorOpts{ - SelectAll: cr.Spec.SelectAllByDefault, - NamespaceSelector: cr.Spec.ServiceScrapeNamespaceSelector, - ObjectSelector: cr.Spec.ServiceScrapeSelector, - DefaultNamespace: cr.Namespace, - } - if err := k8stools.VisitSelected(ctx, rclient, opts, func(list *vmv1beta1.VMServiceScrapeList) { - for i := range list.Items { - item := &list.Items[i] - if !item.DeletionTimestamp.IsZero() { - continue - } - rclient.Scheme().Default(item) - nsn = append(nsn, fmt.Sprintf("%s/%s", item.Namespace, item.Name)) - selectedConfigs = append(selectedConfigs, item) - } - }); err != nil { - return nil, nil, err - } - return selectedConfigs, nsn, nil -} diff --git a/internal/controller/operator/factory/vmagent/nodescrape.go b/internal/controller/operator/factory/vmagent/nodescrape.go index 25fe9d7f2..f73444e5b 100644 --- a/internal/controller/operator/factory/vmagent/nodescrape.go +++ b/internal/controller/operator/factory/vmagent/nodescrape.go @@ -12,13 +12,13 @@ import ( func generateNodeScrapeConfig( ctx context.Context, - cr *vmv1beta1.VMAgent, + sp *vmv1beta1.CommonScrapeParams, + pos *parsedObjects, sc *vmv1beta1.VMNodeScrape, ac *build.AssetsCache, ) (yaml.MapSlice, error) { spec := &sc.Spec - apiserverConfig := cr.Spec.APIServerConfig - se := cr.Spec.CommonScrapeSecurityEnforcements + se := &sp.CommonScrapeSecurityEnforcements cfg := yaml.MapSlice{ { Key: "job_name", @@ -26,18 +26,18 @@ func generateNodeScrapeConfig( }, } - scrapeClass := getScrapeClass(sc.Spec.ScrapeClassName, cr) + scrapeClass := getScrapeClass(sc.Spec.ScrapeClassName, sp) if scrapeClass != nil { mergeEndpointAuthWithScrapeClass(&sc.Spec.EndpointAuth, scrapeClass) mergeEndpointRelabelingsWithScrapeClass(&sc.Spec.EndpointRelabelings, scrapeClass) } - setScrapeIntervalToWithLimit(ctx, &spec.EndpointScrapeParams, cr) + setScrapeIntervalToWithLimit(ctx, &spec.EndpointScrapeParams, sp) k8sSDOpts := generateK8SSDConfigOptions{ - shouldAddSelectors: cr.Spec.EnableKubernetesAPISelectors, + shouldAddSelectors: sp.EnableKubernetesAPISelectors, selectors: sc.Spec.Selector, - apiServerConfig: apiserverConfig, + apiServerConfig: pos.APIServerConfig, role: k8sSDRoleNode, namespace: sc.Namespace, } @@ -51,7 +51,7 @@ func generateNodeScrapeConfig( var relabelings []yaml.MapSlice - skipRelabelSelectors := cr.Spec.EnableKubernetesAPISelectors + skipRelabelSelectors := sp.EnableKubernetesAPISelectors relabelings = addSelectorToRelabelingFor(relabelings, "node", spec.Selector, skipRelabelSelectors) // Add __address__ as internalIP and pod and service labels into proper labels. relabelings = append(relabelings, []yaml.MapSlice{ @@ -102,7 +102,7 @@ func generateNodeScrapeConfig( for _, c := range spec.RelabelConfigs { relabelings = append(relabelings, generateRelabelConfig(c)) } - for _, trc := range cr.Spec.NodeScrapeRelabelTemplate { + for _, trc := range sp.NodeScrapeRelabelTemplate { relabelings = append(relabelings, generateRelabelConfig(trc)) } diff --git a/internal/controller/operator/factory/vmagent/nodescrape_test.go b/internal/controller/operator/factory/vmagent/nodescrape_test.go index 2f512f0c8..66db773a6 100644 --- a/internal/controller/operator/factory/vmagent/nodescrape_test.go +++ b/internal/controller/operator/factory/vmagent/nodescrape_test.go @@ -27,7 +27,9 @@ func Test_generateNodeScrapeConfig(t *testing.T) { ctx := context.Background() fclient := k8stools.GetTestClientWithObjects(o.predefinedObjects) ac := getAssetsCache(ctx, fclient, o.cr) - got, err := generateNodeScrapeConfig(ctx, o.cr, o.sc, ac) + pos := &parsedObjects{Namespace: o.cr.Namespace} + sp := &o.cr.Spec.CommonScrapeParams + got, err := generateNodeScrapeConfig(ctx, sp, pos, o.sc, ac) if err != nil { t.Errorf("cannot generate NodeScrapeConfig, err: %e", err) return diff --git a/internal/controller/operator/factory/vmagent/objects.go b/internal/controller/operator/factory/vmagent/objects.go new file mode 100644 index 000000000..c7eb22741 --- /dev/null +++ b/internal/controller/operator/factory/vmagent/objects.go @@ -0,0 +1,570 @@ +package vmagent + +import ( + "context" + "fmt" + "reflect" + + "gopkg.in/yaml.v2" + "sigs.k8s.io/controller-runtime/pkg/client" + + vmv1beta1 "github.com/VictoriaMetrics/operator/api/operator/v1beta1" + "github.com/VictoriaMetrics/operator/internal/controller/operator/factory/build" + "github.com/VictoriaMetrics/operator/internal/controller/operator/factory/k8stools" + "github.com/VictoriaMetrics/operator/internal/controller/operator/factory/logger" + "github.com/VictoriaMetrics/operator/internal/controller/operator/factory/reconcile" +) + +type parsedObjects struct { + APIServerConfig *vmv1beta1.APIServerConfig + Namespace string + ExternalLabels map[string]string + MustUseNodeSelector bool + HasClusterWideAccess bool + IgnoreNamespaceSelectors bool + serviceScrapes *build.ChildObjects[*vmv1beta1.VMServiceScrape] + podScrapes *build.ChildObjects[*vmv1beta1.VMPodScrape] + staticScrapes *build.ChildObjects[*vmv1beta1.VMStaticScrape] + nodeScrapes *build.ChildObjects[*vmv1beta1.VMNodeScrape] + probes *build.ChildObjects[*vmv1beta1.VMProbe] + scrapeConfigs *build.ChildObjects[*vmv1beta1.VMScrapeConfig] +} + +func (pos *parsedObjects) updateMetrics(ctx context.Context) { + pos.serviceScrapes.UpdateMetrics(ctx) + pos.podScrapes.UpdateMetrics(ctx) + pos.staticScrapes.UpdateMetrics(ctx) + pos.nodeScrapes.UpdateMetrics(ctx) + pos.probes.UpdateMetrics(ctx) + pos.scrapeConfigs.UpdateMetrics(ctx) +} + +func (pos *parsedObjects) init(ctx context.Context, rclient client.Client, sp *vmv1beta1.CommonScrapeParams) error { + if err := pos.selectPodScrapes(ctx, rclient, sp); err != nil { + return fmt.Errorf("selecting PodScrapes failed: %w", err) + } + if err := pos.selectServiceScrapes(ctx, rclient, sp); err != nil { + return fmt.Errorf("selecting ServiceScrapes failed: %w", err) + } + if err := pos.selectProbes(ctx, rclient, sp); err != nil { + return fmt.Errorf("selecting VMProbes failed: %w", err) + } + if err := pos.selectNodeScrapes(ctx, rclient, sp); err != nil { + return fmt.Errorf("selecting VMNodeScrapes failed: %w", err) + } + if err := pos.selectStaticScrapes(ctx, rclient, sp); err != nil { + return fmt.Errorf("selecting VMStaticScrapes failed: %w", err) + } + if err := pos.selectScrapeConfigs(ctx, rclient, sp); err != nil { + return fmt.Errorf("selecting ScrapeConfigs failed: %w", err) + } + return nil +} + +func (pos *parsedObjects) selectScrapeConfigs(ctx context.Context, rclient client.Client, sp *vmv1beta1.CommonScrapeParams) error { + var selectedConfigs []*vmv1beta1.VMScrapeConfig + var nsn []string + if !pos.MustUseNodeSelector { + opts := &k8stools.SelectorOpts{ + SelectAll: sp.SelectAllByDefault, + NamespaceSelector: sp.ScrapeConfigNamespaceSelector, + ObjectSelector: sp.ScrapeConfigSelector, + DefaultNamespace: pos.Namespace, + } + if err := k8stools.VisitSelected(ctx, rclient, opts, func(list *vmv1beta1.VMScrapeConfigList) { + for i := range list.Items { + item := &list.Items[i] + if !item.DeletionTimestamp.IsZero() { + continue + } + selectedConfigs = append(selectedConfigs, item) + nsn = append(nsn, fmt.Sprintf("%s/%s", item.Namespace, item.Name)) + } + }); err != nil { + return err + } + } + pos.scrapeConfigs = build.NewChildObjects("vmscrapeconfig", selectedConfigs, nsn) + return nil +} + +func (pos *parsedObjects) selectPodScrapes(ctx context.Context, rclient client.Client, sp *vmv1beta1.CommonScrapeParams) error { + var selectedConfigs []*vmv1beta1.VMPodScrape + var nsn []string + opts := &k8stools.SelectorOpts{ + SelectAll: sp.SelectAllByDefault, + NamespaceSelector: sp.PodScrapeNamespaceSelector, + ObjectSelector: sp.PodScrapeSelector, + DefaultNamespace: pos.Namespace, + } + if err := k8stools.VisitSelected(ctx, rclient, opts, func(list *vmv1beta1.VMPodScrapeList) { + for i := range list.Items { + item := &list.Items[i] + if !item.DeletionTimestamp.IsZero() { + continue + } + selectedConfigs = append(selectedConfigs, item) + nsn = append(nsn, fmt.Sprintf("%s/%s", item.Namespace, item.Name)) + } + }); err != nil { + return err + } + pos.podScrapes = build.NewChildObjects("vmpodscrape", selectedConfigs, nsn) + return nil +} + +func (pos *parsedObjects) selectProbes(ctx context.Context, rclient client.Client, sp *vmv1beta1.CommonScrapeParams) error { + var selectedConfigs []*vmv1beta1.VMProbe + var nsn []string + if !pos.MustUseNodeSelector { + opts := &k8stools.SelectorOpts{ + SelectAll: sp.SelectAllByDefault, + NamespaceSelector: sp.ProbeNamespaceSelector, + ObjectSelector: sp.ProbeSelector, + DefaultNamespace: pos.Namespace, + } + if err := k8stools.VisitSelected(ctx, rclient, opts, func(list *vmv1beta1.VMProbeList) { + for i := range list.Items { + item := &list.Items[i] + if !item.DeletionTimestamp.IsZero() { + continue + } + selectedConfigs = append(selectedConfigs, item) + nsn = append(nsn, fmt.Sprintf("%s/%s", item.Namespace, item.Name)) + } + }); err != nil { + return err + } + } + pos.probes = build.NewChildObjects("vmprobe", selectedConfigs, nsn) + return nil +} + +func (pos *parsedObjects) selectNodeScrapes(ctx context.Context, rclient client.Client, sp *vmv1beta1.CommonScrapeParams) error { + var selectedConfigs []*vmv1beta1.VMNodeScrape + var nsn []string + if !pos.MustUseNodeSelector { + if pos.HasClusterWideAccess { + opts := &k8stools.SelectorOpts{ + SelectAll: sp.SelectAllByDefault, + NamespaceSelector: sp.NodeScrapeNamespaceSelector, + ObjectSelector: sp.NodeScrapeSelector, + DefaultNamespace: pos.Namespace, + } + if err := k8stools.VisitSelected(ctx, rclient, opts, func(list *vmv1beta1.VMNodeScrapeList) { + for i := range list.Items { + item := &list.Items[i] + if !item.DeletionTimestamp.IsZero() { + continue + } + selectedConfigs = append(selectedConfigs, item) + nsn = append(nsn, fmt.Sprintf("%s/%s", item.Namespace, item.Name)) + + } + }); err != nil { + return err + } + } else { + logger.WithContext(ctx).Info("cannot use VMNodeScrape at operator in single namespace mode with default permissions." + + " Create ServiceAccount manually if needed. Skipping config generation for it") + } + } + pos.nodeScrapes = build.NewChildObjects("vmnodescrape", selectedConfigs, nsn) + return nil +} + +func (pos *parsedObjects) selectStaticScrapes(ctx context.Context, rclient client.Client, sp *vmv1beta1.CommonScrapeParams) error { + var selectedConfigs []*vmv1beta1.VMStaticScrape + var nsn []string + if !pos.MustUseNodeSelector { + opts := &k8stools.SelectorOpts{ + SelectAll: sp.SelectAllByDefault, + NamespaceSelector: sp.StaticScrapeNamespaceSelector, + ObjectSelector: sp.StaticScrapeSelector, + DefaultNamespace: pos.Namespace, + } + if err := k8stools.VisitSelected(ctx, rclient, opts, func(list *vmv1beta1.VMStaticScrapeList) { + for i := range list.Items { + item := &list.Items[i] + if !item.DeletionTimestamp.IsZero() { + continue + } + selectedConfigs = append(selectedConfigs, item) + nsn = append(nsn, fmt.Sprintf("%s/%s", item.Namespace, item.Name)) + } + }); err != nil { + return err + } + } + pos.staticScrapes = build.NewChildObjects("vmstaticscrape", selectedConfigs, nsn) + return nil +} + +func (pos *parsedObjects) selectServiceScrapes(ctx context.Context, rclient client.Client, sp *vmv1beta1.CommonScrapeParams) error { + var selectedConfigs []*vmv1beta1.VMServiceScrape + var nsn []string + if !pos.MustUseNodeSelector { + opts := &k8stools.SelectorOpts{ + SelectAll: sp.SelectAllByDefault, + NamespaceSelector: sp.ServiceScrapeNamespaceSelector, + ObjectSelector: sp.ServiceScrapeSelector, + DefaultNamespace: pos.Namespace, + } + if err := k8stools.VisitSelected(ctx, rclient, opts, func(list *vmv1beta1.VMServiceScrapeList) { + for i := range list.Items { + item := &list.Items[i] + if !item.DeletionTimestamp.IsZero() { + continue + } + rclient.Scheme().Default(item) + nsn = append(nsn, fmt.Sprintf("%s/%s", item.Namespace, item.Name)) + selectedConfigs = append(selectedConfigs, item) + } + }); err != nil { + return err + } + } + pos.serviceScrapes = build.NewChildObjects("vmservicescrape", selectedConfigs, nsn) + return nil +} + +func (pos *parsedObjects) validateObjects(sp *vmv1beta1.CommonScrapeParams) { + pos.serviceScrapes.ForEachCollectSkipInvalid(func(sc *vmv1beta1.VMServiceScrape) error { + if sp.ArbitraryFSAccessThroughSMs.Deny { + for _, ep := range sc.Spec.Endpoints { + if err := testForArbitraryFSAccess(ep.EndpointAuth); err != nil { + return err + } + } + } + if err := validateScrapeClassExists(sc.Spec.ScrapeClassName, sp); err != nil { + return err + } + if !build.MustSkipRuntimeValidation { + return sc.Validate() + } + return nil + }) + pos.podScrapes.ForEachCollectSkipInvalid(func(sc *vmv1beta1.VMPodScrape) error { + if sp.ArbitraryFSAccessThroughSMs.Deny { + for _, ep := range sc.Spec.PodMetricsEndpoints { + if err := testForArbitraryFSAccess(ep.EndpointAuth); err != nil { + return err + } + } + } + if err := validateScrapeClassExists(sc.Spec.ScrapeClassName, sp); err != nil { + return err + } + if !build.MustSkipRuntimeValidation { + return sc.Validate() + } + return nil + }) + pos.staticScrapes.ForEachCollectSkipInvalid(func(sc *vmv1beta1.VMStaticScrape) error { + if sp.ArbitraryFSAccessThroughSMs.Deny { + for _, ep := range sc.Spec.TargetEndpoints { + if err := testForArbitraryFSAccess(ep.EndpointAuth); err != nil { + return err + } + } + } + if err := validateScrapeClassExists(sc.Spec.ScrapeClassName, sp); err != nil { + return err + } + if !build.MustSkipRuntimeValidation { + return sc.Validate() + } + return nil + }) + pos.nodeScrapes.ForEachCollectSkipInvalid(func(sc *vmv1beta1.VMNodeScrape) error { + if sp.ArbitraryFSAccessThroughSMs.Deny { + if err := testForArbitraryFSAccess(sc.Spec.EndpointAuth); err != nil { + return err + } + } + if err := validateScrapeClassExists(sc.Spec.ScrapeClassName, sp); err != nil { + return err + } + if !build.MustSkipRuntimeValidation { + return sc.Validate() + } + return nil + }) + pos.probes.ForEachCollectSkipInvalid(func(sc *vmv1beta1.VMProbe) error { + if sp.ArbitraryFSAccessThroughSMs.Deny { + if err := testForArbitraryFSAccess(sc.Spec.EndpointAuth); err != nil { + return err + } + } + if err := validateScrapeClassExists(sc.Spec.ScrapeClassName, sp); err != nil { + return err + } + if !build.MustSkipRuntimeValidation { + return sc.Validate() + } + return nil + }) + pos.scrapeConfigs.ForEachCollectSkipInvalid(func(sc *vmv1beta1.VMScrapeConfig) error { + // TODO: @f41gh7 validate per configuration FS access + if sp.ArbitraryFSAccessThroughSMs.Deny { + if err := testForArbitraryFSAccess(sc.Spec.EndpointAuth); err != nil { + return err + } + } + if err := validateScrapeClassExists(sc.Spec.ScrapeClassName, sp); err != nil { + return err + } + if !build.MustSkipRuntimeValidation { + return sc.Validate() + } + return nil + }) +} + +// updateStatusesForScrapeObjects updates status of either selected childObject or all child objects +func (pos *parsedObjects) updateStatusesForScrapeObjects(ctx context.Context, rclient client.Client, parentName string, childObject client.Object) error { + pos.updateMetrics(ctx) + if childObject != nil && !reflect.ValueOf(childObject).IsNil() { + // fast path + switch t := childObject.(type) { + case *vmv1beta1.VMStaticScrape: + if o := pos.staticScrapes.Get(t); o != nil { + return reconcile.StatusForChildObjects(ctx, rclient, parentName, []*vmv1beta1.VMStaticScrape{o}) + } + case *vmv1beta1.VMProbe: + if o := pos.probes.Get(t); o != nil { + return reconcile.StatusForChildObjects(ctx, rclient, parentName, []*vmv1beta1.VMProbe{o}) + } + case *vmv1beta1.VMScrapeConfig: + if o := pos.scrapeConfigs.Get(t); o != nil { + return reconcile.StatusForChildObjects(ctx, rclient, parentName, []*vmv1beta1.VMScrapeConfig{o}) + } + case *vmv1beta1.VMNodeScrape: + if o := pos.nodeScrapes.Get(t); o != nil { + return reconcile.StatusForChildObjects(ctx, rclient, parentName, []*vmv1beta1.VMNodeScrape{o}) + } + case *vmv1beta1.VMPodScrape: + if o := pos.podScrapes.Get(t); o != nil { + return reconcile.StatusForChildObjects(ctx, rclient, parentName, []*vmv1beta1.VMPodScrape{o}) + } + case *vmv1beta1.VMServiceScrape: + if o := pos.serviceScrapes.Get(t); o != nil { + return reconcile.StatusForChildObjects(ctx, rclient, parentName, []*vmv1beta1.VMServiceScrape{o}) + } + } + } + if err := reconcile.StatusForChildObjects(ctx, rclient, parentName, pos.serviceScrapes.All()); err != nil { + return fmt.Errorf("cannot update statuses for service scrape objects: %w", err) + } + if err := reconcile.StatusForChildObjects(ctx, rclient, parentName, pos.podScrapes.All()); err != nil { + return fmt.Errorf("cannot update statuses for pod scrape objects: %w", err) + } + if err := reconcile.StatusForChildObjects(ctx, rclient, parentName, pos.nodeScrapes.All()); err != nil { + return fmt.Errorf("cannot update statuses for node scrape objects: %w", err) + } + if err := reconcile.StatusForChildObjects(ctx, rclient, parentName, pos.probes.All()); err != nil { + return fmt.Errorf("cannot update statuses for probe scrape objects: %w", err) + } + if err := reconcile.StatusForChildObjects(ctx, rclient, parentName, pos.staticScrapes.All()); err != nil { + return fmt.Errorf("cannot update statuses for static scrape objects: %w", err) + } + if err := reconcile.StatusForChildObjects(ctx, rclient, parentName, pos.scrapeConfigs.All()); err != nil { + return fmt.Errorf("cannot update statuses for scrapeconfig scrape objects: %w", err) + } + return nil +} + +// generateConfig generates yaml scrape configuration from collected scrape objects +func (pos *parsedObjects) generateConfig(ctx context.Context, sp *vmv1beta1.CommonScrapeParams, ac *build.AssetsCache) ([]byte, error) { + var additionalScrapeConfigs []byte + if sp.AdditionalScrapeConfigs != nil { + sc, err := ac.LoadKeyFromSecret(pos.Namespace, sp.AdditionalScrapeConfigs) + if err != nil { + return nil, fmt.Errorf("loading additional scrape configs from Secret failed: %w", err) + } + additionalScrapeConfigs = []byte(sc) + } + cfg := yaml.MapSlice{} + scrapeInterval := defaultScrapeInterval + if sp.ScrapeInterval != "" { + scrapeInterval = sp.ScrapeInterval + } + globalItems := yaml.MapSlice{ + {Key: "scrape_interval", Value: scrapeInterval}, + {Key: "external_labels", Value: stringMapToMapSlice(pos.ExternalLabels)}, + } + + if sp.SampleLimit > 0 { + globalItems = append(globalItems, yaml.MapItem{ + Key: "sample_limit", + Value: sp.SampleLimit, + }) + } + + if sp.ScrapeTimeout != "" { + globalItems = append(globalItems, yaml.MapItem{ + Key: "scrape_timeout", + Value: sp.ScrapeTimeout, + }) + } + + if len(sp.GlobalScrapeMetricRelabelConfigs) > 0 { + globalItems = append(globalItems, yaml.MapItem{ + Key: "metric_relabel_configs", + Value: sp.GlobalScrapeMetricRelabelConfigs, + }) + } + if len(sp.GlobalScrapeRelabelConfigs) > 0 { + globalItems = append(globalItems, yaml.MapItem{ + Key: "relabel_configs", + Value: sp.GlobalScrapeRelabelConfigs, + }) + } + + cfg = append(cfg, yaml.MapItem{Key: "global", Value: globalItems}) + + var scrapeConfigs []yaml.MapSlice + var err error + + err = pos.serviceScrapes.ForEachCollectSkipNotFound(func(sc *vmv1beta1.VMServiceScrape) error { + scrapeConfigsLen := len(scrapeConfigs) + for i, ep := range sc.Spec.Endpoints { + s, err := generateServiceScrapeConfig( + ctx, + sp, + pos, + sc, + ep, i, + ac, + ) + if err != nil { + scrapeConfigs = scrapeConfigs[:scrapeConfigsLen] + return err + } + scrapeConfigs = append(scrapeConfigs, s) + } + return nil + }) + if err != nil { + return nil, err + } + + err = pos.podScrapes.ForEachCollectSkipNotFound(func(sc *vmv1beta1.VMPodScrape) error { + scrapeConfigsLen := len(scrapeConfigs) + for i, ep := range sc.Spec.PodMetricsEndpoints { + s, err := generatePodScrapeConfig( + ctx, + sp, + pos, + sc, ep, i, + ac, + ) + if err != nil { + scrapeConfigs = scrapeConfigs[:scrapeConfigsLen] + return err + } + scrapeConfigs = append(scrapeConfigs, s) + } + return nil + }) + if err != nil { + return nil, err + } + + err = pos.probes.ForEachCollectSkipNotFound(func(sc *vmv1beta1.VMProbe) error { + s, err := generateProbeConfig( + ctx, + sp, + pos, + sc, + ac, + ) + if err != nil { + return err + } + scrapeConfigs = append(scrapeConfigs, s) + return nil + }) + if err != nil { + return nil, err + } + + err = pos.nodeScrapes.ForEachCollectSkipNotFound(func(sc *vmv1beta1.VMNodeScrape) error { + s, err := generateNodeScrapeConfig( + ctx, + sp, + pos, + sc, + ac, + ) + if err != nil { + return err + } + scrapeConfigs = append(scrapeConfigs, s) + + return nil + }) + if err != nil { + return nil, err + } + + err = pos.staticScrapes.ForEachCollectSkipNotFound(func(sc *vmv1beta1.VMStaticScrape) error { + scrapeConfigsLen := len(scrapeConfigs) + for i, ep := range sc.Spec.TargetEndpoints { + s, err := generateStaticScrapeConfig( + ctx, + sp, + sc, + ep, i, + ac, + ) + if err != nil { + scrapeConfigs = scrapeConfigs[:scrapeConfigsLen] + return err + } + scrapeConfigs = append(scrapeConfigs, s) + } + return nil + }) + if err != nil { + return nil, err + } + + err = pos.scrapeConfigs.ForEachCollectSkipNotFound(func(sc *vmv1beta1.VMScrapeConfig) error { + s, err := generateScrapeConfig( + ctx, + sp, + sc, + ac, + ) + if err != nil { + return err + } + scrapeConfigs = append(scrapeConfigs, s) + + return nil + }) + if err != nil { + return nil, err + } + + var additionalScrapeConfigsYaml []yaml.MapSlice + if err := yaml.Unmarshal(additionalScrapeConfigs, &additionalScrapeConfigsYaml); err != nil { + return nil, fmt.Errorf("unmarshalling additional scrape configs failed: %w", err) + } + + var inlineScrapeConfigsYaml []yaml.MapSlice + if len(sp.InlineScrapeConfig) > 0 { + if err := yaml.Unmarshal([]byte(sp.InlineScrapeConfig), &inlineScrapeConfigsYaml); err != nil { + return nil, fmt.Errorf("unmarshalling inline additional scrape configs failed: %w", err) + } + } + additionalScrapeConfigsYaml = append(additionalScrapeConfigsYaml, inlineScrapeConfigsYaml...) + cfg = append(cfg, yaml.MapItem{ + Key: "scrape_configs", + Value: append(scrapeConfigs, additionalScrapeConfigsYaml...), + }) + + return yaml.Marshal(cfg) +} diff --git a/internal/controller/operator/factory/vmagent/collect_scrapes_test.go b/internal/controller/operator/factory/vmagent/objects_test.go similarity index 94% rename from internal/controller/operator/factory/vmagent/collect_scrapes_test.go rename to internal/controller/operator/factory/vmagent/objects_test.go index 2ba32df18..d6bb7bc56 100644 --- a/internal/controller/operator/factory/vmagent/collect_scrapes_test.go +++ b/internal/controller/operator/factory/vmagent/objects_test.go @@ -8,6 +8,7 @@ import ( "testing" "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -55,13 +56,11 @@ func TestSelectServiceMonitors(t *testing.T) { f := func(o opts) { t.Helper() fclient := k8stools.GetTestClientWithObjects(o.predefinedObjects) - got, _, err := selectServiceScrapes(context.TODO(), o.cr, fclient) - if err != nil { - t.Errorf("SelectServiceScrapes() error = %v", err) - return - } + sp := &o.cr.Spec.CommonScrapeParams + pos := &parsedObjects{Namespace: o.cr.Namespace} + assert.NoError(t, pos.selectServiceScrapes(context.TODO(), fclient, sp)) gotNames := []string{} - for _, monitorName := range got { + for _, monitorName := range pos.serviceScrapes.All() { gotNames = append(gotNames, fmt.Sprintf("%s/%s", monitorName.Namespace, monitorName.Name)) } sort.Strings(gotNames) @@ -287,14 +286,12 @@ func TestSelectPodMonitors(t *testing.T) { } f := func(o opts) { fclient := k8stools.GetTestClientWithObjects(o.predefinedObjects) - got, _, err := selectPodScrapes(context.TODO(), o.cr, fclient) - if err != nil { - t.Errorf("SelectPodScrapes() error = %v", err) - return - } + sp := &o.cr.Spec.CommonScrapeParams + pos := &parsedObjects{Namespace: o.cr.Namespace} + assert.NoError(t, pos.selectPodScrapes(context.TODO(), fclient, sp)) var gotNames []string - for _, k := range got { + for _, k := range pos.podScrapes.All() { gotNames = append(gotNames, fmt.Sprintf("%s/%s", k.Namespace, k.Name)) } sort.Strings(gotNames) @@ -375,18 +372,16 @@ func TestSelectProbes(t *testing.T) { f := func(o opts) { t.Helper() fclient := k8stools.GetTestClientWithObjects(o.predefinedObjects) - got, _, err := selectProbes(context.TODO(), o.cr, fclient) - if err != nil { - t.Errorf("SelectProbes() error = %v", err) - return - } + sp := &o.cr.Spec.CommonScrapeParams + pos := &parsedObjects{Namespace: o.cr.Namespace} + assert.NoError(t, pos.selectProbes(context.TODO(), fclient, sp)) var result []string - for _, k := range got { + for _, k := range pos.probes.All() { result = append(result, fmt.Sprintf("%s/%s", k.Namespace, k.Name)) } sort.Strings(result) if !reflect.DeepEqual(result, o.want) { - t.Errorf("SelectProbes(): %s", cmp.Diff(got, o.want)) + t.Errorf("SelectProbes(): %s", cmp.Diff(result, o.want)) } } diff --git a/internal/controller/operator/factory/vmagent/podscrape.go b/internal/controller/operator/factory/vmagent/podscrape.go index beaa520a9..83e37ca35 100644 --- a/internal/controller/operator/factory/vmagent/podscrape.go +++ b/internal/controller/operator/factory/vmagent/podscrape.go @@ -12,15 +12,15 @@ import ( func generatePodScrapeConfig( ctx context.Context, - cr *vmv1beta1.VMAgent, + sp *vmv1beta1.CommonScrapeParams, + pos *parsedObjects, sc *vmv1beta1.VMPodScrape, ep vmv1beta1.PodMetricsEndpoint, i int, ac *build.AssetsCache, ) (yaml.MapSlice, error) { spec := &sc.Spec - apiserverConfig := cr.Spec.APIServerConfig - se := cr.Spec.CommonScrapeSecurityEnforcements + se := &sp.CommonScrapeSecurityEnforcements cfg := yaml.MapSlice{ { Key: "job_name", @@ -28,14 +28,14 @@ func generatePodScrapeConfig( }, } - scrapeClass := getScrapeClass(spec.ScrapeClassName, cr) + scrapeClass := getScrapeClass(spec.ScrapeClassName, sp) if scrapeClass != nil { mergeEndpointAuthWithScrapeClass(&ep.EndpointAuth, scrapeClass) mergeEndpointRelabelingsWithScrapeClass(&ep.EndpointRelabelings, scrapeClass) mergeAttachMetadataWithScrapeClass(&ep.AttachMetadata, scrapeClass) } - selectedNamespaces := getNamespacesFromNamespaceSelector(&spec.NamespaceSelector, sc.Namespace, se.IgnoreNamespaceSelectors) + selectedNamespaces := getNamespacesFromNamespaceSelector(&spec.NamespaceSelector, sc.Namespace, pos.IgnoreNamespaceSelectors) if ep.AttachMetadata.Node == nil && sc.Spec.AttachMetadata.Node != nil { ep.AttachMetadata.Node = spec.AttachMetadata.Node } @@ -44,16 +44,14 @@ func generatePodScrapeConfig( } k8sSDOpts := generateK8SSDConfigOptions{ - namespaces: selectedNamespaces, - shouldAddSelectors: cr.Spec.EnableKubernetesAPISelectors, - selectors: spec.Selector, - apiServerConfig: apiserverConfig, - role: k8sSDRolePod, - attachMetadata: &ep.AttachMetadata, - namespace: sc.Namespace, - } - if cr.Spec.DaemonSetMode { - k8sSDOpts.mustUseNodeSelector = true + namespaces: selectedNamespaces, + shouldAddSelectors: sp.EnableKubernetesAPISelectors, + selectors: spec.Selector, + apiServerConfig: pos.APIServerConfig, + role: k8sSDRolePod, + attachMetadata: &ep.AttachMetadata, + namespace: sc.Namespace, + mustUseNodeSelector: pos.MustUseNodeSelector, } if c, err := generateK8SSDConfig(ac, k8sSDOpts); err != nil { return nil, err @@ -69,7 +67,7 @@ func generatePodScrapeConfig( ep.SeriesLimit = spec.SeriesLimit } - setScrapeIntervalToWithLimit(ctx, &ep.EndpointScrapeParams, cr) + setScrapeIntervalToWithLimit(ctx, &ep.EndpointScrapeParams, sp) cfg = addCommonScrapeParamsTo(cfg, ep.EndpointScrapeParams, se) @@ -83,7 +81,7 @@ func generatePodScrapeConfig( }) } - skipRelabelSelectors := cr.Spec.EnableKubernetesAPISelectors + skipRelabelSelectors := sp.EnableKubernetesAPISelectors relabelings = addSelectorToRelabelingFor(relabelings, "pod", spec.Selector, skipRelabelSelectors) // Filter targets based on correct port for the endpoint. @@ -177,7 +175,7 @@ func generatePodScrapeConfig( for _, c := range ep.RelabelConfigs { relabelings = append(relabelings, generateRelabelConfig(c)) } - for _, trc := range cr.Spec.PodScrapeRelabelTemplate { + for _, trc := range sp.PodScrapeRelabelTemplate { relabelings = append(relabelings, generateRelabelConfig(trc)) } // Because of security risks, whenever enforcedNamespaceLabel is set, we want to append it to the diff --git a/internal/controller/operator/factory/vmagent/podscrape_test.go b/internal/controller/operator/factory/vmagent/podscrape_test.go index 17f109b82..30131ce16 100644 --- a/internal/controller/operator/factory/vmagent/podscrape_test.go +++ b/internal/controller/operator/factory/vmagent/podscrape_test.go @@ -26,7 +26,9 @@ func Test_generatePodScrapeConfig(t *testing.T) { ctx := context.Background() fclient := k8stools.GetTestClientWithObjects(nil) ac := getAssetsCache(ctx, fclient, o.cr) - got, err := generatePodScrapeConfig(ctx, o.cr, o.sc, o.ep, 0, ac) + pos := &parsedObjects{Namespace: o.cr.Namespace} + sp := &o.cr.Spec.CommonScrapeParams + got, err := generatePodScrapeConfig(ctx, sp, pos, o.sc, o.ep, 0, ac) if err != nil { t.Errorf("cannot generate PodScrapeConfig, err: %e", err) return diff --git a/internal/controller/operator/factory/vmagent/probe.go b/internal/controller/operator/factory/vmagent/probe.go index 6cbbf3267..d9d85618a 100644 --- a/internal/controller/operator/factory/vmagent/probe.go +++ b/internal/controller/operator/factory/vmagent/probe.go @@ -12,13 +12,13 @@ import ( func generateProbeConfig( ctx context.Context, - cr *vmv1beta1.VMAgent, + sp *vmv1beta1.CommonScrapeParams, + pos *parsedObjects, sc *vmv1beta1.VMProbe, ac *build.AssetsCache, ) (yaml.MapSlice, error) { spec := &sc.Spec - apiserverConfig := cr.Spec.APIServerConfig - se := cr.Spec.CommonScrapeSecurityEnforcements + se := &sp.CommonScrapeSecurityEnforcements cfg := yaml.MapSlice{ { Key: "job_name", @@ -26,7 +26,7 @@ func generateProbeConfig( }, } - scrapeClass := getScrapeClass(spec.ScrapeClassName, cr) + scrapeClass := getScrapeClass(spec.ScrapeClassName, sp) if scrapeClass != nil { mergeEndpointAuthWithScrapeClass(&spec.EndpointAuth, scrapeClass) } @@ -47,7 +47,7 @@ func generateProbeConfig( spec.Scheme = spec.VMProberSpec.Scheme } - setScrapeIntervalToWithLimit(ctx, &spec.EndpointScrapeParams, cr) + setScrapeIntervalToWithLimit(ctx, &spec.EndpointScrapeParams, sp) cfg = addCommonScrapeParamsTo(cfg, spec.EndpointScrapeParams, se) @@ -80,15 +80,15 @@ func generateProbeConfig( } if spec.Targets.Ingress != nil { - skipRelabelSelectors := cr.Spec.EnableKubernetesAPISelectors + skipRelabelSelectors := sp.EnableKubernetesAPISelectors relabelings = addSelectorToRelabelingFor(relabelings, "ingress", spec.Targets.Ingress.Selector, skipRelabelSelectors) - selectedNamespaces := getNamespacesFromNamespaceSelector(&spec.Targets.Ingress.NamespaceSelector, sc.Namespace, se.IgnoreNamespaceSelectors) + selectedNamespaces := getNamespacesFromNamespaceSelector(&spec.Targets.Ingress.NamespaceSelector, sc.Namespace, pos.IgnoreNamespaceSelectors) k8sSDOpts := generateK8SSDConfigOptions{ namespaces: selectedNamespaces, - shouldAddSelectors: cr.Spec.EnableKubernetesAPISelectors, + shouldAddSelectors: sp.EnableKubernetesAPISelectors, selectors: spec.Targets.Ingress.Selector, - apiServerConfig: apiserverConfig, + apiServerConfig: pos.APIServerConfig, role: k8sSDRoleIngress, namespace: sc.Namespace, } @@ -152,7 +152,7 @@ func generateProbeConfig( }, }...) - for _, trc := range cr.Spec.ProbeScrapeRelabelTemplate { + for _, trc := range sp.ProbeScrapeRelabelTemplate { relabelings = append(relabelings, generateRelabelConfig(trc)) } // Because of security risks, whenever enforcedNamespaceLabel is set, we want to append it to the diff --git a/internal/controller/operator/factory/vmagent/probe_test.go b/internal/controller/operator/factory/vmagent/probe_test.go index 784750e35..124f68516 100644 --- a/internal/controller/operator/factory/vmagent/probe_test.go +++ b/internal/controller/operator/factory/vmagent/probe_test.go @@ -28,7 +28,9 @@ func Test_generateProbeConfig(t *testing.T) { ctx := context.Background() fclient := k8stools.GetTestClientWithObjects(o.predefinedObjects) ac := getAssetsCache(ctx, fclient, o.cr) - got, err := generateProbeConfig(ctx, o.cr, o.sc, ac) + pos := &parsedObjects{Namespace: o.cr.Namespace} + sp := &o.cr.Spec.CommonScrapeParams + got, err := generateProbeConfig(ctx, sp, pos, o.sc, ac) if err != nil { t.Errorf("cannot generate ProbeConfig, err: %e", err) return diff --git a/internal/controller/operator/factory/vmagent/scrapeconfig.go b/internal/controller/operator/factory/vmagent/scrapeconfig.go index 79c7d5f7f..a4f5ec82e 100644 --- a/internal/controller/operator/factory/vmagent/scrapeconfig.go +++ b/internal/controller/operator/factory/vmagent/scrapeconfig.go @@ -13,12 +13,12 @@ import ( func generateScrapeConfig( ctx context.Context, - cr *vmv1beta1.VMAgent, + sp *vmv1beta1.CommonScrapeParams, sc *vmv1beta1.VMScrapeConfig, ac *build.AssetsCache, ) (yaml.MapSlice, error) { spec := &sc.Spec - se := cr.Spec.CommonScrapeSecurityEnforcements + se := &sp.CommonScrapeSecurityEnforcements jobName := fmt.Sprintf("scrapeConfig/%s/%s", sc.Namespace, sc.Name) cfg := yaml.MapSlice{ { @@ -27,13 +27,13 @@ func generateScrapeConfig( }, } - scrapeClass := getScrapeClass(spec.ScrapeClassName, cr) + scrapeClass := getScrapeClass(spec.ScrapeClassName, sp) if scrapeClass != nil { mergeEndpointAuthWithScrapeClass(&spec.EndpointAuth, scrapeClass) mergeEndpointRelabelingsWithScrapeClass(&spec.EndpointRelabelings, scrapeClass) } - setScrapeIntervalToWithLimit(ctx, &spec.EndpointScrapeParams, cr) + setScrapeIntervalToWithLimit(ctx, &spec.EndpointScrapeParams, sp) cfg = addCommonScrapeParamsTo(cfg, spec.EndpointScrapeParams, se) @@ -41,7 +41,7 @@ func generateScrapeConfig( for _, c := range spec.RelabelConfigs { relabelings = append(relabelings, generateRelabelConfig(c)) } - for _, trc := range cr.Spec.ScrapeConfigRelabelTemplate { + for _, trc := range sp.ScrapeConfigRelabelTemplate { relabelings = append(relabelings, generateRelabelConfig(trc)) } // Because of security risks, whenever enforcedNamespaceLabel is set, we want to append it to the diff --git a/internal/controller/operator/factory/vmagent/scrapeconfig_test.go b/internal/controller/operator/factory/vmagent/scrapeconfig_test.go index 55c0ec5db..5dc75339f 100644 --- a/internal/controller/operator/factory/vmagent/scrapeconfig_test.go +++ b/internal/controller/operator/factory/vmagent/scrapeconfig_test.go @@ -29,7 +29,8 @@ func TestGenerateScrapeConfig(t *testing.T) { ctx := context.Background() fclient := k8stools.GetTestClientWithObjects(o.predefinedObjects) ac := getAssetsCache(ctx, fclient, o.cr) - got, err := generateScrapeConfig(ctx, o.cr, o.sc, ac) + sp := &o.cr.Spec.CommonScrapeParams + got, err := generateScrapeConfig(ctx, sp, o.sc, ac) if err != nil { t.Errorf("cannot execute generateScrapeConfig, err: %e", err) return diff --git a/internal/controller/operator/factory/vmagent/servicescrape.go b/internal/controller/operator/factory/vmagent/servicescrape.go index 8a86a9bb3..5fa7ffdf6 100644 --- a/internal/controller/operator/factory/vmagent/servicescrape.go +++ b/internal/controller/operator/factory/vmagent/servicescrape.go @@ -12,16 +12,16 @@ import ( func generateServiceScrapeConfig( ctx context.Context, - cr *vmv1beta1.VMAgent, + sp *vmv1beta1.CommonScrapeParams, + pos *parsedObjects, sc *vmv1beta1.VMServiceScrape, ep vmv1beta1.Endpoint, i int, ac *build.AssetsCache, ) (yaml.MapSlice, error) { spec := &sc.Spec - apiserverConfig := cr.Spec.APIServerConfig - se := cr.Spec.CommonScrapeSecurityEnforcements - scrapeClass := getScrapeClass(spec.ScrapeClassName, cr) + se := &sp.CommonScrapeSecurityEnforcements + scrapeClass := getScrapeClass(spec.ScrapeClassName, sp) if scrapeClass != nil { mergeEndpointAuthWithScrapeClass(&ep.EndpointAuth, scrapeClass) mergeEndpointRelabelingsWithScrapeClass(&ep.EndpointRelabelings, scrapeClass) @@ -39,7 +39,7 @@ func generateServiceScrapeConfig( spec.DiscoveryRole = k8sSDRoleEndpoints } - selectedNamespaces := getNamespacesFromNamespaceSelector(&spec.NamespaceSelector, sc.Namespace, se.IgnoreNamespaceSelectors) + selectedNamespaces := getNamespacesFromNamespaceSelector(&spec.NamespaceSelector, sc.Namespace, pos.IgnoreNamespaceSelectors) if ep.AttachMetadata.Node == nil && spec.AttachMetadata.Node != nil { ep.AttachMetadata.Node = spec.AttachMetadata.Node } @@ -48,9 +48,9 @@ func generateServiceScrapeConfig( } k8sSDOpts := generateK8SSDConfigOptions{ namespaces: selectedNamespaces, - shouldAddSelectors: cr.Spec.EnableKubernetesAPISelectors, + shouldAddSelectors: sp.EnableKubernetesAPISelectors, selectors: spec.Selector, - apiServerConfig: apiserverConfig, + apiServerConfig: pos.APIServerConfig, role: spec.DiscoveryRole, attachMetadata: &ep.AttachMetadata, namespace: sc.Namespace, @@ -68,14 +68,14 @@ func generateServiceScrapeConfig( ep.SeriesLimit = spec.SeriesLimit } - setScrapeIntervalToWithLimit(ctx, &ep.EndpointScrapeParams, cr) + setScrapeIntervalToWithLimit(ctx, &ep.EndpointScrapeParams, sp) cfg = addCommonScrapeParamsTo(cfg, ep.EndpointScrapeParams, se) var relabelings []yaml.MapSlice // Exact label matches. - skipRelabelSelectors := cr.Spec.EnableKubernetesAPISelectors + skipRelabelSelectors := sp.EnableKubernetesAPISelectors relabelings = addSelectorToRelabelingFor(relabelings, "service", spec.Selector, skipRelabelSelectors) // Filter targets based on correct port for the endpoint. @@ -243,7 +243,7 @@ func generateServiceScrapeConfig( relabelings = append(relabelings, generateRelabelConfig(c)) } - for _, trc := range cr.Spec.ServiceScrapeRelabelTemplate { + for _, trc := range sp.ServiceScrapeRelabelTemplate { relabelings = append(relabelings, generateRelabelConfig(trc)) } diff --git a/internal/controller/operator/factory/vmagent/servicescrape_test.go b/internal/controller/operator/factory/vmagent/servicescrape_test.go index 5bb512aed..931694132 100644 --- a/internal/controller/operator/factory/vmagent/servicescrape_test.go +++ b/internal/controller/operator/factory/vmagent/servicescrape_test.go @@ -29,7 +29,12 @@ func Test_generateServiceScrapeConfig(t *testing.T) { ctx := context.Background() fclient := k8stools.GetTestClientWithObjects(o.predefinedObjects) ac := getAssetsCache(ctx, fclient, o.cr) - got, err := generateServiceScrapeConfig(ctx, o.cr, o.sc, o.sc.Spec.Endpoints[0], 0, ac) + pos := &parsedObjects{ + Namespace: o.cr.Namespace, + APIServerConfig: o.cr.Spec.APIServerConfig, + } + sp := &o.cr.Spec.CommonScrapeParams + got, err := generateServiceScrapeConfig(ctx, sp, pos, o.sc, o.sc.Spec.Endpoints[0], 0, ac) if err != nil { t.Errorf("cannot generate ServiceScrapeConfig, err: %e", err) return diff --git a/internal/controller/operator/factory/vmagent/staticscrape.go b/internal/controller/operator/factory/vmagent/staticscrape.go index 0788bcb38..2f6188e69 100644 --- a/internal/controller/operator/factory/vmagent/staticscrape.go +++ b/internal/controller/operator/factory/vmagent/staticscrape.go @@ -12,14 +12,14 @@ import ( func generateStaticScrapeConfig( ctx context.Context, - cr *vmv1beta1.VMAgent, + sp *vmv1beta1.CommonScrapeParams, sc *vmv1beta1.VMStaticScrape, ep *vmv1beta1.TargetEndpoint, i int, ac *build.AssetsCache, ) (yaml.MapSlice, error) { spec := &sc.Spec - se := cr.Spec.CommonScrapeSecurityEnforcements + se := &sp.CommonScrapeSecurityEnforcements cfg := yaml.MapSlice{ { Key: "job_name", @@ -27,7 +27,7 @@ func generateStaticScrapeConfig( }, } - scrapeClass := getScrapeClass(spec.ScrapeClassName, cr) + scrapeClass := getScrapeClass(spec.ScrapeClassName, sp) if scrapeClass != nil { mergeEndpointAuthWithScrapeClass(&ep.EndpointAuth, scrapeClass) mergeEndpointRelabelingsWithScrapeClass(&ep.EndpointRelabelings, scrapeClass) @@ -48,9 +48,9 @@ func generateStaticScrapeConfig( ep.SeriesLimit = spec.SeriesLimit } if ep.ScrapeTimeout == "" { - ep.ScrapeTimeout = cr.Spec.ScrapeTimeout + ep.ScrapeTimeout = sp.ScrapeTimeout } - setScrapeIntervalToWithLimit(ctx, &ep.EndpointScrapeParams, cr) + setScrapeIntervalToWithLimit(ctx, &ep.EndpointScrapeParams, sp) cfg = addCommonScrapeParamsTo(cfg, ep.EndpointScrapeParams, se) @@ -66,7 +66,7 @@ func generateStaticScrapeConfig( for _, c := range ep.RelabelConfigs { relabelings = append(relabelings, generateRelabelConfig(c)) } - for _, trc := range cr.Spec.StaticScrapeRelabelTemplate { + for _, trc := range sp.StaticScrapeRelabelTemplate { relabelings = append(relabelings, generateRelabelConfig(trc)) } // Because of security risks, whenever enforcedNamespaceLabel is set, we want to append it to the diff --git a/internal/controller/operator/factory/vmagent/staticscrape_test.go b/internal/controller/operator/factory/vmagent/staticscrape_test.go index 62f94ec12..2974318bd 100644 --- a/internal/controller/operator/factory/vmagent/staticscrape_test.go +++ b/internal/controller/operator/factory/vmagent/staticscrape_test.go @@ -28,7 +28,8 @@ func Test_generateStaticScrapeConfig(t *testing.T) { ctx := context.Background() fclient := k8stools.GetTestClientWithObjects(o.predefinedObjects) ac := getAssetsCache(ctx, fclient, o.cr) - got, err := generateStaticScrapeConfig(ctx, o.cr, o.sc, o.sc.Spec.TargetEndpoints[0], 0, ac) + sp := &o.cr.Spec.CommonScrapeParams + got, err := generateStaticScrapeConfig(ctx, sp, o.sc, o.sc.Spec.TargetEndpoints[0], 0, ac) if err != nil { t.Fatalf("unexpected error: %v", err) } diff --git a/internal/controller/operator/factory/vmagent/vmagent.go b/internal/controller/operator/factory/vmagent/vmagent.go index d62a92c95..e40120287 100644 --- a/internal/controller/operator/factory/vmagent/vmagent.go +++ b/internal/controller/operator/factory/vmagent/vmagent.go @@ -105,7 +105,7 @@ func CreateOrUpdate(ctx context.Context, cr *vmv1beta1.VMAgent, rclient client.C if err := reconcile.ServiceAccount(ctx, rclient, build.ServiceAccount(cr), prevSA); err != nil { return fmt.Errorf("failed create service account: %w", err) } - if !cr.Spec.IngestOnlyMode { + if !ptr.Deref(cr.Spec.IngestOnlyMode, false) { if err := createK8sAPIAccess(ctx, rclient, cr, prevCR, config.IsClusterWideAccessAllowed()); err != nil { return fmt.Errorf("cannot create vmagent role and binding for it, err: %w", err) } @@ -217,7 +217,7 @@ func createOrUpdateApp(ctx context.Context, rclient client.Client, cr, prevCR *v rv.err = fmt.Errorf("cannot fill placeholders for %T sharded vmagent(%d): %w", newAppTpl, shardNum, err) return } - if !cr.Spec.IngestOnlyMode { + if !ptr.Deref(cr.Spec.IngestOnlyMode, false) { patchShardContainers(newApp.Spec.Template.Spec.Containers, shardNum, shardCount) } } @@ -231,7 +231,7 @@ func createOrUpdateApp(ctx context.Context, rclient client.Client, cr, prevCR *v rv.err = fmt.Errorf("cannot fill placeholders for prev %T sharded vmagent(%d): %w", newAppTpl, shardNum, err) return } - if !prevCR.Spec.IngestOnlyMode { + if !ptr.Deref(prevCR.Spec.IngestOnlyMode, false) { patchShardContainers(prevApp.Spec.Template.Spec.Containers, shardNum, shardCount) } } else { @@ -255,7 +255,7 @@ func createOrUpdateApp(ctx context.Context, rclient client.Client, cr, prevCR *v rv.err = fmt.Errorf("cannot fill placeholders for %T in sharded vmagent(%d): %w", newAppTpl, shardNum, err) return } - if !cr.Spec.IngestOnlyMode { + if !ptr.Deref(cr.Spec.IngestOnlyMode, false) { patchShardContainers(newApp.Spec.Template.Spec.Containers, shardNum, shardCount) } } @@ -269,7 +269,7 @@ func createOrUpdateApp(ctx context.Context, rclient client.Client, cr, prevCR *v rv.err = fmt.Errorf("cannot fill placeholders for prev %T in sharded vmagent(%d): %w", newAppTpl, shardNum, err) return } - if !prevCR.Spec.IngestOnlyMode { + if !ptr.Deref(prevCR.Spec.IngestOnlyMode, false) { patchShardContainers(prevApp.Spec.Template.Spec.Containers, shardNum, shardCount) } } else { @@ -548,7 +548,7 @@ func newPodSpec(cr *vmv1beta1.VMAgent, ac *build.AssetsCache) (*corev1.PodSpec, volumes = append(volumes, cr.Spec.Volumes...) - if !cr.Spec.IngestOnlyMode { + if !ptr.Deref(cr.Spec.IngestOnlyMode, false) { args = append(args, fmt.Sprintf("-promscrape.config=%s", path.Join(vmAgentConfOutDir, configFilename))) @@ -675,7 +675,7 @@ func newPodSpec(cr *vmv1beta1.VMAgent, ac *build.AssetsCache) (*corev1.PodSpec, var operatorContainers []corev1.Container var ic []corev1.Container // conditional add config reloader container - if !cr.Spec.IngestOnlyMode || cr.HasAnyRelabellingConfigs() || cr.HasAnyStreamAggrRule() { + if !ptr.Deref(cr.Spec.IngestOnlyMode, false) || cr.HasAnyRelabellingConfigs() || cr.HasAnyStreamAggrRule() { ss := &corev1.SecretKeySelector{ LocalObjectReference: corev1.LocalObjectReference{ Name: cr.PrefixedName(), @@ -684,7 +684,7 @@ func newPodSpec(cr *vmv1beta1.VMAgent, ac *build.AssetsCache) (*corev1.PodSpec, } configReloader := build.ConfigReloaderContainer(false, cr, crMounts, ss) operatorContainers = append(operatorContainers, configReloader) - if !cr.Spec.IngestOnlyMode { + if !ptr.Deref(cr.Spec.IngestOnlyMode, false) { ic = append(ic, build.ConfigReloaderContainer(true, cr, crMounts, ss)) build.AddStrictSecuritySettingsToContainers(cr.Spec.SecurityContext, ic, useStrictSecurity) } diff --git a/internal/controller/operator/factory/vmagent/vmagent_scrapeconfig.go b/internal/controller/operator/factory/vmagent/vmagent_scrapeconfig.go index 21c71a68c..cd8f76cd4 100644 --- a/internal/controller/operator/factory/vmagent/vmagent_scrapeconfig.go +++ b/internal/controller/operator/factory/vmagent/vmagent_scrapeconfig.go @@ -21,122 +21,6 @@ import ( "github.com/VictoriaMetrics/operator/internal/controller/operator/factory/reconcile" ) -type parsedObjects struct { - serviceScrapes *build.ChildObjects[*vmv1beta1.VMServiceScrape] - podScrapes *build.ChildObjects[*vmv1beta1.VMPodScrape] - staticScrapes *build.ChildObjects[*vmv1beta1.VMStaticScrape] - nodeScrapes *build.ChildObjects[*vmv1beta1.VMNodeScrape] - probes *build.ChildObjects[*vmv1beta1.VMProbe] - scrapeConfigs *build.ChildObjects[*vmv1beta1.VMScrapeConfig] -} - -func (so *parsedObjects) updateMetrics(ctx context.Context) { - so.serviceScrapes.UpdateMetrics(ctx) - so.podScrapes.UpdateMetrics(ctx) - so.staticScrapes.UpdateMetrics(ctx) - so.nodeScrapes.UpdateMetrics(ctx) - so.probes.UpdateMetrics(ctx) - so.scrapeConfigs.UpdateMetrics(ctx) -} - -func (so *parsedObjects) validateObjects(cr *vmv1beta1.VMAgent) { - so.serviceScrapes.ForEachCollectSkipInvalid(func(sc *vmv1beta1.VMServiceScrape) error { - if cr.Spec.ArbitraryFSAccessThroughSMs.Deny { - for _, ep := range sc.Spec.Endpoints { - if err := testForArbitraryFSAccess(ep.EndpointAuth); err != nil { - return err - } - } - } - if err := validateScrapeClassExists(sc.Spec.ScrapeClassName, cr); err != nil { - return err - } - if !build.MustSkipRuntimeValidation { - return sc.Validate() - } - return nil - }) - - so.podScrapes.ForEachCollectSkipInvalid(func(sc *vmv1beta1.VMPodScrape) error { - if cr.Spec.ArbitraryFSAccessThroughSMs.Deny { - for _, ep := range sc.Spec.PodMetricsEndpoints { - if err := testForArbitraryFSAccess(ep.EndpointAuth); err != nil { - return err - } - } - } - if err := validateScrapeClassExists(sc.Spec.ScrapeClassName, cr); err != nil { - return err - } - if !build.MustSkipRuntimeValidation { - return sc.Validate() - } - return nil - }) - so.staticScrapes.ForEachCollectSkipInvalid(func(sc *vmv1beta1.VMStaticScrape) error { - if cr.Spec.ArbitraryFSAccessThroughSMs.Deny { - for _, ep := range sc.Spec.TargetEndpoints { - if err := testForArbitraryFSAccess(ep.EndpointAuth); err != nil { - return err - } - } - } - if err := validateScrapeClassExists(sc.Spec.ScrapeClassName, cr); err != nil { - return err - } - if !build.MustSkipRuntimeValidation { - return sc.Validate() - } - return nil - }) - - so.nodeScrapes.ForEachCollectSkipInvalid(func(sc *vmv1beta1.VMNodeScrape) error { - if cr.Spec.ArbitraryFSAccessThroughSMs.Deny { - if err := testForArbitraryFSAccess(sc.Spec.EndpointAuth); err != nil { - return err - } - } - if err := validateScrapeClassExists(sc.Spec.ScrapeClassName, cr); err != nil { - return err - } - if !build.MustSkipRuntimeValidation { - return sc.Validate() - } - return nil - }) - - so.probes.ForEachCollectSkipInvalid(func(sc *vmv1beta1.VMProbe) error { - if cr.Spec.ArbitraryFSAccessThroughSMs.Deny { - if err := testForArbitraryFSAccess(sc.Spec.EndpointAuth); err != nil { - return err - } - } - if err := validateScrapeClassExists(sc.Spec.ScrapeClassName, cr); err != nil { - return err - } - if !build.MustSkipRuntimeValidation { - return sc.Validate() - } - return nil - }) - - so.scrapeConfigs.ForEachCollectSkipInvalid(func(sc *vmv1beta1.VMScrapeConfig) error { - // TODO: @f41gh7 validate per configuration FS access - if cr.Spec.ArbitraryFSAccessThroughSMs.Deny { - if err := testForArbitraryFSAccess(sc.Spec.EndpointAuth); err != nil { - return err - } - } - if err := validateScrapeClassExists(sc.Spec.ScrapeClassName, cr); err != nil { - return err - } - if !build.MustSkipRuntimeValidation { - return sc.Validate() - } - return nil - }) -} - // CreateOrUpdateScrapeConfig builds scrape configuration for VMAgent func CreateOrUpdateScrapeConfig(ctx context.Context, rclient client.Client, cr *vmv1beta1.VMAgent, childObject client.Object) error { var prevCR *vmv1beta1.VMAgent @@ -152,7 +36,7 @@ func CreateOrUpdateScrapeConfig(ctx context.Context, rclient client.Client, cr * } func createOrUpdateScrapeConfig(ctx context.Context, rclient client.Client, cr, prevCR *vmv1beta1.VMAgent, childObject client.Object, ac *build.AssetsCache) error { - if cr.Spec.IngestOnlyMode { + if ptr.Deref(cr.Spec.IngestOnlyMode, false) { return nil } // HACK: newPodSpec could load content into ac and it must be called @@ -163,62 +47,32 @@ func createOrUpdateScrapeConfig(ctx context.Context, rclient client.Client, cr, return err } - serviceScrapes, nsnServiceScrapes, err := selectServiceScrapes(ctx, cr, rclient) - if err != nil { - return fmt.Errorf("selecting ServiceScrapes failed: %w", err) - } - - podScrapes, nsnPodScrapes, err := selectPodScrapes(ctx, cr, rclient) - if err != nil { - return fmt.Errorf("selecting PodScrapes failed: %w", err) - } - - probes, nsnProbes, err := selectProbes(ctx, cr, rclient) - if err != nil { - return fmt.Errorf("selecting VMProbes failed: %w", err) - } - - nodeScrapes, nsnNodeScrapes, err := selectNodeScrapes(ctx, cr, rclient) - if err != nil { - return fmt.Errorf("selecting VMNodeScrapes failed: %w", err) - } - - staticScrapes, nsnStaticScrapes, err := selectStaticScrapes(ctx, cr, rclient) - if err != nil { - return fmt.Errorf("selecting VMStaticScrapes failed: %w", err) - } - - scrapeConfigs, nsnScrapeConfigs, err := selectScrapeConfigs(ctx, cr, rclient) - if err != nil { - return fmt.Errorf("selecting ScrapeConfigs failed: %w", err) - } pos := &parsedObjects{ - serviceScrapes: build.NewChildObjects("vmservicescrape", serviceScrapes, nsnServiceScrapes), - podScrapes: build.NewChildObjects("vmpodscrape", podScrapes, nsnPodScrapes), - probes: build.NewChildObjects("vmprobe", probes, nsnProbes), - nodeScrapes: build.NewChildObjects("vmnodescrape", nodeScrapes, nsnNodeScrapes), - staticScrapes: build.NewChildObjects("vmstaticscrape", staticScrapes, nsnStaticScrapes), - scrapeConfigs: build.NewChildObjects("vmscrapeconfig", scrapeConfigs, nsnScrapeConfigs), + Namespace: cr.Namespace, + APIServerConfig: cr.Spec.APIServerConfig, + MustUseNodeSelector: cr.Spec.DaemonSetMode, + HasClusterWideAccess: config.IsClusterWideAccessAllowed() || !cr.IsOwnsServiceAccount(), + ExternalLabels: cr.ExternalLabels(), + IgnoreNamespaceSelectors: cr.Spec.IgnoreNamespaceSelectors, + } + if !pos.HasClusterWideAccess { + logger.WithContext(ctx).Info("setting discovery for the single namespace only." + + "Since operator launched with set WATCH_NAMESPACE param. " + + "Set custom ServiceAccountName property for VMAgent if needed.") + pos.IgnoreNamespaceSelectors = true } - pos.validateObjects(cr) - - var additionalScrapeConfigs []byte - - if cr.Spec.AdditionalScrapeConfigs != nil { - sc, err := ac.LoadKeyFromSecret(cr.Namespace, cr.Spec.AdditionalScrapeConfigs) - if err != nil { - return fmt.Errorf("loading additional scrape configs from Secret failed: %w", err) - } - additionalScrapeConfigs = []byte(sc) + sp := &cr.Spec.CommonScrapeParams + if err := pos.init(ctx, rclient, sp); err != nil { + return err } + pos.validateObjects(sp) + // Update secret based on the most recent configuration. - generatedConfig, err := generateConfig( + generatedConfig, err := pos.generateConfig( ctx, - cr, - pos, + sp, ac, - additionalScrapeConfigs, ) if err != nil { return fmt.Errorf("generating config for vmagent failed: %w", err) @@ -246,66 +100,14 @@ func createOrUpdateScrapeConfig(ctx context.Context, rclient client.Client, cr, } } - if err := pos.updateStatusesForScrapeObjects(ctx, rclient, cr, childObject); err != nil { + parentName := fmt.Sprintf("%s.%s.vmagent", cr.Name, cr.Namespace) + if err := pos.updateStatusesForScrapeObjects(ctx, rclient, parentName, childObject); err != nil { return err } return nil } -func (pos *parsedObjects) updateStatusesForScrapeObjects(ctx context.Context, rclient client.Client, cr *vmv1beta1.VMAgent, childObject client.Object) error { - parentObject := fmt.Sprintf("%s.%s.vmagent", cr.Name, cr.Namespace) - pos.updateMetrics(ctx) - if childObject != nil && !reflect.ValueOf(childObject).IsNil() { - // fast path - switch t := childObject.(type) { - case *vmv1beta1.VMStaticScrape: - if o := pos.staticScrapes.Get(t); o != nil { - return reconcile.StatusForChildObjects(ctx, rclient, parentObject, []*vmv1beta1.VMStaticScrape{o}) - } - case *vmv1beta1.VMProbe: - if o := pos.probes.Get(t); o != nil { - return reconcile.StatusForChildObjects(ctx, rclient, parentObject, []*vmv1beta1.VMProbe{o}) - } - case *vmv1beta1.VMScrapeConfig: - if o := pos.scrapeConfigs.Get(t); o != nil { - return reconcile.StatusForChildObjects(ctx, rclient, parentObject, []*vmv1beta1.VMScrapeConfig{o}) - } - case *vmv1beta1.VMNodeScrape: - if o := pos.nodeScrapes.Get(t); o != nil { - return reconcile.StatusForChildObjects(ctx, rclient, parentObject, []*vmv1beta1.VMNodeScrape{o}) - } - case *vmv1beta1.VMPodScrape: - if o := pos.podScrapes.Get(t); o != nil { - return reconcile.StatusForChildObjects(ctx, rclient, parentObject, []*vmv1beta1.VMPodScrape{o}) - } - case *vmv1beta1.VMServiceScrape: - if o := pos.serviceScrapes.Get(t); o != nil { - return reconcile.StatusForChildObjects(ctx, rclient, parentObject, []*vmv1beta1.VMServiceScrape{o}) - } - } - } - if err := reconcile.StatusForChildObjects(ctx, rclient, parentObject, pos.serviceScrapes.All()); err != nil { - return fmt.Errorf("cannot update statuses for service scrape objects: %w", err) - } - if err := reconcile.StatusForChildObjects(ctx, rclient, parentObject, pos.podScrapes.All()); err != nil { - return fmt.Errorf("cannot update statuses for pod scrape objects: %w", err) - } - if err := reconcile.StatusForChildObjects(ctx, rclient, parentObject, pos.nodeScrapes.All()); err != nil { - return fmt.Errorf("cannot update statuses for node scrape objects: %w", err) - } - if err := reconcile.StatusForChildObjects(ctx, rclient, parentObject, pos.probes.All()); err != nil { - return fmt.Errorf("cannot update statuses for probe scrape objects: %w", err) - } - if err := reconcile.StatusForChildObjects(ctx, rclient, parentObject, pos.staticScrapes.All()); err != nil { - return fmt.Errorf("cannot update statuses for static scrape objects: %w", err) - } - if err := reconcile.StatusForChildObjects(ctx, rclient, parentObject, pos.scrapeConfigs.All()); err != nil { - return fmt.Errorf("cannot update statuses for scrapeconfig scrape objects: %w", err) - } - return nil -} - // TODO: @f41gh7 validate VMScrapeParams func testForArbitraryFSAccess(e vmv1beta1.EndpointAuth) error { if e.BearerTokenFile != "" { @@ -335,12 +137,12 @@ func testForArbitraryFSAccess(e vmv1beta1.EndpointAuth) error { return nil } -func setScrapeIntervalToWithLimit(ctx context.Context, dst *vmv1beta1.EndpointScrapeParams, cr *vmv1beta1.VMAgent) { +func setScrapeIntervalToWithLimit(ctx context.Context, dst *vmv1beta1.EndpointScrapeParams, sp *vmv1beta1.CommonScrapeParams) { if dst.ScrapeInterval == "" { dst.ScrapeInterval = dst.Interval } - originInterval, minIntervalStr, maxIntervalStr := dst.ScrapeInterval, cr.Spec.MinScrapeInterval, cr.Spec.MaxScrapeInterval + originInterval, minIntervalStr, maxIntervalStr := dst.ScrapeInterval, sp.MinScrapeInterval, sp.MaxScrapeInterval if originInterval == "" || (minIntervalStr == nil && maxIntervalStr == nil) { // fast path return @@ -387,200 +189,6 @@ const ( var invalidLabelCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`) -func generateConfig( - ctx context.Context, - cr *vmv1beta1.VMAgent, - pos *parsedObjects, - ac *build.AssetsCache, - additionalScrapeConfigs []byte, -) ([]byte, error) { - cfg := yaml.MapSlice{} - if !config.IsClusterWideAccessAllowed() && cr.IsOwnsServiceAccount() { - logger.WithContext(ctx).Info("setting discovery for the single namespace only." + - "Since operator launched with set WATCH_NAMESPACE param. " + - "Set custom ServiceAccountName property for VMAgent if needed.") - cr.Spec.IgnoreNamespaceSelectors = true - } - - scrapeInterval := defaultScrapeInterval - if cr.Spec.ScrapeInterval != "" { - scrapeInterval = cr.Spec.ScrapeInterval - } - globalItems := yaml.MapSlice{ - {Key: "scrape_interval", Value: scrapeInterval}, - {Key: "external_labels", Value: buildExternalLabels(cr)}, - } - - if cr.Spec.SampleLimit > 0 { - globalItems = append(globalItems, yaml.MapItem{ - Key: "sample_limit", - Value: cr.Spec.SampleLimit, - }) - } - - if cr.Spec.ScrapeTimeout != "" { - globalItems = append(globalItems, yaml.MapItem{ - Key: "scrape_timeout", - Value: cr.Spec.ScrapeTimeout, - }) - } - - if len(cr.Spec.GlobalScrapeMetricRelabelConfigs) > 0 { - globalItems = append(globalItems, yaml.MapItem{ - Key: "metric_relabel_configs", - Value: cr.Spec.GlobalScrapeMetricRelabelConfigs, - }) - } - if len(cr.Spec.GlobalScrapeRelabelConfigs) > 0 { - globalItems = append(globalItems, yaml.MapItem{ - Key: "relabel_configs", - Value: cr.Spec.GlobalScrapeRelabelConfigs, - }) - } - - cfg = append(cfg, yaml.MapItem{Key: "global", Value: globalItems}) - - var scrapeConfigs []yaml.MapSlice - var err error - - err = pos.serviceScrapes.ForEachCollectSkipNotFound(func(sc *vmv1beta1.VMServiceScrape) error { - scrapeConfigsLen := len(scrapeConfigs) - for i, ep := range sc.Spec.Endpoints { - s, err := generateServiceScrapeConfig( - ctx, - cr, - sc, - ep, i, - ac, - ) - if err != nil { - scrapeConfigs = scrapeConfigs[:scrapeConfigsLen] - return err - } - scrapeConfigs = append(scrapeConfigs, s) - } - return nil - }) - if err != nil { - return nil, err - } - - err = pos.podScrapes.ForEachCollectSkipNotFound(func(sc *vmv1beta1.VMPodScrape) error { - scrapeConfigsLen := len(scrapeConfigs) - for i, ep := range sc.Spec.PodMetricsEndpoints { - s, err := generatePodScrapeConfig( - ctx, - cr, - sc, ep, i, - ac, - ) - if err != nil { - scrapeConfigs = scrapeConfigs[:scrapeConfigsLen] - return err - } - scrapeConfigs = append(scrapeConfigs, s) - } - return nil - }) - if err != nil { - return nil, err - } - - err = pos.probes.ForEachCollectSkipNotFound(func(sc *vmv1beta1.VMProbe) error { - s, err := generateProbeConfig( - ctx, - cr, - sc, - ac, - ) - if err != nil { - return err - } - scrapeConfigs = append(scrapeConfigs, s) - return nil - }) - if err != nil { - return nil, err - } - - err = pos.nodeScrapes.ForEachCollectSkipNotFound(func(sc *vmv1beta1.VMNodeScrape) error { - s, err := generateNodeScrapeConfig( - ctx, - cr, - sc, - ac, - ) - if err != nil { - return err - } - scrapeConfigs = append(scrapeConfigs, s) - - return nil - }) - if err != nil { - return nil, err - } - - err = pos.staticScrapes.ForEachCollectSkipNotFound(func(sc *vmv1beta1.VMStaticScrape) error { - scrapeConfigsLen := len(scrapeConfigs) - for i, ep := range sc.Spec.TargetEndpoints { - s, err := generateStaticScrapeConfig( - ctx, - cr, - sc, - ep, i, - ac, - ) - if err != nil { - scrapeConfigs = scrapeConfigs[:scrapeConfigsLen] - return err - } - scrapeConfigs = append(scrapeConfigs, s) - } - return nil - }) - if err != nil { - return nil, err - } - - err = pos.scrapeConfigs.ForEachCollectSkipNotFound(func(sc *vmv1beta1.VMScrapeConfig) error { - s, err := generateScrapeConfig( - ctx, - cr, - sc, - ac, - ) - if err != nil { - return err - } - scrapeConfigs = append(scrapeConfigs, s) - - return nil - }) - if err != nil { - return nil, err - } - - var additionalScrapeConfigsYaml []yaml.MapSlice - if err := yaml.Unmarshal(additionalScrapeConfigs, &additionalScrapeConfigsYaml); err != nil { - return nil, fmt.Errorf("unmarshalling additional scrape configs failed: %w", err) - } - - var inlineScrapeConfigsYaml []yaml.MapSlice - if len(cr.Spec.InlineScrapeConfig) > 0 { - if err := yaml.Unmarshal([]byte(cr.Spec.InlineScrapeConfig), &inlineScrapeConfigsYaml); err != nil { - return nil, fmt.Errorf("unmarshalling inline additional scrape configs failed: %w", err) - } - } - additionalScrapeConfigsYaml = append(additionalScrapeConfigsYaml, inlineScrapeConfigsYaml...) - cfg = append(cfg, yaml.MapItem{ - Key: "scrape_configs", - Value: append(scrapeConfigs, additionalScrapeConfigsYaml...), - }) - - return yaml.Marshal(cfg) -} - func sanitizeLabelName(name string) string { return invalidLabelCharRE.ReplaceAllString(name, "_") } @@ -896,38 +504,6 @@ func enforceNamespaceLabel(relabelings []yaml.MapSlice, namespace, enforcedNames }) } -func buildExternalLabels(cr *vmv1beta1.VMAgent) yaml.MapSlice { - m := map[string]string{} - sp := cr.Spec.CommonScrapeParams - - // Use "prometheus" external label name by default if field is missing. - // in case of migration from prometheus to vmagent, it helps to have same labels - // Do not add external label if field is set to empty string. - prometheusExternalLabelName := "prometheus" - var labelName *string - if sp.ExternalLabelName != nil { - labelName = sp.ExternalLabelName - } else if sp.VMAgentExternalLabelName != nil { - labelName = sp.VMAgentExternalLabelName - } - if labelName != nil { - if *labelName != "" { - prometheusExternalLabelName = *labelName - } else { - prometheusExternalLabelName = "" - } - } - - if prometheusExternalLabelName != "" { - m[prometheusExternalLabelName] = fmt.Sprintf("%s/%s", cr.Namespace, cr.Name) - } - - for n, v := range sp.ExternalLabels { - m[n] = v - } - return stringMapToMapSlice(m) -} - func buildVMScrapeParams(namespace string, cfg *vmv1beta1.VMScrapeParams, ac *build.AssetsCache) (yaml.MapSlice, error) { var r yaml.MapSlice if cfg == nil { @@ -1009,7 +585,7 @@ func addSelectorToRelabelingFor(relabelings []yaml.MapSlice, typeName string, se return relabelings } -func addCommonScrapeParamsTo(cfg yaml.MapSlice, cs vmv1beta1.EndpointScrapeParams, se vmv1beta1.CommonScrapeSecurityEnforcements) yaml.MapSlice { +func addCommonScrapeParamsTo(cfg yaml.MapSlice, cs vmv1beta1.EndpointScrapeParams, se *vmv1beta1.CommonScrapeSecurityEnforcements) yaml.MapSlice { hl := honorLabels(cs.HonorLabels, se.OverrideHonorLabels) cfg = append(cfg, yaml.MapItem{ Key: "honor_labels", @@ -1064,7 +640,7 @@ func addCommonScrapeParamsTo(cfg yaml.MapSlice, cs vmv1beta1.EndpointScrapeParam return cfg } -func addMetricRelabelingsTo(cfg yaml.MapSlice, src []*vmv1beta1.RelabelConfig, se vmv1beta1.CommonScrapeSecurityEnforcements) yaml.MapSlice { +func addMetricRelabelingsTo(cfg yaml.MapSlice, src []*vmv1beta1.RelabelConfig, se *vmv1beta1.CommonScrapeSecurityEnforcements) yaml.MapSlice { if len(src) == 0 { return cfg } @@ -1132,16 +708,17 @@ func getAssetsCache(ctx context.Context, rclient client.Client, cr *vmv1beta1.VM } return build.NewAssetsCache(ctx, rclient, cfg) } -func validateScrapeClassExists(scrapeClassName *string, cr *vmv1beta1.VMAgent) error { + +func validateScrapeClassExists(scrapeClassName *string, sp *vmv1beta1.CommonScrapeParams) error { if scrapeClassName == nil { return nil } - for _, sc := range cr.Spec.ScrapeClasses { + for _, sc := range sp.ScrapeClasses { if sc.Name == *scrapeClassName { return nil } } - return fmt.Errorf("scrape class %q not found in VMAgent %s/%s", *scrapeClassName, cr.Namespace, cr.Name) + return fmt.Errorf("scrape class %q not supported", *scrapeClassName) } func mergeEndpointAuthWithScrapeClass(authz *vmv1beta1.EndpointAuth, scrapeClass *vmv1beta1.ScrapeClass) { @@ -1287,9 +864,9 @@ func mergeTLSConfigs(left, right *vmv1beta1.TLSConfig) *vmv1beta1.TLSConfig { return left } -func getScrapeClass(name *string, vmagent *vmv1beta1.VMAgent) *vmv1beta1.ScrapeClass { +func getScrapeClass(name *string, sp *vmv1beta1.CommonScrapeParams) *vmv1beta1.ScrapeClass { var defaultClass *vmv1beta1.ScrapeClass - for _, scrapeClass := range vmagent.Spec.ScrapeClasses { + for _, scrapeClass := range sp.ScrapeClasses { if ptr.Deref(name, "") == scrapeClass.Name { return &scrapeClass } diff --git a/internal/controller/operator/factory/vmagent/vmagent_test.go b/internal/controller/operator/factory/vmagent/vmagent_test.go index d59965d43..7f6022b99 100644 --- a/internal/controller/operator/factory/vmagent/vmagent_test.go +++ b/internal/controller/operator/factory/vmagent/vmagent_test.go @@ -155,7 +155,9 @@ func TestCreateOrUpdate(t *testing.T) { }, CommonDefaultableParams: vmv1beta1.CommonDefaultableParams{}, StatefulMode: true, - IngestOnlyMode: true, + CommonScrapeParams: vmv1beta1.CommonScrapeParams{ + IngestOnlyMode: ptr.To(true), + }, StatefulStorage: &vmv1beta1.StorageSpec{ VolumeClaimTemplate: vmv1beta1.EmbeddedPersistentVolumeClaim{ Spec: corev1.PersistentVolumeClaimSpec{ @@ -545,7 +547,9 @@ func TestCreateOrUpdate(t *testing.T) { }, StatefulRollingUpdateStrategy: appsv1.RollingUpdateStatefulSetStrategyType, StatefulMode: true, - IngestOnlyMode: true, + CommonScrapeParams: vmv1beta1.CommonScrapeParams{ + IngestOnlyMode: ptr.To(true), + }, CommonApplicationDeploymentParams: vmv1beta1.CommonApplicationDeploymentParams{ ReplicaCount: ptr.To[int32](2), }, @@ -629,8 +633,10 @@ func TestCreateOrUpdate(t *testing.T) { CommonApplicationDeploymentParams: vmv1beta1.CommonApplicationDeploymentParams{ ReplicaCount: ptr.To(int32(1)), }, - StatefulMode: true, - IngestOnlyMode: true, + StatefulMode: true, + CommonScrapeParams: vmv1beta1.CommonScrapeParams{ + IngestOnlyMode: ptr.To(true), + }, StatefulStorage: &vmv1beta1.StorageSpec{ VolumeClaimTemplate: vmv1beta1.EmbeddedPersistentVolumeClaim{ Spec: corev1.PersistentVolumeClaimSpec{ @@ -2238,7 +2244,9 @@ func TestMakeSpecForAgentOk(t *testing.T) { cr: &vmv1beta1.VMAgent{ ObjectMeta: metav1.ObjectMeta{Name: "agent", Namespace: "default"}, Spec: vmv1beta1.VMAgentSpec{ - IngestOnlyMode: true, + CommonScrapeParams: vmv1beta1.CommonScrapeParams{ + IngestOnlyMode: ptr.To(true), + }, CommonDefaultableParams: vmv1beta1.CommonDefaultableParams{ Image: vmv1beta1.Image{ Repository: "vm-repo", @@ -2324,7 +2332,9 @@ serviceaccountname: vmagent-agent cr: &vmv1beta1.VMAgent{ ObjectMeta: metav1.ObjectMeta{Name: "agent", Namespace: "default"}, Spec: vmv1beta1.VMAgentSpec{ - IngestOnlyMode: false, + CommonScrapeParams: vmv1beta1.CommonScrapeParams{ + IngestOnlyMode: ptr.To(false), + }, CommonDefaultableParams: vmv1beta1.CommonDefaultableParams{ Image: vmv1beta1.Image{ Tag: "v1.97.1", @@ -2463,7 +2473,9 @@ serviceaccountname: vmagent-agent cr: &vmv1beta1.VMAgent{ ObjectMeta: metav1.ObjectMeta{Name: "agent", Namespace: "default"}, Spec: vmv1beta1.VMAgentSpec{ - IngestOnlyMode: true, + CommonScrapeParams: vmv1beta1.CommonScrapeParams{ + IngestOnlyMode: ptr.To(true), + }, CommonDefaultableParams: vmv1beta1.CommonDefaultableParams{ Image: vmv1beta1.Image{ Tag: "v1.97.1", @@ -2544,7 +2556,9 @@ serviceaccountname: vmagent-agent cr: &vmv1beta1.VMAgent{ ObjectMeta: metav1.ObjectMeta{Name: "agent", Namespace: "default"}, Spec: vmv1beta1.VMAgentSpec{ - IngestOnlyMode: true, + CommonScrapeParams: vmv1beta1.CommonScrapeParams{ + IngestOnlyMode: ptr.To(true), + }, CommonDefaultableParams: vmv1beta1.CommonDefaultableParams{ Image: vmv1beta1.Image{ Tag: "v1.97.1", @@ -2628,7 +2642,9 @@ serviceaccountname: vmagent-agent cr: &vmv1beta1.VMAgent{ ObjectMeta: metav1.ObjectMeta{Name: "agent", Namespace: "default"}, Spec: vmv1beta1.VMAgentSpec{ - IngestOnlyMode: true, + CommonScrapeParams: vmv1beta1.CommonScrapeParams{ + IngestOnlyMode: ptr.To(true), + }, CommonDefaultableParams: vmv1beta1.CommonDefaultableParams{ Image: vmv1beta1.Image{ Tag: "v1.97.1", diff --git a/internal/controller/operator/factory/vmdistributed/vmagent.go b/internal/controller/operator/factory/vmdistributed/vmagent.go index 0b964b401..eb7d65489 100644 --- a/internal/controller/operator/factory/vmdistributed/vmagent.go +++ b/internal/controller/operator/factory/vmdistributed/vmagent.go @@ -16,6 +16,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" vmv1alpha1 "github.com/VictoriaMetrics/operator/api/operator/v1alpha1" @@ -176,7 +177,7 @@ func updateOrCreateVMAgent(ctx context.Context, rclient client.Client, cr *vmv1a if err := json.Unmarshal(vmagentData, &vmagentSpec); err != nil { return nil, fmt.Errorf("failed to unmarshal spec.vmagent.spec of VMDistributed=%s/%s: %w", cr.Name, cr.Namespace, err) } - vmagentSpec.IngestOnlyMode = true + vmagentSpec.IngestOnlyMode = ptr.To(true) if vmagentSpec.RemoteWriteSettings == nil { vmagentSpec.RemoteWriteSettings = &vmv1beta1.VMAgentRemoteWriteSettings{} } diff --git a/internal/controller/operator/vmagent_controller.go b/internal/controller/operator/vmagent_controller.go index e4948eee9..f43eccaa0 100644 --- a/internal/controller/operator/vmagent_controller.go +++ b/internal/controller/operator/vmagent_controller.go @@ -88,7 +88,7 @@ func (r *VMAgentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (re if err := r.Get(ctx, req.NamespacedName, instance); err != nil { return result, &getError{origin: err, controller: "vmagent", requestObject: req} } - if !instance.IsUnmanaged() { + if !instance.IsUnmanaged(nil) { agentSync.Lock() defer agentSync.Unlock() } diff --git a/internal/controller/operator/vmnodescrape_controller.go b/internal/controller/operator/vmnodescrape_controller.go index 4c883cae5..cf571c14a 100644 --- a/internal/controller/operator/vmnodescrape_controller.go +++ b/internal/controller/operator/vmnodescrape_controller.go @@ -92,21 +92,17 @@ func (r *VMNodeScrapeReconciler) Reconcile(ctx context.Context, req ctrl.Request for i := range objects.Items { item := &objects.Items[i] - if !item.DeletionTimestamp.IsZero() || item.Spec.ParsingError != "" || item.IsNodeScrapeUnmanaged() { + if item.IsUnmanaged(instance) { continue } l := l.WithValues("vmagent", item.Name, "parent_namespace", item.Namespace) ctx := logger.AddToContext(ctx, l) - if item.Spec.DaemonSetMode { - continue - } - // only check selector when deleting object, - // since labels can be changed when updating and we can't tell if it was selected before, and we can't tell if it's creating or updating. if !instance.DeletionTimestamp.IsZero() { + objectSelector, namespaceSelector := item.ScrapeSelectors(instance) opts := &k8stools.SelectorOpts{ SelectAll: item.Spec.SelectAllByDefault, - NamespaceSelector: item.Spec.NodeScrapeNamespaceSelector, - ObjectSelector: item.Spec.NodeScrapeSelector, + NamespaceSelector: namespaceSelector, + ObjectSelector: objectSelector, DefaultNamespace: instance.Namespace, } match, err := isSelectorsMatchesTargetCRD(ctx, r.Client, instance, item, opts) diff --git a/internal/controller/operator/vmpodscrape_controller.go b/internal/controller/operator/vmpodscrape_controller.go index d0b428c4b..9166d7558 100644 --- a/internal/controller/operator/vmpodscrape_controller.go +++ b/internal/controller/operator/vmpodscrape_controller.go @@ -90,7 +90,7 @@ func (r *VMPodScrapeReconciler) Reconcile(ctx context.Context, req ctrl.Request) for i := range objects.Items { item := &objects.Items[i] - if !item.DeletionTimestamp.IsZero() || item.Spec.ParsingError != "" || item.IsPodScrapeUnmanaged() { + if item.IsUnmanaged(instance) { continue } l := l.WithValues("vmagent", item.Name, "parent_namespace", item.Namespace) @@ -99,10 +99,11 @@ func (r *VMPodScrapeReconciler) Reconcile(ctx context.Context, req ctrl.Request) // only check selector when deleting object, // since labels can be changed when updating and we can't tell if it was selected before, and we can't tell if it's creating or updating. if !instance.DeletionTimestamp.IsZero() { + objectSelector, namespaceSelector := item.ScrapeSelectors(instance) opts := &k8stools.SelectorOpts{ SelectAll: item.Spec.SelectAllByDefault, - NamespaceSelector: item.Spec.PodScrapeNamespaceSelector, - ObjectSelector: item.Spec.PodScrapeSelector, + NamespaceSelector: namespaceSelector, + ObjectSelector: objectSelector, DefaultNamespace: instance.Namespace, } match, err := isSelectorsMatchesTargetCRD(ctx, r.Client, instance, item, opts) diff --git a/internal/controller/operator/vmprobe_controller.go b/internal/controller/operator/vmprobe_controller.go index c38ef89d9..c674bdc1a 100644 --- a/internal/controller/operator/vmprobe_controller.go +++ b/internal/controller/operator/vmprobe_controller.go @@ -91,10 +91,7 @@ func (r *VMProbeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (re for i := range objects.Items { item := &objects.Items[i] - if !item.DeletionTimestamp.IsZero() || item.Spec.ParsingError != "" || item.IsProbeUnmanaged() { - continue - } - if item.Spec.DaemonSetMode { + if item.IsUnmanaged(instance) { continue } l := l.WithValues("vmagent", item.Name, "parent_namespace", item.Namespace) @@ -103,10 +100,11 @@ func (r *VMProbeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (re // only check selector when deleting object, // since labels can be changed when updating and we can't tell if it was selected before, and we can't tell if it's creating or updating. if !instance.DeletionTimestamp.IsZero() { + objectSelector, namespaceSelector := item.ScrapeSelectors(item) opts := &k8stools.SelectorOpts{ SelectAll: item.Spec.SelectAllByDefault, - NamespaceSelector: item.Spec.ProbeNamespaceSelector, - ObjectSelector: item.Spec.ProbeSelector, + NamespaceSelector: namespaceSelector, + ObjectSelector: objectSelector, DefaultNamespace: instance.Namespace, } match, err := isSelectorsMatchesTargetCRD(ctx, r.Client, instance, item, opts) diff --git a/internal/controller/operator/vmscrapeconfig_controller.go b/internal/controller/operator/vmscrapeconfig_controller.go index c3505b858..bf2049a25 100644 --- a/internal/controller/operator/vmscrapeconfig_controller.go +++ b/internal/controller/operator/vmscrapeconfig_controller.go @@ -90,21 +90,19 @@ func (r *VMScrapeConfigReconciler) Reconcile(ctx context.Context, req ctrl.Reque for i := range objects.Items { item := &objects.Items[i] - if !item.DeletionTimestamp.IsZero() || item.Spec.ParsingError != "" || item.IsScrapeConfigUnmanaged() { + if item.IsUnmanaged(instance) { continue } l := l.WithValues("vmagent", item.Name, "parent_namespace", item.Namespace) ctx := logger.AddToContext(ctx, l) - if item.Spec.DaemonSetMode { - continue - } // only check selector when deleting object, // since labels can be changed when updating and we can't tell if it was selected before, and we can't tell if it's creating or updating. if !instance.DeletionTimestamp.IsZero() { + objectSelector, namespaceSelector := item.ScrapeSelectors(instance) opts := &k8stools.SelectorOpts{ SelectAll: item.Spec.SelectAllByDefault, - NamespaceSelector: item.Spec.ScrapeConfigNamespaceSelector, - ObjectSelector: item.Spec.ScrapeConfigSelector, + NamespaceSelector: namespaceSelector, + ObjectSelector: objectSelector, DefaultNamespace: instance.Namespace, } match, err := isSelectorsMatchesTargetCRD(ctx, r.Client, instance, item, opts) diff --git a/internal/controller/operator/vmservicescrape_controller.go b/internal/controller/operator/vmservicescrape_controller.go index a6629998b..1dec5a9bb 100644 --- a/internal/controller/operator/vmservicescrape_controller.go +++ b/internal/controller/operator/vmservicescrape_controller.go @@ -91,21 +91,19 @@ func (r *VMServiceScrapeReconciler) Reconcile(ctx context.Context, req ctrl.Requ for i := range objects.Items { item := &objects.Items[i] - if !item.DeletionTimestamp.IsZero() || item.Spec.ParsingError != "" || item.IsServiceScrapeUnmanaged() { + if item.IsUnmanaged(instance) { continue } l := l.WithValues("vmagent", item.Name, "parent_namespace", item.Namespace) ctx := logger.AddToContext(ctx, l) - if item.Spec.DaemonSetMode { - continue - } // only check selector when deleting object, // since labels can be changed when updating and we can't tell if it was selected before, and we can't tell if it's creating or updating. if !instance.DeletionTimestamp.IsZero() { + objectSelector, namespaceSelector := item.ScrapeSelectors(instance) opts := &k8stools.SelectorOpts{ SelectAll: item.Spec.SelectAllByDefault, - NamespaceSelector: item.Spec.ServiceScrapeNamespaceSelector, - ObjectSelector: item.Spec.ServiceScrapeSelector, + NamespaceSelector: namespaceSelector, + ObjectSelector: objectSelector, DefaultNamespace: instance.Namespace, } match, err := isSelectorsMatchesTargetCRD(ctx, r.Client, instance, item, opts) diff --git a/internal/controller/operator/vmstaticscrape_controller.go b/internal/controller/operator/vmstaticscrape_controller.go index 5a0ff4444..fff27da5d 100644 --- a/internal/controller/operator/vmstaticscrape_controller.go +++ b/internal/controller/operator/vmstaticscrape_controller.go @@ -70,21 +70,19 @@ func (r *VMStaticScrapeReconciler) Reconcile(ctx context.Context, req ctrl.Reque for i := range objects.Items { item := &objects.Items[i] - if !item.DeletionTimestamp.IsZero() || item.Spec.ParsingError != "" || item.IsStaticScrapeUnmanaged() { + if item.IsUnmanaged(instance) { continue } l := l.WithValues("vmagent", item.Name, "parent_namespace", item.Namespace) ctx := logger.AddToContext(ctx, l) - if item.Spec.DaemonSetMode { - continue - } // only check selector when deleting object, // since labels can be changed when updating and we can't tell if it was selected before, and we can't tell if it's creating or updating. if !instance.DeletionTimestamp.IsZero() { + objectSelector, namespaceSelector := item.ScrapeSelectors(instance) opts := &k8stools.SelectorOpts{ SelectAll: item.Spec.SelectAllByDefault, - NamespaceSelector: item.Spec.StaticScrapeNamespaceSelector, - ObjectSelector: item.Spec.StaticScrapeSelector, + NamespaceSelector: namespaceSelector, + ObjectSelector: objectSelector, DefaultNamespace: instance.Namespace, } match, err := isSelectorsMatchesTargetCRD(ctx, r.Client, instance, item, opts) diff --git a/test/e2e/vmagent_test.go b/test/e2e/vmagent_test.go index 17ed3fe4b..16097e01f 100644 --- a/test/e2e/vmagent_test.go +++ b/test/e2e/vmagent_test.go @@ -494,7 +494,7 @@ var _ = Describe("test vmagent Controller", Label("vm", "agent", "vmagent"), fun cr.Spec.ReplicaCount = ptr.To[int32](1) cr.Spec.ShardCount = ptr.To(2) cr.Spec.StatefulMode = true - cr.Spec.IngestOnlyMode = true + cr.Spec.IngestOnlyMode = ptr.To(true) }, verify: func(cr *vmv1beta1.VMAgent) { var createdSts appsv1.StatefulSet