Source code for pyeudiw.satosa.backends.openid4vp.vp_mdoc_cbor

from datetime import datetime, timezone

import cbor2
from cryptography.hazmat.primitives import serialization
from pymdoccbor.mdoc.verifier import MdocCbor

from pyeudiw.satosa.backends.openid4vp.exceptions import MdocCborValidationError, VPRevoked
from pyeudiw.credential_presentation.base_vp_parser import BaseVPParser
from pyeudiw.status_list.helper import StatusListTokenHelper
from pyeudiw.x509.verify import get_issuer_from_x5c

# ISO 18013-7 Annex B (OpenID4VP): SessionTranscript must have deviceEngagementBytes=null,
# eReaderKeyBytes=null, and handover with OpenID4VPHandover. CBOR map keys may be int or text.
#
# Each constant below lists the accepted spellings of one SessionTranscript key: the text
# label first, then the integer label. _get_val() tries them in that order and returns the
# value of the first spelling that is present in the map.
_ST_KEY_DEVICE_ENGAGEMENT = ("deviceEngagementBytes", 0)
_ST_KEY_EREADER = ("eReaderKeyBytes", 1)
_ST_KEY_HANDOVER = ("handover", 2)


def _get_val(d: dict, keys: tuple) -> object:
    for k in keys:
        if k in d:
            return d[k]
    return None


def _validate_session_transcript_openid4vp(st: dict) -> None:
    """Check that a decoded SessionTranscript map follows the OpenID4VP profile.

    The profile (ISO 18013-7 Annex B) requires deviceEngagementBytes and
    eReaderKeyBytes to be null and a handover element to be present. Each
    field is looked up under both its text and integer key.

    Raises:
        MdocCborValidationError: on the first violated requirement, checked in
            the order deviceEngagementBytes, eReaderKeyBytes, handover.
    """
    # Fields that must be null (or absent), paired with their error message.
    must_be_null = (
        (
            _ST_KEY_DEVICE_ENGAGEMENT,
            "SessionTranscript must have deviceEngagementBytes=null for OpenID4VP",
        ),
        (
            _ST_KEY_EREADER,
            "SessionTranscript must have eReaderKeyBytes=null for OpenID4VP",
        ),
    )
    for key_aliases, message in must_be_null:
        if _get_val(st, key_aliases) is not None:
            raise MdocCborValidationError(message)

    # The handover element is the only mandatory (non-null) field.
    if _get_val(st, _ST_KEY_HANDOVER) is None:
        raise MdocCborValidationError("SessionTranscript must include handover for OpenID4VP")


def _session_transcript_from_document(entry: object) -> object:
    """Return the raw sessionTranscript found in one document entry, or None.

    None is returned whenever the entry cannot be navigated: it is not a map,
    it has no (or an empty) deviceSigned, deviceSigned is bytes that fail to
    decode as CBOR, or the decoded deviceSigned is not a map.
    """
    # Entries may be wrapped in an object exposing ``.value``; unwrap if so.
    payload = getattr(entry, "value", entry)
    if not isinstance(payload, dict):
        return None

    device_signed = payload.get("deviceSigned", payload.get(2))
    if not device_signed:
        return None

    if isinstance(device_signed, bytes):
        try:
            device_signed = cbor2.loads(device_signed)
        except cbor2.CBORDecodeError:
            # An undecodable deviceSigned is skipped rather than rejected.
            return None

    if not isinstance(device_signed, dict):
        return None
    return device_signed.get("sessionTranscript", device_signed.get(0))


