diff --git a/source/client-side-operations-timeout/tests/change-streams.json b/source/client-side-operations-timeout/tests/change-streams.json index 8cffb08e26..46b2d2725f 100644 --- a/source/client-side-operations-timeout/tests/change-streams.json +++ b/source/client-side-operations-timeout/tests/change-streams.json @@ -404,7 +404,7 @@ ] }, { - "description": "change stream can be iterated again if previous iteration times out", + "description": "change stream iteration succeeds after a timeout error", "operations": [ { "name": "createChangeStream", @@ -443,6 +443,26 @@ "isTimeoutError": true } }, + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "failPointClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 1 + }, + "data": { + "failCommands": [ + "aggregate" + ], + "blockConnection": true, + "blockTimeMS": 150 + } + } + } + }, { "name": "iterateOnce", "object": "changeStream" @@ -496,21 +516,6 @@ } } } - }, - { - "commandStartedEvent": { - "commandName": "getMore", - "databaseName": "test", - "command": { - "getMore": { - "$$type": [ - "int", - "long" - ] - }, - "collection": "coll" - } - } } ] } diff --git a/source/client-side-operations-timeout/tests/change-streams.yml b/source/client-side-operations-timeout/tests/change-streams.yml index c813be035a..539500c918 100644 --- a/source/client-side-operations-timeout/tests/change-streams.yml +++ b/source/client-side-operations-timeout/tests/change-streams.yml @@ -84,7 +84,7 @@ tests: command: aggregate: *collectionName maxTimeMS: { $$type: ["int", "long"] } - + # If maxAwaitTimeMS is not set, timeoutMS should be refreshed for the getMore and the getMore should not have a # maxTimeMS field. This test requires a high timeout because the server applies a default 1000ms maxAwaitTime. To # ensure that the driver is refreshing the timeout between commands, the test blocks aggregate and getMore commands @@ -170,10 +170,8 @@ tests: collection: *collectionName maxTimeMS: 1 - # The timeout should be applied to the entire resume attempt, not individually to each command. The test creates a - # change stream with timeoutMS=200 which returns an empty initial batch and then sets a fail point to block both - # getMore and aggregate for 120ms each and fail with a resumable error. When the resume attempt happens, the getMore - # and aggregate block for longer than 200ms total, so it times out. + # If a resume is required for a next call on a change stream, the timeout MUST apply to the entirety of the initial + # getMore and all commands sent as part of the resume attempt. - description: "timeoutMS applies to full resume attempt in a next call" operations: - name: createChangeStream @@ -225,7 +223,14 @@ tests: aggregate: *collectionName maxTimeMS: { $$type: ["int", "long"] } - - description: "change stream can be iterated again if previous iteration times out" + # [resumption] If a next call fails with a timeout error, drivers MUST NOT + # invalidate the change stream. The subsequent next call MUST perform a resume + # attempt to establish a new change stream on the server. + # + # [refresh] If a resume is required for a next call on a change stream, the + # timeout MUST apply to the entirety of the initial getMore and all commands + # sent as part of the resume attempt. + - description: "change stream iteration succeeds after a timeout error" operations: - name: createChangeStream object: *collection @@ -248,14 +253,27 @@ tests: failCommands: ["getMore"] blockConnection: true blockTimeMS: 250 - # The original aggregate didn't return any events so this should do a getMore and return a timeout error. + # Iterate until timeout. - name: iterateUntilDocumentOrError object: *changeStream expectError: isTimeoutError: true - # The previous iteration attempt timed out so this should re-create the change stream. We use iterateOnce rather - # than iterateUntilDocumentOrError because there haven't been any events and we only want to assert that the - # cursor was re-created. + # Block the resume aggregate for 150ms. If the timeout were exhausted from + # the previous call, this would fail immediately. With a fresh 200ms + # timeout, the 150ms delay should be acceptable. + - name: failPoint + object: testRunner + arguments: + client: *failPointClient + failPoint: + configureFailPoint: failCommand + mode: { times: 1 } + data: + failCommands: ["aggregate"] + blockConnection: true + blockTimeMS: 150 + # A final iteration should succeed because the timeout is refreshed and + # the change stream was not invalidated. - name: iterateOnce object: *changeStream expectEvents: @@ -267,27 +285,24 @@ tests: command: aggregate: *collectionName maxTimeMS: { $$type: ["int", "long"] } - # The iterateUntilDocumentOrError operation should send a getMore. + # The first iterateUntilDocumentOrError sends a getMore that times + # out. - commandStartedEvent: commandName: getMore databaseName: *databaseName command: getMore: { $$type: ["int", "long"] } collection: *collectionName - # The iterateOnce operation should re-create the cursor via an aggregate and then send a getMore to iterate - # the new cursor. + # The iterateOnce operation re-creates the cursor via an aggregate + # [resumption]. The aggregate is blocked for 150ms but succeeds + # because the timeout was refreshed to a fresh 200ms, proving the + # timeout is not shared with the previous timed-out call [refresh]. - commandStartedEvent: commandName: aggregate databaseName: *databaseName command: aggregate: *collectionName maxTimeMS: { $$type: ["int", "long"] } - - commandStartedEvent: - commandName: getMore - databaseName: *databaseName - command: - getMore: { $$type: ["int", "long"] } - collection: *collectionName # The timeoutMS value should be refreshed for getMore's. This is a failure test. The createChangeStream operation # sets timeoutMS=200 and the getMore blocks for 250ms, causing iteration to fail with a timeout error.