Skip to content

Commit

Permalink
Legg til mulighet for kafka-header til statistikk (#1359)
Browse files Browse the repository at this point in the history
  • Loading branch information
jolarsen committed Apr 12, 2024
1 parent e096146 commit 27aa641
Showing 1 changed file with 18 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package no.nav.vedtak.felles.integrasjon.kafka;

import java.util.Optional;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.internals.RecordHeader;

import no.nav.vedtak.exception.IntegrasjonException;

Expand All @@ -21,16 +24,30 @@ public String getTopicName() {
return topicName;
}

public record KafkaHeader(String key, byte[] value) {}

public RecordMetadata send(String key, String message) {
if (topicName == null) {
throw kafkaPubliseringException("null", new IllegalArgumentException());
}
return send(key, message, this.topicName);
return send(null, key, message, topicName);
}

public RecordMetadata send(KafkaHeader header, String key, String message) {
if (topicName == null) {
throw kafkaPubliseringException("null", new IllegalArgumentException());
}
return send(header, key, message, this.topicName);
}

public RecordMetadata send(String key, String message, String topic) {
return send(null, key, message, topic);
}

public RecordMetadata send(KafkaHeader header, String key, String message, String topic) {
try {
var record = new ProducerRecord<>(topic, key, message);
Optional.ofNullable(header).ifPresent(h -> record.headers().add(new RecordHeader(h.key(), h.value())));
return producer.send(record).get();
} catch (Exception e) {
if (e instanceof InterruptedException) {
Expand Down

0 comments on commit 27aa641

Please sign in to comment.