Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 51 additions & 63 deletions internal/component/loki/source/aws_firehose/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,18 @@ package aws_firehose

import (
"context"
"reflect"
"sync"

"github.com/go-kit/log"
"github.com/gorilla/mux"
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/syntax/alloytypes"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/relabel"

"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/component/common/loki"
fnet "github.com/grafana/alloy/internal/component/common/net"
alloy_relabel "github.com/grafana/alloy/internal/component/common/relabel"
"github.com/grafana/alloy/internal/component/loki/source/aws_firehose/internal"
"github.com/grafana/alloy/internal/component/common/relabel"
"github.com/grafana/alloy/internal/component/loki/source"
"github.com/grafana/alloy/internal/util"
)

Expand All @@ -37,7 +34,7 @@ type Arguments struct {
AccessKey alloytypes.Secret `alloy:"access_key,attr,optional"`
UseIncomingTimestamp bool `alloy:"use_incoming_timestamp,attr,optional"`
ForwardTo []loki.LogsReceiver `alloy:"forward_to,attr"`
RelabelRules alloy_relabel.Rules `alloy:"relabel_rules,attr,optional"`
RelabelRules relabel.Rules `alloy:"relabel_rules,attr,optional"`
}

// SetToDefault implements syntax.Defaulter.
Expand All @@ -52,26 +49,25 @@ type Component struct {
opts component.Options
logger log.Logger

serverMetrics *util.UncheckedCollector
handlerMetrics *internal.Metrics
metrics *metrics
serverMetrics *util.UncheckedCollector

fanout *loki.Fanout
handler loki.LogsReceiver
handler loki.LogsBatchReceiver

mut sync.Mutex
args Arguments
rbs []*relabel.Config
server *fnet.TargetServer
server *source.Server
}

// New creates a new Component.
func New(o component.Options, args Arguments) (*Component, error) {
c := &Component{
opts: o,
handler: loki.NewLogsReceiver(),
fanout: loki.NewFanout(args.ForwardTo),
serverMetrics: util.NewUncheckedCollector(nil),
handlerMetrics: internal.NewMetrics(o.Registerer),
metrics: newMetrics(o.Registerer),
opts: o,
handler: loki.NewLogsBatchReceiver(),
fanout: loki.NewFanout(args.ForwardTo),
serverMetrics: util.NewUncheckedCollector(nil),

logger: log.With(o.Logger, "component", "aws_firehose_logs"),
}
Expand All @@ -91,11 +87,11 @@ func (c *Component) Run(ctx context.Context) error {
c.mut.Lock()
defer c.mut.Unlock()
if c.server != nil {
c.server.StopAndShutdown()
c.server.ForceShutdown()
}
}()

loki.Consume(ctx, c.handler, c.fanout)
loki.ConsumeBatch(ctx, c.handler, c.fanout)
return nil
}

Expand All @@ -109,62 +105,54 @@ func (c *Component) Update(args component.Arguments) error {

c.fanout.UpdateChildren(newArgs.ForwardTo)

var newRelabels []*relabel.Config = nil
// first condition to consider if the handler needs to be updated is if the UseIncomingTimestamp field
// changed
var handlerNeedsUpdate = c.args.UseIncomingTimestamp != newArgs.UseIncomingTimestamp

// then, if the relabel rules changed
if len(newArgs.RelabelRules) > 0 {
handlerNeedsUpdate = true
newRelabels = alloy_relabel.ComponentToPromRelabelConfigs(newArgs.RelabelRules)
} else if len(c.rbs) > 0 && len(newArgs.RelabelRules) == 0 {
// nil out relabel rules if they need to be cleared
handlerNeedsUpdate = true
if newArgs.Server == nil {
newArgs.Server = &fnet.ServerConfig{}
}

if c.args.AccessKey != newArgs.AccessKey {
handlerNeedsUpdate = true
if newArgs.Server.GRPC == nil {
newArgs.Server.GRPC = &fnet.GRPCConfig{
ListenPort: 0,
ListenAddress: "127.0.0.1",
}
}

// Since the handler is created ad-hoc for the server, and the handler depends on the relabels
// consider this as a cause for server restart as well. Much simpler than adding a lock on the
// handler and doing the relabel rules change on the fly
serverNeedsUpdate := !reflect.DeepEqual(c.args.Server, newArgs.Server)
if !serverNeedsUpdate && !handlerNeedsUpdate {
c.args = newArgs
return nil
}
serverNeedsRestart := c.server.NeedsRestart(newArgs.Server) || c.args.AccessKey != newArgs.AccessKey
if serverNeedsRestart {
if c.server != nil {
c.server.Shutdown()
}

if c.server != nil {
c.server.StopAndShutdown()
}
registry := prometheus.NewRegistry()
c.serverMetrics.SetCollector(registry)

c.server, err = source.NewServer(c.logger, registry, c.handler, source.ServerConfig{
Namespace: "loki_source_awsfirehose",
EntriesWritten: c.metrics.entriesWritten,
NetConfig: newArgs.Server,
LogsConfig: &source.LogsConfig{
RelabelRules: relabel.ComponentToPromRelabelConfigs(newArgs.RelabelRules),
UseIncomingTimestamp: newArgs.UseIncomingTimestamp,
},
})
if err != nil {
return err
}

// update relabel rules in component if needed
if handlerNeedsUpdate {
c.rbs = newRelabels
}
if err = c.server.Run(newRoutes(c.metrics, string(newArgs.AccessKey)), nil); err != nil {
return err
}

registry := prometheus.NewRegistry()
c.serverMetrics.SetCollector(registry)
c.server, err = fnet.NewTargetServer(c.logger, "loki_source_awsfirehose", registry, newArgs.Server)
if err != nil {
return err
c.args = newArgs
return nil
}

if err = c.server.MountAndRun(func(router *mux.Router) {
// re-create handler when server is re-computed
handler := internal.NewHandler(c, c.logger, c.handlerMetrics, c.rbs, newArgs.UseIncomingTimestamp, string(newArgs.AccessKey))
router.Path("/awsfirehose/api/v1/push").Methods("POST").Handler(handler)
}); err != nil {
return err
if c.server != nil {
c.server.Update(&source.LogsConfig{
RelabelRules: relabel.ComponentToPromRelabelConfigs(newArgs.RelabelRules),
UseIncomingTimestamp: newArgs.UseIncomingTimestamp,
})
}

c.args = newArgs
return nil
}

// Send implements internal.Sender so that the component is able to receive logs decoded by the handler.
func (c *Component) Send(ctx context.Context, entry loki.Entry) {
c.handler.Chan() <- entry
}
25 changes: 0 additions & 25 deletions internal/component/loki/source/aws_firehose/internal/errors.go

This file was deleted.

Loading
Loading