Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue 12757][broker] add broker config isAllowAutoUpdateSchema #12786

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions conf/broker.conf
Expand Up @@ -270,6 +270,10 @@ brokerMaxConnections=0
# The maximum number of connections per IP. If it exceeds, new connections are rejected.
brokerMaxConnectionsPerIp=0

# Allow schema to be auto updated at broker level. User can override this by
# 'is_allow_auto_update_schema' of namespace policy.
isAllowAutoUpdateSchemaEnabled=true

# Enable check for minimum allowed client library version
clientLibraryVersionCheckEnabled=false

Expand Down
4 changes: 4 additions & 0 deletions conf/standalone.conf
Expand Up @@ -176,6 +176,10 @@ defaultNumberOfNamespaceBundles=4
# Using a value of 0, is disabling maxTopicsPerNamespace-limit check.
maxTopicsPerNamespace=0

# Allow schema to be auto updated at broker level. User can override this by
# 'is_allow_auto_update_schema' of namespace policy.
isAllowAutoUpdateSchemaEnabled=true

# Enable check for minimum allowed client library version
clientLibraryVersionCheckEnabled=false

Expand Down
Expand Up @@ -573,6 +573,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int brokerMaxConnectionsPerIp = 0;

@FieldContext(
category = CATEGORY_POLICIES,
dynamic = true,
doc = "Allow schema to be auto updated at broker level. User can override this by 'is_allow_auto_update_schema'"
+ " of namespace policy. This is enabled by default."
)
private boolean isAllowAutoUpdateSchemaEnabled = true;

@FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
Expand Down
Expand Up @@ -98,7 +98,7 @@ public abstract class AbstractTopic implements Topic {
@Getter
protected volatile SchemaCompatibilityStrategy schemaCompatibilityStrategy =
SchemaCompatibilityStrategy.FULL;
protected volatile boolean isAllowAutoUpdateSchema = true;
protected volatile Boolean isAllowAutoUpdateSchema;
// schema validation enforced flag
protected volatile boolean schemaValidationEnforced = false;

Expand Down Expand Up @@ -333,20 +333,28 @@ public CompletableFuture<SchemaVersion> addSchema(SchemaData schema) {
String base = TopicName.get(getName()).getPartitionedTopicName();
String id = TopicName.get(base).getSchemaName();
SchemaRegistryService schemaRegistryService = brokerService.pulsar().getSchemaRegistryService();
return isAllowAutoUpdateSchema ? schemaRegistryService
.putSchemaIfAbsent(id, schema, schemaCompatibilityStrategy)
: schemaRegistryService.trimDeletedSchemaAndGetList(id).thenCompose(schemaAndMetadataList ->
schemaRegistryService.getSchemaVersionBySchemaData(schemaAndMetadataList, schema)
.thenCompose(schemaVersion -> {
if (schemaVersion == null) {
return FutureUtil
.failedFuture(
new IncompatibleSchemaException(
"Schema not found and schema auto updating is disabled."));
} else {
return CompletableFuture.completedFuture(schemaVersion);
}
}));

if (allowAutoUpdateSchema()) {
return schemaRegistryService.putSchemaIfAbsent(id, schema, schemaCompatibilityStrategy);
} else {
return schemaRegistryService.trimDeletedSchemaAndGetList(id).thenCompose(schemaAndMetadataList ->
schemaRegistryService.getSchemaVersionBySchemaData(schemaAndMetadataList, schema)
.thenCompose(schemaVersion -> {
if (schemaVersion == null) {
return FutureUtil.failedFuture(new IncompatibleSchemaException(
"Schema not found and schema auto updating is disabled."));
} else {
return CompletableFuture.completedFuture(schemaVersion);
}
}));
}
}

private boolean allowAutoUpdateSchema() {
if (isAllowAutoUpdateSchema == null) {
return brokerService.pulsar().getConfig().isAllowAutoUpdateSchemaEnabled();
}
return isAllowAutoUpdateSchema;
}

@Override
Expand Down
Expand Up @@ -218,6 +218,87 @@ public void testConsumerCompatibilityReadAllCheckTest(SchemaCompatibilityStrateg
}
}

@Test(dataProvider = "AllCheckSchemaCompatibilityStrategy")
public void testBrokerAllowAutoUpdateSchemaDisabled(SchemaCompatibilityStrategy schemaCompatibilityStrategy)
throws Exception {

final String tenant = PUBLIC_TENANT;
final String topic = "test-consumer-compatibility";
String namespace = "test-namespace-" + randomName(16);
String fqtn = TopicName.get(
TopicDomain.persistent.value(),
tenant,
namespace,
topic
).toString();

NamespaceName namespaceName = NamespaceName.get(tenant, namespace);

admin.namespaces().createNamespace(
tenant + "/" + namespace,
Sets.newHashSet(CLUSTER_NAME)
);

assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(namespaceName.toString()),
SchemaCompatibilityStrategy.FULL);

admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), schemaCompatibilityStrategy);
admin.schemas().createSchema(fqtn, Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo());


pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(false);
ProducerBuilder<Schemas.PersonTwo> producerThreeBuilder = pulsarClient
.newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
(false).withSupportSchemaVersioning(true).
withPojo(Schemas.PersonTwo.class).build()))
.topic(fqtn);
try {
producerThreeBuilder.create();
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("Schema not found and schema auto updating is disabled."));
}

pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(true);
ConsumerBuilder<Schemas.PersonTwo> comsumerBuilder = pulsarClient.newConsumer(Schema.AVRO(
SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
(false).withSupportSchemaVersioning(true).
withPojo(Schemas.PersonTwo.class).build()))
.subscriptionName("test")
.topic(fqtn);

Producer<Schemas.PersonTwo> producer = producerThreeBuilder.create();
Consumer<Schemas.PersonTwo> consumerTwo = comsumerBuilder.subscribe();

producer.send(new Schemas.PersonTwo(2, "Lucy"));
Message<Schemas.PersonTwo> message = consumerTwo.receive();

Schemas.PersonTwo personTwo = message.getValue();
consumerTwo.acknowledge(message);

assertEquals(personTwo.getId(), 2);
assertEquals(personTwo.getName(), "Lucy");

producer.close();
consumerTwo.close();

pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(false);

producer = producerThreeBuilder.create();
consumerTwo = comsumerBuilder.subscribe();

producer.send(new Schemas.PersonTwo(2, "Lucy"));
message = consumerTwo.receive();

personTwo = message.getValue();
consumerTwo.acknowledge(message);

assertEquals(personTwo.getId(), 2);
assertEquals(personTwo.getName(), "Lucy");

consumerTwo.close();
producer.close();
}

@Test(dataProvider = "AllCheckSchemaCompatibilityStrategy")
public void testIsAutoUpdateSchema(SchemaCompatibilityStrategy schemaCompatibilityStrategy) throws Exception {
final String tenant = PUBLIC_TENANT;
Expand Down
Expand Up @@ -107,7 +107,7 @@ public class Policies {
public SchemaCompatibilityStrategy schema_compatibility_strategy = SchemaCompatibilityStrategy.UNDEFINED;

@SuppressWarnings("checkstyle:MemberName")
public boolean is_allow_auto_update_schema = true;
public Boolean is_allow_auto_update_schema = null;

@SuppressWarnings("checkstyle:MemberName")
public boolean schema_validation_enforced = false;
Expand Down
3 changes: 2 additions & 1 deletion site2/website-next/docs/reference-configuration.md
Expand Up @@ -349,6 +349,7 @@ brokerServiceCompactionThresholdInBytes|If the estimated backlog size is greater
| managedLedgerInfoCompressionType | Compression type of managed ledger information. <br /><br />Available options are `NONE`, `LZ4`, `ZLIB`, `ZSTD`, and `SNAPPY`). <br /><br />If this value is `NONE` or invalid, the `managedLedgerInfo` is not compressed. <br /><br />**Note** that after enabling this configuration, if you want to degrade a broker, you need to change the value to `NONE` and make sure all ledger metadata is saved without compression. | None |
| additionalServlets | Additional servlet name. <br /><br />If you have multiple additional servlets, separate them by commas. <br /><br />For example, additionalServlet_1, additionalServlet_2 | N/A |
| additionalServletDirectory | Location of broker additional servlet NAR directory | ./brokerAdditionalServlet |
| isAllowAutoUpdateSchemaEnabled | Allow schema to be auto updated at broker level. User can override this by 'is_allow_auto_update_schema' of namespace policy. |true|

## Client

Expand Down Expand Up @@ -480,7 +481,6 @@ You can set the log level and configuration in the [log4j2.yaml](https://github
| dispatchThrottlingRatePerTopicInMsg | Default messages (per second) dispatch throttling-limit for every topic. When the value is set to 0, default message dispatch throttling-limit is disabled. |0 |
| dispatchThrottlingRatePerTopicInByte | Default byte (per second) dispatch throttling-limit for every topic. When the value is set to 0, default byte dispatch throttling-limit is disabled. | 0|
| dispatchThrottlingOnBatchMessageEnabled |Apply dispatch rate limiting on batch message instead individual messages with in batch message. (Default is disabled). | false|

| dispatchThrottlingRateRelativeToPublishRate | Enable dispatch rate-limiting relative to publish rate. | false |
|dispatchThrottlingRatePerSubscriptionInMsg|The defaulted number of message dispatching throttling-limit for a subscription. The value of 0 disables message dispatch-throttling.|0|
|dispatchThrottlingRatePerSubscriptionInByte|The default number of message-bytes dispatching throttling-limit for a subscription. The value of 0 disables message-byte dispatch-throttling.|0|
Expand Down Expand Up @@ -650,6 +650,7 @@ You can set the log level and configuration in the [log4j2.yaml](https://github
|haProxyProtocolEnabled | Enable or disable the [HAProxy](http://www.haproxy.org/) protocol. |false|
|bookieId | If you want to custom a bookie ID or use a dynamic network address for a bookie, you can set the `bookieId`. <br /><br />Bookie advertises itself using the `bookieId` rather than the `BookieSocketAddress` (`hostname:port` or `IP:port`).<br /><br /> The `bookieId` is a non-empty string that can contain ASCII digits and letters ([a-zA-Z9-0]), colons, dashes, and dots. <br /><br />For more information about `bookieId`, see [here](http://bookkeeper.apache.org/bps/BP-41-bookieid/).|/|
| maxTopicsPerNamespace | The maximum number of persistent topics that can be created in the namespace. When the number of topics reaches this threshold, the broker rejects the request of creating a new topic, including the auto-created topics by the producer or consumer, until the number of connected consumers decreases. The default value 0 disables the check. | 0 |
| isAllowAutoUpdateSchemaEnabled | Allow schema to be auto updated at broker level. User can override this by 'is_allow_auto_update_schema' of namespace policy. |true|

## WebSocket

Expand Down