Source code for pyeudiw.satosa.frontends.openid4vci.endpoints.metadata_endpoint
import json
from satosa.context import Context
from satosa.response import Response
from pyeudiw.jwt.jws_helper import JWSHelper
from pyeudiw.satosa.frontends.openid4vci.endpoints.vci_base_endpoint import VCIBaseEndpoint
from pyeudiw.tools.content_type import APPLICATION_JSON, ENTITY_STATEMENT_JWT
from pyeudiw.tools.utils import exp_from_now, iat_now
[docs]
class MetadataHandler(VCIBaseEndpoint):
    """
    OpenID4VCI metadata endpoint.

    Serves the entity configuration of this module either as a signed
    ``entity-statement+jwt`` (default) or, when the request carries the
    query parameter ``format=json``, as a plain JSON document.
    """

    def __init__(self, config: dict, internal_attributes: dict[str, dict[str, str | list[str]]], base_url: str, name: str, *args):
        """
        Initialize the OpenID4VCI metadata endpoint class.

        Args:
            config (dict): The configuration dictionary.
            internal_attributes (dict): The internal attributes config.
            base_url (str): The base URL of the service.
            name (str): The name of the SATOSA module to append to the URL.
            *args: Accepted for signature compatibility; not used here and
                not forwarded to the base class.
        """
        # NOTE(review): assigned before calling the base initializer, as in
        # the original code; presumably the base class may rely on it during
        # its own setup -- keep this order unless that is confirmed otherwise.
        self.federation_config = config.get("trust", {}).get("federation", {}).get("config", {})
        super().__init__(config, internal_attributes, base_url, name)
        self.metadata_jwks = config.get("metadata_jwks", [])

    def _ensure_credential_issuer(self, metadata: dict, metadata_key: str, issuer_key: str) -> None:
        """
        Fill in a missing issuer value inside one metadata section.

        If ``metadata[metadata_key]`` is a non-empty dict whose ``issuer_key``
        entry is missing or falsy, that entry is set to ``self._backend_url``.
        The section is modified in place; nothing is returned.

        Args:
            metadata (dict): The metadata document to patch.
            metadata_key (str): Key of the metadata section to inspect.
            issuer_key (str): Key inside that section that must hold the issuer.
        """
        metadata_val = metadata.get(metadata_key)
        if metadata_val and isinstance(metadata_val, dict) and not metadata_val.get(issuer_key):
            # `_backend_url` is not defined in this class; presumably provided
            # by the base endpoint.
            metadata_val[issuer_key] = self._backend_url

    @property
    def metadata(self) -> dict:
        """
        Return the configured metadata with issuer values filled in.

        The dict stored under ``config["metadata"]`` is returned as-is (not
        copied), so the issuer values added here are written into the
        configuration itself. Each dict entry of
        ``credential_configuration.ensure_credential_issuer`` maps a metadata
        section key to the issuer key that must be present in that section;
        non-dict entries are skipped.
        """
        metadata = self.config.get("metadata", {})
        # Plain loops instead of a comprehension: the calls are executed only
        # for their side effect on `metadata`.
        for mapping_config in self.credential_configuration.ensure_credential_issuer:
            if not isinstance(mapping_config, dict):
                continue
            for metadata_key, issuer_key in mapping_config.items():
                self._ensure_credential_issuer(metadata, metadata_key, issuer_key)
        return metadata

    @property
    def entity_configuration_as_dict(self) -> dict:
        """
        Return the entity configuration payload as a dictionary.

        ``iss`` and ``sub`` are both set to ``self.entity_id``; ``exp`` is
        derived from the federation setting ``entity_configuration_exp``
        (passed to ``exp_from_now`` as minutes).
        """
        return {
            "exp": exp_from_now(minutes=self.federation_config.get("entity_configuration_exp")),
            "iat": iat_now(),
            "iss": self.entity_id,
            "sub": self.entity_id,
            "jwks": {"keys": self.metadata_jwks},
            "metadata": self.metadata,
            "authority_hints": self.federation_config.get("authority_hints", []),
        }

    @property
    def entity_configuration(self) -> str:
        """
        Return the entity configuration as a signed JWT.

        The first key of ``metadata_jwks`` is used for signing and its ``kid``
        is placed in the protected header.

        :return: The entity configuration
        :rtype: str
        :raises IndexError: if ``metadata_jwks`` is an empty list.
        :raises KeyError: if the signing key has no ``kid`` member.
        """
        payload = self.entity_configuration_as_dict
        signing_jwk = self.metadata_jwks[0]
        protected_header = {
            "alg": self.federation_config.get("default_sig_alg"),
            "kid": signing_jwk["kid"],
            "typ": "entity-statement+jwt",
        }
        return JWSHelper(signing_jwk).sign(protected=protected_header, plain_dict=payload)

    def endpoint(self, context: Context) -> Response:
        """
        Handle request to the metadata endpoint.

        Args:
            context (Context): The SATOSA context.

        Returns:
            A Response object whose body is the JSON-serialized entity
            configuration when the query string contains ``format=json``,
            otherwise the signed entity configuration JWT.
        """
        # `qs_params` may be unset (None) when no query string was parsed;
        # treat that the same as "no parameters" instead of failing on `.get`.
        qs_params = context.qs_params or {}
        is_json = qs_params.get("format", "") == "json"
        if is_json:
            message = json.dumps(self.entity_configuration_as_dict)
            content_type = APPLICATION_JSON
        else:
            message = self.entity_configuration
            content_type = ENTITY_STATEMENT_JWT
        return Response(message, status="200", content=content_type)

    def _validate_configs(self):
        """
        Load the credential configurations and validate that they exist.

        The value returned by ``config_utils.get_credential_configurations()``
        is stored on ``self.credential_configuration``. When it is falsy it is
        first handed to ``_validate_required_configs`` under the name
        ``credential_configurations`` (presumably raising for a missing
        required setting -- defined outside this class).
        """
        credential_configuration = self.config_utils.get_credential_configurations()
        if not credential_configuration:
            self._validate_required_configs(
                [
                    ("credential_configurations", credential_configuration),
                ]
            )
        self.credential_configuration = credential_configuration