diff --git a/core/src/main/scala/kafka/api/LeaderAndIsr.scala b/core/src/main/scala/kafka/api/LeaderAndIsr.scala index dd1d381a1..da68cdb47 100644 --- a/core/src/main/scala/kafka/api/LeaderAndIsr.scala +++ b/core/src/main/scala/kafka/api/LeaderAndIsr.scala @@ -62,7 +62,7 @@ case class LeaderAndIsr( if (leader == LeaderAndIsr.NoLeader) None else Some(leader) } - def equalsIgnorePartitionEpoch(other: LeaderAndIsr): Boolean = { + def equalsAllowStalePartitionEpoch(other: LeaderAndIsr): Boolean = { if (this == other) { true } else if (other == null) { @@ -71,7 +71,8 @@ case class LeaderAndIsr( leader == other.leader && leaderEpoch == other.leaderEpoch && isr.equals(other.isr) && - leaderRecoveryState == other.leaderRecoveryState + leaderRecoveryState == other.leaderRecoveryState && + partitionEpoch <= other.partitionEpoch } } diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 8d16eb7e1..d179bcd6c 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -2339,14 +2339,15 @@ class KafkaController(val config: KafkaConfig, if (newLeaderAndIsr.leaderEpoch != currentLeaderAndIsr.leaderEpoch) { partitionResponses(tp) = Left(Errors.FENCED_LEADER_EPOCH) None - } else if (newLeaderAndIsr.partitionEpoch < currentLeaderAndIsr.partitionEpoch) { - partitionResponses(tp) = Left(Errors.INVALID_UPDATE_VERSION) - None - } else if (newLeaderAndIsr.equalsIgnorePartitionEpoch(currentLeaderAndIsr)) { + } else if (newLeaderAndIsr.equalsAllowStalePartitionEpoch(currentLeaderAndIsr)) { // If a partition is already in the desired state, just return it + // this check must be done before fencing based on partition epoch to maintain idempotency partitionResponses(tp) = Right(currentLeaderAndIsr) None - } else if (newLeaderAndIsr.leaderRecoveryState == LeaderRecoveryState.RECOVERING && newLeaderAndIsr.isr.length > 1) { + } else if (newLeaderAndIsr.partitionEpoch != currentLeaderAndIsr.partitionEpoch) { + partitionResponses(tp) = Left(Errors.INVALID_UPDATE_VERSION) + None + } else if (newLeaderAndIsr.leaderRecoveryState == LeaderRecoveryState.RECOVERING && newLeaderAndIsr.isr.length > 1) { partitionResponses(tp) = Left(Errors.INVALID_REQUEST) info( s"Rejecting AlterPartition from node $brokerId for $tp because leader is recovering and ISR is greater than 1: " + diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala index 57cbeafd4..761888943 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala @@ -1002,47 +1002,58 @@ class ControllerIntegrationTest extends QuorumTestHarness { val controller = getController().kafkaController val leaderIsrAndControllerEpochMap = zkClient.getTopicPartitionStates(Seq(tp)) - val newLeaderAndIsr = leaderIsrAndControllerEpochMap(tp).leaderAndIsr + val oldLeaderAndIsr = leaderIsrAndControllerEpochMap(tp).leaderAndIsr + val newIsr = List(oldLeaderAndIsr.leader) + val newPartitionEpoch = oldLeaderAndIsr.partitionEpoch + 1 val topicId = controller.controllerContext.topicIds(tp.topic) val brokerId = otherBroker.config.brokerId val brokerEpoch = controller.controllerContext.liveBrokerIdAndEpochs(otherBroker.config.brokerId) - // When re-sending the current ISR, we should not get and error or any ISR changes - val alterPartitionRequest = new AlterPartitionRequestData() - .setBrokerId(brokerId) - .setBrokerEpoch(brokerEpoch) - .setTopics(Seq(new AlterPartitionRequestData.TopicData() - .setTopicId(topicId) - .setPartitions(Seq(new AlterPartitionRequestData.PartitionData() - .setPartitionIndex(tp.partition) - .setLeaderEpoch(newLeaderAndIsr.leaderEpoch) - .setPartitionEpoch(newLeaderAndIsr.partitionEpoch) - .setNewIsr(newLeaderAndIsr.isr.map(Int.box).asJava) - .setLeaderRecoveryState(newLeaderAndIsr.leaderRecoveryState.value) + def sendAndVerifyAlterPartitionResponse(requestPartitionEpoch: Int): Unit = { + val alterPartitionRequest = new AlterPartitionRequestData() + .setBrokerId(brokerId) + .setBrokerEpoch(brokerEpoch) + .setTopics(Seq(new AlterPartitionRequestData.TopicData() + .setTopicId(topicId) + .setPartitions(Seq(new AlterPartitionRequestData.PartitionData() + .setPartitionIndex(tp.partition) + .setLeaderEpoch(oldLeaderAndIsr.leaderEpoch) + .setPartitionEpoch(requestPartitionEpoch) + .setNewIsr(newIsr.map(Int.box).asJava) + .setLeaderRecoveryState(oldLeaderAndIsr.leaderRecoveryState.value) + ).asJava) ).asJava) - ).asJava) - val future = new CompletableFuture[AlterPartitionResponseData]() - controller.eventManager.put(AlterPartitionReceived( - alterPartitionRequest, - AlterPartitionRequestData.HIGHEST_SUPPORTED_VERSION, - future.complete - )) + val future = new CompletableFuture[AlterPartitionResponseData]() + controller.eventManager.put(AlterPartitionReceived( + alterPartitionRequest, + AlterPartitionRequestData.HIGHEST_SUPPORTED_VERSION, + future.complete + )) - val expectedAlterPartitionResponse = new AlterPartitionResponseData() - .setTopics(Seq(new AlterPartitionResponseData.TopicData() - .setTopicId(topicId) - .setPartitions(Seq(new AlterPartitionResponseData.PartitionData() - .setPartitionIndex(tp.partition) - .setLeaderId(brokerId) - .setLeaderEpoch(newLeaderAndIsr.leaderEpoch) - .setPartitionEpoch(newLeaderAndIsr.partitionEpoch) - .setIsr(newLeaderAndIsr.isr.map(Int.box).asJava) - .setLeaderRecoveryState(newLeaderAndIsr.leaderRecoveryState.value) + // When re-sending an ISR update, we should not get and error or any ISR changes + val expectedAlterPartitionResponse = new AlterPartitionResponseData() + .setTopics(Seq(new AlterPartitionResponseData.TopicData() + .setTopicId(topicId) + .setPartitions(Seq(new AlterPartitionResponseData.PartitionData() + .setPartitionIndex(tp.partition) + .setLeaderId(brokerId) + .setLeaderEpoch(oldLeaderAndIsr.leaderEpoch) + .setPartitionEpoch(newPartitionEpoch) + .setIsr(newIsr.map(Int.box).asJava) + .setLeaderRecoveryState(oldLeaderAndIsr.leaderRecoveryState.value) + ).asJava) ).asJava) - ).asJava) + assertEquals(expectedAlterPartitionResponse, future.get(10, TimeUnit.SECONDS)) + } - assertEquals(expectedAlterPartitionResponse, future.get(10, TimeUnit.SECONDS)) + // send a request, expect the partition epoch to be incremented + sendAndVerifyAlterPartitionResponse(oldLeaderAndIsr.partitionEpoch) + + // re-send the same request with various partition epochs (less/equal/greater than the current + // epoch), expect it to succeed while the partition epoch remains the same + sendAndVerifyAlterPartitionResponse(oldLeaderAndIsr.partitionEpoch) + sendAndVerifyAlterPartitionResponse(newPartitionEpoch) } @Test @@ -1100,7 +1111,6 @@ class ControllerIntegrationTest extends QuorumTestHarness { assertAlterPartition( partitionError = Errors.UNKNOWN_TOPIC_ID, - topicPartition = tp, topicIdOpt = Some(Uuid.randomUuid()) ) @@ -1118,9 +1128,15 @@ class ControllerIntegrationTest extends QuorumTestHarness { assertAlterPartition( partitionError = Errors.INVALID_UPDATE_VERSION, + isr = Set(leaderId), partitionEpoch = partitionEpoch - 1 ) + assertAlterPartition( + partitionError = Errors.INVALID_UPDATE_VERSION, + partitionEpoch = partitionEpoch + 1 + ) + assertAlterPartition( partitionError = Errors.FENCED_LEADER_EPOCH, leaderEpoch = leaderEpoch - 1 diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 6c6b975ee..2a0220c1f 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -104,7 +104,7 @@ versions += [ lz4: "1.8.0", mavenArtifact: "3.8.4", metrics: "2.2.0", - mockito: "4.4.0", + mockito: "4.6.1", netty: "4.1.78.Final", powermock: "2.0.9", reflections: "0.9.12",