from typing import Optional
from pyeudiw.tools.utils import iat_now
from pyeudiw.tools.http import http_get_sync
from pyeudiw.status_list import decode_jwt_status_list_token, decode_cwt_status_list_token
from pyeudiw.status_list.exceptions import (
PositionOutOfRangeError,
InvalidTokenFormatError,
MissingStatusListUriError,
StatusListRetrievalError
)
[docs]
class StatusListTokenHelper:
def __init__(
self,
header: dict,
payload: dict,
bits: int,
status_list: bytes,
aggregation_uri: Optional[str] = None
) -> None:
"""
Initializes the StatusListTokenHelper instance.
:param header: The header of the token.
:type header: dict
:param payload: The payload of the token.
:type payload: dict
:param bits: The number of bits used for the status list.
:type bits: int
:param status_list: The status list.
:type status_list: bytes
:param aggregation_uri: The aggregation URI.
:type aggregation_uri: Optional[str]
"""
self.header = header
self.payload = payload
self.bits = bits
self.status_list = status_list
self.aggregation_uri = aggregation_uri
[docs]
def is_expired(self) -> bool:
"""
Returns True if the token is expired, False otherwise.
:returns: True if the token is expired, False otherwise.
:rtype: bool
"""
expiration_time = self.payload.get(
"exp",
self.payload.get(4)
)
if expiration_time is None:
return False
current_time = iat_now()
return current_time > expiration_time
[docs]
def get_status(self, position: int) -> int:
"""
Returns the status at the given position.
:param position: The position of the status in the list.
:type position: int
:raises IndexError: If the position is out of range.
:returns: The status at the given position.
:rtype: int
"""
total_elemets_number = (len(self.status_list) * 8) // self.bits
if position < 0:
raise PositionOutOfRangeError("Position cannot be negative")
if position >= total_elemets_number:
raise PositionOutOfRangeError("Position out of range")
jump = self.bits * position
mask = (1 << self.bits) - 1
status = (self.status_list[jump // 8] >> (jump % 8)) & mask
return status
[docs]
def get_aggregation_uri(self) -> Optional[str]:
"""
Returns the aggregation URI.
:returns: The aggregation URI.
:rtype: Optional[str]
"""
return self.aggregation_uri
@property
def ttl(self) -> Optional[int]:
"""
Returns the time to live (TTL) of the token in seconds.
:returns: The TTL of the token in seconds.
:rtype: Optional[int]
"""
return self.payload.get(
"ttl",
self.payload.get(65534)
)
@property
def iss(self) -> Optional[str]:
"""
Returns the issuer of the token.
:returns: The issuer of the token.
:rtype: Optional[str]
"""
return self.payload.get("iss")
@property
def sub(self) -> Optional[str]:
"""
Returns the subject of the token.
:returns: The subject of the token.
:rtype: Optional[str]
"""
return self.payload.get(
"sub",
self.payload.get(2)
)
@property
def iat(self) -> Optional[int]:
"""
Returns the issued at time of the token.
:returns: The issued at time of the token.
:rtype: Optional[int]
"""
return self.payload.get(
"iat",
self.payload.get(6)
)
[docs]
@staticmethod
def from_token(token: str | bytes) -> "StatusListTokenHelper":
"""
Create a StatusListTokenHelper instance from a status list token.
:param token: The status list token.
:type token: str | bytes
:raises InvalidTokenFormatError: If the token is not a valid JWT or CWT.
:returns: A StatusListTokenHelper instance.
:rtype: StatusListTokenHelper
"""
decoders = [decode_jwt_status_list_token, decode_cwt_status_list_token]
for decoder in decoders:
status, header, payload, bits, status_list = decoder(token)
if status:
return StatusListTokenHelper(header, payload, bits, status_list)
raise InvalidTokenFormatError(f"Token is not a valid JWT or CWT {token}")
[docs]
@staticmethod
def from_status(status: dict, httpc_params: Optional[dict] = None) -> "StatusListTokenHelper":
"""
Create a StatusListTokenHelper instance from a status dictionary.
:param status: The status dictionary.
:type status: dict
:raises MissingStatusListUriError: If the status list URI is missing.
:raises StatusListRetrievalError: If there is an error retrieving the status list.
:raises InvalidTokenFormatError: If the retrieved token is invalid.
:returns: A StatusListTokenHelper instance.
:rtype: StatusListTokenHelper
"""
uri = status.get("status_list", {}).get("uri")
if uri is None:
raise MissingStatusListUriError("Status list URI is missing")
try:
status_token = http_get_sync([uri], {
"connection": {"ssl": True},
"session": {"timeout": 4},
})
except Exception as e:
raise StatusListRetrievalError(f"Failed to retrieve status list token: {e}")
token = status_token[0].text
return StatusListTokenHelper.from_token(token)