From 51eef0c533cdbc9af416872817f5cb2f8d54e9c1 Mon Sep 17 00:00:00 2001 From: schou Date: Thu, 24 Oct 2024 10:19:09 -0400 Subject: [PATCH 1/2] Fix bug with "next == nil" in close, add comments, and add connection checks --- check.go | 33 +++++++++++++ config.go | 44 ++++++++++++++++-- examples_test.go | 67 ++++++++++++++++++++++++++ netpool.go | 119 +++++++++++++++++++++++++++++++++++++++++++---- 4 files changed, 250 insertions(+), 13 deletions(-) create mode 100644 check.go create mode 100644 examples_test.go diff --git a/check.go b/check.go new file mode 100644 index 0000000..aafa954 --- /dev/null +++ b/check.go @@ -0,0 +1,33 @@ +package netpool + +import ( + "io" + "net" + "syscall" +) + +func connCheck(conn net.Conn) error { + var sysErr error + rc, err := conn.(syscall.Conn).SyscallConn() + if err != nil { + return err + } + err = rc.Read(func(fd uintptr) bool { + buf := make([]byte, 1) + n, _, err := syscall.Recvfrom(int(fd), buf, syscall.MSG_PEEK|syscall.MSG_DONTWAIT) + switch { + case n == 0 && err == nil: + sysErr = io.EOF + case err == syscall.EAGAIN || err == syscall.EWOULDBLOCK: + // no-op + default: + sysErr = err + } + return true + }) + if err != nil { + return err + } + + return sysErr +} diff --git a/config.go b/config.go index 21e88f4..3cad139 100644 --- a/config.go +++ b/config.go @@ -2,30 +2,64 @@ package netpool import ( "net" + "time" ) -type Config struct { +type config struct { DialHooks []func(c net.Conn) error MaxPool int32 MinPool int32 + CheckTime time.Duration + CullTime time.Duration } -type Opt func(c *Config) +type Opt func(c *config) +// Maximum number of connections to allow to be active at any given moment. +// Connections both in the pool and currently in use count towards this limit. +// Additional calls will be held in a locked state waiting for another active +// session to be put back into the pool. +// +// Warning: As the maximum number of connections is a hard limit, a circular +// locking state can be met if a calling function requests more than one open +// connection per triggered event and more than one event requests the maximum. func WithMaxPool(max int32) Opt { - return func(c *Config) { + return func(c *config) { c.MaxPool = max } } +// Initial number of connections to open and then maintain over time. +// WithChecks must be set for a minimum number of connections to be opened when +// the minimum is unmet. func WithMinPool(min int32) Opt { - return func(c *Config) { + return func(c *config) { c.MinPool = min } } +// Add a hook to the DialHooks stack, can be called more than once, for +// handling connection setup steps, like a login. All the dial hooks must pass +// per connection or an error will be returned with the Get() call with the +// error from the hook. func WithDialHooks(hooks ...func(c net.Conn) error) Opt { - return func(c *Config) { + return func(c *config) { c.DialHooks = append(c.DialHooks, hooks...) } } + +// Upon every check time, d, one connection will verified and then put back in +// the pool if the check passed. +func WithChecks(d time.Duration) Opt { + return func(c *config) { + c.CheckTime = d + } +} + +// Upon the check interval, close connections in pool if no Get() or Put() +// calls have been seen in the given timeout. +func WithTimeout(d time.Duration) Opt { + return func(c *config) { + c.CullTime = d + } +} diff --git a/examples_test.go b/examples_test.go new file mode 100644 index 0000000..f23e4b4 --- /dev/null +++ b/examples_test.go @@ -0,0 +1,67 @@ +package netpool_test + +import ( + "fmt" + "log" + "net" + "time" + + "github.com/yudhasubki/netpool" +) + +func init() { + go func() { + l, err := net.Listen("tcp", ":2000") + if err != nil { + log.Fatal(err) + } + defer l.Close() + // Wait for a connection. + conn, err := l.Accept() + if err != nil { + log.Fatal(err) + } + buf := make([]byte, 4096) + conn.Read(buf) + conn.Write([]byte("hello world")) + conn.Close() + }() +} + +func ExampleNew() { + netpool, err := netpool.New(func() (net.Conn, error) { + return net.Dial("tcp", "localhost:2000") + }, + netpool.WithMaxPool(2), // default 15 + netpool.WithMinPool(1), // default 5 + netpool.WithChecks(time.Second), // default none + ) + if err != nil { + panic(err) + } + defer netpool.Close() + + conn, err := netpool.Get() + defer netpool.Put(conn, err) + if err != nil { + log.Fatal(err) + } + + _, err = conn.Write([]byte("dial")) + if err != nil { + log.Fatal(err) + } + + // Read the response from the connection + buffer := make([]byte, 4096) + var n int + n, err = conn.Read(buffer) + if err != nil { + log.Fatal(err) + } + fmt.Printf("Read %s\n", buffer[:n]) + + netpool.Put(conn, err) + // Output: + // Read hello world +} diff --git a/netpool.go b/netpool.go index a5f0a4b..3f93948 100644 --- a/netpool.go +++ b/netpool.go @@ -4,6 +4,7 @@ import ( "container/list" "net" "sync" + "time" ) type Netpooler interface { @@ -13,19 +14,25 @@ type Netpooler interface { Put(conn net.Conn, err error) } +// Handle for the connection pool. type Netpool struct { actives int32 + counter int64 cond *sync.Cond connections *list.List fn netpoolFunc mu *sync.Mutex - config Config + config config + last time.Time } type netpoolFunc func() (net.Conn, error) +// New will create a netpool with the connection creation function and options +// as desired. The fn is a function handle to call for creation of a new conn, +// and then opts sets the desired state of the netpool. func New(fn netpoolFunc, opts ...Opt) (*Netpool, error) { - config := Config{ + config := config{ MaxPool: 15, MinPool: 5, } @@ -40,6 +47,7 @@ func New(fn netpoolFunc, opts ...Opt) (*Netpool, error) { actives: 0, connections: list.New(), config: config, + last: time.Now(), } for i := int32(0); i < config.MinPool; i++ { @@ -60,22 +68,92 @@ func New(fn netpoolFunc, opts ...Opt) (*Netpool, error) { netpool.connections.PushBack(conn) netpool.actives++ + netpool.counter++ } netpool.cond = sync.NewCond(netpool.mu) + if config.CheckTime > time.Second/4 { + go func() { + for { + // Wait a given timestep + time.Sleep(config.CheckTime) + netpool.mu.Lock() + + // If there hasn't been much use, close all connections beyond the + // minimum required pooled connections. + if time.Now().Sub(netpool.last) > config.CullTime { + for netpool.connections.Len() > int(config.MinPool) { + conn := netpool.connections.Remove(netpool.connections.Front()).(net.Conn) + conn.Close() + netpool.actives-- + } + } + + // Check one connection every time step. Eventually any dead + // connections will be weeded out. + if netpool.connections.Len() > 0 { + conn := netpool.connections.Remove(netpool.connections.Front()).(net.Conn) + err := connCheck(conn) + if err != nil { + if conn != nil { + conn.Close() + netpool.actives-- + } + } else { + netpool.connections.PushBack(conn) + } + } + + // Maintain a minimum of connections in a "ready" state. + fillPool: + for netpool.connections.Len() < int(config.MinPool) { + conn, err := netpool.fn() + if err != nil { + break + } + + if len(netpool.config.DialHooks) > 0 { + for _, dialHook := range netpool.config.DialHooks { + err = dialHook(conn) + if err != nil { + conn.Close() + break fillPool + } + } + } + + netpool.connections.PushBack(conn) + netpool.actives++ + netpool.counter++ + } + netpool.mu.Unlock() + } + }() + } + return netpool, nil } +// Get a net.conn to work with. If a used connection returned is checked to be +// ready for use, a new connection will be created if none is available, or an +// error will be returned. func (netpool *Netpool) Get() (net.Conn, error) { netpool.mu.Lock() defer netpool.mu.Unlock() + netpool.last = time.Now() for netpool.connections.Len() == 0 && netpool.actives >= netpool.config.MaxPool { netpool.cond.Wait() } - if netpool.connections.Len() > 0 { - return netpool.connections.Remove(netpool.connections.Front()).(net.Conn), nil + for netpool.connections.Len() > 0 { + conn := netpool.connections.Remove(netpool.connections.Front()).(net.Conn) + err := connCheck(conn) + if err == nil { + return conn, nil + } + conn.Close() + netpool.actives-- } c, err := netpool.fn() @@ -84,8 +162,8 @@ func (netpool *Netpool) Get() (net.Conn, error) { } if len(netpool.config.DialHooks) > 0 { - for _, hook := range netpool.config.DialHooks { - err = hook(c) + for _, dialHook := range netpool.config.DialHooks { + err = dialHook(c) if err != nil { c.Close() return nil, err @@ -94,14 +172,21 @@ func (netpool *Netpool) Get() (net.Conn, error) { } netpool.actives++ + netpool.counter++ return c, nil } +// Put a connection back on the pool. If an error is passed in then the active +// count will be reduced and not returned to the pool. func (netpool *Netpool) Put(conn net.Conn, err error) { + // Verify that the connection is in a good state before returning it to the pool + if err == nil { + err = connCheck(conn) + } + netpool.mu.Lock() defer netpool.mu.Unlock() - if err != nil { if conn != nil { conn.Close() @@ -111,15 +196,33 @@ func (netpool *Netpool) Put(conn net.Conn, err error) { netpool.connections.PushBack(conn) } + netpool.last = time.Now() netpool.cond.Signal() } +// Close all connections in the pool. +// +// Note: If a minimum number of open connections is configured along with a +// check time, the minimum number of connections will be reopened. func (netpool *Netpool) Close() { - for n := netpool.connections.Front(); n == nil; n = n.Next() { + for n := netpool.connections.Front(); n != nil; n = n.Next() { n.Value.(net.Conn).Close() } } +// For metrics purposes, one can call actives to see any active connections +// both waiting and in-use. +func (netpool *Netpool) Actives() int { + return int(netpool.actives) +} + +// For metrics purposes, one can call count to see total count of created +// connections. +func (netpool *Netpool) Counter() int64 { + return netpool.counter +} + +// Size of the connection pool. func (netpool *Netpool) Len() int { return netpool.connections.Len() } From a8e059eb7eafa969b06ecb46ecbf2fcb600e0893 Mon Sep 17 00:00:00 2001 From: schou Date: Thu, 24 Oct 2024 10:42:08 -0400 Subject: [PATCH 2/2] fix name --- netpool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/netpool.go b/netpool.go index 3f93948..f6b4ca1 100644 --- a/netpool.go +++ b/netpool.go @@ -216,7 +216,7 @@ func (netpool *Netpool) Actives() int { return int(netpool.actives) } -// For metrics purposes, one can call count to see total count of created +// For metrics purposes, one can call counter to see total count of created // connections. func (netpool *Netpool) Counter() int64 { return netpool.counter