Source code for pyeudiw.satosa.backends.openid4vp.endpoints.request_endpoint

from copy import copy
from typing import Any
from typing import Callable

from satosa.attribute_mapping import AttributeMapper
from satosa.context import Context
from satosa.internal import InternalData
from satosa.response import Response

from pyeudiw.jwt.exceptions import JWSSigningError
from pyeudiw.jwt.jws_helper import JWSHelper
from pyeudiw.duckle_ql.utils import DUCKLE_PRESENTATION, DUCKLE_QUERY_KEY
from pyeudiw.satosa.backends.openid4vp.authorization_request import build_authorization_request_claims
from pyeudiw.satosa.backends.openid4vp.endpoints.vp_base_endpoint import VPBaseEndpoint
from pyeudiw.satosa.backends.openid4vp.schemas.wallet_metadata import (
    WalletPostRequest,
    RESPONSE_MODES_SUPPORTED_CTX,
    VP_FORMATS_SUPPORTED_CTX,
    CLIENT_ID_SCHEMES_SUPPORTED_CTX,
    REQUEST_OBJ_SIG_ALG_VALUES_SUPPORTED,
)
from pyeudiw.trust.dynamic import CombinedTrustEvaluator


class RequestHandler(VPBaseEndpoint):
    """
    Endpoint handler that builds, stores and signs the authorization request object.

    The handler looks up the session document, builds the authorization request
    claims, saves them on the session and returns them as a signed JWT whose
    ``typ`` header and response content type are derived from
    ``_REQUEST_OBJECT_TYP``.
    """

    # Value of the ``typ`` protected header of the signed request object.
    _REQUEST_OBJECT_TYP = "oauth-authz-req+jwt"
    # Content type of the successful HTTP response.
    _RESP_CONTENT_TYPE = f"application/{_REQUEST_OBJECT_TYP}"

    def __init__(
        self,
        config: dict,
        internal_attributes: dict[str, dict[str, str | list[str]]],
        base_url: str,
        name: str,
        auth_callback_func: Callable[[Context, InternalData], Response],
        converter: AttributeMapper,
        trust_evaluator: CombinedTrustEvaluator,
        db_engine=None,
    ) -> None:
        """
        Initialize the RequestHandler with the given configuration, internal attributes, base URL, and name.

        :param config: Configuration dictionary for the handler. This constructor reads
            ``config["credential_presentation_handlers"]["formats"]`` and ``config["trust"]``;
            ``self.config["metadata_jwks"]`` is read as well.
        :param internal_attributes: Internal attributes mapping.
        :param base_url: Base URL for the handler.
        :param name: Name of the handler.
        :param auth_callback_func: Callback forwarded unchanged to the base class.
        :param converter: Attribute mapper forwarded unchanged to the base class.
        :param trust_evaluator: Trust evaluator used to obtain the client metadata and the
            trust related JWT header parameters.
        :param db_engine: Storage engine forwarded unchanged to the base class.

        :raises KeyError: If one of the configuration keys read here is missing.

        NOTE(review): the previous docstring also declared a ``ValueError`` for missing
        storage or QR code settings; nothing in this class raises it, so it presumably
        comes from the base class constructor -- confirm.
        """
        super().__init__(config, internal_attributes, base_url, name, auth_callback_func, converter, trust_evaluator, db_engine)

        # self.client_id and self.config are not assigned in this class;
        # presumably they are set by the base class constructor above.
        self.absolute_response_url = f"{self.client_id}/response-uri"
        self.metadata_jwks_by_kids = {i["kid"]: i for i in self.config["metadata_jwks"]}
        self.trust_evaluator = trust_evaluator
        self._credential_supported_formats = [f["format"] for f in config["credential_presentation_handlers"]["formats"]]

        # The text before the first ":" of each configured client_id is collected
        # as a supported client id scheme.
        client_id_schemes = []
        for trust_handler in config["trust"].values():
            # "config" may be present but empty (None): treat it like a missing key.
            client_id = (trust_handler.get("config") or {}).get("client_id")
            if client_id:
                client_id_schemes.append(client_id.split(":", 1)[0])
        # None (rather than an empty list) is stored when no scheme was found.
        self._client_id_schemes_supported = client_id_schemes or None

    def endpoint(self, context: Context) -> Response:
        """
        Build, persist and sign the request object for the current session.

        :param context: The SATOSA context of the incoming request.

        :return: A response carrying the signed request object with content type
            ``_RESP_CONTENT_TYPE``, or the response produced by ``_handle_400``,
            ``_handle_401`` or ``_handle_500`` when a step fails.
        """
        self._log_function_debug("request_endpoint", context)

        document, failure = self._load_session_document(context)
        if document is None:
            return failure

        # The client metadata is optional: when the trust evaluator cannot
        # provide it, the request object is built without it.
        try:
            client_metadata = self.trust_evaluator.get_metadata(self.client_id)
        except Exception as e:
            self._log_debug(context, f"client metadata not available from the trust evaluator: {e}")
            client_metadata = None

        wallet_post_request = self._parse_wallet_post_request(context)

        data = build_authorization_request_claims(
            self.client_id,
            document["state"],
            self.absolute_response_url,
            self.config["authorization"],
            client_metadata=client_metadata,
            submission_data=self._build_submission_data(),
            wallet_nonce=wallet_post_request.wallet_nonce,
        )

        if _aud := self.config["authorization"].get("aud"):
            data["aud"] = _aud

        # take the session created in the pre-request authz endpoint
        try:
            document_id = document["document_id"]
            # Shallow copy: the wallet metadata is stored with the session
            # but must not end up in the signed request object.
            data_copy = copy(data)
            if wallet_post_request.wallet_metadata:
                data_copy["wallet_metadata"] = wallet_post_request.wallet_metadata.model_dump()
            self.db_engine.update_request_object(document_id, data_copy)
        except ValueError as e401:
            return self._handle_401(context, "session error: cannot find the session associated to the state", e401)
        except Exception as e500:
            return self._handle_500(
                context,
                "session error: cannot update the session",
                e500,
            )

        _protected_jwt_headers = {
            "typ": RequestHandler._REQUEST_OBJECT_TYP,
        }
        # load all the trust handlers request jwt header parameters, if any
        trust_params = self.trust_evaluator.get_jwt_header_trust_parameters(issuer=self.client_id)
        _protected_jwt_headers.update(trust_params)

        helper = self._select_jws_helper(_protected_jwt_headers)

        # Signing algorithms announced by the wallet; an empty list when the
        # wallet sent no metadata or no algorithm list.
        alg_values_supported = (wallet_post_request.wallet_metadata.alg_values_supported if wallet_post_request.wallet_metadata else []) or []

        try:
            request_object_jwt = helper.sign(
                data,
                protected=_protected_jwt_headers,
                signing_algs=alg_values_supported,
            )
            self._log_debug(context, f"created request object {request_object_jwt}")
            return Response(
                message=request_object_jwt,
                status="200",
                content=RequestHandler._RESP_CONTENT_TYPE,
            )
        except JWSSigningError as e400:
            return self._handle_400(
                context,
                "request error: cannot sign the request object, possibly due to a non supported algorithm",
                e400,
            )
        except Exception as e500:
            return self._handle_500(
                context,
                "internal error: error while processing the request object",
                e500,
            )

    def _load_session_document(self, context: Context) -> tuple[Any, Response | None]:
        """
        Retrieve the session document the incoming request refers to.

        For ``GET`` requests the session is looked up by the ``id`` query string
        parameter, for any other method by the ``SESSION_ID`` entry of the
        context state.

        :param context: The SATOSA context of the incoming request.

        :return: ``(document, None)`` on success, ``(None, error_response)`` otherwise.
        """
        if context.request_method == "GET":
            try:
                if not context.qs_params or "id" not in context.qs_params:
                    raise ValueError("state is missing")
                state = context.qs_params["id"]
                if not state:
                    raise ValueError("state is missing")
            except Exception as e400:
                return None, self._handle_400(context, "request error: missing or invalid parameter [id]", e400)

            try:
                document = self.db_engine.get_by_state(state)
                if not document:
                    raise ValueError("session not found")
            except ValueError as e401:
                return None, self._handle_401(context, "session error: cannot find the session associated to the state", e401)
            except Exception as e500:
                return None, self._handle_500(
                    context,
                    "session error: cannot retrieve the session",
                    e500,
                )
            return document, None

        try:
            if not context.state or "SESSION_ID" not in context.state:
                raise ValueError("session_id is missing")
            session_id = context.state["SESSION_ID"]
            if not session_id:
                raise ValueError("session_id is missing")
        except Exception as e400:
            return None, self._handle_400(context, "request error: missing or invalid parameter [SESSION_ID]", e400)

        try:
            document = self.db_engine.get_by_session_id(session_id)
            if not document:
                raise ValueError("session not found")
        except ValueError as e401:
            return None, self._handle_401(context, "session error: cannot find the session associated to the session_id", e401)
        except Exception as e500:
            return None, self._handle_500(
                context,
                "session error: cannot retrieve the session",
                e500,
            )
        return document, None

    def _parse_wallet_post_request(self, context: Context) -> WalletPostRequest:
        """
        Validate the wallet metadata and nonce sent with a ``POST`` request.

        :param context: The SATOSA context of the incoming request.

        :return: The validated request, or an empty one (no metadata, no nonce)
            when the method is not ``POST`` or validation fails.
        """
        request = context.request or {}
        if context.request_method == "POST":
            # Invalid or missing wallet metadata is not fatal: it is logged
            # and the request object is built without it.
            try:
                return WalletPostRequest.model_validate(
                    request,
                    context={
                        RESPONSE_MODES_SUPPORTED_CTX: self.config["authorization"].get("response_mode", "direct_post_jwt"),
                        VP_FORMATS_SUPPORTED_CTX: self._credential_supported_formats,
                        CLIENT_ID_SCHEMES_SUPPORTED_CTX: self._client_id_schemes_supported,
                        REQUEST_OBJ_SIG_ALG_VALUES_SUPPORTED: self.config["jwt"].get("sig_alg_supported"),
                    },
                )
            except Exception as e:
                self._log_warning(context, f"wallet metadata not provided or invalid: {e}")

        return WalletPostRequest(
            wallet_metadata=None,
            wallet_nonce=None,
        )

    def _select_jws_helper(self, protected_headers: dict[str, Any]) -> JWSHelper:
        """
        Create the signer to use for the given protected JWT headers.

        :param protected_headers: The protected headers of the request object.

        :return: A helper holding every configured key when the headers carry
            ``x5c`` or ``kid``, otherwise a helper holding only the default key.
        """
        if ("x5c" in protected_headers) or ("kid" in protected_headers):
            # let the helper decide which key best fits the given header,
            # otherwise use the default, which is the first configured key
            return JWSHelper(self.config["metadata_jwks"])
        return JWSHelper(self.default_metadata_private_jwk)

    @property
    def default_metadata_private_jwk(self) -> dict[str, Any]:
        """
        Returns the default metadata private JWK

        :return: The first JWK from the metadata_jwks configuration.
        :rtype: dict[str, Any]
        """
        return self.config["metadata_jwks"][0]

    def _build_submission_data(self) -> dict[str, Any] | None:
        """
        Build the submission data passed to the authorization request claims builder.

        :return: A dictionary holding the configured query under ``DUCKLE_QUERY_KEY``,
            or ``None`` when no query is configured.
        """
        dcql_query = self.config.get(DUCKLE_QUERY_KEY)
        if dcql_query:
            # NOTE(review): the second key is literally "typo"; it is kept as-is
            # because its consumer is not visible here -- confirm before renaming.
            return {DUCKLE_QUERY_KEY: dcql_query, "typo": DUCKLE_PRESENTATION}
        return None