From 347667a2bd26cf2e4cd970bb44012ea5e5f00687 Mon Sep 17 00:00:00 2001 From: Toby Date: Mon, 8 Apr 2024 11:54:35 -0700 Subject: [PATCH 01/22] feat: TCP timeout flush --- cmd/root.go | 58 ++++++++++++++++++++++++++++++++------------- engine/engine.go | 1 + engine/interface.go | 3 +++ engine/worker.go | 25 +++++++++++++++++-- 4 files changed, 68 insertions(+), 19 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index 756513a..2ea21eb 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -7,6 +7,7 @@ import ( "os/signal" "strings" "syscall" + "time" "github.com/apernet/OpenGFW/analyzer" "github.com/apernet/OpenGFW/analyzer/tcp" @@ -176,11 +177,12 @@ type cliConfigIO struct { } type cliConfigWorkers struct { - Count int `mapstructure:"count"` - QueueSize int `mapstructure:"queueSize"` - TCPMaxBufferedPagesTotal int `mapstructure:"tcpMaxBufferedPagesTotal"` - TCPMaxBufferedPagesPerConn int `mapstructure:"tcpMaxBufferedPagesPerConn"` - UDPMaxStreams int `mapstructure:"udpMaxStreams"` + Count int `mapstructure:"count"` + QueueSize int `mapstructure:"queueSize"` + TCPMaxBufferedPagesTotal int `mapstructure:"tcpMaxBufferedPagesTotal"` + TCPMaxBufferedPagesPerConn int `mapstructure:"tcpMaxBufferedPagesPerConn"` + TCPTimeout time.Duration `mapstructure:"tcpTimeout"` + UDPMaxStreams int `mapstructure:"udpMaxStreams"` } type cliConfigRuleset struct { @@ -213,6 +215,7 @@ func (c *cliConfig) fillWorkers(config *engine.Config) error { config.WorkerQueueSize = c.Workers.QueueSize config.WorkerTCPMaxBufferedPagesTotal = c.Workers.TCPMaxBufferedPagesTotal config.WorkerTCPMaxBufferedPagesPerConn = c.Workers.TCPMaxBufferedPagesPerConn + config.WorkerTCPTimeout = c.Workers.TCPTimeout config.WorkerUDPMaxStreams = c.Workers.UDPMaxStreams return nil } @@ -340,12 +343,26 @@ func (l *engineLogger) TCPStreamPropUpdate(info ruleset.StreamInfo, close bool) } func (l *engineLogger) TCPStreamAction(info ruleset.StreamInfo, action ruleset.Action, noMatch bool) { - logger.Info("TCP stream action", - zap.Int64("id", info.ID), - zap.String("src", info.SrcString()), - zap.String("dst", info.DstString()), - zap.String("action", action.String()), - zap.Bool("noMatch", noMatch)) + if noMatch { + logger.Debug("TCP stream no match", + zap.Int64("id", info.ID), + zap.String("src", info.SrcString()), + zap.String("dst", info.DstString()), + zap.String("action", action.String())) + } else { + logger.Info("TCP stream action", + zap.Int64("id", info.ID), + zap.String("src", info.SrcString()), + zap.String("dst", info.DstString()), + zap.String("action", action.String())) + } +} + +func (l *engineLogger) TCPFlush(workerID, flushed, closed int) { + logger.Debug("TCP flush", + zap.Int("workerID", workerID), + zap.Int("flushed", flushed), + zap.Int("closed", closed)) } func (l *engineLogger) UDPStreamNew(workerID int, info ruleset.StreamInfo) { @@ -366,12 +383,19 @@ func (l *engineLogger) UDPStreamPropUpdate(info ruleset.StreamInfo, close bool) } func (l *engineLogger) UDPStreamAction(info ruleset.StreamInfo, action ruleset.Action, noMatch bool) { - logger.Info("UDP stream action", - zap.Int64("id", info.ID), - zap.String("src", info.SrcString()), - zap.String("dst", info.DstString()), - zap.String("action", action.String()), - zap.Bool("noMatch", noMatch)) + if noMatch { + logger.Debug("UDP stream no match", + zap.Int64("id", info.ID), + zap.String("src", info.SrcString()), + zap.String("dst", info.DstString()), + zap.String("action", action.String())) + } else { + logger.Info("UDP stream action", + zap.Int64("id", info.ID), + zap.String("src", info.SrcString()), + zap.String("dst", info.DstString()), + zap.String("action", action.String())) + } } func (l *engineLogger) ModifyError(info ruleset.StreamInfo, err error) { diff --git a/engine/engine.go b/engine/engine.go index 7c93e0a..c838e0d 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -34,6 +34,7 @@ func NewEngine(config Config) (Engine, error) { Ruleset: config.Ruleset, TCPMaxBufferedPagesTotal: config.WorkerTCPMaxBufferedPagesTotal, TCPMaxBufferedPagesPerConn: config.WorkerTCPMaxBufferedPagesPerConn, + TCPTimeout: config.WorkerTCPTimeout, UDPMaxStreams: config.WorkerUDPMaxStreams, }) if err != nil { diff --git a/engine/interface.go b/engine/interface.go index fe25de5..af106f4 100644 --- a/engine/interface.go +++ b/engine/interface.go @@ -2,6 +2,7 @@ package engine import ( "context" + "time" "github.com/apernet/OpenGFW/io" "github.com/apernet/OpenGFW/ruleset" @@ -25,6 +26,7 @@ type Config struct { WorkerQueueSize int WorkerTCPMaxBufferedPagesTotal int WorkerTCPMaxBufferedPagesPerConn int + WorkerTCPTimeout time.Duration WorkerUDPMaxStreams int } @@ -36,6 +38,7 @@ type Logger interface { TCPStreamNew(workerID int, info ruleset.StreamInfo) TCPStreamPropUpdate(info ruleset.StreamInfo, close bool) TCPStreamAction(info ruleset.StreamInfo, action ruleset.Action, noMatch bool) + TCPFlush(workerID, flushed, closed int) UDPStreamNew(workerID int, info ruleset.StreamInfo) UDPStreamPropUpdate(info ruleset.StreamInfo, close bool) diff --git a/engine/worker.go b/engine/worker.go index 5177016..343bc57 100644 --- a/engine/worker.go +++ b/engine/worker.go @@ -2,6 +2,7 @@ package engine import ( "context" + "time" "github.com/apernet/OpenGFW/io" "github.com/apernet/OpenGFW/ruleset" @@ -14,9 +15,12 @@ import ( const ( defaultChanSize = 64 - defaultTCPMaxBufferedPagesTotal = 4096 - defaultTCPMaxBufferedPagesPerConnection = 64 + defaultTCPMaxBufferedPagesTotal = 65536 + defaultTCPMaxBufferedPagesPerConnection = 16 + defaultTCPTimeout = 10 * time.Minute defaultUDPMaxStreams = 4096 + + tcpFlushInterval = 1 * time.Minute ) type workerPacket struct { @@ -33,6 +37,7 @@ type worker struct { tcpStreamFactory *tcpStreamFactory tcpStreamPool *reassembly.StreamPool tcpAssembler *reassembly.Assembler + tcpTimeout time.Duration udpStreamFactory *udpStreamFactory udpStreamManager *udpStreamManager @@ -47,6 +52,7 @@ type workerConfig struct { Ruleset ruleset.Ruleset TCPMaxBufferedPagesTotal int TCPMaxBufferedPagesPerConn int + TCPTimeout time.Duration UDPMaxStreams int } @@ -60,6 +66,9 @@ func (c *workerConfig) fillDefaults() { if c.TCPMaxBufferedPagesPerConn <= 0 { c.TCPMaxBufferedPagesPerConn = defaultTCPMaxBufferedPagesPerConnection } + if c.TCPTimeout <= 0 { + c.TCPTimeout = defaultTCPTimeout + } if c.UDPMaxStreams <= 0 { c.UDPMaxStreams = defaultUDPMaxStreams } @@ -98,6 +107,7 @@ func newWorker(config workerConfig) (*worker, error) { tcpStreamFactory: tcpSF, tcpStreamPool: tcpStreamPool, tcpAssembler: tcpAssembler, + tcpTimeout: config.TCPTimeout, udpStreamFactory: udpSF, udpStreamManager: udpSM, modSerializeBuffer: gopacket.NewSerializeBuffer(), @@ -111,6 +121,10 @@ func (w *worker) Feed(p *workerPacket) { func (w *worker) Run(ctx context.Context) { w.logger.WorkerStart(w.id) defer w.logger.WorkerStop(w.id) + + tcpFlushTicker := time.NewTicker(tcpFlushInterval) + defer tcpFlushTicker.Stop() + for { select { case <-ctx.Done(): @@ -122,6 +136,8 @@ func (w *worker) Run(ctx context.Context) { } v, b := w.handle(wPkt.StreamID, wPkt.Packet) _ = wPkt.SetVerdict(v, b) + case <-tcpFlushTicker.C: + w.flushTCP(w.tcpTimeout) } } } @@ -176,6 +192,11 @@ func (w *worker) handleTCP(ipFlow gopacket.Flow, pMeta *gopacket.PacketMetadata, return io.Verdict(ctx.Verdict) } +func (w *worker) flushTCP(timeout time.Duration) { + flushed, closed := w.tcpAssembler.FlushCloseOlderThan(time.Now().Add(-timeout)) + w.logger.TCPFlush(w.id, flushed, closed) +} + func (w *worker) handleUDP(streamID uint32, ipFlow gopacket.Flow, udp *layers.UDP) (io.Verdict, []byte) { ctx := &udpContext{ Verdict: udpVerdictAccept, From 107e29ee2034b133c73659d6ee3c174b3f9f34f3 Mon Sep 17 00:00:00 2001 From: Toby Date: Wed, 10 Apr 2024 21:30:37 -0700 Subject: [PATCH 02/22] fix: do not reload geoip/geosite when reloading ruleset to prevent leaking references to streams --- cmd/root.go | 4 ++-- ruleset/expr.go | 14 ++++++-------- ruleset/interface.go | 4 ++-- 3 files changed, 10 insertions(+), 12 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index 2ea21eb..288e3d7 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -17,6 +17,7 @@ import ( "github.com/apernet/OpenGFW/modifier" modUDP "github.com/apernet/OpenGFW/modifier/udp" "github.com/apernet/OpenGFW/ruleset" + "github.com/apernet/OpenGFW/ruleset/builtins/geo" "github.com/spf13/cobra" "github.com/spf13/viper" @@ -259,8 +260,7 @@ func runMain(cmd *cobra.Command, args []string) { } rsConfig := &ruleset.BuiltinConfig{ Logger: &rulesetLogger{}, - GeoSiteFilename: config.Ruleset.GeoSite, - GeoIpFilename: config.Ruleset.GeoIp, + GeoMatcher: geo.NewGeoMatcher(config.Ruleset.GeoSite, config.Ruleset.GeoIp), ProtectedDialContext: engineConfig.IO.ProtectedDialContext, } rs, err := ruleset.CompileExprRules(rawRs, analyzers, modifiers, rsConfig) diff --git a/ruleset/expr.go b/ruleset/expr.go index 868a115..373d0d3 100644 --- a/ruleset/expr.go +++ b/ruleset/expr.go @@ -20,7 +20,6 @@ import ( "github.com/apernet/OpenGFW/analyzer" "github.com/apernet/OpenGFW/modifier" "github.com/apernet/OpenGFW/ruleset/builtins" - "github.com/apernet/OpenGFW/ruleset/builtins/geo" ) // ExprRule is the external representation of an expression rule. @@ -302,23 +301,22 @@ type Function struct { } func buildFunctionMap(config *BuiltinConfig) map[string]*Function { - geoMatcher := geo.NewGeoMatcher(config.GeoSiteFilename, config.GeoIpFilename) return map[string]*Function{ "geoip": { - InitFunc: geoMatcher.LoadGeoIP, + InitFunc: config.GeoMatcher.LoadGeoIP, PatchFunc: nil, Func: func(params ...any) (any, error) { - return geoMatcher.MatchGeoIp(params[0].(string), params[1].(string)), nil + return config.GeoMatcher.MatchGeoIp(params[0].(string), params[1].(string)), nil }, - Types: []reflect.Type{reflect.TypeOf(geoMatcher.MatchGeoIp)}, + Types: []reflect.Type{reflect.TypeOf(config.GeoMatcher.MatchGeoIp)}, }, "geosite": { - InitFunc: geoMatcher.LoadGeoSite, + InitFunc: config.GeoMatcher.LoadGeoSite, PatchFunc: nil, Func: func(params ...any) (any, error) { - return geoMatcher.MatchGeoSite(params[0].(string), params[1].(string)), nil + return config.GeoMatcher.MatchGeoSite(params[0].(string), params[1].(string)), nil }, - Types: []reflect.Type{reflect.TypeOf(geoMatcher.MatchGeoSite)}, + Types: []reflect.Type{reflect.TypeOf(config.GeoMatcher.MatchGeoSite)}, }, "cidr": { InitFunc: nil, diff --git a/ruleset/interface.go b/ruleset/interface.go index 535c2a4..a6df363 100644 --- a/ruleset/interface.go +++ b/ruleset/interface.go @@ -7,6 +7,7 @@ import ( "github.com/apernet/OpenGFW/analyzer" "github.com/apernet/OpenGFW/modifier" + "github.com/apernet/OpenGFW/ruleset/builtins/geo" ) type Action int @@ -102,7 +103,6 @@ type Logger interface { type BuiltinConfig struct { Logger Logger - GeoSiteFilename string - GeoIpFilename string + GeoMatcher *geo.GeoMatcher ProtectedDialContext func(ctx context.Context, network, address string) (net.Conn, error) } From d7506264ad4e74c677a5775df0007241fd33cf54 Mon Sep 17 00:00:00 2001 From: Toby Date: Mon, 6 May 2024 14:35:31 -0700 Subject: [PATCH 03/22] fix: provide correct timestamp for TCP reassembler --- engine/engine.go | 4 +++- io/interface.go | 3 +++ io/nfqueue.go | 18 +++++++++++++++--- 3 files changed, 21 insertions(+), 4 deletions(-) diff --git a/engine/engine.go b/engine/engine.go index c838e0d..56f5ed3 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -102,9 +102,11 @@ func (e *engine) dispatch(p io.Packet) bool { _ = e.io.SetVerdict(p, io.VerdictAcceptStream, nil) return true } + // Convert to gopacket.Packet + packet := gopacket.NewPacket(data, layerType, gopacket.DecodeOptions{Lazy: true, NoCopy: true}) + packet.Metadata().Timestamp = p.Timestamp() // Load balance by stream ID index := p.StreamID() % uint32(len(e.workers)) - packet := gopacket.NewPacket(data, layerType, gopacket.DecodeOptions{Lazy: true, NoCopy: true}) e.workers[index].Feed(&workerPacket{ StreamID: p.StreamID(), Packet: packet, diff --git a/io/interface.go b/io/interface.go index 35aa886..af7e1e7 100644 --- a/io/interface.go +++ b/io/interface.go @@ -3,6 +3,7 @@ package io import ( "context" "net" + "time" ) type Verdict int @@ -24,6 +25,8 @@ const ( type Packet interface { // StreamID is the ID of the stream the packet belongs to. StreamID() uint32 + // Timestamp is the time the packet was received. + Timestamp() time.Time // Data is the raw packet data, starting with the IP header. Data() []byte } diff --git a/io/nfqueue.go b/io/nfqueue.go index 543f247..e84a0bb 100644 --- a/io/nfqueue.go +++ b/io/nfqueue.go @@ -10,6 +10,7 @@ import ( "strconv" "strings" "syscall" + "time" "github.com/coreos/go-iptables/iptables" "github.com/florianl/go-nfqueue" @@ -189,6 +190,12 @@ func (n *nfqueuePacketIO) Register(ctx context.Context, cb PacketCallback) error streamID: ctIDFromCtBytes(*a.Ct), data: *a.Payload, } + // Use timestamp from attribute if available, otherwise use current time as fallback + if a.Timestamp != nil { + p.timestamp = *a.Timestamp + } else { + p.timestamp = time.Now() + } return okBoolToInt(cb(p, nil)) }, func(e error) int { @@ -312,15 +319,20 @@ func (n *nfqueuePacketIO) setupIpt(local, rst, remove bool) error { var _ Packet = (*nfqueuePacket)(nil) type nfqueuePacket struct { - id uint32 - streamID uint32 - data []byte + id uint32 + streamID uint32 + timestamp time.Time + data []byte } func (p *nfqueuePacket) StreamID() uint32 { return p.streamID } +func (p *nfqueuePacket) Timestamp() time.Time { + return p.timestamp +} + func (p *nfqueuePacket) Data() []byte { return p.data } From 94387450cff92fe54fe86736290ae594a2544832 Mon Sep 17 00:00:00 2001 From: eddc005 Date: Mon, 6 May 2024 20:28:36 +0100 Subject: [PATCH 04/22] feat: add support for pcap replay --- cmd/root.go | 41 ++++++++++++--- engine/engine.go | 11 ++++- io/interface.go | 3 ++ io/nfqueue.go | 5 ++ io/pcap.go | 126 +++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 176 insertions(+), 10 deletions(-) create mode 100644 io/pcap.go diff --git a/cmd/root.go b/cmd/root.go index 288e3d7..d6c0083 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -43,6 +43,7 @@ var logger *zap.Logger // Flags var ( cfgFile string + pcapFile string logLevel string logFormat string ) @@ -118,6 +119,7 @@ func init() { func initFlags() { rootCmd.PersistentFlags().StringVarP(&cfgFile, "config", "c", "", "config file") + rootCmd.PersistentFlags().StringVarP(&pcapFile, "pcap", "p", "", "pcap file (optional)") rootCmd.PersistentFlags().StringVarP(&logLevel, "log-level", "l", envOrDefaultString(appLogLevelEnv, "info"), "log level") rootCmd.PersistentFlags().StringVarP(&logFormat, "log-format", "f", envOrDefaultString(appLogFormatEnv, "console"), "log format") } @@ -133,6 +135,9 @@ func initConfig() { viper.AddConfigPath("$HOME/.opengfw") viper.AddConfigPath("/etc/opengfw") } + + viper.SetDefault("replay.realtime", true) + viper.SetDefault("replay.replayDelay", 10 * time.Millisecond) } func initLogger() { @@ -167,6 +172,7 @@ type cliConfig struct { IO cliConfigIO `mapstructure:"io"` Workers cliConfigWorkers `mapstructure:"workers"` Ruleset cliConfigRuleset `mapstructure:"ruleset"` + Replay cliConfigReplay `mapstructure:"replay"` } type cliConfigIO struct { @@ -177,6 +183,11 @@ type cliConfigIO struct { RST bool `mapstructure:"rst"` } +type cliConfigReplay struct { + Realtime bool `mapstructure:"realtime"` + ReplayDelay time.Duration `mapstructure:"replayDelay"` +} + type cliConfigWorkers struct { Count int `mapstructure:"count"` QueueSize int `mapstructure:"queueSize"` @@ -197,17 +208,31 @@ func (c *cliConfig) fillLogger(config *engine.Config) error { } func (c *cliConfig) fillIO(config *engine.Config) error { - nfio, err := io.NewNFQueuePacketIO(io.NFQueuePacketIOConfig{ - QueueSize: c.IO.QueueSize, - ReadBuffer: c.IO.ReadBuffer, - WriteBuffer: c.IO.WriteBuffer, - Local: c.IO.Local, - RST: c.IO.RST, - }) + var ioImpl io.PacketIO + var err error + if pcapFile != "" { + // Setup IO for pcap file replay + logger.Info("replaying from pcap file", zap.String("pcap file", pcapFile)) + ioImpl, err = io.NewPcapPacketIO(io.PcapPacketIOConfig{ + PcapFile: pcapFile, + Realtime: c.Replay.Realtime, + ReplayDelay: c.Replay.ReplayDelay, + }) + } else { + // Setup IO for nfqueue + ioImpl, err = io.NewNFQueuePacketIO(io.NFQueuePacketIOConfig{ + QueueSize: c.IO.QueueSize, + ReadBuffer: c.IO.ReadBuffer, + WriteBuffer: c.IO.WriteBuffer, + Local: c.IO.Local, + RST: c.IO.RST, + }) + } + if err != nil { return configError{Field: "io", Err: err} } - config.IO = nfio + config.IO = ioImpl return nil } diff --git a/engine/engine.go b/engine/engine.go index 56f5ed3..1270efb 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -58,12 +58,17 @@ func (e *engine) UpdateRuleset(r ruleset.Ruleset) error { } func (e *engine) Run(ctx context.Context) error { + workerCtx, workerCancel := context.WithCancel(ctx) + defer workerCancel() // Stop workers + + // Register IO shutdown ioCtx, ioCancel := context.WithCancel(ctx) - defer ioCancel() // Stop workers & IO + e.io.SetCancelFunc(ioCancel) + defer ioCancel() // Stop IO // Start workers for _, w := range e.workers { - go w.Run(ioCtx) + go w.Run(workerCtx) } // Register IO callback @@ -85,6 +90,8 @@ func (e *engine) Run(ctx context.Context) error { return err case <-ctx.Done(): return nil + case <-ioCtx.Done(): + return nil } } diff --git a/io/interface.go b/io/interface.go index af7e1e7..f996789 100644 --- a/io/interface.go +++ b/io/interface.go @@ -48,6 +48,9 @@ type PacketIO interface { ProtectedDialContext(ctx context.Context, network, address string) (net.Conn, error) // Close closes the packet IO. Close() error + // SetCancelFunc gives packet IO access to context cancel function, enabling it to + // trigger a shutdown + SetCancelFunc(cancelFunc context.CancelFunc) error } type ErrInvalidPacket struct { diff --git a/io/nfqueue.go b/io/nfqueue.go index e84a0bb..f1a64df 100644 --- a/io/nfqueue.go +++ b/io/nfqueue.go @@ -281,6 +281,11 @@ func (n *nfqueuePacketIO) Close() error { return n.n.Close() } +// nfqueue IO does not issue shutdown +func (n *nfqueuePacketIO) SetCancelFunc(cancelFunc context.CancelFunc) error { + return nil +} + func (n *nfqueuePacketIO) setupNft(local, rst, remove bool) error { rules, err := generateNftRules(local, rst) if err != nil { diff --git a/io/pcap.go b/io/pcap.go new file mode 100644 index 0000000..36bbbaa --- /dev/null +++ b/io/pcap.go @@ -0,0 +1,126 @@ +package io + +import ( + "context" + "hash/crc32" + "net" + "sort" + "strings" + "time" + + "github.com/google/gopacket" + "github.com/google/gopacket/pcap" +) + +var _ PacketIO = (*pcapPacketIO)(nil) + +type pcapPacketIO struct { + pcap *pcap.Handle + lastTime *time.Time + ioCancel context.CancelFunc + config PcapPacketIOConfig +} + +type PcapPacketIOConfig struct { + PcapFile string + Realtime bool + ReplayDelay time.Duration +} + +func NewPcapPacketIO(config PcapPacketIOConfig) (PacketIO, error) { + handle, err := pcap.OpenOffline(config.PcapFile) + + if err != nil { + return nil, err + } + + print(config.ReplayDelay) + + return &pcapPacketIO{ + pcap: handle, + lastTime: nil, + ioCancel: nil, + config: config, + }, nil +} + +func (p *pcapPacketIO) Register(ctx context.Context, cb PacketCallback) error { + go func() { + packetSource := gopacket.NewPacketSource(p.pcap, p.pcap.LinkType()) + for packet := range packetSource.Packets() { + p.wait(packet) + + networkLayer := packet.NetworkLayer() + if networkLayer != nil { + src, dst := networkLayer.NetworkFlow().Endpoints() + endpoints := []string{src.String(), dst.String()} + sort.Strings(endpoints) + id := crc32.Checksum([]byte(strings.Join(endpoints, ",")), crc32.IEEETable) + + cb(&pcapPacket{ + streamID: id, + data: packet.LinkLayer().LayerPayload(), + }, nil) + } + } + // Give the workers a chance to finish everything + time.Sleep(time.Second) + // Stop the engine when all packets are finished + p.ioCancel() + }() + + return nil +} + +func (p *pcapPacketIO) ProtectedDialContext(ctx context.Context, network, address string) (net.Conn, error) { + return nil, nil +} + +func (p *pcapPacketIO) SetVerdict(pkt Packet, v Verdict, newPacket []byte) error { + return nil +} + +func (p *pcapPacketIO) SetCancelFunc(cancelFunc context.CancelFunc) error { + p.ioCancel = cancelFunc + return nil +} + +func (p *pcapPacketIO) Close() error { + return nil +} + +// Intentionally slow down the replay +// In realtime mode, this is to match the timestamps in the capture +// In non realtime mode, this helps to avoid flooding the workers +func (p *pcapPacketIO) wait(packet gopacket.Packet) error { + if !p.config.Realtime { + time.Sleep(p.config.ReplayDelay) + return nil + } + + if p.lastTime == nil { + p.lastTime = &packet.Metadata().Timestamp + } else { + t := packet.Metadata().Timestamp.Sub(*p.lastTime) + time.Sleep(t) + p.lastTime = &packet.Metadata().Timestamp + } + + return nil +} + +var _ Packet = (*pcapPacket)(nil) + +type pcapPacket struct { + streamID uint32 + data []byte +} + +func (p *pcapPacket) StreamID() uint32 { + return p.streamID +} + +func (p *pcapPacket) Data() []byte { + return p.data +} + From f01b79e6255751bf2fe7a5db22226d205748e2b6 Mon Sep 17 00:00:00 2001 From: eddc005 Date: Mon, 6 May 2024 23:04:54 +0100 Subject: [PATCH 05/22] rebase and remove replayDelay --- cmd/root.go | 3 --- io/pcap.go | 20 ++++++++++---------- 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index d6c0083..136f456 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -137,7 +137,6 @@ func initConfig() { } viper.SetDefault("replay.realtime", true) - viper.SetDefault("replay.replayDelay", 10 * time.Millisecond) } func initLogger() { @@ -185,7 +184,6 @@ type cliConfigIO struct { type cliConfigReplay struct { Realtime bool `mapstructure:"realtime"` - ReplayDelay time.Duration `mapstructure:"replayDelay"` } type cliConfigWorkers struct { @@ -216,7 +214,6 @@ func (c *cliConfig) fillIO(config *engine.Config) error { ioImpl, err = io.NewPcapPacketIO(io.PcapPacketIOConfig{ PcapFile: pcapFile, Realtime: c.Replay.Realtime, - ReplayDelay: c.Replay.ReplayDelay, }) } else { // Setup IO for nfqueue diff --git a/io/pcap.go b/io/pcap.go index 36bbbaa..995332b 100644 --- a/io/pcap.go +++ b/io/pcap.go @@ -24,7 +24,6 @@ type pcapPacketIO struct { type PcapPacketIOConfig struct { PcapFile string Realtime bool - ReplayDelay time.Duration } func NewPcapPacketIO(config PcapPacketIOConfig) (PacketIO, error) { @@ -34,8 +33,6 @@ func NewPcapPacketIO(config PcapPacketIOConfig) (PacketIO, error) { return nil, err } - print(config.ReplayDelay) - return &pcapPacketIO{ pcap: handle, lastTime: nil, @@ -58,8 +55,9 @@ func (p *pcapPacketIO) Register(ctx context.Context, cb PacketCallback) error { id := crc32.Checksum([]byte(strings.Join(endpoints, ",")), crc32.IEEETable) cb(&pcapPacket{ - streamID: id, - data: packet.LinkLayer().LayerPayload(), + streamID: id, + timestamp: packet.Metadata().Timestamp, + data: packet.LinkLayer().LayerPayload(), }, nil) } } @@ -91,10 +89,8 @@ func (p *pcapPacketIO) Close() error { // Intentionally slow down the replay // In realtime mode, this is to match the timestamps in the capture -// In non realtime mode, this helps to avoid flooding the workers func (p *pcapPacketIO) wait(packet gopacket.Packet) error { if !p.config.Realtime { - time.Sleep(p.config.ReplayDelay) return nil } @@ -112,15 +108,19 @@ func (p *pcapPacketIO) wait(packet gopacket.Packet) error { var _ Packet = (*pcapPacket)(nil) type pcapPacket struct { - streamID uint32 - data []byte + streamID uint32 + timestamp time.Time + data []byte } func (p *pcapPacket) StreamID() uint32 { return p.streamID } +func (p *pcapPacket) Timestamp() time.Time { + return p.timestamp +} + func (p *pcapPacket) Data() []byte { return p.data } - From abd7725feda4909931df90ae62461ea08428eaa0 Mon Sep 17 00:00:00 2001 From: eddc005 Date: Tue, 7 May 2024 21:50:06 +0100 Subject: [PATCH 06/22] close pcap properly and implement ProtectedDialContext --- io/pcap.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/io/pcap.go b/io/pcap.go index 995332b..c2dfae0 100644 --- a/io/pcap.go +++ b/io/pcap.go @@ -19,6 +19,8 @@ type pcapPacketIO struct { lastTime *time.Time ioCancel context.CancelFunc config PcapPacketIOConfig + + dialer *net.Dialer } type PcapPacketIOConfig struct { @@ -38,6 +40,7 @@ func NewPcapPacketIO(config PcapPacketIOConfig) (PacketIO, error) { lastTime: nil, ioCancel: nil, config: config, + dialer: &net.Dialer{}, }, nil } @@ -70,8 +73,9 @@ func (p *pcapPacketIO) Register(ctx context.Context, cb PacketCallback) error { return nil } +// A normal dialer is sufficient as pcap IO does not mess up with the networking func (p *pcapPacketIO) ProtectedDialContext(ctx context.Context, network, address string) (net.Conn, error) { - return nil, nil + return p.dialer.DialContext(ctx, network, address) } func (p *pcapPacketIO) SetVerdict(pkt Packet, v Verdict, newPacket []byte) error { @@ -84,6 +88,7 @@ func (p *pcapPacketIO) SetCancelFunc(cancelFunc context.CancelFunc) error { } func (p *pcapPacketIO) Close() error { + p.pcap.Close() return nil } From 70fee141033dbf743e76a419a596ffe22ebc1898 Mon Sep 17 00:00:00 2001 From: Toby Date: Tue, 7 May 2024 22:50:58 -0700 Subject: [PATCH 07/22] chore: format --- cmd/root.go | 6 +++--- io/pcap.go | 7 +++---- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index 136f456..79078ea 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -183,7 +183,7 @@ type cliConfigIO struct { } type cliConfigReplay struct { - Realtime bool `mapstructure:"realtime"` + Realtime bool `mapstructure:"realtime"` } type cliConfigWorkers struct { @@ -212,8 +212,8 @@ func (c *cliConfig) fillIO(config *engine.Config) error { // Setup IO for pcap file replay logger.Info("replaying from pcap file", zap.String("pcap file", pcapFile)) ioImpl, err = io.NewPcapPacketIO(io.PcapPacketIOConfig{ - PcapFile: pcapFile, - Realtime: c.Replay.Realtime, + PcapFile: pcapFile, + Realtime: c.Replay.Realtime, }) } else { // Setup IO for nfqueue diff --git a/io/pcap.go b/io/pcap.go index c2dfae0..520da17 100644 --- a/io/pcap.go +++ b/io/pcap.go @@ -20,17 +20,16 @@ type pcapPacketIO struct { ioCancel context.CancelFunc config PcapPacketIOConfig - dialer *net.Dialer + dialer *net.Dialer } type PcapPacketIOConfig struct { - PcapFile string - Realtime bool + PcapFile string + Realtime bool } func NewPcapPacketIO(config PcapPacketIOConfig) (PacketIO, error) { handle, err := pcap.OpenOffline(config.PcapFile) - if err != nil { return nil, err } From 76c0f47832140dac528aba42b6d80bf551b4f7a8 Mon Sep 17 00:00:00 2001 From: Toby Date: Tue, 7 May 2024 23:05:06 -0700 Subject: [PATCH 08/22] chore: do not default replay.realtime to true --- cmd/root.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index 79078ea..1ccf025 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -135,8 +135,6 @@ func initConfig() { viper.AddConfigPath("$HOME/.opengfw") viper.AddConfigPath("/etc/opengfw") } - - viper.SetDefault("replay.realtime", true) } func initLogger() { From 5e15fd6dd937fa040254fe368ee2b6645d3f50ff Mon Sep 17 00:00:00 2001 From: Toby Date: Tue, 7 May 2024 23:12:24 -0700 Subject: [PATCH 09/22] ci: install pcap for build --- .github/workflows/check.yaml | 3 +++ .github/workflows/release.yaml | 3 +++ 2 files changed, 6 insertions(+) diff --git a/.github/workflows/check.yaml b/.github/workflows/check.yaml index ac9e66d..352ac7c 100644 --- a/.github/workflows/check.yaml +++ b/.github/workflows/check.yaml @@ -23,6 +23,9 @@ jobs: with: go-version: 'stable' + - name: Install pcap + run: sudo apt install -y libpcap-dev + - run: go vet ./... - name: staticcheck diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 0da1054..b227e16 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -24,6 +24,9 @@ jobs: with: go-version: "1.22" + - name: Install pcap + run: sudo apt install -y libpcap-dev + - name: Build env: GOOS: ${{ matrix.goos }} From 0daaa32fc6c77fa4566266986d9a6f84722e6903 Mon Sep 17 00:00:00 2001 From: Toby Date: Tue, 7 May 2024 23:13:58 -0700 Subject: [PATCH 10/22] ci: install pcap for build 2 --- .github/workflows/check.yaml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/check.yaml b/.github/workflows/check.yaml index 352ac7c..a24f198 100644 --- a/.github/workflows/check.yaml +++ b/.github/workflows/check.yaml @@ -47,4 +47,7 @@ jobs: with: go-version: 'stable' + - name: Install pcap + run: sudo apt install -y libpcap-dev + - run: go test ./... From dabcc9566c2f8af2a7cb831a2ea23fad2cd6c57b Mon Sep 17 00:00:00 2001 From: Toby Date: Tue, 7 May 2024 23:38:28 -0700 Subject: [PATCH 11/22] ci: enable cgo --- .github/workflows/release.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index b227e16..43c9377 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -31,7 +31,6 @@ jobs: env: GOOS: ${{ matrix.goos }} GOARCH: ${{ matrix.goarch }} - CGO_ENABLED: 0 run: | mkdir -p build go build -o build/OpenGFW-${GOOS}-${GOARCH} -ldflags "-s -w" . From 2ac8783eb66328e580b5958c696e82c53cb254dd Mon Sep 17 00:00:00 2001 From: Toby Date: Tue, 7 May 2024 23:43:54 -0700 Subject: [PATCH 12/22] Revert "Merge pull request #132 from eddc005/feat-pcap" This reverts commit c453020349c4245c4884f4f1f6238098e4bf4906, reversing changes made to 5723490a6c45eb5f2e03fa60cb7fe3bfc048995e. --- .github/workflows/check.yaml | 6 -- .github/workflows/release.yaml | 3 - cmd/root.go | 36 ++------- engine/engine.go | 11 +-- io/interface.go | 3 - io/nfqueue.go | 5 -- io/pcap.go | 130 --------------------------------- 7 files changed, 10 insertions(+), 184 deletions(-) delete mode 100644 io/pcap.go diff --git a/.github/workflows/check.yaml b/.github/workflows/check.yaml index a24f198..ac9e66d 100644 --- a/.github/workflows/check.yaml +++ b/.github/workflows/check.yaml @@ -23,9 +23,6 @@ jobs: with: go-version: 'stable' - - name: Install pcap - run: sudo apt install -y libpcap-dev - - run: go vet ./... - name: staticcheck @@ -47,7 +44,4 @@ jobs: with: go-version: 'stable' - - name: Install pcap - run: sudo apt install -y libpcap-dev - - run: go test ./... diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 43c9377..ba0565c 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -24,9 +24,6 @@ jobs: with: go-version: "1.22" - - name: Install pcap - run: sudo apt install -y libpcap-dev - - name: Build env: GOOS: ${{ matrix.goos }} diff --git a/cmd/root.go b/cmd/root.go index 1ccf025..288e3d7 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -43,7 +43,6 @@ var logger *zap.Logger // Flags var ( cfgFile string - pcapFile string logLevel string logFormat string ) @@ -119,7 +118,6 @@ func init() { func initFlags() { rootCmd.PersistentFlags().StringVarP(&cfgFile, "config", "c", "", "config file") - rootCmd.PersistentFlags().StringVarP(&pcapFile, "pcap", "p", "", "pcap file (optional)") rootCmd.PersistentFlags().StringVarP(&logLevel, "log-level", "l", envOrDefaultString(appLogLevelEnv, "info"), "log level") rootCmd.PersistentFlags().StringVarP(&logFormat, "log-format", "f", envOrDefaultString(appLogFormatEnv, "console"), "log format") } @@ -169,7 +167,6 @@ type cliConfig struct { IO cliConfigIO `mapstructure:"io"` Workers cliConfigWorkers `mapstructure:"workers"` Ruleset cliConfigRuleset `mapstructure:"ruleset"` - Replay cliConfigReplay `mapstructure:"replay"` } type cliConfigIO struct { @@ -180,10 +177,6 @@ type cliConfigIO struct { RST bool `mapstructure:"rst"` } -type cliConfigReplay struct { - Realtime bool `mapstructure:"realtime"` -} - type cliConfigWorkers struct { Count int `mapstructure:"count"` QueueSize int `mapstructure:"queueSize"` @@ -204,30 +197,17 @@ func (c *cliConfig) fillLogger(config *engine.Config) error { } func (c *cliConfig) fillIO(config *engine.Config) error { - var ioImpl io.PacketIO - var err error - if pcapFile != "" { - // Setup IO for pcap file replay - logger.Info("replaying from pcap file", zap.String("pcap file", pcapFile)) - ioImpl, err = io.NewPcapPacketIO(io.PcapPacketIOConfig{ - PcapFile: pcapFile, - Realtime: c.Replay.Realtime, - }) - } else { - // Setup IO for nfqueue - ioImpl, err = io.NewNFQueuePacketIO(io.NFQueuePacketIOConfig{ - QueueSize: c.IO.QueueSize, - ReadBuffer: c.IO.ReadBuffer, - WriteBuffer: c.IO.WriteBuffer, - Local: c.IO.Local, - RST: c.IO.RST, - }) - } - + nfio, err := io.NewNFQueuePacketIO(io.NFQueuePacketIOConfig{ + QueueSize: c.IO.QueueSize, + ReadBuffer: c.IO.ReadBuffer, + WriteBuffer: c.IO.WriteBuffer, + Local: c.IO.Local, + RST: c.IO.RST, + }) if err != nil { return configError{Field: "io", Err: err} } - config.IO = ioImpl + config.IO = nfio return nil } diff --git a/engine/engine.go b/engine/engine.go index 1270efb..56f5ed3 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -58,17 +58,12 @@ func (e *engine) UpdateRuleset(r ruleset.Ruleset) error { } func (e *engine) Run(ctx context.Context) error { - workerCtx, workerCancel := context.WithCancel(ctx) - defer workerCancel() // Stop workers - - // Register IO shutdown ioCtx, ioCancel := context.WithCancel(ctx) - e.io.SetCancelFunc(ioCancel) - defer ioCancel() // Stop IO + defer ioCancel() // Stop workers & IO // Start workers for _, w := range e.workers { - go w.Run(workerCtx) + go w.Run(ioCtx) } // Register IO callback @@ -90,8 +85,6 @@ func (e *engine) Run(ctx context.Context) error { return err case <-ctx.Done(): return nil - case <-ioCtx.Done(): - return nil } } diff --git a/io/interface.go b/io/interface.go index f996789..af7e1e7 100644 --- a/io/interface.go +++ b/io/interface.go @@ -48,9 +48,6 @@ type PacketIO interface { ProtectedDialContext(ctx context.Context, network, address string) (net.Conn, error) // Close closes the packet IO. Close() error - // SetCancelFunc gives packet IO access to context cancel function, enabling it to - // trigger a shutdown - SetCancelFunc(cancelFunc context.CancelFunc) error } type ErrInvalidPacket struct { diff --git a/io/nfqueue.go b/io/nfqueue.go index f1a64df..e84a0bb 100644 --- a/io/nfqueue.go +++ b/io/nfqueue.go @@ -281,11 +281,6 @@ func (n *nfqueuePacketIO) Close() error { return n.n.Close() } -// nfqueue IO does not issue shutdown -func (n *nfqueuePacketIO) SetCancelFunc(cancelFunc context.CancelFunc) error { - return nil -} - func (n *nfqueuePacketIO) setupNft(local, rst, remove bool) error { rules, err := generateNftRules(local, rst) if err != nil { diff --git a/io/pcap.go b/io/pcap.go deleted file mode 100644 index 520da17..0000000 --- a/io/pcap.go +++ /dev/null @@ -1,130 +0,0 @@ -package io - -import ( - "context" - "hash/crc32" - "net" - "sort" - "strings" - "time" - - "github.com/google/gopacket" - "github.com/google/gopacket/pcap" -) - -var _ PacketIO = (*pcapPacketIO)(nil) - -type pcapPacketIO struct { - pcap *pcap.Handle - lastTime *time.Time - ioCancel context.CancelFunc - config PcapPacketIOConfig - - dialer *net.Dialer -} - -type PcapPacketIOConfig struct { - PcapFile string - Realtime bool -} - -func NewPcapPacketIO(config PcapPacketIOConfig) (PacketIO, error) { - handle, err := pcap.OpenOffline(config.PcapFile) - if err != nil { - return nil, err - } - - return &pcapPacketIO{ - pcap: handle, - lastTime: nil, - ioCancel: nil, - config: config, - dialer: &net.Dialer{}, - }, nil -} - -func (p *pcapPacketIO) Register(ctx context.Context, cb PacketCallback) error { - go func() { - packetSource := gopacket.NewPacketSource(p.pcap, p.pcap.LinkType()) - for packet := range packetSource.Packets() { - p.wait(packet) - - networkLayer := packet.NetworkLayer() - if networkLayer != nil { - src, dst := networkLayer.NetworkFlow().Endpoints() - endpoints := []string{src.String(), dst.String()} - sort.Strings(endpoints) - id := crc32.Checksum([]byte(strings.Join(endpoints, ",")), crc32.IEEETable) - - cb(&pcapPacket{ - streamID: id, - timestamp: packet.Metadata().Timestamp, - data: packet.LinkLayer().LayerPayload(), - }, nil) - } - } - // Give the workers a chance to finish everything - time.Sleep(time.Second) - // Stop the engine when all packets are finished - p.ioCancel() - }() - - return nil -} - -// A normal dialer is sufficient as pcap IO does not mess up with the networking -func (p *pcapPacketIO) ProtectedDialContext(ctx context.Context, network, address string) (net.Conn, error) { - return p.dialer.DialContext(ctx, network, address) -} - -func (p *pcapPacketIO) SetVerdict(pkt Packet, v Verdict, newPacket []byte) error { - return nil -} - -func (p *pcapPacketIO) SetCancelFunc(cancelFunc context.CancelFunc) error { - p.ioCancel = cancelFunc - return nil -} - -func (p *pcapPacketIO) Close() error { - p.pcap.Close() - return nil -} - -// Intentionally slow down the replay -// In realtime mode, this is to match the timestamps in the capture -func (p *pcapPacketIO) wait(packet gopacket.Packet) error { - if !p.config.Realtime { - return nil - } - - if p.lastTime == nil { - p.lastTime = &packet.Metadata().Timestamp - } else { - t := packet.Metadata().Timestamp.Sub(*p.lastTime) - time.Sleep(t) - p.lastTime = &packet.Metadata().Timestamp - } - - return nil -} - -var _ Packet = (*pcapPacket)(nil) - -type pcapPacket struct { - streamID uint32 - timestamp time.Time - data []byte -} - -func (p *pcapPacket) StreamID() uint32 { - return p.streamID -} - -func (p *pcapPacket) Timestamp() time.Time { - return p.timestamp -} - -func (p *pcapPacket) Data() []byte { - return p.data -} From b51ea5fa0716b28d80a23a88bcdb446687b4f573 Mon Sep 17 00:00:00 2001 From: Toby Date: Tue, 7 May 2024 23:44:00 -0700 Subject: [PATCH 13/22] Revert "Merge pull request #134 from apernet/ci-cgo" This reverts commit 5014523ae01d25b9fb69d5cd457f7b062bf71a7a, reversing changes made to c453020349c4245c4884f4f1f6238098e4bf4906. --- .github/workflows/release.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index ba0565c..0da1054 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -28,6 +28,7 @@ jobs: env: GOOS: ${{ matrix.goos }} GOARCH: ${{ matrix.goarch }} + CGO_ENABLED: 0 run: | mkdir -p build go build -o build/OpenGFW-${GOOS}-${GOARCH} -ldflags "-s -w" . From 8cab86b9245763b95e9d15be7ea627dd7e216dfe Mon Sep 17 00:00:00 2001 From: Haruue Date: Wed, 8 May 2024 19:13:49 +0800 Subject: [PATCH 14/22] Reapply "Merge pull request #132 from eddc005/feat-pcap" This reverts commit 2ac8783eb66328e580b5958c696e82c53cb254dd. --- .github/workflows/check.yaml | 6 ++ .github/workflows/release.yaml | 3 + cmd/root.go | 36 +++++++-- engine/engine.go | 11 ++- io/interface.go | 3 + io/nfqueue.go | 5 ++ io/pcap.go | 130 +++++++++++++++++++++++++++++++++ 7 files changed, 184 insertions(+), 10 deletions(-) create mode 100644 io/pcap.go diff --git a/.github/workflows/check.yaml b/.github/workflows/check.yaml index ac9e66d..a24f198 100644 --- a/.github/workflows/check.yaml +++ b/.github/workflows/check.yaml @@ -23,6 +23,9 @@ jobs: with: go-version: 'stable' + - name: Install pcap + run: sudo apt install -y libpcap-dev + - run: go vet ./... - name: staticcheck @@ -44,4 +47,7 @@ jobs: with: go-version: 'stable' + - name: Install pcap + run: sudo apt install -y libpcap-dev + - run: go test ./... diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 0da1054..b227e16 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -24,6 +24,9 @@ jobs: with: go-version: "1.22" + - name: Install pcap + run: sudo apt install -y libpcap-dev + - name: Build env: GOOS: ${{ matrix.goos }} diff --git a/cmd/root.go b/cmd/root.go index 288e3d7..1ccf025 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -43,6 +43,7 @@ var logger *zap.Logger // Flags var ( cfgFile string + pcapFile string logLevel string logFormat string ) @@ -118,6 +119,7 @@ func init() { func initFlags() { rootCmd.PersistentFlags().StringVarP(&cfgFile, "config", "c", "", "config file") + rootCmd.PersistentFlags().StringVarP(&pcapFile, "pcap", "p", "", "pcap file (optional)") rootCmd.PersistentFlags().StringVarP(&logLevel, "log-level", "l", envOrDefaultString(appLogLevelEnv, "info"), "log level") rootCmd.PersistentFlags().StringVarP(&logFormat, "log-format", "f", envOrDefaultString(appLogFormatEnv, "console"), "log format") } @@ -167,6 +169,7 @@ type cliConfig struct { IO cliConfigIO `mapstructure:"io"` Workers cliConfigWorkers `mapstructure:"workers"` Ruleset cliConfigRuleset `mapstructure:"ruleset"` + Replay cliConfigReplay `mapstructure:"replay"` } type cliConfigIO struct { @@ -177,6 +180,10 @@ type cliConfigIO struct { RST bool `mapstructure:"rst"` } +type cliConfigReplay struct { + Realtime bool `mapstructure:"realtime"` +} + type cliConfigWorkers struct { Count int `mapstructure:"count"` QueueSize int `mapstructure:"queueSize"` @@ -197,17 +204,30 @@ func (c *cliConfig) fillLogger(config *engine.Config) error { } func (c *cliConfig) fillIO(config *engine.Config) error { - nfio, err := io.NewNFQueuePacketIO(io.NFQueuePacketIOConfig{ - QueueSize: c.IO.QueueSize, - ReadBuffer: c.IO.ReadBuffer, - WriteBuffer: c.IO.WriteBuffer, - Local: c.IO.Local, - RST: c.IO.RST, - }) + var ioImpl io.PacketIO + var err error + if pcapFile != "" { + // Setup IO for pcap file replay + logger.Info("replaying from pcap file", zap.String("pcap file", pcapFile)) + ioImpl, err = io.NewPcapPacketIO(io.PcapPacketIOConfig{ + PcapFile: pcapFile, + Realtime: c.Replay.Realtime, + }) + } else { + // Setup IO for nfqueue + ioImpl, err = io.NewNFQueuePacketIO(io.NFQueuePacketIOConfig{ + QueueSize: c.IO.QueueSize, + ReadBuffer: c.IO.ReadBuffer, + WriteBuffer: c.IO.WriteBuffer, + Local: c.IO.Local, + RST: c.IO.RST, + }) + } + if err != nil { return configError{Field: "io", Err: err} } - config.IO = nfio + config.IO = ioImpl return nil } diff --git a/engine/engine.go b/engine/engine.go index 56f5ed3..1270efb 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -58,12 +58,17 @@ func (e *engine) UpdateRuleset(r ruleset.Ruleset) error { } func (e *engine) Run(ctx context.Context) error { + workerCtx, workerCancel := context.WithCancel(ctx) + defer workerCancel() // Stop workers + + // Register IO shutdown ioCtx, ioCancel := context.WithCancel(ctx) - defer ioCancel() // Stop workers & IO + e.io.SetCancelFunc(ioCancel) + defer ioCancel() // Stop IO // Start workers for _, w := range e.workers { - go w.Run(ioCtx) + go w.Run(workerCtx) } // Register IO callback @@ -85,6 +90,8 @@ func (e *engine) Run(ctx context.Context) error { return err case <-ctx.Done(): return nil + case <-ioCtx.Done(): + return nil } } diff --git a/io/interface.go b/io/interface.go index af7e1e7..f996789 100644 --- a/io/interface.go +++ b/io/interface.go @@ -48,6 +48,9 @@ type PacketIO interface { ProtectedDialContext(ctx context.Context, network, address string) (net.Conn, error) // Close closes the packet IO. Close() error + // SetCancelFunc gives packet IO access to context cancel function, enabling it to + // trigger a shutdown + SetCancelFunc(cancelFunc context.CancelFunc) error } type ErrInvalidPacket struct { diff --git a/io/nfqueue.go b/io/nfqueue.go index e84a0bb..f1a64df 100644 --- a/io/nfqueue.go +++ b/io/nfqueue.go @@ -281,6 +281,11 @@ func (n *nfqueuePacketIO) Close() error { return n.n.Close() } +// nfqueue IO does not issue shutdown +func (n *nfqueuePacketIO) SetCancelFunc(cancelFunc context.CancelFunc) error { + return nil +} + func (n *nfqueuePacketIO) setupNft(local, rst, remove bool) error { rules, err := generateNftRules(local, rst) if err != nil { diff --git a/io/pcap.go b/io/pcap.go new file mode 100644 index 0000000..520da17 --- /dev/null +++ b/io/pcap.go @@ -0,0 +1,130 @@ +package io + +import ( + "context" + "hash/crc32" + "net" + "sort" + "strings" + "time" + + "github.com/google/gopacket" + "github.com/google/gopacket/pcap" +) + +var _ PacketIO = (*pcapPacketIO)(nil) + +type pcapPacketIO struct { + pcap *pcap.Handle + lastTime *time.Time + ioCancel context.CancelFunc + config PcapPacketIOConfig + + dialer *net.Dialer +} + +type PcapPacketIOConfig struct { + PcapFile string + Realtime bool +} + +func NewPcapPacketIO(config PcapPacketIOConfig) (PacketIO, error) { + handle, err := pcap.OpenOffline(config.PcapFile) + if err != nil { + return nil, err + } + + return &pcapPacketIO{ + pcap: handle, + lastTime: nil, + ioCancel: nil, + config: config, + dialer: &net.Dialer{}, + }, nil +} + +func (p *pcapPacketIO) Register(ctx context.Context, cb PacketCallback) error { + go func() { + packetSource := gopacket.NewPacketSource(p.pcap, p.pcap.LinkType()) + for packet := range packetSource.Packets() { + p.wait(packet) + + networkLayer := packet.NetworkLayer() + if networkLayer != nil { + src, dst := networkLayer.NetworkFlow().Endpoints() + endpoints := []string{src.String(), dst.String()} + sort.Strings(endpoints) + id := crc32.Checksum([]byte(strings.Join(endpoints, ",")), crc32.IEEETable) + + cb(&pcapPacket{ + streamID: id, + timestamp: packet.Metadata().Timestamp, + data: packet.LinkLayer().LayerPayload(), + }, nil) + } + } + // Give the workers a chance to finish everything + time.Sleep(time.Second) + // Stop the engine when all packets are finished + p.ioCancel() + }() + + return nil +} + +// A normal dialer is sufficient as pcap IO does not mess up with the networking +func (p *pcapPacketIO) ProtectedDialContext(ctx context.Context, network, address string) (net.Conn, error) { + return p.dialer.DialContext(ctx, network, address) +} + +func (p *pcapPacketIO) SetVerdict(pkt Packet, v Verdict, newPacket []byte) error { + return nil +} + +func (p *pcapPacketIO) SetCancelFunc(cancelFunc context.CancelFunc) error { + p.ioCancel = cancelFunc + return nil +} + +func (p *pcapPacketIO) Close() error { + p.pcap.Close() + return nil +} + +// Intentionally slow down the replay +// In realtime mode, this is to match the timestamps in the capture +func (p *pcapPacketIO) wait(packet gopacket.Packet) error { + if !p.config.Realtime { + return nil + } + + if p.lastTime == nil { + p.lastTime = &packet.Metadata().Timestamp + } else { + t := packet.Metadata().Timestamp.Sub(*p.lastTime) + time.Sleep(t) + p.lastTime = &packet.Metadata().Timestamp + } + + return nil +} + +var _ Packet = (*pcapPacket)(nil) + +type pcapPacket struct { + streamID uint32 + timestamp time.Time + data []byte +} + +func (p *pcapPacket) StreamID() uint32 { + return p.streamID +} + +func (p *pcapPacket) Timestamp() time.Time { + return p.timestamp +} + +func (p *pcapPacket) Data() []byte { + return p.data +} From 7456e5907e21fc214bfbcf92733355789c242e6c Mon Sep 17 00:00:00 2001 From: Haruue Date: Wed, 8 May 2024 19:22:17 +0800 Subject: [PATCH 15/22] refactor(pcap): switch to pcapgo --- io/pcap.go | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/io/pcap.go b/io/pcap.go index 520da17..1da66a7 100644 --- a/io/pcap.go +++ b/io/pcap.go @@ -3,19 +3,22 @@ package io import ( "context" "hash/crc32" + "io" "net" + "os" "sort" "strings" "time" "github.com/google/gopacket" - "github.com/google/gopacket/pcap" + "github.com/google/gopacket/pcapgo" ) var _ PacketIO = (*pcapPacketIO)(nil) type pcapPacketIO struct { - pcap *pcap.Handle + pcapFile io.ReadCloser + pcap *pcapgo.Reader lastTime *time.Time ioCancel context.CancelFunc config PcapPacketIOConfig @@ -29,12 +32,18 @@ type PcapPacketIOConfig struct { } func NewPcapPacketIO(config PcapPacketIOConfig) (PacketIO, error) { - handle, err := pcap.OpenOffline(config.PcapFile) + pcapFile, err := os.Open(config.PcapFile) + if err != nil { + return nil, err + } + + handle, err := pcapgo.NewReader(pcapFile) if err != nil { return nil, err } return &pcapPacketIO{ + pcapFile: pcapFile, pcap: handle, lastTime: nil, ioCancel: nil, @@ -87,8 +96,7 @@ func (p *pcapPacketIO) SetCancelFunc(cancelFunc context.CancelFunc) error { } func (p *pcapPacketIO) Close() error { - p.pcap.Close() - return nil + return p.pcapFile.Close() } // Intentionally slow down the replay From cb0427bfbbd61cee981d1371d58315e850b00323 Mon Sep 17 00:00:00 2001 From: Haruue Date: Wed, 8 May 2024 19:26:15 +0800 Subject: [PATCH 16/22] Revert "ci: install pcap for build" This reverts commit 5e15fd6dd937fa040254fe368ee2b6645d3f50ff. --- .github/workflows/check.yaml | 3 --- .github/workflows/release.yaml | 3 --- 2 files changed, 6 deletions(-) diff --git a/.github/workflows/check.yaml b/.github/workflows/check.yaml index a24f198..a1f2c4a 100644 --- a/.github/workflows/check.yaml +++ b/.github/workflows/check.yaml @@ -23,9 +23,6 @@ jobs: with: go-version: 'stable' - - name: Install pcap - run: sudo apt install -y libpcap-dev - - run: go vet ./... - name: staticcheck diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index b227e16..0da1054 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -24,9 +24,6 @@ jobs: with: go-version: "1.22" - - name: Install pcap - run: sudo apt install -y libpcap-dev - - name: Build env: GOOS: ${{ matrix.goos }} From 301f9af3d43603a88dedd222cb2c6552df2b485a Mon Sep 17 00:00:00 2001 From: Haruue Date: Wed, 8 May 2024 19:26:19 +0800 Subject: [PATCH 17/22] Revert "ci: install pcap for build 2" This reverts commit 0daaa32fc6c77fa4566266986d9a6f84722e6903. --- .github/workflows/check.yaml | 3 --- 1 file changed, 3 deletions(-) diff --git a/.github/workflows/check.yaml b/.github/workflows/check.yaml index a1f2c4a..ac9e66d 100644 --- a/.github/workflows/check.yaml +++ b/.github/workflows/check.yaml @@ -44,7 +44,4 @@ jobs: with: go-version: 'stable' - - name: Install pcap - run: sudo apt install -y libpcap-dev - - run: go test ./... From 1934c065ecbe52692aeefa1f7171480588154be7 Mon Sep 17 00:00:00 2001 From: Haruue Date: Wed, 8 May 2024 19:35:17 +0800 Subject: [PATCH 18/22] feat(pcap): impl realtime wait() with time offset --- io/pcap.go | 36 +++++++++++++++++------------------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/io/pcap.go b/io/pcap.go index 1da66a7..9801f9c 100644 --- a/io/pcap.go +++ b/io/pcap.go @@ -17,11 +17,11 @@ import ( var _ PacketIO = (*pcapPacketIO)(nil) type pcapPacketIO struct { - pcapFile io.ReadCloser - pcap *pcapgo.Reader - lastTime *time.Time - ioCancel context.CancelFunc - config PcapPacketIOConfig + pcapFile io.ReadCloser + pcap *pcapgo.Reader + timeOffset *time.Duration + ioCancel context.CancelFunc + config PcapPacketIOConfig dialer *net.Dialer } @@ -43,12 +43,12 @@ func NewPcapPacketIO(config PcapPacketIOConfig) (PacketIO, error) { } return &pcapPacketIO{ - pcapFile: pcapFile, - pcap: handle, - lastTime: nil, - ioCancel: nil, - config: config, - dialer: &net.Dialer{}, + pcapFile: pcapFile, + pcap: handle, + timeOffset: nil, + ioCancel: nil, + config: config, + dialer: &net.Dialer{}, }, nil } @@ -101,20 +101,18 @@ func (p *pcapPacketIO) Close() error { // Intentionally slow down the replay // In realtime mode, this is to match the timestamps in the capture -func (p *pcapPacketIO) wait(packet gopacket.Packet) error { +func (p *pcapPacketIO) wait(packet gopacket.Packet) { if !p.config.Realtime { - return nil + return } - if p.lastTime == nil { - p.lastTime = &packet.Metadata().Timestamp + if p.timeOffset == nil { + offset := time.Since(packet.Metadata().Timestamp) + p.timeOffset = &offset } else { - t := packet.Metadata().Timestamp.Sub(*p.lastTime) + t := time.Until(packet.Metadata().Timestamp.Add(*p.timeOffset)) time.Sleep(t) - p.lastTime = &packet.Metadata().Timestamp } - - return nil } var _ Packet = (*pcapPacket)(nil) From d3f1785ac9e9ca70f10693beabdb542b4f1c52d6 Mon Sep 17 00:00:00 2001 From: Keith Petkus Date: Thu, 8 Aug 2024 13:24:49 -0400 Subject: [PATCH 19/22] feat: netlink queueNum/table config options --- cmd/root.go | 4 ++++ io/nfqueue.go | 56 +++++++++++++++++++++++++++++++-------------------- 2 files changed, 38 insertions(+), 22 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index 1ccf025..93a4791 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -174,6 +174,8 @@ type cliConfig struct { type cliConfigIO struct { QueueSize uint32 `mapstructure:"queueSize"` + QueueNum uint16 `mapstructure:"queueNum"` + Table string `mapstructure:"table"` ReadBuffer int `mapstructure:"rcvBuf"` WriteBuffer int `mapstructure:"sndBuf"` Local bool `mapstructure:"local"` @@ -221,6 +223,8 @@ func (c *cliConfig) fillIO(config *engine.Config) error { WriteBuffer: c.IO.WriteBuffer, Local: c.IO.Local, RST: c.IO.RST, + QueueNum: c.IO.QueueNum, + Table: c.IO.Table, }) } diff --git a/io/nfqueue.go b/io/nfqueue.go index f1a64df..eeca6d7 100644 --- a/io/nfqueue.go +++ b/io/nfqueue.go @@ -19,18 +19,18 @@ import ( ) const ( - nfqueueNum = 100 + nfqueueDefaultQueueNum = 100 nfqueueMaxPacketLen = 0xFFFF nfqueueDefaultQueueSize = 128 nfqueueConnMarkAccept = 1001 nfqueueConnMarkDrop = 1002 - nftFamily = "inet" - nftTable = "opengfw" + nftFamily = "inet" + nftDefaultTable = "opengfw" ) -func generateNftRules(local, rst bool) (*nftTableSpec, error) { +func generateNftRules(local, rst bool, nfqueueNum int, nftTable string) (*nftTableSpec, error) { if local && rst { return nil, errors.New("tcp rst is not supported in local mode") } @@ -64,7 +64,7 @@ func generateNftRules(local, rst bool) (*nftTableSpec, error) { return table, nil } -func generateIptRules(local, rst bool) ([]iptRule, error) { +func generateIptRules(local, rst bool, nfqueueNum int) ([]iptRule, error) { if local && rst { return nil, errors.New("tcp rst is not supported in local mode") } @@ -94,10 +94,12 @@ var _ PacketIO = (*nfqueuePacketIO)(nil) var errNotNFQueuePacket = errors.New("not an NFQueue packet") type nfqueuePacketIO struct { - n *nfqueue.Nfqueue - local bool - rst bool - rSet bool // whether the nftables/iptables rules have been set + n *nfqueue.Nfqueue + local bool + rst bool + rSet bool // whether the nftables/iptables rules have been set + queueNum int + table string // nftable name // iptables not nil = use iptables instead of nftables ipt4 *iptables.IPTables @@ -108,6 +110,8 @@ type nfqueuePacketIO struct { type NFQueuePacketIOConfig struct { QueueSize uint32 + QueueNum uint16 + Table string ReadBuffer int WriteBuffer int Local bool @@ -118,6 +122,12 @@ func NewNFQueuePacketIO(config NFQueuePacketIOConfig) (PacketIO, error) { if config.QueueSize == 0 { config.QueueSize = nfqueueDefaultQueueSize } + if config.QueueNum == 0 { + config.QueueNum = nfqueueDefaultQueueNum + } + if config.Table == "" { + config.Table = nftDefaultTable + } var ipt4, ipt6 *iptables.IPTables var err error if nftCheck() != nil { @@ -132,7 +142,7 @@ func NewNFQueuePacketIO(config NFQueuePacketIOConfig) (PacketIO, error) { } } n, err := nfqueue.Open(&nfqueue.Config{ - NfQueue: nfqueueNum, + NfQueue: config.QueueNum, MaxPacketLen: nfqueueMaxPacketLen, MaxQueueLen: config.QueueSize, Copymode: nfqueue.NfQnlCopyPacket, @@ -156,11 +166,13 @@ func NewNFQueuePacketIO(config NFQueuePacketIOConfig) (PacketIO, error) { } } return &nfqueuePacketIO{ - n: n, - local: config.Local, - rst: config.RST, - ipt4: ipt4, - ipt6: ipt6, + n: n, + local: config.Local, + rst: config.RST, + queueNum: int(config.QueueNum), + table: config.Table, + ipt4: ipt4, + ipt6: ipt6, protectedDialer: &net.Dialer{ Control: func(network, address string, c syscall.RawConn) error { var err error @@ -214,7 +226,7 @@ func (n *nfqueuePacketIO) Register(ctx context.Context, cb PacketCallback) error if n.ipt4 != nil { err = n.setupIpt(n.local, n.rst, false) } else { - err = n.setupNft(n.local, n.rst, false) + err = n.setupNft(n.local, n.rst, false, n.queueNum) } if err != nil { return err @@ -274,7 +286,7 @@ func (n *nfqueuePacketIO) Close() error { if n.ipt4 != nil { _ = n.setupIpt(n.local, n.rst, true) } else { - _ = n.setupNft(n.local, n.rst, true) + _ = n.setupNft(n.local, n.rst, true, n.queueNum) } n.rSet = false } @@ -286,17 +298,17 @@ func (n *nfqueuePacketIO) SetCancelFunc(cancelFunc context.CancelFunc) error { return nil } -func (n *nfqueuePacketIO) setupNft(local, rst, remove bool) error { - rules, err := generateNftRules(local, rst) +func (n *nfqueuePacketIO) setupNft(local, rst, remove bool, nfqueueNum int) error { + rules, err := generateNftRules(local, rst, nfqueueNum, n.table) if err != nil { return err } rulesText := rules.String() if remove { - err = nftDelete(nftFamily, nftTable) + err = nftDelete(nftFamily, n.table) } else { // Delete first to make sure no leftover rules - _ = nftDelete(nftFamily, nftTable) + _ = nftDelete(nftFamily, n.table) err = nftAdd(rulesText) } if err != nil { @@ -306,7 +318,7 @@ func (n *nfqueuePacketIO) setupNft(local, rst, remove bool) error { } func (n *nfqueuePacketIO) setupIpt(local, rst, remove bool) error { - rules, err := generateIptRules(local, rst) + rules, err := generateIptRules(local, rst, n.queueNum) if err != nil { return err } From d8d7c5b477e710ba5cb28377c2f5db311f824409 Mon Sep 17 00:00:00 2001 From: Haruue Date: Sun, 27 Oct 2024 15:44:04 +0900 Subject: [PATCH 20/22] chore: allow set nfqueue num to 0 --- cmd/root.go | 14 +++++++------- io/nfqueue.go | 11 ++++++----- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index 93a4791..fc09a96 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -173,13 +173,13 @@ type cliConfig struct { } type cliConfigIO struct { - QueueSize uint32 `mapstructure:"queueSize"` - QueueNum uint16 `mapstructure:"queueNum"` - Table string `mapstructure:"table"` - ReadBuffer int `mapstructure:"rcvBuf"` - WriteBuffer int `mapstructure:"sndBuf"` - Local bool `mapstructure:"local"` - RST bool `mapstructure:"rst"` + QueueSize uint32 `mapstructure:"queueSize"` + QueueNum *uint16 `mapstructure:"queueNum"` + Table string `mapstructure:"table"` + ReadBuffer int `mapstructure:"rcvBuf"` + WriteBuffer int `mapstructure:"sndBuf"` + Local bool `mapstructure:"local"` + RST bool `mapstructure:"rst"` } type cliConfigReplay struct { diff --git a/io/nfqueue.go b/io/nfqueue.go index eeca6d7..58c85c9 100644 --- a/io/nfqueue.go +++ b/io/nfqueue.go @@ -110,7 +110,7 @@ type nfqueuePacketIO struct { type NFQueuePacketIOConfig struct { QueueSize uint32 - QueueNum uint16 + QueueNum *uint16 Table string ReadBuffer int WriteBuffer int @@ -122,8 +122,9 @@ func NewNFQueuePacketIO(config NFQueuePacketIOConfig) (PacketIO, error) { if config.QueueSize == 0 { config.QueueSize = nfqueueDefaultQueueSize } - if config.QueueNum == 0 { - config.QueueNum = nfqueueDefaultQueueNum + if config.QueueNum == nil { + queueNum := uint16(nfqueueDefaultQueueNum) + config.QueueNum = &queueNum } if config.Table == "" { config.Table = nftDefaultTable @@ -142,7 +143,7 @@ func NewNFQueuePacketIO(config NFQueuePacketIOConfig) (PacketIO, error) { } } n, err := nfqueue.Open(&nfqueue.Config{ - NfQueue: config.QueueNum, + NfQueue: *config.QueueNum, MaxPacketLen: nfqueueMaxPacketLen, MaxQueueLen: config.QueueSize, Copymode: nfqueue.NfQnlCopyPacket, @@ -169,7 +170,7 @@ func NewNFQueuePacketIO(config NFQueuePacketIOConfig) (PacketIO, error) { n: n, local: config.Local, rst: config.RST, - queueNum: int(config.QueueNum), + queueNum: int(*config.QueueNum), table: config.Table, ipt4: ipt4, ipt6: ipt6, From 5f4df7e806fc453dbc3d9a3a81e6f4487ed46e49 Mon Sep 17 00:00:00 2001 From: Haruue Date: Sun, 27 Oct 2024 15:44:13 +0900 Subject: [PATCH 21/22] chore: rm nfqueueNum parameter in setupNft() --- io/nfqueue.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/io/nfqueue.go b/io/nfqueue.go index 58c85c9..402be06 100644 --- a/io/nfqueue.go +++ b/io/nfqueue.go @@ -227,7 +227,7 @@ func (n *nfqueuePacketIO) Register(ctx context.Context, cb PacketCallback) error if n.ipt4 != nil { err = n.setupIpt(n.local, n.rst, false) } else { - err = n.setupNft(n.local, n.rst, false, n.queueNum) + err = n.setupNft(n.local, n.rst, false) } if err != nil { return err @@ -287,7 +287,7 @@ func (n *nfqueuePacketIO) Close() error { if n.ipt4 != nil { _ = n.setupIpt(n.local, n.rst, true) } else { - _ = n.setupNft(n.local, n.rst, true, n.queueNum) + _ = n.setupNft(n.local, n.rst, true) } n.rSet = false } @@ -299,8 +299,8 @@ func (n *nfqueuePacketIO) SetCancelFunc(cancelFunc context.CancelFunc) error { return nil } -func (n *nfqueuePacketIO) setupNft(local, rst, remove bool, nfqueueNum int) error { - rules, err := generateNftRules(local, rst, nfqueueNum, n.table) +func (n *nfqueuePacketIO) setupNft(local, rst, remove bool) error { + rules, err := generateNftRules(local, rst, n.queueNum, n.table) if err != nil { return err } From 0e97c9f0864274bf82406bb5277f72ed3fc23ad8 Mon Sep 17 00:00:00 2001 From: Haruue Date: Mon, 28 Oct 2024 10:17:46 +0900 Subject: [PATCH 22/22] feat: connmark accept/drop config options --- cmd/root.go | 26 +++++++----- io/nfqueue.go | 111 +++++++++++++++++++++++++++++--------------------- 2 files changed, 81 insertions(+), 56 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index fc09a96..1513cca 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -173,13 +173,16 @@ type cliConfig struct { } type cliConfigIO struct { - QueueSize uint32 `mapstructure:"queueSize"` - QueueNum *uint16 `mapstructure:"queueNum"` - Table string `mapstructure:"table"` - ReadBuffer int `mapstructure:"rcvBuf"` - WriteBuffer int `mapstructure:"sndBuf"` - Local bool `mapstructure:"local"` - RST bool `mapstructure:"rst"` + QueueSize uint32 `mapstructure:"queueSize"` + QueueNum *uint16 `mapstructure:"queueNum"` + Table string `mapstructure:"table"` + ConnMarkAccept uint32 `mapstructure:"connMarkAccept"` + ConnMarkDrop uint32 `mapstructure:"connMarkDrop"` + + ReadBuffer int `mapstructure:"rcvBuf"` + WriteBuffer int `mapstructure:"sndBuf"` + Local bool `mapstructure:"local"` + RST bool `mapstructure:"rst"` } type cliConfigReplay struct { @@ -218,13 +221,16 @@ func (c *cliConfig) fillIO(config *engine.Config) error { } else { // Setup IO for nfqueue ioImpl, err = io.NewNFQueuePacketIO(io.NFQueuePacketIOConfig{ - QueueSize: c.IO.QueueSize, + QueueSize: c.IO.QueueSize, + QueueNum: c.IO.QueueNum, + Table: c.IO.Table, + ConnMarkAccept: c.IO.ConnMarkAccept, + ConnMarkDrop: c.IO.ConnMarkDrop, + ReadBuffer: c.IO.ReadBuffer, WriteBuffer: c.IO.WriteBuffer, Local: c.IO.Local, RST: c.IO.RST, - QueueNum: c.IO.QueueNum, - Table: c.IO.Table, }) } diff --git a/io/nfqueue.go b/io/nfqueue.go index 402be06..20f0c11 100644 --- a/io/nfqueue.go +++ b/io/nfqueue.go @@ -23,25 +23,24 @@ const ( nfqueueMaxPacketLen = 0xFFFF nfqueueDefaultQueueSize = 128 - nfqueueConnMarkAccept = 1001 - nfqueueConnMarkDrop = 1002 + nfqueueDefaultConnMarkAccept = 1001 nftFamily = "inet" nftDefaultTable = "opengfw" ) -func generateNftRules(local, rst bool, nfqueueNum int, nftTable string) (*nftTableSpec, error) { - if local && rst { +func (n *nfqueuePacketIO) generateNftRules() (*nftTableSpec, error) { + if n.local && n.rst { return nil, errors.New("tcp rst is not supported in local mode") } table := &nftTableSpec{ Family: nftFamily, - Table: nftTable, + Table: n.table, } - table.Defines = append(table.Defines, fmt.Sprintf("define ACCEPT_CTMARK=%d", nfqueueConnMarkAccept)) - table.Defines = append(table.Defines, fmt.Sprintf("define DROP_CTMARK=%d", nfqueueConnMarkDrop)) - table.Defines = append(table.Defines, fmt.Sprintf("define QUEUE_NUM=%d", nfqueueNum)) - if local { + table.Defines = append(table.Defines, fmt.Sprintf("define ACCEPT_CTMARK=%d", n.connMarkAccept)) + table.Defines = append(table.Defines, fmt.Sprintf("define DROP_CTMARK=%d", n.connMarkDrop)) + table.Defines = append(table.Defines, fmt.Sprintf("define QUEUE_NUM=%d", n.queueNum)) + if n.local { table.Chains = []nftChainSpec{ {Chain: "INPUT", Header: "type filter hook input priority filter; policy accept;"}, {Chain: "OUTPUT", Header: "type filter hook output priority filter; policy accept;"}, @@ -55,7 +54,7 @@ func generateNftRules(local, rst bool, nfqueueNum int, nftTable string) (*nftTab c := &table.Chains[i] c.Rules = append(c.Rules, "meta mark $ACCEPT_CTMARK ct mark set $ACCEPT_CTMARK") // Bypass protected connections c.Rules = append(c.Rules, "ct mark $ACCEPT_CTMARK counter accept") - if rst { + if n.rst { c.Rules = append(c.Rules, "ip protocol tcp ct mark $DROP_CTMARK counter reject with tcp reset") } c.Rules = append(c.Rules, "ct mark $DROP_CTMARK counter drop") @@ -64,12 +63,12 @@ func generateNftRules(local, rst bool, nfqueueNum int, nftTable string) (*nftTab return table, nil } -func generateIptRules(local, rst bool, nfqueueNum int) ([]iptRule, error) { - if local && rst { +func (n *nfqueuePacketIO) generateIptRules() ([]iptRule, error) { + if n.local && n.rst { return nil, errors.New("tcp rst is not supported in local mode") } var chains []string - if local { + if n.local { chains = []string{"INPUT", "OUTPUT"} } else { chains = []string{"FORWARD"} @@ -77,13 +76,13 @@ func generateIptRules(local, rst bool, nfqueueNum int) ([]iptRule, error) { rules := make([]iptRule, 0, 4*len(chains)) for _, chain := range chains { // Bypass protected connections - rules = append(rules, iptRule{"filter", chain, []string{"-m", "mark", "--mark", strconv.Itoa(nfqueueConnMarkAccept), "-j", "CONNMARK", "--set-mark", strconv.Itoa(nfqueueConnMarkAccept)}}) - rules = append(rules, iptRule{"filter", chain, []string{"-m", "connmark", "--mark", strconv.Itoa(nfqueueConnMarkAccept), "-j", "ACCEPT"}}) - if rst { - rules = append(rules, iptRule{"filter", chain, []string{"-p", "tcp", "-m", "connmark", "--mark", strconv.Itoa(nfqueueConnMarkDrop), "-j", "REJECT", "--reject-with", "tcp-reset"}}) + rules = append(rules, iptRule{"filter", chain, []string{"-m", "mark", "--mark", strconv.Itoa(n.connMarkAccept), "-j", "CONNMARK", "--set-mark", strconv.Itoa(n.connMarkAccept)}}) + rules = append(rules, iptRule{"filter", chain, []string{"-m", "connmark", "--mark", strconv.Itoa(n.connMarkAccept), "-j", "ACCEPT"}}) + if n.rst { + rules = append(rules, iptRule{"filter", chain, []string{"-p", "tcp", "-m", "connmark", "--mark", strconv.Itoa(n.connMarkDrop), "-j", "REJECT", "--reject-with", "tcp-reset"}}) } - rules = append(rules, iptRule{"filter", chain, []string{"-m", "connmark", "--mark", strconv.Itoa(nfqueueConnMarkDrop), "-j", "DROP"}}) - rules = append(rules, iptRule{"filter", chain, []string{"-j", "NFQUEUE", "--queue-num", strconv.Itoa(nfqueueNum), "--queue-bypass"}}) + rules = append(rules, iptRule{"filter", chain, []string{"-m", "connmark", "--mark", strconv.Itoa(n.connMarkDrop), "-j", "DROP"}}) + rules = append(rules, iptRule{"filter", chain, []string{"-j", "NFQUEUE", "--queue-num", strconv.Itoa(n.queueNum), "--queue-bypass"}}) } return rules, nil @@ -94,12 +93,14 @@ var _ PacketIO = (*nfqueuePacketIO)(nil) var errNotNFQueuePacket = errors.New("not an NFQueue packet") type nfqueuePacketIO struct { - n *nfqueue.Nfqueue - local bool - rst bool - rSet bool // whether the nftables/iptables rules have been set - queueNum int - table string // nftable name + n *nfqueue.Nfqueue + local bool + rst bool + rSet bool // whether the nftables/iptables rules have been set + queueNum int + table string // nftable name + connMarkAccept int + connMarkDrop int // iptables not nil = use iptables instead of nftables ipt4 *iptables.IPTables @@ -109,9 +110,12 @@ type nfqueuePacketIO struct { } type NFQueuePacketIOConfig struct { - QueueSize uint32 - QueueNum *uint16 - Table string + QueueSize uint32 + QueueNum *uint16 + Table string + ConnMarkAccept uint32 + ConnMarkDrop uint32 + ReadBuffer int WriteBuffer int Local bool @@ -129,6 +133,19 @@ func NewNFQueuePacketIO(config NFQueuePacketIOConfig) (PacketIO, error) { if config.Table == "" { config.Table = nftDefaultTable } + if config.ConnMarkAccept == 0 { + config.ConnMarkAccept = nfqueueDefaultConnMarkAccept + } + if config.ConnMarkDrop == 0 { + config.ConnMarkDrop = config.ConnMarkAccept + 1 + if config.ConnMarkDrop == 0 { + // Overflow + config.ConnMarkDrop = 1 + } + } + if config.ConnMarkAccept == config.ConnMarkDrop { + return nil, errors.New("connMarkAccept and connMarkDrop cannot be the same") + } var ipt4, ipt6 *iptables.IPTables var err error if nftCheck() != nil { @@ -167,18 +184,20 @@ func NewNFQueuePacketIO(config NFQueuePacketIOConfig) (PacketIO, error) { } } return &nfqueuePacketIO{ - n: n, - local: config.Local, - rst: config.RST, - queueNum: int(*config.QueueNum), - table: config.Table, - ipt4: ipt4, - ipt6: ipt6, + n: n, + local: config.Local, + rst: config.RST, + queueNum: int(*config.QueueNum), + table: config.Table, + connMarkAccept: int(config.ConnMarkAccept), + connMarkDrop: int(config.ConnMarkDrop), + ipt4: ipt4, + ipt6: ipt6, protectedDialer: &net.Dialer{ Control: func(network, address string, c syscall.RawConn) error { var err error cErr := c.Control(func(fd uintptr) { - err = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_MARK, nfqueueConnMarkAccept) + err = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_MARK, int(config.ConnMarkAccept)) }) if cErr != nil { return cErr @@ -225,9 +244,9 @@ func (n *nfqueuePacketIO) Register(ctx context.Context, cb PacketCallback) error } if !n.rSet { if n.ipt4 != nil { - err = n.setupIpt(n.local, n.rst, false) + err = n.setupIpt(false) } else { - err = n.setupNft(n.local, n.rst, false) + err = n.setupNft(false) } if err != nil { return err @@ -267,11 +286,11 @@ func (n *nfqueuePacketIO) SetVerdict(p Packet, v Verdict, newPacket []byte) erro case VerdictAcceptModify: return n.n.SetVerdictModPacket(nP.id, nfqueue.NfAccept, newPacket) case VerdictAcceptStream: - return n.n.SetVerdictWithConnMark(nP.id, nfqueue.NfAccept, nfqueueConnMarkAccept) + return n.n.SetVerdictWithConnMark(nP.id, nfqueue.NfAccept, n.connMarkAccept) case VerdictDrop: return n.n.SetVerdict(nP.id, nfqueue.NfDrop) case VerdictDropStream: - return n.n.SetVerdictWithConnMark(nP.id, nfqueue.NfDrop, nfqueueConnMarkDrop) + return n.n.SetVerdictWithConnMark(nP.id, nfqueue.NfDrop, n.connMarkDrop) default: // Invalid verdict, ignore for now return nil @@ -285,9 +304,9 @@ func (n *nfqueuePacketIO) ProtectedDialContext(ctx context.Context, network, add func (n *nfqueuePacketIO) Close() error { if n.rSet { if n.ipt4 != nil { - _ = n.setupIpt(n.local, n.rst, true) + _ = n.setupIpt(true) } else { - _ = n.setupNft(n.local, n.rst, true) + _ = n.setupNft(true) } n.rSet = false } @@ -299,8 +318,8 @@ func (n *nfqueuePacketIO) SetCancelFunc(cancelFunc context.CancelFunc) error { return nil } -func (n *nfqueuePacketIO) setupNft(local, rst, remove bool) error { - rules, err := generateNftRules(local, rst, n.queueNum, n.table) +func (n *nfqueuePacketIO) setupNft(remove bool) error { + rules, err := n.generateNftRules() if err != nil { return err } @@ -318,8 +337,8 @@ func (n *nfqueuePacketIO) setupNft(local, rst, remove bool) error { return nil } -func (n *nfqueuePacketIO) setupIpt(local, rst, remove bool) error { - rules, err := generateIptRules(local, rst, n.queueNum) +func (n *nfqueuePacketIO) setupIpt(remove bool) error { + rules, err := n.generateIptRules() if err != nil { return err }