Skip to content

Commit

Permalink
Merge branch 'axon-4.6.x' into bug/2481
Browse files Browse the repository at this point in the history
  • Loading branch information
smcvb committed Nov 28, 2022
2 parents 2892e71 + 1823694 commit 2972eb2
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 38 deletions.
2 changes: 1 addition & 1 deletion coverage-report/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<groupId>org.axonframework</groupId>
<artifactId>axon</artifactId>
<version>4.6.2-SNAPSHOT</version>
<version>4.6.3-SNAPSHOT</version>
</parent>

<artifactId>axon-coverage-report</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,21 @@ public interface DeadLetterJpaConverter<M extends EventMessage<?>> {
* Converts an {@link EventMessage} implementation to a {@link DeadLetterEventEntry}.
*
* @param message The message to convert.
* @param serializer The {@link Serializer} to use for serialization of payload and metadata.
* @param eventSerializer The {@link Serializer} for serialization of payload and metadata.
* @param genericSerializer The {@link Serializer} for serialization of the token, if present.
* @return The created {@link DeadLetterEventEntry}
*/
DeadLetterEventEntry convert(M message, Serializer serializer);
DeadLetterEventEntry convert(M message, Serializer eventSerializer, Serializer genericSerializer);

/**
* Converts a {@link DeadLetterEventEntry} to a {@link EventMessage} implementation.
*
* @param entry The database entry to convert to a {@link EventMessage}
* @param serializer The {@link Serializer} to use for deserialization of payload and metadata.
* @param eventSerializer The {@link Serializer} for deserialization of payload and metadata.
* @param genericSerializer The {@link Serializer} for deserialization of the token, if present.
* @return The created {@link DeadLetterEventEntry}
*/
M convert(DeadLetterEventEntry entry, Serializer serializer);
M convert(DeadLetterEventEntry entry, Serializer eventSerializer, Serializer genericSerializer);

/**
* Check whether this converter supports the given {@link DeadLetterEventEntry}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,16 @@ public class EventMessageDeadLetterJpaConverter implements DeadLetterJpaConverte

@SuppressWarnings("rawtypes")
@Override
public DeadLetterEventEntry convert(EventMessage<?> message, Serializer serializer) {
public DeadLetterEventEntry convert(EventMessage<?> message, Serializer eventSerializer, Serializer genericSerializer) {
GenericEventMessage<?> eventMessage = (GenericEventMessage<?>) message;
Optional<TrackedEventMessage> trackedEventMessage = Optional.of(eventMessage).filter(
TrackedEventMessage.class::isInstance).map(TrackedEventMessage.class::cast);
Optional<DomainEventMessage> domainEventMessage = Optional.of(eventMessage).filter(
DomainEventMessage.class::isInstance).map(DomainEventMessage.class::cast);

SerializedObject<byte[]> serializedPayload = serializer.serialize(message.getPayload(), byte[].class);
SerializedObject<byte[]> serializedMetadata = serializer.serialize(message.getMetaData(), byte[].class);
Optional<SerializedObject<byte[]>> serializedToken = trackedEventMessage.map(m -> serializer.serialize(m.trackingToken(),
SerializedObject<byte[]> serializedPayload = eventSerializer.serialize(message.getPayload(), byte[].class);
SerializedObject<byte[]> serializedMetadata = eventSerializer.serialize(message.getMetaData(), byte[].class);
Optional<SerializedObject<byte[]>> serializedToken = trackedEventMessage.map(m -> genericSerializer.serialize(m.trackingToken(),
byte[].class));


Expand All @@ -79,14 +79,14 @@ public DeadLetterEventEntry convert(EventMessage<?> message, Serializer serializ
}

@Override
public EventMessage<?> convert(DeadLetterEventEntry entry, Serializer serializer) {
public EventMessage<?> convert(DeadLetterEventEntry entry, Serializer eventSerializer, Serializer genericSerializer) {
SerializedMessage<?> serializedMessage = new SerializedMessage<>(entry.getEventIdentifier(),
entry.getPayload(),
entry.getMetaData(),
serializer);
eventSerializer);
Supplier<Instant> timestampSupplier = () -> Instant.parse(entry.getTimeStamp());
if (entry.getTrackingToken() != null) {
TrackingToken trackingToken = serializer.deserialize(entry.getTrackingToken());
TrackingToken trackingToken = genericSerializer.deserialize(entry.getTrackingToken());
if (entry.getAggregateIdentifier() != null) {
return new GenericTrackedDomainEventMessage<>(trackingToken,
entry.getType(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ public class JpaSequencedDeadLetterQueue<M extends EventMessage<?>> implements S
private final int maxSequences;
private final int maxSequenceSize;
private final int queryPageSize;
private final Serializer serializer;
private final Serializer eventSerializer;
private final Serializer genericSerializer;
private final Duration claimDuration;

/**
Expand All @@ -101,7 +102,8 @@ protected <T extends EventMessage<?>> JpaSequencedDeadLetterQueue(Builder<T> bui
this.maxSequenceSize = builder.maxSequenceSize;
this.entityManagerProvider = builder.entityManagerProvider;
this.transactionManager = builder.transactionManager;
this.serializer = builder.serializer;
this.eventSerializer = builder.eventSerializer;
this.genericSerializer = builder.genericSerializer;
this.converters = builder.converters;
this.claimDuration = builder.claimDuration;
this.queryPageSize = builder.queryPageSize;
Expand Down Expand Up @@ -143,7 +145,7 @@ public void enqueue(@Nonnull Object sequenceIdentifier, @Nonnull DeadLetter<? ex
.stream()
.filter(c -> c.canConvert(letter.message()))
.findFirst()
.map(c -> c.convert(letter.message(), serializer))
.map(c -> c.convert(letter.message(), eventSerializer, genericSerializer))
.orElseThrow(() -> new NoJpaConverterFoundException(
String.format("No converter found for message of type: [%s]",
letter.message().getClass().getName()))
Expand All @@ -159,7 +161,7 @@ public void enqueue(@Nonnull Object sequenceIdentifier, @Nonnull DeadLetter<? ex
letter.lastTouched(),
letter.cause().orElse(null),
letter.diagnostics(),
serializer);
eventSerializer);
logger.info("Storing DeadLetter (id: [{}]) for sequence [{}] with index [{}] in processing group [{}].",
deadLetter.getDeadLetterId(),
stringSequenceIdentifier,
Expand Down Expand Up @@ -215,7 +217,7 @@ public void requeue(@Nonnull DeadLetter<? extends M> letter,
if (letterEntity == null) {
throw new NoSuchDeadLetterException(String.format("Can not find dead letter with id [%s] to requeue.", id));
}
letterEntity.setDiagnostics(updatedLetter.diagnostics(), serializer);
letterEntity.setDiagnostics(updatedLetter.diagnostics(), eventSerializer);
letterEntity.setLastTouched(updatedLetter.lastTouched());
letterEntity.setCause(updatedLetter.cause().orElse(null));
letterEntity.clearProcessingStarted();
Expand Down Expand Up @@ -296,10 +298,10 @@ private JpaDeadLetter<M> toLetter(DeadLetterEntry entry) {
.orElseThrow(() -> new NoJpaConverterFoundException(String.format(
"No converter found to convert message of class [%s].",
entry.getMessage().getMessageType())));
MetaData deserializedDiagnostics = serializer.deserialize(entry.getDiagnostics());
MetaData deserializedDiagnostics = eventSerializer.deserialize(entry.getDiagnostics());
return new JpaDeadLetter<>(entry,
deserializedDiagnostics,
converter.convert(entry.getMessage(), serializer));
converter.convert(entry.getMessage(), eventSerializer, genericSerializer));
}

@Override
Expand Down Expand Up @@ -600,7 +602,8 @@ public static class Builder<T extends EventMessage<?>> {
private int queryPageSize = 100;
private EntityManagerProvider entityManagerProvider;
private TransactionManager transactionManager;
private Serializer serializer;
private Serializer eventSerializer;
private Serializer genericSerializer;
private Duration claimDuration = Duration.ofSeconds(30);

public Builder() {
Expand Down Expand Up @@ -682,15 +685,42 @@ public Builder<T> transactionManager(TransactionManager transactionManager) {
}

/**
* Sets the {@link Serializer} to deserialize the events, metadata and diagnostics of the {@link DeadLetter}
* when storing it to a database.
* Sets the {@link Serializer} to (de)serialize the event payload,
* event metadata, tracking token, and diagnostics of the {@link DeadLetter} when storing it to the database.
*
* @param serializer The serializer to use
* @return the current Builder instance, for fluent interfacing
*/
public Builder<T> serializer(Serializer serializer) {
assertNonNull(serializer, "The serializer may not be null");
this.serializer = serializer;
this.eventSerializer = serializer;
this.genericSerializer = serializer;
return this;
}

/**
* Sets the {@link Serializer} to (de)serialize the event payload,
* event metadata, and diagnostics of the {@link DeadLetter} when storing it to the database.
*
* @param serializer The serializer to use
* @return the current Builder instance, for fluent interfacing
*/
public Builder<T> eventSerializer(Serializer serializer) {
assertNonNull(serializer, "The eventSerializer may not be null");
this.eventSerializer = serializer;
return this;
}

/**
* Sets the {@link Serializer} to (de)serialize the tracking token of the event in
* the {@link DeadLetter} when storing it to the database.
*
* @param serializer The serializer to use
* @return the current Builder instance, for fluent interfacing
*/
public Builder<T> genericSerializer(Serializer serializer) {
assertNonNull(serializer, "The genericSerializer may not be null");
this.genericSerializer = serializer;
return this;
}

Expand Down Expand Up @@ -769,8 +799,10 @@ protected void validate() {
"Must supply a TransactionManager when constructing a JpaSequencedDeadLetterQueue");
assertNonNull(entityManagerProvider,
"Must supply a EntityManagerProvider when constructing a JpaSequencedDeadLetterQueue");
assertNonNull(serializer,
"Must supply a Serializer when constructing a JpaSequencedDeadLetterQueue");
assertNonNull(eventSerializer,
"Must supply an eventSerializer when constructing a JpaSequencedDeadLetterQueue");
assertNonNull(genericSerializer,
"Must supply an genericSerializer when constructing a JpaSequencedDeadLetterQueue");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ class EventMessageDeadLetterJpaConverterTest {

private static final String PAYLOAD_REVISION = "23.0";
private final EventMessageDeadLetterJpaConverter converter = new EventMessageDeadLetterJpaConverter();
private final Serializer serializer = TestSerializer.JACKSON.getSerializer();
private final Serializer eventSerializer = TestSerializer.JACKSON.getSerializer();
private final Serializer genericSerializer = TestSerializer.XSTREAM.getSerializer();
private final ConverterTestEvent event = new ConverterTestEvent("myValue");
private final MetaData metaData = MetaData.from(Collections.singletonMap("myMetadataKey", "myMetadataValue"));

Expand Down Expand Up @@ -95,12 +96,12 @@ void canConvertTrackedEventMessageWithGapAwareTokenAndBackCorrectly() {

private void testConversion(EventMessage<?> message) {
assertTrue(converter.canConvert(message));
DeadLetterEventEntry deadLetterEventEntry = converter.convert(message, serializer);
DeadLetterEventEntry deadLetterEventEntry = converter.convert(message, eventSerializer, genericSerializer);

assertCorrectlyMapped(message, deadLetterEventEntry);
assertTrue(converter.canConvert(deadLetterEventEntry));

EventMessage<?> restoredEventMessage = converter.convert(deadLetterEventEntry, serializer);
EventMessage<?> restoredEventMessage = converter.convert(deadLetterEventEntry, eventSerializer, genericSerializer);
assertCorrectlyRestored(message, restoredEventMessage);
}

Expand Down Expand Up @@ -134,10 +135,10 @@ private void assertCorrectlyMapped(EventMessage<?> eventMessage, DeadLetterEvent
assertEquals(eventMessage.getPayload().getClass().getName(),
deadLetterEventEntry.getPayload().getType().getName());
assertEquals(PAYLOAD_REVISION, deadLetterEventEntry.getPayload().getType().getRevision());
assertEquals(serializer.serialize(event, String.class).getData(),
assertEquals(eventSerializer.serialize(event, String.class).getData(),
new String(deadLetterEventEntry.getPayload().getData()));
assertEquals(MetaData.class.getName(), deadLetterEventEntry.getMetaData().getType().getName());
assertEquals(serializer.serialize(metaData, String.class).getData(),
assertEquals(eventSerializer.serialize(metaData, String.class).getData(),
new String(deadLetterEventEntry.getMetaData().getData()));

if (eventMessage instanceof DomainEventMessage) {
Expand All @@ -154,7 +155,7 @@ private void assertCorrectlyMapped(EventMessage<?> eventMessage, DeadLetterEvent
TrackedEventMessage<?> trackedEventMessage = (TrackedEventMessage<?>) eventMessage;
assertEquals(trackedEventMessage.trackingToken().getClass().getName(),
deadLetterEventEntry.getTrackingToken().getType().getName());
assertEquals(serializer.serialize(trackedEventMessage.trackingToken(), String.class).getData(),
assertEquals(genericSerializer.serialize(trackedEventMessage.trackingToken(), String.class).getData(),
new String(deadLetterEventEntry.getTrackingToken().getData()));
} else {
assertNull(deadLetterEventEntry.getTrackingToken());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ public SequencedDeadLetterQueue<EventMessage<?>> buildTestSubject() {
.maxSequences(MAX_SEQUENCES_AND_SEQUENCE_SIZE)
.maxSequenceSize(MAX_SEQUENCES_AND_SEQUENCE_SIZE)
.processingGroup("my_processing_group")
.serializer(TestSerializer.JACKSON.getSerializer())
.eventSerializer(TestSerializer.JACKSON.getSerializer())
.genericSerializer(TestSerializer.XSTREAM.getSerializer())
.build();
}

Expand Down
12 changes: 6 additions & 6 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,12 @@

<slf4j.version>2.0.3</slf4j.version>
<log4j.version>2.18.0</log4j.version>
<spring.version>5.3.23</spring.version>
<spring.version>5.3.24</spring.version>
<spring-security.version>5.7.3</spring-security.version>
<spring.boot.version>2.7.5</spring.boot.version>
<spring.boot.version>2.7.6</spring.boot.version>
<mockito.version>4.8.1</mockito.version>
<projectreactor.version>3.4.24</projectreactor.version>
<micrometer.version>1.9.5</micrometer.version>
<projectreactor.version>3.4.25</projectreactor.version>
<micrometer.version>1.9.6</micrometer.version>
<dropwizard.metrics.version>4.2.12</dropwizard.metrics.version>
<jackson.version>2.13.4</jackson.version>

Expand All @@ -104,7 +104,7 @@
<c3p0.version>0.9.1.2</c3p0.version>
<hsqldb.version>2.5.2</hsqldb.version>
<hibernate-core.version>5.6.12.Final</hibernate-core.version>
<byte-buddy.version>1.12.18</byte-buddy.version>
<byte-buddy.version>1.12.19</byte-buddy.version>
<findbugs-jsr305.version>3.0.2</findbugs-jsr305.version>
<commons-io.version>2.11.0</commons-io.version>
<javassist.version>3.29.2-GA</javassist.version>
Expand All @@ -115,7 +115,7 @@
<junit.jupiter.version>5.9.1</junit.jupiter.version>
<axonserver-connector-java.version>4.6.3</axonserver-connector-java.version>
<hamcrest.version>2.2</hamcrest.version>
<testcontainers.version>1.17.5</testcontainers.version>
<testcontainers.version>1.17.6</testcontainers.version>
<xstream.version>1.4.19</xstream.version>
<reactive.streams.spec.version>1.0.4</reactive.streams.spec.version>
<!-- plugin versions -->
Expand Down

0 comments on commit 2972eb2

Please sign in to comment.