From 8ac668c23f034963e3ed2024d8b2588edf3bb73a Mon Sep 17 00:00:00 2001 From: "gustavo.monarin" Date: Mon, 17 Oct 2022 21:22:39 +0200 Subject: [PATCH 1/5] Expose Redpanda out of box schema registry This change is intended to make the redpanda implementation of the schema registry easily available on test containers. The schema registry port `8081` was added to the list of exposed ports and a supporting method `getSchemaRegistryAddress` was added to easily use for configuring `schema.registry.url` serializer configuration. A minor change on the internal advertised address was made as the `kafka` alias was not available within the container and the panda proxy was getting lost with the following message: ``` INFO 2022-10-17 19:08:37,164 [shard 0] kafka/client - broker.cc:41 - connected to broker:-1 - 0.0.0.0:29092 WARN 2022-10-17 19:08:37,176 [shard 0] kafka/client - broker.cc:52 - std::system_error: kafka: Not found ERROR 2022-10-17 19:08:37,177 [shard 0] pandaproxy - service.cc:137 - Schema registry failed to initialize internal topic: kafka::client::broker_error ({ node: -1 }, { error_code: broker_not_available [8] }) ``` Since this is only an internal advertised address and the `dev-container` is an alias to only one node, this should not be an issue. An alternative approach for configuring the `kafka` alias within the container node would be welcome. Also, a simple test to check if the registry is available is made. Please let me know if you feel like we should actually perform some schema registration. --- modules/redpanda/build.gradle | 1 + .../redpanda/RedpandaContainer.java | 11 +++++++++-- .../redpanda/RedpandaContainerTest.java | 16 ++++++++++++++++ 3 files changed, 26 insertions(+), 2 deletions(-) diff --git a/modules/redpanda/build.gradle b/modules/redpanda/build.gradle index 4d7221711a3..17e0ca201bc 100644 --- a/modules/redpanda/build.gradle +++ b/modules/redpanda/build.gradle @@ -5,4 +5,5 @@ dependencies { testImplementation 'org.apache.kafka:kafka-clients:3.3.0' testImplementation 'org.assertj:assertj-core:3.23.1' + testImplementation 'io.rest-assured:rest-assured:5.2.0' } diff --git a/modules/redpanda/src/main/java/org/testcontainers/redpanda/RedpandaContainer.java b/modules/redpanda/src/main/java/org/testcontainers/redpanda/RedpandaContainer.java index cbb7db1e7c1..32b2ae20857 100644 --- a/modules/redpanda/src/main/java/org/testcontainers/redpanda/RedpandaContainer.java +++ b/modules/redpanda/src/main/java/org/testcontainers/redpanda/RedpandaContainer.java @@ -18,6 +18,8 @@ public class RedpandaContainer extends GenericContainer { private static final int REDPANDA_PORT = 9092; + private static final int SCHEMA_REGISTRY_PORT = 8081; + private static final String STARTER_SCRIPT = "/testcontainers_start.sh"; public RedpandaContainer(String image) { @@ -33,7 +35,7 @@ public RedpandaContainer(DockerImageName imageName) { throw new IllegalArgumentException("Redpanda version must be >= v22.2.1"); } - withExposedPorts(REDPANDA_PORT); + withExposedPorts(REDPANDA_PORT, SCHEMA_REGISTRY_PORT); withCreateContainerCmdModifier(cmd -> { cmd.withEntrypoint("sh"); }); @@ -49,7 +51,8 @@ protected void containerIsStarting(InspectContainerResponse containerInfo) { command += "/usr/bin/rpk redpanda start --mode dev-container "; command += "--kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092 "; - command += "--advertise-kafka-addr PLAINTEXT://kafka:29092,OUTSIDE://" + getHost() + ":" + getMappedPort(9092); + command += + "--advertise-kafka-addr PLAINTEXT://127.0.0.1:29092,OUTSIDE://" + getHost() + ":" + getMappedPort(9092); copyFileToContainer(Transferable.of(command, 0777), STARTER_SCRIPT); } @@ -57,4 +60,8 @@ protected void containerIsStarting(InspectContainerResponse containerInfo) { public String getBootstrapServers() { return String.format("PLAINTEXT://%s:%s", getHost(), getMappedPort(REDPANDA_PORT)); } + + public String getSchemaRegistryAddress() { + return String.format("http://%s:%s", getHost(), getMappedPort(SCHEMA_REGISTRY_PORT)); + } } diff --git a/modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java b/modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java index 086bf43364b..15df8ff12c3 100644 --- a/modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java +++ b/modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java @@ -1,6 +1,7 @@ package org.testcontainers.redpanda; import com.google.common.collect.ImmutableMap; +import io.restassured.RestAssured; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.NewTopic; @@ -64,6 +65,21 @@ public void testNotCompatibleVersion() { .hasMessageContaining("Redpanda version must be >= v22.2.1"); } + @Test + public void testSchemaRegistry() { + try (RedpandaContainer container = new RedpandaContainer(REDPANDA_DOCKER_IMAGE)) { + container.start(); + + io.restassured.response.Response response = RestAssured + .given() + .when() + .get(container.getSchemaRegistryAddress() + "/subjects") + .andReturn(); + + assertThat(response.getStatusCode()).isEqualTo(200); + } + } + private void testKafkaFunctionality(String bootstrapServers) throws Exception { testKafkaFunctionality(bootstrapServers, 1, 1); } From 25e951938555363cfffbf221d69db387cc66c002 Mon Sep 17 00:00:00 2001 From: "gustavo.monarin" Date: Wed, 19 Oct 2022 10:38:49 +0200 Subject: [PATCH 2/5] Add documentation Describes how to retrieve the schema registry address --- docs/modules/redpanda.md | 6 ++++++ .../redpanda/RedpandaContainerTest.java | 15 +++++++++------ 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/docs/modules/redpanda.md b/docs/modules/redpanda.md index 8749c9c87f5..8cdcc1ee83d 100644 --- a/docs/modules/redpanda.md +++ b/docs/modules/redpanda.md @@ -19,6 +19,12 @@ Now your tests or any other process running on your machine can get access to ru [Bootstrap Servers](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:getBootstrapServers +Redpanda also provides a schema registry implementation. Like the Redpanda broker, you can access by using the following schema registry location: + + +[Schema Registry](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:getSchemaRegistryAddress + + ## Adding this module to your project dependencies Add the following dependency to your `pom.xml`/`build.gradle` file: diff --git a/modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java b/modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java index 15df8ff12c3..dd32151f407 100644 --- a/modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java +++ b/modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java @@ -70,13 +70,16 @@ public void testSchemaRegistry() { try (RedpandaContainer container = new RedpandaContainer(REDPANDA_DOCKER_IMAGE)) { container.start(); - io.restassured.response.Response response = RestAssured - .given() - .when() - .get(container.getSchemaRegistryAddress() + "/subjects") - .andReturn(); + String subjectsEndpoint = String.format( + "%s/subjects", + // getSchemaRegistryAddress { + container.getSchemaRegistryAddress() + // } + ); + + io.restassured.response.Response subjects = RestAssured.given().when().get(subjectsEndpoint).andReturn(); - assertThat(response.getStatusCode()).isEqualTo(200); + assertThat(subjects.getStatusCode()).isEqualTo(200); } } From 4453ac17e59fe29a2dd64249e14dbe34fafbf962 Mon Sep 17 00:00:00 2001 From: "gustavo.monarin" Date: Wed, 19 Oct 2022 13:06:26 +0200 Subject: [PATCH 3/5] Extend schema registry tests Schema registry tests for publishing and consuming new schemas. The tests only cover the api availability. Further reference should follow the official documentation as this is a quite extensive / complex subject, which would require code generation, plugins to have as example while the [official documentation](https://docs.confluent.io/platform/current/schema-registry/index.html) is quite good. --- .../redpanda/RedpandaContainerTest.java | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java b/modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java index dd32151f407..7ee1c8bdc04 100644 --- a/modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java +++ b/modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java @@ -27,6 +27,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.tuple; +import static org.hamcrest.Matchers.hasItems; public class RedpandaContainerTest { @@ -77,9 +78,21 @@ public void testSchemaRegistry() { // } ); - io.restassured.response.Response subjects = RestAssured.given().when().get(subjectsEndpoint).andReturn(); - - assertThat(subjects.getStatusCode()).isEqualTo(200); + String subjectName = String.format("test-%s-value", UUID.randomUUID()); + + // register the new subject + RestAssured + .given() + .contentType("application/vnd.schemaregistry.v1+json") + .pathParam("subject", subjectName) + .body("{\"schema\": \"{\\\"type\\\": \\\"string\\\"}\"}") + .when() + .post(subjectsEndpoint + "/{subject}/versions") + .then() + .statusCode(200); + + // list all the registered subjects + RestAssured.given().when().get(subjectsEndpoint).then().statusCode(200).body("$", hasItems(subjectName)); } } From b501e7f232ad1dfee063fe28a30a6af70646a186 Mon Sep 17 00:00:00 2001 From: "gustavo.monarin" Date: Wed, 19 Oct 2022 19:04:51 +0200 Subject: [PATCH 4/5] Remove unecessary test comments --- .../java/org/testcontainers/redpanda/RedpandaContainerTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java b/modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java index 7ee1c8bdc04..afeaf04d1f2 100644 --- a/modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java +++ b/modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java @@ -80,7 +80,6 @@ public void testSchemaRegistry() { String subjectName = String.format("test-%s-value", UUID.randomUUID()); - // register the new subject RestAssured .given() .contentType("application/vnd.schemaregistry.v1+json") @@ -91,7 +90,6 @@ public void testSchemaRegistry() { .then() .statusCode(200); - // list all the registered subjects RestAssured.given().when().get(subjectsEndpoint).then().statusCode(200).body("$", hasItems(subjectName)); } } From 7a36f32e595d5ef0064b7d4ce780907134c28eaa Mon Sep 17 00:00:00 2001 From: "gustavo.monarin" Date: Thu, 27 Oct 2022 17:30:52 +0200 Subject: [PATCH 5/5] switch from Hamcrest to AssertJ matchers --- .../redpanda/RedpandaContainerTest.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java b/modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java index afeaf04d1f2..5ffccd63c62 100644 --- a/modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java +++ b/modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java @@ -2,6 +2,7 @@ import com.google.common.collect.ImmutableMap; import io.restassured.RestAssured; +import io.restassured.response.Response; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.NewTopic; @@ -27,7 +28,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.tuple; -import static org.hamcrest.Matchers.hasItems; public class RedpandaContainerTest { @@ -80,17 +80,19 @@ public void testSchemaRegistry() { String subjectName = String.format("test-%s-value", UUID.randomUUID()); - RestAssured + Response createSubject = RestAssured .given() .contentType("application/vnd.schemaregistry.v1+json") .pathParam("subject", subjectName) .body("{\"schema\": \"{\\\"type\\\": \\\"string\\\"}\"}") .when() .post(subjectsEndpoint + "/{subject}/versions") - .then() - .statusCode(200); + .thenReturn(); + assertThat(createSubject.getStatusCode()).isEqualTo(200); - RestAssured.given().when().get(subjectsEndpoint).then().statusCode(200).body("$", hasItems(subjectName)); + Response allSubjects = RestAssured.given().when().get(subjectsEndpoint).thenReturn(); + assertThat(allSubjects.getStatusCode()).isEqualTo(200); + assertThat(allSubjects.jsonPath().getList("$")).contains(subjectName); } }