import logging
import re
from typing import Any
from pydantic import ValidationError
from pyeudiw.credential_presentation.handler import CredentialPresentationHandlers
from pyeudiw.satosa.backends.openid4vp.presentation_submission.exceptions import (
MissingHandler,
MalformedPath,
SubmissionValidationError,
VPTokenDescriptorMapMismatch,
ParseError,
ValidationError
)
from pyeudiw.satosa.backends.openid4vp.presentation_submission.schemas import PresentationSubmissionSchema
logger = logging.getLogger(__name__)
[docs]
class PresentationSubmissionHandler:
def __init__(
self,
config: CredentialPresentationHandlers,
) -> None:
"""
Initialize the PresentationSubmissionHandler handler with the submission data.
:param config: Configuration object.
"""
self.max_submission_size = config.max_submission_size or 4096
self.handlers = config.handlers
self.trust_evaluator = config.trust_evaluator
def _validate_submission(self, submission: dict[str, Any]) -> PresentationSubmissionSchema:
"""
Validate the submission data using Pydantic and check its total size.
:param submission: The presentation submission data.
:type submission: dict[str, Any]
:raises SubmissionValidationError: If the submission data is invalid or exceeds size limits.
:return: Validated submission schema.
:rtype: PresentationSubmissionSchema
"""
# Check submission size
submission_size = len(str(submission).encode("utf-8"))
if submission_size > self.max_submission_size:
raise SubmissionValidationError(
f"Submission size exceeds maximum allowed limit of {self.max_submission_size} bytes."
)
try:
return PresentationSubmissionSchema(**submission)
except ValidationError as e:
raise SubmissionValidationError(f"Submission validation failed: {e}")
def _extract_position(self, path: str) -> int:
"""
Extract the position and path from the descriptor path.
:param path: The descriptor path.
:type path: str
:raises MalformedPath: If the path is not in the correct format.
:return: Tuple of position and path.
:rtype: tuple[str, str]
"""
if path == "$":
return 0
pattern = r"\$[a-z_\-\.]*\[(\d+)\]"
match = re.match(pattern, path, re.I)
if match:
position = int(match.group(1))
return position
else:
raise MalformedPath(f"Invalid path format: {path}")
[docs]
def parse(self, submission: dict[str, Any], vp_tokens: list[str]) -> list[dict]:
"""
Parse the presentation submission data using the appropriate handler.
:param submission: The presentation submission data.
:type submission: dict[str, Any]
:raises MissingHandler: If the handler for the format is not found.
:raises VPTokenDescriptorMapMismatch: If the number of VP tokens does not match the number of descriptors.
:raises ParseError: If parsing fails.
:return: Parsed presentation submission data.
:rtype: dict
"""
descriptor_map_len = len(submission["descriptor_map"])
parsed_tokens: list[dict] = [{} for _ in range(descriptor_map_len)]
for descriptor in submission["descriptor_map"]:
handler = self.handlers.get(descriptor['format'])
if not handler:
raise MissingHandler(f"Handler for format '{descriptor['format']}' not found.")
position = self._extract_position(descriptor['path'])
try:
parsed_tokens[position] = handler.parse(vp_tokens[position])
except Exception as e:
raise ParseError(f"Error parsing token at position {position}: {e}")
return parsed_tokens
[docs]
def validate(
self,
submission: dict[str, Any],
vp_tokens: list[str],
verifier_id: str,
verifier_nonce: str
) -> None:
"""
Validate the presentation submission data using the appropriate handler.
:param submission: The presentation submission data.
:type submission: dict[str, Any]
:raises MissingHandler: If the handler for the format is not found.
:raises VPTokenDescriptorMapMismatch: If the number of VP tokens does not match the number of descriptors.
:raises ParseError: If parsing fails.
"""
try:
validated_submission = self._validate_submission(submission)
except Exception as e:
raise SubmissionValidationError(f"Submission validation failed: {e}")
descriptor_map_len = len(validated_submission.descriptor_map)
if len(vp_tokens) != descriptor_map_len:
raise VPTokenDescriptorMapMismatch(
f"Number of VP tokens ({len(vp_tokens)}) does not match the number of descriptors ({descriptor_map_len})."
)
for descriptor in validated_submission.descriptor_map:
handler = self.handlers.get(descriptor.format)
if not handler:
raise MissingHandler(f"Handler for format '{descriptor.format}' not found.")
position = self._extract_position(descriptor.path)
try:
handler.validate(vp_tokens[position], verifier_id, verifier_nonce)
except Exception as e:
raise ValidationError(f"Error parsing token at position {position}: {e}")