Skip to content

Commit

Permalink
Add cert auth to the components
Browse files Browse the repository at this point in the history
  • Loading branch information
ljupcovangelski committed May 1, 2024
1 parent 9c21ebd commit 2cda5cf
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 6 deletions.
Expand Up @@ -45,6 +45,11 @@ spec:
initialDelaySeconds: 120
periodSeconds: 10
failureThreshold: 3
{{ if .Values.global.kafkaCertAuth }}
volumeMounts:
- name: kafka-config-certs
mountPath: /opt/kafka/certs
{{ end }}
resources:
{{ toYaml .Values.components.api.communication.resources | indent 10 }}
initContainers:
Expand All @@ -62,3 +67,8 @@ spec:
- name: provisioning-scripts
configMap:
name: provisioning-scripts
{{ if .Values.global.kafkaCertAuth }}
- name: kafka-config-certs
configMap:
name: kafka-config-certs
{{ end }}
1 change: 1 addition & 0 deletions infrastructure/helm-chart/templates/config/kafka.yaml
Expand Up @@ -17,3 +17,4 @@ data:
{{ .Values.config.kafka.saslCaCertificate | nindent 4 | trim }}
KAFKA_SASL_USERNAME: {{ .Values.config.kafka.saslUsername }}
KAFKA_SASL_PASSWORD: {{ .Values.config.kafka.saslPassword }}
KAFKA_KEY_TRUST_SECRET: {{ .Values.config.kafka.keyTrustSecret }}
Expand Up @@ -12,6 +12,10 @@
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SslConfigs;
import java.util.HashMap;



public class KafkaConsumerWrapper<K, V> {
Expand All @@ -22,6 +26,7 @@ public class KafkaConsumerWrapper<K, V> {
private KafkaConsumer<K, V> consumer;

private String jaasConfig;
private String kafkaKeyTrustSecret;

public KafkaConsumerWrapper(final String brokers, final String schemaRegistryUrl) {
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
Expand All @@ -33,13 +38,22 @@ public KafkaConsumerWrapper(final String brokers, final String schemaRegistryUrl
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
}

public KafkaConsumerWrapper<K,V> withAuthJaas(String jaasConfig) {
public KafkaConsumerWrapper<K,V> withAuthJaas(String jaasConfig, String kafkaKeyTrustSecret) {
this.jaasConfig = jaasConfig;
if(jaasConfig != null) {
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config", jaasConfig);
}
if (kafkaKeyTrustSecret != null) {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/opt/kafka/certs/client.truststore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaKeyTrustSecret);
props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PKCS12");
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/opt/kafka/certs/client.keystore.p12");
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, kafkaKeyTrustSecret);
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaKeyTrustSecret);
}
return this;
}

Expand Down
Expand Up @@ -30,13 +30,18 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SslConfigs;
import java.util.HashMap;


public class KafkaStreamsWrapper {
private static final Logger log = AiryLoggerFactory.getLogger(KafkaStreamsWrapper.class);
private final HealthCheckRunner healthCheckRunnerThread;
private final String brokers;
private final String schemaRegistryUrl;
private String jaasConfig;
private String kafkaKeyTrustSecret;
private long commitIntervalInMs;
private long suppressIntervalInMs;
private int threadCount;
Expand Down Expand Up @@ -70,8 +75,9 @@ public KafkaStreamsWrapper(final String brokers, final String schemaRegistryUrl)
healthCheckRunnerThread = new HealthCheckRunner(testMode);
}

public KafkaStreamsWrapper withJaasConfig(String jaasConfig) {
public KafkaStreamsWrapper withJaasConfig(String jaasConfig, String kafkaKeyTrustSecret) {
this.jaasConfig = jaasConfig;
this.kafkaKeyTrustSecret = kafkaKeyTrustSecret;
return this;
}

Expand Down Expand Up @@ -227,6 +233,17 @@ public synchronized void start(final Topology topology, final String appId) thro
props.put("sasl.jaas.config", jaasConfig);
}

if (this.kafkaKeyTrustSecret != null) {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/opt/kafka/certs/client.truststore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaKeyTrustSecret);
props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PKCS12");
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/opt/kafka/certs/client.keystore.p12");
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, kafkaKeyTrustSecret);
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaKeyTrustSecret);
}


props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, this.maxRequestSize);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, this.fetchMaxBytes);

Expand Down
Expand Up @@ -13,6 +13,9 @@
import org.springframework.context.annotation.Scope;

import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SslConfigs;
import java.util.HashMap;

@Configuration
@PropertySource("classpath:kafka-core.properties")
Expand All @@ -21,7 +24,8 @@ public class KafkaCoreConfig {
@Lazy
@Scope("prototype")
public <K, V> KafkaProducer<K, V> kafkaProducer(@Value("${kafka.brokers}") final String brokers, @Value("${kafka.schema-registry-url}") final String schemaRegistryUrl,
@Value("${AUTH_JAAS:#{null}}") final String jaasConfig) {
@Value("${AUTH_JAAS:#{null}}") final String jaasConfig,
@Value("${KAFKA_KEY_TRUST_SECRET:#{null}}") final String kafkaKeyTrustSecret) {
final Properties props = new Properties();

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
Expand All @@ -37,15 +41,26 @@ public <K, V> KafkaProducer<K, V> kafkaProducer(@Value("${kafka.brokers}") final
props.put("sasl.jaas.config", jaasConfig);
}

if (kafkaKeyTrustSecret != null) {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/opt/kafka/certs/client.truststore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaKeyTrustSecret);
props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PKCS12");
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/opt/kafka/certs/client.keystore.p12");
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, kafkaKeyTrustSecret);
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaKeyTrustSecret);
}

return new KafkaProducer<>(props);
}

@Bean
@Lazy
@Scope("prototype")
public <K, V> KafkaConsumerWrapper<K, V> kafkaConsumer(@Value("${kafka.brokers}") final String brokers, @Value("${kafka.schema-registry-url}") final String schemaRegistryUrl,
@Value("${kafka.sasl.jaas.config:#{null}}") final String jaasConfig) {
@Value("${kafka.sasl.jaas.config:#{null}}") final String jaasConfig,
@Value("${KAFKA_KEY_TRUST_SECRET:#{null}}") final String kafkaKeyTrustSecret) {
return new KafkaConsumerWrapper<K, V>(brokers, schemaRegistryUrl)
.withAuthJaas(jaasConfig);
.withAuthJaas(jaasConfig, kafkaKeyTrustSecret);
}
}
Expand Up @@ -30,6 +30,9 @@ public class KafkaStreamsConfig {
@Value("${AUTH_JAAS:#{null}}")
private String jaasConfig;

@Value("${KAFKA_KEY_TRUST_SECRET:#{null}}")
private String kafkaKeyTrustSecret;

@Value("${kafka.rpc-port:0}")
private int rpcPort;

Expand Down Expand Up @@ -68,7 +71,7 @@ public class KafkaStreamsConfig {
public KafkaStreamsWrapper airyKafkaStreams(@Value("${kafka.brokers}") final String brokers, @Value("${kafka.schema-registry-url}") final String schemaRegistryUrl) {
return new KafkaStreamsWrapper(brokers, schemaRegistryUrl)
.withCommitIntervalInMs(commitIntervalMs)
.withJaasConfig(jaasConfig)
.withJaasConfig(jaasConfig, kafkaKeyTrustSecret)
.withSuppressIntervalInMs(suppressIntervalMs)
.withThreadCount(streamsThreadCount)
.withAppServerHost(rpcHost)
Expand Down

0 comments on commit 2cda5cf

Please sign in to comment.