Skip to content

Commit

Permalink
merge: #9508
Browse files Browse the repository at this point in the history
9508: feat(gateway): deprecate the contactPoint property and introduce the initialContactProperties r=Zelldon a=aivinog1

## Description

<!-- Please explain the changes you made here. -->
Replace the `contactPoint` property with the `initialContactPoints` property. So, there now could be an array of broker connections on the start.

## Related issues

<!-- Which issues are closed by this PR or are related -->

closes #4856



Co-authored-by: Alexey Vinogradov <vinogradov.a.i.93@gmail.com>
  • Loading branch information
zeebe-bors-camunda[bot] and aivinog1 committed Jun 27, 2022
2 parents 9ae83c1 + bf33870 commit 52adfaf
Show file tree
Hide file tree
Showing 12 changed files with 133 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import io.camunda.zeebe.gateway.impl.configuration.GatewayCfg;
import io.netty.util.NetUtil;
import java.util.Collections;

public final class EmbeddedGatewayCfg extends GatewayCfg implements ConfigurationEntry {

Expand All @@ -24,7 +25,9 @@ public void init(final BrokerCfg globalConfig, final String brokerBase) {

// ensure embedded gateway can access local broker
getCluster()
.setContactPoint(NetUtil.toSocketAddressString(networkCfg.getInternalApi().getAddress()));
.setInitialContactPoints(
Collections.singletonList(
NetUtil.toSocketAddressString(networkCfg.getInternalApi().getAddress())));

// configure embedded gateway based on broker config
getNetwork().setPort(getNetwork().getPort() + (networkCfg.getPortOffset() * 10));
Expand Down
10 changes: 10 additions & 0 deletions dist/src/main/config/gateway.yaml.template
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@
# minKeepAliveInterval: 30s

# cluster:
# Sets initial contact points (brokers), which the gateway should contact to
# The contact points of the internal network configuration must be specified.
# The format is [HOST:PORT]
# Example:
# initialContactPoints : [ 192.168.1.22:26502, 192.168.1.32:26502 ]
# This setting can also be overridden using the environment variable ZEEBE_GATEWAY_CLUSTER_INITIALCONTACTPOINTS
# specifying a comma-separated list of contact points.
# initialContactPoints : [127.0.0.1:26502]

# WARNING: This setting is deprecated! Use initialContactPoints instead.
# Sets the broker the gateway should initial contact
# This setting can also be overridden using the environment variable ZEEBE_GATEWAY_CLUSTER_CONTACTPOINT.
# contactPoint: 127.0.0.1:26502
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ AtomixCluster createAtomixCluster() {
.withClusterId(clusterConfig.getClusterName())
.withMembershipProvider(
BootstrapDiscoveryProvider.builder()
.withNodes(Address.from(clusterConfig.getContactPoint()))
.withNodes(
clusterConfig.getInitialContactPoints().stream()
.map(Address::from)
.toArray(Address[]::new))
.build())
.withMembershipProtocol(membershipProtocol)
.withMessageCompression(clusterConfig.getMessageCompression());
Expand Down
2 changes: 2 additions & 0 deletions gateway/src/main/java/io/camunda/zeebe/gateway/Loggers.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@
public final class Loggers {

public static final Logger GATEWAY_LOGGER = LoggerFactory.getLogger("io.camunda.zeebe.gateway");
public static final Logger GATEWAY_CFG_LOGGER =
LoggerFactory.getLogger("io.camunda.zeebe.gateway.impl.configuration");
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,26 @@
*/
package io.camunda.zeebe.gateway.impl.configuration;

import static io.camunda.zeebe.gateway.Loggers.GATEWAY_CFG_LOGGER;
import static io.camunda.zeebe.gateway.impl.configuration.ConfigurationDefaults.DEFAULT_CLUSTER_HOST;
import static io.camunda.zeebe.gateway.impl.configuration.ConfigurationDefaults.DEFAULT_CLUSTER_MEMBER_ID;
import static io.camunda.zeebe.gateway.impl.configuration.ConfigurationDefaults.DEFAULT_CLUSTER_NAME;
import static io.camunda.zeebe.gateway.impl.configuration.ConfigurationDefaults.DEFAULT_CLUSTER_PORT;
import static io.camunda.zeebe.gateway.impl.configuration.ConfigurationDefaults.DEFAULT_CONTACT_POINT_HOST;
import static io.camunda.zeebe.gateway.impl.configuration.ConfigurationDefaults.DEFAULT_CONTACT_POINT_PORT;
import static io.camunda.zeebe.gateway.impl.configuration.ConfigurationDefaults.DEFAULT_REQUEST_TIMEOUT;
import static io.camunda.zeebe.util.StringUtil.LIST_SANITIZER;

import io.atomix.cluster.messaging.MessagingConfig.CompressionAlgorithm;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

public final class ClusterCfg {
private String contactPoint = DEFAULT_CONTACT_POINT_HOST + ":" + DEFAULT_CONTACT_POINT_PORT;

private List<String> initialContactPoints =
Collections.singletonList(DEFAULT_CONTACT_POINT_HOST + ":" + DEFAULT_CONTACT_POINT_PORT);
private Duration requestTimeout = DEFAULT_REQUEST_TIMEOUT;
private String clusterName = DEFAULT_CLUSTER_NAME;
private String memberId = DEFAULT_CLUSTER_MEMBER_ID;
Expand Down Expand Up @@ -82,12 +87,11 @@ public ClusterCfg setAdvertisedPort(final int advertisedPort) {
return this;
}

public String getContactPoint() {
return contactPoint;
}

@Deprecated(since = "8.1.0", forRemoval = true)
public ClusterCfg setContactPoint(final String contactPoint) {
this.contactPoint = contactPoint;
GATEWAY_CFG_LOGGER.warn(
"Configuring deprecated property 'contactPoint', will use 'initialContactPoints'. Please consider to migrate to 'initialContactPoints' property, which allows to set a list of contact points.");
setInitialContactPoints(Collections.singletonList(contactPoint));
return this;
}

Expand Down Expand Up @@ -134,10 +138,19 @@ public void setMessageCompression(final CompressionAlgorithm compressionAlgorith
messageCompression = compressionAlgorithm;
}

public List<String> getInitialContactPoints() {
return initialContactPoints;
}

public ClusterCfg setInitialContactPoints(final List<String> initialContactPoints) {
this.initialContactPoints = LIST_SANITIZER.apply(initialContactPoints);
return this;
}

@Override
public int hashCode() {
return Objects.hash(
contactPoint,
initialContactPoints,
requestTimeout,
clusterName,
memberId,
Expand All @@ -158,7 +171,7 @@ public boolean equals(final Object o) {
}
final ClusterCfg that = (ClusterCfg) o;
return port == that.port
&& Objects.equals(contactPoint, that.contactPoint)
&& Objects.equals(initialContactPoints, that.initialContactPoints)
&& Objects.equals(requestTimeout, that.requestTimeout)
&& Objects.equals(clusterName, that.clusterName)
&& Objects.equals(memberId, that.memberId)
Expand All @@ -171,9 +184,8 @@ public boolean equals(final Object o) {
@Override
public String toString() {
return "ClusterCfg{"
+ "contactPoint='"
+ contactPoint
+ '\''
+ "initialContactPoints="
+ initialContactPoints
+ ", requestTimeout="
+ requestTimeout
+ ", clusterName='"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import io.camunda.zeebe.util.sched.clock.ControlledActorClock;
import io.netty.util.NetUtil;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.agrona.DirectBuffer;
Expand Down Expand Up @@ -74,7 +75,8 @@ public void setUp() {
.getCluster()
.setHost("0.0.0.0")
.setPort(SocketUtil.getNextAddress().getPort())
.setContactPoint(NetUtil.toSocketAddressString(broker.getSocketAddress()))
.setInitialContactPoints(
Collections.singletonList(NetUtil.toSocketAddressString(broker.getSocketAddress())))
.setRequestTimeout(Duration.ofSeconds(3));
configuration.init();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.io.InputStream;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.junit.Test;

Expand All @@ -35,7 +36,7 @@ public final class GatewayCfgTest {
CUSTOM_CFG.getNetwork().setHost("192.168.0.1").setPort(123);
CUSTOM_CFG
.getCluster()
.setContactPoint("foobar:1234")
.setInitialContactPoints(List.of("foobar:1234", "barfoo:5678"))
.setRequestTimeout(Duration.ofHours(123))
.setClusterName("testCluster")
.setMemberId("testMember")
Expand Down Expand Up @@ -150,7 +151,7 @@ public void shouldUseEnvironmentVariables() {
// given
setEnv("zeebe.gateway.network.host", "zeebe");
setEnv("zeebe.gateway.network.port", "5432");
setEnv("zeebe.gateway.cluster.contactPoint", "broker:432");
setEnv("zeebe.gateway.cluster.initialContactPoints", "broker:432,anotherBroker:789");
setEnv("zeebe.gateway.threads.managementThreads", "32");
setEnv("zeebe.gateway.cluster.requestTimeout", Duration.ofMinutes(43).toString());
setEnv("zeebe.gateway.cluster.longPollingEnabled", "false");
Expand Down Expand Up @@ -184,7 +185,7 @@ public void shouldUseEnvironmentVariables() {
.setMinKeepAliveInterval(Duration.ofSeconds(30));
expected
.getCluster()
.setContactPoint("broker:432")
.setInitialContactPoints(List.of("broker:432", "anotherBroker:789"))
.setRequestTimeout(Duration.ofMinutes(43))
.setClusterName("envCluster")
.setMemberId("envMember")
Expand Down Expand Up @@ -214,6 +215,54 @@ public void shouldUseEnvironmentVariables() {
assertThat(gatewayCfg).isEqualTo(expected);
}

@Test
public void shouldSetInitialContactPointsWhenSetContactPoint() {
// given
final String contactPoint = "foo-bar:1";

// when
final GatewayCfg gatewayCfg =
new GatewayCfg().setCluster(new ClusterCfg().setContactPoint(contactPoint));

// then
assertThat(gatewayCfg.getCluster().getInitialContactPoints()).containsExactly(contactPoint);
}

@Test
public void shouldSetInitialContactPointsWhenUseContactPointEnvironmentVariable() {
// given
final String contactPoint = "broker:789";
setEnv("zeebe.gateway.cluster.contactPoint", contactPoint);

final GatewayCfg expected =
new GatewayCfg()
.setCluster(new ClusterCfg().setInitialContactPoints(List.of(contactPoint)));
expected.init();

// when
final GatewayCfg gatewayCfg = readDefaultConfig();

// then
assertThat(gatewayCfg).isEqualTo(expected);
}

@Test
public void shouldSetInitialContactPointsWhenUseContactPointConfig() {
// given
final String contactPoint = "broker:789";
final GatewayCfg expected =
new GatewayCfg()
.setCluster(new ClusterCfg().setInitialContactPoints(List.of(contactPoint)));
expected.init();

// when
final GatewayCfg gatewayCfg =
readConfig("/configuration/gateway.deprecated.contactPoint.custom.yaml");

// then
assertThat(gatewayCfg).isEqualTo(expected);
}

private void setEnv(final String key, final String value) {
environment.put(key, value);
}
Expand Down
4 changes: 3 additions & 1 deletion gateway/src/test/resources/configuration/gateway.custom.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ zeebe:
port: 123

cluster:
contactPoint: foobar:1234
initialContactPoints:
- foobar:1234
- barfoo:5678
maxMessageSize: 4G
requestTimeout: 123h
clusterName: testCluster
Expand Down
11 changes: 11 additions & 0 deletions gateway/src/test/resources/configuration/gateway.default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,17 @@
# minKeepAliveInterval: 30s

# cluster:
# Sets initial contact points of brokers
# The contact points of the internal network configuration must be specified.
# The format is [HOST:PORT]
# Example:
# initialContactPoints : [ 192.168.1.22:26502, 192.168.1.32:26502 ]
# This setting can also be overridden using the environment variable ZEEBE_GATEWAY_CLUSTER_INITIALCONTACTPOINTS
# specifying a comma-separated list of contact points.
# Default is 127.0.0.1:26502
# initialContactPoints : [127.0.0.1:26502]

# WARNING: This setting is deprecated! Use initialContactPoints instead.
# Sets the broker the gateway should initial contact
# This setting can also be overridden using the environment variable ZEEBE_GATEWAY_CLUSTER_CONTACTPOINT.
# contactPoint: 127.0.0.1:26502
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
zeebe:
gateway:
cluster:
contactPoint: broker:789
Original file line number Diff line number Diff line change
Expand Up @@ -358,11 +358,19 @@ private File getBrokerBase(final int nodeId) {
}

private Gateway createGateway() {
final String contactPoint =
NetUtil.toSocketAddressString(getBrokerCfg(0).getNetwork().getInternalApi().getAddress());
final List<String> initialContactPoints =
brokerCfgs.values().stream()
.map(
brokerCfg ->
NetUtil.toSocketAddressString(
brokerCfg.getNetwork().getInternalApi().getAddress()))
.collect(Collectors.toList());

final GatewayCfg gatewayCfg = new GatewayCfg();
gatewayCfg.getCluster().setContactPoint(contactPoint).setClusterName(clusterName);
gatewayCfg
.getCluster()
.setInitialContactPoints(initialContactPoints)
.setClusterName(clusterName);
gatewayCfg.getNetwork().setPort(SocketUtil.getNextAddress().getPort());
gatewayCfg.getCluster().setPort(SocketUtil.getNextAddress().getPort());
// temporarily increase request time out, but we should make this configurable per test
Expand All @@ -381,7 +389,10 @@ private Gateway createGateway() {
.withClusterId(clusterCfg.getClusterName())
.withMembershipProvider(
BootstrapDiscoveryProvider.builder()
.withNodes(Address.from(clusterCfg.getContactPoint()))
.withNodes(
clusterCfg.getInitialContactPoints().stream()
.map(Address::from)
.toArray(Address[]::new))
.build())
.withMembershipProtocol(
SwimMembershipProtocol.builder().withSyncInterval(Duration.ofSeconds(1)).build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.camunda.zeebe.test.util.socket.SocketUtil;
import io.camunda.zeebe.util.sched.clock.ControlledActorClock;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
Expand Down Expand Up @@ -47,7 +48,7 @@ public void setup() {
.getCluster()
.setHost("0.0.0.0")
.setPort(SocketUtil.getNextAddress().getPort())
.setContactPoint(internalApi.toString())
.setInitialContactPoints(Collections.singletonList(internalApi.toString()))
.setRequestTimeout(Duration.ofSeconds(10));
configuration.init();

Expand Down

0 comments on commit 52adfaf

Please sign in to comment.