Skip to content

Commit

Permalink
Make sure policies.is_allow_auto_update_schema not null (#14409)
Browse files Browse the repository at this point in the history
  • Loading branch information
wangjialing218 committed Feb 23, 2022
1 parent 6c34217 commit 7d60795
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 14 deletions.
Expand Up @@ -293,6 +293,10 @@ protected Policies getNamespacePolicies(NamespaceName namespaceName) {
BundlesData bundleData = pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundles(namespaceName).getBundlesData();
policies.bundles = bundleData != null ? bundleData : policies.bundles;
if (policies.is_allow_auto_update_schema == null) {
// the type changed from boolean to Boolean. return broker value here for keeping compatibility.
policies.is_allow_auto_update_schema = pulsar().getConfig().isAllowAutoUpdateSchemaEnabled();
}

return policies;
} catch (RestException re) {
Expand Down Expand Up @@ -517,20 +521,7 @@ protected void validateClusterExists(String cluster) {
protected Policies getNamespacePolicies(String tenant, String cluster, String namespace) {
NamespaceName ns = NamespaceName.get(tenant, cluster, namespace);

try {
Policies policies = namespaceResources().getPolicies(ns)
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace does not exist"));
// fetch bundles from LocalZK-policies
BundlesData bundleData = pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundles(ns).getBundlesData();
policies.bundles = bundleData != null ? bundleData : policies.bundles;
return policies;
} catch (RestException re) {
throw re;
} catch (Exception e) {
log.error("[{}] Failed to get namespace policies {}", clientAppId(), ns, e);
throw new RestException(e);
}
return getNamespacePolicies(ns);
}

protected boolean isNamespaceReplicated(NamespaceName namespaceName) {
Expand Down
Expand Up @@ -736,6 +736,7 @@ public void namespaces() throws Exception {
policies.bundles = PoliciesUtil.defaultBundle();
policies.auth_policies.getNamespaceAuthentication().put("spiffe://developer/passport-role", EnumSet.allOf(AuthAction.class));
policies.auth_policies.getNamespaceAuthentication().put("my-role", EnumSet.allOf(AuthAction.class));
policies.is_allow_auto_update_schema = conf.isAllowAutoUpdateSchemaEnabled();

assertEquals(admin.namespaces().getPolicies("prop-xyz/ns1"), policies);
assertEquals(admin.namespaces().getPermissions("prop-xyz/ns1"), policies.auth_policies.getNamespaceAuthentication());
Expand All @@ -746,6 +747,7 @@ public void namespaces() throws Exception {
admin.namespaces().revokePermissionsOnNamespace("prop-xyz/ns1", "my-role");
policies.auth_policies.getNamespaceAuthentication().remove("spiffe://developer/passport-role");
policies.auth_policies.getNamespaceAuthentication().remove("my-role");
policies.is_allow_auto_update_schema = conf.isAllowAutoUpdateSchemaEnabled();
assertEquals(admin.namespaces().getPolicies("prop-xyz/ns1"), policies);

assertEquals(admin.namespaces().getPersistence("prop-xyz/ns1"), null);
Expand Down
Expand Up @@ -649,6 +649,7 @@ public void namespaces() throws Exception {
Policies policies = new Policies();
policies.bundles = PoliciesUtil.defaultBundle();
policies.auth_policies.getNamespaceAuthentication().put("my-role", EnumSet.allOf(AuthAction.class));
policies.is_allow_auto_update_schema = conf.isAllowAutoUpdateSchemaEnabled();

assertEquals(admin.namespaces().getPolicies("prop-xyz/use/ns1"), policies);
assertEquals(admin.namespaces().getPermissions("prop-xyz/use/ns1"), policies.auth_policies.getNamespaceAuthentication());
Expand All @@ -657,6 +658,7 @@ public void namespaces() throws Exception {

admin.namespaces().revokePermissionsOnNamespace("prop-xyz/use/ns1", "my-role");
policies.auth_policies.getNamespaceAuthentication().remove("my-role");
policies.is_allow_auto_update_schema = conf.isAllowAutoUpdateSchemaEnabled();
assertEquals(admin.namespaces().getPolicies("prop-xyz/use/ns1"), policies);

assertNull(admin.namespaces().getPersistence("prop-xyz/use/ns1"));
Expand Down
Expand Up @@ -39,6 +39,7 @@
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.schema.SchemaInfo;
Expand Down Expand Up @@ -247,6 +248,7 @@ public void testBrokerAllowAutoUpdateSchemaDisabled(SchemaCompatibilityStrategy


pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(false);

ProducerBuilder<Schemas.PersonTwo> producerThreeBuilder = pulsarClient
.newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
(false).withSupportSchemaVersioning(true).
Expand All @@ -259,6 +261,9 @@ public void testBrokerAllowAutoUpdateSchemaDisabled(SchemaCompatibilityStrategy
}

pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(true);
Policies policies = admin.namespaces().getPolicies(namespaceName.toString());
Assert.assertTrue(policies.is_allow_auto_update_schema);

ConsumerBuilder<Schemas.PersonTwo> comsumerBuilder = pulsarClient.newConsumer(Schema.AVRO(
SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
(false).withSupportSchemaVersioning(true).
Expand Down

0 comments on commit 7d60795

Please sign in to comment.