Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 83 additions & 9 deletions cmd/helper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"os"
"path"
"time"

loggingv1alpha1 "github.com/platform9/fluentd-operator/pkg/apis/logging/v1alpha1"
logclient "github.com/platform9/fluentd-operator/pkg/client/clientset/versioned"
Expand All @@ -27,13 +28,17 @@ var mode string
var logLevel string
var dataNs string
var dataSrc string
var dataStore string
var timeout int

const (
defaultMode = "standalone"
defaultLogLevel = "INFO"
defaultDataNs = "pf9-operators"
defaultDataSrc = "pf9-log"
defaultObjNs = "logging"
defaultMode = "k8s"
defaultLogLevel = "INFO"
defaultDataNs = "pf9-operators"
defaultDataSrc = "pf9-log"
defaultObjNs = "logging"
defaultDataStore = "elasticsearch"
defaultTimeout = 10
)

var (
Expand All @@ -44,6 +49,14 @@ var (
}
)

// esParams stores elasticsearch data
type esParams struct {
Name string
Namespace string
Deployment string
Port uint16
}

// Main starts it all
func Main() int {
log.SetFormatter(&log.JSONFormatter{})
Expand All @@ -55,7 +68,7 @@ func Main() int {
apiClient, err := apixv1beta1client.NewForConfig(config)
errExit("Failed to create client", err)

checkCRDExists(apiClient)
waitForCRDs(apiClient)
log.Print("Found output CRD")

cs, err := kubernetes.NewForConfig(config)
Expand All @@ -65,6 +78,9 @@ func Main() int {
lc, err := logclient.NewForConfig(config)
errExit("Failed to create logging operator client", err)

log.Print("Configuring default backend datastore")
configureDataStore(cs.CoreV1(), lc)

log.Print("Creating Output CRs")
createCrs(cs.CoreV1(), lc)

Expand Down Expand Up @@ -104,11 +120,43 @@ func getByKubeCfg() (*rest.Config, error) {
return clientcmd.BuildConfigFromFlags("", defaultKubeCfg)
}

func checkCRDExists(apixClient apixv1beta1client.ApiextensionsV1beta1Interface) {
_, err := apixClient.CustomResourceDefinitions().Get("outputs.logging.pf9.io", metav1.GetOptions{})
func waitForCRDs(apixClient apixv1beta1client.ApiextensionsV1beta1Interface) {
timeoutDuration := time.After(time.Duration(timeout) * time.Minute)
tickDuration := time.Tick(10 * time.Second)

for {
select {
case <-timeoutDuration:
errExit("waiting for CRD's", fmt.Errorf("Timed out waiting for CRD's to come up"))
return
case <-tickDuration:
_, err := apixClient.CustomResourceDefinitions().Get("outputs.logging.pf9.io", metav1.GetOptions{})
if err == nil {
return
}
}
}
}

errExit("Error while querying output CRD", err)
func configureDataStore(coreClient corev1.CoreV1Interface, lc logclient.LoggingV1alpha1Interface) {
object := &loggingv1alpha1.Output{}

// Check default datastore to use and set values
if dataStore == "elasticsearch" {
// Creating Output object for elastic search
params := &esParams{
Name: "es-objstore",
Namespace: dataNs,
Deployment: "elasticsearch",
Port: 9200,
}
object = params.getESOutputObject()
}

_, err := lc.LoggingV1alpha1().Outputs().Create(object)
if err != nil {
errExit("while creating default Output object", err)
}
}

func createCrs(coreClient corev1.CoreV1Interface, lc logclient.LoggingV1alpha1Interface) {
Expand Down Expand Up @@ -148,6 +196,25 @@ func errExit(msg string, err error) {
}
}

func (p *esParams) getESOutputObject() *loggingv1alpha1.Output {
url := fmt.Sprintf("http://%s.%s.svc.cluster.local:%d", p.Deployment, p.Namespace, p.Port)
return &loggingv1alpha1.Output{
ObjectMeta: metav1.ObjectMeta{
Name: p.Name,
Namespace: p.Namespace,
},
Spec: loggingv1alpha1.OutputSpec{
Type: "elasticsearch",
Params: []loggingv1alpha1.Param{
{
Name: "url",
Value: url,
},
},
},
}
}

func buildCmd() *cobra.Command {
cobra.OnInitialize(initCfg)
rootCmd := &cobra.Command{
Expand All @@ -162,6 +229,7 @@ func buildCmd() *cobra.Command {
pf := rootCmd.PersistentFlags()
pf.StringVar(&mode, "mode", defaultMode, "Operational mode: k8s or standalone")
viper.BindPFlag("mode", pf.Lookup("mode"))

pf.StringVar(&logLevel, "log-level", defaultLogLevel, "Log level: DEBUG, INFO, WARN or FATAL")
viper.BindPFlag("log-level", pf.Lookup("log-level"))

Expand All @@ -171,6 +239,12 @@ func buildCmd() *cobra.Command {
pf.StringVar(&dataSrc, "datasource", defaultDataSrc, "Name of secret for user config data")
viper.BindPFlag("datasource", pf.Lookup("datasource"))

pf.StringVar(&dataStore, "datastore", defaultDataStore, "Name of the backend datastore")
viper.BindPFlag("datastore", pf.Lookup("datastore"))

pf.IntVar(&timeout, "timeout", defaultTimeout, "Wait period for the CRD's")
viper.BindPFlag("timeout", pf.Lookup("timeout"))

return rootCmd
}

Expand Down
6 changes: 3 additions & 3 deletions examples/elastic-search/templates/deploy-elasticsearch.yaml
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
kind: Namespace
apiVersion: v1
metadata:
name: es-logging
name: logging

---

apiVersion: apps/v1
kind: Deployment
metadata:
name: elasticsearch
namespace: es-logging
namespace: logging
spec:
selector:
matchLabels:
Expand All @@ -36,7 +36,7 @@ apiVersion: v1
kind: Service
metadata:
name: elasticsearch
namespace: es-logging
namespace: logging
labels:
service: elasticsearch
spec:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ spec:
type: elasticsearch
params:
- name: url
value: http://elasticsearch.es-logging.svc.cluster.local:9200
value: http://elasticsearch.logging.svc.cluster.local:9200
- name: user
value: test-elastic
- name: password
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ github.com/coreos/etcd v3.3.15+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
github.com/coreos/go-oidc v2.1.0+incompatible/go.mod h1:CgnwVTmzoESiwO9qyAFEMiHoZ1nMCKZlZ9V6mm3/LKc=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM=
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
Expand Down Expand Up @@ -470,6 +471,7 @@ github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738 h1:VcrIfasaLFkyjk6KNlXQSzO+B0fZcnECiDrKJsfxka0=
go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg=
go.mongodb.org/mongo-driver v1.0.3/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM=
go.mongodb.org/mongo-driver v1.1.1/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM=
Expand Down Expand Up @@ -772,6 +774,7 @@ k8s.io/apimachinery v0.0.0-20190913080033-27d36303b655/go.mod h1:nL6pwRT8NgfF8TT
k8s.io/apimachinery v0.17.2/go.mod h1:b9qmWdKlLuU9EBh+06BtLcSf/Mu89rWL33naRxs1uZg=
k8s.io/apimachinery v0.17.3 h1:f+uZV6rm4/tHE7xXgLyToprg6xWairaClGVkm2t8omg=
k8s.io/apimachinery v0.17.3/go.mod h1:gxLnyZcGNdZTCLnq3fgzyg2A5BVCHTNDFrw8AmuJ+0g=
k8s.io/apimachinery v0.17.4 h1:UzM+38cPUJnzqSQ+E1PY4YxMHIzQyCg29LOoGfo79Zw=
k8s.io/apiserver v0.0.0-20190918160949-bfa5e2e684ad/go.mod h1:XPCXEwhjaFN29a8NldXA901ElnKeKLrLtREO9ZhFyhg=
k8s.io/apiserver v0.17.2/go.mod h1:lBmw/TtQdtxvrTk0e2cgtOxHizXI+d0mmGQURIHQZlo=
k8s.io/apiserver v0.17.3/go.mod h1:iJtsPpu1ZpEnHaNawpSV0nYTGBhhX2dUlnn7/QS7QiY=
Expand Down
5 changes: 1 addition & 4 deletions pkg/controller/fluentbit/fluentbit_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,7 @@ func newReconciler(mgr manager.Manager) *fluentbit.Reconciler {
// add adds a new Controller to mgr
func add(mgr manager.Manager, r *fluentbit.Reconciler) error {
// Create a new controller
c, err := controller.New("fluentbit-controller", mgr, controller.Options{
Reconciler: r,
})

c, err := controller.New("fluentbit-controller", mgr, controller.Options{Reconciler: r})
if err != nil {
return err
}
Expand Down
102 changes: 4 additions & 98 deletions pkg/controller/output/output_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,8 @@ limitations under the License.
package output

import (
"bytes"
"context"
"fmt"

"github.com/platform9/fluentd-operator/pkg/fluentd"

loggingv1alpha1 "github.com/platform9/fluentd-operator/pkg/apis/logging/v1alpha1"
"github.com/platform9/fluentd-operator/pkg/resources"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/platform9/fluentd-operator/pkg/output"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
Expand All @@ -43,11 +34,7 @@ func Add(mgr manager.Manager) error {

// newReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager) reconcile.Reconciler {
return &ReconcileOutput{
client: mgr.GetClient(),
scheme: mgr.GetScheme(),
fluentd: fluentd.New(mgr),
}
return output.New(mgr)
}

// add adds a new Controller to mgr with r as the reconcile.Reconciler
Expand All @@ -59,89 +46,8 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
}

// Watch for changes to primary resource Output
err = c.Watch(&source.Kind{Type: &loggingv1alpha1.Output{}}, &handler.EnqueueRequestForObject{})
if err != nil {
log.Error(err, "Error adding watch")
return err
}

return nil
return c.Watch(&source.Kind{Type: &loggingv1alpha1.Output{}}, &handler.EnqueueRequestForObject{})
}

// blank assignment to verify that ReconcileOutput implements reconcile.Reconciler
var _ reconcile.Reconciler = &ReconcileOutput{}

// ReconcileOutput reconciles a Output object
type ReconcileOutput struct {
// This client, initialized using mgr.Client() above, is a split client
// that reads objects from the cache and writes to the apiserver
client client.Client
scheme *runtime.Scheme
fluentd *fluentd.Reconciler
}

// Reconcile reads that state of the cluster for a Output object and makes changes based on the state read
// and what is in the Output.Spec
// Note:
// The Controller will requeue the Request to be processed again if the returned error is non-nil or
// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
func (r *ReconcileOutput) Reconcile(request reconcile.Request) (reconcile.Result, error) {
reqLogger := log.WithValues("Request.Name", request.Name)
reqLogger.Info("Reconciling Output")

// Fetch the Output instance
instance := &loggingv1alpha1.Output{}
err := r.client.Get(context.TODO(), request.NamespacedName, instance)
if err != nil {
if !errors.IsNotFound(err) {
// Error reading object, requeue
return reconcile.Result{}, err
}
}

buff, err := getFluentdConfig(r.client)
if err != nil {
return reconcile.Result{}, err
}

// Update configmap for fluentd
log.Info("Refreshing fluentd...")
return reconcile.Result{}, r.fluentd.Refresh(buff)
}

func getFluentdConfig(cl client.Client) ([]byte, error) {
// Simple algorithm to render all outputs once one changes. This lets us keep thing simple and write entire config
// as one.
instances := &loggingv1alpha1.OutputList{}
lo := client.ListOptions{}

err := cl.List(context.TODO(), instances, &lo)

if err != nil {
return []byte{}, err
}

// Source rendering is not configurable yet.
renderers := []resources.Resource{
resources.NewSystem(),
resources.NewSource(),
}

for i := range instances.Items {
renderers = append(renderers, resources.NewOutput(cl, &instances.Items[i]))
}

var buff []byte
var newline bytes.Buffer
fmt.Fprintf(&newline, "\n\n")
for _, r := range renderers {
out, err := r.Render()
if err != nil {
return []byte{}, err
}
buff = append(buff, out...)
buff = append(buff, newline.Bytes()...)
}

return buff, nil
}
var _ reconcile.Reconciler = &output.Reconciler{}
10 changes: 0 additions & 10 deletions pkg/controller/output/output_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@ package output
import (
"context"
"encoding/json"
"testing"

loggingv1alpha1 "github.com/platform9/fluentd-operator/pkg/apis/logging/v1alpha1"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/scheme"
Expand Down Expand Up @@ -99,11 +97,3 @@ func (t *TestClient) List(ctx context.Context, list runtime.Object, opt ...clien
func (t *TestClient) Status() client.StatusWriter {
return t.sw
}

func TestFluentdConfig(t *testing.T) {
cl := NewTestClient()
buf, err := getFluentdConfig(cl)
t.Log(err)
assert.Nil(t, err)
assert.NotEmpty(t, buf)
}
Loading