diff --git a/python3/httplib2/__init__.py b/python3/httplib2/__init__.py index cf2db600..02c19b73 100644 --- a/python3/httplib2/__init__.py +++ b/python3/httplib2/__init__.py @@ -551,7 +551,7 @@ def _cnonce(): def _wsse_username_token(cnonce, iso_now, password): return base64.b64encode( _sha(("%s%s%s" % (cnonce, iso_now, password)).encode("utf-8")).digest() - ).strip() + ).strip().decode("utf-8") # For credentials we need two things, first diff --git a/tests/__init__.py b/tests/__init__.py index 02a3ecf8..28c8d135 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -556,9 +556,7 @@ def wrapped(request, *a, **kw): return wrapper -def http_reflect_with_auth( - allow_scheme, allow_credentials, out_renew_nonce=None, out_requests=None -): +def http_reflect_with_auth(allow_scheme, allow_credentials, out_renew_nonce=None, out_requests=None): """allow_scheme - 'basic', 'digest', etc allow_credentials - sequence of ('name', 'password') out_renew_nonce - None | [function] Way to return nonce renew function to caller. @@ -575,9 +573,7 @@ def http_reflect_with_auth( def renew_nonce(): if gnextnonce[0]: - assert False, ( - "previous nextnonce was not used, probably bug in " "test code" - ) + assert False, "previous nextnonce was not used, probably bug in test code" gnextnonce[0] = gen_digest_nonce() return gserver_nonce[0], gnextnonce[0] @@ -596,6 +592,8 @@ def deny(**kwargs): + ', nonce="{nonce}", opaque="{opaque}"' + (", stale=true" if nonce_stale else "") ).format(realm=realm, nonce=gserver_nonce[0], opaque=server_opaque) + elif allow_scheme == "wsse": + authenticate = 'wsse realm="{realm}", profile="UsernameToken"'.format(realm=realm) else: raise Exception("unknown allow_scheme={0}".format(allow_scheme)) deny_headers = {"www-authenticate": authenticate} @@ -612,9 +610,7 @@ def http_reflect_with_auth_handler(request): if not auth_header: return deny() if " " not in auth_header: - return http_response_bytes( - status=400, body=b"authorization header syntax error" - ) + return http_response_bytes(status=400, body=b"authorization header syntax error") scheme, data = auth_header.split(" ", 1) scheme = scheme.lower() if scheme != allow_scheme: @@ -698,11 +694,31 @@ def http_reflect_with_auth_handler(request): } return make_http_reflect(headers=allow_headers)(request) return deny(body=b"supplied credentials are not allowed") + elif scheme == "wsse": + x_wsse = request.headers.get("x-wsse", "") + if x_wsse.count(",") != 3: + return http_response_bytes(status=400, body=b"x-wsse header syntax error") + auth_info = http_parse_auth(x_wsse) + client_username = auth_info.get("username", "") + client_nonce = auth_info.get("nonce", "") + client_created = auth_info.get("created", "") + client_digest = auth_info.get("passworddigest", "") + allow_password = None + for allow_username, allow_password in allow_credentials: + if client_username == allow_username: + break + else: + return deny(body=b"unknown username") + + digest = hashlib.sha1("".join((client_nonce, client_created, allow_password)).encode("utf-8")).digest() + digest_b64 = base64.b64encode(digest).decode() + print("$$$ check client={} == real={}".format(client_digest, digest_b64)) + if client_digest == digest_b64: + return make_http_reflect()(request) + + return deny(body=b"supplied credentials are not allowed") else: - return http_response_bytes( - status=400, - body="unknown authorization scheme={0}".format(scheme).encode(), - ) + return http_response_bytes(status=400, body="unknown authorization scheme={0}".format(scheme).encode(),) return http_reflect_with_auth_handler diff --git a/tests/test_auth.py b/tests/test_auth.py index 6efd6cbb..975a56ff 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -364,8 +364,47 @@ def test_digest_object_auth_info(): def test_wsse_algorithm(): - digest = httplib2._wsse_username_token( - "d36e316282959a9ed4c89851497a717f", "2003-12-15T14:43:07Z", "taadtaadpstcsm" - ) - expected = b"quR/EWLAV4xLf9Zqyw4pDmfV9OY=" + digest = httplib2._wsse_username_token("d36e316282959a9ed4c89851497a717f", "2003-12-15T14:43:07Z", "taadtaadpstcsm") + expected = "quR/EWLAV4xLf9Zqyw4pDmfV9OY=" assert expected == digest + + +def test_wsse_invalid(): + http = httplib2.Http() + username = "user294" + password = tests.gen_password() + grenew_nonce = [None] + requests = [] + handler = tests.http_reflect_with_auth( + allow_scheme="wsse", + allow_credentials=((username, password),), + out_renew_nonce=grenew_nonce, + out_requests=requests, + ) + http.add_credentials(username, "wrong" + password) + with tests.server_request(handler, request_count=2) as uri: + response, _ = http.request(uri) + assert requests[0][1].status == 401 + assert requests[1][0].headers["authorization"] == 'WSSE profile="UsernameToken"' + assert requests[1][1].status == 401 + assert response.status == 401 + + +def test_wsse_ok(): + http = httplib2.Http() + username = "user314" + password = tests.gen_password() + grenew_nonce = [None] + requests = [] + handler = tests.http_reflect_with_auth( + allow_scheme="wsse", + allow_credentials=((username, password),), + out_renew_nonce=grenew_nonce, + out_requests=requests, + ) + http.add_credentials(username, password) + with tests.server_request(handler, request_count=2) as uri: + response, _ = http.request(uri) + assert requests[0][1].status == 401 + assert requests[1][0].headers["authorization"] == 'WSSE profile="UsernameToken"' + assert response.status == 200