Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add markers to known_hosts parser re paramiko#771 #2270

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
43 changes: 39 additions & 4 deletions paramiko/hostkeys.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@


from base64 import encodebytes, decodebytes
from enum import Enum
from typing import Iterable, Optional
import binascii
import os
import re
Expand Down Expand Up @@ -311,18 +313,29 @@ def __init__(self, line, exc):
self.args = (line, exc)


class HostKeyMarkers(str, Enum):
CERT_AUTHORITY = "@cert-authority"
REVOKED = "@revoked"


class HostKeyEntry:
"""
Representation of a line in an OpenSSH-style "known hosts" file.
"""

def __init__(self, hostnames=None, key=None):
def __init__(
self,
hostnames: Optional[Iterable[str]] = None,
key: Optional[PKey] = None,
marker: Optional[HostKeyMarkers] = None,
):
self.valid = (hostnames is not None) and (key is not None)
self.hostnames = hostnames
self.key = key
self._marker = marker

@classmethod
def from_line(cls, line, lineno=None):
def from_line(cls, line: str, lineno: Optional[int] = None):
"""
Parses the given line of text to find the names for the host,
the type of key, and the key data. The line is expected to be in the
Expand All @@ -334,6 +347,7 @@ def from_line(cls, line, lineno=None):
that should be taken care of before sending the line to us.

:param str line: a line from an OpenSSH known_hosts file
:param int|None lineno: the line number on which the entry appears
"""
log = get_logger("paramiko.hostkeys")
fields = re.split(" |\t", line)
Expand All @@ -342,7 +356,17 @@ def from_line(cls, line, lineno=None):
msg = "Not enough fields found in known_hosts in line {} ({!r})"
log.info(msg.format(lineno, line))
return None
fields = fields[:3]

try:
marker = HostKeyMarkers(fields[0])
fields = fields[1:4]
except ValueError:
marker = None
fields = fields[:3]

if marker is HostKeyMarkers.REVOKED:
log.info("Key is marked as revoked")
return None

names, key_type, key = fields
names = names.split(",")
Expand All @@ -359,7 +383,11 @@ def from_line(cls, line, lineno=None):
raise InvalidHostKey(line, e)

try:
return cls(names, PKey.from_type_string(key_type, key_bytes))
return cls(
names,
PKey.from_type_string(key_type, key_bytes),
marker=marker,
)
except UnknownKeyType:
# TODO 4.0: consider changing HostKeys API so this just raises
# naturally and the exception is muted higher up in the stack?
Expand All @@ -373,6 +401,13 @@ def to_line(self):
included.
"""
if self.valid:
if self._marker:
return "{} {} {} {}\n".format(
self._marker.value,
",".join(self.hostnames),
self.key.get_name(),
self.key.get_base64(),
)
return "{} {} {}\n".format(
",".join(self.hostnames),
self.key.get_name(),
Expand Down
24 changes: 20 additions & 4 deletions tests/test_hostkeys.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@
curvy.example.com ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlz\
dHAyNTYAAABBBAa+pY7djSpbg5viAcZhPt56AO3U3Sd7h7dnlUp0EjfDgyYHYQxl2QZ4JGgfwR5iv9\
T9iRZjQzvJd5s+kBAZtpk=
@cert-authority ca.example.com ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA8bP1ZA7DCZD\
B9J0s50l31MBGQ3GQ/Fc7SX6gkpXkwcZryoi4kNFhHu5LvHcZPdxXV1D+uTMfGS1eyd2Yz/DoNWXNA\
l8TI0cAsW5ymME3bQ4J/k1IKxCtz/bAlAqFgKoc+EolMziDYqWIATtW0rYTJvzGAzTmMj80/QpsFH+\
Pc2M=
@revoked revoked.example.com ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA8bP1ZA7DCZD\
B9J0s50l31MBGQ3GQ/Fc7SX6gkpXkwcZryoi4kNFhHu5LvHcZPdxXV1D+uTMfGS1eyd2Yz/DoNWXNA\
l8TI0cAsW5ymME3bQ4J/k1IKxCtz/bAlAqFgKoc+EolMziDYqWIATtW0rYTJvzGAzTmMj80/QpsFH+\
Pc2M=
"""

