From be4c407a5a8e52e41be23f4809e6dec9081293be Mon Sep 17 00:00:00 2001 From: Darragh O'Reilly Date: Wed, 24 Sep 2025 17:38:42 +0100 Subject: [PATCH] Add plugin and artifact for sending client events to rsyslog This artifact sends events from configured client event artifacts to the local rsyslog listening with the imuxsock input module on the specified unix domain socket. The plugin uses a file backed ring buffer when needed and should be able to survive short outages of the rsyslog daemon (like restarts) without losing events - up to 1GB of events. Tested with this rsyslog config: input(type="imuxsock" socket="/tmp/velo-socket" ruleset="velo-ruleset") ruleset(name="velo-ruleset") { action(type="omfile" file="/tmp/velo.log") } --- .../definitions/Rsyslog/Events/Clients.yaml | 44 ++++ go.mod | 1 + go.sum | 2 + vql/server/rsyslog.go | 199 ++++++++++++++++++ 4 files changed, 246 insertions(+) create mode 100644 artifacts/definitions/Rsyslog/Events/Clients.yaml create mode 100644 vql/server/rsyslog.go diff --git a/artifacts/definitions/Rsyslog/Events/Clients.yaml b/artifacts/definitions/Rsyslog/Events/Clients.yaml new file mode 100644 index 000000000..475eaadfa --- /dev/null +++ b/artifacts/definitions/Rsyslog/Events/Clients.yaml @@ -0,0 +1,44 @@ +name: RsyslogShipper.Events.Clients + +description: | + This server side event monitoring artifact will watch a selection of client + monitoring artifacts for new events and push them rsyslog. + + NOTE: You must ensure you are collecting these artifacts from the + clients by adding them to the "Client Events" GUI. + +type: SERVER_EVENT + +parameters: + - name: UnixSocket + description: Path to the Unix Domain Socket to send events to + type: string + default: /tmp/velo-socket + + - name: Threads + description: Number of threads to start up to post events + type: int + default: 2 + + - name: Artifacts + type: artifactset + artifact_type: CLIENT_EVENT + description: Client artifacts to monitor + +sources: + - query: | + LET _ <= SELECT log(message="ERROR: parameter Artifacts cannot be empty!") FROM scope() WHERE len(list=Artifacts) = 0 + + LET artifacts_to_watch = SELECT Artifact FROM Artifacts + WHERE log(message="Sending events from client artifact " + Artifact + " to rsyslog") + + LET events = SELECT * FROM foreach( + row=artifacts_to_watch, + async=TRUE, // Required for event queries in foreach() + query={ + SELECT *, Artifact, timestamp(epoch=now()) AS timestamp + FROM watch_monitoring(artifact=Artifact) + }) + + SELECT * FROM rsyslog_upload(query=events, unix_domain=UnixSocket, threads=Threads) + diff --git a/go.mod b/go.mod index c560c851e..9229e30b1 100644 --- a/go.mod +++ b/go.mod @@ -198,6 +198,7 @@ require ( github.com/klauspost/compress v1.17.8 // indirect github.com/klauspost/pgzip v1.2.5 // indirect github.com/kr/fs v0.1.0 // indirect + github.com/leodido/go-syslog v1.0.1 // indirect github.com/lestrrat-go/strftime v1.0.5 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/mattermost/xml-roundtrip-validator v0.1.0 // indirect diff --git a/go.sum b/go.sum index 973f8998f..45e782b8f 100644 --- a/go.sum +++ b/go.sum @@ -402,6 +402,8 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/leodido/go-syslog v1.0.1 h1:/I6CcKOIT/Auo/vjvemIaQYnSNFmcv6Xd6kqXaKHZyM= +github.com/leodido/go-syslog v1.0.1/go.mod h1:iGQLav8eZdt0+DaWcqmGKurRtDPyxwD1Dvc4DG5GMoU= github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc h1:RKf14vYWi2ttpEmkA4aQ3j4u9dStX2t4M8UM6qqNsG8= github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc/go.mod h1:kopuH9ugFRkIXf3YoqHKyrJ9YfUFsckUU9S7B+XP+is= github.com/lestrrat-go/strftime v1.0.5 h1:A7H3tT8DhTz8u65w+JRpiBxM4dINQhUXAZnhBa2xeOE= diff --git a/vql/server/rsyslog.go b/vql/server/rsyslog.go new file mode 100644 index 000000000..df594601b --- /dev/null +++ b/vql/server/rsyslog.go @@ -0,0 +1,199 @@ +package server + +import ( + "context" + "io" + "net" + "os" + "strconv" + "sync" + "time" + + "github.com/Velocidex/ordereddict" + "github.com/leodido/go-syslog/rfc5424" + "golang.org/x/time/rate" + "www.velocidex.com/golang/velociraptor/acls" + "www.velocidex.com/golang/velociraptor/file_store/api" + "www.velocidex.com/golang/velociraptor/file_store/directory" + "www.velocidex.com/golang/velociraptor/utils" + "www.velocidex.com/golang/velociraptor/vql" + "www.velocidex.com/golang/vfilter" + "www.velocidex.com/golang/vfilter/arg_parser" +) + +const pluginName = "rsyslog_upload" + +type rsyslogUploadPluginArgs struct { + Query vfilter.StoredQuery `vfilter:"required,field=query,doc=Source for rows to upload."` + UnixDomain string `vfilter:"required,field=unix_domain,doc=path to unix domain socket rsyslog listens on"` + Threads int `vfilter:"optional,field=threads,doc=How many threads to use to send events."` +} + +type rsyslogUploadPlugin struct{} + +func (r rsyslogUploadPlugin) Info( + scope vfilter.Scope, typeMap *vfilter.TypeMap, +) *vfilter.PluginInfo { + return &vfilter.PluginInfo{ + Name: pluginName, + Doc: "Upload rows to rsyslog", + ArgType: typeMap.AddType(scope, &rsyslogUploadPluginArgs{}), + Metadata: vql.VQLMetadata().Permissions(acls.COLLECT_SERVER).Build(), + } +} + +func (r rsyslogUploadPlugin) Call( + ctx context.Context, scope vfilter.Scope, args *ordereddict.Dict, +) <-chan vfilter.Row { + // this plugin does not send anything to its output channel + outputCh := make(chan vfilter.Row) + + go func() { + defer close(outputCh) + + err := vql.CheckAccess(scope, acls.COLLECT_SERVER) + if err != nil { + scope.Log("%s: check access failed: %v", pluginName, err) + return + } + + arg := rsyslogUploadPluginArgs{} + err = arg_parser.ExtractArgsWithContext(ctx, scope, args, &arg) + if err != nil { + scope.Log("%s: parsing args: %v", pluginName, err) + return + } + if arg.UnixDomain == "" { + scope.Log("%s: parameter UnixDomain must be set", pluginName) + return + } + if arg.Threads == 0 { + arg.Threads = 1 + } + + configObj, ok := vql.GetServerConfig(scope) + if !ok { + scope.Log("%s: could not get config from scope", pluginName) + return + } + + options := api.QueueOptions{ + DisableFileBuffering: false, + FileBufferLeaseSize: 100, + OwnerName: pluginName, + } + + listenerCtx, cancelListener := context.WithCancel(context.Background()) + defer cancelListener() + + listener, err := directory.NewListener(configObj, listenerCtx, pluginName, options) + if err != nil { + scope.Log("%s: could not create listener: %v", pluginName, err) + return + } + + scope.Log("%s: starting %d worker threads", pluginName, arg.Threads) + wg := sync.WaitGroup{} + for i := 0; i < arg.Threads; i++ { + wg.Add(1) + go rsyslogSender(ctx, &wg, arg.UnixDomain, scope, listener.Output()) + } + + rowCh := arg.Query.Eval(ctx, scope) + + quitLoop := false + for !quitLoop { + select { + case <-ctx.Done(): + listener.Close() + quitLoop = true + case row, ok := <-rowCh: + if !ok { + continue + } + listener.Send(vfilter.RowToDict(ctx, scope, row)) + } + } + + // the workers will return when they detect that + // the listener had closed its output channel + wg.Wait() + }() + return outputCh +} + +func rsyslogSender( + ctx context.Context, wg *sync.WaitGroup, address string, + scope vfilter.Scope, rowCh <-chan *ordereddict.Dict, +) { + defer func() { + scope.Log("%s: worker done", pluginName) + wg.Done() + }() + + scope.Log("%s: worker started", pluginName) + var ( + pid = strconv.Itoa(os.Getpid()) + conn net.Conn + message string + rrDialLog = rate.Sometimes{Interval: time.Minute} + rrWriteLog = rate.Sometimes{Interval: time.Minute} + ) + for { + if conn == nil { + var err error + conn, err = net.DialTimeout("unixgram", address, time.Second) + if err != nil { + rrDialLog.Do(func() { scope.Log("%s: dialing: %v", pluginName, err) }) + utils.SleepWithCtx(ctx, time.Second) + if ctx.Err() != nil { + // avoid spinning here if rsyslogd is not + // listening when the plugin is shutting down. + return + } + conn = nil // probably not needed, but no harm. + continue // retry dial + } + scope.Log("%s: worker connected!", pluginName) + } + if message == "" { + row, ok := <-rowCh + if !ok { + // the listener closed its channel + return + } + var err error + message, err = rowToRsyslogString(row, pid) + if err != nil { + scope.Log("%s: creating rsyslog message: %v", pluginName, err) + return + } + } + conn.SetWriteDeadline(time.Now().Add(time.Second)) + _, err := io.WriteString(conn, message) + if err != nil { + rrWriteLog.Do(func() { scope.Log("%s: writing to conn: %v", pluginName, err) }) + conn.Close() + conn = nil // conn is an interface! + continue // Retry sending the same message on the next iteration. + } + + // the message was sent successfully. + message = "" + } +} + +func rowToRsyslogString(row *ordereddict.Dict, pid string) (string, error) { + message := rfc5424.SyslogMessage{} + message.SetPriority(0) + message.SetVersion(1) + message.SetAppname("velociraptor") + message.SetProcID(pid) + message.SetMessage(row.String()) // json + + return message.String() +} + +func init() { + vql.RegisterPlugin(&rsyslogUploadPlugin{}) +}