# Source code for pyeudiw.satosa.backends.openid4vp.openid4vp

import logging
from typing import Callable, Any

from satosa.backends.base import BackendModule
from satosa.context import Context
from satosa.internal import InternalData
from satosa.response import Response

from pyeudiw.storage.db_engine import DBEngine
from pyeudiw.tools.endpoints_loader import EndpointsLoader
from pyeudiw.trust.dynamic import CombinedTrustEvaluator
from pyeudiw.trust.handler.interface import TrustHandlerInterface

logger = logging.getLogger(__name__)

class OpenID4VPBackend(BackendModule):
    """SATOSA backend module for OpenID4VP.

    On construction it resolves the client identifier, opens the storage
    engine and builds the combined trust evaluator. Endpoint handlers are
    created later, in :meth:`register_endpoints`.
    """

    def __init__(
        self,
        auth_req_callback_func: Callable[[Context, InternalData], Response],
        internal_attributes: dict[str, dict[str, str | list[str]]],
        config: dict[str, Any],
        base_url: str,
        name: str,
    ) -> None:
        """
        Initialize the OpenID4VP backend module.

        :param auth_req_callback_func: Function to handle authentication requests.
        :param internal_attributes: Internal attributes mapping.
        :param config: Configuration dictionary for the backend.
        :param base_url: Base URL for the backend.
        :param name: Name of the backend module.
        :raises ValueError: if the ``storage`` section of ``config`` is
            missing or empty.
        """
        super().__init__(auth_req_callback_func, internal_attributes, base_url, name)

        self.config = config
        self.base_url = base_url
        self.name = name

        self._backend_url = f"{base_url}/{name}"
        self.client_id = self._resolve_client_id(config, self._backend_url)

        self.storage_settings = self.config.get("storage", {})
        if not self.storage_settings:
            raise ValueError(
                "Storage settings are not configured. Please check your configuration."
            )

        # Initialize the database engine
        self.db_engine = DBEngine(self.storage_settings)

        # This loads all the configured trust evaluation mechanisms
        trust_configuration = self.config.get("trust", {})
        trust_caching_mode = self.config.get("trust_caching_mode", "update_first")

        self.trust_evaluator = CombinedTrustEvaluator.from_config(
            trust_configuration,
            self.db_engine,
            default_client_id=self.client_id,
            mode=trust_caching_mode,
        )

        # Filled by register_endpoints(); maps a normalized endpoint name to
        # its handler.
        self.endpoints: dict[str, Any] = {}

    @staticmethod
    def _resolve_client_id(config: dict[str, Any], default: str) -> str:
        """Return the configured client_id, or ``default`` when none is set.

        ``authorization.client_id`` takes precedence over
        ``metadata.client_id``. A missing, null or empty section is treated
        as "not configured" instead of raising, so the default is reachable
        even when a section is absent.
        """
        for section in ("authorization", "metadata"):
            client_id = (config.get(section) or {}).get("client_id")
            if client_id:
                return client_id
        return default

    def register_endpoints(self, **kwargs):
        """
        See super class satosa.backends.base.BackendModule

        Builds the URL map from the endpoint instances created by
        ``EndpointsLoader`` plus the metadata endpoints exposed by the trust
        evaluator, and records every handler in ``self.endpoints``.

        :rtype: list[(str, ((satosa.context.Context, Any) -> satosa.response.Response, Any))]
        :return: list of ``(path, handler)`` pairs
        """
        el = EndpointsLoader(
            self.config,
            self.internal_attributes,
            self.base_url,
            self.name,
            self.auth_callback_func,
            self.converter,
            self.trust_evaluator,
        )

        url_map = [
            (f"{self.name}/{path}", inst)
            for path, inst in el.endpoint_instances.items()
        ]
        url_map.extend(
            self.trust_evaluator.build_metadata_endpoints(self.name, self._backend_url)
        )

        for path, inst in url_map:
            # Key by the last path segment with '-' turned into '_', so a path
            # ending in "pre-request" is stored as "pre_request" (the key that
            # start_auth looks up). '$' is dropped as well, presumably to strip
            # a regex end anchor from metadata paths -- TODO confirm.
            endpoint_key = path.split("/")[-1].replace("-", "_").replace("$", "")
            self.endpoints[endpoint_key] = inst

        # Lazy %-formatting: the URL map is only rendered when DEBUG is enabled.
        logger.debug("Loaded OpenID4VP endpoints: %s", url_map)
        return url_map

    def start_auth(self, context: Context, internal_request: InternalData) -> Response:
        """
        This is the start up function of the backend authorization.

        The request is delegated to the handler registered under the
        ``pre_request`` key; ``internal_request`` is not forwarded to it.

        :param context: the request context
        :type context: satosa.context.Context
        :param internal_request: Information about the authorization request
        :type internal_request: satosa.internal.InternalData
        :rtype: satosa.response.Response
        :return: response
        :raises ValueError: if no ``pre_request`` endpoint has been registered
        """
        pre_request_endpoint = self.endpoints.get("pre_request")

        if not pre_request_endpoint:
            raise ValueError("No pre-request endpoint configured in the OpenID4VP backend")

        return pre_request_endpoint(context)

    def get_trust_backend_by_class_name(self, class_name: str) -> TrustHandlerInterface | None:
        """
        Return the first trust handler whose class name equals ``class_name``.

        :param class_name: unqualified class name to look for
        :return: the matching handler from ``self.trust_evaluator.handlers``,
            or ``None`` when no handler matches
        """
        for handler in self.trust_evaluator.handlers:
            if handler.__class__.__name__ == class_name:
                return handler
        return None