Merge branch 'up/master' into website/branch-2.7.2-chapter-4
* up/master: (46 commits)
  [website][upgrade]feat: docs migration - version-2.7.2 Pulsar Schema (apache#12393)
  [docs] io-develop, fix broken link (apache#12414)
  docs(function): fix incorrect classname in python runtime sample (apache#12476)
  Remove redundant null check for getInternalListener (apache#12474)
  Fix the retry topic's `REAL_TOPIC` & `ORIGIN_MESSAGE_ID` property should not be modified once it has been written. (apache#12451)
  [cli] Fix output format of string by pulsar-admin command (apache#11878)
  fix the race of delete subscription and delete topic (apache#12240)
  fix influxdb yaml doc (apache#12460)
  [Modernizer] Add Maven Modernizer plugin in pulsar-proxy module (apache#12326)
  fix DefaultCryptoKeyReaderTest can not run on windows (apache#12475)
  apache#12429 only fixed the compactor skips data issue, but the normal reader/consumer (apache#12464)
  broker resource group test optimize fail msg (apache#12438)
  Stop OffsetStore when stopping the connector (apache#12457)
  fix a typo in UnAckedMessageTracker (apache#12467)
  docs(function): fix typo in pip install (apache#12468)
  Optimize the code: remove extra spaces (apache#12470)
  optimize SecurityUtility code flow (apache#12431)
  Update lombok to 1.18.22 (apache#12466)
  Update team.js to add David K. as a committer (apache#12440)
  Fix java demo error in reset cursor admin (apache#12454)
  ...

# Conflicts:
#	site2/website-next/versioned_docs/version-2.7.2/schema-evolution-compatibility.md
#	site2/website-next/versioned_docs/version-2.7.2/schema-get-started.md
#	site2/website-next/versioned_docs/version-2.7.2/schema-manage.md
#	site2/website-next/versioned_docs/version-2.7.2/schema-understand.md
#	site2/website-next/versioned_sidebars/version-2.7.2-sidebars.json
Yan Zhang committed Oct 25, 2021
2 parents 987d7e8 + b1ef8be commit 69e812a
Showing 741 changed files with 23,857 additions and 18,664 deletions.
13 changes: 13 additions & 0 deletions conf/broker.conf
@@ -352,6 +352,10 @@ dispatchThrottlingRatePerTopicInMsg=0
# default message-byte dispatch-throttling
dispatchThrottlingRatePerTopicInByte=0

# Apply dispatch rate limiting on the batch message instead of individual
# messages within the batch message. (Default is disabled)
dispatchThrottlingOnBatchMessageEnabled=false

# Default number of message dispatching throttling-limit for a subscription.
# Using a value of 0, is disabling default message dispatch-throttling.
dispatchThrottlingRatePerSubscriptionInMsg=0
@@ -391,6 +395,15 @@ dispatcherMinReadBatchSize=1
# Max number of entries to dispatch for a shared subscription. By default it is 20 entries.
dispatcherMaxRoundRobinBatchSize=20

# The read failure backoff initial time in milliseconds. By default it is 15s.
dispatcherReadFailureBackoffInitialTimeInMs=15000

# The read failure backoff max time in milliseconds. By default it is 60s.
dispatcherReadFailureBackoffMaxTimeInMs=60000

# The read failure backoff mandatory stop time in milliseconds. By default it is 0s.
dispatcherReadFailureBackoffMandatoryStopTimeInMs=0

# Precise dispatcher flow control according to history message number of each entry
preciseDispatcherFlowControl=false

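Note: the three dispatcherReadFailureBackoffInMs settings above control how long the dispatcher waits after a failed read before retrying. A rough Java sketch of the progression they suggest (illustrative only; exponential doubling is an assumption and the mandatory-stop setting is not modelled here):

    // Hypothetical helper: delay before the next read retry, doubling from the
    // initial value and capped at the max.
    static long readFailureBackoffMs(int consecutiveFailures, long initialMs, long maxMs) {
        long delay = initialMs * (1L << Math.min(consecutiveFailures, 20));
        return Math.min(delay, maxMs);
    }
    // readFailureBackoffMs(0, 15_000, 60_000) -> 15000
    // readFailureBackoffMs(1, 15_000, 60_000) -> 30000
    // readFailureBackoffMs(2, 15_000, 60_000) -> 60000 (capped)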
13 changes: 13 additions & 0 deletions conf/standalone.conf
@@ -234,6 +234,10 @@ dispatchThrottlingRatePerTopicInMsg=0
# default message-byte dispatch-throttling
dispatchThrottlingRatePerTopicInByte=0

# Apply dispatch rate limiting on the batch message instead of individual
# messages within the batch message. (Default is disabled)
dispatchThrottlingOnBatchMessageEnabled=false

# Dispatch rate-limiting relative to publish rate.
# (Enabling flag will make broker to dynamically update dispatch-rate relatively to publish-rate:
# throttle-dispatch-rate = (publish-rate + configured dispatch-rate).
Expand All @@ -243,6 +247,15 @@ dispatchThrottlingRateRelativeToPublishRate=false
# backlog.
dispatchThrottlingOnNonBacklogConsumerEnabled=true

# The read failure backoff initial time in milliseconds. By default it is 15s.
dispatcherReadFailureBackoffInitialTimeInMs=15000

# The read failure backoff max time in milliseconds. By default it is 60s.
dispatcherReadFailureBackoffMaxTimeInMs=60000

# The read failure backoff mandatory stop time in milliseconds. By default it is 0s.
dispatcherReadFailureBackoffMandatoryStopTimeInMs=0

# Precise dispatcher flow control according to history message number of each entry
preciseDispatcherFlowControl=false

@@ -425,7 +425,11 @@ void markDelete(Position position, Map<String, Long> properties)
* @param newReadPosition
* the position where to move the cursor
*/
void seek(Position newReadPosition);
default void seek(Position newReadPosition) {
seek(newReadPosition, false);
}

void seek(Position newReadPosition, boolean force);

/**
* Clear the cursor backlog.
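The hunk above turns seek(Position) into a default method and adds a force overload. A minimal usage sketch (the helper class below is hypothetical; only ManagedCursor#seek comes from this change):

    import org.apache.bookkeeper.mledger.ManagedCursor;
    import org.apache.bookkeeper.mledger.Position;

    class SeekSketch {
        static void moveReadPosition(ManagedCursor cursor, Position target, boolean ignoreMarkDelete) {
            if (ignoreMarkDelete) {
                // force: set the read position even at or before the mark-delete position
                cursor.seek(target, true);
            } else {
                // default method: delegates to seek(target, false), keeping the old clamping behaviour
                cursor.seek(target);
            }
        }
    }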
@@ -158,8 +158,8 @@ public void invalidateEntries(final PositionImpl lastPosition) {
Pair<Integer, Long> removed = entries.removeRange(firstPosition, lastPosition, false);
int entriesRemoved = removed.getLeft();
long sizeRemoved = removed.getRight();
if (log.isDebugEnabled()) {
log.debug("[{}] Invalidated entries up to {} - Entries removed: {} - Size removed: {}", ml.getName(),
if (log.isTraceEnabled()) {
log.trace("[{}] Invalidated entries up to {} - Entries removed: {} - Size removed: {}", ml.getName(),
lastPosition, entriesRemoved, sizeRemoved);
}

@@ -2163,18 +2163,16 @@ public void rewind() {
}

@Override
public void seek(Position newReadPositionInt) {
public void seek(Position newReadPositionInt, boolean force) {
checkArgument(newReadPositionInt instanceof PositionImpl);
PositionImpl newReadPosition = (PositionImpl) newReadPositionInt;

lock.writeLock().lock();
try {
if (newReadPosition.compareTo(markDeletePosition) <= 0) {
if (!force && newReadPosition.compareTo(markDeletePosition) <= 0) {
// Make sure the newReadPosition comes after the mark delete position
newReadPosition = ledger.getNextValidPosition(markDeletePosition);
}

PositionImpl oldReadPosition = readPosition;
readPosition = newReadPosition;
} finally {
lock.writeLock().unlock();
@@ -175,7 +175,7 @@ public void rewind() {
}

@Override
public void seek(Position newReadPosition) {
public void seek(Position newReadPosition, boolean force) {
}

@Override
@@ -170,7 +170,7 @@ protected void startBKCluster(String ledgerPath) throws Exception {

// Create Bookie Servers (B1, B2, B3)
for (int i = 0; i < numBookies; i++) {
if (ledgerPath != "") {
if (!"".equals(ledgerPath)) {
ServerConfiguration configuration = newServerConfiguration(ledgerPath + "/ledgers");
startBookie(configuration, ledgerPath + "/ledgers");
}else {
@@ -226,7 +226,7 @@ protected ServerConfiguration newServerConfiguration(int port, String zkServers,
File[] ledgerDirs, String ledgerRootPath) {
ServerConfiguration conf = new ServerConfiguration(baseConf);
conf.setBookiePort(port);
if (ledgerRootPath != "") {
if (!"".equals(ledgerRootPath)) {
conf.setMetadataServiceUri("zk://" + zkUtil.getZooKeeperConnectString() + ledgerRootPath);
}else {
conf.setZkServers(zkServers);
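The two test fixes above replace reference comparison of strings with value comparison. A standalone illustration of the difference (sample values only):

    class StringCompareDemo {
        public static void main(String[] args) {
            String empty = "";
            String alsoEmpty = new String("");             // same value, different object
            System.out.println(empty != alsoEmpty);        // true: '!=' compares references
            System.out.println("".equals(alsoEmpty));      // true: equals() compares values and is null-safe
            // so 'ledgerPath != ""' can be true even for an empty path, while
            // '!"".equals(ledgerPath)' only passes for a genuinely non-empty one
        }
    }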
3 changes: 2 additions & 1 deletion pom.xml
@@ -183,7 +183,7 @@ flexible messaging model and an intuitive client API.</description>
<hppc.version>0.7.3</hppc.version>
<spark-streaming_2.10.version>2.1.0</spark-streaming_2.10.version>
<assertj-core.version>3.18.1</assertj-core.version>
<lombok.version>1.18.20</lombok.version>
<lombok.version>1.18.22</lombok.version>
<javax.annotation-api.version>1.3.2</javax.annotation-api.version>
<jaxb-api>2.3.1</jaxb-api>
<javax.activation.version>1.2.0</javax.activation.version>
@@ -248,6 +248,7 @@ flexible messaging model and an intuitive client API.</description>
<j2objc-annotations.version>1.3</j2objc-annotations.version>
<lightproto-maven-plugin.version>0.4</lightproto-maven-plugin.version>
<dependency-check-maven.version>6.1.6</dependency-check-maven.version>
<modernizer-maven-plugin.version>2.3.0</modernizer-maven-plugin.version>

<!-- Used to configure rename.netty.native. Libs -->
<rename.netty.native.libs>rename-netty-native-libs.sh</rename.netty.native.libs>
@@ -699,6 +699,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "Default number of message-bytes dispatching throttling-limit for every topic. \n\n"
+ "Using a value of 0, is disabling default message-byte dispatch-throttling")
private long dispatchThrottlingRatePerTopicInByte = 0;
@FieldContext(
dynamic = true,
category = CATEGORY_POLICIES,
doc = "Apply dispatch rate limiting on batch message instead individual "
+ "messages with in batch message. (Default is disabled)")
private boolean dispatchThrottlingOnBatchMessageEnabled = false;

@FieldContext(
dynamic = true,
@@ -779,6 +785,27 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int dispatcherMinReadBatchSize = 1;

@FieldContext(
dynamic = true,
category = CATEGORY_SERVER,
doc = "The read failure backoff initial time in milliseconds. By default it is 15s."
)
private int dispatcherReadFailureBackoffInitialTimeInMs = 15000;

@FieldContext(
dynamic = true,
category = CATEGORY_SERVER,
doc = "The read failure backoff max time in milliseconds. By default it is 60s."
)
private int dispatcherReadFailureBackoffMaxTimeInMs = 60000;

@FieldContext(
dynamic = true,
category = CATEGORY_SERVER,
doc = "The read failure backoff mandatory stop time in milliseconds. By default it is 0s."
)
private int dispatcherReadFailureBackoffMandatoryStopTimeInMs = 0;

@FieldContext(
dynamic = true,
category = CATEGORY_SERVER,
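Because the new fields are declared dynamic = true, they can also be changed on a running broker through the admin API. A minimal sketch (the admin URL is an assumption):

    import org.apache.pulsar.client.admin.PulsarAdmin;

    class ToggleBatchThrottling {
        public static void main(String[] args) throws Exception {
            try (PulsarAdmin admin = PulsarAdmin.builder()
                    .serviceHttpUrl("http://localhost:8080")   // assumed broker admin endpoint
                    .build()) {
                // flip the new flag without restarting the broker
                admin.brokers().updateDynamicConfiguration("dispatchThrottlingOnBatchMessageEnabled", "true");
            }
        }
    }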
@@ -83,7 +83,7 @@ public static String getAppliedAdvertisedAddress(ServiceConfiguration configurat

/**
* Gets the internal advertised listener for broker-to-broker communication.
* @return an advertised listener
* @return a non-null advertised listener
*/
public static AdvertisedListener getInternalListener(ServiceConfiguration config) {
Map<String, AdvertisedListener> result = MultipleListenerValidator
@@ -97,7 +97,6 @@ public static Map<String, AdvertisedListener> validateAndAnalysisAdvertisedListe
}
}
String hostPort = String.format("%s:%d", uri.getHost(), uri.getPort());
reverseMappings.computeIfAbsent(hostPort, k -> Sets.newTreeSet());
Set<String> sets = reverseMappings.computeIfAbsent(hostPort, k -> Sets.newTreeSet());
sets.add(entry.getKey());
if (sets.size() > 1) {
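The deleted line was redundant: computeIfAbsent already creates the set when the key is missing and returns it either way. A standalone illustration (host/key values assumed):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;
    import java.util.TreeSet;

    class ComputeIfAbsentDemo {
        public static void main(String[] args) {
            Map<String, Set<String>> reverseMappings = new HashMap<>();
            // one call both inserts the TreeSet (if absent) and returns it
            Set<String> names = reverseMappings.computeIfAbsent("host:6650", k -> new TreeSet<>());
            names.add("internal");
            System.out.println(reverseMappings);           // {host:6650=[internal]}
        }
    }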
@@ -1412,7 +1412,7 @@ public TransactionBufferClient getTransactionBufferClient() {
*/
protected String brokerUrl(ServiceConfiguration config) {
AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(config);
return internalListener != null && internalListener.getBrokerServiceUrl() != null
return internalListener.getBrokerServiceUrl() != null
? internalListener.getBrokerServiceUrl().toString() : null;
}

Expand All @@ -1425,7 +1425,7 @@ public static String brokerUrl(String host, int port) {
*/
public String brokerUrlTls(ServiceConfiguration config) {
AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(config);
return internalListener != null && internalListener.getBrokerServiceUrlTls() != null
return internalListener.getBrokerServiceUrlTls() != null
? internalListener.getBrokerServiceUrlTls().toString() : null;
}

@@ -102,7 +102,7 @@ public Multimap<String, String> findBundlesForUnloading(final LoadData loadData,
log.info(
"Attempting to shed load on {}, which has max resource usage above avgUsage and threshold {}%"
+ " > {}% + {}% -- Offloading at least {} MByte/s of traffic, left throughput {} MByte/s",
broker, currentUsage, avgUsage, threshold, minimumThroughputToOffload / MB,
broker, 100 * currentUsage, 100 * avgUsage, 100 * threshold, minimumThroughputToOffload / MB,
(brokerCurrentThroughput - minimumThroughputToOffload) / MB);

MutableDouble trafficMarkedToOffload = new MutableDouble(0);
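The log message prints the values next to literal '%' signs, but they are stored as fractions; scaling by 100 makes the numbers match the unit. A tiny illustration with assumed sample values:

    class ShedderLogDemo {
        public static void main(String[] args) {
            double currentUsage = 0.92, avgUsage = 0.65, threshold = 0.10;   // sample fractions
            // before the fix the message read like "0.92% > 0.65% + 0.1%";
            // scaled by 100 it reads as intended:
            System.out.printf("%.0f%% > %.0f%% + %.0f%%%n",
                    100 * currentUsage, 100 * avgUsage, 100 * threshold);    // 92% > 65% + 10%
        }
    }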
@@ -47,10 +47,12 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
protected final Subscription subscription;

protected final ServiceConfiguration serviceConfig;
protected final boolean dispatchThrottlingOnBatchMessageEnabled;

protected AbstractBaseDispatcher(Subscription subscription, ServiceConfiguration serviceConfig) {
this.subscription = subscription;
this.serviceConfig = serviceConfig;
this.dispatchThrottlingOnBatchMessageEnabled = serviceConfig.isDispatchThrottlingOnBatchMessageEnabled();
}

/**
@@ -97,24 +99,26 @@ protected int updateEntryWrapperWithMetadata(EntryWrapper[] entryWrappers, List<
* @param sendMessageInfo
* an object where the total size in messages and bytes will be returned back to the caller
*/
public void filterEntriesForConsumer(List<Entry> entries, EntryBatchSizes batchSizes,
public int filterEntriesForConsumer(List<Entry> entries, EntryBatchSizes batchSizes,
SendMessageInfo sendMessageInfo, EntryBatchIndexesAcks indexesAcks,
ManagedCursor cursor, boolean isReplayRead) {
filterEntriesForConsumer(Optional.empty(), 0, entries, batchSizes, sendMessageInfo, indexesAcks, cursor,
return filterEntriesForConsumer(Optional.empty(), 0, entries, batchSizes, sendMessageInfo, indexesAcks, cursor,
isReplayRead);
}

public void filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, int entryWrapperOffset,
public int filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, int entryWrapperOffset,
List<Entry> entries, EntryBatchSizes batchSizes, SendMessageInfo sendMessageInfo,
EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor, boolean isReplayRead) {
int totalMessages = 0;
long totalBytes = 0;
int totalChunkedMessages = 0;
int totalEntries = 0;
for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
Entry entry = entries.get(i);
if (entry == null) {
continue;
}
totalEntries++;
ByteBuf metadataAndPayload = entry.getDataBuffer();
int entryWrapperIndex = i + entryWrapperOffset;
MessageMetadata msgMetadata = entryWrapper.isPresent() && entryWrapper.get()[entryWrapperIndex] != null
@@ -182,6 +186,7 @@ && trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) {
sendMessageInfo.setTotalMessages(totalMessages);
sendMessageInfo.setTotalBytes(totalBytes);
sendMessageInfo.setTotalChunkedMessages(totalChunkedMessages);
return totalEntries;
}

/**
@@ -118,7 +118,7 @@ private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName, Action
}
}
});
})
})
);
}
});
Expand All @@ -141,6 +141,17 @@ private PulsarEvent getPulsarEvent(TopicName topicName, ActionType actionType, T
}

private void notifyListener(Message<PulsarEvent> msg) {
// delete policies
if (msg.getValue() == null) {
TopicName topicName = TopicName.get(TopicName.get(msg.getKey()).getPartitionedTopicName());
if (listeners.get(topicName) != null) {
for (TopicPolicyListener<TopicPolicies> listener : listeners.get(topicName)) {
listener.onUpdate(null);
}
}
return;
}

if (!EventType.TOPIC_POLICY.equals(msg.getValue().getEventType())) {
return;
}
@@ -252,12 +263,11 @@ public void unLoad(NamespaceBundle bundle) {
removeOwnedNamespaceBundleAsync(bundle);
}

@Override
public boolean test(NamespaceBundle namespaceBundle) {
return true;
}

});
@Override
public boolean test(NamespaceBundle namespaceBundle) {
return true;
}
});
}

private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> reader, CompletableFuture<Void> future) {
@@ -393,7 +403,8 @@ private void fetchTopicPoliciesAsyncAndCloseReader(SystemTopicClient.Reader<Puls
if (e != null) {
future.completeExceptionally(e);
}
if (EventType.TOPIC_POLICY.equals(msg.getValue().getEventType())) {
if (msg.getValue() != null
&& EventType.TOPIC_POLICY.equals(msg.getValue().getEventType())) {
TopicPoliciesEvent topicPoliciesEvent = msg.getValue().getTopicPoliciesEvent();
if (topicName.equals(TopicName.get(
topicPoliciesEvent.getDomain(),
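With the change above, the tombstone written when a topic's policies are deleted (a message with a null value) is forwarded to listeners as onUpdate(null) instead of being dropped. A hypothetical listener handling that contract (package locations assumed):

    import org.apache.pulsar.broker.service.TopicPolicyListener;
    import org.apache.pulsar.common.policies.data.TopicPolicies;

    class PolicyDeletionAwareListener implements TopicPolicyListener<TopicPolicies> {
        @Override
        public void onUpdate(TopicPolicies policies) {
            if (policies == null) {
                // tombstone: the per-topic policies were deleted; fall back to namespace/broker defaults
            } else {
                // apply the updated per-topic policies
            }
        }
    }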
@@ -495,6 +495,7 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
int start = 0;
long totalMessagesSent = 0;
long totalBytesSent = 0;
long totalEntries = 0;
int avgBatchSizePerMsg = remainingMessages > 0 ? Math.max(remainingMessages / entries.size(), 1) : 1;

int firstAvailableConsumerPermits, currentTotalAvailablePermits;
@@ -541,8 +542,9 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {

EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForThisConsumer.size());
filterEntriesForConsumer(Optional.ofNullable(entryWrappers), start, entriesForThisConsumer,
batchSizes, sendMessageInfo, batchIndexesAcks, cursor, readType == ReadType.Replay);
totalEntries += filterEntriesForConsumer(Optional.ofNullable(entryWrappers), start,
entriesForThisConsumer, batchSizes, sendMessageInfo, batchIndexesAcks, cursor,
readType == ReadType.Replay);

c.sendMessages(entriesForThisConsumer, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), redeliveryTracker);
@@ -571,13 +573,14 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
}

// acquire message-dispatch permits for already delivered messages
long permits = dispatchThrottlingOnBatchMessageEnabled ? totalEntries : totalMessagesSent;
if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
if (topic.getDispatchRateLimiter().isPresent()) {
topic.getDispatchRateLimiter().get().tryDispatchPermit(totalMessagesSent, totalBytesSent);
topic.getDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent);
}

if (dispatchRateLimiter.isPresent()) {
dispatchRateLimiter.get().tryDispatchPermit(totalMessagesSent, totalBytesSent);
dispatchRateLimiter.get().tryDispatchPermit(permits, totalBytesSent);
}
}

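The new permits calculation above charges the dispatch rate limiter per entry instead of per individual message when dispatchThrottlingOnBatchMessageEnabled is set. With assumed numbers, the difference for a batched workload looks like this:

    class BatchPermitsDemo {
        public static void main(String[] args) {
            int totalEntries = 10;                 // batch entries dispatched this round (assumed)
            int messagesPerEntry = 50;             // messages packed into each entry (assumed)
            long totalMessagesSent = (long) totalEntries * messagesPerEntry;

            boolean dispatchThrottlingOnBatchMessageEnabled = true;
            long permits = dispatchThrottlingOnBatchMessageEnabled ? totalEntries : totalMessagesSent;

            // 10 permits are consumed this round instead of 500, so a configured
            // message-dispatch rate effectively counts batch entries when the flag is on
            System.out.println(permits);           // 10
        }
    }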
