Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure that resumeToken is included on resume attempts #634

Merged
merged 3 commits into from Jan 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -64,7 +64,11 @@ public boolean hasNext() {
return resumeableOperation(new Function<AggregateResponseBatchCursor<RawBsonDocument>, Boolean>() {
@Override
public Boolean apply(final AggregateResponseBatchCursor<RawBsonDocument> queryBatchCursor) {
return queryBatchCursor.hasNext();
try {
return queryBatchCursor.hasNext();
} finally {
cachePostBatchResumeToken(queryBatchCursor);
}
}
});
}
Expand All @@ -74,9 +78,11 @@ public List<T> next() {
return resumeableOperation(new Function<AggregateResponseBatchCursor<RawBsonDocument>, List<T>>() {
@Override
public List<T> apply(final AggregateResponseBatchCursor<RawBsonDocument> queryBatchCursor) {
List<T> results = convertResults(queryBatchCursor.next());
cachePostBatchResumeToken(queryBatchCursor);
return results;
try {
return convertResults(queryBatchCursor.next());
} finally {
cachePostBatchResumeToken(queryBatchCursor);
}
}
});
}
Expand All @@ -86,9 +92,11 @@ public List<T> tryNext() {
return resumeableOperation(new Function<AggregateResponseBatchCursor<RawBsonDocument>, List<T>>() {
@Override
public List<T> apply(final AggregateResponseBatchCursor<RawBsonDocument> queryBatchCursor) {
List<T> results = convertResults(queryBatchCursor.tryNext());
cachePostBatchResumeToken(queryBatchCursor);
return results;
try {
return convertResults(queryBatchCursor.tryNext());
} finally {
cachePostBatchResumeToken(queryBatchCursor);
}
}
});
}
Expand Down
Expand Up @@ -100,6 +100,7 @@ class QueryBatchCursor<T> implements AggregateResponseBatchCursor<T> {
this.decoder = notNull("decoder", decoder);
if (result != null) {
this.operationTime = result.getTimestamp(OPERATION_TIME, null);
this.postBatchResumeToken = getPostBatchResumeTokenFromResponse(result);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now it matches AsyncQueryBatchCursor constructor

}
if (firstQueryResult.getCursor() != null) {
notNull("connectionSource", connectionSource);
Expand Down
Expand Up @@ -157,21 +157,32 @@ class OperationFunctionalSpecification extends Specification {
}

def next(cursor, boolean async, int minimumCount) {
next(cursor, async, false, minimumCount)
}

def next(cursor, boolean async, boolean callHasNextBeforeNext, int minimumCount) {
List<BsonDocument> retVal = []

while (retVal.size() < minimumCount) {
retVal.addAll(next(cursor, async))
retVal.addAll(doNext(cursor, async, callHasNextBeforeNext))
}

retVal
}

def next(cursor, boolean async) {
doNext(cursor, async, false)
}

def doNext(cursor, boolean async, boolean callHasNextBeforeNext) {
if (async) {
def futureResultCallback = new FutureResultCallback<List<BsonDocument>>()
cursor.next(futureResultCallback)
futureResultCallback.get(TIMEOUT, TimeUnit.SECONDS)
} else {
if (callHasNextBeforeNext) {
cursor.hasNext()
}
cursor.next()
}
}
Expand Down
Expand Up @@ -96,7 +96,7 @@ class ChangeStreamOperationProseTestSpecification extends OperationFunctionalSpe
setFailPoint(failPointDocument)

then:
def result = next(cursor, async, 2)
def result = next(cursor, async, callHasNext, 2)

then:
result.size() == 2
Expand All @@ -107,7 +107,10 @@ class ChangeStreamOperationProseTestSpecification extends OperationFunctionalSpe
waitForLastRelease(async ? getAsyncCluster() : getCluster())

where:
async << [true, false]
async | callHasNext
rozza marked this conversation as resolved.
Show resolved Hide resolved
true | false
false | false
false | true
}

//
Expand Down
Expand Up @@ -370,8 +370,10 @@ public void testGetResumeTokenReturnsPostBatchResumeTokenAfterGetMore()
// use reflection to access the postBatchResumeToken
AggregateResponseBatchCursor<?> batchCursor = getBatchCursor(cursor);

// check equality in the case where the batch has not been iterated at all
assertEquals(cursor.getResumeToken(), batchCursor.getPostBatchResumeToken());
assertNotNull(batchCursor.getPostBatchResumeToken());

// resume token should be null before iteration
assertNull(cursor.getResumeToken());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test was wrong. Before the bug fix, both the values were null. Afterwards, the postBatchResumeToken is set on the batch cursor but not the high-level cursor.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should you add a assertNotNull(batchCursor.getPostBatchResumeToken()) check as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


cursor.next();
assertEquals(cursor.getResumeToken(), batchCursor.getPostBatchResumeToken());
Expand Down
Expand Up @@ -33,7 +33,6 @@
import java.util.Collection;
import java.util.List;

import static org.junit.Assume.assumeTrue;
import static util.JsonPoweredTestHelper.getTestDocument;
import static util.JsonPoweredTestHelper.getTestFiles;

Expand All @@ -51,8 +50,6 @@ public UnifiedTestValidator(final String fileDescription, final String testDescr

@Before
public void setUp() {
// TODO: remove after https://jira.mongodb.org/browse/JAVA-3871 is fixed
assumeTrue(!(fileDescription.equals("poc-change-streams") && testDescription.equals("Test consecutive resume")));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hurrah!

super.setUp();
}

Expand Down