Skip to content

Commit

Permalink
CONFLUENT: Sync from apache/kafka trunk to confluentinc/kafka master …
Browse files Browse the repository at this point in the history
…(5 August 2022)

Version related conflicts:
* Jenkinsfile
* gradle.properties
* streams/quickstart/java/pom.xml
* streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
* streams/quickstart/pom.xml
* tests/kafkatest/__init__.py
* tests/kafkatest/version.py

* commit 'add7cd85baa61cd0e1430': (66 commits)
KAFKA-14136 Generate ConfigRecord for brokers even if the value is
unchanged (apache#12483)
  HOTFIX / KAFKA-14130: Reduce RackAwarenesssTest to unit Test (apache#12476)
  MINOR: Remove ARM/PowerPC builds from Jenkinsfile (apache#12380)
  KAFKA-14111 Fix sensitive dynamic broker configs in KRaft (apache#12455)
  KAFKA-13877: Fix flakiness in RackAwarenessIntegrationTest (apache#12468)
KAFKA-14129: KRaft must check manual assignments for createTopics are
contiguous (apache#12467)
KAFKA-13546: Do not fail connector validation if default topic
creation group is explicitly specified (apache#11615)
KAFKA-14122: Fix flaky test
DynamicBrokerReconfigurationTest#testKeyStoreAlter (apache#12452)
  MINOR; Use right enum value for broker registration change (apache#12236)
  MINOR; Synchronize access to snapshots' TreeMap (apache#12464)
  MINOR; Bump trunk to 3.4.0-SNAPSHOT (apache#12463)
  MINOR: Stop logging 404s at ERROR level in Connect
KAFKA-14095: Improve handling of sync offset failures in MirrorMaker
(apache#12432)
  Minor: enable index for emit final sliding window (apache#12461)
  MINOR: convert some more junit tests to support KRaft (apache#12456)
  KAFKA-14108: Ensure both JUnit 4 and JUnit 5 tests run (apache#12441)
  MINOR: Remove code of removed metric (apache#12453)
MINOR: Update comment on verifyTaskGenerationAndOwnership method in
DistributedHerder
KAFKA-14012: Add warning to closeQuietly documentation about method
references of null objects (apache#12321)
  MINOR: Fix static mock usage in ThreadMetricsTest (apache#12454)
  ...
  • Loading branch information
ijuma committed Aug 5, 2022
2 parents affe8cd + add7cd8 commit 97f97b0
Show file tree
Hide file tree
Showing 198 changed files with 7,362 additions and 3,143 deletions.
39 changes: 31 additions & 8 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,10 @@ ext {
if (JavaVersion.current().isCompatibleWith(JavaVersion.VERSION_16))
defaultJvmArgs.addAll(
"--add-opens=java.base/java.io=ALL-UNNAMED",
"--add-opens=java.base/java.lang=ALL-UNNAMED",
"--add-opens=java.base/java.nio=ALL-UNNAMED",
"--add-opens=java.base/java.nio.file=ALL-UNNAMED",
"--add-opens=java.base/java.util=ALL-UNNAMED",
"--add-opens=java.base/java.util.concurrent=ALL-UNNAMED",
"--add-opens=java.base/java.util.regex=ALL-UNNAMED",
"--add-opens=java.base/java.util.stream=ALL-UNNAMED",
Expand Down Expand Up @@ -352,7 +354,7 @@ subprojects {
}

// Remove the relevant project name once it's converted to JUnit 5
def shouldUseJUnit5 = !(["runtime", "streams"].contains(it.project.name))
def shouldUseJUnit5 = !(["runtime"].contains(it.project.name))

def testLoggingEvents = ["passed", "skipped", "failed"]
def testShowStandardStreams = false
Expand Down Expand Up @@ -433,11 +435,11 @@ subprojects {
if (JavaVersion.current().isCompatibleWith(JavaVersion.VERSION_16)) {
testsToExclude.addAll([
// connect tests
"**/AbstractHerderTest.*", "**/ConnectClusterStateImplTest.*", "**/ConnectorPluginsResourceTest.*",
"**/AbstractHerderTest.*", "**/ConnectorPluginsResourceTest.*",
"**/ConnectorsResourceTest.*", "**/DistributedHerderTest.*", "**/FileOffsetBakingStoreTest.*",
"**/ErrorHandlingTaskTest.*", "**/KafkaConfigBackingStoreTest.*", "**/KafkaOffsetBackingStoreTest.*",
"**/KafkaBasedLogTest.*", "**/OffsetStorageWriterTest.*", "**/StandaloneHerderTest.*",
"**/SourceTaskOffsetCommitterTest.*", "**/WorkerConfigTransformerTest.*", "**/WorkerGroupMemberTest.*",
"**/SourceTaskOffsetCommitterTest.*", "**/WorkerGroupMemberTest.*",
"**/WorkerTest.*", "**/WorkerSinkTaskTest.*", "**/WorkerSinkTaskThreadedTest.*",
"**/WorkerSourceTaskTest.*", "**/AbstractWorkerSourceTaskTest.*", "**/ExactlyOnceWorkerSourceTaskTest.*",
"**/WorkerTaskTest.*",
Expand Down Expand Up @@ -495,8 +497,18 @@ subprojects {
exclude testsToExclude

if (shouldUseJUnit5) {
useJUnitPlatform {
includeTags "integration"
if (project.name == 'streams') {
useJUnitPlatform {
includeTags "integration"
includeTags "org.apache.kafka.test.IntegrationTest"
// Both engines are needed to run JUnit 4 tests alongside JUnit 5 tests.
// junit-vintage (JUnit 4) can be removed once the JUnit 4 migration is complete.
includeEngines "junit-vintage", "junit-jupiter"
}
} else {
useJUnitPlatform {
includeTags "integration"
}
}
} else {
useJUnit {
Expand Down Expand Up @@ -532,8 +544,18 @@ subprojects {
exclude testsToExclude

if (shouldUseJUnit5) {
useJUnitPlatform {
excludeTags "integration"
if (project.name == 'streams') {
useJUnitPlatform {
excludeTags "integration"
excludeTags "org.apache.kafka.test.IntegrationTest"
// Both engines are needed to run JUnit 4 tests alongside JUnit 5 tests.
// junit-vintage (JUnit 4) can be removed once the JUnit 4 migration is complete.
includeEngines "junit-vintage", "junit-jupiter"
}
} else {
useJUnitPlatform {
excludeTags "integration"
}
}
} else {
useJUnit {
Expand Down Expand Up @@ -1863,11 +1885,12 @@ project(':streams') {

// testCompileOnly prevents streams from exporting a dependency on test-utils, which would cause a dependency cycle
testCompileOnly project(':streams:test-utils')

testImplementation project(':clients').sourceSets.test.output
testImplementation project(':core')
testImplementation project(':core').sourceSets.test.output
testImplementation libs.log4j
testImplementation libs.junitJupiterApi
testImplementation libs.junitJupiter
testImplementation libs.junitVintageEngine
testImplementation libs.easymock
testImplementation libs.powermockJunit4
Expand Down
37 changes: 35 additions & 2 deletions clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -919,19 +920,51 @@ default ListConsumerGroupsResult listConsumerGroups() {
* @param options The options to use when listing the consumer group offsets.
* @return The ListGroupOffsetsResult
*/
ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options);
default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options) {
@SuppressWarnings("deprecation")
ListConsumerGroupOffsetsSpec groupSpec = new ListConsumerGroupOffsetsSpec()
.topicPartitions(options.topicPartitions());

// We can use the provided options with the batched API, which uses topic partitions from
// the group spec and ignores any topic partitions set in the options.
return listConsumerGroupOffsets(Collections.singletonMap(groupId, groupSpec), options);
}

/**
* List the consumer group offsets available in the cluster with the default options.
* <p>
* This is a convenience method for {@link #listConsumerGroupOffsets(String, ListConsumerGroupOffsetsOptions)} with default options.
* This is a convenience method for {@link #listConsumerGroupOffsets(Map, ListConsumerGroupOffsetsOptions)}
* to list offsets of all partitions of one group with default options.
*
* @return The ListGroupOffsetsResult.
*/
default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId) {
return listConsumerGroupOffsets(groupId, new ListConsumerGroupOffsetsOptions());
}

/**
* List the consumer group offsets available in the cluster for the specified consumer groups.
*
* @param groupSpecs Map of consumer group ids to a spec that specifies the topic partitions of the group to list offsets for.
*
* @param options The options to use when listing the consumer group offsets.
* @return The ListConsumerGroupOffsetsResult
*/
ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String, ListConsumerGroupOffsetsSpec> groupSpecs, ListConsumerGroupOffsetsOptions options);

/**
* List the consumer group offsets available in the cluster for the specified groups with the default options.
* <p>
* This is a convenience method for
* {@link #listConsumerGroupOffsets(Map, ListConsumerGroupOffsetsOptions)} with default options.
*
* @param groupSpecs Map of consumer group ids to a spec that specifies the topic partitions of the group to list offsets for.
* @return The ListConsumerGroupOffsetsResult.
*/
default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String, ListConsumerGroupOffsetsSpec> groupSpecs) {
return listConsumerGroupOffsets(groupSpecs, new ListConsumerGroupOffsetsOptions());
}

/**
* Delete consumer groups from the cluster.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@
import org.apache.kafka.common.requests.ExpireDelegationTokenResponse;
import org.apache.kafka.common.requests.IncrementalAlterConfigsRequest;
import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.ListGroupsRequest;
import org.apache.kafka.common.requests.ListGroupsResponse;
import org.apache.kafka.common.requests.ListOffsetsRequest;
Expand Down Expand Up @@ -3400,13 +3401,14 @@ void handleFailure(Throwable throwable) {
}

@Override
public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final String groupId,
final ListConsumerGroupOffsetsOptions options) {
public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String, ListConsumerGroupOffsetsSpec> groupSpecs,
ListConsumerGroupOffsetsOptions options) {
SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> future =
ListConsumerGroupOffsetsHandler.newFuture(groupId);
ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(groupId, options.topicPartitions(), options.requireStable(), logContext);
ListConsumerGroupOffsetsHandler.newFuture(groupSpecs.keySet());
ListConsumerGroupOffsetsHandler handler =
new ListConsumerGroupOffsetsHandler(groupSpecs, options.requireStable(), logContext);
invokeDriver(handler, future, options.timeoutMs);
return new ListConsumerGroupOffsetsResult(future.get(CoordinatorKey.byGroupId(groupId)));
return new ListConsumerGroupOffsetsResult(future.all());
}

@Override
Expand Down Expand Up @@ -3756,7 +3758,7 @@ private List<MemberIdentity> getMembersFromGroup(String groupId, String reason)
public RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(String groupId,
RemoveMembersFromConsumerGroupOptions options) {
String reason = options.reason() == null || options.reason().isEmpty() ?
DEFAULT_LEAVE_GROUP_REASON : options.reason();
DEFAULT_LEAVE_GROUP_REASON : JoinGroupRequest.maybeTruncateReason(options.reason());

List<MemberIdentity> members;
if (options.removeAll()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,28 @@
import java.util.List;

/**
* Options for {@link Admin#listConsumerGroupOffsets(String)}.
* Options for {@link Admin#listConsumerGroupOffsets(java.util.Map)} and {@link Admin#listConsumerGroupOffsets(String)}.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class ListConsumerGroupOffsetsOptions extends AbstractOptions<ListConsumerGroupOffsetsOptions> {

private List<TopicPartition> topicPartitions = null;
private List<TopicPartition> topicPartitions;
private boolean requireStable = false;

/**
* Set the topic partitions to list as part of the result.
* {@code null} includes all topic partitions.
* <p>
* @deprecated Since 3.3.
* Use {@link Admin#listConsumerGroupOffsets(java.util.Map, ListConsumerGroupOffsetsOptions)}
* to specify topic partitions.
*
* @param topicPartitions List of topic partitions to include
* @return This ListGroupOffsetsOptions
*/
@Deprecated
public ListConsumerGroupOffsetsOptions topicPartitions(List<TopicPartition> topicPartitions) {
this.topicPartitions = topicPartitions;
return this;
Expand All @@ -55,7 +60,12 @@ public ListConsumerGroupOffsetsOptions requireStable(final boolean requireStable

/**
* Returns a list of topic partitions to add as part of the result.
* <p>
* @deprecated Since 3.3.
* Use {@link Admin#listConsumerGroupOffsets(java.util.Map, ListConsumerGroupOffsetsOptions)}
* to specify topic partitions.
*/
@Deprecated
public List<TopicPartition> topicPartitions() {
return topicPartitions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,75 @@

package org.apache.kafka.clients.admin;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.internals.CoordinatorKey;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.Map;

/**
* The result of the {@link Admin#listConsumerGroupOffsets(String)} call.
* The result of the {@link Admin#listConsumerGroupOffsets(Map)} and
* {@link Admin#listConsumerGroupOffsets(String)} call.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class ListConsumerGroupOffsetsResult {

final KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future;
final Map<String, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures;

ListConsumerGroupOffsetsResult(KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
this.future = future;
ListConsumerGroupOffsetsResult(final Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures) {
this.futures = futures.entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey().idValue, Entry::getValue));
}

/**
* Return a future which yields a map of topic partitions to OffsetAndMetadata objects.
* If the group does not have a committed offset for this partition, the corresponding value in the returned map will be null.
*/
public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> partitionsToOffsetAndMetadata() {
return future;
if (futures.size() != 1) {
throw new IllegalStateException("Offsets from multiple consumer groups were requested. " +
"Use partitionsToOffsetAndMetadata(groupId) instead to get future for a specific group.");
}
return futures.values().iterator().next();
}

/**
* Return a future which yields a map of topic partitions to OffsetAndMetadata objects for
* the specified group. If the group doesn't have a committed offset for a specific
* partition, the corresponding value in the returned map will be null.
*/
public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> partitionsToOffsetAndMetadata(String groupId) {
if (!futures.containsKey(groupId))
throw new IllegalArgumentException("Offsets for consumer group '" + groupId + "' were not requested.");
return futures.get(groupId);
}

/**
* Return a future which yields all Map<String, Map<TopicPartition, OffsetAndMetadata> objects,
* if requests for all the groups succeed.
*/
public KafkaFuture<Map<String, Map<TopicPartition, OffsetAndMetadata>>> all() {
return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).thenApply(
nil -> {
Map<String, Map<TopicPartition, OffsetAndMetadata>> listedConsumerGroupOffsets = new HashMap<>(futures.size());
futures.forEach((key, future) -> {
try {
listedConsumerGroupOffsets.put(key, future.get());
} catch (InterruptedException | ExecutionException e) {
// This should be unreachable, since the KafkaFuture#allOf already ensured
// that all of the futures completed successfully.
throw new RuntimeException(e);
}
});
return listedConsumerGroupOffsets;
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients.admin;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.Collection;
import java.util.Objects;

/**
* Specification of consumer group offsets to list using {@link Admin#listConsumerGroupOffsets(java.util.Map)}.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class ListConsumerGroupOffsetsSpec {

private Collection<TopicPartition> topicPartitions;

/**
* Set the topic partitions whose offsets are to be listed for a consumer group.
* {@code null} includes all topic partitions.
*
* @param topicPartitions List of topic partitions to include
* @return This ListConsumerGroupOffsetSpec
*/
public ListConsumerGroupOffsetsSpec topicPartitions(Collection<TopicPartition> topicPartitions) {
this.topicPartitions = topicPartitions;
return this;
}

/**
* Returns the topic partitions whose offsets are to be listed for a consumer group.
* {@code null} indicates that offsets of all partitions of the group are to be listed.
*/
public Collection<TopicPartition> topicPartitions() {
return topicPartitions;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof ListConsumerGroupOffsetsSpec)) {
return false;
}
ListConsumerGroupOffsetsSpec that = (ListConsumerGroupOffsetsSpec) o;
return Objects.equals(topicPartitions, that.topicPartitions);
}

@Override
public int hashCode() {
return Objects.hash(topicPartitions);
}

@Override
public String toString() {
return "ListConsumerGroupOffsetsSpec(" +
"topicPartitions=" + topicPartitions +
')';
}
}

0 comments on commit 97f97b0

Please sign in to comment.