MoP and KoP cannot be used together #132

Closed
fanyouyong0526 opened this issue Sep 7, 2021 · 11 comments

@fanyouyong0526

After adding
brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
to the configuration file,
MQTT clients cannot subscribe to published messages.

@Technoboy-
Contributor

Hi, could you describe your issue in more detail?
I tried setting the above interceptor, and it works in my local environment.

@fanyouyong0526
Author

From version 2.8.0 onward, KoP requires the following setting to be added:

brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor

KoP works normally, but when I add MoP, the MQTT client can publish messages normally but cannot subscribe to them.
ERROR:
java.lang.IllegalArgumentException: Invalid unknonwn tag type: 6
at org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:270) ~[org.apache.pulsar-pulsar-common-2.8.0.jar:2.8.0]
at org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1370) ~[org.apache.pulsar-pulsar-common-2.8.0.jar:2.8.0]
at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:426) ~[org.apache.pulsar-pulsar-common-2.8.0.jar:2.8.0]
at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:416) ~[org.apache.pulsar-pulsar-common-2.8.0.jar:2.8.0]
at io.streamnative.pulsar.handlers.mqtt.utils.PulsarMessageConverter.toMqttMessages(PulsarMessageConverter.java:71) ~[?:?]
at io.streamnative.pulsar.handlers.mqtt.support.MQTTConsumer.sendMessages(MQTTConsumer.java:85) ~[?:?]
at io.streamnative.pulsar.handlers.mqtt.support.MQTTConsumer.sendMessages(MQTTConsumer.java:41) ~[?:?]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:538) ~[org.apache.pulsar-pulsar-broker-2.8.0.jar:2.8.0]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readEntriesComplete(PersistentDispatcherMultipleConsumers.java:469) ~[org.apache.pulsar-pulsar-broker-2.8.0.jar:2.8.0]
at org.apache.bookkeeper.mledger.impl.OpReadEntry.lambda$checkReadCompletion$2(OpReadEntry.java:156) ~[org.apache.pulsar-managed-ledger-2.8.0.jar:2.8.0]
at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) ~[org.apache.pulsar-managed-ledger-2.8.0.jar:2.8.0]
at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.14.1.jar:4.14.1]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_172]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_172]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.63.Final.jar:4.1.63.Final]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_172]

@liangyuanpeng
Contributor

liangyuanpeng commented Sep 8, 2021

I think you could provide more information about this, such as your topic, QoS, etc.

And you are using an MQTT consumer and an MQTT producer, right?

MQTT producer --> MQTT consumer

@fanyouyong0526
Author

image
When I remove the following line from the configuration above:

brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor

MQTT clients can publish and subscribe to messages,
image
image
but Kafka clients cannot publish or subscribe to messages.
image
If I add the line back to the configuration:
brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
Kafka clients can publish and subscribe to messages,
but the MQTT client, while it can publish messages normally, cannot subscribe to them.
image
image
The broker log shows the following error message:

java.lang.IllegalArgumentException: Invalid unknonwn tag type: 6
at org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:270) ~[org.apache.pulsar-pulsar-common-2.8.0.jar:2.8.0]
at org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1370) ~[org.apache.pulsar-pulsar-common-2.8.0.jar:2.8.0]
at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:426) ~[org.apache.pulsar-pulsar-common-2.8.0.jar:2.8.0]
at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:416) ~[org.apache.pulsar-pulsar-common-2.8.0.jar:2.8.0]
at io.streamnative.pulsar.handlers.mqtt.utils.PulsarMessageConverter.toMqttMessages(PulsarMessageConverter.java:71) ~[?:?]
at io.streamnative.pulsar.handlers.mqtt.support.MQTTConsumer.sendMessages(MQTTConsumer.java:85) ~[?:?]
at io.streamnative.pulsar.handlers.mqtt.support.MQTTConsumer.sendMessages(MQTTConsumer.java:41) ~[?:?]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:538) ~[org.apache.pulsar-pulsar-broker-2.8.0.jar:2.8.0]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readEntriesComplete(PersistentDispatcherMultipleConsumers.java:469) ~[org.apache.pulsar-pulsar-broker-2.8.0.jar:2.8.0]
at org.apache.bookkeeper.mledger.impl.OpReadEntry.lambda$checkReadCompletion$2(OpReadEntry.java:156) ~[org.apache.pulsar-managed-ledger-2.8.0.jar:2.8.0]
at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) ~[org.apache.pulsar-managed-ledger-2.8.0.jar:2.8.0]
at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.14.1.jar:4.14.1]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_172]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_172]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.63.Final.jar:4.1.63.Final]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_172]

@fanyouyong0526
Author

The above is the detailed configuration and information.
I'm using a locally built NAR.
The commit ID is f6602d8.
@Technoboy-

@fanyouyong0526
Author

I updated to the latest version of the MoP code and found that the MQTT client cannot subscribe to messages, whether or not brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
is added to the configuration file.

@codelipenghui
Contributor

@fanyouyong0526 Which Pulsar version are you using? Please make sure the Pulsar version is the same as the MoP plugin version.

@codelipenghui
Contributor

@Technoboy- Looks like we need to add an integration test in streamnative-tests to make sure MoP and KoP work together.

@fanyouyong0526
Author

Pulsar version: 2.8.0
MoP: a locally built version, commit ID f6602d8

@BewareMyPower

BewareMyPower commented Sep 24, 2021

Could you try 2.8.1? It's because Commands#parseMessageMetadata didn't skip the BrokerEntryMetadata in 2.8.0, but apache/pulsar#10968 added the skipBrokerEntryMetadataIfExist call in parseMessageMetadata. After that, parseMessageMetadata should work.
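
To illustrate the idea of skipping an optional broker entry metadata prefix before parsing the message metadata, here is a minimal sketch. It is not the Pulsar code: it uses `java.nio.ByteBuffer` rather than Netty's `ByteBuf`, the class name is hypothetical, and the `[magic][size][bytes]` layout and the magic value `0x0e02` are assumptions made for this example.

```java
import java.nio.ByteBuffer;

// Illustrative sketch only; names, layout, and magic value are assumptions.
final class BrokerEntryMetadataSkipSketch {
    // Assumed 2-byte marker that precedes broker entry metadata when present.
    static final short BROKER_ENTRY_METADATA_MAGIC = 0x0e02;

    /**
     * Advances the buffer past an optional [magic:2][size:4][bytes:size] prefix.
     * If the prefix is absent, the position is left where it started.
     */
    static ByteBuffer skipBrokerEntryMetadataIfExist(ByteBuffer buf) {
        int start = buf.position();
        if (buf.remaining() >= 6 && buf.getShort() == BROKER_ENTRY_METADATA_MAGIC) {
            int size = buf.getInt();
            // Jump over the broker entry metadata bytes.
            buf.position(buf.position() + size);
        } else {
            // No prefix: rewind so the caller parses from the original position.
            buf.position(start);
        }
        return buf;
    }

    public static void main(String[] args) {
        byte[] brokerMeta = {1, 2, 3};   // stand-in for broker entry metadata
        byte[] messageMeta = {42, 43};   // stand-in for the message metadata
        ByteBuffer entry = ByteBuffer.allocate(2 + 4 + brokerMeta.length + messageMeta.length);
        entry.putShort(BROKER_ENTRY_METADATA_MAGIC).putInt(brokerMeta.length)
             .put(brokerMeta).put(messageMeta);
        entry.flip();

        skipBrokerEntryMetadataIfExist(entry);
        // The reader now sits on the first byte of the message metadata.
        System.out.println("next byte after skip: " + entry.get(entry.position()));
    }
}
```

A parser that starts reading the message metadata without this skip would interpret the prefix bytes as metadata fields, which is consistent with the "Invalid unknonwn tag type" error reported above.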

@Technoboy-
Contributor

Please upgrade Pulsar to 2.8.1.
