Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions check.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package netpool

import (
"io"
"net"
"syscall"
)

func connCheck(conn net.Conn) error {
var sysErr error
rc, err := conn.(syscall.Conn).SyscallConn()
if err != nil {
return err
}
err = rc.Read(func(fd uintptr) bool {
buf := make([]byte, 1)
n, _, err := syscall.Recvfrom(int(fd), buf, syscall.MSG_PEEK|syscall.MSG_DONTWAIT)
switch {
case n == 0 && err == nil:
sysErr = io.EOF
case err == syscall.EAGAIN || err == syscall.EWOULDBLOCK:
// no-op
default:
sysErr = err
}
return true
})
if err != nil {
return err
}

return sysErr
}
44 changes: 39 additions & 5 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,64 @@ package netpool

import (
"net"
"time"
)

type Config struct {
type config struct {
DialHooks []func(c net.Conn) error
MaxPool int32
MinPool int32
CheckTime time.Duration
CullTime time.Duration
}

type Opt func(c *Config)
type Opt func(c *config)

// Maximum number of connections to allow to be active at any given moment.
// Connections both in the pool and currently in use count towards this limit.
// Additional calls will be held in a locked state waiting for another active
// session to be put back into the pool.
//
// Warning: As the maximum number of connections is a hard limit, a circular
// locking state can be met if a calling function requests more than one open
// connection per triggered event and more than one event requests the maximum.
func WithMaxPool(max int32) Opt {
return func(c *Config) {
return func(c *config) {
c.MaxPool = max
}
}

// Initial number of connections to open and then maintain over time.
// WithChecks must be set for a minimum number of connections to be opened when
// the minimum is unmet.
func WithMinPool(min int32) Opt {
return func(c *Config) {
return func(c *config) {
c.MinPool = min
}
}

// Add a hook to the DialHooks stack, can be called more than once, for
// handling connection setup steps, like a login. All the dial hooks must pass
// per connection or an error will be returned with the Get() call with the
// error from the hook.
func WithDialHooks(hooks ...func(c net.Conn) error) Opt {
return func(c *Config) {
return func(c *config) {
c.DialHooks = append(c.DialHooks, hooks...)
}
}

// Upon every check time, d, one connection will verified and then put back in
// the pool if the check passed.
func WithChecks(d time.Duration) Opt {
return func(c *config) {
c.CheckTime = d
}
}

// Upon the check interval, close connections in pool if no Get() or Put()
// calls have been seen in the given timeout.
func WithTimeout(d time.Duration) Opt {
return func(c *config) {
c.CullTime = d
}
}
67 changes: 67 additions & 0 deletions examples_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package netpool_test

import (
"fmt"
"log"
"net"
"time"

"github.com/yudhasubki/netpool"
)

func init() {
go func() {
l, err := net.Listen("tcp", ":2000")
if err != nil {
log.Fatal(err)
}
defer l.Close()
// Wait for a connection.
conn, err := l.Accept()
if err != nil {
log.Fatal(err)
}
buf := make([]byte, 4096)
conn.Read(buf)
conn.Write([]byte("hello world"))
conn.Close()
}()
}

func ExampleNew() {
netpool, err := netpool.New(func() (net.Conn, error) {
return net.Dial("tcp", "localhost:2000")
},
netpool.WithMaxPool(2), // default 15
netpool.WithMinPool(1), // default 5
netpool.WithChecks(time.Second), // default none
)
if err != nil {
panic(err)
}
defer netpool.Close()

conn, err := netpool.Get()
defer netpool.Put(conn, err)
if err != nil {
log.Fatal(err)
}

_, err = conn.Write([]byte("dial"))
if err != nil {
log.Fatal(err)
}

// Read the response from the connection
buffer := make([]byte, 4096)
var n int
n, err = conn.Read(buffer)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Read %s\n", buffer[:n])

netpool.Put(conn, err)
// Output:
// Read hello world
}
119 changes: 111 additions & 8 deletions netpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"container/list"
"net"
"sync"
"time"
)

type Netpooler interface {
Expand All @@ -13,19 +14,25 @@ type Netpooler interface {
Put(conn net.Conn, err error)
}

// Handle for the connection pool.
type Netpool struct {
actives int32
counter int64
cond *sync.Cond
connections *list.List
fn netpoolFunc
mu *sync.Mutex
config Config
config config
last time.Time
}

type netpoolFunc func() (net.Conn, error)

// New will create a netpool with the connection creation function and options
// as desired. The fn is a function handle to call for creation of a new conn,
// and then opts sets the desired state of the netpool.
func New(fn netpoolFunc, opts ...Opt) (*Netpool, error) {
config := Config{
config := config{
MaxPool: 15,
MinPool: 5,
}
Expand All @@ -40,6 +47,7 @@ func New(fn netpoolFunc, opts ...Opt) (*Netpool, error) {
actives: 0,
connections: list.New(),
config: config,
last: time.Now(),
}

for i := int32(0); i < config.MinPool; i++ {
Expand All @@ -60,22 +68,92 @@ func New(fn netpoolFunc, opts ...Opt) (*Netpool, error) {

netpool.connections.PushBack(conn)
netpool.actives++
netpool.counter++
}
netpool.cond = sync.NewCond(netpool.mu)

if config.CheckTime > time.Second/4 {
go func() {
for {
// Wait a given timestep
time.Sleep(config.CheckTime)
netpool.mu.Lock()

// If there hasn't been much use, close all connections beyond the
// minimum required pooled connections.
if time.Now().Sub(netpool.last) > config.CullTime {
for netpool.connections.Len() > int(config.MinPool) {
conn := netpool.connections.Remove(netpool.connections.Front()).(net.Conn)
conn.Close()
netpool.actives--
}
}

// Check one connection every time step. Eventually any dead
// connections will be weeded out.
if netpool.connections.Len() > 0 {
conn := netpool.connections.Remove(netpool.connections.Front()).(net.Conn)
err := connCheck(conn)
if err != nil {
if conn != nil {
conn.Close()
netpool.actives--
}
} else {
netpool.connections.PushBack(conn)
}
}

// Maintain a minimum of connections in a "ready" state.
fillPool:
for netpool.connections.Len() < int(config.MinPool) {
conn, err := netpool.fn()
if err != nil {
break
}

if len(netpool.config.DialHooks) > 0 {
for _, dialHook := range netpool.config.DialHooks {
err = dialHook(conn)
if err != nil {
conn.Close()
break fillPool
}
}
}

netpool.connections.PushBack(conn)
netpool.actives++
netpool.counter++
}
netpool.mu.Unlock()
}
}()
}

return netpool, nil
}

// Get a net.conn to work with. If a used connection returned is checked to be
// ready for use, a new connection will be created if none is available, or an
// error will be returned.
func (netpool *Netpool) Get() (net.Conn, error) {
netpool.mu.Lock()
defer netpool.mu.Unlock()
netpool.last = time.Now()

for netpool.connections.Len() == 0 && netpool.actives >= netpool.config.MaxPool {
netpool.cond.Wait()
}

if netpool.connections.Len() > 0 {
return netpool.connections.Remove(netpool.connections.Front()).(net.Conn), nil
for netpool.connections.Len() > 0 {
conn := netpool.connections.Remove(netpool.connections.Front()).(net.Conn)
err := connCheck(conn)
if err == nil {
return conn, nil
}
conn.Close()
netpool.actives--
}

c, err := netpool.fn()
Expand All @@ -84,8 +162,8 @@ func (netpool *Netpool) Get() (net.Conn, error) {
}

if len(netpool.config.DialHooks) > 0 {
for _, hook := range netpool.config.DialHooks {
err = hook(c)
for _, dialHook := range netpool.config.DialHooks {
err = dialHook(c)
if err != nil {
c.Close()
return nil, err
Expand All @@ -94,14 +172,21 @@ func (netpool *Netpool) Get() (net.Conn, error) {
}

netpool.actives++
netpool.counter++

return c, nil
}

// Put a connection back on the pool. If an error is passed in then the active
// count will be reduced and not returned to the pool.
func (netpool *Netpool) Put(conn net.Conn, err error) {
// Verify that the connection is in a good state before returning it to the pool
if err == nil {
err = connCheck(conn)
}

netpool.mu.Lock()
defer netpool.mu.Unlock()

if err != nil {
if conn != nil {
conn.Close()
Expand All @@ -111,15 +196,33 @@ func (netpool *Netpool) Put(conn net.Conn, err error) {
netpool.connections.PushBack(conn)
}

netpool.last = time.Now()
netpool.cond.Signal()
}

// Close all connections in the pool.
//
// Note: If a minimum number of open connections is configured along with a
// check time, the minimum number of connections will be reopened.
func (netpool *Netpool) Close() {
for n := netpool.connections.Front(); n == nil; n = n.Next() {
for n := netpool.connections.Front(); n != nil; n = n.Next() {
n.Value.(net.Conn).Close()
}
}

// For metrics purposes, one can call actives to see any active connections
// both waiting and in-use.
func (netpool *Netpool) Actives() int {
return int(netpool.actives)
}

// For metrics purposes, one can call counter to see total count of created
// connections.
func (netpool *Netpool) Counter() int64 {
return netpool.counter
}

// Size of the connection pool.
func (netpool *Netpool) Len() int {
return netpool.connections.Len()
}