from uuid import uuid4
from satosa.context import Context
from satosa.response import Response
from pyeudiw.jwt.jws_helper import JWSHelper
from pyeudiw.satosa.exceptions import InvalidRequestException
from pyeudiw.satosa.frontends.openid4vci.endpoints.vci_base_endpoint import VCIBaseEndpoint, POST_ACCEPTED_METHODS
from pyeudiw.satosa.frontends.openid4vci.models.nonce_response import NonceResponse
from pyeudiw.satosa.frontends.openid4vci.storage.engine import OpenId4VciDBEngineHandler
from pyeudiw.satosa.frontends.openid4vci.tools.exceptions import InvalidScopeException
from pyeudiw.satosa.utils.session import get_session_id
from pyeudiw.satosa.utils.validation import validate_content_type, validate_request_method
from pyeudiw.tools.content_type import HTTP_CONTENT_TYPE_HEADER, APPLICATION_JSON
[docs]
class NonceHandler(VCIBaseEndpoint):
    """
    Nonce endpoint of the OpenID4VCI frontend.

    Each successful request generates a fresh ``c_nonce``, stores it for the
    current session through the DB engine and returns it to the caller.
    """

    def __init__(self, config: dict, internal_attributes: dict[str, dict[str, str | list[str]]], base_url: str, name: str, *args):
        """
        Initialize the nonce endpoint class.

        Args:
            config (dict): The configuration dictionary.
            internal_attributes (dict): The internal attributes config.
            base_url (str): The base URL of the service.
            name (str): The name of the SATOSA module to append to the URL.
            *args: Extra positional arguments; accepted but not used here.
        """
        super().__init__(config, internal_attributes, base_url, name)
        # ``self.config`` is presumably populated by the base-class initializer
        # (it is not assigned in this class) -- verify against VCIBaseEndpoint.
        self.jws_helper = JWSHelper(self.config["metadata_jwks"])
        self.db_engine = OpenId4VciDBEngineHandler(config).db_engine

    def endpoint(self, context: Context) -> Response:
        """
        Handle a POST request to the nonce endpoint.

        The request is checked in this order: HTTP method (must be one of
        ``POST_ACCEPTED_METHODS``), Content-Type header (must be present and
        accepted by ``validate_content_type`` for ``APPLICATION_JSON``), and
        body (must be empty). On success a new UUID4-based nonce is stored
        for the session and returned.

        Args:
            context (Context): The SATOSA context.

        Returns:
            A Response object: the nonce response on success, otherwise the
            result of ``self._handle_400`` (invalid request, including a
            missing Content-Type header) or ``self._handle_500`` (any other
            failure).
        """
        try:
            validate_request_method(context.request_method, POST_ACCEPTED_METHODS)

            # A missing Content-Type header is a client error. Look it up
            # defensively so it is reported through the 400 path instead of
            # surfacing as a KeyError/TypeError in the generic 500 handler.
            content_type = (context.http_headers or {}).get(HTTP_CONTENT_TYPE_HEADER)
            if content_type is None:
                return self._handle_400(context, "Missing Content-Type header")
            validate_content_type(content_type, APPLICATION_JSON)

            if self._get_body(context):
                return self._handle_400(context, "Request body must be empty for nonce endpoint")

            c_nonce = str(uuid4())
            # Persist the nonce against the caller's session before returning it.
            self.db_engine.update_nonce_by_session_id(get_session_id(context), c_nonce)
            return NonceResponse.to_response(c_nonce)
        except (InvalidRequestException, InvalidScopeException) as e:
            return self._handle_400(context, e.message, e)
        except Exception as e:
            # Boundary handler: log the unexpected failure and delegate to the 500 path.
            self._log_error(e.__class__.__name__, f"Error during invoke nonce endpoint: {e}")
            return self._handle_500(context, "error during invoke nonce endpoint", e)