# Source code for pyeudiw.satosa.backends.openid4vp.schemas.wallet_metadata
from typing import Optional, Dict, List
from urllib.parse import urlparse
from pydantic import BaseModel, field_validator
# Keys looked up in the pydantic validation context (``info.context``) by the
# WalletMetadata validators; each one selects the allow-list for one field.
RESPONSE_MODES_SUPPORTED_CTX = "valid_response_mode_supported"
VP_FORMATS_SUPPORTED_CTX = "valid_vp_formats"
CLIENT_ID_SCHEMES_SUPPORTED_CTX = "valid_client_id_schemes_supported"
REQUEST_OBJ_SIG_ALG_VALUES_SUPPORTED = (
    "valid_request_object_signing_alg_values_supported"
)

# TODO: Move this to a global file
# Signature algorithms accepted for ``alg_values_supported``; also used as the
# fallback when that field is None.
_default_supported_algorithms = [
    "RS256",
    "RS384",
    "RS512",
    "ES256",
    "ES384",
    "ES512",
    "PS256",
    "PS384",
    "PS512",
]

# The only value accepted for ``response_types_supported``.
_default_response_types_supported = "vp_token"

# NOTE(review): not referenced anywhere in the visible part of this module.
_default_client_id_schemes_supported = "http"
# [docs]
class WalletMetadata(BaseModel):
    """Schema for the metadata a wallet declares about its capabilities.

    Every validator runs in ``mode="before"``, i.e. on the raw input value
    before pydantic applies its own type coercion. Validators that take
    ``info`` read an allow-list from the validation context
    (``info.context``) under the module-level ``*_CTX`` /
    ``REQUEST_OBJ_SIG_ALG_VALUES_SUPPORTED`` keys. When no allow-list is
    configured the value is only normalised (a bare string becomes a
    one-element list) and otherwise passed through unchanged.
    """

    vp_formats_supported: Dict[str, Dict[str, List[str]]]
    alg_values_supported: Optional[List[str]] = None
    client_id_schemes_supported: Optional[List[str]] = None
    authorization_endpoint: Optional[str] = None
    request_object_signing_alg_values_supported: Optional[List[str]] = None
    response_types_supported: Optional[List[str]] = None
    response_modes_supported: Optional[List[str]] = None

    @field_validator("alg_values_supported", mode="before")
    @classmethod
    def validate_alg_values_supported(cls, v):
        """Normalise ``alg_values_supported`` against the supported algorithms.

        :param v: raw value; a string, a list or None.
        :returns: ``[v]`` for a supported string, the supported subset of a
            list (possibly empty), or the full default list for None.
        :raises ValueError: for an unsupported string or any other type.
        """
        if v is None:
            # Hand out a copy so the module-level default is never aliased.
            return list(_default_supported_algorithms)
        if isinstance(v, str) and v in _default_supported_algorithms:
            return [v]
        if isinstance(v, list):
            return [alg for alg in v if alg in _default_supported_algorithms]
        raise ValueError("Invalid value for alg_values_supported")

    @field_validator("authorization_endpoint", mode="before")
    @classmethod
    def validate_authorization_endpoint(cls, v):
        """Check that ``authorization_endpoint`` has scheme, host and path.

        :param v: raw value; None is accepted because the field is optional.
        :returns: ``v`` unchanged.
        :raises ValueError: if the value cannot be parsed as a URL or any of
            scheme, network location or path is empty.
        """
        if v is None:
            return v
        try:
            parsed = urlparse(v)
        except (TypeError, ValueError, AttributeError) as exc:
            # urlparse raises these for non-string input or malformed URLs.
            raise ValueError("Invalid value for authorization_endpoint") from exc
        if not (parsed.scheme and parsed.netloc and parsed.path):
            raise ValueError("Invalid value for authorization_endpoint")
        return v

    @field_validator("response_modes_supported", mode="before")
    @classmethod
    def validate_response_modes_supported(cls, v, info):
        """Validate ``response_modes_supported`` against the context value.

        :param v: raw value; a string, a list or None.
        :param info: validation info; ``info.context`` may hold the allowed
            value under ``RESPONSE_MODES_SUPPORTED_CTX``.
        :returns: a list of accepted response modes, or the normalised input
            when no allowed value is configured.
        :raises ValueError: if nothing in ``v`` matches the allowed value.
        """
        valid = (info.context or {}).get(RESPONSE_MODES_SUPPORTED_CTX)
        if not valid:
            return [v] if isinstance(v, str) else v
        if isinstance(v, str) and v == valid:
            return [v]
        if isinstance(v, list):
            return cls._valid_element_list(v, valid, "response_modes_supported")
        if v is None:
            return [valid]
        raise ValueError("Invalid value for response_modes_supported")

    @field_validator("response_types_supported", mode="before")
    @classmethod
    def validate_response_types_supported(cls, v):
        """Validate ``response_types_supported`` against the single default.

        :param v: raw value; a string, a list or None.
        :returns: a list containing the accepted response type(s).
        :raises ValueError: if ``v`` holds no accepted response type.
        """
        if isinstance(v, str) and v == _default_response_types_supported:
            return [v]
        if isinstance(v, list):
            return cls._valid_element_list(
                v, _default_response_types_supported, "response_types_supported"
            )
        if v is None:
            return [_default_response_types_supported]
        raise ValueError("Invalid value for response_types_supported")

    @field_validator("vp_formats_supported", mode="before")
    @classmethod
    def validate_vp_formats_supported(cls, v, info):
        """Keep only the VP formats allowed by the validation context.

        :param v: raw value, expected to be a mapping of format name to
            format parameters.
        :param info: validation info; ``info.context`` may hold the allowed
            format names under ``VP_FORMATS_SUPPORTED_CTX``.
        :returns: ``v`` unchanged when no allow-list is configured, otherwise
            a dict restricted to the allowed format names.
        :raises ValueError: if ``v`` is not a mapping or none of its formats
            is allowed.
        """
        allowed = (info.context or {}).get(VP_FORMATS_SUPPORTED_CTX)
        if not allowed:
            return v
        try:
            entries = v.items()
        except AttributeError:
            # Surface a validation error instead of leaking AttributeError.
            raise ValueError("Invalid value for vp_formats_supported") from None
        filtered_vp_formats = {
            fmt: params for fmt, params in entries if fmt in allowed
        }
        if not filtered_vp_formats:
            raise ValueError("Invalid value for vp_formats_supported")
        return filtered_vp_formats

    @field_validator("client_id_schemes_supported", mode="before")
    @classmethod
    def validate_client_id_schemes_supported(cls, v, info):
        """Validate ``client_id_schemes_supported`` against the context list.

        :param v: raw value; a string, a list or None.
        :param info: validation info; ``info.context`` may hold the allowed
            schemes under ``CLIENT_ID_SCHEMES_SUPPORTED_CTX``.
        :returns: the accepted schemes, or the allow-list itself for None.
        :raises ValueError: if ``v`` holds no allowed scheme.
        """
        valid = (info.context or {}).get(CLIENT_ID_SCHEMES_SUPPORTED_CTX)
        return cls._validate_against_allowed(
            v, valid, "client_id_schemes_supported"
        )

    @field_validator("request_object_signing_alg_values_supported", mode="before")
    @classmethod
    def validate_request_object_signing_alg_values_supported_supported(cls, v, info):
        """Validate ``request_object_signing_alg_values_supported``.

        :param v: raw value; a string, a list or None.
        :param info: validation info; ``info.context`` may hold the allowed
            algorithms under ``REQUEST_OBJ_SIG_ALG_VALUES_SUPPORTED``.
        :returns: the accepted algorithms, or the allow-list itself for None.
        :raises ValueError: if ``v`` holds no allowed algorithm.
        """
        valid = (info.context or {}).get(REQUEST_OBJ_SIG_ALG_VALUES_SUPPORTED)
        return cls._validate_against_allowed(
            v, valid, "request_object_signing_alg_values_supported"
        )

    @classmethod
    def _validate_against_allowed(cls, v, valid, field_name: str):
        """Shared str/list/None handling for fields checked against ``valid``."""
        if not valid:
            # No allow-list configured: only normalise a bare string.
            return [v] if isinstance(v, str) else v
        if isinstance(v, str) and v in valid:
            return [v]
        if isinstance(v, list):
            return cls._valid_element_list(v, valid, field_name)
        if v is None:
            return valid
        raise ValueError(f"Invalid value for {field_name}")

    @staticmethod
    def _valid_element_list(v: list, expected_value: str | list, field_name: str) -> list:
        """Return the elements of ``v`` that match ``expected_value``.

        :param v: list of candidate values.
        :param expected_value: a single accepted string, or a collection of
            accepted values.
        :param field_name: field name used in the error message.
        :returns: the matching elements; for an empty ``v`` the expected
            value(s) are returned instead (a string is wrapped in a list).
        :raises ValueError: if ``v`` is non-empty and nothing matches.
        """
        if not v:
            return [expected_value] if isinstance(expected_value, str) else expected_value
        if isinstance(expected_value, str):
            filtered = [item for item in v if item == expected_value]
        else:
            filtered = [item for item in v if item in expected_value]
        if not filtered:
            raise ValueError(f"Invalid value for {field_name}")
        return filtered
# [docs]
class WalletPostRequest(BaseModel):
    """Schema with two optional fields: wallet metadata and a wallet nonce.

    Both fields default to None when not supplied.
    """

    # Validated with the WalletMetadata schema when present.
    wallet_metadata: Optional[WalletMetadata] = None
    wallet_nonce: Optional[str] = None