From c21bf7c5cdf4d31ebdecede393c48b5f42a147b3 Mon Sep 17 00:00:00 2001 From: Jeff Yemin Date: Tue, 27 Jul 2021 12:51:36 -0400 Subject: [PATCH] Support aggregation with $merge as a string JAVA-4258 --- .../internal/AggregatePublisherImpl.java | 20 +++++++---- .../internal/AggregatePublisherImplTest.java | 36 +++++++++++++++++-- .../internal/AggregateIterableImpl.java | 20 +++++++---- .../AggregateIterableSpecification.groovy | 27 +++++++++++++- 4 files changed, 86 insertions(+), 17 deletions(-) diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/AggregatePublisherImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/AggregatePublisherImpl.java index 44aac976231..a3cea551c36 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/AggregatePublisherImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/AggregatePublisherImpl.java @@ -214,13 +214,19 @@ private MongoNamespace getOutNamespace() { + "is not a string or namespace document"); } } else if (lastPipelineStage.containsKey("$merge")) { - BsonDocument mergeDocument = lastPipelineStage.getDocument("$merge"); - if (mergeDocument.isDocument("into")) { - BsonDocument intoDocument = mergeDocument.getDocument("into"); - return new MongoNamespace(intoDocument.getString("db", new BsonString(databaseName)).getValue(), - intoDocument.getString("coll").getValue()); - } else if (mergeDocument.isString("into")) { - return new MongoNamespace(databaseName, mergeDocument.getString("into").getValue()); + if (lastPipelineStage.isString("$merge")) { + return new MongoNamespace(databaseName, lastPipelineStage.getString("$merge").getValue()); + } else if (lastPipelineStage.isDocument("$merge")) { + BsonDocument mergeDocument = lastPipelineStage.getDocument("$merge"); + if (mergeDocument.isDocument("into")) { + BsonDocument intoDocument = mergeDocument.getDocument("into"); + return new MongoNamespace(intoDocument.getString("db", new BsonString(databaseName)).getValue(), + intoDocument.getString("coll").getValue()); + } else if (mergeDocument.isString("into")) { + return new MongoNamespace(databaseName, mergeDocument.getString("into").getValue()); + } + } else { + throw new IllegalStateException("Cannot return a cursor when the value for $merge stage is not a string or a document"); } } diff --git a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/AggregatePublisherImplTest.java b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/AggregatePublisherImplTest.java index 9388d08a7cf..8cd2f668945 100644 --- a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/AggregatePublisherImplTest.java +++ b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/AggregatePublisherImplTest.java @@ -215,9 +215,9 @@ void shouldBuildTheExpectedOperationsForDollarOutAsDocument() { assertOperationIsTheSameAs(expectedOperation, executor.getWriteOperation()); } - @DisplayName("Should build the expected AggregateOperation for $merge") + @DisplayName("Should build the expected AggregateOperation for $merge document") @Test - void shouldBuildTheExpectedOperationsForDollarMerge() { + void shouldBuildTheExpectedOperationsForDollarMergeDocument() { String collectionName = "collectionName"; List pipeline = asList(BsonDocument.parse("{'$match': 1}"), BsonDocument.parse(format("{'$merge': {into: '%s'}}", collectionName))); @@ -293,6 +293,38 @@ void shouldBuildTheExpectedOperationsForDollarMerge() { assertOperationIsTheSameAs(expectedOperation, executor.getWriteOperation()); } + @DisplayName("Should build the expected AggregateOperation for $merge string") + @Test + void shouldBuildTheExpectedOperationsForDollarMergeString() { + String collectionName = "collectionName"; + MongoNamespace collectionNamespace = new MongoNamespace(NAMESPACE.getDatabaseName(), collectionName); + List pipeline = asList(BsonDocument.parse("{'$match': 1}"), + BsonDocument.parse(format("{'$merge': '%s'}", collectionName))); + + TestOperationExecutor executor = createOperationExecutor(asList(getBatchCursor(), getBatchCursor(), getBatchCursor(), null)); + AggregatePublisher publisher = + new AggregatePublisherImpl<>(null, createMongoOperationPublisher(executor), pipeline, AggregationLevel.COLLECTION); + + AggregateToCollectionOperation expectedOperation = new AggregateToCollectionOperation(NAMESPACE, pipeline, + ReadConcern.DEFAULT, + WriteConcern.ACKNOWLEDGED); + + // default input should be as expected + Flux.from(publisher).blockFirst(); + + WriteOperationThenCursorReadOperation operation = (WriteOperationThenCursorReadOperation) executor.getReadOperation(); + assertEquals(ReadPreference.primary(), executor.getReadPreference()); + assertOperationIsTheSameAs(expectedOperation, operation.getAggregateToCollectionOperation()); + + FindOperation expectedFindOperation = + new FindOperation<>(collectionNamespace, getDefaultCodecRegistry().get(Document.class)) + .filter(new BsonDocument()) + .batchSize(Integer.MAX_VALUE) + .retryReads(true); + + assertOperationIsTheSameAs(expectedFindOperation, operation.getReadOperation()); + } + @DisplayName("Should handle error scenarios") @Test void shouldHandleErrorScenarios() { diff --git a/driver-sync/src/main/com/mongodb/client/internal/AggregateIterableImpl.java b/driver-sync/src/main/com/mongodb/client/internal/AggregateIterableImpl.java index 81ca62be18a..5e590f1f62c 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/AggregateIterableImpl.java +++ b/driver-sync/src/main/com/mongodb/client/internal/AggregateIterableImpl.java @@ -237,13 +237,19 @@ private MongoNamespace getOutNamespace() { + "is not a string or namespace document"); } } else if (lastPipelineStage.containsKey("$merge")) { - BsonDocument mergeDocument = lastPipelineStage.getDocument("$merge"); - if (mergeDocument.isDocument("into")) { - BsonDocument intoDocument = mergeDocument.getDocument("into"); - return new MongoNamespace(intoDocument.getString("db", new BsonString(namespace.getDatabaseName())).getValue(), - intoDocument.getString("coll").getValue()); - } else if (mergeDocument.isString("into")) { - return new MongoNamespace(namespace.getDatabaseName(), mergeDocument.getString("into").getValue()); + if (lastPipelineStage.isString("$merge")) { + return new MongoNamespace(namespace.getDatabaseName(), lastPipelineStage.getString("$merge").getValue()); + } else if (lastPipelineStage.isDocument("$merge")) { + BsonDocument mergeDocument = lastPipelineStage.getDocument("$merge"); + if (mergeDocument.isDocument("into")) { + BsonDocument intoDocument = mergeDocument.getDocument("into"); + return new MongoNamespace(intoDocument.getString("db", new BsonString(namespace.getDatabaseName())).getValue(), + intoDocument.getString("coll").getValue()); + } else if (mergeDocument.isString("into")) { + return new MongoNamespace(namespace.getDatabaseName(), mergeDocument.getString("into").getValue()); + } + } else { + throw new IllegalStateException("Cannot return a cursor when the value for $merge stage is not a string or a document"); } } diff --git a/driver-sync/src/test/unit/com/mongodb/client/internal/AggregateIterableSpecification.groovy b/driver-sync/src/test/unit/com/mongodb/client/internal/AggregateIterableSpecification.groovy index 8311362c053..6454b25e2b8 100644 --- a/driver-sync/src/test/unit/com/mongodb/client/internal/AggregateIterableSpecification.groovy +++ b/driver-sync/src/test/unit/com/mongodb/client/internal/AggregateIterableSpecification.groovy @@ -194,7 +194,7 @@ class AggregateIterableSpecification extends Specification { .comment('this is a comment')) } - def 'should build the expected AggregateToCollectionOperation for $merge'() { + def 'should build the expected AggregateToCollectionOperation for $merge document'() { given: def executor = new TestOperationExecutor([null, null, null, null, null, null, null]) def collectionName = 'collectionName' @@ -331,6 +331,31 @@ class AggregateIterableSpecification extends Specification { .comment('this is a comment')) } + def 'should build the expected AggregateToCollectionOperation for $merge string'() { + given: + def executor = new TestOperationExecutor([null, null, null, null, null, null, null]) + def collectionName = 'collectionName' + def collectionNamespace = new MongoNamespace(namespace.getDatabaseName(), collectionName) + def pipeline = [new BsonDocument('$match', new BsonDocument()), new BsonDocument('$merge', new BsonString(collectionName))] + + when: + new AggregateIterableImpl(null, namespace, Document, Document, codecRegistry, readPreference, readConcern, writeConcern, executor, + pipeline, AggregationLevel.COLLECTION, false) + .iterator() + + def operation = executor.getWriteOperation() as AggregateToCollectionOperation + + then: + expect operation, isTheSameAs(new AggregateToCollectionOperation(namespace, pipeline, readConcern, writeConcern, + AggregationLevel.COLLECTION)) + + when: + operation = executor.getReadOperation() as FindOperation + + then: + operation.getNamespace() == collectionNamespace + } + def 'should build the expected AggregateToCollectionOperation for $out as a document'() { given: def cannedResults = [new Document('_id', 1), new Document('_id', 2), new Document('_id', 3)]