diff --git a/conf/broker.conf b/conf/broker.conf index 85efa11658992..59626ed884dd8 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1290,12 +1290,10 @@ schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.Bookkeepe # if you enable this setting, it will cause non-java clients failed to produce. isSchemaValidationEnforced=false -# The schema compatibility strategy in broker level. If this config in namespace policy is `UNDEFINED`, -# broker will use it in broker level. If schemaCompatibilityStrategy is `UNDEFINED` will use `FULL`. -# SchemaCompatibilityStrategy : UNDEFINED, ALWAYS_INCOMPATIBLE, ALWAYS_COMPATIBLE, BACKWARD, FORWARD, +# The schema compatibility strategy in broker level. +# SchemaCompatibilityStrategy : ALWAYS_INCOMPATIBLE, ALWAYS_COMPATIBLE, BACKWARD, FORWARD, # FULL, BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE, FULL_TRANSITIVE -# default : UNDEFINED -schemaCompatibilityStrategy= +schemaCompatibilityStrategy=FULL ### --- Ledger Offloading --- ### diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 98c63c74309ea..fb6a134661336 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2195,10 +2195,9 @@ public class ServiceConfiguration implements PulsarConfiguration { @FieldContext( category = CATEGORY_SCHEMA, - doc = "The schema compatibility strategy in broker level. If this config in namespace policy is `UNDEFINED`" - + ", schema compatibility strategy check will use it in broker level." + doc = "The schema compatibility strategy in broker level" ) - private SchemaCompatibilityStrategy schemaCompatibilityStrategy = SchemaCompatibilityStrategy.UNDEFINED; + private SchemaCompatibilityStrategy schemaCompatibilityStrategy = SchemaCompatibilityStrategy.FULL; /**** --- WebSocket. --- ****/ @FieldContext( @@ -2610,4 +2609,10 @@ public int getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds() { } } + public SchemaCompatibilityStrategy getSchemaCompatibilityStrategy() { + if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) { + return SchemaCompatibilityStrategy.FULL; + } + return schemaCompatibilityStrategy; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index d44779bf8e10a..8af4b0de0370c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -57,6 +57,7 @@ import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.SubscribeRate; import org.apache.pulsar.common.policies.data.TopicOperation; import org.apache.pulsar.common.policies.data.TopicPolicies; @@ -764,6 +765,20 @@ protected void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Thr } } + protected CompletableFuture getSchemaCompatibilityStrategyAsync() { + return getNamespacePoliciesAsync(namespaceName).thenApply(policies -> { + SchemaCompatibilityStrategy schemaCompatibilityStrategy = policies.schema_compatibility_strategy; + if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) { + schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy( + policies.schema_auto_update_compatibility_strategy); + if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) { + schemaCompatibilityStrategy = pulsar().getConfig().getSchemaCompatibilityStrategy(); + } + } + return schemaCompatibilityStrategy; + }); + } + @CanIgnoreReturnValue public static T checkNotNull(T reference) { return com.google.common.base.Preconditions.checkNotNull(reference); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index e757a6f8f9711..16b1ec9236c79 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -2342,15 +2342,8 @@ protected SchemaCompatibilityStrategy internalGetSchemaCompatibilityStrategy() { validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ); Policies policies = getNamespacePolicies(namespaceName); - SchemaCompatibilityStrategy schemaCompatibilityStrategy = policies.schema_compatibility_strategy; - if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) { - schemaCompatibilityStrategy = pulsar().getConfig().getSchemaCompatibilityStrategy(); - if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) { - schemaCompatibilityStrategy = SchemaCompatibilityStrategy - .fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy); - } - } - return schemaCompatibilityStrategy; + + return policies.schema_compatibility_strategy; } @Deprecated diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java index dfd870a053d99..5b119ec881db4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java @@ -38,8 +38,6 @@ import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.client.internal.DefaultImplementation; import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.policies.data.Policies; -import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse; import org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse; import org.apache.pulsar.common.protocol.schema.GetSchemaResponse; @@ -136,16 +134,7 @@ public void deleteSchema(boolean authoritative, AsyncResponse response) { public void postSchema(PostSchemaPayload payload, boolean authoritative, AsyncResponse response) { validateDestinationAndAdminOperation(authoritative); - getNamespacePoliciesAsync(namespaceName).thenAccept(policies -> { - SchemaCompatibilityStrategy schemaCompatibilityStrategy = policies.schema_compatibility_strategy; - if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) { - schemaCompatibilityStrategy = - pulsar().getConfig().getSchemaCompatibilityStrategy(); - if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) { - schemaCompatibilityStrategy = SchemaCompatibilityStrategy - .fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy); - } - } + getSchemaCompatibilityStrategyAsync().thenAccept(schemaCompatibilityStrategy -> { byte[] data; if (SchemaType.KEY_VALUE.name().equals(payload.getType())) { try { @@ -199,26 +188,17 @@ public void testCompatibility(PostSchemaPayload payload, boolean authoritative, validateDestinationAndAdminOperation(authoritative); String schemaId = getSchemaId(); - Policies policies = getNamespacePolicies(namespaceName); - - SchemaCompatibilityStrategy schemaCompatibilityStrategy; - if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) { - schemaCompatibilityStrategy = SchemaCompatibilityStrategy - .fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy); - } else { - schemaCompatibilityStrategy = policies.schema_compatibility_strategy; - } - pulsar().getSchemaRegistryService() - .isCompatible(schemaId, - SchemaData.builder().data(payload.getSchema().getBytes(Charsets.UTF_8)).isDeleted(false) - .timestamp(clock.millis()).type(SchemaType.valueOf(payload.getType())) - .user(defaultIfEmpty(clientAppId(), "")).props(payload.getProperties()).build(), - schemaCompatibilityStrategy) - .thenAccept(isCompatible -> response.resume(Response.accepted() - .entity(IsCompatibilityResponse.builder().isCompatibility(isCompatible) - .schemaCompatibilityStrategy(schemaCompatibilityStrategy.name()).build()) - .build())) + getSchemaCompatibilityStrategyAsync().thenCompose(schemaCompatibilityStrategy -> pulsar() + .getSchemaRegistryService().isCompatible(schemaId, + SchemaData.builder().data(payload.getSchema().getBytes(Charsets.UTF_8)).isDeleted(false) + .timestamp(clock.millis()).type(SchemaType.valueOf(payload.getType())) + .user(defaultIfEmpty(clientAppId(), "")).props(payload.getProperties()).build(), + schemaCompatibilityStrategy) + .thenAccept(isCompatible -> response.resume(Response.accepted() + .entity(IsCompatibilityResponse.builder().isCompatibility(isCompatible) + .schemaCompatibilityStrategy(schemaCompatibilityStrategy.name()).build()) + .build()))) .exceptionally(error -> { response.resume(new RestException(error)); return null; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 44e0463655019..376a821e63cf0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -651,17 +651,19 @@ protected void setSchemaCompatibilityStrategy(Policies policies) { if (SystemTopicClient.isSystemTopic(TopicName.get(this.topic))) { schemaCompatibilityStrategy = brokerService.pulsar().getConfig().getSystemTopicSchemaCompatibilityStrategy(); - } else if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) { - schemaCompatibilityStrategy = brokerService.pulsar() - .getConfig().getSchemaCompatibilityStrategy(); - if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) { - schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy( - policies.schema_auto_update_compatibility_strategy); + return; + } + + schemaCompatibilityStrategy = policies.schema_compatibility_strategy; + if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) { + schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy( + policies.schema_auto_update_compatibility_strategy); + if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) { + schemaCompatibilityStrategy = brokerService.pulsar().getConfig().getSchemaCompatibilityStrategy(); } - } else { - schemaCompatibilityStrategy = policies.schema_compatibility_strategy; } } + private static final Summary PUBLISH_LATENCY = Summary.build("pulsar_broker_publish_latency", "-") .quantile(0.0) .quantile(0.50) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java index f555c9203d482..0408f49eab393 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaAutoUpdateTest.java @@ -32,8 +32,6 @@ import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.TenantInfoImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -42,9 +40,6 @@ @Slf4j @Test(groups = "broker-admin") public class AdminApiSchemaAutoUpdateTest extends MockedPulsarServiceBaseTest { - - private static final Logger LOG = LoggerFactory.getLogger(AdminApiSchemaAutoUpdateTest.class); - @BeforeMethod @Override public void setup() throws Exception { @@ -67,8 +62,8 @@ public void cleanup() throws Exception { } private void testAutoUpdateBackward(String namespace, String topicName) throws Exception { - Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace), - SchemaAutoUpdateCompatibilityStrategy.Full); + Assert.assertNull(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace)); + admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(namespace, SchemaAutoUpdateCompatibilityStrategy.Backward); @@ -91,8 +86,8 @@ private void testAutoUpdateBackward(String namespace, String topicName) throws E } private void testAutoUpdateForward(String namespace, String topicName) throws Exception { - Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace), - SchemaAutoUpdateCompatibilityStrategy.Full); + Assert.assertNull(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace)); + admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(namespace, SchemaAutoUpdateCompatibilityStrategy.Forward); @@ -114,8 +109,7 @@ private void testAutoUpdateForward(String namespace, String topicName) throws Ex } private void testAutoUpdateFull(String namespace, String topicName) throws Exception { - Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace), - SchemaAutoUpdateCompatibilityStrategy.Full); + Assert.assertNull(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace)); try (Producer p = pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic(topicName).create()) { p.send(new V1Data("test1", 1)); @@ -142,8 +136,8 @@ private void testAutoUpdateFull(String namespace, String topicName) throws Excep } private void testAutoUpdateDisabled(String namespace, String topicName) throws Exception { - Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace), - SchemaAutoUpdateCompatibilityStrategy.Full); + Assert.assertNull(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace)); + admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(namespace, SchemaAutoUpdateCompatibilityStrategy.AutoUpdateDisabled); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java index 86fe014c7e9d0..f9193564cd200 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java @@ -20,6 +20,7 @@ import static java.lang.String.format; import static java.nio.charset.StandardCharsets.US_ASCII; +import static org.junit.Assert.assertNull; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doReturn; import static org.testng.Assert.assertEquals; @@ -44,9 +45,11 @@ import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy; +import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaInfoWithVersion; +import org.awaitility.Awaitility; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; @@ -60,6 +63,8 @@ public class AdminApiSchemaTest extends MockedPulsarServiceBaseTest { final String cluster = "test"; + private final String schemaCompatibilityNamespace = "schematest/test-schema-compatibility-ns"; + @BeforeMethod @Override public void setup() throws Exception { @@ -71,6 +76,7 @@ public void setup() throws Exception { admin.tenants().createTenant("schematest", tenantInfo); admin.namespaces().createNamespace("schematest/test", Sets.newHashSet("test")); admin.namespaces().createNamespace("schematest/"+cluster+"/test", Sets.newHashSet("test")); + admin.namespaces().createNamespace(schemaCompatibilityNamespace, Sets.newHashSet("test")); } @AfterMethod(alwaysRun = true) @@ -348,4 +354,51 @@ public long getCToken() { assertEquals(ledgerInfo.entries, entryId + 1); assertEquals(ledgerInfo.size, length); } + + @Test + public void testGetSchemaCompatibilityStrategy() throws PulsarAdminException { + assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(schemaCompatibilityNamespace), + SchemaCompatibilityStrategy.UNDEFINED); + } + + @Test + public void testGetSchemaAutoUpdateCompatibilityStrategy() throws PulsarAdminException { + assertNull(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(schemaCompatibilityNamespace)); + } + + @Test + public void testGetSchemaCompatibilityStrategyWhenSetSchemaAutoUpdateCompatibilityStrategy() + throws PulsarAdminException { + assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(schemaCompatibilityNamespace), + SchemaCompatibilityStrategy.UNDEFINED); + + admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(schemaCompatibilityNamespace, + SchemaAutoUpdateCompatibilityStrategy.Forward); + Awaitility.await().untilAsserted(() -> assertEquals(SchemaAutoUpdateCompatibilityStrategy.Forward, + admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(schemaCompatibilityNamespace) + )); + + assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(schemaCompatibilityNamespace), + SchemaCompatibilityStrategy.UNDEFINED); + + admin.namespaces().setSchemaCompatibilityStrategy(schemaCompatibilityNamespace, + SchemaCompatibilityStrategy.BACKWARD); + Awaitility.await().untilAsserted(() -> assertEquals(SchemaCompatibilityStrategy.BACKWARD, + admin.namespaces().getSchemaCompatibilityStrategy(schemaCompatibilityNamespace))); + } + + @Test + public void testGetSchemaCompatibilityStrategyWhenSetBrokerLevelAndSchemaAutoUpdateCompatibilityStrategy() + throws PulsarAdminException { + pulsar.getConfiguration().setSchemaCompatibilityStrategy(SchemaCompatibilityStrategy.FORWARD); + + assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(schemaCompatibilityNamespace), + SchemaCompatibilityStrategy.UNDEFINED); + + admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(schemaCompatibilityNamespace, + SchemaAutoUpdateCompatibilityStrategy.AlwaysCompatible); + Awaitility.await().untilAsserted(() -> assertEquals( + admin.namespaces().getSchemaCompatibilityStrategy(schemaCompatibilityNamespace), + SchemaCompatibilityStrategy.UNDEFINED)); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java index 80168b9ae4c9f..5b12f37d40060 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java @@ -240,7 +240,7 @@ public void testBrokerAllowAutoUpdateSchemaDisabled(SchemaCompatibilityStrategy ); assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(namespaceName.toString()), - SchemaCompatibilityStrategy.FULL); + SchemaCompatibilityStrategy.UNDEFINED); admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), schemaCompatibilityStrategy); admin.schemas().createSchema(fqtn, Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo()); @@ -320,7 +320,7 @@ public void testIsAutoUpdateSchema(SchemaCompatibilityStrategy schemaCompatibili ); assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(namespaceName.toString()), - SchemaCompatibilityStrategy.FULL); + SchemaCompatibilityStrategy.UNDEFINED); admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), schemaCompatibilityStrategy); admin.schemas().createSchema(fqtn, Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo()); @@ -399,7 +399,7 @@ public void testSchemaComparison() throws Exception { ); assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(namespaceName.toString()), - SchemaCompatibilityStrategy.FULL); + SchemaCompatibilityStrategy.UNDEFINED); byte[] changeSchemaBytes = (new String(Schema.AVRO(Schemas.PersonOne.class) .getSchemaInfo().getSchema(), UTF_8) + "/n /n /n").getBytes(); SchemaInfo schemaInfo = SchemaInfo.builder().type(SchemaType.AVRO).schema(changeSchemaBytes).build(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java index f21475c22786d..2e7b840e5a60a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java @@ -35,8 +35,8 @@ import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.schema.Schemas; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import java.util.Collections; @@ -55,7 +55,7 @@ public class SchemaTypeCompatibilityCheckTest extends MockedPulsarServiceBaseTes private static final String namespace = "test-namespace"; private static final String namespaceName = PUBLIC_TENANT + "/" + namespace; - @BeforeClass + @BeforeMethod @Override public void setup() throws Exception { super.internalSetup(); @@ -71,7 +71,7 @@ public void setup() throws Exception { } - @AfterClass(alwaysRun = true) + @AfterMethod(alwaysRun = true) @Override public void cleanup() throws Exception { super.internalCleanup(); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java index 4d29a5fdee669..ebcd7fc09c19e 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java @@ -100,8 +100,7 @@ public class Policies { @SuppressWarnings("checkstyle:MemberName") @Deprecated - public SchemaAutoUpdateCompatibilityStrategy schema_auto_update_compatibility_strategy = - SchemaAutoUpdateCompatibilityStrategy.Full; + public SchemaAutoUpdateCompatibilityStrategy schema_auto_update_compatibility_strategy = null; @SuppressWarnings("checkstyle:MemberName") public SchemaCompatibilityStrategy schema_compatibility_strategy = SchemaCompatibilityStrategy.UNDEFINED; diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SchemaCompatibilityStrategy.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SchemaCompatibilityStrategy.java index 9a4f74c437b14..f3b4569bad8f3 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SchemaCompatibilityStrategy.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SchemaCompatibilityStrategy.java @@ -71,10 +71,13 @@ public enum SchemaCompatibilityStrategy { FULL_TRANSITIVE; + public static boolean isUndefined(SchemaCompatibilityStrategy strategy) { + return strategy == null || strategy == SchemaCompatibilityStrategy.UNDEFINED; + } public static SchemaCompatibilityStrategy fromAutoUpdatePolicy(SchemaAutoUpdateCompatibilityStrategy strategy) { if (strategy == null) { - return SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE; + return null; } switch (strategy) { case Backward: