Source code for pyeudiw.federation.policy

import logging
from typing import Optional

from .exceptions import PolicyError

__author__ = "Roland Hedberg"
__license__ = "Apache 2.0"
__version__ = ""

logger = logging.getLogger(__name__)


[docs] def combine_subset_of(s1, s2): return list(set(s1).intersection(set(s2)))
[docs] def combine_superset_of(s1, s2): return list(set(s1).intersection(set(s2)))
[docs] def combine_one_of(s1, s2): return list(set(s1).intersection(set(s2)))
[docs] def combine_add(s1, s2): if isinstance(s1, list): set1 = set(s1) else: set1 = {s1} if isinstance(s2, list): set2 = set(s2) else: set2 = {s2} return list(set1.union(set2))
POLICY_FUNCTIONS = { "subset_of", "superset_of", "one_of", "add", "value", "default", "essential", } OP2FUNC = { "subset_of": combine_subset_of, "superset_of": combine_superset_of, "one_of": combine_one_of, "add": combine_add, }
[docs] def do_sub_one_super_add(superior, child, policy): if policy in superior and policy in child: comb = OP2FUNC[policy](superior[policy], child[policy]) if comb: return comb else: raise PolicyError("Value sets doesn't overlap") elif policy in superior: return superior[policy] elif policy in child: return child[policy]
[docs] def do_value(superior, child, policy): if policy in superior and policy in child: if superior[policy] == child[policy]: return superior[policy] else: raise PolicyError("Not allowed to combine values") elif policy in superior: return superior[policy] elif policy in child: return child[policy]
[docs] def do_default(superior, child, policy): # A child's default can not override a superiors if policy in superior and policy in child: if superior["default"] == child["default"]: return superior["default"] else: raise PolicyError("Not allowed to change default") elif policy in superior: return superior[policy] elif policy in child: return child[policy]
[docs] def do_essential(superior, child, policy): # essential: a child can make it True if a superior has states False # but not the other way around if policy in superior and policy in child: if not superior[policy] and child["essential"]: return True else: return superior[policy] elif policy in superior: return superior[policy] elif policy in child: # Not in superior is the same as essential=True return True
DO_POLICY = { "superset_of": do_sub_one_super_add, "subset_of": do_sub_one_super_add, "one_of": do_sub_one_super_add, "add": do_sub_one_super_add, "value": do_value, "default": do_default, "essential": do_essential, }
[docs] def combine_claim_policy(superior, child): """ Combine policy rules. Applying the child policy can only make the combined policy more restrictive. :param superior: Superior policy :param child: Intermediates policy """ # weed out everything I don't recognize superior_set = set(superior).intersection(POLICY_FUNCTIONS) child_set = set(child).intersection(POLICY_FUNCTIONS) if "value" in superior_set: # An exact value can not be restricted. if child_set: if "essential" in child_set: if len(child_set) == 1: return {"value": superior["value"], "essential": child["essential"]} else: raise PolicyError( f"value can only be combined with essential, not {child_set}" ) elif "value" in child_set: if child["value"] != superior["value"]: # Not OK raise PolicyError("Child can not set another value then superior") else: return superior else: raise PolicyError( f"Not allowed combination of policies: {superior} + {child}" ) return superior else: if "essential" in superior_set and "essential" in child_set: # can only go from False to True if ( superior["essential"] != child["essential"] and child["essential"] is False ): raise PolicyError("Essential can not go from True to False") comb_policy = superior_set.union(child_set) if "one_of" in comb_policy: if "subset_of" in comb_policy or "superset_of" in comb_policy: raise PolicyError( "one_of can not be combined with subset_of/superset_of" ) rule = {} for policy in comb_policy: rule[policy] = DO_POLICY[policy](superior, child, policy) if comb_policy == {"superset_of", "subset_of"}: # make sure the subset_of is a superset of superset_of. if set(rule["superset_of"]).difference(set(rule["subset_of"])): raise PolicyError("superset_of not a super set of subset_of") elif comb_policy == {"superset_of", "subset_of", "default"}: # make sure the subset_of is a superset of superset_of. if set(rule["superset_of"]).difference(set(rule["subset_of"])): raise PolicyError("superset_of not a super set of subset_of") if set(rule["default"]).difference(set(rule["subset_of"])): raise PolicyError("default not a sub set of subset_of") if set(rule["superset_of"]).difference(set(rule["default"])): raise PolicyError("default not a super set of subset_of") elif comb_policy == {"subset_of", "default"}: if set(rule["default"]).difference(set(rule["subset_of"])): raise PolicyError("default not a sub set of subset_of") elif comb_policy == {"superset_of", "default"}: if set(rule["superset_of"]).difference(set(rule["default"])): raise PolicyError("default not a super set of subset_of") elif comb_policy == {"one_of", "default"}: if isinstance(rule["default"], list): if set(rule["default"]).difference(set(rule["one_of"])): raise PolicyError("default not a super set of one_of") else: if {rule["default"]}.difference(set(rule["one_of"])): raise PolicyError("default not a super set of one_of") return rule
[docs] def combine(superior: dict, sub: dict) -> dict: """ :param rule: Dictionary with two keys metadata_policy and metadata :param sub: Dictionary with two keys metadata_policy and metadata :return: """ sup_metadata = superior.get("metadata", {}) sub_metadata = sub.get("metadata", {}) sup_m_set = set(sup_metadata.keys()) if sub_metadata: chi_m_set = set(sub_metadata.keys()) _overlap = chi_m_set.intersection(sup_m_set) if _overlap: for key in _overlap: if sup_metadata[key] != sub_metadata[key]: raise PolicyError( "A subordinate is not allowed to set a value different then the superiors" ) _metadata = sup_metadata.copy() _metadata.update(sub_metadata) superior["metadata"] = _metadata # Now for metadata_policies _sup_policy = superior.get("metadata_policy", {}) _sub_policy = sub.get("metadata_policy", {}) if _sub_policy: sup_set = set(_sup_policy.keys()) chi_set = set(sub["metadata_policy"].keys()) # A metadata_policy claim can not change a metadata claim for claim in chi_set.intersection(sup_m_set): combine_claim_policy({"value": sup_metadata[claim]}, _sub_policy[claim]) _mp = {} for claim in set(sup_set).intersection(chi_set): _mp[claim] = combine_claim_policy(_sup_policy[claim], _sub_policy[claim]) for claim in sup_set.difference(chi_set): _mp[claim] = _sup_policy[claim] for claim in chi_set.difference(sup_set): _mp[claim] = _sub_policy[claim] superior["metadata_policy"] = _mp return superior
[docs] def gather_policies(chain, entity_type): """ Gather and combine all the metadata policies that are defined in the trust chain :param chain: A list of Entity Statements :return: The combined metadata policy """ try: combined_policy = chain[0]["metadata_policy"][entity_type] except KeyError: combined_policy = {} for es in chain[1:]: try: child = es["metadata_policy"][entity_type] except KeyError: pass else: combined_policy = combine(combined_policy, child) return combined_policy
[docs] def union(val1, val2): if isinstance(val1, list): base = set(val1) else: base = {val1} if isinstance(val2, list): ext = set(val2) else: ext = {val2} return base.union(ext)
[docs] class TrustChainPolicy(object):
[docs] def gather_policies(self, chain, entity_type): """ Gather and combine all the metadata policies that are defined in the trust chain :param chain: A list of Entity Statements :return: The combined metadata policy """ _rule = {"metadata_policy": {}, "metadata": {}} for _item in ["metadata_policy", "metadata"]: try: _rule[_item] = chain[0][_item][entity_type] except KeyError: pass for es in chain[1:]: _sub_policy = {"metadata_policy": {}, "metadata": {}} for _item in ["metadata_policy", "metadata"]: try: _sub_policy[_item] = es[_item][entity_type] except KeyError: pass if _sub_policy == {"metadata_policy": {}, "metadata": {}}: continue _overlap = set(_sub_policy["metadata_policy"]).intersection( set(_sub_policy["metadata"]) ) if _overlap: # Not allowed raise PolicyError( "Claim appearing both in metadata and metadata_policy not allowed" ) _rule = combine(_rule, _sub_policy) return _rule
def _apply_metadata_policy(self, metadata, metadata_policy): """ Apply a metadata policy to a metadata statement. The order is value, add, default and then check subset_of/superset_of and one_of """ policy_set = set(metadata_policy.keys()) metadata_set = set(metadata.keys()) # Metadata claims that there exists a policy for for claim in metadata_set.intersection(policy_set): if "value" in metadata_policy[claim]: # value overrides everything metadata[claim] = metadata_policy[claim]["value"] else: if "one_of" in metadata_policy[claim]: # The is for claims that can have only one value # Should not be but ... if isinstance(metadata[claim], list): _claim = [ c for c in metadata[claim] if c in metadata_policy[claim]["one_of"] ] if _claim: metadata[claim] = _claim[0] else: raise PolicyError( "{}: None of {} among {}".format( claim, metadata[claim], metadata_policy[claim]["one_of"], ) ) else: if metadata[claim] in metadata_policy[claim]["one_of"]: pass else: raise PolicyError( f"{metadata[claim]} not among {metadata_policy[claim]['one_of']}" ) else: # The following is for claims that can have lists of values if "add" in metadata_policy[claim]: metadata[claim] = list( union(metadata[claim], metadata_policy[claim]["add"]) ) if "subset_of" in metadata_policy[claim]: _val = set(metadata_policy[claim]["subset_of"]).intersection( set(metadata[claim]) ) if _val: metadata[claim] = list(_val) else: raise PolicyError( "{} not subset of {}".format( metadata[claim], metadata_policy[claim]["subset_of"] ) ) if "superset_of" in metadata_policy[claim]: if set(metadata_policy[claim]["superset_of"]).difference( set(metadata[claim]) ): raise PolicyError( "{} not superset of {}".format( metadata[claim], metadata_policy[claim]["superset_of"], ) ) else: pass # In policy but not in metadata for claim in policy_set.difference(metadata_set): if "value" in metadata_policy[claim]: metadata[claim] = metadata_policy[claim]["value"] elif "add" in metadata_policy[claim]: metadata[claim] = metadata_policy[claim]["add"] elif "default" in metadata_policy[claim]: metadata[claim] = metadata_policy[claim]["default"] if claim not in metadata: if ( "essential" in metadata_policy[claim] and metadata_policy[claim]["essential"] ): raise PolicyError(f"Essential claim '{claim}' missing") return metadata
[docs] def apply_policy(self, metadata: dict, policy: dict) -> dict: """ Apply a metadata policy on metadata. :param metadata: Metadata statements :param policy: A dictionary with metadata and metadata_policy as keys :return: A metadata statement that adheres to a metadata policy """ if policy["metadata_policy"]: metadata = self._apply_metadata_policy(metadata, policy["metadata_policy"]) # All that are in metadata but not in policy should just remain metadata.update(policy["metadata"]) return metadata
def _policy(self, trust_chain, entity_type: str): combined_policy = self.gather_policies(trust_chain[:-1], entity_type) logger.debug("Combined policy: %s", combined_policy) try: # This should be the entity configuration metadata = trust_chain.verified_chain[-1]["metadata"][entity_type] except KeyError: return None else: # apply the combined metadata policies on the metadata trust_chain.set_combined_policy(entity_type, combined_policy) _metadata = self.apply_policy(metadata, combined_policy) logger.debug(f"After applied policy: {_metadata}") return _metadata def __call__(self, trust_chain, entity_type: Optional[str] = ""): """ :param trust_chain: TrustChain instance :param entity_type: Which Entity Type the entity are """ if len(trust_chain.verified_chain) > 1: if entity_type: trust_chain.metadata[entity_type] = self._policy( trust_chain, entity_type ) else: for _type in trust_chain.verified_chain[-1]["metadata"].keys(): trust_chain.metadata[_type] = self._policy(trust_chain, _type) else: trust_chain.metadata = trust_chain.verified_chain[0]["metadata"][ entity_type ] trust_chain.combined_policy[entity_type] = {}