"""Source code for pyeudiw.satosa.backends.openid4vp.endpoints.status_endpoint."""

from typing import Callable

from satosa.attribute_mapping import AttributeMapper
from satosa.context import Context
from satosa.internal import InternalData
from satosa.response import Redirect, Response

from pyeudiw.satosa.backends.openid4vp.endpoints.vp_base_endpoint import VPBaseEndpoint
from pyeudiw.satosa.utils.respcode import ResponseCodeSource
from pyeudiw.satosa.utils.response import JsonResponse
from pyeudiw.tools.utils import iat_now
from pyeudiw.trust.dynamic import CombinedTrustEvaluator


class StatusHandler(VPBaseEndpoint):
    """Endpoint handler that reports the current status of an authentication session."""

    def __init__(
        self,
        config: dict,
        internal_attributes: dict[str, dict[str, str | list[str]]],
        base_url: str,
        name: str,
        auth_callback_func: Callable[[Context, InternalData], Response],
        converter: AttributeMapper,
        trust_evaluator: CombinedTrustEvaluator,
        db_engine=None,
    ) -> None:
        """
        Initialize the StatusHandler.

        All arguments are forwarded unchanged to the base class initializer;
        this class then derives the get-response endpoint URL and builds the
        helper used to create response codes.

        :param config: Configuration dictionary for the handler.
        :param internal_attributes: Internal attributes mapping.
        :param base_url: Base URL for the handler.
        :param name: Name of the handler.
        :param auth_callback_func: Callback forwarded to the base class.
        :param converter: Attribute mapper forwarded to the base class.
        :param trust_evaluator: Trust evaluator forwarded to the base class.
        :param db_engine: Optional storage engine forwarded to the base class.

        NOTE(review): ``self.client_id`` and ``self.config`` are presumably set
        by the base class initializer; ``self.config`` must provide
        ``["response_code"]["sym_key"]`` -- confirm against VPBaseEndpoint.
        """
        super().__init__(
            config,
            internal_attributes,
            base_url,
            name,
            auth_callback_func,
            converter,
            trust_evaluator,
            db_engine,
        )

        # URL handed back to the caller once the session is successfully finalized.
        self.registered_get_response_endpoint = f"{self.client_id}/get_response"
        self.response_code_helper = ResponseCodeSource(self.config["response_code"]["sym_key"])

    def endpoint(self, context: Context) -> Redirect | Response:
        """
        This endpoint is called by the User-Agent/Wallet Instance to check the
        status of the request.

        The outcome is decided in this order:

        1. ``SESSION_ID`` missing from ``context.state``, or query parameter
           ``id`` missing or empty: result of ``_handle_400``.
        2. Session lookup raises or returns ``None``: result of ``_handle_401``.
        3. The stored request object is past its ``exp``: expired response.
        4. Session finalized with an ``error_response``: error response (401).
        5. Session finalized without error: 200 with the redirect URI.
        6. A request object is stored but the session is not finalized: 202.
        7. Otherwise: 201.

        :type context: the context of current request
        :param context: the request context

        :return: a response containing the status of the request
        :rtype: satosa.response.Response
        """
        self._log_function_debug("status_endpoint", context)

        if not context.state or "SESSION_ID" not in context.state:
            return self._handle_400(
                context,
                "request error: missing SESSION_ID in context state",
                ValueError("Missing SESSION_ID in context state"),
            )

        session_id = context.state["SESSION_ID"]

        # Any failure while reading the `id` parameter (absent, empty, or an
        # unexpected qs_params object) is reported the same way.
        try:
            if not context.qs_params or "id" not in context.qs_params:
                raise ValueError("id")
            state = context.qs_params["id"]
            if not state:
                raise ValueError("id")
        except Exception as e400:
            return self._handle_400(context, "request error: missing or invalid parameter [id]", e400)

        try:
            session = self.db_engine.get_by_state_and_session_id(state=state, session_id=session_id)
        except Exception as e401:
            self._log_error(context, f"Error while retrieving session by state {state} and session_id {session_id}: {e401}")
            return self._handle_401(context, "client error: no session associated to the state", e401)

        if session is None:
            return self._handle_401(
                context,
                "client error: no session found for the given state and session_id",
                Exception("Session is None"),
            )

        request_object = session.get("request_object", None)
        # Expiry is checked before finalization, so an expired request is
        # reported as expired even if the session was finalized.
        if request_object:
            if iat_now() > request_object["exp"]:
                return self._status_session_expired_response(context)

        if session.get("finalized"):
            if session.get("error_response"):
                return self._status_session_finished_error_response(context, session["error_response"])
            return self._status_session_finished_ok_response(state)

        if request_object is not None:
            return self._status_session_accepted_response()

        return self._status_session_created_response()

    def _status_session_expired_response(self, context) -> Response:
        """Return the response used when the request object has expired."""
        return self._handle_403(
            context,
            "request error: request expired",
        )

    def _status_session_finished_ok_response(self, state: str) -> Response:
        """
        Return a 200 JSON response pointing to the get-response endpoint.

        :param state: the session state, used to create the response code.
        """
        resp_code = self.response_code_helper.create_code(state)
        return JsonResponse(
            {"redirect_uri": f"{self.registered_get_response_endpoint}?response_code={resp_code}"},
            status="200",
        )

    def _status_session_finished_error_response(self, context, wallet_error: dict) -> Response:
        """
        Log the wallet error and return a generic 401 JSON error response.

        The wallet error is only logged; the response body carries a fixed,
        generic description.

        :param context: the request context, used for logging.
        :param wallet_error: the error response stored in the session.
        """
        self._log_error(
            context,
            f"the wallet rejected the authentication attempt and responsed with the following Authorization Response Error: {wallet_error}"
        )
        return JsonResponse(
            {
                "error": "authentication_failed",
                "error_description": (
                    "The Wallet Instance or its User have rejected the request, "
                    "the request is expired, or other errors prevented the authentication."
                ),
            },
            status="401",
        )

    def _status_session_accepted_response(self) -> Response:
        """Return the 202 JSON response used when a request object is stored but the session is not finalized."""
        return JsonResponse({"response": "Accepted"}, status="202")

    def _status_session_created_response(self) -> Response:
        """Return the 201 JSON response used when no request object is stored in the session."""
        return JsonResponse({"response": "Request object issued"}, status="201")