From 8ec04aa33677a6183d107f945230e41e3bf9ec37 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Thu, 24 Aug 2023 16:53:20 +1000 Subject: [PATCH 1/4] fix: remove body validation from unclean termination --- pkg/internal/itest/http_fetch_test.go | 79 +++++++++------------------ 1 file changed, 27 insertions(+), 52 deletions(-) diff --git a/pkg/internal/itest/http_fetch_test.go b/pkg/internal/itest/http_fetch_test.go index 96a22e41..0dd419ae 100644 --- a/pkg/internal/itest/http_fetch_test.go +++ b/pkg/internal/itest/http_fetch_test.go @@ -154,15 +154,9 @@ func TestHttpFetch(t *testing.T) { generate: func(t *testing.T, rndReader io.Reader, remotes []testpeer.TestPeer) []unixfs.DirEntry { return []unixfs.DirEntry{unixfs.GenerateFile(t, remotes[0].LinkSystem, rndReader, 4<<20)} }, - validateBodies: []bodyValidator{func(t *testing.T, srcData unixfs.DirEntry, body []byte) { - // 3 blocks max, start at the root and then two blocks into the sharded data - wantCids := []cid.Cid{ - srcData.Root, - srcData.SelfCids[0], - srcData.SelfCids[1], - } - validateCarBody(t, body, srcData.Root, wantCids, true) - }}, + // no validation, Go's body parser will fail on the unclean end and we're unlikely + // to have enough content quick enough to parse before it encounters the unclean end + // and returns nothing }, { name: "graphsync max block limit in request", @@ -176,15 +170,9 @@ func TestHttpFetch(t *testing.T) { generate: func(t *testing.T, rndReader io.Reader, remotes []testpeer.TestPeer) []unixfs.DirEntry { return []unixfs.DirEntry{unixfs.GenerateFile(t, remotes[0].LinkSystem, rndReader, 4<<20)} }, - validateBodies: []bodyValidator{func(t *testing.T, srcData unixfs.DirEntry, body []byte) { - // 3 blocks max, start at the root and then two blocks into the sharded data - wantCids := []cid.Cid{ - srcData.Root, - srcData.SelfCids[0], - srcData.SelfCids[1], - } - validateCarBody(t, body, srcData.Root, wantCids, true) - }}, + // no validation, Go's body parser will fail on the unclean end and we're unlikely + // to have enough content quick enough to parse before it encounters the unclean end + // and returns nothing }, { name: "bitswap max block limit", @@ -197,15 +185,9 @@ func TestHttpFetch(t *testing.T) { generate: func(t *testing.T, rndReader io.Reader, remotes []testpeer.TestPeer) []unixfs.DirEntry { return []unixfs.DirEntry{unixfs.GenerateFile(t, remotes[0].LinkSystem, rndReader, 4<<20)} }, - validateBodies: []bodyValidator{func(t *testing.T, srcData unixfs.DirEntry, body []byte) { - // 3 blocks max, start at the root and then two blocks into the sharded data - wantCids := []cid.Cid{ - srcData.Root, - srcData.SelfCids[0], - srcData.SelfCids[1], - } - validateCarBody(t, body, srcData.Root, wantCids, true) - }}, + // no validation, Go's body parser will fail on the unclean end and we're unlikely + // to have enough content quick enough to parse before it encounters the unclean end + // and returns nothing }, { name: "http max block limit", @@ -218,15 +200,9 @@ func TestHttpFetch(t *testing.T) { generate: func(t *testing.T, rndReader io.Reader, remotes []testpeer.TestPeer) []unixfs.DirEntry { return []unixfs.DirEntry{unixfs.GenerateFile(t, remotes[0].LinkSystem, rndReader, 4<<20)} }, - validateBodies: []bodyValidator{func(t *testing.T, srcData unixfs.DirEntry, body []byte) { - // 3 blocks max, start at the root and then two blocks into the sharded data - wantCids := []cid.Cid{ - srcData.Root, - srcData.SelfCids[0], - srcData.SelfCids[1], - } - validateCarBody(t, body, srcData.Root, wantCids, true) - }}, + // no validation, Go's body parser will fail on the unclean end and we're unlikely + // to have enough content quick enough to parse before it encounters the unclean end + // and returns nothing }, { name: "bitswap block timeout from missing block", @@ -240,15 +216,9 @@ func TestHttpFetch(t *testing.T) { remotes[0].Blockstore().DeleteBlock(context.Background(), file.SelfCids[2]) return []unixfs.DirEntry{file} }, - validateBodies: []bodyValidator{func(t *testing.T, srcData unixfs.DirEntry, body []byte) { - // 3 blocks max, start at the root and then two blocks into the sharded data - wantCids := []cid.Cid{ - srcData.Root, - srcData.SelfCids[0], - srcData.SelfCids[1], - } - validateCarBody(t, body, srcData.Root, wantCids, true) - }}, + // no validation, Go's body parser will fail on the unclean end and we're unlikely + // to have enough content quick enough to parse before it encounters the unclean end + // and returns nothing }, { name: "same content, http missing block, bitswap completes", @@ -1173,14 +1143,19 @@ func TestHttpFetch(t *testing.T) { carFiles = append(carFiles, dstf) } - if testCase.validateBodies != nil && testCase.validateBodies[i] != nil { - testCase.validateBodies[i](t, srcData[i], body) - } else { - // gotDir := CarToDirEntry(t, bytes.NewReader(body), srcData[i].Root, true) - gotLsys := CarBytesLinkSystem(t, bytes.NewReader(body)) - gotDir := unixfs.ToDirEntry(t, gotLsys, srcData[i].Root, true) - unixfs.CompareDirEntries(t, srcData[i], gotDir) + if !testCase.expectUncleanEnd { + if testCase.validateBodies != nil && testCase.validateBodies[i] != nil { + testCase.validateBodies[i](t, srcData[i], body) + } else { + // gotDir := CarToDirEntry(t, bytes.NewReader(body), srcData[i].Root, true) + gotLsys := CarBytesLinkSystem(t, bytes.NewReader(body)) + gotDir := unixfs.ToDirEntry(t, gotLsys, srcData[i].Root, true) + unixfs.CompareDirEntries(t, srcData[i], gotDir) + } } + // else don't bother trying to validate, we may or may not have got the body + // before the Go HTTP client decided the dirty end meant it was improper to + // provide any body. } } From 2c720469b66f2e76c84a262ed8ce2976bb98b316 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Tue, 5 Sep 2023 15:26:33 +1000 Subject: [PATCH 2/4] Revert "fix: remove body validation from unclean termination" This reverts commit 8ec04aa33677a6183d107f945230e41e3bf9ec37. --- pkg/internal/itest/http_fetch_test.go | 79 ++++++++++++++++++--------- 1 file changed, 52 insertions(+), 27 deletions(-) diff --git a/pkg/internal/itest/http_fetch_test.go b/pkg/internal/itest/http_fetch_test.go index 0dd419ae..96a22e41 100644 --- a/pkg/internal/itest/http_fetch_test.go +++ b/pkg/internal/itest/http_fetch_test.go @@ -154,9 +154,15 @@ func TestHttpFetch(t *testing.T) { generate: func(t *testing.T, rndReader io.Reader, remotes []testpeer.TestPeer) []unixfs.DirEntry { return []unixfs.DirEntry{unixfs.GenerateFile(t, remotes[0].LinkSystem, rndReader, 4<<20)} }, - // no validation, Go's body parser will fail on the unclean end and we're unlikely - // to have enough content quick enough to parse before it encounters the unclean end - // and returns nothing + validateBodies: []bodyValidator{func(t *testing.T, srcData unixfs.DirEntry, body []byte) { + // 3 blocks max, start at the root and then two blocks into the sharded data + wantCids := []cid.Cid{ + srcData.Root, + srcData.SelfCids[0], + srcData.SelfCids[1], + } + validateCarBody(t, body, srcData.Root, wantCids, true) + }}, }, { name: "graphsync max block limit in request", @@ -170,9 +176,15 @@ func TestHttpFetch(t *testing.T) { generate: func(t *testing.T, rndReader io.Reader, remotes []testpeer.TestPeer) []unixfs.DirEntry { return []unixfs.DirEntry{unixfs.GenerateFile(t, remotes[0].LinkSystem, rndReader, 4<<20)} }, - // no validation, Go's body parser will fail on the unclean end and we're unlikely - // to have enough content quick enough to parse before it encounters the unclean end - // and returns nothing + validateBodies: []bodyValidator{func(t *testing.T, srcData unixfs.DirEntry, body []byte) { + // 3 blocks max, start at the root and then two blocks into the sharded data + wantCids := []cid.Cid{ + srcData.Root, + srcData.SelfCids[0], + srcData.SelfCids[1], + } + validateCarBody(t, body, srcData.Root, wantCids, true) + }}, }, { name: "bitswap max block limit", @@ -185,9 +197,15 @@ func TestHttpFetch(t *testing.T) { generate: func(t *testing.T, rndReader io.Reader, remotes []testpeer.TestPeer) []unixfs.DirEntry { return []unixfs.DirEntry{unixfs.GenerateFile(t, remotes[0].LinkSystem, rndReader, 4<<20)} }, - // no validation, Go's body parser will fail on the unclean end and we're unlikely - // to have enough content quick enough to parse before it encounters the unclean end - // and returns nothing + validateBodies: []bodyValidator{func(t *testing.T, srcData unixfs.DirEntry, body []byte) { + // 3 blocks max, start at the root and then two blocks into the sharded data + wantCids := []cid.Cid{ + srcData.Root, + srcData.SelfCids[0], + srcData.SelfCids[1], + } + validateCarBody(t, body, srcData.Root, wantCids, true) + }}, }, { name: "http max block limit", @@ -200,9 +218,15 @@ func TestHttpFetch(t *testing.T) { generate: func(t *testing.T, rndReader io.Reader, remotes []testpeer.TestPeer) []unixfs.DirEntry { return []unixfs.DirEntry{unixfs.GenerateFile(t, remotes[0].LinkSystem, rndReader, 4<<20)} }, - // no validation, Go's body parser will fail on the unclean end and we're unlikely - // to have enough content quick enough to parse before it encounters the unclean end - // and returns nothing + validateBodies: []bodyValidator{func(t *testing.T, srcData unixfs.DirEntry, body []byte) { + // 3 blocks max, start at the root and then two blocks into the sharded data + wantCids := []cid.Cid{ + srcData.Root, + srcData.SelfCids[0], + srcData.SelfCids[1], + } + validateCarBody(t, body, srcData.Root, wantCids, true) + }}, }, { name: "bitswap block timeout from missing block", @@ -216,9 +240,15 @@ func TestHttpFetch(t *testing.T) { remotes[0].Blockstore().DeleteBlock(context.Background(), file.SelfCids[2]) return []unixfs.DirEntry{file} }, - // no validation, Go's body parser will fail on the unclean end and we're unlikely - // to have enough content quick enough to parse before it encounters the unclean end - // and returns nothing + validateBodies: []bodyValidator{func(t *testing.T, srcData unixfs.DirEntry, body []byte) { + // 3 blocks max, start at the root and then two blocks into the sharded data + wantCids := []cid.Cid{ + srcData.Root, + srcData.SelfCids[0], + srcData.SelfCids[1], + } + validateCarBody(t, body, srcData.Root, wantCids, true) + }}, }, { name: "same content, http missing block, bitswap completes", @@ -1143,19 +1173,14 @@ func TestHttpFetch(t *testing.T) { carFiles = append(carFiles, dstf) } - if !testCase.expectUncleanEnd { - if testCase.validateBodies != nil && testCase.validateBodies[i] != nil { - testCase.validateBodies[i](t, srcData[i], body) - } else { - // gotDir := CarToDirEntry(t, bytes.NewReader(body), srcData[i].Root, true) - gotLsys := CarBytesLinkSystem(t, bytes.NewReader(body)) - gotDir := unixfs.ToDirEntry(t, gotLsys, srcData[i].Root, true) - unixfs.CompareDirEntries(t, srcData[i], gotDir) - } + if testCase.validateBodies != nil && testCase.validateBodies[i] != nil { + testCase.validateBodies[i](t, srcData[i], body) + } else { + // gotDir := CarToDirEntry(t, bytes.NewReader(body), srcData[i].Root, true) + gotLsys := CarBytesLinkSystem(t, bytes.NewReader(body)) + gotDir := unixfs.ToDirEntry(t, gotLsys, srcData[i].Root, true) + unixfs.CompareDirEntries(t, srcData[i], gotDir) } - // else don't bother trying to validate, we may or may not have got the body - // before the Go HTTP client decided the dirty end meant it was improper to - // provide any body. } } From 400e83695beb3e110c6421ab4d6df1ed873334c3 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Tue, 5 Sep 2023 15:26:10 +1000 Subject: [PATCH 3/4] fix: read in smaller increments when expecting an error --- pkg/internal/itest/http_fetch_test.go | 40 ++++++++++++++++++++++----- 1 file changed, 33 insertions(+), 7 deletions(-) diff --git a/pkg/internal/itest/http_fetch_test.go b/pkg/internal/itest/http_fetch_test.go index 96a22e41..987935a8 100644 --- a/pkg/internal/itest/http_fetch_test.go +++ b/pkg/internal/itest/http_fetch_test.go @@ -1154,14 +1154,12 @@ func TestHttpFetch(t *testing.T) { require.NotEmpty(t, requestId) _, err := uuid.Parse(requestId) req.NoError(err) - body, err := io.ReadAll(resp.Body) - if !testCase.expectUncleanEnd { - req.NoError(err) - } else { - req.Error(err) + expectBodyReadError := "" + if testCase.expectUncleanEnd { + expectBodyReadError = "http: unexpected EOF reading trailer" } - err = resp.Body.Close() - req.NoError(err) + body := readAllBody(t, resp.Body, expectBodyReadError) + req.NoError(resp.Body.Close()) if DEBUG_DATA { t.Logf("Creating CAR %s in temp dir", fmt.Sprintf("%s_received%d.car", testCase.name, i)) @@ -1269,3 +1267,31 @@ func debugRemotes(t *testing.T, ctx context.Context, name string, remotes []test } return carFiles } + +func readAllBody(t *testing.T, r io.Reader, expectError string) []byte { + if expectError == "" { + body, err := io.ReadAll(r) + require.NoError(t, err) + return body + } + // expect an error, so let's creep up on it and collect as much of the body + // as we can before the error blocks us + // see readLocked() in src/net/http/transfer.go: + // → b.src.Read(p) + // → followed by b.readTrailer() which should error; we want to capture both + var buf bytes.Buffer + var byt [1]byte + var err error + var n int + for { + n, err = r.Read(byt[:]) + // record the bytes we read, the error should come after the normal body + // read and then it attempts to read trailers where it should fail + buf.Write(byt[:n]) + if err != nil { + require.EqualError(t, err, expectError) + break + } + } + return buf.Bytes() +} From e1aaca9985d7409aa1871fb8c342f3cddab2132b Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Tue, 5 Sep 2023 17:15:33 +1000 Subject: [PATCH 4/4] fix: read body asap --- pkg/internal/itest/http_fetch_test.go | 36 ++++++++++++++------------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/pkg/internal/itest/http_fetch_test.go b/pkg/internal/itest/http_fetch_test.go index 987935a8..fc0a4f74 100644 --- a/pkg/internal/itest/http_fetch_test.go +++ b/pkg/internal/itest/http_fetch_test.go @@ -61,6 +61,11 @@ func TestHttpFetch(t *testing.T) { type queryModifier func(url.Values, []testpeer.TestPeer) type bodyValidator func(*testing.T, unixfs.DirEntry, []byte) type lassieOptsGen func(*testing.T, *mocknet.MockRetrievalNet) []lassie.LassieOption + type response struct { + StatusCode int + Header http.Header + Body []byte + } wrapPath := "/want2/want1/want0" testCases := []struct { @@ -1077,9 +1082,9 @@ func TestHttpFetch(t *testing.T) { } } - responseChans := make([]chan *http.Response, 0) + responseChans := make([]chan response, 0) for i := 0; i < len(srcData); i++ { - responseChan := make(chan *http.Response, 1) + responseChan := make(chan response, 1) responseChans = append(responseChans, responseChan) go func(i int) { // Make a request for our CID and read the complete CAR bytes @@ -1099,11 +1104,17 @@ func TestHttpFetch(t *testing.T) { t.Log("Fetching", getReq.URL.String()) resp, err := http.DefaultClient.Do(getReq) req.NoError(err) - responseChan <- resp + expectBodyReadError := "" + if testCase.expectUncleanEnd { + expectBodyReadError = "http: unexpected EOF reading trailer" + } + body := readAllBody(t, resp.Body, expectBodyReadError) + req.NoError(resp.Body.Close()) + responseChan <- response{StatusCode: resp.StatusCode, Header: resp.Header, Body: body} }(i) } - responses := make([]*http.Response, 0) + responses := make([]response, 0) for _, responseChan := range responseChans { select { case resp := <-responseChan: @@ -1133,9 +1144,7 @@ func TestHttpFetch(t *testing.T) { req.Equal(http.StatusUnauthorized, resp.StatusCode) } else { if resp.StatusCode != http.StatusOK { - body, err := io.ReadAll(resp.Body) - req.NoError(err) - req.Failf("200 response code not received", "got code: %d, body: %s", resp.StatusCode, string(body)) + req.Failf("200 response code not received", "got code: %d, body: %s", resp.StatusCode, string(resp.Body)) } req.Regexp(`^lassie/v\d+\.\d+\.\d+-\w+$`, resp.Header.Get("Server")) req.Equal(fmt.Sprintf(`attachment; filename="%s.car"`, srcData[i].Root.String()), resp.Header.Get("Content-Disposition")) @@ -1154,28 +1163,21 @@ func TestHttpFetch(t *testing.T) { require.NotEmpty(t, requestId) _, err := uuid.Parse(requestId) req.NoError(err) - expectBodyReadError := "" - if testCase.expectUncleanEnd { - expectBodyReadError = "http: unexpected EOF reading trailer" - } - body := readAllBody(t, resp.Body, expectBodyReadError) - req.NoError(resp.Body.Close()) if DEBUG_DATA { t.Logf("Creating CAR %s in temp dir", fmt.Sprintf("%s_received%d.car", testCase.name, i)) dstf, err := os.CreateTemp("", fmt.Sprintf("%s_received%d.car", testCase.name, i)) req.NoError(err) t.Logf("Writing received data to CAR @ %s", dstf.Name()) - _, err = dstf.Write(body) + _, err = dstf.Write(resp.Body) req.NoError(err) carFiles = append(carFiles, dstf) } if testCase.validateBodies != nil && testCase.validateBodies[i] != nil { - testCase.validateBodies[i](t, srcData[i], body) + testCase.validateBodies[i](t, srcData[i], resp.Body) } else { - // gotDir := CarToDirEntry(t, bytes.NewReader(body), srcData[i].Root, true) - gotLsys := CarBytesLinkSystem(t, bytes.NewReader(body)) + gotLsys := CarBytesLinkSystem(t, bytes.NewReader(resp.Body)) gotDir := unixfs.ToDirEntry(t, gotLsys, srcData[i].Root, true) unixfs.CompareDirEntries(t, srcData[i], gotDir) }