forked from quarkusio/quarkus
-
Notifications
You must be signed in to change notification settings - Fork 0
/
DevServicesApicurioRegistryProcessor.java
295 lines (254 loc) · 12.5 KB
/
DevServicesApicurioRegistryProcessor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
package io.quarkus.apicurio.registry.devservice;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.jboss.logging.Logger;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.DockerImageName;
import io.quarkus.deployment.Feature;
import io.quarkus.deployment.IsNormal;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.builditem.CuratedApplicationShutdownBuildItem;
import io.quarkus.deployment.builditem.DevServicesResultBuildItem;
import io.quarkus.deployment.builditem.DevServicesResultBuildItem.RunningDevService;
import io.quarkus.deployment.builditem.DevServicesSharedNetworkBuildItem;
import io.quarkus.deployment.builditem.DockerStatusBuildItem;
import io.quarkus.deployment.builditem.LaunchModeBuildItem;
import io.quarkus.deployment.console.ConsoleInstalledBuildItem;
import io.quarkus.deployment.console.StartupLogCompressor;
import io.quarkus.deployment.dev.devservices.GlobalDevServicesConfig;
import io.quarkus.deployment.logging.LoggingSetupBuildItem;
import io.quarkus.devservices.common.ConfigureUtil;
import io.quarkus.devservices.common.ContainerLocator;
import io.quarkus.runtime.LaunchMode;
import io.quarkus.runtime.configuration.ConfigUtils;
/**
* Starts Apicurio Registry as a Dev Service if needed.
*/
public class DevServicesApicurioRegistryProcessor {
private static final Logger log = Logger.getLogger(DevServicesApicurioRegistryProcessor.class);
// Port the registry is reached on from within the container; mapped or fixed on the host by ApicurioRegistryContainer.
private static final int APICURIO_REGISTRY_PORT = 8080; // inside the container
// Config key populated with "<base URL>/apis/registry/v2" for a started dev service.
// If this key is already present, no dev service is started.
private static final String APICURIO_REGISTRY_URL_CONFIG = "mp.messaging.connector.smallrye-kafka.apicurio.registry.url";
// Config key populated with "<base URL>/apis/ccompat/v6" for a started dev service.
// If this key is already present, no dev service is started.
private static final String CONFLUENT_SCHEMA_REGISTRY_URL_CONFIG = "mp.messaging.connector.smallrye-kafka.schema.registry.url";
/**
* Label to add to shared Dev Service for Apicurio Registry running in containers.
* This allows other applications to discover the running service and use it instead of starting a new instance.
*/
private static final String DEV_SERVICE_LABEL = "quarkus-dev-service-apicurio-registry";
// Consulted by startApicurioRegistry before creating a new container; built from the label and in-container port above.
private static final ContainerLocator apicurioRegistryContainerLocator = new ContainerLocator(DEV_SERVICE_LABEL,
APICURIO_REGISTRY_PORT);
// The dev service currently tracked by this processor, or null when none is running.
// Static so that a later invocation of the build step can reuse or replace it.
static volatile RunningDevService devService;
// Configuration the tracked devService was started with; compared against the new configuration to decide on a restart.
static volatile ApicurioRegistryDevServiceCfg cfg;
// True until the shutdown close task has been registered; that task sets it back to true when it runs.
static volatile boolean first = true;
/**
 * Build step that makes sure an Apicurio Registry dev service matching the current build-time
 * configuration is available and exposes it as a {@link DevServicesResultBuildItem}.
 * <p>
 * A service that is already tracked is reused when its configuration is unchanged; otherwise it is
 * shut down and a new start is attempted. After a successful start, a close task is registered
 * (once) that stops the service and resets the static state of this processor.
 *
 * @return the build item describing the dev service, or {@code null} when no service was started
 * @throws RuntimeException wrapping any failure raised while starting the service
 */
@BuildStep(onlyIfNot = IsNormal.class, onlyIf = GlobalDevServicesConfig.Enabled.class)
public DevServicesResultBuildItem startApicurioRegistryDevService(LaunchModeBuildItem launchMode,
        DockerStatusBuildItem dockerStatusBuildItem,
        ApicurioRegistryDevServicesBuildTimeConfig apicurioRegistryDevServices,
        List<DevServicesSharedNetworkBuildItem> devServicesSharedNetworkBuildItem,
        Optional<ConsoleInstalledBuildItem> consoleInstalledBuildItem,
        CuratedApplicationShutdownBuildItem closeBuildItem,
        LoggingSetupBuildItem loggingSetupBuildItem, GlobalDevServicesConfig devServicesConfig) {

    ApicurioRegistryDevServiceCfg requested = getConfiguration(apicurioRegistryDevServices);

    if (devService != null) {
        if (requested.equals(cfg)) {
            // Same configuration as the tracked service: keep using it.
            return devService.toBuildItem();
        }
        // Configuration changed: drop the old service before starting a new one.
        shutdownApicurioRegistry();
        cfg = null;
    }

    String logPrefix = launchMode.isTest() ? "(test) " : "";
    StartupLogCompressor compressor = new StartupLogCompressor(
            logPrefix + "Apicurio Registry Dev Services Starting:",
            consoleInstalledBuildItem, loggingSetupBuildItem);
    try {
        devService = startApicurioRegistry(dockerStatusBuildItem, requested, launchMode,
                !devServicesSharedNetworkBuildItem.isEmpty(), devServicesConfig.timeout);
        compressor.close();
    } catch (Throwable failure) {
        // Surface the captured startup output before propagating the failure.
        compressor.closeAndDumpCaptured();
        throw new RuntimeException(failure);
    }

    if (devService == null) {
        // Nothing was started; cfg stays unset.
        return null;
    }
    cfg = requested;

    if (devService.isOwner()) {
        log.infof("Dev Services for Apicurio Registry started. The registry is available at %s",
                devService.getConfig().get(APICURIO_REGISTRY_URL_CONFIG));
    }

    // Register the shutdown hook only once; the hook re-arms the flag when it runs.
    if (first) {
        first = false;
        Runnable closeTask = () -> {
            if (devService != null) {
                shutdownApicurioRegistry();
            }
            first = true;
            devService = null;
            cfg = null;
        };
        closeBuildItem.addCloseTask(closeTask, true);
    }

    return devService.toBuildItem();
}
/**
 * Builds the configuration entries published for a registry reachable at {@code baseUrl}.
 *
 * @param baseUrl registry base URL including the scheme, without a trailing slash
 * @return an immutable map holding the Apicurio API URL and the Confluent-compatible API URL
 */
private Map<String, String> getRegistryUrlConfigs(String baseUrl) {
    String apicurioApiUrl = baseUrl + "/apis/registry/v2";
    String confluentCompatUrl = baseUrl + "/apis/ccompat/v6";
    return Map.of(
            APICURIO_REGISTRY_URL_CONFIG, apicurioApiUrl,
            CONFLUENT_SCHEMA_REGISTRY_URL_CONFIG, confluentCompatUrl);
}
/**
 * Closes the tracked dev service, if there is one, and clears the reference.
 * A failure while closing is logged and not propagated.
 */
private void shutdownApicurioRegistry() {
    if (devService == null) {
        return;
    }
    try {
        devService.close();
    } catch (Throwable failure) {
        log.error("Failed to stop Apicurio Registry", failure);
    } finally {
        // Forget the service even when closing failed.
        devService = null;
    }
}
/**
 * Decides whether an Apicurio Registry dev service is needed and, if so, provides one.
 * <p>
 * No service is provided when dev services are disabled in {@code config}, when one of the
 * connector-level registry URL properties is already present, when
 * {@link #hasKafkaChannelWithoutRegistry()} reports nothing to configure, or when Docker is
 * not available. Otherwise a container found by the locator is reused, and only if none is
 * found a new container is started.
 *
 * @param dockerStatusBuildItem used to check that Docker is available
 * @param config snapshot of the dev service configuration
 * @param launchMode current launch mode; the service-name label is only applied in development mode
 * @param useSharedNetwork whether the container must join the shared dev services network
 * @param timeout optional startup timeout applied to a newly created container
 * @return the running dev service, or {@code null} when none is provided
 */
private RunningDevService startApicurioRegistry(DockerStatusBuildItem dockerStatusBuildItem,
        ApicurioRegistryDevServiceCfg config, LaunchModeBuildItem launchMode,
        boolean useSharedNetwork, Optional<Duration> timeout) {
    if (!config.devServicesEnabled) {
        // explicitly disabled
        log.debug("Not starting dev services for Apicurio Registry, as it has been disabled in the config.");
        return null;
    }
    if (ConfigUtils.isPropertyPresent(APICURIO_REGISTRY_URL_CONFIG)) {
        log.debug("Not starting dev services for Apicurio Registry, " + APICURIO_REGISTRY_URL_CONFIG + " is configured.");
        return null;
    }
    if (ConfigUtils.isPropertyPresent(CONFLUENT_SCHEMA_REGISTRY_URL_CONFIG)) {
        log.debug("Not starting dev services for Apicurio Registry, " + CONFLUENT_SCHEMA_REGISTRY_URL_CONFIG
                + " is configured.");
        return null;
    }
    if (!hasKafkaChannelWithoutRegistry()) {
        log.debug("Not starting dev services for Apicurio Registry, all the channels have a registry URL configured.");
        return null;
    }
    if (!dockerStatusBuildItem.isDockerAvailable()) {
        log.warn("Docker isn't working, please run Apicurio Registry yourself.");
        return null;
    }
    // Reuse a registry container found by the locator; start our own only when none is found.
    return apicurioRegistryContainerLocator.locateContainer(config.serviceName, config.shared, launchMode.getLaunchMode())
            .map(address -> new RunningDevService(Feature.APICURIO_REGISTRY_AVRO.getName(),
                    address.getId(), null,
                    // address does not have the URL Scheme - just the host:port, so prepend http://
                    getRegistryUrlConfigs("http://" + address.getUrl())))
            .orElseGet(() -> startOwnedContainer(config, launchMode, useSharedNetwork, timeout));
}

/**
 * Creates and starts a new registry container and wraps it in a {@link RunningDevService}
 * whose close action closes the container.
 * <p>
 * If starting fails, the container is closed before the failure is rethrown so that a
 * partially started container does not linger (and keep a fixed port bound) for the
 * remaining lifetime of the dev-mode JVM.
 */
private RunningDevService startOwnedContainer(ApicurioRegistryDevServiceCfg config, LaunchModeBuildItem launchMode,
        boolean useSharedNetwork, Optional<Duration> timeout) {
    ApicurioRegistryContainer container = new ApicurioRegistryContainer(
            DockerImageName.parse(config.imageName), config.fixedExposedPort,
            launchMode.getLaunchMode() == LaunchMode.DEVELOPMENT ? config.serviceName : null,
            useSharedNetwork);
    timeout.ifPresent(container::withStartupTimeout);
    try {
        container.start();
    } catch (RuntimeException | Error startFailure) {
        try {
            container.close();
        } catch (RuntimeException | Error closeFailure) {
            // Keep the start failure as the primary error; record the cleanup problem alongside it.
            startFailure.addSuppressed(closeFailure);
        }
        throw startFailure;
    }
    return new RunningDevService(Feature.APICURIO_REGISTRY_AVRO.getName(), container.getContainerId(),
            container::close, getRegistryUrlConfigs(container.getUrl()));
}
/**
 * Scans the configuration property names to decide whether the registry dev service is needed.
 * <p>
 * As implemented, the scan returns {@code true} as soon as it meets a property name that is not
 * the {@code .connector} property of an incoming/outgoing {@code smallrye-kafka} channel which
 * also has an {@code apicurio.registry.url} or {@code schema.registry.url} property present.
 * It returns {@code false} only when every property name is such a connector property, or when
 * there are no property names at all.
 * <p>
 * NOTE(review): the method name and the caller's log message suggest the intent is "some Kafka
 * channel lacks a registry URL", but unrelated properties also yield {@code true} here.
 * Tightening this would stop the dev service from starting when no explicit {@code .connector}
 * property is declared - confirm the intended behaviour before changing it.
 *
 * @return {@code true} when the dev service should be started according to the scan above
 */
private boolean hasKafkaChannelWithoutRegistry() {
    Config config = ConfigProvider.getConfig();
    for (String propertyName : config.getPropertyNames()) {
        boolean connectorProperty = propertyName.endsWith(".connector");
        // The connector value is only looked up for names ending in ".connector".
        boolean kafkaConnector = connectorProperty
                && "smallrye-kafka".equals(config.getOptionalValue(propertyName, String.class).orElse("ignored"));
        boolean channelProperty = propertyName.startsWith("mp.messaging.incoming.")
                || propertyName.startsWith("mp.messaging.outgoing.");

        if (!(channelProperty && kafkaConnector)) {
            // Anything that is not a Kafka channel connector counts as "not configured".
            return true;
        }

        String apicurioUrlKey = propertyName.replace(".connector", ".apicurio.registry.url");
        if (ConfigUtils.isPropertyPresent(apicurioUrlKey)) {
            continue;
        }
        String confluentUrlKey = propertyName.replace(".connector", ".schema.registry.url");
        if (ConfigUtils.isPropertyPresent(confluentUrlKey)) {
            continue;
        }
        return true;
    }
    return false;
}
/**
 * Captures the build-time dev service settings in a snapshot that can be compared with
 * {@code equals} against the snapshot of the currently running service.
 */
private ApicurioRegistryDevServiceCfg getConfiguration(ApicurioRegistryDevServicesBuildTimeConfig cfg) {
    ApicurioRegistryDevServiceCfg snapshot = new ApicurioRegistryDevServiceCfg(cfg);
    return snapshot;
}
/**
 * Immutable snapshot of the dev service settings read from the build-time configuration.
 * Two snapshots are equal when all of their fields are equal, which is what the processor
 * uses to detect that a restart is required.
 */
private static final class ApicurioRegistryDevServiceCfg {
    private final boolean devServicesEnabled;
    private final String imageName;
    private final Integer fixedExposedPort;
    private final boolean shared;
    private final String serviceName;

