diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 98c63c74309eaf..4deb501211d0fc 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2610,4 +2610,10 @@ public int getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds() { } } + public SchemaCompatibilityStrategy getSchemaCompatibilityStrategy() { + if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) { + return SchemaCompatibilityStrategy.FULL; + } + return schemaCompatibilityStrategy; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index e757a6f8f97118..413dbcba45b05e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -2342,15 +2342,19 @@ protected SchemaCompatibilityStrategy internalGetSchemaCompatibilityStrategy() { validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ); Policies policies = getNamespacePolicies(namespaceName); + SchemaCompatibilityStrategy schemaCompatibilityStrategy = policies.schema_compatibility_strategy; - if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) { - schemaCompatibilityStrategy = pulsar().getConfig().getSchemaCompatibilityStrategy(); - if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) { - schemaCompatibilityStrategy = SchemaCompatibilityStrategy - .fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy); - } + if (!SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) { + return schemaCompatibilityStrategy; } - return schemaCompatibilityStrategy; + + schemaCompatibilityStrategy = + SchemaCompatibilityStrategy.fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy); + if (!SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) { + return schemaCompatibilityStrategy; + } + + return pulsar().getConfig().getSchemaCompatibilityStrategy(); } @Deprecated diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java index dfd870a053d99c..6c8f56f4d0ebbb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java @@ -201,14 +201,16 @@ public void testCompatibility(PostSchemaPayload payload, boolean authoritative, String schemaId = getSchemaId(); Policies policies = getNamespacePolicies(namespaceName); - SchemaCompatibilityStrategy schemaCompatibilityStrategy; - if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) { - schemaCompatibilityStrategy = SchemaCompatibilityStrategy - .fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy); - } else { - schemaCompatibilityStrategy = policies.schema_compatibility_strategy; + SchemaCompatibilityStrategy schemaCompatibilityStrategy = policies.schema_compatibility_strategy; + if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) { + schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy( + policies.schema_auto_update_compatibility_strategy); + if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) { + schemaCompatibilityStrategy = pulsar().getConfig().getSchemaCompatibilityStrategy(); + } } + final SchemaCompatibilityStrategy finalSchemaCompatibilityStrategy = schemaCompatibilityStrategy; pulsar().getSchemaRegistryService() .isCompatible(schemaId, SchemaData.builder().data(payload.getSchema().getBytes(Charsets.UTF_8)).isDeleted(false) @@ -217,7 +219,7 @@ public void testCompatibility(PostSchemaPayload payload, boolean authoritative, schemaCompatibilityStrategy) .thenAccept(isCompatible -> response.resume(Response.accepted() .entity(IsCompatibilityResponse.builder().isCompatibility(isCompatible) - .schemaCompatibilityStrategy(schemaCompatibilityStrategy.name()).build()) + .schemaCompatibilityStrategy(finalSchemaCompatibilityStrategy.name()).build()) .build())) .exceptionally(error -> { response.resume(new RestException(error)); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index e6ef3c512b8390..78752ddcfba377 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -643,17 +643,26 @@ protected void setSchemaCompatibilityStrategy(Policies policies) { if (SystemTopicClient.isSystemTopic(TopicName.get(this.topic))) { schemaCompatibilityStrategy = brokerService.pulsar().getConfig().getSystemTopicSchemaCompatibilityStrategy(); - } else if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) { - schemaCompatibilityStrategy = brokerService.pulsar() - .getConfig().getSchemaCompatibilityStrategy(); - if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) { - schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy( - policies.schema_auto_update_compatibility_strategy); - } - } else { - schemaCompatibilityStrategy = policies.schema_compatibility_strategy; + return; + } + + schemaCompatibilityStrategy = policies.schema_compatibility_strategy; + if (!SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) { + return; + } + + schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy( + policies.schema_auto_update_compatibility_strategy); + if (!SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) { + return; + } + + schemaCompatibilityStrategy = brokerService.pulsar().getConfig().getSchemaCompatibilityStrategy(); + if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) { + schemaCompatibilityStrategy = SchemaCompatibilityStrategy.FULL; } } + private static final Summary PUBLISH_LATENCY = Summary.build("pulsar_broker_publish_latency", "-") .quantile(0.0) .quantile(0.50) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiNamespaceSchemaCompatibilityStrategyTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiNamespaceSchemaCompatibilityStrategyTest.java new file mode 100644 index 00000000000000..6f86031867dc2e --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiNamespaceSchemaCompatibilityStrategyTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.broker.admin; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import com.google.common.collect.Sets; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy; +import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker-admin") +public class AdminApiNamespaceSchemaCompatibilityStrategyTest extends MockedPulsarServiceBaseTest { + private final String tenant = "test-tenant"; + private final String namespace = tenant + "/" + "test-ns"; + + @BeforeMethod + @Override + public void setup() throws Exception { + super.internalSetup(); + + // Setup namespaces + admin.clusters() + .createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build()); + TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test")); + admin.tenants().createTenant(tenant, tenantInfo); + admin.namespaces().createNamespace(namespace, Sets.newHashSet("test")); + } + + @AfterMethod(alwaysRun = true) + @Override + public void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testGetSchemaCompatibilityStrategy() throws PulsarAdminException { + assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(namespace), SchemaCompatibilityStrategy.UNDEFINED); + } + + @Test + public void testGetSchemaAutoUpdateCompatibilityStrategy() throws PulsarAdminException { + assertNull(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace)); + } + + @Test + public void testGetSchemaCompatibilityStrategyWithBackwardSchemaAutoUpdateCompatibilityStrategy() + throws PulsarAdminException { + assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(namespace), SchemaCompatibilityStrategy.UNDEFINED); + + admin.namespaces() + .setSchemaAutoUpdateCompatibilityStrategy(namespace, SchemaAutoUpdateCompatibilityStrategy.Forward); + Awaitility.await().untilAsserted( + () -> assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace), + SchemaAutoUpdateCompatibilityStrategy.Forward)); + + admin.namespaces().setSchemaCompatibilityStrategy(namespace, SchemaCompatibilityStrategy.BACKWARD); + Awaitility.await().untilAsserted( + () -> assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(namespace), + SchemaCompatibilityStrategy.BACKWARD)); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java index f555c9203d482d..312f7be7169efb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java @@ -32,8 +32,6 @@ import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.TenantInfoImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -42,9 +40,6 @@ @Slf4j @Test(groups = "broker-admin") public class AdminApiSchemaAutoUpdateTest extends MockedPulsarServiceBaseTest { - - private static final Logger LOG = LoggerFactory.getLogger(AdminApiSchemaAutoUpdateTest.class); - @BeforeMethod @Override public void setup() throws Exception { @@ -67,8 +62,7 @@ public void cleanup() throws Exception { } private void testAutoUpdateBackward(String namespace, String topicName) throws Exception { - Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace), - SchemaAutoUpdateCompatibilityStrategy.Full); + Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace), null); admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(namespace, SchemaAutoUpdateCompatibilityStrategy.Backward); @@ -91,8 +85,7 @@ private void testAutoUpdateBackward(String namespace, String topicName) throws E } private void testAutoUpdateForward(String namespace, String topicName) throws Exception { - Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace), - SchemaAutoUpdateCompatibilityStrategy.Full); + Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace), null); admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(namespace, SchemaAutoUpdateCompatibilityStrategy.Forward); @@ -114,8 +107,7 @@ private void testAutoUpdateForward(String namespace, String topicName) throws Ex } private void testAutoUpdateFull(String namespace, String topicName) throws Exception { - Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace), - SchemaAutoUpdateCompatibilityStrategy.Full); + Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace), null); try (Producer p = pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic(topicName).create()) { p.send(new V1Data("test1", 1)); @@ -142,8 +134,7 @@ private void testAutoUpdateFull(String namespace, String topicName) throws Excep } private void testAutoUpdateDisabled(String namespace, String topicName) throws Exception { - Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace), - SchemaAutoUpdateCompatibilityStrategy.Full); + Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace), null); admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(namespace, SchemaAutoUpdateCompatibilityStrategy.AutoUpdateDisabled); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java index f21475c22786d1..2e7b840e5a60ae 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java @@ -35,8 +35,8 @@ import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.schema.Schemas; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import java.util.Collections; @@ -55,7 +55,7 @@ public class SchemaTypeCompatibilityCheckTest extends MockedPulsarServiceBaseTes private static final String namespace = "test-namespace"; private static final String namespaceName = PUBLIC_TENANT + "/" + namespace; - @BeforeClass + @BeforeMethod @Override public void setup() throws Exception { super.internalSetup(); @@ -71,7 +71,7 @@ public void setup() throws Exception { } - @AfterClass(alwaysRun = true) + @AfterMethod(alwaysRun = true) @Override public void cleanup() throws Exception { super.internalCleanup(); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java index 4d29a5fdee6692..ebcd7fc09c19e6 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java @@ -100,8 +100,7 @@ public class Policies { @SuppressWarnings("checkstyle:MemberName") @Deprecated - public SchemaAutoUpdateCompatibilityStrategy schema_auto_update_compatibility_strategy = - SchemaAutoUpdateCompatibilityStrategy.Full; + public SchemaAutoUpdateCompatibilityStrategy schema_auto_update_compatibility_strategy = null; @SuppressWarnings("checkstyle:MemberName") public SchemaCompatibilityStrategy schema_compatibility_strategy = SchemaCompatibilityStrategy.UNDEFINED; diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SchemaCompatibilityStrategy.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SchemaCompatibilityStrategy.java index 9a4f74c437b140..f3b4569bad8f30 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SchemaCompatibilityStrategy.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SchemaCompatibilityStrategy.java @@ -71,10 +71,13 @@ public enum SchemaCompatibilityStrategy { FULL_TRANSITIVE; + public static boolean isUndefined(SchemaCompatibilityStrategy strategy) { + return strategy == null || strategy == SchemaCompatibilityStrategy.UNDEFINED; + } public static SchemaCompatibilityStrategy fromAutoUpdatePolicy(SchemaAutoUpdateCompatibilityStrategy strategy) { if (strategy == null) { - return SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE; + return null; } switch (strategy) { case Backward: