Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolving some sonar issues and adding some concurrent tests. #222

Merged
merged 1 commit into from
Apr 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2021. Axon Framework
* Copyright (c) 2010-2022. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,6 +26,7 @@
import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.UpdateResult;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventhandling.tokenstore.AbstractTokenEntry;
import org.axonframework.eventhandling.tokenstore.ConfigToken;
Expand All @@ -34,7 +35,6 @@
import org.axonframework.eventhandling.tokenstore.UnableToClaimTokenException;
import org.axonframework.eventhandling.tokenstore.UnableToInitializeTokenException;
import org.axonframework.eventhandling.tokenstore.UnableToRetrieveIdentifierException;
import org.axonframework.eventhandling.tokenstore.jpa.TokenEntry;
import org.axonframework.extensions.mongo.MongoTemplate;
import org.axonframework.serialization.Serializer;
import org.bson.Document;
Expand Down Expand Up @@ -64,7 +64,6 @@
import static com.mongodb.client.model.Updates.set;
import static java.lang.String.format;
import static org.axonframework.common.BuilderUtils.assertNonNull;
import static org.axonframework.common.BuilderUtils.assertThat;

/**
* An implementation of TokenStore that allows you store and retrieve tracking tokens with MongoDB.
Expand All @@ -74,12 +73,19 @@
*/
public class MongoTokenStore implements TokenStore {

private final static Logger logger = LoggerFactory.getLogger(MongoTokenStore.class);
private final static Clock clock = Clock.systemUTC();
private static final Logger logger = LoggerFactory.getLogger(MongoTokenStore.class);
private static final Clock clock = Clock.systemUTC();

private static final String CONFIG_TOKEN_ID = "__config";
private static final int CONFIG_SEGMENT = 0;

private static final String OWNER_PROPERTY_NAME = "owner";
private static final String TIMESTAMP_PROPERTY_NAME = "timestamp";
private static final String PROCESSOR_NAME_PROPERTY_NAME = "processorName";
private static final String SEGMENT_PROPERTY_NAME = "segment";
private static final String TOKEN_PROPERTY_NAME = "token";
private static final String TOKEN_TYPE_PROPERTY_NAME = "tokenType";

private final MongoTemplate mongoTemplate;
private final Serializer serializer;
private final TemporalAmount claimTimeout;
Expand Down Expand Up @@ -120,6 +126,9 @@ public static Builder builder() {
return new Builder();
}

/**
* {@inheritDoc}
*/
@Override
public void storeToken(TrackingToken token, String processorName, int segment) throws UnableToClaimTokenException {
updateToken(token, processorName, segment);
Expand All @@ -130,10 +139,10 @@ private void updateToken(TrackingToken token, String processorName, int segment)
new GenericTokenEntry<>(token, serializer, contentType, processorName, segment);
tokenEntry.claim(nodeId, claimTimeout);

Bson update = combine(set("owner", nodeId),
set("timestamp", tokenEntry.timestamp().toEpochMilli()),
set("token", tokenEntry.getSerializedToken().getData()),
set("tokenType", tokenEntry.getSerializedToken().getType().getName()));
Bson update = combine(set(OWNER_PROPERTY_NAME, nodeId),
set(TIMESTAMP_PROPERTY_NAME, tokenEntry.timestamp().toEpochMilli()),
set(TOKEN_PROPERTY_NAME, tokenEntry.getSerializedToken().getData()),
set(TOKEN_TYPE_PROPERTY_NAME, tokenEntry.getSerializedToken().getType().getName()));

UpdateResult updateResult = mongoTemplate.trackingTokensCollection()
.updateOne(claimableTokenEntryFilter(processorName, segment), update);
Expand All @@ -146,11 +155,17 @@ private void updateToken(TrackingToken token, String processorName, int segment)
}
}

/**
* {@inheritDoc}
*/
@Override
public void initializeTokenSegments(String processorName, int segmentCount) throws UnableToClaimTokenException {
initializeTokenSegments(processorName, segmentCount, null);
}

