import json
import re
from typing import Any, Callable
from pydantic import ValidationError
from satosa.attribute_mapping import AttributeMapper
from satosa.context import Context
from satosa.response import Response
from pyeudiw.jwt.exceptions import JWSVerificationError
from pyeudiw.satosa.exceptions import InvalidRequestException
from pyeudiw.satosa.frontends.openid4vci.tools.config import Openid4VciFrontendConfigUtils
from pyeudiw.satosa.frontends.openid4vci.tools.exceptions import InvalidScopeException
from pyeudiw.tools.base_endpoint import BaseEndpoint
REQUEST_URI_PREFIX = "urn:ietf:params:oauth:request_uri"
GET_ACCEPTED_METHODS = ["GET"]
POST_ACCEPTED_METHODS = ["POST"]
[docs]
class VCIBaseEndpoint(BaseEndpoint):
def __init__(
self,
config: dict,
internal_attributes: dict[str, dict[str, str | list[str]]],
base_url: str,
name: str,
auth_callback: Callable[[Context, Any], Response] | None = None,
converter: AttributeMapper | None = None):
"""
Initialize the OpenID4VCI endpoints class.
Args:
config (dict): The configuration dictionary.
internal_attributes (dict): The internal attributes config.
base_url (str): The base URL of the service.
name (str): The name of the SATOSA module to append to the URL.
auth_callback (Callable, optional): A callback function to handle authorization requests. Defaults to None.
"""
super().__init__(config, internal_attributes, base_url, name, auth_callback, converter)
self.config_utils = Openid4VciFrontendConfigUtils(config)
self._validate_configs()
def _handle_validate_request_error(self, e: Exception, endpoint_name: str):
if isinstance(e, InvalidRequestException) or isinstance(e, InvalidScopeException):
return e.message
elif isinstance(e, JWSVerificationError):
self._log_error(
e.__class__.__name__,
f"{str(e)} in`{endpoint_name}` endpoint"
)
return "Not a valid JWS format"
elif isinstance(e, TypeError):
match = re.search(r"got an unexpected keyword argument '([^']+)'", str(e))
if match:
parameter_name = match.group(1)
self._log_error(
e.__class__.__name__,
f"missing {parameter_name} in request `{endpoint_name}` endpoint"
)
return f"missing `{parameter_name}` parameter"
else:
return "invalid request"
elif isinstance(e, ValidationError):
errors = e.errors()
for err in errors:
parameter_name = err['loc'][0] if len(err['loc']) > 0 else None
if parameter_name:
self._log_error(
e.__class__.__name__,
f"invalid {parameter_name} in request `{endpoint_name}` endpoint: {err['msg']}"
)
else:
self._log_error(
e.__class__.__name__,
f"invalid request in `{endpoint_name}` endpoint: {err['msg']}"
)
return "invalid request"
else:
raise e
@staticmethod
def _to_request_uri(random_part: str) -> str:
"""
Generate the full `request_uri` from a random component.
Args:
random_part (str): The unique identifier to include in the URI.
Returns:
str: A full URN request_uri string.
"""
return f"{REQUEST_URI_PREFIX}:{random_part}"
@staticmethod
def _get_body(context: Context):
"""
Retrieve body from the HTTP request.
"""
if not context.request or context.request == '{}':
return None
if isinstance(context.request, dict) or isinstance(context.request, set):
return context.request
try: parsed = json.loads(context.request)
except (json.JSONDecodeError, TypeError): parsed = context.request
return parsed
@property
def entity_id(self) -> str:
if _cid := self.config_utils.get_openid_credential_issuer().credential_issuer:
return _cid
else:
return self._backend_url
@property
def status_endpoint(self) -> str | None:
try:
status_path = self.config_utils.get_credential_configurations().status_list.path
status_path = status_path.lstrip("/")
return f"{self._backend_url}/{status_path}"
except AttributeError:
return None
@property
def dpop_required(self) -> bool:
"""
Check if DPoD is required.
Returns:
bool: True if DPoD is required, False otherwise. Defaults to True.
"""
return self.config.get("security", {}).get("dpop_required", True)
@property
def wallet_attestation_required(self) -> bool:
"""
Check if wallet attestation is required.
Returns:
bool: True if wallet_attestation is required, False otherwise. Defaults to True.
"""
return self.config.get("security", {}).get("wallet_attestation_required", True)
@property
def dpop_signing_alg_values_supported(self) -> list[str] | None:
"""
Get the supported DPoP signing algorithms.
Returns:
list[str]: List of supported DPoP signing algorithms.
"""
authz_server = self.config_utils.get_oauth_authorization_server()
if authz_server:
return authz_server.dpop_signing_alg_values_supported
return None
@property
def signed_par_request(self) -> str:
"""
Check if signed par request is required.
Returns:
str: "true", "false" or "both". Defaults to "true".
"""
return self.config.get("security", {}).get("signed_par_request", "true").lower()