Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 24 additions & 11 deletions thirdparty/github.com/apache/thrift/lib/go/thrift/server_socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ import (
)

type TServerSocket struct {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add more tests in server_socket_test.go and run them with race-detection flag

listener net.Listener
addr net.Addr
clientTimeout time.Duration

// Protects the interrupted value to make it thread safe.
// Protects the listener and interrupted fields to make them thread safe.
mu sync.RWMutex
listener net.Listener
interrupted bool
}

Expand All @@ -47,7 +47,14 @@ func NewTServerSocketTimeout(listenAddr string, clientTimeout time.Duration) (*T
return &TServerSocket{addr: addr, clientTimeout: clientTimeout}, nil
}

// Creates a TServerSocket from a net.Addr
func NewTServerSocketFromAddrTimeout(addr net.Addr, clientTimeout time.Duration) *TServerSocket {
return &TServerSocket{addr: addr, clientTimeout: clientTimeout}
}

func (p *TServerSocket) Listen() error {
p.mu.Lock()
defer p.mu.Unlock()
if p.IsListening() {
return nil
}
Expand All @@ -62,15 +69,16 @@ func (p *TServerSocket) Listen() error {
func (p *TServerSocket) Accept() (TTransport, error) {
p.mu.RLock()
interrupted := p.interrupted
listener := p.listener
Copy link

@pnndesh pnndesh Oct 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In https://github.com/apache/thrift/blob/master/lib/go/thrift/server_socket.go, the listener is accessed under exclusive lock. Let's do the same here.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it was a bug in upstream. fixed it on upstream apache/thrift@e7ab34e

p.mu.RUnlock()

if interrupted {
return nil, errTransportInterrupted
}
if p.listener == nil {
if listener == nil {
return nil, NewTTransportException(NOT_OPEN, "No underlying server socket")
}
conn, err := p.listener.Accept()
conn, err := listener.Accept()
if err != nil {
return nil, NewTTransportExceptionFromError(err)
}
Expand All @@ -84,6 +92,8 @@ func (p *TServerSocket) IsListening() bool {

// Connects the socket, creating a new socket object if necessary.
func (p *TServerSocket) Open() error {
p.mu.Lock()
defer p.mu.Unlock()
if p.IsListening() {
return NewTTransportException(ALREADY_OPEN, "Server socket already open")
}
Expand All @@ -96,27 +106,30 @@ func (p *TServerSocket) Open() error {
}

func (p *TServerSocket) Addr() net.Addr {
if p.listener != nil {
p.mu.RLock()
defer p.mu.RUnlock()
if p.IsListening() {
return p.listener.Addr()
}
return p.addr
}

func (p *TServerSocket) Close() error {
defer func() {
p.listener = nil
}()
var err error
p.mu.Lock()
if p.IsListening() {
return p.listener.Close()
err = p.listener.Close()
p.listener = nil
}
return nil
p.mu.Unlock()
return err
}

func (p *TServerSocket) Interrupt() error {
p.mu.Lock()
p.interrupted = true
p.Close()
p.mu.Unlock()
p.Close()

return nil
}