Skip to content

PIP 35: Improve topic lookup for topics that have high number of partitions

lipenghui edited this page Jun 4, 2021 · 3 revisions

Motivation

As this github issue brings up, for a partitioned topic, client has to send LookUp request for each partition. Here we propose a solution to make lookup easier for partitioned topic.

We could either make current LookUp(getBroker) api smarter and it will check if topic is a partitioned topic or not. If it’s a normal topic just return normal LookUp response which will be Pair<InetSocketAddress, InetSocketAddress>, if it’s a partitioned topic then it will return List<Pair<InetSocketAddress, InetSocketAddress>> contains all brokers that serving that partitioned topic.

Or we could create a new BatchLookUp api, like getBrokers. It can take a list of topic as input, where each topic has to be either a non partitioned topic or a partition of a partitioned topic and it’ll return Map<TopicName, Pair<InetSocketAddress, InetSocketAddress>> where it’s a map of input topic to corresponding address.

Prefer to go with second approach as it not only support lookup for all partitions of partitioned topic, it also support bulk lookup for many normal topics in one request.

Proposal

In TopicLookupBase is where the core lookup logic will happens. Add new lookupTopicBatchAsync, within the method, after validation, for each topic, we’ll fist find out the bundle it belong, then invoke NameSpaceService.findBrokerServiceUrl for each bundle and create a Map<Bundle , List>. Then iterate through map and create response for each topicname.

In LookupService, when findBrokers invoke newBatchLookup, it’ll get a response of a list, where some of response contains redirect, for these response will invoke findBrokers again to contact correct cluster. Combine the result of 2 invocation to get a final result.

Here’s proposed change to protocal buffer definition and apis:

PulsarApi.proto

	Add 
	Type enum:
		BATCH_LOOKUP           = 23;
		BATCH_LOOKUP_RESPONSE  = 24;

	BaseBommand:
		optional CommandBatchLookupTopic lookupTopic                    = 23;
		optional CommandBatchLookupTopicResponse lookupTopicResponse    = 24;

	CommandBatchLookupTopic:
	{
		repeated string topics           = 1;
		required uint64 request_id       = 2;
	}

	CommandTopicResponse:
	{	
		repeated string topicname               = 10;
	}
  • Commands.java

    • Add newBatchLookUp
  • ClientCnx.java

    • Add newBatchLookup/handleBatchLookUpResponse
  • ServiceCnx.java

    • Add handleBatchLookup
  • LookupProxyHandler.java

    • Add handleBatchLookup
  • ProxyConnection.java

    • Add handleBatchLookup
  • PulsarDecoder.java

    • Handle batchLookup
  • Add BatchLookupResult.java

  • TopicLookupBase.java

    • Add lookupTopicBatchAsync
List<CompletableFuture<ByteBuf>> lookupTopicBatchAsync(list<topics> topics) {
	List<CompletableFuture> resultFutures;
List<CompletableFuture> futures;
	Map<Bundle.toString, List<Topic> map;
            topics.foreach((topic) -> {
CompletableFuture validateFuture;
            CompletableFuture getNameSpaceBundleFuture;
CompletableFuture lookupFuture;

getClusterDataIfDifferentCluster.thenAccept(validateFuture.complete)
	validateFuture.thenAccept(validaitonFailureResponse -> {
		if(validaitonFailureResponse != null)
			lookupFuture.complete(validaitonFailureResponse)
pulsarService.getNamespaceService().getBundleAsync().
thenAccept(bundle -> map.get(bundle).add(requestId))

getNameSpaceBundleFuture.complete())

futures.add(getNameSpaceBundleFuture)
}}))
FutureUtil.waitForAll(futures).then(map.entry.foreach((entry) -> {
 	entry.key.findBrokerServiceUrl().thenAccept(lookupResult -> {
//Response will be address with success or redirect flag, and a list of topicnames in lookup request that belong to this bundle
		lookupFuture.complete(newLookupResponse()))
		resultFutures.add(lookupFuture)
})
}))

return resultFutures;
}
  • TopicLookup.java(v2)

    • Add lookupTopicBatchAsync
  • BinaryProto/HttpLookupService.java

    • Add getBrokers/findBrokers
Clone this wiki locally