diff --git a/conf/broker.conf b/conf/broker.conf index 13e955bdee53d..700f9a5adc9b1 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -270,6 +270,10 @@ brokerMaxConnections=0 # The maximum number of connections per IP. If it exceeds, new connections are rejected. brokerMaxConnectionsPerIp=0 +# Allow schema to be auto updated at broker level. User can override this by +# 'is_allow_auto_update_schema' of namespace policy. +isAllowAutoUpdateSchemaEnabled=true + # Enable check for minimum allowed client library version clientLibraryVersionCheckEnabled=false diff --git a/conf/standalone.conf b/conf/standalone.conf index 87d9e058f465c..906280c180253 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -176,6 +176,10 @@ defaultNumberOfNamespaceBundles=4 # Using a value of 0, is disabling maxTopicsPerNamespace-limit check. maxTopicsPerNamespace=0 +# Allow schema to be auto updated at broker level. User can override this by +# 'is_allow_auto_update_schema' of namespace policy. +isAllowAutoUpdateSchemaEnabled=true + # Enable check for minimum allowed client library version clientLibraryVersionCheckEnabled=false diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index b832c9568d718..9ca086a4c7d67 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -573,6 +573,14 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private int brokerMaxConnectionsPerIp = 0; + @FieldContext( + category = CATEGORY_POLICIES, + dynamic = true, + doc = "Allow schema to be auto updated at broker level. User can override this by 'is_allow_auto_update_schema'" + + " of namespace policy. This is enabled by default." + ) + private boolean isAllowAutoUpdateSchemaEnabled = true; + @FieldContext( category = CATEGORY_SERVER, dynamic = true, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index f4f7615688fc2..73885678bf9ee 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -98,7 +98,7 @@ public abstract class AbstractTopic implements Topic { @Getter protected volatile SchemaCompatibilityStrategy schemaCompatibilityStrategy = SchemaCompatibilityStrategy.FULL; - protected volatile boolean isAllowAutoUpdateSchema = true; + protected volatile Boolean isAllowAutoUpdateSchema; // schema validation enforced flag protected volatile boolean schemaValidationEnforced = false; @@ -333,20 +333,28 @@ public CompletableFuture addSchema(SchemaData schema) { String base = TopicName.get(getName()).getPartitionedTopicName(); String id = TopicName.get(base).getSchemaName(); SchemaRegistryService schemaRegistryService = brokerService.pulsar().getSchemaRegistryService(); - return isAllowAutoUpdateSchema ? schemaRegistryService - .putSchemaIfAbsent(id, schema, schemaCompatibilityStrategy) - : schemaRegistryService.trimDeletedSchemaAndGetList(id).thenCompose(schemaAndMetadataList -> - schemaRegistryService.getSchemaVersionBySchemaData(schemaAndMetadataList, schema) - .thenCompose(schemaVersion -> { - if (schemaVersion == null) { - return FutureUtil - .failedFuture( - new IncompatibleSchemaException( - "Schema not found and schema auto updating is disabled.")); - } else { - return CompletableFuture.completedFuture(schemaVersion); - } - })); + + if (allowAutoUpdateSchema()) { + return schemaRegistryService.putSchemaIfAbsent(id, schema, schemaCompatibilityStrategy); + } else { + return schemaRegistryService.trimDeletedSchemaAndGetList(id).thenCompose(schemaAndMetadataList -> + schemaRegistryService.getSchemaVersionBySchemaData(schemaAndMetadataList, schema) + .thenCompose(schemaVersion -> { + if (schemaVersion == null) { + return FutureUtil.failedFuture(new IncompatibleSchemaException( + "Schema not found and schema auto updating is disabled.")); + } else { + return CompletableFuture.completedFuture(schemaVersion); + } + })); + } + } + + private boolean allowAutoUpdateSchema() { + if (isAllowAutoUpdateSchema == null) { + return brokerService.pulsar().getConfig().isAllowAutoUpdateSchemaEnabled(); + } + return isAllowAutoUpdateSchema; } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java index 293f71d5f878f..80168b9ae4c9f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java @@ -218,6 +218,87 @@ public void testConsumerCompatibilityReadAllCheckTest(SchemaCompatibilityStrateg } } + @Test(dataProvider = "AllCheckSchemaCompatibilityStrategy") + public void testBrokerAllowAutoUpdateSchemaDisabled(SchemaCompatibilityStrategy schemaCompatibilityStrategy) + throws Exception { + + final String tenant = PUBLIC_TENANT; + final String topic = "test-consumer-compatibility"; + String namespace = "test-namespace-" + randomName(16); + String fqtn = TopicName.get( + TopicDomain.persistent.value(), + tenant, + namespace, + topic + ).toString(); + + NamespaceName namespaceName = NamespaceName.get(tenant, namespace); + + admin.namespaces().createNamespace( + tenant + "/" + namespace, + Sets.newHashSet(CLUSTER_NAME) + ); + + assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(namespaceName.toString()), + SchemaCompatibilityStrategy.FULL); + + admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), schemaCompatibilityStrategy); + admin.schemas().createSchema(fqtn, Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo()); + + + pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(false); + ProducerBuilder producerThreeBuilder = pulsarClient + .newProducer(Schema.AVRO(SchemaDefinition.builder().withAlwaysAllowNull + (false).withSupportSchemaVersioning(true). + withPojo(Schemas.PersonTwo.class).build())) + .topic(fqtn); + try { + producerThreeBuilder.create(); + } catch (Exception e) { + Assert.assertTrue(e.getMessage().contains("Schema not found and schema auto updating is disabled.")); + } + + pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(true); + ConsumerBuilder comsumerBuilder = pulsarClient.newConsumer(Schema.AVRO( + SchemaDefinition.builder().withAlwaysAllowNull + (false).withSupportSchemaVersioning(true). + withPojo(Schemas.PersonTwo.class).build())) + .subscriptionName("test") + .topic(fqtn); + + Producer producer = producerThreeBuilder.create(); + Consumer consumerTwo = comsumerBuilder.subscribe(); + + producer.send(new Schemas.PersonTwo(2, "Lucy")); + Message message = consumerTwo.receive(); + + Schemas.PersonTwo personTwo = message.getValue(); + consumerTwo.acknowledge(message); + + assertEquals(personTwo.getId(), 2); + assertEquals(personTwo.getName(), "Lucy"); + + producer.close(); + consumerTwo.close(); + + pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(false); + + producer = producerThreeBuilder.create(); + consumerTwo = comsumerBuilder.subscribe(); + + producer.send(new Schemas.PersonTwo(2, "Lucy")); + message = consumerTwo.receive(); + + personTwo = message.getValue(); + consumerTwo.acknowledge(message); + + assertEquals(personTwo.getId(), 2); + assertEquals(personTwo.getName(), "Lucy"); + + consumerTwo.close(); + producer.close(); + } + @Test(dataProvider = "AllCheckSchemaCompatibilityStrategy") public void testIsAutoUpdateSchema(SchemaCompatibilityStrategy schemaCompatibilityStrategy) throws Exception { final String tenant = PUBLIC_TENANT; diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java index 7f56bc8e50735..4d29a5fdee669 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java @@ -107,7 +107,7 @@ public class Policies { public SchemaCompatibilityStrategy schema_compatibility_strategy = SchemaCompatibilityStrategy.UNDEFINED; @SuppressWarnings("checkstyle:MemberName") - public boolean is_allow_auto_update_schema = true; + public Boolean is_allow_auto_update_schema = null; @SuppressWarnings("checkstyle:MemberName") public boolean schema_validation_enforced = false; diff --git a/site2/website-next/docs/reference-configuration.md b/site2/website-next/docs/reference-configuration.md index 1908149053ba1..3199155d3aaad 100644 --- a/site2/website-next/docs/reference-configuration.md +++ b/site2/website-next/docs/reference-configuration.md @@ -349,6 +349,7 @@ brokerServiceCompactionThresholdInBytes|If the estimated backlog size is greater | managedLedgerInfoCompressionType | Compression type of managed ledger information.