/**
* {@inheritDoc}
*/
@Override
public void initializeTokenSegments(String processorName,
int segmentCount,
Expand Down Expand Up @@ -183,7 +198,8 @@ private AbstractTokenEntry<?> loadToken(String processorName, int segment) {
Document document = mongoTemplate.trackingTokensCollection()
.findOneAndUpdate(
claimableTokenEntryFilter(processorName, segment),
combine(set("owner", nodeId), set("timestamp", clock.millis())),
combine(set(OWNER_PROPERTY_NAME, nodeId),
set(TIMESTAMP_PROPERTY_NAME, clock.millis())),
new FindOneAndUpdateOptions().returnDocument(ReturnDocument.AFTER)
);

Expand All @@ -207,10 +223,10 @@ private AbstractTokenEntry<?> loadToken(String processorName, int segment) {
public void extendClaim(String processorName, int segment) throws UnableToClaimTokenException {
UpdateResult updateResult =
mongoTemplate.trackingTokensCollection()
.updateOne(and(eq("processorName", processorName),
eq("segment", segment),
eq("owner", nodeId)),
set("timestamp", TokenEntry.clock.instant().toEpochMilli()));
.updateOne(and(eq(PROCESSOR_NAME_PROPERTY_NAME, processorName),
eq(SEGMENT_PROPERTY_NAME, segment),
eq(OWNER_PROPERTY_NAME, nodeId)),
set(TIMESTAMP_PROPERTY_NAME, clock.instant().toEpochMilli()));
if (updateResult.getMatchedCount() == 0) {
throw new UnableToClaimTokenException(format(
"Unable to extend claim on token token '%s[%s]'. It is owned by another segment.",
Expand All @@ -226,11 +242,10 @@ public void extendClaim(String processorName, int segment) throws UnableToClaimT
public void releaseClaim(String processorName, int segment) {
UpdateResult updateResult = mongoTemplate.trackingTokensCollection()
.updateOne(and(
eq("processorName", processorName),
eq("segment", segment),
eq("owner", nodeId)
), set("owner", null));

eq(PROCESSOR_NAME_PROPERTY_NAME, processorName),
eq(SEGMENT_PROPERTY_NAME, segment),
eq(OWNER_PROPERTY_NAME, nodeId)
), set(OWNER_PROPERTY_NAME, null));
if (updateResult.getMatchedCount() == 0) {
logger.warn("Releasing claim of token {}/{} failed. It was owned by another node.", processorName, segment);
}
Expand Down Expand Up @@ -259,9 +274,9 @@ public void initializeSegment(TrackingToken token,
public void deleteToken(String processorName, int segment) throws UnableToClaimTokenException {
DeleteResult deleteResult = mongoTemplate.trackingTokensCollection()
.deleteOne(and(
eq("processorName", processorName),
eq("segment", segment),
eq("owner", nodeId)
eq(PROCESSOR_NAME_PROPERTY_NAME, processorName),
eq(SEGMENT_PROPERTY_NAME, segment),
eq(OWNER_PROPERTY_NAME, nodeId)
));

if (deleteResult.getDeletedCount() == 0) {
Expand All @@ -277,10 +292,10 @@ public boolean requiresExplicitSegmentInitialization() {
@Override
public int[] fetchSegments(String processorName) {
ArrayList<Integer> segments = mongoTemplate.trackingTokensCollection()
.find(eq("processorName", processorName))
.sort(ascending("segment"))
.projection(fields(include("segment"), excludeId()))
.map(d -> d.get("segment", Integer.class))
.find(eq(PROCESSOR_NAME_PROPERTY_NAME, processorName))
.sort(ascending(SEGMENT_PROPERTY_NAME))
.projection(fields(include(SEGMENT_PROPERTY_NAME), excludeId()))
.map(d -> d.get(SEGMENT_PROPERTY_NAME, Integer.class))
.into(new ArrayList<>());
// toArray doesn't work because of autoboxing limitations
int[] ints = new int[segments.size()];
Expand All @@ -304,8 +319,8 @@ public Optional<String> retrieveStorageIdentifier() throws UnableToRetrieveIdent
private ConfigToken getConfig() {
Document document = mongoTemplate.trackingTokensCollection()
.find(and(
eq("processorName", CONFIG_TOKEN_ID),
eq("segment", CONFIG_SEGMENT)
eq(PROCESSOR_NAME_PROPERTY_NAME, CONFIG_TOKEN_ID),
eq(SEGMENT_PROPERTY_NAME, CONFIG_SEGMENT)
))
.first();
AbstractTokenEntry<?> token;
Expand Down Expand Up @@ -333,47 +348,47 @@ private ConfigToken getConfig() {
*/
private Bson claimableTokenEntryFilter(String processorName, int segment) {
return and(
eq("processorName", processorName),
eq("segment", segment),
eq(PROCESSOR_NAME_PROPERTY_NAME, processorName),
eq(SEGMENT_PROPERTY_NAME, segment),
or(
eq("owner", nodeId),
eq("owner", null),
lt("timestamp", clock.instant().minus(claimTimeout).toEpochMilli())
eq(OWNER_PROPERTY_NAME, nodeId),
eq(OWNER_PROPERTY_NAME, null),
lt(TIMESTAMP_PROPERTY_NAME, clock.instant().minus(claimTimeout).toEpochMilli())
)
);
}

private Document tokenEntryToDocument(AbstractTokenEntry<?> tokenEntry) {
return new Document("processorName", tokenEntry.getProcessorName())
.append("segment", tokenEntry.getSegment())
.append("owner", tokenEntry.getOwner())
.append("timestamp", tokenEntry.timestamp().toEpochMilli())
.append("token",
return new Document(PROCESSOR_NAME_PROPERTY_NAME, tokenEntry.getProcessorName())
.append(SEGMENT_PROPERTY_NAME, tokenEntry.getSegment())
.append(OWNER_PROPERTY_NAME, tokenEntry.getOwner())
.append(TIMESTAMP_PROPERTY_NAME, tokenEntry.timestamp().toEpochMilli())
.append(TOKEN_PROPERTY_NAME,
tokenEntry.getSerializedToken() == null ? null : tokenEntry.getSerializedToken().getData())
.append("tokenType",
.append(TOKEN_TYPE_PROPERTY_NAME,
tokenEntry.getSerializedToken() == null ? null : tokenEntry.getSerializedToken()
.getType().getName());
}

private AbstractTokenEntry<?> documentToTokenEntry(Document document) {
return new GenericTokenEntry<>(
readSerializedData(document),
document.getString("tokenType"),
Instant.ofEpochMilli(document.getLong("timestamp")).toString(),
document.getString("owner"),
document.getString("processorName"),
document.getInteger("segment"),
document.getString(TOKEN_TYPE_PROPERTY_NAME),
Instant.ofEpochMilli(document.getLong(TIMESTAMP_PROPERTY_NAME)).toString(),
document.getString(OWNER_PROPERTY_NAME),
document.getString(PROCESSOR_NAME_PROPERTY_NAME),
document.getInteger(SEGMENT_PROPERTY_NAME),
contentType
);
}

@SuppressWarnings("unchecked")
private <T> T readSerializedData(Document document) {
if (byte[].class.equals(contentType)) {
Binary token = document.get("token", Binary.class);
Binary token = document.get(TOKEN_PROPERTY_NAME, Binary.class);
return (T) ((token != null) ? token.getData() : null);
}
return (T) document.get("token", contentType);
return (T) document.get(TOKEN_PROPERTY_NAME, contentType);
}

/**
Expand All @@ -385,7 +400,8 @@ private <T> T readSerializedData(Document document) {
*/
@Deprecated
public void ensureIndexes() {
mongoTemplate.trackingTokensCollection().createIndex(Indexes.ascending("processorName", "segment"),
mongoTemplate.trackingTokensCollection().createIndex(Indexes.ascending(PROCESSOR_NAME_PROPERTY_NAME,
SEGMENT_PROPERTY_NAME),
new IndexOptions().unique(true));
}

Expand Down Expand Up @@ -453,7 +469,7 @@ public Builder claimTimeout(TemporalAmount claimTimeout) {
* @return the current Builder instance, for fluent interfacing
*/
public Builder nodeId(String nodeId) {
assertNodeId(nodeId, "The nodeId may not be null or empty");
BuilderUtils.assertNonEmpty(nodeId, "The nodeId may not be null or empty");
this.nodeId = nodeId;
return this;
}
Expand Down Expand Up @@ -503,9 +519,5 @@ protected void validate() throws AxonConfigurationException {
assertNonNull(mongoTemplate, "The MongoTemplate is a hard requirement and should be provided");
assertNonNull(serializer, "The Serializer is a hard requirement and should be provided");
}

private void assertNodeId(String nodeId, String exceptionMessage) {
assertThat(nodeId, name -> Objects.nonNull(name) && !"".equals(name), exceptionMessage);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2021. Axon Framework
* Copyright (c) 2010-2022. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -55,6 +55,8 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static com.mongodb.client.model.Filters.and;
Expand Down Expand Up @@ -456,4 +458,80 @@ void testRetrieveStorageIdentifierReturnsExistingConfigTokenIdentifier() {
String resultStorageIdentifier = result.get();
assertEquals(expectedStorageIdentifier, resultStorageIdentifier);
}

@Test
void storeTokenConcurrently() throws InterruptedException {
tokenStore.initializeSegment(null, testProcessorName, testSegment);
TrackingToken someToken = new GlobalSequenceTrackingToken(42);
testConcurrency(
() -> {
tokenStore.storeToken(someToken, testProcessorName, testSegment);
return true;
},
() -> {
tokenStoreDifferentOwner.storeToken(someToken, testProcessorName, testSegment);
return true;
});
}

@Test
void deleteTokenConcurrently() throws InterruptedException {
tokenStore.initializeSegment(null, testProcessorName, testSegment);
TrackingToken someToken = new GlobalSequenceTrackingToken(42);
tokenStore.storeToken(someToken, testProcessorName, testSegment);
testConcurrency(
() -> {
tokenStore.deleteToken(testProcessorName, testSegment);
return true;
},
() -> {
tokenStoreDifferentOwner.deleteToken(testProcessorName, testSegment);
return true;
});
}

@Test
void fetchTokenConcurrently() throws InterruptedException {
tokenStore.initializeSegment(null, testProcessorName, testSegment);
TrackingToken someToken = new GlobalSequenceTrackingToken(42);
tokenStore.storeToken(someToken, testProcessorName, testSegment);
tokenStore.releaseClaim(testProcessorName, testSegment);
TrackingToken result = testConcurrency(
() -> tokenStore.fetchToken(testProcessorName, testSegment),
() -> tokenStoreDifferentOwner.fetchToken(testProcessorName, testSegment)
);
assertEquals(someToken, result);
}

@Test
void initializeSegmentWithTokenConcurrently() throws InterruptedException {
TrackingToken initialToken = new GlobalSequenceTrackingToken(42);
testConcurrency(
() -> {
tokenStore.initializeSegment(initialToken, testProcessorName, testSegment);
return true;
},
() -> {
tokenStoreDifferentOwner.initializeSegment(initialToken, testProcessorName, testSegment);
return true;
});
}

private <T> T testConcurrency(Supplier<T> s1, Supplier<T> s2) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(2);
AtomicReference<T> r1 = new AtomicReference<>(null);
AtomicReference<T> r2 = new AtomicReference<>(null);
executor.execute(() -> r1.set(s1.get()));
executor.execute(() -> r2.set(s2.get()));
executor.shutdown();
boolean done = executor.awaitTermination(6L, TimeUnit.SECONDS);
assertTrue(done, "should complete in 6 seconds");
if (r1.get() == null) {
assertNotNull(r2.get(), "at least one of the results should be valid");
return r2.get();
} else {
assertNull(r2.get(), "only one of the results should be valid");
return r1.get();
}
}
}