# Source code for pyeudiw.satosa.backends.openid4vp.endpoints.get_response_endpoint

from typing import Callable

from satosa.attribute_mapping import AttributeMapper
from satosa.context import Context
from satosa.internal import InternalData
from satosa.response import Redirect, Response

from pyeudiw.satosa.utils.html_template import Jinja2TemplateHandler
from pyeudiw.satosa.utils.respcode import ResponseCodeSource
from pyeudiw.storage.db_engine import DBEngine
from pyeudiw.tools.base_endpoint import BaseEndpoint
from pyeudiw.tools.utils import iat_now
from pyeudiw.trust.dynamic import CombinedTrustEvaluator


class GetResponseHandler(BaseEndpoint):
    """Endpoint handler that delivers the outcome of a finalized authorization.

    The handler resolves the ``response_code`` query parameter to a stored
    session and then either renders the wallet's error response or hands the
    stored internal response to the authentication callback.
    """

    def __init__(
        self,
        config: dict,
        internal_attributes: dict[str, dict[str, str | list[str]]],
        base_url: str,
        name: str,
        auth_callback_func: Callable[[Context, InternalData], Response],
        converter: AttributeMapper,
        trust_evaluator: CombinedTrustEvaluator,
        db_engine=None,
    ) -> None:
        """
        Initialize the GetResponseHandler.

        :param config: Configuration dictionary for the handler. Must contain a
            non-empty ``storage`` section, ``response_code.sym_key`` and ``ui``.
        :param internal_attributes: Internal attributes mapping.
        :param base_url: Base URL for the handler.
        :param name: Name of the handler.
        :param auth_callback_func: Callback forwarded to the base endpoint.
        :param converter: Attribute mapper forwarded to the base endpoint.
        :param trust_evaluator: Accepted as part of the constructor signature;
            it is not referenced inside this constructor.
        :param db_engine: Optional, already constructed storage engine. When
            ``None`` a new ``DBEngine`` is built from the storage settings.

        :raises ValueError: If storage settings are not configured.
        :raises KeyError: If ``response_code``/``sym_key`` or ``ui`` are missing
            from the configuration.
        """
        super().__init__(config, internal_attributes, base_url, name, auth_callback_func, converter)

        self.storage_settings = self.config.get("storage", {})
        if not self.storage_settings:
            raise ValueError("Storage settings are not configured. Please check your configuration.")

        # Reuse shared db_engine from backend when provided to avoid multiple MongoClient instances.
        self.db_engine = db_engine if db_engine is not None else DBEngine(self.storage_settings)

        self.response_code_helper = ResponseCodeSource(self.config["response_code"]["sym_key"])

        # HTML template loader
        self.template = Jinja2TemplateHandler(self.config["ui"])

    def endpoint(self, context: Context) -> Redirect | Response:
        """
        This endpoint is called by the User-Agent/Wallet Instance after the
        authorization is done, in order to retrieve the response.

        The steps are, in order: read ``response_code`` from the query string,
        read ``SESSION_ID`` from the request state, recover the state from the
        response code, load the matching session from storage, reject it when
        the request object's ``exp`` is in the past, and finally return either
        the error page or the result of the authentication callback.

        :type context: the context of current request
        :param context: the request context

        :return: a response containing the response
        :rtype: satosa.response.Response
        """
        self._log_function_debug("get_response_endpoint", context)

        resp_code = (context.qs_params or {}).get("response_code", None)
        if not resp_code:
            return self._handle_400(context, "request error: missing or invalid parameter [response_code]")

        session_id = context.state.get("SESSION_ID", None) if context.state else None
        if not session_id:
            return self._handle_400(context, "request error: session id not found")

        try:
            state = self.response_code_helper.recover_state(resp_code)
        except Exception as e400:
            # Any failure to recover the state is reported as a bad response_code.
            return self._handle_400(context, "request error: missing or invalid parameter [response_code]", e400)

        try:
            finalized_session = self.db_engine.get_by_state_and_session_id(state=state, session_id=session_id)
        except Exception as e401:
            self._log_error(
                context,
                f"Error while retrieving internal response with response_code {resp_code} and session_id {session_id}: {e401}",
            )
            return self._handle_401(context, "client error: no session associated to the state", e401)

        if not finalized_session:
            return self._handle_400(context, "request error: session not finalized")

        # The stored request object carries its own expiry timestamp.
        _now = iat_now()
        _exp = finalized_session["request_object"]["exp"]
        if _exp < _now:
            return self._handle_400(
                context,
                "request error: request expired",
            )

        # An error reported by the wallet takes precedence over user data.
        if finalized_session.get("error_response"):
            return self._get_response_authorization_error_page(finalized_session["error_response"])

        if finalized_session.get("internal_response"):
            return self._get_response_auth_callback(context, finalized_session["internal_response"])

        # Neither an error nor user data: the session is in an unexpected state.
        return self._handle_500(
            context,
            "finished authentication at an invalid state",
            Exception(
                "finished authentication is in an invalid state: \nneither user data nor error are located in a finished session",
                finalized_session,
            ),
        )

    def _get_response_authorization_error_page(self, wallet_error_response: dict) -> Response:
        """
        Render the authorization error page for an error sent by the wallet.

        :param wallet_error_response: mapping from which ``error`` and
            ``error_description`` are read (missing keys render as ``None``).

        :return: an HTML response with status ``401``.
        """
        result = self.template.authorization_error_response_page.render(
            {
                "error": wallet_error_response.get("error"),
                "error_description": wallet_error_response.get("error_description"),
            }
        )
        return Response(result, content="text/html; charset=utf8", status="401")

    def _get_response_auth_callback(self, context: Context, internal_resp_data: dict) -> Response:
        """
        Rebuild the internal response and pass it to the authentication callback.

        :param context: the request context
        :param internal_resp_data: dictionary form of the stored internal response.

        :raises AttributeError: If ``_auth_callback`` is missing or ``None``.

        :return: whatever the authentication callback returns.
        """
        # Deserialize first, so a malformed payload fails before the callback check.
        resp = InternalData().from_dict(internal_resp_data)

        if getattr(self, "_auth_callback", None) is None:
            raise AttributeError("The '_auth_callback' method is not defined or is None.")

        return self._auth_callback(context, resp)