Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions src/config/bind.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,6 @@ func (c *Config) BindFlags(cmd *cobra.Command) {
// Config path
cmd.Flags().StringVar(&c.ConfigPath, "config", c.ConfigPath, "Path to config file")

// Queue configuration
cmd.Flags().IntVar(&c.Queue.StartNum, "queue-num", c.Queue.StartNum, "Netfilter queue number")
cmd.Flags().IntVar(&c.Queue.Threads, "threads", c.Queue.Threads, "Number of worker threads")
cmd.Flags().UintVar(&c.Queue.Mark, "mark", c.Queue.Mark, "Packet mark value (default 32768)")
cmd.Flags().BoolVar(&c.Queue.IPv4Enabled, "ipv4", c.Queue.IPv4Enabled, "Enable IPv4 processing")
cmd.Flags().BoolVar(&c.Queue.IPv6Enabled, "ipv6", c.Queue.IPv6Enabled, "Enable IPv6 processing")

// System configuration
cmd.Flags().IntVar(&c.System.Tables.MonitorInterval, "tables-monitor-interval", c.System.Tables.MonitorInterval, "Tables monitor interval in seconds (default 10, 0 to disable)")
cmd.Flags().BoolVar(&c.System.Tables.SkipSetup, "skip-tables", c.System.Tables.SkipSetup, "Skip iptables/nftables setup on startup")
Expand Down
6 changes: 6 additions & 0 deletions src/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ var DefaultConfig = Config{
ConfigPath: "",

Queue: QueueConfig{
Mode: "nfqueue",
StartNum: 537,
Mark: 1 << 15,
Threads: 4,
Expand All @@ -163,6 +164,11 @@ var DefaultConfig = Config{
Enabled: false,
Size: 88,
},
TUN: TUNConfig{
DeviceName: "b4tun0",
Address: "10.255.0.1/30",
RouteTable: 100,
},
},

Sets: []*SetConfig{},
Expand Down
13 changes: 13 additions & 0 deletions src/config/methods.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,19 @@ func (c *Config) Validate() error {
return fmt.Errorf("threads must be at least 1")
}

if c.Queue.Mode == "" {
c.Queue.Mode = "nfqueue"
}
if c.Queue.Mode != "nfqueue" && c.Queue.Mode != "tun" {
return fmt.Errorf("queue mode must be 'nfqueue' or 'tun'")
}

if c.Queue.Mode == "tun" {
if c.Queue.TUN.OutInterface == "" {
return fmt.Errorf("tun out_interface is required in TUN mode (e.g. eth0, wan0)")
}
}

if c.Queue.StartNum < 0 || c.Queue.StartNum > 65535 {
return fmt.Errorf("queue-num must be between 0 and 65535")
}
Expand Down
10 changes: 10 additions & 0 deletions src/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type ApiConfig struct {
}

type QueueConfig struct {
Mode string `json:"mode" bson:"mode"` // "nfqueue" (default) or "tun"
StartNum int `json:"start_num" bson:"start_num"`
Threads int `json:"threads" bson:"threads"`
Mark uint `json:"mark" bson:"mark"`
Expand All @@ -36,6 +37,15 @@ type QueueConfig struct {
Interfaces []string `json:"interfaces" bson:"interfaces"`
Devices DevicesConfig `json:"devices" bson:"devices"`
MSSClamp MSSClampConfig `json:"mss_clamp" bson:"mss_clamp"`
TUN TUNConfig `json:"tun" bson:"tun"`
}

// TUNConfig holds the settings for the TUN packet-engine backend,
// used when QueueConfig.Mode is "tun". In that mode no
// iptables/nftables rules are installed; traffic is routed through
// the TUN device instead. OutInterface is the only required field
// (validated in Config.Validate); the rest fall back to the
// defaults noted below (see DefaultConfig).
type TUNConfig struct {
	DeviceName   string `json:"device_name" bson:"device_name"`     // TUN device name, default: "b4tun0"
	Address      string `json:"address" bson:"address"`             // TUN device address, default: "10.255.0.1/30"
	OutInterface string `json:"out_interface" bson:"out_interface"` // Real outbound interface, e.g. "eth0", "wan0"; required in TUN mode
	OutGateway   string `json:"out_gateway" bson:"out_gateway"`     // Real gateway IP, auto-detected if empty
	RouteTable   int    `json:"route_table" bson:"route_table"`     // Policy routing table number, default: 100
}

type DevicesConfig struct {
Expand Down
18 changes: 18 additions & 0 deletions src/engine/engine.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Package engine defines the small abstraction shared by the packet
// processing backends (NFQUEUE and TUN): a lifecycle interface and a
// verdict type that tells a packet source what to do with an
// intercepted packet.
package engine

// PacketVerdict tells the packet source what to do with the original packet.
type PacketVerdict int

const (
	// VerdictAccept forwards the packet unchanged.
	VerdictAccept PacketVerdict = iota
	// VerdictDrop suppresses the original packet (modified copies are already sent by the handler).
	VerdictDrop
)

// Engine is the interface for a packet processing backend.
// Both NFQUEUE and TUN implement this interface.
type Engine interface {
	// Start begins packet processing; it returns a non-nil error if
	// the backend fails to initialize.
	Start() error
	// Stop shuts the backend down. It takes no deadline; callers that
	// need a timeout should enforce it themselves.
	Stop()
}
100 changes: 62 additions & 38 deletions src/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"github.com/daniellavrushin/b4/quic"
"github.com/daniellavrushin/b4/socks5"
"github.com/daniellavrushin/b4/tables"
b4tun "github.com/daniellavrushin/b4/tun"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
)
Expand Down Expand Up @@ -123,39 +124,58 @@

log.Infof("Loaded targets: %d domains, %d IPs across %d sets", totalDomains, totalIps, len(cfg.Sets))

// Setup iptables/nftables rules
if !cfg.System.Tables.SkipSetup {
log.Tracef("Clearing existing iptables/nftables rules")
tables.ClearRules(&cfg)
pool := nfq.NewPool(&cfg)

var tunEngine *b4tun.Engine
var tablesMonitor *tables.Monitor

log.Tracef("Adding tables rules")
if err := tables.AddRules(&cfg); err != nil {
metrics.RecordEvent("error", fmt.Sprintf("Failed to add tables rules: %v", err))
return fmt.Errorf("failed to add tables rules: %w", err)
if cfg.Queue.Mode == "tun" {
// TUN mode: no iptables/nftables needed
log.Infof("Starting TUN engine (device: %s, out: %s, threads: %d)",
cfg.Queue.TUN.DeviceName, cfg.Queue.TUN.OutInterface, cfg.Queue.Threads)
metrics.TablesStatus = "tun"

tunEngine = b4tun.NewEngine(&cfg, pool)
if err := tunEngine.Start(); err != nil {
metrics.RecordEvent("error", fmt.Sprintf("TUN engine start failed: %v", err))
metrics.NFQueueStatus = "error"
return fmt.Errorf("TUN engine start failed: %w", err)
}
metrics.RecordEvent("info", "Tables rules configured successfully")

metrics.RecordEvent("info", fmt.Sprintf("TUN engine started with %d threads", cfg.Queue.Threads))
metrics.NFQueueStatus = "active (tun)"
} else {
log.Infof("Skipping tables setup (--skip-tables)")
metrics.TablesStatus = "skipped"
}
// NFQUEUE mode: setup iptables/nftables rules
if !cfg.System.Tables.SkipSetup {
log.Tracef("Clearing existing iptables/nftables rules")
tables.ClearRules(&cfg)

log.Tracef("Adding tables rules")
if err := tables.AddRules(&cfg); err != nil {
metrics.RecordEvent("error", fmt.Sprintf("Failed to add tables rules: %v", err))
return fmt.Errorf("failed to add tables rules: %w", err)
}
metrics.RecordEvent("info", "Tables rules configured successfully")
} else {
log.Infof("Skipping tables setup (--skip-tables)")
metrics.TablesStatus = "skipped"
}

// Start netfilter queue pool
log.Infof("Starting netfilter queue pool (queue: %d, threads: %d)", cfg.Queue.StartNum, cfg.Queue.Threads)
pool := nfq.NewPool(&cfg)
if err := pool.Start(); err != nil {
metrics.RecordEvent("error", fmt.Sprintf("NFQueue start failed: %v", err))
metrics.NFQueueStatus = "error"
return fmt.Errorf("netfilter queue start failed: %w", err)
}
log.Infof("Starting netfilter queue pool (queue: %d, threads: %d)", cfg.Queue.StartNum, cfg.Queue.Threads)
if err := pool.Start(); err != nil {
metrics.RecordEvent("error", fmt.Sprintf("NFQueue start failed: %v", err))
metrics.NFQueueStatus = "error"
return fmt.Errorf("netfilter queue start failed: %w", err)
}

metrics.RecordEvent("info", fmt.Sprintf("NFQueue started with %d threads", cfg.Queue.Threads))
metrics.NFQueueStatus = "active"
metrics.RecordEvent("info", fmt.Sprintf("NFQueue started with %d threads", cfg.Queue.Threads))
metrics.NFQueueStatus = "active"

// Start tables monitor to handle rule restoration if system wipes them
var tablesMonitor *tables.Monitor
if !cfg.System.Tables.SkipSetup && cfg.System.Tables.MonitorInterval > 0 {
tablesMonitor = tables.NewMonitor(&cfg)
tablesMonitor.Start()
// Start tables monitor to handle rule restoration if system wipes them
if !cfg.System.Tables.SkipSetup && cfg.System.Tables.MonitorInterval > 0 {
tablesMonitor = tables.NewMonitor(&cfg)
tablesMonitor.Start()
}
}

// Start internal web server if configured
Expand Down Expand Up @@ -185,10 +205,10 @@
metrics.RecordEvent("info", fmt.Sprintf("Shutdown initiated by signal: %v", sig))

// Perform graceful shutdown with timeout
return gracefulShutdown(&cfg, pool, httpServer, socks5Server, metrics)
return gracefulShutdown(&cfg, pool, tunEngine, httpServer, socks5Server, metrics)
}

func gracefulShutdown(cfg *config.Config, pool *nfq.Pool, httpServer *http.Server, socks5Server *socks5.Server, metrics *handler.MetricsCollector) error {
func gracefulShutdown(cfg *config.Config, pool *nfq.Pool, tunEngine *b4tun.Engine, httpServer *http.Server, socks5Server *socks5.Server, metrics *handler.MetricsCollector) error {

Check failure on line 211 in src/main.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 25 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=DanielLavrushin_b4&issues=AZzTeee9k6QWoM0HgAH9&open=AZzTeee9k6QWoM0HgAH9&pullRequest=112
// Create shutdown context with timeout
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
Expand Down Expand Up @@ -230,33 +250,37 @@
log.Infof("Shutting down WebSocket connections...")
b4http.Shutdown()

// Stop NFQueue pool
// Stop packet engine
wg.Add(1)
go func() {
defer wg.Done()
log.Infof("Stopping netfilter queue pool...")
metrics.NFQueueStatus = "stopping"

// Use a goroutine with timeout for pool.Stop()
stopDone := make(chan struct{})
go func() {
pool.Stop()
if tunEngine != nil {
log.Infof("Stopping TUN engine...")
tunEngine.Stop()
} else {
log.Infof("Stopping netfilter queue pool...")
pool.Stop()
}
Comment on lines +261 to +267
Copy link

Copilot AI Mar 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In TUN mode the shutdown path stops tunEngine but never stops the nfq.Pool. The pool was still created (and starts DHCP + cleanup goroutines) and, more importantly, the workers' raw socket senders initialized by tunEngine.Start will never be closed and in-flight worker goroutines won't be waited on. Consider always calling pool.Stop() during shutdown (even in TUN mode), or add a dedicated pool shutdown for the non-NFQUEUE resources used by TUN mode.

Copilot uses AI. Check for mistakes.
close(stopDone)
}()

select {
case <-stopDone:
log.Infof("Netfilter queue pool stopped")
log.Infof("Packet engine stopped")
case <-shutdownCtx.Done():
log.Errorf("Netfilter queue pool stop timed out")
shutdownErrors <- fmt.Errorf("NFQueue stop timeout")
log.Errorf("Packet engine stop timed out")
shutdownErrors <- fmt.Errorf("engine stop timeout")
}

quic.Shutdown()
}()

// Clean up iptables/nftables rules
if !cfg.System.Tables.SkipSetup {
// Clean up iptables/nftables rules (only in nfqueue mode)
if tunEngine == nil && !cfg.System.Tables.SkipSetup {
wg.Add(1)
go func() {
defer wg.Done()
Expand Down
49 changes: 11 additions & 38 deletions src/nfq/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@

"github.com/daniellavrushin/b4/config"
"github.com/daniellavrushin/b4/dns"
"github.com/daniellavrushin/b4/engine"
"github.com/daniellavrushin/b4/log"
"github.com/daniellavrushin/b4/sock"
"github.com/florianl/go-nfqueue"
)

func (w *Worker) processDnsPacket(ipVersion byte, sport uint16, dport uint16, payload []byte, raw []byte, ihl int, id uint32, srcMac string) int {
func (w *Worker) processDnsPacket(ipVersion byte, sport uint16, dport uint16, payload []byte, raw []byte, ihl int, srcMac string) engine.PacketVerdict {

Check failure on line 14 in src/nfq/dns.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 57 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=DanielLavrushin_b4&issues=AZzTeeXYk6QWoM0HgAH4&open=AZzTeeXYk6QWoM0HgAH4&pullRequest=112

if dport == 53 {
domain, ok := dns.ParseQueryDomain(payload)
Expand All @@ -21,19 +21,13 @@

targetIP := net.ParseIP(set.DNS.TargetDNS)
if targetIP == nil {
if err := w.q.SetVerdict(id, nfqueue.NfAccept); err != nil {
log.Tracef("failed to set verdict on packet %d: %v", id, err)
}
return 0
return engine.VerdictAccept
}

if ipVersion == IPv4 {
targetDNS := targetIP.To4()
if targetDNS == nil {
if err := w.q.SetVerdict(id, nfqueue.NfAccept); err != nil {
log.Tracef("failed to set verdict on packet %d: %v", id, err)
}
return 0
return engine.VerdictAccept
}

originalDst := make(net.IP, 4)
Expand All @@ -49,27 +43,18 @@
} else {
_ = w.sock.SendIPv4(raw, targetDNS)
}
if err := w.q.SetVerdict(id, nfqueue.NfDrop); err != nil {
log.Tracef("failed to set drop verdict on packet %d: %v", id, err)
}
log.Infof("DNS redirect: %s -> %s (set: %s)", domain, set.DNS.TargetDNS, set.Name)
return 0
return engine.VerdictDrop

} else {
cfg := w.getConfig()
if !cfg.Queue.IPv6Enabled {
if err := w.q.SetVerdict(id, nfqueue.NfAccept); err != nil {
log.Tracef("failed to set verdict on packet %d: %v", id, err)
}
return 0
return engine.VerdictAccept
}

targetDNS := targetIP.To16()
if targetDNS == nil {
if err := w.q.SetVerdict(id, nfqueue.NfAccept); err != nil {
log.Tracef("failed to set verdict on packet %d: %v", id, err)
}
return 0
return engine.VerdictAccept
}

originalDst := make(net.IP, 16)
Expand All @@ -84,11 +69,8 @@
} else {
_ = w.sock.SendIPv6(raw, targetDNS)
}
if err := w.q.SetVerdict(id, nfqueue.NfDrop); err != nil {
log.Tracef("failed to set drop verdict on packet %d: %v", id, err)
}
log.Infof("DNS redirect (IPv6): %s -> %s (set: %s)", domain, set.DNS.TargetDNS, set.Name)
return 0
return engine.VerdictDrop
}
}
}
Expand All @@ -102,10 +84,7 @@
sock.FixUDPChecksum(raw, ihl)
dns.DnsNATDelete(net.IP(raw[16:20]), dport)
_ = w.sock.SendIPv4(raw, net.IP(raw[16:20]))
if err := w.q.SetVerdict(id, nfqueue.NfDrop); err != nil {
log.Tracef("failed to set drop verdict on packet %d: %v", id, err)
}
return 0
return engine.VerdictDrop
}
} else {
cfg := w.getConfig()
Expand All @@ -115,19 +94,13 @@
sock.FixUDPChecksumV6(raw)
dns.DnsNATDelete(net.IP(raw[24:40]), dport)
_ = w.sock.SendIPv6(raw, net.IP(raw[24:40]))
if err := w.q.SetVerdict(id, nfqueue.NfDrop); err != nil {
log.Tracef("failed to set drop verdict on packet %d: %v", id, err)
}
return 0
return engine.VerdictDrop
}
}
}
}

if err := w.q.SetVerdict(id, nfqueue.NfAccept); err != nil {
log.Tracef("failed to set verdict on packet %d: %v", id, err)
}
return 0
return engine.VerdictAccept
}

func (w *Worker) sendFragmentedDNSQueryV4(cfg *config.SetConfig, raw []byte, ihl int, dst net.IP) {
Expand Down
9 changes: 3 additions & 6 deletions src/nfq/inc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
"time"

"github.com/daniellavrushin/b4/config"
"github.com/daniellavrushin/b4/engine"
"github.com/daniellavrushin/b4/log"
"github.com/daniellavrushin/b4/sock"
"github.com/florianl/go-nfqueue"
)

var corruptionStrategies = []string{"badsum", "badseq", "badack", "all"}

func (w *Worker) HandleIncoming(q *nfqueue.Nfqueue, id uint32, v byte, raw []byte, ihl int, src net.IP, dstStr string, dport uint16, srcStr string, sport uint16, payload []byte) int {
func (w *Worker) HandleIncoming(v byte, raw []byte, ihl int, src net.IP, dstStr string, dport uint16, srcStr string, sport uint16, payload []byte) engine.PacketVerdict {

Check warning on line 17 in src/nfq/inc.go

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/nfq/inc.go#L17

Method HandleIncoming has 9 parameters (limit is 8)

Check warning on line 17 in src/nfq/inc.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

This function has 9 parameters, which is greater than the 7 authorized.

See more on https://sonarcloud.io/project/issues?id=DanielLavrushin_b4&issues=AZzTeecsk6QWoM0HgAH6&open=AZzTeecsk6QWoM0HgAH6&pullRequest=112

Check failure on line 17 in src/nfq/inc.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 42 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=DanielLavrushin_b4&issues=AZzTeecsk6QWoM0HgAH5&open=AZzTeecsk6QWoM0HgAH5&pullRequest=112
incomingSet := connState.GetSetForIncoming(dstStr, dport, srcStr, sport)

if incomingSet != nil && incomingSet.TCP.Incoming.Mode != config.ConfigOff {
Expand Down Expand Up @@ -61,10 +61,7 @@
}
}

if err := q.SetVerdict(id, nfqueue.NfAccept); err != nil {
log.Tracef("failed to accept incoming packet %d: %v", id, err)
}
return 0
return engine.VerdictAccept
}

func (w *Worker) applyCorruption(fake []byte, ihl int, strategy string) {
Expand Down
Loading