From 389c0189377091056fbefb990456ffdb648c839e Mon Sep 17 00:00:00 2001 From: Atharva Chiplunkar Date: Mon, 20 Oct 2025 12:53:41 -0700 Subject: [PATCH 1/6] Fix race condition in TServerSocket.Addr() --- .../apache/thrift/lib/go/thrift/server_socket.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/thirdparty/github.com/apache/thrift/lib/go/thrift/server_socket.go b/thirdparty/github.com/apache/thrift/lib/go/thrift/server_socket.go index d6e9495d..43beff7f 100644 --- a/thirdparty/github.com/apache/thrift/lib/go/thrift/server_socket.go +++ b/thirdparty/github.com/apache/thrift/lib/go/thrift/server_socket.go @@ -26,12 +26,12 @@ import ( ) type TServerSocket struct { - listener net.Listener addr net.Addr clientTimeout time.Duration - // Protects the interrupted value to make it thread safe. + // Protects the listener and interrupted fields to make them thread safe. mu sync.RWMutex + listener net.Listener interrupted bool } @@ -96,7 +96,9 @@ func (p *TServerSocket) Open() error { } func (p *TServerSocket) Addr() net.Addr { - if p.listener != nil { + p.mu.RLock() + defer p.mu.RUnlock() + if p.IsListening() != nil { return p.listener.Addr() } return p.addr From ce6a5af773f4e4c2c2371d72a11eb7c24911c325 Mon Sep 17 00:00:00 2001 From: Atharva Chiplunkar Date: Mon, 20 Oct 2025 12:56:07 -0700 Subject: [PATCH 2/6] Fix race condition in TServerSocket.Addr() --- .../github.com/apache/thrift/lib/go/thrift/server_socket.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/thirdparty/github.com/apache/thrift/lib/go/thrift/server_socket.go b/thirdparty/github.com/apache/thrift/lib/go/thrift/server_socket.go index 43beff7f..2b57d4c3 100644 --- a/thirdparty/github.com/apache/thrift/lib/go/thrift/server_socket.go +++ b/thirdparty/github.com/apache/thrift/lib/go/thrift/server_socket.go @@ -98,7 +98,7 @@ func (p *TServerSocket) Open() error { func (p *TServerSocket) Addr() net.Addr { p.mu.RLock() defer p.mu.RUnlock() - if p.IsListening() != nil { + if p.IsListening() { return p.listener.Addr() } return p.addr From 4352f5b77c29c4e8efaa0dbd7ff88616c3295845 Mon Sep 17 00:00:00 2001 From: Atharva Chiplunkar Date: Mon, 27 Oct 2025 10:41:45 -0700 Subject: [PATCH 3/6] full sync with upstream --- .../thrift/lib/go/thrift/server_socket.go | 224 ++++++++++-------- 1 file changed, 120 insertions(+), 104 deletions(-) diff --git a/thirdparty/github.com/apache/thrift/lib/go/thrift/server_socket.go b/thirdparty/github.com/apache/thrift/lib/go/thrift/server_socket.go index 2b57d4c3..23fcb7fd 100644 --- a/thirdparty/github.com/apache/thrift/lib/go/thrift/server_socket.go +++ b/thirdparty/github.com/apache/thrift/lib/go/thrift/server_socket.go @@ -17,108 +17,124 @@ * under the License. */ -package thrift + package thrift -import ( - "net" - "sync" - "time" -) - -type TServerSocket struct { - addr net.Addr - clientTimeout time.Duration - - // Protects the listener and interrupted fields to make them thread safe. - mu sync.RWMutex - listener net.Listener - interrupted bool -} - -func NewTServerSocket(listenAddr string) (*TServerSocket, error) { - return NewTServerSocketTimeout(listenAddr, 0) -} - -func NewTServerSocketTimeout(listenAddr string, clientTimeout time.Duration) (*TServerSocket, error) { - addr, err := net.ResolveTCPAddr("tcp", listenAddr) - if err != nil { - return nil, err - } - return &TServerSocket{addr: addr, clientTimeout: clientTimeout}, nil -} - -func (p *TServerSocket) Listen() error { - if p.IsListening() { - return nil - } - l, err := net.Listen(p.addr.Network(), p.addr.String()) - if err != nil { - return err - } - p.listener = l - return nil -} - -func (p *TServerSocket) Accept() (TTransport, error) { - p.mu.RLock() - interrupted := p.interrupted - p.mu.RUnlock() - - if interrupted { - return nil, errTransportInterrupted - } - if p.listener == nil { - return nil, NewTTransportException(NOT_OPEN, "No underlying server socket") - } - conn, err := p.listener.Accept() - if err != nil { - return nil, NewTTransportExceptionFromError(err) - } - return NewTSocketFromConnTimeout(conn, p.clientTimeout), nil -} - -// Checks whether the socket is listening. -func (p *TServerSocket) IsListening() bool { - return p.listener != nil -} - -// Connects the socket, creating a new socket object if necessary. -func (p *TServerSocket) Open() error { - if p.IsListening() { - return NewTTransportException(ALREADY_OPEN, "Server socket already open") - } - if l, err := net.Listen(p.addr.Network(), p.addr.String()); err != nil { - return err - } else { - p.listener = l - } - return nil -} - -func (p *TServerSocket) Addr() net.Addr { - p.mu.RLock() - defer p.mu.RUnlock() - if p.IsListening() { - return p.listener.Addr() - } - return p.addr -} - -func (p *TServerSocket) Close() error { - defer func() { - p.listener = nil - }() - if p.IsListening() { - return p.listener.Close() - } - return nil -} - -func (p *TServerSocket) Interrupt() error { - p.mu.Lock() - p.interrupted = true - p.Close() - p.mu.Unlock() - - return nil -} + import ( + "net" + "sync" + "time" + ) + + type TServerSocket struct { + addr net.Addr + clientTimeout time.Duration + + // Protects the listener and interrupted fields to make them thread safe. + mu sync.RWMutex + listener net.Listener + interrupted bool + } + + func NewTServerSocket(listenAddr string) (*TServerSocket, error) { + return NewTServerSocketTimeout(listenAddr, 0) + } + + func NewTServerSocketTimeout(listenAddr string, clientTimeout time.Duration) (*TServerSocket, error) { + addr, err := net.ResolveTCPAddr("tcp", listenAddr) + if err != nil { + return nil, err + } + return &TServerSocket{addr: addr, clientTimeout: clientTimeout}, nil + } + + // Creates a TServerSocket from a net.Addr + func NewTServerSocketFromAddrTimeout(addr net.Addr, clientTimeout time.Duration) *TServerSocket { + return &TServerSocket{addr: addr, clientTimeout: clientTimeout} + } + + func (p *TServerSocket) Listen() error { + p.mu.Lock() + defer p.mu.Unlock() + if p.IsListening() { + return nil + } + l, err := net.Listen(p.addr.Network(), p.addr.String()) + if err != nil { + return err + } + p.listener = l + return nil + } + + func (p *TServerSocket) Accept() (TTransport, error) { + p.mu.RLock() + interrupted := p.interrupted + p.mu.RUnlock() + + if interrupted { + return nil, errTransportInterrupted + } + + p.mu.Lock() + listener := p.listener + p.mu.Unlock() + if listener == nil { + return nil, NewTTransportException(NOT_OPEN, "No underlying server socket") + } + + conn, err := listener.Accept() + if err != nil { + return nil, NewTTransportExceptionFromError(err) + } + return NewTSocketFromConnTimeout(conn, p.clientTimeout), nil + } + + // Checks whether the socket is listening. + func (p *TServerSocket) IsListening() bool { + return p.listener != nil + } + + // Connects the socket, creating a new socket object if necessary. + func (p *TServerSocket) Open() error { + p.mu.Lock() + defer p.mu.Unlock() + if p.IsListening() { + return NewTTransportException(ALREADY_OPEN, "Server socket already open") + } + if l, err := net.Listen(p.addr.Network(), p.addr.String()); err != nil { + return err + } else { + p.listener = l + } + return nil + } + + func (p *TServerSocket) Addr() net.Addr { + p.mu.RLock() + defer p.mu.RUnlock() + if p.IsListening() { + return p.listener.Addr() + } + return p.addr + } + + func (p *TServerSocket) Close() error { + var err error + p.mu.Lock() + if p.IsListening() { + err = p.listener.Close() + p.listener = nil + } + p.mu.Unlock() + return err + } + + func (p *TServerSocket) Interrupt() error { + p.mu.Lock() + p.interrupted = true + p.mu.Unlock() + p.Close() + + return nil + } + \ No newline at end of file From 897191cc2f46bd654e2de8a5c10200d9ab390e4f Mon Sep 17 00:00:00 2001 From: Atharva Chiplunkar Date: Mon, 27 Oct 2025 11:03:45 -0700 Subject: [PATCH 4/6] full sync with upstream v2 --- .../thrift/lib/go/thrift/server_socket.go | 235 +++++++++--------- 1 file changed, 114 insertions(+), 121 deletions(-) diff --git a/thirdparty/github.com/apache/thrift/lib/go/thrift/server_socket.go b/thirdparty/github.com/apache/thrift/lib/go/thrift/server_socket.go index 23fcb7fd..5c939ca4 100644 --- a/thirdparty/github.com/apache/thrift/lib/go/thrift/server_socket.go +++ b/thirdparty/github.com/apache/thrift/lib/go/thrift/server_socket.go @@ -17,124 +17,117 @@ * under the License. */ - package thrift - - import ( - "net" - "sync" - "time" - ) - - type TServerSocket struct { - addr net.Addr - clientTimeout time.Duration - - // Protects the listener and interrupted fields to make them thread safe. - mu sync.RWMutex - listener net.Listener - interrupted bool - } - - func NewTServerSocket(listenAddr string) (*TServerSocket, error) { - return NewTServerSocketTimeout(listenAddr, 0) - } - - func NewTServerSocketTimeout(listenAddr string, clientTimeout time.Duration) (*TServerSocket, error) { - addr, err := net.ResolveTCPAddr("tcp", listenAddr) - if err != nil { - return nil, err - } - return &TServerSocket{addr: addr, clientTimeout: clientTimeout}, nil - } - - // Creates a TServerSocket from a net.Addr - func NewTServerSocketFromAddrTimeout(addr net.Addr, clientTimeout time.Duration) *TServerSocket { - return &TServerSocket{addr: addr, clientTimeout: clientTimeout} - } - - func (p *TServerSocket) Listen() error { - p.mu.Lock() - defer p.mu.Unlock() - if p.IsListening() { - return nil - } - l, err := net.Listen(p.addr.Network(), p.addr.String()) - if err != nil { - return err - } - p.listener = l - return nil - } - - func (p *TServerSocket) Accept() (TTransport, error) { - p.mu.RLock() - interrupted := p.interrupted - p.mu.RUnlock() - - if interrupted { - return nil, errTransportInterrupted - } - - p.mu.Lock() - listener := p.listener - p.mu.Unlock() - if listener == nil { - return nil, NewTTransportException(NOT_OPEN, "No underlying server socket") - } - - conn, err := listener.Accept() - if err != nil { - return nil, NewTTransportExceptionFromError(err) - } - return NewTSocketFromConnTimeout(conn, p.clientTimeout), nil - } - - // Checks whether the socket is listening. - func (p *TServerSocket) IsListening() bool { - return p.listener != nil - } - - // Connects the socket, creating a new socket object if necessary. - func (p *TServerSocket) Open() error { - p.mu.Lock() - defer p.mu.Unlock() - if p.IsListening() { - return NewTTransportException(ALREADY_OPEN, "Server socket already open") - } - if l, err := net.Listen(p.addr.Network(), p.addr.String()); err != nil { - return err - } else { - p.listener = l - } - return nil - } - - func (p *TServerSocket) Addr() net.Addr { - p.mu.RLock() - defer p.mu.RUnlock() - if p.IsListening() { - return p.listener.Addr() - } - return p.addr - } - - func (p *TServerSocket) Close() error { - var err error - p.mu.Lock() - if p.IsListening() { - err = p.listener.Close() - p.listener = nil - } - p.mu.Unlock() - return err - } - - func (p *TServerSocket) Interrupt() error { - p.mu.Lock() - p.interrupted = true - p.mu.Unlock() - p.Close() - - return nil - } - \ No newline at end of file +package thrift + +import ( + "net" + "sync" + "time" +) + +type TServerSocket struct { + addr net.Addr + clientTimeout time.Duration + + // Protects the listener and interrupted fields to make them thread safe. + mu sync.RWMutex + listener net.Listener + interrupted bool +} + +func NewTServerSocket(listenAddr string) (*TServerSocket, error) { + return NewTServerSocketTimeout(listenAddr, 0) +} + +func NewTServerSocketTimeout(listenAddr string, clientTimeout time.Duration) (*TServerSocket, error) { + addr, err := net.ResolveTCPAddr("tcp", listenAddr) + if err != nil { + return nil, err + } + return &TServerSocket{addr: addr, clientTimeout: clientTimeout}, nil +} + +func (p *TServerSocket) Listen() error { + p.mu.Lock() + defer p.mu.Unlock() + if p.IsListening() { + return nil + } + l, err := net.Listen(p.addr.Network(), p.addr.String()) + if err != nil { + return err + } + p.listener = l + return nil +} + +func (p *TServerSocket) Accept() (TTransport, error) { + p.mu.RLock() + interrupted := p.interrupted + p.mu.RUnlock() + + if interrupted { + return nil, errTransportInterrupted + } + + p.mu.Lock() + listener := p.listener + p.mu.Unlock() + if listener == nil { + return nil, NewTTransportException(NOT_OPEN, "No underlying server socket") + } + conn, err := listener.Accept() + if err != nil { + return nil, NewTTransportExceptionFromError(err) + } + return NewTSocketFromConnTimeout(conn, p.clientTimeout), nil +} + +// Checks whether the socket is listening. +func (p *TServerSocket) IsListening() bool { + return p.listener != nil +} + +// Connects the socket, creating a new socket object if necessary. +func (p *TServerSocket) Open() error { + p.mu.Lock() + defer p.mu.Unlock() + if p.IsListening() { + return NewTTransportException(ALREADY_OPEN, "Server socket already open") + } + if l, err := net.Listen(p.addr.Network(), p.addr.String()); err != nil { + return err + } else { + p.listener = l + } + return nil +} + +func (p *TServerSocket) Addr() net.Addr { + p.mu.RLock() + defer p.mu.RUnlock() + if p.IsListening() { + return p.listener.Addr() + } + return p.addr +} + +func (p *TServerSocket) Close() error { + var err error + p.mu.Lock() + if p.IsListening() { + err = p.listener.Close() + p.listener = nil + } + p.mu.Unlock() + return err +} + +func (p *TServerSocket) Interrupt() error { + p.mu.Lock() + p.interrupted = true + p.mu.Unlock() + p.Close() + + return nil +} From 5aedb8f7597a370fd6952971cf763aeeba1f8181 Mon Sep 17 00:00:00 2001 From: Atharva Chiplunkar Date: Mon, 27 Oct 2025 11:12:42 -0700 Subject: [PATCH 5/6] adding NewTServerSocketFromAddrTimeout func to sync with upstream changes --- .../github.com/apache/thrift/lib/go/thrift/server_socket.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/thirdparty/github.com/apache/thrift/lib/go/thrift/server_socket.go b/thirdparty/github.com/apache/thrift/lib/go/thrift/server_socket.go index 5c939ca4..ee0aed30 100644 --- a/thirdparty/github.com/apache/thrift/lib/go/thrift/server_socket.go +++ b/thirdparty/github.com/apache/thrift/lib/go/thrift/server_socket.go @@ -47,6 +47,11 @@ func NewTServerSocketTimeout(listenAddr string, clientTimeout time.Duration) (*T return &TServerSocket{addr: addr, clientTimeout: clientTimeout}, nil } +// Creates a TServerSocket from a net.Addr +func NewTServerSocketFromAddrTimeout(addr net.Addr, clientTimeout time.Duration) *TServerSocket { + return &TServerSocket{addr: addr, clientTimeout: clientTimeout} +} + func (p *TServerSocket) Listen() error { p.mu.Lock() defer p.mu.Unlock() From f00aa20d0932e771e5504b58cdec1e61fad00bba Mon Sep 17 00:00:00 2001 From: Atharva Chiplunkar Date: Tue, 28 Oct 2025 14:05:47 -0700 Subject: [PATCH 6/6] replacing write lock to read lock for reading listener --- .../github.com/apache/thrift/lib/go/thrift/server_socket.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/thirdparty/github.com/apache/thrift/lib/go/thrift/server_socket.go b/thirdparty/github.com/apache/thrift/lib/go/thrift/server_socket.go index ee0aed30..793408df 100644 --- a/thirdparty/github.com/apache/thrift/lib/go/thrift/server_socket.go +++ b/thirdparty/github.com/apache/thrift/lib/go/thrift/server_socket.go @@ -69,15 +69,12 @@ func (p *TServerSocket) Listen() error { func (p *TServerSocket) Accept() (TTransport, error) { p.mu.RLock() interrupted := p.interrupted + listener := p.listener p.mu.RUnlock() if interrupted { return nil, errTransportInterrupted } - - p.mu.Lock() - listener := p.listener - p.mu.Unlock() if listener == nil { return nil, NewTTransportException(NOT_OPEN, "No underlying server socket") }