Skip to content

Commit

Permalink
[#22273] CDCSDK: Fix the segmentation fault in walsender for dynamic …
Browse files Browse the repository at this point in the history
…table addition

Summary:
When the walsender starts, in the `ReplicationSlotAcquire()` function, we fetch the replica identities of all the tables that are in the publication at that time and store them in a hash table. These stored replica identities are used when decoding records. However, the replica identities of tables added dynamically to the publication while streaming is in progress are not added to this hash table. As a result, a segmentation fault was observed while decoding the records belonging to these tables.

This diff fixes this issue by refreshing the stored replica identities while handling the dynamic table addition / removal. This way we update the stored replica identities before decoding the records belonging to dynamically added tables.

In addition to this, this diff also introduces changes to avoid a possible race condition when a dynamic table is created. This race condition could occur when the walsender tries to process the publication refresh while the master background task is yet to add the dynamically created table to the stream metadata, causing the walsender to crash. To handle this, when the `UpdatePublicationTableList` RPC is called, we get the stream metadata and verify that all the table_ids sent by the walsender are present in the stream metadata. In the walsender, the replica identities are refreshed only after we return from `UpdatePublicationTableList`. This ensures that the background task has finished adding the replica identity and table id to the stream metadata before we fetch this information.
Jira: DB-11189

Test Plan:
Jenkins: test regex: .*CDCSDKConsumptionConsistentChangesTest.*|.*ReplicationSlot.*
./yb_build.sh --java-test 'org.yb.pgsql.TestPgReplicationSlot#testDynamicTableAdditionForAllTablesPublication'
./yb_build.sh --cxx-test integration-tests_cdcsdk_consumption_consistent_changes-test --gtest_filter CDCSDKConsumptionConsistentChangesTest.TestDynamicTablesAdditionForTableCreatedAfterStream

Reviewers: asrinivasan, skumar, stiwary

Reviewed By: stiwary

Subscribers: yql, ycdcxcluster

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D34788
  • Loading branch information
Sumukh-Phalgaonkar committed May 14, 2024
1 parent 0dc50d0 commit d572645
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -537,13 +537,6 @@ public void testDynamicTableAdditionForAllTablesPublication() throws Exception {
PGReplicationConnection replConnection = conn.unwrap(PGConnection.class).getReplicationAPI();

createStreamAndWaitForSnapshotTimeToPass(replConnection, slotName, "pgoutput");
try (Statement stmt = connection.createStatement()) {
stmt.execute("BEGIN");
stmt.execute("INSERT INTO t1 VALUES(1, 'abcd')");
stmt.execute("INSERT INTO t2 VALUES(2, 'defg')");
stmt.execute("COMMIT");
stmt.execute("CREATE TABLE t3 (a int primary key, b text)");
}

PGReplicationStream stream = replConnection.replicationStream()
.logical()
Expand All @@ -553,6 +546,14 @@ public void testDynamicTableAdditionForAllTablesPublication() throws Exception {
.withSlotOption("publication_names", "pub")
.start();

try (Statement stmt = connection.createStatement()) {
stmt.execute("BEGIN");
stmt.execute("INSERT INTO t1 VALUES(1, 'abcd')");
stmt.execute("INSERT INTO t2 VALUES(2, 'defg')");
stmt.execute("COMMIT");
stmt.execute("CREATE TABLE t3 (a int primary key, b text)");
}

Thread.sleep(kPublicationRefreshIntervalSec * 2 * 1000);
try (Statement stmt = connection.createStatement()) {
stmt.execute("BEGIN");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ static XLogRecPtr CalculateRestartLSN(XLogRecPtr confirmed_flush);
static void CleanupAckedTransactions(XLogRecPtr confirmed_flush);

static Oid *YBCGetTableOids(List *tables);
static void YBCRefreshReplicaIdentities();

void
YBCInitVirtualWal(List *yb_publication_names)
Expand Down Expand Up @@ -253,6 +254,9 @@ YBCReadRecord(XLogReaderState *state, XLogRecPtr RecPtr,
list_free(tables);
AbortCurrentTransaction();

// Refresh the replica identities.
YBCRefreshReplicaIdentities();

needs_publication_table_list_refresh = false;
}

Expand Down Expand Up @@ -545,3 +549,27 @@ YBCGetTableOids(List *tables)

return table_oids;
}

static void
YBCRefreshReplicaIdentities()
{
YBCReplicationSlotDescriptor *yb_replication_slot;
int replica_identity_idx = 0;

YBCGetReplicationSlot(MyReplicationSlot->data.name.data, &yb_replication_slot);

for (replica_identity_idx = 0;
replica_identity_idx <
yb_replication_slot->replica_identities_count;
replica_identity_idx++)
{
YBCPgReplicaIdentityDescriptor *desc =
&yb_replication_slot->replica_identities[replica_identity_idx];

YBCPgReplicaIdentityDescriptor *value =
hash_search(MyReplicationSlot->data.yb_replica_identities,
&desc->table_oid, HASH_ENTER, NULL);
value->table_oid = desc->table_oid;
value->identity_type = desc->identity_type;
}
}
5 changes: 0 additions & 5 deletions src/postgres/src/backend/replication/slot.c
Original file line number Diff line number Diff line change
Expand Up @@ -408,11 +408,6 @@ ReplicationSlotAcquire(const char *name, bool nowait)
ctl.entrysize = sizeof(YBCPgReplicaIdentityDescriptor);
ctl.hcxt = GetCurrentMemoryContext();

/*
* TODO(#21028): This HTAB must be refreshed in case of dynamic table
* additions so that it also includes the replica identity of the newly
* added columns. It is not necessary to handle drop of a table though.
*/
replica_identities = hash_create("yb_repl_slot_replica_identities",
32, /* start small and extend */
&ctl, HASH_ELEM | HASH_BLOBS);
Expand Down
43 changes: 43 additions & 0 deletions src/yb/cdc/cdcsdk_virtual_wal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
#include "yb/cdc/cdc_state_table.h"

#include "yb/cdc/cdcsdk_virtual_wal.h"
#include "yb/cdc/xrepl_stream_metadata.h"
#include "yb/util/backoff_waiter.h"

DEFINE_RUNTIME_uint32(
cdcsdk_max_consistent_records, 500,
Expand Down Expand Up @@ -1055,6 +1057,11 @@ Status CDCSDKVirtualWAL::UpdatePublicationTableListInternal(
}

if (!tables_to_be_added.empty()) {
// Wait and validate that all the tables to be added to the streaming list have been added to
// the stream. This is required for the dynamically created tables, as their addition to the
// stream is done by the master background thread. Calling GetTabletListAndCheckpoint on a table
// that is yet to be added to the stream will cause it to fail.
RETURN_NOT_OK(ValidateTablesToBeAddedPresentInStream(tables_to_be_added, deadline));
for (auto table_id : tables_to_be_added) {
// Initialize the tablet_queues_, tablet_id_to_table_id_map_, and tablet_next_req_map_
auto s = GetTabletListAndCheckpoint(table_id, hostport, deadline);
Expand Down Expand Up @@ -1134,5 +1141,41 @@ std::string CDCSDKVirtualWAL::GetPubRefreshTimesString() {
return oss.str();
}

// Blocks until every table id in `tables_to_be_added` is listed in the metadata of stream
// `stream_id_`, re-fetching the stream metadata (RefreshStreamMapOption::kAlways) on each attempt.
//
// The wait is bounded by `deadline` minus a 1ms margin. A failure to fetch the stream metadata is
// logged as a warning and treated as "not yet present", so the wait simply retries. Returns the
// non-OK status produced by WaitFor if the tables do not all show up in time, otherwise OK.
Status CDCSDKVirtualWAL::ValidateTablesToBeAddedPresentInStream(
    const std::unordered_set<TableId>& tables_to_be_added, const CoarseTimePoint deadline) {
  // Convert the absolute deadline into a relative timeout, keeping 1ms in hand.
  const auto remaining = std::chrono::duration_cast<std::chrono::nanoseconds>(
      deadline - CoarseMonoClock::now() - std::chrono::milliseconds(1));
  const MonoDelta timeout = MonoDelta::FromNanoseconds(remaining.count());

  auto all_tables_in_stream = [this, &tables_to_be_added]() -> Result<bool> {
    auto stream_metadata_result =
        cdc_service_->GetStream(stream_id_, RefreshStreamMapOption::kAlways);
    if (!stream_metadata_result.ok()) {
      LOG_WITH_PREFIX(WARNING) << "Unable to get stream metadata for stream id: " << stream_id_
                               << " " << ResultToStatus(stream_metadata_result);
      return false;
    }

    // Snapshot the stream's table ids into a set for constant-time membership checks.
    const auto& table_ids = (**stream_metadata_result).GetTableIds();
    const std::unordered_set<TableId> tables_in_stream(table_ids.begin(), table_ids.end());

    for (const auto& table_id : tables_to_be_added) {
      if (!tables_in_stream.contains(table_id)) {
        return false;
      }
    }
    return true;
  };

  RETURN_NOT_OK(WaitFor(
      all_tables_in_stream, timeout, "Timed out waiting for table to get added to the stream"));

  return Status::OK();
}

} // namespace cdc
} // namespace yb
3 changes: 3 additions & 0 deletions src/yb/cdc/cdcsdk_virtual_wal.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ class CDCSDKVirtualWAL {

std::vector<TabletId> GetTabletsForTable(const TableId& table_id);

// Waits until all the table ids in 'tables_to_be_added' are present in the metadata of this
// stream, refreshing the stream metadata on every attempt. The wait is bounded by 'deadline'
// (less a 1ms margin); a non-OK status is returned if the tables are not all present in time.
Status ValidateTablesToBeAddedPresentInStream(
const std::unordered_set<TableId>& tables_to_be_added, const CoarseTimePoint deadline);

std::string LogPrefix() const;

CDCServiceImpl* cdc_service_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2325,7 +2325,7 @@ TEST_F(

TEST_F(
CDCSDKConsumptionConsistentChangesTest, TestDynamicTablesAdditionForTableCreatedAfterStream) {
auto publication_refresh_interval = MonoDelta::FromSeconds(1);
auto publication_refresh_interval = MonoDelta::FromSeconds(10);
ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_stream_records_threshold_size_bytes) = 1_KB;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_state_checkpoint_update_interval_ms) = 0;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdcsdk_publication_list_refresh_interval_secs) =
Expand All @@ -2349,14 +2349,6 @@ TEST_F(
conn.Execute("CREATE TABLE test2 (id int primary key, value_1 int) SPLIT INTO 1 TABLETS"));
auto table_2 = ASSERT_RESULT(GetTable(&test_cluster_, kNamespaceName, "test2"));

ASSERT_OK(WaitFor(
[&]()-> Result<bool> {
auto tables_in_stream = VERIFY_RESULT(GetCDCStreamTableIds(stream_id));
return tables_in_stream.size() == 2;
},
MonoDelta::FromSeconds(60),
"Timed out waiting for the table to get added to the stream"));

ASSERT_OK(InitVirtualWAL(stream_id, {table_1.table_id()}));

vector<CDCSDKProtoRecordPB> records;
Expand Down Expand Up @@ -2398,7 +2390,7 @@ TEST_F(
records.insert(
records.end(), change_resp.cdc_sdk_proto_records().begin(),
change_resp.cdc_sdk_proto_records().end());
return records.size() == 5;
return records.size() >= 4;
},
MonoDelta::FromSeconds(60),
"Timed out waiting to receive the records of second transaction"));
Expand All @@ -2414,6 +2406,12 @@ TEST_F(
ASSERT_TRUE(has_records_from_test1 && has_records_from_test2);

for (int i = 0; i < 8; i++) {
// This is because in TSAN builds, the records of txn 2 are received by the VWAL in two
// GetChanges calls instead of one. As a result VWAL does not ship a DDL record for txn 2 in
// TSAN builds.
if (IsTsan() && i == 0) {
ASSERT_EQ(0, count[i]);
}
ASSERT_EQ(expected_count[i], count[i]);
}
}
Expand Down

0 comments on commit d572645

Please sign in to comment.