Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 18 additions & 16 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,21 +48,23 @@ import (

type mongoCluster struct {
sync.RWMutex
serverSynced sync.Cond
userSeeds []string
dynaSeeds []string
servers mongoServers
masters mongoServers
references int
syncing bool
direct bool
failFast bool
syncCount uint
setName string
cachedIndex map[string]bool
sync chan bool
dial dialer
appName string
serverSynced sync.Cond
userSeeds []string
dynaSeeds []string
servers mongoServers
masters mongoServers
references int
syncing bool
direct bool
failFast bool
syncCount uint
setName string
cachedIndex map[string]bool
sync chan bool
dial dialer
appName string
minPoolSize int
maxIdleTimeMS int
}

func newCluster(userSeeds []string, direct, failFast bool, dial dialer, setName string, appName string) *mongoCluster {
Expand Down Expand Up @@ -419,7 +421,7 @@ func (cluster *mongoCluster) server(addr string, tcpaddr *net.TCPAddr) *mongoSer
if server != nil {
return server
}
return newServer(addr, tcpaddr, cluster.sync, cluster.dial)
return newServer(addr, tcpaddr, cluster.sync, cluster.dial, cluster.minPoolSize, cluster.maxIdleTimeMS)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit late, and I might be wrong here, but aren't you reading the cluster fields (minPoolSize, maxIdleTimeMs) outside of the read lock?

I assume they stay constant during the run of the application, but stil I would appreciate a comment here stating tha this is safe.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do, thanks @szank

}

func resolveAddr(addr string) (*net.TCPAddr, error) {
Expand Down
71 changes: 63 additions & 8 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ type mongoServer struct {
pingCount uint32
closed bool
abended bool
minPoolSize int
maxIdleTimeMS int
}

type dialer struct {
Expand All @@ -76,17 +78,22 @@ type mongoServerInfo struct {

var defaultServerInfo mongoServerInfo

func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer) *mongoServer {
func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer, minPoolSize, maxIdleTimeMS int) *mongoServer {
server := &mongoServer{
Addr: addr,
ResolvedAddr: tcpaddr.String(),
tcpaddr: tcpaddr,
sync: sync,
dial: dial,
info: &defaultServerInfo,
pingValue: time.Hour, // Push it back before an actual ping.
Addr: addr,
ResolvedAddr: tcpaddr.String(),
tcpaddr: tcpaddr,
sync: sync,
dial: dial,
info: &defaultServerInfo,
pingValue: time.Hour, // Push it back before an actual ping.
minPoolSize: minPoolSize,
maxIdleTimeMS: maxIdleTimeMS,
}
go server.pinger(true)
if maxIdleTimeMS != 0 {
go server.poolShrinker()
}
return server
}

Expand Down Expand Up @@ -221,6 +228,7 @@ func (server *mongoServer) close(waitForIdle bool) {
func (server *mongoServer) RecycleSocket(socket *mongoSocket) {
server.Lock()
if !server.closed {
socket.lastTimeUsed = time.Now()
server.unusedSockets = append(server.unusedSockets, socket)
}
server.Unlock()
Expand Down Expand Up @@ -346,6 +354,53 @@ func (server *mongoServer) pinger(loop bool) {
}
}

func (server *mongoServer) poolShrinker() {
ticker := time.NewTicker(1 * time.Minute)
for _ = range ticker.C {
if server.closed {
ticker.Stop()
return
}
server.Lock()
unused := len(server.unusedSockets)
if unused < server.minPoolSize {
server.Unlock()
continue
}
now := time.Now()
end := 0
reclaimMap := map[*mongoSocket]bool{}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a minor nitpick, but map[*mongoSocket]struct{}{} is more idiomatic

// Because the acquirision and recycle are done at the tail of array,
// the head is always the oldest unused socket.
for _, s := range server.unusedSockets[:unused-server.minPoolSize] {
if s.lastTimeUsed.Add(time.Duration(server.maxIdleTimeMS) * time.Millisecond).After(now) {
break
}
end++
reclaimMap[s] = true
}
tbr := server.unusedSockets[:end]
if end > 0 {
next := make([]*mongoSocket, unused-end)
copy(next, server.unusedSockets[end:])
server.unusedSockets = next
remainSockets := []*mongoSocket{}
for _, s := range server.liveSockets {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is late, and I might be missing something here, but it looks like you populate the reclaimMap with the unused sockets. Then you check if the liveSockets contain elements from the unusedSockets. Is it possible to liveSockets and unusedSockets to contain pointers to the same structs ?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly, the liveSockets includes both in-use sockets and unusedSockets, and when we reclaim the timeout elements in unusedSockets, they should be removed from liveSockets as well. If you don't remove them, there would lead to a "leakage", i.e. when you allocate new socket, there are full of live sockets but no unused, and no sockets could be allocated then. I observed the case during my local test.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, thanks for the clarification :).

if !reclaimMap[s] {
remainSockets = append(remainSockets, s)
}
}
server.liveSockets = remainSockets
stats.conn(-1*end, server.info.Master)
}
server.Unlock()

for _, s := range tbr {
s.Close()
}
}
}

type mongoServerSlice []*mongoServer

func (s mongoServerSlice) Len() int {
Expand Down
42 changes: 41 additions & 1 deletion session.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,15 @@ const (
// Defines the per-server socket pool limit. Defaults to 4096.
// See Session.SetPoolLimit for details.
//
// minPoolSize=<limit>
//
// Defines the per-server socket pool minium size. Defaults to 0.
//
// maxIdleTimeMS=<millisecond>
//
// The maximum number of milliseconds that a connection can remain idle in the pool
// before being removed and closed.
//
// appName=<appName>
//
// The identifier of this client application. This parameter is used to
Expand Down Expand Up @@ -320,6 +329,8 @@ func ParseURL(url string) (*DialInfo, error) {
appName := ""
readPreferenceMode := Primary
var readPreferenceTagSets []bson.D
minPoolSize := 0
maxIdleTimeMS := 0
for _, opt := range uinfo.options {
switch opt.key {
case "authSource":
Expand Down Expand Up @@ -366,6 +377,16 @@ func ParseURL(url string) (*DialInfo, error) {
doc = append(doc, bson.DocElem{Name: strings.TrimSpace(kvp[0]), Value: strings.TrimSpace(kvp[1])})
}
readPreferenceTagSets = append(readPreferenceTagSets, doc)
case "minPoolSize":
minPoolSize, err = strconv.Atoi(opt.value)
if err != nil {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please return an error if the value is <0?
This is unlikely to happen, but it is still an invalid input and it would be better to notify the user instead of silently ignoring the values.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will update

return nil, errors.New("bad value for minPoolSize: " + opt.value)
}
case "maxIdleTimeMS":
maxIdleTimeMS, err = strconv.Atoi(opt.value)
if err != nil {
return nil, errors.New("bad value for maxIdleTimeMS: " + opt.value)
}
case "connect":
if opt.value == "direct" {
direct = true
Expand Down Expand Up @@ -400,6 +421,8 @@ func ParseURL(url string) (*DialInfo, error) {
TagSets: readPreferenceTagSets,
},
ReplicaSetName: setName,
MinPoolSize: minPoolSize,
MaxIdleTimeMS: maxIdleTimeMS,
}
return &info, nil
}
Expand Down Expand Up @@ -473,6 +496,14 @@ type DialInfo struct {
// cluster and establish connections with further servers too.
Direct bool

// MinPoolSize defines The minimum number of connections in the connection pool.
// Defaults to 0.
MinPoolSize int

//The maximum number of milliseconds that a connection can remain idle in the pool
// before being removed and closed.
MaxIdleTimeMS int

// DialServer optionally specifies the dial function for establishing
// connections with the MongoDB servers.
DialServer func(addr *ServerAddr) (net.Conn, error)
Expand Down Expand Up @@ -552,6 +583,15 @@ func DialWithInfo(info *DialInfo) (*Session, error) {
if info.PoolLimit > 0 {
session.poolLimit = info.PoolLimit
}

if info.MinPoolSize > 0 {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this checks should be reversed to return an error if MinPoolSize or MaxIdleTimeMS are <0

cluster.minPoolSize = info.MinPoolSize
}

if info.MaxIdleTimeMS > 0 {
cluster.maxIdleTimeMS = info.MaxIdleTimeMS
}

cluster.Release()

// People get confused when we return a session that is not actually
Expand Down Expand Up @@ -5262,7 +5302,7 @@ func getRFC2253NameString(RDNElements *pkix.RDNSequence) string {
var replacer = strings.NewReplacer(",", "\\,", "=", "\\=", "+", "\\+", "<", "\\<", ">", "\\>", ";", "\\;")
//The elements in the sequence needs to be reversed when converting them
for i := len(*RDNElements) - 1; i >= 0; i-- {
var nameAndValueList = make([]string,len((*RDNElements)[i]))
var nameAndValueList = make([]string, len((*RDNElements)[i]))
for j, attribute := range (*RDNElements)[i] {
var shortAttributeName = rdnOIDToShortName(attribute.Type)
if len(shortAttributeName) <= 0 {
Expand Down
1 change: 1 addition & 0 deletions socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type mongoSocket struct {
dead error
serverInfo *mongoServerInfo
closeAfterIdle bool
lastTimeUsed time.Time // for time based idle socket release
}

type queryOpFlags uint32
Expand Down