@@ -57,6 +57,7 @@ type mongoServer struct {
5757 abended bool
5858 minPoolSize int
5959 maxIdleTimeMS int
60+ poolWaiter * sync.Cond
6061}
6162
6263type dialer struct {
@@ -78,18 +79,19 @@ type mongoServerInfo struct {
7879
7980var defaultServerInfo mongoServerInfo
8081
81- func newServer (addr string , tcpaddr * net.TCPAddr , sync chan bool , dial dialer , minPoolSize , maxIdleTimeMS int ) * mongoServer {
82+ func newServer (addr string , tcpaddr * net.TCPAddr , syncChan chan bool , dial dialer , minPoolSize , maxIdleTimeMS int ) * mongoServer {
8283 server := & mongoServer {
8384 Addr : addr ,
8485 ResolvedAddr : tcpaddr .String (),
8586 tcpaddr : tcpaddr ,
86- sync : sync ,
87+ sync : syncChan ,
8788 dial : dial ,
8889 info : & defaultServerInfo ,
8990 pingValue : time .Hour , // Push it back before an actual ping.
9091 minPoolSize : minPoolSize ,
9192 maxIdleTimeMS : maxIdleTimeMS ,
9293 }
94+ server .poolWaiter = sync .NewCond (server )
9395 go server .pinger (true )
9496 if maxIdleTimeMS != 0 {
9597 go server .poolShrinker ()
@@ -98,6 +100,7 @@ func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer, m
98100}
99101
100102var errPoolLimit = errors .New ("per-server connection limit reached" )
103+ var errPoolTimeout = errors .New ("could not acquire connection within pool timeout" )
101104var errServerClosed = errors .New ("server was closed" )
102105
103106// AcquireSocket returns a socket for communicating with the server.
@@ -109,18 +112,80 @@ var errServerClosed = errors.New("server was closed")
109112// use in this server is greater than the provided limit, errPoolLimit is
110113// returned.
111114func (server * mongoServer ) AcquireSocket (poolLimit int , timeout time.Duration ) (socket * mongoSocket , abended bool , err error ) {
115+ return server .acquireSocketInternal (poolLimit , timeout , false , 0 * time .Millisecond )
116+ }
117+
118+ // AcquireSocketWithBlocking wraps AcquireSocket, but if a socket is not available, it will _not_
119+ // return errPoolLimit. Instead, it will block waiting for a socket to become available. If poolTimeout
120+ // should elapse before a socket is available, it will return errPoolTimeout.
121+ func (server * mongoServer ) AcquireSocketWithBlocking (
122+ poolLimit int , socketTimeout time.Duration , poolTimeout time.Duration ,
123+ ) (socket * mongoSocket , abended bool , err error ) {
124+ return server .acquireSocketInternal (poolLimit , socketTimeout , true , poolTimeout )
125+ }
126+
127+ func (server * mongoServer ) acquireSocketInternal (
128+ poolLimit int , timeout time.Duration , shouldBlock bool , poolTimeout time.Duration ,
129+ ) (socket * mongoSocket , abended bool , err error ) {
112130 for {
113131 server .Lock ()
114132 abended = server .abended
115133 if server .closed {
116134 server .Unlock ()
117135 return nil , abended , errServerClosed
118136 }
119- n := len (server .unusedSockets )
120- if poolLimit > 0 && len (server .liveSockets )- n >= poolLimit {
121- server .Unlock ()
122- return nil , false , errPoolLimit
137+ if poolLimit > 0 {
138+ if shouldBlock {
139+ // Beautiful. Golang conditions don't have a WaitWithTimeout, so I've implemented the timeout
140+ // with a wait + broadcast. The broadcast will cause the loop here to re-check the timeout,
141+ // and fail if it is blown.
142+ // Yes, this is a spurious wakeup, but we can't do a directed signal without having one condition
143+ // variable per waiter, which would involve loop traversal in the RecycleSocket
144+ // method.
145+ // We also can't use the approach of turning a condition variable into a channel outlined in
146+ // https://github.com/golang/go/issues/16620, since the lock needs to be held in _this_ goroutine.
147+ waitDone := make (chan struct {})
148+ timeoutHit := false
149+ if poolTimeout > 0 {
150+ go func () {
151+ select {
152+ case <- waitDone :
153+ case <- time .After (poolTimeout ):
154+ // timeoutHit is part of the wait condition, so needs to be changed under mutex.
155+ server .Lock ()
156+ defer server .Unlock ()
157+ timeoutHit = true
158+ server .poolWaiter .Broadcast ()
159+ }
160+ }()
161+ }
162+ timeSpentWaiting := time .Duration (0 )
163+ for len (server .liveSockets )- len (server .unusedSockets ) >= poolLimit && ! timeoutHit {
164+ // We only count time spent in Wait(), and not time evaluating the entire loop,
165+ // so that in the happy non-blocking path where the condition above evaluates true
166+ // first time, we record a nice round zero wait time.
167+ waitStart := time .Now ()
168+ // unlocks server mutex, waits, and locks again. Thus, the condition
169+ // above is evaluated only when the lock is held.
170+ server .poolWaiter .Wait ()
171+ timeSpentWaiting += time .Since (waitStart )
172+ }
173+ close (waitDone )
174+ if timeoutHit {
175+ server .Unlock ()
176+ stats .noticePoolTimeout (timeSpentWaiting )
177+ return nil , false , errPoolTimeout
178+ }
179+ // Record that we fetched a connection of of a socket list and how long we spent waiting
180+ stats .noticeSocketAcquisition (timeSpentWaiting )
181+ } else {
182+ if len (server .liveSockets )- len (server .unusedSockets ) >= poolLimit {
183+ server .Unlock ()
184+ return nil , false , errPoolLimit
185+ }
186+ }
123187 }
188+ n := len (server .unusedSockets )
124189 if n > 0 {
125190 socket = server .unusedSockets [n - 1 ]
126191 server .unusedSockets [n - 1 ] = nil // Help GC.
@@ -231,6 +296,14 @@ func (server *mongoServer) RecycleSocket(socket *mongoSocket) {
231296 socket .lastTimeUsed = time .Now ()
232297 server .unusedSockets = append (server .unusedSockets , socket )
233298 }
299+ // If anybody is waiting for a connection, they should try now.
300+ // Note that this _has_ to be broadcast, not signal; the signature of AcquireSocket
301+ // and AcquireSocketWithBlocking allow the caller to specify the max number of connections,
302+ // rather than that being an intrinsic property of the connection pool (I assume to ensure
303+ // that there is always a connection available for replset topology discovery). Thus, once
304+ // a connection is returned to the pool, _every_ waiter needs to check if the connection count
305+ // is underneath their particular value for poolLimit.
306+ server .poolWaiter .Broadcast ()
234307 server .Unlock ()
235308}
236309
0 commit comments