Skip to content

Commit

Permalink
Implement retrieveStorageIdentifier()
Browse files Browse the repository at this point in the history
Implement the retrieveStorageIdentifier() method. Do so by first
checking if a ConfigToken is present within the trackingTokensCollection
If it isn't, introduce a new token into the collection. In either case,
return the existing or created ConfigToken to return the ID from it as
the result of retrieveStorageIdentifier

#49
  • Loading branch information
smcvb committed Oct 19, 2021
1 parent b5f65ec commit d7453e6
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2018. Axon Framework
* Copyright (c) 2010-2021. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,10 +28,12 @@
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventhandling.tokenstore.AbstractTokenEntry;
import org.axonframework.eventhandling.tokenstore.ConfigToken;
import org.axonframework.eventhandling.tokenstore.GenericTokenEntry;
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.eventhandling.tokenstore.UnableToClaimTokenException;
import org.axonframework.eventhandling.tokenstore.UnableToInitializeTokenException;
import org.axonframework.eventhandling.tokenstore.UnableToRetrieveIdentifierException;
import org.axonframework.eventhandling.tokenstore.jpa.TokenEntry;
import org.axonframework.extensions.mongo.MongoTemplate;
import org.axonframework.serialization.Serializer;
Expand All @@ -47,8 +49,11 @@
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand All @@ -72,6 +77,9 @@ public class MongoTokenStore implements TokenStore {
private final static Logger logger = LoggerFactory.getLogger(MongoTokenStore.class);
private final static Clock clock = Clock.systemUTC();

private static final String CONFIG_TOKEN_ID = "__config";
private static final int CONFIG_SEGMENT = 0;

private final MongoTemplate mongoTemplate;
private final Serializer serializer;
private final TemporalAmount claimTimeout;
Expand All @@ -93,7 +101,7 @@ protected MongoTokenStore(Builder builder) {
this.claimTimeout = builder.claimTimeout;
this.nodeId = builder.nodeId;
this.contentType = builder.contentType;
if(builder.ensureIndexes){
if (builder.ensureIndexes) {
ensureIndexes();
}
}
Expand Down Expand Up @@ -282,6 +290,40 @@ public int[] fetchSegments(String processorName) {
return ints;
}

@Override
public Optional<String> retrieveStorageIdentifier() throws UnableToRetrieveIdentifierException {
try {
return Optional.of(getConfig()).map(configToken -> configToken.get("id"));
} catch (Exception e) {
throw new UnableToRetrieveIdentifierException(
"Exception occurred while trying to establish storage identifier", e
);
}
}

private ConfigToken getConfig() {
Document document = mongoTemplate.trackingTokensCollection()
.find(and(
eq("processorName", CONFIG_TOKEN_ID),
eq("segment", CONFIG_SEGMENT)
))
.first();
AbstractTokenEntry<?> token;

if (Objects.isNull(document)) {
token = new GenericTokenEntry<>(
new ConfigToken(Collections.singletonMap("id", UUID.randomUUID().toString())),
serializer, contentType, CONFIG_TOKEN_ID, CONFIG_SEGMENT
);
mongoTemplate.trackingTokensCollection()
.insertOne(tokenEntryToDocument(token));
} else {
token = documentToTokenEntry(document);
}

return (ConfigToken) token.getToken(serializer);
}

/**
* Creates a filter that allows you to retrieve a claimable token entry with a given processor name and segment.
*
Expand Down Expand Up @@ -351,8 +393,8 @@ public void ensureIndexes() {
* Builder class to instantiate a {@link MongoTokenStore}.
* <p>
* The {@code claimTimeout} is defaulted to a 10 seconds duration (by using {@link Duration#ofSeconds(long)}, {@code
* nodeId} is defaulted to the {@code ManagementFactory#getRuntimeMXBean#getName} output, the {@code contentType}
* to a {@code byte[]} {@link Class}, and the {@code ensureIndexes} to {@code true}. The {@link MongoTemplate} and
* nodeId} is defaulted to the {@code ManagementFactory#getRuntimeMXBean#getName} output, the {@code contentType} to
* a {@code byte[]} {@link Class}, and the {@code ensureIndexes} to {@code true}. The {@link MongoTemplate} and
* {@link Serializer} are <b>hard requirements</b> and as such should be provided.
*/
public static class Builder {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2020. Axon Framework
* Copyright (c) 2010-2021. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,9 @@
import com.thoughtworks.xstream.XStream;
import org.axonframework.eventhandling.GlobalSequenceTrackingToken;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventhandling.tokenstore.AbstractTokenEntry;
import org.axonframework.eventhandling.tokenstore.ConfigToken;
import org.axonframework.eventhandling.tokenstore.GenericTokenEntry;
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.eventhandling.tokenstore.UnableToClaimTokenException;
import org.axonframework.eventhandling.tokenstore.UnableToInitializeTokenException;
Expand All @@ -31,6 +34,7 @@
import org.axonframework.serialization.json.JacksonSerializer;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.junit.jupiter.api.*;
import org.testcontainers.containers.MongoDBContainer;
import org.testcontainers.junit.jupiter.Container;
Expand All @@ -40,8 +44,11 @@
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -403,4 +410,49 @@ void testEnsureIndexCreation() {
}
assertTrue(indexFound);
}

@Test
void testRetrieveStorageIdentifierCreatesAndReturnsConfigTokenIdentifier() {
String expectedConfigTokenProcessorName = "__config";
int expectedConfigTokenSegmentId = 0;
Bson configTokenBson =
and(eq("processorName", expectedConfigTokenProcessorName), eq("segment", expectedConfigTokenSegmentId));

assertNull(trackingTokensCollection.find(configTokenBson).first());


Optional<String> result = tokenStore.retrieveStorageIdentifier();

assertTrue(result.isPresent());
assertFalse(result.get().isEmpty());

assertNotNull(trackingTokensCollection.find(configTokenBson).first());
}

@Test
void testRetrieveStorageIdentifierReturnsExistingConfigTokenIdentifier() {
String expectedStorageIdentifier = UUID.randomUUID().toString();
String expectedConfigTokenProcessorName = "__config";
int expectedConfigTokenSegmentId = 0;

AbstractTokenEntry<?> testTokenEntry = new GenericTokenEntry<>(
new ConfigToken(Collections.singletonMap("id", expectedStorageIdentifier)),
serializer, contentType, expectedConfigTokenProcessorName, expectedConfigTokenSegmentId
);
Document testConfigTokenDocument = new Document("processorName", testTokenEntry.getProcessorName())
.append("segment", testTokenEntry.getSegment())
.append("owner", testTokenEntry.getOwner())
.append("timestamp", testTokenEntry.timestamp().toEpochMilli())
.append("token", testTokenEntry.getSerializedToken().getData())
.append("tokenType", testTokenEntry.getSerializedToken().getType().getName());

mongoTemplate.trackingTokensCollection()
.insertOne(testConfigTokenDocument);

Optional<String> result = tokenStore.retrieveStorageIdentifier();

assertTrue(result.isPresent());
String resultStorageIdentifier = result.get();
assertEquals(expectedStorageIdentifier, resultStorageIdentifier);
}
}

0 comments on commit d7453e6

Please sign in to comment.