Skip to content

Commit

Permalink
KAFKA-8988; Replace CreatePartitions Request/Response with automated …
Browse files Browse the repository at this point in the history
…protocol (#7493)

This change updates the CreatePartitions request and response api objects
to use the generated protocol classes.

Reviewers: Jason Gustafson <jason@confluent.io>
  • Loading branch information
soondenana authored and hachikuji committed Jan 8, 2020
1 parent 1ccca5c commit e63becb
Show file tree
Hide file tree
Showing 12 changed files with 319 additions and 418 deletions.
Expand Up @@ -71,6 +71,10 @@
import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
import org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
import org.apache.kafka.common.message.CreatePartitionsRequestData;
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsAssignment;
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic;
import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicConfigs;
Expand Down Expand Up @@ -127,7 +131,6 @@
import org.apache.kafka.common.requests.CreateDelegationTokenRequest;
import org.apache.kafka.common.requests.CreateDelegationTokenResponse;
import org.apache.kafka.common.requests.CreatePartitionsRequest;
import org.apache.kafka.common.requests.CreatePartitionsRequest.PartitionDetails;
import org.apache.kafka.common.requests.CreatePartitionsResponse;
import org.apache.kafka.common.requests.CreateTopicsRequest;
import org.apache.kafka.common.requests.CreateTopicsResponse;
Expand Down Expand Up @@ -2323,35 +2326,55 @@ public CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPar
for (String topic : newPartitions.keySet()) {
futures.put(topic, new KafkaFutureImpl<>());
}
final Map<String, PartitionDetails> requestMap = newPartitions.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> partitionDetails(e.getValue())));

final List<CreatePartitionsTopic> topics = newPartitions.entrySet().stream()
.map(partitionsEntry -> {
NewPartitions newPartition = partitionsEntry.getValue();
List<List<Integer>> newAssignments = newPartition.assignments();
List<CreatePartitionsAssignment> assignments = newAssignments == null ? null :
newAssignments.stream()
.map(brokerIds -> new CreatePartitionsAssignment().setBrokerIds(brokerIds))
.collect(Collectors.toList());
return new CreatePartitionsTopic()
.setName(partitionsEntry.getKey())
.setCount(newPartition.totalCount())
.setAssignments(assignments);
})
.collect(Collectors.toList());

