Source code for pyeudiw.duckle_ql.handler

import logging
from typing import Dict, Any

from pyeudiw.duckle_ql.attribute_mapper import extract_claims, flatten_namespace
from pyeudiw.duckle_ql.credential import CredentialsRequest
from pyeudiw.duckle_ql.utils import DUCKLE_PRESENTATION, DUCKLE_QUERY_KEY
from pyeudiw.satosa.backends.openid4vp.exceptions import InvalidVPToken
from pyeudiw.satosa.backends.openid4vp.presentation_submission.base_vp_parser import BaseVPParser
from pyeudiw.satosa.backends.openid4vp.vp_mdoc_cbor import VpMDocCbor
from pyeudiw.satosa.backends.openid4vp.vp_sd_jwt_vc import VpVcSdJwtParserVerifier
from pyeudiw.trust.dynamic import CombinedTrustEvaluator

EXP_CLAIM = "exp"
SUB_CLAIM = "sub"
DECODE_OPT = {"verify_signature": False}
QUERY_CONFIG = "query"
CREDENTIALS = "credentials"
METADATA_JWKS_CONFIG_KEY = "metadata_jwks"

MSO_MDOC_FORMAT = "mso_mdoc"
VC_SD_JWT_FORMAT = "vc+sd-jwt"
DC_SD_JWT_FORMAT = "dc+sd-jwt"


[docs] class DuckleHandler(BaseVPParser): """Handler for processing Verifiable Presentations using DCQL.""" def __init__(self, trust_evaluator: CombinedTrustEvaluator, sig_alg_supported: list[str] = None, **kwargs) -> None: """ Initialize the DuckleHandler with the trust evaluator. :param trust_evaluator: The trust evaluator instance. :type trust_evaluator: CombinedTrustEvaluator :param sig_alg_supported: List of supported signature algorithms. :type sig_alg_supported: list[str] """ super().__init__(trust_evaluator, **kwargs) if sig_alg_supported is None: sig_alg_supported = [] self.sig_alg_supported = sig_alg_supported dcql_value = kwargs.get(DUCKLE_QUERY_KEY) or (kwargs.get(DUCKLE_PRESENTATION) or {}).get(DUCKLE_QUERY_KEY) if dcql_value is None: self.queries = CredentialsRequest(credentials=[]) elif isinstance(dcql_value, dict): self.queries = CredentialsRequest.model_validate(dcql_value) else: self.queries = CredentialsRequest.model_validate_json(dcql_value)
[docs] def parse(self, token: dict) -> Dict[str, Any]: """ Parse the Duckle Verifiable Presentation. :return: A dictionary representing the parsed presentation. :rtype: dict """ all_claims = {} credentials = self.queries.credentials for cred in credentials: token_str = token[cred.id] try: paths = [{"path": claim.path} for claim in cred.claims] if cred.format == VC_SD_JWT_FORMAT or cred.format == DC_SD_JWT_FORMAT: parser = VpVcSdJwtParserVerifier(self.trust_evaluator, self.sig_alg_supported) data = parser.parse(token_str) elif cred.format == MSO_MDOC_FORMAT: parser = VpMDocCbor(self.trust_evaluator) data = flatten_namespace(parser.parse(token_str)) else: raise InvalidVPToken(f"Unexpected token format {cred.format}") all_claims.update(extract_claims(data, paths)) except Exception as e: logging.exception(f"Error parsing token for credential {cred.id}") raise e return all_claims
[docs] def validate( self, token: dict, verifier_id: str, verifier_nonce: str, ) -> None: """ Parse the Duckle token, validating the signature and claims. :param token: The Duckle token string. :type token: str :param verifier_id: The verifier's ID (used for the audience claim). :type verifier_id: str :param verifier_nonce: The verifier's nonce. :type verifier_nonce: str :raises InvalidVPToken: If the signature or claims are invalid. """ credentials = self.queries.credentials expected_ids = [credential.id for credential in credentials] missing_ids = [id_ for id_ in expected_ids if id_ not in token] if missing_ids: logging.error(f"Missing credential IDs in DCQL token: {missing_ids!r}") raise InvalidVPToken("Missing credential IDs in DCQL token") unexpected_id = [key for key in token if key not in expected_ids] if unexpected_id: logging.error(f"Unexpected credential IDs in DCQL token: {unexpected_id!r}") raise InvalidVPToken("Unexpected credential IDs in DCQL token") for cred in credentials: token_str = token[cred.id] try: if cred.format == VC_SD_JWT_FORMAT or cred.format == DC_SD_JWT_FORMAT: parser = VpVcSdJwtParserVerifier(self.trust_evaluator, self.sig_alg_supported) elif cred.format == MSO_MDOC_FORMAT: parser = VpMDocCbor(self.trust_evaluator) else: raise InvalidVPToken(f"Unexpected token format {cred.format}") parser.validate(token_str, verifier_id, verifier_nonce) except Exception as e: logging.exception(f"Error parsing token for credential '{cred.id}'") raise e