Skip to content

Commit

Permalink
auth: WSSE token needs to be string not bytes
Browse files Browse the repository at this point in the history
When `_wsse_username_token` returned bytes, the header would look like
```
UsernameToken Username="username", PasswordDigest="b'Ue8bnW/FAKEWubcqChWs='", Nonce="673552abeefb06a6", Created="2020-09-29T14:42:57Z"
```
And the extra `b'` at the beginning of the password digest would cause authentication to fail.

#179

Co-Authored-By: Sergey Shepelev <temotor@gmail.com>
  • Loading branch information
pearmaster and temoto committed Sep 29, 2020
1 parent 9bf300c commit 595e248
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 18 deletions.
2 changes: 1 addition & 1 deletion python3/httplib2/__init__.py
Expand Up @@ -551,7 +551,7 @@ def _cnonce():
def _wsse_username_token(cnonce, iso_now, password):
return base64.b64encode(
_sha(("%s%s%s" % (cnonce, iso_now, password)).encode("utf-8")).digest()
).strip()
).strip().decode("utf-8")


# For credentials we need two things, first
Expand Down
42 changes: 29 additions & 13 deletions tests/__init__.py
Expand Up @@ -556,9 +556,7 @@ def wrapped(request, *a, **kw):
return wrapper


def http_reflect_with_auth(
allow_scheme, allow_credentials, out_renew_nonce=None, out_requests=None
):
def http_reflect_with_auth(allow_scheme, allow_credentials, out_renew_nonce=None, out_requests=None):
"""allow_scheme - 'basic', 'digest', etc allow_credentials - sequence of ('name', 'password') out_renew_nonce - None | [function]
Way to return nonce renew function to caller.
Expand All @@ -575,9 +573,7 @@ def http_reflect_with_auth(

def renew_nonce():
if gnextnonce[0]:
assert False, (
"previous nextnonce was not used, probably bug in " "test code"
)
assert False, "previous nextnonce was not used, probably bug in test code"
gnextnonce[0] = gen_digest_nonce()
return gserver_nonce[0], gnextnonce[0]

Expand All @@ -596,6 +592,8 @@ def deny(**kwargs):
+ ', nonce="{nonce}", opaque="{opaque}"'
+ (", stale=true" if nonce_stale else "")
).format(realm=realm, nonce=gserver_nonce[0], opaque=server_opaque)
elif allow_scheme == "wsse":
authenticate = 'wsse realm="{realm}", profile="UsernameToken"'.format(realm=realm)
else:
raise Exception("unknown allow_scheme={0}".format(allow_scheme))
deny_headers = {"www-authenticate": authenticate}
Expand All @@ -612,9 +610,7 @@ def http_reflect_with_auth_handler(request):
if not auth_header:
return deny()
if " " not in auth_header:
return http_response_bytes(
status=400, body=b"authorization header syntax error"
)
return http_response_bytes(status=400, body=b"authorization header syntax error")
scheme, data = auth_header.split(" ", 1)
scheme = scheme.lower()
if scheme != allow_scheme:
Expand Down Expand Up @@ -698,11 +694,31 @@ def http_reflect_with_auth_handler(request):
}
return make_http_reflect(headers=allow_headers)(request)
return deny(body=b"supplied credentials are not allowed")
elif scheme == "wsse":
x_wsse = request.headers.get("x-wsse", "")
if x_wsse.count(",") != 3:
return http_response_bytes(status=400, body=b"x-wsse header syntax error")
auth_info = http_parse_auth(x_wsse)
client_username = auth_info.get("username", "")
client_nonce = auth_info.get("nonce", "")
client_created = auth_info.get("created", "")
client_digest = auth_info.get("passworddigest", "")
allow_password = None
for allow_username, allow_password in allow_credentials:
if client_username == allow_username:
break
else:
return deny(body=b"unknown username")

digest = hashlib.sha1("".join((client_nonce, client_created, allow_password)).encode("utf-8")).digest()
digest_b64 = base64.b64encode(digest).decode()
print("$$$ check client={} == real={}".format(client_digest, digest_b64))
if client_digest == digest_b64:
return make_http_reflect()(request)

return deny(body=b"supplied credentials are not allowed")
else:
return http_response_bytes(
status=400,
body="unknown authorization scheme={0}".format(scheme).encode(),
)
return http_response_bytes(status=400, body="unknown authorization scheme={0}".format(scheme).encode(),)

return http_reflect_with_auth_handler

Expand Down
47 changes: 43 additions & 4 deletions tests/test_auth.py
Expand Up @@ -364,8 +364,47 @@ def test_digest_object_auth_info():


def test_wsse_algorithm():
digest = httplib2._wsse_username_token(
"d36e316282959a9ed4c89851497a717f", "2003-12-15T14:43:07Z", "taadtaadpstcsm"
)
expected = b"quR/EWLAV4xLf9Zqyw4pDmfV9OY="
digest = httplib2._wsse_username_token("d36e316282959a9ed4c89851497a717f", "2003-12-15T14:43:07Z", "taadtaadpstcsm")
expected = "quR/EWLAV4xLf9Zqyw4pDmfV9OY="
assert expected == digest


def test_wsse_invalid():
http = httplib2.Http()
username = "user294"
password = tests.gen_password()
grenew_nonce = [None]
requests = []
handler = tests.http_reflect_with_auth(
allow_scheme="wsse",
allow_credentials=((username, password),),
out_renew_nonce=grenew_nonce,
out_requests=requests,
)
http.add_credentials(username, "wrong" + password)
with tests.server_request(handler, request_count=2) as uri:
response, _ = http.request(uri)
assert requests[0][1].status == 401
assert requests[1][0].headers["authorization"] == 'WSSE profile="UsernameToken"'
assert requests[1][1].status == 401
assert response.status == 401


def test_wsse_ok():
http = httplib2.Http()
username = "user314"
password = tests.gen_password()
grenew_nonce = [None]
requests = []
handler = tests.http_reflect_with_auth(
allow_scheme="wsse",
allow_credentials=((username, password),),
out_renew_nonce=grenew_nonce,
out_requests=requests,
)
http.add_credentials(username, password)
with tests.server_request(handler, request_count=2) as uri:
response, _ = http.request(uri)
assert requests[0][1].status == 401
assert requests[1][0].headers["authorization"] == 'WSSE profile="UsernameToken"'
assert response.status == 200

0 comments on commit 595e248

Please sign in to comment.