Skip to content

Commit

Permalink
Add unit tests for GSSAPI authentication
Browse files Browse the repository at this point in the history
  • Loading branch information
eltoder committed Feb 27, 2024
1 parent 0b29765 commit 469aab2
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 36 deletions.
10 changes: 10 additions & 0 deletions .github/workflows/install-krb5.sh
@@ -0,0 +1,10 @@
#!/bin/bash

set -Eexuo pipefail

if [ "$RUNNER_OS" == "Linux" ]; then
# Assume Ubuntu since this is the only Linux used in CI.
sudo apt-get update
sudo apt-get install -y --no-install-recommends \
libkrb5-dev krb5-user krb5-kdc krb5-admin-server
fi
2 changes: 2 additions & 0 deletions .github/workflows/tests.yml
Expand Up @@ -62,6 +62,7 @@ jobs:
- name: Install Python Deps
if: steps.release.outputs.version == 0
run: |
.github/workflows/install-krb5.sh
python -m pip install -U pip setuptools wheel
python -m pip install -e .[test]
Expand Down Expand Up @@ -122,6 +123,7 @@ jobs:
- name: Install Python Deps
if: steps.release.outputs.version == 0
run: |
.github/workflows/install-krb5.sh
python -m pip install -U pip setuptools wheel
python -m pip install -e .[test]
Expand Down
13 changes: 7 additions & 6 deletions asyncpg/protocol/coreproto.pyx
Expand Up @@ -719,17 +719,18 @@ cdef class CoreProtocol:
import gssapi
except ModuleNotFoundError:
raise RuntimeError(
'gssapi module not found; please install asyncpg[gss] to use '
'asyncpg with Kerberos or GSSAPI authentication'
'gssapi module not found; please install asyncpg[gssapi] to '
'use asyncpg with Kerberos or GSSAPI authentication'
) from None

service_name = self.con_params.krbsrvname or 'postgres'
# find the canonical name of the server host
if isinstance(self.address, str):
host = socket.gethostname()
else:
host = self.address[0]
host_cname = socket.gethostbyname_ex(host)[0].rstrip('.')
raise RuntimeError('GSSAPI authentication is only supported for '
'TCP/IP connections')

host = self.address[0]
host_cname = socket.gethostbyname_ex(host)[0]
gss_name = gssapi.Name(f'{service_name}/{host_cname}')
self.gss_ctx = gssapi.SecurityContext(name=gss_name, usage='initiate')

