Skip to content

Commit

Permalink
Add getTxnID method in Transaction.java (#11438)
Browse files Browse the repository at this point in the history
(cherry picked from commit 1977a84)
  • Loading branch information
liangyepianzhou authored and codelipenghui committed Jul 30, 2021
1 parent 5397edc commit 38d3389
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 10 deletions.
Expand Up @@ -19,6 +19,8 @@
package org.apache.pulsar.broker.transaction;

import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertNotNull;
import com.google.common.collect.Sets;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand All @@ -32,12 +34,18 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
Expand All @@ -53,23 +61,26 @@ public class TransactionTest extends TransactionTestBase {

private static final String TENANT = "tnx";
private static final String NAMESPACE1 = TENANT + "/ns1";
private static final int NUM_BROKERS = 1;
private static final int NUM_PARTITIONS = 1;

@BeforeMethod
protected void setup() throws Exception {
this.setBrokerCount(1);
this.setBrokerCount(NUM_BROKERS);
this.internalSetup();

String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length - 1];
admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build());
admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder()
.serviceUrl("http://localhost:" + webServicePort).build());
admin.tenants().createTenant(TENANT,
new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
admin.namespaces().createNamespace(NAMESPACE1);

admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 1);
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), NUM_PARTITIONS);
pulsarClient.close();
pulsarClient = PulsarClient.builder()
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
Expand Down Expand Up @@ -139,4 +150,21 @@ public Consumer<byte[]> getConsumer(String topicName, String subName) throws Pul
.enableBatchIndexAcknowledgment(true)
.subscribe();
}

@Test
public void testGetTxnID() throws Exception {
// wait tc init success to ready state
Assert.assertTrue(waitForCoordinatorToBeAvailable(NUM_BROKERS, NUM_PARTITIONS));
Transaction transaction = pulsarClient.newTransaction()
.build().get();
TxnID txnID = transaction.getTxnID();
Assert.assertEquals(txnID.getLeastSigBits(), 0);
Assert.assertEquals(txnID.getMostSigBits(), 0);
transaction.abort();
transaction = pulsarClient.newTransaction()
.build().get();
txnID = transaction.getTxnID();
Assert.assertEquals(txnID.getLeastSigBits(), 1);
Assert.assertEquals(txnID.getMostSigBits(), 0);
}
}
Expand Up @@ -21,19 +21,17 @@
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;

import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertNotNull;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.channel.EventLoopGroup;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import io.netty.channel.EventLoopGroup;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -49,17 +47,22 @@
import org.apache.pulsar.broker.auth.SameThreadOrderedSafeExecutor;
import org.apache.pulsar.broker.intercept.CounterBrokerInterceptor;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.pulsar.tests.TestRetrySupport;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.MockZooKeeperSession;
import org.apache.zookeeper.ZooKeeper;
import org.awaitility.Awaitility;

@Slf4j
public abstract class TransactionTestBase extends TestRetrySupport {
Expand Down Expand Up @@ -291,5 +294,16 @@ protected final void internalCleanup() {
log.warn("Failed to clean up mocked pulsar service:", e);
}
}

public boolean waitForCoordinatorToBeAvailable(int numOfBroker, int numOfTCPerBroker){
// wait tc init success to ready state
Awaitility.await().untilAsserted(() -> {
TransactionMetadataStore transactionMetadataStore =
getPulsarServiceList().get(numOfBroker - 1).getTransactionMetadataStoreService()
.getStores().get(TransactionCoordinatorID.get(numOfTCPerBroker - 1));
assertNotNull(transactionMetadataStore);
assertEquals(((MLTransactionMetadataStore) transactionMetadataStore).getState(),
TransactionMetadataStoreState.State.Ready);
});
return true;
}
}
Expand Up @@ -43,4 +43,9 @@ public interface Transaction {
*/
CompletableFuture<Void> abort();

/**
* Get TxnID of the transaction.
* @return {@link TxnID} the txnID.
*/
TxnID getTxnID();
}
Expand Up @@ -208,6 +208,11 @@ public CompletableFuture<Void> abort() {
});
}

@Override
public TxnID getTxnID() {
return new TxnID(txnIdMostBits, txnIdLeastBits);
}

private CompletableFuture<Void> checkIfOpen() {
if (state == State.OPEN) {
return CompletableFuture.completedFuture(null);
Expand Down

0 comments on commit 38d3389

Please sign in to comment.