Skip to content

Commit

Permalink
Ensure that resumeToken is included on resume attempts (mongodb#634)
Browse files Browse the repository at this point in the history
JAVA-3871
  • Loading branch information
jyemin authored and ispringer committed May 16, 2023
1 parent 44ab149 commit de5e890
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 15 deletions.
Expand Up @@ -64,7 +64,11 @@ public boolean hasNext() {
return resumeableOperation(new Function<AggregateResponseBatchCursor<RawBsonDocument>, Boolean>() {
@Override
public Boolean apply(final AggregateResponseBatchCursor<RawBsonDocument> queryBatchCursor) {
return queryBatchCursor.hasNext();
try {
return queryBatchCursor.hasNext();
} finally {
cachePostBatchResumeToken(queryBatchCursor);
}
}
});
}
Expand All @@ -74,9 +78,11 @@ public List<T> next() {
return resumeableOperation(new Function<AggregateResponseBatchCursor<RawBsonDocument>, List<T>>() {
@Override
public List<T> apply(final AggregateResponseBatchCursor<RawBsonDocument> queryBatchCursor) {
List<T> results = convertResults(queryBatchCursor.next());
cachePostBatchResumeToken(queryBatchCursor);
return results;
try {
return convertResults(queryBatchCursor.next());
} finally {
cachePostBatchResumeToken(queryBatchCursor);
}
}
});
}
Expand All @@ -86,9 +92,11 @@ public List<T> tryNext() {
return resumeableOperation(new Function<AggregateResponseBatchCursor<RawBsonDocument>, List<T>>() {
@Override
public List<T> apply(final AggregateResponseBatchCursor<RawBsonDocument> queryBatchCursor) {
List<T> results = convertResults(queryBatchCursor.tryNext());
cachePostBatchResumeToken(queryBatchCursor);
return results;
try {
return convertResults(queryBatchCursor.tryNext());
} finally {
cachePostBatchResumeToken(queryBatchCursor);
}
}
});
}
Expand Down
Expand Up @@ -100,6 +100,7 @@ class QueryBatchCursor<T> implements AggregateResponseBatchCursor<T> {
this.decoder = notNull("decoder", decoder);
if (result != null) {
this.operationTime = result.getTimestamp(OPERATION_TIME, null);
this.postBatchResumeToken = getPostBatchResumeTokenFromResponse(result);
}
if (firstQueryResult.getCursor() != null) {
notNull("connectionSource", connectionSource);
Expand Down
Expand Up @@ -157,21 +157,32 @@ class OperationFunctionalSpecification extends Specification {
}

def next(cursor, boolean async, int minimumCount) {
next(cursor, async, false, minimumCount)
}

def next(cursor, boolean async, boolean callHasNextBeforeNext, int minimumCount) {
List<BsonDocument> retVal = []

while (retVal.size() < minimumCount) {
retVal.addAll(next(cursor, async))
retVal.addAll(doNext(cursor, async, callHasNextBeforeNext))
}

retVal
}

def next(cursor, boolean async) {
doNext(cursor, async, false)
}

def doNext(cursor, boolean async, boolean callHasNextBeforeNext) {
if (async) {
def futureResultCallback = new FutureResultCallback<List<BsonDocument>>()
cursor.next(futureResultCallback)
futureResultCallback.get(TIMEOUT, TimeUnit.SECONDS)
} else {
if (callHasNextBeforeNext) {
cursor.hasNext()
}
cursor.next()
}
}
Expand Down
Expand Up @@ -96,7 +96,7 @@ class ChangeStreamOperationProseTestSpecification extends OperationFunctionalSpe
setFailPoint(failPointDocument)

then:
def result = next(cursor, async, 2)
def result = next(cursor, async, callHasNext, 2)

then:
result.size() == 2
Expand All @@ -107,7 +107,10 @@ class ChangeStreamOperationProseTestSpecification extends OperationFunctionalSpe
waitForLastRelease(async ? getAsyncCluster() : getCluster())

where:
async << [true, false]
async | callHasNext
true | false
false | false
false | true
}

//
Expand Down
Expand Up @@ -370,8 +370,10 @@ public void testGetResumeTokenReturnsPostBatchResumeTokenAfterGetMore()
// use reflection to access the postBatchResumeToken
AggregateResponseBatchCursor<?> batchCursor = getBatchCursor(cursor);

// check equality in the case where the batch has not been iterated at all
assertEquals(cursor.getResumeToken(), batchCursor.getPostBatchResumeToken());
assertNotNull(batchCursor.getPostBatchResumeToken());

// resume token should be null before iteration
assertNull(cursor.getResumeToken());

cursor.next();
assertEquals(cursor.getResumeToken(), batchCursor.getPostBatchResumeToken());
Expand Down
Expand Up @@ -33,7 +33,6 @@
import java.util.Collection;
import java.util.List;

import static org.junit.Assume.assumeTrue;
import static util.JsonPoweredTestHelper.getTestDocument;
import static util.JsonPoweredTestHelper.getTestFiles;

Expand All @@ -51,8 +50,6 @@ public UnifiedTestValidator(final String fileDescription, final String testDescr

@Before
public void setUp() {
// TODO: remove after https://jira.mongodb.org/browse/JAVA-3871 is fixed
assumeTrue(!(fileDescription.equals("poc-change-streams") && testDescription.equals("Test consecutive resume")));
super.setUp();
}

Expand Down

0 comments on commit de5e890

Please sign in to comment.