Skip to content

Commit

Permalink
Merge branch 'up/master' into website/branch-2.6.2-part-1
Browse files Browse the repository at this point in the history
* up/master:
  [Issue 12757][broker] add broker config isAllowAutoUpdateSchema (apache#12786)
  Update deploy-bare-metal.md (apache#12432)
  [Broker] Fix producer getting incorrectly removed from topic's producers map (apache#12846)
  Add error log when new jetty client (apache#12840)
  JavaInstanceTest should be AssertEquals (apache#12836)
  [Transaction] Fix transaction flaky test testMaxReadPositionForNormalPublish (apache#12681)
  The problem of two exception handling (apache#12744)
  Fix TopicPoliciesCacheNotInitException issue. (apache#12773)
  Added local filesystem backend for package manager (apache#12708)
  • Loading branch information
Yan Zhang committed Nov 18, 2021
2 parents feca320 + fa7be23 commit 4f01b73
Show file tree
Hide file tree
Showing 26 changed files with 759 additions and 88 deletions.
4 changes: 4 additions & 0 deletions conf/broker.conf
Expand Up @@ -270,6 +270,10 @@ brokerMaxConnections=0
# The maximum number of connections per IP. If it exceeds, new connections are rejected.
brokerMaxConnectionsPerIp=0

# Allow schema to be auto updated at broker level. User can override this by
# 'is_allow_auto_update_schema' of namespace policy.
isAllowAutoUpdateSchemaEnabled=true

# Enable check for minimum allowed client library version
clientLibraryVersionCheckEnabled=false

Expand Down
4 changes: 4 additions & 0 deletions conf/standalone.conf
Expand Up @@ -176,6 +176,10 @@ defaultNumberOfNamespaceBundles=4
# Using a value of 0, is disabling maxTopicsPerNamespace-limit check.
maxTopicsPerNamespace=0

# Allow schema to be auto updated at broker level. User can override this by
# 'is_allow_auto_update_schema' of namespace policy.
isAllowAutoUpdateSchemaEnabled=true

# Enable check for minimum allowed client library version
clientLibraryVersionCheckEnabled=false

Expand Down
Expand Up @@ -234,12 +234,7 @@ private void asyncReadEntry0(ReadHandle lh, PositionImpl position, final ReadEnt
} finally {
ledgerEntries.close();
}
}, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception->{
ml.invalidateLedgerHandle(lh);
callback.readEntryFailed(createManagedLedgerException(exception), ctx);
return null;
}
);
}, ml.getExecutor().chooseThread(ml.getName()));
}
}

Expand Down Expand Up @@ -333,17 +328,7 @@ private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boo
} finally {
ledgerEntries.close();
}
}, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception->{
if (exception instanceof BKException
&& ((BKException)exception).getCode() == BKException.Code.TooManyRequestsException) {
callback.readEntriesFailed(createManagedLedgerException(exception), ctx);
} else {
ml.invalidateLedgerHandle(lh);
ManagedLedgerException mlException = createManagedLedgerException(exception);
callback.readEntriesFailed(mlException, ctx);
}
return null;
});
}, ml.getExecutor().chooseThread(ml.getName()));
}
}

Expand Down
Expand Up @@ -215,9 +215,6 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boole
ml.mbean.addReadEntriesSample(entries.size(), totalSize);

callback.readEntriesComplete(entries, ctx);
}).exceptionally(exception -> {
callback.readEntriesFailed(createManagedLedgerException(exception), ctx);
return null;
});
}

Expand Down
Expand Up @@ -573,6 +573,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int brokerMaxConnectionsPerIp = 0;

@FieldContext(
category = CATEGORY_POLICIES,
dynamic = true,
doc = "Allow schema to be auto updated at broker level. User can override this by 'is_allow_auto_update_schema'"
+ " of namespace policy. This is enabled by default."
)
private boolean isAllowAutoUpdateSchemaEnabled = true;

@FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
Expand Down
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker;

