diff --git a/cluster.go b/cluster.go index 7fc639c24..1d763b04e 100644 --- a/cluster.go +++ b/cluster.go @@ -48,21 +48,23 @@ import ( type mongoCluster struct { sync.RWMutex - serverSynced sync.Cond - userSeeds []string - dynaSeeds []string - servers mongoServers - masters mongoServers - references int - syncing bool - direct bool - failFast bool - syncCount uint - setName string - cachedIndex map[string]bool - sync chan bool - dial dialer - appName string + serverSynced sync.Cond + userSeeds []string + dynaSeeds []string + servers mongoServers + masters mongoServers + references int + syncing bool + direct bool + failFast bool + syncCount uint + setName string + cachedIndex map[string]bool + sync chan bool + dial dialer + appName string + minPoolSize int + maxIdleTimeMS int } func newCluster(userSeeds []string, direct, failFast bool, dial dialer, setName string, appName string) *mongoCluster { @@ -419,7 +421,7 @@ func (cluster *mongoCluster) server(addr string, tcpaddr *net.TCPAddr) *mongoSer if server != nil { return server } - return newServer(addr, tcpaddr, cluster.sync, cluster.dial) + return newServer(addr, tcpaddr, cluster.sync, cluster.dial, cluster.minPoolSize, cluster.maxIdleTimeMS) } func resolveAddr(addr string) (*net.TCPAddr, error) { diff --git a/server.go b/server.go index 7ad955255..fb3734019 100644 --- a/server.go +++ b/server.go @@ -55,6 +55,8 @@ type mongoServer struct { pingCount uint32 closed bool abended bool + minPoolSize int + maxIdleTimeMS int } type dialer struct { @@ -76,17 +78,22 @@ type mongoServerInfo struct { var defaultServerInfo mongoServerInfo -func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer) *mongoServer { +func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer, minPoolSize, maxIdleTimeMS int) *mongoServer { server := &mongoServer{ - Addr: addr, - ResolvedAddr: tcpaddr.String(), - tcpaddr: tcpaddr, - sync: sync, - dial: dial, - info: &defaultServerInfo, - pingValue: time.Hour, // Push it back before an actual ping. + Addr: addr, + ResolvedAddr: tcpaddr.String(), + tcpaddr: tcpaddr, + sync: sync, + dial: dial, + info: &defaultServerInfo, + pingValue: time.Hour, // Push it back before an actual ping. + minPoolSize: minPoolSize, + maxIdleTimeMS: maxIdleTimeMS, } go server.pinger(true) + if maxIdleTimeMS != 0 { + go server.poolShrinker() + } return server } @@ -221,6 +228,7 @@ func (server *mongoServer) close(waitForIdle bool) { func (server *mongoServer) RecycleSocket(socket *mongoSocket) { server.Lock() if !server.closed { + socket.lastTimeUsed = time.Now() server.unusedSockets = append(server.unusedSockets, socket) } server.Unlock() @@ -346,6 +354,53 @@ func (server *mongoServer) pinger(loop bool) { } } +func (server *mongoServer) poolShrinker() { + ticker := time.NewTicker(1 * time.Minute) + for _ = range ticker.C { + if server.closed { + ticker.Stop() + return + } + server.Lock() + unused := len(server.unusedSockets) + if unused < server.minPoolSize { + server.Unlock() + continue + } + now := time.Now() + end := 0 + reclaimMap := map[*mongoSocket]bool{} + // Because the acquirision and recycle are done at the tail of array, + // the head is always the oldest unused socket. + for _, s := range server.unusedSockets[:unused-server.minPoolSize] { + if s.lastTimeUsed.Add(time.Duration(server.maxIdleTimeMS) * time.Millisecond).After(now) { + break + } + end++ + reclaimMap[s] = true + } + tbr := server.unusedSockets[:end] + if end > 0 { + next := make([]*mongoSocket, unused-end) + copy(next, server.unusedSockets[end:]) + server.unusedSockets = next + remainSockets := []*mongoSocket{} + for _, s := range server.liveSockets { + if !reclaimMap[s] { + remainSockets = append(remainSockets, s) + } + } + server.liveSockets = remainSockets + stats.conn(-1*end, server.info.Master) + } + server.Unlock() + + for _, s := range tbr { + s.Close() + } + } +} + type mongoServerSlice []*mongoServer func (s mongoServerSlice) Len() int { diff --git a/session.go b/session.go index b62707c84..dcc5d8928 100644 --- a/session.go +++ b/session.go @@ -269,6 +269,15 @@ const ( // Defines the per-server socket pool limit. Defaults to 4096. // See Session.SetPoolLimit for details. // +// minPoolSize= +// +// Defines the per-server socket pool minium size. Defaults to 0. +// +// maxIdleTimeMS= +// +// The maximum number of milliseconds that a connection can remain idle in the pool +// before being removed and closed. +// // appName= // // The identifier of this client application. This parameter is used to @@ -320,6 +329,8 @@ func ParseURL(url string) (*DialInfo, error) { appName := "" readPreferenceMode := Primary var readPreferenceTagSets []bson.D + minPoolSize := 0 + maxIdleTimeMS := 0 for _, opt := range uinfo.options { switch opt.key { case "authSource": @@ -366,6 +377,16 @@ func ParseURL(url string) (*DialInfo, error) { doc = append(doc, bson.DocElem{Name: strings.TrimSpace(kvp[0]), Value: strings.TrimSpace(kvp[1])}) } readPreferenceTagSets = append(readPreferenceTagSets, doc) + case "minPoolSize": + minPoolSize, err = strconv.Atoi(opt.value) + if err != nil { + return nil, errors.New("bad value for minPoolSize: " + opt.value) + } + case "maxIdleTimeMS": + maxIdleTimeMS, err = strconv.Atoi(opt.value) + if err != nil { + return nil, errors.New("bad value for maxIdleTimeMS: " + opt.value) + } case "connect": if opt.value == "direct" { direct = true @@ -400,6 +421,8 @@ func ParseURL(url string) (*DialInfo, error) { TagSets: readPreferenceTagSets, }, ReplicaSetName: setName, + MinPoolSize: minPoolSize, + MaxIdleTimeMS: maxIdleTimeMS, } return &info, nil } @@ -473,6 +496,14 @@ type DialInfo struct { // cluster and establish connections with further servers too. Direct bool + // MinPoolSize defines The minimum number of connections in the connection pool. + // Defaults to 0. + MinPoolSize int + + //The maximum number of milliseconds that a connection can remain idle in the pool + // before being removed and closed. + MaxIdleTimeMS int + // DialServer optionally specifies the dial function for establishing // connections with the MongoDB servers. DialServer func(addr *ServerAddr) (net.Conn, error) @@ -552,6 +583,15 @@ func DialWithInfo(info *DialInfo) (*Session, error) { if info.PoolLimit > 0 { session.poolLimit = info.PoolLimit } + + if info.MinPoolSize > 0 { + cluster.minPoolSize = info.MinPoolSize + } + + if info.MaxIdleTimeMS > 0 { + cluster.maxIdleTimeMS = info.MaxIdleTimeMS + } + cluster.Release() // People get confused when we return a session that is not actually @@ -5262,7 +5302,7 @@ func getRFC2253NameString(RDNElements *pkix.RDNSequence) string { var replacer = strings.NewReplacer(",", "\\,", "=", "\\=", "+", "\\+", "<", "\\<", ">", "\\>", ";", "\\;") //The elements in the sequence needs to be reversed when converting them for i := len(*RDNElements) - 1; i >= 0; i-- { - var nameAndValueList = make([]string,len((*RDNElements)[i])) + var nameAndValueList = make([]string, len((*RDNElements)[i])) for j, attribute := range (*RDNElements)[i] { var shortAttributeName = rdnOIDToShortName(attribute.Type) if len(shortAttributeName) <= 0 { diff --git a/socket.go b/socket.go index f6158189c..94ba4dae0 100644 --- a/socket.go +++ b/socket.go @@ -54,6 +54,7 @@ type mongoSocket struct { dead error serverInfo *mongoServerInfo closeAfterIdle bool + lastTimeUsed time.Time // for time based idle socket release } type queryOpFlags uint32