Skip to content

Commit

Permalink
[Issue 12757][broker] add broker config isAllowAutoUpdateSchema (#12786)
Browse files Browse the repository at this point in the history
(cherry picked from commit fa7be23)
  • Loading branch information
Jason918 authored and codelipenghui committed Nov 18, 2021
1 parent fdcf5a4 commit 8b62272
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 16 deletions.
4 changes: 4 additions & 0 deletions conf/broker.conf
Expand Up @@ -264,6 +264,10 @@ brokerMaxConnections=0
# The maximum number of connections per IP. If it exceeds, new connections are rejected.
brokerMaxConnectionsPerIp=0

# Allow schema to be auto updated at broker level. User can override this by
# 'is_allow_auto_update_schema' of namespace policy.
isAllowAutoUpdateSchemaEnabled=true

# Enable check for minimum allowed client library version
clientLibraryVersionCheckEnabled=false

Expand Down
4 changes: 4 additions & 0 deletions conf/standalone.conf
Expand Up @@ -169,6 +169,10 @@ defaultNumberOfNamespaceBundles=4
# Using a value of 0, is disabling maxTopicsPerNamespace-limit check.
maxTopicsPerNamespace=0

# Allow schema to be auto updated at broker level. User can override this by
# 'is_allow_auto_update_schema' of namespace policy.
isAllowAutoUpdateSchemaEnabled=true

# Enable check for minimum allowed client library version
clientLibraryVersionCheckEnabled=false

Expand Down
Expand Up @@ -553,6 +553,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int brokerMaxConnectionsPerIp = 0;

@FieldContext(
category = CATEGORY_POLICIES,
dynamic = true,
doc = "Allow schema to be auto updated at broker level. User can override this by 'is_allow_auto_update_schema'"
+ " of namespace policy. This is enabled by default."
)
private boolean isAllowAutoUpdateSchemaEnabled = true;

@FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
Expand Down
Expand Up @@ -98,7 +98,7 @@ public abstract class AbstractTopic implements Topic {
@Getter
protected volatile SchemaCompatibilityStrategy schemaCompatibilityStrategy =
SchemaCompatibilityStrategy.FULL;
protected volatile boolean isAllowAutoUpdateSchema = true;
protected volatile Boolean isAllowAutoUpdateSchema;
// schema validation enforced flag
protected volatile boolean schemaValidationEnforced = false;

Expand Down Expand Up @@ -330,20 +330,28 @@ public CompletableFuture<SchemaVersion> addSchema(SchemaData schema) {
String base = TopicName.get(getName()).getPartitionedTopicName();
String id = TopicName.get(base).getSchemaName();
SchemaRegistryService schemaRegistryService = brokerService.pulsar().getSchemaRegistryService();
return isAllowAutoUpdateSchema ? schemaRegistryService
.putSchemaIfAbsent(id, schema, schemaCompatibilityStrategy)
: schemaRegistryService.trimDeletedSchemaAndGetList(id).thenCompose(schemaAndMetadataList ->
schemaRegistryService.getSchemaVersionBySchemaData(schemaAndMetadataList, schema)
.thenCompose(schemaVersion -> {
if (schemaVersion == null) {
return FutureUtil
.failedFuture(
new IncompatibleSchemaException(
"Schema not found and schema auto updating is disabled."));
} else {
return CompletableFuture.completedFuture(schemaVersion);
}
}));

if (allowAutoUpdateSchema()) {
return schemaRegistryService.putSchemaIfAbsent(id, schema, schemaCompatibilityStrategy);
} else {
return schemaRegistryService.trimDeletedSchemaAndGetList(id).thenCompose(schemaAndMetadataList ->
schemaRegistryService.getSchemaVersionBySchemaData(schemaAndMetadataList, schema)
.thenCompose(schemaVersion -> {
if (schemaVersion == null) {
return FutureUtil.failedFuture(new IncompatibleSchemaException(
"Schema not found and schema auto updating is disabled."));
} else {
return CompletableFuture.completedFuture(schemaVersion);
}
}));
}
}

private boolean allowAutoUpdateSchema() {
if (isAllowAutoUpdateSchema == null) {
return brokerService.pulsar().getConfig().isAllowAutoUpdateSchemaEnabled();
}
return isAllowAutoUpdateSchema;
}

@Override
Expand Down
Expand Up @@ -218,6 +218,87 @@ public void testConsumerCompatibilityReadAllCheckTest(SchemaCompatibilityStrateg
}
}

@Test(dataProvider = "AllCheckSchemaCompatibilityStrategy")
public void testBrokerAllowAutoUpdateSchemaDisabled(SchemaCompatibilityStrategy schemaCompatibilityStrategy)
throws Exception {

final String tenant = PUBLIC_TENANT;
final String topic = "test-consumer-compatibility";
String namespace = "test-namespace-" + randomName(16);
String fqtn = TopicName.get(
TopicDomain.persistent.value(),
tenant,
namespace,
topic
).toString();

NamespaceName namespaceName = NamespaceName.get(tenant, namespace);

admin.namespaces().createNamespace(
tenant + "/" + namespace,
Sets.newHashSet(CLUSTER_NAME)
);

assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(namespaceName.toString()),
SchemaCompatibilityStrategy.FULL);

admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), schemaCompatibilityStrategy);
admin.schemas().createSchema(fqtn, Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo());


pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(false);
ProducerBuilder<Schemas.PersonTwo> producerThreeBuilder = pulsarClient
.newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
(false).withSupportSchemaVersioning(true).
withPojo(Schemas.PersonTwo.class).build()))
.topic(fqtn);
try {
producerThreeBuilder.create();
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("Schema not found and schema auto updating is disabled."));
}

pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(true);
ConsumerBuilder<Schemas.PersonTwo> comsumerBuilder = pulsarClient.newConsumer(Schema.AVRO(
SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
(false).withSupportSchemaVersioning(true).
withPojo(Schemas.PersonTwo.class).build()))
.subscriptionName("test")
.topic(fqtn);

Producer<Schemas.PersonTwo> producer = producerThreeBuilder.create();
Consumer<Schemas.PersonTwo> consumerTwo = comsumerBuilder.subscribe();

producer.send(new Schemas.PersonTwo(2, "Lucy"));
Message<Schemas.PersonTwo> message = consumerTwo.receive();

Schemas.PersonTwo personTwo = message.getValue();
consumerTwo.acknowledge(message);

assertEquals(personTwo.getId(), 2);
assertEquals(personTwo.getName(), "Lucy");

producer.close();
consumerTwo.close();

pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(false);

producer = producerThreeBuilder.create();
consumerTwo = comsumerBuilder.subscribe();

producer.send(new Schemas.PersonTwo(2, "Lucy"));
message = consumerTwo.receive();

personTwo = message.getValue();
consumerTwo.acknowledge(message);

assertEquals(personTwo.getId(), 2);
assertEquals(personTwo.getName(), "Lucy");

consumerTwo.close();
producer.close();
}

@Test(dataProvider = "AllCheckSchemaCompatibilityStrategy")
public void testIsAutoUpdateSchema(SchemaCompatibilityStrategy schemaCompatibilityStrategy) throws Exception {
final String tenant = PUBLIC_TENANT;
Expand Down
Expand Up @@ -107,7 +107,7 @@ public class Policies {
public SchemaCompatibilityStrategy schema_compatibility_strategy = SchemaCompatibilityStrategy.UNDEFINED;

@SuppressWarnings("checkstyle:MemberName")
public boolean is_allow_auto_update_schema = true;
public Boolean is_allow_auto_update_schema = null;

@SuppressWarnings("checkstyle:MemberName")
public boolean schema_validation_enforced = false;
Expand Down

0 comments on commit 8b62272

Please sign in to comment.