Skip to content

Commit

Permalink
Expose Redpanda schema registry (#5994)
Browse files Browse the repository at this point in the history
The schema registry port `8081` was added to the list of exposed ports and a supporting method `getSchemaRegistryAddress` was added to easily use for configuring `schema.registry.url` serializer configuration.

A minor change on the internal advertised address was made as the `kafka` alias was not available within the container and the panda proxy was getting lost with the following message:
```
INFO  2022-10-17 19:08:37,164 [shard 0] kafka/client - broker.cc:41 - connected to broker:-1 - 0.0.0.0:29092
WARN  2022-10-17 19:08:37,176 [shard 0] kafka/client - broker.cc:52 - std::system_error: kafka: Not found
ERROR 2022-10-17 19:08:37,177 [shard 0] pandaproxy - service.cc:137 - Schema registry failed to initialize internal topic: kafka::client::broker_error ({ node: -1 }, { error_code: broker_not_available [8] })
```
  • Loading branch information
gustavomonarin committed Oct 27, 2022
1 parent b7925be commit 0bb6925
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 2 deletions.
6 changes: 6 additions & 0 deletions docs/modules/redpanda.md
Expand Up @@ -19,6 +19,12 @@ Now your tests or any other process running on your machine can get access to ru
[Bootstrap Servers](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:getBootstrapServers
<!--/codeinclude-->

Redpanda also provides a schema registry implementation. Like the Redpanda broker, you can access by using the following schema registry location:

<!--codeinclude-->
[Schema Registry](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:getSchemaRegistryAddress
<!--/codeinclude-->

## Adding this module to your project dependencies

Add the following dependency to your `pom.xml`/`build.gradle` file:
Expand Down
1 change: 1 addition & 0 deletions modules/redpanda/build.gradle
Expand Up @@ -5,4 +5,5 @@ dependencies {

testImplementation 'org.apache.kafka:kafka-clients:3.3.0'
testImplementation 'org.assertj:assertj-core:3.23.1'
testImplementation 'io.rest-assured:rest-assured:5.2.0'
}
Expand Up @@ -18,6 +18,8 @@ public class RedpandaContainer extends GenericContainer<RedpandaContainer> {

private static final int REDPANDA_PORT = 9092;

private static final int SCHEMA_REGISTRY_PORT = 8081;

private static final String STARTER_SCRIPT = "/testcontainers_start.sh";

public RedpandaContainer(String image) {
Expand All @@ -33,7 +35,7 @@ public RedpandaContainer(DockerImageName imageName) {
throw new IllegalArgumentException("Redpanda version must be >= v22.2.1");
}

withExposedPorts(REDPANDA_PORT);
withExposedPorts(REDPANDA_PORT, SCHEMA_REGISTRY_PORT);
withCreateContainerCmdModifier(cmd -> {
cmd.withEntrypoint("sh");
});
Expand All @@ -49,12 +51,17 @@ protected void containerIsStarting(InspectContainerResponse containerInfo) {

command += "/usr/bin/rpk redpanda start --mode dev-container ";
command += "--kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092 ";
command += "--advertise-kafka-addr PLAINTEXT://kafka:29092,OUTSIDE://" + getHost() + ":" + getMappedPort(9092);
command +=
"--advertise-kafka-addr PLAINTEXT://127.0.0.1:29092,OUTSIDE://" + getHost() + ":" + getMappedPort(9092);

copyFileToContainer(Transferable.of(command, 0777), STARTER_SCRIPT);
}

public String getBootstrapServers() {
return String.format("PLAINTEXT://%s:%s", getHost(), getMappedPort(REDPANDA_PORT));
}

public String getSchemaRegistryAddress() {
return String.format("http://%s:%s", getHost(), getMappedPort(SCHEMA_REGISTRY_PORT));
}
}
@@ -1,6 +1,8 @@
package org.testcontainers.redpanda;

import com.google.common.collect.ImmutableMap;
import io.restassured.RestAssured;
import io.restassured.response.Response;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
Expand Down Expand Up @@ -64,6 +66,36 @@ public void testNotCompatibleVersion() {
.hasMessageContaining("Redpanda version must be >= v22.2.1");
}

@Test
public void testSchemaRegistry() {
try (RedpandaContainer container = new RedpandaContainer(REDPANDA_DOCKER_IMAGE)) {
container.start();

String subjectsEndpoint = String.format(
"%s/subjects",
// getSchemaRegistryAddress {
container.getSchemaRegistryAddress()
// }
);

String subjectName = String.format("test-%s-value", UUID.randomUUID());

Response createSubject = RestAssured
.given()
.contentType("application/vnd.schemaregistry.v1+json")
.pathParam("subject", subjectName)
.body("{\"schema\": \"{\\\"type\\\": \\\"string\\\"}\"}")
.when()
.post(subjectsEndpoint + "/{subject}/versions")
.thenReturn();
assertThat(createSubject.getStatusCode()).isEqualTo(200);

Response allSubjects = RestAssured.given().when().get(subjectsEndpoint).thenReturn();
assertThat(allSubjects.getStatusCode()).isEqualTo(200);
assertThat(allSubjects.jsonPath().getList("$")).contains(subjectName);
}
}

private void testKafkaFunctionality(String bootstrapServers) throws Exception {
testKafkaFunctionality(bootstrapServers, 1, 1);
}
Expand Down

0 comments on commit 0bb6925

Please sign in to comment.