import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.getMLTransactionLogName;
import static org.apache.pulsar.transaction.coordinator.proto.TxnStatus.ABORTING;
import static org.apache.pulsar.transaction.coordinator.proto.TxnStatus.COMMITTING;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -63,7 +64,6 @@
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.CoordinatorNotFoundException;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.InvalidTxnStatusException;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionMetadataStoreStateException;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -230,8 +230,7 @@ public CompletableFuture<Void> handleTcClientConnect(TransactionCoordinatorID tc

public CompletableFuture<TransactionMetadataStore> openTransactionMetadataStore(TransactionCoordinatorID tcId) {
return pulsarService.getBrokerService()
.getManagedLedgerConfig(TopicName.get(MLTransactionLogImpl
.TRANSACTION_LOG_PREFIX + tcId)).thenCompose(v -> {
.getManagedLedgerConfig(getMLTransactionLogName(tcId)).thenCompose(v -> {
TransactionTimeoutTracker timeoutTracker = timeoutTrackerFactory.newTracker(tcId);
TransactionRecoverTracker recoverTracker =
new TransactionRecoverTrackerImpl(TransactionMetadataStoreService.this,
Expand Down
Expand Up @@ -98,7 +98,7 @@ public abstract class AbstractTopic implements Topic {
@Getter
protected volatile SchemaCompatibilityStrategy schemaCompatibilityStrategy =
SchemaCompatibilityStrategy.FULL;
protected volatile boolean isAllowAutoUpdateSchema = true;
protected volatile Boolean isAllowAutoUpdateSchema;
// schema validation enforced flag
protected volatile boolean schemaValidationEnforced = false;

Expand Down Expand Up @@ -333,20 +333,28 @@ public CompletableFuture<SchemaVersion> addSchema(SchemaData schema) {
String base = TopicName.get(getName()).getPartitionedTopicName();
String id = TopicName.get(base).getSchemaName();
SchemaRegistryService schemaRegistryService = brokerService.pulsar().getSchemaRegistryService();
return isAllowAutoUpdateSchema ? schemaRegistryService
.putSchemaIfAbsent(id, schema, schemaCompatibilityStrategy)
: schemaRegistryService.trimDeletedSchemaAndGetList(id).thenCompose(schemaAndMetadataList ->
schemaRegistryService.getSchemaVersionBySchemaData(schemaAndMetadataList, schema)
.thenCompose(schemaVersion -> {
if (schemaVersion == null) {
return FutureUtil
.failedFuture(
new IncompatibleSchemaException(
"Schema not found and schema auto updating is disabled."));
} else {
return CompletableFuture.completedFuture(schemaVersion);
}
}));

if (allowAutoUpdateSchema()) {
return schemaRegistryService.putSchemaIfAbsent(id, schema, schemaCompatibilityStrategy);
} else {
return schemaRegistryService.trimDeletedSchemaAndGetList(id).thenCompose(schemaAndMetadataList ->
schemaRegistryService.getSchemaVersionBySchemaData(schemaAndMetadataList, schema)
.thenCompose(schemaVersion -> {
if (schemaVersion == null) {
return FutureUtil.failedFuture(new IncompatibleSchemaException(
"Schema not found and schema auto updating is disabled."));
} else {
return CompletableFuture.completedFuture(schemaVersion);
}
}));
}
}

private boolean allowAutoUpdateSchema() {
if (isAllowAutoUpdateSchema == null) {
return brokerService.pulsar().getConfig().isAllowAutoUpdateSchemaEnabled();
}
return isAllowAutoUpdateSchema;
}

@Override
Expand Down Expand Up @@ -639,13 +647,9 @@ protected void internalAddProducer(Producer producer) throws BrokerServiceExcept

private void tryOverwriteOldProducer(Producer oldProducer, Producer newProducer)
throws BrokerServiceException {
boolean canOverwrite = false;
if (oldProducer.equals(newProducer) && !isUserProvidedProducerName(oldProducer)
&& !isUserProvidedProducerName(newProducer) && newProducer.getEpoch() > oldProducer.getEpoch()) {
if (newProducer.isSuccessorTo(oldProducer) && !isUserProvidedProducerName(oldProducer)
&& !isUserProvidedProducerName(newProducer)) {
oldProducer.close(false);
canOverwrite = true;
}
if (canOverwrite) {
if (!producers.replace(newProducer.getProducerName(), oldProducer, newProducer)) {
// Met concurrent update, throw exception here so that client can try reconnect later.
throw new BrokerServiceException.NamingException("Producer with name '" + newProducer.getProducerName()
Expand Down
Expand Up @@ -153,22 +153,17 @@ private String parseRemoteClusterName(String producerName, boolean isRemote, Str
return null;
}

@Override
public int hashCode() {
return Objects.hash(producerName);
}

@Override
public boolean equals(Object obj) {
if (obj instanceof Producer) {
Producer other = (Producer) obj;
return Objects.equals(producerName, other.producerName)
&& Objects.equals(topic, other.topic)
&& producerId == other.producerId
&& Objects.equals(cnx, other.cnx);
}

return false;
/**
* Method to determine if this producer can replace another producer.
* @param other - producer to compare to this one
* @return true if this producer is a subsequent instantiation of the same logical producer. Otherwise, false.
*/
public boolean isSuccessorTo(Producer other) {
return Objects.equals(producerName, other.producerName)
&& Objects.equals(topic, other.topic)
&& producerId == other.producerId
&& Objects.equals(cnx, other.cnx)
&& other.getEpoch() < epoch;
}

public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize,
Expand Down
Expand Up @@ -171,6 +171,10 @@ private void notifyListener(Message<PulsarEvent> msg) {

@Override
public TopicPolicies getTopicPolicies(TopicName topicName) throws TopicPoliciesCacheNotInitException {
if (!policyCacheInitMap.containsKey(topicName.getNamespaceObject())) {
NamespaceName namespace = topicName.getNamespaceObject();
prepareInitPoliciesCache(namespace, new CompletableFuture<>());
}
if (policyCacheInitMap.containsKey(topicName.getNamespaceObject())
&& !policyCacheInitMap.get(topicName.getNamespaceObject())) {
throw new TopicPoliciesCacheNotInitException();
Expand Down Expand Up @@ -209,24 +213,29 @@ public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle name
result.complete(null);
} else {
ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1));
policyCacheInitMap.put(namespace, false);
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
creatSystemTopicClientWithRetry(namespace);
readerCaches.put(namespace, readerCompletableFuture);
readerCompletableFuture.whenComplete((reader, ex) -> {
if (ex != null) {
log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
result.completeExceptionally(ex);
} else {
initPolicesCache(reader, result);
result.thenRun(() -> readMorePolicies(reader));
}
});
prepareInitPoliciesCache(namespace, result);
}
}
return result;
}

private void prepareInitPoliciesCache(NamespaceName namespace, CompletableFuture<Void> result) {
if (policyCacheInitMap.putIfAbsent(namespace, false) == null) {
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
creatSystemTopicClientWithRetry(namespace);
readerCaches.put(namespace, readerCompletableFuture);
readerCompletableFuture.whenComplete((reader, ex) -> {
if (ex != null) {
log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
result.completeExceptionally(ex);
} else {
initPolicesCache(reader, result);
result.thenRun(() -> readMorePolicies(reader));
}
});
}
}

protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> creatSystemTopicClientWithRetry(
NamespaceName namespace) {
SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory
Expand Down Expand Up @@ -294,6 +303,9 @@ private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> reader, Comp
reader.getSystemTopic().getTopicName(), ex);
future.completeExceptionally(ex);
readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
policyCacheInitMap.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
reader.closeAsync();
return;
}
if (hasMore) {
reader.readNextAsync().whenComplete((msg, e) -> {
Expand All @@ -302,6 +314,9 @@ private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> reader, Comp
reader.getSystemTopic().getTopicName(), ex);
future.completeExceptionally(e);
readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
policyCacheInitMap.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
reader.closeAsync();
return;
}
refreshTopicPoliciesCache(msg);
if (log.isDebugEnabled()) {
Expand All @@ -316,7 +331,6 @@ private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> reader, Comp
}
policyCacheInitMap.computeIfPresent(
reader.getSystemTopic().getTopicName().getNamespaceObject(), (k, v) -> true);

// replay policy message
policiesCache.forEach(((topicName, topicPolicies) -> {
if (listeners.get(topicName) != null) {
Expand Down
Expand Up @@ -501,10 +501,10 @@ public void addComplete(Position pos, ByteBuf entryData, Object ctx) {
lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();
}

publishContext.setMetadataFromEntryData(entryData);
publishContext.completed(null, position.getLedgerId(), position.getEntryId());
// in order to sync the max position when cursor read entries
transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry());
publishContext.setMetadataFromEntryData(entryData);
publishContext.completed(null, position.getLedgerId(), position.getEntryId());
decrementPendingWriteOpsAndCheck();
}

Expand Down
Expand Up @@ -37,6 +37,7 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

Expand Down Expand Up @@ -442,11 +443,20 @@ public void testAddRemoveProducer() throws Exception {
// OK
}

// 4. simple remove producer
// 4. Try to remove with unequal producer
Producer producerCopy = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name",
role, false, null, SchemaVersion.Latest, 0, false,
ProducerAccessMode.Shared, Optional.empty());
topic.removeProducer(producerCopy);
// Expect producer to be in map
assertEquals(topic.getProducers().size(), 1);
assertSame(topic.getProducers().get(producer.getProducerName()), producer);

// 5. simple remove producer
topic.removeProducer(producer);
assertEquals(topic.getProducers().size(), 0);

// 5. duplicate remove
// 6. duplicate remove
topic.removeProducer(producer); /* noop */
}

Expand Down

0 comments on commit 4f01b73

Please sign in to comment.