Skip to content

Commit

Permalink
[Broker] Fix read schema compatibility strategy priority
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
  • Loading branch information
nodece committed Jan 25, 2022
1 parent 10036d5 commit 599c6f6
Show file tree
Hide file tree
Showing 9 changed files with 145 additions and 43 deletions.
Expand Up @@ -2610,4 +2610,10 @@ public int getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds() {
}
}

public SchemaCompatibilityStrategy getSchemaCompatibilityStrategy() {
if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
return SchemaCompatibilityStrategy.FULL;
}
return schemaCompatibilityStrategy;
}
}
Expand Up @@ -2342,15 +2342,19 @@ protected SchemaCompatibilityStrategy internalGetSchemaCompatibilityStrategy() {
validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);

SchemaCompatibilityStrategy schemaCompatibilityStrategy = policies.schema_compatibility_strategy;
if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) {
schemaCompatibilityStrategy = pulsar().getConfig().getSchemaCompatibilityStrategy();
if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) {
schemaCompatibilityStrategy = SchemaCompatibilityStrategy
.fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy);
}
if (!SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
return schemaCompatibilityStrategy;
}
return schemaCompatibilityStrategy;

schemaCompatibilityStrategy =
SchemaCompatibilityStrategy.fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy);
if (!SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
return schemaCompatibilityStrategy;
}

return pulsar().getConfig().getSchemaCompatibilityStrategy();
}

@Deprecated
Expand Down
Expand Up @@ -201,14 +201,16 @@ public void testCompatibility(PostSchemaPayload payload, boolean authoritative,
String schemaId = getSchemaId();
Policies policies = getNamespacePolicies(namespaceName);

SchemaCompatibilityStrategy schemaCompatibilityStrategy;
if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) {
schemaCompatibilityStrategy = SchemaCompatibilityStrategy
.fromAutoUpdatePolicy(policies.schema_auto_update_compatibility_strategy);
} else {
schemaCompatibilityStrategy = policies.schema_compatibility_strategy;
SchemaCompatibilityStrategy schemaCompatibilityStrategy = policies.schema_compatibility_strategy;
if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
policies.schema_auto_update_compatibility_strategy);
if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
schemaCompatibilityStrategy = pulsar().getConfig().getSchemaCompatibilityStrategy();
}
}

final SchemaCompatibilityStrategy finalSchemaCompatibilityStrategy = schemaCompatibilityStrategy;
pulsar().getSchemaRegistryService()
.isCompatible(schemaId,
SchemaData.builder().data(payload.getSchema().getBytes(Charsets.UTF_8)).isDeleted(false)
Expand All @@ -217,7 +219,7 @@ public void testCompatibility(PostSchemaPayload payload, boolean authoritative,
schemaCompatibilityStrategy)
.thenAccept(isCompatible -> response.resume(Response.accepted()
.entity(IsCompatibilityResponse.builder().isCompatibility(isCompatible)
.schemaCompatibilityStrategy(schemaCompatibilityStrategy.name()).build())
.schemaCompatibilityStrategy(finalSchemaCompatibilityStrategy.name()).build())
.build()))
.exceptionally(error -> {
response.resume(new RestException(error));
Expand Down
Expand Up @@ -643,17 +643,26 @@ protected void setSchemaCompatibilityStrategy(Policies policies) {
if (SystemTopicClient.isSystemTopic(TopicName.get(this.topic))) {
schemaCompatibilityStrategy =
brokerService.pulsar().getConfig().getSystemTopicSchemaCompatibilityStrategy();
} else if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) {
schemaCompatibilityStrategy = brokerService.pulsar()
.getConfig().getSchemaCompatibilityStrategy();
if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) {
schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
policies.schema_auto_update_compatibility_strategy);
}
} else {
schemaCompatibilityStrategy = policies.schema_compatibility_strategy;
return;
}

schemaCompatibilityStrategy = policies.schema_compatibility_strategy;
if (!SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
return;
}

schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
policies.schema_auto_update_compatibility_strategy);
if (!SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
return;
}

schemaCompatibilityStrategy = brokerService.pulsar().getConfig().getSchemaCompatibilityStrategy();
if (SchemaCompatibilityStrategy.isUndefined(schemaCompatibilityStrategy)) {
schemaCompatibilityStrategy = SchemaCompatibilityStrategy.FULL;
}
}

private static final Summary PUBLISH_LATENCY = Summary.build("pulsar_broker_publish_latency", "-")
.quantile(0.0)
.quantile(0.50)
Expand Down
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.broker.admin;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "broker-admin")
public class AdminApiNamespaceSchemaCompatibilityStrategyTest extends MockedPulsarServiceBaseTest {
private final String tenant = "test-tenant";
private final String namespace = tenant + "/" + "test-ns";

@BeforeMethod
@Override
public void setup() throws Exception {
super.internalSetup();

// Setup namespaces
admin.clusters()
.createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
admin.tenants().createTenant(tenant, tenantInfo);
admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
}

@AfterMethod(alwaysRun = true)
@Override
public void cleanup() throws Exception {
super.internalCleanup();
}

@Test
public void testGetSchemaCompatibilityStrategy() throws PulsarAdminException {
assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(namespace), SchemaCompatibilityStrategy.UNDEFINED);
}

