From e47d988278642df8e773bd552933014c9f062700 Mon Sep 17 00:00:00 2001 From: mugdha-adhav Date: Thu, 12 Mar 2020 15:20:25 +0530 Subject: [PATCH 1/3] Restructure Output controller by segregating Reconciler --- go.sum | 2 + .../fluentbit/fluentbit_controller.go | 5 +- pkg/controller/output/output_controller.go | 102 +---------------- .../output/output_controller_test.go | 10 -- pkg/output/reconciler.go | 103 ++++++++++++++++++ 5 files changed, 110 insertions(+), 112 deletions(-) create mode 100644 pkg/output/reconciler.go diff --git a/go.sum b/go.sum index 4c612b9..2658788 100644 --- a/go.sum +++ b/go.sum @@ -71,6 +71,7 @@ github.com/coreos/etcd v3.3.15+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= github.com/coreos/go-oidc v2.1.0+incompatible/go.mod h1:CgnwVTmzoESiwO9qyAFEMiHoZ1nMCKZlZ9V6mm3/LKc= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= +github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= @@ -470,6 +471,7 @@ github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= +go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738 h1:VcrIfasaLFkyjk6KNlXQSzO+B0fZcnECiDrKJsfxka0= go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg= go.mongodb.org/mongo-driver v1.0.3/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= go.mongodb.org/mongo-driver v1.1.1/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= diff --git a/pkg/controller/fluentbit/fluentbit_controller.go b/pkg/controller/fluentbit/fluentbit_controller.go index 10e2cfa..bab2d52 100644 --- a/pkg/controller/fluentbit/fluentbit_controller.go +++ b/pkg/controller/fluentbit/fluentbit_controller.go @@ -41,10 +41,7 @@ func newReconciler(mgr manager.Manager) *fluentbit.Reconciler { // add adds a new Controller to mgr func add(mgr manager.Manager, r *fluentbit.Reconciler) error { // Create a new controller - c, err := controller.New("fluentbit-controller", mgr, controller.Options{ - Reconciler: r, - }) - + c, err := controller.New("fluentbit-controller", mgr, controller.Options{Reconciler: r}) if err != nil { return err } diff --git a/pkg/controller/output/output_controller.go b/pkg/controller/output/output_controller.go index a882d27..8272819 100644 --- a/pkg/controller/output/output_controller.go +++ b/pkg/controller/output/output_controller.go @@ -14,17 +14,8 @@ limitations under the License. package output import ( - "bytes" - "context" - "fmt" - - "github.com/platform9/fluentd-operator/pkg/fluentd" - loggingv1alpha1 "github.com/platform9/fluentd-operator/pkg/apis/logging/v1alpha1" - "github.com/platform9/fluentd-operator/pkg/resources" - "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/runtime" - "sigs.k8s.io/controller-runtime/pkg/client" + "github.com/platform9/fluentd-operator/pkg/output" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" @@ -43,11 +34,7 @@ func Add(mgr manager.Manager) error { // newReconciler returns a new reconcile.Reconciler func newReconciler(mgr manager.Manager) reconcile.Reconciler { - return &ReconcileOutput{ - client: mgr.GetClient(), - scheme: mgr.GetScheme(), - fluentd: fluentd.New(mgr), - } + return output.New(mgr) } // add adds a new Controller to mgr with r as the reconcile.Reconciler @@ -59,89 +46,8 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { } // Watch for changes to primary resource Output - err = c.Watch(&source.Kind{Type: &loggingv1alpha1.Output{}}, &handler.EnqueueRequestForObject{}) - if err != nil { - log.Error(err, "Error adding watch") - return err - } - - return nil + return c.Watch(&source.Kind{Type: &loggingv1alpha1.Output{}}, &handler.EnqueueRequestForObject{}) } // blank assignment to verify that ReconcileOutput implements reconcile.Reconciler -var _ reconcile.Reconciler = &ReconcileOutput{} - -// ReconcileOutput reconciles a Output object -type ReconcileOutput struct { - // This client, initialized using mgr.Client() above, is a split client - // that reads objects from the cache and writes to the apiserver - client client.Client - scheme *runtime.Scheme - fluentd *fluentd.Reconciler -} - -// Reconcile reads that state of the cluster for a Output object and makes changes based on the state read -// and what is in the Output.Spec -// Note: -// The Controller will requeue the Request to be processed again if the returned error is non-nil or -// Result.Requeue is true, otherwise upon completion it will remove the work from the queue. -func (r *ReconcileOutput) Reconcile(request reconcile.Request) (reconcile.Result, error) { - reqLogger := log.WithValues("Request.Name", request.Name) - reqLogger.Info("Reconciling Output") - - // Fetch the Output instance - instance := &loggingv1alpha1.Output{} - err := r.client.Get(context.TODO(), request.NamespacedName, instance) - if err != nil { - if !errors.IsNotFound(err) { - // Error reading object, requeue - return reconcile.Result{}, err - } - } - - buff, err := getFluentdConfig(r.client) - if err != nil { - return reconcile.Result{}, err - } - - // Update configmap for fluentd - log.Info("Refreshing fluentd...") - return reconcile.Result{}, r.fluentd.Refresh(buff) -} - -func getFluentdConfig(cl client.Client) ([]byte, error) { - // Simple algorithm to render all outputs once one changes. This lets us keep thing simple and write entire config - // as one. - instances := &loggingv1alpha1.OutputList{} - lo := client.ListOptions{} - - err := cl.List(context.TODO(), instances, &lo) - - if err != nil { - return []byte{}, err - } - - // Source rendering is not configurable yet. - renderers := []resources.Resource{ - resources.NewSystem(), - resources.NewSource(), - } - - for i := range instances.Items { - renderers = append(renderers, resources.NewOutput(cl, &instances.Items[i])) - } - - var buff []byte - var newline bytes.Buffer - fmt.Fprintf(&newline, "\n\n") - for _, r := range renderers { - out, err := r.Render() - if err != nil { - return []byte{}, err - } - buff = append(buff, out...) - buff = append(buff, newline.Bytes()...) - } - - return buff, nil -} +var _ reconcile.Reconciler = &output.Reconciler{} diff --git a/pkg/controller/output/output_controller_test.go b/pkg/controller/output/output_controller_test.go index 20816b9..bb368d6 100644 --- a/pkg/controller/output/output_controller_test.go +++ b/pkg/controller/output/output_controller_test.go @@ -16,10 +16,8 @@ package output import ( "context" "encoding/json" - "testing" loggingv1alpha1 "github.com/platform9/fluentd-operator/pkg/apis/logging/v1alpha1" - "github.com/stretchr/testify/assert" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/scheme" @@ -99,11 +97,3 @@ func (t *TestClient) List(ctx context.Context, list runtime.Object, opt ...clien func (t *TestClient) Status() client.StatusWriter { return t.sw } - -func TestFluentdConfig(t *testing.T) { - cl := NewTestClient() - buf, err := getFluentdConfig(cl) - t.Log(err) - assert.Nil(t, err) - assert.NotEmpty(t, buf) -} diff --git a/pkg/output/reconciler.go b/pkg/output/reconciler.go new file mode 100644 index 0000000..dd691dc --- /dev/null +++ b/pkg/output/reconciler.go @@ -0,0 +1,103 @@ +package output + +import ( + "bytes" + "context" + "fmt" + + loggingv1alpha1 "github.com/platform9/fluentd-operator/pkg/apis/logging/v1alpha1" + "github.com/platform9/fluentd-operator/pkg/fluentd" + "github.com/platform9/fluentd-operator/pkg/resources" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" +) + +var log = logf.Log.WithName("output_reconciler") + +// Reconciler reconciles a Output object +type Reconciler struct { + // This client, initialized using mgr.Client() above, is a split client + // that reads objects from the cache and writes to the apiserver + client client.Client + scheme *runtime.Scheme + fluentd *fluentd.Reconciler +} + +// New returns new instance of reconciler +func New(mgr manager.Manager) *Reconciler { + return &Reconciler{ + client: mgr.GetClient(), + scheme: mgr.GetScheme(), + fluentd: fluentd.New(mgr), + } +} + +// Reconcile reads that state of the cluster for a Output object and makes changes based on the state read +// and what is in the Output.Spec +// Note: +// The Controller will requeue the Request to be processed again if the returned error is non-nil or +// Result.Requeue is true, otherwise upon completion it will remove the work from the queue. +func (r *Reconciler) Reconcile(request reconcile.Request) (reconcile.Result, error) { + reqLogger := log.WithValues("Request.Name", request.Name) + reqLogger.Info("Reconciling Output") + + // Fetch the Output instance + instance := &loggingv1alpha1.Output{} + err := r.client.Get(context.TODO(), request.NamespacedName, instance) + if err != nil { + if !errors.IsNotFound(err) { + // Error reading object, requeue + return reconcile.Result{}, err + } + } + + buff, err := getFluentdConfig(r.client) + if err != nil { + return reconcile.Result{}, err + } + + // Update configmap for fluentd + log.Info("Refreshing fluentd...") + return reconcile.Result{}, r.fluentd.Refresh(buff) +} + +func getFluentdConfig(cl client.Client) ([]byte, error) { + // Simple algorithm to render all outputs once one changes. This lets us keep thing simple and write entire config + // as one. + instances := &loggingv1alpha1.OutputList{} + lo := client.ListOptions{} + + err := cl.List(context.TODO(), instances, &lo) + + if err != nil { + return []byte{}, err + } + + // Source rendering is not configurable yet. + renderers := []resources.Resource{ + resources.NewSystem(), + resources.NewSource(), + } + + for i := range instances.Items { + renderers = append(renderers, resources.NewOutput(cl, &instances.Items[i])) + } + + var buff []byte + var newline bytes.Buffer + fmt.Fprintf(&newline, "\n\n") + for _, r := range renderers { + out, err := r.Render() + if err != nil { + return []byte{}, err + } + buff = append(buff, out...) + buff = append(buff, newline.Bytes()...) + } + + return buff, nil +} From b988e67e99b5fd29b43d1139e339d7afb85f1bb0 Mon Sep 17 00:00:00 2001 From: mugdha-adhav Date: Sun, 15 Mar 2020 22:10:41 +0530 Subject: [PATCH 2/3] Modify elasticsearch example to use logging namespace --- examples/elastic-search/templates/deploy-elasticsearch.yaml | 6 +++--- examples/elastic-search/templates/object-elasticsearch.yaml | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/examples/elastic-search/templates/deploy-elasticsearch.yaml b/examples/elastic-search/templates/deploy-elasticsearch.yaml index 24b7577..788ae55 100644 --- a/examples/elastic-search/templates/deploy-elasticsearch.yaml +++ b/examples/elastic-search/templates/deploy-elasticsearch.yaml @@ -1,7 +1,7 @@ kind: Namespace apiVersion: v1 metadata: - name: es-logging + name: logging --- @@ -9,7 +9,7 @@ apiVersion: apps/v1 kind: Deployment metadata: name: elasticsearch - namespace: es-logging + namespace: logging spec: selector: matchLabels: @@ -36,7 +36,7 @@ apiVersion: v1 kind: Service metadata: name: elasticsearch - namespace: es-logging + namespace: logging labels: service: elasticsearch spec: diff --git a/examples/elastic-search/templates/object-elasticsearch.yaml b/examples/elastic-search/templates/object-elasticsearch.yaml index 1183259..7a099b7 100644 --- a/examples/elastic-search/templates/object-elasticsearch.yaml +++ b/examples/elastic-search/templates/object-elasticsearch.yaml @@ -6,7 +6,7 @@ spec: type: elasticsearch params: - name: url - value: http://elasticsearch.es-logging.svc.cluster.local:9200 + value: http://elasticsearch.logging.svc.cluster.local:9200 - name: user value: test-elastic - name: password From cb24660dbbb4e04b8dde0fd1ea6388a78d064766 Mon Sep 17 00:00:00 2001 From: mugdha-adhav Date: Mon, 16 Mar 2020 10:18:36 +0530 Subject: [PATCH 3/3] Add elasticsearch as default datastore in fluentd-operator helper --- cmd/helper/helper.go | 92 +++++++++++++++++++++++++++++++++++++++----- go.sum | 1 + 2 files changed, 84 insertions(+), 9 deletions(-) diff --git a/cmd/helper/helper.go b/cmd/helper/helper.go index 067c6e3..9cf6c42 100644 --- a/cmd/helper/helper.go +++ b/cmd/helper/helper.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "path" + "time" loggingv1alpha1 "github.com/platform9/fluentd-operator/pkg/apis/logging/v1alpha1" logclient "github.com/platform9/fluentd-operator/pkg/client/clientset/versioned" @@ -27,13 +28,17 @@ var mode string var logLevel string var dataNs string var dataSrc string +var dataStore string +var timeout int const ( - defaultMode = "standalone" - defaultLogLevel = "INFO" - defaultDataNs = "pf9-operators" - defaultDataSrc = "pf9-log" - defaultObjNs = "logging" + defaultMode = "k8s" + defaultLogLevel = "INFO" + defaultDataNs = "pf9-operators" + defaultDataSrc = "pf9-log" + defaultObjNs = "logging" + defaultDataStore = "elasticsearch" + defaultTimeout = 10 ) var ( @@ -44,6 +49,14 @@ var ( } ) +// esParams stores elasticsearch data +type esParams struct { + Name string + Namespace string + Deployment string + Port uint16 +} + // Main starts it all func Main() int { log.SetFormatter(&log.JSONFormatter{}) @@ -55,7 +68,7 @@ func Main() int { apiClient, err := apixv1beta1client.NewForConfig(config) errExit("Failed to create client", err) - checkCRDExists(apiClient) + waitForCRDs(apiClient) log.Print("Found output CRD") cs, err := kubernetes.NewForConfig(config) @@ -65,6 +78,9 @@ func Main() int { lc, err := logclient.NewForConfig(config) errExit("Failed to create logging operator client", err) + log.Print("Configuring default backend datastore") + configureDataStore(cs.CoreV1(), lc) + log.Print("Creating Output CRs") createCrs(cs.CoreV1(), lc) @@ -104,11 +120,43 @@ func getByKubeCfg() (*rest.Config, error) { return clientcmd.BuildConfigFromFlags("", defaultKubeCfg) } -func checkCRDExists(apixClient apixv1beta1client.ApiextensionsV1beta1Interface) { - _, err := apixClient.CustomResourceDefinitions().Get("outputs.logging.pf9.io", metav1.GetOptions{}) +func waitForCRDs(apixClient apixv1beta1client.ApiextensionsV1beta1Interface) { + timeoutDuration := time.After(time.Duration(timeout) * time.Minute) + tickDuration := time.Tick(10 * time.Second) + + for { + select { + case <-timeoutDuration: + errExit("waiting for CRD's", fmt.Errorf("Timed out waiting for CRD's to come up")) + return + case <-tickDuration: + _, err := apixClient.CustomResourceDefinitions().Get("outputs.logging.pf9.io", metav1.GetOptions{}) + if err == nil { + return + } + } + } +} - errExit("Error while querying output CRD", err) +func configureDataStore(coreClient corev1.CoreV1Interface, lc logclient.LoggingV1alpha1Interface) { + object := &loggingv1alpha1.Output{} + + // Check default datastore to use and set values + if dataStore == "elasticsearch" { + // Creating Output object for elastic search + params := &esParams{ + Name: "es-objstore", + Namespace: dataNs, + Deployment: "elasticsearch", + Port: 9200, + } + object = params.getESOutputObject() + } + _, err := lc.LoggingV1alpha1().Outputs().Create(object) + if err != nil { + errExit("while creating default Output object", err) + } } func createCrs(coreClient corev1.CoreV1Interface, lc logclient.LoggingV1alpha1Interface) { @@ -148,6 +196,25 @@ func errExit(msg string, err error) { } } +func (p *esParams) getESOutputObject() *loggingv1alpha1.Output { + url := fmt.Sprintf("http://%s.%s.svc.cluster.local:%d", p.Deployment, p.Namespace, p.Port) + return &loggingv1alpha1.Output{ + ObjectMeta: metav1.ObjectMeta{ + Name: p.Name, + Namespace: p.Namespace, + }, + Spec: loggingv1alpha1.OutputSpec{ + Type: "elasticsearch", + Params: []loggingv1alpha1.Param{ + { + Name: "url", + Value: url, + }, + }, + }, + } +} + func buildCmd() *cobra.Command { cobra.OnInitialize(initCfg) rootCmd := &cobra.Command{ @@ -162,6 +229,7 @@ func buildCmd() *cobra.Command { pf := rootCmd.PersistentFlags() pf.StringVar(&mode, "mode", defaultMode, "Operational mode: k8s or standalone") viper.BindPFlag("mode", pf.Lookup("mode")) + pf.StringVar(&logLevel, "log-level", defaultLogLevel, "Log level: DEBUG, INFO, WARN or FATAL") viper.BindPFlag("log-level", pf.Lookup("log-level")) @@ -171,6 +239,12 @@ func buildCmd() *cobra.Command { pf.StringVar(&dataSrc, "datasource", defaultDataSrc, "Name of secret for user config data") viper.BindPFlag("datasource", pf.Lookup("datasource")) + pf.StringVar(&dataStore, "datastore", defaultDataStore, "Name of the backend datastore") + viper.BindPFlag("datastore", pf.Lookup("datastore")) + + pf.IntVar(&timeout, "timeout", defaultTimeout, "Wait period for the CRD's") + viper.BindPFlag("timeout", pf.Lookup("timeout")) + return rootCmd } diff --git a/go.sum b/go.sum index 2658788..fd3e3c9 100644 --- a/go.sum +++ b/go.sum @@ -774,6 +774,7 @@ k8s.io/apimachinery v0.0.0-20190913080033-27d36303b655/go.mod h1:nL6pwRT8NgfF8TT k8s.io/apimachinery v0.17.2/go.mod h1:b9qmWdKlLuU9EBh+06BtLcSf/Mu89rWL33naRxs1uZg= k8s.io/apimachinery v0.17.3 h1:f+uZV6rm4/tHE7xXgLyToprg6xWairaClGVkm2t8omg= k8s.io/apimachinery v0.17.3/go.mod h1:gxLnyZcGNdZTCLnq3fgzyg2A5BVCHTNDFrw8AmuJ+0g= +k8s.io/apimachinery v0.17.4 h1:UzM+38cPUJnzqSQ+E1PY4YxMHIzQyCg29LOoGfo79Zw= k8s.io/apiserver v0.0.0-20190918160949-bfa5e2e684ad/go.mod h1:XPCXEwhjaFN29a8NldXA901ElnKeKLrLtREO9ZhFyhg= k8s.io/apiserver v0.17.2/go.mod h1:lBmw/TtQdtxvrTk0e2cgtOxHizXI+d0mmGQURIHQZlo= k8s.io/apiserver v0.17.3/go.mod h1:iJtsPpu1ZpEnHaNawpSV0nYTGBhhX2dUlnn7/QS7QiY=