Skip to content

Commit

Permalink
Make MongoMetricsCommandListener thread safe by not sharing `Timer.…
Browse files Browse the repository at this point in the history
…Builder` (#2402)

Includes a unit test for `MongoMetricsCommandListener` thread safety.

Fixes #2401
  • Loading branch information
jshields-squarespace committed Jan 26, 2021
1 parent 3d79b46 commit 32b2e19
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 4 deletions.
Expand Up @@ -36,9 +36,6 @@
@Incubating(since = "1.2.0")
public class MongoMetricsCommandListener implements CommandListener {

private final Timer.Builder timerBuilder = Timer.builder("mongodb.driver.commands")
.description("Timer of mongodb commands");

private final MeterRegistry registry;

public MongoMetricsCommandListener(MeterRegistry registry) {
Expand All @@ -60,7 +57,8 @@ public void commandFailed(CommandFailedEvent event) {
}

private void timeCommand(CommandEvent event, String status, long elapsedTimeInNanoseconds) {
timerBuilder
Timer.builder("mongodb.driver.commands")
.description("Timer of mongodb commands")
.tag("command", event.getCommandName())
.tag("cluster.id", event.getConnectionDescription().getConnectionId().getServerId().getClusterId().getValue())
.tag("server.address", event.getConnectionDescription().getServerAddress().toString())
Expand Down
Expand Up @@ -22,13 +22,16 @@
import com.mongodb.event.ClusterOpeningEvent;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.search.MeterNotFoundException;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.bson.Document;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -89,6 +92,56 @@ void shouldCreateFailedCommandMetric() {
assertThat(registry.get("mongodb.driver.commands").tags(tags).timer().count()).isEqualTo(1);
}

@Test
void shouldSupportConcurrentCommands() throws InterruptedException {
for (int i = 0; i < 20; i++) {
Map<String, Thread> commandThreadMap = new HashMap<>();

commandThreadMap.put("insert", new Thread(() -> mongo.getDatabase("test")
.getCollection("testCol")
.insertOne(new Document("testField", new Date()))));

commandThreadMap.put("update", new Thread(() -> mongo.getDatabase("test")
.getCollection("testCol")
.updateOne(new Document("nonExistentField", "abc"),
new Document("$set", new Document("nonExistentField", "xyz")))));

commandThreadMap.put("delete", new Thread(() -> mongo.getDatabase("test")
.getCollection("testCol")
.deleteOne(new Document("nonExistentField", "abc"))));

commandThreadMap.put("aggregate", new Thread(() -> mongo.getDatabase("test")
.getCollection("testCol")
.countDocuments()));

for (Thread thread : commandThreadMap.values()) {
thread.start();
}

for (Thread thread : commandThreadMap.values()) {
thread.join();
}

final int iterationsCompleted = i + 1;

for (String command : commandThreadMap.keySet()) {
long commandsRecorded;
try {
commandsRecorded = registry.get("mongodb.driver.commands")
.tags(Tags.of("command", command))
.timer()
.count();
} catch (MeterNotFoundException e) {
commandsRecorded = 0L;
}

assertThat(commandsRecorded)
.as("Check how many %s commands were recorded after %d iterations", command, iterationsCompleted)
.isEqualTo(iterationsCompleted);
}
}
}

@AfterEach
void destroy() {
if (mongo != null) {
Expand Down

0 comments on commit 32b2e19

Please sign in to comment.