Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
71eba71
Add deadlines to Transport
nkryuchkov Aug 14, 2019
ed38420
Improve logic of handling deadlines
nkryuchkov Aug 14, 2019
1f20f25
Improve logic of handling deadlines
nkryuchkov Aug 14, 2019
a0d9802
Get rid of a goto
nkryuchkov Aug 14, 2019
0169098
Implement deadline for pending i/o
nkryuchkov Aug 14, 2019
7406640
Fix a typo
nkryuchkov Aug 14, 2019
aaf426e
Improve logic
nkryuchkov Aug 14, 2019
808195d
Change implementation of deadlines
nkryuchkov Aug 15, 2019
76f44b7
Fix uninitialized channels, improve implementation
nkryuchkov Aug 15, 2019
dadc93f
Add tests for deadlines
nkryuchkov Aug 15, 2019
a2cab90
Improve deadline tests names
nkryuchkov Aug 15, 2019
7bc89e2
Run deadline tests in parallel
nkryuchkov Aug 15, 2019
079158c
Add a test case
nkryuchkov Aug 15, 2019
2d9aeee
Group deadline variables to a struct
nkryuchkov Aug 15, 2019
0edd95a
Minor improvements
nkryuchkov Aug 16, 2019
80a47c8
Add deadline.Serve
nkryuchkov Aug 16, 2019
d316c9e
Add deadlines checks before reading/writing
nkryuchkov Aug 16, 2019
952c353
Make deadlines checks before reading/writing atomic
nkryuchkov Aug 16, 2019
a286d79
Fix issues
nkryuchkov Aug 17, 2019
bf6a935
Refactor deadlines
nkryuchkov Aug 17, 2019
cc5acf9
Fix a logic issue
nkryuchkov Aug 17, 2019
ade4248
Get rid of an else statement
nkryuchkov Aug 17, 2019
b50c331
Fix concurrency bugs
nkryuchkov Aug 19, 2019
7583b23
Add basic tests for deadlines
nkryuchkov Aug 19, 2019
fb1edee
Fix naming and logic issues
nkryuchkov Aug 20, 2019
08ec36e
Add a concurrent test for deadlines
nkryuchkov Aug 20, 2019
29e4ddf
Merge remote-tracking branch 'upstream/mainnet-milestone1' into featu…
Aug 26, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func TestClient(t *testing.T) {
conn1.mx.RUnlock()
assert.Equal(t, initID+2, newInitID)

assert.NoError(t, closeClosers(conn1, conn2))
assert.NoError(t, closeClosers(conn1, conn2, tr1))
checkClientConnsClosed(t, conn1, conn2)

assert.Error(t, errWithTimeout(serveErrCh1))
Expand Down
78 changes: 78 additions & 0 deletions deadline.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package dmsg

import (
"io"
"math"
"sync"
"sync/atomic"
"time"
)

type deadline struct {
mu sync.Mutex
timestamp int64
ch chan struct{}
modCh chan struct{}
modChClosed uint32
}

func newDeadline() *deadline {
return &deadline{
timestamp: math.MaxInt64,
ch: make(chan struct{}),
modCh: make(chan struct{}, 1),
}
}

func (d *deadline) Close() error {
d.mu.Lock()
defer d.mu.Unlock()

atomic.StoreUint32(&d.modChClosed, 1)
close(d.modCh)

return nil
}

func (d *deadline) Set(t time.Time) error {
d.mu.Lock()
defer d.mu.Unlock()

if atomic.LoadUint32(&d.modChClosed) != 0 {
return io.ErrClosedPipe
}

atomic.StoreInt64(&d.timestamp, t.UnixNano())
d.modCh <- struct{}{}

return nil
}

func (d *deadline) Serve() error {
defer close(d.ch)
for {
if d.Exceeded() {
select {
case d.ch <- struct{}{}:
case _, ok := <-d.modCh:
if !ok {
return nil
}
}
continue
}
if _, ok := <-d.modCh; !ok {
return nil
}
}
}

func (d *deadline) Exceeded() bool {
now := time.Now().UnixNano()
deadline := atomic.LoadInt64(&d.timestamp)
return now >= deadline
}

func (d *deadline) Chan() <-chan struct{} {
return d.ch
}
133 changes: 133 additions & 0 deletions deadline_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package dmsg

import (
"math"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func Test_newDeadline(t *testing.T) {
deadline := newDeadline()
require.NotNil(t, deadline)

require.EqualValues(t, math.MaxInt64, deadline.timestamp)
require.EqualValues(t, 0, deadline.modChClosed)

ch := deadline.ch
require.NotNil(t, ch)

modCh := deadline.modCh
require.NotNil(t, modCh)
}

func Test_deadline_SetDeadline(t *testing.T) {
deadline := newDeadline()

var wg sync.WaitGroup
var serveErr error
wg.Add(1)
go func() {
defer wg.Done()
serveErr = deadline.Serve()
}()

now := time.Now()
cases := []struct {
deadline time.Time
unixNano int64
}{
{
deadline: now.Add(1 * time.Second),
unixNano: now.Add(1 * time.Second).UnixNano(),
},
{
deadline: now.Add(-1 * time.Second),
unixNano: now.Add(-1 * time.Second).UnixNano(),
},
}

for _, tc := range cases {
require.NoError(t, deadline.Set(tc.deadline))
require.Equal(t, tc.unixNano, deadline.timestamp)
}

require.NoError(t, deadline.Close())
require.NotEqual(t, 0, deadline.modChClosed)

wg.Wait()
require.NoError(t, serveErr)
}

func Test_deadline_Chan(t *testing.T) {
deadline := newDeadline()
ch := deadline.Chan()
require.NotNil(t, ch)
}

func Test_deadline_Exceeded(t *testing.T) {
deadline := newDeadline()

var wg sync.WaitGroup
var serveErr error
wg.Add(1)
go func() {
defer wg.Done()
serveErr = deadline.Serve()
}()

now := time.Now()
cases := []struct {
deadline time.Time
exceeded bool
}{
{
deadline: now.Add(1 * time.Second),
exceeded: false,
},
{
deadline: now.Add(-1 * time.Second),
exceeded: true,
},
}

for _, tc := range cases {
require.NoError(t, deadline.Set(tc.deadline))
require.Equal(t, tc.exceeded, deadline.Exceeded())
}

require.NoError(t, deadline.Close())
require.NotEqual(t, 0, deadline.modChClosed)

wg.Wait()
require.NoError(t, serveErr)
}

func Test_deadline_concurrency(t *testing.T) {
t.Run("Set and close", func(t *testing.T) {
deadline := newDeadline()

var wg sync.WaitGroup
var serveErr error
wg.Add(1)
go func() {
defer wg.Done()
serveErr = deadline.Serve()
}()

var setErr error
wg.Add(1)
go func() {
defer wg.Done()
setErr = deadline.Set(time.Now().Add(1 * time.Second))
}()

require.NoError(t, deadline.Close())

wg.Wait()
require.NoError(t, serveErr)
require.Error(t, setErr)
})
}
87 changes: 80 additions & 7 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"math"
"math/rand"
"net"
"os"
Expand Down Expand Up @@ -228,6 +229,10 @@ func TestServer_Serve(t *testing.T) {
testServerSelfDialing(t)
})

t.Run("Deadlines", func(t *testing.T) {
testServerDeadlines(t)
})

t.Run("Server disconnection closes transports", func(t *testing.T) {
testServerDisconnection(t)
})
Expand Down Expand Up @@ -255,7 +260,7 @@ func testServerDisconnection(t *testing.T) {
responder := createClient(t, dc, responderName)
initiator := createClient(t, dc, initiatorName)
initConn, respConns := dial(t, initiator, responder, port, noDelay)
testTransportMessaging(t, initConn, respConns)
require.NoError(t, testTransportMessaging(initConn, respConns, time.Duration(0)))

require.NoError(t, srv.Close())
require.NoError(t, errWithTimeout(srvErrCh))
Expand All @@ -276,24 +281,92 @@ func testServerSelfDialing(t *testing.T) {
client := createClient(t, dc, "client")
selfWrTp, selfRdTp := dial(t, client, client, port, noDelay)
// try to write/read message to/from self
testTransportMessaging(t, selfWrTp, selfRdTp)
require.NoError(t, testTransportMessaging(selfWrTp, selfRdTp, time.Duration(0)))
require.NoError(t, closeClosers(selfRdTp, selfWrTp, client))

assert.NoError(t, srv.Close())
assert.NoError(t, errWithTimeout(srvErrCh))
}

func testTransportMessaging(t *testing.T, init, resp io.ReadWriter) {
func testServerDeadlines(t *testing.T) {
t.Parallel()

now := time.Now()
cases := []struct {
deadline time.Time
error error
}{
{now.Add(10 * time.Second), nil},
{now.Add(-1 * time.Second), ErrDeadlineExceeded},
{now.Add(1 * time.Millisecond), ErrDeadlineExceeded},
{time.Unix(0, math.MaxInt64), nil},
}

dc := disc.NewMock()
srv, srvErrCh, err := createServer(dc)
require.NoError(t, err)

responder := createClient(t, dc, responderName)
initiator := createClient(t, dc, initiatorName)
initiatorTransport, responderTransport := dial(t, initiator, responder, port, noDelay)

t.Run("Read", func(t *testing.T) {
for _, tc := range cases {
require.NoError(t, responderTransport.SetReadDeadline(tc.deadline))
require.Equal(t, tc.error, testTransportMessaging(initiatorTransport, responderTransport, time.Duration(0)))
}
})

t.Run("Write", func(t *testing.T) {
for _, tc := range cases {
require.NoError(t, initiatorTransport.SetWriteDeadline(tc.deadline))
require.Equal(t, tc.error, testTransportMessaging(initiatorTransport, responderTransport, time.Duration(0)))
}
})

t.Run("Read and write", func(t *testing.T) {
for _, tc := range cases {
require.NoError(t, initiatorTransport.SetDeadline(tc.deadline))
require.NoError(t, responderTransport.SetDeadline(tc.deadline))
require.Equal(t, tc.error, testTransportMessaging(initiatorTransport, responderTransport, time.Duration(0)))
}
})

t.Run("Multiple read and write", func(t *testing.T) {
duration := 10 * time.Second

deadline := time.Now().Add(duration)
require.NoError(t, initiatorTransport.SetDeadline(deadline))
require.NoError(t, responderTransport.SetDeadline(deadline))
time.Sleep(duration)
require.Equal(t, ErrDeadlineExceeded, testTransportMessaging(initiatorTransport, responderTransport, time.Duration(0)))

deadline = time.Now().Add(duration)
require.NoError(t, initiatorTransport.SetDeadline(deadline))
require.NoError(t, responderTransport.SetDeadline(deadline))
require.Equal(t, ErrDeadlineExceeded, testTransportMessaging(initiatorTransport, responderTransport, duration))
})

assert.NoError(t, srv.Close())
assert.NoError(t, errWithTimeout(srvErrCh))
}

func testTransportMessaging(init, resp io.ReadWriter, delay time.Duration) error {
for i := 0; i < msgCount; i++ {
_, err := init.Write([]byte(message))
require.NoError(t, err) // TODO: Sometimes this returns error: "io: read/write on closed pipe"
if _, err := init.Write([]byte(message)); err != nil {
return err
}

time.Sleep(delay)

recvBuf := make([]byte, bufSize)
for i := 0; i < len(message); i += bufSize {
_, err := resp.Read(recvBuf)
require.NoError(t, err)
if _, err := resp.Read(recvBuf); err != nil {
return err
}
}
}
return nil
}

func testServerCappedTransport(t *testing.T) {
Expand Down
Loading