Expand Down
4 changes: 3 additions & 1 deletion pyproject.toml
Expand Up @@ -35,12 +35,14 @@ dependencies = [
github = "https://github.com/MagicStack/asyncpg"

[project.optional-dependencies]
gss = [
gssapi = [
'gssapi',
]
test = [
'flake8~=6.1',
'uvloop>=0.15.3; platform_system != "Windows" and python_version < "3.12.0"',
'gssapi; platform_system == "Linux"',
'k5test; platform_system == "Linux"',
]
docs = [
'Sphinx~=5.3.0',
Expand Down
106 changes: 77 additions & 29 deletions tests/test_connect.py
Expand Up @@ -130,30 +130,22 @@ def test_server_version_02(self):
CORRECT_PASSWORD = 'correct\u1680password'


class TestAuthentication(tb.ConnectedTestCase):
class BaseTestAuthentication(tb.ConnectedTestCase):
USERS = []

def setUp(self):
super().setUp()

if not self.cluster.is_managed():
self.skipTest('unmanaged cluster')

methods = [
('trust', None),
('reject', None),
('scram-sha-256', CORRECT_PASSWORD),
('md5', CORRECT_PASSWORD),
('password', CORRECT_PASSWORD),
]

self.cluster.reset_hba()

create_script = []
for method, password in methods:
for username, method, password in self.USERS:
if method == 'scram-sha-256' and self.server_version.major < 10:
continue

username = method.replace('-', '_')

# if this is a SCRAM password, we need to set the encryption method
# to "scram-sha-256" in order to properly hash the password
if method == 'scram-sha-256':
Expand All @@ -162,7 +154,7 @@ def setUp(self):
)

create_script.append(
'CREATE ROLE {}_user WITH LOGIN{};'.format(
'CREATE ROLE "{}" WITH LOGIN{};'.format(
username,
f' PASSWORD E{(password or "")!r}'
)
Expand All @@ -175,20 +167,20 @@ def setUp(self):
"SET password_encryption = 'md5';"
)

if _system != 'Windows':
if _system != 'Windows' and method != 'gss':
self.cluster.add_hba_entry(
type='local',
database='postgres', user='{}_user'.format(username),
database='postgres', user=username,
auth_method=method)

self.cluster.add_hba_entry(
type='host', address=ipaddress.ip_network('127.0.0.0/24'),
database='postgres', user='{}_user'.format(username),
database='postgres', user=username,
auth_method=method)

self.cluster.add_hba_entry(
type='host', address=ipaddress.ip_network('::1/128'),
database='postgres', user='{}_user'.format(username),
database='postgres', user=username,
auth_method=method)

# Put hba changes into effect
Expand All @@ -201,28 +193,28 @@ def tearDown(self):
# Reset cluster's pg_hba.conf since we've meddled with it
self.cluster.trust_local_connections()

methods = [
'trust',
'reject',
'scram-sha-256',
'md5',
'password',
]

drop_script = []
for method in methods:
for username, method, _ in self.USERS:
if method == 'scram-sha-256' and self.server_version.major < 10:
continue

username = method.replace('-', '_')

drop_script.append('DROP ROLE {}_user;'.format(username))
drop_script.append('DROP ROLE "{}";'.format(username))

drop_script = '\n'.join(drop_script)
self.loop.run_until_complete(self.con.execute(drop_script))

super().tearDown()


class TestAuthentication(BaseTestAuthentication):
USERS = [
('trust_user', 'trust', None),
('reject_user', 'reject', None),
('scram_sha_256_user', 'scram-sha-256', CORRECT_PASSWORD),
('md5_user', 'md5', CORRECT_PASSWORD),
('password_user', 'password', CORRECT_PASSWORD),
]

async def _try_connect(self, **kwargs):
# On Windows the server sometimes just closes
# the connection sooner than we receive the
Expand Down Expand Up @@ -388,6 +380,62 @@ async def test_auth_md5_unsupported(self, _):
await self.connect(user='md5_user', password=CORRECT_PASSWORD)


class TestGssAuthentication(BaseTestAuthentication):
@classmethod
def setUpClass(cls):
try:
from k5test.realm import K5Realm
except ModuleNotFoundError:
raise unittest.SkipTest('k5test not installed')

cls.realm = K5Realm()
cls.addClassCleanup(cls.realm.stop)
# Setup environment before starting the cluster.
cm = unittest.mock.patch.dict(os.environ, cls.realm.env)
cm.__enter__()
cls.addClassCleanup(cm.__exit__, None, None, None)
# Add credentials.
cls.realm.addprinc('postgres/localhost')
cls.realm.extract_keytab('postgres/localhost', cls.realm.keytab)

cls.USERS = [(cls.realm.user_princ, 'gss', None)]
super().setUpClass()

cls.cluster.override_connection_spec(host='localhost')

@classmethod
def get_server_settings(cls):
settings = super().get_server_settings()
settings['krb_server_keyfile'] = f'FILE:{cls.realm.keytab}'
return settings

@classmethod
def setup_cluster(cls):
cls.cluster = cls.new_cluster(pg_cluster.TempCluster)
cls.start_cluster(
cls.cluster, server_settings=cls.get_server_settings())

async def test_auth_gssapi(self):
conn = await self.connect(user=self.realm.user_princ)
await conn.close()

# Service name mismatch.
with self.assertRaisesRegex(
exceptions.InternalClientError,
'Server .* not found'
):
await self.connect(user=self.realm.user_princ, krbsrvname='wrong')

# Credentials mismatch.
self.realm.addprinc('wrong_user', 'password')
self.realm.kinit('wrong_user', 'password')
with self.assertRaisesRegex(
exceptions.InvalidAuthorizationSpecificationError,
'GSSAPI authentication failed for user'
):
await self.connect(user=self.realm.user_princ)


class TestConnectParams(tb.TestCase):

TESTS = [
Expand Down

0 comments on commit 469aab2

Please sign in to comment.