Skip to content

Commit

Permalink
Revert "Ensure that resumeToken is included on resume attempts (mongo…
Browse files Browse the repository at this point in the history
…db#634)"

This reverts commit 93be82f.
  • Loading branch information
jfitzu committed Mar 3, 2021
1 parent 69e5f11 commit 132922f
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 37 deletions.
Expand Up @@ -64,11 +64,7 @@ public boolean hasNext() {
return resumeableOperation(new Function<AggregateResponseBatchCursor<RawBsonDocument>, Boolean>() {
@Override
public Boolean apply(final AggregateResponseBatchCursor<RawBsonDocument> queryBatchCursor) {
try {
return queryBatchCursor.hasNext();
} finally {
cachePostBatchResumeToken(queryBatchCursor);
}
return queryBatchCursor.hasNext();
}
});
}
Expand All @@ -78,11 +74,9 @@ public List<T> next() {
return resumeableOperation(new Function<AggregateResponseBatchCursor<RawBsonDocument>, List<T>>() {
@Override
public List<T> apply(final AggregateResponseBatchCursor<RawBsonDocument> queryBatchCursor) {
try {
return convertResults(queryBatchCursor.next());
} finally {
cachePostBatchResumeToken(queryBatchCursor);
}
List<T> results = convertResults(queryBatchCursor.next());
cachePostBatchResumeToken(queryBatchCursor);
return results;
}
});
}
Expand All @@ -92,11 +86,9 @@ public List<T> tryNext() {
return resumeableOperation(new Function<AggregateResponseBatchCursor<RawBsonDocument>, List<T>>() {
@Override
public List<T> apply(final AggregateResponseBatchCursor<RawBsonDocument> queryBatchCursor) {
try {
return convertResults(queryBatchCursor.tryNext());
} finally {
cachePostBatchResumeToken(queryBatchCursor);
}
List<T> results = convertResults(queryBatchCursor.tryNext());
cachePostBatchResumeToken(queryBatchCursor);
return results;
}
});
}
Expand Down
Expand Up @@ -100,7 +100,6 @@ class QueryBatchCursor<T> implements AggregateResponseBatchCursor<T> {
this.decoder = notNull("decoder", decoder);
if (result != null) {
this.operationTime = result.getTimestamp(OPERATION_TIME, null);
this.postBatchResumeToken = getPostBatchResumeTokenFromResponse(result);
}
if (firstQueryResult.getCursor() != null) {
notNull("connectionSource", connectionSource);
Expand Down
Expand Up @@ -157,32 +157,21 @@ class OperationFunctionalSpecification extends Specification {
}

def next(cursor, boolean async, int minimumCount) {
next(cursor, async, false, minimumCount)
}

def next(cursor, boolean async, boolean callHasNextBeforeNext, int minimumCount) {
List<BsonDocument> retVal = []

while (retVal.size() < minimumCount) {
retVal.addAll(doNext(cursor, async, callHasNextBeforeNext))
retVal.addAll(next(cursor, async))
}

retVal
}

def next(cursor, boolean async) {
doNext(cursor, async, false)
}

def doNext(cursor, boolean async, boolean callHasNextBeforeNext) {
if (async) {
def futureResultCallback = new FutureResultCallback<List<BsonDocument>>()
cursor.next(futureResultCallback)
futureResultCallback.get(TIMEOUT, TimeUnit.SECONDS)
} else {
if (callHasNextBeforeNext) {
cursor.hasNext()
}
cursor.next()
}
}
Expand Down
Expand Up @@ -96,7 +96,7 @@ class ChangeStreamOperationProseTestSpecification extends OperationFunctionalSpe
setFailPoint(failPointDocument)

then:
def result = next(cursor, async, callHasNext, 2)
def result = next(cursor, async, 2)

then:
result.size() == 2
Expand All @@ -107,10 +107,7 @@ class ChangeStreamOperationProseTestSpecification extends OperationFunctionalSpe
waitForLastRelease(async ? getAsyncCluster() : getCluster())

where:
async | callHasNext
true | false
false | false
false | true
async << [true, false]
}

//
Expand Down
Expand Up @@ -370,10 +370,8 @@ public void testGetResumeTokenReturnsPostBatchResumeTokenAfterGetMore()
// use reflection to access the postBatchResumeToken
AggregateResponseBatchCursor<?> batchCursor = getBatchCursor(cursor);

assertNotNull(batchCursor.getPostBatchResumeToken());

// resume token should be null before iteration
assertNull(cursor.getResumeToken());
// check equality in the case where the batch has not been iterated at all
assertEquals(cursor.getResumeToken(), batchCursor.getPostBatchResumeToken());

cursor.next();
assertEquals(cursor.getResumeToken(), batchCursor.getPostBatchResumeToken());
Expand Down
Expand Up @@ -33,6 +33,7 @@
import java.util.Collection;
import java.util.List;

import static org.junit.Assume.assumeTrue;
import static util.JsonPoweredTestHelper.getTestDocument;
import static util.JsonPoweredTestHelper.getTestFiles;

Expand All @@ -50,6 +51,8 @@ public UnifiedTestValidator(final String fileDescription, final String testDescr

@Before
public void setUp() {
// TODO: remove after https://jira.mongodb.org/browse/JAVA-3871 is fixed
assumeTrue(!(fileDescription.equals("poc-change-streams") && testDescription.equals("Test consecutive resume")));
super.setUp();
}

Expand Down

0 comments on commit 132922f

Please sign in to comment.