Source code for pyeudiw.satosa.backends.openid4vp.schemas.wallet_metadata

from typing import Optional, Dict, List
from urllib.parse import urlparse

from pydantic import BaseModel, field_validator

# Keys under which the validators in this module look up caller-supplied
# allowed values in the pydantic validation context (``info.context``).
RESPONSE_MODES_SUPPORTED_CTX = "valid_response_mode_supported"
VP_FORMATS_SUPPORTED_CTX = "valid_vp_formats"
CLIENT_ID_SCHEMES_SUPPORTED_CTX = "valid_client_id_schemes_supported"
REQUEST_OBJ_SIG_ALG_VALUES_SUPPORTED = (
    "valid_request_object_signing_alg_values_supported"
)

# TODO: Move this to a global file
# Algorithm identifiers accepted for ``alg_values_supported``; also the value
# used when that field is explicitly ``None``. The names look like JOSE/JWA
# signature algorithms (RFC 7518) -- confirm before relying on that.
_default_supported_algorithms = [
    "RS256", "RS384", "RS512",
    "ES256", "ES384", "ES512",
    "PS256", "PS384", "PS512",
]

# The only value accepted for ``response_types_supported``.
_default_response_types_supported = "vp_token"
# NOTE(review): not referenced by any code visible in this module.
_default_client_id_schemes_supported = "http"

