Source code for pyeudiw.satosa.backends.openid4vp.vp_sd_jwt_vc

from pyeudiw.jwt.helper import is_jwt_expired
from pyeudiw.jwt.utils import decode_jwt_header, decode_jwt_payload
from pyeudiw.satosa.backends.openid4vp.exceptions import MissingIssuer, VPRevoked, VPExpired
from pyeudiw.satosa.backends.openid4vp.presentation_submission.base_vp_parser import BaseVPParser
from pyeudiw.sd_jwt.schema import VerifierChallenge
from pyeudiw.sd_jwt.schema import is_sd_jwt_kb_format
from pyeudiw.sd_jwt.sd_jwt import SdJwt
from pyeudiw.status_list.helper import StatusListTokenHelper
from pyeudiw.trust.dynamic import CombinedTrustEvaluator


[docs] class VpVcSdJwtParserVerifier(BaseVPParser): def __init__(self, trust_evaluator: CombinedTrustEvaluator, sig_alg_supported: list[str] = [], **kwargs) -> None: """ Initialize the VpVcSdJwtParserVerifier with the trust evaluator. :param trust_evaluator: The trust evaluator instance. :type trust_evaluator: CombinedTrustEvaluator :param sig_alg_supported: List of supported signature algorithms. :type sig_alg_supported: list[str] """ self.sig_alg_supported = sig_alg_supported super().__init__(trust_evaluator, **kwargs) def _get_issuer_name(self, sdjwt: SdJwt) -> str: """ Get the issuer name from the token payload. :raises MissingIssuer: if the issuer name is missing in the token payload :return: the issuer name :rtype: str """ iss = sdjwt.get_issuer_jwt().payload.get("iss", None) if not iss: raise MissingIssuer("missing required information in token paylaod: [iss]") return iss
[docs] def parse(self, token: str) -> dict: sdjwt = SdJwt(token) return sdjwt.get_disclosed_claims()
def _is_revoked(self) -> bool: # TODO: implement revocation check return False
[docs] def validate( self, token: str, verifier_id: str, verifier_nonce: str, ) -> None: # precomputed values if not is_sd_jwt_kb_format(token): raise ValueError("Token is not in the expected format") sdjwt = SdJwt(token) static_trust_materials = {} header = decode_jwt_header(token) alg = header.get("alg", None) if alg not in self.sig_alg_supported: raise ValueError(f"Unsupported algorithm: {alg}") if "x5c" in header: static_trust_materials["x5c"] = header["x5c"] if "trust_chain" in header: static_trust_materials["trust_chain"] = header["trust_chain"] public_keys = self.trust_evaluator.get_public_keys(self._get_issuer_name(sdjwt), static_trust_materials) sdjwt.verify_issuer_jwt_signature(public_keys) challenge: VerifierChallenge = {} challenge["aud"] = verifier_id challenge["nonce"] = verifier_nonce sdjwt.verify_holder_kb_jwt(challenge) if is_jwt_expired(sdjwt.issuer_jwt.jwt): raise VPExpired("VP is expired") payload = decode_jwt_payload(token) if "status" in payload and "status_list" in payload["status"]: status_list = StatusListTokenHelper.from_status(payload["status"]) if status_list.is_expired() or status_list.get_status(payload["status"]["status_list"]["idx"]) > 0: raise VPRevoked("Status list indicates that the token is revoked")