Source code for pyeudiw.storage.mongo_cache

from datetime import datetime
from typing import Callable

import pymongo
from pymongo.collection import Collection
from pymongo.database import Database
from pymongo.mongo_client import MongoClient

from pyeudiw.storage.base_cache import BaseCache, RetrieveStatus


[docs] class MongoCache(BaseCache): """ MongoDB cache implementation. """ def __init__(self, conf: dict, url: str, connection_params: dict = None) -> None: """ Create a MongoCache istance. :param conf: the configuration of the cache. :type conf: dict :param url: the url of the MongoDB server. :type url: str :param connection_params: the connection parameters. :type connection_params: dict, optional """ super().__init__() self.storage_conf = conf self.url = url self.connection_params = connection_params self.client: MongoClient = None self.db: Database = None self.collection: Collection = None
[docs] def close(self) -> None: self._connect() self.client.close()
[docs] def try_retrieve(self, object_name: str, on_not_found: Callable[[], str]) -> tuple[dict, RetrieveStatus]: self._connect() query = {"object_name": object_name} cache_object = self.collection.find_one(query) if cache_object is None: cache_object = self._gen_cache_object(object_name, on_not_found()) self.collection.insert_one(cache_object) return cache_object, RetrieveStatus.ADDED return cache_object, RetrieveStatus.RETRIEVED
[docs] def overwrite(self, object_name: str, value_gen_fn: Callable[[], str]) -> dict: self._connect() update_time = datetime.now().isoformat() new_data = value_gen_fn() cache_object = { "object_name": object_name, "data": new_data, "creation_date": update_time, } query = {"object_name": object_name} self.collection.update_one(query, {"$set": {"data": new_data, "creation_date": update_time}}) return cache_object
[docs] def set(self, data: dict) -> dict: self._connect() return self.collection.insert_one(data)
def _reset_connection(self) -> None: """Clear client and db references so the next _connect() creates a fresh connection. Used when MongoDB closes the connection (AutoReconnect / "connection closed"), e.g. under load during pytest runs with multiple DBEngine instances. """ self.client = None self.db = None self.collection = None def _connect(self) -> None: # If the existing client's socket was closed by MongoDB (e.g. under load), reset # and create a new MongoClient so we do not reuse a dead connection. try: if not self.client: raise ValueError("no client") self.client.server_info() except (ValueError, pymongo.errors.AutoReconnect, OSError, AttributeError): self._reset_connection() if not self.client: params = dict(self.connection_params or {}) params.setdefault("maxPoolSize", 10) self.client = pymongo.MongoClient(self.url, **params) self.db = getattr(self.client, self.storage_conf["db_name"]) self.collection = getattr(self.db, "cache_storage") def _gen_cache_object(self, object_name: str, data: str) -> dict: """ Helper function to generate a cache object. :param object_name: the name of the object. :type object_name: str :param data: the data to store. :type data: str """ return { "object_name": object_name, "data": data, "creation_date": datetime.now().isoformat(), }