Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/adrg/xdg v0.5.3
github.com/alphadose/haxmap v1.4.1
github.com/anatol/vmtest v0.0.0-20250318022921-2f32244e2f0f
github.com/apoxy-dev/icx v0.12.1
github.com/apoxy-dev/icx v0.14.0
github.com/avast/retry-go/v4 v4.6.1
github.com/bramvdbogaerde/go-scp v1.5.0
github.com/buraksezer/olric v0.5.6
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ github.com/apoxy-dev/apiserver-runtime v0.0.0-20251017224250-220a8896ee57 h1:p2e
github.com/apoxy-dev/apiserver-runtime v0.0.0-20251017224250-220a8896ee57/go.mod h1:k8K1q/QnsxMM7/wsiga/cJWGW/38G907ex7JPFw0B04=
github.com/apoxy-dev/connect-ip-go v0.0.0-20250530062404-603929a73f45 h1:SwPk1n/oSVX7YwlNpC9KNH9YaYkcL/k6OfqSGVnxyyI=
github.com/apoxy-dev/connect-ip-go v0.0.0-20250530062404-603929a73f45/go.mod h1:z5rtgIizc+/K27UtB0occwZgqg/mz3IqgyUJW8aubbI=
github.com/apoxy-dev/icx v0.12.1 h1:VaczJSdujpsO8NjS0RvxiF55fco+iKZyurcZu4ddeP8=
github.com/apoxy-dev/icx v0.12.1/go.mod h1:QNPhLVUVbbSVSyERjmgGN4K8vzSC6bvZlN0tyflYf0U=
github.com/apoxy-dev/icx v0.14.0 h1:3BXuhRysBsK2isLu7Z3+1pMiySu2eI0Ts5iObw6fp60=
github.com/apoxy-dev/icx v0.14.0/go.mod h1:QNPhLVUVbbSVSyERjmgGN4K8vzSC6bvZlN0tyflYf0U=
github.com/apoxy-dev/quic-go v0.0.0-20250530165952-53cca597715e h1:10GIpiVyKoRgCyr0J2TvJtdn17bsFHN+ROWkeVJpcOU=
github.com/apoxy-dev/quic-go v0.0.0-20250530165952-53cca597715e/go.mod h1:MFlGGpcpJqRAfmYi6NC2cptDPSxRWTOGNuP4wqrWmzQ=
github.com/apoxy-dev/upgrade-cli v0.0.0-20240213232412-a56c3a52fa0e h1:FBNxMQD93z2ththupB/BYKLEaMWaEr+G+sJWJqU2wC4=
Expand Down
8 changes: 7 additions & 1 deletion pkg/cmd/alpha/tunnel_relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/apoxy-dev/apoxy/pkg/cryptoutils"
"github.com/apoxy-dev/apoxy/pkg/tunnel"
"github.com/apoxy-dev/apoxy/pkg/tunnel/batchpc"
"github.com/apoxy-dev/apoxy/pkg/tunnel/bifurcate"
"github.com/apoxy-dev/apoxy/pkg/tunnel/controllers"
"github.com/apoxy-dev/apoxy/pkg/tunnel/hasher"
Expand Down Expand Up @@ -49,11 +50,16 @@ var tunnelRelayCmd = &cobra.Command{
}

// One UDP socket shared between Geneve (data) and QUIC (control).
pc, err := net.ListenPacket("udp", listenAddress)
lis, err := net.ListenPacket("udp", listenAddress)
if err != nil {
return fmt.Errorf("failed to create UDP listener: %w", err)
}

pc, err := batchpc.New("udp", lis)
if err != nil {
return fmt.Errorf("failed to create batch packet conn: %w", err)
}

