From a7f58c0a6dfc5814659e2a14fcf9f137f73a44b5 Mon Sep 17 00:00:00 2001 From: Wang Xu Date: Fri, 26 Jan 2018 18:27:01 +0800 Subject: [PATCH] enable shrink the socket pool size we found the mgo will allocate the pool size during burst traffic but won't close the sockets any more until restart the client or server. And the mongo document defines two related query options - [minPoolSize](https://docs.mongodb.com/manual/reference/connection-string/#urioption.minPoolSize) - [maxIdleTimeMS](https://docs.mongodb.com/manual/reference/connection-string/#urioption.maxIdleTimeMS) By implementing these two options, it could shrink the pool to minPoolSize after the sockets introduced by burst traffic timeout. The idea comes from https://github.com/JodeZer/mgo , he investigated this issue and provide the initial commits. I found there are still some issue in sockets maintenance, and had a PR against his repo JodeZer/mgo#1 . This commit include JodeZer's commits and my fix, and I simplified the data structure. What's in this commit could be described as this figure: +------------------------+ | Session | <-------+ Add options here +------------------------+ +------------------------+ | Cluster | <-------+ Add options here +------------------------+ +------------------------+ | Server | <-------+*Add options here | | *add timestamp when recycle a socket +---+ | +-----------+ | +---+ *periodically check the unused sockets | | | shrinker <------+ and reclaim the timeout sockets. +---+ | +-----------+ | | | | | +------------------------+ | | +------------------------+ | | Socket | <-------+ Add a field for last used times+---------+ +------------------------+ Signed-off-by: Wang Xu --- cluster.go | 34 ++++++++++++++------------ server.go | 71 ++++++++++++++++++++++++++++++++++++++++++++++++------ session.go | 42 +++++++++++++++++++++++++++++++- socket.go | 1 + 4 files changed, 123 insertions(+), 25 deletions(-) diff --git a/cluster.go b/cluster.go index 7fc639c24..1d763b04e 100644 --- a/cluster.go +++ b/cluster.go @@ -48,21 +48,23 @@ import ( type mongoCluster struct { sync.RWMutex - serverSynced sync.Cond - userSeeds []string - dynaSeeds []string - servers mongoServers - masters mongoServers - references int - syncing bool - direct bool - failFast bool - syncCount uint - setName string - cachedIndex map[string]bool - sync chan bool - dial dialer - appName string + serverSynced sync.Cond + userSeeds []string + dynaSeeds []string + servers mongoServers + masters mongoServers + references int + syncing bool + direct bool + failFast bool + syncCount uint + setName string + cachedIndex map[string]bool + sync chan bool + dial dialer + appName string + minPoolSize int + maxIdleTimeMS int } func newCluster(userSeeds []string, direct, failFast bool, dial dialer, setName string, appName string) *mongoCluster { @@ -419,7 +421,7 @@ func (cluster *mongoCluster) server(addr string, tcpaddr *net.TCPAddr) *mongoSer if server != nil { return server } - return newServer(addr, tcpaddr, cluster.sync, cluster.dial) + return newServer(addr, tcpaddr, cluster.sync, cluster.dial, cluster.minPoolSize, cluster.maxIdleTimeMS) } func resolveAddr(addr string) (*net.TCPAddr, error) { diff --git a/server.go b/server.go index 7ad955255..fb3734019 100644 --- a/server.go +++ b/server.go @@ -55,6 +55,8 @@ type mongoServer struct { pingCount uint32 closed bool abended bool + minPoolSize int + maxIdleTimeMS int } type dialer struct { @@ -76,17 +78,22 @@ type mongoServerInfo struct { var defaultServerInfo mongoServerInfo -func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer) *mongoServer { +func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer, minPoolSize, maxIdleTimeMS int) *mongoServer { server := &mongoServer{ - Addr: addr, - ResolvedAddr: tcpaddr.String(), - tcpaddr: tcpaddr, - sync: sync, - dial: dial, - info: &defaultServerInfo, - pingValue: time.Hour, // Push it back before an actual ping. + Addr: addr, + ResolvedAddr: tcpaddr.String(), + tcpaddr: tcpaddr, + sync: sync, + dial: dial, + info: &defaultServerInfo, + pingValue: time.Hour, // Push it back before an actual ping. + minPoolSize: minPoolSize, + maxIdleTimeMS: maxIdleTimeMS, } go server.pinger(true) + if maxIdleTimeMS != 0 { + go server.poolShrinker() + } return server } @@ -221,6 +228,7 @@ func (server *mongoServer) close(waitForIdle bool) { func (server *mongoServer) RecycleSocket(socket *mongoSocket) { server.Lock() if !server.closed { + socket.lastTimeUsed = time.Now() server.unusedSockets = append(server.unusedSockets, socket) } server.Unlock() @@ -346,6 +354,53 @@ func (server *mongoServer) pinger(loop bool) { } } +func (server *mongoServer) poolShrinker() { + ticker := time.NewTicker(1 * time.Minute) + for _ = range ticker.C { + if server.closed { + ticker.Stop() + return + } + server.Lock() + unused := len(server.unusedSockets) + if unused < server.minPoolSize { + server.Unlock() + continue + } + now := time.Now() + end := 0 + reclaimMap := map[*mongoSocket]bool{} + // Because the acquirision and recycle are done at the tail of array, + // the head is always the oldest unused socket. + for _, s := range server.unusedSockets[:unused-server.minPoolSize] { + if s.lastTimeUsed.Add(time.Duration(server.maxIdleTimeMS) * time.Millisecond).After(now) { + break + } + end++ + reclaimMap[s] = true + } + tbr := server.unusedSockets[:end] + if end > 0 { + next := make([]*mongoSocket, unused-end) + copy(next, server.unusedSockets[end:]) + server.unusedSockets = next + remainSockets := []*mongoSocket{} + for _, s := range server.liveSockets { + if !reclaimMap[s] { + remainSockets = append(remainSockets, s) + } + } + server.liveSockets = remainSockets + stats.conn(-1*end, server.info.Master) + } + server.Unlock() + + for _, s := range tbr { + s.Close() + } + } +} + type mongoServerSlice []*mongoServer func (s mongoServerSlice) Len() int { diff --git a/session.go b/session.go index b62707c84..dcc5d8928 100644 --- a/session.go +++ b/session.go @@ -269,6 +269,15 @@ const ( // Defines the per-server socket pool limit. Defaults to 4096. // See Session.SetPoolLimit for details. // +// minPoolSize= +// +// Defines the per-server socket pool minium size. Defaults to 0. +// +// maxIdleTimeMS= +// +// The maximum number of milliseconds that a connection can remain idle in the pool +// before being removed and closed. +// // appName= // // The identifier of this client application. This parameter is used to @@ -320,6 +329,8 @@ func ParseURL(url string) (*DialInfo, error) { appName := "" readPreferenceMode := Primary var readPreferenceTagSets []bson.D + minPoolSize := 0 + maxIdleTimeMS := 0 for _, opt := range uinfo.options { switch opt.key { case "authSource": @@ -366,6 +377,16 @@ func ParseURL(url string) (*DialInfo, error) { doc = append(doc, bson.DocElem{Name: strings.TrimSpace(kvp[0]), Value: strings.TrimSpace(kvp[1])}) } readPreferenceTagSets = append(readPreferenceTagSets, doc) + case "minPoolSize": + minPoolSize, err = strconv.Atoi(opt.value) + if err != nil { + return nil, errors.New("bad value for minPoolSize: " + opt.value) + } + case "maxIdleTimeMS": + maxIdleTimeMS, err = strconv.Atoi(opt.value) + if err != nil { + return nil, errors.New("bad value for maxIdleTimeMS: " + opt.value) + } case "connect": if opt.value == "direct" { direct = true @@ -400,6 +421,8 @@ func ParseURL(url string) (*DialInfo, error) { TagSets: readPreferenceTagSets, }, ReplicaSetName: setName, + MinPoolSize: minPoolSize, + MaxIdleTimeMS: maxIdleTimeMS, } return &info, nil } @@ -473,6 +496,14 @@ type DialInfo struct { // cluster and establish connections with further servers too. Direct bool + // MinPoolSize defines The minimum number of connections in the connection pool. + // Defaults to 0. + MinPoolSize int + + //The maximum number of milliseconds that a connection can remain idle in the pool + // before being removed and closed. + MaxIdleTimeMS int + // DialServer optionally specifies the dial function for establishing // connections with the MongoDB servers. DialServer func(addr *ServerAddr) (net.Conn, error) @@ -552,6 +583,15 @@ func DialWithInfo(info *DialInfo) (*Session, error) { if info.PoolLimit > 0 { session.poolLimit = info.PoolLimit } + + if info.MinPoolSize > 0 { + cluster.minPoolSize = info.MinPoolSize + } + + if info.MaxIdleTimeMS > 0 { + cluster.maxIdleTimeMS = info.MaxIdleTimeMS + } + cluster.Release() // People get confused when we return a session that is not actually @@ -5262,7 +5302,7 @@ func getRFC2253NameString(RDNElements *pkix.RDNSequence) string { var replacer = strings.NewReplacer(",", "\\,", "=", "\\=", "+", "\\+", "<", "\\<", ">", "\\>", ";", "\\;") //The elements in the sequence needs to be reversed when converting them for i := len(*RDNElements) - 1; i >= 0; i-- { - var nameAndValueList = make([]string,len((*RDNElements)[i])) + var nameAndValueList = make([]string, len((*RDNElements)[i])) for j, attribute := range (*RDNElements)[i] { var shortAttributeName = rdnOIDToShortName(attribute.Type) if len(shortAttributeName) <= 0 { diff --git a/socket.go b/socket.go index f6158189c..94ba4dae0 100644 --- a/socket.go +++ b/socket.go @@ -54,6 +54,7 @@ type mongoSocket struct { dead error serverInfo *mongoServerInfo closeAfterIdle bool + lastTimeUsed time.Time // for time based idle socket release } type queryOpFlags uint32