Available options are `NONE`, `LZ4`, `ZLIB`, `ZSTD`, and `SNAPPY`).

If this value is `NONE` or invalid, the `managedLedgerInfo` is not compressed.

**Note** that after enabling this configuration, if you want to degrade a broker, you need to change the value to `NONE` and make sure all ledger metadata is saved without compression. | None | | additionalServlets | Additional servlet name.

If you have multiple additional servlets, separate them by commas.

For example, additionalServlet_1, additionalServlet_2 | N/A | | additionalServletDirectory | Location of broker additional servlet NAR directory | ./brokerAdditionalServlet | +| isAllowAutoUpdateSchemaEnabled | Allow schema to be auto updated at broker level. User can override this by 'is_allow_auto_update_schema' of namespace policy. |true| ## Client @@ -480,7 +481,6 @@ You can set the log level and configuration in the [log4j2.yaml](https://github | dispatchThrottlingRatePerTopicInMsg | Default messages (per second) dispatch throttling-limit for every topic. When the value is set to 0, default message dispatch throttling-limit is disabled. |0 | | dispatchThrottlingRatePerTopicInByte | Default byte (per second) dispatch throttling-limit for every topic. When the value is set to 0, default byte dispatch throttling-limit is disabled. | 0| | dispatchThrottlingOnBatchMessageEnabled |Apply dispatch rate limiting on batch message instead individual messages with in batch message. (Default is disabled). | false| - | dispatchThrottlingRateRelativeToPublishRate | Enable dispatch rate-limiting relative to publish rate. | false | |dispatchThrottlingRatePerSubscriptionInMsg|The defaulted number of message dispatching throttling-limit for a subscription. The value of 0 disables message dispatch-throttling.|0| |dispatchThrottlingRatePerSubscriptionInByte|The default number of message-bytes dispatching throttling-limit for a subscription. The value of 0 disables message-byte dispatch-throttling.|0| @@ -650,6 +650,7 @@ You can set the log level and configuration in the [log4j2.yaml](https://github |haProxyProtocolEnabled | Enable or disable the [HAProxy](http://www.haproxy.org/) protocol. |false| |bookieId | If you want to custom a bookie ID or use a dynamic network address for a bookie, you can set the `bookieId`.

Bookie advertises itself using the `bookieId` rather than the `BookieSocketAddress` (`hostname:port` or `IP:port`).

The `bookieId` is a non-empty string that can contain ASCII digits and letters ([a-zA-Z9-0]), colons, dashes, and dots.

For more information about `bookieId`, see [here](http://bookkeeper.apache.org/bps/BP-41-bookieid/).|/| | maxTopicsPerNamespace | The maximum number of persistent topics that can be created in the namespace. When the number of topics reaches this threshold, the broker rejects the request of creating a new topic, including the auto-created topics by the producer or consumer, until the number of connected consumers decreases. The default value 0 disables the check. | 0 | +| isAllowAutoUpdateSchemaEnabled | Allow schema to be auto updated at broker level. User can override this by 'is_allow_auto_update_schema' of namespace policy. |true| ## WebSocket