Skip to content

Commit

Permalink
Support aggregation with $merge as a string (#768)
Browse files Browse the repository at this point in the history
  • Loading branch information
jyemin committed Jul 29, 2021
1 parent 81cc1a7 commit 2a32ac3
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 16 deletions.
Expand Up @@ -186,13 +186,19 @@ private MongoNamespace getOutNamespace() {
if (lastStageDocument.containsKey("$out")) {
return new MongoNamespace(namespace.getDatabaseName(), lastStageDocument.getString("$out").getValue());
} else if (lastStageDocument.containsKey("$merge")) {
BsonDocument mergeDocument = lastStageDocument.getDocument("$merge");
if (mergeDocument.isDocument("into")) {
BsonDocument intoDocument = mergeDocument.getDocument("into");
return new MongoNamespace(intoDocument.getString("db", new BsonString(namespace.getDatabaseName())).getValue(),
intoDocument.getString("coll").getValue());
} else if (mergeDocument.isString("into")) {
return new MongoNamespace(namespace.getDatabaseName(), mergeDocument.getString("into").getValue());
if (lastStageDocument.isString("$merge")) {
return new MongoNamespace(namespace.getDatabaseName(), lastStageDocument.getString("$merge").getValue());
} else if (lastStageDocument.isDocument("$merge")) {
BsonDocument mergeDocument = lastStageDocument.getDocument("$merge");
if (mergeDocument.isDocument("into")) {
BsonDocument intoDocument = mergeDocument.getDocument("into");
return new MongoNamespace(intoDocument.getString("db", new BsonString(namespace.getDatabaseName())).getValue(),
intoDocument.getString("coll").getValue());
} else if (mergeDocument.isString("into")) {
return new MongoNamespace(namespace.getDatabaseName(), mergeDocument.getString("into").getValue());
}
} else {
throw new IllegalStateException("Cannot return a cursor when the value for $merge stage is not a string or a document");
}
}

Expand Down
Expand Up @@ -209,7 +209,7 @@ class AggregateIterableSpecification extends Specification {
.comment('this is a comment'))
}

def 'should build the expected AggregateToCollectionOperation for $merge'() {
def 'should build the expected AggregateToCollectionOperation for $merge document'() {
given:
def cursor = Stub(AsyncBatchCursor) {
next(_) >> {
Expand Down Expand Up @@ -358,6 +358,38 @@ class AggregateIterableSpecification extends Specification {
.comment('this is a comment'))
}

def 'should build the expected AggregateToCollectionOperation for $merge string'() {
given:
def cursor = Stub(AsyncBatchCursor) {
next(_) >> {
it[0].onResult(null, null)
}
}
def executor = new TestOperationExecutor([cursor, cursor, cursor, cursor, cursor, cursor, cursor]);
def collectionName = 'collectionName'
def collectionNamespace = new MongoNamespace(namespace.getDatabaseName(), collectionName)
def pipeline = [new Document('$match', 1), new Document('$merge', new Document('into', collectionName))]

when: 'aggregation includes $merge'
new AggregateIterableImpl(null, namespace, Document, Document, codecRegistry, readPreference, readConcern, writeConcern, executor,
pipeline, AggregationLevel.COLLECTION, true)
.into([]) { result, t -> }

def operation = executor.getReadOperation() as WriteOperationThenCursorReadOperation

then:
expect operation.getAggregateToCollectionOperation(), isTheSameAs(new AggregateToCollectionOperation(namespace,
[new BsonDocument('$match', new BsonInt32(1)),
new BsonDocument('$merge', new BsonDocument('into', new BsonString(collectionName)))],
readConcern, writeConcern))

when:
operation = operation.getReadOperation() as FindOperation

then:
operation.getNamespace() == collectionNamespace
}

def 'should handle exceptions correctly'() {
given:
def codecRegistry = fromProviders([new ValueCodecProvider(), new BsonValueCodecProvider()])
Expand Down
Expand Up @@ -187,13 +187,19 @@ private MongoNamespace getOutNamespace() {
if (lastStageDocument.containsKey("$out")) {
return new MongoNamespace(namespace.getDatabaseName(), lastStageDocument.getString("$out").getValue());
} else if (lastStageDocument.containsKey("$merge")) {
BsonDocument mergeDocument = lastStageDocument.getDocument("$merge");
if (mergeDocument.isDocument("into")) {
BsonDocument intoDocument = mergeDocument.getDocument("into");
return new MongoNamespace(intoDocument.getString("db", new BsonString(namespace.getDatabaseName())).getValue(),
intoDocument.getString("coll").getValue());
} else if (mergeDocument.isString("into")) {
return new MongoNamespace(namespace.getDatabaseName(), mergeDocument.getString("into").getValue());
if (lastStageDocument.isString("$merge")) {
return new MongoNamespace(namespace.getDatabaseName(), lastStageDocument.getString("$merge").getValue());
} else if (lastStageDocument.isDocument("$merge")) {
BsonDocument mergeDocument = lastStageDocument.getDocument("$merge");
if (mergeDocument.isDocument("into")) {
BsonDocument intoDocument = mergeDocument.getDocument("into");
return new MongoNamespace(intoDocument.getString("db", new BsonString(namespace.getDatabaseName())).getValue(),
intoDocument.getString("coll").getValue());
} else if (mergeDocument.isString("into")) {
return new MongoNamespace(namespace.getDatabaseName(), mergeDocument.getString("into").getValue());
}
} else {
throw new IllegalStateException("Cannot return a cursor when the value for $merge stage is not a string or a document");
}
}

Expand Down
Expand Up @@ -195,7 +195,7 @@ class AggregateIterableSpecification extends Specification {
.comment('this is a comment'))
}

def 'should build the expected AggregateToCollectionOperation for $merge'() {
def 'should build the expected AggregateToCollectionOperation for $merge document'() {
given:
def executor = new TestOperationExecutor([null, null, null, null, null, null, null])
def collectionName = 'collectionName'
Expand Down Expand Up @@ -335,6 +335,30 @@ class AggregateIterableSpecification extends Specification {
.comment('this is a comment'))
}

def 'should build the expected AggregateToCollectionOperation for $merge string'() {
given:
def executor = new TestOperationExecutor([null, null, null, null, null, null, null])
def collectionName = 'collectionName'
def collectionNamespace = new MongoNamespace(namespace.getDatabaseName(), collectionName)
def pipeline = [new BsonDocument('$match', new BsonDocument()), new BsonDocument('$merge', new BsonString(collectionName))]

when:
new AggregateIterableImpl(null, namespace, Document, Document, codecRegistry, readPreference, readConcern, writeConcern, executor,
pipeline, AggregationLevel.COLLECTION, false)
.iterator()

def operation = executor.getWriteOperation() as AggregateToCollectionOperation

then:
expect operation, isTheSameAs(new AggregateToCollectionOperation(namespace, pipeline, readConcern, writeConcern,
AggregationLevel.COLLECTION))

when:
operation = executor.getReadOperation() as FindOperation<Document>

then:
operation.getNamespace() == collectionNamespace
}

def 'should use ClientSession for AggregationOperation'() {
given:
Expand Down

0 comments on commit 2a32ac3

Please sign in to comment.