pcGeneve, pcQuic := bifurcate.Bifurcate(pc)
defer pcGeneve.Close()
defer pcQuic.Close()
Expand Down
9 changes: 7 additions & 2 deletions pkg/cmd/alpha/tunnel_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/apoxy-dev/apoxy/pkg/netstack"
"github.com/apoxy-dev/apoxy/pkg/tunnel/api"
"github.com/apoxy-dev/apoxy/pkg/tunnel/batchpc"
"github.com/apoxy-dev/apoxy/pkg/tunnel/bifurcate"
"github.com/apoxy-dev/apoxy/pkg/tunnel/conntrackpc"
"github.com/apoxy-dev/apoxy/pkg/tunnel/router"
Expand All @@ -47,11 +48,15 @@ var tunnelRunCmd = &cobra.Command{
}

// One UDP socket shared between Geneve (data) and QUIC (control).
pc, err := net.ListenPacket("udp", ":0")
lis, err := net.ListenPacket("udp", ":0")
if err != nil {
return fmt.Errorf("failed to create UDP socket: %w", err)
}
defer pc.Close()

pc, err := batchpc.New("udp", lis)
if err != nil {
return fmt.Errorf("failed to create batch packet conn: %w", err)
}

pcGeneve, pcQuic := bifurcate.Bifurcate(pc)
defer pcGeneve.Close()
Expand Down
202 changes: 126 additions & 76 deletions pkg/netstack/icx_network.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// icx_network.go
package netstack

import (
Expand Down Expand Up @@ -28,22 +29,26 @@ import (

stdnet "net"

"github.com/apoxy-dev/apoxy/pkg/tunnel/batchpc"
"github.com/apoxy-dev/apoxy/pkg/tunnel/l2pc"
)

// ICXNetwork embeds network.Network and holds the pieces that Start copies
// packets between: the ICX handler, the L2 packet connection (phy), and the
// netstack objects (stack, channel endpoint, NIC ID, IPTables). It also keeps
// an optional pcap file handle, a pool of reusable packet buffers, and a
// sync.Once used on close.
//
// NOTE(review): the field list below appears twice and differs only in
// incomingPacket vs. wakeOutbound — presumably the before/after sides of a
// diff rendered without +/- markers; confirm which set is current.
//
// TODO (dpeckett): nuke this at some point and merge the logic into the router.
type ICXNetwork struct {
network.Network
handler *icx.Handler
phy *l2pc.L2PacketConn
ep *channel.Endpoint
stack *stack.Stack
ipt *IPTables
nicID tcpip.NICID
pcapFile *os.File
incomingPacket chan *buffer.View
pktPool sync.Pool
closeOnce sync.Once
handler *icx.Handler
phy *l2pc.L2PacketConn
ep *channel.Endpoint
stack *stack.Stack
ipt *IPTables
nicID tcpip.NICID
pcapFile *os.File

// Wakeup channel for batching outbound sends. Capacity 1 to coalesce notifies.
wakeOutbound chan struct{}

pktPool sync.Pool
closeOnce sync.Once
}

// NewICXNetwork creates a new ICXNetwork instance with the given handler, physical connection, MTU, and resolve configuration.
Expand Down Expand Up @@ -107,15 +112,17 @@ func NewICXNetwork(handler *icx.Handler, phy *l2pc.L2PacketConn, mtu int, resolv
})

