forked from FoundationDB/fdb-record-layer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
TransactionalRunner.java
178 lines (163 loc) · 7.65 KB
/
TransactionalRunner.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
/*
* TransactionalRunner.java
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2015-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.apple.foundationdb.record.provider.foundationdb.runners;
import com.apple.foundationdb.annotation.API;
import com.apple.foundationdb.record.provider.foundationdb.FDBDatabase;
import com.apple.foundationdb.record.provider.foundationdb.FDBDatabaseRunner;
import com.apple.foundationdb.record.provider.foundationdb.FDBRecordContext;
import com.apple.foundationdb.record.provider.foundationdb.FDBRecordContextConfig;
import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
/**
 * A simple runner that can be used for opening and committing transactions, and making sure they are all closed.
 * <p>
 * Every context opened through this runner is tracked, and any that are still open are closed when the runner
 * itself is {@linkplain #close() closed}.
 * </p>
 */
@API(API.Status.INTERNAL)
public class TransactionalRunner implements AutoCloseable {
    @Nonnull
    private final FDBDatabase database;
    @Nonnull
    private final FDBRecordContextConfig.Builder contextConfigBuilder;
    // Written only while holding this object's monitor (see close()). openContext(boolean) also reads it
    // without the monitor as an early check; the authoritative check happens in addContextToClose.
    private boolean closed;
    // Contexts opened by this runner that may still need closing; only accessed while holding this object's monitor.
    @Nonnull
    private final List<FDBRecordContext> contextsToClose;

    /**
     * Creates a new runner for operating against a given database.
     * @param database the underlying database to open contexts against
     * @param contextConfig configuration for how to open contexts
     */
    public TransactionalRunner(@Nonnull FDBDatabase database,
                               @Nonnull FDBRecordContextConfig contextConfig) {
        this(database, contextConfig.toBuilder());
    }

    /**
     * Creates a runner for operating against a given database, with a <em>mutable</em>
     * {@link FDBRecordContextConfig.Builder}.
     * <p>
     * You probably don't want to call this, and should probably call
     * {@link #TransactionalRunner(FDBDatabase, FDBRecordContextConfig)} instead.
     * </p>
     * @param database the underlying database to open contexts against
     * @param contextConfigBuilder configuration for how to open contexts.
     * Note: The same as FDBDatabaseRunnerImpl, this maintains mutability, but that mutability is not thread safe, so
     * you shouldn't change it, while simultaneously calling {@link #runAsync(boolean, Function)}.
     */
    public TransactionalRunner(@Nonnull FDBDatabase database,
                               @Nonnull FDBRecordContextConfig.Builder contextConfigBuilder) {
        this.database = database;
        this.contextConfigBuilder = contextConfigBuilder;
        contextsToClose = new ArrayList<>();
    }

    /**
     * Run some code with a given context, and commit the context.
     * <p>
     * The context will be committed if the future returned by the runnable is successful, otherwise it will not
     * be committed. If this {@code TransactionalRunner} is closed, so will the context passed to the runnable.
     * </p>
     * <p>
     * Note: {@code runnable} is run in the current thread. If {@code runnable} throws instead of returning a future
     * (or returns {@code null}), the context is closed immediately and the exception propagates to the caller
     * rather than being wrapped in the returned future.
     * </p>
     * @param clearWeakReadSemantics whether to clear the {@link FDBRecordContextConfig#getWeakReadSemantics()} before
     * creating the transaction. These should be cleared if retrying a transaction, particularly in response to a
     * conflict, because reusing the old read version would just cause it to re-conflict.
     * @param runnable some code to run that uses an {@link FDBRecordContext}
     * @param <T> the type of the value returned by the future
     * @return a future containing the result of the runnable, if successfully committed.
     * Note: the future will not be {@code null}, but if the runnable returns a future containing {@code null} then
     * so will the future returned here.
     */
    @Nonnull
    public <T> CompletableFuture<T> runAsync(final boolean clearWeakReadSemantics,
                                             @Nonnull Function<? super FDBRecordContext, CompletableFuture<? extends T>> runnable) {
        final FDBRecordContext context = openContext(clearWeakReadSemantics);
        // Tracks whether responsibility for closing the context has been handed to the returned future.
        boolean closeHandedOff = false;
        try {
            final CompletableFuture<T> committed = runnable.apply(context)
                    .thenCompose((T val) -> context.commitAsync().thenApply(vignore -> val))
                    .whenComplete((result, exception) -> context.close());
            closeHandedOff = true;
            return committed;
        } finally {
            // The runnable failed before a future chain existed (it threw, or returned null), so nothing
            // else would close this context until the whole runner is closed; close it now.
            if (!closeHandedOff) {
                context.close();
            }
        }
    }