final long now = time.milliseconds();
runnable.call(new Call("createPartitions", calcDeadlineMs(now, options.timeoutMs()),
new ControllerNodeProvider()) {

@Override
public CreatePartitionsRequest.Builder createRequest(int timeoutMs) {
return new CreatePartitionsRequest.Builder(requestMap, timeoutMs, options.validateOnly());
CreatePartitionsRequestData requestData = new CreatePartitionsRequestData()
.setTopics(topics)
.setValidateOnly(options.validateOnly())
.setTimeoutMs(timeoutMs);

return new CreatePartitionsRequest.Builder(requestData);
}

@Override
public void handleResponse(AbstractResponse abstractResponse) {
CreatePartitionsResponse response = (CreatePartitionsResponse) abstractResponse;
// Check for controller change
for (ApiError error : response.errors().values()) {
if (error.error() == Errors.NOT_CONTROLLER) {
for (CreatePartitionsTopicResult topicResult: response.data().results()) {
Errors error = Errors.forCode(topicResult.errorCode());
if (error == Errors.NOT_CONTROLLER) {
metadataManager.clearController();
metadataManager.requestUpdate();
throw error.exception();
}
}
for (Map.Entry<String, ApiError> result : response.errors().entrySet()) {
KafkaFutureImpl<Void> future = futures.get(result.getKey());
if (result.getValue().isSuccess()) {
for (CreatePartitionsTopicResult topicResult: response.data().results()) {
Errors error = Errors.forCode(topicResult.errorCode());
KafkaFutureImpl<Void> future = futures.get(topicResult.name());
if (error == Errors.NONE) {
future.complete(null);
} else {
future.completeExceptionally(result.getValue().exception());
future.completeExceptionally(error.exception(topicResult.errorMessage()));
}
}
}
Expand Down Expand Up @@ -2861,10 +2884,6 @@ private boolean handleGroupRequestError(Errors error, KafkaFutureImpl<?> future)
return false;
}

private PartitionDetails partitionDetails(NewPartitions newPartitions) {
return new PartitionDetails(newPartitions.totalCount(), newPartitions.assignments());
}

private final static class ListConsumerGroupsResults {
private final List<Throwable> errors;
private final HashMap<String, ConsumerGroupListing> listings;
Expand Down
Expand Up @@ -23,6 +23,8 @@
import org.apache.kafka.common.message.ControlledShutdownResponseData;
import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
import org.apache.kafka.common.message.CreatePartitionsRequestData;
import org.apache.kafka.common.message.CreatePartitionsResponseData;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.DeleteGroupsRequestData;
Expand Down Expand Up @@ -94,8 +96,6 @@
import org.apache.kafka.common.requests.AlterReplicaLogDirsResponse;
import org.apache.kafka.common.requests.CreateAclsRequest;
import org.apache.kafka.common.requests.CreateAclsResponse;
import org.apache.kafka.common.requests.CreatePartitionsRequest;
import org.apache.kafka.common.requests.CreatePartitionsResponse;
import org.apache.kafka.common.requests.DeleteAclsRequest;
import org.apache.kafka.common.requests.DeleteAclsResponse;
import org.apache.kafka.common.requests.DeleteRecordsRequest;
Expand Down Expand Up @@ -191,8 +191,8 @@ public Struct parseResponse(short version, ByteBuffer buffer) {
DescribeLogDirsResponse.schemaVersions()),
SASL_AUTHENTICATE(36, "SaslAuthenticate", SaslAuthenticateRequestData.SCHEMAS,
SaslAuthenticateResponseData.SCHEMAS),
CREATE_PARTITIONS(37, "CreatePartitions", CreatePartitionsRequest.schemaVersions(),
CreatePartitionsResponse.schemaVersions()),
CREATE_PARTITIONS(37, "CreatePartitions", CreatePartitionsRequestData.SCHEMAS,
CreatePartitionsResponseData.SCHEMAS),
CREATE_DELEGATION_TOKEN(38, "CreateDelegationToken", CreateDelegationTokenRequestData.SCHEMAS, CreateDelegationTokenResponseData.SCHEMAS),
RENEW_DELEGATION_TOKEN(39, "RenewDelegationToken", RenewDelegationTokenRequestData.SCHEMAS, RenewDelegationTokenResponseData.SCHEMAS),
EXPIRE_DELEGATION_TOKEN(40, "ExpireDelegationToken", ExpireDelegationTokenRequestData.SCHEMAS, ExpireDelegationTokenResponseData.SCHEMAS),
Expand Down
Expand Up @@ -155,7 +155,7 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, Struct struct, shor
case SASL_AUTHENTICATE:
return new SaslAuthenticateResponse(struct, version);
case CREATE_PARTITIONS:
return new CreatePartitionsResponse(struct);
return new CreatePartitionsResponse(struct, version);
case CREATE_DELEGATION_TOKEN:
return new CreateDelegationTokenResponse(struct, version);
case RENEW_DELEGATION_TOKEN:
Expand Down
Expand Up @@ -17,221 +17,71 @@

package org.apache.kafka.common.requests;

import org.apache.kafka.common.message.CreatePartitionsRequestData;
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic;
import org.apache.kafka.common.message.CreatePartitionsResponseData;
import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
import static org.apache.kafka.common.protocol.types.Type.BOOLEAN;
import static org.apache.kafka.common.protocol.types.Type.INT32;

public class CreatePartitionsRequest extends AbstractRequest {

private static final String TOPIC_PARTITIONS_KEY_NAME = "topic_partitions";
private static final String NEW_PARTITIONS_KEY_NAME = "new_partitions";
private static final String COUNT_KEY_NAME = "count";
private static final String ASSIGNMENT_KEY_NAME = "assignment";
private static final String TIMEOUT_KEY_NAME = "timeout";
private static final String VALIDATE_ONLY_KEY_NAME = "validate_only";

private static final Schema CREATE_PARTITIONS_REQUEST_V0 = new Schema(
new Field(TOPIC_PARTITIONS_KEY_NAME, new ArrayOf(
new Schema(
TOPIC_NAME,
new Field(NEW_PARTITIONS_KEY_NAME, new Schema(
new Field(COUNT_KEY_NAME, INT32, "The new partition count."),
new Field(ASSIGNMENT_KEY_NAME, ArrayOf.nullable(new ArrayOf(INT32)),
"The assigned brokers.")
)))),
"List of topic and the corresponding new partitions."),
new Field(TIMEOUT_KEY_NAME, INT32, "The time in ms to wait for the partitions to be created."),
new Field(VALIDATE_ONLY_KEY_NAME, BOOLEAN,
"If true then validate the request, but don't actually increase the number of partitions."));

/**
* The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
*/
private static final Schema CREATE_PARTITIONS_REQUEST_V1 = CREATE_PARTITIONS_REQUEST_V0;

public static Schema[] schemaVersions() {
return new Schema[]{CREATE_PARTITIONS_REQUEST_V0, CREATE_PARTITIONS_REQUEST_V1};
}

// It is an error for duplicate topics to be present in the request,
// so track duplicates here to allow KafkaApis to report per-topic errors.
private final Set<String> duplicates;
private final Map<String, PartitionDetails> newPartitions;
private final int timeout;
private final boolean validateOnly;

public static class PartitionDetails {

private final int totalCount;

private final List<List<Integer>> newAssignments;

public PartitionDetails(int totalCount) {
this(totalCount, null);
}

public PartitionDetails(int totalCount, List<List<Integer>> newAssignments) {
this.totalCount = totalCount;
this.newAssignments = newAssignments;
}

public int totalCount() {
return totalCount;
}

public List<List<Integer>> newAssignments() {
return newAssignments;
}

@Override
public String toString() {
return "(totalCount=" + totalCount() + ", newAssignments=" + newAssignments() + ")";
}

}
private final CreatePartitionsRequestData data;

public static class Builder extends AbstractRequest.Builder<CreatePartitionsRequest> {

private final Map<String, PartitionDetails> newPartitions;
private final int timeout;
private final boolean validateOnly;
private final CreatePartitionsRequestData data;

public Builder(Map<String, PartitionDetails> newPartitions, int timeout, boolean validateOnly) {
public Builder(CreatePartitionsRequestData data) {
super(ApiKeys.CREATE_PARTITIONS);
this.newPartitions = newPartitions;
this.timeout = timeout;
this.validateOnly = validateOnly;
this.data = data;
}

@Override
public CreatePartitionsRequest build(short version) {
return new CreatePartitionsRequest(newPartitions, timeout, validateOnly, version);
return new CreatePartitionsRequest(data, version);
}

@Override
public String toString() {
StringBuilder bld = new StringBuilder();
bld.append("(type=CreatePartitionsRequest").
append(", newPartitions=").append(newPartitions).
append(", timeout=").append(timeout).
append(", validateOnly=").append(validateOnly).
append(")");
return bld.toString();
return data.toString();
}
}

CreatePartitionsRequest(Map<String, PartitionDetails> newPartitions, int timeout, boolean validateOnly, short apiVersion) {
CreatePartitionsRequest(CreatePartitionsRequestData data, short apiVersion) {
super(ApiKeys.CREATE_PARTITIONS, apiVersion);
this.newPartitions = newPartitions;
this.duplicates = Collections.emptySet();
this.timeout = timeout;
this.validateOnly = validateOnly;
this.data = data;
}

public CreatePartitionsRequest(Struct struct, short apiVersion) {
super(ApiKeys.CREATE_PARTITIONS, apiVersion);
Object[] topicCountArray = struct.getArray(TOPIC_PARTITIONS_KEY_NAME);
Map<String, PartitionDetails> counts = new HashMap<>(topicCountArray.length);
Set<String> dupes = new HashSet<>();
for (Object topicPartitionCountObj : topicCountArray) {
Struct topicPartitionCountStruct = (Struct) topicPartitionCountObj;
String topic = topicPartitionCountStruct.get(TOPIC_NAME);
Struct partitionCountStruct = topicPartitionCountStruct.getStruct(NEW_PARTITIONS_KEY_NAME);
int count = partitionCountStruct.getInt(COUNT_KEY_NAME);
Object[] assignmentsArray = partitionCountStruct.getArray(ASSIGNMENT_KEY_NAME);
PartitionDetails newPartition;
if (assignmentsArray != null) {
List<List<Integer>> assignments = new ArrayList<>(assignmentsArray.length);
for (Object replicas : assignmentsArray) {
Object[] replicasArray = (Object[]) replicas;
List<Integer> replicasList = new ArrayList<>(replicasArray.length);
assignments.add(replicasList);
for (Object broker : replicasArray) {
replicasList.add((Integer) broker);
}
}
newPartition = new PartitionDetails(count, assignments);
} else {
newPartition = new PartitionDetails(count);
}
PartitionDetails dupe = counts.put(topic, newPartition);
if (dupe != null) {
dupes.add(topic);
}
}
this.newPartitions = counts;
this.duplicates = dupes;
this.timeout = struct.getInt(TIMEOUT_KEY_NAME);
this.validateOnly = struct.getBoolean(VALIDATE_ONLY_KEY_NAME);
}

public Set<String> duplicates() {
return duplicates;
}

public Map<String, PartitionDetails> newPartitions() {
return newPartitions;
}

public int timeout() {
return timeout;
}

public boolean validateOnly() {
return validateOnly;
this(new CreatePartitionsRequestData(struct, apiVersion), apiVersion);
}

@Override
protected Struct toStruct() {
Struct struct = new Struct(ApiKeys.CREATE_PARTITIONS.requestSchema(version()));
List<Struct> topicPartitionsList = new ArrayList<>();
for (Map.Entry<String, PartitionDetails> topicPartitionCount : this.newPartitions.entrySet()) {
Struct topicPartitionCountStruct = struct.instance(TOPIC_PARTITIONS_KEY_NAME);
topicPartitionCountStruct.set(TOPIC_NAME, topicPartitionCount.getKey());
PartitionDetails partitionDetails = topicPartitionCount.getValue();
Struct partitionCountStruct = topicPartitionCountStruct.instance(NEW_PARTITIONS_KEY_NAME);
partitionCountStruct.set(COUNT_KEY_NAME, partitionDetails.totalCount());
Object[][] assignments = null;
if (partitionDetails.newAssignments() != null) {
assignments = new Object[partitionDetails.newAssignments().size()][];
int i = 0;
for (List<Integer> partitionAssignment : partitionDetails.newAssignments()) {
assignments[i] = partitionAssignment.toArray(new Object[0]);
i++;
}
}
partitionCountStruct.set(ASSIGNMENT_KEY_NAME, assignments);
topicPartitionCountStruct.set(NEW_PARTITIONS_KEY_NAME, partitionCountStruct);
topicPartitionsList.add(topicPartitionCountStruct);
}
struct.set(TOPIC_PARTITIONS_KEY_NAME, topicPartitionsList.toArray(new Object[0]));
struct.set(TIMEOUT_KEY_NAME, this.timeout);
struct.set(VALIDATE_ONLY_KEY_NAME, this.validateOnly);
return struct;
return data.toStruct(version());
}

public CreatePartitionsRequestData data() {
return data;
}

@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
Map<String, ApiError> topicErrors = new HashMap<>();
for (String topic : newPartitions.keySet()) {
topicErrors.put(topic, ApiError.fromThrowable(e));
CreatePartitionsResponseData response = new CreatePartitionsResponseData();
response.setThrottleTimeMs(throttleTimeMs);

ApiError apiError = ApiError.fromThrowable(e);
for (CreatePartitionsTopic topic : data.topics()) {
response.results().add(new CreatePartitionsTopicResult()
.setName(topic.name())
.setErrorCode(apiError.error().code())
.setErrorMessage(apiError.message())
);
}
return new CreatePartitionsResponse(throttleTimeMs, topicErrors);
return new CreatePartitionsResponse(response);
}

public static CreatePartitionsRequest parse(ByteBuffer buffer, short version) {
Expand Down

0 comments on commit e63becb

Please sign in to comment.