-
-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add R2DBC support #2545
Add R2DBC support #2545
Changes from 1 commit
98515fc
d3409df
3e2b34b
84ce94d
7b4a8c6
8903ed8
a3a5c5a
886c9e7
184da88
834dae8
937fe96
574df74
fa6a715
e145cc1
830f483
3db13dc
38e81dc
b28fb65
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
package org.testcontainers.containers; | ||
|
||
import io.r2dbc.spi.ConnectionFactoryOptions; | ||
import lombok.RequiredArgsConstructor; | ||
import lombok.experimental.Delegate; | ||
import org.testcontainers.lifecycle.Startable; | ||
import org.testcontainers.r2dbc.R2DBCDatabaseContainer; | ||
|
||
@RequiredArgsConstructor | ||
public final class PostgreSQLR2DBCDatabaseContainer implements R2DBCDatabaseContainer { | ||
|
||
@Delegate(types = Startable.class) | ||
private final PostgreSQLContainer<?> container; | ||
|
||
public static ConnectionFactoryOptions getOptions(PostgreSQLContainer<?> container) { | ||
ConnectionFactoryOptions options = ConnectionFactoryOptions.builder() | ||
.option(ConnectionFactoryOptions.DRIVER, PostgreSQLR2DBCDatabaseContainerProvider.DRIVER) | ||
.build(); | ||
|
||
return new PostgreSQLR2DBCDatabaseContainer(container).configure(options); | ||
} | ||
|
||
@Override | ||
public ConnectionFactoryOptions configure(ConnectionFactoryOptions options) { | ||
return options.mutate() | ||
.option(ConnectionFactoryOptions.HOST, container.getContainerIpAddress()) | ||
.option(ConnectionFactoryOptions.PORT, container.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT)) | ||
.option(ConnectionFactoryOptions.DATABASE, container.getDatabaseName()) | ||
.option(ConnectionFactoryOptions.USER, container.getUsername()) | ||
.option(ConnectionFactoryOptions.PASSWORD, container.getPassword()) | ||
.build(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
package org.testcontainers.containers; | ||
|
||
import io.r2dbc.spi.ConnectionFactoryOptions; | ||
import org.testcontainers.r2dbc.R2DBCDatabaseContainer; | ||
import org.testcontainers.r2dbc.R2DBCDatabaseContainerProvider; | ||
|
||
public final class PostgreSQLR2DBCDatabaseContainerProvider implements R2DBCDatabaseContainerProvider { | ||
|
||
static final String DRIVER = "postgresql"; | ||
|
||
@Override | ||
public boolean supports(ConnectionFactoryOptions options) { | ||
return DRIVER.equals(options.getRequiredValue(ConnectionFactoryOptions.DRIVER)); | ||
} | ||
|
||
@Override | ||
public R2DBCDatabaseContainer createContainer(ConnectionFactoryOptions options) { | ||
PostgreSQLContainer<?> container = new PostgreSQLContainer<>(options.getRequiredValue(IMAGE_OPTION)) | ||
.withDatabaseName(options.getRequiredValue(ConnectionFactoryOptions.DATABASE)); | ||
|
||
if (Boolean.TRUE.equals(options.getValue(REUSABLE_OPTION))) { | ||
container.withReuse(true); | ||
} | ||
return new PostgreSQLR2DBCDatabaseContainer(container); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
org.testcontainers.containers.PostgreSQLR2DBCDatabaseContainerProvider |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
package org.testcontainers.containers; | ||
|
||
import io.r2dbc.spi.Closeable; | ||
import io.r2dbc.spi.Connection; | ||
import io.r2dbc.spi.ConnectionFactories; | ||
import org.junit.Test; | ||
import reactor.core.publisher.Flux; | ||
import reactor.core.publisher.Mono; | ||
|
||
import static org.junit.Assert.*; | ||
|
||
public class PostgreSQLR2DBCDatabaseContainerTest { | ||
|
||
@Test | ||
public void testGetOptions() { | ||
try (PostgreSQLContainer<?> container = new PostgreSQLContainer<>()) { | ||
container.start(); | ||
|
||
int result = Flux | ||
.usingWhen( | ||
Mono.just( | ||
ConnectionFactories.get( | ||
PostgreSQLR2DBCDatabaseContainer.getOptions(container) | ||
) | ||
), | ||
connectionFactory -> { | ||
return Flux | ||
.usingWhen( | ||
connectionFactory.create(), | ||
connection -> connection.createStatement("SELECT 42").execute(), | ||
Connection::close | ||
) | ||
.flatMap(it -> it.map((row, meta) -> (Integer) row.get(0))); | ||
}, | ||
it -> ((Closeable) it).close() | ||
) | ||
.blockFirst(); | ||
|
||
assertEquals(42, result); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
<configuration> | ||
|
||
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> | ||
<!-- encoders are assigned the type | ||
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default --> | ||
<encoder> | ||
<pattern>%d{HH:mm:ss.SSS} %-5level %logger - %msg%n</pattern> | ||
</encoder> | ||
</appender> | ||
|
||
<root level="INFO"> | ||
<appender-ref ref="STDOUT"/> | ||
</root> | ||
|
||
<logger name="org.testcontainers" level="DEBUG"/> | ||
</configuration> |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
description = "Testcontainers :: R2DBC" | ||
|
||
dependencies { | ||
compile project(':testcontainers') | ||
compile 'io.r2dbc:r2dbc-spi:0.8.1.RELEASE' | ||
|
||
testCompile 'org.assertj:assertj-core:3.14.0' | ||
testCompile 'io.r2dbc:r2dbc-postgresql:0.8.1.RELEASE' | ||
testCompile project(':postgresql') | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
package org.testcontainers.r2dbc; | ||
|
||
import org.reactivestreams.Subscription; | ||
|
||
import java.util.concurrent.atomic.AtomicBoolean; | ||
|
||
class CancellableSubscription implements Subscription { | ||
|
||
private final AtomicBoolean cancelled = new AtomicBoolean(); | ||
|
||
@Override | ||
public void request(long n) { | ||
} | ||
|
||
@Override | ||
public void cancel() { | ||
cancelled.set(true); | ||
} | ||
|
||
public boolean isCancelled() { | ||
return cancelled.get(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,152 @@ | ||
package org.testcontainers.r2dbc; | ||
|
||
import io.r2dbc.spi.Connection; | ||
import io.r2dbc.spi.ConnectionFactory; | ||
import org.reactivestreams.Publisher; | ||
import org.reactivestreams.Subscriber; | ||
import org.reactivestreams.Subscription; | ||
|
||
import java.util.concurrent.CompletableFuture; | ||
import java.util.function.Supplier; | ||
|
||
class ConnectionPublisher implements Publisher<Connection> { | ||
|
||
private final Supplier<CompletableFuture<ConnectionFactory>> futureSupplier; | ||
|
||
ConnectionPublisher(Supplier<CompletableFuture<ConnectionFactory>> futureSupplier) { | ||
this.futureSupplier = futureSupplier; | ||
} | ||
|
||
@Override | ||
public void subscribe(Subscriber<? super Connection> actual) { | ||
actual.onSubscribe(new StateMachineSubscription(actual)); | ||
} | ||
|
||
/** | ||
* Design notes: | ||
* - ConnectionPublisher is Mono-like (0..1), the request amount is ignored | ||
* - given the testing nature, the performance requirements are less strict | ||
* - "synchronized" is used to avoid races | ||
* - Reactive Streams spec violations are not checked (e.g. non-positive request) | ||
*/ | ||
private class StateMachineSubscription implements Subscription { | ||
|
||
private final Subscriber<? super Connection> actual; | ||
|
||
Subscription subscriptionState; | ||
|
||
StateMachineSubscription(Subscriber<? super Connection> actual) { | ||
this.actual = actual; | ||
subscriptionState = new WaitRequestSubscriptionState(); | ||
} | ||
|
||
@Override | ||
public synchronized void request(long n) { | ||
subscriptionState.request(n); | ||
} | ||
|
||
@Override | ||
public synchronized void cancel() { | ||
subscriptionState.cancel(); | ||
} | ||
|
||
synchronized void transitionTo(SubscriptionState newState) { | ||
subscriptionState = newState; | ||
newState.enter(); | ||
} | ||
|
||
abstract class SubscriptionState implements Subscription { | ||
void enter() { | ||
} | ||
} | ||
|
||
class WaitRequestSubscriptionState extends SubscriptionState { | ||
|
||
@Override | ||
public void request(long n) { | ||
transitionTo(new WaitFutureCompletionSubscriptionState()); | ||
} | ||
|
||
@Override | ||
public void cancel() { | ||
} | ||
} | ||
|
||
class WaitFutureCompletionSubscriptionState extends SubscriptionState { | ||
|
||
private CompletableFuture<ConnectionFactory> future; | ||
|
||
@Override | ||
void enter() { | ||
this.future = futureSupplier.get(); | ||
|
||
future.whenComplete((connectionFactory, e) -> { | ||
if (e != null) { | ||
actual.onSubscribe(EmptySubscription.INSTANCE); | ||
actual.onError(e); | ||
return; | ||
} | ||
|
||
Publisher<? extends Connection> publisher = connectionFactory.create(); | ||
transitionTo(new ProxySubscriptionState(publisher)); | ||
}); | ||
} | ||
|
||
@Override | ||
public void request(long n) { | ||
} | ||
|
||
@Override | ||
public void cancel() { | ||
future.cancel(true); | ||
} | ||
} | ||
|
||
class ProxySubscriptionState extends SubscriptionState implements Subscriber<Connection> { | ||
|
||
private final Publisher<? extends Connection> publisher; | ||
|
||
private Subscription s; | ||
|
||
ProxySubscriptionState(Publisher<? extends Connection> publisher) { | ||
this.publisher = publisher; | ||
} | ||
|
||
@Override | ||
void enter() { | ||
publisher.subscribe(this); | ||
} | ||
|
||
@Override | ||
public void request(long n) { | ||
// Ignore | ||
} | ||
|
||
@Override | ||
public void cancel() { | ||
s.cancel(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. might be a case when
The simples solution for that is to add flag and synchronization between request cancel and onsubscriber to ensure we are not ending up with NPE if the subscription has not arrived yet There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good catch |
||
} | ||
|
||
@Override | ||
public void onSubscribe(Subscription s) { | ||
this.s = s; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. here we have to have a flag, e.g (canceled) and if so, immediately call s.cancel on the given one |
||
s.request(1); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @mp911de does the connection factory absolutely guarantees a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it does (checked with Mark), yes. See the design notes in the Javadoc There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it was unclear to me that comment was about the factory, since it mentions There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
} | ||
|
||
@Override | ||
public void onNext(Connection connection) { | ||
actual.onNext(connection); | ||
} | ||
|
||
@Override | ||
public void onError(Throwable t) { | ||
actual.onError(t); | ||
} | ||
|
||
@Override | ||
public void onComplete() { | ||
actual.onComplete(); | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
package org.testcontainers.r2dbc; | ||
|
||
import org.reactivestreams.Subscription; | ||
|
||
enum EmptySubscription implements Subscription { | ||
INSTANCE; | ||
|
||
@Override | ||
public void request(long n) { | ||
|
||
} | ||
|
||
@Override | ||
public void cancel() { | ||
|
||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that pattern was a bit harder for me to follow because of the habit of seeing
Subscription
as self-sufficient and thus aggressively guarded, but after review it looks correct. I feel it wouldn't be too hard to collapse the variousSubscriptionState
into theStateMachineSubscription
itself and deal with int/enum based states for transitions, but eh as long as it is correct...There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for looking at it! FYI I just fixed a race reported by @OlegDokuka:
d3409df