Skip to content

Commit

Permalink
Add fullDocumentBeforeChange support for change streams.
Browse files Browse the repository at this point in the history
Closes: #4187
Original Pull Request: #4193
  • Loading branch information
kufd authored and christophstrobl committed Oct 10, 2022
1 parent a572580 commit aa35aae
Show file tree
Hide file tree
Showing 7 changed files with 306 additions and 16 deletions.
Expand Up @@ -36,21 +36,29 @@
*
* @author Christoph Strobl
* @author Mark Paluch
* @author Myroslav Kosinskyi
* @since 2.1
*/
public class ChangeStreamEvent<T> {

@SuppressWarnings("rawtypes") //
private static final AtomicReferenceFieldUpdater<ChangeStreamEvent, Object> CONVERTED_UPDATER = AtomicReferenceFieldUpdater
.newUpdater(ChangeStreamEvent.class, Object.class, "converted");
private static final AtomicReferenceFieldUpdater<ChangeStreamEvent, Object> CONVERTED_FULL_DOCUMENT_UPDATER = AtomicReferenceFieldUpdater
.newUpdater(ChangeStreamEvent.class, Object.class, "convertedFullDocument");

@SuppressWarnings("rawtypes") //
private static final AtomicReferenceFieldUpdater<ChangeStreamEvent, Object> CONVERTED_FULL_DOCUMENT_BEFORE_CHANGE_UPDATER = AtomicReferenceFieldUpdater
.newUpdater(ChangeStreamEvent.class, Object.class, "convertedFullDocumentBeforeChange");

private final @Nullable ChangeStreamDocument<Document> raw;

private final Class<T> targetType;
private final MongoConverter converter;

// accessed through CONVERTED_UPDATER.
private volatile @Nullable T converted;
// accessed through CONVERTED_FULL_DOCUMENT_UPDATER.
private volatile @Nullable T convertedFullDocument;

// accessed through CONVERTED_FULL_DOCUMENT_BEFORE_CHANGE_UPDATER.
private volatile @Nullable T convertedFullDocumentBeforeChange;

/**
* @param raw can be {@literal null}.
Expand Down Expand Up @@ -147,27 +155,36 @@ public String getCollectionName() {
@Nullable
public T getBody() {

if (raw == null) {
if (raw == null || raw.getFullDocument() == null) {
return null;
}

Document fullDocument = raw.getFullDocument();
return getConvertedFullDocument(raw.getFullDocument());
}

@Nullable
public T getBodyBeforeChange() {

if (fullDocument == null) {
return targetType.cast(fullDocument);
if (raw == null || raw.getFullDocumentBeforeChange() == null) {
return null;
}

return getConverted(fullDocument);
return getConvertedFullDocumentBeforeChange(raw.getFullDocumentBeforeChange());
}

@SuppressWarnings("unchecked")
private T getConvertedFullDocumentBeforeChange(Document fullDocument) {
return (T) doGetConverted(fullDocument, CONVERTED_FULL_DOCUMENT_BEFORE_CHANGE_UPDATER);
}

@SuppressWarnings("unchecked")
private T getConverted(Document fullDocument) {
return (T) doGetConverted(fullDocument);
private T getConvertedFullDocument(Document fullDocument) {
return (T) doGetConverted(fullDocument, CONVERTED_FULL_DOCUMENT_UPDATER);
}

private Object doGetConverted(Document fullDocument) {
private Object doGetConverted(Document fullDocument, AtomicReferenceFieldUpdater<ChangeStreamEvent, Object> updater) {

Object result = CONVERTED_UPDATER.get(this);
Object result = updater.get(this);

if (result != null) {
return result;
Expand All @@ -176,13 +193,13 @@ private Object doGetConverted(Document fullDocument) {
if (ClassUtils.isAssignable(Document.class, fullDocument.getClass())) {

result = converter.read(targetType, fullDocument);
return CONVERTED_UPDATER.compareAndSet(this, null, result) ? result : CONVERTED_UPDATER.get(this);
return updater.compareAndSet(this, null, result) ? result : updater.get(this);
}

if (converter.getConversionService().canConvert(fullDocument.getClass(), targetType)) {

result = converter.getConversionService().convert(fullDocument, targetType);
return CONVERTED_UPDATER.compareAndSet(this, null, result) ? result : CONVERTED_UPDATER.get(this);
return updater.compareAndSet(this, null, result) ? result : updater.get(this);
}

throw new IllegalArgumentException(
Expand Down
Expand Up @@ -19,6 +19,7 @@
import java.util.Arrays;
import java.util.Optional;

import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
import org.bson.BsonDocument;
import org.bson.BsonTimestamp;
import org.bson.BsonValue;
Expand All @@ -40,13 +41,15 @@
*
* @author Christoph Strobl
* @author Mark Paluch
* @author Myroslav Kosinskyi
* @since 2.1
*/
public class ChangeStreamOptions {

private @Nullable Object filter;
private @Nullable BsonValue resumeToken;
private @Nullable FullDocument fullDocumentLookup;
private @Nullable FullDocumentBeforeChange fullDocumentBeforeChangeLookup;
private @Nullable Collation collation;
private @Nullable Object resumeTimestamp;
private Resume resume = Resume.UNDEFINED;
Expand Down Expand Up @@ -74,6 +77,13 @@ public Optional<FullDocument> getFullDocumentLookup() {
return Optional.ofNullable(fullDocumentLookup);
}

/**
* @return {@link Optional#empty()} if not set.
*/
public Optional<FullDocumentBeforeChange> getFullDocumentBeforeChangeLookup() {
return Optional.ofNullable(fullDocumentBeforeChangeLookup);
}

/**
* @return {@link Optional#empty()} if not set.
*/
Expand Down Expand Up @@ -170,6 +180,9 @@ public boolean equals(Object o) {
if (!ObjectUtils.nullSafeEquals(this.fullDocumentLookup, that.fullDocumentLookup)) {
return false;
}
if (!ObjectUtils.nullSafeEquals(this.fullDocumentBeforeChangeLookup, that.fullDocumentBeforeChangeLookup)) {
return false;
}
if (!ObjectUtils.nullSafeEquals(this.collation, that.collation)) {
return false;
}
Expand All @@ -184,6 +197,7 @@ public int hashCode() {
int result = ObjectUtils.nullSafeHashCode(filter);
result = 31 * result + ObjectUtils.nullSafeHashCode(resumeToken);
result = 31 * result + ObjectUtils.nullSafeHashCode(fullDocumentLookup);
result = 31 * result + ObjectUtils.nullSafeHashCode(fullDocumentBeforeChangeLookup);
result = 31 * result + ObjectUtils.nullSafeHashCode(collation);
result = 31 * result + ObjectUtils.nullSafeHashCode(resumeTimestamp);
result = 31 * result + ObjectUtils.nullSafeHashCode(resume);
Expand Down Expand Up @@ -220,6 +234,7 @@ public static class ChangeStreamOptionsBuilder {
private @Nullable Object filter;
private @Nullable BsonValue resumeToken;
private @Nullable FullDocument fullDocumentLookup;
private @Nullable FullDocumentBeforeChange fullDocumentBeforeChangeLookup;
private @Nullable Collation collation;
private @Nullable Object resumeTimestamp;
private Resume resume = Resume.UNDEFINED;
Expand Down Expand Up @@ -322,6 +337,20 @@ public ChangeStreamOptionsBuilder fullDocumentLookup(FullDocument lookup) {
return this;
}

/**
* Set the {@link FullDocumentBeforeChange} lookup to use.
*
* @param lookup must not be {@literal null}.
* @return this.
*/
public ChangeStreamOptionsBuilder fullDocumentBeforeChangeLookup(FullDocumentBeforeChange lookup) {

Assert.notNull(lookup, "Lookup must not be null");

this.fullDocumentBeforeChangeLookup = lookup;
return this;
}

/**
* Set the cluster time to resume from.
*
Expand Down Expand Up @@ -391,6 +420,7 @@ public ChangeStreamOptions build() {
options.filter = this.filter;
options.resumeToken = this.resumeToken;
options.fullDocumentLookup = this.fullDocumentLookup;
options.fullDocumentBeforeChangeLookup = this.fullDocumentBeforeChangeLookup;
options.collation = this.collation;
options.resumeTimestamp = this.resumeTimestamp;
options.resume = this.resume;
Expand Down
Expand Up @@ -18,6 +18,7 @@
import java.time.Duration;
import java.time.Instant;

import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
import org.bson.BsonValue;
import org.bson.Document;
import org.springframework.data.mongodb.core.ChangeStreamOptions;
Expand Down Expand Up @@ -90,6 +91,7 @@
*
* @author Christoph Strobl
* @author Mark Paluch
* @author Myroslav Kosinskyi
* @since 2.1
*/
public class ChangeStreamRequest<T>
Expand Down Expand Up @@ -425,6 +427,20 @@ public ChangeStreamRequestBuilder<T> fullDocumentLookup(FullDocument lookup) {
return this;
}

/**
* @return this.
* @see #fullDocumentBeforeChangeLookup(FullDocumentBeforeChange) (FullDocumentBeforeChange)
* @see ChangeStreamOptions#getFullDocumentBeforeChangeLookup()
* @see ChangeStreamOptionsBuilder#fullDocumentBeforeChangeLookup(FullDocumentBeforeChange)
*/
public ChangeStreamRequestBuilder<T> fullDocumentBeforeChangeLookup(FullDocumentBeforeChange lookup) {

Assert.notNull(lookup, "FullDocumentBeforeChange not be null");

this.delegate.fullDocumentBeforeChangeLookup(lookup);
return this;
}

/**
* Set the cursors maximum wait time on the server (for a new Document to be emitted).
*
Expand Down
Expand Up @@ -23,6 +23,7 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;

import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
import org.bson.BsonDocument;
import org.bson.BsonTimestamp;
import org.bson.BsonValue;
Expand Down Expand Up @@ -58,6 +59,7 @@
*
* @author Christoph Strobl
* @author Mark Paluch
* @author Myroslav Kosinskyi
* @since 2.1
*/
class ChangeStreamTask extends CursorReadingTask<ChangeStreamDocument<Document>, Object> {
Expand Down Expand Up @@ -86,6 +88,7 @@ protected MongoCursor<ChangeStreamDocument<Document>> initCursor(MongoTemplate t
Collation collation = null;
FullDocument fullDocument = ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT
: FullDocument.UPDATE_LOOKUP;
FullDocumentBeforeChange fullDocumentBeforeChange = FullDocumentBeforeChange.DEFAULT;
BsonTimestamp startAt = null;
boolean resumeAfter = true;

Expand Down Expand Up @@ -113,6 +116,9 @@ protected MongoCursor<ChangeStreamDocument<Document>> initCursor(MongoTemplate t
.orElseGet(() -> ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT
: FullDocument.UPDATE_LOOKUP);

fullDocumentBeforeChange = changeStreamOptions.getFullDocumentBeforeChangeLookup()
.orElse(FullDocumentBeforeChange.DEFAULT);

startAt = changeStreamOptions.getResumeBsonTimestamp().orElse(null);
}

Expand Down Expand Up @@ -152,6 +158,7 @@ protected MongoCursor<ChangeStreamDocument<Document>> initCursor(MongoTemplate t
}

iterable = iterable.fullDocument(fullDocument);
iterable = iterable.fullDocumentBeforeChange(fullDocumentBeforeChange);

return iterable.iterator();
}
Expand Down Expand Up @@ -230,6 +237,12 @@ public T getBody() {
return delegate.getBody();
}

@Nullable
@Override
public T getBodyBeforeChange() {
return delegate.getBodyBeforeChange();
}

@Override
public MessageProperties getProperties() {
return this.messageProperties;
Expand Down
Expand Up @@ -31,6 +31,7 @@
*
* @author Christoph Strobl
* @author Mark Paluch
* @author Myroslav Kosinskyi
* @see MessageProperties
* @since 2.1
*/
Expand All @@ -52,6 +53,16 @@ public interface Message<S, T> {
@Nullable
T getBody();

/**
* The converted message body before change if available.
*
* @return can be {@literal null}.
*/
@Nullable
default T getBodyBeforeChange() {
return null;
}

/**
* {@link MessageProperties} containing information about the {@link Message} origin and other metadata.
*
Expand Down
Expand Up @@ -42,6 +42,7 @@

/**
* @author Christoph Strobl
* @author Myroslav Kosinskyi
*/
@ExtendWith(MockitoExtension.class)
class ChangeStreamTaskUnitTests {
Expand All @@ -67,6 +68,8 @@ void setUp() {
when(mongoCollection.watch(eq(Document.class))).thenReturn(changeStreamIterable);

when(changeStreamIterable.fullDocument(any())).thenReturn(changeStreamIterable);

when(changeStreamIterable.fullDocumentBeforeChange(any())).thenReturn(changeStreamIterable);
}

@Test // DATAMONGO-2258
Expand Down

0 comments on commit aa35aae

Please sign in to comment.