From b56c5f04c2e95f30abfcaf600f48b0a6cd5b0e67 Mon Sep 17 00:00:00 2001 From: KJ Tsanaktsidis Date: Tue, 27 Feb 2018 17:06:18 +1100 Subject: [PATCH 1/3] Add signaling support for connection pool waiting The current behaviour when the poolLimit is reached and a new connection is required is to poll every 100ms to see if there is now headroom to make a new connection. This adds tremendous latency to the limit-hit-path. This commit changes the checkout behaviour to watch on a condition variable for connections to become available, and the checkin behaviour to signal this variable. This should allow waiters to use connections immediately after they become available. A new parameter is also added to DialInfo, PoolTimeout, which is the maximum time that clients will wait for connection headroom to become available. By default this is unlimited. --- cluster.go | 22 ++++++++------ cluster_test.go | 26 +++++++++++++++++ server.go | 76 +++++++++++++++++++++++++++++++++++++++++++++---- session.go | 25 +++++++++++++++- 4 files changed, 133 insertions(+), 16 deletions(-) diff --git a/cluster.go b/cluster.go index 91a4e9ec2..4e54c5d81 100644 --- a/cluster.go +++ b/cluster.go @@ -618,9 +618,17 @@ func (cluster *mongoCluster) syncServersIteration(direct bool) { // true, it will attempt to return a socket to a slave server. If it is // false, the socket will necessarily be to a master server. func (cluster *mongoCluster) AcquireSocket(mode Mode, slaveOk bool, syncTimeout time.Duration, socketTimeout time.Duration, serverTags []bson.D, poolLimit int) (s *mongoSocket, err error) { + return cluster.AcquireSocketWithPoolTimeout(mode, slaveOk, syncTimeout, socketTimeout, serverTags, poolLimit, 0) +} + +// AcquireSocketWithPoolTimeout returns a socket to a server in the cluster. If slaveOk is +// true, it will attempt to return a socket to a slave server. If it is +// false, the socket will necessarily be to a master server. +func (cluster *mongoCluster) AcquireSocketWithPoolTimeout( + mode Mode, slaveOk bool, syncTimeout time.Duration, socketTimeout time.Duration, serverTags []bson.D, poolLimit int, poolTimeout time.Duration, +) (s *mongoSocket, err error) { var started time.Time var syncCount uint - warnedLimit := false for { cluster.RLock() for { @@ -662,14 +670,10 @@ func (cluster *mongoCluster) AcquireSocket(mode Mode, slaveOk bool, syncTimeout continue } - s, abended, err := server.AcquireSocket(poolLimit, socketTimeout) - if err == errPoolLimit { - if !warnedLimit { - warnedLimit = true - log("WARNING: Per-server connection limit reached.") - } - time.Sleep(100 * time.Millisecond) - continue + s, abended, err := server.AcquireSocketWithBlocking(poolLimit, socketTimeout, poolTimeout) + if err == errPoolTimeout { + // No need to remove servers from the topology if acquiring a socket fails for this reason. + return nil, err } if err != nil { cluster.removeServer(server) diff --git a/cluster_test.go b/cluster_test.go index a0a197048..59a59f1c5 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -1649,6 +1649,32 @@ func (s *S) TestPoolLimitMany(c *C) { c.Assert(delay < 6e9, Equals, true) } +func (s *S) TestPoolLimitTimeout(c *C) { + if *fast { + c.Skip("-fast") + } + + session, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session.Close() + session.SetPoolTimeout(1 * time.Second) + session.SetPoolLimit(1) + + // Put one socket in use. + c.Assert(session.Ping(), IsNil) + + // Now block trying to get another one due to the pool limit. 
+ copy := session.Copy() + defer copy.Close() + started := time.Now() + err = copy.Ping() + delay := time.Since(started) + + c.Assert(delay > 900*time.Millisecond, Equals, true, Commentf("Delay: %s", delay)) + c.Assert(delay < 1100*time.Millisecond, Equals, true, Commentf("Delay: %s", delay)) + c.Assert(strings.Contains(err.Error(), "could not acquire connection within pool timeout"), Equals, true, Commentf("Error: %s", err)) +} + func (s *S) TestSetModeEventualIterBug(c *C) { session1, err := mgo.Dial("localhost:40011") c.Assert(err, IsNil) diff --git a/server.go b/server.go index f999b2407..5432a9e1f 100644 --- a/server.go +++ b/server.go @@ -57,6 +57,7 @@ type mongoServer struct { abended bool minPoolSize int maxIdleTimeMS int + poolWaiter *sync.Cond } type dialer struct { @@ -78,18 +79,19 @@ type mongoServerInfo struct { var defaultServerInfo mongoServerInfo -func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer, minPoolSize, maxIdleTimeMS int) *mongoServer { +func newServer(addr string, tcpaddr *net.TCPAddr, syncChan chan bool, dial dialer, minPoolSize, maxIdleTimeMS int) *mongoServer { server := &mongoServer{ Addr: addr, ResolvedAddr: tcpaddr.String(), tcpaddr: tcpaddr, - sync: sync, + sync: syncChan, dial: dial, info: &defaultServerInfo, pingValue: time.Hour, // Push it back before an actual ping. minPoolSize: minPoolSize, maxIdleTimeMS: maxIdleTimeMS, } + server.poolWaiter = sync.NewCond(server) go server.pinger(true) if maxIdleTimeMS != 0 { go server.poolShrinker() @@ -98,6 +100,7 @@ func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer, m } var errPoolLimit = errors.New("per-server connection limit reached") +var errPoolTimeout = errors.New("could not acquire connection within pool timeout") var errServerClosed = errors.New("server was closed") // AcquireSocket returns a socket for communicating with the server. @@ -109,6 +112,21 @@ var errServerClosed = errors.New("server was closed") // use in this server is greater than the provided limit, errPoolLimit is // returned. func (server *mongoServer) AcquireSocket(poolLimit int, timeout time.Duration) (socket *mongoSocket, abended bool, err error) { + return server.acquireSocketInternal(poolLimit, timeout, false, 0*time.Millisecond) +} + +// AcquireSocketWithBlocking wraps AcquireSocket, but if a socket is not available, it will _not_ +// return errPoolLimit. Instead, it will block waiting for a socket to become available. If poolTimeout +// should elapse before a socket is available, it will return errPoolTimeout. +func (server *mongoServer) AcquireSocketWithBlocking( + poolLimit int, socketTimeout time.Duration, poolTimeout time.Duration, +) (socket *mongoSocket, abended bool, err error) { + return server.acquireSocketInternal(poolLimit, socketTimeout, true, poolTimeout) +} + +func (server *mongoServer) acquireSocketInternal( + poolLimit int, timeout time.Duration, shouldBlock bool, poolTimeout time.Duration, +) (socket *mongoSocket, abended bool, err error) { for { server.Lock() abended = server.abended @@ -116,11 +134,49 @@ func (server *mongoServer) AcquireSocket(poolLimit int, timeout time.Duration) ( server.Unlock() return nil, abended, errServerClosed } - n := len(server.unusedSockets) - if poolLimit > 0 && len(server.liveSockets)-n >= poolLimit { - server.Unlock() - return nil, false, errPoolLimit + if poolLimit > 0 { + if shouldBlock { + // Beautiful. Golang conditions don't have a WaitWithTimeout, so I've implemented the timeout + // with a wait + broadcast. 
The broadcast will cause the loop here to re-check the timeout,
+ // and fail if it is blown.
+ // Yes, this is a spurious wakeup, but we can't do a directed signal without having one condition
+ // variable per waiter, which would involve loop traversal in the RecycleSocket
+ // method.
+ // We also can't use the approach of turning a condition variable into a channel outlined in
+ // https://github.com/golang/go/issues/16620, since the lock needs to be held in _this_ goroutine.
+ waitDone := make(chan struct{})
+ timeoutHit := false
+ if poolTimeout > 0 {
+ go func() {
+ select {
+ case <-waitDone:
+ case <-time.After(poolTimeout):
+ // timeoutHit is part of the wait condition, so needs to be changed under mutex.
+ server.Lock()
+ defer server.Unlock()
+ timeoutHit = true
+ server.poolWaiter.Broadcast()
+ }
+ }()
+ }
+ for len(server.liveSockets)-len(server.unusedSockets) >= poolLimit && !timeoutHit {
+ // unlocks server mutex, waits, and locks again. Thus, the condition
+ // above is evaluated only when the lock is held.
+ server.poolWaiter.Wait()
+ }
+ close(waitDone)
+ if timeoutHit {
+ server.Unlock()
+ return nil, false, errPoolTimeout
+ }
+ } else {
+ if len(server.liveSockets)-len(server.unusedSockets) >= poolLimit {
+ server.Unlock()
+ return nil, false, errPoolLimit
+ }
+ }
 }
+ n := len(server.unusedSockets)
 if n > 0 {
 socket = server.unusedSockets[n-1]
 server.unusedSockets[n-1] = nil // Help GC.
@@ -231,6 +287,14 @@ func (server *mongoServer) RecycleSocket(socket *mongoSocket) {
 socket.lastTimeUsed = time.Now()
 server.unusedSockets = append(server.unusedSockets, socket)
 }
+ // If anybody is waiting for a connection, they should try now.
+ // Note that this _has_ to be broadcast, not signal; the signatures of AcquireSocket
+ // and AcquireSocketWithBlocking allow the caller to specify the max number of connections,
+ // rather than that being an intrinsic property of the connection pool (I assume to ensure
+ // that there is always a connection available for replset topology discovery). Thus, once
+ // a connection is returned to the pool, _every_ waiter needs to check if the connection count
+ // is underneath their particular value for poolLimit.
+ server.poolWaiter.Broadcast()
 server.Unlock()
 }

diff --git a/session.go b/session.go
index 3a27caf30..a9f3a8cb2 100644
--- a/session.go
+++ b/session.go
@@ -92,6 +92,7 @@ type Session struct {
 syncTimeout time.Duration
 sockTimeout time.Duration
 poolLimit int
+ poolTimeout time.Duration
 consistency Mode
 creds []Credential
 dialCred *Credential
@@ -486,6 +487,11 @@ type DialInfo struct {
 // See Session.SetPoolLimit for details.
 PoolLimit int

+ // PoolTimeout defines the maximum time to wait for a connection to become available
+ // if the pool limit is reached. Defaults to zero, which means forever.
+ // See Session.SetPoolTimeout for details.
+ PoolTimeout time.Duration
+
 // The identifier of the client application which ran the operation.
AppName string

@@ -596,6 +602,10 @@ func DialWithInfo(info *DialInfo) (*Session, error) {
 cluster.minPoolSize = info.MinPoolSize
 cluster.maxIdleTimeMS = info.MaxIdleTimeMS

+ if info.PoolTimeout > 0 {
+ session.poolTimeout = info.PoolTimeout
+ }
+
 cluster.Release()

 // People get confused when we return a session that is not actually
@@ -711,6 +721,7 @@ func copySession(session *Session, keepCreds bool) (s *Session) {
 syncTimeout: session.syncTimeout,
 sockTimeout: session.sockTimeout,
 poolLimit: session.poolLimit,
+ poolTimeout: session.poolTimeout,
 consistency: session.consistency,
 creds: creds,
 dialCred: session.dialCred,
@@ -2051,6 +2062,16 @@ func (s *Session) SetPoolLimit(limit int) {
 s.m.Unlock()
 }

+// SetPoolTimeout sets the maximum time connection attempts will wait to reuse
+// an existing connection from the pool if the PoolLimit has been reached. If
+// the value is exceeded, the attempt to use a session will fail with an error.
+// The default value is zero, which means to wait forever with no timeout.
+func (s *Session) SetPoolTimeout(timeout time.Duration) {
+ s.m.Lock()
+ s.poolTimeout = timeout
+ s.m.Unlock()
+}
+
 // SetBypassValidation sets whether the server should bypass the registered
 // validation expressions executed when documents are inserted or modified,
 // in the interest of preserving invariants in the collection being modified.
@@ -4908,7 +4929,9 @@ func (s *Session) acquireSocket(slaveOk bool) (*mongoSocket, error) {
 }

 // Still not good. We need a new socket.
- sock, err := s.cluster().AcquireSocket(s.consistency, slaveOk && s.slaveOk, s.syncTimeout, s.sockTimeout, s.queryConfig.op.serverTags, s.poolLimit)
+ sock, err := s.cluster().AcquireSocketWithPoolTimeout(
+ s.consistency, slaveOk && s.slaveOk, s.syncTimeout, s.sockTimeout, s.queryConfig.op.serverTags, s.poolLimit, s.poolTimeout,
+ )
 if err != nil {
 return nil, err
 }

From 891f21dd6b2afd2620d335435c82da29e84e3801 Mon Sep 17 00:00:00 2001
From: KJ Tsanaktsidis
Date: Sat, 24 Mar 2018 09:11:11 +1100
Subject: [PATCH 2/3] Gitignore .vscode directory

I'm using vscode and accidentally committed the .vscode directory;
.gitignore this footgun.
---
 .gitignore | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.gitignore b/.gitignore
index 9f4fa6d20..aae58a8c6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,2 @@
 _harness
-
+.vscode

From a58d78964415a9ea6906c625faf5f8f89dc6495a Mon Sep 17 00:00:00 2001
From: KJ Tsanaktsidis
Date: Fri, 9 Mar 2018 10:10:41 +1100
Subject: [PATCH 3/3] Add stats for connection pool timings

This exposes four new counters:
* The number of times a socket was successfully obtained from the connection pool
* The number of times the connection pool needed to be waited on
* The total time that has been spent waiting for a connection to become available
* The number of times socket acquisition failed due to a pool timeout
---
 cluster_test.go | 16 ++++++++++++++++
 server.go | 9 +++++++++
 stats.go | 45 ++++++++++++++++++++++++++++++++++++---------
 3 files changed, 61 insertions(+), 9 deletions(-)

diff --git a/cluster_test.go b/cluster_test.go
index 59a59f1c5..be11dc1a7 100644
--- a/cluster_test.go
+++ b/cluster_test.go
@@ -1583,6 +1583,9 @@ func (s *S) TestPoolLimitSimple(c *C) {
 }
 defer session.Close()

+ // So we can measure the stats for the blocking operation.
+ mgo.ResetStats()
+
 // Put one socket in use.
c.Assert(session.Ping(), IsNil)

@@ -1603,6 +1606,11 @@
 session.Refresh()
 delay := <-done
 c.Assert(delay > 300*time.Millisecond, Equals, true, Commentf("Delay: %s", delay))
+ stats := mgo.GetStats()
+ c.Assert(stats.TimesSocketAcquired, Equals, 2)
+ c.Assert(stats.TimesWaitedForPool, Equals, 1)
+ c.Assert(stats.PoolTimeouts, Equals, 0)
+ c.Assert(stats.TotalPoolWaitTime > 300*time.Millisecond, Equals, true)
 }
 }

@@ -1660,6 +1668,8 @@ func (s *S) TestPoolLimitTimeout(c *C) {
 session.SetPoolTimeout(1 * time.Second)
 session.SetPoolLimit(1)

+ mgo.ResetStats()
+
 // Put one socket in use.
 c.Assert(session.Ping(), IsNil)

@@ -1673,6 +1683,12 @@
 c.Assert(delay > 900*time.Millisecond, Equals, true, Commentf("Delay: %s", delay))
 c.Assert(delay < 1100*time.Millisecond, Equals, true, Commentf("Delay: %s", delay))
 c.Assert(strings.Contains(err.Error(), "could not acquire connection within pool timeout"), Equals, true, Commentf("Error: %s", err))
+ stats := mgo.GetStats()
+ c.Assert(stats.PoolTimeouts, Equals, 1)
+ c.Assert(stats.TimesSocketAcquired, Equals, 1)
+ c.Assert(stats.TimesWaitedForPool, Equals, 1)
+ c.Assert(stats.TotalPoolWaitTime > 900*time.Millisecond, Equals, true)
+ c.Assert(stats.TotalPoolWaitTime < 1100*time.Millisecond, Equals, true)
 }

 func (s *S) TestSetModeEventualIterBug(c *C) {
 session1, err := mgo.Dial("localhost:40011")
 c.Assert(err, IsNil)

diff --git a/server.go b/server.go
index 5432a9e1f..7832bec1b 100644
--- a/server.go
+++ b/server.go
@@ -159,16 +159,25 @@ func (server *mongoServer) acquireSocketInternal(
 }
 }()
 }
+ timeSpentWaiting := time.Duration(0)
 for len(server.liveSockets)-len(server.unusedSockets) >= poolLimit && !timeoutHit {
+ // We only count time spent in Wait(), and not time evaluating the entire loop,
+ // so that in the happy non-blocking path, where there is already headroom on the
+ // first check, we record a nice round zero wait time.
+ waitStart := time.Now()
 // unlocks server mutex, waits, and locks again. Thus, the condition
 // above is evaluated only when the lock is held.
 server.poolWaiter.Wait()
+ timeSpentWaiting += time.Since(waitStart)
 }
 close(waitDone)
 if timeoutHit {
 server.Unlock()
+ stats.noticePoolTimeout(timeSpentWaiting)
 return nil, false, errPoolTimeout
 }
+ // Record that we fetched a connection out of a socket list and how long we spent waiting.
+ stats.noticeSocketAcquisition(timeSpentWaiting)
 } else {
 if len(server.liveSockets)-len(server.unusedSockets) >= poolLimit {
 server.Unlock()

diff --git a/stats.go b/stats.go
index dcbd01045..8cf4ecec1 100644
--- a/stats.go
+++ b/stats.go
@@ -28,6 +28,7 @@ package mgo

 import (
 "sync"
+ "time"
 )

 var stats *Stats
@@ -77,15 +78,19 @@ func ResetStats() {
 //
 // TODO outdated fields ?
type Stats struct { - Clusters int - MasterConns int - SlaveConns int - SentOps int - ReceivedOps int - ReceivedDocs int - SocketsAlive int - SocketsInUse int - SocketRefs int + Clusters int + MasterConns int + SlaveConns int + SentOps int + ReceivedOps int + ReceivedDocs int + SocketsAlive int + SocketsInUse int + SocketRefs int + TimesSocketAcquired int + TimesWaitedForPool int + TotalPoolWaitTime time.Duration + PoolTimeouts int } func (stats *Stats) cluster(delta int) { @@ -155,3 +160,25 @@ func (stats *Stats) socketRefs(delta int) { statsMutex.Unlock() } } + +func (stats *Stats) noticeSocketAcquisition(waitTime time.Duration) { + if stats != nil { + statsMutex.Lock() + stats.TimesSocketAcquired++ + stats.TotalPoolWaitTime += waitTime + if waitTime > 0 { + stats.TimesWaitedForPool++ + } + statsMutex.Unlock() + } +} + +func (stats *Stats) noticePoolTimeout(waitTime time.Duration) { + if stats != nil { + statsMutex.Lock() + stats.TimesWaitedForPool++ + stats.PoolTimeouts++ + stats.TotalPoolWaitTime += waitTime + statsMutex.Unlock() + } +}
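For reviewers, a rough usage sketch of the surface area this series adds: PoolTimeout on DialInfo (or Session.SetPoolTimeout after dialing) and the new pool counters in Stats. The import path and server address below are assumptions for illustration, not part of the patches:

package main

import (
	"fmt"
	"time"

	"github.com/globalsign/mgo" // assumed import path; use whichever mgo fork this series targets
)

func main() {
	// Cap the pool at one connection and wait at most two seconds for headroom.
	// PoolTimeout is the field added in patch 1/3; zero keeps the old wait-forever behaviour.
	session, err := mgo.DialWithInfo(&mgo.DialInfo{
		Addrs:       []string{"localhost:27017"}, // example address
		Timeout:     10 * time.Second,
		PoolLimit:   1,
		PoolTimeout: 2 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer session.Close()

	// The same knobs are available after dialing.
	session.SetPoolLimit(1)
	session.SetPoolTimeout(2 * time.Second)

	mgo.ResetStats()
	if err := session.Ping(); err != nil {
		// If the pool stays exhausted past PoolTimeout, the error is
		// "could not acquire connection within pool timeout".
		fmt.Println("ping failed:", err)
	}

	// Pool counters added in patch 3/3.
	stats := mgo.GetStats()
	fmt.Printf("acquired=%d waited=%d timeouts=%d waitTime=%s\n",
		stats.TimesSocketAcquired, stats.TimesWaitedForPool,
		stats.PoolTimeouts, stats.TotalPoolWaitTime)
}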