class WalletMetadata(BaseModel):
    """Wallet metadata model whose fields are normalised by "before" validators.

    Every validator runs in ``mode="before"``, i.e. on the raw input value.
    The list-valued fields accept either a single string or a list of strings
    and are normalised to a list. Several validators read the set of allowed
    values from the pydantic validation context (``info.context``) under the
    ``*_CTX`` / ``REQUEST_OBJ_SIG_ALG_VALUES_SUPPORTED`` keys defined in this
    module; when the context carries no such entry the value is only
    normalised, not restricted.
    """

    vp_formats_supported: Dict[str, Dict[str, List[str]]]
    alg_values_supported: Optional[List[str]] = None
    client_id_schemes_supported: Optional[List[str]] = None
    authorization_endpoint: Optional[str] = None
    request_object_signing_alg_values_supported: Optional[List[str]] = None
    response_types_supported: Optional[List[str]] = None
    response_modes_supported: Optional[List[str]] = None

    @field_validator("alg_values_supported", mode="before")
    @classmethod
    def validate_alg_values_supported(cls, v):
        """Keep only the algorithms listed in ``_default_supported_algorithms``.

        :param v: ``None``, a single algorithm name, or a list of names.
        :returns: a list of supported names; unsupported list entries are
            dropped silently (the result may be empty), ``None`` yields the
            full default list.
        :raises ValueError: for an unsupported single name or any other type.
        """
        if v is None:
            # Return a copy so that mutating the model's field can never
            # alter the module-level default list.
            return list(_default_supported_algorithms)
        if isinstance(v, str) and v in _default_supported_algorithms:
            return [v]
        if isinstance(v, list):
            return [alg for alg in v if alg in _default_supported_algorithms]
        raise ValueError("Invalid value for alg_values_supported")

    @field_validator("authorization_endpoint", mode="before")
    @classmethod
    def validate_authorization_endpoint(cls, v):
        """Require a URL that has a scheme, a network location and a path.

        :param v: the raw endpoint value, or ``None``.
        :returns: ``v`` unchanged.
        :raises ValueError: if ``v`` cannot be parsed or any of the three
            URL components is empty.
        """
        if v is None:
            # The field is Optional: an explicit null is treated like an
            # omitted field instead of being rejected.
            return None
        try:
            parsed = urlparse(v)
        except Exception as exc:
            # urlparse raises different exception types depending on the
            # input (e.g. ValueError, AttributeError); report them uniformly.
            raise ValueError("Invalid value for authorization_endpoint") from exc
        if not (parsed.scheme and parsed.netloc and parsed.path):
            raise ValueError("Invalid value for authorization_endpoint")
        return v

    @field_validator("response_modes_supported", mode="before")
    @classmethod
    def validate_response_modes_supported(cls, v, info):
        """Restrict the value to the response mode(s) given in the context.

        :raises ValueError: if nothing in ``v`` matches the allowed value(s).
        """
        allowed = (info.context or {}).get(RESPONSE_MODES_SUPPORTED_CTX)
        return cls._restrict_to_allowed(v, allowed, "response_modes_supported")

    @field_validator("response_types_supported", mode="before")
    @classmethod
    def validate_response_types_supported(cls, v):
        """Restrict the value to ``_default_response_types_supported``.

        :raises ValueError: if nothing in ``v`` matches that value.
        """
        return cls._restrict_to_allowed(
            v, _default_response_types_supported, "response_types_supported"
        )

    @field_validator("vp_formats_supported", mode="before")
    @classmethod
    def validate_vp_formats_supported(cls, v, info):
        """Keep only the format entries whose key is allowed by the context.

        :param v: mapping of format name to format description.
        :returns: ``v`` unchanged when the context has no allowed formats,
            otherwise a new dict holding only the allowed keys.
        :raises ValueError: if ``v`` is not a mapping or no key is allowed.
        """
        allowed = (info.context or {}).get(VP_FORMATS_SUPPORTED_CTX)
        if not allowed:
            return v
        try:
            entries = v.items()
        except AttributeError:
            # A non-mapping value must be reported as invalid input rather
            # than escape as an AttributeError.
            raise ValueError("Invalid value for vp_formats_supported") from None
        filtered_vp_formats = {
            fmt: spec for fmt, spec in entries if fmt in allowed
        }
        if not filtered_vp_formats:
            raise ValueError("Invalid value for vp_formats_supported")
        return filtered_vp_formats

    @field_validator("client_id_schemes_supported", mode="before")
    @classmethod
    def validate_client_id_schemes_supported(cls, v, info):
        """Restrict the value to the client id scheme(s) given in the context.

        :raises ValueError: if nothing in ``v`` matches the allowed value(s).
        """
        allowed = (info.context or {}).get(CLIENT_ID_SCHEMES_SUPPORTED_CTX)
        return cls._restrict_to_allowed(v, allowed, "client_id_schemes_supported")

    @field_validator("request_object_signing_alg_values_supported", mode="before")
    @classmethod
    def validate_request_object_signing_alg_values_supported_supported(cls, v, info):
        """Restrict the value to the signing algorithm(s) given in the context.

        :raises ValueError: if nothing in ``v`` matches the allowed value(s).
        """
        allowed = (info.context or {}).get(REQUEST_OBJ_SIG_ALG_VALUES_SUPPORTED)
        return cls._restrict_to_allowed(
            v, allowed, "request_object_signing_alg_values_supported"
        )

    @staticmethod
    def _restrict_to_allowed(v, allowed, field_name: str):
        """Normalise ``v`` to a list restricted to ``allowed``.

        Shared by the validators above so they treat a single string, a list
        and ``None`` identically.

        :param v: raw field value (``None``, a string or a list).
        :param allowed: one allowed string, a collection of allowed strings,
            or a falsy value meaning "no restriction".
        :param field_name: field name used in the error message.
        :returns: with no restriction, ``v`` (a string is wrapped in a list);
            otherwise the allowed value(s) found in ``v``, or all allowed
            values when ``v`` is ``None`` or an empty list.
        :raises ValueError: if ``v`` has an unsupported type or contains no
            allowed value.
        """
        if not allowed:
            return [v] if isinstance(v, str) else v

        single = isinstance(allowed, str)
        if v is None:
            # Copy so the caller-supplied collection is never aliased.
            return [allowed] if single else list(allowed)
        if isinstance(v, str):
            # Compare by equality against a single allowed string; ``in``
            # would be a substring test there.
            if (v == allowed) if single else (v in allowed):
                return [v]
        elif isinstance(v, list):
            return WalletMetadata._valid_element_list(v, allowed, field_name)
        raise ValueError(f"Invalid value for {field_name}")

    @staticmethod
    def _valid_element_list(v: list, expected_value: str | list, field_name: str):
        """Return the elements of ``v`` that match ``expected_value``.

        :param v: list of candidate values.
        :param expected_value: one allowed string or a list of allowed values.
        :param field_name: field name used in the error message.
        :returns: the matching elements in their original order; an empty
            ``v`` yields the allowed value(s) as a new list.
        :raises ValueError: if ``v`` is non-empty and nothing matches.
        """
        single = isinstance(expected_value, str)
        if not v:
            return [expected_value] if single else list(expected_value)

        if single:
            filtered = [item for item in v if item == expected_value]
        else:
            filtered = [item for item in v if item in expected_value]
        if not filtered:
            raise ValueError(f"Invalid value for {field_name}")
        return filtered
class WalletPostRequest(BaseModel):
    """Pydantic model with an optional ``wallet_metadata`` and ``wallet_nonce``.

    Both fields default to ``None`` when omitted.
    """

    wallet_metadata: WalletMetadata | None = None
    wallet_nonce: str | None = None