Skip to content

Commit

Permalink
Fix wrong results of hasMessageAvailable and readNext after seeking b…
Browse files Browse the repository at this point in the history
…y timestamp (#422)

Fixes #420

It's a catch-up for apache/pulsar#22363
  • Loading branch information
BewareMyPower committed Mar 28, 2024
1 parent 763b85c commit 27d8cc0
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 36 deletions.
41 changes: 23 additions & 18 deletions lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1050,7 +1050,9 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) {
*/
void ConsumerImpl::clearReceiveQueue() {
if (duringSeek()) {
startMessageId_ = seekMessageId_.get();
if (!hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
startMessageId_ = seekMessageId_.get();
}
SeekStatus expected = SeekStatus::COMPLETED;
if (seekStatus_.compare_exchange_strong(expected, SeekStatus::NOT_STARTED)) {
auto seekCallback = seekCallback_.release();
Expand Down Expand Up @@ -1476,7 +1478,7 @@ void ConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) {
return;
}
const auto requestId = client->newRequestId();
seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, msgId), msgId, 0L, callback);
seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, msgId), SeekArg{msgId}, callback);
}

void ConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
Expand All @@ -1495,8 +1497,8 @@ void ConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
return;
}
const auto requestId = client->newRequestId();
seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, timestamp), MessageId::earliest(),
timestamp, callback);
seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, timestamp), SeekArg{timestamp},
callback);
}

