Skip to content

Commit

Permalink
Merge pull request #139 from AxonFramework/feature/49
Browse files Browse the repository at this point in the history
[#49] Implement `retrieveStorageIdentifier` for MongoTokenStore
  • Loading branch information
smcvb committed Oct 25, 2021
2 parents 1aefa3b + 6f4893c commit b9e402b
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2018. Axon Framework
* Copyright (c) 2010-2021. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,10 +28,12 @@
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventhandling.tokenstore.AbstractTokenEntry;
import org.axonframework.eventhandling.tokenstore.ConfigToken;
import org.axonframework.eventhandling.tokenstore.GenericTokenEntry;
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.eventhandling.tokenstore.UnableToClaimTokenException;
import org.axonframework.eventhandling.tokenstore.UnableToInitializeTokenException;
import org.axonframework.eventhandling.tokenstore.UnableToRetrieveIdentifierException;
import org.axonframework.eventhandling.tokenstore.jpa.TokenEntry;
import org.axonframework.extensions.mongo.MongoTemplate;
import org.axonframework.serialization.Serializer;
Expand All @@ -47,8 +49,11 @@
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand All @@ -72,6 +77,9 @@ public class MongoTokenStore implements TokenStore {
private final static Logger logger = LoggerFactory.getLogger(MongoTokenStore.class);
private final static Clock clock = Clock.systemUTC();

private static final String CONFIG_TOKEN_ID = "__config";
private static final int CONFIG_SEGMENT = 0;

private final MongoTemplate mongoTemplate;
private final Serializer serializer;
private final TemporalAmount claimTimeout;
Expand All @@ -93,7 +101,7 @@ protected MongoTokenStore(Builder builder) {
this.claimTimeout = builder.claimTimeout;
this.nodeId = builder.nodeId;
this.contentType = builder.contentType;
if(builder.ensureIndexes){
if (builder.ensureIndexes) {
ensureIndexes();
}
}
Expand Down Expand Up @@ -282,6 +290,40 @@ public int[] fetchSegments(String processorName) {
return ints;
}

@Override
public Optional<String> retrieveStorageIdentifier() throws UnableToRetrieveIdentifierException {
try {
return Optional.of(getConfig()).map(configToken -> configToken.get("id"));
} catch (Exception e) {
throw new UnableToRetrieveIdentifierException(
"Exception occurred while trying to establish storage identifier", e
);
}
}

private ConfigToken getConfig() {
Document document = mongoTemplate.trackingTokensCollection()
.find(and(
eq("processorName", CONFIG_TOKEN_ID),
eq("segment", CONFIG_SEGMENT)
))
.first();
AbstractTokenEntry<?> token;

if (Objects.isNull(document)) {
token = new GenericTokenEntry<>(
new ConfigToken(Collections.singletonMap("id", UUID.randomUUID().toString())),
serializer, contentType, CONFIG_TOKEN_ID, CONFIG_SEGMENT
);
mongoTemplate.trackingTokensCollection()
.insertOne(tokenEntryToDocument(token));
} else {
token = documentToTokenEntry(document);
}

return (ConfigToken) token.getToken(serializer);
}

/**
* Creates a filter that allows you to retrieve a claimable token entry with a given processor name and segment.
*
Expand Down Expand Up @@ -351,8 +393,8 @@ public void ensureIndexes() {
* Builder class to instantiate a {@link MongoTokenStore}.
* <p>
* The {@code claimTimeout} is defaulted to a 10 seconds duration (by using {@link Duration#ofSeconds(long)}, {@code
* nodeId} is defaulted to the {@code ManagementFactory#getRuntimeMXBean#getName} output, the {@code contentType}
* to a {@code byte[]} {@link Class}, and the {@code ensureIndexes} to {@code true}. The {@link MongoTemplate} and
* nodeId} is defaulted to the {@code ManagementFactory#getRuntimeMXBean#getName} output, the {@code contentType} to
* a {@code byte[]} {@link Class}, and the {@code ensureIndexes} to {@code true}. The {@link MongoTemplate} and
* {@link Serializer} are <b>hard requirements</b> and as such should be provided.
*/
public static class Builder {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2020. Axon Framework
* Copyright (c) 2010-2021. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,12 +19,12 @@
import com.mongodb.client.FindIterable;
import org.axonframework.extensions.mongo.MongoTemplate;
import org.axonframework.extensions.mongo.util.MongoTemplateFactory;
import org.axonframework.extensions.mongo.utils.TestSerializer;
import org.axonframework.modelling.saga.AssociationValue;
import org.axonframework.modelling.saga.AssociationValues;
import org.axonframework.modelling.saga.AssociationValuesImpl;
import org.axonframework.modelling.saga.repository.SagaStore;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.junit.jupiter.api.*;
Expand Down Expand Up @@ -62,6 +62,7 @@ void setUp() {
mongoTemplate.sagaCollection().drop();
testSubject = MongoSagaStore.builder()
.mongoTemplate(mongoTemplate)
.serializer(TestSerializer.xStreamSerializer())
.build();
}

Expand Down Expand Up @@ -201,7 +202,7 @@ void testLoadSaga_AssociationValueRemoved() {
MyTestSaga saga = new MyTestSaga();
AssociationValue associationValue = new AssociationValue("key", "value");
SagaEntry<MyTestSaga> testSagaEntry = new SagaEntry<>(
identifier, saga, singleton(associationValue), XStreamSerializer.defaultSerializer()
identifier, saga, singleton(associationValue), TestSerializer.xStreamSerializer()
);
mongoTemplate.sagaCollection().insertOne(testSagaEntry.asDocument());

Expand All @@ -217,7 +218,7 @@ identifier, saga, singleton(associationValue), XStreamSerializer.defaultSerializ
void testSaveSaga() {
String identifier = UUID.randomUUID().toString();
MyTestSaga saga = new MyTestSaga();
Serializer serializer = XStreamSerializer.defaultSerializer();
Serializer serializer = TestSerializer.xStreamSerializer();
mongoTemplate.sagaCollection()
.insertOne(new SagaEntry<>(identifier, saga, emptySet(), serializer).asDocument());
SagaStore.Entry<MyTestSaga> loaded = testSubject.loadSaga(MyTestSaga.class, identifier);
Expand All @@ -243,7 +244,7 @@ void testEndSaga() {
MyTestSaga saga = new MyTestSaga();
AssociationValue associationValue = new AssociationValue("key", "value");
SagaEntry<MyTestSaga> testSagaEntry = new SagaEntry<>(
identifier, saga, singleton(associationValue), XStreamSerializer.defaultSerializer()
identifier, saga, singleton(associationValue), TestSerializer.xStreamSerializer()
);
mongoTemplate.sagaCollection().insertOne(testSagaEntry.asDocument());
testSubject.deleteSaga(MyTestSaga.class, identifier, singleton(associationValue));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2020. Axon Framework
* Copyright (c) 2010-2021. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,7 @@
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.extensions.mongo.MongoTemplate;
import org.axonframework.extensions.mongo.util.MongoTemplateFactory;
import org.axonframework.extensions.mongo.utils.TestSerializer;
import org.junit.jupiter.api.*;
import org.testcontainers.containers.MongoDBContainer;
import org.testcontainers.junit.jupiter.Container;
Expand Down Expand Up @@ -114,8 +115,11 @@ public void testCreateTokenAtTimeBeforeFirstEvent() {

@Override
protected MongoEventStorageEngine createEngine(UnaryOperator<MongoEventStorageEngine.Builder> customization) {
MongoEventStorageEngine.Builder engineBuilder = MongoEventStorageEngine.builder()
.mongoTemplate(mongoTemplate);
MongoEventStorageEngine.Builder engineBuilder =
MongoEventStorageEngine.builder()
.snapshotSerializer(TestSerializer.xStreamSerializer())
.eventSerializer(TestSerializer.xStreamSerializer())
.mongoTemplate(mongoTemplate);
return customization.apply(engineBuilder).build();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2020. Axon Framework
* Copyright (c) 2010-2021. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,18 +20,17 @@
import com.mongodb.client.MongoCollection;
import org.axonframework.extensions.mongo.MongoTemplate;
import org.axonframework.extensions.mongo.util.MongoTemplateFactory;
import org.axonframework.extensions.mongo.utils.TestSerializer;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.bson.Document;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.*;
import org.testcontainers.containers.MongoDBContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import java.time.Duration;
import java.time.temporal.TemporalAmount;

import static org.junit.jupiter.api.Assertions.*;

/**
Expand All @@ -57,7 +56,7 @@ void setUp() {
trackingTokensCollection = mongoTemplate.trackingTokensCollection();
trackingTokensCollection.drop();
String testOwner = "testOwner";
Serializer serializer = XStreamSerializer.defaultSerializer();
Serializer serializer = TestSerializer.xStreamSerializer();
MongoTokenStore.Builder tokenStoreBuilder = MongoTokenStore.builder()
.mongoTemplate(mongoTemplate)
.serializer(serializer)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2020. Axon Framework
* Copyright (c) 2010-2021. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,15 +22,20 @@
import com.thoughtworks.xstream.XStream;
import org.axonframework.eventhandling.GlobalSequenceTrackingToken;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventhandling.tokenstore.AbstractTokenEntry;
import org.axonframework.eventhandling.tokenstore.ConfigToken;
import org.axonframework.eventhandling.tokenstore.GenericTokenEntry;
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.eventhandling.tokenstore.UnableToClaimTokenException;
import org.axonframework.eventhandling.tokenstore.UnableToInitializeTokenException;
import org.axonframework.extensions.mongo.MongoTemplate;
import org.axonframework.extensions.mongo.util.MongoTemplateFactory;
import org.axonframework.extensions.mongo.utils.TestSerializer;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.json.JacksonSerializer;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.junit.jupiter.api.*;
import org.testcontainers.containers.MongoDBContainer;
import org.testcontainers.junit.jupiter.Container;
Expand All @@ -40,8 +45,11 @@
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -87,7 +95,7 @@ void setUp() {
trackingTokensCollection = mongoTemplate.trackingTokensCollection();
trackingTokensCollection.drop();

serializer = XStreamSerializer.defaultSerializer();
serializer = TestSerializer.xStreamSerializer();
MongoTokenStore.Builder tokenStoreBuilder = MongoTokenStore.builder()
.mongoTemplate(mongoTemplate)
.serializer(serializer)
Expand Down Expand Up @@ -403,4 +411,49 @@ void testEnsureIndexCreation() {
}
assertTrue(indexFound);
}

@Test
void testRetrieveStorageIdentifierCreatesAndReturnsConfigTokenIdentifier() {
String expectedConfigTokenProcessorName = "__config";
int expectedConfigTokenSegmentId = 0;
Bson configTokenBson =
and(eq("processorName", expectedConfigTokenProcessorName), eq("segment", expectedConfigTokenSegmentId));

assertNull(trackingTokensCollection.find(configTokenBson).first());


Optional<String> result = tokenStore.retrieveStorageIdentifier();

assertTrue(result.isPresent());
assertFalse(result.get().isEmpty());

assertNotNull(trackingTokensCollection.find(configTokenBson).first());
}

@Test
void testRetrieveStorageIdentifierReturnsExistingConfigTokenIdentifier() {
String expectedStorageIdentifier = UUID.randomUUID().toString();
String expectedConfigTokenProcessorName = "__config";
int expectedConfigTokenSegmentId = 0;

AbstractTokenEntry<?> testTokenEntry = new GenericTokenEntry<>(
new ConfigToken(Collections.singletonMap("id", expectedStorageIdentifier)),
serializer, contentType, expectedConfigTokenProcessorName, expectedConfigTokenSegmentId
);
Document testConfigTokenDocument = new Document("processorName", testTokenEntry.getProcessorName())
.append("segment", testTokenEntry.getSegment())
.append("owner", testTokenEntry.getOwner())
.append("timestamp", testTokenEntry.timestamp().toEpochMilli())
.append("token", testTokenEntry.getSerializedToken().getData())
.append("tokenType", testTokenEntry.getSerializedToken().getType().getName());

mongoTemplate.trackingTokensCollection()
.insertOne(testConfigTokenDocument);

Optional<String> result = tokenStore.retrieveStorageIdentifier();

assertTrue(result.isPresent());
String resultStorageIdentifier = result.get();
assertEquals(expectedStorageIdentifier, resultStorageIdentifier);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2010-2021. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.axonframework.extensions.mongo.utils;

import com.thoughtworks.xstream.XStream;
import org.axonframework.serialization.xml.CompactDriver;
import org.axonframework.serialization.xml.XStreamSerializer;

/**
* Utility providing {@link org.axonframework.serialization.Serializer} instances for testing.
*
* @author Steven van Beelen
*/
public abstract class TestSerializer {

private TestSerializer() {
// Test utility class
}

/**
* Return a {@link XStreamSerializer} using a default {@link XStream} instance with a {@link CompactDriver}.
*
* @return a {@link XStreamSerializer} using a default {@link XStream} instance with a {@link CompactDriver}
*/
public static XStreamSerializer xStreamSerializer() {
return XStreamSerializer.builder()
.xStream(new XStream(new CompactDriver()))
.build();
}
}

0 comments on commit b9e402b

Please sign in to comment.