Skip to content

Commit

Permalink
Resolving some sonar issues and adding some concurrent tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
gklijs committed Apr 28, 2022
1 parent 9a09b61 commit be9aecb
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 53 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2021. Axon Framework
* Copyright (c) 2010-2022. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,6 +26,7 @@
import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.UpdateResult;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventhandling.tokenstore.AbstractTokenEntry;
import org.axonframework.eventhandling.tokenstore.ConfigToken;
Expand All @@ -34,7 +35,6 @@
import org.axonframework.eventhandling.tokenstore.UnableToClaimTokenException;
import org.axonframework.eventhandling.tokenstore.UnableToInitializeTokenException;
import org.axonframework.eventhandling.tokenstore.UnableToRetrieveIdentifierException;
import org.axonframework.eventhandling.tokenstore.jpa.TokenEntry;
import org.axonframework.extensions.mongo.MongoTemplate;
import org.axonframework.serialization.Serializer;
import org.bson.Document;
Expand Down Expand Up @@ -64,7 +64,6 @@
import static com.mongodb.client.model.Updates.set;
import static java.lang.String.format;
import static org.axonframework.common.BuilderUtils.assertNonNull;
import static org.axonframework.common.BuilderUtils.assertThat;

/**
* An implementation of TokenStore that allows you store and retrieve tracking tokens with MongoDB.
Expand All @@ -74,12 +73,19 @@
*/
public class MongoTokenStore implements TokenStore {

private final static Logger logger = LoggerFactory.getLogger(MongoTokenStore.class);
private final static Clock clock = Clock.systemUTC();
private static final Logger logger = LoggerFactory.getLogger(MongoTokenStore.class);
private static final Clock clock = Clock.systemUTC();

private static final String CONFIG_TOKEN_ID = "__config";
private static final int CONFIG_SEGMENT = 0;

private static final String OWNER_PROPERTY_NAME = "owner";
private static final String TIMESTAMP_PROPERTY_NAME = "timestamp";
private static final String PROCESSOR_NAME_PROPERTY_NAME = "processorName";
private static final String SEGMENT_PROPERTY_NAME = "segment";
private static final String TOKEN_PROPERTY_NAME = "token";
private static final String TOKEN_TYPE_PROPERTY_NAME = "tokenType";

private final MongoTemplate mongoTemplate;
private final Serializer serializer;
private final TemporalAmount claimTimeout;
Expand Down Expand Up @@ -120,6 +126,9 @@ public static Builder builder() {
return new Builder();
}

/**
* {@inheritDoc}
*/
@Override
public void storeToken(TrackingToken token, String processorName, int segment) throws UnableToClaimTokenException {
updateToken(token, processorName, segment);
Expand All @@ -130,10 +139,10 @@ private void updateToken(TrackingToken token, String processorName, int segment)
new GenericTokenEntry<>(token, serializer, contentType, processorName, segment);
tokenEntry.claim(nodeId, claimTimeout);

Bson update = combine(set("owner", nodeId),
set("timestamp", tokenEntry.timestamp().toEpochMilli()),
set("token", tokenEntry.getSerializedToken().getData()),
set("tokenType", tokenEntry.getSerializedToken().getType().getName()));
Bson update = combine(set(OWNER_PROPERTY_NAME, nodeId),
set(TIMESTAMP_PROPERTY_NAME, tokenEntry.timestamp().toEpochMilli()),
set(TOKEN_PROPERTY_NAME, tokenEntry.getSerializedToken().getData()),
set(TOKEN_TYPE_PROPERTY_NAME, tokenEntry.getSerializedToken().getType().getName()));

UpdateResult updateResult = mongoTemplate.trackingTokensCollection()
.updateOne(claimableTokenEntryFilter(processorName, segment), update);
Expand All @@ -146,11 +155,17 @@ private void updateToken(TrackingToken token, String processorName, int segment)
}
}

/**
* {@inheritDoc}
*/
@Override
public void initializeTokenSegments(String processorName, int segmentCount) throws UnableToClaimTokenException {
initializeTokenSegments(processorName, segmentCount, null);
}

