Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

[Schema Registry] Force authorization when kafkaEnableMultiTenantMetadata is false #1807

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authentication.AuthenticationState;
Expand Down Expand Up @@ -135,16 +136,17 @@ public String authenticate(FullHttpRequest request) throws SchemaStorageExceptio

private void performAuthorizationValidation(String username, String role, String tenant)
throws SchemaStorageException {
if (kafkaConfig.isAuthorizationEnabled() && kafkaConfig.isKafkaEnableMultiTenantMetadata()) {
if (kafkaConfig.isAuthorizationEnabled()) {
KafkaPrincipal kafkaPrincipal =
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, role, username, null, null);
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, role, username, null,
new AuthenticationDataSource() {});
String topicName = MetadataUtils.constructSchemaRegistryTopicName(tenant, kafkaConfig);
try {
Boolean tenantExists =
authorizer.canAccessTenantAsync(kafkaPrincipal, Resource.of(ResourceType.TENANT, tenant))
.get();
if (tenantExists == null || !tenantExists) {
log.debug("SchemaRegistry username {} role {} tenant {} does not exist",
log.debug("SchemaRegistry username {} role {} tenant {} does not exist {}",
username, role, tenant, topicName);
throw new SchemaStorageException("Role " + role + " cannot access topic " + topicName + " "
+ "tenant " + tenant + " does not exist (wrong username?)",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public abstract class KafkaAuthorizationTestBase extends KopProtocolHandlerTestB

protected static final String TENANT = "KafkaAuthorizationTest";
protected static final String NAMESPACE = "ns1";
private static final String SCHEMA_NAMESPACE = "ns2";
private static final String SHORT_TOPIC = "topic1";
private static final String TOPIC = "persistent://" + TENANT + "/" + NAMESPACE + "/" + SHORT_TOPIC;
private static final SecretKey secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
Expand Down Expand Up @@ -120,7 +121,7 @@ protected void setup() throws Exception {
conf.setKafkaMetadataNamespace("__kafka");
conf.setKafkaTenant(TENANT);
conf.setKafkaNamespace(NAMESPACE);
conf.setKopSchemaRegistryNamespace(NAMESPACE);
conf.setKopSchemaRegistryNamespace(SCHEMA_NAMESPACE);

conf.setClusterName(super.configClusterName);
conf.setAuthorizationEnabled(true);
Expand Down Expand Up @@ -712,18 +713,16 @@ public static Object[][] tokenPrefix() {
// this test creates the schema registry topic, and this may interfere with other tests
@Test(timeOut = 30000, priority = 1000, dataProvider = "tokenPrefix")
public void testAvroProduceAndConsumeWithAuth(boolean withTokenPrefix) throws Exception {

if (conf.isKafkaEnableMultiTenantMetadata()) {
// ensure that the KOP metadata namespace exists and that the user can write to it
// because we require "produce" permissions on the Schema Registry Topic
// while working in Multi Tenant mode
if (!admin.namespaces().getNamespaces(TENANT).contains(TENANT + "/" + conf.getKafkaMetadataNamespace())) {
admin.namespaces().createNamespace(TENANT + "/" + conf.getKafkaMetadataNamespace());
}
admin.namespaces()
.grantPermissionOnNamespace(TENANT + "/" + conf.getKafkaMetadataNamespace(), SIMPLE_USER,
Sets.newHashSet(AuthAction.produce, AuthAction.consume));
admin.namespaces().grantPermissionOnNamespace(conf.getKafkaTenant() + "/" + conf.getKafkaNamespace(),
SIMPLE_USER, Sets.newHashSet(AuthAction.produce, AuthAction.consume));
final String tenant = (conf.isKafkaEnableMultiTenantMetadata() ? TENANT : conf.getKafkaMetadataTenant());
final var namespaces = admin.namespaces().getNamespaces(tenant);
final String schemaNamespace = tenant + "/" + conf.getKopSchemaRegistryNamespace();
if (!namespaces.contains(schemaNamespace)) {
admin.namespaces().createNamespace(schemaNamespace);
}
admin.namespaces().grantPermissionOnNamespace(schemaNamespace, SIMPLE_USER,
Sets.newHashSet(AuthAction.produce));

String topic = "SchemaRegistryTest-testAvroProduceAndConsumeWithAuth" + withTokenPrefix;
IndexedRecord avroRecord = createAvroRecord();
Expand Down Expand Up @@ -759,7 +758,7 @@ public void testAvroProduceAndConsumeWithAuth(boolean withTokenPrefix) throws Ex

@Test(timeOut = 30000)
public void testSchemaNoAuth() {
final KafkaProducer<Integer, Object> producer = createAvroProducer(false, false);
final KafkaProducer<Integer, Object> producer = createAvroProducer(false, null);
try {
producer.send(new ProducerRecord<>("test-avro-wrong-auth", createAvroRecord())).get();
fail();
Expand All @@ -772,6 +771,22 @@ public void testSchemaNoAuth() {
producer.close();
}

@Test(timeOut = 30000)
public void testSchemaWrongAuth() {
final var wrongToken = AuthTokenUtils.createToken(secretKey, "wrong-user", Optional.empty());
final KafkaProducer<Integer, Object> producer = createAvroProducer(false, wrongToken);
try {
producer.send(new ProducerRecord<>("test-avro-wrong-auth", createAvroRecord())).get();
fail();
} catch (Exception e) {
assertTrue(e.getCause() instanceof RestClientException);
var restException = (RestClientException) e.getCause();
assertEquals(restException.getErrorCode(), HttpResponseStatus.FORBIDDEN.code());
assertTrue(restException.getMessage().contains("cannot access topic"));
}
producer.close();
}

private IndexedRecord createAvroRecord() {
String userSchema = "{\"namespace\": \"example.avro\", \"type\": \"record\", "
+ "\"name\": \"User\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}";
Expand All @@ -783,10 +798,10 @@ private IndexedRecord createAvroRecord() {
}

private KafkaProducer<Integer, Object> createAvroProducer(boolean withTokenPrefix) {
return createAvroProducer(withTokenPrefix, true);
return createAvroProducer(withTokenPrefix, userToken);
}

private KafkaProducer<Integer, Object> createAvroProducer(boolean withTokenPrefix, boolean withSchemaToken) {
private KafkaProducer<Integer, Object> createAvroProducer(boolean withTokenPrefix, String schemaToken) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getClientPort());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
Expand All @@ -803,10 +818,10 @@ private KafkaProducer<Integer, Object> createAvroProducer(boolean withTokenPrefi
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");

if (withSchemaToken) {
if (schemaToken != null) {
props.put(KafkaAvroSerializerConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
props.put(KafkaAvroSerializerConfig.USER_INFO_CONFIG,
username + ":" + (withTokenPrefix ? password : userToken));
username + ":" + (withTokenPrefix ? password : schemaToken));
}

return new KafkaProducer<>(props);
Expand Down