Skip to content

Commit

Permalink
Merge pull request #53 from erikrz/optional-indexes-token-store
Browse files Browse the repository at this point in the history
[#51] Added an ensureIndexes(boolean) method
  • Loading branch information
smcvb committed Jan 27, 2021
2 parents 9a85d7b + 9e6fdaf commit 16f7d38
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ protected MongoTokenStore(Builder builder) {
this.claimTimeout = builder.claimTimeout;
this.nodeId = builder.nodeId;
this.contentType = builder.contentType;
ensureIndexes();
if(builder.ensureIndexes){
ensureIndexes();
}
}

/**
Expand Down Expand Up @@ -349,9 +351,9 @@ public void ensureIndexes() {
* Builder class to instantiate a {@link MongoTokenStore}.
* <p>
* The {@code claimTimeout} is defaulted to a 10 seconds duration (by using {@link Duration#ofSeconds(long)}, {@code
* nodeId} is defaulted to the {@code ManagementFactory#getRuntimeMXBean#getName} output and the {@code contentType}
* to a {@code byte[]} {@link Class}. The {@link MongoTemplate} and {@link Serializer} are
* <b>hard requirements</b> and as such should be provided.
* nodeId} is defaulted to the {@code ManagementFactory#getRuntimeMXBean#getName} output, the {@code contentType}
* to a {@code byte[]} {@link Class}, and the {@code ensureIndexes} to {@code true}. The {@link MongoTemplate} and
* {@link Serializer} are <b>hard requirements</b> and as such should be provided.
*/
public static class Builder {

Expand All @@ -360,6 +362,7 @@ public static class Builder {
private TemporalAmount claimTimeout = Duration.ofSeconds(10);
private String nodeId = ManagementFactory.getRuntimeMXBean().getName();
private Class<?> contentType = byte[].class;
private boolean ensureIndexes = true;

/**
* Sets the {@link MongoTemplate} providing access to the collection which stores the {@link TrackingToken}s.
Expand Down Expand Up @@ -426,6 +429,19 @@ public Builder contentType(Class<?> contentType) {
return this;
}

/**
* Sets the {@code ensureIndexes} to tell the builder whether to create or not the indexes required to work with
* the TokenStore. Defaults to {@code true}. If set to {@code false}, the developer is responsible for the
* creation of the indexes defined in the {@link MongoTokenStore#ensureIndexes()} method beforehand.
*
* @param ensureIndexes the boolean to indicate if the indexes should be created.
* @return the current Builder instance, for fluent interfacing
*/
public Builder ensureIndexes(boolean ensureIndexes) {
this.ensureIndexes = ensureIndexes;
return this;
}

/**
* Initializes a {@link MongoTokenStore} as specified through this Builder.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright (c) 2010-2020. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.axonframework.extensions.mongo.eventsourcing.tokenstore;

import com.mongodb.client.ListIndexesIterable;
import com.mongodb.client.MongoCollection;
import org.axonframework.extensions.mongo.MongoTemplate;
import org.axonframework.extensions.mongo.util.MongoTemplateFactory;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.bson.Document;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.MongoDBContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import java.time.Duration;
import java.time.temporal.TemporalAmount;
import static org.junit.jupiter.api.Assertions.*;

/**
* Test class validating the {@link MongoTokenStore} when skipping index creation.
*
* @author erikrz
*/
@Testcontainers
class MongoTokenStoreSkipIndexTest {

@Container
private static final MongoDBContainer MONGO_CONTAINER = new MongoDBContainer("mongo");

private MongoCollection<Document> trackingTokensCollection;
private final TemporalAmount claimTimeout = Duration.ofSeconds(5);
private final Class<byte[]> contentType = byte[].class;

@BeforeEach
void setUp() {
MongoTemplate mongoTemplate = MongoTemplateFactory.build(
MONGO_CONTAINER.getHost(), MONGO_CONTAINER.getFirstMappedPort()
);
trackingTokensCollection = mongoTemplate.trackingTokensCollection();
trackingTokensCollection.drop();
String testOwner = "testOwner";
Serializer serializer = XStreamSerializer.defaultSerializer();
MongoTokenStore.Builder tokenStoreBuilder = MongoTokenStore.builder()
.mongoTemplate(mongoTemplate)
.serializer(serializer)
.claimTimeout(claimTimeout)
.contentType(contentType)
.ensureIndexes(false)
.nodeId(testOwner);
tokenStoreBuilder.build();
}

@AfterEach
void tearDown() {
trackingTokensCollection.drop();
}

@Test
void testSkipIndexCreation() {
ListIndexesIterable<Document> listIndexes = trackingTokensCollection.listIndexes();
for(Document index : listIndexes){
// The index with this name does not exist, meaning it was not created on MongoTokenStore build() method.
assertNotEquals("processorName_1_segment_1",index.getString("name"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.axonframework.extensions.mongo.eventsourcing.tokenstore;

import com.mongodb.client.ListIndexesIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import org.axonframework.eventhandling.GlobalSequenceTrackingToken;
Expand All @@ -39,6 +40,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -69,6 +71,7 @@ class MongoTokenStoreTest {
private Serializer serializer;
private final TemporalAmount claimTimeout = Duration.ofSeconds(5);
private final Class<byte[]> contentType = byte[].class;
private final boolean ensureIndexes = true;

private final String testProcessorName = "testProcessorName";
private final int testSegment = 9;
Expand All @@ -88,7 +91,8 @@ void setUp() {
.mongoTemplate(mongoTemplate)
.serializer(serializer)
.claimTimeout(claimTimeout)
.contentType(contentType);
.contentType(contentType)
.ensureIndexes(ensureIndexes);
tokenStore = tokenStoreBuilder.nodeId(testOwner).build();
tokenStoreDifferentOwner = tokenStoreBuilder.nodeId("anotherOwner").build();
}
Expand Down Expand Up @@ -228,6 +232,7 @@ void testConcurrentAccess() throws Exception {
.claimTimeout(claimTimeout)
.nodeId(owner)
.contentType(contentType)
.ensureIndexes(ensureIndexes)
.build();
GlobalSequenceTrackingToken token = new GlobalSequenceTrackingToken(iteration);
tokenStore.initializeSegment(token, testProcessorName, testSegment);
Expand Down Expand Up @@ -261,6 +266,7 @@ void testConcurrentAccess() throws Exception {
.claimTimeout(claimTimeout)
.nodeId(String.valueOf(iterationOfSuccessfulAttempt))
.contentType(contentType)
.ensureIndexes(ensureIndexes)
.build();

assertEquals(new GlobalSequenceTrackingToken(iterationOfSuccessfulAttempt),
Expand All @@ -275,6 +281,7 @@ void testStoreAndFetchTokenResultsInTheSameTokenWithXStreamSerializer() {
.claimTimeout(claimTimeout)
.contentType(contentType)
.nodeId(testOwner)
.ensureIndexes(ensureIndexes)
.build();
GlobalSequenceTrackingToken testToken = new GlobalSequenceTrackingToken(100);
String testProcessorName = "processorName";
Expand All @@ -295,6 +302,7 @@ void testStoreAndFetchTokenResultsInTheSameTokenWithJacksonSerializer() {
.claimTimeout(claimTimeout)
.contentType(contentType)
.nodeId(testOwner)
.ensureIndexes(ensureIndexes)
.build();
GlobalSequenceTrackingToken testToken = new GlobalSequenceTrackingToken(100);
String testProcessorName = "processorName";
Expand Down Expand Up @@ -378,4 +386,18 @@ void testDeleteTokenThrowsUnableToClaimTokenExceptionIfTheCallingProcessDoesNotO
() -> tokenStore.deleteToken(testProcessorName, testSegment)
);
}

@Test
void testEnsureIndexCreation() {
ListIndexesIterable<Document> listIndexes = trackingTokensCollection.listIndexes();
boolean indexFound = false;
for (Document index : listIndexes) {
if (Objects.equals("processorName_1_segment_1", index.getString("name"))) {
// The index with this name exists, meaning it was created on MongoTokenStore build() method.
indexFound = true;
break;
}
}
assertTrue(indexFound);
}
}

0 comments on commit 16f7d38

Please sign in to comment.