Source code for pulsar.cache.util
import os
import shutil
from datetime import datetime
[docs]
def atomicish_move(source, destination, tmp_suffix="_TMP"):
"""Move source to destination without risk of partial moves.
> from tempfile import mkdtemp
> from os.path import join, exists
> temp_dir = mkdtemp()
> source = join(temp_dir, "the_source")
> destination = join(temp_dir, "the_dest")
> open(source, "wb").write(b"Hello World!")
> assert exists(source)
> assert not exists(destination)
> atomicish_move(source, destination)
> assert not exists(source)
> assert exists(destination)
"""
destination_dir = os.path.dirname(destination)
destination_name = os.path.basename(destination)
temp_destination = os.path.join(destination_dir, "{}{}".format(destination_name, tmp_suffix))
shutil.move(source, temp_destination)
os.rename(temp_destination, destination)
[docs]
class Time:
"""Time utilities of now that can be instrumented for testing."""
[docs]
@classmethod
def now(cls):
"""Return the current datetime."""
return datetime.utcnow()