    /**
     * Run a function against a context synchronously.
     * <p>
     * Note: since committing the transaction is an inherently async function, this is a blocking call, so if
     * calling async methods from within {@code runnable}, it is probably better to use
     * {@link #runAsync(boolean, Function)}.
     * </p>
     * <p>
     * The context is closed when this method exits, whether or not {@code runnable} or the commit succeeded.
     * </p>
     * @param clearWeakReadSemantics whether to clear the {@link FDBRecordContextConfig#getWeakReadSemantics()} before
     * creating the transaction. These should be cleared if retrying a transaction, particularly in response to a
     * conflict, because reusing the old read version would just cause it to re-conflict.
     * @param runnable some code to run synchronously that uses an {@link FDBRecordContext}
     * @param <T> the type of the value returned by the runnable
     * @return the value returned by {@code runnable}.
     */
    public <T> T run(final boolean clearWeakReadSemantics,
                     @Nonnull Function<? super FDBRecordContext, ? extends T> runnable) {
        final T result;
        try (FDBRecordContext context = openContext(clearWeakReadSemantics)) {
            result = runnable.apply(context);
            context.commit();
        }
        return result;
    }

    /**
     * Open a new context with the config attached to this runner, that will be closed when this runner is closed.
     * <p>
     * It is probably preferable to use {@link #run(boolean, Function)} or {@link #runAsync(boolean, Function)}
     * over opening transactions directly, but this is exposed because {@link FDBDatabaseRunner} exposes it.
     * </p>
     * <p>
     * Contexts opened this way always have their weak read semantics cleared.
     * </p>
     * @return a new context
     */
    @Nonnull
    public FDBRecordContext openContext() {
        return openContext(true);
    }

    /**
     * Open a context against the database and register it to be closed along with this runner.
     * @param clearWeakReadSemantics if {@code true} and the configured builder has weak read semantics set, the
     * context is opened from a copy of the builder with those semantics removed; otherwise the builder is used as is
     * @return the newly opened, registered context
     * @throws FDBDatabaseRunner.RunnerClosed if this runner has been closed
     */
    @Nonnull
    private FDBRecordContext openContext(boolean clearWeakReadSemantics) {
        // Early check to avoid opening a context at all; addContextToClose re-checks under the lock.
        if (closed) {
            throw new FDBDatabaseRunner.RunnerClosed();
        }
        FDBRecordContextConfig contextConfig;
        if (!clearWeakReadSemantics || contextConfigBuilder.getWeakReadSemantics() == null) {
            contextConfig = contextConfigBuilder.build();
        } else {
            // Clear any weak semantics to avoid reusing old read versions
            contextConfig = contextConfigBuilder.copyBuilder().setWeakReadSemantics(null).build();
        }
        FDBRecordContext context = database.openContext(contextConfig);
        addContextToClose(context);
        return context;
    }

    /**
     * Register a context so that it is closed when this runner is closed.
     * <p>
     * If the runner has already been closed, the context is closed right away and
     * {@link FDBDatabaseRunner.RunnerClosed} is thrown. Contexts that report themselves as closed are dropped
     * from the tracking list on each call, before the new one is added.
     * </p>
     * @param context the freshly opened context to track
     */
    private synchronized void addContextToClose(@Nonnull FDBRecordContext context) {
        if (closed) {
            context.close();
            throw new FDBDatabaseRunner.RunnerClosed();
        }
        contextsToClose.removeIf(FDBRecordContext::isClosed);
        contextsToClose.add(context);
    }

    /**
     * Close every context still tracked by this runner and mark the runner as closed.
     * <p>
     * Calling this again after the runner has been marked closed has no effect. Once closed, attempts to open
     * new contexts throw {@link FDBDatabaseRunner.RunnerClosed}.
     * </p>
     */
    @Override
    public synchronized void close() {
        if (closed) {
            return;
        }
        contextsToClose.forEach(FDBRecordContext::close);
        this.closed = true;
    }
}