# Source code for pyeudiw.storage.base_storage

import datetime
from enum import Enum
from typing import Union

from pymongo.results import UpdateResult

from .base_db import BaseDB


class TrustType(Enum):
    """
    Enumeration of the trust types known to the storage layer.

    Each member's value is the lowercase string identifier of that trust
    type, so a member can be recovered from its identifier with
    ``TrustType("x509")`` and the identifier read back with ``.value``.
    """

    # X.509 based trust.
    X509 = "x509"
    # Federation based trust.
    FEDERATION = "federation"
    # Direct trust for SD-JWT VC.
    DIRECT_TRUST_SD_JWT_VC = "direct_trust_sd_jwt_vc"
# Maps every TrustType member to its string identifier. The strings are the
# same as the corresponding enum values.
trust_type_map: dict[TrustType, str] = {
    TrustType.X509: "x509",
    TrustType.FEDERATION: "federation",
    TrustType.DIRECT_TRUST_SD_JWT_VC: "direct_trust_sd_jwt_vc",
}

# Maps a trust type to a field name used for trust attestations.
# NOTE(review): presumably the key under which the attestation is stored in
# the persisted document -- confirm against the concrete storage backends.
# There is no entry for TrustType.DIRECT_TRUST_SD_JWT_VC, so indexing with
# that member raises KeyError.
trust_attestation_field_map: dict[TrustType, str] = {
    TrustType.X509: "x5c",
    TrustType.FEDERATION: "chain",
}

