
C++ client: Producer with name 'xxx-18-31667' is already connected to topic #13289

Closed
aloyszhang opened this issue Dec 14, 2021 · 8 comments
Labels: help wanted, lifecycle/stale, type/bug

aloyszhang (Contributor) commented Dec 14, 2021:

Describe the bug
The C++ producer may sometimes become blocked. From the broker logs we can see the error "Producer with name 'xxx-18-31667' is already connected to topic" (screenshot of the broker logs omitted).

aloyszhang added the type/bug label on Dec 14, 2021.
aloyszhang (Contributor, Author) commented:

cc @BewareMyPower

BewareMyPower (Contributor) commented:

I have checked the C++ client; it looks like the error is not caused by an issue like apache/pulsar-client-go#639. See:

if (!producerName_.empty()) {
userProvidedProducerName_ = true;
}

The userProvidedProducerName_ field is initialized only once, when the ProducerImpl is created.
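A minimal sketch of that behavior (the class and method names below are hypothetical stand-ins, not the real ProducerImpl API): the flag is derived from the user-supplied name once at construction time, and a broker-assigned name received later never flips it.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for ProducerImpl, illustrating the one-time
// initialization of userProvidedProducerName_ described above.
class ProducerSketch {
public:
    explicit ProducerSketch(const std::string& configuredName)
        : producerName_(configuredName) {
        // Mirrors the snippet above: only a non-empty name supplied by
        // the user marks the producer name as user-provided.
        if (!producerName_.empty()) {
            userProvidedProducerName_ = true;
        }
    }

    // Simulates handleCreateProducer: the broker assigns a name on each
    // (re)connect, but the user-provided flag is never touched again.
    void onCreateProducerResponse(const std::string& brokerAssignedName) {
        producerName_ = brokerAssignedName;
    }

    bool userProvidedProducerName() const { return userProvidedProducerName_; }
    const std::string& producerName() const { return producerName_; }

private:
    std::string producerName_;
    bool userProvidedProducerName_ = false;
};
```

With an empty configured name the flag stays false across any number of simulated reconnects, which matches the userProvidedProducerName_: 0 seen in the log output further down.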

BewareMyPower (Contributor) commented:

I also simulated the reconnection by modifying the SDK implementation:

diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index 3ad6f4062f4..43a3cdc83ce 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -49,6 +49,8 @@ static const uint32_t DefaultBufferSize = 64 * 1024;
 
 static const int KeepAliveIntervalInSeconds = 30;
 
+static std::atomic_int reconnect_{0};
+
 // Convert error codes from protobuf to client API Result
 static Result getResult(ServerError serverError) {
     switch (serverError) {
@@ -805,6 +807,12 @@ void ClientConnection::handleIncomingCommand() {
             // Handle normal commands
             switch (incomingCmd_.type()) {
                 case BaseCommand::SEND_RECEIPT: {
+                    reconnect_++;
+                    if (reconnect_ == 3) {
+                        LOG_INFO(cnxString_ << " trigger reconnection manually");
+                        close();
+                        return;
+                    }
                     const CommandSendReceipt& sendReceipt = incomingCmd_.send_receipt();
                     int producerId = sendReceipt.producer_id();
                     uint64_t sequenceId = sendReceipt.sequence_id();
@@ -812,8 +820,9 @@ void ClientConnection::handleIncomingCommand() {
                     MessageId messageId = MessageId(messageIdData.partition(), messageIdData.ledgerid(),
                                                     messageIdData.entryid(), messageIdData.batch_index());
 
-                    LOG_DEBUG(cnxString_ << "Got receipt for producer: " << producerId
-                                         << " -- msg: " << sequenceId << "-- message id: " << messageId);
+                    LOG_INFO(cnxString_ << "Got receipt for producer: " << producerId
+                                        << " -- msg: " << sequenceId << "-- message id: " << messageId
+                                        << " | reconnect_: " << reconnect_);
 
                     Lock lock(mutex_);
                     ProducersMap::iterator it = producers_.find(producerId);
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index f81e205475d..1ee938116d4 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -150,6 +150,8 @@ void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
     SharedBuffer cmd = Commands::newProducer(topic_, producerId_, producerName_, requestId,
                                              conf_.getProperties(), conf_.getSchema(), epoch_,
                                              userProvidedProducerName_, conf_.isEncryptionEnabled());
+    LOG_INFO(getName() << " newProducer userProvidedProducerName_: " << userProvidedProducerName_
+                       << ", producerName_: " << producerName_);
     cnx->sendRequestWithId(cmd, requestId)
         .addListener(std::bind(&ProducerImpl::handleCreateProducer, shared_from_this(), cnx,
                                std::placeholders::_1, std::placeholders::_2));
@@ -191,6 +193,8 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
         producerName_ = responseData.producerName;
         schemaVersion_ = responseData.schemaVersion;
         producerStr_ = "[" + topic_ + ", " + producerName_ + "] ";
+        LOG_INFO(getName() << " handleCreateProducer userProvidedProducerName_: " << userProvidedProducerName_
+                           << ", producerName_: " << producerName_);
 
         if (lastSequenceIdPublished_ == -1 && conf_.getInitialSequenceId() == -1) {
             lastSequenceIdPublished_ = responseData.lastSequenceId;

That is, a reconnection is triggered manually when the 3rd message is sent. I also added some logs for debugging.

Here is the output:

2021-12-15 16:34:38.488 INFO  [0x7000035ed000] ClientConnection:825 | [[::1]:51361 -> [::1]:6650] Got receipt for producer: 0 -- msg: 0-- message id: (1,0,-1,-1) | reconnect_: 1
2021-12-15 16:34:38.488 INFO  [0x11ae9f600] SampleProducer:45 | Message sent: Ok
2021-12-15 16:34:39.497 INFO  [0x7000035ed000] ClientConnection:825 | [[::1]:51361 -> [::1]:6650] Got receipt for producer: 0 -- msg: 1-- message id: (1,1,-1,-1) | reconnect_: 2
2021-12-15 16:34:39.497 INFO  [0x11ae9f600] SampleProducer:45 | Message sent: Ok
2021-12-15 16:34:40.505 INFO  [0x7000035ed000] ClientConnection:812 | [[::1]:51361 -> [::1]:6650]  trigger reconnection manually
2021-12-15 16:34:40.505 INFO  [0x7000035ed000] ClientConnection:1551 | [[::1]:51361 -> [::1]:6650] Connection closed
2021-12-15 16:34:40.505 INFO  [0x7000035ed000] HandlerBase:142 | [persistent://public/default/my-topic, standalone-0-1] Schedule reconnection in 0.1 s
2021-12-15 16:34:40.505 ERROR [0x7000035ed000] ClientConnection:591 | [[::1]:51361 -> [::1]:6650] Read operation failed: Bad file descriptor
2021-12-15 16:34:40.505 INFO  [0x7000035ed000] ClientConnection:257 | [[::1]:51361 -> [::1]:6650] Destroyed connection
2021-12-15 16:34:40.605 INFO  [0x7000035ed000] HandlerBase:64 | [persistent://public/default/my-topic, standalone-0-1] Getting connection from pool
2021-12-15 16:34:40.605 INFO  [0x7000035ed000] ConnectionPool:86 | Deleting stale connection from pool for pulsar://localhost:6650 use_count: -1 @ 0x0
2021-12-15 16:34:40.605 INFO  [0x7000035ed000] ClientConnection:183 | [<none> -> pulsar://localhost:6650] Create ClientConnection, timeout=10000
2021-12-15 16:34:40.606 INFO  [0x7000035ed000] ConnectionPool:96 | Created connection for pulsar://localhost:6650
2021-12-15 16:34:40.607 INFO  [0x7000035ed000] ClientConnection:369 | [[::1]:51368 -> [::1]:6650] Connected to broker
2021-12-15 16:34:40.610 INFO  [0x7000035ed000] ProducerImpl:154 | [persistent://public/default/my-topic, standalone-0-1]  newProducer userProvidedProducerName_: 0, producerName_: standalone-0-1
2021-12-15 16:34:40.611 INFO  [0x7000035ed000] ProducerImpl:190 | [persistent://public/default/my-topic, standalone-0-1] Created producer on broker [[::1]:51368 -> [::1]:6650] 
2021-12-15 16:34:40.611 INFO  [0x7000035ed000] ProducerImpl:197 | [persistent://public/default/my-topic, standalone-0-1]  handleCreateProducer userProvidedProducerName_: 0, producerName_: standalone-0-1
2021-12-15 16:34:40.614 INFO  [0x7000035ed000] ClientConnection:825 | [[::1]:51368 -> [::1]:6650] Got receipt for producer: 0 -- msg: 2-- message id: (1,3,-1,-1) | reconnect_: 4
2021-12-15 16:34:40.614 INFO  [0x11ae9f600] SampleProducer:45 | Message sent: Ok

From the following lines:

2021-12-15 16:34:40.610 INFO  [0x7000035ed000] ProducerImpl:154 | [persistent://public/default/my-topic, standalone-0-1]  newProducer userProvidedProducerName_: 0, producerName_: standalone-0-1
2021-12-15 16:34:40.611 INFO  [0x7000035ed000] ProducerImpl:197 | [persistent://public/default/my-topic, standalone-0-1]  handleCreateProducer userProvidedProducerName_: 0, producerName_: standalone-0-1

We can see that userProvidedProducerName_ is always false (0). I confirmed this from the broker side as well.


I think something is wrong on the broker side. When tryOverwriteOldProducer is called, the old Producer has not been removed from the producers cache. The exception is caused by the following validation:

    public boolean isSuccessorTo(Producer other) {
        return Objects.equals(producerName, other.producerName)
                && Objects.equals(topic, other.topic)
                && producerId == other.producerId
                && Objects.equals(cnx, other.cnx) // the connections are not the same
                && other.getEpoch() < epoch; // the epochs are not the same
    }

The code above is from the latest master; you can also check it in your own Pulsar source tree.
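To illustrate why that validation rejects the reconnected producer, here is a hypothetical C++ model of the check (the real code is Java on the broker side; ProducerModel and cnxId are stand-ins for the Producer class and its connection object): when the stale Producer is still in the cache, the new producer arrives on a different connection, so isSuccessorTo returns false even with a higher epoch, and the broker answers with "already connected".

```cpp
#include <cassert>
#include <string>

// Hypothetical model of the broker-side check quoted above; cnxId stands in
// for the connection identity compared by Objects.equals(cnx, other.cnx).
struct ProducerModel {
    std::string producerName;
    std::string topic;
    long producerId;
    int cnxId;
    long epoch;

    bool isSuccessorTo(const ProducerModel& other) const {
        return producerName == other.producerName &&
               topic == other.topic &&
               producerId == other.producerId &&
               cnxId == other.cnxId &&   // fails after reconnect: new connection
               other.epoch < epoch;      // and the epoch must have advanced
    }
};
```

Because the stale entry was never removed by removeProducer, the broker sees a producer with the same name that is not a valid successor.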

BewareMyPower (Contributor) commented:

[screenshot of the test result omitted]

For example, the image above shows the result after I modified AbstractTopic#removeProducer like this:

    @Override
    public void removeProducer(Producer producer) {
        checkArgument(producer.getTopic() == this);

        //if (producers.remove(producer.getProducerName(), producer)) {
        //    handleProducerRemoved(producer);
        //}
    }

wenbingshen (Member) commented Dec 15, 2021:

I think it's the same problem; I have done some investigation in #13342.

github-actions commented:
The issue had no activity for 30 days, mark with Stale label.

github-actions commented:
The issue had no activity for 30 days, mark with Stale label.

michaeljmarshall (Member) commented:

I'm going to close this issue for the same justification provided in #13342 (comment).

It's likely that this issue was solved in a subsequent release of Pulsar.
