import json
from typing import TypeVar
import cryptojwt.jwe.exception
from satosa.context import Context
from pyeudiw.jwt.exceptions import JWEDecryptionError
from pyeudiw.jwt.exceptions import JWSVerificationError
from pyeudiw.jwt.jwe_helper import JWEHelper
from pyeudiw.jwt.jws_helper import JWSHelper
from pyeudiw.jwt.utils import decode_jwt_header
from pyeudiw.jwt.utils import is_jwe_format
from pyeudiw.satosa.backends.openid4vp.exceptions import (
AuthRespParsingException,
AuthRespValidationException,
)
from pyeudiw.satosa.backends.openid4vp.interface import AuthorizationResponseParser
from pyeudiw.satosa.backends.openid4vp.schemas.response import (
AuthorizeResponseDirectPostJwt,
AuthorizeResponsePayload,
ResponseMode,
)
_S = TypeVar("_S", str, list[str])
[docs]
def normalize_jsonstring_to_string(s: _S) -> _S:
"""
Normalize s from string (or list of string) or JSON String (or list
of JSON String) to simply string (or list of string).
For example, this would map a vp_token from JSON String "ey...Ui5" to
the naitve string ey...Ui5 (note the missing quote ").
Note that this method is NOT intended to parse JSON String.
For that purpose, json.loads should be preferred. Instead, this method
should be used when an imput might be a string OR a JSON string.
"""
if isinstance(s, str):
return s.strip('"')
if isinstance(s, list):
return [v.strip('"') for v in s]
return s
[docs]
def detect_response_mode(context: Context) -> ResponseMode:
"""
Try to make inference on which response mode type this is based on the
content of an http request body
"""
if "response" in context.request:
return ResponseMode.direct_post_jwt
if "vp_token" in context.request:
return ResponseMode.direct_post
if "error" in context.request:
return ResponseMode.error
raise AuthRespParsingException("HTTP POST request body does not contain a recognized openid4vp response mode")
def _check_http_post_headers(context: Context) -> None:
"""
:raises AuthRespParsingException: if the request in the context does not \
look like a POST request
"""
http_method = context.request_method.upper() if context.request_method else None
if http_method != "POST":
err_msg = f"HTTP method [{http_method}] not supported"
raise AuthRespParsingException(err_msg, err_msg)
# missing header is ok; but if it's there, it must be correct
if context.http_headers:
content_type = context.http_headers["HTTP_CONTENT_TYPE"]
if "application/x-www-form-urlencoded" not in content_type:
err_msg = f"HTTP content type [{content_type}] not supported"
raise AuthRespParsingException(err_msg, err_msg)
[docs]
class DirectPostParser(AuthorizationResponseParser):
"""DirectPostParser parses authorization responses sent as body of an
http post request.
The reference specification is defined here
https://openid.net/specs/openid-4-verifiable-presentations-1_0.html#name-response-parameters
"""
def __init__(self):
pass
[docs]
def parse_and_validate(self, context: Context) -> AuthorizeResponsePayload:
_check_http_post_headers(context)
resp_data: dict = context.request
try:
d = {}
if vp_token := resp_data.get("vp_token", None):
# vp_token should be a JSON string but caller might not be compliant and use string instead
vp_token = normalize_jsonstring_to_string(vp_token)
d["vp_token"] = vp_token
if state := resp_data.get("state", None):
d["state"] = state
if presentation_submission := resp_data["presentation_submission"]:
if isinstance(presentation_submission, dict):
d["presentation_submission"] = presentation_submission
else:
d["presentation_submission"] = json.loads(presentation_submission)
return AuthorizeResponsePayload(**d)
except Exception as e:
raise AuthRespParsingException("invalid data in direct_post request body", e)
[docs]
class DirectPostJwtJweParser(AuthorizationResponseParser):
"""DirectPostJwtJweParser parses authorization responses sent as body of an
http post request. The parser expectes a response wrapped in a jwt; more
precisely the managed response is x-www-form-urlencoded in the form of
response=<jwt> where <jwt> is an **encrypted but not signed** response.
As such, the class required a jwe helper with the correct key able to
decrypt the jwe.
The reference specification is defined here
https://openid.net/specs/openid-4-verifiable-presentations-1_0.html#name-response-mode-direct_postjw
"""
def __init__(self, jwe_decryptor: JWEHelper, jws_verifier: JWSHelper, enc_alg_supported: list[str] = [], enc_enc_supported: list[str] = []) -> None:
self.jwe_decryptor = jwe_decryptor
self.jws_verifier = jws_verifier
self.enc_alg_supported = enc_alg_supported
self.enc_enc_supported = enc_enc_supported
[docs]
def parse_and_validate(self, context: Context) -> AuthorizeResponsePayload:
_check_http_post_headers(context)
resp_data_raw: dict = context.request
try:
resp_data = AuthorizeResponseDirectPostJwt(**resp_data_raw)
except Exception as e:
raise AuthRespParsingException("invalid data in direct_post.jwt request body", e)
if is_jwe_format(resp_data.response):
# if the response is a JWE, we need to decrypt it
header = decode_jwt_header(resp_data.response)
if not header.get("alg") in self.enc_alg_supported:
raise AuthRespValidationException("invalid data in direct_post.jwt: alg not supported")
if not header.get("enc") in self.enc_enc_supported:
raise AuthRespValidationException("invalid data in direct_post.jwt: enc not supported")
try:
payload = self.jwe_decryptor.decrypt(resp_data.response)
except JWEDecryptionError as e:
raise AuthRespParsingException("invalid data in direct_post.jwt request body: not a jwe", e)
except cryptojwt.jwe.exception.DecryptionFailed:
raise AuthRespValidationException("invalid data in direct_post.jwt: unable to decrypt token")
except Exception as e:
# unfortunately library cryptojwt is not very exhaustive on why an operation failed...
raise AuthRespValidationException("invalid data in direct_post.jwt request body", e)
else:
# if the response is a JWT, we just decode it
try:
payload = self.jws_verifier.verify(resp_data.response)
except JWSVerificationError as e:
raise AuthRespParsingException("invalid data in direct_post.jwt request body: cannot validate the jws", e)
except Exception as e:
raise AuthRespParsingException("invalid data in direct_post.jwt request body: not a jwt", e)
# iss, exp and aud MUST be OMITTED in the JWT Claims Set of the JWE
if ("iss" in payload) or ("exp" in payload):
raise AuthRespParsingException(
"response token contains an unexpected lifetime claims",
Exception("wallet mishbeahiour: JWe with bad claims"),
)
try:
return AuthorizeResponsePayload(**payload)
except Exception as e:
raise AuthRespParsingException(
"invalid data in the direct_post.jwt: token payload does not have the expected claims",
e,
)