From 132922f98cb2be93e391b8ce62950d449e6dc0b3 Mon Sep 17 00:00:00 2001 From: Jennifer Fitzhugh <74557543+jfitzu@users.noreply.github.com> Date: Wed, 3 Mar 2021 16:13:36 -0700 Subject: [PATCH] Revert "Ensure that resumeToken is included on resume attempts (#634)" This reverts commit 93be82fc284474324b8144464dd71ebc5ddd42ad. --- .../operation/ChangeStreamBatchCursor.java | 22 ++++++------------- .../internal/operation/QueryBatchCursor.java | 1 - .../OperationFunctionalSpecification.groovy | 13 +---------- ...reamOperationProseTestSpecification.groovy | 7 ++---- .../mongodb/client/ChangeStreamProseTest.java | 6 ++--- .../client/unified/UnifiedTestValidator.java | 3 +++ 6 files changed, 15 insertions(+), 37 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java index 8a76b8eb732..af302afde06 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java @@ -64,11 +64,7 @@ public boolean hasNext() { return resumeableOperation(new Function, Boolean>() { @Override public Boolean apply(final AggregateResponseBatchCursor queryBatchCursor) { - try { - return queryBatchCursor.hasNext(); - } finally { - cachePostBatchResumeToken(queryBatchCursor); - } + return queryBatchCursor.hasNext(); } }); } @@ -78,11 +74,9 @@ public List next() { return resumeableOperation(new Function, List>() { @Override public List apply(final AggregateResponseBatchCursor queryBatchCursor) { - try { - return convertResults(queryBatchCursor.next()); - } finally { - cachePostBatchResumeToken(queryBatchCursor); - } + List results = convertResults(queryBatchCursor.next()); + cachePostBatchResumeToken(queryBatchCursor); + return results; } }); } @@ -92,11 +86,9 @@ public List tryNext() { return resumeableOperation(new Function, List>() { @Override public List apply(final AggregateResponseBatchCursor queryBatchCursor) { - try { - return convertResults(queryBatchCursor.tryNext()); - } finally { - cachePostBatchResumeToken(queryBatchCursor); - } + List results = convertResults(queryBatchCursor.tryNext()); + cachePostBatchResumeToken(queryBatchCursor); + return results; } }); } diff --git a/driver-core/src/main/com/mongodb/internal/operation/QueryBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/QueryBatchCursor.java index 23379e4d579..451364b70c2 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/QueryBatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/QueryBatchCursor.java @@ -100,7 +100,6 @@ class QueryBatchCursor implements AggregateResponseBatchCursor { this.decoder = notNull("decoder", decoder); if (result != null) { this.operationTime = result.getTimestamp(OPERATION_TIME, null); - this.postBatchResumeToken = getPostBatchResumeTokenFromResponse(result); } if (firstQueryResult.getCursor() != null) { notNull("connectionSource", connectionSource); diff --git a/driver-core/src/test/functional/com/mongodb/OperationFunctionalSpecification.groovy b/driver-core/src/test/functional/com/mongodb/OperationFunctionalSpecification.groovy index a79f81b0ff7..e1fd5373b60 100644 --- a/driver-core/src/test/functional/com/mongodb/OperationFunctionalSpecification.groovy +++ b/driver-core/src/test/functional/com/mongodb/OperationFunctionalSpecification.groovy @@ -157,32 +157,21 @@ class OperationFunctionalSpecification extends Specification { } def next(cursor, boolean async, int minimumCount) { - next(cursor, async, false, minimumCount) - } - - def next(cursor, boolean async, boolean callHasNextBeforeNext, int minimumCount) { List retVal = [] while (retVal.size() < minimumCount) { - retVal.addAll(doNext(cursor, async, callHasNextBeforeNext)) + retVal.addAll(next(cursor, async)) } retVal } def next(cursor, boolean async) { - doNext(cursor, async, false) - } - - def doNext(cursor, boolean async, boolean callHasNextBeforeNext) { if (async) { def futureResultCallback = new FutureResultCallback>() cursor.next(futureResultCallback) futureResultCallback.get(TIMEOUT, TimeUnit.SECONDS) } else { - if (callHasNextBeforeNext) { - cursor.hasNext() - } cursor.next() } } diff --git a/driver-core/src/test/functional/com/mongodb/internal/operation/ChangeStreamOperationProseTestSpecification.groovy b/driver-core/src/test/functional/com/mongodb/internal/operation/ChangeStreamOperationProseTestSpecification.groovy index 778d3b4e46c..dac2d6633a9 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/operation/ChangeStreamOperationProseTestSpecification.groovy +++ b/driver-core/src/test/functional/com/mongodb/internal/operation/ChangeStreamOperationProseTestSpecification.groovy @@ -96,7 +96,7 @@ class ChangeStreamOperationProseTestSpecification extends OperationFunctionalSpe setFailPoint(failPointDocument) then: - def result = next(cursor, async, callHasNext, 2) + def result = next(cursor, async, 2) then: result.size() == 2 @@ -107,10 +107,7 @@ class ChangeStreamOperationProseTestSpecification extends OperationFunctionalSpe waitForLastRelease(async ? getAsyncCluster() : getCluster()) where: - async | callHasNext - true | false - false | false - false | true + async << [true, false] } // diff --git a/driver-sync/src/test/functional/com/mongodb/client/ChangeStreamProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/ChangeStreamProseTest.java index dca1cdf019b..c372ba556e5 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/ChangeStreamProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/ChangeStreamProseTest.java @@ -370,10 +370,8 @@ public void testGetResumeTokenReturnsPostBatchResumeTokenAfterGetMore() // use reflection to access the postBatchResumeToken AggregateResponseBatchCursor batchCursor = getBatchCursor(cursor); - assertNotNull(batchCursor.getPostBatchResumeToken()); - - // resume token should be null before iteration - assertNull(cursor.getResumeToken()); + // check equality in the case where the batch has not been iterated at all + assertEquals(cursor.getResumeToken(), batchCursor.getPostBatchResumeToken()); cursor.next(); assertEquals(cursor.getResumeToken(), batchCursor.getPostBatchResumeToken()); diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestValidator.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestValidator.java index c01d18c8af0..3ecd6ef79e9 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestValidator.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestValidator.java @@ -33,6 +33,7 @@ import java.util.Collection; import java.util.List; +import static org.junit.Assume.assumeTrue; import static util.JsonPoweredTestHelper.getTestDocument; import static util.JsonPoweredTestHelper.getTestFiles; @@ -50,6 +51,8 @@ public UnifiedTestValidator(final String fileDescription, final String testDescr @Before public void setUp() { + // TODO: remove after https://jira.mongodb.org/browse/JAVA-3871 is fixed + assumeTrue(!(fileDescription.equals("poc-change-streams") && testDescription.equals("Test consecutive resume"))); super.setUp(); }