
Flaky CompleteProcessInstanceAfterLeaderChangeTest.shouldCompleteProcessInstanceAfterSeveralLeaderChanges #9813

Closed
deepthidevaki opened this issue Jul 14, 2022 · 17 comments · Fixed by #9886, #9905 or #10020
Assignees
deepthidevaki

Labels
kind/flake, version:1.3.13, version:8.1.0-alpha4, version:8.1.0-alpha5, version:8.1.0

Comments

@deepthidevaki
Contributor

deepthidevaki commented Jul 14, 2022

Summary

Try to answer the following as best as possible

  • How often does the test fail? 1
  • Does it block your work? no
  • Do we suspect that it is a real failure?

Failures

Example assertion failure
Error:  CompleteProcessInstanceAfterLeaderChangeTest.shouldCompleteProcessInstanceAfterSeveralLeaderChanges  Time elapsed: 31.408 s  <<< ERROR!
io.camunda.zeebe.client.api.command.ClientStatusException: Time out between gateway and broker: Request timed out after PT10S
	at io.camunda.zeebe.client.impl.ZeebeClientFutureImpl.transformExecutionException(ZeebeClientFutureImpl.java:93)
	at io.camunda.zeebe.client.impl.ZeebeClientFutureImpl.join(ZeebeClientFutureImpl.java:50)
	at io.camunda.zeebe.it.clustering.CompleteProcessInstanceAfterLeaderChangeTest.lambda$actions$4(CompleteProcessInstanceAfterLeaderChangeTest.java:97)
	at io.camunda.zeebe.it.clustering.CompleteProcessInstanceAfterLeaderChangeTest.shouldCompleteProcessInstanceAfterSeveralLeaderChanges(CompleteProcessInstanceAfterLeaderChangeTest.java:172)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
	at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
	at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
	at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.util.concurrent.ExecutionException: io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: Time out between gateway and broker: Request timed out after PT10S
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
	at io.camunda.zeebe.client.impl.ZeebeClientFutureImpl.join(ZeebeClientFutureImpl.java:48)
	... 19 more
Caused by: io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: Time out between gateway and broker: Request timed out after PT10S
	at io.grpc.Status.asRuntimeException(Status.java:535)
	at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:487)
	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:562)
	at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:743)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:722)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more

Logs

logs

@deepthidevaki deepthidevaki added the kind/flake Categorizes issue or PR as related to a flaky test label Jul 14, 2022
@deepthidevaki deepthidevaki self-assigned this Jul 25, 2022
zeebe-bors-camunda bot added a commit that referenced this issue Jul 26, 2022
9845: chore(ci): adjust direct Vault usage in Jenkins to new path r=npepinpe a=Langleu

## Description

Infra is restructuring Vault paths to ease the self-service.
See announcement on [slack](https://camunda.slack.com/archives/C03AEFWGJ9K/p1657524606321389).

GitHub paths will also be adjusted later this week in a different PR.
The goal is to combine everything under a product space, which also makes reusing secrets easier since you don't have to duplicate them between GitHub Actions and Jenkins.

## Related issues

On the Infra side --> [INFRA-3326](https://jira.camunda.com/browse/INFRA-3326)



9884: Simplify ControlledActorClockEndpointIT r=npepinpe a=npepinpe

## Description

This PR simplifies the `ControlledActorClockEndpointIT`, dropping the dependency on Elasticsearch and using `BpmnAssert` to verify the records produced.

I also opted to not use `ZeebeClock` directly, or even the `ZeebeTestEngine#increaseTime`, as we want to explicitly test the endpoint, and not the interfaces provided by other libraries.



9886: test(qa): wait until message is published before restarting the broker r=deepthidevaki a=deepthidevaki

## Description

The test was flaky - it sometimes failed in 'correlate message after restart' while waiting for the instance to be completed. It is not clear why exactly the test fails, but a probable reason is that the message was not published before the broker was restarted.
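
A sketch of that kind of guard, using Zeebe's RecordingExporter test utility (the message name and the exact assertion here are illustrative, not the actual PR diff):

import static org.assertj.core.api.Assertions.assertThat;

import io.camunda.zeebe.protocol.record.intent.MessageIntent;
import io.camunda.zeebe.test.util.record.RecordingExporter;

// In the test, before restarting the broker: block until the PUBLISHED
// record has been exported, so the restart cannot race the publish command.
assertThat(
        RecordingExporter.messageRecords(MessageIntent.PUBLISHED)
            .withName("message") // illustrative message name
            .exists())
    .isTrue();
// only now restart the broker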

## Related issues

closes #9813



Co-authored-by: Langleu <lars.lange@camunda.com>
Co-authored-by: Nicolas Pepin-Perreault <nicolas.pepin-perreault@camunda.com>
Co-authored-by: Deepthi Devaki Akkoorath <deepthidevaki@gmail.com>
@deepthidevaki
Contributor Author

deepthidevaki commented Jul 26, 2022

Still fails #9886 (comment) 😭

@deepthidevaki deepthidevaki reopened this Jul 26, 2022
zeebe-bors-camunda bot added a commit that referenced this issue Jul 26, 2022
9894: [Backport stable/1.3] test(qa): wait until message is published before restarting the broker r=deepthidevaki a=backport-action

# Description
Backport of #9886 to `stable/1.3`.

relates to #9813

9896: [Backport stable/1.3] fix(engine): add grace period to detect end of processing r=remcowesterhoud a=backport-action

# Description
Backport of #9082 to `stable/1.3`.

relates to #8738

Co-authored-by: Deepthi Devaki Akkoorath <deepthidevaki@gmail.com>
Co-authored-by: pihme <pihme@users.noreply.github.com>
zeebe-bors-camunda bot added a commit that referenced this issue Jul 26, 2022
9895: [Backport stable/8.0] test(qa): wait until message is published before restarting the broker r=deepthidevaki a=backport-action

# Description
Backport of #9886 to `stable/8.0`.

relates to #9813

9897: [Backport stable/8.0] fix(engine): add grace period to detect end of processing r=remcowesterhoud a=backport-action

# Description
Backport of #9082 to `stable/8.0`.

relates to #8738

closes #9641

Co-authored-by: Deepthi Devaki Akkoorath <deepthidevaki@gmail.com>
Co-authored-by: pihme <pihme@users.noreply.github.com>
@deepthidevaki
Contributor Author

Found the following logs from the failed test

14:36:41.364 [Broker-0-StreamProcessor-1] [Broker-0-zb-actors-0] ERROR io.camunda.zeebe.processor - Expected to execute side effects for record 'LoggedEvent [type=0, version=0, streamId=1, position=1, key=-1, timestamp=1658846201352, sourceEventPosition=-1] RecordMetadata{recordType=COMMAND, intentValue=255, intent=PUBLISH, requestStreamId=1, requestId=0, protocolVersion=3, valueType=MESSAGE, rejectionType=NULL_VAL, rejectionReason=, brokerVersion=8.1.0}' successfully, but exception was thrown.
java.lang.RuntimeException: java.lang.NullPointerException
	at io.camunda.zeebe.streamprocessor.DirectProcessingResult.executePostCommitTasks(DirectProcessingResult.java:69) ~[zeebe-workflow-engine-8.1.0-SNAPSHOT.jar:8.1.0-SNAPSHOT]
	at io.camunda.zeebe.streamprocessor.ProcessingStateMachine.lambda$executeSideEffects$11(ProcessingStateMachine.java:409) ~[zeebe-workflow-engine-8.1.0-SNAPSHOT.jar:8.1.0-SNAPSHOT]
	at io.camunda.zeebe.scheduler.retry.ActorRetryMechanism.run(ActorRetryMechanism.java:36) ~[zeebe-scheduler-8.1.0-SNAPSHOT.jar:8.1.0-SNAPSHOT]
	at io.camunda.zeebe.scheduler.retry.AbortableRetryStrategy.run(AbortableRetryStrategy.java:45) ~[zeebe-scheduler-8.1.0-SNAPSHOT.jar:8.1.0-SNAPSHOT]
	at io.camunda.zeebe.scheduler.ActorJob.invoke(ActorJob.java:92) ~[zeebe-scheduler-8.1.0-SNAPSHOT.jar:8.1.0-SNAPSHOT]
	at io.camunda.zeebe.scheduler.ActorJob.execute(ActorJob.java:45) ~[zeebe-scheduler-8.1.0-SNAPSHOT.jar:8.1.0-SNAPSHOT]
	at io.camunda.zeebe.scheduler.ActorTask.execute(ActorTask.java:119) ~[zeebe-scheduler-8.1.0-SNAPSHOT.jar:8.1.0-SNAPSHOT]
	at io.camunda.zeebe.scheduler.ActorThread.executeCurrentTask(ActorThread.java:106) ~[zeebe-scheduler-8.1.0-SNAPSHOT.jar:8.1.0-SNAPSHOT]
	at io.camunda.zeebe.scheduler.ActorThread.doWork(ActorThread.java:87) ~[zeebe-scheduler-8.1.0-SNAPSHOT.jar:8.1.0-SNAPSHOT]
	at io.camunda.zeebe.scheduler.ActorThread.run(ActorThread.java:198) ~[zeebe-scheduler-8.1.0-SNAPSHOT.jar:8.1.0-SNAPSHOT]
Caused by: java.lang.NullPointerException
	at java.util.Objects.requireNonNull(Objects.java:208) ~[?:?]
	at io.camunda.zeebe.broker.transport.commandapi.CommandResponseWriterImpl.tryWriteResponse(CommandResponseWriterImpl.java:99) ~[zeebe-broker-8.1.0-SNAPSHOT.jar:8.1.0-SNAPSHOT]
	at io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedResponseWriterImpl.flush(LegacyTypedResponseWriterImpl.java:142) ~[zeebe-workflow-engine-8.1.0-SNAPSHOT.jar:8.1.0-SNAPSHOT]
	at io.camunda.zeebe.engine.processing.message.MessagePublishProcessor.sendCorrelateCommand(MessagePublishProcessor.java:193) ~[zeebe-workflow-engine-8.1.0-SNAPSHOT.jar:8.1.0-SNAPSHOT]
	at io.camunda.zeebe.streamprocessor.DirectProcessingResult.executePostCommitTasks(DirectProcessingResult.java:67) ~[zeebe-workflow-engine-8.1.0-SNAPSHOT.jar:8.1.0-SNAPSHOT]
	... 9 more

Not sure if this explains the failure. Related to #9860

@deepthidevaki
Contributor Author

Found this log

09:18:57.222 [Broker-0-StreamProcessor-1] [Broker-0-zb-actors-1] INFO  io.camunda.zeebe.broker.transport - Sending command MESSAGE_SUBSCRIPTION CREATE to partition 1, leader 1

where the command is sent to member 1. But at this time, the leader is 0 and member 1 (which was the previous leader) is already shut down. It seems the topology was not updated when this command was sent. I think the topology will eventually be updated, but the retry delay is set to 10 seconds, so the test fails before the retry is attempted.
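
To make the timing concrete, here is a toy sketch (the two durations are taken from the failure above; everything else is illustrative): with both set to 10 seconds, the first retry can never fire before the gateway gives up.

import java.time.Duration;

public class RetryVsDeadline {
  public static void main(String[] args) {
    final Duration requestTimeout = Duration.ofSeconds(10); // gateway deadline (PT10S)
    final Duration retryDelay = Duration.ofSeconds(10); // delay before retrying with a fresh topology

    // Attempt 1 at t=0 goes to the stale leader and is never answered.
    // The earliest retry fires at t = retryDelay, but the gateway already
    // reports DEADLINE_EXCEEDED at t = requestTimeout.
    final boolean retryBeatsDeadline = retryDelay.compareTo(requestTimeout) < 0;
    System.out.println("retry fires before the deadline: " + retryBeatsDeadline); // false
  }
}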

@deepthidevaki
Contributor Author

Also, the NPE doesn't seem to affect this test, because when this error happens there are no subscriptions to correlate.

zeebe-bors-camunda bot added a commit that referenced this issue Jul 28, 2022
9908: [Backport stable/1.3] test(qa): increase timeout r=deepthidevaki a=backport-action

# Description
Backport of #9905 to `stable/1.3`.

relates to #9813

Co-authored-by: Deepthi Devaki Akkoorath <deepthidevaki@gmail.com>
zeebe-bors-camunda bot added a commit that referenced this issue Jul 28, 2022
9909: [Backport stable/8.0] test(qa): increase timeout r=deepthidevaki a=backport-action

# Description
Backport of #9905 to `stable/8.0`.

relates to #9813

Co-authored-by: Deepthi Devaki Akkoorath <deepthidevaki@gmail.com>
@korthout
Member

korthout commented Aug 1, 2022

Happened again, see #9935 (comment)

Error:  Tests run: 12, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 270.506 s <<< FAILURE! - in io.camunda.zeebe.it.clustering.CompleteProcessInstanceAfterLeaderChangeTest
Error:  CompleteProcessInstanceAfterLeaderChangeTest.shouldCompleteProcessInstanceAfterSeveralLeaderChanges  Time elapsed: 42.355 s  <<< ERROR!
io.camunda.zeebe.client.api.command.ClientStatusException: Time out between gateway and broker: Request ProtocolRequest{id=47, subject=command-api-1, sender=0.0.0.0:2294, payload=byte[]{length=155, hash=-932148898}} to 0.0.0.0:2283 timed out in PT20S
	at io.camunda.zeebe.client.impl.ZeebeClientFutureImpl.transformExecutionException(ZeebeClientFutureImpl.java:93)
	at io.camunda.zeebe.client.impl.ZeebeClientFutureImpl.join(ZeebeClientFutureImpl.java:50)
	at io.camunda.zeebe.it.clustering.CompleteProcessInstanceAfterLeaderChangeTest.lambda$actions$4(CompleteProcessInstanceAfterLeaderChangeTest.java:100)
	at io.camunda.zeebe.it.clustering.CompleteProcessInstanceAfterLeaderChangeTest.shouldCompleteProcessInstanceAfterSeveralLeaderChanges(CompleteProcessInstanceAfterLeaderChangeTest.java:175)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
	at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
	at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
	at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.util.concurrent.ExecutionException: io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: Time out between gateway and broker: Request ProtocolRequest{id=47, subject=command-api-1, sender=0.0.0.0:2294, payload=byte[]{length=155, hash=-932148898}} to 0.0.0.0:2283 timed out in PT20S
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
	at io.camunda.zeebe.client.impl.ZeebeClientFutureImpl.join(ZeebeClientFutureImpl.java:48)
	... 19 more
Caused by: io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: Time out between gateway and broker: Request ProtocolRequest{id=47, subject=command-api-1, sender=0.0.0.0:2294, payload=byte[]{length=155, hash=-932148898}} to 0.0.0.0:2283 timed out in PT20S
	at io.grpc.Status.asRuntimeException(Status.java:535)
	at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:487)
	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:562)
	at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:743)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:722)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more

@korthout korthout reopened this Aug 1, 2022
@deepthidevaki
Contributor Author

😭
Will look into it. Probably the topology is never updated then 🤔

@korthout
Member

korthout commented Aug 1, 2022

Thanks @deepthidevaki. Sorry to have to reopen this one 🙇 But thanks for looking into it.

@remcowesterhoud
Contributor

Here is another example pipeline: https://github.com/camunda/zeebe/runs/7667370072?check_suite_focus=true

@lenaschoenburg lenaschoenburg changed the title FLaky CompleteProcessInstanceAfterLeaderChangeTest.shouldCompleteProcessInstanceAfterSeveralLeaderChanges Flaky CompleteProcessInstanceAfterLeaderChangeTest.shouldCompleteProcessInstanceAfterSeveralLeaderChanges Aug 4, 2022
@deepthidevaki
Contributor Author

It was quite difficult to reproduce, but it failed again after several runs: https://github.com/camunda/zeebe/actions/runs/2783045847

From the logs it seems the topology is not updated correctly.

08:48:44.386 [Broker-0-ZeebePartition-1] [Broker-0-zb-actors-1] INFO  io.camunda.zeebe.broker.system - Transition to LEADER on term 3 completed
08:48:44.386 [Broker-0-StreamProcessor-1] [Broker-0-zb-actors-0] INFO  io.camunda.zeebe.broker.system - Update leader of partition 1 to node 0
08:48:44.386 [Broker-0-StreamProcessor-1] [Broker-0-zb-actors-0] INFO  io.camunda.zeebe.broker.system - Update leader of partition 1 to node 1

Broker 0 became the leader. The topology listener updates the leader to 0, but then it updates again to broker 1, which was the leader in the previous term. So the current leader is overwritten with the stale one. No idea how this can happen.
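
As a toy illustration of the suspected race (illustrative code, not the actual listener): if the two updates are applied in the wrong order, the stale leader wins.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class StaleLeaderOverwrite {
  public static void main(String[] args) {
    final Map<Integer, Integer> partitionLeaders = new ConcurrentHashMap<>();

    // Listener invocation for term 3: leader of partition 1 is now node 0.
    final Runnable freshUpdate = () -> partitionLeaders.put(1, 0);
    // Stale listener invocation for term 2: leader of partition 1 was node 1.
    final Runnable staleUpdate = () -> partitionLeaders.put(1, 1);

    // If the scheduler executes the two jobs out of submission order...
    freshUpdate.run();
    staleUpdate.run();

    // ...the map ends up pointing at the previous leader.
    System.out.println("leader of partition 1: " + partitionLeaders.get(1)); // prints 1, not 0
  }
}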

@lenaschoenburg
Member

lenaschoenburg commented Aug 4, 2022

When the partition leader is updated, we now schedule a timer:
https://github.com/camunda/zeebe/blob/b05271207aff00bffb372a0d9d7e35b881502624/broker/src/main/java/io/camunda/zeebe/broker/partitioning/topology/TopologyPartitionListenerImpl.java#L28

Before, we submitted the job directly, which preserves ordering.

Quoting from the documentation of DeadlineTimerWheel:

Timers that expire in the same tick are not ordered with one another. As ticks are fairly coarse resolution normally, this means that some timers may expire out of order.

@deepthidevaki
Contributor Author

Great point @oleschoenburg

I suspected something like this, but the following test passed:

@Test
public void shouldExecuteInOrder() throws InterruptedException {
  // given
  final AtomicReference<ActorControl> actorControl = new AtomicReference<>();
  final CompletableFuture<Void> started = new CompletableFuture<>();
  final Actor temp =
      new Actor() {
        @Override
        protected void onActorStarted() {
          actorControl.set(actor);
          started.complete(null);
        }
      };

  actorSchedulerRule.submitActor(temp).join();
  started.join();

  // when
  final var actor = actorControl.get();

  final Runnable step1 = mock(Runnable.class);
  final Runnable step2 = mock(Runnable.class);
  actor.submit(() -> actor.runDelayed(Duration.ZERO, step1));
  actor.submit(() -> actor.runDelayed(Duration.ZERO, step2));

  // then
  final var ordered = inOrder(step1, step2);
  ordered.verify(step1, timeout(1000)).run();
  ordered.verify(step2, timeout(1000)).run();
}

But as the documentation says, the ordering is not guaranteed, which would explain why the test fails non-deterministically.

@lenaschoenburg
Member

IMO TopologyPartitionListenerImpl could use a small rewrite to just use a ConcurrentHashMap instead of relying on Int2IntHashMap plus the actor scheduler.
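
A minimal sketch of that idea (class and method names assumed for illustration, not Zeebe's actual API): the map itself is thread-safe, so no scheduling is involved and updates apply in call order.

import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only -- the real TopologyPartitionListener interface and the
// consumers of this map may look different.
final class ConcurrentTopologyListener {
  private final ConcurrentHashMap<Integer, Integer> partitionLeaders = new ConcurrentHashMap<>();

  // Invoked on leader changes; applied synchronously on the caller's thread,
  // so updates from the topology callback are applied in call order --
  // no timer wheel, no same-tick reordering.
  public void onPartitionLeaderUpdated(final int partitionId, final int leaderNodeId) {
    partitionLeaders.put(partitionId, leaderNodeId);
  }

  public Optional<Integer> getLeaderForPartition(final int partitionId) {
    return Optional.ofNullable(partitionLeaders.get(partitionId));
  }
}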

@deepthidevaki
Contributor Author

IMO TopologyPartitionListenerImpl could use a small rewrite to just use a ConcurrentHashMap instead of relying on Int2IntHashMap plus the actor scheduler.

TopologyPartitionListenerImpl is only used by the InterPartitionCommandSender, so I was thinking of merging them. And instead of sharing the actor with the StreamProcessor, it could run on its own actor. Then it wouldn't have to use the ProcessSchedulingService to schedule the task; it could run directly on the actor. This way we could also build it outside of the engine - right now it is constructed as part of the TypedRecordProcessorFactory.
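
Roughly the direction described above, as a sketch with assumed names (using the Actor/actor.submit API from the test earlier in this thread), not the actual implementation:

import io.camunda.zeebe.scheduler.Actor;
import org.agrona.collections.Int2IntHashMap;

// Sketch only: the sender is its own Actor and receives topology updates
// directly, so each update is a plain submit() job executed in submission order.
final class InterPartitionCommandSenderActor extends Actor {
  private final Int2IntHashMap partitionLeaders = new Int2IntHashMap(-1);

  // Called by the topology listener hook on leader changes.
  public void onPartitionLeaderUpdated(final int partitionId, final int leaderNodeId) {
    // submit() preserves submission order, unlike runDelayed(Duration.ZERO),
    // so a stale update can no longer overwrite a fresh one.
    actor.submit(() -> partitionLeaders.put(partitionId, leaderNodeId));
  }

  // Commands are sent from the same actor, so they always read a leader map
  // that reflects all updates submitted before them.
  public void sendCommandToPartition(final int partitionId, final Runnable send) {
    actor.submit(send);
  }
}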

@lenaschoenburg
Member

So InterPartitionCommandSender implements TopologyPartitionListener and we remove TopologyPartitionListenerImpl? That sounds even better 😍 🚀

@Zelldon
Member

Zelldon commented Aug 4, 2022

Would resolve this #9756 (comment) as well

@deepthidevaki
Contributor Author

Then I will do that. I'm not yet sure whether giving it its own actor or, as @oleschoenburg said, getting rid of the actor and using a ConcurrentHashMap is the better option.

@npepinpe
Member

npepinpe commented Aug 4, 2022

imo having the transport be its own actor kind of makes sense. you can pass the job of sending a command to another actor and keep processing 👍

edit: i know the sender is not the transport, but you get the idea 😄

zeebe-bors-camunda bot added a commit that referenced this issue Aug 17, 2022
10020: Merge topology listener with InterPartitionCommandSender r=deepthidevaki a=deepthidevaki

## Description

Flaky test #9813 was due to the changes in `ProcessSchedulingService`, which, instead of submitting a new task, scheduled a task with 0 delay. There is no guarantee in which order tasks are executed when multiple tasks time out at the same time. This breaks the assumption in the topology listener and led to listeners executing in a different order than they were invoked. As a result, the new leader was overwritten by a previous invocation of the listener with the old leader.

To fix it, in this PR
* Merged topology listener with InterPartitionCommandSender
* InterPartitionCommandSender has its own actor, instead of sharing the StreamProcessorActor. This avoids the need to use `ProcessSchedulingService` and thus the non-deterministic execution of the listener.
* Refactoring to allow installing InterPartitionCommandSender during PartitionTransition instead of in the engine factory.

## Related issues

closes #9813 



Co-authored-by: Deepthi Devaki Akkoorath <deepthidevaki@gmail.com>
@Zelldon Zelldon added the version:8.1.0 Marks an issue as being completely or in parts released in 8.1.0 label Oct 4, 2022