Skip to content

Commit

Permalink
control: advisory locks => SELECT ... WITH UPDATE SKIP LOCKED
Browse files Browse the repository at this point in the history
Also add an index on builds.account_id.
  • Loading branch information
jgraettinger committed Mar 10, 2022
1 parent 9b1c28e commit fc1706c
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 80 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
DROP INDEX builds_id_where_queued;

DROP INDEX builds_account_id;

DROP TABLE builds;
3 changes: 3 additions & 0 deletions crates/control/migrations/20220303214925_create_builds.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ CREATE TABLE builds (
"updated_at" TIMESTAMPTZ NOT NULL
);

-- Index for identifying builds of a given account.
CREATE INDEX builds_account_id ON builds USING BTREE (account_id);

-- Index for efficiently identifying builds that are queued,
-- which is a small subset of the overall builds that exist.
CREATE UNIQUE INDEX builds_id_where_queued ON builds USING BTREE (id)
Expand Down
96 changes: 48 additions & 48 deletions crates/control/sqlx-data.json
Original file line number Diff line number Diff line change
Expand Up @@ -191,54 +191,6 @@
},
"query": "\n SELECT id as \"id!: Id<Connector>\", description, name, maintainer, type as \"type!: ConnectorType\", created_at, updated_at\n FROM connectors\n WHERE id = $1\n "
},
"313ded554111cc7d36f9b1760e64a1a9b53438d60b6d070d2bf4045b3512bf62": {
"describe": {
"columns": [
{
"name": "account_id: Id<Account>",
"ordinal": 0,
"type_info": "Int8"
},
{
"name": "catalog: Option<Json<models::Catalog>>",
"ordinal": 1,
"type_info": "Json"
},
{
"name": "created_at",
"ordinal": 2,
"type_info": "Timestamptz"
},
{
"name": "id: Id<Build>",
"ordinal": 3,
"type_info": "Int8"
},
{
"name": "state: Json<State>",
"ordinal": 4,
"type_info": "Jsonb"
},
{
"name": "updated_at",
"ordinal": 5,
"type_info": "Timestamptz"
}
],
"nullable": [
false,
false,
false,
false,
false,
false
],
"parameters": {
"Left": []
}
},
"query": "\n SELECT\n account_id as \"account_id: Id<Account>\",\n catalog as \"catalog: Option<Json<models::Catalog>>\",\n created_at,\n id as \"id: Id<Build>\",\n state as \"state: Json<State>\",\n updated_at\n FROM builds\n WHERE state->>'type' = 'queued' AND\n pg_try_advisory_xact_lock(id)\n ORDER BY id ASC\n LIMIT 1\n "
},
"40eb0abd64861d5312663b9d2c80b2681950a48283d6cb34ede4e7d92757bbfc": {
"describe": {
"columns": [
Expand Down Expand Up @@ -458,6 +410,54 @@
},
"query": "\n SELECT\n account_id AS \"account_id: Id<Account>\",\n catalog AS \"catalog: Option<Json<models::Catalog>>\",\n created_at,\n id AS \"id: Id<Build>\",\n state AS \"state: Json<State>\",\n updated_at\n FROM builds\n WHERE id = $1 AND account_id = $2\n "
},
"ad999cf9b68926b6f897cb66f44fe8f0a2b34bdf26c04d5284abc1c957155901": {
"describe": {
"columns": [
{
"name": "account_id: Id<Account>",
"ordinal": 0,
"type_info": "Int8"
},
{
"name": "catalog: Option<Json<models::Catalog>>",
"ordinal": 1,
"type_info": "Json"
},
{
"name": "created_at",
"ordinal": 2,
"type_info": "Timestamptz"
},
{
"name": "id: Id<Build>",
"ordinal": 3,
"type_info": "Int8"
},
{
"name": "state: Json<State>",
"ordinal": 4,
"type_info": "Jsonb"
},
{
"name": "updated_at",
"ordinal": 5,
"type_info": "Timestamptz"
}
],
"nullable": [
false,
false,
false,
false,
false,
false
],
"parameters": {
"Left": []
}
},
"query": "\n SELECT\n account_id as \"account_id: Id<Account>\",\n catalog as \"catalog: Option<Json<models::Catalog>>\",\n created_at,\n id as \"id: Id<Build>\",\n state as \"state: Json<State>\",\n updated_at\n FROM builds\n WHERE state->>'type' = 'queued'\n ORDER BY id ASC\n LIMIT 1\n FOR UPDATE SKIP LOCKED\n "
},
"b514524b7f8225168c0fb0d8b2cfb812a07a7b72b7375dc76eed95a2bc98f0ef": {
"describe": {
"columns": [
Expand Down
34 changes: 2 additions & 32 deletions crates/control/src/repo/builds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,36 +79,6 @@ pub async fn insert(
// If a Some(Build) is returned, it's guaranteed that no parallel invocation of
// dequeue_build will return that same Build so long as the argument Transaction
// is alive.
//
// The query leverages PostgreSQL's advisory lock mechanism to obtain a
// transaction-scoped exclusive lock on the `id` of its returned row.
// `pg_try_advisory_xact_lock(id)` returns true if it obtains a lock on `id`,
// or false if it's locked already.
//
// It's therefore important that the query plan call `pg_try_advisory_xact_lock`
// on as few actual `id`'s as possible, as extra locking reduces parallelism.
// For this query, we're capitalizing on the index `builds_id_where_queued`.
// The optimizer uses it in an Index Scan to try successive increasing `id`'s
// until one is found which is lock-able.
//
// This index also means that the common case (there are no queued builds) is
// very cheap to poll, even if the total number of builds is large.
//
// If the query plan were to change on us then `pg_try_advisory_xact_lock(id)`
// could lock additional non-returned `id`'s, which would reduce the potential
// parallelism but would not be a correctness issue.
//
// All obtained locks are automatically released on transaction close.
// If a builder fails its lock is automatically removed, making the Build
// eligible again for dequeue by other workers.
//
/*
QUERY PLAN
---------------------------------------------------------------------------------------------
Limit (cost=0.13..6.18 rows=1 width=68)
-> Index Scan using builds_id_where_queued on builds (cost=0.13..12.22 rows=2 width=68)
Filter: pg_try_advisory_xact_lock(id)
*/
pub async fn dequeue_build<'c>(
txn: &mut Transaction<'c, Postgres>,
) -> Result<Option<Build>, sqlx::Error> {
Expand All @@ -123,10 +93,10 @@ pub async fn dequeue_build<'c>(
state as "state: Json<State>",
updated_at
FROM builds
WHERE state->>'type' = 'queued' AND
pg_try_advisory_xact_lock(id)
WHERE state->>'type' = 'queued'
ORDER BY id ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
"#,
)
.fetch_optional(txn)
Expand Down

0 comments on commit fc1706c

Please sign in to comment.