test_hosts_file_tabs = """\
Expand All @@ -55,6 +63,14 @@
curvy.example.com\tecdsa-sha2-nistp256\tAAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbml\
zdHAyNTYAAABBBAa+pY7djSpbg5viAcZhPt56AO3U3Sd7h7dnlUp0EjfDgyYHYQxl2QZ4JGgfwR5iv\
9T9iRZjQzvJd5s+kBAZtpk=
@cert-authority\tca.example.com\tssh-rsa\tAAAAB3NzaC1yc2EAAAABIwAAAIEA8bP1ZA7DCZD\
B9J0s50l31MBGQ3GQ/Fc7SX6gkpXkwcZryoi4kNFhHu5LvHcZPdxXV1D+uTMfGS1eyd2Yz/DoNWXNA\
l8TI0cAsW5ymME3bQ4J/k1IKxCtz/bAlAqFgKoc+EolMziDYqWIATtW0rYTJvzGAzTmMj80/QpsFH+\
Pc2M=
@revoked\trevoked.example.com\tssh-rsa\tAAAAB3NzaC1yc2EAAAABIwAAAIEA8bP1ZA7DCZD\
B9J0s50l31MBGQ3GQ/Fc7SX6gkpXkwcZryoi4kNFhHu5LvHcZPdxXV1D+uTMfGS1eyd2Yz/DoNWXNA\
l8TI0cAsW5ymME3bQ4J/k1IKxCtz/bAlAqFgKoc+EolMziDYqWIATtW0rYTJvzGAzTmMj80/QpsFH+\
Pc2M=
"""

keyblob = b"""\
Expand Down Expand Up @@ -83,7 +99,7 @@ def tearDown(self):

def test_load(self):
hostdict = paramiko.HostKeys("hostfile.temp")
assert len(hostdict) == 4
assert len(hostdict) == 5
self.assertEqual(1, len(list(hostdict.values())[0]))
self.assertEqual(1, len(list(hostdict.values())[1]))
fp = hexlify(
Expand All @@ -96,7 +112,7 @@ def test_add(self):
hh = "|1|BMsIC6cUIP2zBuXR3t2LRcJYjzM=|hpkJMysjTk/+zzUUzxQEa2ieq6c="
key = paramiko.RSAKey(data=decodebytes(keyblob))
hostdict.add(hh, "ssh-rsa", key)
assert len(hostdict) == 5
assert len(hostdict) == 6
x = hostdict["foo.example.com"]
fp = hexlify(x["ssh-rsa"].get_fingerprint()).upper()
self.assertEqual(b"7EC91BB336CB6D810B124B1353C32396", fp)
Expand All @@ -113,7 +129,7 @@ def test_dict(self):
fp = hexlify(x["ssh-rsa"].get_fingerprint()).upper()
self.assertEqual(b"E6684DB30E109B67B70FF1DC5C7F1363", fp)
assert list(hostdict) == hostdict.keys()
assert len(list(hostdict)) == len(hostdict.keys()) == 4
assert len(list(hostdict)) == len(hostdict.keys()) == 5

def test_dict_set(self):
hostdict = paramiko.HostKeys("hostfile.temp")
Expand All @@ -123,7 +139,7 @@ def test_dict_set(self):
hostdict["fake.example.com"] = {}
hostdict["fake.example.com"]["ssh-rsa"] = key

assert len(hostdict) == 5
assert len(hostdict) == 6
self.assertEqual(2, len(list(hostdict.values())[0]))
self.assertEqual(1, len(list(hostdict.values())[1]))
self.assertEqual(1, len(list(hostdict.values())[2]))
Expand Down