import os
from typing import Literal, Optional
from urllib.parse import ParseResult, urlparse
from pyeudiw.tools.utils import cacheable_get_http_url, get_http_url
from pyeudiw.trust.interface import TrustEvaluator
from pyeudiw.trust.exceptions import InvalidJwkMetadataException
# Well-known path component under which an issuer exposes its JWK metadata.
DEFAULT_ISSUER_JWK_ENDPOINT = "/.well-known/jwt-vc-issuer"
# Well-known path component of the issuer metadata document.
DEFAULT_METADATA_ENDPOINT = "/.well-known/openid-credential-issuer"


def _env_flag(name: str, default: bool) -> bool:
    """
    Read a boolean switch from the environment.

    os.getenv returns a string whenever the variable is set, so a value such
    as "false" would otherwise be truthy. An empty value stays falsy, as it
    was when the raw string was used directly.

    :param name: name of the environment variable
    :param default: value used when the variable is not set
    :returns: the parsed flag
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    return raw.strip().lower() not in {"", "0", "false", "no", "off"}


def _env_number(name: str, default: "int | float") -> "int | float":
    """
    Read a numeric setting from the environment.

    :param name: name of the environment variable
    :param default: value used when the variable is unset or not a number
    :returns: an int when the value is integral, a float otherwise
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    try:
        value = float(raw)
    except ValueError:
        # A malformed value must not break module import: keep the default.
        return default
    return int(value) if value.is_integer() else value


DEFAULT_DIRECT_TRUST_SD_JWC_VC_PARAMS = {
    "httpc_params": {
        "connection": {"ssl": _env_flag("PYEUDIW_HTTPC_SSL", True)},
        "session": {"timeout": _env_number("PYEUDIW_HTTPC_TIMEOUT", 6)},
    }
}
[docs]
class DirectTrust(TrustEvaluator):
    """
    Common base class for the direct trust evaluators of this module.

    It adds no behavior of its own on top of TrustEvaluator.
    """
