Source code for pyeudiw.satosa.frontends.openid4vci.models.token_request

import logging
from hashlib import sha256, sha512
from typing import Optional

from pydantic import model_validator

from pyeudiw.satosa.exceptions import InvalidRequestException
from pyeudiw.satosa.frontends.openid4vci.models.openid4vci_basemodel import OpenId4VciBaseModel

logger = logging.getLogger(__name__)

AUTHORIZATION_CODE_GRANT = "authorization_code"
REFRESH_TOKEN_GRANT = "refresh_token" # nosec B105

REDIRECT_URI_CTX = "redirect_uri"
CODE_CHALLENGE_METHOD_CTX = "code_challenge_method"
CODE_CHALLENGE_CTX = "code_challenge"
SCOPE_CTX = "scope"

TOKEN_ENDPOINT = "token" # nosec B105

[docs] class TokenRequest(OpenId4VciBaseModel): """ Represents a request to the token endpoint in an OpenID4VCI flow. Fields: grant_type (str): Either 'authorization_code' or 'refresh_token'. code (str): Required for 'authorization_code' grant type. redirect_uri (str): Required for 'authorization_code' grant type. code_verifier (str): Required for PKCE validation. refresh_token (str): Required for 'refresh_token' grant type. scope (Optional[str]): Optional, only valid for 'refresh_token'. Validation Logic: - Validates `grant_type` is one of the supported values. - Enforces required fields depending on the grant type. - Validates PKCE `code_verifier` against the challenge using SHA256 or SHA512. - Ensures `scope` values are in the allowed configuration list. - Raises `InvalidRequestException` with appropriate messages if checks fail. """ grant_type: str = None code: Optional[str] = None redirect_uri: Optional[str] = None code_verifier: Optional[str] = None refresh_token: Optional[str] = None scope: Optional[str] = None
[docs] @model_validator(mode='after') def check_token_request(self) -> "TokenRequest": self.validate_grant_type() is_authorization_code_grant = self.grant_type == AUTHORIZATION_CODE_GRANT self.validate_code(is_authorization_code_grant) self.validate_redirect_uri(is_authorization_code_grant) self.validate_code_verifier(is_authorization_code_grant) self.validate_refresh_token(is_authorization_code_grant) self.validate_scope(is_authorization_code_grant) return self
[docs] def validate_scope(self, is_authorization_code_grant): self.scope = self.strip(self.scope) if is_authorization_code_grant: self.check_unexpected_parameter(self.scope, "scope", TOKEN_ENDPOINT) elif self.scope: scopes = self.scope.split(" ") par_scope_ctx = self.get_ctx(SCOPE_CTX) par_scopes = par_scope_ctx.split(" ") if par_scope_ctx is not None else None for s in scopes: if s not in self.get_config().metadata.oauth_authorization_server.scopes_supported: logger.error(f"invalid scope value '{s}' in `token` endpoint") raise InvalidRequestException(f"invalid scope value '{s}'") elif par_scopes and (s not in par_scopes): logger.error(f"invalid scope in `token` endpoint: value '{s}' not present in previous `par` endpoint") raise InvalidRequestException(f"invalid scope value '{s}'")
[docs] def validate_refresh_token(self, is_authorization_code_grant): self.refresh_token = self.strip(self.refresh_token) if is_authorization_code_grant: self.check_unexpected_parameter(self.refresh_token, "refresh_token", TOKEN_ENDPOINT) else: self.check_missing_parameter(self.refresh_token, "refresh_token", TOKEN_ENDPOINT)
[docs] def validate_code_verifier(self, is_authorization_code_grant): self.code_verifier = self.strip(self.code_verifier) if is_authorization_code_grant: self.check_missing_parameter(self.code_verifier, "code_verifier", TOKEN_ENDPOINT) match self.get_ctx(CODE_CHALLENGE_METHOD_CTX).upper(): case "S256": code_verifier_encode = sha256(self.code_verifier.encode('utf-8')).hexdigest() case "S512": code_verifier_encode = sha512(self.code_verifier.encode('utf-8')).hexdigest() case _: logger.error( f"unexpected code_challenge_method {self.get_ctx(CODE_CHALLENGE_METHOD_CTX)} for code_verifier in token request") raise InvalidRequestException("Invalid `code_verifier`") if code_verifier_encode != self.get_ctx(CODE_CHALLENGE_CTX): logger.error( f"Invalid `code_verifier` {code_verifier_encode} in token request with authorization_code as `grant_type`") raise InvalidRequestException("Invalid `code_verifier`") else: self.check_unexpected_parameter(self.code_verifier, "code_verifier", TOKEN_ENDPOINT)
[docs] def validate_redirect_uri(self, is_authorization_code_grant): self.redirect_uri = self.strip(self.redirect_uri) if is_authorization_code_grant: self.check_missing_parameter(self.redirect_uri, "redirect_uri", TOKEN_ENDPOINT) if self.get_ctx(REDIRECT_URI_CTX) != self.redirect_uri: logger.error("Invalid `redirect_uri` in token request with authorization_code as `grant_type`") raise InvalidRequestException("Invalid `redirect_uri`") else: self.check_unexpected_parameter(self.redirect_uri, "redirect_uri", TOKEN_ENDPOINT)
[docs] def validate_grant_type(self): self.grant_type = self.strip(self.grant_type) self.check_missing_parameter(self.grant_type, "grant_type", TOKEN_ENDPOINT) if self.grant_type not in [AUTHORIZATION_CODE_GRANT, REFRESH_TOKEN_GRANT]: logger.error(f"Invalid `grant_type` {self.grant_type} in token request") raise InvalidRequestException("invalid `grant_type`")
[docs] def validate_code(self, is_authorization_code_grant: bool): self.code = self.strip(self.code) if is_authorization_code_grant: self.check_missing_parameter(self.code, "code", TOKEN_ENDPOINT) else: self.check_unexpected_parameter(self.code, "code", TOKEN_ENDPOINT)