Skip to content

Commit

Permalink
[#22260] YSQL: pg_cron: Set one node as cron leader and add test
Browse files Browse the repository at this point in the history
Summary:
Use shared memory `is_cron_leader` field to indicate which node will be the cron leader. Only the leader will schedule and run cron jobs. This ensures that cron jobs are run at most once. Distributed job scheduling and execution will happen as part of #22336.

If we had two or more leaders in the same minute we would not run the task multiple times since cron does not run tasks for the first minute of leadership. Interval jobs do not have this guarantee since their timer starts when we become the leader. Non Yugabyte pg_cron has the same behavior when pg restarts. Check comments in `YbCheckLeadership` and `StartAllPendingRuns` for more details.

For testing purposes we set `is_cron_leader` via `FLAG_TEST_is_ysql_cron_leader`. `TabletServer` uses a flag callback to set the shared memory to the value of the flag.
#22360 will add a StatefulService that will ensure one and only one tserver is always picked as the pg_cron leader.

- Moved `YBCPgResetCatalogReadTime` inside `RefreshTaskHash` as we do not need to do this every time
- Added `YbUpdateCatalogCacheVersion` since without it we hit `The catalog snapshot used for this transaction has been invalidated`.
- Replaced `YBCDeleteSysCatalogTuple` with `CatalogTupleDelete` so that index entries are also deleted.

Fixes #22260
Jira: DB-11179

Test Plan:
PgCronTest.AtMostOnceTest
PgCronTest.PerMinuteTask
yb_pg_cron-test

Reviewers: tnayak, fizaa

Reviewed By: tnayak

Subscribers: yql, ybase

Differential Revision: https://phorge.dev.yugabyte.com/D34926
  • Loading branch information
hari90 committed May 15, 2024
1 parent 0025583 commit 92e3b75
Show file tree
Hide file tree
Showing 20 changed files with 368 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
import java.util.Map;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.yb.YBTestRunner;
import org.yb.client.TestUtils;
import org.yb.util.YBTestRunnerNonTsanOnly;

@RunWith(value = YBTestRunner.class)
// Disable in TSAN since it times out on pg_cron exit #22295.
@RunWith(value = YBTestRunnerNonTsanOnly.class)
public class TestPgRegressThirdPartyExtensionsPgCron extends BasePgSQLTest {
@Override
public int getTestMethodTimeoutSec() {
Expand Down
6 changes: 3 additions & 3 deletions src/postgres/third-party-extensions/pg_cron/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ DATA_built = $(EXTENSION)--1.0.sql
DATA = $(wildcard $(EXTENSION)--*--*.sql)

REGRESS_OPTS =--temp-config=./pg_cron.conf --temp-instance=./tmp_check
REGRESS = pg_cron-test
REGRESS = pg_cron-test

# compilation configuration
MODULE_big = $(EXTENSION)
OBJS = $(patsubst %.c,%.o,$(wildcard src/*.c))
ifeq ($(CC),gcc)
PG_CPPFLAGS = -std=c99 -Wall -Wextra -Werror -Wno-unused-parameter -Wno-uninitialized -Wno-implicit-fallthrough -Iinclude -I$(libpq_srcdir)
PG_CPPFLAGS = -std=c99 -Wall -Wextra -Werror -Wno-unused-parameter -Wno-uninitialized -Wno-implicit-fallthrough -Wno-unused-function -Iinclude -I$(libpq_srcdir)
else
PG_CPPFLAGS = -std=c99 -Wall -Wextra -Werror -Wno-unused-parameter -Wno-implicit-fallthrough -Iinclude -I$(libpq_srcdir)
PG_CPPFLAGS = -std=c99 -Wall -Wextra -Werror -Wno-unused-parameter -Wno-implicit-fallthrough -Wno-unused-function -Iinclude -I$(libpq_srcdir)
endif
SHLIB_LINK = $(libpq) -L$(YB_BUILD_ROOT)/lib -lyb_pggate $(filter -lintl,$(LIBS))
EXTRA_CLEAN += $(addprefix src/,*.gcno *.gcda) # clean up after profiling runs
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
CREATE EXTENSION pg_cron;
-- Recreate job with same name
SELECT cron.schedule('myjob', '0 11 * * *', 'SELECT 1');
schedule
----------
1
(1 row)

SELECT cron.unschedule('myjob');
unschedule
------------
t
(1 row)

SELECT cron.schedule('myjob', '0 11 * * *', 'SELECT 1');
schedule
----------
2
(1 row)
22 changes: 11 additions & 11 deletions src/postgres/third-party-extensions/pg_cron/sql/pg_cron-test.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@ SELECT cron.unschedule(1);
SELECT cron.schedule(repeat('a', 1000), '');

-- Invalid input: missing parts
SELECT cron.schedule('* * * *', 'SELECT 1');
SELECT cron.schedule('* * * *', 'SELECT 1');

-- Invalid input: trailing characters
SELECT cron.schedule('5 secondc', 'SELECT 1');
SELECT cron.schedule('50 seconds c', 'SELECT 1');
SELECT cron.schedule('5 secondc', 'SELECT 1');
SELECT cron.schedule('50 seconds c', 'SELECT 1');

-- Invalid input: seconds out of range
SELECT cron.schedule('-1 seconds', 'SELECT 1');
SELECT cron.schedule('0 seconds', 'SELECT 1');
SELECT cron.schedule('60 seconds', 'SELECT 1');
SELECT cron.schedule('10000000000 seconds', 'SELECT 1');
SELECT cron.schedule('-1 seconds', 'SELECT 1');
SELECT cron.schedule('0 seconds', 'SELECT 1');
SELECT cron.schedule('60 seconds', 'SELECT 1');
SELECT cron.schedule('10000000000 seconds', 'SELECT 1');

-- Try to update pg_cron on restart
SELECT cron.schedule('@restar', 'ALTER EXTENSION pg_cron UPDATE');
Expand Down Expand Up @@ -136,10 +136,10 @@ CREATE EXTENSION pg_cron;
select * from public.test;

-- valid interval jobs
SELECT cron.schedule('1 second', 'SELECT 1');
SELECT cron.schedule(' 30 sEcOnDs ', 'SELECT 1');
SELECT cron.schedule('59 seconds', 'SELECT 1');
SELECT cron.schedule('17 seconds ', 'SELECT 1');
SELECT cron.schedule('1 second', 'SELECT 1');
SELECT cron.schedule(' 30 sEcOnDs ', 'SELECT 1');
SELECT cron.schedule('59 seconds', 'SELECT 1');
SELECT cron.schedule('17 seconds ', 'SELECT 1');
SELECT jobid, jobname, schedule, command FROM cron.job ORDER BY jobid;

-- valid last of day job
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
CREATE EXTENSION pg_cron;

-- Recreate job with same name
SELECT cron.schedule('myjob', '0 11 * * *', 'SELECT 1');
SELECT cron.unschedule('myjob');
SELECT cron.schedule('myjob', '0 11 * * *', 'SELECT 1');
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,7 @@ cron_unschedule(PG_FUNCTION_ARGS)
EnsureDeletePermission(cronJobsTable, heapTuple);

if (IsYugaByteEnabled())
YBCDeleteSysCatalogTuple(cronJobsTable, heapTuple);
CatalogTupleDelete(cronJobsTable, heapTuple);
else
simple_heap_delete(cronJobsTable, &heapTuple->t_self);

Expand Down Expand Up @@ -739,7 +739,7 @@ cron_unschedule_named(PG_FUNCTION_ARGS)
EnsureDeletePermission(cronJobsTable, heapTuple);

if (IsYugaByteEnabled())
YBCDeleteSysCatalogTuple(cronJobsTable, heapTuple);
CatalogTupleDelete(cronJobsTable, heapTuple);
else
simple_heap_delete(cronJobsTable, &heapTuple->t_self);

Expand Down
116 changes: 103 additions & 13 deletions src/postgres/third-party-extensions/pg_cron/src/pg_cron.c
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ static bool jobStartupTimeout(CronTask *task, TimestampTz currentTime);
static char* pg_cron_cmdTuples(char *msg);
static void bgw_generate_returned_message(StringInfoData *display_msg, ErrorData edata);
static long YbSecondsPassed(TimestampTz startTime, TimestampTz stopTime);
static void YbCheckLeadership(List *taskList, TimestampTz currentTime);

/* global settings */
char *CronTableDatabaseName = "yugabyte";
Expand Down Expand Up @@ -190,6 +191,14 @@ static const struct config_enum_entry cron_message_level_options[] = {
{NULL, 0, false}
};

/*
* In Yugabyte since postgres is running on several nodes we pick a single
* node to act as the cron leader. Only this node will run cron jobs.
* Once distributed scheduling(#22336) is implemented the leader will schedule
* the job and other nodes will execute jobs that have been scheduled on them.
*/
bool ybIsLeader = false;

static const char *cron_error_severity(int elevel);

/*
Expand Down Expand Up @@ -577,7 +586,9 @@ PgCronLauncherMain(Datum arg)
/* Establish signal handlers before unblocking signals. */
pqsignal(SIGHUP, pg_cron_sighup);
pqsignal(SIGINT, SIG_IGN);
pqsignal(SIGTERM, pg_cron_sigterm);
/* YB Note: Exit immediately. */
pqsignal(SIGTERM, die);
pqsignal(SIGQUIT, quickdie);

/* We're now ready to receive signals */
BackgroundWorkerUnblockSignals();
Expand All @@ -595,8 +606,10 @@ PgCronLauncherMain(Datum arg)
/*
* Mark anything that was in progress before the database restarted as
* failed.
* YB Note: The cron leader will mark pending runs as failed.
*/
MarkPendingRunsAsFailed();
if (!IsYugaByteEnabled())
MarkPendingRunsAsFailed();

/* Determine how many tasks we can run concurrently */
if (MaxConnections < MaxRunningTasks)
Expand Down Expand Up @@ -646,9 +659,6 @@ PgCronLauncherMain(Datum arg)

while (!got_sigterm)
{
/* YB Note: The latest entries in the catalog must be read during every run */
if (IsYugaByteEnabled())
YBCPgResetCatalogReadTime();

List *taskList = NIL;
TimestampTz currentTime = 0;
Expand All @@ -664,21 +674,25 @@ PgCronLauncherMain(Datum arg)
CronReloadConfig = false;
}

currentTime = GetCurrentTimestamp();
YbCheckLeadership(taskList, currentTime);

/*
* Both CronReloadConfig and CronJobCacheValid are triggered by SIGHUP.
* ProcessConfigFile should come first, because RefreshTaskHash depends
* on settings that might have changed.
*
* In Yugabyte mode jobs scheduled from a different nodes cannot invalidate
* the cache on the cron leader. So in addition to the regular invalidations
* we RefreshTaskHash every YbJobListRefreshSeconds.
* NOTE: It can take up to YbJobListRefreshSeconds for change to the jobs
* to take effect.
* YB Note:
* Jobs scheduled from different nodes cannot
* invalidate the cache on the cron leader. So in addition to the
* regular invalidations we RefreshTaskHash every
* YbJobListRefreshSeconds. NOTE: It can take up to
* YbJobListRefreshSeconds for changes to the jobs to take effect.
*/
currentTime = GetCurrentTimestamp();
if (!CronJobCacheValid ||
(IsYugaByteEnabled() &&
YbSecondsPassed(ybLastRefreshTime, currentTime) >= YbJobListRefreshSeconds))
(IsYugaByteEnabled() && ybIsLeader &&
YbSecondsPassed(ybLastRefreshTime, currentTime) >=
YbJobListRefreshSeconds))
{
ybLastRefreshTime = currentTime;
RefreshTaskHash();
Expand All @@ -689,6 +703,9 @@ PgCronLauncherMain(Datum arg)

StartAllPendingRuns(taskList, currentTime);

/* YB Note: Check again since we could have lost leadership. */
YbCheckLeadership(taskList, currentTime);

WaitForCronTasks(taskList);
ManageCronTasks(taskList, currentTime);

Expand All @@ -712,6 +729,23 @@ StartAllPendingRuns(List *taskList, TimestampTz currentTime)
{
static TimestampTz lastMinute = 0;

/*
* YB Note: Only start jobs if we are the leader.
* Reset lastMinute because otherwise if we go from leader to follower and
* back to leader we would start all the runs that came due while we were
* the follower.
* If we had two or more leaders in the same minute we would not run the
* task multiple times since we do not run tasks for the first minute of
* leadership.
* Interval jobs do not have this guarantee since their timer starts when we
* become the leader. Non Yugabyte pg_cron has the same behavior when pg
* restarts.
*/
if (IsYugaByteEnabled() && !ybIsLeader)
{
lastMinute = 0;
return;
}

int minutesPassed = 0;
ListCell *taskCell = NULL;
ClockProgress clockProgress;
Expand Down Expand Up @@ -2081,6 +2115,8 @@ CronBackgroundWorker(Datum main_arg)

/* handle SIGTERM like regular backend */
pqsignal(SIGTERM, die);
/* YB Note: Exit immediately. */
pqsignal(SIGQUIT, quickdie);
BackgroundWorkerUnblockSignals();

/* Set up a memory context and resource owner. */
Expand Down Expand Up @@ -2397,3 +2433,57 @@ YbSecondsPassed(TimestampTz startTime, TimestampTz stopTime)

return secondsPassed;
}

/*
 * YbCheckLeadership
 *
 * Brings the local ybIsLeader flag in line with what YBCIsCronLeader()
 * currently reports and performs the bookkeeping for a role change.
 * Does nothing outside Yugabyte mode or when the role has not changed.
 *
 * taskList    - tasks whose per-task state is reset on a role change
 * currentTime - timestamp stored as lastStartTime of every task when this
 *               node becomes the leader
 */
static void
YbCheckLeadership(List *taskList, TimestampTz currentTime)
{
	ListCell   *cell = NULL;

	if (!IsYugaByteEnabled())
		return;

	if (YBCIsCronLeader())
	{
		/* Already acting as the leader: no transition to handle. */
		if (ybIsLeader)
			return;

		ereport(LOG, (errmsg("pg_cron switching to leader mode")));
		ybIsLeader = true;

		/*
		 * We have just detected that we are the leader, so runs left in
		 * flight by the previous leader are marked as failed. That node
		 * might still be alive; it will mark its job as completed and stop
		 * scheduling new runs. This inconsistency will go away once
		 * distributed job scheduling (#22336) is implemented.
		 */
		MarkPendingRunsAsFailed();

		/*
		 * Restart the clock used by interval jobs from now. Check comment
		 * in GetCronTask.
		 */
		foreach(cell, taskList)
		{
			CronTask   *cronTask = (CronTask *) lfirst(cell);

			cronTask->lastStartTime = currentTime;
		}

		/* Invalidate the cached job list. */
		CronJobCacheValid = false;
		return;
	}

	/* Not the leader now, and was not before either: nothing to do. */
	if (!ybIsLeader)
		return;

	ereport(LOG, (errmsg("pg_cron switching to idle mode")));
	ybIsLeader = false;

	/*
	 * Clear the pending run counts so that runs we marked as pending while
	 * we were the leader are not started.
	 */
	foreach(cell, taskList)
	{
		CronTask   *cronTask = (CronTask *) lfirst(cell);

		cronTask->pendingRunCount = 0;
	}
}
10 changes: 10 additions & 0 deletions src/postgres/third-party-extensions/pg_cron/src/task_states.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
#include "utils/hsearch.h"
#include "utils/memutils.h"

/* YB includes */
#include "pg_yb_utils.h"
#include "catalog/yb_catalog_version.h"

/* forward declarations */
static HTAB * CreateCronTaskHash(void);
Expand Down Expand Up @@ -81,6 +84,13 @@ CreateCronTaskHash(void)
void
RefreshTaskHash(void)
{
/* YB Note: Always read the latest entries in the catalog */
if (IsYugaByteEnabled())
{
YBCPgResetCatalogReadTime();
YbUpdateCatalogCacheVersion(YbGetMasterCatalogVersion());
}

List *jobList = NIL;
ListCell *jobCell = NULL;
CronTask *task = NULL;
Expand Down
1 change: 1 addition & 0 deletions src/postgres/third-party-extensions/pg_cron/yb_schedule
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# third-party-extensions/pg_cron/yb_schedule

test: pg_cron-test
test: yb_pg_cron-test
1 change: 1 addition & 0 deletions src/yb/integration-tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ ADD_YB_TEST(cdcsdk_tablet_split-test)
ADD_YB_TEST(cdcsdk_consistent_snapshot-test)
ADD_YB_TEST(transaction-test)
ADD_YB_TEST(encryption-test)
ADD_YB_TEST(pg_cron-test)
ADD_YB_TEST(secure_connection_test)
ADD_YB_TEST(xcluster/xcluster_consistency-test)
ADD_YB_TEST(xcluster/xcluster_db_scoped-test)
Expand Down
13 changes: 13 additions & 0 deletions src/yb/integration-tests/external_mini_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
#include "yb/util/test_util.h"
#include "yb/util/tsan_util.h"
#include "yb/util/flags.h"
#include "yb/yql/pgwrapper/libpq_utils.h"

#define YB_FORWARD_FLAG(flag_name) \
"--" BOOST_PP_STRINGIZE(flag_name) "="s + FlagToString(BOOST_PP_CAT(FLAGS_, flag_name))
Expand Down Expand Up @@ -2089,6 +2090,18 @@ Status ExternalMiniCluster::WaitForLoadBalancerToBecomeIdle(
return Status::OK();
}

// Opens a PG connection to `db_name` on one tablet server of this cluster.
// When `node_index` is not set, a tablet server index is drawn with
// RandomUniformInt from [0, num_tablet_servers() - 1]. `simple_query_protocol`
// is forwarded unchanged to PGConnBuilder::Connect.
Result<pgwrapper::PGConn> ExternalMiniCluster::ConnectToDB(
    const std::string& db_name, std::optional<size_t> node_index, bool simple_query_protocol) {
  // Only draw a random index when the caller did not choose a node.
  const size_t ts_index = node_index.has_value()
      ? *node_index
      : RandomUniformInt<size_t>(0, num_tablet_servers() - 1);
  auto* const server = tablet_server(ts_index);

  pgwrapper::PGConnBuilder builder(
      {.host = server->bind_host(), .port = server->pgsql_rpc_port(), .dbname = db_name});
  return builder.Connect(simple_query_protocol);
}

//------------------------------------------------------------
// ExternalDaemon
//------------------------------------------------------------
Expand Down
10 changes: 10 additions & 0 deletions src/yb/integration-tests/external_mini_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ class OpIdPB;
class NodeInstancePB;
class Subprocess;

namespace pgwrapper {
class PGConn;
} // namespace pgwrapper

namespace rpc {
class SecureContext;
}
Expand Down Expand Up @@ -541,6 +545,12 @@ class ExternalMiniCluster : public MiniClusterBase {
Status WaitForLoadBalancerToBecomeIdle(
const std::unique_ptr<yb::client::YBClient>& client, MonoDelta timeout);

// Create a PG connection to the given database. If node_index is not set, a random node is
// chosen.
Result<pgwrapper::PGConn> ConnectToDB(
const std::string& db_name = "yugabyte", std::optional<size_t> node_index = std::nullopt,
bool simple_query_protocol = false);

protected:
FRIEND_TEST(MasterFailoverTest, TestKillAnyMaster);

Expand Down

0 comments on commit 92e3b75

Please sign in to comment.