Source code for pulsar.cache

from hashlib import sha256
from os.path import exists, join

from .persistence import PersistenceStore
from .util import atomicish_move
from .util import Time


class CacheFileMapper:

    def __init__(self, directory):
        self.directory = directory

    def get(self, token):
        return join(self.directory, token)


[docs] class Cache(PersistenceStore): """ Maintain a cache of uploaded files. """ def __init__(self, cache_directory="file_cache"): super().__init__(join(cache_directory, "cache_shelf")) self.file_mapper = CacheFileMapper(cache_directory) self.time = Time
[docs] def cache_required(self, ip, path): token = self.__token(ip, path) def get_token(): inserted = False if token not in self.shelf: self.shelf[token] = self.time.now() inserted = True return inserted return self._with_lock(get_token)
[docs] def cache_file(self, local_path, ip, path): """ Move a file from a temporary staging area into the cache. """ destination = self.__destination(ip, path) atomicish_move(local_path, destination)
[docs] def file_available(self, ip, path): token = self.__token(ip, path) ready = exists(self.destination(token)) return {"token": token, "ready": ready}
[docs] def destination(self, token): return self.file_mapper.get(token)
def __destination(self, ip, path): token = self.__token(ip, path) return self.destination(token) def __token(self, ip, path): for_hash = "IP:{}:{}".format(ip, path) return sha256(for_hash.encode('UTF-8')).hexdigest()
__all__ = ['Cache']