# Maps a trust type to a field name used for trust anchors.
# NOTE(review): presumably the key under which the anchor material is stored
# in the persisted document -- confirm against the concrete storage backends.
# There is no entry for TrustType.DIRECT_TRUST_SD_JWT_VC, so indexing with
# that member raises KeyError.
trust_anchor_field_map: dict[TrustType, str] = {
    TrustType.X509: "pem",
    TrustType.FEDERATION: "entity_configuration",
}
class BaseStorage(BaseDB):
    """
    Interface class for storage.

    Every method defined here raises :class:`NotImplementedError`; concrete
    storage backends are expected to override the operations they support.
    """

    def init_session(
        self, document_id: str, session_id: str, state: str, remote_flow_typ: str
    ) -> str:
        """
        Initialize a session.

        :param document_id: the document id.
        :type document_id: str
        :param session_id: the satosa session id that initiated the
            authentication flow.
        :type session_id: str
        :param state: a unique identifier of the authentication flow.
        :type state: str
        :param remote_flow_typ: a value that discriminates between different
            authentication flows.
        :type remote_flow_typ: str

        :returns: a string; presumably the document id of the created
            session -- confirm against the concrete implementations.
        :rtype: str
        """
        raise NotImplementedError()

    def set_session_retention_ttl(self, ttl: Union[int, None]) -> None:
        """
        Set the database retention ttl.

        :param ttl: the ttl.
        :type ttl: int | None
        """
        raise NotImplementedError()

    def has_session_retention_ttl(self) -> bool:
        """
        Check if the session has a retention ttl.

        :returns: True if the session has a retention ttl, False otherwise.
        :rtype: bool
        """
        raise NotImplementedError()

    def add_dpop_proof_and_attestation(
        self, document_id: str, dpop_proof: dict, attestation: dict
    ) -> UpdateResult:
        """
        Add a dpop proof and an attestation to the session.

        :param document_id: the document id.
        :type document_id: str
        :param dpop_proof: the dpop proof.
        :type dpop_proof: dict
        :param attestation: the attestation.
        :type attestation: dict

        :returns: the result of the update operation.
        :rtype: UpdateResult
        """
        raise NotImplementedError()

    def set_finalized(self, document_id: str) -> UpdateResult:
        """
        Set the session as finalized.

        :param document_id: the document id.
        :type document_id: str

        :returns: the result of the update operation.
        :rtype: UpdateResult
        """
        raise NotImplementedError()

    def update_request_object(
        self, document_id: str, request_object: dict
    ) -> UpdateResult:
        """
        Update the request object of the session.

        :param document_id: the document id.
        :type document_id: str
        :param request_object: the request object.
        :type request_object: dict

        :returns: the result of the update operation.
        :rtype: UpdateResult
        """
        raise NotImplementedError()

    def update_response_object(
        self, nonce: str, state: str, response_object: dict, isError: bool = False
    ) -> UpdateResult:
        """
        Update the response object of the session.

        :param nonce: the nonce.
        :type nonce: str
        :param state: the state.
        :type state: str
        :param response_object: the response object.
        :type response_object: dict
        :param isError: if the response is an error response.
        :type isError: bool

        :returns: the result of the update operation.
        :rtype: UpdateResult
        """
        raise NotImplementedError()

    def get_trust_attestation(self, entity_id: str) -> Union[dict, None]:
        """
        Get a trust attestation.

        :param entity_id: the entity id.
        :type entity_id: str

        :returns: the trust attestation.
        :rtype: Union[dict, None]
        """
        raise NotImplementedError()

    def get_trust_anchor(self, entity_id: str) -> Union[dict, None]:
        """
        Get a trust anchor.

        :param entity_id: the entity id.
        :type entity_id: str

        :returns: the trust anchor.
        :rtype: Union[dict, None]
        """
        raise NotImplementedError()

    def has_trust_attestation(self, entity_id: str) -> bool:
        """
        Check if a trust attestation exists.

        :param entity_id: the entity id.
        :type entity_id: str

        :returns: True if the trust attestation exists, False otherwise.
        :rtype: bool
        """
        raise NotImplementedError()

    def has_trust_anchor(self, entity_id: str) -> bool:
        """
        Check if a trust anchor exists.

        :param entity_id: the entity id.
        :type entity_id: str

        :returns: True if the trust anchor exists, False otherwise.
        :rtype: bool
        """
        raise NotImplementedError()

    def has_trust_source(self, entity_id: str) -> bool:
        """
        Check if a trust source exists.

        :param entity_id: the entity id.
        :type entity_id: str

        :returns: True if the trust source exists, False otherwise.
        :rtype: bool
        """
        raise NotImplementedError()

    def add_trust_attestation(
        self,
        entity_id: str,
        attestation: list[str],
        exp: datetime.datetime,
        trust_type: TrustType,
        jwks: dict,
    ) -> str:
        """
        Add a trust attestation.

        :param entity_id: the entity id.
        :type entity_id: str
        :param attestation: the attestation.
        :type attestation: list[str]
        :param exp: the expiration date.
        :type exp: datetime.datetime
        :param trust_type: the trust type.
        :type trust_type: TrustType
        :param jwks: cached jwks.
        :type jwks: dict

        :returns: the document id.
        :rtype: str
        """
        raise NotImplementedError()

    def add_trust_attestation_metadata(
        self, entity_id: str, metadata_type: str, metadata: dict
    ) -> str:
        """
        Add a trust attestation metadata.

        :param entity_id: the entity id.
        :type entity_id: str
        :param metadata_type: the metadata type.
        :type metadata_type: str
        :param metadata: the metadata.
        :type metadata: dict

        :returns: the document id.
        :rtype: str
        """
        raise NotImplementedError()

    def add_trust_source(self, entity_id: str, trust_source: dict) -> str:
        """
        Add a trust source.

        :param entity_id: the entity id.
        :type entity_id: str
        :param trust_source: the trust source.
        :type trust_source: dict

        :returns: the document id.
        :rtype: str
        """
        raise NotImplementedError()

    def get_trust_source(self, entity_id: str) -> Union[dict, None]:
        """
        Get a trust source.

        :param entity_id: the entity id.
        :type entity_id: str

        :returns: the trust source.
        :rtype: Union[dict, None]
        """
        raise NotImplementedError()

    def add_empty_trust_anchor(self, entity_id: str) -> str:
        """
        Add an empty trust anchor.

        :param entity_id: the entity id.
        :type entity_id: str

        :returns: the document id.
        :rtype: str
        """
        raise NotImplementedError()

    def add_trust_anchor(
        self,
        entity_id: str,
        entity_configuration: str,
        exp: datetime.datetime,
        trust_type: TrustType,
    ) -> str:
        """
        Add a trust anchor.

        :param entity_id: the entity id.
        :type entity_id: str
        :param entity_configuration: the entity configuration.
        :type entity_configuration: str
        :param exp: the expiration date.
        :type exp: datetime.datetime
        :param trust_type: the trust type.
        :type trust_type: TrustType

        :returns: the document id.
        :rtype: str
        """
        raise NotImplementedError()

    def update_trust_attestation(
        self,
        entity_id: str,
        attestation: list[str],
        exp: datetime.datetime,
        trust_type: TrustType,
        jwks: dict,
    ) -> str:
        """
        Update a trust attestation.

        :param entity_id: the entity id.
        :type entity_id: str
        :param attestation: the attestation.
        :type attestation: list[str]
        :param exp: the expiration date.
        :type exp: datetime.datetime
        :param trust_type: the trust type.
        :type trust_type: TrustType
        :param jwks: cached jwks.
        :type jwks: dict

        :returns: the document id.
        :rtype: str
        """
        raise NotImplementedError()

    def update_trust_anchor(
        self,
        entity_id: str,
        entity_configuration: str,
        exp: datetime.datetime,
        trust_type: TrustType,
    ) -> str:
        """
        Update a trust anchor.

        :param entity_id: the entity id.
        :type entity_id: str
        :param entity_configuration: the entity configuration.
        :type entity_configuration: str
        :param exp: the expiration date.
        :type exp: datetime.datetime
        :param trust_type: the trust type.
        :type trust_type: TrustType

        :returns: the document id.
        :rtype: str
        """
        raise NotImplementedError()

    def exists_by_state_and_session_id(self, state: str, session_id: str = "") -> bool:
        """
        Check if a session exists by state and session id.

        :param state: the state.
        :type state: str
        :param session_id: the session id (defaults to the empty string).
        :type session_id: str

        :returns: True if the session exists, False otherwise.
        :rtype: bool
        """
        raise NotImplementedError()

    def get_by_state(self, state: str) -> Union[dict, None]:
        """
        Get a session by state.

        :param state: the state.
        :type state: str

        :returns: the session.
        :rtype: Union[dict, None]
        """
        raise NotImplementedError()

    def get_by_nonce_state(self, state: str, nonce: str) -> Union[dict, None]:
        """
        Get a session by nonce and state.

        Note that, despite the method name, ``state`` is the first positional
        parameter and ``nonce`` the second.

        :param state: the state.
        :type state: str
        :param nonce: the nonce.
        :type nonce: str

        :returns: the session.
        :rtype: Union[dict, None]
        """
        raise NotImplementedError()

    def get_by_state_and_session_id(
        self, state: str, session_id: str = ""
    ) -> Union[dict, None]:
        """
        Get a session by state and session id.

        :param state: the state.
        :type state: str
        :param session_id: the session id (defaults to the empty string).
        :type session_id: str

        :returns: the session.
        :rtype: Union[dict, None]
        """
        raise NotImplementedError()

    def get_by_session_id(self, session_id: str) -> Union[dict, None]:
        """
        Get a session by session id.

        :param session_id: the session id.
        :type session_id: str

        :returns: the session.
        :rtype: Union[dict, None]
        """
        raise NotImplementedError()

    def upsert_session(
        self, session_id: str, data: dict
    ) -> tuple[str, dict]:
        """
        Upsert a session by session id.

        :param session_id: the session id.
        :type session_id: str
        :param data: the data to upsert.
        :type data: dict

        :returns: a tuple containing the document id and the updated data.
        :rtype: tuple[str, dict]
        """
        raise NotImplementedError()

    def search_session_by_field(
        self, field: str, value: str
    ) -> dict:
        """
        Search for a session by a specific field and value.

        :param field: the field to search by.
        :type field: str
        :param value: the value to search for.
        :type value: str

        :returns: the session data if found, otherwise an empty dict.
        :rtype: dict
        """
        raise NotImplementedError()

    # TODO: create add_or_update for all the write methods
    def add_or_update_trust_attestation(
        self, entity_id: str, attestation: list[str], exp: datetime.datetime
    ) -> str:
        """
        Add or update a trust attestation.

        NOTE(review): unlike ``add_trust_attestation`` and
        ``update_trust_attestation``, this signature takes neither
        ``trust_type`` nor ``jwks`` -- confirm whether that is intended.

        :param entity_id: the entity id.
        :type entity_id: str
        :param attestation: the attestation.
        :type attestation: list[str]
        :param exp: the expiration date.
        :type exp: datetime.datetime

        :returns: the document id.
        :rtype: str
        """
        raise NotImplementedError()

    @property
    def is_connected(self) -> bool:
        """
        Check if the storage is connected.

        :returns: True if the storage is connected, False otherwise.
        :rtype: bool
        """
        raise NotImplementedError()