Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16712: Fix race in TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest #15962

Merged
merged 3 commits into from
May 16, 2024

Conversation

gaurav-narula
Copy link
Contributor

TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest has a race when it sets RemoteLogMetadataTopicPartitioner using the setter.

This change fixes the race condition by passing the RemoteLogMetadataTopicPartitioner instance in a Function<Integer, RemoteLogMetaedataTopicPartitioner> which is used in configure() in TopicBasedRemoteLogMetadataManager.

It also improves the waitingFor condition by spying on RemotePartitionMetadataStore and awaiting on Phasers to ensure ConsumerManager makes progress before performing assertions.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

…bscriptionsTest

`TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest` has a race
when it sets the `RemoteLogMetadataTopicPartitioner` using the setter.

This change fixes the race condition by passing the
`RemoteLogMetadataTopicPartitioner` instance in a `Function<Integer,
RemoteLogMetaedataTopicPartitioner>` which is used in `configure()`
in `TopicBasedRemoteLogMetadataManager`.

It also improves the waitingFor condition by spying on
`RemotePartitionMetadataStore` and awaiting on Phasers to
ensure ConsumerManager makes progress before performing assertions.
@gaurav-narula
Copy link
Contributor Author

CC: @kamalcph this is following the discussion at #15885 (comment)

Copy link
Collaborator

@kamalcph kamalcph left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks for the patch!

This is the first time, Phaser is being used in the project. And, it is not straight-forward to understand. Could you please add more comments?

Copy link
Contributor

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gaurav-narula nice fix!

@@ -119,9 +120,6 @@ public void onPartitionLeadershipChanges(Set<TopicIdPartition> leaderPartitions,
log.debug("TopicBasedRemoteLogMetadataManager configs after adding overridden properties: {}", configs);

topicBasedRemoteLogMetadataManager.configure(configs);
if (remoteLogMetadataTopicPartitioner != null) {
topicBasedRemoteLogMetadataManager.setRlmTopicPartitioner(remoteLogMetadataTopicPartitioner);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setRlmTopicPartitioner gets unused now, so could you please remove it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 6880e9f

Phaser initializationPhaser = new Phaser(2); // 1 to register test thread, 1 to register leaderTopicIdPartition
doAnswer(invocationOnMock -> {
Object result = invocationOnMock.callRealMethod();
initializationPhaser.arriveAndDeregister();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can use CountDownLatch instead? And we can override the stub (i.e create doAnswer again) before running onPartitionLeadershipChanges? For example:

        CountDownLatch latch = new CountDownLatch(1);
        doAnswer(invocationOnMock -> {
            Object result = invocationOnMock.callRealMethod();
            latch.countDown();
            return result;
        }).when(spyRemotePartitionMetadataStore).markInitialized(any());
...

        latch.await(30_000, TimeUnit.MILLISECONDS);

...

        CountDownLatch latch2 = new CountDownLatch(1);
        doAnswer(invocationOnMock -> {
            Object result = invocationOnMock.callRealMethod();
            latch2.countDown();
            return result;
        }).when(spyRemotePartitionMetadataStore).markInitialized(any());

...
        rlmm().onPartitionLeadershipChanges(Collections.singleton(emptyTopicIdPartition),
            Collections.singleton(followerTopicIdPartition));

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added some comments in 9cfda4f to describe the Phaser operations following Kamal's suggestion.

Please let me know if that's okay. I'd prefer not duplicating the stub given a data structure in the standard library fits the purpose but I'm happy to change it if you feel it's confusing for future readers.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comments LGTM. thanks!

Copy link
Collaborator

@kamalcph kamalcph left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks for the patch!

Copy link
Contributor

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 will merge it if no related failed tests

@chia7712 chia7712 merged commit a1c2c68 into apache:trunk May 16, 2024
1 check failed
rreddy-22 pushed a commit to rreddy-22/kafka-rreddy that referenced this pull request May 24, 2024
…scriptionsTest (apache#15962)

TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest has a race when it sets RemoteLogMetadataTopicPartitioner using the setter.

This change fixes the race condition by passing the RemoteLogMetadataTopicPartitioner instance in a Function<Integer, RemoteLogMetaedataTopicPartitioner> which is used in configure() in TopicBasedRemoteLogMetadataManager.

It also improves the waitingFor condition by spying on RemotePartitionMetadataStore and awaiting on Phasers to ensure ConsumerManager makes progress before performing assertions.

Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
3 participants