net := &ICXNetwork{
Network: network.Netstack(ipstack, nicID, resolveConf),
handler: handler,
phy: phy,
ep: linkEP,
stack: ipstack,
ipt: ipt,
nicID: nicID,
pcapFile: pcapFile,
incomingPacket: make(chan *buffer.View),
Network: network.Netstack(ipstack, nicID, resolveConf),
handler: handler,
phy: phy,
ep: linkEP,
stack: ipstack,
ipt: ipt,
nicID: nicID,
pcapFile: pcapFile,

wakeOutbound: make(chan struct{}, 1),

pktPool: sync.Pool{
New: func() any {
b := make([]byte, 0, 65535)
Expand All @@ -129,14 +136,13 @@ func NewICXNetwork(handler *icx.Handler, phy *l2pc.L2PacketConn, mtu int, resolv
}

// WriteNotify is called by the channel endpoint when netstack has an outbound packet ready.
// We just coalesce a wakeup; actual draining/batching happens in the outbound pump.
//
// NOTE(review): the lines below mix two variants of the body — one that reads
// a packet from net.ep and forwards its view on net.incomingPacket, and one
// that does a non-blocking send on net.wakeOutbound — presumably the
// before/after sides of a diff; confirm which one is current.
//
// NOTE(review): Close calls close(net.wakeOutbound), and a send on a closed
// channel panics; confirm WriteNotify cannot be invoked after Close.
func (net *ICXNetwork) WriteNotify() {
pkt := net.ep.Read()
if pkt == nil {
return
select {
case net.wakeOutbound <- struct{}{}:
default:
// already awake; coalesce
}
view := pkt.ToView()
pkt.DecRef()
net.incomingPacket <- view
}

// Close cleans up the network stack and closes the underlying resources.
Expand All @@ -146,8 +152,8 @@ func (net *ICXNetwork) Close() error {

net.ep.Close()

if net.incomingPacket != nil {
close(net.incomingPacket)
if net.wakeOutbound != nil {
close(net.wakeOutbound)
}

if net.pcapFile != nil {
Expand All @@ -159,47 +165,108 @@ func (net *ICXNetwork) Close() error {
}

// Start copies packets to and from netstack and icx.
// This is a blocking call that runs until either side is closed.
func (net *ICXNetwork) Start() error {
const tickMs = 100 // periodically flush scheduled frames (ToPhy)

var g errgroup.Group

// Outbound: netstack (L3) -> ICX -> L2PacketConn.WriteFrame
// Outbound: netstack (L3) -> ICX -> L2PacketConn.WriteBatchFrames (batched)
g.Go(func() error {
// Avoid a busy loop.
ticker := time.NewTicker(100 * time.Millisecond)
type owned struct {
msg batchpc.Message
buf *[]byte // owner to return to pool
}
putOwned := func(v []owned) {
for i := range v {
if v[i].buf != nil {
net.pktPool.Put(v[i].buf)
v[i].buf = nil
}
}
}

// Reuse per-iteration scratch arrays to avoid allocs.
// Assumes batchpc.MaxBatchSize is a const.
var (
batchOwned [batchpc.MaxBatchSize]owned
batchMsgs [batchpc.MaxBatchSize]batchpc.Message
)

ticker := time.NewTicker(tickMs * time.Millisecond)
defer ticker.Stop()

for {
// Wake on notify or periodic tick.
select {
case view, ok := <-net.incomingPacket:
case _, ok := <-net.wakeOutbound:
if !ok {
return stdnet.ErrClosed // channel closed => done
return stdnet.ErrClosed
}
case <-ticker.C:
}

ip := view.AsSlice() // raw IP bytes (v4 or v6)
batch := batchOwned[:]
count := 0

phyFrame := net.pktPool.Get().(*[]byte)
*phyFrame = (*phyFrame)[:cap(*phyFrame)]
n, _ := net.handler.VirtToPhy(ip, *phyFrame)
if n > 0 {
if err := net.phy.WriteFrame((*phyFrame)[:n]); err != nil {
net.pktPool.Put(phyFrame)
return fmt.Errorf("writing phy frame failed: %w", err)
}
// Drain endpoint fully into the batch.
for count < batchpc.MaxBatchSize {
pkt := net.ep.Read()
if pkt == nil {
break
}
net.pktPool.Put(phyFrame)
view := pkt.ToView()
pkt.DecRef()

ip := view.AsSlice() // raw L3 bytes
b := batch[count].buf
if b == nil {
b = net.pktPool.Get().(*[]byte)
*b = (*b)[:cap(*b)]
batch[count].buf = b
}
*b = (*b)[:cap(*b)]
if n, _ := net.handler.VirtToPhy(ip, *b); n > 0 {
batch[count].msg.Buf = (*b)[:n]
count++
}
}

case <-ticker.C:
phyFrame := net.pktPool.Get().(*[]byte)
*phyFrame = (*phyFrame)[:cap(*phyFrame)]

if n := net.handler.ToPhy(*phyFrame); n > 0 {
if err := net.phy.WriteFrame((*phyFrame)[:n]); err != nil {
net.pktPool.Put(phyFrame)
return fmt.Errorf("writing scheduled phy frame failed: %w", err)
}
// Coalesce scheduled frames (ToPhy) onto the same batch.
for count < batchpc.MaxBatchSize {
b := batch[count].buf
if b == nil {
b = net.pktPool.Get().(*[]byte)
*b = (*b)[:cap(*b)]
batch[count].buf = b
}
net.pktPool.Put(phyFrame)
*b = (*b)[:cap(*b)]
if n := net.handler.ToPhy(*b); n > 0 {
batch[count].msg.Buf = (*b)[:n]
count++
} else {
break
}
}

if count == 0 {
// Nothing to send this cycle.
putOwned(batch)
continue
}

// Send in one go.
msgs := batchMsgs[:count]
for i := 0; i < count; i++ {
msgs[i] = batch[i].msg
}
_, err := net.phy.WriteBatchFrames(msgs, 0)
putOwned(batch)
if err != nil {
if errors.Is(err, stdnet.ErrClosed) {
return err
}
slog.Warn("Error writing batched phy frames", slog.Any("error", err))
continue
}
}
})
Expand All @@ -213,7 +280,11 @@ func (net *ICXNetwork) Start() error {
n, err := net.phy.ReadFrame(*phyFrame)
if err != nil {
net.pktPool.Put(phyFrame)
return fmt.Errorf("reading phy frame failed: %w", err)
if errors.Is(err, stdnet.ErrClosed) {
return err
}
slog.Warn("Error reading frame from physical interface", slog.Any("error", err))
continue
}
if n == 0 {
net.pktPool.Put(phyFrame)
Expand Down Expand Up @@ -247,8 +318,8 @@ func (net *ICXNetwork) Start() error {
pkb.DecRef()
default:
// drop silently
net.pktPool.Put(virtFrame)
}
net.pktPool.Put(virtFrame)
}
})

Expand Down Expand Up @@ -332,24 +403,3 @@ func (net *ICXNetwork) ForwardTo(ctx context.Context, upstream network.Network)

return nil
}

// prefixToSubnet converts a netip.Prefix into a tcpip.Subnet.
//
// The subnet address is taken from the prefix address bytes and the mask is
// built with the leading p.Bits() bits set. The mask is 4 bytes long when the
// prefix address is IPv4 and 16 bytes otherwise.
//
// An error is returned when the prefix length is negative or larger than the
// address width in bits; otherwise the result (and any error) comes straight
// from tcpip.NewSubnet.
func prefixToSubnet(p netip.Prefix) (tcpip.Subnet, error) {
	ip := p.Addr()
	subnetAddr := tcpip.AddrFromSlice(ip.AsSlice())

	// Mask width in bytes: 4 for IPv4, 16 for everything else.
	width := 16
	if ip.Is4() {
		width = 4
	}

	prefixLen := p.Bits()
	if prefixLen < 0 || prefixLen > width*8 {
		return tcpip.Subnet{}, fmt.Errorf("invalid prefix length %d", prefixLen)
	}

	// Fill whole bytes first, then the high bits of the one partial byte.
	raw := make([]byte, width)
	whole, rem := prefixLen/8, prefixLen%8
	for i := 0; i < whole; i++ {
		raw[i] = 0xff
	}
	if rem > 0 {
		raw[whole] = ^byte(0) << (8 - rem)
	}

	return tcpip.NewSubnet(subnetAddr, tcpip.MaskFromBytes(raw))
}
Loading