diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index 34d2b6ecb86f5..b7fdb245d2c93 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -23,11 +23,13 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.apache.bookkeeper.mledger.Position; +import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; @@ -41,6 +43,7 @@ public abstract class AbstractReplicator { protected final String topicName; protected final String localCluster; protected final String remoteCluster; + protected final PulsarClientImpl replicationClient; protected final PulsarClientImpl client; protected volatile ProducerImpl producer; @@ -63,18 +66,19 @@ protected enum State { } public AbstractReplicator(String topicName, String replicatorPrefix, String localCluster, String remoteCluster, - BrokerService brokerService) throws NamingException { + BrokerService brokerService) throws NamingException, PulsarServerException { validatePartitionedTopic(topicName, brokerService); this.brokerService = brokerService; this.topicName = topicName; this.replicatorPrefix = replicatorPrefix; this.localCluster = localCluster.intern(); this.remoteCluster = remoteCluster.intern(); - this.client = (PulsarClientImpl) brokerService.getReplicationClient(remoteCluster); + this.replicationClient = (PulsarClientImpl) brokerService.getReplicationClient(remoteCluster); + this.client = (PulsarClientImpl) brokerService.pulsar().getClient(); this.producer = null; this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize(); - this.producerBuilder = client.newProducer() // + this.producerBuilder = replicationClient.newProducer(Schema.AUTO_PRODUCE_BYTES()) // .topic(topicName) .messageRoutingMode(MessageRoutingMode.SinglePartition) .enableBatching(false) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java index b003db823ed4f..dd57fd91337cc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java @@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.Position; +import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.service.AbstractReplicator; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; @@ -48,7 +49,7 @@ public class NonPersistentReplicator extends AbstractReplicator implements Repli private final NonPersistentReplicatorStatsImpl stats = new NonPersistentReplicatorStatsImpl(); public NonPersistentReplicator(NonPersistentTopic topic, String localCluster, String remoteCluster, - BrokerService brokerService) throws NamingException { + BrokerService brokerService) throws NamingException, PulsarServerException { super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService); producerBuilder.blockIfQueueFull(false); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index e3255fe81b683..a9c0f7f97b6c9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -38,6 +38,7 @@ import java.util.concurrent.atomic.LongAdder; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.Position; +import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.service.AbstractTopic; @@ -561,7 +562,7 @@ protected boolean addReplicationCluster(String remoteCluster, NonPersistentTopic replicators.computeIfAbsent(remoteCluster, r -> { try { return new NonPersistentReplicator(NonPersistentTopic.this, localCluster, remoteCluster, brokerService); - } catch (NamingException e) { + } catch (NamingException | PulsarServerException e) { isReplicatorStarted.set(false); log.error("[{}] Replicator startup failed due to partitioned-topic {}", topic, remoteCluster); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index 0f3f18c4492de..aa53e150672ea 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -26,6 +26,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.bookkeeper.mledger.AsyncCallbacks; @@ -41,6 +42,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.service.AbstractReplicator; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; @@ -56,6 +58,7 @@ import org.apache.pulsar.common.api.proto.MarkerType; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl; +import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.stats.Rate; import org.apache.pulsar.common.util.Codec; import org.slf4j.Logger; @@ -102,7 +105,7 @@ public class PersistentReplicator extends AbstractReplicator private final ReplicatorStatsImpl stats = new ReplicatorStatsImpl(); public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String localCluster, String remoteCluster, - BrokerService brokerService) throws NamingException { + BrokerService brokerService) throws NamingException, PulsarServerException { super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService); this.topic = topic; this.cursor = cursor; @@ -358,7 +361,15 @@ public void readEntriesComplete(List entries, Object ctx) { headersAndPayload.retain(); - producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg)); + getSchemaInfo(msg).thenAccept(schemaInfo -> { + msg.setSchemaInfoForReplicator(schemaInfo); + producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg)); + }).exceptionally(ex -> { + log.error("[{}][{} -> {}] Failed to get schema from local cluster", topicName, + localCluster, remoteCluster, ex); + return null; + }); + atLeastOneMessageSentForReplication = true; } } catch (Exception e) { @@ -379,6 +390,14 @@ public void readEntriesComplete(List entries, Object ctx) { } } + private CompletableFuture getSchemaInfo(MessageImpl msg) throws ExecutionException { + if (msg.getSchemaVersion() == null || msg.getSchemaVersion().length == 0) { + return CompletableFuture.completedFuture(null); + } + return client.getSchemaProviderLoadingCache().get(topicName) + .getSchemaByVersion(msg.getSchemaVersion()); + } + public void updateCursorState() { if (this.cursor != null) { if (producer != null && producer.isConnected()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index db7998857c090..c03457ccdd75d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1431,7 +1431,7 @@ protected boolean addReplicationCluster(String remoteCluster, ManagedCursor curs try { return new PersistentReplicator(PersistentTopic.this, cursor, localCluster, remoteCluster, brokerService); - } catch (NamingException e) { + } catch (NamingException | PulsarServerException e) { isReplicatorStarted.set(false); log.error("[{}] Replicator startup failed due to partitioned-topic {}", topic, remoteCluster); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 9b74331450c9c..5c074fc78a5f7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -181,6 +181,7 @@ public void setup() throws Exception { mlFactoryMock = mock(ManagedLedgerFactory.class); doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory(); + doReturn(mock(PulsarClientImpl.class)).when(pulsar).getClient(); ZooKeeper mockZk = createMockZooKeeper(); doReturn(mockZk).when(pulsar).getZkClient(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 071160b09eb2e..527fd2c28ff63 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -90,6 +90,7 @@ import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; +import org.apache.pulsar.schema.Schemas; import org.awaitility.Awaitility; import org.mockito.Mockito; import org.slf4j.Logger; @@ -375,6 +376,62 @@ public void testReplication(String namespace) throws Exception { consumer3.receive(1); } + @Test + public void testReplicationWithSchema() throws Exception { + PulsarClient client1 = pulsar1.getClient(); + PulsarClient client2 = pulsar2.getClient(); + PulsarClient client3 = pulsar3.getClient(); + final TopicName topic = TopicName + .get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/testReplicationWithSchema")); + + final String subName = "my-sub"; + + @Cleanup + Producer producer1 = client1.newProducer(Schema.AVRO(Schemas.PersonOne.class)) + .topic(topic.toString()) + .create(); + @Cleanup + Producer producer2 = client2.newProducer(Schema.AVRO(Schemas.PersonOne.class)) + .topic(topic.toString()) + .create(); + @Cleanup + Producer producer3 = client3.newProducer(Schema.AVRO(Schemas.PersonOne.class)) + .topic(topic.toString()) + .create(); + + List> producers = Lists.newArrayList(producer1, producer2, producer3); + + @Cleanup + Consumer consumer1 = client1.newConsumer(Schema.AVRO(Schemas.PersonOne.class)) + .topic(topic.toString()) + .subscriptionName(subName) + .subscribe(); + + @Cleanup + Consumer consumer2 = client2.newConsumer(Schema.AVRO(Schemas.PersonOne.class)) + .topic(topic.toString()) + .subscriptionName(subName) + .subscribe(); + + @Cleanup + Consumer consumer3 = client3.newConsumer(Schema.AVRO(Schemas.PersonOne.class)) + .topic(topic.toString()) + .subscriptionName(subName) + .subscribe(); + + for (int i = 0; i < 3; i++) { + producers.get(i).send(new Schemas.PersonOne(i)); + Message msg1 = consumer1.receive(); + Message msg2 = consumer2.receive(); + Message msg3 = consumer3.receive(); + assertTrue(msg1 != null && msg2 != null && msg3 != null); + assertTrue(msg1.getValue().equals(msg2.getValue()) && msg2.getValue().equals(msg3.getValue())); + consumer1.acknowledge(msg1); + consumer2.acknowledge(msg2); + consumer3.acknowledge(msg3); + } + } + @Test public void testReplicationOverrides() throws Exception { log.info("--- Starting ReplicatorTest::testReplicationOverrides ---"); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java index 1af69159b4820..0297b142a1a3a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java @@ -64,6 +64,7 @@ public class MessageImpl implements Message { private ByteBuf payload; private Schema schema; + private SchemaInfo schemaInfoForReplicator; private SchemaState schemaState = SchemaState.None; private Optional encryptionCtx = Optional.empty(); @@ -418,7 +419,10 @@ private void ensureSchemaIsLoaded() { } } - private SchemaInfo getSchemaInfo() { + public SchemaInfo getSchemaInfo() { + if (schema == null) { + return null; + } ensureSchemaIsLoaded(); if (schema instanceof AutoConsumeSchema) { return ((AutoConsumeSchema) schema).getSchemaInfo(getSchemaVersion()); @@ -426,6 +430,18 @@ private SchemaInfo getSchemaInfo() { return schema.getSchemaInfo(); } + public void setSchemaInfoForReplicator(SchemaInfo schemaInfo) { + if (msgMetadata.hasReplicatedFrom()) { + this.schemaInfoForReplicator = schemaInfo; + } else { + throw new IllegalArgumentException("Only allowed to set schemaInfoForReplicator for a replicated message."); + } + } + + public SchemaInfo getSchemaInfoForReplicator() { + return msgMetadata.hasReplicatedFrom() ? this.schemaInfoForReplicator : null; + } + @Override public T getValue() { SchemaInfo schemaInfo = getSchemaInfo(); @@ -671,6 +687,10 @@ public List getReplicateTo() { return msgMetadata.getReplicateTosList(); } + public boolean hasReplicateFrom() { + return msgMetadata.hasReplicatedFrom(); + } + void setMessageId(MessageIdImpl messageId) { this.messageId = messageId; } @@ -690,6 +710,9 @@ int getUncompressedSize() { } SchemaState getSchemaState() { + if (getSchemaInfo() == null) { + return SchemaState.Ready; + } return schemaState; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 7d4e8a7a99a47..9513caa89befb 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -610,8 +610,8 @@ private void tryRegisterSchema(ClientCnx cnx, MessageImpl msg, SendCallback call if (!changeToRegisteringSchemaState()) { return; } - SchemaInfo schemaInfo = Optional.ofNullable(msg.getSchemaInternal()) - .map(Schema::getSchemaInfo) + SchemaInfo schemaInfo = msg.hasReplicateFrom() ? msg.getSchemaInfoForReplicator() : msg.getSchemaInfo(); + schemaInfo = Optional.ofNullable(schemaInfo) .filter(si -> si.getType().getValue() > 0) .orElse(Schema.BYTES.getSchemaInfo()); getOrCreateSchemaAsync(cnx, schemaInfo).handle((v, ex) -> { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 00c90fb83e212..894f67de852e6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -988,7 +988,7 @@ private SchemaInfoProvider newSchemaProvider(String topicName) { return new MultiVersionSchemaInfoProvider(TopicName.get(topicName), this); } - protected LoadingCache getSchemaProviderLoadingCache() { + public LoadingCache getSchemaProviderLoadingCache() { return schemaProviderLoadingCache; }