@Test
public void testGetSchemaAutoUpdateCompatibilityStrategy() throws PulsarAdminException {
assertNull(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace));
}

@Test
public void testGetSchemaCompatibilityStrategyWithBackwardSchemaAutoUpdateCompatibilityStrategy()
throws PulsarAdminException {
assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(namespace), SchemaCompatibilityStrategy.UNDEFINED);

admin.namespaces()
.setSchemaAutoUpdateCompatibilityStrategy(namespace, SchemaAutoUpdateCompatibilityStrategy.Forward);
Awaitility.await().untilAsserted(
() -> assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace),
SchemaAutoUpdateCompatibilityStrategy.Forward));

admin.namespaces().setSchemaCompatibilityStrategy(namespace, SchemaCompatibilityStrategy.BACKWARD);
Awaitility.await().untilAsserted(
() -> assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(namespace),
SchemaCompatibilityStrategy.BACKWARD));
}
}
Expand Up @@ -32,8 +32,6 @@
import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand All @@ -42,9 +40,6 @@
@Slf4j
@Test(groups = "broker-admin")
public class AdminApiSchemaAutoUpdateTest extends MockedPulsarServiceBaseTest {

private static final Logger LOG = LoggerFactory.getLogger(AdminApiSchemaAutoUpdateTest.class);

@BeforeMethod
@Override
public void setup() throws Exception {
Expand All @@ -67,8 +62,7 @@ public void cleanup() throws Exception {
}

private void testAutoUpdateBackward(String namespace, String topicName) throws Exception {
Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace),
SchemaAutoUpdateCompatibilityStrategy.Full);
Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace), null);
admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(namespace,
SchemaAutoUpdateCompatibilityStrategy.Backward);

Expand All @@ -91,8 +85,7 @@ private void testAutoUpdateBackward(String namespace, String topicName) throws E
}

private void testAutoUpdateForward(String namespace, String topicName) throws Exception {
Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace),
SchemaAutoUpdateCompatibilityStrategy.Full);
Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace), null);
admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(namespace,
SchemaAutoUpdateCompatibilityStrategy.Forward);

Expand All @@ -114,8 +107,7 @@ private void testAutoUpdateForward(String namespace, String topicName) throws Ex
}

private void testAutoUpdateFull(String namespace, String topicName) throws Exception {
Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace),
SchemaAutoUpdateCompatibilityStrategy.Full);
Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace), null);

try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic(topicName).create()) {
p.send(new V1Data("test1", 1));
Expand All @@ -142,8 +134,7 @@ private void testAutoUpdateFull(String namespace, String topicName) throws Excep
}

private void testAutoUpdateDisabled(String namespace, String topicName) throws Exception {
Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace),
SchemaAutoUpdateCompatibilityStrategy.Full);
Assert.assertEquals(admin.namespaces().getSchemaAutoUpdateCompatibilityStrategy(namespace), null);
admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(namespace,
SchemaAutoUpdateCompatibilityStrategy.AutoUpdateDisabled);

Expand Down
Expand Up @@ -35,8 +35,8 @@
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.schema.Schemas;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.Collections;
Expand All @@ -55,7 +55,7 @@ public class SchemaTypeCompatibilityCheckTest extends MockedPulsarServiceBaseTes
private static final String namespace = "test-namespace";
private static final String namespaceName = PUBLIC_TENANT + "/" + namespace;

@BeforeClass
@BeforeMethod
@Override
public void setup() throws Exception {
super.internalSetup();
Expand All @@ -71,7 +71,7 @@ public void setup() throws Exception {

}

@AfterClass(alwaysRun = true)
@AfterMethod(alwaysRun = true)
@Override
public void cleanup() throws Exception {
super.internalCleanup();
Expand Down
Expand Up @@ -100,8 +100,7 @@ public class Policies {

@SuppressWarnings("checkstyle:MemberName")
@Deprecated
public SchemaAutoUpdateCompatibilityStrategy schema_auto_update_compatibility_strategy =
SchemaAutoUpdateCompatibilityStrategy.Full;
public SchemaAutoUpdateCompatibilityStrategy schema_auto_update_compatibility_strategy = null;

@SuppressWarnings("checkstyle:MemberName")
public SchemaCompatibilityStrategy schema_compatibility_strategy = SchemaCompatibilityStrategy.UNDEFINED;
Expand Down
Expand Up @@ -71,10 +71,13 @@ public enum SchemaCompatibilityStrategy {
FULL_TRANSITIVE;


public static boolean isUndefined(SchemaCompatibilityStrategy strategy) {
return strategy == null || strategy == SchemaCompatibilityStrategy.UNDEFINED;
}

public static SchemaCompatibilityStrategy fromAutoUpdatePolicy(SchemaAutoUpdateCompatibilityStrategy strategy) {
if (strategy == null) {
return SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE;
return null;
}
switch (strategy) {
case Backward:
Expand Down

0 comments on commit 599c6f6

Please sign in to comment.