-
Notifications
You must be signed in to change notification settings - Fork 115
/
ReactorApp.java
71 lines (54 loc) · 2.21 KB
/
ReactorApp.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package io.confluent.parallelconsumer.examples.reactor;
/*-
* Copyright (C) 2020-2024 Confluent, Inc.
*/
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.reactor.ReactorProcessor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import pl.tlinkowski.unij.api.UniMaps;
import reactor.core.publisher.Mono;
import java.util.Map;
import java.util.Properties;
import static pl.tlinkowski.unij.api.UniLists.of;
/**
 * Example application that wires a Kafka consumer and producer into a
 * {@link ReactorProcessor} and registers a reactive processing function.
 * <p>
 * The factory methods and {@link #postSetup()} are non-private hooks;
 * {@code postSetup} is documented in this file as existing for testing.
 */
@Slf4j
public class ReactorApp {

    /** Topic subscribed to by {@link #run()}; the suffix is a random int chosen at class-load time. */
    static String inputTopic = "input-topic-" + RandomUtils.nextInt();

    /**
     * Creates the consumer handed to the processor options.
     *
     * @return a new {@link KafkaConsumer} built from an empty {@link Properties}
     */
    Consumer<String, String> getKafkaConsumer() {
        return new KafkaConsumer<>(new Properties());
    }

    /**
     * Creates the producer handed to the processor options.
     *
     * @return a new {@link KafkaProducer} built from an empty {@link Properties}
     */
    Producer<String, String> getKafkaProducer() {
        return new KafkaProducer<>(new Properties());
    }

    /** Processor created by {@link #run()}; {@code null} until {@code run()} has assigned it. */
    ReactorProcessor<String, String> parallelConsumer;

    /**
     * Builds the options (KEY ordering, consumer, producer), creates the
     * {@link ReactorProcessor}, subscribes it to {@link #inputTopic}, invokes
     * {@link #postSetup()} and finally registers the reactive user function.
     */
    void run() {
        Consumer<String, String> kafkaConsumer = getKafkaConsumer();
        Producer<String, String> kafkaProducer = getKafkaProducer();

        var options = ParallelConsumerOptions.<String, String>builder()
                .ordering(ParallelConsumerOptions.ProcessingOrder.KEY)
                .consumer(kafkaConsumer)
                .producer(kafkaProducer)
                .build();

        this.parallelConsumer = new ReactorProcessor<>(options);
        parallelConsumer.subscribe(of(inputTopic));

        // Hook runs after subscription but before the processing function is registered.
        postSetup();

        // NOTE(review): the tag markers and the "<1>" callout below look like documentation
        // include anchors - presumably this snippet is extracted into docs, so keep them intact.
        // tag::example[]
        parallelConsumer.react(context -> {
            var consumerRecord = context.getSingleRecord().getConsumerRecord();
            log.info("Concurrently constructing and returning RequestInfo from record: {}", consumerRecord);
            // NOTE(review): params is not used after construction; retained as part of the example snippet.
            Map<String, String> params = UniMaps.of("recordKey", consumerRecord.key(), "payload", consumerRecord.value());
            return Mono.just("something todo"); // <1>
        });
        // end::example[]
    }

    /**
     * Calls {@code closeDrainFirst()} on the processor created by {@link #run()}.
     * <p>
     * Does nothing when no processor has been assigned yet, so calling this
     * before {@code run()} (or after a {@code run()} that failed before the
     * assignment) no longer throws a {@link NullPointerException}.
     */
    void close() {
        if (this.parallelConsumer != null) {
            this.parallelConsumer.closeDrainFirst();
        }
    }

    /**
     * Hook invoked by {@link #run()} after subscribing and before the
     * processing function is registered.
     */
    protected void postSetup() {
        // no-op, for testing
    }
}