Skip to content

Commit

Permalink
PIP 110: Support Topic metadata - PART-1 create topic with properties (
Browse files Browse the repository at this point in the history
…#12818)

Fixes #12629

## Motivation
The original discussion mail :
https://lists.apache.org/thread/m9dkhq1fs6stsdwh78h84fsl5hs5v67f

Introduce the ability to store metadata about topics.

This would be very useful as with metadata you could add labels and other
pieces of information that would allow defining the purpose of a topic,
custom application-level properties.
This feature will allow application-level diagnostic tools and maintenance
tools to not need external databases to store such metadata.

Imagine that we could add a simple key value map (String keys and String
values) to the topic.
These metadata could be set during topic creation and also updated.
  • Loading branch information
Technoboy- committed Jan 25, 2022
1 parent 262c653 commit 10036d5
Show file tree
Hide file tree
Showing 18 changed files with 437 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public class ManagedLedgerConfig {
private int newEntriesCheckDelayInMillis = 10;
private Clock clock = Clock.systemUTC();
private ManagedLedgerInterceptor managedLedgerInterceptor;
private Map<String, String> properties;

public boolean isCreateIfMissing() {
return createIfMissing;
Expand Down Expand Up @@ -619,6 +620,16 @@ public void setBookKeeperEnsemblePlacementPolicyProperties(
this.bookKeeperEnsemblePlacementPolicyProperties = bookKeeperEnsemblePlacementPolicyProperties;
}


public Map<String, String> getProperties() {
return properties;
}


public void setProperties(Map<String, String> properties) {
this.properties = properties;
}

public boolean isDeletionAtBatchIndexLevelEnabled() {
return deletionAtBatchIndexLevelEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,8 @@ synchronized void initialize(final ManagedLedgerInitializeLedgerCallback callbac
log.info("Opening managed ledger {}", name);

// Fetch the list of existing ledgers in the managed ledger
store.getManagedLedgerInfo(name, config.isCreateIfMissing(), new MetaStoreCallback<ManagedLedgerInfo>() {
store.getManagedLedgerInfo(name, config.isCreateIfMissing(), config.getProperties(),
new MetaStoreCallback<ManagedLedgerInfo>() {
@Override
public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {
ledgersStat = stat;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.bookkeeper.mledger.impl;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
Expand Down Expand Up @@ -51,7 +52,23 @@ interface MetaStoreCallback<T> {
* whether the managed ledger metadata should be created if it doesn't exist already
* @throws MetaStoreException
*/
void getManagedLedgerInfo(String ledgerName, boolean createIfMissing,
default void getManagedLedgerInfo(String ledgerName, boolean createIfMissing,
MetaStoreCallback<ManagedLedgerInfo> callback) {
getManagedLedgerInfo(ledgerName, createIfMissing, null, callback);
}

/**
* Get the metadata used by the ManagedLedger.
*
* @param ledgerName
* the name of the ManagedLedger
* @param createIfMissing
* whether the managed ledger metadata should be created if it doesn't exist already
* @param properties
* ledger properties
* @throws MetaStoreException
*/
void getManagedLedgerInfo(String ledgerName, boolean createIfMissing, Map<String, String> properties,
MetaStoreCallback<ManagedLedgerInfo> callback);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.netty.buffer.Unpooled;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand Down Expand Up @@ -81,7 +82,7 @@ public MetaStoreImpl(MetadataStore store, OrderedExecutor executor, String compr
}

@Override
public void getManagedLedgerInfo(String ledgerName, boolean createIfMissing,
public void getManagedLedgerInfo(String ledgerName, boolean createIfMissing, Map<String, String> properties,
MetaStoreCallback<ManagedLedgerInfo> callback) {
// Try to get the content or create an empty node
String path = PREFIX + ledgerName;
Expand All @@ -103,8 +104,17 @@ public void getManagedLedgerInfo(String ledgerName, boolean createIfMissing,

store.put(path, new byte[0], Optional.of(-1L))
.thenAccept(stat -> {
ManagedLedgerInfo info = ManagedLedgerInfo.getDefaultInstance();
callback.operationComplete(info, stat);
ManagedLedgerInfo.Builder ledgerBuilder = ManagedLedgerInfo.newBuilder();
if (properties != null) {
properties.forEach((k, v) -> {
ledgerBuilder.addProperties(
MLDataFormats.KeyValue.newBuilder()
.setKey(k)
.setValue(v)
.build());
});
}
callback.operationComplete(ledgerBuilder.build(), stat);
}).exceptionally(ex -> {
callback.operationFailed(getException(ex));
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1632,6 +1632,36 @@ public void cursorReadsWithDiscardedEmptyLedgers() throws Exception {
assertEquals(c1.readEntries(1).size(), 0);
}

@Test
public void testSetTopicMetadata() throws Exception {
Map<String, String> properties = new HashMap<>();
properties.put("key1", "value1");
properties.put("key2", "value2");
final MetaStore store = factory.getMetaStore();
final CountDownLatch latch = new CountDownLatch(1);
final ManagedLedgerInfo[] storedMLInfo = new ManagedLedgerInfo[1];
store.getManagedLedgerInfo("my_test_ledger", true, properties, new MetaStoreCallback<ManagedLedgerInfo>() {
@Override
public void operationComplete(ManagedLedgerInfo result, Stat version) {
storedMLInfo[0] = result;
latch.countDown();
}

@Override
public void operationFailed(MetaStoreException e) {
latch.countDown();
fail("Should have failed here");
}
});
latch.await();

assertEquals(storedMLInfo[0].getPropertiesCount(), 2);
assertEquals(storedMLInfo[0].getPropertiesList().get(0).getKey(), "key1");
assertEquals(storedMLInfo[0].getPropertiesList().get(0).getValue(), "value1");
assertEquals(storedMLInfo[0].getPropertiesList().get(1).getKey(), "key2");
assertEquals(storedMLInfo[0].getPropertiesList().get(1).getValue(), "value2");
}

@Test
public void cursorReadsWithDiscardedEmptyLedgersStillListed() throws Exception {
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -569,6 +570,11 @@ protected List<String> getTopicPartitionList(TopicDomain topicDomain) {

protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int numPartitions,
boolean createLocalTopicOnly) {
internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly, null);
}

protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int numPartitions,
boolean createLocalTopicOnly, Map<String, String> properties) {
Integer maxTopicsPerNamespace = null;

try {
Expand Down Expand Up @@ -635,7 +641,7 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
return;
}

provisionPartitionedTopicPath(asyncResponse, numPartitions, createLocalTopicOnly)
provisionPartitionedTopicPath(asyncResponse, numPartitions, createLocalTopicOnly, properties)
.thenCompose(ignored -> tryCreatePartitionsAsync(numPartitions))
.whenComplete((ignored, ex) -> {
if (ex != null) {
Expand Down Expand Up @@ -674,7 +680,7 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
((TopicsImpl) pulsar().getBrokerService()
.getClusterPulsarAdmin(cluster, clusterDataOp).topics())
.createPartitionedTopicAsync(
topicName.getPartitionedTopicName(), numPartitions, true);
topicName.getPartitionedTopicName(), numPartitions, true, null);
})
.exceptionally(throwable -> {
log.error("Failed to create partition topic in cluster {}.", cluster, throwable);
Expand Down Expand Up @@ -713,13 +719,13 @@ protected CompletableFuture<Boolean> checkTopicExistsAsync(TopicName topicName)
});
}

private CompletableFuture<Void> provisionPartitionedTopicPath(AsyncResponse asyncResponse,
int numPartitions,
boolean createLocalTopicOnly) {
private CompletableFuture<Void> provisionPartitionedTopicPath(AsyncResponse asyncResponse, int numPartitions,
boolean createLocalTopicOnly,
Map<String, String> properties) {
CompletableFuture<Void> future = new CompletableFuture<>();
namespaceResources()
.getPartitionedTopicResources()
.createPartitionedTopicAsync(topicName, new PartitionedTopicMetadata(numPartitions))
.createPartitionedTopicAsync(topicName, new PartitionedTopicMetadata(numPartitions, properties))
.whenComplete((ignored, ex) -> {
if (ex != null) {
if (ex instanceof AlreadyExistsException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ protected void internalRevokePermissionsOnTopic(String role) {
revokePermissions(topicName.toString(), role);
}

protected void internalCreateNonPartitionedTopic(boolean authoritative) {
protected void internalCreateNonPartitionedTopic(boolean authoritative, Map<String, String> properties) {
validateNonPartitionTopicName(topicName.getLocalName());
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
Expand All @@ -391,7 +391,7 @@ protected void internalCreateNonPartitionedTopic(boolean authoritative) {
throw new RestException(Status.CONFLICT, "This topic already exists");
}

Topic createdTopic = getOrCreateTopic(topicName);
Topic createdTopic = getOrCreateTopic(topicName, properties);
log.info("[{}] Successfully created non-partitioned topic {}", clientAppId(), createdTopic);
} catch (Exception e) {
if (e instanceof RestException) {
Expand Down Expand Up @@ -3891,8 +3891,12 @@ private CompletableFuture<Topic> topicNotFoundReasonAsync(TopicName topicName) {
}

private Topic getOrCreateTopic(TopicName topicName) {
return pulsar().getBrokerService().getTopic(
topicName.toString(), true).thenApply(Optional::get).join();
return getOrCreateTopic(topicName, null);
}

private Topic getOrCreateTopic(TopicName topicName, Map<String, String> properties) {
return pulsar().getBrokerService().getTopic(topicName.toString(), true, properties)
.thenApply(Optional::get).join();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public void createNonPartitionedTopic(
validateNamespaceName(tenant, cluster, namespace);
validateTopicName(tenant, cluster, namespace, encodedTopic);
validateGlobalNamespaceOwnership();
internalCreateNonPartitionedTopic(authoritative);
internalCreateNonPartitionedTopic(authoritative, null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,12 +264,14 @@ public void createNonPartitionedTopic(
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Key value pair properties for the topic metadata")
Map<String, String> properties) {
validateNamespaceName(tenant, namespace);
validateGlobalNamespaceOwnership();
validateTopicName(tenant, namespace, encodedTopic);
validateCreateTopic(topicName);
internalCreateNonPartitionedTopic(authoritative);
internalCreateNonPartitionedTopic(authoritative, properties);
}

@GET
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin.v3;

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
*/
@Path("/persistent")
@Produces(MediaType.APPLICATION_JSON)
@Api(value = "/persistent", description = "Persistent topic admin apis", tags = "persistent topic")
public class PersistentTopics extends PersistentTopicsBase {

@PUT
@Path("/{tenant}/{namespace}/{topic}/partitions")
@ApiOperation(value = "Create a partitioned topic.",
notes = "It needs to be called before creating a producer on a partitioned topic.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant does not exist"),
@ApiResponse(code = 406, message = "The number of partitions should be more than 0 and"
+ " less than or equal to maxNumPartitionsPerPartitionedTopic"),
@ApiResponse(code = 409, message = "Partitioned topic already exist"),
@ApiResponse(code = 412,
message = "Failed Reason : Name is invalid or Namespace does not have any clusters configured"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
})
public void createPartitionedTopic(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "The metadata for the topic",
required = true, type = "PartitionedTopicMetadata") PartitionedTopicMetadata metadata,
@QueryParam("createLocalTopicOnly") @DefaultValue("false") boolean createLocalTopicOnly) {
try {
validateNamespaceName(tenant, namespace);
validateGlobalNamespaceOwnership();
validatePartitionedTopicName(tenant, namespace, encodedTopic);
validateTopicPolicyOperation(topicName, PolicyName.PARTITION, PolicyOperation.WRITE);
validateCreateTopic(topicName);
internalCreatePartitionedTopic(asyncResponse, metadata.partitions, createLocalTopicOnly,
metadata.properties);
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
}
}

private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class);
}

0 comments on commit 10036d5

Please sign in to comment.