Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport release-1.2.0] PartitionId in the logging context #7910

Merged
18 commits merged into from
Sep 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions atomix/cluster/src/main/java/io/atomix/raft/RaftServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ abstract class Builder implements io.atomix.utils.Builder<RaftServer> {
protected EntryValidator entryValidator = new NoopEntryValidator();
protected RaftElectionConfig electionConfig = RaftElectionConfig.ofDefaultElection();
protected RaftPartitionConfig partitionConfig = new RaftPartitionConfig();
protected int partitionId;

protected Builder(final MemberId localMemberId) {
this.localMemberId = checkNotNull(localMemberId, "localMemberId cannot be null");
Expand Down Expand Up @@ -500,6 +501,11 @@ public Builder withPartitionConfig(final RaftPartitionConfig partitionConfig) {
this.partitionConfig = partitionConfig;
return this;
}

public Builder withPartitionId(final int partitionId) {
this.partitionId = partitionId;
return this;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,26 @@ public CompletableFuture<Void> bootstrap(final Collection<MemberId> cluster) {
return bootstrapFutureRef.get().whenComplete((result, error) -> bootstrapFutureRef.set(null));
}

@Override
public RaftMember getLeader() {
return raft.getLeader();
}

@Override
public RaftMember getLocalMember() {
return localMember;
}

@Override
public Collection<RaftMember> getMembers() {
return new ArrayList<>(members);
}

@Override
public long getTerm() {
return raft.getTerm();
}

private void ensureConfigurationIsConsistent(final Collection<MemberId> cluster) {
final var configuration = configurationRef.get();
final var hasPersistedConfiguration = configuration != null;
Expand Down Expand Up @@ -191,26 +211,6 @@ private void createInitialConfig(final Collection<MemberId> cluster) {
configure(new Configuration(0, 0, localMember.getLastUpdated().toEpochMilli(), activeMembers));
}

@Override
public RaftMember getLeader() {
return raft.getLeader();
}

@Override
public RaftMember getLocalMember() {
return localMember;
}

@Override
public Collection<RaftMember> getMembers() {
return new ArrayList<>(members);
}

@Override
public long getTerm() {
return raft.getTerm();
}

/**
* Returns a member by ID.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ public RaftServer build() {
final RaftContext raft =
new RaftContext(
name,
partitionId,
localMemberId,
membershipService,
protocol,
Expand Down
10 changes: 10 additions & 0 deletions atomix/cluster/src/main/java/io/atomix/raft/impl/RaftContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.MDC;

/**
* Manages the volatile state and state transitions of a Raft server.
Expand Down Expand Up @@ -130,9 +131,11 @@ public class RaftContext implements AutoCloseable, HealthMonitorable {

private long lastHeartbeat;
private final RaftPartitionConfig partitionConfig;
private final int partitionId;

public RaftContext(
final String name,
final int partitionId,
final MemberId localMemberId,
final ClusterMembershipService membershipService,
final RaftServerProtocol protocol,
Expand All @@ -146,6 +149,7 @@ public RaftContext(
this.protocol = checkNotNull(protocol, "protocol cannot be null");
this.storage = checkNotNull(storage, "storage cannot be null");
random = randomFactory.get();
this.partitionId = partitionId;

raftRoleMetrics = new RaftRoleMetrics(name);

Expand All @@ -170,6 +174,8 @@ public RaftContext(
threadContext =
threadContextFactory.createContext(
namedThreads(baseThreadName, log), this::onUncaughtException);
// in order to set the partition id once in the raft thread
threadContext.execute(() -> MDC.put("partitionId", Integer.toString(partitionId)));

// Open the metadata store.
meta = storage.openMetaStore();
Expand Down Expand Up @@ -1090,6 +1096,10 @@ public Duration getMaxQuorumResponseTimeout() {
return partitionConfig.getMaxQuorumResponseTimeout();
}

public int getPartitionId() {
return partitionId;
}

/** Raft server state. */
public enum State {
ACTIVE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ private RaftServer buildServer() {

return RaftServer.builder(localMemberId)
.withName(partition.name())
.withPartitionId(partitionId)
.withMembershipService(membershipService)
.withProtocol(createServerProtocol())
.withPartitionConfig(partitionConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ public RaftContext createRaftContext(final MemberId memberId, final Random rando
final var raft =
new RaftContext(
memberId.id() + "-partition-1",
1,
memberId,
mock(ClusterMembershipService.class),
new ControllableRaftServerProtocol(memberId, serverProtocols, messageQueue),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,15 @@ public final class ExporterDirector extends Actor implements HealthMonitorable,
private final ExporterMode exporterMode;
private final Duration distributionInterval;
private ExporterPositionsDistributionService exporterDistributionService;
private final int partitionId;

public ExporterDirector(final ExporterDirectorContext context, final boolean shouldPauseOnStart) {
name = context.getName();
containers =
context.getDescriptors().stream().map(ExporterContainer::new).collect(Collectors.toList());

logStream = Objects.requireNonNull(context.getLogStream());
final var partitionId = logStream.getPartitionId();
partitionId = logStream.getPartitionId();
metrics = new ExporterMetrics(partitionId);
recordExporter = new RecordExporter(metrics, containers, partitionId);
exportingRetryStrategy = new BackOffRetryStrategy(actor, Duration.ofSeconds(10));
Expand Down Expand Up @@ -134,6 +135,13 @@ public ActorFuture<ExporterPhase> getPhase() {
return actor.call(() -> exporterPhase);
}

@Override
protected Map<String, String> createContext() {
final var context = super.createContext();
context.put(ACTOR_PROP_PARTITION_ID, Integer.toString(partitionId));
return context;
}

@Override
public String getName() {
return name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
import io.camunda.zeebe.snapshots.PersistedSnapshotStore;
import io.camunda.zeebe.util.sched.Actor;
import java.util.Collection;
import java.util.Map;

public final class LogDeletionService extends Actor implements PersistedSnapshotListener {
private final LogCompactor logCompactor;
private final String actorName;
private final Collection<PersistedSnapshotStore> persistedSnapshotStores;
private final int partitionId;

public LogDeletionService(
final int nodeId,
Expand All @@ -27,6 +29,14 @@ public LogDeletionService(
this.persistedSnapshotStores = persistedSnapshotStores;
this.logCompactor = logCompactor;
actorName = buildActorName(nodeId, "DeletionService", partitionId);
this.partitionId = partitionId;
}

@Override
protected Map<String, String> createContext() {
final var context = super.createContext();
context.put(ACTOR_PROP_PARTITION_ID, Integer.toString(partitionId));
return context;
}

@Override
Expand Down