Source code for pyeudiw.satosa.backends.openid4vp.endpoints.get_response_endpoint

from typing import Callable

from satosa.attribute_mapping import AttributeMapper
from satosa.context import Context
from satosa.internal import InternalData
from satosa.response import Redirect, Response

from pyeudiw.satosa.utils.html_template import Jinja2TemplateHandler
from pyeudiw.satosa.utils.respcode import ResponseCodeSource
from pyeudiw.storage.db_engine import DBEngine
from pyeudiw.tools.base_endpoint import BaseEndpoint
from pyeudiw.tools.utils import iat_now
from pyeudiw.trust.dynamic import CombinedTrustEvaluator


[docs] class GetResponseHandler(BaseEndpoint): def __init__( self, config: dict, internal_attributes: dict[str, dict[str, str | list[str]]], base_url: str, name: str, auth_callback_func: Callable[[Context, InternalData], Response], converter: AttributeMapper, trust_evaluator: CombinedTrustEvaluator ) -> None: """ Initialize the GetRequestHandler with the given configuration, internal attributes, base URL, and name. :param config: Configuration dictionary for the handler. :param internal_attributes: Internal attributes mapping. :param base_url: Base URL for the handler. :param name: Name of the handler. :raises ValueError: If storage settings are not configured. """ super().__init__(config, internal_attributes, base_url, name, auth_callback_func, converter) self.storage_settings = self.config.get("storage", {}) if not self.storage_settings: raise ValueError( "Storage settings are not configured. Please check your configuration." ) # Initialize the database engine self.db_engine = DBEngine(self.storage_settings) self.response_code_helper = ResponseCodeSource( self.config["response_code"]["sym_key"] ) # HTML template loader self.template = Jinja2TemplateHandler(self.config["ui"])
[docs] def endpoint(self, context: Context) -> Redirect | Response: """ This endpoint is called by the User-Agent/Wallet Instance after the authorization is done for retrieving the response. :type context: the context of current request :param context: the request context :return: a response containing the response :rtype: satosa.response.Response """ self._log_function_debug("get_response_endpoint", context) resp_code = (context.qs_params or {}).get("response_code", None) if not resp_code: return self._handle_400( context, "request error: missing or invalid parameter [response_code]" ) session_id = context.state.get("SESSION_ID", None) if context.state else None if not session_id: return self._handle_400( context, "request error: session id not found" ) try: state = self.response_code_helper.recover_state(resp_code) except Exception as e400: return self._handle_400( context, "request error: missing or invalid parameter [response_code]", e400 ) finalized_session = None try: finalized_session = self.db_engine.get_by_state_and_session_id( state=state, session_id=session_id ) except Exception as e401: self._log_error( context, f"Error while retrieving internal response with response_code {resp_code} and session_id {session_id}: {e401}" ) return self._handle_401( context, "client error: no session associated to the state", e401 ) if not finalized_session: return self._handle_400( context, "request error: session not finalized" ) _now = iat_now() _exp = finalized_session["request_object"]["exp"] if _exp < _now: return self._handle_400( context, "request error: request expired", ) if finalized_session.get("error_response"): return self._get_response_authorization_error_page(finalized_session["error_response"]) if finalized_session.get("internal_response"): return self._get_response_auth_callback(context, finalized_session["internal_response"]) return self._handle_500( context, "finished authentication at an invalid state", Exception("finished authentication is in an invalid state: neither user data nor error are located in a finished session", finalized_session) )
def _get_response_authorization_error_page(self, wallet_error_response: dict) -> Response: result = self.template.authorization_error_response_page.render({ "error": wallet_error_response.get("error"), "error_description": wallet_error_response.get("error_description") }) return Response(result, content="text/html; charset=utf8", status="401") def _get_response_auth_callback(self, context, internal_resp_data: dict): internal_response = InternalData() resp = internal_response.from_dict(internal_resp_data) if not hasattr(self, "_auth_callback") or self._auth_callback is None: raise AttributeError("The '_auth_callback' method is not defined or is None.") return self._auth_callback(context, resp)