Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support aggregation with $merge as a string #768

Merged
merged 1 commit into from Jul 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -214,13 +214,19 @@ private MongoNamespace getOutNamespace() {
+ "is not a string or namespace document");
}
} else if (lastPipelineStage.containsKey("$merge")) {
BsonDocument mergeDocument = lastPipelineStage.getDocument("$merge");
if (mergeDocument.isDocument("into")) {
BsonDocument intoDocument = mergeDocument.getDocument("into");
return new MongoNamespace(intoDocument.getString("db", new BsonString(databaseName)).getValue(),
intoDocument.getString("coll").getValue());
} else if (mergeDocument.isString("into")) {
return new MongoNamespace(databaseName, mergeDocument.getString("into").getValue());
if (lastPipelineStage.isString("$merge")) {
return new MongoNamespace(databaseName, lastPipelineStage.getString("$merge").getValue());
} else if (lastPipelineStage.isDocument("$merge")) {
BsonDocument mergeDocument = lastPipelineStage.getDocument("$merge");
if (mergeDocument.isDocument("into")) {
BsonDocument intoDocument = mergeDocument.getDocument("into");
return new MongoNamespace(intoDocument.getString("db", new BsonString(databaseName)).getValue(),
intoDocument.getString("coll").getValue());
} else if (mergeDocument.isString("into")) {
return new MongoNamespace(databaseName, mergeDocument.getString("into").getValue());
}
} else {
throw new IllegalStateException("Cannot return a cursor when the value for $merge stage is not a string or a document");
}
}

Expand Down
Expand Up @@ -215,9 +215,9 @@ void shouldBuildTheExpectedOperationsForDollarOutAsDocument() {
assertOperationIsTheSameAs(expectedOperation, executor.getWriteOperation());
}

@DisplayName("Should build the expected AggregateOperation for $merge")
@DisplayName("Should build the expected AggregateOperation for $merge document")
@Test
void shouldBuildTheExpectedOperationsForDollarMerge() {
void shouldBuildTheExpectedOperationsForDollarMergeDocument() {
String collectionName = "collectionName";
List<BsonDocument> pipeline = asList(BsonDocument.parse("{'$match': 1}"),
BsonDocument.parse(format("{'$merge': {into: '%s'}}", collectionName)));
Expand Down Expand Up @@ -293,6 +293,38 @@ void shouldBuildTheExpectedOperationsForDollarMerge() {
assertOperationIsTheSameAs(expectedOperation, executor.getWriteOperation());
}

@DisplayName("Should build the expected AggregateOperation for $merge string")
@Test
void shouldBuildTheExpectedOperationsForDollarMergeString() {
String collectionName = "collectionName";
MongoNamespace collectionNamespace = new MongoNamespace(NAMESPACE.getDatabaseName(), collectionName);
List<BsonDocument> pipeline = asList(BsonDocument.parse("{'$match': 1}"),
BsonDocument.parse(format("{'$merge': '%s'}", collectionName)));

TestOperationExecutor executor = createOperationExecutor(asList(getBatchCursor(), getBatchCursor(), getBatchCursor(), null));
AggregatePublisher<Document> publisher =
new AggregatePublisherImpl<>(null, createMongoOperationPublisher(executor), pipeline, AggregationLevel.COLLECTION);

AggregateToCollectionOperation expectedOperation = new AggregateToCollectionOperation(NAMESPACE, pipeline,
ReadConcern.DEFAULT,
WriteConcern.ACKNOWLEDGED);

// default input should be as expected
Flux.from(publisher).blockFirst();

WriteOperationThenCursorReadOperation operation = (WriteOperationThenCursorReadOperation) executor.getReadOperation();
assertEquals(ReadPreference.primary(), executor.getReadPreference());
assertOperationIsTheSameAs(expectedOperation, operation.getAggregateToCollectionOperation());

FindOperation<Document> expectedFindOperation =
new FindOperation<>(collectionNamespace, getDefaultCodecRegistry().get(Document.class))
.filter(new BsonDocument())
.batchSize(Integer.MAX_VALUE)
.retryReads(true);

assertOperationIsTheSameAs(expectedFindOperation, operation.getReadOperation());
}

@DisplayName("Should handle error scenarios")
@Test
void shouldHandleErrorScenarios() {
Expand Down
Expand Up @@ -237,13 +237,19 @@ private MongoNamespace getOutNamespace() {
+ "is not a string or namespace document");
}
} else if (lastPipelineStage.containsKey("$merge")) {
BsonDocument mergeDocument = lastPipelineStage.getDocument("$merge");
if (mergeDocument.isDocument("into")) {
BsonDocument intoDocument = mergeDocument.getDocument("into");
return new MongoNamespace(intoDocument.getString("db", new BsonString(namespace.getDatabaseName())).getValue(),
intoDocument.getString("coll").getValue());
} else if (mergeDocument.isString("into")) {
return new MongoNamespace(namespace.getDatabaseName(), mergeDocument.getString("into").getValue());
if (lastPipelineStage.isString("$merge")) {
return new MongoNamespace(namespace.getDatabaseName(), lastPipelineStage.getString("$merge").getValue());
} else if (lastPipelineStage.isDocument("$merge")) {
BsonDocument mergeDocument = lastPipelineStage.getDocument("$merge");
if (mergeDocument.isDocument("into")) {
BsonDocument intoDocument = mergeDocument.getDocument("into");
return new MongoNamespace(intoDocument.getString("db", new BsonString(namespace.getDatabaseName())).getValue(),
intoDocument.getString("coll").getValue());
} else if (mergeDocument.isString("into")) {
return new MongoNamespace(namespace.getDatabaseName(), mergeDocument.getString("into").getValue());
}
} else {
throw new IllegalStateException("Cannot return a cursor when the value for $merge stage is not a string or a document");
}
}

Expand Down
Expand Up @@ -194,7 +194,7 @@ class AggregateIterableSpecification extends Specification {
.comment('this is a comment'))
}

def 'should build the expected AggregateToCollectionOperation for $merge'() {
def 'should build the expected AggregateToCollectionOperation for $merge document'() {
given:
def executor = new TestOperationExecutor([null, null, null, null, null, null, null])
def collectionName = 'collectionName'
Expand Down Expand Up @@ -331,6 +331,31 @@ class AggregateIterableSpecification extends Specification {
.comment('this is a comment'))
}

def 'should build the expected AggregateToCollectionOperation for $merge string'() {
given:
def executor = new TestOperationExecutor([null, null, null, null, null, null, null])
def collectionName = 'collectionName'
def collectionNamespace = new MongoNamespace(namespace.getDatabaseName(), collectionName)
def pipeline = [new BsonDocument('$match', new BsonDocument()), new BsonDocument('$merge', new BsonString(collectionName))]

when:
new AggregateIterableImpl(null, namespace, Document, Document, codecRegistry, readPreference, readConcern, writeConcern, executor,
pipeline, AggregationLevel.COLLECTION, false)
.iterator()

def operation = executor.getWriteOperation() as AggregateToCollectionOperation

then:
expect operation, isTheSameAs(new AggregateToCollectionOperation(namespace, pipeline, readConcern, writeConcern,
AggregationLevel.COLLECTION))

when:
operation = executor.getReadOperation() as FindOperation<Document>

then:
operation.getNamespace() == collectionNamespace
}

def 'should build the expected AggregateToCollectionOperation for $out as a document'() {
given:
def cannedResults = [new Document('_id', 1), new Document('_id', 2), new Document('_id', 3)]
Expand Down