    /**
     * Copies the relevant values from {@code config}; dev services default to enabled and
     * the port defaults to {@code 0} when not configured.
     */
    public ApicurioRegistryDevServiceCfg(ApicurioRegistryDevServicesBuildTimeConfig config) {
        this.devServicesEnabled = config.enabled.orElse(true);
        this.imageName = config.imageName;
        this.fixedExposedPort = config.port.orElse(0);
        this.shared = config.shared;
        this.serviceName = config.serviceName;
    }

    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null || o.getClass() != getClass()) {
            return false;
        }
        ApicurioRegistryDevServiceCfg other = (ApicurioRegistryDevServiceCfg) o;
        // Compare the primitive flags first, then the reference-typed fields.
        if (devServicesEnabled != other.devServicesEnabled || shared != other.shared) {
            return false;
        }
        return Objects.equals(imageName, other.imageName)
                && Objects.equals(fixedExposedPort, other.fixedExposedPort)
                && Objects.equals(serviceName, other.serviceName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(devServicesEnabled, imageName, fixedExposedPort, shared, serviceName);
    }
}
/**
 * Testcontainers definition of the Apicurio Registry container used by the dev service.
 * Only images whose repository ends with {@code apicurio/apicurio-registry-mem} are accepted.
 */
private static final class ApicurioRegistryContainer extends GenericContainer<ApicurioRegistryContainer> {
    private final int fixedExposedPort;
    private final boolean useSharedNetwork;
    // Set in configure() when the shared network is used; otherwise stays null.
    private String hostName;