/**
* {@inheritDoc}
*/
@Override
public void initializeTokenSegments(String processorName,
int segmentCount,
Expand Down Expand Up @@ -183,7 +198,8 @@ private AbstractTokenEntry<?> loadToken(String processorName, int segment) {
Document document = mongoTemplate.trackingTokensCollection()
.findOneAndUpdate(
claimableTokenEntryFilter(processorName, segment),
combine(set("owner", nodeId), set("timestamp", clock.millis())),
combine(set(OWNER_PROPERTY_NAME, nodeId),
set(TIMESTAMP_PROPERTY_NAME, clock.millis())),
new FindOneAndUpdateOptions().returnDocument(ReturnDocument.AFTER)
);

Expand All @@ -207,10 +223,10 @@ private AbstractTokenEntry<?> loadToken(String processorName, int segment) {
public void extendClaim(String processorName, int segment) throws UnableToClaimTokenException {
UpdateResult updateResult =
mongoTemplate.trackingTokensCollection()
.updateOne(and(eq("processorName", processorName),
eq("segment", segment),
eq("owner", nodeId)),
set("timestamp", TokenEntry.clock.instant().toEpochMilli()));
.updateOne(and(eq(PROCESSOR_NAME_PROPERTY_NAME, processorName),
eq(SEGMENT_PROPERTY_NAME, segment),
eq(OWNER_PROPERTY_NAME, nodeId)),
set(TIMESTAMP_PROPERTY_NAME, clock.instant().toEpochMilli()));
if (updateResult.getMatchedCount() == 0) {
throw new UnableToClaimTokenException(format(
"Unable to extend claim on token token '%s[%s]'. It is owned by another segment.",
Expand All @@ -226,11 +242,10 @@ public void extendClaim(String processorName, int segment) throws UnableToClaimT
public void releaseClaim(String processorName, int segment) {
UpdateResult updateResult = mongoTemplate.trackingTokensCollection()
.updateOne(and(
eq("processorName", processorName),
eq("segment", segment),
eq("owner", nodeId)
), set("owner", null));

eq(PROCESSOR_NAME_PROPERTY_NAME, processorName),
eq(SEGMENT_PROPERTY_NAME, segment),
eq(OWNER_PROPERTY_NAME, nodeId)
), set(OWNER_PROPERTY_NAME, null));
if (updateResult.getMatchedCount() == 0) {
logger.warn("Releasing claim of token {}/{} failed. It was owned by another node.", processorName, segment);
}
Expand Down Expand Up @@ -259,9 +274,9 @@ public void initializeSegment(TrackingToken token,
public void deleteToken(String processorName, int segment) throws UnableToClaimTokenException {
DeleteResult deleteResult = mongoTemplate.trackingTokensCollection()
.deleteOne(and(
eq("processorName", processorName),
eq("segment", segment),
eq("owner", nodeId)
eq(PROCESSOR_NAME_PROPERTY_NAME, processorName),
eq(SEGMENT_PROPERTY_NAME, segment),
eq(OWNER_PROPERTY_NAME, nodeId)
));

if (deleteResult.getDeletedCount() == 0) {
Expand All @@ -277,10 +292,10 @@ public boolean requiresExplicitSegmentInitialization() {
@Override
public int[] fetchSegments(String processorName) {
ArrayList<Integer> segments = mongoTemplate.trackingTokensCollection()
.find(eq("processorName", processorName))
.sort(ascending("segment"))
.projection(fields(include("segment"), excludeId()))
.map(d -> d.get("segment", Integer.class))
.find(eq(PROCESSOR_NAME_PROPERTY_NAME, processorName))
.sort(ascending(SEGMENT_PROPERTY_NAME))
.projection(fields(include(SEGMENT_PROPERTY_NAME), excludeId()))
.map(d -> d.get(SEGMENT_PROPERTY_NAME, Integer.class))
.into(new ArrayList<>());
// toArray doesn't work because of autoboxing limitations
int[] ints = new int[segments.size()];
Expand All @@ -304,8 +319,8 @@ public Optional<String> retrieveStorageIdentifier() throws UnableToRetrieveIdent
private ConfigToken getConfig() {
Document document = mongoTemplate.trackingTokensCollection()
.find(and(
eq("processorName", CONFIG_TOKEN_ID),
eq("segment", CONFIG_SEGMENT)
eq(PROCESSOR_NAME_PROPERTY_NAME, CONFIG_TOKEN_ID),
eq(SEGMENT_PROPERTY_NAME, CONFIG_SEGMENT)
))
.first();
AbstractTokenEntry<?> token;
Expand Down Expand Up @@ -333,47 +348,47 @@ private ConfigToken getConfig() {
*/
private Bson claimableTokenEntryFilter(String processorName, int segment) {
return and(
eq("processorName", processorName),
eq("segment", segment),
eq(PROCESSOR_NAME_PROPERTY_NAME, processorName),
eq(SEGMENT_PROPERTY_NAME, segment),
or(
eq("owner", nodeId),
eq("owner", null),
lt("timestamp", clock.instant().minus(claimTimeout).toEpochMilli())
eq(OWNER_PROPERTY_NAME, nodeId),
eq(OWNER_PROPERTY_NAME, null),
lt(TIMESTAMP_PROPERTY_NAME, clock.instant().minus(claimTimeout).toEpochMilli())
)
);
}

private Document tokenEntryToDocument(AbstractTokenEntry<?> tokenEntry) {
return new Document("processorName", tokenEntry.getProcessorName())
.append("segment", tokenEntry.getSegment())
.append("owner", tokenEntry.getOwner())
.append("timestamp", tokenEntry.timestamp().toEpochMilli())
.append("token",
return new Document(PROCESSOR_NAME_PROPERTY_NAME, tokenEntry.getProcessorName())
.append(SEGMENT_PROPERTY_NAME, tokenEntry.getSegment())
.append(OWNER_PROPERTY_NAME, tokenEntry.getOwner())
.append(TIMESTAMP_PROPERTY_NAME, tokenEntry.timestamp().toEpochMilli())
.append(TOKEN_PROPERTY_NAME,
tokenEntry.getSerializedToken() == null ? null : tokenEntry.getSerializedToken().getData())
.append("tokenType",
.append(TOKEN_TYPE_PROPERTY_NAME,
tokenEntry.getSerializedToken() == null ? null : tokenEntry.getSerializedToken()
.getType().getName());
}

private AbstractTokenEntry<?> documentToTokenEntry(Document document) {
return new GenericTokenEntry<>(
readSerializedData(document),
document.getString("tokenType"),
Instant.ofEpochMilli(document.getLong("timestamp")).toString(),
document.getString("owner"),
document.getString("processorName"),
document.getInteger("segment"),
document.getString(TOKEN_TYPE_PROPERTY_NAME),
Instant.ofEpochMilli(document.getLong(TIMESTAMP_PROPERTY_NAME)).toString(),
document.getString(OWNER_PROPERTY_NAME),
document.getString(PROCESSOR_NAME_PROPERTY_NAME),
document.getInteger(SEGMENT_PROPERTY_NAME),
contentType
);
}

@SuppressWarnings("unchecked")
private <T> T readSerializedData(Document document) {
if (byte[].class.equals(contentType)) {
Binary token = document.get("token", Binary.class);
Binary token = document.get(TOKEN_PROPERTY_NAME, Binary.class);
return (T) ((token != null) ? token.getData() : null);
}
return (T) document.get("token", contentType);
return (T) document.get(TOKEN_PROPERTY_NAME, contentType);
}

/**
Expand All @@ -385,7 +400,8 @@ private <T> T readSerializedData(Document document) {
*/
@Deprecated
public void ensureIndexes() {
mongoTemplate.trackingTokensCollection().createIndex(Indexes.ascending("processorName", "segment"),
mongoTemplate.trackingTokensCollection().createIndex(Indexes.ascending(PROCESSOR_NAME_PROPERTY_NAME,
SEGMENT_PROPERTY_NAME),
new IndexOptions().unique(true));
}

Expand Down Expand Up @@ -453,7 +469,7 @@ public Builder claimTimeout(TemporalAmount claimTimeout) {
* @return the current Builder instance, for fluent interfacing
*/
public Builder nodeId(String nodeId) {
assertNodeId(nodeId, "The nodeId may not be null or empty");
BuilderUtils.assertNonEmpty(nodeId, "The nodeId may not be null or empty");
this.nodeId = nodeId;
return this;
}
Expand Down Expand Up @@ -503,9 +519,5 @@ protected void validate() throws AxonConfigurationException {
assertNonNull(mongoTemplate, "The MongoTemplate is a hard requirement and should be provided");
assertNonNull(serializer, "The Serializer is a hard requirement and should be provided");
}

private void assertNodeId(String nodeId, String exceptionMessage) {
assertThat(nodeId, name -> Objects.nonNull(name) && !"".equals(name), exceptionMessage);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2021. Axon Framework
* Copyright (c) 2010-2022. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -55,6 +55,8 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static com.mongodb.client.model.Filters.and;
Expand Down Expand Up @@ -456,4 +458,80 @@ void testRetrieveStorageIdentifierReturnsExistingConfigTokenIdentifier() {
String resultStorageIdentifier = result.get();
assertEquals(expectedStorageIdentifier, resultStorageIdentifier);
}

@Test
void storeTokenConcurrently() throws InterruptedException {
tokenStore.initializeSegment(null, testProcessorName, testSegment);
TrackingToken someToken = new GlobalSequenceTrackingToken(42);
testConcurrency(
() -> {
tokenStore.storeToken(someToken, testProcessorName, testSegment);
return true;
},
() -> {
tokenStoreDifferentOwner.storeToken(someToken, testProcessorName, testSegment);
return true;
});
}

@Test
void deleteTokenConcurrently() throws InterruptedException {
tokenStore.initializeSegment(null, testProcessorName, testSegment);
TrackingToken someToken = new GlobalSequenceTrackingToken(42);
tokenStore.storeToken(someToken, testProcessorName, testSegment);
testConcurrency(
() -> {
tokenStore.deleteToken(testProcessorName, testSegment);
return true;
},
() -> {
tokenStoreDifferentOwner.deleteToken(testProcessorName, testSegment);
return true;
});
}

@Test
void fetchTokenConcurrently() throws InterruptedException {
tokenStore.initializeSegment(null, testProcessorName, testSegment);
TrackingToken someToken = new GlobalSequenceTrackingToken(42);
tokenStore.storeToken(someToken, testProcessorName, testSegment);
tokenStore.releaseClaim(testProcessorName, testSegment);
TrackingToken result = testConcurrency(
() -> tokenStore.fetchToken(testProcessorName, testSegment),
() -> tokenStoreDifferentOwner.fetchToken(testProcessorName, testSegment)
);
assertEquals(someToken, result);
}

@Test
void initializeSegmentWithTokenConcurrently() throws InterruptedException {
TrackingToken initialToken = new GlobalSequenceTrackingToken(42);
testConcurrency(
() -> {
tokenStore.initializeSegment(initialToken, testProcessorName, testSegment);
return true;
},
() -> {
tokenStoreDifferentOwner.initializeSegment(initialToken, testProcessorName, testSegment);
return true;
});
}

private <T> T testConcurrency(Supplier<T> s1, Supplier<T> s2) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(2);
AtomicReference<T> r1 = new AtomicReference<>(null);
AtomicReference<T> r2 = new AtomicReference<>(null);
executor.execute(() -> r1.set(s1.get()));
executor.execute(() -> r2.set(s2.get()));
executor.shutdown();
boolean done = executor.awaitTermination(6L, TimeUnit.SECONDS);
assertTrue(done, "should complete in 6 seconds");
if (r1.get() == null) {
assertNotNull(r2.get(), "at least one of the results should be valid");
return r2.get();
} else {
assertNull(r2.get(), "only one of the results should be valid");
return r1.get();
}
}
}

0 comments on commit be9aecb

Please sign in to comment.