diff --git a/conf/broker.conf b/conf/broker.conf index 552bdb47d06ee..a51a5f75a8faf 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -264,6 +264,10 @@ brokerMaxConnections=0 # The maximum number of connections per IP. If it exceeds, new connections are rejected. brokerMaxConnectionsPerIp=0 +# Allow schema to be auto updated at broker level. User can override this by +# 'is_allow_auto_update_schema' of namespace policy. +isAllowAutoUpdateSchemaEnabled=true + # Enable check for minimum allowed client library version clientLibraryVersionCheckEnabled=false diff --git a/conf/standalone.conf b/conf/standalone.conf index 7f52eca406d28..d8c137084601d 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -169,6 +169,10 @@ defaultNumberOfNamespaceBundles=4 # Using a value of 0, is disabling maxTopicsPerNamespace-limit check. maxTopicsPerNamespace=0 +# Allow schema to be auto updated at broker level. User can override this by +# 'is_allow_auto_update_schema' of namespace policy. +isAllowAutoUpdateSchemaEnabled=true + # Enable check for minimum allowed client library version clientLibraryVersionCheckEnabled=false diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 5eb0a6bf134a1..21c0e5fb3abef 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -553,6 +553,14 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private int brokerMaxConnectionsPerIp = 0; + @FieldContext( + category = CATEGORY_POLICIES, + dynamic = true, + doc = "Allow schema to be auto updated at broker level. User can override this by 'is_allow_auto_update_schema'" + + " of namespace policy. This is enabled by default." + ) + private boolean isAllowAutoUpdateSchemaEnabled = true; + @FieldContext( category = CATEGORY_SERVER, dynamic = true, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index ebd05b6ed8530..5d2414734934b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -98,7 +98,7 @@ public abstract class AbstractTopic implements Topic { @Getter protected volatile SchemaCompatibilityStrategy schemaCompatibilityStrategy = SchemaCompatibilityStrategy.FULL; - protected volatile boolean isAllowAutoUpdateSchema = true; + protected volatile Boolean isAllowAutoUpdateSchema; // schema validation enforced flag protected volatile boolean schemaValidationEnforced = false; @@ -330,20 +330,28 @@ public CompletableFuture addSchema(SchemaData schema) { String base = TopicName.get(getName()).getPartitionedTopicName(); String id = TopicName.get(base).getSchemaName(); SchemaRegistryService schemaRegistryService = brokerService.pulsar().getSchemaRegistryService(); - return isAllowAutoUpdateSchema ? schemaRegistryService - .putSchemaIfAbsent(id, schema, schemaCompatibilityStrategy) - : schemaRegistryService.trimDeletedSchemaAndGetList(id).thenCompose(schemaAndMetadataList -> - schemaRegistryService.getSchemaVersionBySchemaData(schemaAndMetadataList, schema) - .thenCompose(schemaVersion -> { - if (schemaVersion == null) { - return FutureUtil - .failedFuture( - new IncompatibleSchemaException( - "Schema not found and schema auto updating is disabled.")); - } else { - return CompletableFuture.completedFuture(schemaVersion); - } - })); + + if (allowAutoUpdateSchema()) { + return schemaRegistryService.putSchemaIfAbsent(id, schema, schemaCompatibilityStrategy); + } else { + return schemaRegistryService.trimDeletedSchemaAndGetList(id).thenCompose(schemaAndMetadataList -> + schemaRegistryService.getSchemaVersionBySchemaData(schemaAndMetadataList, schema) + .thenCompose(schemaVersion -> { + if (schemaVersion == null) { + return FutureUtil.failedFuture(new IncompatibleSchemaException( + "Schema not found and schema auto updating is disabled.")); + } else { + return CompletableFuture.completedFuture(schemaVersion); + } + })); + } + } + + private boolean allowAutoUpdateSchema() { + if (isAllowAutoUpdateSchema == null) { + return brokerService.pulsar().getConfig().isAllowAutoUpdateSchemaEnabled(); + } + return isAllowAutoUpdateSchema; } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java index 293f71d5f878f..80168b9ae4c9f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java @@ -218,6 +218,87 @@ public void testConsumerCompatibilityReadAllCheckTest(SchemaCompatibilityStrateg } } + @Test(dataProvider = "AllCheckSchemaCompatibilityStrategy") + public void testBrokerAllowAutoUpdateSchemaDisabled(SchemaCompatibilityStrategy schemaCompatibilityStrategy) + throws Exception { + + final String tenant = PUBLIC_TENANT; + final String topic = "test-consumer-compatibility"; + String namespace = "test-namespace-" + randomName(16); + String fqtn = TopicName.get( + TopicDomain.persistent.value(), + tenant, + namespace, + topic + ).toString(); + + NamespaceName namespaceName = NamespaceName.get(tenant, namespace); + + admin.namespaces().createNamespace( + tenant + "/" + namespace, + Sets.newHashSet(CLUSTER_NAME) + ); + + assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(namespaceName.toString()), + SchemaCompatibilityStrategy.FULL); + + admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), schemaCompatibilityStrategy); + admin.schemas().createSchema(fqtn, Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo()); + + + pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(false); + ProducerBuilder producerThreeBuilder = pulsarClient + .newProducer(Schema.AVRO(SchemaDefinition.builder().withAlwaysAllowNull + (false).withSupportSchemaVersioning(true). + withPojo(Schemas.PersonTwo.class).build())) + .topic(fqtn); + try { + producerThreeBuilder.create(); + } catch (Exception e) { + Assert.assertTrue(e.getMessage().contains("Schema not found and schema auto updating is disabled.")); + } + + pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(true); + ConsumerBuilder comsumerBuilder = pulsarClient.newConsumer(Schema.AVRO( + SchemaDefinition.builder().withAlwaysAllowNull + (false).withSupportSchemaVersioning(true). + withPojo(Schemas.PersonTwo.class).build())) + .subscriptionName("test") + .topic(fqtn); + + Producer producer = producerThreeBuilder.create(); + Consumer consumerTwo = comsumerBuilder.subscribe(); + + producer.send(new Schemas.PersonTwo(2, "Lucy")); + Message message = consumerTwo.receive(); + + Schemas.PersonTwo personTwo = message.getValue(); + consumerTwo.acknowledge(message); + + assertEquals(personTwo.getId(), 2); + assertEquals(personTwo.getName(), "Lucy"); + + producer.close(); + consumerTwo.close(); + + pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(false); + + producer = producerThreeBuilder.create(); + consumerTwo = comsumerBuilder.subscribe(); + + producer.send(new Schemas.PersonTwo(2, "Lucy")); + message = consumerTwo.receive(); + + personTwo = message.getValue(); + consumerTwo.acknowledge(message); + + assertEquals(personTwo.getId(), 2); + assertEquals(personTwo.getName(), "Lucy"); + + consumerTwo.close(); + producer.close(); + } + @Test(dataProvider = "AllCheckSchemaCompatibilityStrategy") public void testIsAutoUpdateSchema(SchemaCompatibilityStrategy schemaCompatibilityStrategy) throws Exception { final String tenant = PUBLIC_TENANT; diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java index ff773f6f8670f..86227159ccb42 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java @@ -107,7 +107,7 @@ public class Policies { public SchemaCompatibilityStrategy schema_compatibility_strategy = SchemaCompatibilityStrategy.UNDEFINED; @SuppressWarnings("checkstyle:MemberName") - public boolean is_allow_auto_update_schema = true; + public Boolean is_allow_auto_update_schema = null; @SuppressWarnings("checkstyle:MemberName") public boolean schema_validation_enforced = false;