    /**
     * @param dockerImageName image to run
     * @param fixedExposedPort host port to bind, or a value {@code <= 0} for a mapped port
     * @param serviceName value of the dev service label, or {@code null} to add no label
     * @param useSharedNetwork whether the container joins the shared dev services network
     * @throws IllegalArgumentException if the image is not an apicurio-registry-mem image
     */
    private ApicurioRegistryContainer(DockerImageName dockerImageName, int fixedExposedPort, String serviceName,
            boolean useSharedNetwork) {
        super(dockerImageName);
        this.fixedExposedPort = fixedExposedPort;
        this.useSharedNetwork = useSharedNetwork;
        if (serviceName != null) { // Only adds the label in dev mode.
            withLabel(DEV_SERVICE_LABEL, serviceName);
        }
        withEnv("QUARKUS_PROFILE", "prod");
        boolean supportedImage = dockerImageName.getRepository().endsWith("apicurio/apicurio-registry-mem");
        if (!supportedImage) {
            throw new IllegalArgumentException("Only apicurio/apicurio-registry-mem images are supported");
        }
    }

    /**
     * Sets up networking: joins the shared network, binds the fixed host port, or exposes the
     * registry port for mapping - in that order of precedence.
     */
    @Override
    protected void configure() {
        super.configure();
        if (useSharedNetwork) {
            // NOTE(review): the "kafka" host name prefix looks inherited from the Kafka dev service - confirm.
            hostName = ConfigureUtil.configureSharedNetwork(this, "kafka");
        } else if (fixedExposedPort > 0) {
            addFixedExposedPort(fixedExposedPort, APICURIO_REGISTRY_PORT);
        } else {
            addExposedPorts(APICURIO_REGISTRY_PORT);
        }
    }

    /**
     * @return the {@code http://host:port} base URL of the registry: the shared-network host name and
     *         in-container port when the shared network is used, otherwise the container host and mapped port
     */
    public String getUrl() {
        String host;
        int port;
        if (useSharedNetwork) {
            host = hostName;
            port = APICURIO_REGISTRY_PORT;
        } else {
            host = getHost();
            port = getMappedPort(APICURIO_REGISTRY_PORT);
        }
        return String.format("http://%s:%s", host, port);
    }
}
}