Stop OffsetStore when stopping the connector #12457

Merged
merged 2 commits into from Oct 24, 2021

@dlg99 dlg99 commented Oct 22, 2021

Motivation

Source connectors based on KCA (the Kafka Connect Adaptor, which includes all of the Debezium ones) don't stop properly on error and don't restart.
#12441 fixes one problem; this PR fixes another: the offsetStore is not closed on connector stop(), and its producer/consumer aren't closed either, which prevents the connector from shutting down.

Modifications

Closing offset store on connector stop.
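
A minimal sketch of the idea, not the actual patch. `SketchSource`, `SketchTask` and `SketchOffsetStore` are hypothetical stand-ins for the connector, the wrapped Kafka Connect task and the offset store; the only point illustrated is that the connector's close path also stops the store, and that calling it twice is harmless.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical connector that releases both the task and the offset store on close. */
class SketchSource implements AutoCloseable {
    private SketchTask task;
    private SketchOffsetStore offsetStore;

    SketchSource(SketchTask task, SketchOffsetStore offsetStore) {
        this.task = task;
        this.offsetStore = offsetStore;
    }

    @Override
    public void close() {
        if (task != null) {
            task.stop();
            task = null;
        }
        // Without this step the store's producer/reader would stay open.
        if (offsetStore != null) {
            offsetStore.stop();
            offsetStore = null;
        }
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        SketchSource source =
                new SketchSource(new SketchTask(events), new SketchOffsetStore(events));
        source.close();
        source.close(); // no-op: both fields were cleared by the first call
        System.out.println(events);
    }
}

/** Hypothetical stand-in for the wrapped Kafka Connect source task. */
class SketchTask {
    private final List<String> events;

    SketchTask(List<String> events) {
        this.events = events;
    }

    void stop() {
        events.add("task.stop");
    }
}

/** Hypothetical stand-in for the offset store and the producer/reader it holds. */
class SketchOffsetStore {
    private final List<String> events;

    SketchOffsetStore(List<String> events) {
        this.events = events;
    }

    void stop() {
        events.add("offsetStore.stop");
    }
}
```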

Verifying this change

  • Make sure that the change passes the CI checks.

This change is a trivial rework / code cleanup without any test coverage.

Does this pull request potentially affect one of the following parts:

NO

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API: (yes / no)
  • The schema: (yes / no / don't know)
  • The default values of configurations: (yes / no)
  • The wire protocol: (yes / no)
  • The rest endpoints: (yes / no)
  • The admin cli options: (yes / no)
  • Anything that affects deployment: (yes / no / don't know)

Documentation

  • no-need-doc

@eolivelli eolivelli added the doc-not-needed Your PR changes do not impact docs label Oct 22, 2021
@eolivelli

@dlg99: Thanks for your contribution. For this PR, do we need to update docs?
(The PR template contains info about docs, which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks)

@eolivelli

@dlg99: Thanks for providing doc info!

if (null != producer) {
    try {
        producer.flush();

Can we put the flush and close in the same try-catch block?

@hangc0276 in that case an exception from flush() would prevent close() from running. We do want to close the producer.
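
To illustrate the point, here is a small self-contained sketch that contrasts the two layouts. `FlakyProducer` and `ProducerShutdown` are hypothetical names, and the producer is assumed to always fail on flush purely for demonstration; this is not the code from the patch.

```java
/** Contrasts one shared try block with separate try blocks around flush and close. */
class ProducerShutdown {

    /** Flush and close are guarded separately, so a failed flush cannot skip close. */
    static void stopSeparately(FlakyProducer producer) {
        if (producer == null) {
            return;
        }
        try {
            producer.flush();
        } catch (Exception e) {
            System.err.println("flush failed: " + e.getMessage());
        }
        try {
            producer.close();
        } catch (Exception e) {
            System.err.println("close failed: " + e.getMessage());
        }
    }

    /** Single try block: the exception thrown by flush jumps past close. */
    static void stopInOneBlock(FlakyProducer producer) {
        if (producer == null) {
            return;
        }
        try {
            producer.flush();
            producer.close();
        } catch (Exception e) {
            System.err.println("shutdown failed: " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        FlakyProducer a = new FlakyProducer();
        stopSeparately(a);
        FlakyProducer b = new FlakyProducer();
        stopInOneBlock(b);
        System.out.println("separate blocks, closed = " + a.closed);
        System.out.println("one block, closed = " + b.closed);
    }
}

/** Hypothetical producer whose flush always fails (an assumption for the demo). */
class FlakyProducer {
    boolean closed = false;

    void flush() throws Exception {
        throw new Exception("simulated flush failure");
    }

    void close() throws Exception {
        closed = true;
    }
}
```

With separate blocks the producer ends up closed even though flush failed; with a single block it is left open.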

@hangc0276 hangc0276 added area/connector release/2.8.2 release/2.9.1 type/bug The PR fixed a bug or issue reported a bug labels Oct 22, 2021
@hangc0276 hangc0276 added this to the 2.10.0 milestone Oct 22, 2021
@eolivelli

it looks like a test is failing due to this patch


Warning:  Tests run: 4, Failures: 0, Errors: 0, Skipped: 1, Time elapsed: 59.254 s - in org.apache.pulsar.io.kafka.connect.PulsarOffsetBackingStoreTest
Error:  Tests run: 23, Failures: 1, Errors: 0, Skipped: 7, Time elapsed: 135.979 s <<< FAILURE! - in org.apache.pulsar.io.kafka.connect.KafkaConnectSinkTest
Error:  subscriptionTypeTest(org.apache.pulsar.io.kafka.connect.KafkaConnectSinkTest)  Time elapsed: 2.103 s  <<< FAILURE!
java.lang.RuntimeException: Failed to setup pulsar producer/reader to cluster 
	at org.apache.pulsar.io.kafka.connect.PulsarOffsetBackingStore.start(PulsarOffsetBackingStore.java:155)
	at org.apache.pulsar.io.kafka.connect.PulsarKafkaSinkTaskContext.<init>(PulsarKafkaSinkTaskContext.java:73)
	at org.apache.pulsar.io.kafka.connect.KafkaConnectSink.open(KafkaConnectSink.java:162)
	at org.apache.pulsar.io.kafka.connect.KafkaConnectSinkTest.subscriptionTypeTest(KafkaConnectSinkTest.java:209)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:132)
	at org.testng.internal.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:45)
	at org.testng.internal.InvokeMethodRunnable.call(InvokeMethodRunnable.java:73)
	at org.testng.internal.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
	Suppressed: java.lang.NullPointerException
		at org.apache.pulsar.io.kafka.connect.KafkaConnectSink.close(KafkaConnectSink.java:127)
		at org.apache.pulsar.io.kafka.connect.KafkaConnectSinkTest.subscriptionTypeTest(KafkaConnectSinkTest.java:207)
		... 12 more
Caused by: org.apache.pulsar.client.api.PulsarClientException$AlreadyClosedException: Client already closed : state = Closed
	at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1022)
	at org.apache.pulsar.client.impl.ProducerBuilderImpl.create(ProducerBuilderImpl.java:91)
	at org.apache.pulsar.io.kafka.connect.PulsarOffsetBackingStore.start(PulsarOffsetBackingStore.java:141)
	... 15 more

dlg99 commented Oct 22, 2021

@eolivelli fixed. I originally wrote the patch on the 2.7 branch; in master the client is passed in through the context and should not be closed by the connector.
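
The ownership rule described here can be sketched as follows. `SharedClient`, `OwnedProducer` and `ContextOffsetStore` are hypothetical stand-ins, not Pulsar classes: the store closes only what it created (the producer) and leaves the borrowed client open, so a later start() on the same client does not hit an "already closed" error like the one in the failing test above.

```java
/** Hypothetical offset store that borrows a client and owns only its producer. */
class ContextOffsetStore {
    private final SharedClient client; // borrowed from the context, never closed here
    private OwnedProducer producer;    // created by this store, so closed by this store

    ContextOffsetStore(SharedClient client) {
        this.client = client;
    }

    void start() {
        producer = client.newProducer();
    }

    void stop() {
        if (producer != null) {
            producer.close();
            producer = null;
        }
        // Deliberately no client.close(): other components still use the client.
    }

    public static void main(String[] args) {
        SharedClient client = new SharedClient();
        ContextOffsetStore first = new ContextOffsetStore(client);
        first.start();
        first.stop();
        // A second store can start because the shared client is still open.
        ContextOffsetStore second = new ContextOffsetStore(client);
        second.start();
        second.stop();
        System.out.println("client closed = " + client.closed);
    }
}

/** Hypothetical stand-in for a client that is shared through the connector context. */
class SharedClient {
    boolean closed = false;

    OwnedProducer newProducer() {
        if (closed) {
            throw new IllegalStateException("Client already closed");
        }
        return new OwnedProducer();
    }

    void close() {
        closed = true;
    }
}

/** Hypothetical stand-in for a producer created from the shared client. */
class OwnedProducer {
    boolean closed = false;

    void close() {
        closed = true;
    }
}
```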

@codelipenghui codelipenghui merged commit 63454e9 into apache:master Oct 24, 2021
nicoloboschi pushed a commit to datastax/pulsar that referenced this pull request Oct 25, 2021
(cherry picked from commit 63454e9)
nicoloboschi pushed a commit to datastax/pulsar that referenced this pull request Oct 25, 2021
(cherry picked from commit 63454e9)
zeo1995 pushed a commit to zeo1995/pulsar that referenced this pull request Oct 25, 2021
* up/master: (46 commits)
  [website][upgrade]feat: docs migration - version-2.7.2 Pulsar Schema (apache#12393)
  [docs] io-develop, fix broken link (apache#12414)
  docs(function): fix incorrect classname in python runtime sample (apache#12476)
  Remove redundant null check for getInternalListener (apache#12474)
  Fix the retry topic's `REAL_TOPIC` & `ORIGIN_MESSAGE_ID` property should not be modified once it has been written. (apache#12451)
  [cli] Fix output format of string by pulsar-admin command (apache#11878)
  fix the race of delete subscription and delete topic (apache#12240)
  fix influxdb yaml doc (apache#12460)
  [Modernizer] Add Maven Modernizer plugin in pulsar-proxy module (apache#12326)
  fix DefaultCryptoKeyReaderTest can not run on windows (apache#12475)
  apache#12429 only fixed the compactor skips data issue, but the normal reader/consumer (apache#12464)
  broker resource group test optimize fail msg (apache#12438)
  Stop OffsetStore when stopping the connector (apache#12457)
  fix a typo in UnAckedMessageTracker (apache#12467)
  docs(function): fix typo in pip install (apache#12468)
  Optimize the code: remove extra spaces (apache#12470)
  optimize SecurityUtility code flow (apache#12431)
  Update lombok to 1.18.22 (apache#12466)
  Update team.js to add David K. as a committer (apache#12440)
  Fix java demo error in reset cursor admin (apache#12454)
  ...

# Conflicts:
#	site2/website-next/versioned_docs/version-2.7.2/schema-evolution-compatibility.md
#	site2/website-next/versioned_docs/version-2.7.2/schema-get-started.md
#	site2/website-next/versioned_docs/version-2.7.2/schema-manage.md
#	site2/website-next/versioned_docs/version-2.7.2/schema-understand.md
#	site2/website-next/versioned_sidebars/version-2.7.2-sidebars.json
hangc0276 pushed a commit that referenced this pull request Nov 4, 2021
(cherry picked from commit 63454e9)
@hangc0276 hangc0276 added the cherry-picked/branch-2.8 Archived: 2.8 is end of life label Nov 4, 2021
eolivelli pushed a commit that referenced this pull request Nov 9, 2021
(cherry picked from commit 63454e9)
@eolivelli eolivelli modified the milestones: 2.10.0, 2.9.0 Nov 9, 2021
eolivelli pushed a commit to eolivelli/pulsar that referenced this pull request Nov 29, 2021
Labels
  • area/connector
  • cherry-picked/branch-2.8 (Archived: 2.8 is end of life)
  • doc-not-needed (Your PR changes do not impact docs)
  • release/2.8.2
  • type/bug (The PR fixed a bug or issue reported a bug)

4 participants