diff --git a/cluster.go b/cluster.go index ac461d5b9..204f507bd 100644 --- a/cluster.go +++ b/cluster.go @@ -48,21 +48,23 @@ import ( type mongoCluster struct { sync.RWMutex - serverSynced sync.Cond - userSeeds []string - dynaSeeds []string - servers mongoServers - masters mongoServers - references int - syncing bool - direct bool - failFast bool - syncCount uint - setName string - cachedIndex map[string]bool - sync chan bool - dial dialer - appName string + serverSynced sync.Cond + userSeeds []string + dynaSeeds []string + servers mongoServers + masters mongoServers + references int + syncing bool + direct bool + failFast bool + syncCount uint + setName string + cachedIndex map[string]bool + sync chan bool + dial dialer + appName string + minPoolSize int + maxIdleTimeMS int } func newCluster(userSeeds []string, direct, failFast bool, dial dialer, setName string, appName string) *mongoCluster { @@ -437,11 +439,13 @@ func (cluster *mongoCluster) syncServersLoop() { func (cluster *mongoCluster) server(addr string, tcpaddr *net.TCPAddr) *mongoServer { cluster.RLock() server := cluster.servers.Search(tcpaddr.String()) + minPoolSize := cluster.minPoolSize + maxIdleTimeMS := cluster.maxIdleTimeMS cluster.RUnlock() if server != nil { return server } - return newServer(addr, tcpaddr, cluster.sync, cluster.dial) + return newServer(addr, tcpaddr, cluster.sync, cluster.dial, minPoolSize, maxIdleTimeMS) } func resolveAddr(addr string) (*net.TCPAddr, error) { diff --git a/server.go b/server.go index 7ad955255..f999b2407 100644 --- a/server.go +++ b/server.go @@ -55,6 +55,8 @@ type mongoServer struct { pingCount uint32 closed bool abended bool + minPoolSize int + maxIdleTimeMS int } type dialer struct { @@ -76,17 +78,22 @@ type mongoServerInfo struct { var defaultServerInfo mongoServerInfo -func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer) *mongoServer { +func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer, minPoolSize, maxIdleTimeMS int) *mongoServer { server := &mongoServer{ - Addr: addr, - ResolvedAddr: tcpaddr.String(), - tcpaddr: tcpaddr, - sync: sync, - dial: dial, - info: &defaultServerInfo, - pingValue: time.Hour, // Push it back before an actual ping. + Addr: addr, + ResolvedAddr: tcpaddr.String(), + tcpaddr: tcpaddr, + sync: sync, + dial: dial, + info: &defaultServerInfo, + pingValue: time.Hour, // Push it back before an actual ping. + minPoolSize: minPoolSize, + maxIdleTimeMS: maxIdleTimeMS, } go server.pinger(true) + if maxIdleTimeMS != 0 { + go server.poolShrinker() + } return server } @@ -221,6 +228,7 @@ func (server *mongoServer) close(waitForIdle bool) { func (server *mongoServer) RecycleSocket(socket *mongoSocket) { server.Lock() if !server.closed { + socket.lastTimeUsed = time.Now() server.unusedSockets = append(server.unusedSockets, socket) } server.Unlock() @@ -346,6 +354,53 @@ func (server *mongoServer) pinger(loop bool) { } } +func (server *mongoServer) poolShrinker() { + ticker := time.NewTicker(1 * time.Minute) + for _ = range ticker.C { + if server.closed { + ticker.Stop() + return + } + server.Lock() + unused := len(server.unusedSockets) + if unused < server.minPoolSize { + server.Unlock() + continue + } + now := time.Now() + end := 0 + reclaimMap := map[*mongoSocket]struct{}{} + // Because the acquisition and recycle are done at the tail of array, + // the head is always the oldest unused socket. + for _, s := range server.unusedSockets[:unused-server.minPoolSize] { + if s.lastTimeUsed.Add(time.Duration(server.maxIdleTimeMS) * time.Millisecond).After(now) { + break + } + end++ + reclaimMap[s] = struct{}{} + } + tbr := server.unusedSockets[:end] + if end > 0 { + next := make([]*mongoSocket, unused-end) + copy(next, server.unusedSockets[end:]) + server.unusedSockets = next + remainSockets := []*mongoSocket{} + for _, s := range server.liveSockets { + if _, ok := reclaimMap[s]; !ok { + remainSockets = append(remainSockets, s) + } + } + server.liveSockets = remainSockets + stats.conn(-1*end, server.info.Master) + } + server.Unlock() + + for _, s := range tbr { + s.Close() + } + } +} + type mongoServerSlice []*mongoServer func (s mongoServerSlice) Len() int { diff --git a/session.go b/session.go index 561f79487..d1c88420e 100644 --- a/session.go +++ b/session.go @@ -271,6 +271,16 @@ const ( // Defines the per-server socket pool limit. Defaults to 4096. // See Session.SetPoolLimit for details. // +// minPoolSize= +// +// Defines the per-server socket pool minium size. Defaults to 0. +// +// maxIdleTimeMS= +// +// The maximum number of milliseconds that a connection can remain idle in the pool +// before being removed and closed. If maxIdleTimeMS is 0, connections will never be +// closed due to inactivity. +// // appName= // // The identifier of this client application. This parameter is used to @@ -322,6 +332,8 @@ func ParseURL(url string) (*DialInfo, error) { appName := "" readPreferenceMode := Primary var readPreferenceTagSets []bson.D + minPoolSize := 0 + maxIdleTimeMS := 0 for _, opt := range uinfo.options { switch opt.key { case "authSource": @@ -368,6 +380,22 @@ func ParseURL(url string) (*DialInfo, error) { doc = append(doc, bson.DocElem{Name: strings.TrimSpace(kvp[0]), Value: strings.TrimSpace(kvp[1])}) } readPreferenceTagSets = append(readPreferenceTagSets, doc) + case "minPoolSize": + minPoolSize, err = strconv.Atoi(opt.value) + if err != nil { + return nil, errors.New("bad value for minPoolSize: " + opt.value) + } + if minPoolSize < 0 { + return nil, errors.New("bad value (negtive) for minPoolSize: " + opt.value) + } + case "maxIdleTimeMS": + maxIdleTimeMS, err = strconv.Atoi(opt.value) + if err != nil { + return nil, errors.New("bad value for maxIdleTimeMS: " + opt.value) + } + if maxIdleTimeMS < 0 { + return nil, errors.New("bad value (negtive) for maxIdleTimeMS: " + opt.value) + } case "connect": if opt.value == "direct" { direct = true @@ -402,6 +430,8 @@ func ParseURL(url string) (*DialInfo, error) { TagSets: readPreferenceTagSets, }, ReplicaSetName: setName, + MinPoolSize: minPoolSize, + MaxIdleTimeMS: maxIdleTimeMS, } return &info, nil } @@ -475,6 +505,14 @@ type DialInfo struct { // cluster and establish connections with further servers too. Direct bool + // MinPoolSize defines The minimum number of connections in the connection pool. + // Defaults to 0. + MinPoolSize int + + //The maximum number of milliseconds that a connection can remain idle in the pool + // before being removed and closed. + MaxIdleTimeMS int + // DialServer optionally specifies the dial function for establishing // connections with the MongoDB servers. DialServer func(addr *ServerAddr) (net.Conn, error) @@ -554,6 +592,10 @@ func DialWithInfo(info *DialInfo) (*Session, error) { if info.PoolLimit > 0 { session.poolLimit = info.PoolLimit } + + cluster.minPoolSize = info.MinPoolSize + cluster.maxIdleTimeMS = info.MaxIdleTimeMS + cluster.Release() // People get confused when we return a session that is not actually diff --git a/session_test.go b/session_test.go index eb2c812b3..c740af47a 100644 --- a/session_test.go +++ b/session_test.go @@ -30,11 +30,13 @@ import ( "flag" "fmt" "math" + "math/rand" "os" "runtime" "sort" "strconv" "strings" + "sync" "testing" "time" @@ -166,6 +168,90 @@ func (s *S) TestURLInvalidReadPreference(c *C) { } } +func (s *S) TestMinPoolSize(c *C) { + tests := []struct { + url string + size int + fail bool + }{ + {"localhost:40001?minPoolSize=0", 0, false}, + {"localhost:40001?minPoolSize=1", 1, false}, + {"localhost:40001?minPoolSize=-1", -1, true}, + {"localhost:40001?minPoolSize=-.", 0, true}, + } + for _, test := range tests { + info, err := mgo.ParseURL(test.url) + if test.fail { + c.Assert(err, NotNil) + } else { + c.Assert(err, IsNil) + c.Assert(info.MinPoolSize, Equals, test.size) + } + } +} + +func (s *S) TestMaxIdleTimeMS(c *C) { + tests := []struct { + url string + size int + fail bool + }{ + {"localhost:40001?maxIdleTimeMS=0", 0, false}, + {"localhost:40001?maxIdleTimeMS=1", 1, false}, + {"localhost:40001?maxIdleTimeMS=-1", -1, true}, + {"localhost:40001?maxIdleTimeMS=-.", 0, true}, + } + for _, test := range tests { + info, err := mgo.ParseURL(test.url) + if test.fail { + c.Assert(err, NotNil) + } else { + c.Assert(err, IsNil) + c.Assert(info.MaxIdleTimeMS, Equals, test.size) + } + } +} + +func (s *S) TestPoolShrink(c *C) { + if *fast { + c.Skip("-fast") + } + oldSocket := mgo.GetStats().SocketsAlive + + session, err := mgo.Dial("localhost:40001?minPoolSize=1&maxIdleTimeMS=1000") + c.Assert(err, IsNil) + defer session.Close() + + parallel := 10 + res := make(chan error, parallel+1) + wg := &sync.WaitGroup{} + for i := 1; i < parallel; i++ { + wg.Add(1) + go func() { + s := session.Copy() + defer s.Close() + result := struct{}{} + err := s.Run("ping", &result) + + //sleep random time to make the allocate and release in different sequence + time.Sleep(time.Duration(rand.Intn(parallel)*100) * time.Millisecond) + res <- err + wg.Done() + }() + } + wg.Wait() + stats := mgo.GetStats() + c.Logf("living socket: After queries: %d, before queries: %d", stats.SocketsAlive, oldSocket) + + // give some time for shrink the pool, the tick is set to 1 minute + c.Log("Sleeping... 1 minute to for pool shrinking") + time.Sleep(60 * time.Second) + + stats = mgo.GetStats() + c.Logf("living socket: After shrinking: %d, at the beginning of the test: %d", stats.SocketsAlive, oldSocket) + c.Assert(stats.SocketsAlive-oldSocket > 1, Equals, false) +} + func (s *S) TestURLReadPreferenceTags(c *C) { type test struct { url string diff --git a/socket.go b/socket.go index a9124b043..ae13e401f 100644 --- a/socket.go +++ b/socket.go @@ -54,6 +54,7 @@ type mongoSocket struct { dead error serverInfo *mongoServerInfo closeAfterIdle bool + lastTimeUsed time.Time // for time based idle socket release sendMeta sync.Once }