Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Server v3] Check concurrency model for postgre implementation #7121

Open
vxgmichel opened this issue Apr 19, 2024 · 0 comments
Open

[Server v3] Check concurrency model for postgre implementation #7121

vxgmichel opened this issue Apr 19, 2024 · 0 comments
Assignees
Labels
A-Server Area: Server application I-Postgresql Impact: Postgresql issue
Milestone

Comments

@vxgmichel
Copy link
Contributor

A new concurrency model has been introduced in the postgre implementation of the parsec v3 server.

More specifically, requests that depends on a topic (for performing checks) but that do not update said topic should lock it for reading by using a FOR SHARE select query on the topic timestamp. On the other hand, the requests that plan to update a given topic should lock it using a FOR UPDATE select query on the topic timestamp.

This way, many read requests can run concurrently for a given topic but write requests have to run sequentially while no read request is performed.

This mechanism is already implemented here for the common topic:

def _make_q_lock_common_topic(for_update: bool = False, for_share=False) -> Q:
assert for_update ^ for_share
share_or_update = "SHARE" if for_share else "UPDATE"
return Q(f"""
SELECT last_timestamp
FROM common_topic
JOIN organization ON common_topic.organization = organization._id
WHERE organization_id = $organization_id
FOR {share_or_update}
""")
_q_check_common_topic = _make_q_lock_common_topic(for_share=True)
_q_lock_common_topic = _make_q_lock_common_topic(for_update=True)

async def _check_user(
self,
conn: AsyncpgConnection,
organization_id: OrganizationID,
user_id: UserID,
) -> tuple[UserProfile, DateTime] | CheckUserBadOutcome:
common_timestamp = await conn.fetchval(
*_q_check_common_topic(organization_id=organization_id.str)
)
if common_timestamp is None:
common_timestamp = DateTime.from_timestamp(0)
u_row = await conn.fetchrow(
*_q_get_user(organization_id=organization_id.str, user_id=user_id.str)
)
if not u_row:
return CheckUserBadOutcome.USER_NOT_FOUND
if u_row["revoked_on"] is not None:
return CheckUserBadOutcome.USER_REVOKED
return UserProfile.from_str(u_row["profile"]), common_timestamp
async def _lock_common_topic(
self, conn: AsyncpgConnection, organization_id: OrganizationID
) -> DateTime:
value = await conn.fetchval(*_q_lock_common_topic(organization_id=organization_id.str))
if value is None:
return DateTime.from_timestamp(0)
return value

And here for the realm topic:

def _make_q_lock_realm(for_update: bool = False, for_share=False) -> Q:
assert for_update ^ for_share
share_or_update = "SHARE" if for_share else "UPDATE"
return Q(f"""
WITH selected_realm AS (
SELECT _id
FROM realm
WHERE
organization = { q_organization_internal_id("$organization_id") }
AND realm_id = $realm_id
),
locked_realms AS (
SELECT selected_realm._id, last_timestamp
FROM realm_topic
INNER JOIN selected_realm ON realm_topic.realm = selected_realm._id
FOR {share_or_update}
)
SELECT
{ q_user_role(organization="realm.organization", realm="realm._id", user_id="$user_id") } role,
key_index,
last_timestamp
FROM realm
INNER JOIN locked_realms USING (_id)
""")

async def _check_realm_topic(
self,
conn: AsyncpgConnection,
organization_id: OrganizationID,
realm_id: VlobID,
author: DeviceID | UserID,
) -> tuple[RealmRole, KeyIndex, DateTime] | RealmCheckBadOutcome:
if isinstance(author, DeviceID):
author = author.user_id
row = await conn.fetchrow(
*q_check_realm_topic(
organization_id=organization_id.str,
realm_id=realm_id,
user_id=author.str,
)
)
if not row:
return RealmCheckBadOutcome.REALM_NOT_FOUND
if row["role"] is None:
return RealmCheckBadOutcome.USER_NOT_IN_REALM
return RealmRole.from_str(row["role"]), int(row["key_index"]), row["last_timestamp"]
async def _lock_realm_topic(
self,
conn: AsyncpgConnection,
organization_id: OrganizationID,
realm_id: VlobID,
author: DeviceID,
) -> tuple[RealmRole, KeyIndex, DateTime] | RealmCheckBadOutcome:
row = await conn.fetchrow(
*q_lock_realm_topic(
organization_id=organization_id.str,
realm_id=realm_id,
user_id=author.user_id.str,
)
)
if not row:
return RealmCheckBadOutcome.REALM_NOT_FOUND
if row["role"] is None:
return RealmCheckBadOutcome.USER_NOT_IN_REALM
return RealmRole.from_str(row["role"]), int(row["key_index"]), row["last_timestamp"]

However, this is does not take the last vlob update for a given realm into account.

More generally, this approach should be carefully analyzed and implemented systematically since race conditions could potentially cause non-compliant write operations to be accepted.

@vxgmichel vxgmichel changed the title [Sever v3] Check concurrency model for postgre implementation [Server v3] Check concurrency model for postgre implementation Apr 19, 2024
@mmmarcos mmmarcos added I-Postgresql Impact: Postgresql issue A-Server Area: Server application labels Apr 22, 2024
@mmmarcos mmmarcos added this to the v3.0 milestone Apr 22, 2024
@vxgmichel vxgmichel removed their assignment May 29, 2024
@mmmarcos mmmarcos modified the milestones: v3.0, v3.1 May 31, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-Server Area: Server application I-Postgresql Impact: Postgresql issue
Projects
None yet
Development

No branches or pull requests

3 participants