def _check_session_transcripts_in_mdoc(mdoc: MdocCbor) -> None:
    """Validate every SessionTranscript embedded in the mdoc's deviceSigned parts.

    Documents without a reachable SessionTranscript are skipped. A transcript
    given as bytes is CBOR-decoded first; only transcripts that are maps are
    checked against the OpenID4VP profile, other shapes pass through unchecked.

    Raises:
        MdocCborValidationError: if a bytes transcript cannot be decoded, or a
            map transcript violates the OpenID4VP profile.
    """
    for entry in mdoc.data_as_cbor_dict.get("documents", []):
        transcript = _session_transcript_from_document(entry)
        if transcript is None:
            continue

        if isinstance(transcript, bytes):
            try:
                transcript = cbor2.loads(transcript)
            except cbor2.CBORDecodeError:
                raise MdocCborValidationError("invalid SessionTranscript in deviceSigned")

        if isinstance(transcript, dict):
            _validate_session_transcript_openid4vp(transcript)


class VpMDocCbor(BaseVPParser):
    """Verifiable-presentation parser for mdoc CBOR tokens, built on ``MdocCbor``."""

    def _is_expired(self, mdoc: MdocCbor) -> bool:
        """Tell whether any document in ``mdoc`` must be considered expired.

        A document counts as expired when its ``validityInfo.validUntil`` is
        earlier than the current UTC time, or when that entry is missing from
        the issuer-auth payload.

        Returns:
            True as soon as one such document is found, False otherwise
            (including when ``mdoc.documents`` is empty).
        """
        # One reference instant for the whole check.
        now = datetime.now(timezone.utc)
        for document in mdoc.documents:
            try:
                valid_until = document.issuersigned.issuer_auth.payload_as_dict["validityInfo"]["validUntil"]
            except KeyError:
                # No validity information: treat the document as expired.
                return True
            if valid_until < now:
                return True
        return False

    def validate(self, token: str, verifier_id: str, verifier_nonce: str) -> None:
        """Validate an mdoc CBOR presentation token.

        Steps, in order: load and verify the token, check any embedded
        SessionTranscript against the OpenID4VP profile, submit each
        document's issuer certificate chain to the trust evaluator, check
        expiry, and finally consult the status list when the mdoc carries one.

        Args:
            token: the serialized mdoc, passed to ``MdocCbor.loads``.
            verifier_id: part of the parser interface; not used by this method.
            verifier_nonce: part of the parser interface; not used by this method.

        Raises:
            MdocCborValidationError: if verification fails, a SessionTranscript
                is invalid, the trust evaluation raises, or a document is expired.
            VPRevoked: if the status list is expired or reports a status
                greater than zero for the credential's index.
        """
        mdoc = MdocCbor()
        mdoc.loads(data=token)

        if mdoc.verify() is False:
            raise MdocCborValidationError("Signature is invalid")

        _check_session_transcripts_in_mdoc(mdoc)

        try:
            for document in mdoc.documents:
                x5c = [
                    cert.public_bytes(encoding=serialization.Encoding.PEM).decode()
                    for cert in document.issuersigned.issuer_auth.x509_certificates
                ]
                self.trust_evaluator.get_public_keys(get_issuer_from_x5c(x5c), {"x5c": x5c})
        except Exception as e:
            # Any failure while building or evaluating the chain is reported as a
            # validation error; the original exception is kept as the cause.
            raise MdocCborValidationError(f"Error validating keys: {e}") from e

        if self._is_expired(mdoc):
            raise MdocCborValidationError("Credential is expired")

        if mdoc.status:
            status_list = StatusListTokenHelper.from_status(mdoc.status)
            # An expired status list is handled the same way as a revoked entry.
            if status_list.is_expired() or status_list.get_status(mdoc.status["status_list"]["idx"]) > 0:
                raise VPRevoked("Status list indicates that the token is revoked")

    def parse(self, token: str) -> dict:
        """Load ``token`` and return the mdoc's disclosure map.

        Args:
            token: the serialized mdoc, passed to ``MdocCbor.loads``.

        Returns:
            ``mdoc.disclosure_map`` as exposed by ``MdocCbor`` after ``verify()``.
        """
        mdoc = MdocCbor()
        mdoc.loads(data=token)
        # NOTE(review): the result of verify() is ignored here; presumably it is
        # called only to populate the parsed documents, with validate() being
        # the place where a failed verification is rejected - confirm.
        mdoc.verify()
        return mdoc.disclosure_map