From 71eba71e43e3d310db59fa61bf53ba630609ad18 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Wed, 14 Aug 2019 17:22:53 +0400 Subject: [PATCH 01/26] Add deadlines to Transport --- transport.go | 105 +++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 97 insertions(+), 8 deletions(-) diff --git a/transport.go b/transport.go index 734983d..b71ce50 100644 --- a/transport.go +++ b/transport.go @@ -7,6 +7,7 @@ import ( "io" "net" "sync" + "time" "github.com/skycoin/skycoin/src/util/logging" @@ -19,6 +20,7 @@ var ( ErrRequestRejected = errors.New("failed to create transport: request rejected") ErrRequestCheckFailed = errors.New("failed to create transport: request check failed") ErrAcceptCheckFailed = errors.New("failed to create transport: accept check failed") + ErrTimeout = errors.New("timeout") ) // Transport represents a connection from dmsg.Client to remote dmsg.Client (via dmsg.Server intermediary). @@ -46,6 +48,11 @@ type Transport struct { done chan struct{} // chan which closes when transport stops serving doneOnce sync.Once // ensures 'done' only closes once doneFunc func(id uint16) // contains a method to remove the transport from dmsg.Client + + readDeadlineMu sync.RWMutex + readDeadline time.Time + writeDeadlineMu sync.RWMutex + writeDeadline time.Time } // NewTransport creates a new dms_tp. @@ -338,13 +345,34 @@ func (tp *Transport) Serve() { } // Read implements io.Reader -// TODO(evanlinjin): read deadline. func (tp *Transport) Read(p []byte) (n int, err error) { - <-tp.serving + type result struct { + n int + err error + } + + resultCh := make(chan result, 1) + + readDeadline := tp.ReadDeadline() + if readDeadline != (time.Time{}) { + time.AfterFunc(time.Until(readDeadline), func() { + resultCh <- result{0, ErrTimeout} + }) + } + go func() { + n, err := tp.read(p) + resultCh <- result{n, err} + }() + + res := <-resultCh + return res.n, res.err +} + +func (tp *Transport) read(p []byte) (n int, err error) { + <-tp.serving tp.rMx.Lock() defer tp.rMx.Unlock() - startRead: tp.bufMx.Lock() n, err = tp.buf.Read(p) @@ -358,14 +386,12 @@ startRead: }() } tp.bufMx.Unlock() - if n > 0 || len(p) == 0 { if !tp.IsClosed() { err = nil } return n, err } - if _, ok := <-tp.bufCh; !ok { return n, err } @@ -373,14 +399,35 @@ startRead: } // Write implements io.Writer -// TODO(evanlinjin): write deadline. func (tp *Transport) Write(p []byte) (int, error) { - <-tp.serving + type result struct { + n int + err error + } + + resultCh := make(chan result, 1) + writeDeadline := tp.WriteDeadline() + if writeDeadline != (time.Time{}) { + time.AfterFunc(time.Until(writeDeadline), func() { + resultCh <- result{0, ErrTimeout} + }) + } + + go func() { + n, err := tp.write(p) + resultCh <- result{n, err} + }() + + res := <-resultCh + return res.n, res.err +} + +func (tp *Transport) write(p []byte) (int, error) { + <-tp.serving if tp.IsClosed() { return 0, io.ErrClosedPipe } - err := tp.ackWaiter.Wait(context.Background(), func(seq ioutil.Uint16Seq) error { if err := writeFwdFrame(tp.Conn, tp.id, seq, p); err != nil { tp.close() @@ -393,3 +440,45 @@ func (tp *Transport) Write(p []byte) (int, error) { } return len(p), nil } + +// SetDeadline sets read and write deadlines for transport . +func (tp *Transport) SetDeadline(t time.Time) error { + if err := tp.SetReadDeadline(t); err != nil { + return err + } + return tp.SetWriteDeadline(t) +} + +// ReadDeadline gets read deadline for transport. +func (tp *Transport) ReadDeadline() time.Time { + tp.readDeadlineMu.RLock() + defer tp.readDeadlineMu.RUnlock() + + return tp.readDeadline +} + +// SetReadDeadline sets read deadline for transport. +func (tp *Transport) SetReadDeadline(t time.Time) error { + tp.readDeadlineMu.Lock() + defer tp.readDeadlineMu.Unlock() + + tp.readDeadline = t + return nil +} + +// WriteDeadline gets write deadline for transport. +func (tp *Transport) WriteDeadline() time.Time { + tp.writeDeadlineMu.RLock() + defer tp.writeDeadlineMu.RUnlock() + + return tp.writeDeadline +} + +// SetWriteDeadline sets write deadline for transport. +func (tp *Transport) SetWriteDeadline(t time.Time) error { + tp.writeDeadlineMu.Lock() + defer tp.writeDeadlineMu.Unlock() + + tp.writeDeadline = t + return nil +} From ed3842062d65aaaaf6d3daa8c55bd13c496deb4c Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Wed, 14 Aug 2019 17:41:52 +0400 Subject: [PATCH 02/26] Improve logic of handling deadlines --- transport.go | 52 ++++++++++++++++++++++++++++++---------------------- 1 file changed, 30 insertions(+), 22 deletions(-) diff --git a/transport.go b/transport.go index b71ce50..4d11ec8 100644 --- a/transport.go +++ b/transport.go @@ -346,27 +346,31 @@ func (tp *Transport) Serve() { // Read implements io.Reader func (tp *Transport) Read(p []byte) (n int, err error) { + ctx := context.Background() + readDeadline := tp.ReadDeadline() + if readDeadline != (time.Time{}) { + deadlineCtx, cancel := context.WithDeadline(ctx, readDeadline) + defer cancel() + ctx = deadlineCtx + } + type result struct { n int err error } - resultCh := make(chan result, 1) - - readDeadline := tp.ReadDeadline() - if readDeadline != (time.Time{}) { - time.AfterFunc(time.Until(readDeadline), func() { - resultCh <- result{0, ErrTimeout} - }) - } - go func() { n, err := tp.read(p) resultCh <- result{n, err} + close(resultCh) }() - res := <-resultCh - return res.n, res.err + select { + case res := <-resultCh: + return res.n, res.err + case <-ctx.Done(): + return 0, ErrTimeout + } } func (tp *Transport) read(p []byte) (n int, err error) { @@ -400,27 +404,31 @@ startRead: // Write implements io.Writer func (tp *Transport) Write(p []byte) (int, error) { + ctx := context.Background() + writeDeadline := tp.WriteDeadline() + if writeDeadline != (time.Time{}) { + deadlineCtx, cancel := context.WithDeadline(ctx, writeDeadline) + defer cancel() + ctx = deadlineCtx + } + type result struct { n int err error } - resultCh := make(chan result, 1) - - writeDeadline := tp.WriteDeadline() - if writeDeadline != (time.Time{}) { - time.AfterFunc(time.Until(writeDeadline), func() { - resultCh <- result{0, ErrTimeout} - }) - } - go func() { n, err := tp.write(p) resultCh <- result{n, err} + close(resultCh) }() - res := <-resultCh - return res.n, res.err + select { + case res := <-resultCh: + return res.n, res.err + case <-ctx.Done(): + return 0, ErrTimeout + } } func (tp *Transport) write(p []byte) (int, error) { From 1f20f25b86f92c221fc6e6cb24539fd305ae0d59 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Wed, 14 Aug 2019 18:07:47 +0400 Subject: [PATCH 03/26] Improve logic of handling deadlines --- transport.go | 36 +++++++++++++++--------------------- 1 file changed, 15 insertions(+), 21 deletions(-) diff --git a/transport.go b/transport.go index 4d11ec8..1c23c7e 100644 --- a/transport.go +++ b/transport.go @@ -345,7 +345,7 @@ func (tp *Transport) Serve() { } // Read implements io.Reader -func (tp *Transport) Read(p []byte) (n int, err error) { +func (tp *Transport) Read(p []byte) (int, error) { ctx := context.Background() readDeadline := tp.ReadDeadline() if readDeadline != (time.Time{}) { @@ -354,20 +354,17 @@ func (tp *Transport) Read(p []byte) (n int, err error) { ctx = deadlineCtx } - type result struct { - n int - err error - } - resultCh := make(chan result, 1) + var n int + var err error + done := make(chan struct{}) go func() { - n, err := tp.read(p) - resultCh <- result{n, err} - close(resultCh) + n, err = tp.read(p) + close(done) }() select { - case res := <-resultCh: - return res.n, res.err + case <-done: + return n, err case <-ctx.Done(): return 0, ErrTimeout } @@ -412,20 +409,17 @@ func (tp *Transport) Write(p []byte) (int, error) { ctx = deadlineCtx } - type result struct { - n int - err error - } - resultCh := make(chan result, 1) + var n int + var err error + done := make(chan struct{}) go func() { - n, err := tp.write(p) - resultCh <- result{n, err} - close(resultCh) + n, err = tp.write(p) + close(done) }() select { - case res := <-resultCh: - return res.n, res.err + case <-done: + return n, err case <-ctx.Done(): return 0, ErrTimeout } From a0d9802b7f18ca3c11d8f9d13d1e081065745c02 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Wed, 14 Aug 2019 18:34:21 +0400 Subject: [PATCH 04/26] Get rid of a goto --- transport.go | 41 +++++++++++++++++++++-------------------- 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/transport.go b/transport.go index 1c23c7e..f327fad 100644 --- a/transport.go +++ b/transport.go @@ -374,29 +374,30 @@ func (tp *Transport) read(p []byte) (n int, err error) { <-tp.serving tp.rMx.Lock() defer tp.rMx.Unlock() -startRead: - tp.bufMx.Lock() - n, err = tp.buf.Read(p) - if tp.bufSize -= n; tp.bufSize < tpBufCap && len(tp.ackBuf) > 0 { - acks := tp.ackBuf - tp.ackBuf = make([]byte, 0, tpAckCap) - go func() { - if err := writeFrame(tp.Conn, acks); err != nil { - tp.close() + + for { + tp.bufMx.Lock() + n, err = tp.buf.Read(p) + if tp.bufSize -= n; tp.bufSize < tpBufCap && len(tp.ackBuf) > 0 { + acks := tp.ackBuf + tp.ackBuf = make([]byte, 0, tpAckCap) + go func() { + if err := writeFrame(tp.Conn, acks); err != nil { + tp.close() + } + }() + } + tp.bufMx.Unlock() + if n > 0 || len(p) == 0 { + if !tp.IsClosed() { + err = nil } - }() - } - tp.bufMx.Unlock() - if n > 0 || len(p) == 0 { - if !tp.IsClosed() { - err = nil + return n, err + } + if _, ok := <-tp.bufCh; !ok { + return n, err } - return n, err - } - if _, ok := <-tp.bufCh; !ok { - return n, err } - goto startRead } // Write implements io.Writer From 01690983bb374312c8c981335758da3c180d91fc Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Wed, 14 Aug 2019 22:03:54 +0400 Subject: [PATCH 05/26] Implement deadline for pending i/o --- transport.go | 129 ++++++++++++++++++++++++++++++--------------------- 1 file changed, 77 insertions(+), 52 deletions(-) diff --git a/transport.go b/transport.go index f327fad..66b015b 100644 --- a/transport.go +++ b/transport.go @@ -49,28 +49,37 @@ type Transport struct { doneOnce sync.Once // ensures 'done' only closes once doneFunc func(id uint16) // contains a method to remove the transport from dmsg.Client - readDeadlineMu sync.RWMutex - readDeadline time.Time - writeDeadlineMu sync.RWMutex - writeDeadline time.Time + // avoid changing deadline from multiple goroutines simultaneously + readDeadlineMu sync.Mutex + writeDeadlineMu sync.Mutex + + // indicate that read deadline is exceeded + readDeadline chan struct{} + writeDeadline chan struct{} + + // cancellation of an already started goroutine for setting deadline + cancelReadDeadline func() + cancelWriteDeadline func() } // NewTransport creates a new dms_tp. func NewTransport(conn net.Conn, log *logging.Logger, local, remote cipher.PubKey, id uint16, doneFunc func(id uint16)) *Transport { tp := &Transport{ - Conn: conn, - log: log, - id: id, - local: local, - remote: remote, - inCh: make(chan Frame), - ackWaiter: ioutil.NewUint16AckWaiter(), - ackBuf: make([]byte, 0, tpAckCap), - buf: make(net.Buffers, 0, tpBufFrameCap), - bufCh: make(chan struct{}, 1), - serving: make(chan struct{}), - done: make(chan struct{}), - doneFunc: doneFunc, + Conn: conn, + log: log, + id: id, + local: local, + remote: remote, + inCh: make(chan Frame), + ackWaiter: ioutil.NewUint16AckWaiter(), + ackBuf: make([]byte, 0, tpAckCap), + buf: make(net.Buffers, 0, tpBufFrameCap), + bufCh: make(chan struct{}, 1), + serving: make(chan struct{}), + done: make(chan struct{}), + doneFunc: doneFunc, + readDeadline: make(chan struct{}), + writeDeadline: make(chan struct{}), } if err := tp.ackWaiter.RandSeq(); err != nil { log.Fatalln("failed to set ack_waiter seq:", err) @@ -346,14 +355,6 @@ func (tp *Transport) Serve() { // Read implements io.Reader func (tp *Transport) Read(p []byte) (int, error) { - ctx := context.Background() - readDeadline := tp.ReadDeadline() - if readDeadline != (time.Time{}) { - deadlineCtx, cancel := context.WithDeadline(ctx, readDeadline) - defer cancel() - ctx = deadlineCtx - } - var n int var err error done := make(chan struct{}) @@ -365,7 +366,7 @@ func (tp *Transport) Read(p []byte) (int, error) { select { case <-done: return n, err - case <-ctx.Done(): + case <-tp.readDeadline: return 0, ErrTimeout } } @@ -402,14 +403,6 @@ func (tp *Transport) read(p []byte) (n int, err error) { // Write implements io.Writer func (tp *Transport) Write(p []byte) (int, error) { - ctx := context.Background() - writeDeadline := tp.WriteDeadline() - if writeDeadline != (time.Time{}) { - deadlineCtx, cancel := context.WithDeadline(ctx, writeDeadline) - defer cancel() - ctx = deadlineCtx - } - var n int var err error done := make(chan struct{}) @@ -421,7 +414,7 @@ func (tp *Transport) Write(p []byte) (int, error) { select { case <-done: return n, err - case <-ctx.Done(): + case <-tp.writeDeadline: return 0, ErrTimeout } } @@ -452,29 +445,37 @@ func (tp *Transport) SetDeadline(t time.Time) error { return tp.SetWriteDeadline(t) } -// ReadDeadline gets read deadline for transport. -func (tp *Transport) ReadDeadline() time.Time { - tp.readDeadlineMu.RLock() - defer tp.readDeadlineMu.RUnlock() - - return tp.readDeadline -} - // SetReadDeadline sets read deadline for transport. func (tp *Transport) SetReadDeadline(t time.Time) error { tp.readDeadlineMu.Lock() defer tp.readDeadlineMu.Unlock() - tp.readDeadline = t - return nil -} + // If channel is closed, then deadline is already exceeded. + // If a new deadline is after the current time. we need to create channel again. + if _, ok := <-tp.readDeadline; !ok && t.After(time.Now()) { + tp.readDeadline = make(chan struct{}) + } + + // If a goroutine that would close the channel is already running, we need to cancel it and delete. + if tp.cancelReadDeadline != nil { + tp.cancelReadDeadline() + tp.cancelReadDeadline = nil + } -// WriteDeadline gets write deadline for transport. -func (tp *Transport) WriteDeadline() time.Time { - tp.writeDeadlineMu.RLock() - defer tp.writeDeadlineMu.RUnlock() + ctx, cancel := context.WithDeadline(context.Background(), t) + // Store a cancellation function to let other goroutines cancel this one. + tp.cancelReadDeadline = cancel - return tp.writeDeadline + go func(ctx context.Context, doneCh chan struct{}) { + <-ctx.Done() + // Here, context may be canceled either by another goroutine or by deadline. + // If it's canceled by deadline, channel needs to be closed. + if ctx.Err() == context.DeadlineExceeded { + close(doneCh) + } + }(ctx, tp.readDeadline) + + return nil } // SetWriteDeadline sets write deadline for transport. @@ -482,6 +483,30 @@ func (tp *Transport) SetWriteDeadline(t time.Time) error { tp.writeDeadlineMu.Lock() defer tp.writeDeadlineMu.Unlock() - tp.writeDeadline = t + // If channel is closed, then deadline is already exceeded. + // If a new deadline is after the current time. we need to create channel again. + if _, ok := <-tp.writeDeadline; !ok && t.After(time.Now()) { + tp.writeDeadline = make(chan struct{}) + } + + // If a goroutine that would close the channel is already running, we need to cancel it and delete. + if tp.cancelWriteDeadline != nil { + tp.cancelWriteDeadline() + tp.cancelWriteDeadline = nil + } + + ctx, cancel := context.WithDeadline(context.Background(), t) + // Store a cancellation function to let other goroutines cancel this one. + tp.cancelWriteDeadline = cancel + + go func(ctx context.Context, doneCh chan struct{}) { + <-ctx.Done() + // Here, context may be canceled either by another goroutine or by deadline. + // If it's canceled by deadline, channel needs to be closed. + if ctx.Err() == context.DeadlineExceeded { + close(doneCh) + } + }(ctx, tp.writeDeadline) + return nil } From 740664028f65be4f871c44a30643341facd7b936 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Wed, 14 Aug 2019 22:07:09 +0400 Subject: [PATCH 06/26] Fix a typo --- transport.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transport.go b/transport.go index 66b015b..db742d9 100644 --- a/transport.go +++ b/transport.go @@ -451,7 +451,7 @@ func (tp *Transport) SetReadDeadline(t time.Time) error { defer tp.readDeadlineMu.Unlock() // If channel is closed, then deadline is already exceeded. - // If a new deadline is after the current time. we need to create channel again. + // If a new deadline is after the current time, we need to create channel again. if _, ok := <-tp.readDeadline; !ok && t.After(time.Now()) { tp.readDeadline = make(chan struct{}) } From aaf426e2910053cfbd52e34934563607a1c8ff12 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Wed, 14 Aug 2019 22:53:18 +0400 Subject: [PATCH 07/26] Improve logic --- transport.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/transport.go b/transport.go index db742d9..68cc6cc 100644 --- a/transport.go +++ b/transport.go @@ -456,14 +456,15 @@ func (tp *Transport) SetReadDeadline(t time.Time) error { tp.readDeadline = make(chan struct{}) } - // If a goroutine that would close the channel is already running, we need to cancel it and delete. + ctx, cancel := context.WithDeadline(context.Background(), t) + + // If a goroutine that would close the channel is already running, + // we need to cancel it and replace cancellation function with a new one. if tp.cancelReadDeadline != nil { tp.cancelReadDeadline() - tp.cancelReadDeadline = nil } - ctx, cancel := context.WithDeadline(context.Background(), t) - // Store a cancellation function to let other goroutines cancel this one. + // Store the cancellation function to let other goroutines cancel this one. tp.cancelReadDeadline = cancel go func(ctx context.Context, doneCh chan struct{}) { @@ -489,14 +490,15 @@ func (tp *Transport) SetWriteDeadline(t time.Time) error { tp.writeDeadline = make(chan struct{}) } - // If a goroutine that would close the channel is already running, we need to cancel it and delete. + ctx, cancel := context.WithDeadline(context.Background(), t) + + // If a goroutine that would close the channel is already running, + // we need to cancel it and replace cancellation function with a new one. if tp.cancelWriteDeadline != nil { tp.cancelWriteDeadline() - tp.cancelWriteDeadline = nil } - ctx, cancel := context.WithDeadline(context.Background(), t) - // Store a cancellation function to let other goroutines cancel this one. + // Store the cancellation function to let other goroutines cancel this one. tp.cancelWriteDeadline = cancel go func(ctx context.Context, doneCh chan struct{}) { From 808195d9642b4e5bcc9588d4b80dbc1bf16f45d6 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Fri, 16 Aug 2019 00:17:43 +0400 Subject: [PATCH 08/26] Change implementation of deadlines --- transport.go | 154 ++++++++++++++++++++++----------------------------- 1 file changed, 67 insertions(+), 87 deletions(-) diff --git a/transport.go b/transport.go index 68cc6cc..e4af30c 100644 --- a/transport.go +++ b/transport.go @@ -7,6 +7,7 @@ import ( "io" "net" "sync" + "sync/atomic" "time" "github.com/skycoin/skycoin/src/util/logging" @@ -49,41 +50,74 @@ type Transport struct { doneOnce sync.Once // ensures 'done' only closes once doneFunc func(id uint16) // contains a method to remove the transport from dmsg.Client - // avoid changing deadline from multiple goroutines simultaneously - readDeadlineMu sync.Mutex - writeDeadlineMu sync.Mutex + readDeadline int64 + readDeadlineCh chan struct{} + readDeadlineModCh chan struct{} - // indicate that read deadline is exceeded - readDeadline chan struct{} - writeDeadline chan struct{} - - // cancellation of an already started goroutine for setting deadline - cancelReadDeadline func() - cancelWriteDeadline func() + writeDeadline int64 + writeDeadlineCh chan struct{} + writeDeadlineModCh chan struct{} } // NewTransport creates a new dms_tp. func NewTransport(conn net.Conn, log *logging.Logger, local, remote cipher.PubKey, id uint16, doneFunc func(id uint16)) *Transport { tp := &Transport{ - Conn: conn, - log: log, - id: id, - local: local, - remote: remote, - inCh: make(chan Frame), - ackWaiter: ioutil.NewUint16AckWaiter(), - ackBuf: make([]byte, 0, tpAckCap), - buf: make(net.Buffers, 0, tpBufFrameCap), - bufCh: make(chan struct{}, 1), - serving: make(chan struct{}), - done: make(chan struct{}), - doneFunc: doneFunc, - readDeadline: make(chan struct{}), - writeDeadline: make(chan struct{}), + Conn: conn, + log: log, + id: id, + local: local, + remote: remote, + inCh: make(chan Frame), + ackWaiter: ioutil.NewUint16AckWaiter(), + ackBuf: make([]byte, 0, tpAckCap), + buf: make(net.Buffers, 0, tpBufFrameCap), + bufCh: make(chan struct{}, 1), + serving: make(chan struct{}), + done: make(chan struct{}), + doneFunc: doneFunc, + readDeadlineCh: make(chan struct{}), + readDeadlineModCh: make(chan struct{}, 1), } if err := tp.ackWaiter.RandSeq(); err != nil { log.Fatalln("failed to set ack_waiter seq:", err) } + + go func() { + for { + if rd := atomic.LoadInt64(&tp.readDeadline); rd != 0 && time.Now().UnixNano() >= rd { + select { + case tp.readDeadlineCh <- struct{}{}: + case _, ok := <-tp.readDeadlineModCh: + if !ok { + return + } + } + } else { + if _, ok := <-tp.readDeadlineModCh; !ok { + return + } + } + } + }() + + go func() { + for { + if wd := atomic.LoadInt64(&tp.writeDeadline); wd != 0 && time.Now().UnixNano() >= wd { + select { + case tp.writeDeadlineCh <- struct{}{}: + case _, ok := <-tp.writeDeadlineModCh: + if !ok { + return + } + } + } else { + if _, ok := <-tp.writeDeadlineModCh; !ok { + return + } + } + } + }() + return tp } @@ -366,8 +400,8 @@ func (tp *Transport) Read(p []byte) (int, error) { select { case <-done: return n, err - case <-tp.readDeadline: - return 0, ErrTimeout + case <-tp.readDeadlineCh: + return 0, errors.New("deadline exceeded") } } @@ -414,8 +448,8 @@ func (tp *Transport) Write(p []byte) (int, error) { select { case <-done: return n, err - case <-tp.writeDeadline: - return 0, ErrTimeout + case <-tp.writeDeadlineCh: + return 0, errors.New("deadline exceeded") } } @@ -447,68 +481,14 @@ func (tp *Transport) SetDeadline(t time.Time) error { // SetReadDeadline sets read deadline for transport. func (tp *Transport) SetReadDeadline(t time.Time) error { - tp.readDeadlineMu.Lock() - defer tp.readDeadlineMu.Unlock() - - // If channel is closed, then deadline is already exceeded. - // If a new deadline is after the current time, we need to create channel again. - if _, ok := <-tp.readDeadline; !ok && t.After(time.Now()) { - tp.readDeadline = make(chan struct{}) - } - - ctx, cancel := context.WithDeadline(context.Background(), t) - - // If a goroutine that would close the channel is already running, - // we need to cancel it and replace cancellation function with a new one. - if tp.cancelReadDeadline != nil { - tp.cancelReadDeadline() - } - - // Store the cancellation function to let other goroutines cancel this one. - tp.cancelReadDeadline = cancel - - go func(ctx context.Context, doneCh chan struct{}) { - <-ctx.Done() - // Here, context may be canceled either by another goroutine or by deadline. - // If it's canceled by deadline, channel needs to be closed. - if ctx.Err() == context.DeadlineExceeded { - close(doneCh) - } - }(ctx, tp.readDeadline) - + atomic.StoreInt64(&tp.readDeadline, t.UnixNano()) + tp.readDeadlineModCh <- struct{}{} return nil } // SetWriteDeadline sets write deadline for transport. func (tp *Transport) SetWriteDeadline(t time.Time) error { - tp.writeDeadlineMu.Lock() - defer tp.writeDeadlineMu.Unlock() - - // If channel is closed, then deadline is already exceeded. - // If a new deadline is after the current time. we need to create channel again. - if _, ok := <-tp.writeDeadline; !ok && t.After(time.Now()) { - tp.writeDeadline = make(chan struct{}) - } - - ctx, cancel := context.WithDeadline(context.Background(), t) - - // If a goroutine that would close the channel is already running, - // we need to cancel it and replace cancellation function with a new one. - if tp.cancelWriteDeadline != nil { - tp.cancelWriteDeadline() - } - - // Store the cancellation function to let other goroutines cancel this one. - tp.cancelWriteDeadline = cancel - - go func(ctx context.Context, doneCh chan struct{}) { - <-ctx.Done() - // Here, context may be canceled either by another goroutine or by deadline. - // If it's canceled by deadline, channel needs to be closed. - if ctx.Err() == context.DeadlineExceeded { - close(doneCh) - } - }(ctx, tp.writeDeadline) - + atomic.StoreInt64(&tp.writeDeadline, t.UnixNano()) + tp.writeDeadlineModCh <- struct{}{} return nil } From 76f44b793df4936bb2cec8b3cb7ec5c11b6f7a1e Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Fri, 16 Aug 2019 01:19:24 +0400 Subject: [PATCH 09/26] Fix uninitialized channels, improve implementation --- transport.go | 43 ++++++++++++++++++++++++------------------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/transport.go b/transport.go index e4af30c..8162bba 100644 --- a/transport.go +++ b/transport.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "math" "net" "sync" "sync/atomic" @@ -21,7 +22,7 @@ var ( ErrRequestRejected = errors.New("failed to create transport: request rejected") ErrRequestCheckFailed = errors.New("failed to create transport: request check failed") ErrAcceptCheckFailed = errors.New("failed to create transport: accept check failed") - ErrTimeout = errors.New("timeout") + ErrDeadlineExceeded = errors.New("deadline exceeded") ) // Transport represents a connection from dmsg.Client to remote dmsg.Client (via dmsg.Server intermediary). @@ -62,21 +63,25 @@ type Transport struct { // NewTransport creates a new dms_tp. func NewTransport(conn net.Conn, log *logging.Logger, local, remote cipher.PubKey, id uint16, doneFunc func(id uint16)) *Transport { tp := &Transport{ - Conn: conn, - log: log, - id: id, - local: local, - remote: remote, - inCh: make(chan Frame), - ackWaiter: ioutil.NewUint16AckWaiter(), - ackBuf: make([]byte, 0, tpAckCap), - buf: make(net.Buffers, 0, tpBufFrameCap), - bufCh: make(chan struct{}, 1), - serving: make(chan struct{}), - done: make(chan struct{}), - doneFunc: doneFunc, - readDeadlineCh: make(chan struct{}), - readDeadlineModCh: make(chan struct{}, 1), + Conn: conn, + log: log, + id: id, + local: local, + remote: remote, + inCh: make(chan Frame), + ackWaiter: ioutil.NewUint16AckWaiter(), + ackBuf: make([]byte, 0, tpAckCap), + buf: make(net.Buffers, 0, tpBufFrameCap), + bufCh: make(chan struct{}, 1), + serving: make(chan struct{}), + done: make(chan struct{}), + doneFunc: doneFunc, + readDeadline: math.MaxInt64, + readDeadlineCh: make(chan struct{}), + readDeadlineModCh: make(chan struct{}, 1), + writeDeadline: math.MaxInt64, + writeDeadlineCh: make(chan struct{}), + writeDeadlineModCh: make(chan struct{}, 1), } if err := tp.ackWaiter.RandSeq(); err != nil { log.Fatalln("failed to set ack_waiter seq:", err) @@ -84,7 +89,7 @@ func NewTransport(conn net.Conn, log *logging.Logger, local, remote cipher.PubKe go func() { for { - if rd := atomic.LoadInt64(&tp.readDeadline); rd != 0 && time.Now().UnixNano() >= rd { + if rd := atomic.LoadInt64(&tp.readDeadline); time.Now().UnixNano() >= rd { select { case tp.readDeadlineCh <- struct{}{}: case _, ok := <-tp.readDeadlineModCh: @@ -102,7 +107,7 @@ func NewTransport(conn net.Conn, log *logging.Logger, local, remote cipher.PubKe go func() { for { - if wd := atomic.LoadInt64(&tp.writeDeadline); wd != 0 && time.Now().UnixNano() >= wd { + if wd := atomic.LoadInt64(&tp.writeDeadline); time.Now().UnixNano() >= wd { select { case tp.writeDeadlineCh <- struct{}{}: case _, ok := <-tp.writeDeadlineModCh: @@ -449,7 +454,7 @@ func (tp *Transport) Write(p []byte) (int, error) { case <-done: return n, err case <-tp.writeDeadlineCh: - return 0, errors.New("deadline exceeded") + return 0, ErrDeadlineExceeded } } From dadc93fdd437ffde659d8d2ba565a25e9cbe8eef Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Fri, 16 Aug 2019 01:19:36 +0400 Subject: [PATCH 10/26] Add tests for deadlines --- server_test.go | 69 +++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 62 insertions(+), 7 deletions(-) diff --git a/server_test.go b/server_test.go index 6d960ff..841d48f 100644 --- a/server_test.go +++ b/server_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "math" "math/rand" "net" "os" @@ -227,6 +228,10 @@ func TestServer_Serve(t *testing.T) { testServerSelfDialing(t) }) + t.Run("Deadlines", func(t *testing.T) { + testServerDeadlines(t) + }) + t.Run("Server disconnection closes transports", func(t *testing.T) { testServerDisconnection(t) }) @@ -254,7 +259,7 @@ func testServerDisconnection(t *testing.T) { responder := createClient(t, dc, responderName) initiator := createClient(t, dc, initiatorName) initiatorTransport, responderTransport := dial(t, initiator, responder, noDelay) - testTransportMessaging(t, initiatorTransport, responderTransport) + require.NoError(t, testTransportMessaging(initiatorTransport, responderTransport)) require.NoError(t, srv.Close()) require.NoError(t, errWithTimeout(srvErrCh)) @@ -275,24 +280,74 @@ func testServerSelfDialing(t *testing.T) { client := createClient(t, dc, "client") selfWrTp, selfRdTp := dial(t, client, client, noDelay) // try to write/read message to/from self - testTransportMessaging(t, selfWrTp, selfRdTp) + require.NoError(t, testTransportMessaging(selfWrTp, selfRdTp)) require.NoError(t, closeClosers(selfRdTp, selfWrTp, client)) assert.NoError(t, srv.Close()) assert.NoError(t, errWithTimeout(srvErrCh)) } -func testTransportMessaging(t *testing.T, init *Transport, resp *Transport) { +func testServerDeadlines(t *testing.T) { + // t.Parallel() + + cases := []struct { + deadline time.Time + error error + }{ + {time.Now().Add(10 * time.Second), nil}, + {time.Now().Add(-1 * time.Second), ErrDeadlineExceeded}, + {time.Now().Add(1 * time.Millisecond), ErrDeadlineExceeded}, + {time.Unix(0, math.MaxInt64), nil}, + } + + dc := disc.NewMock() + srv, srvErrCh, err := createServer(dc) + require.NoError(t, err) + + responder := createClient(t, dc, responderName) + initiator := createClient(t, dc, initiatorName) + initiatorTransport, responderTransport := dial(t, initiator, responder, noDelay) + + t.Run("Read deadline", func(t *testing.T) { + for _, tc := range cases { + require.NoError(t, responderTransport.SetReadDeadline(tc.deadline)) + require.Equal(t, tc.error, testTransportMessaging(initiatorTransport, responderTransport)) + } + }) + + t.Run("Write deadline", func(t *testing.T) { + for _, tc := range cases { + require.NoError(t, initiatorTransport.SetWriteDeadline(tc.deadline)) + require.Equal(t, tc.error, testTransportMessaging(initiatorTransport, responderTransport)) + } + }) + + t.Run("Read/write deadline", func(t *testing.T) { + for _, tc := range cases { + require.NoError(t, initiatorTransport.SetDeadline(tc.deadline)) + require.NoError(t, responderTransport.SetDeadline(tc.deadline)) + require.Equal(t, tc.error, testTransportMessaging(initiatorTransport, responderTransport)) + } + }) + + assert.NoError(t, srv.Close()) + assert.NoError(t, errWithTimeout(srvErrCh)) +} + +func testTransportMessaging(init *Transport, resp *Transport) error { for i := 0; i < msgCount; i++ { - _, err := init.Write([]byte(message)) - require.NoError(t, err) + if _, err := init.Write([]byte(message)); err != nil { + return err + } recvBuf := make([]byte, bufSize) for i := 0; i < len(message); i += bufSize { - _, err := resp.Read(recvBuf) - require.NoError(t, err) + if _, err := resp.Read(recvBuf); err != nil { + return err + } } } + return nil } func testServerCappedTransport(t *testing.T) { From a2cab901a1ac84948399926e60af4ec7d5c3e18f Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Fri, 16 Aug 2019 01:23:57 +0400 Subject: [PATCH 11/26] Improve deadline tests names --- server_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server_test.go b/server_test.go index 841d48f..bbd4563 100644 --- a/server_test.go +++ b/server_test.go @@ -308,21 +308,21 @@ func testServerDeadlines(t *testing.T) { initiator := createClient(t, dc, initiatorName) initiatorTransport, responderTransport := dial(t, initiator, responder, noDelay) - t.Run("Read deadline", func(t *testing.T) { + t.Run("Read", func(t *testing.T) { for _, tc := range cases { require.NoError(t, responderTransport.SetReadDeadline(tc.deadline)) require.Equal(t, tc.error, testTransportMessaging(initiatorTransport, responderTransport)) } }) - t.Run("Write deadline", func(t *testing.T) { + t.Run("Write", func(t *testing.T) { for _, tc := range cases { require.NoError(t, initiatorTransport.SetWriteDeadline(tc.deadline)) require.Equal(t, tc.error, testTransportMessaging(initiatorTransport, responderTransport)) } }) - t.Run("Read/write deadline", func(t *testing.T) { + t.Run("Read and write", func(t *testing.T) { for _, tc := range cases { require.NoError(t, initiatorTransport.SetDeadline(tc.deadline)) require.NoError(t, responderTransport.SetDeadline(tc.deadline)) From 7bc89e21374951fbcf5b670a52fa2f9db8f57971 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Fri, 16 Aug 2019 01:26:54 +0400 Subject: [PATCH 12/26] Run deadline tests in parallel --- server_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server_test.go b/server_test.go index bbd4563..2f24c16 100644 --- a/server_test.go +++ b/server_test.go @@ -288,7 +288,7 @@ func testServerSelfDialing(t *testing.T) { } func testServerDeadlines(t *testing.T) { - // t.Parallel() + t.Parallel() cases := []struct { deadline time.Time From 079158cd7706fb5e8484da789a701793cdecc0d7 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Fri, 16 Aug 2019 02:04:08 +0400 Subject: [PATCH 13/26] Add a test case --- server_test.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/server_test.go b/server_test.go index 2f24c16..870f2ce 100644 --- a/server_test.go +++ b/server_test.go @@ -330,6 +330,15 @@ func testServerDeadlines(t *testing.T) { } }) + t.Run("Multiple read and write", func(t *testing.T) { + duration := 100 * time.Millisecond + require.NoError(t, initiatorTransport.SetDeadline(time.Now().Add(duration))) + require.NoError(t, responderTransport.SetDeadline(time.Now().Add(duration))) + + time.Sleep(duration) + require.Equal(t, nil, testTransportMessaging(initiatorTransport, responderTransport)) + }) + assert.NoError(t, srv.Close()) assert.NoError(t, errWithTimeout(srvErrCh)) } From 2d9aeeefdcb4ef08a04d3e5134a53d0b65325ffa Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Fri, 16 Aug 2019 02:06:23 +0400 Subject: [PATCH 14/26] Group deadline variables to a struct --- transport.go | 85 +++++++++++++++++++++++++++------------------------- 1 file changed, 45 insertions(+), 40 deletions(-) diff --git a/transport.go b/transport.go index 8162bba..c6e5078 100644 --- a/transport.go +++ b/transport.go @@ -25,6 +25,20 @@ var ( ErrDeadlineExceeded = errors.New("deadline exceeded") ) +type deadline struct { + unixNano int64 + ch chan struct{} + modCh chan struct{} +} + +func newDeadline() deadline { + return deadline{ + unixNano: math.MaxInt64, + ch: make(chan struct{}), + modCh: make(chan struct{}, 1), + } +} + // Transport represents a connection from dmsg.Client to remote dmsg.Client (via dmsg.Server intermediary). type Transport struct { net.Conn // underlying connection to dmsg.Server @@ -51,37 +65,28 @@ type Transport struct { doneOnce sync.Once // ensures 'done' only closes once doneFunc func(id uint16) // contains a method to remove the transport from dmsg.Client - readDeadline int64 - readDeadlineCh chan struct{} - readDeadlineModCh chan struct{} - - writeDeadline int64 - writeDeadlineCh chan struct{} - writeDeadlineModCh chan struct{} + readDeadline deadline + writeDeadline deadline } // NewTransport creates a new dms_tp. func NewTransport(conn net.Conn, log *logging.Logger, local, remote cipher.PubKey, id uint16, doneFunc func(id uint16)) *Transport { tp := &Transport{ - Conn: conn, - log: log, - id: id, - local: local, - remote: remote, - inCh: make(chan Frame), - ackWaiter: ioutil.NewUint16AckWaiter(), - ackBuf: make([]byte, 0, tpAckCap), - buf: make(net.Buffers, 0, tpBufFrameCap), - bufCh: make(chan struct{}, 1), - serving: make(chan struct{}), - done: make(chan struct{}), - doneFunc: doneFunc, - readDeadline: math.MaxInt64, - readDeadlineCh: make(chan struct{}), - readDeadlineModCh: make(chan struct{}, 1), - writeDeadline: math.MaxInt64, - writeDeadlineCh: make(chan struct{}), - writeDeadlineModCh: make(chan struct{}, 1), + Conn: conn, + log: log, + id: id, + local: local, + remote: remote, + inCh: make(chan Frame), + ackWaiter: ioutil.NewUint16AckWaiter(), + ackBuf: make([]byte, 0, tpAckCap), + buf: make(net.Buffers, 0, tpBufFrameCap), + bufCh: make(chan struct{}, 1), + serving: make(chan struct{}), + done: make(chan struct{}), + doneFunc: doneFunc, + readDeadline: newDeadline(), + writeDeadline: newDeadline(), } if err := tp.ackWaiter.RandSeq(); err != nil { log.Fatalln("failed to set ack_waiter seq:", err) @@ -89,16 +94,16 @@ func NewTransport(conn net.Conn, log *logging.Logger, local, remote cipher.PubKe go func() { for { - if rd := atomic.LoadInt64(&tp.readDeadline); time.Now().UnixNano() >= rd { + if rd := atomic.LoadInt64(&tp.readDeadline.unixNano); time.Now().UnixNano() >= rd { select { - case tp.readDeadlineCh <- struct{}{}: - case _, ok := <-tp.readDeadlineModCh: + case tp.readDeadline.ch <- struct{}{}: + case _, ok := <-tp.readDeadline.modCh: if !ok { return } } } else { - if _, ok := <-tp.readDeadlineModCh; !ok { + if _, ok := <-tp.readDeadline.modCh; !ok { return } } @@ -107,16 +112,16 @@ func NewTransport(conn net.Conn, log *logging.Logger, local, remote cipher.PubKe go func() { for { - if wd := atomic.LoadInt64(&tp.writeDeadline); time.Now().UnixNano() >= wd { + if wd := atomic.LoadInt64(&tp.writeDeadline.unixNano); time.Now().UnixNano() >= wd { select { - case tp.writeDeadlineCh <- struct{}{}: - case _, ok := <-tp.writeDeadlineModCh: + case tp.writeDeadline.ch <- struct{}{}: + case _, ok := <-tp.writeDeadline.modCh: if !ok { return } } } else { - if _, ok := <-tp.writeDeadlineModCh; !ok { + if _, ok := <-tp.writeDeadline.modCh; !ok { return } } @@ -405,7 +410,7 @@ func (tp *Transport) Read(p []byte) (int, error) { select { case <-done: return n, err - case <-tp.readDeadlineCh: + case <-tp.readDeadline.ch: return 0, errors.New("deadline exceeded") } } @@ -453,7 +458,7 @@ func (tp *Transport) Write(p []byte) (int, error) { select { case <-done: return n, err - case <-tp.writeDeadlineCh: + case <-tp.writeDeadline.ch: return 0, ErrDeadlineExceeded } } @@ -486,14 +491,14 @@ func (tp *Transport) SetDeadline(t time.Time) error { // SetReadDeadline sets read deadline for transport. func (tp *Transport) SetReadDeadline(t time.Time) error { - atomic.StoreInt64(&tp.readDeadline, t.UnixNano()) - tp.readDeadlineModCh <- struct{}{} + atomic.StoreInt64(&tp.readDeadline.unixNano, t.UnixNano()) + tp.readDeadline.modCh <- struct{}{} return nil } // SetWriteDeadline sets write deadline for transport. func (tp *Transport) SetWriteDeadline(t time.Time) error { - atomic.StoreInt64(&tp.writeDeadline, t.UnixNano()) - tp.writeDeadlineModCh <- struct{}{} + atomic.StoreInt64(&tp.writeDeadline.unixNano, t.UnixNano()) + tp.writeDeadline.modCh <- struct{}{} return nil } From 0edd95acab331af4fa742450ecc7e1ba1d248a58 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Fri, 16 Aug 2019 11:59:42 +0400 Subject: [PATCH 15/26] Minor improvements --- server_test.go | 16 +++++++++------- transport.go | 7 +++++++ 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/server_test.go b/server_test.go index 870f2ce..b9dce99 100644 --- a/server_test.go +++ b/server_test.go @@ -290,13 +290,14 @@ func testServerSelfDialing(t *testing.T) { func testServerDeadlines(t *testing.T) { t.Parallel() + now := time.Now() cases := []struct { deadline time.Time error error }{ - {time.Now().Add(10 * time.Second), nil}, - {time.Now().Add(-1 * time.Second), ErrDeadlineExceeded}, - {time.Now().Add(1 * time.Millisecond), ErrDeadlineExceeded}, + {now.Add(10 * time.Second), nil}, + {now.Add(-1 * time.Second), ErrDeadlineExceeded}, + {now.Add(1 * time.Millisecond), ErrDeadlineExceeded}, {time.Unix(0, math.MaxInt64), nil}, } @@ -331,12 +332,13 @@ func testServerDeadlines(t *testing.T) { }) t.Run("Multiple read and write", func(t *testing.T) { - duration := 100 * time.Millisecond - require.NoError(t, initiatorTransport.SetDeadline(time.Now().Add(duration))) - require.NoError(t, responderTransport.SetDeadline(time.Now().Add(duration))) + duration := 10 * time.Second + deadline := time.Now().Add(duration) + require.NoError(t, initiatorTransport.SetDeadline(deadline)) + require.NoError(t, responderTransport.SetDeadline(deadline)) time.Sleep(duration) - require.Equal(t, nil, testTransportMessaging(initiatorTransport, responderTransport)) + require.NoError(t, testTransportMessaging(initiatorTransport, responderTransport)) }) assert.NoError(t, srv.Close()) diff --git a/transport.go b/transport.go index c6e5078..2f7e41c 100644 --- a/transport.go +++ b/transport.go @@ -39,6 +39,13 @@ func newDeadline() deadline { } } +func (d deadline) Close() error { + close(d.ch) + close(d.modCh) + + return nil +} + // Transport represents a connection from dmsg.Client to remote dmsg.Client (via dmsg.Server intermediary). type Transport struct { net.Conn // underlying connection to dmsg.Server From 80a47c8ba50b244a50e3bc391a1c5477fe67f459 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Fri, 16 Aug 2019 18:18:07 +0400 Subject: [PATCH 16/26] Add deadline.Serve --- transport.go | 50 ++++++++++++++++++++++---------------------------- 1 file changed, 22 insertions(+), 28 deletions(-) diff --git a/transport.go b/transport.go index 2f7e41c..061523e 100644 --- a/transport.go +++ b/transport.go @@ -39,6 +39,24 @@ func newDeadline() deadline { } } +func (d deadline) Serve() error { + for { + if rd := atomic.LoadInt64(&d.unixNano); time.Now().UnixNano() >= rd { + select { + case d.ch <- struct{}{}: + case _, ok := <-d.modCh: + if !ok { + return nil + } + } + } else { + if _, ok := <-d.modCh; !ok { + return nil + } + } + } +} + func (d deadline) Close() error { close(d.ch) close(d.modCh) @@ -100,38 +118,14 @@ func NewTransport(conn net.Conn, log *logging.Logger, local, remote cipher.PubKe } go func() { - for { - if rd := atomic.LoadInt64(&tp.readDeadline.unixNano); time.Now().UnixNano() >= rd { - select { - case tp.readDeadline.ch <- struct{}{}: - case _, ok := <-tp.readDeadline.modCh: - if !ok { - return - } - } - } else { - if _, ok := <-tp.readDeadline.modCh; !ok { - return - } - } + if err := tp.readDeadline.Serve(); err != nil { + log.WithError(err).Warn("Failed to serve read deadline") } }() go func() { - for { - if wd := atomic.LoadInt64(&tp.writeDeadline.unixNano); time.Now().UnixNano() >= wd { - select { - case tp.writeDeadline.ch <- struct{}{}: - case _, ok := <-tp.writeDeadline.modCh: - if !ok { - return - } - } - } else { - if _, ok := <-tp.writeDeadline.modCh; !ok { - return - } - } + if err := tp.writeDeadline.Serve(); err != nil { + log.WithError(err).Warn("Failed to serve write deadline") } }() From d316c9ed8d0ecfc651b9f9032c167a7dde6481ee Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Fri, 16 Aug 2019 18:22:51 +0400 Subject: [PATCH 17/26] Add deadlines checks before reading/writing --- transport.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/transport.go b/transport.go index 061523e..04e2590 100644 --- a/transport.go +++ b/transport.go @@ -400,6 +400,10 @@ func (tp *Transport) Serve() { // Read implements io.Reader func (tp *Transport) Read(p []byte) (int, error) { + if _, ok := <-tp.readDeadline.ch; ok { + return 0, ErrDeadlineExceeded + } + var n int var err error done := make(chan struct{}) @@ -412,7 +416,7 @@ func (tp *Transport) Read(p []byte) (int, error) { case <-done: return n, err case <-tp.readDeadline.ch: - return 0, errors.New("deadline exceeded") + return 0, ErrDeadlineExceeded } } @@ -448,6 +452,10 @@ func (tp *Transport) read(p []byte) (n int, err error) { // Write implements io.Writer func (tp *Transport) Write(p []byte) (int, error) { + if _, ok := <-tp.writeDeadline.ch; ok { + return 0, ErrDeadlineExceeded + } + var n int var err error done := make(chan struct{}) From 952c353f0b93407b8a585fa2c9959e871bb7752c Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Fri, 16 Aug 2019 18:25:33 +0400 Subject: [PATCH 18/26] Make deadlines checks before reading/writing atomic --- transport.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/transport.go b/transport.go index 04e2590..7fa1e64 100644 --- a/transport.go +++ b/transport.go @@ -31,6 +31,10 @@ type deadline struct { modCh chan struct{} } +func (d deadline) UnixNano() int64 { + return atomic.LoadInt64(&d.unixNano) +} + func newDeadline() deadline { return deadline{ unixNano: math.MaxInt64, @@ -400,7 +404,7 @@ func (tp *Transport) Serve() { // Read implements io.Reader func (tp *Transport) Read(p []byte) (int, error) { - if _, ok := <-tp.readDeadline.ch; ok { + if time.Now().UnixNano() > tp.readDeadline.UnixNano() { return 0, ErrDeadlineExceeded } @@ -452,7 +456,7 @@ func (tp *Transport) read(p []byte) (n int, err error) { // Write implements io.Writer func (tp *Transport) Write(p []byte) (int, error) { - if _, ok := <-tp.writeDeadline.ch; ok { + if time.Now().UnixNano() > tp.readDeadline.UnixNano() { return 0, ErrDeadlineExceeded } From a286d797d6c558338e22a21a36b36688b39ef208 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Sat, 17 Aug 2019 20:47:13 +0400 Subject: [PATCH 19/26] Fix issues --- client_test.go | 2 +- server_test.go | 23 +++++++++++++++-------- transport.go | 8 ++++---- 3 files changed, 20 insertions(+), 13 deletions(-) diff --git a/client_test.go b/client_test.go index fb488f8..1c9d299 100644 --- a/client_test.go +++ b/client_test.go @@ -174,7 +174,7 @@ func TestClient(t *testing.T) { conn1.mx.RUnlock() assert.Equal(t, initID+2, newInitID) - assert.NoError(t, closeClosers(conn1, conn2)) + assert.NoError(t, closeClosers(conn1, conn2, tr1)) checkClientConnsClosed(t, conn1, conn2) assert.Error(t, errWithTimeout(serveErrCh1)) diff --git a/server_test.go b/server_test.go index b9dce99..a6e802c 100644 --- a/server_test.go +++ b/server_test.go @@ -259,7 +259,7 @@ func testServerDisconnection(t *testing.T) { responder := createClient(t, dc, responderName) initiator := createClient(t, dc, initiatorName) initiatorTransport, responderTransport := dial(t, initiator, responder, noDelay) - require.NoError(t, testTransportMessaging(initiatorTransport, responderTransport)) + require.NoError(t, testTransportMessaging(initiatorTransport, responderTransport, time.Duration(0))) require.NoError(t, srv.Close()) require.NoError(t, errWithTimeout(srvErrCh)) @@ -280,7 +280,7 @@ func testServerSelfDialing(t *testing.T) { client := createClient(t, dc, "client") selfWrTp, selfRdTp := dial(t, client, client, noDelay) // try to write/read message to/from self - require.NoError(t, testTransportMessaging(selfWrTp, selfRdTp)) + require.NoError(t, testTransportMessaging(selfWrTp, selfRdTp, time.Duration(0))) require.NoError(t, closeClosers(selfRdTp, selfWrTp, client)) assert.NoError(t, srv.Close()) @@ -312,14 +312,14 @@ func testServerDeadlines(t *testing.T) { t.Run("Read", func(t *testing.T) { for _, tc := range cases { require.NoError(t, responderTransport.SetReadDeadline(tc.deadline)) - require.Equal(t, tc.error, testTransportMessaging(initiatorTransport, responderTransport)) + require.Equal(t, tc.error, testTransportMessaging(initiatorTransport, responderTransport, time.Duration(0))) } }) t.Run("Write", func(t *testing.T) { for _, tc := range cases { require.NoError(t, initiatorTransport.SetWriteDeadline(tc.deadline)) - require.Equal(t, tc.error, testTransportMessaging(initiatorTransport, responderTransport)) + require.Equal(t, tc.error, testTransportMessaging(initiatorTransport, responderTransport, time.Duration(0))) } }) @@ -327,30 +327,37 @@ func testServerDeadlines(t *testing.T) { for _, tc := range cases { require.NoError(t, initiatorTransport.SetDeadline(tc.deadline)) require.NoError(t, responderTransport.SetDeadline(tc.deadline)) - require.Equal(t, tc.error, testTransportMessaging(initiatorTransport, responderTransport)) + require.Equal(t, tc.error, testTransportMessaging(initiatorTransport, responderTransport, time.Duration(0))) } }) t.Run("Multiple read and write", func(t *testing.T) { duration := 10 * time.Second + deadline := time.Now().Add(duration) require.NoError(t, initiatorTransport.SetDeadline(deadline)) require.NoError(t, responderTransport.SetDeadline(deadline)) - time.Sleep(duration) - require.NoError(t, testTransportMessaging(initiatorTransport, responderTransport)) + require.Equal(t, ErrDeadlineExceeded, testTransportMessaging(initiatorTransport, responderTransport, time.Duration(0))) + + deadline = time.Now().Add(duration) + require.NoError(t, initiatorTransport.SetDeadline(deadline)) + require.NoError(t, responderTransport.SetDeadline(deadline)) + require.Equal(t, ErrDeadlineExceeded, testTransportMessaging(initiatorTransport, responderTransport, duration)) }) assert.NoError(t, srv.Close()) assert.NoError(t, errWithTimeout(srvErrCh)) } -func testTransportMessaging(init *Transport, resp *Transport) error { +func testTransportMessaging(init *Transport, resp *Transport, delay time.Duration) error { for i := 0; i < msgCount; i++ { if _, err := init.Write([]byte(message)); err != nil { return err } + time.Sleep(delay) + recvBuf := make([]byte, bufSize) for i := 0; i < len(message); i += bufSize { if _, err := resp.Read(recvBuf); err != nil { diff --git a/transport.go b/transport.go index 7fa1e64..b70c86c 100644 --- a/transport.go +++ b/transport.go @@ -31,7 +31,7 @@ type deadline struct { modCh chan struct{} } -func (d deadline) UnixNano() int64 { +func (d *deadline) UnixNano() int64 { return atomic.LoadInt64(&d.unixNano) } @@ -43,7 +43,7 @@ func newDeadline() deadline { } } -func (d deadline) Serve() error { +func (d *deadline) Serve() error { for { if rd := atomic.LoadInt64(&d.unixNano); time.Now().UnixNano() >= rd { select { @@ -61,7 +61,7 @@ func (d deadline) Serve() error { } } -func (d deadline) Close() error { +func (d *deadline) Close() error { close(d.ch) close(d.modCh) @@ -456,7 +456,7 @@ func (tp *Transport) read(p []byte) (n int, err error) { // Write implements io.Writer func (tp *Transport) Write(p []byte) (int, error) { - if time.Now().UnixNano() > tp.readDeadline.UnixNano() { + if time.Now().UnixNano() > tp.writeDeadline.UnixNano() { return 0, ErrDeadlineExceeded } From bf6a9350dc7d05fa896284378825a385a2db5884 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Sat, 17 Aug 2019 21:24:20 +0400 Subject: [PATCH 20/26] Refactor deadlines --- deadline.go | 73 +++++++++++++++++++++++++++++++++++++++++++++++++ transport.go | 77 +++++++++++++++------------------------------------- 2 files changed, 95 insertions(+), 55 deletions(-) create mode 100644 deadline.go diff --git a/deadline.go b/deadline.go new file mode 100644 index 0000000..ae2f0ec --- /dev/null +++ b/deadline.go @@ -0,0 +1,73 @@ +package dmsg + +import ( + "math" + "sync" + "sync/atomic" + "time" +) + +type deadline struct { + mu sync.Mutex + unixNano int64 + ch chan struct{} + modCh chan struct{} +} + +func newDeadline() *deadline { + return &deadline{ + unixNano: math.MaxInt64, + ch: make(chan struct{}), + modCh: make(chan struct{}, 1), + } +} + +func (d *deadline) Close() error { + close(d.ch) + d.mu.Lock() + close(d.modCh) + d.mu.Unlock() + + return nil +} + +func (d *deadline) SetDeadline(t time.Time) { + atomic.StoreInt64(&d.unixNano, t.UnixNano()) + d.mu.Lock() + d.modCh <- struct{}{} + d.mu.Unlock() +} + +func (d *deadline) Serve() error { + for { + now := time.Now().UnixNano() + deadline := d.UnixNano() + if now >= deadline { + select { + case d.ch <- struct{}{}: + case _, ok := <-d.modCh: + if !ok { + return nil + } + } + } else { + if _, ok := <-d.modCh; !ok { + return nil + } + } + } +} + +func (d *deadline) Exceeded() bool { + now := time.Now().UnixNano() + deadline := d.UnixNano() + return now >= deadline +} + +func (d *deadline) Chan() <-chan struct{} { + return d.ch +} + +func (d *deadline) UnixNano() int64 { + return atomic.LoadInt64(&d.unixNano) +} diff --git a/transport.go b/transport.go index b70c86c..a74d7e5 100644 --- a/transport.go +++ b/transport.go @@ -5,10 +5,8 @@ import ( "errors" "fmt" "io" - "math" "net" "sync" - "sync/atomic" "time" "github.com/skycoin/skycoin/src/util/logging" @@ -25,49 +23,6 @@ var ( ErrDeadlineExceeded = errors.New("deadline exceeded") ) -type deadline struct { - unixNano int64 - ch chan struct{} - modCh chan struct{} -} - -func (d *deadline) UnixNano() int64 { - return atomic.LoadInt64(&d.unixNano) -} - -func newDeadline() deadline { - return deadline{ - unixNano: math.MaxInt64, - ch: make(chan struct{}), - modCh: make(chan struct{}, 1), - } -} - -func (d *deadline) Serve() error { - for { - if rd := atomic.LoadInt64(&d.unixNano); time.Now().UnixNano() >= rd { - select { - case d.ch <- struct{}{}: - case _, ok := <-d.modCh: - if !ok { - return nil - } - } - } else { - if _, ok := <-d.modCh; !ok { - return nil - } - } - } -} - -func (d *deadline) Close() error { - close(d.ch) - close(d.modCh) - - return nil -} - // Transport represents a connection from dmsg.Client to remote dmsg.Client (via dmsg.Server intermediary). type Transport struct { net.Conn // underlying connection to dmsg.Server @@ -94,8 +49,8 @@ type Transport struct { doneOnce sync.Once // ensures 'done' only closes once doneFunc func(id uint16) // contains a method to remove the transport from dmsg.Client - readDeadline deadline - writeDeadline deadline + readDeadline *deadline + writeDeadline *deadline } // NewTransport creates a new dms_tp. @@ -168,6 +123,14 @@ func (tp *Transport) close() (closed bool) { tp.inMx.Lock() close(tp.inCh) tp.inMx.Unlock() + + if err := tp.readDeadline.Close(); err != nil { + tp.log.WithError(err).Warn("Failed to close read deadline") + } + + if err := tp.writeDeadline.Close(); err != nil { + tp.log.WithError(err).Warn("Failed to close write deadline") + } }) tp.serve() // just in case. @@ -404,7 +367,7 @@ func (tp *Transport) Serve() { // Read implements io.Reader func (tp *Transport) Read(p []byte) (int, error) { - if time.Now().UnixNano() > tp.readDeadline.UnixNano() { + if tp.readDeadline.Exceeded() { return 0, ErrDeadlineExceeded } @@ -419,7 +382,10 @@ func (tp *Transport) Read(p []byte) (int, error) { select { case <-done: return n, err - case <-tp.readDeadline.ch: + case _, ok := <-tp.readDeadline.ch: + if !ok { + return 0, io.EOF + } return 0, ErrDeadlineExceeded } } @@ -456,7 +422,7 @@ func (tp *Transport) read(p []byte) (n int, err error) { // Write implements io.Writer func (tp *Transport) Write(p []byte) (int, error) { - if time.Now().UnixNano() > tp.writeDeadline.UnixNano() { + if tp.writeDeadline.Exceeded() { return 0, ErrDeadlineExceeded } @@ -471,7 +437,10 @@ func (tp *Transport) Write(p []byte) (int, error) { select { case <-done: return n, err - case <-tp.writeDeadline.ch: + case _, ok := <-tp.writeDeadline.ch: + if !ok { + return 0, io.EOF + } return 0, ErrDeadlineExceeded } } @@ -504,14 +473,12 @@ func (tp *Transport) SetDeadline(t time.Time) error { // SetReadDeadline sets read deadline for transport. func (tp *Transport) SetReadDeadline(t time.Time) error { - atomic.StoreInt64(&tp.readDeadline.unixNano, t.UnixNano()) - tp.readDeadline.modCh <- struct{}{} + tp.readDeadline.SetDeadline(t) return nil } // SetWriteDeadline sets write deadline for transport. func (tp *Transport) SetWriteDeadline(t time.Time) error { - atomic.StoreInt64(&tp.writeDeadline.unixNano, t.UnixNano()) - tp.writeDeadline.modCh <- struct{}{} + tp.writeDeadline.SetDeadline(t) return nil } From cc5acf912fff54e9eed26a05d824a091722955f1 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Sat, 17 Aug 2019 21:28:36 +0400 Subject: [PATCH 21/26] Fix a logic issue --- deadline.go | 10 ++-------- transport.go | 2 +- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/deadline.go b/deadline.go index ae2f0ec..b787bde 100644 --- a/deadline.go +++ b/deadline.go @@ -40,9 +40,7 @@ func (d *deadline) SetDeadline(t time.Time) { func (d *deadline) Serve() error { for { - now := time.Now().UnixNano() - deadline := d.UnixNano() - if now >= deadline { + if d.Exceeded() { select { case d.ch <- struct{}{}: case _, ok := <-d.modCh: @@ -60,14 +58,10 @@ func (d *deadline) Serve() error { func (d *deadline) Exceeded() bool { now := time.Now().UnixNano() - deadline := d.UnixNano() + deadline := atomic.LoadInt64(&d.unixNano) return now >= deadline } func (d *deadline) Chan() <-chan struct{} { return d.ch } - -func (d *deadline) UnixNano() int64 { - return atomic.LoadInt64(&d.unixNano) -} diff --git a/transport.go b/transport.go index a74d7e5..a7660db 100644 --- a/transport.go +++ b/transport.go @@ -439,7 +439,7 @@ func (tp *Transport) Write(p []byte) (int, error) { return n, err case _, ok := <-tp.writeDeadline.ch: if !ok { - return 0, io.EOF + return 0, io.ErrClosedPipe } return 0, ErrDeadlineExceeded } From ade4248327fb0dae6790357bcb5625d3ec9228c0 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Sat, 17 Aug 2019 21:35:02 +0400 Subject: [PATCH 22/26] Get rid of an else statement --- deadline.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/deadline.go b/deadline.go index b787bde..041cc2b 100644 --- a/deadline.go +++ b/deadline.go @@ -43,15 +43,15 @@ func (d *deadline) Serve() error { if d.Exceeded() { select { case d.ch <- struct{}{}: + continue case _, ok := <-d.modCh: if !ok { return nil } } - } else { - if _, ok := <-d.modCh; !ok { - return nil - } + } + if _, ok := <-d.modCh; !ok { + return nil } } } From b50c331b40381d5feb89f1ec07c4c573a29d3dec Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Mon, 19 Aug 2019 10:20:05 +0400 Subject: [PATCH 23/26] Fix concurrency bugs --- deadline.go | 30 +++++++++++++++++++++--------- transport.go | 7 +++---- 2 files changed, 24 insertions(+), 13 deletions(-) diff --git a/deadline.go b/deadline.go index 041cc2b..1e956f0 100644 --- a/deadline.go +++ b/deadline.go @@ -8,10 +8,11 @@ import ( ) type deadline struct { - mu sync.Mutex - unixNano int64 - ch chan struct{} - modCh chan struct{} + mu sync.Mutex + unixNano int64 + ch chan struct{} + modCh chan struct{} + modChClosed uint32 } func newDeadline() *deadline { @@ -23,32 +24,43 @@ func newDeadline() *deadline { } func (d *deadline) Close() error { - close(d.ch) + atomic.StoreUint32(&d.modChClosed, 1) + d.mu.Lock() + defer d.mu.Unlock() + close(d.modCh) - d.mu.Unlock() return nil } -func (d *deadline) SetDeadline(t time.Time) { +func (d *deadline) SetDeadline(t time.Time) error { + if atomic.LoadUint32(&d.modChClosed) != 0 { + return ErrDeadlineExceeded + } + atomic.StoreInt64(&d.unixNano, t.UnixNano()) + d.mu.Lock() + defer d.mu.Unlock() + d.modCh <- struct{}{} - d.mu.Unlock() + + return nil } func (d *deadline) Serve() error { + defer close(d.ch) for { if d.Exceeded() { select { case d.ch <- struct{}{}: - continue case _, ok := <-d.modCh: if !ok { return nil } } + continue } if _, ok := <-d.modCh; !ok { return nil diff --git a/transport.go b/transport.go index a7660db..a22cec4 100644 --- a/transport.go +++ b/transport.go @@ -21,6 +21,7 @@ var ( ErrRequestCheckFailed = errors.New("failed to create transport: request check failed") ErrAcceptCheckFailed = errors.New("failed to create transport: accept check failed") ErrDeadlineExceeded = errors.New("deadline exceeded") + ErrDeadlineClosed = errors.New("failed to set deadline: deadline manager is closed") ) // Transport represents a connection from dmsg.Client to remote dmsg.Client (via dmsg.Server intermediary). @@ -473,12 +474,10 @@ func (tp *Transport) SetDeadline(t time.Time) error { // SetReadDeadline sets read deadline for transport. func (tp *Transport) SetReadDeadline(t time.Time) error { - tp.readDeadline.SetDeadline(t) - return nil + return tp.readDeadline.SetDeadline(t) } // SetWriteDeadline sets write deadline for transport. func (tp *Transport) SetWriteDeadline(t time.Time) error { - tp.writeDeadline.SetDeadline(t) - return nil + return tp.writeDeadline.SetDeadline(t) } From 7583b23a22aa3eebae331495087942786f7792e7 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Mon, 19 Aug 2019 11:59:13 +0400 Subject: [PATCH 24/26] Add basic tests for deadlines --- deadline.go | 2 +- deadline_test.go | 106 +++++++++++++++++++++++++++++++++++++++++++++++ transport.go | 6 +-- 3 files changed, 110 insertions(+), 4 deletions(-) create mode 100644 deadline_test.go diff --git a/deadline.go b/deadline.go index 1e956f0..5578d93 100644 --- a/deadline.go +++ b/deadline.go @@ -34,7 +34,7 @@ func (d *deadline) Close() error { return nil } -func (d *deadline) SetDeadline(t time.Time) error { +func (d *deadline) Set(t time.Time) error { if atomic.LoadUint32(&d.modChClosed) != 0 { return ErrDeadlineExceeded } diff --git a/deadline_test.go b/deadline_test.go new file mode 100644 index 0000000..6bd94e5 --- /dev/null +++ b/deadline_test.go @@ -0,0 +1,106 @@ +package dmsg + +import ( + "math" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func Test_newDeadline(t *testing.T) { + deadline := newDeadline() + require.NotNil(t, deadline) + + require.EqualValues(t, math.MaxInt64, deadline.unixNano) + require.EqualValues(t, 0, deadline.modChClosed) + + ch := deadline.ch + require.NotNil(t, ch) + + modCh := deadline.modCh + require.NotNil(t, modCh) +} + +func Test_deadline_SetDeadline(t *testing.T) { + deadline := newDeadline() + + var wg sync.WaitGroup + var serveErr error + wg.Add(1) + go func() { + defer wg.Done() + serveErr = deadline.Serve() + }() + + now := time.Now() + cases := []struct { + deadline time.Time + unixNano int64 + }{ + { + deadline: now.Add(1 * time.Second), + unixNano: now.Add(1 * time.Second).UnixNano(), + }, + { + deadline: now.Add(-1 * time.Second), + unixNano: now.Add(-1 * time.Second).UnixNano(), + }, + } + + for _, tc := range cases { + require.NoError(t, deadline.Set(tc.deadline)) + require.Equal(t, tc.unixNano, deadline.unixNano) + } + + require.NoError(t, deadline.Close()) + require.NotEqual(t, 0, deadline.modChClosed) + + wg.Wait() + require.NoError(t, serveErr) +} + +func Test_deadline_Chan(t *testing.T) { + deadline := newDeadline() + ch := deadline.Chan() + require.NotNil(t, ch) +} + +func Test_deadline_Exceeded(t *testing.T) { + deadline := newDeadline() + + var wg sync.WaitGroup + var serveErr error + wg.Add(1) + go func() { + defer wg.Done() + serveErr = deadline.Serve() + }() + + now := time.Now() + cases := []struct { + deadline time.Time + exceeded bool + }{ + { + deadline: now.Add(1 * time.Second), + exceeded: false, + }, + { + deadline: now.Add(-1 * time.Second), + exceeded: true, + }, + } + + for _, tc := range cases { + require.NoError(t, deadline.Set(tc.deadline)) + require.Equal(t, tc.exceeded, deadline.Exceeded()) + } + + require.NoError(t, deadline.Close()) + require.NotEqual(t, 0, deadline.modChClosed) + + wg.Wait() + require.NoError(t, serveErr) +} diff --git a/transport.go b/transport.go index a22cec4..66cc6f3 100644 --- a/transport.go +++ b/transport.go @@ -464,7 +464,7 @@ func (tp *Transport) write(p []byte) (int, error) { return len(p), nil } -// SetDeadline sets read and write deadlines for transport . +// SetDeadline sets read and write deadlines for transport. func (tp *Transport) SetDeadline(t time.Time) error { if err := tp.SetReadDeadline(t); err != nil { return err @@ -474,10 +474,10 @@ func (tp *Transport) SetDeadline(t time.Time) error { // SetReadDeadline sets read deadline for transport. func (tp *Transport) SetReadDeadline(t time.Time) error { - return tp.readDeadline.SetDeadline(t) + return tp.readDeadline.Set(t) } // SetWriteDeadline sets write deadline for transport. func (tp *Transport) SetWriteDeadline(t time.Time) error { - return tp.writeDeadline.SetDeadline(t) + return tp.writeDeadline.Set(t) } From fb1edee4e04471fbc2bbb4a3ec8b073335b623ca Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Tue, 20 Aug 2019 12:33:48 +0400 Subject: [PATCH 25/26] Fix naming and logic issues --- deadline.go | 25 ++++++++++++------------- deadline_test.go | 4 ++-- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/deadline.go b/deadline.go index 5578d93..5274496 100644 --- a/deadline.go +++ b/deadline.go @@ -1,6 +1,7 @@ package dmsg import ( + "io" "math" "sync" "sync/atomic" @@ -9,7 +10,7 @@ import ( type deadline struct { mu sync.Mutex - unixNano int64 + timestamp int64 ch chan struct{} modCh chan struct{} modChClosed uint32 @@ -17,33 +18,31 @@ type deadline struct { func newDeadline() *deadline { return &deadline{ - unixNano: math.MaxInt64, - ch: make(chan struct{}), - modCh: make(chan struct{}, 1), + timestamp: math.MaxInt64, + ch: make(chan struct{}), + modCh: make(chan struct{}, 1), } } func (d *deadline) Close() error { - atomic.StoreUint32(&d.modChClosed, 1) - d.mu.Lock() defer d.mu.Unlock() + atomic.StoreUint32(&d.modChClosed, 1) close(d.modCh) return nil } func (d *deadline) Set(t time.Time) error { - if atomic.LoadUint32(&d.modChClosed) != 0 { - return ErrDeadlineExceeded - } - - atomic.StoreInt64(&d.unixNano, t.UnixNano()) - d.mu.Lock() defer d.mu.Unlock() + if atomic.LoadUint32(&d.modChClosed) != 0 { + return io.ErrClosedPipe + } + + atomic.StoreInt64(&d.timestamp, t.UnixNano()) d.modCh <- struct{}{} return nil @@ -70,7 +69,7 @@ func (d *deadline) Serve() error { func (d *deadline) Exceeded() bool { now := time.Now().UnixNano() - deadline := atomic.LoadInt64(&d.unixNano) + deadline := atomic.LoadInt64(&d.timestamp) return now >= deadline } diff --git a/deadline_test.go b/deadline_test.go index 6bd94e5..afdf1bb 100644 --- a/deadline_test.go +++ b/deadline_test.go @@ -13,7 +13,7 @@ func Test_newDeadline(t *testing.T) { deadline := newDeadline() require.NotNil(t, deadline) - require.EqualValues(t, math.MaxInt64, deadline.unixNano) + require.EqualValues(t, math.MaxInt64, deadline.timestamp) require.EqualValues(t, 0, deadline.modChClosed) ch := deadline.ch @@ -51,7 +51,7 @@ func Test_deadline_SetDeadline(t *testing.T) { for _, tc := range cases { require.NoError(t, deadline.Set(tc.deadline)) - require.Equal(t, tc.unixNano, deadline.unixNano) + require.Equal(t, tc.unixNano, deadline.timestamp) } require.NoError(t, deadline.Close()) From 08ec36e2036392aea8833e14a09f6f1b7ac8abdb Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Tue, 20 Aug 2019 13:54:09 +0400 Subject: [PATCH 26/26] Add a concurrent test for deadlines --- deadline_test.go | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/deadline_test.go b/deadline_test.go index afdf1bb..8790ad5 100644 --- a/deadline_test.go +++ b/deadline_test.go @@ -104,3 +104,30 @@ func Test_deadline_Exceeded(t *testing.T) { wg.Wait() require.NoError(t, serveErr) } + +func Test_deadline_concurrency(t *testing.T) { + t.Run("Set and close", func(t *testing.T) { + deadline := newDeadline() + + var wg sync.WaitGroup + var serveErr error + wg.Add(1) + go func() { + defer wg.Done() + serveErr = deadline.Serve() + }() + + var setErr error + wg.Add(1) + go func() { + defer wg.Done() + setErr = deadline.Set(time.Now().Add(1 * time.Second)) + }() + + require.NoError(t, deadline.Close()) + + wg.Wait() + require.NoError(t, serveErr) + require.Error(t, setErr) + }) +}