From 6b163da87c3570f66b52784435f0ad108ffc1206 Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Tue, 8 Apr 2025 10:51:20 +0200 Subject: [PATCH 1/3] Placebo change --- toversok/actors/a_direct.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/toversok/actors/a_direct.go b/toversok/actors/a_direct.go index d7b02ad..015b45d 100644 --- a/toversok/actors/a_direct.go +++ b/toversok/actors/a_direct.go @@ -124,9 +124,11 @@ func (s *Stage) makeDR() *DirectRouter { } func (dr *DirectRouter) Push(frame ifaces.DirectedPeerFrame) { + // go func() { dr.frameCh <- frame // }() + } func (dr *DirectRouter) Run() { From 831fb6b7f50d7473adf53aedc85e78b9844a63cb Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Tue, 8 Apr 2025 10:56:56 +0200 Subject: [PATCH 2/3] Have frames be pushed in goroutine --- toversok/actors/a_direct.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/toversok/actors/a_direct.go b/toversok/actors/a_direct.go index 015b45d..e12a4ba 100644 --- a/toversok/actors/a_direct.go +++ b/toversok/actors/a_direct.go @@ -124,11 +124,9 @@ func (s *Stage) makeDR() *DirectRouter { } func (dr *DirectRouter) Push(frame ifaces.DirectedPeerFrame) { - - // go func() { - dr.frameCh <- frame - // }() - + go func() { + dr.frameCh <- frame + }() } func (dr *DirectRouter) Run() { From 86bf2ce8109ba15f70622cc57e342af75d299642 Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Tue, 8 Apr 2025 12:32:08 +0200 Subject: [PATCH 3/3] Various performance improvements - remove time.After invocations on hot paths - slight buffer improvements - remove drop in ForwardPacket --- toversok/actors/a_conn.go | 16 +++++++++------- toversok/actors/a_direct.go | 6 +++--- toversok/actors/a_sockrecv.go | 10 ++++++---- usrwg/channel_conn.go | 31 ++++++++++++++++++++----------- 4 files changed, 38 insertions(+), 25 deletions(-) diff --git a/toversok/actors/a_conn.go b/toversok/actors/a_conn.go index e92cb2e..1d3bad4 100644 --- a/toversok/actors/a_conn.go +++ b/toversok/actors/a_conn.go @@ -275,13 +275,15 @@ func (ic *InConn) Ctx() context.Context { // This prevents routers from blocking when the conn is shutting down, // or if its blocked otherwise. func (ic *InConn) ForwardPacket(pkt []byte) { - select { - case ic.pktCh <- pkt: - default: - // TODO maybe convert dropping to timeout? - // making lots of timers would be costly though - // TODO log? metric? - } + //select { + //case ic.pktCh <- pkt: + //default: + // // TODO maybe convert dropping to timeout? + // // making lots of timers would be costly though + // // TODO log? metric? + //} + + ic.pktCh <- pkt } // Bump the activity timer. diff --git a/toversok/actors/a_direct.go b/toversok/actors/a_direct.go index e12a4ba..d7b02ad 100644 --- a/toversok/actors/a_direct.go +++ b/toversok/actors/a_direct.go @@ -124,9 +124,9 @@ func (s *Stage) makeDR() *DirectRouter { } func (dr *DirectRouter) Push(frame ifaces.DirectedPeerFrame) { - go func() { - dr.frameCh <- frame - }() + // go func() { + dr.frameCh <- frame + // }() } func (dr *DirectRouter) Run() { diff --git a/toversok/actors/a_sockrecv.go b/toversok/actors/a_sockrecv.go index 374da4a..3f4eb33 100644 --- a/toversok/actors/a_sockrecv.go +++ b/toversok/actors/a_sockrecv.go @@ -1,13 +1,13 @@ package actors import ( + "bytes" "context" "errors" "log/slog" "net" "net/netip" "runtime/debug" - "slices" "time" "github.com/edup2p/common/types" @@ -52,7 +52,9 @@ func (r *SockRecv) Run() { } }() - buf := make([]byte, 1<<16) + const MaxBuf = 1 << 16 + + var buf = [MaxBuf]byte{0} for { if r.ctx.Err() != nil { @@ -65,7 +67,7 @@ func (r *SockRecv) Run() { return } - n, ap, err := r.Conn.ReadFromUDPAddrPort(buf) + n, ap, err := r.Conn.ReadFromUDPAddrPort(buf[:]) ts := time.Now() @@ -92,7 +94,7 @@ func (r *SockRecv) Run() { continue } - pkt := slices.Clone(buf[:n]) + pkt := bytes.Clone(buf[:n]) if r.ctx.Err() != nil { return diff --git a/usrwg/channel_conn.go b/usrwg/channel_conn.go index 89581df..5d3259a 100644 --- a/usrwg/channel_conn.go +++ b/usrwg/channel_conn.go @@ -43,15 +43,19 @@ func (cc *ChannelConn) SetReadDeadline(t time.Time) error { func (cc *ChannelConn) ReadFromUDPAddrPort(b []byte) (n int, addr netip.AddrPort, err error) { var val []byte - if cc.currentReadDeadline == (time.Time{}) { - // Block until value received. - val = <-cc.incoming - } else { - // Block until value or timeout. - select { - case val = <-cc.incoming: - case <-time.After(time.Until(cc.currentReadDeadline)): - return 0, netip.AddrPort{}, context.DeadlineExceeded + select { + case val = <-cc.incoming: + default: + if cc.currentReadDeadline == (time.Time{}) { + // Block until value received. + val = <-cc.incoming + } else { + // Block until value or timeout. + select { + case val = <-cc.incoming: + case <-time.After(time.Until(cc.currentReadDeadline)): + return 0, netip.AddrPort{}, context.DeadlineExceeded + } } } @@ -110,8 +114,13 @@ func (cc *ChannelConn) putIn(pkt []byte, d time.Duration) (ok bool) { select { case cc.incoming <- pkt: ok = true - case <-time.After(d): - ok = false + default: + select { + case cc.incoming <- pkt: + ok = true + case <-time.After(d): + ok = false + } } return