Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions lazyClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,7 @@ func (l *lazyClientConn[T]) Write(b []byte) (int, error) {
return l.con.Write(b)
}

// Close closes the underlying io.ReadWriteCloser
//
// This does not flush anything.
// Close closes the underlying io.ReadWriteCloser after finishing the handshake.
func (l *lazyClientConn[T]) Close() error {
// As the client, we flush the handshake on close to cover an
// interesting edge-case where the server only speaks a single protocol
Expand All @@ -147,6 +145,22 @@ func (l *lazyClientConn[T]) Close() error {
// closed the stream for reading. I mean, we're the initiator so that's
// strange... but it's still allowed
_ = l.Flush()

// Finish reading the handshake before we close the connection/stream. This
// is necessary so that the other side can finish sending its response to our
// multistream header before we tell it we are done reading.
//
// Example:
// We open a QUIC stream, write the protocol `/a`, send 1 byte of application
// data, and immediately close.
Comment on lines +153 to +155

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we hit this case when, for example, we open a stream and then reset it immediately due to an application error?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would. But in that case, it's fine to not read the handshake message, as the other side may or may not receive the writes made before calling Reset.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was confused when this case happened in practice in our protocol, but I realized that Bitswap opens a streams, writes to it and closes it, potentially never reading handshake data.

Copy link
Member

@sukunrt sukunrt Nov 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For completeness, servers(the other end of the multistream here) can handle this situation with a fix like: #87 which ignores a reset on writing the multistream header, allowing the user of such a stream to Read all the data that's written.

//
// This can result in a single packet that contains the stream data along
// with a STOP_SENDING frame. The other side may be unable to negotiate
// multistream select since it can't write to the stream anymore and may
// drop the stream.
Comment on lines +157 to +160
Copy link
Member

@sukunrt sukunrt Nov 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should ask js and rust libp2p to fix this on their ends similar to what #87 does.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First we probably want to define optimistic multistream select so that implementations can reference that: libp2p/specs#643

//
// Note: We currently handle this case in Go(https://github.com/multiformats/go-multistream/pull/87), but rust-libp2p does not.
l.rhandshakeOnce.Do(l.doReadHandshake)
Copy link

@Wondertan Wondertan Nov 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the read handshake happened in both Flush and Write, but it was done asynchronously(loc 172 and 128), and the connection could be closed before readHandshake finishes.... Makes sense

I am really surprised that Kubo folk never triggered(or realized) this case. Bitswap works there in the exactly same way, without ever calling Read, and thus sometimes without reading handshake.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw, the comment on Close states no flushing is happening, but it actually does above.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, as lazy wrapper always Flushes on close, do we actually need another wrapper in basichost https://github.com/libp2p/go-libp2p/blob/7268c98442c34d26a5d87cd72e37c48e1ffe2e6c/p2p/host/basic/basic_host.go#L1151-L1161?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing that out. I've been meaning to remove it. I'm not sure why we need to wrap the whole thing at all.

Copy link
Member

@sukunrt sukunrt Nov 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am really surprised that Kubo folk never triggered(or realized) this case.

I guess it did at some point. Go clients handle this correctly with: #87

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, as lazy wrapper always Flushes on close, do we actually need another wrapper in basichost

I think yes because we want to flush on CloseWrite as well.

return l.con.Close()
}

Expand Down
83 changes: 83 additions & 0 deletions multistream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net"
"sort"
"strings"
"sync/atomic"
"testing"
"time"
)
Expand Down Expand Up @@ -801,6 +802,88 @@ func TestNegotiatePeerSendsAndCloses(t *testing.T) {
}
}

func newPair() (*chanPipe, *chanPipe) {
a := make(chan []byte, 16)
b := make(chan []byte, 16)
aReadClosed := atomic.Bool{}
bReadClosed := atomic.Bool{}
return &chanPipe{r: a, w: b, myReadClosed: &aReadClosed, peerReadClosed: &bReadClosed},
&chanPipe{r: b, w: a, myReadClosed: &bReadClosed, peerReadClosed: &aReadClosed}
}

type chanPipe struct {
r, w chan []byte
buf bytes.Buffer

myReadClosed *atomic.Bool
peerReadClosed *atomic.Bool
}

func (cp *chanPipe) Read(b []byte) (int, error) {
if cp.buf.Len() > 0 {
return cp.buf.Read(b)
}

buf, ok := <-cp.r
if !ok {
return 0, io.EOF
}

cp.buf.Write(buf)
return cp.buf.Read(b)
}

func (cp *chanPipe) Write(b []byte) (int, error) {
if cp.peerReadClosed.Load() {
panic("peer's read side closed")
}
copied := make([]byte, len(b))
copy(copied, b)
cp.w <- copied
return len(b), nil
}

func (cp *chanPipe) Close() error {
cp.myReadClosed.Store(true)
close(cp.w)
return nil
}

func TestReadHandshakeOnClose(t *testing.T) {
rw1, rw2 := newPair()

clientDone := make(chan struct{})
go func() {
l1 := NewMSSelect(rw1, "a")
_, _ = l1.Write([]byte("hello"))
_ = l1.Close()
close(clientDone)
}()

serverDone := make(chan error)

server := NewMultistreamMuxer[string]()
server.AddHandler("a", func(protocol string, rwc io.ReadWriteCloser) error {
_, err := io.ReadAll(rwc)
rwc.Close()
serverDone <- err
return nil
})

p, h, err := server.Negotiate(rw2)
if err != nil {
t.Fatal(err)
}

go h(p, rw2)

err = <-serverDone
if err != nil {
t.Fatal(err)
}
<-clientDone
}

type rwc struct {
*strings.Reader
}
Expand Down