Skip to content

Commit

Permalink
GSS-API wrap and unwrap operation for the sspi module (#1692)
Browse files Browse the repository at this point in the history
  • Loading branch information
manuco committed May 29, 2021
1 parent 4ad3c5f commit 368e5e3
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 14 deletions.
4 changes: 4 additions & 0 deletions CHANGES.txt
Expand Up @@ -24,6 +24,10 @@ Since build 300:
containing the null character followed by junk characters.
(#1654, #1660, Lincoln Puzey)

* Add wrap and unwrap operations defined in the GSSAPI to the sspi module
and enhance the examples given in this module.
(#1692, Emmanuel Coirier)

Since build 228:
----------------
* Fixed a bug where win32com.client.VARIANT params were returned in the reverse
Expand Down
164 changes: 150 additions & 14 deletions win32/Lib/sspi.py
Expand Up @@ -25,6 +25,9 @@ def reset(self):
"""Reset everything to an unauthorized state"""
self.ctxt = None
self.authenticated = False
self.initiator_name = None
self.service_name = None

# The next seq_num for an encrypt/sign operation
self.next_seq_num = 0

Expand All @@ -37,7 +40,7 @@ def _get_next_seq_num(self):
return ret

def encrypt(self, data):
"""Encrypt a string, returning a tuple of (encrypted_data, encryption_data).
"""Encrypt a string, returning a tuple of (encrypted_data, trailer).
These can be passed to decrypt to get back the original string.
"""
pkg_size_info=self.ctxt.QueryContextAttributes(sspicon.SECPKG_ATTR_SIZES)
Expand Down Expand Up @@ -87,6 +90,76 @@ def verify(self, data, sig):
sigbuf[1].Buffer=sig
self.ctxt.VerifySignature(sigbuf,self._get_next_seq_num())

def unwrap(self, token):
"""
GSSAPI's unwrap with SSPI.
https://docs.microsoft.com/en-us/windows/win32/secauthn/sspi-kerberos-interoperability-with-gssapi
Usable mainly with Kerberos SSPI package, but this is not enforced.
Return the clear text, and a boolean that is True if the token was encrypted.
"""
buffer = win32security.PySecBufferDescType()
# This buffer will contain a "stream", which is the token coming from the other side
buffer.append(win32security.PySecBufferType(len(token), sspicon.SECBUFFER_STREAM))
buffer[0].Buffer = token

# This buffer will receive the clear, or just unwrapped text if no encryption was used.
# Will be resized by the lib.
buffer.append(win32security.PySecBufferType(0, sspicon.SECBUFFER_DATA))

pfQOP = self.ctxt.DecryptMessage(buffer, self._get_next_seq_num())

r = buffer[1].Buffer
return r, not (pfQOP == sspicon.SECQOP_WRAP_NO_ENCRYPT)

def wrap(self, msg, encrypt=False):
"""
GSSAPI's wrap with SSPI.
https://docs.microsoft.com/en-us/windows/win32/secauthn/sspi-kerberos-interoperability-with-gssapi
Usable mainly with Kerberos SSPI package, but this is not enforced.
Wrap a message to be sent to the other side. Encrypted if encrypt is True.
"""

size_info = self.ctxt.QueryContextAttributes(sspicon.SECPKG_ATTR_SIZES)
trailer_size = size_info['SecurityTrailer']
block_size = size_info['BlockSize']

buffer = win32security.PySecBufferDescType()

# This buffer will contain unencrypted data to wrap, and maybe encrypt.
buffer.append(win32security.PySecBufferType(len(msg), sspicon.SECBUFFER_DATA))
buffer[0].Buffer = msg

# Will receive the token that forms the beginning of the msg
buffer.append(win32security.PySecBufferType(trailer_size, sspicon.SECBUFFER_TOKEN))

# The trailer is needed in case of block encryption
buffer.append(win32security.PySecBufferType(block_size, sspicon.SECBUFFER_PADDING))

fQOP = 0 if encrypt else sspicon.SECQOP_WRAP_NO_ENCRYPT
self.ctxt.EncryptMessage(fQOP, buffer, self._get_next_seq_num())

# Sec token, then data, then padding
r = buffer[1].Buffer + buffer[0].Buffer + buffer[2].Buffer
return r

def _amend_ctx_name(self):
"""Adds initiator and service names in the security context for ease of use"""
if not self.authenticated:
raise ValueError("Sec context is not completely authenticated")

try:
names = self.ctxt.QueryContextAttributes(sspicon.SECPKG_ATTR_NATIVE_NAMES)
except error:
# The SSP doesn't provide these attributes.
pass
else:
self.initiator_name, self.service_name = names


class ClientAuth(_BaseAuth):
"""Manages the client side of an SSPI authentication handshake
"""
Expand All @@ -111,13 +184,14 @@ def __init__(self,
None, auth_info)
_BaseAuth.__init__(self)

# Perform *one* step of the client authentication process.

def authorize(self, sec_buffer_in):
"""Perform *one* step of the client authentication process. Pass None for the first round"""
if sec_buffer_in is not None and type(sec_buffer_in) != win32security.PySecBufferDescType:
# User passed us the raw data - wrap it into a SecBufferDesc
sec_buffer_new=win32security.PySecBufferDescType()
tokenbuf=win32security.PySecBufferType(self.pkg_info['MaxToken'],
sspicon.SECBUFFER_TOKEN)
sspicon.SECBUFFER_TOKEN)
tokenbuf.Buffer=sec_buffer_in
sec_buffer_new.append(tokenbuf)
sec_buffer_in = sec_buffer_new
Expand All @@ -144,7 +218,11 @@ def authorize(self, sec_buffer_in):

if err in (sspicon.SEC_I_COMPLETE_NEEDED,sspicon.SEC_I_COMPLETE_AND_CONTINUE):
self.ctxt.CompleteAuthToken(sec_buffer_out)

self.authenticated = err == 0
if self.authenticated:
self._amend_ctx_name()

return err, sec_buffer_out

class ServerAuth(_BaseAuth):
Expand Down Expand Up @@ -172,13 +250,13 @@ def __init__(self,
self.pkg_info['Name'], sspicon.SECPKG_CRED_INBOUND, None, None)
_BaseAuth.__init__(self)

# Perform *one* step of the server authentication process.
def authorize(self, sec_buffer_in):
"""Perform *one* step of the server authentication process."""
if sec_buffer_in is not None and type(sec_buffer_in) != win32security.PySecBufferDescType:
# User passed us the raw data - wrap it into a SecBufferDesc
sec_buffer_new=win32security.PySecBufferDescType()
tokenbuf=win32security.PySecBufferType(self.pkg_info['MaxToken'],
sspicon.SECBUFFER_TOKEN)
sspicon.SECBUFFER_TOKEN)
tokenbuf.Buffer=sec_buffer_in
sec_buffer_new.append(tokenbuf)
sec_buffer_in = sec_buffer_new
Expand All @@ -201,26 +279,84 @@ def authorize(self, sec_buffer_in):

if err in (sspicon.SEC_I_COMPLETE_NEEDED,sspicon.SEC_I_COMPLETE_AND_CONTINUE):
self.ctxt.CompleteAuthToken(sec_buffer_out)

self.authenticated = err == 0
if self.authenticated:
self._amend_ctx_name()

return err, sec_buffer_out

if __name__=='__main__':
# Setup the 2 contexts.
sspiclient=ClientAuth("NTLM")
sspiserver=ServerAuth("NTLM")

# This is the security package (the security support provider / the security backend)
# we want to use for this example.
ssp = "Kerberos" # or "NTLM" or "Negotiate" which enable negotiation between
# Kerberos (prefered) and NTLM (if not supported on the other side).

flags = (
sspicon.ISC_REQ_MUTUAL_AUTH | # mutual authentication
sspicon.ISC_REQ_INTEGRITY | # check for integrity
sspicon.ISC_REQ_SEQUENCE_DETECT | # enable out-of-order messages
sspicon.ISC_REQ_CONFIDENTIALITY | # request confidentiality
sspicon.ISC_REQ_REPLAY_DETECT # request replay detection
)

# Get our identity, mandatory for the Kerberos case *for this example*
# Kerberos cannot be used if we don't tell it the target we want
# to authenticate to.
cred_handle, exp = win32security.AcquireCredentialsHandle(
None, ssp, sspicon.SECPKG_CRED_INBOUND, None, None
)
cred = cred_handle.QueryCredentialsAttributes(sspicon.SECPKG_CRED_ATTR_NAMES)
print("We are:", cred)

# Setup the 2 contexts. In real life, only one is needed: the other one is
# created in the process we want to communicate with.
sspiclient=ClientAuth(ssp, scflags=flags, targetspn=cred)
sspiserver=ServerAuth(ssp, scflags=flags)

print("SSP : %s (%s)" % (sspiclient.pkg_info["Name"], sspiclient.pkg_info["Comment"]))

# Perform the authentication dance, each loop exchanging more information
# on the way to completing authentication.
sec_buffer=None
while 1:
client_step = 0
server_step = 0
while not(sspiclient.authenticated) or len(sec_buffer[0].Buffer):
client_step += 1
err, sec_buffer = sspiclient.authorize(sec_buffer)
err, sec_buffer = sspiserver.authorize(sec_buffer)
if err==0:
print("Client step %s" % client_step)
if sspiserver.authenticated and len(sec_buffer[0].Buffer) == 0:
break

server_step += 1
err, sec_buffer = sspiserver.authorize(sec_buffer)
print("Server step %s" % server_step)

# Authentication process is finished.
print("Initiator name from the service side:", sspiserver.initiator_name)
print("Service name from the client side: ", sspiclient.service_name)

data = "hello".encode("ascii") # py3k-friendly

# Simple signature, not compatible with GSSAPI.
sig = sspiclient.sign(data)
sspiserver.verify(data, sig)

data, key = sspiclient.encrypt(data)
assert sspiserver.decrypt(data, key) == data
# Encryption
encrypted, sig = sspiclient.encrypt(data)
decrypted = sspiserver.decrypt(encrypted, sig)
assert decrypted == data

# GSSAPI wrapping, no encryption (NTLM always encrypts)
wrapped = sspiclient.wrap(data)
unwrapped, was_encrypted = sspiserver.unwrap(wrapped)
print("encrypted ?", was_encrypted)
assert data == unwrapped

# GSSAPI wrapping, with encryption
wrapped = sspiserver.wrap(data, encrypt=True)
unwrapped, was_encrypted = sspiclient.unwrap(wrapped)
print("encrypted ?", was_encrypted)
assert data == unwrapped

print("cool!")

0 comments on commit 368e5e3

Please sign in to comment.