Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix:PulsarKafkaProducer is not thread safe #4745

Merged
merged 9 commits into from Jul 24, 2019
Merged

Conversation

fxbing
Copy link
Contributor

@fxbing fxbing commented Jul 17, 2019

fix #4707
https://github.com/apache/pulsar/blob/v2.3.2/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java#L229
In this line, the update operation of cluster is not atomic. The new value of cluster is based on th old.
But in multithreading,multiple threads may be dependent on the same old value for updating.
So in https://github.com/apache/pulsar/blob/v2.3.2/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java#L270, thread which is executed at last can run normally, others can't.

And I have added a test for it.

String topic1 = "persistent://public/default/topic-" + System.currentTimeMillis();
ProducerRecord<String, String> record1 = new ProducerRecord<>(topic1, "Hello");
producer.send(record1, (recordMetadata, e) -> {
throw new Error();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fxbing can you explain how does the test work here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sijie OK, I have added it.

@sijie sijie added area/client component/kafka type/bug The PR fixed a bug or issue reported a bug labels Jul 17, 2019
@sijie sijie added this to the 2.4.1 milestone Jul 17, 2019
@sijie
Copy link
Member

sijie commented Jul 17, 2019

@fxbing

I think the Kafka test failed. Can you take a look?

	Test Result (10 failures / +10)
org.apache.kafka.clients.producer.PulsarKafkaProducerThreadSafeTest.testPulsarKafkaProducerThreadSafe
org.apache.kafka.clients.producer.PulsarKafkaProducerThreadSafeTest.testPulsarKafkaProducerThreadSafe
org.apache.kafka.clients.producer.PulsarKafkaProducerThreadSafeTest.testPulsarKafkaProducerThreadSafe
org.apache.kafka.clients.producer.PulsarKafkaProducerThreadSafeTest.testPulsarKafkaProducerThreadSafe
org.apache.kafka.clients.producer.PulsarKafkaProducerThreadSafeTest.testPulsarKafkaProducerThreadSafe
org.apache.kafka.clients.producer.PulsarKafkaProducerThreadSafeTest.testPulsarKafkaProducerThreadSafe
org.apache.kafka.clients.producer.PulsarKafkaProducerThreadSafeTest.testPulsarKafkaProducerThreadSafe
org.apache.kafka.clients.producer.PulsarKafkaProducerThreadSafeTest.testPulsarKafkaProducerThreadSafe
org.apache.kafka.clients.producer.PulsarKafkaProducerThreadSafeTest.testPulsarKafkaProducerThreadSafe
org.apache.kafka.clients.producer.PulsarKafkaProducerThreadSafeTest.testPulsarKafkaProducerThreadSafe

@fxbing
Copy link
Contributor Author

fxbing commented Jul 18, 2019

@sijie OK, I have updated it.

@fxbing
Copy link
Contributor Author

fxbing commented Jul 18, 2019

run java8 tests

3 similar comments
@fxbing
Copy link
Contributor Author

fxbing commented Jul 18, 2019

run java8 tests

@fxbing
Copy link
Contributor Author

fxbing commented Jul 18, 2019

run java8 tests

@fxbing
Copy link
Contributor Author

fxbing commented Jul 18, 2019

run java8 tests

@fxbing
Copy link
Contributor Author

fxbing commented Jul 18, 2019

run integration tests

String topic1 = "persistent://public/default/topic-" + System.currentTimeMillis();
ProducerRecord<String, String> record1 = new ProducerRecord<>(topic1, "Hello");
producer.send(record1, (recordMetadata, e) -> {
throw new Error();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still don't understand why do you throw a new Error here? Can you explain it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sijie I understand the wrong usage of this callback funcition. In fact, threadSafe will not affect callback.
May I change it to producer.send(record1);?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If PulsarKafkaProducer is thread safe, test success.
If not, producer.send(record1); will throw exception and test failed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes you can change to use producer.send()

@sijie
Copy link
Member

sijie commented Jul 21, 2019

@fxbing can you rebase to latest master?

@jiazhai
Copy link
Member

jiazhai commented Jul 21, 2019

run cpp tests
run integration tests

@fxbing
Copy link
Contributor Author

fxbing commented Jul 21, 2019

run integration tests
run java8 tests

@fxbing
Copy link
Contributor Author

fxbing commented Jul 22, 2019

run integration tests

@fxbing
Copy link
Contributor Author

fxbing commented Jul 22, 2019

run cpp tests

2 similar comments
@fxbing
Copy link
Contributor Author

fxbing commented Jul 22, 2019

run cpp tests

@fxbing
Copy link
Contributor Author

fxbing commented Jul 22, 2019

run cpp tests

@fxbing
Copy link
Contributor Author

fxbing commented Jul 22, 2019

run cpp tests
run java8 tests

@fxbing
Copy link
Contributor Author

fxbing commented Jul 22, 2019

run integration tests

1 similar comment
@fxbing
Copy link
Contributor Author

fxbing commented Jul 22, 2019

run integration tests

@fxbing
Copy link
Contributor Author

fxbing commented Jul 22, 2019

run java8 tests

@fxbing
Copy link
Contributor Author

fxbing commented Jul 23, 2019

run integration tests
run java8 tests

@fxbing
Copy link
Contributor Author

fxbing commented Jul 23, 2019

run java8 tests

1 similar comment
@fxbing
Copy link
Contributor Author

fxbing commented Jul 23, 2019

run java8 tests

@sijie sijie merged commit 0362944 into apache:master Jul 24, 2019
easyfan pushed a commit to easyfan/pulsar that referenced this pull request Jul 26, 2019
jiazhai pushed a commit that referenced this pull request Aug 28, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/client type/bug The PR fixed a bug or issue reported a bug
Projects
None yet
Development

Successfully merging this pull request may close these issues.

PulsarKafkaProducer is not thread safe
3 participants