diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamEvent.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamEvent.java index 698eab552f..4a92c68e87 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamEvent.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamEvent.java @@ -36,21 +36,29 @@ * * @author Christoph Strobl * @author Mark Paluch + * @author Myroslav Kosinskyi * @since 2.1 */ public class ChangeStreamEvent { @SuppressWarnings("rawtypes") // - private static final AtomicReferenceFieldUpdater CONVERTED_UPDATER = AtomicReferenceFieldUpdater - .newUpdater(ChangeStreamEvent.class, Object.class, "converted"); + private static final AtomicReferenceFieldUpdater CONVERTED_FULL_DOCUMENT_UPDATER = AtomicReferenceFieldUpdater + .newUpdater(ChangeStreamEvent.class, Object.class, "convertedFullDocument"); + + @SuppressWarnings("rawtypes") // + private static final AtomicReferenceFieldUpdater CONVERTED_FULL_DOCUMENT_BEFORE_CHANGE_UPDATER = AtomicReferenceFieldUpdater + .newUpdater(ChangeStreamEvent.class, Object.class, "convertedFullDocumentBeforeChange"); private final @Nullable ChangeStreamDocument raw; private final Class targetType; private final MongoConverter converter; - // accessed through CONVERTED_UPDATER. - private volatile @Nullable T converted; + // accessed through CONVERTED_FULL_DOCUMENT_UPDATER. + private volatile @Nullable T convertedFullDocument; + + // accessed through CONVERTED_FULL_DOCUMENT_BEFORE_CHANGE_UPDATER. + private volatile @Nullable T convertedFullDocumentBeforeChange; /** * @param raw can be {@literal null}. @@ -147,27 +155,36 @@ public String getCollectionName() { @Nullable public T getBody() { - if (raw == null) { + if (raw == null || raw.getFullDocument() == null) { return null; } - Document fullDocument = raw.getFullDocument(); + return getConvertedFullDocument(raw.getFullDocument()); + } + + @Nullable + public T getBodyBeforeChange() { - if (fullDocument == null) { - return targetType.cast(fullDocument); + if (raw == null || raw.getFullDocumentBeforeChange() == null) { + return null; } - return getConverted(fullDocument); + return getConvertedFullDocumentBeforeChange(raw.getFullDocumentBeforeChange()); + } + + @SuppressWarnings("unchecked") + private T getConvertedFullDocumentBeforeChange(Document fullDocument) { + return (T) doGetConverted(fullDocument, CONVERTED_FULL_DOCUMENT_BEFORE_CHANGE_UPDATER); } @SuppressWarnings("unchecked") - private T getConverted(Document fullDocument) { - return (T) doGetConverted(fullDocument); + private T getConvertedFullDocument(Document fullDocument) { + return (T) doGetConverted(fullDocument, CONVERTED_FULL_DOCUMENT_UPDATER); } - private Object doGetConverted(Document fullDocument) { + private Object doGetConverted(Document fullDocument, AtomicReferenceFieldUpdater updater) { - Object result = CONVERTED_UPDATER.get(this); + Object result = updater.get(this); if (result != null) { return result; @@ -176,13 +193,13 @@ private Object doGetConverted(Document fullDocument) { if (ClassUtils.isAssignable(Document.class, fullDocument.getClass())) { result = converter.read(targetType, fullDocument); - return CONVERTED_UPDATER.compareAndSet(this, null, result) ? result : CONVERTED_UPDATER.get(this); + return updater.compareAndSet(this, null, result) ? result : updater.get(this); } if (converter.getConversionService().canConvert(fullDocument.getClass(), targetType)) { result = converter.getConversionService().convert(fullDocument, targetType); - return CONVERTED_UPDATER.compareAndSet(this, null, result) ? result : CONVERTED_UPDATER.get(this); + return updater.compareAndSet(this, null, result) ? result : updater.get(this); } throw new IllegalArgumentException( diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamOptions.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamOptions.java index b6bf35ae78..3e5ba7a0d4 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamOptions.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamOptions.java @@ -19,6 +19,7 @@ import java.util.Arrays; import java.util.Optional; +import com.mongodb.client.model.changestream.FullDocumentBeforeChange; import org.bson.BsonDocument; import org.bson.BsonTimestamp; import org.bson.BsonValue; @@ -40,6 +41,7 @@ * * @author Christoph Strobl * @author Mark Paluch + * @author Myroslav Kosinskyi * @since 2.1 */ public class ChangeStreamOptions { @@ -47,6 +49,7 @@ public class ChangeStreamOptions { private @Nullable Object filter; private @Nullable BsonValue resumeToken; private @Nullable FullDocument fullDocumentLookup; + private @Nullable FullDocumentBeforeChange fullDocumentBeforeChangeLookup; private @Nullable Collation collation; private @Nullable Object resumeTimestamp; private Resume resume = Resume.UNDEFINED; @@ -74,6 +77,13 @@ public Optional getFullDocumentLookup() { return Optional.ofNullable(fullDocumentLookup); } + /** + * @return {@link Optional#empty()} if not set. + */ + public Optional getFullDocumentBeforeChangeLookup() { + return Optional.ofNullable(fullDocumentBeforeChangeLookup); + } + /** * @return {@link Optional#empty()} if not set. */ @@ -170,6 +180,9 @@ public boolean equals(Object o) { if (!ObjectUtils.nullSafeEquals(this.fullDocumentLookup, that.fullDocumentLookup)) { return false; } + if (!ObjectUtils.nullSafeEquals(this.fullDocumentBeforeChangeLookup, that.fullDocumentBeforeChangeLookup)) { + return false; + } if (!ObjectUtils.nullSafeEquals(this.collation, that.collation)) { return false; } @@ -184,6 +197,7 @@ public int hashCode() { int result = ObjectUtils.nullSafeHashCode(filter); result = 31 * result + ObjectUtils.nullSafeHashCode(resumeToken); result = 31 * result + ObjectUtils.nullSafeHashCode(fullDocumentLookup); + result = 31 * result + ObjectUtils.nullSafeHashCode(fullDocumentBeforeChangeLookup); result = 31 * result + ObjectUtils.nullSafeHashCode(collation); result = 31 * result + ObjectUtils.nullSafeHashCode(resumeTimestamp); result = 31 * result + ObjectUtils.nullSafeHashCode(resume); @@ -220,6 +234,7 @@ public static class ChangeStreamOptionsBuilder { private @Nullable Object filter; private @Nullable BsonValue resumeToken; private @Nullable FullDocument fullDocumentLookup; + private @Nullable FullDocumentBeforeChange fullDocumentBeforeChangeLookup; private @Nullable Collation collation; private @Nullable Object resumeTimestamp; private Resume resume = Resume.UNDEFINED; @@ -322,6 +337,20 @@ public ChangeStreamOptionsBuilder fullDocumentLookup(FullDocument lookup) { return this; } + /** + * Set the {@link FullDocumentBeforeChange} lookup to use. + * + * @param lookup must not be {@literal null}. + * @return this. + */ + public ChangeStreamOptionsBuilder fullDocumentBeforeChangeLookup(FullDocumentBeforeChange lookup) { + + Assert.notNull(lookup, "Lookup must not be null"); + + this.fullDocumentBeforeChangeLookup = lookup; + return this; + } + /** * Set the cluster time to resume from. * @@ -391,6 +420,7 @@ public ChangeStreamOptions build() { options.filter = this.filter; options.resumeToken = this.resumeToken; options.fullDocumentLookup = this.fullDocumentLookup; + options.fullDocumentBeforeChangeLookup = this.fullDocumentBeforeChangeLookup; options.collation = this.collation; options.resumeTimestamp = this.resumeTimestamp; options.resume = this.resume; diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamRequest.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamRequest.java index 6b740fc136..c9e0e065c3 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamRequest.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamRequest.java @@ -18,6 +18,7 @@ import java.time.Duration; import java.time.Instant; +import com.mongodb.client.model.changestream.FullDocumentBeforeChange; import org.bson.BsonValue; import org.bson.Document; import org.springframework.data.mongodb.core.ChangeStreamOptions; @@ -90,6 +91,7 @@ * * @author Christoph Strobl * @author Mark Paluch + * @author Myroslav Kosinskyi * @since 2.1 */ public class ChangeStreamRequest @@ -425,6 +427,20 @@ public ChangeStreamRequestBuilder fullDocumentLookup(FullDocument lookup) { return this; } + /** + * @return this. + * @see #fullDocumentBeforeChangeLookup(FullDocumentBeforeChange) (FullDocumentBeforeChange) + * @see ChangeStreamOptions#getFullDocumentBeforeChangeLookup() + * @see ChangeStreamOptionsBuilder#fullDocumentBeforeChangeLookup(FullDocumentBeforeChange) + */ + public ChangeStreamRequestBuilder fullDocumentBeforeChangeLookup(FullDocumentBeforeChange lookup) { + + Assert.notNull(lookup, "FullDocumentBeforeChange not be null"); + + this.delegate.fullDocumentBeforeChangeLookup(lookup); + return this; + } + /** * Set the cursors maximum wait time on the server (for a new Document to be emitted). * diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java index beea250f3a..7ea4da2b45 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java @@ -23,6 +23,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; +import com.mongodb.client.model.changestream.FullDocumentBeforeChange; import org.bson.BsonDocument; import org.bson.BsonTimestamp; import org.bson.BsonValue; @@ -58,6 +59,7 @@ * * @author Christoph Strobl * @author Mark Paluch + * @author Myroslav Kosinskyi * @since 2.1 */ class ChangeStreamTask extends CursorReadingTask, Object> { @@ -86,6 +88,7 @@ protected MongoCursor> initCursor(MongoTemplate t Collation collation = null; FullDocument fullDocument = ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT : FullDocument.UPDATE_LOOKUP; + FullDocumentBeforeChange fullDocumentBeforeChange = FullDocumentBeforeChange.DEFAULT; BsonTimestamp startAt = null; boolean resumeAfter = true; @@ -113,6 +116,9 @@ protected MongoCursor> initCursor(MongoTemplate t .orElseGet(() -> ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT : FullDocument.UPDATE_LOOKUP); + fullDocumentBeforeChange = changeStreamOptions.getFullDocumentBeforeChangeLookup() + .orElse(FullDocumentBeforeChange.DEFAULT); + startAt = changeStreamOptions.getResumeBsonTimestamp().orElse(null); } @@ -152,6 +158,7 @@ protected MongoCursor> initCursor(MongoTemplate t } iterable = iterable.fullDocument(fullDocument); + iterable = iterable.fullDocumentBeforeChange(fullDocumentBeforeChange); return iterable.iterator(); } @@ -230,6 +237,12 @@ public T getBody() { return delegate.getBody(); } + @Nullable + @Override + public T getBodyBeforeChange() { + return delegate.getBodyBeforeChange(); + } + @Override public MessageProperties getProperties() { return this.messageProperties; diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/Message.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/Message.java index 00ca875567..8749a7609b 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/Message.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/Message.java @@ -31,6 +31,7 @@ * * @author Christoph Strobl * @author Mark Paluch + * @author Myroslav Kosinskyi * @see MessageProperties * @since 2.1 */ @@ -52,6 +53,16 @@ public interface Message { @Nullable T getBody(); + /** + * The converted message body before change if available. + * + * @return can be {@literal null}. + */ + @Nullable + default T getBodyBeforeChange() { + return null; + } + /** * {@link MessageProperties} containing information about the {@link Message} origin and other metadata. * diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTaskUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTaskUnitTests.java index a61ff161bf..08e771d979 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTaskUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTaskUnitTests.java @@ -42,6 +42,7 @@ /** * @author Christoph Strobl + * @author Myroslav Kosinskyi */ @ExtendWith(MockitoExtension.class) class ChangeStreamTaskUnitTests { @@ -67,6 +68,8 @@ void setUp() { when(mongoCollection.watch(eq(Document.class))).thenReturn(changeStreamIterable); when(changeStreamIterable.fullDocument(any())).thenReturn(changeStreamIterable); + + when(changeStreamIterable.fullDocumentBeforeChange(any())).thenReturn(changeStreamIterable); } @Test // DATAMONGO-2258 diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTests.java index c8b3b161cc..eda6f48344 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTests.java @@ -21,6 +21,9 @@ import static org.springframework.data.mongodb.core.query.Criteria.*; import static org.springframework.data.mongodb.core.query.Query.*; +import com.mongodb.client.model.ChangeStreamPreAndPostImagesOptions; +import com.mongodb.client.model.CreateCollectionOptions; +import com.mongodb.client.model.changestream.FullDocumentBeforeChange; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @@ -47,7 +50,6 @@ import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest.ChangeStreamRequestOptions; import org.springframework.data.mongodb.core.messaging.ChangeStreamTask.ChangeStreamEventMessage; import org.springframework.data.mongodb.core.messaging.Message.MessageProperties; -import org.springframework.data.mongodb.core.messaging.SubscriptionUtils.*; import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Update; import org.springframework.data.mongodb.test.util.EnableIfMongoServerVersion; @@ -67,6 +69,7 @@ * * @author Christoph Strobl * @author Mark Paluch + * @author Myroslav Kosinskyi */ @ExtendWith({ MongoTemplateExtension.class }) @EnableIfReplicaSetAvailable @@ -538,6 +541,194 @@ void filterOnUpdateDescriptionElement() throws InterruptedException { assertThat(messageBodies).hasSize(2); } + @Test // issue/41087 + @EnableIfMongoServerVersion(isGreaterThanEqual = "6.0") + void readsFullDocumentBeforeChangeWhenOptionDeclaredWhenAvailable() throws InterruptedException { + + createUserCollectionWithChangeStreamPreAndPostImagesEnabled(); + + CollectingMessageListener, User> messageListener = new CollectingMessageListener<>(); + ChangeStreamRequest request = ChangeStreamRequest.builder() // + .collection("user") // + .fullDocumentLookup(FullDocument.WHEN_AVAILABLE) // + .fullDocumentBeforeChangeLookup(FullDocumentBeforeChange.WHEN_AVAILABLE) // + .maxAwaitTime(Duration.ofMillis(10)) // + .publishTo(messageListener).build(); + + Subscription subscription = container.register(request, User.class); + awaitSubscription(subscription); + + template.save(jellyBelly); + + template.update(User.class).matching(query(where("id").is(jellyBelly.id))).apply(Update.update("age", 8)).first(); + + awaitMessages(messageListener, 2); + + assertThat(messageListener.getFirstMessage().getBodyBeforeChange()).isNull(); + assertThat(messageListener.getFirstMessage().getBody()).isEqualTo(jellyBelly); + + assertThat(messageListener.getLastMessage().getBodyBeforeChange()).isEqualTo(jellyBelly); + assertThat(messageListener.getLastMessage().getBody()).isEqualTo(jellyBelly.withAge(8)); + } + + @Test // issue/41087 + @EnableIfMongoServerVersion(isGreaterThanEqual = "6.0") + void readsFullDocumentBeforeChangeWhenOptionDeclaredRequired() throws InterruptedException { + + createUserCollectionWithChangeStreamPreAndPostImagesEnabled(); + + CollectingMessageListener, User> messageListener = new CollectingMessageListener<>(); + ChangeStreamRequest request = ChangeStreamRequest.builder() // + .collection("user") // + .fullDocumentLookup(FullDocument.WHEN_AVAILABLE) // + .fullDocumentBeforeChangeLookup(FullDocumentBeforeChange.REQUIRED) // + .maxAwaitTime(Duration.ofMillis(10)) // + .publishTo(messageListener).build(); + + Subscription subscription = container.register(request, User.class); + awaitSubscription(subscription); + + template.save(jellyBelly); + + template.update(User.class).matching(query(where("id").is(jellyBelly.id))).apply(Update.update("age", 8)).first(); + + awaitMessages(messageListener, 2); + + assertThat(messageListener.getFirstMessage().getBodyBeforeChange()).isNull(); + assertThat(messageListener.getFirstMessage().getBody()).isEqualTo(jellyBelly); + + assertThat(messageListener.getLastMessage().getBodyBeforeChange()).isEqualTo(jellyBelly); + assertThat(messageListener.getLastMessage().getBody()).isEqualTo(jellyBelly.withAge(8)); + } + + @Test // issue/41087 + @EnableIfMongoServerVersion(isGreaterThanEqual = "6.0") + void readsFullDocumentBeforeChangeWhenOptionIsNotDeclared() throws InterruptedException { + + createUserCollectionWithChangeStreamPreAndPostImagesEnabled(); + + CollectingMessageListener, User> messageListener = new CollectingMessageListener<>(); + ChangeStreamRequest request = ChangeStreamRequest.builder() // + .collection("user") // + .maxAwaitTime(Duration.ofMillis(10)) // + .publishTo(messageListener).build(); + + Subscription subscription = container.register(request, User.class); + awaitSubscription(subscription); + + template.save(jellyBelly); + + template.update(User.class).matching(query(where("id").is(jellyBelly.id))).apply(Update.update("age", 8)).first(); + + awaitMessages(messageListener, 2); + + assertThat(messageListener.getFirstMessage().getBodyBeforeChange()).isNull(); + assertThat(messageListener.getLastMessage().getBodyBeforeChange()).isNull(); + } + + @Test // issue/41087 + @EnableIfMongoServerVersion(isGreaterThanEqual = "6.0") + void readsFullDocumentBeforeChangeWhenOptionDeclaredDefault() throws InterruptedException { + + createUserCollectionWithChangeStreamPreAndPostImagesEnabled(); + + CollectingMessageListener, User> messageListener = new CollectingMessageListener<>(); + ChangeStreamRequest request = ChangeStreamRequest.builder() // + .collection("user") // + .fullDocumentBeforeChangeLookup(FullDocumentBeforeChange.DEFAULT).maxAwaitTime(Duration.ofMillis(10)) // + .publishTo(messageListener).build(); + + Subscription subscription = container.register(request, User.class); + awaitSubscription(subscription); + + template.save(jellyBelly); + + template.update(User.class).matching(query(where("id").is(jellyBelly.id))).apply(Update.update("age", 8)).first(); + + awaitMessages(messageListener, 2); + + assertThat(messageListener.getFirstMessage().getBodyBeforeChange()).isNull(); + assertThat(messageListener.getLastMessage().getBodyBeforeChange()).isNull(); + } + + @Test // issue/41087 + @EnableIfMongoServerVersion(isGreaterThanEqual = "6.0") + void readsFullDocumentBeforeChangeWhenOptionDeclaredOff() throws InterruptedException { + + createUserCollectionWithChangeStreamPreAndPostImagesEnabled(); + + CollectingMessageListener, User> messageListener = new CollectingMessageListener<>(); + ChangeStreamRequest request = ChangeStreamRequest.builder() // + .collection("user") // + .fullDocumentBeforeChangeLookup(FullDocumentBeforeChange.OFF).maxAwaitTime(Duration.ofMillis(10)) // + .publishTo(messageListener).build(); + + Subscription subscription = container.register(request, User.class); + awaitSubscription(subscription); + + template.save(jellyBelly); + + template.update(User.class).matching(query(where("id").is(jellyBelly.id))).apply(Update.update("age", 8)).first(); + + awaitMessages(messageListener, 2); + + assertThat(messageListener.getFirstMessage().getBodyBeforeChange()).isNull(); + assertThat(messageListener.getLastMessage().getBodyBeforeChange()).isNull(); + } + + @Test // issue/41087 + @EnableIfMongoServerVersion(isGreaterThanEqual = "6.0") + void readsFullDocumentBeforeChangeWhenOptionDeclaredWhenAvailableAndChangeStreamPreAndPostImagesDisabled() + throws InterruptedException { + + CollectingMessageListener, User> messageListener = new CollectingMessageListener<>(); + ChangeStreamRequest request = ChangeStreamRequest.builder() // + .collection("user") // + .fullDocumentBeforeChangeLookup(FullDocumentBeforeChange.WHEN_AVAILABLE).maxAwaitTime(Duration.ofMillis(10)) // + .publishTo(messageListener).build(); + + Subscription subscription = container.register(request, User.class); + awaitSubscription(subscription); + + template.save(jellyBelly); + + template.update(User.class).matching(query(where("id").is(jellyBelly.id))).apply(Update.update("age", 8)).first(); + + awaitMessages(messageListener, 2); + + assertThat(messageListener.getFirstMessage().getBodyBeforeChange()).isNull(); + assertThat(messageListener.getLastMessage().getBodyBeforeChange()).isNull(); + } + + @Test // issue/41087 + @EnableIfMongoServerVersion(isLessThan = "6.0") + void readsFullDocumentBeforeChangeWhenOptionDeclaredRequiredAndMongoVersionIsLessThan6() throws InterruptedException { + + CollectingMessageListener, User> messageListener = new CollectingMessageListener<>(); + ChangeStreamRequest request = ChangeStreamRequest.builder() // + .collection("user") // + .fullDocumentBeforeChangeLookup(FullDocumentBeforeChange.REQUIRED).maxAwaitTime(Duration.ofMillis(10)) // + .publishTo(messageListener).build(); + + Subscription subscription = container.register(request, User.class); + awaitSubscription(subscription); + + template.save(jellyBelly); + + template.update(User.class).matching(query(where("id").is(jellyBelly.id))).apply(Update.update("age", 8)).first(); + + awaitMessages(messageListener, 2); + + assertThat(messageListener.getFirstMessage().getBodyBeforeChange()).isNull(); + assertThat(messageListener.getLastMessage().getBodyBeforeChange()).isNull(); + } + + private void createUserCollectionWithChangeStreamPreAndPostImagesEnabled() { + CreateCollectionOptions createCollectionOptions = new CreateCollectionOptions(); + createCollectionOptions.changeStreamPreAndPostImagesOptions(new ChangeStreamPreAndPostImagesOptions(true)); + template.getDb().createCollection("user", createCollectionOptions); + } + @Data static class User { @@ -546,6 +737,15 @@ static class User { int age; Address address; + + User withAge(int age) { + User user = new User(); + user.id = id; + user.userName = userName; + user.age = age; + + return user; + } } @Data