Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions api/core/v1alpha2/tunnel_agent_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,18 @@ type TunnelAgentConnection struct {
// +optional
VNI *uint `json:"vni,omitempty,omitzero"`

// LastRXTimestamp is the last time a packet was received from the agent on
// LastRX is the last time a packet was received from the agent on
// this connection.
// +optional
LastRXTimestamp *metav1.Time `json:"lastRxTimestamp,omitempty,omitzero"`
LastRX *metav1.Time `json:"lastRx,omitempty,omitzero"`

// RXBytes is the total number of bytes received from the agent on this connection.
// +optional
RXBytes uint64 `json:"rxBytes,omitempty,omitzero"`
RXBytes *int64 `json:"rxBytes,omitempty,omitzero"`

// TXBytes is the total number of bytes transmitted to the agent on this connection.
// +optional
TxBytes uint64 `json:"txBytes,omitempty,omitzero"`
TXBytes *int64 `json:"txBytes,omitempty,omitzero"`
}

// TunnelAgentStatus represents the status of a tunnel agent.
Expand Down
14 changes: 12 additions & 2 deletions api/core/v1alpha2/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions api/generated/zz_generated.openapi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/adrg/xdg v0.5.3
github.com/alphadose/haxmap v1.4.1
github.com/anatol/vmtest v0.0.0-20250318022921-2f32244e2f0f
github.com/apoxy-dev/icx v0.14.0
github.com/apoxy-dev/icx v0.16.1
github.com/avast/retry-go/v4 v4.6.1
github.com/bramvdbogaerde/go-scp v1.5.0
github.com/buraksezer/olric v0.5.6
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,10 @@ github.com/apoxy-dev/apiserver-runtime v0.0.0-20251017224250-220a8896ee57 h1:p2e
github.com/apoxy-dev/apiserver-runtime v0.0.0-20251017224250-220a8896ee57/go.mod h1:k8K1q/QnsxMM7/wsiga/cJWGW/38G907ex7JPFw0B04=
github.com/apoxy-dev/connect-ip-go v0.0.0-20250530062404-603929a73f45 h1:SwPk1n/oSVX7YwlNpC9KNH9YaYkcL/k6OfqSGVnxyyI=
github.com/apoxy-dev/connect-ip-go v0.0.0-20250530062404-603929a73f45/go.mod h1:z5rtgIizc+/K27UtB0occwZgqg/mz3IqgyUJW8aubbI=
github.com/apoxy-dev/icx v0.14.0 h1:3BXuhRysBsK2isLu7Z3+1pMiySu2eI0Ts5iObw6fp60=
github.com/apoxy-dev/icx v0.14.0/go.mod h1:QNPhLVUVbbSVSyERjmgGN4K8vzSC6bvZlN0tyflYf0U=
github.com/apoxy-dev/icx v0.16.0 h1:+mijAkIuTZVbXF6vMtSWUfn/TINHUaCzB+Da2HnPfpQ=
github.com/apoxy-dev/icx v0.16.0/go.mod h1:QNPhLVUVbbSVSyERjmgGN4K8vzSC6bvZlN0tyflYf0U=
github.com/apoxy-dev/icx v0.16.1 h1:u7Db5R9Fm7LoQ9cY6lq25T+l086v/zlERpbGYYVajsM=
github.com/apoxy-dev/icx v0.16.1/go.mod h1:QNPhLVUVbbSVSyERjmgGN4K8vzSC6bvZlN0tyflYf0U=
github.com/apoxy-dev/quic-go v0.0.0-20250530165952-53cca597715e h1:10GIpiVyKoRgCyr0J2TvJtdn17bsFHN+ROWkeVJpcOU=
github.com/apoxy-dev/quic-go v0.0.0-20250530165952-53cca597715e/go.mod h1:MFlGGpcpJqRAfmYi6NC2cptDPSxRWTOGNuP4wqrWmzQ=
github.com/apoxy-dev/upgrade-cli v0.0.0-20240213232412-a56c3a52fa0e h1:FBNxMQD93z2ththupB/BYKLEaMWaEr+G+sJWJqU2wC4=
Expand Down
4 changes: 2 additions & 2 deletions pkg/apiserver/controllers/tunnel_agent_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,8 +390,8 @@ func (r *TunnelAgentReconciler) PruneOrphanedConnections(ctx context.Context) er
// Determine orphaned-ness
isOrphaned := false
switch {
case conn.LastRXTimestamp != nil:
isOrphaned = conn.LastRXTimestamp.Add(gcMaxSilence).Before(now)
case conn.LastRX != nil:
isOrphaned = conn.LastRX.Add(gcMaxSilence).Before(now)
case conn.ConnectedAt != nil:
isOrphaned = conn.ConnectedAt.Add(gcMaxSilence).Before(now)
default:
Expand Down
16 changes: 8 additions & 8 deletions pkg/apiserver/controllers/tunnel_agent_reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,16 +192,16 @@ func TestTunnelAgentPruneOrphanedConnections(t *testing.T) {
Status: corev1alpha2.TunnelAgentStatus{
Connections: []corev1alpha2.TunnelAgentConnection{
{
ID: "conn-orphaned",
Address: pfxOrphaned.String(),
VNI: &vOrphaned,
LastRXTimestamp: &orphaned,
ID: "conn-orphaned",
Address: pfxOrphaned.String(),
VNI: &vOrphaned,
LastRX: &orphaned,
},
{
ID: "conn-fresh",
Address: pfxFresh.String(),
VNI: &vFresh,
LastRXTimestamp: &fresh,
ID: "conn-fresh",
Address: pfxFresh.String(),
VNI: &vFresh,
LastRX: &fresh,
},
},
},
Expand Down
11 changes: 9 additions & 2 deletions pkg/cmd/alpha/tunnel_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ func connectAndInitSession(
return cleanupOnErr(fmt.Errorf("parse assigned addresses: %w", err))
}

var allowedRoutes []icx.Route
for _, route := range connectResp.Routes {
dst, err := netip.ParsePrefix(route.Destination)
if err != nil {
Expand All @@ -431,13 +432,19 @@ func connectAndInitSession(
slog.Any("error", err))
continue
}
overlayAddrs = append(overlayAddrs, dst)

for _, addr := range overlayAddrs {
allowedRoutes = append(allowedRoutes, icx.Route{
Src: addr,
Dst: dst,
})
}
}

if err := handler.AddVirtualNetwork(
connectResp.VNI,
netstack.ToFullAddress(remoteAddr),
overlayAddrs,
allowedRoutes,
); err != nil {
return cleanupOnErr(fmt.Errorf("add virtual network: %w", err))
}
Expand Down
31 changes: 23 additions & 8 deletions pkg/tunnel/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,16 @@ func (c *connection) SetVNI(ctx context.Context, vni uint) error {
c.vni = nil
}

// Add new VNI
var addrs []netip.Prefix
var allowedRoutes []icx.Route
if c.overlayAddr != nil {
addrs = []netip.Prefix{*c.overlayAddr}
// Relays shouldn't have conflicting destinations, so we can use a wildcard src.
var src netip.Prefix
if c.overlayAddr.Addr().Is4() {
src = netip.MustParsePrefix("0.0.0.0/0")
} else {
src = netip.MustParsePrefix("::/0")
}
allowedRoutes = []icx.Route{{Src: src, Dst: *c.overlayAddr}}
}

fa := netstack.ToFullAddress(c.remoteAddr)
Expand All @@ -106,7 +112,7 @@ func (c *connection) SetVNI(ctx context.Context, vni uint) error {
}
}

if err := c.handler.AddVirtualNetwork(vni, fa, addrs); err != nil {
if err := c.handler.AddVirtualNetwork(vni, fa, allowedRoutes); err != nil {
return fmt.Errorf("failed to add virtual network %d: %w", vni, err)
}
c.vni = &vni
Expand Down Expand Up @@ -189,9 +195,18 @@ func (c *connection) SetOverlayAddress(addr string) error {
// Update in-memory state.
c.overlayAddr = &p

// 2) If a VNI is active, update its allowed prefixes in-place.
// Relays shouldn't have conflicting destinations, so we can use a wildcard src.
var src netip.Prefix
if c.overlayAddr.Addr().Is4() {
src = netip.MustParsePrefix("0.0.0.0/0")
} else {
src = netip.MustParsePrefix("::/0")
}
allowedRoutes := []icx.Route{{Src: src, Dst: p}}

// 2) If a VNI is active, update its allowed routes in-place.
if c.vni != nil {
if err := c.handler.UpdateVirtualNetworkAddrs(*c.vni, []netip.Prefix{p}); err != nil {
if err := c.handler.UpdateVirtualNetworkRoutes(*c.vni, allowedRoutes); err != nil {
// Attempt to roll back router state to old addr on failure.
if c.router != nil {
_ = c.router.DelAddr(p)
Expand Down Expand Up @@ -241,8 +256,8 @@ func (c *connection) Stats() (controllers.ConnectionStats, bool) {
}

return controllers.ConnectionStats{
RXBytes: vnet.Stats.RXBytes.Load(),
TXBytes: vnet.Stats.TXBytes.Load(),
RXBytes: int64(vnet.Stats.RXBytes.Load()),
TXBytes: int64(vnet.Stats.TXBytes.Load()),
LastRX: lastRx,
}, true
}
4 changes: 2 additions & 2 deletions pkg/tunnel/controllers/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
// ConnectionStats is a lightweight snapshot of connection counters.
type ConnectionStats struct {
// RXBytes is the total number of bytes received on this connection.
RXBytes uint64
RXBytes int64
// TXBytes is the total number of bytes transmitted on this connection.
TXBytes uint64
TXBytes int64
// LastRX is the last time a packet was received on this connection.
// The zero value indicates that no packets have been received.
LastRX time.Time
Expand Down
24 changes: 16 additions & 8 deletions pkg/tunnel/controllers/tunnel_agent_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,13 +319,18 @@ func (r *TunnelAgentReconciler) PushStatsOnce(ctx context.Context) {

if s, ok := conn.Stats(); ok {
u := corev1alpha2.TunnelAgentConnection{
ID: id,
RXBytes: s.RXBytes,
TxBytes: s.TXBytes,
ID: id,
}
if !s.LastRX.IsZero() {
t := metav1.NewTime(s.LastRX)
u.LastRXTimestamp = &t
u.LastRX = &t
}
// Intentionally swap the order so transfer stats are from the agent's perspective.
if s.TXBytes != 0 {
u.RXBytes = &s.TXBytes
}
if s.RXBytes != 0 {
u.TXBytes = &s.RXBytes
}
updatesByAgent[agentName] = append(updatesByAgent[agentName], u)
}
Expand All @@ -334,7 +339,7 @@ func (r *TunnelAgentReconciler) PushStatsOnce(ctx context.Context) {

// Apply updates per agent with conflict retries.
for agentName, updates := range updatesByAgent {
_ = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
var cur corev1alpha2.TunnelAgent
if err := r.client.Get(ctx, types.NamespacedName{Name: agentName}, &cur); err != nil {
// If the object is gone, skip.
Expand All @@ -352,14 +357,17 @@ func (r *TunnelAgentReconciler) PushStatsOnce(ctx context.Context) {
for _, u := range updates {
if c := connByID[u.ID]; c != nil {
c.RXBytes = u.RXBytes
c.TxBytes = u.TxBytes
if u.LastRXTimestamp != nil {
c.LastRXTimestamp = u.LastRXTimestamp
c.TXBytes = u.TXBytes
if u.LastRX != nil {
c.LastRX = u.LastRX
}
}
}

return r.client.Status().Update(ctx, &cur)
})
if err != nil {
slog.Warn("Failed to update TunnelAgent stats", slog.String("agent", agentName), slog.Any("error", err))
}
}
}
8 changes: 4 additions & 4 deletions pkg/tunnel/controllers/tunnel_agent_reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,10 +273,10 @@ func TestTunnelAgentPushStatsOnce_UpdatesStatusForKnownConnection(t *testing.T)
require.Len(t, got.Status.Connections, 1)
entry := got.Status.Connections[0]
assert.Equal(t, "conn-stat", entry.ID)
assert.Equal(t, uint64(1111), entry.RXBytes)
assert.Equal(t, uint64(2222), entry.TxBytes)
require.NotNil(t, entry.LastRXTimestamp)
assert.True(t, entry.LastRXTimestamp.Time.Equal(lastRX))
assert.Equal(t, int64(2222), *entry.RXBytes) // From the agent's perspective
assert.Equal(t, int64(1111), *entry.TXBytes)
require.NotNil(t, entry.LastRX)
assert.True(t, entry.LastRX.Time.Equal(lastRX))

relay.AssertExpectations(t)
conn.AssertExpectations(t)
Expand Down
8 changes: 5 additions & 3 deletions pkg/tunnel/router/icx_netlink_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"net/netip"
"os"
"sync"
"time"

"github.com/apoxy-dev/icx"
"github.com/apoxy-dev/icx/addrselect"
Expand Down Expand Up @@ -108,6 +109,7 @@ func NewICXNetlinkRouter(opts ...Option) (*ICXNetlinkRouter, error) {

handlerOpts := []icx.HandlerOption{
icx.WithVirtMAC(virtMAC),
icx.WithKeepAliveInterval(25 * time.Second),
}

for _, addr := range extAddrs {
Expand Down Expand Up @@ -397,8 +399,8 @@ func (r *ICXNetlinkRouter) syncDNATChain() error {
for i, peer := range peers {
slog.Info("Adding DNAT rules for peer", slog.String("peer", peer.RemoteAddr.Addr.String()))

for _, addr := range peer.Addrs {
if addr.Addr().Is4() { // Skipping IPv4 peers - only IPv6 tunnel ingress is supported.
for _, route := range peer.AllowedRoutes {
if route.Dst.Addr().Is4() { // Skipping IPv4 peers - only IPv6 tunnel ingress is supported.
continue
}
natRules.Write(
Expand All @@ -407,7 +409,7 @@ func (r *ICXNetlinkRouter) syncDNATChain() error {
"--mode", "random",
"--probability", probability(len(peers)-i),
"-j", "DNAT",
"--to-destination", addr.Addr().String(),
"--to-destination", route.Dst.Addr().String(),
)
}
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/tunnel/router/icx_netstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/netip"
"strconv"
"sync"
"time"

"github.com/apoxy-dev/icx"
"github.com/dpeckett/network"
Expand Down Expand Up @@ -51,6 +52,7 @@ func NewICXNetstackRouter(opts ...Option) (*ICXNetstackRouter, error) {

handlerOpts := []icx.HandlerOption{
icx.WithLayer3VirtFrames(),
icx.WithKeepAliveInterval(25 * time.Second),
}
if options.sourcePortHashing {
handlerOpts = append(handlerOpts, icx.WithSourcePortHashing())
Expand Down