Skip to content

Commit

Permalink
MINOR: Upgrade mockito test dependencies (#12460)
Browse files Browse the repository at this point in the history
## Changes
- **mockito: 4.4.0 -> 4.6.1** (https://github.com/mockito/mockito/releases)
Most important updates:
  - Fixes mockito/mockito#2648 : Add support for customising strictness via @mock annotation and MockSettings mockito/mockito#2650

## Why is this change needed?

According to the [Mockito documentation](https://javadoc.io/doc/org.mockito/mockito-core/latest/org/mockito/Mockito.html#when(T)) :
> Although it is possible to verify a stubbed invocation, usually it's just redundant. Let's say you've stubbed foo.bar(). If your code cares what foo.bar() returns then something else breaks (often before even verify() gets executed). If your code doesn't care what get(0) returns then it should not be stubbed.

While working on the [Replace EasyMock and PowerMock with Mockito for StreamsMetricsImplTest](https://issues.apache.org/jira/browse/KAFKA-12947) I noticed that the described behavior wasn't applied when you create a new `mock` like this.

```java
        final Metrics metrics = mock(Metrics.class);
        when(metrics.metric(metricName)).thenReturn(null);

        ... invoke SUT

        verify(metrics).metric(metricName); // this should be redundant (according to docs)

```

After further investigation I figured out that the described behaviour wasn't implemented until `v4.6.1`.

With this change we are now able to mock objects like this:

```java
   Foo explicitStrictMock = mock(Foo.class, withSettings().strictness(Strictness.STRICT_STUBS));
```
- link to docs: [Strictness.html#STRICT_STUBS](https://javadoc.io/static/org.mockito/mockito-core/4.6.1/org/mockito/quality/Strictness.html#STRICT_STUBS)

It looks like I can accomplish the same thing by using the `@RunWith(MockitoJUnitRunner.StrictStubs.class)
` instead of the `@RunWith(MockitoJUnitRunner.class)`, so the mockito dependency version update is not mandatory, but it would be nice to stay up-to-date and use the latest version (it's up to the MR reviewer to decide if we are going to merge this now, or just close the MR and update the mockito version later).

Reviewers: Ismael Juma <ismael@juma.me.uk>
  • Loading branch information
a0x8o committed Aug 9, 2022
1 parent 50b4982 commit 3a784ac
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 41 deletions.
5 changes: 3 additions & 2 deletions core/src/main/scala/kafka/api/LeaderAndIsr.scala
Expand Up @@ -62,7 +62,7 @@ case class LeaderAndIsr(
if (leader == LeaderAndIsr.NoLeader) None else Some(leader)
}

def equalsIgnorePartitionEpoch(other: LeaderAndIsr): Boolean = {
def equalsAllowStalePartitionEpoch(other: LeaderAndIsr): Boolean = {
if (this == other) {
true
} else if (other == null) {
Expand All @@ -71,7 +71,8 @@ case class LeaderAndIsr(
leader == other.leader &&
leaderEpoch == other.leaderEpoch &&
isr.equals(other.isr) &&
leaderRecoveryState == other.leaderRecoveryState
leaderRecoveryState == other.leaderRecoveryState &&
partitionEpoch <= other.partitionEpoch
}
}

Expand Down
11 changes: 6 additions & 5 deletions core/src/main/scala/kafka/controller/KafkaController.scala
Expand Up @@ -2339,14 +2339,15 @@ class KafkaController(val config: KafkaConfig,
if (newLeaderAndIsr.leaderEpoch != currentLeaderAndIsr.leaderEpoch) {
partitionResponses(tp) = Left(Errors.FENCED_LEADER_EPOCH)
None
} else if (newLeaderAndIsr.partitionEpoch < currentLeaderAndIsr.partitionEpoch) {
partitionResponses(tp) = Left(Errors.INVALID_UPDATE_VERSION)
None
} else if (newLeaderAndIsr.equalsIgnorePartitionEpoch(currentLeaderAndIsr)) {
} else if (newLeaderAndIsr.equalsAllowStalePartitionEpoch(currentLeaderAndIsr)) {
// If a partition is already in the desired state, just return it
// this check must be done before fencing based on partition epoch to maintain idempotency
partitionResponses(tp) = Right(currentLeaderAndIsr)
None
} else if (newLeaderAndIsr.leaderRecoveryState == LeaderRecoveryState.RECOVERING && newLeaderAndIsr.isr.length > 1) {
} else if (newLeaderAndIsr.partitionEpoch != currentLeaderAndIsr.partitionEpoch) {
partitionResponses(tp) = Left(Errors.INVALID_UPDATE_VERSION)
None
} else if (newLeaderAndIsr.leaderRecoveryState == LeaderRecoveryState.RECOVERING && newLeaderAndIsr.isr.length > 1) {
partitionResponses(tp) = Left(Errors.INVALID_REQUEST)
info(
s"Rejecting AlterPartition from node $brokerId for $tp because leader is recovering and ISR is greater than 1: " +
Expand Down
Expand Up @@ -1002,47 +1002,58 @@ class ControllerIntegrationTest extends QuorumTestHarness {

val controller = getController().kafkaController
val leaderIsrAndControllerEpochMap = zkClient.getTopicPartitionStates(Seq(tp))
val newLeaderAndIsr = leaderIsrAndControllerEpochMap(tp).leaderAndIsr
val oldLeaderAndIsr = leaderIsrAndControllerEpochMap(tp).leaderAndIsr
val newIsr = List(oldLeaderAndIsr.leader)
val newPartitionEpoch = oldLeaderAndIsr.partitionEpoch + 1
val topicId = controller.controllerContext.topicIds(tp.topic)
val brokerId = otherBroker.config.brokerId
val brokerEpoch = controller.controllerContext.liveBrokerIdAndEpochs(otherBroker.config.brokerId)

// When re-sending the current ISR, we should not get an error or any ISR changes
val alterPartitionRequest = new AlterPartitionRequestData()
.setBrokerId(brokerId)
.setBrokerEpoch(brokerEpoch)
.setTopics(Seq(new AlterPartitionRequestData.TopicData()
.setTopicId(topicId)
.setPartitions(Seq(new AlterPartitionRequestData.PartitionData()
.setPartitionIndex(tp.partition)
.setLeaderEpoch(newLeaderAndIsr.leaderEpoch)
.setPartitionEpoch(newLeaderAndIsr.partitionEpoch)
.setNewIsr(newLeaderAndIsr.isr.map(Int.box).asJava)
.setLeaderRecoveryState(newLeaderAndIsr.leaderRecoveryState.value)
def sendAndVerifyAlterPartitionResponse(requestPartitionEpoch: Int): Unit = {
val alterPartitionRequest = new AlterPartitionRequestData()
.setBrokerId(brokerId)
.setBrokerEpoch(brokerEpoch)
.setTopics(Seq(new AlterPartitionRequestData.TopicData()
.setTopicId(topicId)
.setPartitions(Seq(new AlterPartitionRequestData.PartitionData()
.setPartitionIndex(tp.partition)
.setLeaderEpoch(oldLeaderAndIsr.leaderEpoch)
.setPartitionEpoch(requestPartitionEpoch)
.setNewIsr(newIsr.map(Int.box).asJava)
.setLeaderRecoveryState(oldLeaderAndIsr.leaderRecoveryState.value)
).asJava)
).asJava)
).asJava)

val future = new CompletableFuture[AlterPartitionResponseData]()
controller.eventManager.put(AlterPartitionReceived(
alterPartitionRequest,
AlterPartitionRequestData.HIGHEST_SUPPORTED_VERSION,
future.complete
))
val future = new CompletableFuture[AlterPartitionResponseData]()
controller.eventManager.put(AlterPartitionReceived(
alterPartitionRequest,
AlterPartitionRequestData.HIGHEST_SUPPORTED_VERSION,
future.complete
))

val expectedAlterPartitionResponse = new AlterPartitionResponseData()
.setTopics(Seq(new AlterPartitionResponseData.TopicData()
.setTopicId(topicId)
.setPartitions(Seq(new AlterPartitionResponseData.PartitionData()
.setPartitionIndex(tp.partition)
.setLeaderId(brokerId)
.setLeaderEpoch(newLeaderAndIsr.leaderEpoch)
.setPartitionEpoch(newLeaderAndIsr.partitionEpoch)
.setIsr(newLeaderAndIsr.isr.map(Int.box).asJava)
.setLeaderRecoveryState(newLeaderAndIsr.leaderRecoveryState.value)
// When re-sending an ISR update, we should not get an error or any ISR changes
val expectedAlterPartitionResponse = new AlterPartitionResponseData()
.setTopics(Seq(new AlterPartitionResponseData.TopicData()
.setTopicId(topicId)
.setPartitions(Seq(new AlterPartitionResponseData.PartitionData()
.setPartitionIndex(tp.partition)
.setLeaderId(brokerId)
.setLeaderEpoch(oldLeaderAndIsr.leaderEpoch)
.setPartitionEpoch(newPartitionEpoch)
.setIsr(newIsr.map(Int.box).asJava)
.setLeaderRecoveryState(oldLeaderAndIsr.leaderRecoveryState.value)
).asJava)
).asJava)
).asJava)
assertEquals(expectedAlterPartitionResponse, future.get(10, TimeUnit.SECONDS))
}

assertEquals(expectedAlterPartitionResponse, future.get(10, TimeUnit.SECONDS))
// send a request, expect the partition epoch to be incremented
sendAndVerifyAlterPartitionResponse(oldLeaderAndIsr.partitionEpoch)

// re-send the same request with various partition epochs (less/equal/greater than the current
// epoch), expect it to succeed while the partition epoch remains the same
sendAndVerifyAlterPartitionResponse(oldLeaderAndIsr.partitionEpoch)
sendAndVerifyAlterPartitionResponse(newPartitionEpoch)
}

@Test
Expand Down Expand Up @@ -1100,7 +1111,6 @@ class ControllerIntegrationTest extends QuorumTestHarness {

assertAlterPartition(
partitionError = Errors.UNKNOWN_TOPIC_ID,
topicPartition = tp,
topicIdOpt = Some(Uuid.randomUuid())
)

Expand All @@ -1118,9 +1128,15 @@ class ControllerIntegrationTest extends QuorumTestHarness {

assertAlterPartition(
partitionError = Errors.INVALID_UPDATE_VERSION,
isr = Set(leaderId),
partitionEpoch = partitionEpoch - 1
)

assertAlterPartition(
partitionError = Errors.INVALID_UPDATE_VERSION,
partitionEpoch = partitionEpoch + 1
)

assertAlterPartition(
partitionError = Errors.FENCED_LEADER_EPOCH,
leaderEpoch = leaderEpoch - 1
Expand Down
2 changes: 1 addition & 1 deletion gradle/dependencies.gradle
Expand Up @@ -104,7 +104,7 @@ versions += [
lz4: "1.8.0",
mavenArtifact: "3.8.4",
metrics: "2.2.0",
mockito: "4.4.0",
mockito: "4.6.1",
netty: "4.1.78.Final",
powermock: "2.0.9",
reflections: "0.9.12",
Expand Down

0 comments on commit 3a784ac

Please sign in to comment.