Skip to content

Commit

Permalink
fix: change the definition of recentlyJoinedConsumers position
Browse files Browse the repository at this point in the history
  • Loading branch information
equanz committed Apr 25, 2023
1 parent 6cfa468 commit 8c33d71
Show file tree
Hide file tree
Showing 9 changed files with 261 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ private PositionImplRecyclable(Handle<PositionImplRecyclable> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}

public void setLedgerId(final long ledgerId) {
this.ledgerId = ledgerId;
}

public void setEntryId(final long entryId) {
this.entryId = entryId;
}

public static PositionImplRecyclable create() {
return RECYCLER.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public class Consumer {

private static final double avgPercent = 0.9;
private boolean preciseDispatcherFlowControl;
private PositionImpl readPositionWhenJoining;
private String lastSentPositionsWhenJoiningString;
private final String clientAddress; // IP address only, no port number included
private final MessageId startMessageId;
private final boolean isAcknowledgmentAtBatchIndexLevelEnabled;
Expand Down Expand Up @@ -865,8 +865,8 @@ public ConsumerStatsImpl getStats() {
stats.unackedMessages = unackedMessages;
stats.blockedConsumerOnUnackedMsgs = blockedConsumerOnUnackedMsgs;
stats.avgMessagesPerEntry = getAvgMessagesPerEntry();
if (readPositionWhenJoining != null) {
stats.readPositionWhenJoining = readPositionWhenJoining.toString();
if (lastSentPositionsWhenJoiningString != null) {
stats.lastSentPositionsWhenJoining = lastSentPositionsWhenJoiningString;
}
return stats;
}
Expand Down Expand Up @@ -1084,8 +1084,8 @@ public boolean isPreciseDispatcherFlowControl() {
return preciseDispatcherFlowControl;
}

public void setReadPositionWhenJoining(PositionImpl readPositionWhenJoining) {
this.readPositionWhenJoining = readPositionWhenJoining;
public void setLastSentPositionsWhenJoiningString(String lastSentPositionsWhenJoiningString) {
this.lastSentPositionsWhenJoiningString = lastSentPositionsWhenJoiningString;
}

public int getMaxUnackedMessages() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1188,7 +1188,11 @@ public ManagedCursor getCursor() {
}

protected int getStickyKeyHash(Entry entry) {
return StickyKeyConsumerSelector.makeStickyKeyHash(peekStickyKey(entry.getDataBuffer()));
return getStickyKeyHash(peekStickyKey(entry.getDataBuffer()));
}

protected int getStickyKeyHash(byte[] stickyKey) {
return StickyKeyConsumerSelector.makeStickyKeyHash(stickyKey);
}

private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherMultipleConsumers.class);
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -1186,11 +1186,19 @@ public SubscriptionStatsImpl getStats(Boolean getPreciseBacklog, boolean subscri
subStats.allowOutOfOrderDelivery = keySharedDispatcher.isAllowOutOfOrderDelivery();
subStats.keySharedMode = keySharedDispatcher.getKeySharedMode().toString();

LinkedHashMap<Consumer, PositionImpl> recentlyJoinedConsumers = keySharedDispatcher
LinkedHashMap<Consumer, PersistentStickyKeyDispatcherMultipleConsumers.LastSentPositions>
recentlyJoinedConsumers = keySharedDispatcher
.getRecentlyJoinedConsumers();
if (recentlyJoinedConsumers != null && recentlyJoinedConsumers.size() > 0) {
recentlyJoinedConsumers.forEach((k, v) -> {
subStats.consumersAfterMarkDeletePosition.put(k.consumerName(), v.toString());
// Dispatchers allows same name consumers
final StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("consumerName=").append(k.consumerName())
.append(", consumerId=").append(k.consumerId());
if (k.cnx() != null) {
stringBuilder.append(", address=").append(k.cnx().clientAddress());
}
subStats.recentlyJoinedConsumers.put(stringBuilder.toString(), v.toPositionSetString());
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ public interface ConsumerStats {
/** Flag to verify if consumer is blocked due to reaching threshold of unacked messages. */
boolean isBlockedConsumerOnUnackedMsgs();

/** The read position of the cursor when the consumer joining. */
String getReadPositionWhenJoining();
/** Last sent positions per sticky key of the cursor when the consumer joining. */
String getLastSentPositionsWhenJoining();

/** Address of this consumer. */
String getAddress();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ public interface SubscriptionStats {
/** Whether the Key_Shared subscription mode is AUTO_SPLIT or STICKY. */
String getKeySharedMode();

/** This is for Key_Shared subscription to get the recentJoinedConsumers in the Key_Shared subscription. */
Map<String, String> getConsumersAfterMarkDeletePosition();
/** This is for Key_Shared subscription to get the recentlyJoinedConsumers in the Key_Shared subscription. */
Map<String, String> getRecentlyJoinedConsumers();

/** SubscriptionProperties (key/value strings) associated with this subscribe. */
Map<String, String> getSubscriptionProperties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ public class ConsumerStatsImpl implements ConsumerStats {
/** Flag to verify if consumer is blocked due to reaching threshold of unacked messages. */
public boolean blockedConsumerOnUnackedMsgs;

/** The read position of the cursor when the consumer joining. */
public String readPositionWhenJoining;
/** Last sent positions per sticky key of the cursor when the consumer joining. */
public String lastSentPositionsWhenJoining;

/** Address of this consumer. */
@JsonIgnore
Expand Down Expand Up @@ -129,7 +129,7 @@ public ConsumerStatsImpl add(ConsumerStatsImpl stats) {
this.availablePermits += stats.availablePermits;
this.unackedMessages += stats.unackedMessages;
this.blockedConsumerOnUnackedMsgs = stats.blockedConsumerOnUnackedMsgs;
this.readPositionWhenJoining = stats.readPositionWhenJoining;
this.lastSentPositionsWhenJoining = stats.lastSentPositionsWhenJoining;
return this;
}

Expand Down Expand Up @@ -177,8 +177,8 @@ public void setClientVersion(String clientVersion) {
this.stringBuffer.append(clientVersion);
}

public String getReadPositionWhenJoining() {
return readPositionWhenJoining;
public String getLastSentPositionsWhenJoining() {
return lastSentPositionsWhenJoining;
}

public String getLastAckedTime() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ public class SubscriptionStatsImpl implements SubscriptionStats {
/** Whether the Key_Shared subscription mode is AUTO_SPLIT or STICKY. */
public String keySharedMode;

/** This is for Key_Shared subscription to get the recentJoinedConsumers in the Key_Shared subscription. */
public Map<String, String> consumersAfterMarkDeletePosition;
/** This is for Key_Shared subscription to get the recentlyJoinedConsumers in the Key_Shared subscription. */
public Map<String, String> recentlyJoinedConsumers;

/** The number of non-contiguous deleted messages ranges. */
public int nonContiguousDeletedMessagesRanges;
Expand All @@ -149,7 +149,7 @@ public class SubscriptionStatsImpl implements SubscriptionStats {

public SubscriptionStatsImpl() {
this.consumers = new ArrayList<>();
this.consumersAfterMarkDeletePosition = new LinkedHashMap<>();
this.recentlyJoinedConsumers = new LinkedHashMap<>();
this.subscriptionProperties = new HashMap<>();
this.bucketDelayedIndexStats = new HashMap<>();
}
Expand All @@ -169,7 +169,7 @@ public void reset() {
lastExpireTimestamp = 0L;
lastMarkDeleteAdvancedTimestamp = 0L;
consumers.clear();
consumersAfterMarkDeletePosition.clear();
recentlyJoinedConsumers.clear();
nonContiguousDeletedMessagesRanges = 0;
nonContiguousDeletedMessagesRangesSerializedSize = 0;
delayedMessageIndexSizeInBytes = 0;
Expand Down Expand Up @@ -210,7 +210,7 @@ public SubscriptionStatsImpl add(SubscriptionStatsImpl stats) {
}
}
this.allowOutOfOrderDelivery |= stats.allowOutOfOrderDelivery;
this.consumersAfterMarkDeletePosition.putAll(stats.consumersAfterMarkDeletePosition);
this.recentlyJoinedConsumers.putAll(stats.recentlyJoinedConsumers);
this.nonContiguousDeletedMessagesRanges += stats.nonContiguousDeletedMessagesRanges;
this.nonContiguousDeletedMessagesRangesSerializedSize += stats.nonContiguousDeletedMessagesRangesSerializedSize;
this.delayedMessageIndexSizeInBytes += stats.delayedMessageIndexSizeInBytes;
Expand Down

0 comments on commit 8c33d71

Please sign in to comment.