bool ConsumerImpl::isReadCompacted() { return readCompacted_; }
Expand All @@ -1509,7 +1511,7 @@ void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback
(lastDequedMessageId_ == MessageId::earliest()) &&
(startMessageId_.get().value_or(MessageId::earliest()) == MessageId::latest());
}
if (compareMarkDeletePosition) {
if (compareMarkDeletePosition || hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
auto self = get_shared_this_ptr();
getLastMessageIdAsync([self, callback](Result result, const GetLastMessageIdResponse& response) {
if (result != ResultOk) {
Expand All @@ -1518,8 +1520,8 @@ void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback
}
auto handleResponse = [self, response, callback] {
if (response.hasMarkDeletePosition() && response.getLastMessageId().entryId() >= 0) {
// We only care about comparing ledger ids and entry ids as mark delete position doesn't
// have other ids such as batch index
// We only care about comparing ledger ids and entry ids as mark delete position
// doesn't have other ids such as batch index
auto compareResult = compareLedgerAndEntryId(response.getMarkDeletePosition(),
response.getLastMessageId());
callback(ResultOk, self->config_.isStartMessageIdInclusive() ? compareResult <= 0
Expand All @@ -1528,7 +1530,8 @@ void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback
callback(ResultOk, false);
}
};
if (self->config_.isStartMessageIdInclusive()) {
if (self->config_.isStartMessageIdInclusive() &&
!self->hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
self->seekAsync(response.getLastMessageId(), [callback, handleResponse](Result result) {
if (result != ResultOk) {
callback(result, {});
Expand Down Expand Up @@ -1644,8 +1647,8 @@ bool ConsumerImpl::isConnected() const { return !getCnx().expired() && state_ ==

uint64_t ConsumerImpl::getNumberOfConnectedConsumer() { return isConnected() ? 1 : 0; }

void ConsumerImpl::seekAsyncInternal(long requestId, SharedBuffer seek, const MessageId& seekId,
long timestamp, ResultCallback callback) {
void ConsumerImpl::seekAsyncInternal(long requestId, SharedBuffer seek, const SeekArg& seekArg,
ResultCallback callback) {
ClientConnectionPtr cnx = getCnx().lock();
if (!cnx) {
LOG_ERROR(getName() << " Client Connection not ready for Consumer");
Expand All @@ -1655,21 +1658,21 @@ void ConsumerImpl::seekAsyncInternal(long requestId, SharedBuffer seek, const Me

auto expected = SeekStatus::NOT_STARTED;
if (!seekStatus_.compare_exchange_strong(expected, SeekStatus::IN_PROGRESS)) {
LOG_ERROR(getName() << " attempted to seek (" << seekId << ", " << timestamp << " when the status is "
LOG_ERROR(getName() << " attempted to seek " << seekArg << " when the status is "
<< static_cast<int>(expected));
callback(ResultNotAllowedError);
return;
}

const auto originalSeekMessageId = seekMessageId_.get();
seekMessageId_ = seekId;
seekStatus_ = SeekStatus::IN_PROGRESS;
seekCallback_ = std::move(callback);
if (timestamp > 0) {
LOG_INFO(getName() << " Seeking subscription to " << timestamp);
if (boost::get<uint64_t>(&seekArg)) {
hasSoughtByTimestamp_.store(true, std::memory_order_release);
} else {
LOG_INFO(getName() << " Seeking subscription to " << seekId);
seekMessageId_ = *boost::get<MessageId>(&seekArg);
}
seekStatus_ = SeekStatus::IN_PROGRESS;
seekCallback_ = std::move(callback);
LOG_INFO(getName() << " Seeking subscription to " << seekArg);

std::weak_ptr<ConsumerImpl> weakSelf{get_shared_this_ptr()};

Expand All @@ -1692,7 +1695,9 @@ void ConsumerImpl::seekAsyncInternal(long requestId, SharedBuffer seek, const Me
// It's during reconnection, complete the seek future after connection is established
seekStatus_ = SeekStatus::COMPLETED;
} else {
startMessageId_ = seekMessageId_.get();
if (!hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
startMessageId_ = seekMessageId_.get();
}
seekCallback_.release()(result);
}
} else {
Expand Down
15 changes: 14 additions & 1 deletion lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <pulsar/Reader.h>

#include <boost/optional.hpp>
#include <boost/variant.hpp>
#include <functional>
#include <list>
#include <memory>
Expand Down Expand Up @@ -201,7 +202,18 @@ class ConsumerImpl : public ConsumerImplBase {
BrokerGetLastMessageIdCallback callback);

void clearReceiveQueue();
void seekAsyncInternal(long requestId, SharedBuffer seek, const MessageId& seekId, long timestamp,
using SeekArg = boost::variant<uint64_t, MessageId>;
friend std::ostream& operator<<(std::ostream& os, const SeekArg& seekArg) {
auto ptr = boost::get<uint64_t>(&seekArg);
if (ptr) {
os << *ptr;
} else {
os << *boost::get<MessageId>(&seekArg);
}
return os;
}

void seekAsyncInternal(long requestId, SharedBuffer seek, const SeekArg& seekArg,
ResultCallback callback);
void processPossibleToDLQ(const MessageId& messageId, ProcessDLQCallBack cb);

Expand Down Expand Up @@ -250,6 +262,7 @@ class ConsumerImpl : public ConsumerImplBase {
Synchronized<ResultCallback> seekCallback_{[](Result) {}};
Synchronized<boost::optional<MessageId>> startMessageId_;
Synchronized<MessageId> seekMessageId_{MessageId::earliest()};
std::atomic<bool> hasSoughtByTimestamp_{false};

bool duringSeek() const { return seekStatus_ != SeekStatus::NOT_STARTED; }

Expand Down
84 changes: 67 additions & 17 deletions tests/ReaderTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -700,9 +700,16 @@ TEST_P(ReaderTest, testReceiveAfterSeek) {
client.close();
}

TEST(ReaderSeekTest, testSeekForMessageId) {
Client client(serviceUrl);
class ReaderSeekTest : public ::testing::TestWithParam<bool> {
public:
void SetUp() override { client = Client{serviceUrl}; }

void TearDown() override { client.close(); }

Client client{serviceUrl};
};

TEST_F(ReaderSeekTest, testSeekForMessageId) {
const std::string topic = "test-seek-for-message-id-" + std::to_string(time(nullptr));

Producer producer;
Expand Down Expand Up @@ -752,18 +759,24 @@ TEST(ReaderSeekTest, testSeekForMessageId) {
producer.close();
}

class ReaderSeekTest : public ::testing::TestWithParam<bool> {};

TEST(ReaderSeekTest, testStartAtLatestMessageId) {
Client client(serviceUrl);
#define EXPECT_HAS_MESSAGE_AVAILABLE(reader, expected) \
{ \
bool actual; \
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(actual)); \
EXPECT_EQ(actual, (expected)); \
}

TEST_F(ReaderSeekTest, testStartAtLatestMessageId) {
const std::string topic = "test-seek-latest-message-id-" + std::to_string(time(nullptr));

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topic, producer));

MessageId id;
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg").build(), id));
for (int i = 0; i < 10; i++) {
ASSERT_EQ(ResultOk,
producer.send(MessageBuilder().setContent("msg-" + std::to_string(i)).build(), id));
}

Reader readerExclusive;
ASSERT_EQ(ResultOk,
Expand All @@ -774,20 +787,24 @@ TEST(ReaderSeekTest, testStartAtLatestMessageId) {
client.createReader(topic, MessageId::latest(),
ReaderConfiguration().setStartMessageIdInclusive(true), readerInclusive));

EXPECT_HAS_MESSAGE_AVAILABLE(readerExclusive, false);
EXPECT_HAS_MESSAGE_AVAILABLE(readerInclusive, true);

Message msg;
bool hasMsgAvaliable = false;
readerInclusive.hasMessageAvailable(hasMsgAvaliable);
ASSERT_TRUE(hasMsgAvaliable);
ASSERT_EQ(ResultOk, readerInclusive.readNext(msg, 3000));
ASSERT_EQ(ResultTimeout, readerExclusive.readNext(msg, 3000));
ASSERT_EQ(msg.getDataAsString(), "msg-9");

readerInclusive.seek(0L);
EXPECT_HAS_MESSAGE_AVAILABLE(readerInclusive, true);
ASSERT_EQ(ResultOk, readerInclusive.readNext(msg, 3000));
ASSERT_EQ(msg.getDataAsString(), "msg-0");

readerExclusive.close();
readerInclusive.close();
producer.close();
}

TEST(ReaderTest, testSeekInProgress) {
Client client(serviceUrl);
TEST_F(ReaderSeekTest, testSeekInProgress) {
const auto topic = "test-seek-in-progress-" + std::to_string(time(nullptr));
Reader reader;
ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), {}, reader));
Expand All @@ -798,11 +815,9 @@ TEST(ReaderTest, testSeekInProgress) {
Result result;
promise.getFuture().get(result);
ASSERT_EQ(result, ResultNotAllowedError);
client.close();
}

TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekToEnd) {
Client client(serviceUrl);
const auto topic = "test-has-message-available-after-seek-to-end-" + std::to_string(time(nullptr));
Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
Expand All @@ -814,7 +829,6 @@ TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekToEnd) {

bool hasMessageAvailable;
if (GetParam()) {
// Test the case when `ConsumerImpl.lastMessageIdInBroker_` has been initialized
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
}

Expand All @@ -834,8 +848,44 @@ TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekToEnd) {
ASSERT_EQ(ResultOk, reader.seek(MessageId::latest()));
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
ASSERT_FALSE(hasMessageAvailable);
}

client.close();
TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekTimestamp) {
using namespace std::chrono;
const auto topic = "test-has-message-available-after-seek-timestamp-" + std::to_string(time(nullptr));
Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
MessageId sentMsgId;
const auto timestampBeforeSend =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg").build(), sentMsgId));

auto createReader = [this, &topic](Reader& reader, const MessageId& msgId) {
ASSERT_EQ(ResultOk, client.createReader(topic, msgId, {}, reader));
if (GetParam()) {
if (msgId == MessageId::earliest()) {
EXPECT_HAS_MESSAGE_AVAILABLE(reader, true);
} else {
EXPECT_HAS_MESSAGE_AVAILABLE(reader, false);
}
}
};

std::vector<MessageId> msgIds{MessageId::earliest(), sentMsgId, MessageId::latest()};

for (auto&& msgId : msgIds) {
Reader reader;
createReader(reader, msgId);
ASSERT_EQ(ResultOk,
reader.seek(duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count()));
EXPECT_HAS_MESSAGE_AVAILABLE(reader, false);
}
for (auto&& msgId : msgIds) {
Reader reader;
createReader(reader, msgId);
ASSERT_EQ(ResultOk, reader.seek(timestampBeforeSend));
EXPECT_HAS_MESSAGE_AVAILABLE(reader, true);
}
}

INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Values(true, false));
Expand Down

0 comments on commit 27d8cc0

Please sign in to comment.