Source code for pyeudiw.satosa.frontends.openid4vci.models.par_request

import logging
from typing import List, Optional
from urllib.parse import urlparse

from pydantic import model_validator

from pyeudiw.satosa.exceptions import InvalidRequestException
from pyeudiw.satosa.frontends.openid4vci.models.auhtorization_detail import AuthorizationDetail
from pyeudiw.satosa.frontends.openid4vci.models.openid4vci_basemodel import (
    OpenId4VciBaseModel,
    CONFIG_CTX,
    CLIENT_ID_CTX,
    ENDPOINT_CTX,
    ENTITY_ID_CTX
)
from pyeudiw.tools.date import is_valid_unix_timestamp

logger = logging.getLogger(__name__)

[docs] class ParRequest(OpenId4VciBaseModel): state: str = None scope: str = None client_id: str = None redirect_uri: str = None response_type: str = None code_challenge: str = None code_challenge_method: str = None authorization_details: List[AuthorizationDetail] = None
[docs] @model_validator(mode='after') def check_par_request(self) -> "ParRequest": config = self.get_config_utils().get_oauth_authorization_server() req_client_id = self.get_ctx(CLIENT_ID_CTX) endpoint = self.get_ctx(ENDPOINT_CTX) self.validate_state(endpoint) self.validate_client_id(req_client_id, endpoint) if config.response_types_supported: self.validate_response_type(config.response_types_supported, endpoint) self.validate_code_challenge(endpoint) if config.code_challenge_methods_supported: self.validate_code_challenge_method(config.code_challenge_methods_supported, endpoint) if config.scopes_supported: self.validate_scope(config.scopes_supported, endpoint) self.validate_authorization_details(endpoint) if not self.scope and (not self.authorization_details or len(self.authorization_details) == 0): raise InvalidRequestException("Missing `scope` and `authorization_details` in `par` endpoint") self.validate_redirect_uri(endpoint) return self
[docs] def validate_redirect_uri(self, endpoint: str): self.redirect_uri = self.strip(self.redirect_uri) self.check_missing_parameter(self.redirect_uri, "redirect_uri", endpoint) try: parsed_redirect_uri = urlparse(self.redirect_uri) if not parsed_redirect_uri.scheme or not parsed_redirect_uri.netloc or not parsed_redirect_uri.path: logger.error(f"invalid redirect_uri value '{self.redirect_uri}' in `{endpoint}` endpoint") raise InvalidRequestException("invalid `redirect_uri` parameter") except Exception as e: logger.error(f"invalid redirect_uri value '{self.redirect_uri}' in `{endpoint}` endpoint: {e}") raise InvalidRequestException("invalid `redirect_uri` parameter")
[docs] def validate_authorization_details(self, endpoint: str): if self.authorization_details: for ad in self.authorization_details: AuthorizationDetail.model_validate(ad, context = { CONFIG_CTX: self.get_config(), ENDPOINT_CTX: endpoint })
[docs] def validate_state(self, endpoint: str): self.state = self.strip(self.state) self.check_missing_parameter(self.state, "state", endpoint) if len(self.state) < 32 or not self.state.isalnum(): logger.error(f"invalid state {self.state} in `{endpoint}` endpoint") raise InvalidRequestException("invalid `state` parameter")
[docs] def validate_client_id(self, req_client_id: str, endpoint: str): self.client_id = self.strip(self.client_id) self.check_missing_parameter(self.client_id, "client_id", endpoint) if self.client_id != req_client_id: logger.error(f"invalid request client_id {self.client_id} in `{endpoint}` endpoint") raise InvalidRequestException("invalid `client_id` parameter")
[docs] def validate_response_type(self, response_types_supported: list[str], endpoint: str): self.response_type = self.strip(self.response_type) self.check_missing_parameter(self.response_type, "response_type", endpoint) if self.response_type not in response_types_supported: logger.error(f"invalid response type {self.response_type} in `{endpoint}` endpoint") raise InvalidRequestException("invalid `response_type` parameter")
[docs] def validate_code_challenge(self, endpoint: str): self.code_challenge = self.strip(self.code_challenge) self.check_missing_parameter(self.code_challenge, "code_challenge", endpoint)
[docs] def validate_code_challenge_method(self, code_challenge_methods_supported: list[str], endpoint: str): self.code_challenge_method = self.strip(self.code_challenge_method) self.check_missing_parameter(self.code_challenge_method, "code_challenge_method", endpoint) if self.code_challenge_method not in code_challenge_methods_supported: logger.error(f"invalid code_challenge_method {self.code_challenge_method} in `{endpoint}` endpoint") raise InvalidRequestException("invalid `code_challenge_method` parameter")
[docs] def validate_scope(self, scopes_supported: list[str], endpoint: str): self.scope = self.strip(self.scope) if self.scope: scopes = self.scope.split(" ") for s in scopes: if s not in scopes_supported: logger.error(f"invalid scope value '{s}' in `{endpoint}` endpoint") raise InvalidRequestException("invalid `scope` parameter")
[docs] class SignedParRequest(OpenId4VciBaseModel): iss: str = None aud: str = None exp: int = None iat: int = None response_type: str = None response_mode: str = None client_id: str = None state: str = None code_challenge: str = None code_challenge_method: str = None scope: Optional[str] = None authorization_details: Optional[List[AuthorizationDetail]] = None redirect_uri: str = None jti: str = None issuer_state: str = None
[docs] @model_validator(mode='after') def check_par_request(self) -> "ParRequest": config = self.get_config_utils().get_oauth_authorization_server() req_client_id = self.get_ctx(CLIENT_ID_CTX) endpoint = self.get_ctx(ENDPOINT_CTX) self.validate_iss(req_client_id, endpoint) self.validate_aud(endpoint) self.validate_state(endpoint) self.validate_client_id(req_client_id, endpoint) if not is_valid_unix_timestamp(self.exp, None): logger.error(f"invalid exp {self.exp} in request `{endpoint}` endpoint") raise InvalidRequestException("invalid `exp` parameter") if not is_valid_unix_timestamp(self.iat): logger.error(f"invalid iat {self.iat} in request `{endpoint}` endpoint") raise InvalidRequestException("invalid `iat` parameter") if int(self.exp) - int(self.iat) > 300: logger.error(f"expired request token in `{endpoint}` endpoint") raise InvalidRequestException("expired token") self.validate_response_type(config.response_types_supported, endpoint) self.validate_response_mode(config.response_modes_supported, endpoint) self.validate_code_challenge(endpoint) self.validate_code_challenge_method(config.code_challenge_methods_supported, endpoint) self.validate_scope(config.scopes_supported, endpoint) self.validate_authorization_details(endpoint) if not self.scope and (not self.authorization_details or len(self.authorization_details) == 0): raise InvalidRequestException("Missing `scope` and `authorization_details` in `par` endpoint") self.validate_redirect_uri(endpoint) self.validate_jti(endpoint) return self
[docs] def validate_authorization_details(self, endpoint: str): if self.authorization_details: for ad in self.authorization_details: AuthorizationDetail.model_validate(ad, context = { CONFIG_CTX: self.get_config(), ENDPOINT_CTX: endpoint })
[docs] def validate_scope(self, scopes_supported: list[str], endpoint: str): self.scope = self.strip(self.scope) if self.scope: scopes = self.scope.split(" ") for s in scopes: if s not in scopes_supported: logger.error(f"invalid scope value '{s}' in `{endpoint}` endpoint") raise InvalidRequestException("invalid `scope` parameter")
[docs] def validate_code_challenge(self, endpoint: str): self.code_challenge = self.strip(self.code_challenge) self.check_missing_parameter(self.code_challenge, "code_challenge", endpoint)
[docs] def validate_code_challenge_method(self, code_challenge_methods_supported: list[str], endpoint: str): self.code_challenge_method = self.strip(self.code_challenge_method) self.check_missing_parameter(self.code_challenge_method, "code_challenge_method", endpoint) if self.code_challenge_method not in code_challenge_methods_supported: logger.error(f"invalid code_challenge_method {self.code_challenge_method} in `{endpoint}` endpoint") raise InvalidRequestException("invalid `code_challenge_method` parameter")
[docs] def validate_response_mode(self, response_modes_supported: list[str], endpoint: str): self.response_mode = self.strip(self.response_mode) self.check_missing_parameter(self.response_mode, "response_mode", endpoint) if self.response_mode not in response_modes_supported: logger.error(f"invalid response_mode {self.response_mode} in `{endpoint}` endpoint") raise InvalidRequestException("invalid `response_mode` parameter")
[docs] def validate_response_type(self, response_types_supported: list[str], endpoint: str): self.response_type = self.strip(self.response_type) self.check_missing_parameter(self.response_type, "response_type", endpoint) if self.response_type not in response_types_supported: logger.error(f"invalid response type {self.response_type} in `{endpoint}` endpoint") raise InvalidRequestException("invalid `response_type` parameter")
[docs] def validate_client_id(self, req_client_id: str, endpoint: str): self.client_id = self.strip(self.client_id) self.check_missing_parameter(self.client_id, "client_id", endpoint) if self.client_id != req_client_id: logger.error(f"invalid request client_id {self.client_id} in `{endpoint}` endpoint") raise InvalidRequestException("invalid `client_id` parameter")
[docs] def validate_state(self, endpoint: str): self.state = self.strip(self.state) self.check_missing_parameter(self.state, "state", endpoint) if len(self.state) < 32 or not self.state.isalnum(): logger.error(f"invalid state {self.state} in `{endpoint}` endpoint") raise InvalidRequestException("invalid `state` parameter")
[docs] def validate_iss(self, req_client_id: str, endpoint: str): self.iss = self.strip(self.iss) self.check_missing_parameter(self.iss, "iss", endpoint) if self.iss != req_client_id: logger.error(f"invalid request iss {self.iss} in `{endpoint}` endpoint") raise InvalidRequestException("invalid `iss` parameter")
[docs] def validate_aud(self, endpoint: str): self.aud = self.strip(self.aud) self.check_missing_parameter(self.aud, "aud", endpoint) if self.aud != self.get_ctx(ENTITY_ID_CTX): logger.error(f"invalid request `aud` {self.aud} in `{endpoint}` endpoint") raise InvalidRequestException("invalid `aud` parameter")
[docs] def validate_redirect_uri(self, endpoint: str): self.redirect_uri = self.strip(self.redirect_uri) self.check_missing_parameter(self.redirect_uri, "redirect_uri", endpoint) try: parsed_redirect_uri = urlparse(self.redirect_uri) if not parsed_redirect_uri.scheme or not parsed_redirect_uri.netloc or not parsed_redirect_uri.path: logger.error(f"invalid redirect_uri value '{self.redirect_uri}' in `{endpoint}` endpoint") raise InvalidRequestException("invalid `redirect_uri` parameter") except Exception as e: logger.error(f"invalid redirect_uri value '{self.redirect_uri}' in `{endpoint}` endpoint: {e}") raise InvalidRequestException("invalid `redirect_uri` parameter")
[docs] def validate_jti(self, endpoint: str): self.jti = self.strip(self.jti) self.check_missing_parameter(self.jti, "jti", endpoint) if self.iss not in self.jti: logger.error(f"invalid jti {self.jti} in request `{endpoint}` endpoint") raise InvalidRequestException("invalid `jti` parameter") if len(self.jti) - len(self.iss) == 0: logger.error(f"invalid jti {self.jti} in request `{endpoint}` endpoint") raise InvalidRequestException("invalid `jti` parameter")