Skip to content

Commit

Permalink
Fixes #1659: Add a missing synchronized keyword to TransactionalRunne…
Browse files Browse the repository at this point in the history
…r.close (#1660)

* Fixes #1659: Add a missing synchronized keyword to TransactionalRunner.close

This existed ot FDBDatabaseRunnerImpl, but not TransactionalRunner,
so closing at the same time as calling `run` can cause a
ConcurrentModificationException.
  • Loading branch information
ScottDugas committed May 13, 2022
1 parent 8b7a41f commit f9b3328
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 20 deletions.
2 changes: 1 addition & 1 deletion docs/ReleaseNotes.md
Expand Up @@ -21,7 +21,7 @@ This release also updates downstream dependency versions. Most notably, the prot
### NEXT_RELEASE
* **Bug fix** Fix 1 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Bug fix** Fix 2 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Bug fix** Synchronize TransactionalRunner.close [(Issue #1659)](https://github.com/FoundationDB/fdb-record-layer/issues/1659)
* **Bug fix** Fix 3 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Bug fix** Fix 4 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Bug fix** Fix 5 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
Expand Down
Expand Up @@ -167,7 +167,7 @@ private synchronized void addContextToClose(@Nonnull FDBRecordContext context) {
}

@Override
public void close() {
public synchronized void close() {
if (closed) {
return;
}
Expand Down
Expand Up @@ -53,11 +53,14 @@
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
Expand Down Expand Up @@ -283,38 +286,54 @@ private <T extends Exception> void runWithWeakReadSemantics(

@Test
void closesContextsSynchronous() {
closesContext((runner, contexts, completed) ->
// You shouldn't be doing this, but maybe I haven't thought of something similar, but reasonable, where
// the executable for `run` does not complete, but the runner is closed
CompletableFuture.runAsync(() -> {
runner.run(false, context -> {
contexts.add(context);
new CompletableFuture<Void>().join(); // never joins
return completed.incrementAndGet();
});
})
);
final List<CompletableFuture<Void>> futures = new ArrayList<>();
try {
final ForkJoinPool forkJoinPool = new ForkJoinPool(10);
closesContext((runner, contextFuture, completed) ->
// You shouldn't be doing this, but maybe I haven't thought of something similar, but reasonable, where
// the executable for `run` does not complete, but the runner is closed
CompletableFuture.runAsync(() ->
runner.run(false, context -> {
final CompletableFuture<Void> future = new CompletableFuture<>();
futures.add(future);
contextFuture.complete(context);
future.join(); // never joins
return completed.incrementAndGet();
}),
forkJoinPool)
);
} finally {
// cleanup the futures, so that the executor used by runAsync doesn't have a bunch of garbage sitting around
// Note: if you remove this, and change the test to @RepeatedTest(100), after 28 repetitions, it fails
// consistently.
futures.forEach(future -> future.complete(null));
}
}

@Test
void closesContexts() {
closesContext((runner, contexts, completed) ->
closesContext((runner, contextFuture, completed) ->
runner.runAsync(false, context -> {
contexts.add(context);
contextFuture.complete(context);
// the first future will never complete
return new CompletableFuture<Void>()
.thenApply(vignore -> completed.incrementAndGet());
}) // DO NOT join
);
}

private <T> void closesContext(TriFunction<TransactionalRunner, List<FDBRecordContext>, AtomicInteger, T> run) {
List<FDBRecordContext> contexts = new ArrayList<>();
private <T> void closesContext(TriFunction<TransactionalRunner, CompletableFuture<FDBRecordContext>, AtomicInteger, T> run) {
List<FDBRecordContext> contexts;
AtomicInteger completed = new AtomicInteger();
try (TransactionalRunner runner = defaultTransactionalRunner()) {
for (int i = 0; i < 10; i++) {
run.apply(runner, contexts, completed);
}
final List<CompletableFuture<FDBRecordContext>> contextFutures = IntStream.range(0, 10).mapToObj(i -> {
CompletableFuture<FDBRecordContext> contextFuture = new CompletableFuture<>();
run.apply(runner, contextFuture, completed);
return contextFuture;
}).collect(Collectors.toList());
// make sure that the contexts have been created
contexts = contextFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());
assertEquals(0, completed.get());
for (final FDBRecordContext context : contexts) {
assertFalse(context.isClosed());
}
Expand Down

0 comments on commit f9b3328

Please sign in to comment.