From f5471c831d2163bcd2b525df3b67c6e1b2c76c16 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edd=C3=BA=20Mel=C3=A9ndez=20Gonzales?= Date: Thu, 17 Aug 2023 09:12:56 -0600 Subject: [PATCH] Improve RedpandaContainer (#7320) * Configure Redpanda via `.bootstrap.yml` and `redpanda.yml` * Allow to enable authorization and authentication methods * Allow to add additional listeners. E.g. using it with toxiproxy or redpanda console * Enable rest proxy * Configure rpk Fixes #6395 --------- Co-authored-by: Kevin Wittek --- docs/modules/redpanda.md | 38 +++ modules/redpanda/build.gradle | 1 + .../redpanda/RedpandaContainer.java | 222 +++++++++++++++++- .../testcontainers/bootstrap.yaml.ftl | 18 ++ .../resources/testcontainers/entrypoint-tc.sh | 8 + .../testcontainers/redpanda.yaml.ftl | 73 ++++++ .../redpanda/RedpandaContainerTest.java | 214 +++++++++++++++++ 7 files changed, 562 insertions(+), 12 deletions(-) create mode 100644 modules/redpanda/src/main/resources/testcontainers/bootstrap.yaml.ftl create mode 100644 modules/redpanda/src/main/resources/testcontainers/entrypoint-tc.sh create mode 100644 modules/redpanda/src/main/resources/testcontainers/redpanda.yaml.ftl diff --git a/docs/modules/redpanda.md b/docs/modules/redpanda.md index 51178788449..22545ffb026 100644 --- a/docs/modules/redpanda.md +++ b/docs/modules/redpanda.md @@ -25,6 +25,44 @@ Redpanda also provides a schema registry implementation. Like the Redpanda broke [Schema Registry](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:getSchemaRegistryAddress +It is also possible to enable security capabilities of Redpanda by using: + + +[Enable security](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:security + + +Superusers can be created by using: + + +[Register Superuser](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:createSuperUser + + +Below is an example of how to create the `AdminClient`: + + +[Create Admin Client](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:createAdminClient + + +There are scenarios where additional listeners are needed because the consumer/producer can be another +container in the same network or a different process where the port to connect differs from the default +exposed port `9092`. E.g [Toxiproxy](../../docs/modules/toxiproxy.md). + + +[Register additional listener](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:registerListener + + +Container defined in the same network: + + +[Create kcat container](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:createKCatContainer + + +Client using the new registered listener: + + +[Produce/Consume via new listener](../../modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java) inside_block:produceConsumeMessage + + ## Adding this module to your project dependencies Add the following dependency to your `pom.xml`/`build.gradle` file: diff --git a/modules/redpanda/build.gradle b/modules/redpanda/build.gradle index 3fbf9b7a8ab..0aa32791a9f 100644 --- a/modules/redpanda/build.gradle +++ b/modules/redpanda/build.gradle @@ -2,6 +2,7 @@ description = "Testcontainers :: Redpanda" dependencies { api project(':testcontainers') + shaded 'org.freemarker:freemarker:2.3.32' testImplementation 'org.apache.kafka:kafka-clients:3.5.1' testImplementation 'org.assertj:assertj-core:3.24.2' diff --git a/modules/redpanda/src/main/java/org/testcontainers/redpanda/RedpandaContainer.java b/modules/redpanda/src/main/java/org/testcontainers/redpanda/RedpandaContainer.java index 3843c76d6d0..b4f5b94798b 100644 --- a/modules/redpanda/src/main/java/org/testcontainers/redpanda/RedpandaContainer.java +++ b/modules/redpanda/src/main/java/org/testcontainers/redpanda/RedpandaContainer.java @@ -1,11 +1,31 @@ package org.testcontainers.redpanda; import com.github.dockerjava.api.command.InspectContainerResponse; +import freemarker.template.Configuration; +import freemarker.template.Template; +import lombok.AllArgsConstructor; +import lombok.Cleanup; +import lombok.Data; +import lombok.SneakyThrows; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.images.builder.Transferable; import org.testcontainers.utility.ComparableVersion; import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.MountableFile; + +import java.io.ByteArrayOutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Supplier; +import java.util.stream.Collectors; /** * Testcontainers implementation for Redpanda. @@ -14,6 +34,7 @@ * */ public class RedpandaContainer extends GenericContainer { @@ -30,9 +51,21 @@ public class RedpandaContainer extends GenericContainer { private static final int REDPANDA_PORT = 9092; + private static final int REDPANDA_ADMIN_PORT = 9644; + private static final int SCHEMA_REGISTRY_PORT = 8081; - private static final String STARTER_SCRIPT = "/testcontainers_start.sh"; + private static final int REST_PROXY_PORT = 8082; + + private boolean enableAuthorization; + + private String authenticationMethod = "none"; + + private String schemaRegistryAuthenticationMethod = "none"; + + private final List superusers = new ArrayList<>(); + + private final Set> listenersValueSupplier = new HashSet<>(); public RedpandaContainer(String image) { this(DockerImageName.parse(image)); @@ -47,33 +80,198 @@ public RedpandaContainer(DockerImageName imageName) { throw new IllegalArgumentException("Redpanda version must be >= v22.2.1"); } - withExposedPorts(REDPANDA_PORT, SCHEMA_REGISTRY_PORT); + withExposedPorts(REDPANDA_PORT, REDPANDA_ADMIN_PORT, SCHEMA_REGISTRY_PORT, REST_PROXY_PORT); withCreateContainerCmdModifier(cmd -> { - cmd.withEntrypoint("sh"); + cmd.withEntrypoint(); + cmd.withUser("root:root"); }); - waitingFor(Wait.forLogMessage(".*Started Kafka API server.*", 1)); - withCommand("-c", "while [ ! -f " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT); + waitingFor(Wait.forLogMessage(".*Successfully started Redpanda!.*", 1)); + withCopyFileToContainer( + MountableFile.forClasspathResource("testcontainers/entrypoint-tc.sh", 0700), + "/entrypoint-tc.sh" + ); + withCommand("/entrypoint-tc.sh", "redpanda", "start", "--mode=dev-container", "--smp=1", "--memory=1G"); + } + + @Override + protected void configure() { + this.listenersValueSupplier.stream() + .map(Supplier::get) + .map(Listener::getAddress) + .forEach(this::withNetworkAliases); } + @SneakyThrows @Override protected void containerIsStarting(InspectContainerResponse containerInfo) { super.containerIsStarting(containerInfo); - String command = "#!/bin/bash\n"; - - command += "/usr/bin/rpk redpanda start --mode dev-container --smp 1 --memory 1G "; - command += "--kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092 "; - command += - "--advertise-kafka-addr PLAINTEXT://127.0.0.1:29092,OUTSIDE://" + getHost() + ":" + getMappedPort(9092); + Configuration cfg = new Configuration(Configuration.DEFAULT_INCOMPATIBLE_IMPROVEMENTS); + cfg.setClassLoaderForTemplateLoading(getClass().getClassLoader(), "testcontainers"); + cfg.setDefaultEncoding("UTF-8"); - copyFileToContainer(Transferable.of(command, 0777), STARTER_SCRIPT); + copyFileToContainer(getBootstrapFile(cfg), "/etc/redpanda/.bootstrap.yaml"); + copyFileToContainer(getRedpandaFile(cfg), "/etc/redpanda/redpanda.yaml"); } + /** + * Returns the bootstrap servers address. + * @return the bootstrap servers address + */ public String getBootstrapServers() { return String.format("PLAINTEXT://%s:%s", getHost(), getMappedPort(REDPANDA_PORT)); } + /** + * Returns the schema registry address. + * @return the schema registry address + */ public String getSchemaRegistryAddress() { return String.format("http://%s:%s", getHost(), getMappedPort(SCHEMA_REGISTRY_PORT)); } + + /** + * Returns the admin address. + * @return the admin address + */ + public String getAdminAddress() { + return String.format("http://%s:%s", getHost(), getMappedPort(REDPANDA_ADMIN_PORT)); + } + + /** + * Returns the rest proxy address. + * @return the rest proxy address + */ + public String getRestProxyAddress() { + return String.format("http://%s:%s", getHost(), getMappedPort(REST_PROXY_PORT)); + } + + /** + * Enables authorization. + * @return this {@link RedpandaContainer} instance + */ + public RedpandaContainer enableAuthorization() { + this.enableAuthorization = true; + return this; + } + + /** + * Enables SASL. + * @return this {@link RedpandaContainer} instance + */ + public RedpandaContainer enableSasl() { + this.authenticationMethod = "sasl"; + return this; + } + + /** + * Enables Http Basic Auth for Schema Registry. + * @return this {@link RedpandaContainer} instance + */ + public RedpandaContainer enableSchemaRegistryHttpBasicAuth() { + this.schemaRegistryAuthenticationMethod = "http_basic"; + return this; + } + + /** + * Register username as a superuser. + * @param username username to register as a superuser + * @return this {@link RedpandaContainer} instance + */ + public RedpandaContainer withSuperuser(String username) { + this.superusers.add(username); + return this; + } + + /** + * Add a {@link Supplier} that will provide a listener with format {@code host:port}. + * Host will be added as a network alias. + *

+ * The listener will be added to the default listeners. + *

+ * Default listeners: + *

    + *
  • 0.0.0.0:9092
  • + *
  • 0.0.0.0:9093
  • + *
+ *

+ * Default advertised listeners: + *

    + *
  • {@code container.getHost():container.getMappedPort(9092)}
  • + *
  • 127.0.0.1:9093
  • + *
+ * @param listenerSupplier a supplier that will provide a listener + * @return this {@link RedpandaContainer} instance + */ + public RedpandaContainer withListener(Supplier listenerSupplier) { + String[] parts = listenerSupplier.get().split(":"); + this.listenersValueSupplier.add(() -> new Listener(parts[0], Integer.parseInt(parts[1]))); + return this; + } + + private Transferable getBootstrapFile(Configuration cfg) { + Map kafkaApi = new HashMap<>(); + kafkaApi.put("enableAuthorization", this.enableAuthorization); + kafkaApi.put("superusers", this.superusers); + + Map root = new HashMap<>(); + root.put("kafkaApi", kafkaApi); + + String file = resolveTemplate(cfg, "bootstrap.yaml.ftl", root); + + return Transferable.of(file, 0700); + } + + private Transferable getRedpandaFile(Configuration cfg) { + List> listeners = + this.listenersValueSupplier.stream() + .map(Supplier::get) + .map(listener -> { + Map listenerMap = new HashMap<>(); + listenerMap.put("address", listener.getAddress()); + listenerMap.put("port", listener.getPort()); + return listenerMap; + }) + .collect(Collectors.toList()); + + Map kafkaApi = new HashMap<>(); + kafkaApi.put("authenticationMethod", this.authenticationMethod); + kafkaApi.put("enableAuthorization", this.enableAuthorization); + kafkaApi.put("advertisedHost", getHost()); + kafkaApi.put("advertisedPort", getMappedPort(9092)); + kafkaApi.put("listeners", listeners); + + Map schemaRegistry = new HashMap<>(); + schemaRegistry.put("authenticationMethod", this.schemaRegistryAuthenticationMethod); + + Map root = new HashMap<>(); + root.put("kafkaApi", kafkaApi); + root.put("schemaRegistry", schemaRegistry); + + String file = resolveTemplate(cfg, "redpanda.yaml.ftl", root); + + return Transferable.of(file, 0700); + } + + @SneakyThrows + private String resolveTemplate(Configuration cfg, String template, Map data) { + Template temp = cfg.getTemplate(template); + + @Cleanup + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + @Cleanup + Writer out = new OutputStreamWriter(byteArrayOutputStream, StandardCharsets.UTF_8); + temp.process(data, out); + + return new String(byteArrayOutputStream.toByteArray(), StandardCharsets.UTF_8); + } + + @Data + @AllArgsConstructor + private static class Listener { + + private String address; + + private int port; + } } diff --git a/modules/redpanda/src/main/resources/testcontainers/bootstrap.yaml.ftl b/modules/redpanda/src/main/resources/testcontainers/bootstrap.yaml.ftl new file mode 100644 index 00000000000..f066cf428aa --- /dev/null +++ b/modules/redpanda/src/main/resources/testcontainers/bootstrap.yaml.ftl @@ -0,0 +1,18 @@ +# Injected by testcontainers +# This file contains cluster properties which will only be considered when +# starting the cluster for the first time. Afterwards, you can configure cluster +# properties via the Redpanda Admi n API. +superusers: +<#if kafkaApi.superusers?has_content > + <#list kafkaApi.superusers as superuser> + - ${superuser} + +<#else> + [] + + +<#if kafkaApi.enableAuthorization > +kafka_enable_authorization: true + + +auto_create_topics_enabled: true diff --git a/modules/redpanda/src/main/resources/testcontainers/entrypoint-tc.sh b/modules/redpanda/src/main/resources/testcontainers/entrypoint-tc.sh new file mode 100644 index 00000000000..36932d7a27c --- /dev/null +++ b/modules/redpanda/src/main/resources/testcontainers/entrypoint-tc.sh @@ -0,0 +1,8 @@ +#!/usr/bin/env bash + +# Wait for testcontainer's injected redpanda config with the port only known after docker start +until grep -q "# Injected by testcontainers" "/etc/redpanda/redpanda.yaml" +do + sleep 0.1 +done +exec /entrypoint.sh $@ \ No newline at end of file diff --git a/modules/redpanda/src/main/resources/testcontainers/redpanda.yaml.ftl b/modules/redpanda/src/main/resources/testcontainers/redpanda.yaml.ftl new file mode 100644 index 00000000000..6e0c124eb5c --- /dev/null +++ b/modules/redpanda/src/main/resources/testcontainers/redpanda.yaml.ftl @@ -0,0 +1,73 @@ +# Injected by testcontainers +<#setting boolean_format="c"> +<#setting number_format="c"> +redpanda: + admin: + address: 0.0.0.0 + port: 9644 + + kafka_api: + - address: 0.0.0.0 + name: external + port: 9092 + authentication_method: ${ kafkaApi.authenticationMethod } + + # This listener is required for the schema registry client. The schema + # registry client connects via an advertised listener like a normal Kafka + # client would do. It can't use the other listener because the mapped + # port is not accessible from within the Redpanda container. + - address: 0.0.0.0 + name: internal + port: 9093 + authentication_method: <#if kafkaApi.enableAuthorization >sasl<#else>none + +<#list kafkaApi.listeners as listener> + - address: 0.0.0.0 + name: ${listener.address} + port: ${listener.port} + + + advertised_kafka_api: + - address: ${ kafkaApi.advertisedHost } + name: external + port: ${ kafkaApi.advertisedPort } + - address: 127.0.0.1 + name: internal + port: 9093 +<#list kafkaApi.listeners as listener> + - address: ${listener.address} + name: ${listener.address} + port: ${listener.port} + + +schema_registry: + schema_registry_api: + - address: "0.0.0.0" + name: main + port: 8081 + authentication_method: ${ schemaRegistry.authenticationMethod } + +schema_registry_client: + brokers: + - address: localhost + port: 9093 + +pandaproxy: + pandaproxy_api: + - address: 0.0.0.0 + port: 8082 + name: proxy-internal + advertised_pandaproxy_api: + - address: 127.0.0.1 + port: 8082 + name: proxy-internal + +pandaproxy_client: + brokers: + - address: localhost + port: 9093 + +rpk: + kafka_api: + brokers: + - localhost:9093 diff --git a/modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java b/modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java index 9a0a0edd030..d0f13d0952a 100644 --- a/modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java +++ b/modules/redpanda/src/test/java/org/testcontainers/redpanda/RedpandaContainerTest.java @@ -2,7 +2,9 @@ import com.google.common.collect.ImmutableMap; import io.restassured.RestAssured; +import io.restassured.common.mapper.TypeRef; import io.restassured.response.Response; +import lombok.SneakyThrows; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.NewTopic; @@ -13,15 +15,24 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.errors.SaslAuthenticationException; +import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.awaitility.Awaitility; import org.junit.Test; import org.rnorth.ducttape.unreliables.Unreliables; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.images.builder.Transferable; import org.testcontainers.utility.DockerImageName; import java.time.Duration; import java.util.Collection; import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -96,6 +107,179 @@ public void testSchemaRegistry() { } } + @Test + public void testUsageWithListener() throws Exception { + try ( + Network network = Network.newNetwork(); + // registerListener { + RedpandaContainer redpanda = new RedpandaContainer("docker.redpanda.com/redpandadata/redpanda:v23.1.7") + .withListener(() -> "redpanda:19092") + .withNetwork(network); + // } + // createKCatContainer { + GenericContainer kcat = new GenericContainer<>("confluentinc/cp-kcat:7.4.1") + .withCreateContainerCmdModifier(cmd -> { + cmd.withEntrypoint("sh"); + }) + .withCopyToContainer(Transferable.of("Message produced by kcat"), "/data/msgs.txt") + .withNetwork(network) + .withCommand("-c", "tail -f /dev/null") + // } + ) { + redpanda.start(); + kcat.start(); + // produceConsumeMessage { + kcat.execInContainer("kcat", "-b", "redpanda:19092", "-t", "msgs", "-P", "-l", "/data/msgs.txt"); + String stdout = kcat + .execInContainer("kcat", "-b", "redpanda:19092", "-C", "-t", "msgs", "-c", "1") + .getStdout(); + // } + assertThat(stdout).contains("Message produced by kcat"); + } + } + + @SneakyThrows + @Test + public void enableSaslWithSuccessfulTopicCreation() { + try ( + // security { + RedpandaContainer redpanda = new RedpandaContainer("docker.redpanda.com/redpandadata/redpanda:v23.1.7") + .enableAuthorization() + .enableSasl() + .withSuperuser("superuser-1") + // } + ) { + redpanda.start(); + + createSuperUser(redpanda); + + AdminClient adminClient = getAdminClient(redpanda); + String topicName = "messages-" + UUID.randomUUID(); + Collection topics = Collections.singletonList(new NewTopic(topicName, 1, (short) 1)); + adminClient.createTopics(topics).all().get(30, TimeUnit.SECONDS); + + assertThat(adminClient.listTopics().names().get()).contains(topicName); + } + } + + @Test + public void enableSaslWithUnsuccessfulTopicCreation() { + try ( + RedpandaContainer redpanda = new RedpandaContainer("docker.redpanda.com/redpandadata/redpanda:v23.1.7") + .enableAuthorization() + .enableSasl() + ) { + redpanda.start(); + + createSuperUser(redpanda); + + AdminClient adminClient = getAdminClient(redpanda); + String topicName = "messages-" + UUID.randomUUID(); + Collection topics = Collections.singletonList(new NewTopic(topicName, 1, (short) 1)); + + Awaitility + .await() + .untilAsserted(() -> { + assertThatThrownBy(() -> adminClient.createTopics(topics).all().get(30, TimeUnit.SECONDS)) + .hasCauseInstanceOf(TopicAuthorizationException.class); + }); + } + } + + @Test + public void enableSaslAndWithAuthenticationError() { + try ( + RedpandaContainer redpanda = new RedpandaContainer("docker.redpanda.com/redpandadata/redpanda:v23.1.7") + .enableAuthorization() + .enableSasl() + ) { + redpanda.start(); + + AdminClient adminClient = getAdminClient(redpanda); + String topicName = "messages-" + UUID.randomUUID(); + Collection topics = Collections.singletonList(new NewTopic(topicName, 1, (short) 1)); + + Awaitility + .await() + .untilAsserted(() -> { + assertThatThrownBy(() -> adminClient.createTopics(topics).all().get(30, TimeUnit.SECONDS)) + .hasCauseInstanceOf(SaslAuthenticationException.class); + }); + } + } + + @Test + public void schemaRegistryWithHttpBasic() { + try ( + RedpandaContainer redpanda = new RedpandaContainer("docker.redpanda.com/redpandadata/redpanda:v23.1.7") + .enableSchemaRegistryHttpBasicAuth() + .withSuperuser("superuser-1") + ) { + redpanda.start(); + + createSuperUser(redpanda); + + String subjectsEndpoint = String.format("%s/subjects", redpanda.getSchemaRegistryAddress()); + + RestAssured.when().get(subjectsEndpoint).then().statusCode(401); + + RestAssured + .given() + .auth() + .preemptive() + .basic("superuser-1", "test") + .get(subjectsEndpoint) + .then() + .statusCode(200); + } + } + + @SneakyThrows + @Test + public void testRestProxy() { + try (RedpandaContainer redpanda = new RedpandaContainer("docker.redpanda.com/redpandadata/redpanda:v23.1.7")) { + redpanda.start(); + + redpanda.execInContainer("rpk", "topic", "create", "test_topic", "-p", "3"); + + String applicationKafkaJson = "application/vnd.kafka.json.v2+json"; + + String restProxy = redpanda.getRestProxyAddress(); + RestAssured + .given() + .contentType(applicationKafkaJson) + .body( + "{\"records\":[{\"value\":\"jsmith\",\"partition\":0},{\"value\":\"htanaka\",\"partition\":1},{\"value\":\"awalther\",\"partition\":2}]}" + ) + .post(String.format("%s/topics/test_topic", restProxy)) + .then() + .statusCode(200); + + RestAssured + .given() + .contentType("application/vnd.kafka.v2+json") + .body("{\"name\": \"test_consumer\", \"format\": \"json\", \"auto.offset.reset\": \"earliest\"}") + .post(String.format("%s/consumers/test_group", restProxy)) + .then() + .statusCode(200); + RestAssured + .given() + .contentType("application/vnd.kafka.v2+json") + .body("{\"topics\":[\"test_topic\"]}") + .post(String.format("%s/consumers/test_group/instances/test_consumer/subscription", restProxy)) + .then() + .statusCode(204); + + List> response = RestAssured + .given() + .accept(applicationKafkaJson) + .get(String.format("%s/consumers/test_group/instances/test_consumer/records", restProxy)) + .getBody() + .as(new TypeRef>>() {}); + assertThat(response).hasSize(3).extracting("value").containsExactly("jsmith", "htanaka", "awalther"); + } + } + private void testKafkaFunctionality(String bootstrapServers) throws Exception { testKafkaFunctionality(bootstrapServers, 1, 1); } @@ -159,4 +343,34 @@ private void testKafkaFunctionality(String bootstrapServers, int partitions, int consumer.unsubscribe(); } } + + private AdminClient getAdminClient(RedpandaContainer redpanda) { + String bootstrapServer = String.format("%s:%s", redpanda.getHost(), redpanda.getMappedPort(9092)); + // createAdminClient { + AdminClient adminClient = AdminClient.create( + ImmutableMap.of( + AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, + bootstrapServer, + AdminClientConfig.SECURITY_PROTOCOL_CONFIG, + "SASL_PLAINTEXT", + SaslConfigs.SASL_MECHANISM, + "SCRAM-SHA-256", + SaslConfigs.SASL_JAAS_CONFIG, + "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"superuser-1\" password=\"test\";" + ) + ); + // } + return adminClient; + } + + private void createSuperUser(RedpandaContainer redpanda) { + String adminUrl = String.format("%s/v1/security/users", redpanda.getAdminAddress()); + RestAssured + .given() + .contentType("application/json") + .body("{\"username\": \"superuser-1\", \"password\": \"test\", \"algorithm\": \"SCRAM-SHA-256\"}") + .post(adminUrl) + .then() + .statusCode(200); + } }