From 0b0ffc3254d4cc0b2bad01e9b2d73a6b5e637695 Mon Sep 17 00:00:00 2001 From: Jeff Yemin Date: Wed, 20 Jan 2021 09:31:28 -0500 Subject: [PATCH] Ensure that resumeToken is included on resume attempts (#634) JAVA-3871 --- .../operation/ChangeStreamBatchCursor.java | 22 +++++++++++++------ .../mongodb/operation/QueryBatchCursor.java | 1 + .../OperationFunctionalSpecification.groovy | 13 ++++++++++- .../mongodb/client/ChangeStreamProseTest.java | 6 +++-- 4 files changed, 32 insertions(+), 10 deletions(-) diff --git a/driver-core/src/main/com/mongodb/operation/ChangeStreamBatchCursor.java b/driver-core/src/main/com/mongodb/operation/ChangeStreamBatchCursor.java index 3f553893b71..16c373af866 100644 --- a/driver-core/src/main/com/mongodb/operation/ChangeStreamBatchCursor.java +++ b/driver-core/src/main/com/mongodb/operation/ChangeStreamBatchCursor.java @@ -61,7 +61,11 @@ public boolean hasNext() { return resumeableOperation(new Function, Boolean>() { @Override public Boolean apply(final AggregateResponseBatchCursor queryBatchCursor) { - return queryBatchCursor.hasNext(); + try { + return queryBatchCursor.hasNext(); + } finally { + cachePostBatchResumeToken(queryBatchCursor); + } } }); } @@ -71,9 +75,11 @@ public List next() { return resumeableOperation(new Function, List>() { @Override public List apply(final AggregateResponseBatchCursor queryBatchCursor) { - List results = convertResults(queryBatchCursor.next()); - cachePostBatchResumeToken(queryBatchCursor); - return results; + try { + return convertResults(queryBatchCursor.next()); + } finally { + cachePostBatchResumeToken(queryBatchCursor); + } } }); } @@ -83,9 +89,11 @@ public List tryNext() { return resumeableOperation(new Function, List>() { @Override public List apply(final AggregateResponseBatchCursor queryBatchCursor) { - List results = convertResults(queryBatchCursor.tryNext()); - cachePostBatchResumeToken(queryBatchCursor); - return results; + try { + return convertResults(queryBatchCursor.tryNext()); + } finally { + cachePostBatchResumeToken(queryBatchCursor); + } } }); } diff --git a/driver-core/src/main/com/mongodb/operation/QueryBatchCursor.java b/driver-core/src/main/com/mongodb/operation/QueryBatchCursor.java index ba3efdf053e..895fe68110b 100644 --- a/driver-core/src/main/com/mongodb/operation/QueryBatchCursor.java +++ b/driver-core/src/main/com/mongodb/operation/QueryBatchCursor.java @@ -99,6 +99,7 @@ class QueryBatchCursor implements AggregateResponseBatchCursor { this.decoder = notNull("decoder", decoder); if (result != null) { this.operationTime = result.getTimestamp(OPERATION_TIME, null); + this.postBatchResumeToken = getPostBatchResumeTokenFromResponse(result); } if (firstQueryResult.getCursor() != null) { notNull("connectionSource", connectionSource); diff --git a/driver-core/src/test/functional/com/mongodb/OperationFunctionalSpecification.groovy b/driver-core/src/test/functional/com/mongodb/OperationFunctionalSpecification.groovy index 3defca55427..1d968ceebd9 100644 --- a/driver-core/src/test/functional/com/mongodb/OperationFunctionalSpecification.groovy +++ b/driver-core/src/test/functional/com/mongodb/OperationFunctionalSpecification.groovy @@ -157,21 +157,32 @@ class OperationFunctionalSpecification extends Specification { } def next(cursor, boolean async, int minimumCount) { + next(cursor, async, false, minimumCount) + } + + def next(cursor, boolean async, boolean callHasNextBeforeNext, int minimumCount) { List retVal = [] while (retVal.size() < minimumCount) { - retVal.addAll(next(cursor, async)) + retVal.addAll(doNext(cursor, async, callHasNextBeforeNext)) } retVal } def next(cursor, boolean async) { + doNext(cursor, async, false) + } + + def doNext(cursor, boolean async, boolean callHasNextBeforeNext) { if (async) { def futureResultCallback = new FutureResultCallback>() cursor.next(futureResultCallback) futureResultCallback.get(TIMEOUT, TimeUnit.SECONDS) } else { + if (callHasNextBeforeNext) { + cursor.hasNext() + } cursor.next() } } diff --git a/driver-sync/src/test/functional/com/mongodb/client/ChangeStreamProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/ChangeStreamProseTest.java index 17785a503ce..fb7da3bec27 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/ChangeStreamProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/ChangeStreamProseTest.java @@ -394,8 +394,10 @@ public void testGetResumeTokenReturnsPostBatchResumeTokenAfterGetMore() // use reflection to access the postBatchResumeToken AggregateResponseBatchCursor batchCursor = getBatchCursor(cursor); - // check equality in the case where the batch has not been iterated at all - assertEquals(cursor.getResumeToken(), batchCursor.getPostBatchResumeToken()); + assertNotNull(batchCursor.getPostBatchResumeToken()); + + // resume token should be null before iteration + assertNull(cursor.getResumeToken()); cursor.next(); assertEquals(cursor.getResumeToken(), batchCursor.getPostBatchResumeToken());