From b71bc6510f24541ccb0585d8cd8ebfbd84a336bd Mon Sep 17 00:00:00 2001 From: hangc0276 Date: Fri, 25 Jun 2021 22:58:34 +0800 Subject: [PATCH] fix flaky test testDuplicateConcurrentSubscribeCommand (#11089) ### Motivation When running test, it get the following exception accidentally ``` Error: testDuplicateConcurrentSubscribeCommand(org.apache.pulsar.broker.service.ServerCnxTest) Time elapsed: 0.75 s <<< FAILURE! java.lang.AssertionError: Response is not CommandError but org.apache.pulsar.common.api.proto.CommandSuccess@6d68db expected [true] but found [false] at org.testng.Assert.fail(Assert.java:99) at org.testng.Assert.failNotEquals(Assert.java:1037) at org.testng.Assert.assertTrue(Assert.java:45) at org.apache.pulsar.broker.service.ServerCnxTest.testDuplicateConcurrentSubscribeCommand(ServerCnxTest.java:767) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:132) at org.testng.internal.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:45) at org.testng.internal.InvokeMethodRunnable.call(InvokeMethodRunnable.java:73) at org.testng.internal.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829) ``` The reason is in the test logic, it subscribe the topic in `Exclusive` mode first, and then subscribe second time and check the subscribe status. If the first subscribe status returns faster than the second subscribe, it will get the above exception. ### Modifications 1. Use `Awaitility.await().untilAsserted` to wait the second subscribe status return, otherwise the test will be terminated by test timeout(30s) 2. format `ServerCnx#handleSubscribe` code style --- .../pulsar/broker/service/ServerCnx.java | 256 +++++++++--------- .../pulsar/broker/service/ServerCnxTest.java | 13 +- 2 files changed, 136 insertions(+), 133 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 8006b895c0751..13408bc645912 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -938,145 +938,145 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { remoteAddress, getPrincipal()); } - log.info("[{}] Subscribing on topic {} / {}", remoteAddress, topicName, subscriptionName); - try { - Metadata.validateMetadata(metadata); - } catch (IllegalArgumentException iae) { - final String msg = iae.getMessage(); - commandSender.sendErrorResponse(requestId, ServerError.MetadataError, msg); - return null; - } - CompletableFuture consumerFuture = new CompletableFuture<>(); - CompletableFuture existingConsumerFuture = consumers.putIfAbsent(consumerId, - consumerFuture); - - if (existingConsumerFuture != null) { - if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) { - Consumer consumer = existingConsumerFuture.getNow(null); - log.info("[{}] Consumer with the same id is already created:" - + " consumerId={}, consumer={}", - remoteAddress, consumerId, consumer); - commandSender.sendSuccessResponse(requestId); - return null; - } else { - // There was an early request to create a consumer with same consumerId. This can happen - // when - // client timeout is lower the broker timeouts. We need to wait until the previous - // consumer - // creation request either complete or fails. - log.warn("[{}][{}][{}] Consumer with id is already present on the connection," - + " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId); - ServerError error = null; - if (!existingConsumerFuture.isDone()) { - error = ServerError.ServiceNotReady; - } else { - error = getErrorCode(existingConsumerFuture); - consumers.remove(consumerId, existingConsumerFuture); - } - commandSender.sendErrorResponse(requestId, error, - "Consumer is already present on the connection"); - return null; - } + log.info("[{}] Subscribing on topic {} / {}", remoteAddress, topicName, subscriptionName); + try { + Metadata.validateMetadata(metadata); + } catch (IllegalArgumentException iae) { + final String msg = iae.getMessage(); + commandSender.sendErrorResponse(requestId, ServerError.MetadataError, msg); + return null; + } + CompletableFuture consumerFuture = new CompletableFuture<>(); + CompletableFuture existingConsumerFuture = consumers.putIfAbsent(consumerId, + consumerFuture); + + if (existingConsumerFuture != null) { + if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) { + Consumer consumer = existingConsumerFuture.getNow(null); + log.info("[{}] Consumer with the same id is already created:" + + " consumerId={}, consumer={}", + remoteAddress, consumerId, consumer); + commandSender.sendSuccessResponse(requestId); + return null; + } else { + // There was an early request to create a consumer with same consumerId. This can happen + // when + // client timeout is lower the broker timeouts. We need to wait until the previous + // consumer + // creation request either complete or fails. + log.warn("[{}][{}][{}] Consumer with id is already present on the connection," + + " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId); + ServerError error = null; + if (!existingConsumerFuture.isDone()) { + error = ServerError.ServiceNotReady; + } else { + error = getErrorCode(existingConsumerFuture); + consumers.remove(consumerId, existingConsumerFuture); } + commandSender.sendErrorResponse(requestId, error, + "Consumer is already present on the connection"); + return null; + } + } - boolean createTopicIfDoesNotExist = forceTopicCreation - && service.isAllowAutoTopicCreation(topicName.toString()); + boolean createTopicIfDoesNotExist = forceTopicCreation + && service.isAllowAutoTopicCreation(topicName.toString()); - service.getTopic(topicName.toString(), createTopicIfDoesNotExist) - .thenCompose(optTopic -> { - if (!optTopic.isPresent()) { - return FutureUtil - .failedFuture(new TopicNotFoundException( - "Topic " + topicName + " does not exist")); - } + service.getTopic(topicName.toString(), createTopicIfDoesNotExist) + .thenCompose(optTopic -> { + if (!optTopic.isPresent()) { + return FutureUtil + .failedFuture(new TopicNotFoundException( + "Topic " + topicName + " does not exist")); + } - Topic topic = optTopic.get(); + Topic topic = optTopic.get(); - boolean rejectSubscriptionIfDoesNotExist = isDurable - && !service.isAllowAutoSubscriptionCreation(topicName.toString()) - && !topic.getSubscriptions().containsKey(subscriptionName); + boolean rejectSubscriptionIfDoesNotExist = isDurable + && !service.isAllowAutoSubscriptionCreation(topicName.toString()) + && !topic.getSubscriptions().containsKey(subscriptionName); - if (rejectSubscriptionIfDoesNotExist) { - return FutureUtil - .failedFuture( - new SubscriptionNotFoundException( - "Subscription does not exist")); - } + if (rejectSubscriptionIfDoesNotExist) { + return FutureUtil + .failedFuture( + new SubscriptionNotFoundException( + "Subscription does not exist")); + } - if (schema != null) { - return topic.addSchemaIfIdleOrCheckCompatible(schema) - .thenCompose(v -> topic.subscribe( - ServerCnx.this, subscriptionName, consumerId, - subType, priorityLevel, consumerName, isDurable, - startMessageId, metadata, - readCompacted, initialPosition, startMessageRollbackDurationSec, - isReplicated, keySharedMeta)); - } else { - return topic.subscribe(ServerCnx.this, subscriptionName, consumerId, - subType, priorityLevel, consumerName, isDurable, - startMessageId, metadata, readCompacted, initialPosition, - startMessageRollbackDurationSec, isReplicated, keySharedMeta); - } - }) - .thenAccept(consumer -> { - if (consumerFuture.complete(consumer)) { - log.info("[{}] Created subscription on topic {} / {}", - remoteAddress, topicName, subscriptionName); - commandSender.sendSuccessResponse(requestId); - } else { - // The consumer future was completed before by a close command - try { - consumer.close(); - log.info("[{}] Cleared consumer created after timeout on client side {}", - remoteAddress, consumer); - } catch (BrokerServiceException e) { - log.warn( - "[{}] Error closing consumer created" - + " after timeout on client side {}: {}", - remoteAddress, consumer, e.getMessage()); - } - consumers.remove(consumerId, consumerFuture); - } + if (schema != null) { + return topic.addSchemaIfIdleOrCheckCompatible(schema) + .thenCompose(v -> topic.subscribe( + ServerCnx.this, subscriptionName, consumerId, + subType, priorityLevel, consumerName, isDurable, + startMessageId, metadata, + readCompacted, initialPosition, startMessageRollbackDurationSec, + isReplicated, keySharedMeta)); + } else { + return topic.subscribe(ServerCnx.this, subscriptionName, consumerId, + subType, priorityLevel, consumerName, isDurable, + startMessageId, metadata, readCompacted, initialPosition, + startMessageRollbackDurationSec, isReplicated, keySharedMeta); + } + }) + .thenAccept(consumer -> { + if (consumerFuture.complete(consumer)) { + log.info("[{}] Created subscription on topic {} / {}", + remoteAddress, topicName, subscriptionName); + commandSender.sendSuccessResponse(requestId); + } else { + // The consumer future was completed before by a close command + try { + consumer.close(); + log.info("[{}] Cleared consumer created after timeout on client side {}", + remoteAddress, consumer); + } catch (BrokerServiceException e) { + log.warn( + "[{}] Error closing consumer created" + + " after timeout on client side {}: {}", + remoteAddress, consumer, e.getMessage()); + } + consumers.remove(consumerId, consumerFuture); + } - }) - .exceptionally(exception -> { - if (exception.getCause() instanceof ConsumerBusyException) { - if (log.isDebugEnabled()) { - log.debug( - "[{}][{}][{}] Failed to create consumer because exclusive consumer" - + " is already connected: {}", - remoteAddress, topicName, subscriptionName, - exception.getCause().getMessage()); - } - } else if (exception.getCause() instanceof BrokerServiceException) { - log.warn("[{}][{}][{}] Failed to create consumer: consumerId={}, {}", - remoteAddress, topicName, subscriptionName, - consumerId, exception.getCause().getMessage()); - } else { - log.warn("[{}][{}][{}] Failed to create consumer: consumerId={}, {}", - remoteAddress, topicName, subscriptionName, - consumerId, exception.getCause().getMessage(), exception); - } + }) + .exceptionally(exception -> { + if (exception.getCause() instanceof ConsumerBusyException) { + if (log.isDebugEnabled()) { + log.debug( + "[{}][{}][{}] Failed to create consumer because exclusive consumer" + + " is already connected: {}", + remoteAddress, topicName, subscriptionName, + exception.getCause().getMessage()); + } + } else if (exception.getCause() instanceof BrokerServiceException) { + log.warn("[{}][{}][{}] Failed to create consumer: consumerId={}, {}", + remoteAddress, topicName, subscriptionName, + consumerId, exception.getCause().getMessage()); + } else { + log.warn("[{}][{}][{}] Failed to create consumer: consumerId={}, {}", + remoteAddress, topicName, subscriptionName, + consumerId, exception.getCause().getMessage(), exception); + } - // If client timed out, the future would have been completed by subsequent close. - // Send error - // back to client, only if not completed already. - if (consumerFuture.completeExceptionally(exception)) { - commandSender.sendErrorResponse(requestId, - BrokerServiceException.getClientErrorCode(exception), - exception.getCause().getMessage()); - } - consumers.remove(consumerId, consumerFuture); + // If client timed out, the future would have been completed by subsequent close. + // Send error + // back to client, only if not completed already. + if (consumerFuture.completeExceptionally(exception)) { + commandSender.sendErrorResponse(requestId, + BrokerServiceException.getClientErrorCode(exception), + exception.getCause().getMessage()); + } + consumers.remove(consumerId, consumerFuture); - return null; + return null; - }); - } else { - String msg = "Client is not authorized to subscribe"; - log.warn("[{}] {} with role {}", remoteAddress, msg, getPrincipal()); - ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg)); - } - return null; + }); + } else { + String msg = "Client is not authorized to subscribe"; + log.warn("[{}] {} with role {}", remoteAddress, msg, getPrincipal()); + ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg)); + } + return null; }).exceptionally(ex -> { logAuthException(remoteAddress, "subscribe", getPrincipal(), Optional.of(topicName), ex); commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, ex.getMessage()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index b2d1e54dcd74e..ed08dfbd17594 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -21,6 +21,7 @@ import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper; import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; +import org.awaitility.Awaitility; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.matches; @@ -744,7 +745,7 @@ public void testSubscribeMultipleTimes() throws Exception { channel.finish(); } - @Test(timeOut = 5000) + @Test(timeOut = 30000) public void testDuplicateConcurrentSubscribeCommand() throws Exception { resetChannel(); setChannelConnected(); @@ -763,10 +764,12 @@ public void testDuplicateConcurrentSubscribeCommand() throws Exception { "test" /* consumer name */, 0 /* avoid reseting cursor */); channel.writeInbound(clientCommand); - Object response = getResponse(); - assertTrue(response instanceof CommandError, "Response is not CommandError but " + response); - CommandError error = (CommandError) response; - assertEquals(error.getError(), ServerError.ServiceNotReady); + Awaitility.await().untilAsserted(() -> { + Object response = getResponse(); + assertTrue(response instanceof CommandError, "Response is not CommandError but " + response); + CommandError error = (CommandError) response; + assertEquals(error.getError(), ServerError.ServiceNotReady); + }); channel.finish(); }