Source code for pyeudiw.satosa.frontends.openid4vci.endpoints.nonce_endpoint

from uuid import uuid4

from satosa.context import Context
from satosa.response import Response

from pyeudiw.jwt.jws_helper import JWSHelper
from pyeudiw.satosa.exceptions import InvalidRequestException
from pyeudiw.satosa.frontends.openid4vci.endpoints.vci_base_endpoint import VCIBaseEndpoint, POST_ACCEPTED_METHODS
from pyeudiw.satosa.frontends.openid4vci.models.nonce_response import NonceResponse
from pyeudiw.satosa.frontends.openid4vci.storage.engine import OpenId4VciDBEngineHandler
from pyeudiw.satosa.frontends.openid4vci.tools.exceptions import InvalidScopeException
from pyeudiw.satosa.utils.session import get_session_id
from pyeudiw.satosa.utils.validation import validate_content_type, validate_request_method
from pyeudiw.tools.content_type import HTTP_CONTENT_TYPE_HEADER, APPLICATION_JSON


[docs] class NonceHandler(VCIBaseEndpoint): def __init__(self, config: dict, internal_attributes: dict[str, dict[str, str | list[str]]], base_url: str, name: str, *args): """ Initialize the nonce endpoint class. Args: config (dict): The configuration dictionary. internal_attributes (dict): The internal attributes config. base_url (str): The base URL of the service. name (str): The name of the SATOSA module to append to the URL. """ super().__init__(config, internal_attributes, base_url, name) self.jws_helper = JWSHelper(self.config["metadata_jwks"]) self.db_engine = OpenId4VciDBEngineHandler(config).db_engine
[docs] def endpoint(self, context: Context) -> Response: """ Handle a POST request to the nonce endpoint. Args: context (Context): The SATOSA context. Returns: A Response object. """ try: validate_request_method(context.request_method, POST_ACCEPTED_METHODS) validate_content_type(context.http_headers[HTTP_CONTENT_TYPE_HEADER], APPLICATION_JSON) if self._get_body(context): return self._handle_400(context, "Request body must be empty for nonce endpoint") c_nonce = str(uuid4()) self.db_engine.update_nonce_by_session_id(get_session_id(context), c_nonce) return NonceResponse.to_response(c_nonce) except (InvalidRequestException, InvalidScopeException) as e: return self._handle_400(context, e.message, e) except Exception as e: self._log_error(e.__class__.__name__, f"Error during invoke nonce endpoint: {e}") return self._handle_500(context, "error during invoke nonce endpoint", e)