diff --git a/go.mod b/go.mod index c8524364a5a..d5d45cfbcf4 100644 --- a/go.mod +++ b/go.mod @@ -94,7 +94,10 @@ require ( gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 gopkg.in/yaml.v2 v2.4.0 gopkg.in/yaml.v3 v3.0.1 + k8s.io/api v0.28.4 + k8s.io/apimachinery v0.28.4 k8s.io/apiserver v0.28.4 + k8s.io/client-go v0.28.4 modernc.org/sqlite v1.42.2 ) @@ -134,6 +137,7 @@ require ( github.com/docker/go-units v0.5.0 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/ebitengine/purego v0.8.4 // indirect + github.com/emicklei/go-restful/v3 v3.9.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/gabriel-vasile/mimetype v1.4.8 // indirect github.com/gin-contrib/sse v1.0.0 // indirect @@ -153,6 +157,7 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/glog v1.2.5 // indirect github.com/golang/protobuf v1.5.4 // indirect + github.com/google/gnostic-models v0.6.8 // indirect github.com/google/go-cmp v0.7.0 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/gotnospirit/makeplural v0.0.0-20180622080156-a5f48d94d976 // indirect @@ -236,15 +241,15 @@ require ( go.yaml.in/yaml/v2 v2.4.2 // indirect golang.org/x/arch v0.15.0 // indirect golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b // indirect + golang.org/x/oauth2 v0.30.0 // indirect golang.org/x/term v0.35.0 // indirect golang.org/x/tools v0.37.0 // indirect google.golang.org/appengine v1.6.8 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250528174236-200df99c418a // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect - k8s.io/api v0.28.4 // indirect - k8s.io/apimachinery v0.28.4 // indirect k8s.io/klog/v2 v2.100.1 // indirect + k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect k8s.io/utils v0.0.0-20230406110748-d93618cff8a2 // indirect modernc.org/libc v1.66.10 // indirect modernc.org/mathutil v1.7.1 // indirect @@ -252,6 +257,7 @@ require ( rsc.io/binaryregexp v0.2.0 // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect + sigs.k8s.io/yaml v1.3.0 // indirect ) replace golang.org/x/time => github.com/crowdsecurity/time v0.13.0-crowdsec.20250912 diff --git a/go.sum b/go.sum index 57ac167a41c..e8db5a93719 100644 --- a/go.sum +++ b/go.sum @@ -158,6 +158,8 @@ github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkp github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/ebitengine/purego v0.8.4 h1:CF7LEKg5FFOsASUj0+QwaXf8Ht6TlFxg09+S9wz0omw= github.com/ebitengine/purego v0.8.4/go.mod h1:iIjxzd6CiRiOG0UyXP+V1+jWqUXVjPKLAI0mRfJZTmQ= +github.com/emicklei/go-restful/v3 v3.9.0 h1:XwGDlfxEnQZzuopoqxwSEllNcCOM9DhhFyhFIIGKwxE= +github.com/emicklei/go-restful/v3 v3.9.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/expr-lang/expr v1.17.7 h1:Q0xY/e/2aCIp8g9s/LGvMDCC5PxYlvHgDZRQ4y16JX8= github.com/expr-lang/expr v1.17.7/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= @@ -243,6 +245,8 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I= +github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= @@ -688,6 +692,8 @@ golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/net v0.44.0 h1:evd8IRDyfNBMBTTY5XRF1vaZlD+EmWx6x8PkhR04H/I= golang.org/x/net v0.44.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY= +golang.org/x/oauth2 v0.30.0 h1:dnDm7JmhM45NNpd8FDDeLhK6FwqbOf4MLCM9zb1BOHI= +golang.org/x/oauth2 v0.30.0/go.mod h1:B++QgG3ZKulg6sRPGD/mqlHQs5rB3Ml9erfeDY7xKlU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -811,8 +817,12 @@ k8s.io/apimachinery v0.28.4 h1:zOSJe1mc+GxuMnFzD4Z/U1wst50X28ZNsn5bhgIIao8= k8s.io/apimachinery v0.28.4/go.mod h1:wI37ncBvfAoswfq626yPTe6Bz1c22L7uaJ8dho83mgg= k8s.io/apiserver v0.28.4 h1:BJXlaQbAU/RXYX2lRz+E1oPe3G3TKlozMMCZWu5GMgg= k8s.io/apiserver v0.28.4/go.mod h1:Idq71oXugKZoVGUUL2wgBCTHbUR+FYTWa4rq9j4n23w= +k8s.io/client-go v0.28.4 h1:Np5ocjlZcTrkyRJ3+T3PkXDpe4UpatQxj85+xjaD2wY= +k8s.io/client-go v0.28.4/go.mod h1:0VDZFpgoZfelyP5Wqu0/r/TRYcLYuJ2U1KEeoaPa1N4= k8s.io/klog/v2 v2.100.1 h1:7WCHKK6K8fNhTqfBhISHQ97KrnJNFZMcQvKp7gP/tmg= k8s.io/klog/v2 v2.100.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= +k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 h1:LyMgNKD2P8Wn1iAwQU5OhxCKlKJy0sHc+PcDwFB24dQ= +k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9/go.mod h1:wZK2AVp1uHCp4VamDVgBP2COHZjqD1T68Rf0CM3YjSM= k8s.io/utils v0.0.0-20230406110748-d93618cff8a2 h1:qY1Ad8PODbnymg2pRbkyMT/ylpTrCM8P2RJ0yroCyIk= k8s.io/utils v0.0.0-20230406110748-d93618cff8a2/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= modernc.org/cc/v4 v4.26.5 h1:xM3bX7Mve6G8K8b+T11ReenJOT+BmVqQj0FY5T4+5Y4= diff --git a/pkg/acquisition/modules/journalctl/config.go b/pkg/acquisition/modules/journalctl/config.go index 74d202062ef..ce748b7d595 100644 --- a/pkg/acquisition/modules/journalctl/config.go +++ b/pkg/acquisition/modules/journalctl/config.go @@ -18,7 +18,7 @@ type Configuration struct { configuration.DataSourceCommonCfg `yaml:",inline"` Filters []string `yaml:"journalctl_filter"` - since string // set only by DSN + since string // set only by DSN } func ConfigurationFromYAML(y []byte) (Configuration, error) { @@ -70,9 +70,7 @@ func (s *Source) Configure(_ context.Context, yamlConfig []byte, logger *log.Ent } s.setLogger(logger, 0, s.src) - s.metricsLevel = metricsLevel - return nil } diff --git a/pkg/acquisition/modules/kubernetes.go b/pkg/acquisition/modules/kubernetes.go new file mode 100644 index 00000000000..ffb1c4f5683 --- /dev/null +++ b/pkg/acquisition/modules/kubernetes.go @@ -0,0 +1,5 @@ +//go:build !no_datasource_kubernetes + +package modules + +import _ "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/kubernetes" // register the datasource diff --git a/pkg/acquisition/modules/kubernetes/config.go b/pkg/acquisition/modules/kubernetes/config.go new file mode 100644 index 00000000000..9be67250a9b --- /dev/null +++ b/pkg/acquisition/modules/kubernetes/config.go @@ -0,0 +1,84 @@ +package kubernetes + +import ( + "context" + "errors" + "fmt" + "os" + "path/filepath" + + yaml "github.com/goccy/go-yaml" + log "github.com/sirupsen/logrus" + "k8s.io/client-go/tools/clientcmd/api" + + "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" + "github.com/crowdsecurity/crowdsec/pkg/metrics" +) + +type Configuration struct { + configuration.DataSourceCommonCfg `yaml:",inline"` + + Selector string `yaml:"selector"` + Namespace string `yaml:"namespace"` + Auth *Auth `yaml:"auth,omitempty"` + KubeConfigFile string `yaml:"kube_config,omitempty"` +} + +type Auth struct { + Cluster api.Cluster `yaml:"cluster,omitempty"` + User api.AuthInfo `yaml:"user,omitempty"` +} + +func (s *Source) UnmarshalConfig(yamlConfig []byte) error { + s.Config = Configuration{ + Selector: "", + Namespace: "default", + } + + if err := yaml.UnmarshalWithOptions(yamlConfig, &s.Config, yaml.Strict()); err != nil { + return fmt.Errorf("while parsing KubernetesAcquisition configuration: %s", yaml.FormatError(err, false, false)) + } + + if s.logger != nil { + s.logger.Tracef("Kubernetes configuration: %+v", s.Config) + } + + return nil +} + +func (s *Source) Configure(ctx context.Context, yamlConfig []byte, logger *log.Entry, metricsLevel metrics.AcquisitionMetricsLevel) error { + s.logger = logger + s.metricsLevel = metricsLevel + + err := s.UnmarshalConfig(yamlConfig) + if err != nil { + return err + } + + return nil +} + +func (c *Configuration) SetDefaults() { + if c.Namespace == "" { + c.Namespace = "default" + } + + if c.Mode == "" { + c.Mode = configuration.TAIL_MODE + } + if c.Auth == nil && c.KubeConfigFile == "" { + home, _ := os.UserHomeDir() + c.KubeConfigFile = filepath.Join(home, ".kube", "config") + } +} + +func (s *Source) Validate() error { + if s.Config.Selector == "" { + return errors.New("label must be set in kubernetes acquisition") + } + if s.Config.Auth != nil && s.Config.KubeConfigFile != "" { + return errors.New("cannot use both auth and kube_config options") + + } + return nil +} diff --git a/pkg/acquisition/modules/kubernetes/init.go b/pkg/acquisition/modules/kubernetes/init.go new file mode 100644 index 00000000000..14e70b9f609 --- /dev/null +++ b/pkg/acquisition/modules/kubernetes/init.go @@ -0,0 +1,20 @@ +package kubernetes + +import ( + "github.com/crowdsecurity/crowdsec/pkg/acquisition/registry" + "github.com/crowdsecurity/crowdsec/pkg/acquisition/types" +) + +var ( + // verify interface compliance + _ types.DataSource = (*Source)(nil) + _ types.Tailer = (*Source)(nil) + _ types.MetricsProvider = (*Source)(nil) +) + +const ModuleName = "kubernetes" + +//nolint:gochecknoinits +func init() { + registry.RegisterFactory(ModuleName, func() types.DataSource { return &Source{} }) +} diff --git a/pkg/acquisition/modules/kubernetes/metrics.go b/pkg/acquisition/modules/kubernetes/metrics.go new file mode 100644 index 00000000000..a6bdc1504e0 --- /dev/null +++ b/pkg/acquisition/modules/kubernetes/metrics.go @@ -0,0 +1,19 @@ +package kubernetes + +import ( + "github.com/prometheus/client_golang/prometheus" + + "github.com/crowdsecurity/crowdsec/pkg/metrics" +) + +func (*Source) GetMetrics() []prometheus.Collector { + return []prometheus.Collector{ + metrics.KubernetesDataSourceLinesRead, + } +} + +func (*Source) GetAggregMetrics() []prometheus.Collector { + return []prometheus.Collector{ + metrics.KubernetesDataSourceLinesRead, + } +} diff --git a/pkg/acquisition/modules/kubernetes/run.go b/pkg/acquisition/modules/kubernetes/run.go new file mode 100644 index 00000000000..5cd8a27c7a7 --- /dev/null +++ b/pkg/acquisition/modules/kubernetes/run.go @@ -0,0 +1,179 @@ +package kubernetes + +import ( + "bufio" + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + "gopkg.in/tomb.v2" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + + "github.com/crowdsecurity/crowdsec/pkg/metrics" + "github.com/crowdsecurity/crowdsec/pkg/pipeline" +) + +func (s *Source) OneShotAcquisition(ctx context.Context, out chan pipeline.Event, t *tomb.Tomb) error { + s.logger.Debug("In oneshot") + return nil +} + +func (s *Source) StreamingAcquisition(ctx context.Context, out chan pipeline.Event, t *tomb.Tomb) error { + var wg sync.WaitGroup + var mu sync.Mutex + + s.logger.Info("Starting Kubernetes acquisition") + + cfg, err := s.buildConfig() + if err != nil { + return err + } + cs, err := kubernetes.NewForConfig(cfg) + if err != nil { + return fmt.Errorf("can't create a kubernetes client: %s", err) + } + + cancels := map[string]context.CancelFunc{} + + f := informers.NewSharedInformerFactoryWithOptions(cs, 0, + informers.WithNamespace(s.Config.Namespace), + informers.WithTweakListOptions(func(o *metav1.ListOptions) { o.LabelSelector = s.Config.Selector }), + ) + inf := f.Core().V1().Pods().Informer() + + // We ignore the ResourceEventHandlerRegistration returned by + // AddEventHandler since we don't need to remove the handlers until shutdown, + // and we will stop the entire informer at that time. + _, err = inf.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { s.startPod(ctx, cs, obj.(*corev1.Pod), out, &wg, &mu, cancels) }, + UpdateFunc: func(_, newObj interface{}) { + s.startPod(ctx, cs, newObj.(*corev1.Pod), out, &wg, &mu, cancels) + }, + DeleteFunc: func(obj interface{}) { + pod, ok := obj.(*corev1.Pod) + if !ok { + t, _ := obj.(cache.DeletedFinalStateUnknown) + pod, _ = t.Obj.(*corev1.Pod) + } + if pod != nil { + s.stopPod(pod, &mu, cancels) + } + }, + }) + + if err != nil { + return fmt.Errorf("while adding event handler: %w", err) + } + f.Start(ctx.Done()) + if !cache.WaitForCacheSync(ctx.Done(), inf.HasSynced) { + return errors.New("cache sync failed") + } + + <-ctx.Done() + mu.Lock() + for _, c := range cancels { + c() + } + mu.Unlock() + wg.Wait() + + return nil +} + +func (s *Source) Dump() any { + return s +} + +func (*Source) followPodLogs(ctx context.Context, cs *kubernetes.Clientset, ns, pod, container string, onLine func(string) error) error { + req := cs.CoreV1().Pods(ns).GetLogs(pod, &corev1.PodLogOptions{Container: container, Follow: true, Timestamps: true}) + stream, err := req.Stream(ctx) + if err != nil { + return err + } + defer stream.Close() + + sc := bufio.NewScanner(stream) + for sc.Scan() { + if err := ctx.Err(); err != nil { + return err + } + if err := onLine(sc.Text()); err != nil { + return err + } + } + return sc.Err() + +} + +func (s *Source) podWorker(meta context.Context, cs *kubernetes.Clientset, pod *corev1.Pod, out chan pipeline.Event, wg *sync.WaitGroup) context.CancelFunc { + podCtx, cancel := context.WithCancel(meta) + wg.Add(1) + go func() { + defer wg.Done() + var cw sync.WaitGroup + for _, c := range pod.Spec.Containers { + c := c.Name + cw.Add(1) + go func() { + defer cw.Done() + _ = s.followPodLogs(podCtx, cs, pod.Namespace, pod.Name, c, func(line string) error { + source := fmt.Sprintf("%s/%s", pod.Namespace, pod.Name) + l := pipeline.Line{} + l.Raw = line + l.Labels = s.Config.Labels + l.Time = time.Now().UTC() + l.Src = source + l.Process = true + l.Module = s.GetName() + if s.metricsLevel != metrics.AcquisitionMetricsLevelNone { + metrics.KubernetesDataSourceLinesRead.With(prometheus.Labels{"source": source, "acquis_type": l.Labels["type"], "datasource_type": ModuleName}).Inc() + } + evt := pipeline.MakeEvent(true, pipeline.LOG, true) + evt.Line = l + evt.Process = true + evt.Type = pipeline.LOG + out <- evt + return nil + }) + }() + } + cw.Wait() + }() + return cancel +} + +func shouldTail(p *corev1.Pod) bool { return p.Status.Phase == corev1.PodRunning } + +func (s *Source) startPod(meta context.Context, cs *kubernetes.Clientset, p *corev1.Pod, out chan pipeline.Event, wg *sync.WaitGroup, mu *sync.Mutex, cancels map[string]context.CancelFunc) { + if !shouldTail(p) { + return + } + key := string(p.UID) + mu.Lock() + if _, ok := cancels[key]; ok { + mu.Unlock() + return + } + cancels[key] = s.podWorker(meta, cs, p, out, wg) + mu.Unlock() +} + +func (*Source) stopPod(p *corev1.Pod, mu *sync.Mutex, cancels map[string]context.CancelFunc) { + key := string(p.UID) + mu.Lock() + cancel, ok := cancels[key] + if ok { + delete(cancels, key) + } + mu.Unlock() + if ok { + cancel() + } +} diff --git a/pkg/acquisition/modules/kubernetes/source.go b/pkg/acquisition/modules/kubernetes/source.go new file mode 100644 index 00000000000..1ef65cf464b --- /dev/null +++ b/pkg/acquisition/modules/kubernetes/source.go @@ -0,0 +1,30 @@ +package kubernetes + +import ( + log "github.com/sirupsen/logrus" + + "github.com/crowdsecurity/crowdsec/pkg/metrics" +) + +type Source struct { + metricsLevel metrics.AcquisitionMetricsLevel + Config Configuration + + logger *log.Entry +} + +func (*Source) GetName() string { + return ModuleName +} + +func (s *Source) GetMode() string { + return s.Config.Mode +} + +func (*Source) CanRun() error { + return nil +} + +func (s *Source) GetUuid() string { + return s.Config.UniqueId +} diff --git a/pkg/acquisition/modules/kubernetes/utils.go b/pkg/acquisition/modules/kubernetes/utils.go new file mode 100644 index 00000000000..1accadfa45c --- /dev/null +++ b/pkg/acquisition/modules/kubernetes/utils.go @@ -0,0 +1,31 @@ +package kubernetes + +import ( + "errors" + + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" +) + +func (s *Source) buildConfig() (*rest.Config, error) { + cfg, err := rest.InClusterConfig() + if err == nil { + return cfg, nil + } + if s.Config.KubeConfigFile != "" { + return clientcmd.BuildConfigFromFlags("", s.Config.KubeConfigFile) + } + + if s.Config.Auth != nil { + loadingRules := &clientcmd.ClientConfigLoadingRules{} + configOverrides := &clientcmd.ConfigOverrides{ + ClusterInfo: s.Config.Auth.Cluster, + AuthInfo: s.Config.Auth.User, + CurrentContext: "", + } + kubeConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, configOverrides) + return kubeConfig.ClientConfig() + } + // This should never happen, but just in case... + return nil, errors.New("could not create kubernetes client configuration") +} diff --git a/pkg/acquisition/schemas/datasource.yaml b/pkg/acquisition/schemas/datasource.yaml index 4fef69cf509..a1ca051f51a 100644 --- a/pkg/acquisition/schemas/datasource.yaml +++ b/pkg/acquisition/schemas/datasource.yaml @@ -6,3 +6,4 @@ description: > configuration DataSourceCommonCfg. anyOf: - $ref: docker.yaml + - $ref: kubernetes.yaml diff --git a/pkg/acquisition/schemas/kubernetes.yaml b/pkg/acquisition/schemas/kubernetes.yaml new file mode 100644 index 00000000000..31189c4f95a --- /dev/null +++ b/pkg/acquisition/schemas/kubernetes.yaml @@ -0,0 +1,96 @@ +$schema: https://json-schema.org/draft/2020-12/schema +title: CrowdSec Kubernetes datasource +description: > + Schema for Kubernetes acquisition entries consumed by CrowdSec. Every field + mirrors pkg/acquisition/modules/kubernetes.Configuration and the embedded + configuration.DataSourceCommonCfg. +type: object +additionalProperties: false +properties: + source: + type: string + const: kubernetes + description: > + Must be kubernetes to bind this acquisition entry to the Kubernetes + datasource. + mode: + type: string + enum: [tail] + default: tail + description: > + Acquisition mode (only tail streaming is supported). + labels: + type: object + minProperties: 1 + description: > + Labels attached to emitted events (for example type: kubernetes). + additionalProperties: + type: string + properties: + type: + type: string + description: Parser/collection selector; strongly recommended. + log_level: + type: string + enum: [panic, fatal, error, warn, warning, info, debug, trace] + description: > + Overrides the module logger level for this datasource. + name: + type: string + description: Friendly identifier for the datasource entry. + use_time_machine: + type: boolean + default: false + description: > + Replays past events when supported by the acquisition module. + unique_id: + type: string + description: > + Stable identifier injected by cscli/crowdsec auto-run (usually not user set). + transform: + type: string + description: > + expr program applied to events before they enter the pipeline. + selector: + type: string + minLength: 1 + description: > + Kubernetes label selector applied to pods; at least one selector is required. + namespace: + type: string + description: > + Namespace whose pods should be tailed; defaults to default when omitted. + kube_config: + type: string + description: > + Path to a kubeconfig file to use when running outside the cluster. + auth: + type: object + minProperties: 1 + additionalProperties: false + description: > + Inline Kubernetes client authentication overriding kubeconfig defaults. + Mirrors clientcmd/api.Cluster and AuthInfo fields; keys match kubeconfig + file entries (for example server, certificate-authority, client-certificate, + token, exec, auth-provider, ...). + properties: + cluster: + type: object + additionalProperties: true + description: > + Optional cluster stanza (clientcmd/api.Cluster) with fields such as + server, certificate-authority, tls-server-name, or + insecure-skip-tls-verify. + user: + type: object + additionalProperties: true + description: > + Optional user stanza (clientcmd/api.AuthInfo) with fields such as + client-certificate, client-key, token, token-file, exec or auth-provider. +required: + - source + - selector +allOf: + - description: auth and kube_config are mutually exclusive. + not: + required: [auth, kube_config] diff --git a/pkg/cwversion/component/component.go b/pkg/cwversion/component/component.go index 3423f2423f2..86dc9ba87e1 100644 --- a/pkg/cwversion/component/component.go +++ b/pkg/cwversion/component/component.go @@ -16,6 +16,7 @@ var Built = map[string]bool{ "datasource_k8s-audit": false, "datasource_kafka": false, "datasource_kinesis": false, + "datasource_kubernetes": false, "datasource_loki": false, "datasource_s3": false, "datasource_syslog": false, diff --git a/pkg/database/flush.go b/pkg/database/flush.go index e508c53c0c7..9b37b05c09e 100644 --- a/pkg/database/flush.go +++ b/pkg/database/flush.go @@ -198,6 +198,7 @@ func (c *Client) flushAgents(ctx context.Context, authType string, duration *tim machine.LastHeartbeatLTE(time.Now().UTC().Add(-*duration)), machine.Not(machine.HasAlerts()), machine.AuthTypeEQ(authType), + machine.IpAddressNotIn("127.0.0.1", "::1"), ).Exec(ctx) if err != nil { c.Log.Errorf("while auto-deleting expired machines (%s): %s", authType, err) diff --git a/pkg/metrics/acquisition_kubernetes.go b/pkg/metrics/acquisition_kubernetes.go new file mode 100644 index 00000000000..4c2799b9cc9 --- /dev/null +++ b/pkg/metrics/acquisition_kubernetes.go @@ -0,0 +1,21 @@ +//go:build !no_datasource_kubernetes + +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +const KubernetesDataSourceLinesReadMetricName = "cs_kubernetessource_hits_total" + +var KubernetesDataSourceLinesRead = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: KubernetesDataSourceLinesReadMetricName, + Help: "Total lines that were read.", + }, + []string{"source", "datasource_type", "acquis_type"}) + +//nolint:gochecknoinits +func init() { + RegisterAcquisitionMetric(KubernetesDataSourceLinesReadMetricName) +}