From d2865ed7c3364406b2f1f4f984dcdd315196a353 Mon Sep 17 00:00:00 2001 From: Jeff Forcier Date: Fri, 24 Dec 2021 13:30:06 -0500 Subject: [PATCH] Fix #1955 --- paramiko/auth_handler.py | 53 ++++++++++++++++++++++++---------------- 1 file changed, 32 insertions(+), 21 deletions(-) diff --git a/paramiko/auth_handler.py b/paramiko/auth_handler.py index 845b9143e..da109d7c7 100644 --- a/paramiko/auth_handler.py +++ b/paramiko/auth_handler.py @@ -206,24 +206,19 @@ def _disconnect_no_more_auth(self): self.transport._send_message(m) self.transport.close() - def _get_algorithm_and_bits(self, key): + def _get_key_type_and_bits(self, key): """ - Given any key, return appropriate signing algorithm & bits-to-sign. + Given any key, return its type/algorithm & bits-to-sign. Intended for input to or verification of, key signatures. """ - key_type, bits = None, None # Use certificate contents, if available, plain pubkey otherwise if key.public_blob: - key_type = key.public_blob.key_type - bits = key.public_blob.key_blob + return key.public_blob.key_type, key.public_blob.key_blob else: - key_type = key.get_name() - bits = key - algorithm = self._finalize_pubkey_algorithm(key_type) - return algorithm, bits + return key.get_name(), key - def _get_session_blob(self, key, service, username): + def _get_session_blob(self, key, service, username, algorithm): m = Message() m.add_string(self.transport.session_id) m.add_byte(cMSG_USERAUTH_REQUEST) @@ -231,7 +226,7 @@ def _get_session_blob(self, key, service, username): m.add_string(service) m.add_string("publickey") m.add_boolean(True) - algorithm, bits = self._get_algorithm_and_bits(key) + _, bits = self._get_key_type_and_bits(key) m.add_string(algorithm) m.add_string(bits) return m.asbytes() @@ -282,6 +277,17 @@ def _parse_service_request(self, m): # dunno this one self._disconnect_service_not_available() + def _generate_key_from_request(self, algorithm, keyblob): + # For use in server mode. + options = self.transport.preferred_pubkeys + if algorithm.replace("-cert-v01@openssh.com", "") not in options: + err = ( + "Auth rejected: pubkey algorithm '{}' unsupported or disabled" + ) + self._log(INFO, err.format(algorithm)) + return None + return self.transport._key_info[algorithm](Message(keyblob)) + def _finalize_pubkey_algorithm(self, key_type): # Short-circuit for non-RSA keys if "rsa" not in key_type: @@ -333,13 +339,15 @@ def _parse_service_accept(self, m): m.add_string(password) elif self.auth_method == "publickey": m.add_boolean(True) - algorithm, bits = self._get_algorithm_and_bits( - self.private_key - ) + key_type, bits = self._get_key_type_and_bits(self.private_key) + algorithm = self._finalize_pubkey_algorithm(key_type) m.add_string(algorithm) m.add_string(bits) blob = self._get_session_blob( - self.private_key, "ssh-connection", self.username + self.private_key, + "ssh-connection", + self.username, + algorithm, ) sig = self.private_key.sign_ssh_data(blob, algorithm) m.add_string(sig) @@ -551,10 +559,13 @@ def _parse_userauth_request(self, m): ) elif method == "publickey": sig_attached = m.get_boolean() - keytype = m.get_text() + # NOTE: server never wants to guess a client's algo, they're + # telling us directly. No need for _finalize_pubkey_algorithm + # anywhere in this flow. + algorithm = m.get_text() keyblob = m.get_binary() try: - key = self.transport._key_info[keytype](Message(keyblob)) + key = self._generate_key_from_request(algorithm, keyblob) except SSHException as e: self._log(INFO, "Auth rejected: public key: {}".format(str(e))) key = None @@ -572,20 +583,20 @@ def _parse_userauth_request(self, m): username, key ) if result != AUTH_FAILED: - sig_algo = self._finalize_pubkey_algorithm(keytype) # key is okay, verify it if not sig_attached: # client wants to know if this key is acceptable, before it # signs anything... send special "ok" message m = Message() m.add_byte(cMSG_USERAUTH_PK_OK) - # TODO: suspect we're not testing this - m.add_string(sig_algo) + m.add_string(algorithm) m.add_string(keyblob) self.transport._send_message(m) return sig = Message(m.get_binary()) - blob = self._get_session_blob(key, service, username) + blob = self._get_session_blob( + key, service, username, algorithm + ) if not key.verify_ssh_sig(blob, sig): self._log(INFO, "Auth rejected: invalid signature") result = AUTH_FAILED