From 38894f6e48171c7c757a3610b203dad70915d4bb Mon Sep 17 00:00:00 2001 From: ethfoo Date: Thu, 13 Jul 2023 22:42:02 +0800 Subject: [PATCH 01/19] Release: add v1.5.0-rc.0 changelog --- Makefile | 2 +- doc/changelog/CHANGELOG-v1.md | 91 ++++++++++++++++++++++++++++++++ pkg/sink/elasticsearch/config.go | 6 --- pkg/util/persistence/config.go | 1 - 4 files changed, 92 insertions(+), 8 deletions(-) diff --git a/Makefile b/Makefile index 00a2f8419..8929bff45 100644 --- a/Makefile +++ b/Makefile @@ -98,7 +98,7 @@ docker-build: ## Docker build -t ${REPO}:${TAG}, try: make docker-build REPO=, ${TAG} generated by git +docker-multi-arch: ## Docker buildx, try: make docker-multi-arch REPO=, ${TAG} generated by git docker buildx build --platform linux/amd64,linux/arm64 -t ${REPO}:${TAG} . --push LOG_DIR ?= /tmp/log ## log directory diff --git a/doc/changelog/CHANGELOG-v1.md b/doc/changelog/CHANGELOG-v1.md index 533013337..effe93f8e 100644 --- a/doc/changelog/CHANGELOG-v1.md +++ b/doc/changelog/CHANGELOG-v1.md @@ -1,3 +1,94 @@ +# Release v1.5.0-rc.0 + +### :star2: Features +- [breaking]: The `db` in `file source` is moved to the [`loggie.yml`](https://loggie-io.github.io/docs/main/reference/global/db/). If upgrading from an earlier version to v1.5, be sure to check whether `db` has been configured for `file source`. If it is not configured, you can just ignore it, and the default value will remain compatible. + +- Added rocketmq sink (#530) +- Added franzKafka source (#573) +- Added kata runtime (#554) +- `typePodFields`/`typeNodeFields` is supported in LogConfig/ClusterLogConfig (#450) +- sink codec support printEvents (#448) +- Added queue in LogConfig/ClusterLogConfig (#457) +- Changed `olivere/elastic` to the official elasticsearch go client (#581) +- Supported `copytruncate` in file source (#571) +- Added `genfiles` sub command (#471) +- Added queue in LogConfig/ClusterLogConfig queue (#457) +- Added `sortBy` field in elasticsearch source (#473) +- Added host VM mode with Kubernetes as the configuration center (#449) (#489) +- New `addHostMeta` interceptor (#474) +- Added persistence driver `badger` (#475) (#584) +- Ignore LogConfig with sidecar injection annotation (#478) +- Added `toStr` action in transformer interceptor (#482) +- You can mount the root directory of a node to the Loggie container without mounting additional Loggie volumes (#460) +- Get loggie version with api and sub command (#496) (#508) +- Added the `worker` and the `clientId` in Kafka source (#506) (#507) +- Upgrade `kafka-go` version (#506) (#567) +- Added resultStatus in dev sink which can be used to simulate failure, drop (#531) +- Pretty error when unmarshal yaml configuration failed (#539) +- Added default topic if render kafka topic failed (#550) +- Added `ignoreUnknownTopicOrPartition` in kafka sink (#560) +- Supported multiple topics in kafka source (#548) +- Added default index if render elasticsearch index failed (#551) (#553) +- The default `maxOpenFds` is set to 4096 (#559) +- Supported default `sinkRef` in kubernetes discovery (#555) +- Added `${_k8s.clusterlogconfig}` in `typePodFields` (#569) +- Supported omit empty fields in Kubernetes discovery (#570) +- Optimizes `maxbytes` interceptors (#575) +- Moved `readFromTail`, `cleanFiles`, `fdHoldTimeoutWhenInactive`, `fdHoldTimeoutWhenRemove` from watcher to outer layer in `file source` (#579) (#585) +- Added `cheanUnfinished` in cleanFiles (#580) +- Added `target` in `maxbyte` interceptor (#588) +- Added `partionKey` in franzKafka (#562) +- Added `highPrecision` in `rateLimit` interceptor (#525) + +### :bug: Bug Fixes +- Fixed panic when kubeEvent Series is nil (#459) +- Upgraded `automaxprocs` version to v1.5.1 (#488) +- Fixed set defaults failed in `fieldsUnderKey` (#513) +- Fixed parse condition failed when contain ERROR in transformer interceptor (#514) (#515) +- Fixed grpc batch out-of-order data streams (#517) +- Fixed large line may cause oom (#529) +- Fixed duplicated batchSize in queue (#533) +- Fixed sqlite locked panic (#524) +- Fixed command can't be used in multi-arch container (#541) +- Fixed `logger listener` may cause block (#561) (#552) +- Fixed `sink concurrency` deepCopy failed (#563) +- Drop events when partial error in elasticsearch sink (#572) + +# Release v1.4.0 + +### :star2: Features + +- Added Loggie dashboard feature for easier troubleshooting (#416) +- Enhanced log alerting function with more flexible log alert detection rules and added alertWebhook sink (#392) +- Added sink concurrency support for automatic adaptation based on downstream delay (#376) +- Added franzKafka sink for users who prefer the franz kafka library (#423) +- Added elasticsearch source (#345) +- Added zinc sink (#254) +- Added pulsar sink (#417) +- Added grok action to transformer interceptor (#418) +- Added split action to transformer interceptor (#411) +- Added jsonEncode action to transformer interceptor (#421) +- Added fieldsFromPath configuration to source for obtaining fields from file content (#401) +- Added fieldsRef parameter to filesource listener for obtaining key value from fields configuration and adding to metrics as label (#402) +- In transformer interceptor, added dropIfError support to drop event if action execution fails (#409) +- Added info listener which currently exposes loggie_info_stat metrics and displays version label (#410) +- Added support for customized kafka sink partition key +- Added sasl support to Kafka source (#415) +- Added https insecureSkipVerify support to loki sink (#422) +- Optimized file source for large files (#430) +- Changed default value of file source maxOpenFds to 1024 (#437) +- ContainerRuntime can now be set to none (#439) +- Upgraded to go 1.18 (#440) +- Optimize the configuration parameters to remove the redundancy generated by rendering + +### :bug: Bug Fixes + +- Added source fields to filesource listener (#402) +- Fixed issue of transformer copy action not copying non-string body (#420) +- Added fetching of logs file from UpperDir when rootfs collection is enabled (#414) +- Fix pipeline restart npe (#454) +- Fix create dir soft link job (#453) + # Release v1.4.0-rc.0 ### :star2: Features diff --git a/pkg/sink/elasticsearch/config.go b/pkg/sink/elasticsearch/config.go index b27606c96..fddb51dfc 100644 --- a/pkg/sink/elasticsearch/config.go +++ b/pkg/sink/elasticsearch/config.go @@ -39,12 +39,6 @@ type Config struct { SendBuffer int `yaml:"sendBufferBytes,omitempty" default:"131072" validate:"gte=0"` } -type TLS struct { - CAFile string `yaml:"caFile,omitempty"` - CertFile string `yaml:"certFile,omitempty"` - KeyFile string `yaml:"keyFile,omitempty"` -} - type RenderIndexFail struct { DropEvent bool `yaml:"dropEvent,omitempty" default:"true"` IgnoreError bool `yaml:"ignoreError,omitempty"` diff --git a/pkg/util/persistence/config.go b/pkg/util/persistence/config.go index a97394478..d213643b1 100644 --- a/pkg/util/persistence/config.go +++ b/pkg/util/persistence/config.go @@ -34,7 +34,6 @@ type DbConfig struct { File string `yaml:"file,omitempty"` FlushTimeout time.Duration `yaml:"flushTimeout,omitempty" default:"2s"` BufferSize int `yaml:"bufferSize,omitempty" default:"2048"` - TableName string `yaml:"tableName,omitempty" default:"registry"` CleanInactiveTimeout time.Duration `yaml:"cleanInactiveTimeout,omitempty" default:"504h"` // default records not updated in 21 days will be deleted CleanScanInterval time.Duration `yaml:"cleanScanInterval,omitempty" default:"1h"` } From e6c4e3782046f03a8e066f21b093412e5cf9b20b Mon Sep 17 00:00:00 2001 From: snowsi <42464513+snowsi@users.noreply.github.com> Date: Fri, 21 Jul 2023 11:14:11 +0800 Subject: [PATCH 02/19] Fix: Convert to string type when returning [] byte type (#596) Co-authored-by: jishengjie --- pkg/interceptor/transformer/condition/equal.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pkg/interceptor/transformer/condition/equal.go b/pkg/interceptor/transformer/condition/equal.go index f704ac055..eca2f5e21 100644 --- a/pkg/interceptor/transformer/condition/equal.go +++ b/pkg/interceptor/transformer/condition/equal.go @@ -51,5 +51,10 @@ func NewEqual(args []string) (*Equal, error) { } func (eq *Equal) Check(e api.Event) bool { - return eq.value == eventops.Get(e, eq.field) + value := eventops.Get(e, eq.field) + if byteValue, ok := value.([]byte); ok { + value = string(byteValue) + } + + return eq.value == value } From 1ea5bf997e32d95994c7a7583aa019de442c57f8 Mon Sep 17 00:00:00 2001 From: ethfoo Date: Mon, 24 Jul 2023 17:39:53 +0800 Subject: [PATCH 03/19] Feat: support build core components of Loggie (#598) --- Makefile | 8 ++-- pkg/include/{include.go => all.go} | 4 +- pkg/include/core.go | 62 ++++++++++++++++++++++++++++++ 3 files changed, 69 insertions(+), 5 deletions(-) rename pkg/include/{include.go => all.go} (98%) create mode 100644 pkg/include/core.go diff --git a/Makefile b/Makefile index 8929bff45..e3f41e088 100644 --- a/Makefile +++ b/Makefile @@ -82,13 +82,13 @@ benchmark: ## Run benchmark ##@ Build -build: ## go build - CGO_ENABLED=1 GOOS=${GOOS} GOARCH=${GOARCH} go build -mod=vendor -a ${extra_flags} -o loggie cmd/loggie/main.go +build: ## go build, EXT_BUILD_TAGS=include_core would only build core package + CGO_ENABLED=1 GOOS=${GOOS} GOARCH=${GOARCH} go build -tags ${EXT_BUILD_TAGS} -mod=vendor -a ${extra_flags} -o loggie cmd/loggie/main.go ##@ Build(without sqlite) -build-in-badger: ## go build without sqlite - GOOS=${GOOS} GOARCH=${GOARCH} go build -tags driver_badger -mod=vendor -a -ldflags '-X github.com/loggie-io/loggie/pkg/core/global._VERSION_=${TAG} -X github.com/loggie-io/loggie/pkg/util/persistence._DRIVER_=badger -s -w' -o loggie cmd/loggie/main.go +build-in-badger: ## go build without sqlite, EXT_BUILD_TAGS=include_core would only build core package + GOOS=${GOOS} GOARCH=${GOARCH} go build -tags driver_badger,${EXT_BUILD_TAGS} -mod=vendor -a -ldflags '-X github.com/loggie-io/loggie/pkg/core/global._VERSION_=${TAG} -X github.com/loggie-io/loggie/pkg/util/persistence._DRIVER_=badger -s -w' -o loggie cmd/loggie/main.go ##@ Images diff --git a/pkg/include/include.go b/pkg/include/all.go similarity index 98% rename from pkg/include/include.go rename to pkg/include/all.go index d4180bc2a..9b1e7cb24 100644 --- a/pkg/include/include.go +++ b/pkg/include/all.go @@ -1,5 +1,7 @@ +//go:build !include_core + /* -Copyright 2021 Loggie Authors +Copyright 2023 Loggie Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/pkg/include/core.go b/pkg/include/core.go new file mode 100644 index 000000000..912a41852 --- /dev/null +++ b/pkg/include/core.go @@ -0,0 +1,62 @@ +//go:build include_core + +/* +Copyright 2023 Loggie Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package include + +import ( + _ "github.com/loggie-io/loggie/pkg/eventbus/export/prometheus" + _ "github.com/loggie-io/loggie/pkg/eventbus/listener/filesource" + _ "github.com/loggie-io/loggie/pkg/eventbus/listener/filewatcher" + _ "github.com/loggie-io/loggie/pkg/eventbus/listener/info" + _ "github.com/loggie-io/loggie/pkg/eventbus/listener/logalerting" + _ "github.com/loggie-io/loggie/pkg/eventbus/listener/pipeline" + _ "github.com/loggie-io/loggie/pkg/eventbus/listener/queue" + _ "github.com/loggie-io/loggie/pkg/eventbus/listener/reload" + _ "github.com/loggie-io/loggie/pkg/eventbus/listener/sink" + _ "github.com/loggie-io/loggie/pkg/eventbus/listener/sys" + _ "github.com/loggie-io/loggie/pkg/interceptor/addhostmeta" + _ "github.com/loggie-io/loggie/pkg/interceptor/addk8smeta" + _ "github.com/loggie-io/loggie/pkg/interceptor/limit" + _ "github.com/loggie-io/loggie/pkg/interceptor/logalert" + _ "github.com/loggie-io/loggie/pkg/interceptor/logalert/condition" + _ "github.com/loggie-io/loggie/pkg/interceptor/maxbytes" + _ "github.com/loggie-io/loggie/pkg/interceptor/metric" + _ "github.com/loggie-io/loggie/pkg/interceptor/retry" + _ "github.com/loggie-io/loggie/pkg/interceptor/schema" + _ "github.com/loggie-io/loggie/pkg/interceptor/transformer" + _ "github.com/loggie-io/loggie/pkg/interceptor/transformer/action" + _ "github.com/loggie-io/loggie/pkg/interceptor/transformer/condition" + _ "github.com/loggie-io/loggie/pkg/queue/channel" + _ "github.com/loggie-io/loggie/pkg/queue/memory" + _ "github.com/loggie-io/loggie/pkg/sink/alertwebhook" + _ "github.com/loggie-io/loggie/pkg/sink/codec/json" + _ "github.com/loggie-io/loggie/pkg/sink/codec/raw" + _ "github.com/loggie-io/loggie/pkg/sink/dev" + _ "github.com/loggie-io/loggie/pkg/sink/elasticsearch" + _ "github.com/loggie-io/loggie/pkg/sink/file" + _ "github.com/loggie-io/loggie/pkg/sink/franz" + _ "github.com/loggie-io/loggie/pkg/sink/kafka" + _ "github.com/loggie-io/loggie/pkg/source/codec/json" + _ "github.com/loggie-io/loggie/pkg/source/codec/regex" + _ "github.com/loggie-io/loggie/pkg/source/dev" + _ "github.com/loggie-io/loggie/pkg/source/elasticsearch" + _ "github.com/loggie-io/loggie/pkg/source/file" + _ "github.com/loggie-io/loggie/pkg/source/file/process" + _ "github.com/loggie-io/loggie/pkg/source/franz" + _ "github.com/loggie-io/loggie/pkg/source/kafka" +) From e637a3ff89e0a45338420bb36a9a850c7f06e8fd Mon Sep 17 00:00:00 2001 From: ethfoo Date: Tue, 25 Jul 2023 10:27:38 +0800 Subject: [PATCH 04/19] Feat: add addonMetaSchema in file source (#599) --- pkg/source/file/config.go | 7 +++ pkg/source/file/source.go | 113 ++++++++++++++++++++++++++++++++++---- 2 files changed, 109 insertions(+), 11 deletions(-) diff --git a/pkg/source/file/config.go b/pkg/source/file/config.go index 48e662589..4f5d50331 100644 --- a/pkg/source/file/config.go +++ b/pkg/source/file/config.go @@ -45,6 +45,7 @@ type CollectConfig struct { RereadTruncated bool `yaml:"rereadTruncated,omitempty" default:"true"` // Read from the beginning when the file is truncated FirstNBytesForIdentifier int `yaml:"firstNBytesForIdentifier,omitempty" default:"128" validate:"gte=10"` // If the file size is smaller than `firstNBytesForIdentifier`, it will not be collected AddonMeta bool `yaml:"addonMeta,omitempty"` + AddonMetaSchema AddonMetaSchema `yaml:"addonMetaSchema,omitempty"` excludeFilePatterns []*regexp.Regexp Charset string `yaml:"charset,omitempty" default:"utf-8"` @@ -54,6 +55,12 @@ type CollectConfig struct { FdHoldTimeoutWhenRemove time.Duration `yaml:"fdHoldTimeoutWhenRemove,omitempty" default:"5m"` } +type AddonMetaSchema struct { + Fields map[string]string `yaml:"fields,omitempty"` + FieldsUnderRoot bool `yaml:"underRoot,omitempty"` + FieldsUnderKey string `yaml:"key,omitempty" default:"state"` +} + type LineDelimiterValue struct { Charset string `yaml:"charset,omitempty" default:"utf-8"` LineType string `yaml:"type,omitempty" default:"auto"` diff --git a/pkg/source/file/source.go b/pkg/source/file/source.go index b1b2d0b17..e940727b4 100644 --- a/pkg/source/file/source.go +++ b/pkg/source/file/source.go @@ -71,6 +71,19 @@ type Source struct { multilineProcessor *MultiProcessor mTask *MultiTask codec codec.Codec + + addonMetaField *AddonMetaFields +} + +type AddonMetaFields struct { + Pipeline string `yaml:"pipeline,omitempty"` + Source string `yaml:"source,omitempty"` + Filename string `yaml:"filename,omitempty"` + Timestamp string `yaml:"timestamp,omitempty"` + Offset string `yaml:"offset,omitempty"` + Bytes string `yaml:"bytes,omitempty"` + Line string `yaml:"line,omitempty"` + Hostname string `yaml:"hostname,omitempty"` } func (s *Source) Config() interface{} { @@ -109,6 +122,10 @@ func (s *Source) Init(context api.Context) error { s.config.ReaderConfig.MultiConfig.Timeout = 2 * inactiveTimeout } + if s.config.CollectConfig.AddonMeta { + s.addonMetaField = addonMetaFieldsConvert(s.config.CollectConfig.AddonMetaSchema.Fields) + } + // init reader chan size s.config.ReaderConfig.readChanSize = s.config.WatchConfig.MaxOpenFds @@ -186,7 +203,7 @@ func (s *Source) ProductLoop(productFunc api.ProductFunc) { s.productFunc = productFunc s.productFunc = jobFieldsProductFunc(s.productFunc, s.rawSourceConfig) if s.config.CollectConfig.AddonMeta { - s.productFunc = addonMetaProductFunc(s.productFunc) + s.productFunc = addonMetaProductFunc(s.productFunc, s.addonMetaField, s.config.CollectConfig.AddonMetaSchema) } if s.config.ReaderConfig.MultiConfig.Active { s.mTask = NewMultiTask(s.epoch, s.name, s.config.ReaderConfig.MultiConfig, s.eventPool, s.productFunc) @@ -238,21 +255,95 @@ func jobFieldsProductFunc(productFunc api.ProductFunc, srcCfg *source.Config) ap } } -func addonMetaProductFunc(productFunc api.ProductFunc) api.ProductFunc { +func addonMetaProductFunc(productFunc api.ProductFunc, fields *AddonMetaFields, schema AddonMetaSchema) api.ProductFunc { return func(event api.Event) api.Result { s, _ := event.Meta().Get(SystemStateKey) state := s.(*persistence.State) addonMeta := make(map[string]interface{}) - addonMeta["pipeline"] = state.PipelineName - addonMeta["source"] = state.SourceName - addonMeta["filename"] = state.Filename - addonMeta["timestamp"] = state.CollectTime.Local().Format(tsLayout) - addonMeta["offset"] = state.Offset - addonMeta["bytes"] = state.ContentBytes - addonMeta["hostname"] = global.NodeName - - event.Header()["state"] = addonMeta + + // if fields is nil, use default config + if fields == nil { + addonMeta["pipeline"] = state.PipelineName + addonMeta["source"] = state.SourceName + addonMeta["filename"] = state.Filename + addonMeta["timestamp"] = state.CollectTime.Local().Format(tsLayout) + addonMeta["offset"] = state.Offset + addonMeta["bytes"] = state.ContentBytes + addonMeta["hostname"] = global.NodeName + } else { + + if fields.Pipeline != "" { + addonMeta[fields.Pipeline] = state.PipelineName + } + if fields.Source != "" { + addonMeta[fields.Source] = state.SourceName + } + if fields.Filename != "" { + addonMeta[fields.Filename] = state.Filename + } + if fields.Timestamp != "" { + addonMeta[fields.Timestamp] = state.CollectTime.Local().Format(tsLayout) + } + if fields.Offset != "" { + addonMeta[fields.Offset] = state.Offset + } + if fields.Bytes != "" { + addonMeta[fields.Bytes] = state.ContentBytes + } + if fields.Line != "" { + addonMeta[fields.Line] = state.LineNumber + } + if fields.Hostname != "" { + addonMeta[fields.Hostname] = global.NodeName + } + } + + if schema.FieldsUnderRoot { + for k, v := range addonMeta { + event.Header()[k] = v + } + } else { + event.Header()[schema.FieldsUnderKey] = addonMeta + } + productFunc(event) return result.Success() } } + +func addonMetaFieldsConvert(fields map[string]string) *AddonMetaFields { + if len(fields) == 0 { + return nil + } + + amf := &AddonMetaFields{} + for k, v := range fields { + switch v { + case "${_meta.pipeline}": + amf.Pipeline = k + + case "${_meta.source}": + amf.Source = k + + case "${_meta.filename}": + amf.Filename = k + + case "${_meta.timestamp}": + amf.Timestamp = k + + case "${_meta.offset}": + amf.Offset = k + + case "${_meta.bytes}": + amf.Bytes = k + + case "${_meta.line}": + amf.Line = k + + case "${_meta.hostname}": + amf.Hostname = k + } + } + + return amf +} From 8febfeda70bc776b9158cf74a2ef86d14f6881e2 Mon Sep 17 00:00:00 2001 From: ethfoo Date: Tue, 25 Jul 2023 17:29:05 +0800 Subject: [PATCH 05/19] Feat: add timestamp and bodyKey in source (#600) --- loggie.yml | 18 ++++++++++-------- pkg/core/source/config.go | 23 +++++++++++++++++++++++ pkg/pipeline/pipeline.go | 33 +++++++++++++++++++++++++++------ 3 files changed, 60 insertions(+), 14 deletions(-) diff --git a/loggie.yml b/loggie.yml index 64ba5814e..48835afb3 100644 --- a/loggie.yml +++ b/loggie.yml @@ -14,7 +14,7 @@ loggie: sink: ~ queue: ~ pipeline: ~ - normalize: ~ + sys: ~ discovery: enabled: false @@ -31,15 +31,17 @@ loggie: defaults: sink: type: dev - interceptors: - - type: schema - name: global - order: 700 - addMeta: - timestamp: - key: "@timestamp" sources: - type: file + timestampKey: "@timestamp" + bodyKey: "message" + fieldsUnderRoot: true + addonMeta: true + addonMetaSchema: + underRoot: true + fields: + filename: "${_meta.filename}" + line: "${_meta.line}" watcher: maxOpenFds: 6000 http: diff --git a/pkg/core/source/config.go b/pkg/core/source/config.go index 64bb44432..70c2cc3dd 100644 --- a/pkg/core/source/config.go +++ b/pkg/core/source/config.go @@ -38,6 +38,11 @@ type Config struct { FieldsFromEnv map[string]string `yaml:"fieldsFromEnv,omitempty"` FieldsFromPath map[string]string `yaml:"fieldsFromPath,omitempty"` Codec *codec.Config `yaml:"codec,omitempty"` + + TimestampKey string `yaml:"timestampKey,omitempty"` + TimestampLocation string `yaml:"timestampLocation,omitempty"` + TimestampLayout string `yaml:"timestampLayout,omitempty"` + BodyKey string `yaml:"bodyKey,omitempty"` } func (c *Config) DeepCopy() *Config { @@ -82,6 +87,11 @@ func (c *Config) DeepCopy() *Config { FieldsFromEnv: newFieldsFromEnv, FieldsFromPath: newFieldsFromPath, Codec: c.Codec.DeepCopy(), + + TimestampKey: c.TimestampKey, + TimestampLocation: c.TimestampLocation, + TimestampLayout: c.TimestampLayout, + BodyKey: c.BodyKey, } return out @@ -155,6 +165,19 @@ func (c *Config) Merge(from *Config) { } else { c.Codec.Merge(from.Codec) } + + if c.TimestampKey == "" { + c.TimestampKey = from.TimestampKey + } + if c.TimestampLocation == "" { + c.TimestampLocation = from.TimestampLocation + } + if c.TimestampLayout == "" { + c.TimestampLayout = from.TimestampLayout + } + if c.BodyKey == "" { + c.BodyKey = from.BodyKey + } } func MergeSourceList(base []*Config, from []*Config) []*Config { diff --git a/pkg/pipeline/pipeline.go b/pkg/pipeline/pipeline.go index 850657a48..269cf168b 100644 --- a/pkg/pipeline/pipeline.go +++ b/pkg/pipeline/pipeline.go @@ -18,6 +18,7 @@ package pipeline import ( "fmt" + timeutil "github.com/loggie-io/loggie/pkg/util/time" "io/ioutil" "os" "strconv" @@ -46,10 +47,9 @@ import ( ) const ( - FieldsUnderRoot = event.PrivateKeyPrefix + "FieldsUnderRoot" - FieldsUnderKey = event.PrivateKeyPrefix + "FieldsUnderKey" - fieldsFromPathMaxBytes = 1024 + + defaultTsLayout = "2006-01-02T15:04:05.000Z" ) var ( @@ -1054,11 +1054,10 @@ func (p *Pipeline) initFieldsFromPath(fieldsFromPath map[string]string) { func (p *Pipeline) fillEventMetaAndHeader(e api.Event, config source.Config) { // add meta fields - e.Meta().Set(event.SystemProductTimeKey, time.Now()) + now := time.Now() + e.Meta().Set(event.SystemProductTimeKey, now) e.Meta().Set(event.SystemPipelineKey, p.name) e.Meta().Set(event.SystemSourceKey, config.Name) - e.Meta().Set(FieldsUnderRoot, config.FieldsUnderRoot) - e.Meta().Set(FieldsUnderKey, config.FieldsUnderKey) header := e.Header() if header == nil { @@ -1073,6 +1072,28 @@ func (p *Pipeline) fillEventMetaAndHeader(e api.Event, config source.Config) { // add header source fields from file AddSourceFields(header, p.pathMap, config.FieldsUnderRoot, config.FieldsUnderKey) + + // remap timestamp + if config.TimestampKey != "" { + layout := config.TimestampLayout + if layout == "" { + layout = defaultTsLayout + } + + // conf.Location could be "" or "UTC" or "Local" + // default "" indicate "UTC" + ts, err := timeutil.Format(now, config.TimestampLocation, layout) + if err != nil { + log.Warn("time format system product timestamp err: %+v", err) + return + } + header[config.TimestampKey] = ts + } + + if config.BodyKey != "" { + header[config.BodyKey] = util.ByteToStringUnsafe(e.Body()) + e.Fill(e.Meta(), header, []byte{}) + } } func AddSourceFields(header map[string]interface{}, fields map[string]interface{}, underRoot bool, fieldsKey string) { From 51aa8c881d4997f7f3f532f23b2e439f77e5fc19 Mon Sep 17 00:00:00 2001 From: ethfoo Date: Tue, 1 Aug 2023 09:59:20 +0800 Subject: [PATCH 06/19] Fix: invalid pipelines will not stop all the pipelines running (#602) --- pkg/control/config.go | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/pkg/control/config.go b/pkg/control/config.go index 0b0f4b7e5..d224b8fbc 100644 --- a/pkg/control/config.go +++ b/pkg/control/config.go @@ -130,11 +130,21 @@ func ReadPipelineConfigFromFile(path string, ignore FileIgnore) (*PipelineConfig for _, fn := range all { pipes := &PipelineConfig{} unpack := cfg.UnPackFromFile(fn, pipes) - if err = unpack.Defaults().Validate().Do(); err != nil { - log.Error("invalid pipeline configs: %v, \n%s", err, unpack.Contents()) + if err = unpack.Do(); err != nil { + log.Error("read pipeline configs from path %s failed: %v", path, err) continue } - pipecfgs.AddPipelines(pipes.Pipelines) + + for _, p := range pipes.Pipelines { + pip := p + if err := cfg.NewUnpack(nil, &pip, nil).Defaults().Validate().Do(); err != nil { + // ignore invalid pipeline, but continue to read other pipelines + // invalid pipeline will check by reloader later + log.Error("pipeline: %s configs invalid: %v", p.Name, err) + continue + } + pipecfgs.AddPipelines([]pipeline.Config{pip}) + } } return pipecfgs, nil } From 2bcb0632e6aa2c3d564d8d51d1a31fe5b05b9564 Mon Sep 17 00:00:00 2001 From: ethfoo Date: Tue, 1 Aug 2023 10:27:05 +0800 Subject: [PATCH 07/19] Fix: cfg npe in query pipelines (#604) --- pkg/ops/helper/help.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/ops/helper/help.go b/pkg/ops/helper/help.go index f5ce54ec8..77eeba0d4 100644 --- a/pkg/ops/helper/help.go +++ b/pkg/ops/helper/help.go @@ -174,6 +174,9 @@ func diffPipes(request *http.Request) string { func queryPipelineConfig(cfgInPath *control.PipelineConfig, pipelineQuery string, sourceQuery string) map[string]pipeline.Config { result := make(map[string]pipeline.Config) + if cfgInPath == nil { + return result + } setResult := func(pipData pipeline.Config, srcData ...*source.Config) { pip, ok := result[pipData.Name] From 01a0a300c42494424063626b12a223b0feae9ab0 Mon Sep 17 00:00:00 2001 From: ethfoo Date: Tue, 1 Aug 2023 19:39:46 +0800 Subject: [PATCH 08/19] Feat: optimize elasticsearch sink request buffer (#608) --- go.mod | 2 +- pkg/core/reloader/http.go | 3 +- pkg/eventbus/listener/sys/listener.go | 7 +- pkg/ops/dashboard/loggie/loggiedetail.go | 3 +- pkg/pipeline/pipeline.go | 3 +- pkg/sink/codec/json/json.go | 3 +- pkg/sink/elasticsearch/client.go | 121 ++++++++++-------- .../sink/elasticsearch/elasticsearch.go | 85 ++++++++++++ 8 files changed, 164 insertions(+), 63 deletions(-) create mode 100644 test/benchmark/sink/elasticsearch/elasticsearch.go diff --git a/go.mod b/go.mod index 96cece6a0..5b72c9f99 100644 --- a/go.mod +++ b/go.mod @@ -61,7 +61,6 @@ require ( github.com/cespare/xxhash v1.1.0 // indirect github.com/danieljoos/wincred v1.0.2 // indirect github.com/dgraph-io/ristretto v0.1.1 // indirect - github.com/dustin/go-humanize v1.0.0 // indirect github.com/dvsekhvalnov/jose2go v0.0.0-20200901110807-248326c1351b // indirect github.com/emirpasic/gods v1.12.0 // indirect github.com/fatih/color v1.10.0 // indirect @@ -174,6 +173,7 @@ require ( require ( github.com/apache/rocketmq-client-go/v2 v2.1.1 + github.com/dustin/go-humanize v1.0.0 github.com/elastic/go-elasticsearch/v7 v7.17.10 github.com/goccy/go-yaml v1.11.0 github.com/mattn/go-sqlite3 v1.11.0 diff --git a/pkg/core/reloader/http.go b/pkg/core/reloader/http.go index a66396f15..0c3cfce29 100644 --- a/pkg/core/reloader/http.go +++ b/pkg/core/reloader/http.go @@ -17,7 +17,6 @@ limitations under the License. package reloader import ( - "io/ioutil" "net/http" "os" "path/filepath" @@ -49,7 +48,7 @@ func (r *reloader) readPipelineConfigHandler(writer http.ResponseWriter, request continue } - content, err := ioutil.ReadFile(m) + content, err := os.ReadFile(m) if err != nil { log.Warn("read config error. err: %v", err) return diff --git a/pkg/eventbus/listener/sys/listener.go b/pkg/eventbus/listener/sys/listener.go index ef93fb4a2..42bc07392 100644 --- a/pkg/eventbus/listener/sys/listener.go +++ b/pkg/eventbus/listener/sys/listener.go @@ -19,6 +19,7 @@ package sys import ( "encoding/json" "fmt" + "github.com/dustin/go-humanize" "os" "strconv" "time" @@ -48,8 +49,9 @@ func makeListener() eventbus.Listener { } type sysData struct { - MemoryRss uint64 `json:"memRss"` - CPUPercent float64 `json:"cpuPercent"` + MemoryRss uint64 `json:"-"` + MemoryRssHumanize string `json:"memRss"` + CPUPercent float64 `json:"cpuPercent"` } type Config struct { @@ -122,6 +124,7 @@ func (l *Listener) getSysStat() error { return err } l.data.MemoryRss = mem.RSS + l.data.MemoryRssHumanize = humanize.Bytes(mem.RSS) cpuPer, err := l.proc.Percent(1 * time.Second) if err != nil { diff --git a/pkg/ops/dashboard/loggie/loggiedetail.go b/pkg/ops/dashboard/loggie/loggiedetail.go index 75cace9f3..00e2a3a91 100644 --- a/pkg/ops/dashboard/loggie/loggiedetail.go +++ b/pkg/ops/dashboard/loggie/loggiedetail.go @@ -25,6 +25,7 @@ import ( "github.com/loggie-io/loggie/pkg/ops/helper" "github.com/loggie-io/loggie/pkg/util" "github.com/rivo/tview" + "io" "io/ioutil" "net/http" "net/url" @@ -234,7 +235,7 @@ func (p *LogStatusPanel) SetData() { } defer resp.Body.Close() - out, err := ioutil.ReadAll(resp.Body) + out, err := io.ReadAll(resp.Body) if err != nil { return } diff --git a/pkg/pipeline/pipeline.go b/pkg/pipeline/pipeline.go index 269cf168b..bc62fe4c1 100644 --- a/pkg/pipeline/pipeline.go +++ b/pkg/pipeline/pipeline.go @@ -19,7 +19,6 @@ package pipeline import ( "fmt" timeutil "github.com/loggie-io/loggie/pkg/util/time" - "io/ioutil" "os" "strconv" "strings" @@ -1032,7 +1031,7 @@ func (p *Pipeline) initFieldsFromPath(fieldsFromPath map[string]string) { } for k, pathKey := range fieldsFromPath { - out, err := ioutil.ReadFile(pathKey) + out, err := os.ReadFile(pathKey) if err != nil { log.Error("init fieldsFromPath %s failed, read file %s err: %v", k, pathKey, err) continue diff --git a/pkg/sink/codec/json/json.go b/pkg/sink/codec/json/json.go index 93227c61e..7e78d977a 100644 --- a/pkg/sink/codec/json/json.go +++ b/pkg/sink/codec/json/json.go @@ -18,6 +18,7 @@ package json import ( "github.com/loggie-io/loggie/pkg/core/log" + "github.com/loggie-io/loggie/pkg/util" "time" jsoniter "github.com/json-iterator/go" @@ -73,7 +74,7 @@ func (j *Json) Encode(e api.Event) ([]byte, error) { beatsFormat(e) } else if len(e.Body()) != 0 { // put body in header - header[eventer.Body] = string(e.Body()) + header[eventer.Body] = util.ByteToStringUnsafe(e.Body()) } var result []byte diff --git a/pkg/sink/elasticsearch/client.go b/pkg/sink/elasticsearch/client.go index b9c27cf5c..612c71470 100644 --- a/pkg/sink/elasticsearch/client.go +++ b/pkg/sink/elasticsearch/client.go @@ -21,7 +21,6 @@ import ( "context" "fmt" es "github.com/elastic/go-elasticsearch/v7" - "github.com/elastic/go-elasticsearch/v7/esapi" jsoniter "github.com/json-iterator/go" "github.com/loggie-io/loggie/pkg/core/api" eventer "github.com/loggie-io/loggie/pkg/core/event" @@ -44,8 +43,6 @@ type ClientSet struct { cli *es.Client opType string - buf *bytes.Buffer - aux []byte reqCount int codec codec.Codec @@ -54,6 +51,70 @@ type ClientSet struct { documentIdPattern *pattern.Pattern } +type bulkRequest struct { + lines []line +} + +type line struct { + meta []byte + body []byte +} + +func (b *bulkRequest) body() []byte { + var buf bytes.Buffer + size := 0 + for _, l := range b.lines { + size += len(l.meta) + len(l.body) + 1 + } + buf.Grow(size) + + for _, l := range b.lines { + buf.Write(l.meta) + buf.Write(l.body) + buf.WriteRune('\n') + } + return buf.Bytes() +} + +func (b *bulkRequest) add(body []byte, action string, documentID string, index string) { + if len(body) == 0 { + return + } + + var buf bytes.Buffer + var aux []byte + + // { "index" : { "_index" : "test", "_id" : "1" } } + buf.WriteRune('{') + aux = strconv.AppendQuote(aux, action) + buf.Write(aux) + aux = aux[:0] + buf.WriteRune(':') + buf.WriteRune('{') + if documentID != "" { + buf.WriteString(`"_id":`) + aux = strconv.AppendQuote(aux, documentID) + buf.Write(aux) + aux = aux[:0] + } + + if index != "" { + buf.WriteString(`"_index":`) + aux = strconv.AppendQuote(aux, index) + buf.Write(aux) + } + buf.WriteRune('}') + buf.WriteRune('}') + buf.WriteRune('\n') + + l := line{ + meta: buf.Bytes(), + body: body, + } + + b.lines = append(b.lines, l) +} + type Client interface { Bulk(ctx context.Context, batch api.Batch) error Stop() @@ -94,8 +155,6 @@ func NewClient(config *Config, cod codec.Codec, indexPattern *pattern.Pattern, d config: config, cli: cli, opType: config.OpType, - buf: bytes.NewBuffer(make([]byte, 0, config.SendBuffer)), - aux: make([]byte, 0, 512), reqCount: 0, codec: cod, indexPattern: indexPattern, @@ -109,16 +168,11 @@ func (c *ClientSet) Bulk(ctx context.Context, batch api.Batch) error { return errors.WithMessagef(eventer.ErrorDropEvent, "request to elasticsearch bulk is null") } - bulkReq := esapi.BulkRequest{} - - if c.config.Etype != "" { - bulkReq.DocumentType = c.config.Etype - } defer func() { - c.buf.Reset() c.reqCount = 0 }() + req := bulkRequest{} for _, event := range batch.Events() { headerObj := runtime.NewObject(event.Header()) @@ -160,19 +214,14 @@ func (c *ClientSet) Bulk(ctx context.Context, batch api.Batch) error { } c.reqCount++ - if err := c.writeMeta(c.opType, docId, idx); err != nil { - return err - } - if err := c.writeBody(data); err != nil { - return err - } + req.add(data, c.opType, docId, idx) } if c.reqCount == 0 { return errors.WithMessagef(eventer.ErrorDropEvent, "request to elasticsearch bulk is null") } - resp, err := c.cli.Bulk(bytes.NewReader(c.buf.Bytes()), + resp, err := c.cli.Bulk(bytes.NewReader(req.body()), c.cli.Bulk.WithDocumentType(c.config.Etype), c.cli.Bulk.WithParameters(c.config.Params), c.cli.Bulk.WithHeader(c.config.Headers)) @@ -213,39 +262,3 @@ func (c *ClientSet) Bulk(ctx context.Context, batch api.Batch) error { func (c *ClientSet) Stop() { // Do nothing } - -// { "index" : { "_index" : "test", "_id" : "1" } } -func (c *ClientSet) writeMeta(action string, documentID string, index string) error { - c.buf.WriteRune('{') - c.aux = strconv.AppendQuote(c.aux, action) - c.buf.Write(c.aux) - c.aux = c.aux[:0] - c.buf.WriteRune(':') - c.buf.WriteRune('{') - if documentID != "" { - c.buf.WriteString(`"_id":`) - c.aux = strconv.AppendQuote(c.aux, documentID) - c.buf.Write(c.aux) - c.aux = c.aux[:0] - } - - if index != "" { - c.buf.WriteString(`"_index":`) - c.aux = strconv.AppendQuote(c.aux, index) - c.buf.Write(c.aux) - c.aux = c.aux[:0] - } - c.buf.WriteRune('}') - c.buf.WriteRune('}') - c.buf.WriteRune('\n') - return nil -} - -func (c *ClientSet) writeBody(body []byte) error { - if len(body) == 0 { - return nil - } - c.buf.Write(body) - c.buf.WriteRune('\n') - return nil -} diff --git a/test/benchmark/sink/elasticsearch/elasticsearch.go b/test/benchmark/sink/elasticsearch/elasticsearch.go new file mode 100644 index 000000000..6a30508eb --- /dev/null +++ b/test/benchmark/sink/elasticsearch/elasticsearch.go @@ -0,0 +1,85 @@ +package main + +import ( + "fmt" + "github.com/loggie-io/loggie/pkg/control" + "github.com/loggie-io/loggie/pkg/core/cfg" + "github.com/loggie-io/loggie/pkg/core/interceptor" + "github.com/loggie-io/loggie/pkg/core/log" + "github.com/loggie-io/loggie/pkg/core/queue" + "github.com/loggie-io/loggie/pkg/eventbus" + "github.com/loggie-io/loggie/pkg/eventbus/export/logger" + "github.com/loggie-io/loggie/pkg/interceptor/maxbytes" + "github.com/loggie-io/loggie/pkg/interceptor/metric" + "github.com/loggie-io/loggie/pkg/interceptor/retry" + "github.com/loggie-io/loggie/pkg/pipeline" + "github.com/loggie-io/loggie/pkg/queue/channel" + "net/http" + "time" + + _ "github.com/loggie-io/loggie/pkg/include" +) + +const pipe1 = ` +pipelines: + - name: test + sources: + - type: dev + name: test + qps: 100 + byteSize: 10240 + eventsTotal: 10000 + sink: + type: elasticsearch + parallelism: 3 + hosts: ["localhost:9200"] + index: "loggie-benchmark-${+YYYY.MM.DD}" +` + +func main() { + log.InitDefaultLogger() + pipeline.SetDefaultConfigRaw(pipeline.Config{ + Queue: &queue.Config{ + Type: channel.Type, + }, + Interceptors: []*interceptor.Config{ + { + Type: metric.Type, + }, + { + Type: maxbytes.Type, + }, + { + Type: retry.Type, + }, + }, + }) + + eventbus.StartAndRun(eventbus.Config{ + LoggerConfig: logger.Config{ + Enabled: true, + Period: 5 * time.Second, + Pretty: false, + }, + ListenerConfigs: map[string]cfg.CommonCfg{ + "sink": map[string]interface{}{ + "period": 5 * time.Second, + }, + "sys": map[string]interface{}{ + "period": 5 * time.Second, + }, + }, + }) + + pipecfgs := &control.PipelineConfig{} + if err := cfg.UnPackFromRaw([]byte(pipe1), pipecfgs).Defaults().Validate().Do(); err != nil { + log.Panic("pipeline configs invalid: %v", err) + } + + controller := control.NewController() + controller.Start(pipecfgs) + + if err := http.ListenAndServe(fmt.Sprintf(":9196"), nil); err != nil { + log.Fatal("http listen and serve err: %v", err) + } +} From 35220b1d2c541a70b685c854d5bd26ed65d9a498 Mon Sep 17 00:00:00 2001 From: ethfoo Date: Fri, 4 Aug 2023 14:38:02 +0800 Subject: [PATCH 09/19] Feat: add discoveryNodes params in elasticsearch sink (#612) --- pkg/sink/elasticsearch/client.go | 22 ++++++++++-------- pkg/sink/elasticsearch/config.go | 40 +++++++++++++++++--------------- 2 files changed, 33 insertions(+), 29 deletions(-) diff --git a/pkg/sink/elasticsearch/client.go b/pkg/sink/elasticsearch/client.go index 612c71470..30a5bac14 100644 --- a/pkg/sink/elasticsearch/client.go +++ b/pkg/sink/elasticsearch/client.go @@ -29,7 +29,7 @@ import ( "github.com/loggie-io/loggie/pkg/util/pattern" "github.com/loggie-io/loggie/pkg/util/runtime" "github.com/pkg/errors" - "io/ioutil" + "os" "strconv" "strings" ) @@ -129,7 +129,7 @@ func NewClient(config *Config, cod codec.Codec, indexPattern *pattern.Pattern, d } var ca []byte if config.CACertPath != "" { - caData, err := ioutil.ReadFile(config.CACertPath) + caData, err := os.ReadFile(config.CACertPath) if err != nil { return nil, err } @@ -137,14 +137,16 @@ func NewClient(config *Config, cod codec.Codec, indexPattern *pattern.Pattern, d } cfg := es.Config{ - Addresses: config.Hosts, - DisableRetry: true, - Username: config.UserName, - Password: config.Password, - APIKey: config.APIKey, - ServiceToken: config.ServiceToken, - CompressRequestBody: config.Compress, - CACert: ca, + Addresses: config.Hosts, + DisableRetry: true, + Username: config.UserName, + Password: config.Password, + APIKey: config.APIKey, + ServiceToken: config.ServiceToken, + CompressRequestBody: config.Compress, + DiscoverNodesOnStart: config.DiscoverNodesOnStart, + DiscoverNodesInterval: config.DiscoverNodesInterval, + CACert: ca, } cli, err := es.NewClient(cfg) if err != nil { diff --git a/pkg/sink/elasticsearch/config.go b/pkg/sink/elasticsearch/config.go index fddb51dfc..74f0b9b37 100644 --- a/pkg/sink/elasticsearch/config.go +++ b/pkg/sink/elasticsearch/config.go @@ -16,27 +16,29 @@ limitations under the License. package elasticsearch -import "github.com/loggie-io/loggie/pkg/util/pattern" +import ( + "github.com/loggie-io/loggie/pkg/util/pattern" + "time" +) type Config struct { - Hosts []string `yaml:"hosts,omitempty" validate:"required"` - UserName string `yaml:"username,omitempty"` - Password string `yaml:"password,omitempty"` - Index string `yaml:"index,omitempty"` - Headers map[string]string `yaml:"headers,omitempty"` - Params map[string]string `yaml:"parameters,omitempty"` - IfRenderIndexFailed RenderIndexFail `yaml:"ifRenderIndexFailed,omitempty"` - Etype string `yaml:"etype,omitempty"` // elasticsearch type, for v5.* backward compatibility - DocumentId string `yaml:"documentId,omitempty"` - Sniff *bool `yaml:"sniff,omitempty"` // deprecated - APIKey string `yaml:"apiKey,omitempty"` - ServiceToken string `yaml:"serviceToken,omitempty"` - CACertPath string `yaml:"caCertPath,omitempty"` - Compress bool `yaml:"compress,omitempty"` - Gzip *bool `yaml:"gzip,omitempty"` // deprecated, use compress above - OpType string `yaml:"opType,omitempty" default:"index"` - - SendBuffer int `yaml:"sendBufferBytes,omitempty" default:"131072" validate:"gte=0"` + Hosts []string `yaml:"hosts,omitempty" validate:"required"` + UserName string `yaml:"username,omitempty"` + Password string `yaml:"password,omitempty"` + Index string `yaml:"index,omitempty"` + Headers map[string]string `yaml:"headers,omitempty"` + Params map[string]string `yaml:"parameters,omitempty"` + IfRenderIndexFailed RenderIndexFail `yaml:"ifRenderIndexFailed,omitempty"` + Etype string `yaml:"etype,omitempty"` // elasticsearch type, for v5.* backward compatibility + DocumentId string `yaml:"documentId,omitempty"` + APIKey string `yaml:"apiKey,omitempty"` + ServiceToken string `yaml:"serviceToken,omitempty"` + CACertPath string `yaml:"caCertPath,omitempty"` + Compress bool `yaml:"compress,omitempty"` + Gzip *bool `yaml:"gzip,omitempty"` // deprecated, use compress above + OpType string `yaml:"opType,omitempty" default:"index"` + DiscoverNodesOnStart bool `yaml:"discoverNodesOnStart,omitempty"` + DiscoverNodesInterval time.Duration `yaml:"discoverNodesInterval,omitempty"` } type RenderIndexFail struct { From 452ee1d14518e6da3fed0b92e2a74588af16f9a6 Mon Sep 17 00:00:00 2001 From: ethfoo Date: Fri, 4 Aug 2023 14:58:20 +0800 Subject: [PATCH 10/19] Fix: add podNamespace as prefix of source name when pod is matched by clusterlogconfig (#613) --- pkg/discovery/kubernetes/controller/selectpodhandler.go | 2 +- pkg/discovery/kubernetes/helper/config.go | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/pkg/discovery/kubernetes/controller/selectpodhandler.go b/pkg/discovery/kubernetes/controller/selectpodhandler.go index 480ef79e7..125202781 100644 --- a/pkg/discovery/kubernetes/controller/selectpodhandler.go +++ b/pkg/discovery/kubernetes/controller/selectpodhandler.go @@ -364,7 +364,7 @@ func (c *Controller) makeConfigPerSource(s *source.Config, pod *corev1.Pod, lgc } // change the source name, add pod.Name-containerName as prefix, since there maybe multiple containers in pod - filesrc.Name = helper.GenTypePodSourceName(pod.Name, status.Name, filesrc.Name) + filesrc.Name = helper.GenTypePodSourceName(lgc.Namespace, pod.Namespace, pod.Name, status.Name, filesrc.Name) // inject default pod metadata if err := c.injectTypePodFields(c.config.DynamicContainerLog, filesrc, extra, pod, lgc, status.Name); err != nil { diff --git a/pkg/discovery/kubernetes/helper/config.go b/pkg/discovery/kubernetes/helper/config.go index 681e34e6c..3fb485efc 100644 --- a/pkg/discovery/kubernetes/helper/config.go +++ b/pkg/discovery/kubernetes/helper/config.go @@ -122,7 +122,12 @@ func ToPipelineInterceptor(interceptorsRaw string, interceptorRef string, interc return interConfList, nil } -func GenTypePodSourceName(podName string, containerName string, sourceName string) string { +func GenTypePodSourceName(lgcNamespace string, podNamespace string, podName string, containerName string, sourceName string) string { + // if lgcNamespace is empty, we use podNamespace as the first part of the source name, + // because this is the pod matched by clusterLogConfig, if the pod namespace is not added, it may cause the source to be duplicated + if lgcNamespace == "" { + return fmt.Sprintf("%s/%s/%s/%s", podNamespace, podName, containerName, sourceName) + } return fmt.Sprintf("%s/%s/%s", podName, containerName, sourceName) } From 31b8a3644c9a800b325e64980b23e3e0a82d6554 Mon Sep 17 00:00:00 2001 From: ethfoo Date: Fri, 4 Aug 2023 15:11:22 +0800 Subject: [PATCH 11/19] changelog: add release v1.5 changelog --- doc/changelog/CHANGELOG-v1.md | 65 ++++++++++++++++++++++++++++++++++- 1 file changed, 64 insertions(+), 1 deletion(-) diff --git a/doc/changelog/CHANGELOG-v1.md b/doc/changelog/CHANGELOG-v1.md index effe93f8e..3cf371273 100644 --- a/doc/changelog/CHANGELOG-v1.md +++ b/doc/changelog/CHANGELOG-v1.md @@ -1,8 +1,71 @@ -# Release v1.5.0-rc.0 +# Release v1.5.0 ### :star2: Features - [breaking]: The `db` in `file source` is moved to the [`loggie.yml`](https://loggie-io.github.io/docs/main/reference/global/db/). If upgrading from an earlier version to v1.5, be sure to check whether `db` has been configured for `file source`. If it is not configured, you can just ignore it, and the default value will remain compatible. +- Added rocketmq sink (#530) +- Added franzKafka source (#573) +- Added kata runtime (#554) +- `typePodFields`/`typeNodeFields` is supported in LogConfig/ClusterLogConfig (#450) +- sink codec support printEvents (#448) +- Added queue in LogConfig/ClusterLogConfig (#457) +- Changed `olivere/elastic` to the official elasticsearch go client (#581) +- Supported `copytruncate` in file source (#571) +- Added `genfiles` sub command (#471) +- Added queue in LogConfig/ClusterLogConfig queue (#457) +- Added `sortBy` field in elasticsearch source (#473) +- Added host VM mode with Kubernetes as the configuration center (#449) (#489) +- New `addHostMeta` interceptor (#474) +- Added persistence driver `badger` (#475) (#584) +- Ignore LogConfig with sidecar injection annotation (#478) +- Added `toStr` action in transformer interceptor (#482) +- You can mount the root directory of a node to the Loggie container without mounting additional Loggie volumes (#460) +- Get loggie version with api and sub command (#496) (#508) +- Added the `worker` and the `clientId` in Kafka source (#506) (#507) +- Upgrade `kafka-go` version (#506) (#567) +- Added resultStatus in dev sink which can be used to simulate failure, drop (#531) +- Pretty error when unmarshal yaml configuration failed (#539) +- Added default topic if render kafka topic failed (#550) +- Added `ignoreUnknownTopicOrPartition` in kafka sink (#560) +- Supported multiple topics in kafka source (#548) +- Added default index if render elasticsearch index failed (#551) (#553) +- The default `maxOpenFds` is set to 4096 (#559) +- Supported default `sinkRef` in kubernetes discovery (#555) +- Added `${_k8s.clusterlogconfig}` in `typePodFields` (#569) +- Supported omit empty fields in Kubernetes discovery (#570) +- Optimizes `maxbytes` interceptors (#575) +- Moved `readFromTail`, `cleanFiles`, `fdHoldTimeoutWhenInactive`, `fdHoldTimeoutWhenRemove` from watcher to outer layer in `file source` (#579) (#585) +- Added `cheanUnfinished` in cleanFiles (#580) +- Added `target` in `maxbyte` interceptor (#588) +- Added `partionKey` in franzKafka (#562) +- Added `highPrecision` in `rateLimit` interceptor (#525) +- Supported build core components of Loggie (#598) +- Added `addonMetaSchema` in file source (#599) +- Added `timestamp` and `bodyKey` in source (#600) +- Added `discoverNodesOnStart` and `discoverNodesInterval` in elasticsearch sink (#612) +### :bug: Bug Fixes +- Fixed panic when kubeEvent Series is nil (#459) +- Upgraded `automaxprocs` version to v1.5.1 (#488) +- Fixed set defaults failed in `fieldsUnderKey` (#513) +- Fixed parse condition failed when contain ERROR in transformer interceptor (#514) (#515) +- Fixed grpc batch out-of-order data streams (#517) +- Fixed large line may cause oom (#529) +- Fixed duplicated batchSize in queue (#533) +- Fixed sqlite locked panic (#524) +- Fixed command can't be used in multi-arch container (#541) +- Fixed `logger listener` may cause block (#561) (#552) +- Fixed `sink concurrency` deepCopy failed (#563) +- Drop events when partial error in elasticsearch sink (#572) +- Fix convert to string type when returning byte array type in transformer interceptor (#596) +- Invalid pipelines will not stop all the pipelines running (#602) +- Fixed configuration npe when query pipelines (#604) +- Optimized elasticsearch sink request buffer (#608) +- Add pod namespace as prefix of source name when pod is matched by ClusterLogConfig (#613) + +# Release v1.5.0-rc.0 + +### :star2: Features +- [breaking]: The `db` in `file source` is moved to the [`loggie.yml`](https://loggie-io.github.io/docs/main/reference/global/db/). If upgrading from an earlier version to v1.5, be sure to check whether `db` has been configured for `file source`. If it is not configured, you can just ignore it, and the default value will remain compatible. - Added rocketmq sink (#530) - Added franzKafka source (#573) - Added kata runtime (#554) From 8d3e229cbfadbfbf538aded4580895aac674e191 Mon Sep 17 00:00:00 2001 From: ethfoo Date: Wed, 30 Aug 2023 11:05:53 +0800 Subject: [PATCH 12/19] Fix: readFromTail when existAckOffset is zero (#624) --- pkg/source/file/watch.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pkg/source/file/watch.go b/pkg/source/file/watch.go index 51c2e6ff3..cf67532a0 100644 --- a/pkg/source/file/watch.go +++ b/pkg/source/file/watch.go @@ -318,10 +318,8 @@ func (w *Watcher) eventBus(e jobEvent) { } } // Pre-allocation offset - if existAckOffset == 0 { - if e.job.task.config.ReadFromTail { - existAckOffset = fileSize - } + if e.job.task.config.ReadFromTail { + existAckOffset = fileSize w.preAllocationOffset(existAckOffset, job) } // set ack offset From ab290429017aa7b162c5278d9d1292a2aefa8883 Mon Sep 17 00:00:00 2001 From: ethfoo Date: Fri, 23 Feb 2024 09:57:43 +0800 Subject: [PATCH 13/19] Fix: skip check the others runtime type (#667) --- pkg/discovery/kubernetes/runtime/cri.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pkg/discovery/kubernetes/runtime/cri.go b/pkg/discovery/kubernetes/runtime/cri.go index 6bca952e1..b3467aab8 100644 --- a/pkg/discovery/kubernetes/runtime/cri.go +++ b/pkg/discovery/kubernetes/runtime/cri.go @@ -115,13 +115,11 @@ func (c *ContainerD) GetRootfsPath(ctx context.Context, containerId string, cont if err != nil { return nil, err } - } else if runtime == string(RuncRuntimeType) { + } else { prefix, err = c.getRuncRuntimeRootfsPath(infoMap, containerId) if err != nil { return nil, err } - } else { - return nil, errors.Errorf("Unknown runtime type from container(id: %s) status", containerId) } var rootfsPaths []string From a8e07b7a6862be67189809e6c74df489d812aa06 Mon Sep 17 00:00:00 2001 From: 08fly <79070936+08fly@users.noreply.github.com> Date: Mon, 8 Apr 2024 14:00:00 +0800 Subject: [PATCH 14/19] Fix: insert registry when existAckOffset is zero (#670) Co-authored-by: tianyafei --- pkg/source/file/watch.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/source/file/watch.go b/pkg/source/file/watch.go index cf67532a0..82b3f9c6a 100644 --- a/pkg/source/file/watch.go +++ b/pkg/source/file/watch.go @@ -318,8 +318,10 @@ func (w *Watcher) eventBus(e jobEvent) { } } // Pre-allocation offset - if e.job.task.config.ReadFromTail { - existAckOffset = fileSize + if existAckOffset == 0 || e.job.task.config.ReadFromTail { + if e.job.task.config.ReadFromTail { + existAckOffset = fileSize + } w.preAllocationOffset(existAckOffset, job) } // set ack offset From 3c170ac0e1c5a17054bd27ffe3e30592b8781bf3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E4=BD=B3=E6=85=A7?= <18868732681@139.com> Date: Tue, 2 Jul 2024 10:32:15 +0800 Subject: [PATCH 15/19] Feat:add heartbeat --- cmd/loggie/main.go | 13 ++++++++++ loggie.yml | 5 ++++ pkg/core/sysconfig/config.go | 2 ++ pkg/heartbeat/heartbeat.go | 50 ++++++++++++++++++++++++++++++++++++ 4 files changed, 70 insertions(+) create mode 100644 pkg/heartbeat/heartbeat.go diff --git a/cmd/loggie/main.go b/cmd/loggie/main.go index 5902aba71..bce9dd10a 100644 --- a/cmd/loggie/main.go +++ b/cmd/loggie/main.go @@ -19,6 +19,7 @@ package main import ( "flag" "fmt" + "github.com/loggie-io/loggie/pkg/heartbeat" "github.com/loggie-io/loggie/pkg/ops" "github.com/pkg/errors" "go.uber.org/automaxprocs/maxprocs" @@ -49,6 +50,7 @@ var ( pipelineConfigPath string configType string nodeName string + mainServer string ) func init() { @@ -58,6 +60,7 @@ func init() { flag.StringVar(&pipelineConfigPath, "config.pipeline", "pipelines.yml", "reloadable config file") flag.StringVar(&configType, "config.from", "file", "config from file or env") flag.StringVar(&nodeName, "meta.nodeName", hostName, "override nodeName") + flag.StringVar(&mainServer, "mainServer", mainServer, "config mainServer") } func main() { @@ -83,6 +86,11 @@ func main() { global.NodeName = nodeName log.Info("node name: %s", nodeName) + if mainServer == "" { + log.Fatal("mainServer can not be null") + return + } + // system config file syscfg := sysconfig.Config{} cfg.UnpackTypeDefaultsAndValidate(strings.ToLower(configType), globalConfigFile, &syscfg) @@ -143,6 +151,11 @@ func main() { }() } + // heartbeat + if syscfg.Loggie.HeartbeatConfig.Address != "" { + go heartbeat.Start(syscfg.Loggie.HeartbeatConfig) + } + log.Info("started Loggie") <-stopCh log.Info("shutting down Loggie") diff --git a/loggie.yml b/loggie.yml index 48835afb3..20a707ad1 100644 --- a/loggie.yml +++ b/loggie.yml @@ -46,3 +46,8 @@ loggie: maxOpenFds: 6000 http: enabled: true + + heartbeat: + address: http://127.0.0.1:80/api/heartbeat + interval: 30 + nodeName: 01 \ No newline at end of file diff --git a/pkg/core/sysconfig/config.go b/pkg/core/sysconfig/config.go index 4ad75f8c2..188a6bbbe 100644 --- a/pkg/core/sysconfig/config.go +++ b/pkg/core/sysconfig/config.go @@ -25,6 +25,7 @@ import ( "github.com/loggie-io/loggie/pkg/core/source" "github.com/loggie-io/loggie/pkg/discovery" "github.com/loggie-io/loggie/pkg/eventbus" + "github.com/loggie-io/loggie/pkg/heartbeat" "github.com/loggie-io/loggie/pkg/interceptor/maxbytes" "github.com/loggie-io/loggie/pkg/interceptor/metric" "github.com/loggie-io/loggie/pkg/interceptor/retry" @@ -45,6 +46,7 @@ type Loggie struct { Defaults Defaults `yaml:"defaults"` Db persistence.DbConfig `yaml:"db"` ErrorAlertConfig log.AfterErrorConfiguration `yaml:"errorAlert"` + HeartbeatConfig heartbeat.Config `yaml:"heartbeat"` } type Defaults struct { diff --git a/pkg/heartbeat/heartbeat.go b/pkg/heartbeat/heartbeat.go new file mode 100644 index 000000000..fe78b9f81 --- /dev/null +++ b/pkg/heartbeat/heartbeat.go @@ -0,0 +1,50 @@ +package heartbeat + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + "time" +) + +type Config struct { + Address string `yaml:"address"` + Interval time.Duration `yaml:"interval"` + NodeName string `yaml:"nodeName"` +} + +func Start(config Config) { + timer := time.NewTicker(config.Interval * time.Second) + defer timer.Stop() + + // 循环接收定时器的触发事件 + for range timer.C { + // 执行发送 POST 请求的操作 + err := sendPostRequest(config) + if err != nil { + fmt.Println("send heartbeat post failed:", err) + } + } +} + +func sendPostRequest(config Config) error { + // fmt.Println("sending post request...") + data := Config{ + NodeName: config.NodeName, + } + + jsonData, err := json.Marshal(data) + if err != nil { + return err + } + + body := bytes.NewBuffer(jsonData) + + resp, err := http.Post(config.Address, "application/json", body) + if err != nil { + return err + } + defer resp.Body.Close() + return nil +} From 6b1bb1fcb0a737b3545d74382fceeefcfe3b1451 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E4=BD=B3=E6=85=A7?= <18868732681@139.com> Date: Tue, 2 Jul 2024 11:37:15 +0800 Subject: [PATCH 16/19] modify the printing of heartbeat logs --- pkg/heartbeat/heartbeat.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/heartbeat/heartbeat.go b/pkg/heartbeat/heartbeat.go index fe78b9f81..b67fd5d32 100644 --- a/pkg/heartbeat/heartbeat.go +++ b/pkg/heartbeat/heartbeat.go @@ -3,6 +3,7 @@ package heartbeat import ( "bytes" "encoding/json" + "errors" "fmt" "net/http" "time" @@ -45,6 +46,10 @@ func sendPostRequest(config Config) error { if err != nil { return err } + if resp.StatusCode != 200 { + return errors.New(fmt.Sprintf("【warning】heatbeat request got status code: %d", resp.StatusCode)) + } defer resp.Body.Close() + //fmt.Println("heatbeat success :", resp.StatusCode) return nil } From 1a0d45651edf6900a86a71c7893564e044ca69ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E4=BD=B3=E6=85=A7?= <18868732681@139.com> Date: Tue, 2 Jul 2024 11:43:12 +0800 Subject: [PATCH 17/19] modify the printing of heartbeat logs --- pkg/heartbeat/heartbeat.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/heartbeat/heartbeat.go b/pkg/heartbeat/heartbeat.go index b67fd5d32..08cb5f88c 100644 --- a/pkg/heartbeat/heartbeat.go +++ b/pkg/heartbeat/heartbeat.go @@ -47,7 +47,8 @@ func sendPostRequest(config Config) error { return err } if resp.StatusCode != 200 { - return errors.New(fmt.Sprintf("【warning】heatbeat request got status code: %d", resp.StatusCode)) + return errors.New(fmt.Sprintf("The request to the beat server: %s received a response code: %d", + config.Address, resp.StatusCode)) } defer resp.Body.Close() //fmt.Println("heatbeat success :", resp.StatusCode) From e28ccb6ea636397084c42ae100edf7ace2a6c173 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E4=BD=B3=E6=85=A7?= <18868732681@139.com> Date: Tue, 2 Jul 2024 11:53:25 +0800 Subject: [PATCH 18/19] del test param mainServer --- cmd/loggie/main.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/cmd/loggie/main.go b/cmd/loggie/main.go index bce9dd10a..36d3209b5 100644 --- a/cmd/loggie/main.go +++ b/cmd/loggie/main.go @@ -60,7 +60,6 @@ func init() { flag.StringVar(&pipelineConfigPath, "config.pipeline", "pipelines.yml", "reloadable config file") flag.StringVar(&configType, "config.from", "file", "config from file or env") flag.StringVar(&nodeName, "meta.nodeName", hostName, "override nodeName") - flag.StringVar(&mainServer, "mainServer", mainServer, "config mainServer") } func main() { @@ -86,11 +85,6 @@ func main() { global.NodeName = nodeName log.Info("node name: %s", nodeName) - if mainServer == "" { - log.Fatal("mainServer can not be null") - return - } - // system config file syscfg := sysconfig.Config{} cfg.UnpackTypeDefaultsAndValidate(strings.ToLower(configType), globalConfigFile, &syscfg) From 1b8841d29fc9759f77ef4f0d9d1ab6adf48d6162 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E4=BD=B3=E6=85=A7?= <18868732681@139.com> Date: Thu, 11 Jul 2024 11:12:33 +0800 Subject: [PATCH 19/19] =?UTF-8?q?=E3=80=90add=E3=80=91=E5=9C=A8=E5=BF=83?= =?UTF-8?q?=E8=B7=B3=E6=8E=A5=E5=8F=A3=E6=B7=BB=E5=8A=A0=E8=87=AA=E6=9B=B4?= =?UTF-8?q?=E6=96=B0=E9=80=BB=E8=BE=91=EF=BC=88=E5=BE=85=E4=BC=98=E5=8C=96?= =?UTF-8?q?=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 1 + go.sum | 2 ++ pkg/heartbeat/heartbeat.go | 38 +++++++++++++++++++++++++++++++++++--- vendor/modules.txt | 5 +++++ 4 files changed, 43 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 5b72c9f99..20a02806e 100644 --- a/go.mod +++ b/go.mod @@ -176,6 +176,7 @@ require ( github.com/dustin/go-humanize v1.0.0 github.com/elastic/go-elasticsearch/v7 v7.17.10 github.com/goccy/go-yaml v1.11.0 + github.com/inconshreveable/go-update v0.0.0-20160112193335-8152e7eb6ccf github.com/mattn/go-sqlite3 v1.11.0 k8s.io/metrics v0.25.4 sigs.k8s.io/controller-runtime v0.13.1 diff --git a/go.sum b/go.sum index 66b1cdd94..c990181dc 100644 --- a/go.sum +++ b/go.sum @@ -548,6 +548,8 @@ github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1: github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU= github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= +github.com/inconshreveable/go-update v0.0.0-20160112193335-8152e7eb6ccf h1:WfD7VjIE6z8dIvMsI4/s+1qr5EL+zoIGev1BQj1eoJ8= +github.com/inconshreveable/go-update v0.0.0-20160112193335-8152e7eb6ccf/go.mod h1:hyb9oH7vZsitZCiBt0ZvifOrB+qc8PS5IiilCIb87rg= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/influxdata/flux v0.65.1/go.mod h1:J754/zds0vvpfwuq7Gc2wRdVwEodfpCFM7mYlOw2LqY= github.com/influxdata/influxdb v1.8.3/go.mod h1:JugdFhsvvI8gadxOI6noqNeeBHvWNTbfYGtiAn+2jhI= diff --git a/pkg/heartbeat/heartbeat.go b/pkg/heartbeat/heartbeat.go index 08cb5f88c..bbf4fbe2b 100644 --- a/pkg/heartbeat/heartbeat.go +++ b/pkg/heartbeat/heartbeat.go @@ -5,6 +5,9 @@ import ( "encoding/json" "errors" "fmt" + "github.com/inconshreveable/go-update" + "io" + "log" "net/http" "time" ) @@ -30,7 +33,7 @@ func Start(config Config) { } func sendPostRequest(config Config) error { - // fmt.Println("sending post request...") + fmt.Println("sending post request... v1") data := Config{ NodeName: config.NodeName, } @@ -46,11 +49,40 @@ func sendPostRequest(config Config) error { if err != nil { return err } + defer resp.Body.Close() if resp.StatusCode != 200 { return errors.New(fmt.Sprintf("The request to the beat server: %s received a response code: %d", config.Address, resp.StatusCode)) } - defer resp.Body.Close() - //fmt.Println("heatbeat success :", resp.StatusCode) + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + fmt.Println("Error reading response body:", err) + return err + } + bodyString := string(bodyBytes) + // 现在你可以使用bodyString了 + fmt.Println(bodyString) + + // TODO 根据心跳返回的内容,配置文件 + // TODO 根据心跳返回的内容,自我更新程序 + doUpdate() return nil } + +func doUpdate() { + // 下载更新文件 + updateURL := "http://127.0.0.1:8000/static/loggie" + resp, err := http.Get(updateURL) + if err != nil { + log.Fatal(err) + } + defer resp.Body.Close() + + err = update.Apply(resp.Body, update.Options{}) + if err != nil { + // 错误处理 + } + + // 在这里写应用程序的逻辑 + fmt.Println("应用程序更新成功!") +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 70f957f26..f36556c23 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -320,6 +320,11 @@ github.com/hpcloud/tail/winfile # github.com/imdario/mergo v0.3.12 ## explicit; go 1.13 github.com/imdario/mergo +# github.com/inconshreveable/go-update v0.0.0-20160112193335-8152e7eb6ccf +## explicit +github.com/inconshreveable/go-update +github.com/inconshreveable/go-update/internal/binarydist +github.com/inconshreveable/go-update/internal/osext # github.com/jcmturner/aescts/v2 v2.0.0 ## explicit; go 1.13 github.com/jcmturner/aescts/v2