Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose Redpanda schema registry #5994

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/modules/redpanda.md
Expand Up @@ -19,6 +19,12 @@ Now your tests or any other process running on your machine can get access to ru
[Bootstrap Servers](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:getBootstrapServers
<!--/codeinclude-->

Redpanda also provides a schema registry implementation. Like the Redpanda broker, you can access by using the following schema registry location:

<!--codeinclude-->
[Schema Registry](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:getSchemaRegistryAddress
<!--/codeinclude-->

## Adding this module to your project dependencies

Add the following dependency to your `pom.xml`/`build.gradle` file:
Expand Down
1 change: 1 addition & 0 deletions modules/redpanda/build.gradle
Expand Up @@ -5,4 +5,5 @@ dependencies {

testImplementation 'org.apache.kafka:kafka-clients:3.3.0'
testImplementation 'org.assertj:assertj-core:3.23.1'
testImplementation 'io.rest-assured:rest-assured:5.2.0'
}
Expand Up @@ -18,6 +18,8 @@ public class RedpandaContainer extends GenericContainer<RedpandaContainer> {

private static final int REDPANDA_PORT = 9092;

private static final int SCHEMA_REGISTRY_PORT = 8081;

private static final String STARTER_SCRIPT = "/testcontainers_start.sh";

public RedpandaContainer(String image) {
Expand All @@ -33,7 +35,7 @@ public RedpandaContainer(DockerImageName imageName) {
throw new IllegalArgumentException("Redpanda version must be >= v22.2.1");
}

withExposedPorts(REDPANDA_PORT);
withExposedPorts(REDPANDA_PORT, SCHEMA_REGISTRY_PORT);
withCreateContainerCmdModifier(cmd -> {
cmd.withEntrypoint("sh");
});
Expand All @@ -49,12 +51,17 @@ protected void containerIsStarting(InspectContainerResponse containerInfo) {

command += "/usr/bin/rpk redpanda start --mode dev-container ";
command += "--kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092 ";
command += "--advertise-kafka-addr PLAINTEXT://kafka:29092,OUTSIDE://" + getHost() + ":" + getMappedPort(9092);
command +=
"--advertise-kafka-addr PLAINTEXT://127.0.0.1:29092,OUTSIDE://" + getHost() + ":" + getMappedPort(9092);
eddumelendez marked this conversation as resolved.
Show resolved Hide resolved

copyFileToContainer(Transferable.of(command, 0777), STARTER_SCRIPT);
}

public String getBootstrapServers() {
return String.format("PLAINTEXT://%s:%s", getHost(), getMappedPort(REDPANDA_PORT));
}

public String getSchemaRegistryAddress() {
return String.format("http://%s:%s", getHost(), getMappedPort(SCHEMA_REGISTRY_PORT));
}
}
@@ -1,6 +1,8 @@
package org.testcontainers.redpanda;

import com.google.common.collect.ImmutableMap;
import io.restassured.RestAssured;
import io.restassured.response.Response;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
Expand Down Expand Up @@ -64,6 +66,36 @@ public void testNotCompatibleVersion() {
.hasMessageContaining("Redpanda version must be >= v22.2.1");
}

@Test
public void testSchemaRegistry() {
try (RedpandaContainer container = new RedpandaContainer(REDPANDA_DOCKER_IMAGE)) {
container.start();

String subjectsEndpoint = String.format(
"%s/subjects",
// getSchemaRegistryAddress {
container.getSchemaRegistryAddress()
// }
);

String subjectName = String.format("test-%s-value", UUID.randomUUID());

Response createSubject = RestAssured
.given()
.contentType("application/vnd.schemaregistry.v1+json")
.pathParam("subject", subjectName)
.body("{\"schema\": \"{\\\"type\\\": \\\"string\\\"}\"}")
.when()
.post(subjectsEndpoint + "/{subject}/versions")
.thenReturn();
assertThat(createSubject.getStatusCode()).isEqualTo(200);

Response allSubjects = RestAssured.given().when().get(subjectsEndpoint).thenReturn();
assertThat(allSubjects.getStatusCode()).isEqualTo(200);
assertThat(allSubjects.jsonPath().getList("$")).contains(subjectName);
}
}

private void testKafkaFunctionality(String bootstrapServers) throws Exception {
testKafkaFunctionality(bootstrapServers, 1, 1);
}
Expand Down