from __future__ import annotations
import logging
from copy import deepcopy
import pydantic
from pyeudiw.federation.exceptions import (
InvalidEntityHeader,
InvalidEntityStatementPayload,
MissingJwksClaim,
MissingTrustMark,
TrustAnchorNeeded,
UnknownKid,
)
from pyeudiw.federation.schemas.entity_configuration import (
EntityConfigurationHeader,
EntityStatementPayload,
)
from pyeudiw.jwk.jwks import find_jwk_by_kid
from pyeudiw.jwt.jws_helper import JWSHelper
from pyeudiw.jwt.utils import decode_jwt_header, decode_jwt_payload
from pyeudiw.tools.utils import get_http_url
OIDCFED_FEDERATION_WELLKNOWN_URL = ".well-known/openid-federation"
logger = logging.getLogger(__name__)
[docs]
def get_federation_jwks(jwt_payload: dict) -> list[dict]:
"""
Returns the list of JWKS inside a JWT payload.
:param jwt_payload: the jwt payload from where extract the JWKs.
:type jwt_payload: dict
:returns: A list of entity jwk's keys.
:rtype: list[dict]
"""
jwks = jwt_payload.get("jwks", {})
keys = jwks.get("keys", [])
return keys
[docs]
def get_entity_statements(urls: list[str] | str, httpc_params: dict, http_async: bool = True) -> list[bytes]:
"""
Fetches an entity statement from the specified urls.
:param urls: The url or a list of url where perform the GET HTTP calls
:type urls: list[str] | str
:param httpc_params: parameters to perform http requests.
:type httpc_params: dict
:param http_async: if is set to True the operation will be performed in async (deafault True)
:type http_async: bool
:returns: A list of entity statements.
:rtype: list[Response]
"""
urls = urls if isinstance(urls, list) else [urls]
for url in urls:
logger.debug(f"Starting Entity Statement Request to {url}")
return [i.content for i in get_http_url(urls, httpc_params, http_async)]
[docs]
def get_entity_configurations(subjects: list[str] | str, httpc_params: dict, http_async: bool = False) -> list[bytes]:
"""
Fetches an entity configuration from the specified subjects.
:param subjects: The url or a list of url where perform the GET HTTP calls
:type subjects: list[str] | str
:param httpc_params: parameters to perform http requests.
:type httpc_params: dict
:param http_async: if is set to True the operation will be performed in async (deafault True)
:type http_async: bool
:returns: A list of entity statements.
:rtype: list[Response]
"""
subjects = subjects if isinstance(subjects, list) else [subjects]
urls = []
for subject in subjects:
if subject[-1] != "/":
subject = f"{subject}/"
url = f"{subject}{OIDCFED_FEDERATION_WELLKNOWN_URL}"
urls.append(url)
logger.info(f"Starting Entity Configuration Request for {url}")
return [i.content for i in get_http_url(urls, httpc_params, http_async)]
[docs]
class TrustMark:
"""The class representing a Trust Mark"""
def __init__(self, jwt: str, httpc_params: dict):
"""
Create an instance of Trust Mark
:param jwt: the JWT containing the trust marks
:type jwt: str
:param httpc_params: parameters to perform http requests.
:type httpc_params: dict
"""
self.jwt = jwt
self.header = decode_jwt_header(jwt)
self.payload = decode_jwt_payload(jwt)
self.id = self.payload["id"]
self.sub = self.payload["sub"]
self.iss = self.payload["iss"]
self.is_valid = False
self.issuer_entity_configuration: list[bytes] = None
self.httpc_params = httpc_params
[docs]
def validate_by(self, ec: dict) -> bool:
"""
Validates Trust Marks by an Entity Configuration
:param ec: the entity configuration to validate by
:type ec: dict
:returns: True if is valid otherwise False
:rtype: bool
"""
try:
EntityConfigurationHeader(**self.header)
except pydantic.ValidationError as e:
raise InvalidEntityHeader(
# pragma: no cover
f"Trust Mark validation failed: "
f"{e}"
)
_kid = self.header["kid"]
if _kid not in ec.kids:
raise UnknownKid(f"Trust Mark validation failed: " f"{self.header.get('kid')} not found in {ec.jwks}") # pragma: no cover
_jwk = find_jwk_by_kid(ec.jwks, _kid)
# verify signature
jwsh = JWSHelper(_jwk)
payload = jwsh.verify(self.jwt)
self.is_valid = True
return payload
[docs]
def validate_by_its_issuer(self) -> bool:
"""
Validates Trust Marks by it's issuer
:returns: True if is valid otherwise False
:rtype: bool
"""
if not self.issuer_entity_configuration:
self.issuer_entity_configuration = [i.content for i in get_entity_configurations(self.iss, self.httpc_params, False)]
_kid = self.header.get("kid")
try:
ec = EntityStatement(self.issuer_entity_configuration[0])
ec.validate_by_itself()
except UnknownKid:
logger.warning(f"Trust Mark validation failed by its Issuer: " f"{_kid} not found in " f"{self.issuer_entity_configuration.jwks}")
return False
except Exception:
logger.warning(f"Issuer {self.iss} of trust mark {self.id} is not valid.")
self.is_valid = False
return False
# verify signature
_jwk = find_jwk_by_kid(ec.jwks, _kid)
jwsh = JWSHelper(_jwk)
payload = jwsh.verify(self.jwt)
self.is_valid = True
return payload
def __repr__(self) -> str:
return f"{self.id} to {self.sub} issued by {self.iss}"
[docs]
class EntityStatement:
"""
The self issued/signed statement of a federation entity
"""
def __init__(
self,
jwt: str,
httpc_params: dict,
filter_by_allowed_trust_marks: list[str] = [],
trust_anchor_entity_conf: EntityStatement | None = None,
trust_mark_issuers_entity_confs: list[EntityStatement] = [],
):
"""
Creates EntityStatement istance
:param jwt: the JWT containing the trust marks.
:type jwt: str
:param httpc_params: parameters to perform http requests.
:type httpc_params: dict
:param filter_by_allowed_trust_marks: allowed trust marks list.
:type filter_by_allowed_trust_marks: list[str]
:param trust_anchor_entity_conf: the trust anchor entity conf or None
:type trust_anchor_entity_conf: EntityStatement | None
:param trust_mark_issuers_entity_confs: the list containig the trust mark's entiity confs
"""
self.jwt = jwt
self.header = decode_jwt_header(jwt)
self.payload = decode_jwt_payload(jwt)
self.sub = self.payload["sub"]
self.iss = self.payload["iss"]
self.exp = self.payload["exp"]
self.jwks = get_federation_jwks(self.payload)
if not self.jwks or not self.jwks[0]:
_msg = f"Missing jwks in the statement for {self.sub}"
logger.error(_msg)
raise MissingJwksClaim(_msg)
self.kids = [i.get("kid") for i in self.jwks]
self.httpc_params = httpc_params
self.filter_by_allowed_trust_marks = filter_by_allowed_trust_marks
self.trust_anchor_entity_conf = trust_anchor_entity_conf
self.trust_mark_issuers_entity_confs = trust_mark_issuers_entity_confs
# a dict with sup_sub : superior entity configuration
self.verified_superiors = {}
# as previous but with superiors with invalid entity configurations
self.failed_superiors = {}
# a dict with sup_sub : entity statement issued for self
self.verified_by_superiors = {}
# a dict with the paylaod of valid entity statements for each descendant subject
self.verified_descendant_statements = {}
self.failed_descendant_statements = {}
# a dict with the RAW JWT of valid entity statements for each descendant subject
self.verified_descendant_statements_as_jwt = {}
self.verified_trust_marks = []
self.is_valid = False
[docs]
def update_trust_anchor_conf(self, trust_anchor_entity_conf: "EntityStatement") -> None:
"""
Updates the internal Trust Anchor conf.
:param trust_anchor_entity_conf: the trust anchor entity conf
:type trust_anchor_entity_conf: EntityStatement
"""
self.trust_anchor_entity_conf = trust_anchor_entity_conf
[docs]
def validate_by_itself(self) -> bool:
"""
validates the entity configuration by it self
"""
try:
EntityConfigurationHeader(**self.header)
except pydantic.ValidationError as e:
raise InvalidEntityHeader(
# pragma: no cover
f"Trust Mark validation failed: "
f"{e}"
)
_kid = self.header.get("kid")
if _kid not in self.kids:
raise UnknownKid(f"{_kid} not found in {self.jwks}") # pragma: no cover
# verify signature
_jwk = find_jwk_by_kid(self.jwks, _kid)
jwsh = JWSHelper(_jwk)
jwsh.verify(self.jwt)
self.is_valid = True
return True
[docs]
def validate_by_allowed_trust_marks(self) -> bool:
"""
validate the entity configuration ony if marked by a well known
trust mark, issued by a trusted issuer
"""
if not self.trust_anchor_entity_conf:
raise TrustAnchorNeeded("To validate the trust marks the " "Trust Anchor Entity Configuration " "is needed.")
if not self.filter_by_allowed_trust_marks:
return True
if not self.payload.get("trust_marks"):
logger.warning(f"{self.sub} doesn't have the trust marks claim " "in its Entity Configuration")
return False
trust_marks = []
is_valid = False
for tm in self.payload["trust_marks"]:
if tm.get("id", None) not in self.filter_by_allowed_trust_marks:
continue
try:
trust_mark = TrustMark(tm["trust_mark"])
except KeyError:
logger.warning(f"Trust Mark decoding failed on [{tm}]. " "Missing 'trust_mark' claim in it")
except Exception:
logger.warning(f"Trust Mark decoding failed on [{tm}]")
continue
else:
trust_marks.append(trust_mark)
if not trust_marks:
raise MissingTrustMark("Required Trust marks are missing.") # pragma: no cover
trust_mark_issuers_by_id = self.trust_anchor_entity_conf.payload.get("trust_marks_issuers", {})
# TODO : cache of issuers -> it would be better to have a proxy function
#
# required_issuer_ecs = []
# for trust_mark in trust_marks:
# if trust_mark.iss not in [
# i.payload.get('iss', None)
# for i in self.trust_mark_issuers_entity_confs
# ]:
# required_issuer_ecs.append(trust_mark.iss)
# TODO: snippet for CACHE
# if required_issuer_ec:
# ## fetch the issuer entity configuration and validate it
# iecs = get_entity_configurations(
# [required_issuer_ecs], self.httpc_params
# )
# for jwt in iecs:
# try:
# ec = self.__class__(jwt, httpc_params=self.httpc_params)
# ec.validate_by_itself()
# except Exception as e:
# logger.warning(
# "Trust Marks issuer Entity Configuration "
# f"failed for {jwt}: {e}"
# )
# continue
# self.trust_mark_issuers_entity_confs.append(ec)
for trust_mark in trust_marks:
id_issuers = trust_mark_issuers_by_id.get(trust_mark.id, None)
if id_issuers and trust_mark.iss not in id_issuers:
is_valid = False
elif id_issuers and trust_mark.iss in id_issuers:
is_valid = trust_mark.validate_by_its_issuer()
elif not id_issuers:
is_valid = trust_mark.validate_by(self.trust_anchor_entity_conf)
if not trust_mark.is_valid:
is_valid = False
if is_valid:
logger.info(f"Trust Mark {trust_mark} is valid")
self.verified_trust_marks.append(trust_mark)
else:
logger.warning(f"Trust Mark {trust_mark} is not valid")
return is_valid
[docs]
def get_superiors(
self,
authority_hints: list[str] = [],
max_authority_hints: int = 0,
superiors_hints: list[dict] = [],
) -> dict:
"""
get superiors entity configurations
:param authority_hints: the authority hint list
:type authority_hints: list[str]
:param max_authority_hints: the number of max authority hint
:type max_authority_hints: int
:param superiors_hints: the list of superior hints
:type superiors_hints: list[dict]
:returns: a dict with the superior's entity configurations
:rtype: dict
"""
# apply limits if defined
authority_hints = authority_hints or deepcopy(self.payload.get("authority_hints", []))
if max_authority_hints and authority_hints != authority_hints[:max_authority_hints]:
logger.warning(
f"Found {len(authority_hints)} but "
f"authority maximum hints is set to {max_authority_hints}. "
"the following authorities will be ignored: "
f"{', '.join(authority_hints[max_authority_hints:])}"
)
authority_hints = authority_hints[:max_authority_hints]
for sup in superiors_hints:
if sup.sub in authority_hints:
logger.info("Getting Cached Entity Configurations for " f"{[i.sub for i in superiors_hints]}")
authority_hints.pop(authority_hints.index(sup.sub))
self.verified_superiors[sup.sub] = sup
logger.debug(f"Getting Entity Configurations for {authority_hints}")
jwts = []
if self.trust_anchor_entity_conf:
ta_id = self.trust_anchor_entity_conf.payload.get("sub", {})
if ta_id in authority_hints:
jwts = [self.trust_anchor_configuration]
if not jwts:
jwts = get_entity_configurations(authority_hints, self.httpc_params, False)
for jwt in jwts:
try:
ec = self.__class__(
jwt,
httpc_params=self.httpc_params,
trust_anchor_entity_conf=self.trust_anchor_entity_conf,
)
except Exception as e:
logger.warning(f"Get Entity Configuration for {jwt}: {e}")
continue
if ec.validate_by_itself():
target = self.verified_superiors
else:
target = self.failed_superiors
target[ec.payload["sub"]] = ec
for ahints in authority_hints:
if not self.verified_superiors.get(ahints, None):
logger.warning(f"{ahints} is not available, missing or not valid authority hint")
continue
return self.verified_superiors
[docs]
def validate_descendant_statement(self, jwt: str) -> bool:
"""
jwt is a descendant entity statement issued by self
:param jwt: the JWT to validate by
:type jwt: str
:returns: True if is valid or False otherwise
:rtype: bool
"""
header = decode_jwt_header(jwt)
payload = decode_jwt_payload(jwt)
try:
EntityConfigurationHeader(**header)
except pydantic.ValidationError as e:
raise InvalidEntityHeader(f"Trust Mark validation failed: " f"{e}") # pragma: no cover
try:
EntityStatementPayload(**payload)
except pydantic.ValidationError as e:
raise InvalidEntityStatementPayload(f"Trust Mark validation failed: " f"{e}") # pragma: no cover
_kid = header.get("kid")
if _kid not in self.kids:
raise UnknownKid(f"{_kid} not found in {self.jwks}")
# verify signature
_jwk = find_jwk_by_kid(self.jwks, _kid)
jwsh = JWSHelper(_jwk)
payload = jwsh.verify(jwt)
self.verified_descendant_statements[payload["sub"]] = payload
self.verified_descendant_statements_as_jwt[payload["sub"]] = jwt
return self.verified_descendant_statements
[docs]
def validate_by_superior_statement(self, jwt: str, ec: "EntityStatement") -> str:
"""
validates self with the jwks contained in statement of the superior
:param jwt: the statement issued by a superior in form of JWT
:type jwt: str
:param ec: is a superior entity configuration
:type ec: EntityStatement
:returns: the entity configuration subject if is valid
:rtype: str
"""
is_valid = None
payload = {}
try:
payload = decode_jwt_payload(jwt)
ec.validate_by_itself()
ec.validate_descendant_statement(jwt)
_jwks = get_federation_jwks(payload)
_jwk = find_jwk_by_kid(_jwks, self.header["kid"])
jwsh = JWSHelper(_jwk)
payload = jwsh.verify(self.jwt)
is_valid = True
except Exception as e:
logger.warning(f"{self.sub} failed validation with " f"{ec.sub}'s superior statement '{payload or jwt}'. " f"Exception: {e}")
is_valid = False
if is_valid:
target = self.verified_by_superiors
ec.verified_descendant_statements[self.sub] = payload
ec.verified_descendant_statements_as_jwt[self.sub] = jwt
target[payload["iss"]] = ec
self.is_valid = True
return self.verified_by_superiors.get(ec.sub)
else:
target = self.failed_superiors
ec.failed_descendant_statements[self.sub] = payload
self.is_valid = False
[docs]
def validate_by_superiors(
self,
superiors_entity_configurations: dict = {},
) -> dict:
"""
validates the entity configuration with the entity statements issued by its superiors
this methods create self.verified_superiors and failed ones and self.verified_by_superiors and failed ones
:param superiors_entity_configurations: an object containing the entity configurations of superiors
:type superiors_entity_configurations: dict
:returns: an object containing the superior validations
:rtype: dict
"""
for ec in superiors_entity_configurations:
if ec.sub in ec.verified_by_superiors:
# already fetched and cached
continue
try:
# get superior fetch url
fetch_api_url = ec.payload["metadata"]["federation_entity"]["federation_fetch_endpoint"]
except KeyError:
logger.warning("Missing federation_fetch_endpoint in " f"federation_entity metadata for {self.sub} by {ec.sub}.")
self.failed_superiors[ec.sub] = None
continue
else:
_url = f"{fetch_api_url}?sub={self.sub}"
logger.info(f"Getting entity statements from {_url}")
jwts = get_entity_statements([_url], self.httpc_params, False)
if not jwts:
logger.error(f"Empty response for {_url}")
jwt = jwts[0]
if jwt:
self.validate_by_superior_statement(jwt, ec)
else:
logger.error(f"JWT validation for {_url}")
return self.verified_by_superiors
def __repr__(self) -> str:
return f"{self.sub} valid {self.is_valid}"