diff --git a/CHANGES.txt b/CHANGES.txt index 658445ff2..f052218d3 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -20,6 +20,10 @@ Since build 300: containing the null character followed by junk characters. (#1654, #1660, Lincoln Puzey) +* Add wrap and unwrap operations defined in the GSSAPI to the sspi module + and enhance the examples given in this module. + (#1692, Emmanuel Coirier) + Since build 228: ---------------- * Fixed a bug where win32com.client.VARIANT params were returned in the reverse diff --git a/win32/Lib/sspi.py b/win32/Lib/sspi.py index e3fcf43ed..0bcc0777d 100644 --- a/win32/Lib/sspi.py +++ b/win32/Lib/sspi.py @@ -25,6 +25,9 @@ def reset(self): """Reset everything to an unauthorized state""" self.ctxt = None self.authenticated = False + self.initiator_name = None + self.service_name = None + # The next seq_num for an encrypt/sign operation self.next_seq_num = 0 @@ -37,7 +40,7 @@ def _get_next_seq_num(self): return ret def encrypt(self, data): - """Encrypt a string, returning a tuple of (encrypted_data, encryption_data). + """Encrypt a string, returning a tuple of (encrypted_data, trailer). These can be passed to decrypt to get back the original string. """ pkg_size_info=self.ctxt.QueryContextAttributes(sspicon.SECPKG_ATTR_SIZES) @@ -87,6 +90,76 @@ def verify(self, data, sig): sigbuf[1].Buffer=sig self.ctxt.VerifySignature(sigbuf,self._get_next_seq_num()) + def unwrap(self, token): + """ + GSSAPI's unwrap with SSPI. + https://docs.microsoft.com/en-us/windows/win32/secauthn/sspi-kerberos-interoperability-with-gssapi + + Usable mainly with Kerberos SSPI package, but this is not enforced. + + Return the clear text, and a boolean that is True if the token was encrypted. + """ + buffer = win32security.PySecBufferDescType() + # This buffer will contain a "stream", which is the token coming from the other side + buffer.append(win32security.PySecBufferType(len(token), sspicon.SECBUFFER_STREAM)) + buffer[0].Buffer = token + + # This buffer will receive the clear, or just unwrapped text if no encryption was used. + # Will be resized by the lib. + buffer.append(win32security.PySecBufferType(0, sspicon.SECBUFFER_DATA)) + + pfQOP = self.ctxt.DecryptMessage(buffer, self._get_next_seq_num()) + + r = buffer[1].Buffer + return r, not (pfQOP == sspicon.SECQOP_WRAP_NO_ENCRYPT) + + def wrap(self, msg, encrypt=False): + """ + GSSAPI's wrap with SSPI. + https://docs.microsoft.com/en-us/windows/win32/secauthn/sspi-kerberos-interoperability-with-gssapi + + Usable mainly with Kerberos SSPI package, but this is not enforced. + + Wrap a message to be sent to the other side. Encrypted if encrypt is True. + """ + + size_info = self.ctxt.QueryContextAttributes(sspicon.SECPKG_ATTR_SIZES) + trailer_size = size_info['SecurityTrailer'] + block_size = size_info['BlockSize'] + + buffer = win32security.PySecBufferDescType() + + # This buffer will contain unencrypted data to wrap, and maybe encrypt. + buffer.append(win32security.PySecBufferType(len(msg), sspicon.SECBUFFER_DATA)) + buffer[0].Buffer = msg + + # Will receive the token that forms the beginning of the msg + buffer.append(win32security.PySecBufferType(trailer_size, sspicon.SECBUFFER_TOKEN)) + + # The trailer is needed in case of block encryption + buffer.append(win32security.PySecBufferType(block_size, sspicon.SECBUFFER_PADDING)) + + fQOP = 0 if encrypt else sspicon.SECQOP_WRAP_NO_ENCRYPT + self.ctxt.EncryptMessage(fQOP, buffer, self._get_next_seq_num()) + + # Sec token, then data, then padding + r = buffer[1].Buffer + buffer[0].Buffer + buffer[2].Buffer + return r + + def _amend_ctx_name(self): + """Adds initiator and service names in the security context for ease of use""" + if not self.authenticated: + raise ValueError("Sec context is not completely authenticated") + + try: + names = self.ctxt.QueryContextAttributes(sspicon.SECPKG_ATTR_NATIVE_NAMES) + except error: + # The SSP doesn't provide these attributes. + pass + else: + self.initiator_name, self.service_name = names + + class ClientAuth(_BaseAuth): """Manages the client side of an SSPI authentication handshake """ @@ -111,13 +184,14 @@ def __init__(self, None, auth_info) _BaseAuth.__init__(self) - # Perform *one* step of the client authentication process. + def authorize(self, sec_buffer_in): + """Perform *one* step of the client authentication process. Pass None for the first round""" if sec_buffer_in is not None and type(sec_buffer_in) != win32security.PySecBufferDescType: # User passed us the raw data - wrap it into a SecBufferDesc sec_buffer_new=win32security.PySecBufferDescType() tokenbuf=win32security.PySecBufferType(self.pkg_info['MaxToken'], - sspicon.SECBUFFER_TOKEN) + sspicon.SECBUFFER_TOKEN) tokenbuf.Buffer=sec_buffer_in sec_buffer_new.append(tokenbuf) sec_buffer_in = sec_buffer_new @@ -144,7 +218,11 @@ def authorize(self, sec_buffer_in): if err in (sspicon.SEC_I_COMPLETE_NEEDED,sspicon.SEC_I_COMPLETE_AND_CONTINUE): self.ctxt.CompleteAuthToken(sec_buffer_out) + self.authenticated = err == 0 + if self.authenticated: + self._amend_ctx_name() + return err, sec_buffer_out class ServerAuth(_BaseAuth): @@ -172,13 +250,13 @@ def __init__(self, self.pkg_info['Name'], sspicon.SECPKG_CRED_INBOUND, None, None) _BaseAuth.__init__(self) - # Perform *one* step of the server authentication process. def authorize(self, sec_buffer_in): + """Perform *one* step of the server authentication process.""" if sec_buffer_in is not None and type(sec_buffer_in) != win32security.PySecBufferDescType: # User passed us the raw data - wrap it into a SecBufferDesc sec_buffer_new=win32security.PySecBufferDescType() tokenbuf=win32security.PySecBufferType(self.pkg_info['MaxToken'], - sspicon.SECBUFFER_TOKEN) + sspicon.SECBUFFER_TOKEN) tokenbuf.Buffer=sec_buffer_in sec_buffer_new.append(tokenbuf) sec_buffer_in = sec_buffer_new @@ -201,26 +279,84 @@ def authorize(self, sec_buffer_in): if err in (sspicon.SEC_I_COMPLETE_NEEDED,sspicon.SEC_I_COMPLETE_AND_CONTINUE): self.ctxt.CompleteAuthToken(sec_buffer_out) + self.authenticated = err == 0 + if self.authenticated: + self._amend_ctx_name() + return err, sec_buffer_out if __name__=='__main__': - # Setup the 2 contexts. - sspiclient=ClientAuth("NTLM") - sspiserver=ServerAuth("NTLM") - + # This is the security package (the security support provider / the security backend) + # we want to use for this example. + ssp = "Kerberos" # or "NTLM" or "Negotiate" which enable negotiation between + # Kerberos (prefered) and NTLM (if not supported on the other side). + + flags = ( + sspicon.ISC_REQ_MUTUAL_AUTH | # mutual authentication + sspicon.ISC_REQ_INTEGRITY | # check for integrity + sspicon.ISC_REQ_SEQUENCE_DETECT | # enable out-of-order messages + sspicon.ISC_REQ_CONFIDENTIALITY | # request confidentiality + sspicon.ISC_REQ_REPLAY_DETECT # request replay detection + ) + + # Get our identity, mandatory for the Kerberos case *for this example* + # Kerberos cannot be used if we don't tell it the target we want + # to authenticate to. + cred_handle, exp = win32security.AcquireCredentialsHandle( + None, ssp, sspicon.SECPKG_CRED_INBOUND, None, None + ) + cred = cred_handle.QueryCredentialsAttributes(sspicon.SECPKG_CRED_ATTR_NAMES) + print("We are:", cred) + + # Setup the 2 contexts. In real life, only one is needed: the other one is + # created in the process we want to communicate with. + sspiclient=ClientAuth(ssp, scflags=flags, targetspn=cred) + sspiserver=ServerAuth(ssp, scflags=flags) + + print("SSP : %s (%s)" % (sspiclient.pkg_info["Name"], sspiclient.pkg_info["Comment"])) + # Perform the authentication dance, each loop exchanging more information # on the way to completing authentication. sec_buffer=None - while 1: + client_step = 0 + server_step = 0 + while not(sspiclient.authenticated) or len(sec_buffer[0].Buffer): + client_step += 1 err, sec_buffer = sspiclient.authorize(sec_buffer) - err, sec_buffer = sspiserver.authorize(sec_buffer) - if err==0: + print("Client step %s" % client_step) + if sspiserver.authenticated and len(sec_buffer[0].Buffer) == 0: break + + server_step += 1 + err, sec_buffer = sspiserver.authorize(sec_buffer) + print("Server step %s" % server_step) + + # Authentication process is finished. + print("Initiator name from the service side:", sspiserver.initiator_name) + print("Service name from the client side: ", sspiclient.service_name) + data = "hello".encode("ascii") # py3k-friendly + + # Simple signature, not compatible with GSSAPI. sig = sspiclient.sign(data) sspiserver.verify(data, sig) - data, key = sspiclient.encrypt(data) - assert sspiserver.decrypt(data, key) == data + # Encryption + encrypted, sig = sspiclient.encrypt(data) + decrypted = sspiserver.decrypt(encrypted, sig) + assert decrypted == data + + # GSSAPI wrapping, no encryption (NTLM always encrypts) + wrapped = sspiclient.wrap(data) + unwrapped, was_encrypted = sspiserver.unwrap(wrapped) + print("encrypted ?", was_encrypted) + assert data == unwrapped + + # GSSAPI wrapping, with encryption + wrapped = sspiserver.wrap(data, encrypt=True) + unwrapped, was_encrypted = sspiclient.unwrap(wrapped) + print("encrypted ?", was_encrypted) + assert data == unwrapped + print("cool!")