From 506d402dc407ca8f366f78f736f5f01ab874614a Mon Sep 17 00:00:00 2001
From: Wang Xu
Date: Tue, 27 Feb 2018 20:43:19 +0800
Subject: [PATCH 1/2] enable shrinking the socket pool size

We found that mgo grows the socket pool during burst traffic but never
closes the extra sockets again until the client or the server is restarted.
The MongoDB documentation defines two related query options:

- [minPoolSize](https://docs.mongodb.com/manual/reference/connection-string/#urioption.minPoolSize)
- [maxIdleTimeMS](https://docs.mongodb.com/manual/reference/connection-string/#urioption.maxIdleTimeMS)

By implementing these two options, the pool can shrink back to minPoolSize
once the sockets created for the burst traffic have been idle for longer
than maxIdleTimeMS.

The idea comes from https://github.com/JodeZer/mgo , who investigated this
issue and provided the initial commits. I found there were still some issues
in socket maintenance, and opened a PR against his repo, JodeZer/mgo#1 .

This commit includes JodeZer's commits and my fix, and I simplified the data
structure. What's in this commit can be described by this figure:

    +------------------------+
    |        Session         | <-------+ Add options here
    +------------------------+

    +------------------------+
    |        Cluster         | <-------+ Add options here
    +------------------------+

    +------------------------+
    |        Server          | <-------+ *Add options here
    |                        |           *Add a timestamp when recycling a socket
    |    +-----------+       |
    |    | shrinker <--------------+ *Periodically check the unused sockets
    |    +-----------+       |        and reclaim the ones that timed out
    +------------------------+
                |
                |
    +------------------------+
    |        Socket          | <-------+ Add a field for the last used time
    +------------------------+

Signed-off-by: Wang Xu
---
 cluster.go | 36 +++++++++++++++------------
 server.go  | 71 ++++++++++++++++++++++++++++++++++++++++++++++++------
 session.go | 42 ++++++++++++++++++++++++++++++++
 socket.go  |  1 +
 4 files changed, 126 insertions(+), 24 deletions(-)

diff --git a/cluster.go b/cluster.go
index ac461d5b9..204f507bd 100644
--- a/cluster.go
+++ b/cluster.go
@@ -48,21 +48,23 @@ import (
 
 type mongoCluster struct {
 	sync.RWMutex
-	serverSynced sync.Cond
-	userSeeds    []string
-	dynaSeeds    []string
-	servers      mongoServers
-	masters      mongoServers
-	references   int
-	syncing      bool
-	direct       bool
-	failFast     bool
-	syncCount    uint
-	setName      string
-	cachedIndex  map[string]bool
-	sync         chan bool
-	dial         dialer
-	appName      string
+	serverSynced  sync.Cond
+	userSeeds     []string
+	dynaSeeds     []string
+	servers       mongoServers
+	masters       mongoServers
+	references    int
+	syncing       bool
+	direct        bool
+	failFast      bool
+	syncCount     uint
+	setName       string
+	cachedIndex   map[string]bool
+	sync          chan bool
+	dial          dialer
+	appName       string
+	minPoolSize   int
+	maxIdleTimeMS int
 }
 
 func newCluster(userSeeds []string, direct, failFast bool, dial dialer, setName string, appName string) *mongoCluster {
@@ -437,11 +439,13 @@ func (cluster *mongoCluster) syncServersLoop() {
 func (cluster *mongoCluster) server(addr string, tcpaddr *net.TCPAddr) *mongoServer {
 	cluster.RLock()
 	server := cluster.servers.Search(tcpaddr.String())
+	minPoolSize := cluster.minPoolSize
+	maxIdleTimeMS := cluster.maxIdleTimeMS
 	cluster.RUnlock()
 	if server != nil {
 		return server
 	}
-	return newServer(addr, tcpaddr, cluster.sync, cluster.dial)
+	return newServer(addr, tcpaddr, cluster.sync, cluster.dial, minPoolSize, maxIdleTimeMS)
 }
 
 func resolveAddr(addr string) (*net.TCPAddr, error) {
diff --git a/server.go b/server.go
index 7ad955255..f999b2407 100644
--- a/server.go
+++ b/server.go
@@ -55,6 +55,8 @@ type mongoServer struct {
 	pingCount     uint32
 	closed        bool
 	abended       bool
+	minPoolSize   int
+	maxIdleTimeMS int
 }
 
 type dialer struct {
@@ -76,17 +78,22 @@ type mongoServerInfo struct {
 
 var defaultServerInfo mongoServerInfo
 
-func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer) *mongoServer {
+func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer, minPoolSize, maxIdleTimeMS int) *mongoServer {
 	server := &mongoServer{
-		Addr:         addr,
-		ResolvedAddr: tcpaddr.String(),
-		tcpaddr:      tcpaddr,
-		sync:         sync,
-		dial:         dial,
-		info:         &defaultServerInfo,
-		pingValue:    time.Hour, // Push it back before an actual ping.
+		Addr:          addr,
+		ResolvedAddr:  tcpaddr.String(),
+		tcpaddr:       tcpaddr,
+		sync:          sync,
+		dial:          dial,
+		info:          &defaultServerInfo,
+		pingValue:     time.Hour, // Push it back before an actual ping.
+		minPoolSize:   minPoolSize,
+		maxIdleTimeMS: maxIdleTimeMS,
 	}
 	go server.pinger(true)
+	if maxIdleTimeMS != 0 {
+		go server.poolShrinker()
+	}
 	return server
 }
 
@@ -221,6 +228,7 @@ func (server *mongoServer) close(waitForIdle bool) {
 func (server *mongoServer) RecycleSocket(socket *mongoSocket) {
 	server.Lock()
 	if !server.closed {
+		socket.lastTimeUsed = time.Now()
 		server.unusedSockets = append(server.unusedSockets, socket)
 	}
 	server.Unlock()
 }
@@ -346,6 +354,53 @@ func (server *mongoServer) pinger(loop bool) {
 	}
 }
 
+func (server *mongoServer) poolShrinker() {
+	ticker := time.NewTicker(1 * time.Minute)
+	for _ = range ticker.C {
+		if server.closed {
+			ticker.Stop()
+			return
+		}
+		server.Lock()
+		unused := len(server.unusedSockets)
+		if unused < server.minPoolSize {
+			server.Unlock()
+			continue
+		}
+		now := time.Now()
+		end := 0
+		reclaimMap := map[*mongoSocket]struct{}{}
+		// Because sockets are acquired from and recycled to the tail of the slice,
+		// the head is always the oldest unused socket.
+		for _, s := range server.unusedSockets[:unused-server.minPoolSize] {
+			if s.lastTimeUsed.Add(time.Duration(server.maxIdleTimeMS) * time.Millisecond).After(now) {
+				break
+			}
+			end++
+			reclaimMap[s] = struct{}{}
+		}
+		tbr := server.unusedSockets[:end]
+		if end > 0 {
+			next := make([]*mongoSocket, unused-end)
+			copy(next, server.unusedSockets[end:])
+			server.unusedSockets = next
+			remainSockets := []*mongoSocket{}
+			for _, s := range server.liveSockets {
+				if _, ok := reclaimMap[s]; !ok {
+					remainSockets = append(remainSockets, s)
+				}
+			}
+			server.liveSockets = remainSockets
+			stats.conn(-1*end, server.info.Master)
+		}
+		server.Unlock()
+
+		for _, s := range tbr {
+			s.Close()
+		}
+	}
+}
+
 type mongoServerSlice []*mongoServer
 
 func (s mongoServerSlice) Len() int {
diff --git a/session.go b/session.go
index 561f79487..d1c88420e 100644
--- a/session.go
+++ b/session.go
@@ -271,6 +271,16 @@ const (
 //     Defines the per-server socket pool limit. Defaults to 4096.
 //     See Session.SetPoolLimit for details.
 //
+//     minPoolSize=
+//
+//     Defines the per-server socket pool minimum size. Defaults to 0.
+//
+//     maxIdleTimeMS=
+//
+//     The maximum number of milliseconds that a connection can remain idle in the pool
+//     before being removed and closed. If maxIdleTimeMS is 0, connections will never be
+//     closed due to inactivity.
+//
 //     appName=
 //
 //     The identifier of this client application. This parameter is used to
@@ -322,6 +332,8 @@ func ParseURL(url string) (*DialInfo, error) {
 	appName := ""
 	readPreferenceMode := Primary
 	var readPreferenceTagSets []bson.D
+	minPoolSize := 0
+	maxIdleTimeMS := 0
 	for _, opt := range uinfo.options {
 		switch opt.key {
 		case "authSource":
@@ -368,6 +380,22 @@ func ParseURL(url string) (*DialInfo, error) {
 				doc = append(doc, bson.DocElem{Name: strings.TrimSpace(kvp[0]), Value: strings.TrimSpace(kvp[1])})
 			}
 			readPreferenceTagSets = append(readPreferenceTagSets, doc)
+		case "minPoolSize":
+			minPoolSize, err = strconv.Atoi(opt.value)
+			if err != nil {
+				return nil, errors.New("bad value for minPoolSize: " + opt.value)
+			}
+			if minPoolSize < 0 {
+				return nil, errors.New("bad value (negative) for minPoolSize: " + opt.value)
+			}
+		case "maxIdleTimeMS":
+			maxIdleTimeMS, err = strconv.Atoi(opt.value)
+			if err != nil {
+				return nil, errors.New("bad value for maxIdleTimeMS: " + opt.value)
+			}
+			if maxIdleTimeMS < 0 {
+				return nil, errors.New("bad value (negative) for maxIdleTimeMS: " + opt.value)
+			}
 		case "connect":
 			if opt.value == "direct" {
 				direct = true
@@ -402,6 +430,8 @@ func ParseURL(url string) (*DialInfo, error) {
 			TagSets: readPreferenceTagSets,
 		},
 		ReplicaSetName: setName,
+		MinPoolSize:    minPoolSize,
+		MaxIdleTimeMS:  maxIdleTimeMS,
 	}
 	return &info, nil
 }
@@ -475,6 +505,14 @@ type DialInfo struct {
 	// cluster and establish connections with further servers too.
 	Direct bool
 
+	// MinPoolSize defines the minimum number of connections in the connection pool.
+	// Defaults to 0.
+	MinPoolSize int
+
+	// MaxIdleTimeMS defines the maximum number of milliseconds that a connection
+	// can remain idle in the pool before being removed and closed.
+	MaxIdleTimeMS int
+
 	// DialServer optionally specifies the dial function for establishing
 	// connections with the MongoDB servers.
 	DialServer func(addr *ServerAddr) (net.Conn, error)
@@ -554,6 +592,10 @@ func DialWithInfo(info *DialInfo) (*Session, error) {
 	if info.PoolLimit > 0 {
 		session.poolLimit = info.PoolLimit
 	}
+
+	cluster.minPoolSize = info.MinPoolSize
+	cluster.maxIdleTimeMS = info.MaxIdleTimeMS
+
 	cluster.Release()
 
 	// People get confused when we return a session that is not actually
diff --git a/socket.go b/socket.go
index a9124b043..ae13e401f 100644
--- a/socket.go
+++ b/socket.go
@@ -54,6 +54,7 @@ type mongoSocket struct {
 	dead           error
 	serverInfo     *mongoServerInfo
 	closeAfterIdle bool
+	lastTimeUsed   time.Time // for time-based idle socket release
 	sendMeta       sync.Once
 }
 

From cebb53498d217ba5e91b02fe275729d046faab08 Mon Sep 17 00:00:00 2001
From: Wang Xu
Date: Tue, 27 Feb 2018 20:43:43 +0800
Subject: [PATCH 2/2] tests for shrinking the socket pool

Signed-off-by: Wang Xu
---
 session_test.go | 86 +++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 86 insertions(+)

diff --git a/session_test.go b/session_test.go
index eb2c812b3..c740af47a 100644
--- a/session_test.go
+++ b/session_test.go
@@ -30,11 +30,13 @@ import (
 	"flag"
 	"fmt"
 	"math"
+	"math/rand"
 	"os"
 	"runtime"
 	"sort"
 	"strconv"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 
@@ -166,6 +168,90 @@ func (s *S) TestURLInvalidReadPreference(c *C) {
 	}
 }
 
+func (s *S) TestMinPoolSize(c *C) {
+	tests := []struct {
+		url  string
+		size int
+		fail bool
+	}{
+		{"localhost:40001?minPoolSize=0", 0, false},
+		{"localhost:40001?minPoolSize=1", 1, false},
+		{"localhost:40001?minPoolSize=-1", -1, true},
+		{"localhost:40001?minPoolSize=-.", 0, true},
+	}
+	for _, test := range tests {
+		info, err := mgo.ParseURL(test.url)
+		if test.fail {
+			c.Assert(err, NotNil)
+		} else {
+			c.Assert(err, IsNil)
+			c.Assert(info.MinPoolSize, Equals, test.size)
+		}
+	}
+}
+
+func (s *S) TestMaxIdleTimeMS(c *C) {
+	tests := []struct {
+		url  string
+		size int
+		fail bool
+	}{
+		{"localhost:40001?maxIdleTimeMS=0", 0, false},
+		{"localhost:40001?maxIdleTimeMS=1", 1, false},
+		{"localhost:40001?maxIdleTimeMS=-1", -1, true},
+		{"localhost:40001?maxIdleTimeMS=-.", 0, true},
+	}
+	for _, test := range tests {
+		info, err := mgo.ParseURL(test.url)
+		if test.fail {
+			c.Assert(err, NotNil)
+		} else {
+			c.Assert(err, IsNil)
+			c.Assert(info.MaxIdleTimeMS, Equals, test.size)
+		}
+	}
+}
+
+func (s *S) TestPoolShrink(c *C) {
+	if *fast {
+		c.Skip("-fast")
+	}
+	oldSocket := mgo.GetStats().SocketsAlive
+
+	session, err := mgo.Dial("localhost:40001?minPoolSize=1&maxIdleTimeMS=1000")
+	c.Assert(err, IsNil)
+	defer session.Close()
+
+	parallel := 10
+	res := make(chan error, parallel+1)
+	wg := &sync.WaitGroup{}
+	for i := 1; i < parallel; i++ {
+		wg.Add(1)
+		go func() {
+			s := session.Copy()
+			defer s.Close()
+			result := struct{}{}
+			err := s.Run("ping", &result)
+
+			// Sleep a random time so that allocation and release happen in a different order.
+			time.Sleep(time.Duration(rand.Intn(parallel)*100) * time.Millisecond)
+			res <- err
+			wg.Done()
+		}()
+	}
+	wg.Wait()
+	stats := mgo.GetStats()
+	c.Logf("living sockets after queries: %d, before queries: %d", stats.SocketsAlive, oldSocket)
+
+	// Give the pool some time to shrink: the shrinker tick is set to 1 minute.
+	c.Log("Sleeping for 1 minute to allow the pool to shrink")
+	time.Sleep(60 * time.Second)
+
+	stats = mgo.GetStats()
+	c.Logf("living sockets after shrinking: %d, at the beginning of the test: %d", stats.SocketsAlive, oldSocket)
+	c.Assert(stats.SocketsAlive-oldSocket > 1, Equals, false)
+}
+
 func (s *S) TestURLReadPreferenceTags(c *C) {
 	type test struct {
 		url string
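
Usage note (not part of the patches): a minimal sketch of how a client could opt in
to pool shrinking once both patches are applied. The host, port, database name, the
option values and the gopkg.in/mgo.v2 import path are illustrative assumptions;
adjust them to the mgo fork that carries this change. The same settings can also be
applied programmatically through the new DialInfo.MinPoolSize and
DialInfo.MaxIdleTimeMS fields.

package main

import (
	"log"

	mgo "gopkg.in/mgo.v2" // assumption: use the import path of the fork carrying this patch
)

func main() {
	// minPoolSize keeps at least 3 sockets per server in the pool, and
	// maxIdleTimeMS lets the shrinker close sockets idle for more than 30s.
	session, err := mgo.Dial("localhost:27017/test?minPoolSize=3&maxIdleTimeMS=30000")
	if err != nil {
		log.Fatal(err)
	}
	defer session.Close()

	// Equivalent programmatic configuration:
	//   info, err := mgo.ParseURL("localhost:27017/test")
	//   info.MinPoolSize = 3
	//   info.MaxIdleTimeMS = 30000
	//   session, err := mgo.DialWithInfo(info)

	result := struct{}{}
	if err := session.Run("ping", &result); err != nil {
		log.Fatal(err)
	}
	log.Println("ping ok; sockets idle for more than 30s will be reclaimed down to minPoolSize")
}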