[docs]
class DirectTrustSdJwtVc(DirectTrust):
    """
    Direct trust evaluator for issuers that publish their keys on the web.

    The direct trust model assumes that an issuer is always trusted, in the
    sense that no trust verification actually happens. The issuer is assumed
    to be a URI and its keys and metadata information are publicly exposed on
    the web. Such keys/metadata can always be fetched remotely as long as the
    issuer is available.
    """

    def __init__(
        self,
        httpc_params: Optional[dict] = None,
        cache_ttl: int = 0,
        jwk_endpoint: str = DEFAULT_ISSUER_JWK_ENDPOINT,
        metadata_endpoint: str = DEFAULT_METADATA_ENDPOINT,
    ):
        """
        :param httpc_params: parameters forwarded to the HTTP helpers; when
            falsy, the module-level defaults are used
        :param cache_ttl: value handed to the cached HTTP helper; a falsy
            value (the default 0) selects the non-cached helper instead
        :param jwk_endpoint: well-known path component used to build the
            issuer jwk metadata URL
        :param metadata_endpoint: well-known path component of the issuer
            metadata; stored on the instance and reported by __str__
        """
        self.httpc_params = (
            httpc_params or DEFAULT_DIRECT_TRUST_SD_JWC_VC_PARAMS["httpc_params"]
        )
        self.cache_ttl = cache_ttl
        self.jwk_endpoint = jwk_endpoint
        self.metadata_endpoint = metadata_endpoint
        self.http_async_calls = False

    def get_public_keys(self, issuer: str) -> list[dict]:
        """
        Fetches the public keys of the issuer by querying a given endpoint.
        Previous responses might or might not be cached based on the
        cache_ttl parameter.

        :param issuer: the issuer of the public key
        :type issuer: str
        :raises InvalidJwkMetadataException: if the jwk metadata cannot be
            fetched, declares a different issuer, or carries no keys
        :returns: a list of jwk(s)
        """
        md = self._get_jwk_metadata(issuer)
        # The document must be about the issuer it was requested for.
        if issuer != (obt_issuer := md.get("issuer", None)):
            raise InvalidJwkMetadataException(
                f"invalid jwk metadata: obtained issuer :{obt_issuer}, expected issuer: {issuer}"
            )
        jwks = self._extract_jwks_from_jwk_metadata(md)
        jwk_l: list[dict] = jwks.get("keys", [])
        if not jwk_l:
            raise InvalidJwkMetadataException(
                "unable to find jwks in issuer jwk metadata"
            )
        return jwk_l

    def _http_get(self, url: str):
        """
        Fetch a single URL with the configured HTTP parameters, going through
        the cached helper when cache_ttl is truthy.

        :param url: the URL to fetch
        :returns: the response object produced by the HTTP helper
        """
        if self.cache_ttl:
            return cacheable_get_http_url(
                self.cache_ttl,
                url,
                self.httpc_params,
                http_async=self.http_async_calls,
            )
        # The non-cached helper works on a list of URLs and returns a list.
        return get_http_url(
            [url], self.httpc_params, http_async=self.http_async_calls
        )[0]

    def _get_jwk_metadata(self, issuer: str) -> dict:
        """
        Call the jwk metadata endpoint and return the whole document.

        :param issuer: the issuer whose well-known jwk metadata is requested
        :raises InvalidJwkMetadataException: if the response is falsy or its
            status code is not 200
        :returns: the decoded JSON body of the response
        """
        jwk_endpoint = DirectTrustSdJwtVc.build_issuer_jwk_endpoint(
            issuer, self.jwk_endpoint
        )
        resp = self._http_get(jwk_endpoint)
        if (not resp) or (resp.status_code != 200):
            raise InvalidJwkMetadataException(
                f"failed to fetch valid jwk metadata: obtained {resp}"
            )
        return resp.json()

    def _get_jwks_by_reference(self, jwks_reference_uri: str) -> dict:
        """
        Call the jwks endpoint if jwks is defined by reference.

        :param jwks_reference_uri: the URL found in the jwks_uri claim
        :raises InvalidJwkMetadataException: if the response is falsy or its
            status code is not 200
        :returns: the decoded JSON body of the response
        """
        resp = self._http_get(jwks_reference_uri)
        # Same validation as for the metadata document: never decode the body
        # of a failed request as if it were a key set.
        if (not resp) or (resp.status_code != 200):
            raise InvalidJwkMetadataException(
                f"failed to fetch valid jwks from {jwks_reference_uri}: obtained {resp}"
            )
        return resp.json()

    def _extract_jwks_from_jwk_metadata(self, metadata: dict) -> dict:
        """
        Parse the jwk metadata document and return the jwks.
        NOTE: jwks might be in the document by value or by reference.

        :param metadata: the jwk metadata document
        :raises InvalidJwkMetadataException: if both the jwks and jwks_uri
            claims are missing or empty
        :returns: the jwks, taken by value when present, fetched otherwise
        """
        jwks: dict[Literal["keys"], list[dict]] | None = metadata.get("jwks", None)
        jwks_uri: str | None = metadata.get("jwks_uri", None)
        if (not jwks) and (not jwks_uri):
            raise InvalidJwkMetadataException(
                "invalid issuing key metadata: missing both claims [jwks] and [jwks_uri]"
            )
        if jwks:
            # get jwks by value
            return jwks
        return self._get_jwks_by_reference(jwks_uri)

    @staticmethod
    def build_issuer_jwk_endpoint(
        issuer_id: str, well_known_path_component: str
    ) -> str:
        """
        Build the well-known URL of an issuer by inserting the well-known
        path component between the host and the path of the issuer URI.

        :param issuer_id: the issuer identifier, expected to be a URL
        :param well_known_path_component: path component to insert, e.g.
            "/.well-known/jwt-vc-issuer"
        :returns: the resulting URL; scheme, host, params, query and fragment
            of the issuer are left untouched
        """
        baseurl = urlparse(issuer_id)
        issuer_path = baseurl.path
        # A terminating "/" of the issuer path is removed before the
        # well-known component is inserted, so "https://host/" and
        # "https://host" resolve to the same metadata URL.
        if issuer_path.endswith("/"):
            issuer_path = issuer_path[:-1]
        well_known_path = well_known_path_component + issuer_path
        well_known_url: str = ParseResult(
            baseurl.scheme,
            baseurl.netloc,
            well_known_path,
            baseurl.params,
            baseurl.query,
            baseurl.fragment,
        ).geturl()
        return well_known_url

    def __str__(self) -> str:
        """Return a readable summary of the evaluator configuration."""
        return (
            f"DirectTrustSdJwtVc("
            f"httpc_params={self.httpc_params}, "
            f"cache_ttl={self.cache_ttl}, "
            f"jwk_endpoint={self.jwk_endpoint}, "
            f"metadata_endpoint={self.metadata_endpoint}"
            ")"
        )