forked from pytest-dev/pytest
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add atomicwrites source and tests verbatim from the original repository
Related to pytest-dev#10114
- Loading branch information
1 parent
966d4fb
commit 2c1effe
Showing
2 changed files
with
320 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,229 @@ | ||
import contextlib | ||
import io | ||
import os | ||
import sys | ||
import tempfile | ||
|
||
try: | ||
import fcntl | ||
except ImportError: | ||
fcntl = None | ||
|
||
# `fspath` was added in Python 3.6 | ||
try: | ||
from os import fspath | ||
except ImportError: | ||
fspath = None | ||
|
||
__version__ = '1.4.1' | ||
|
||
|
||
PY2 = sys.version_info[0] == 2 | ||
|
||
text_type = unicode if PY2 else str # noqa | ||
|
||
|
||
def _path_to_unicode(x): | ||
if not isinstance(x, text_type): | ||
return x.decode(sys.getfilesystemencoding()) | ||
return x | ||
|
||
|
||
DEFAULT_MODE = "wb" if PY2 else "w" | ||
|
||
|
||
_proper_fsync = os.fsync | ||
|
||
|
||
if sys.platform != 'win32': | ||
if hasattr(fcntl, 'F_FULLFSYNC'): | ||
def _proper_fsync(fd): | ||
# https://lists.apple.com/archives/darwin-dev/2005/Feb/msg00072.html | ||
# https://developer.apple.com/library/mac/documentation/Darwin/Reference/ManPages/man2/fsync.2.html | ||
# https://github.com/untitaker/python-atomicwrites/issues/6 | ||
fcntl.fcntl(fd, fcntl.F_FULLFSYNC) | ||
|
||
def _sync_directory(directory): | ||
# Ensure that filenames are written to disk | ||
fd = os.open(directory, 0) | ||
try: | ||
_proper_fsync(fd) | ||
finally: | ||
os.close(fd) | ||
|
||
def _replace_atomic(src, dst): | ||
os.rename(src, dst) | ||
_sync_directory(os.path.normpath(os.path.dirname(dst))) | ||
|
||
def _move_atomic(src, dst): | ||
os.link(src, dst) | ||
os.unlink(src) | ||
|
||
src_dir = os.path.normpath(os.path.dirname(src)) | ||
dst_dir = os.path.normpath(os.path.dirname(dst)) | ||
_sync_directory(dst_dir) | ||
if src_dir != dst_dir: | ||
_sync_directory(src_dir) | ||
else: | ||
from ctypes import windll, WinError | ||
|
||
_MOVEFILE_REPLACE_EXISTING = 0x1 | ||
_MOVEFILE_WRITE_THROUGH = 0x8 | ||
_windows_default_flags = _MOVEFILE_WRITE_THROUGH | ||
|
||
def _handle_errors(rv): | ||
if not rv: | ||
raise WinError() | ||
|
||
def _replace_atomic(src, dst): | ||
_handle_errors(windll.kernel32.MoveFileExW( | ||
_path_to_unicode(src), _path_to_unicode(dst), | ||
_windows_default_flags | _MOVEFILE_REPLACE_EXISTING | ||
)) | ||
|
||
def _move_atomic(src, dst): | ||
_handle_errors(windll.kernel32.MoveFileExW( | ||
_path_to_unicode(src), _path_to_unicode(dst), | ||
_windows_default_flags | ||
)) | ||
|
||
|
||
def replace_atomic(src, dst): | ||
''' | ||
Move ``src`` to ``dst``. If ``dst`` exists, it will be silently | ||
overwritten. | ||
Both paths must reside on the same filesystem for the operation to be | ||
atomic. | ||
''' | ||
return _replace_atomic(src, dst) | ||
|
||
|
||
def move_atomic(src, dst): | ||
''' | ||
Move ``src`` to ``dst``. There might a timewindow where both filesystem | ||
entries exist. If ``dst`` already exists, :py:exc:`FileExistsError` will be | ||
raised. | ||
Both paths must reside on the same filesystem for the operation to be | ||
atomic. | ||
''' | ||
return _move_atomic(src, dst) | ||
|
||
|
||
class AtomicWriter(object): | ||
''' | ||
A helper class for performing atomic writes. Usage:: | ||
with AtomicWriter(path).open() as f: | ||
f.write(...) | ||
:param path: The destination filepath. May or may not exist. | ||
:param mode: The filemode for the temporary file. This defaults to `wb` in | ||
Python 2 and `w` in Python 3. | ||
:param overwrite: If set to false, an error is raised if ``path`` exists. | ||
Errors are only raised after the file has been written to. Either way, | ||
the operation is atomic. | ||
:param open_kwargs: Keyword-arguments to pass to the underlying | ||
:py:func:`open` call. This can be used to set the encoding when opening | ||
files in text-mode. | ||
If you need further control over the exact behavior, you are encouraged to | ||
subclass. | ||
''' | ||
|
||
def __init__(self, path, mode=DEFAULT_MODE, overwrite=False, | ||
**open_kwargs): | ||
if 'a' in mode: | ||
raise ValueError( | ||
'Appending to an existing file is not supported, because that ' | ||
'would involve an expensive `copy`-operation to a temporary ' | ||
'file. Open the file in normal `w`-mode and copy explicitly ' | ||
'if that\'s what you\'re after.' | ||
) | ||
if 'x' in mode: | ||
raise ValueError('Use the `overwrite`-parameter instead.') | ||
if 'w' not in mode: | ||
raise ValueError('AtomicWriters can only be written to.') | ||
|
||
# Attempt to convert `path` to `str` or `bytes` | ||
if fspath is not None: | ||
path = fspath(path) | ||
|
||
self._path = path | ||
self._mode = mode | ||
self._overwrite = overwrite | ||
self._open_kwargs = open_kwargs | ||
|
||
def open(self): | ||
''' | ||
Open the temporary file. | ||
''' | ||
return self._open(self.get_fileobject) | ||
|
||
@contextlib.contextmanager | ||
def _open(self, get_fileobject): | ||
f = None # make sure f exists even if get_fileobject() fails | ||
try: | ||
success = False | ||
with get_fileobject(**self._open_kwargs) as f: | ||
yield f | ||
self.sync(f) | ||
self.commit(f) | ||
success = True | ||
finally: | ||
if not success: | ||
try: | ||
self.rollback(f) | ||
except Exception: | ||
pass | ||
|
||
def get_fileobject(self, suffix="", prefix=tempfile.gettempprefix(), | ||
dir=None, **kwargs): | ||
'''Return the temporary file to use.''' | ||
if dir is None: | ||
dir = os.path.normpath(os.path.dirname(self._path)) | ||
descriptor, name = tempfile.mkstemp(suffix=suffix, prefix=prefix, | ||
dir=dir) | ||
# io.open() will take either the descriptor or the name, but we need | ||
# the name later for commit()/replace_atomic() and couldn't find a way | ||
# to get the filename from the descriptor. | ||
os.close(descriptor) | ||
kwargs['mode'] = self._mode | ||
kwargs['file'] = name | ||
return io.open(**kwargs) | ||
|
||
def sync(self, f): | ||
'''responsible for clearing as many file caches as possible before | ||
commit''' | ||
f.flush() | ||
_proper_fsync(f.fileno()) | ||
|
||
def commit(self, f): | ||
'''Move the temporary file to the target location.''' | ||
if self._overwrite: | ||
replace_atomic(f.name, self._path) | ||
else: | ||
move_atomic(f.name, self._path) | ||
|
||
def rollback(self, f): | ||
'''Clean up all temporary resources.''' | ||
os.unlink(f.name) | ||
|
||
|
||
def atomic_write(path, writer_cls=AtomicWriter, **cls_kwargs): | ||
''' | ||
Simple atomic writes. This wraps :py:class:`AtomicWriter`:: | ||
with atomic_write(path) as f: | ||
f.write(...) | ||
:param path: The target path to write to. | ||
:param writer_cls: The writer class to use. This parameter is useful if you | ||
subclassed :py:class:`AtomicWriter` to change some behavior and want to | ||
use that new subclass. | ||
Additional keyword arguments are passed to the writer class. See | ||
:py:class:`AtomicWriter`. | ||
''' | ||
return writer_cls(path, **cls_kwargs).open() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
import errno | ||
import os | ||
|
||
from atomicwrites import atomic_write | ||
|
||
import pytest | ||
|
||
|
||
def test_atomic_write(tmpdir): | ||
fname = tmpdir.join('ha') | ||
for i in range(2): | ||
with atomic_write(str(fname), overwrite=True) as f: | ||
f.write('hoho') | ||
|
||
with pytest.raises(OSError) as excinfo: | ||
with atomic_write(str(fname), overwrite=False) as f: | ||
f.write('haha') | ||
|
||
assert excinfo.value.errno == errno.EEXIST | ||
|
||
assert fname.read() == 'hoho' | ||
assert len(tmpdir.listdir()) == 1 | ||
|
||
|
||
def test_teardown(tmpdir): | ||
fname = tmpdir.join('ha') | ||
with pytest.raises(AssertionError): | ||
with atomic_write(str(fname), overwrite=True): | ||
assert False | ||
|
||
assert not tmpdir.listdir() | ||
|
||
|
||
def test_replace_simultaneously_created_file(tmpdir): | ||
fname = tmpdir.join('ha') | ||
with atomic_write(str(fname), overwrite=True) as f: | ||
f.write('hoho') | ||
fname.write('harhar') | ||
assert fname.read() == 'harhar' | ||
assert fname.read() == 'hoho' | ||
assert len(tmpdir.listdir()) == 1 | ||
|
||
|
||
def test_dont_remove_simultaneously_created_file(tmpdir): | ||
fname = tmpdir.join('ha') | ||
with pytest.raises(OSError) as excinfo: | ||
with atomic_write(str(fname), overwrite=False) as f: | ||
f.write('hoho') | ||
fname.write('harhar') | ||
assert fname.read() == 'harhar' | ||
|
||
assert excinfo.value.errno == errno.EEXIST | ||
assert fname.read() == 'harhar' | ||
assert len(tmpdir.listdir()) == 1 | ||
|
||
|
||
# Verify that nested exceptions during rollback do not overwrite the initial | ||
# exception that triggered a rollback. | ||
def test_open_reraise(tmpdir): | ||
fname = tmpdir.join('ha') | ||
with pytest.raises(AssertionError): | ||
aw = atomic_write(str(fname), overwrite=False) | ||
with aw: | ||
# Mess with internals, so commit will trigger a ValueError. We're | ||
# testing that the initial AssertionError triggered below is | ||
# propagated up the stack, not the second exception triggered | ||
# during commit. | ||
aw.rollback = lambda: 1 / 0 | ||
# Now trigger our own exception. | ||
assert False, "Intentional failure for testing purposes" | ||
|
||
|
||
def test_atomic_write_in_pwd(tmpdir): | ||
orig_curdir = os.getcwd() | ||
try: | ||
os.chdir(str(tmpdir)) | ||
fname = 'ha' | ||
for i in range(2): | ||
with atomic_write(str(fname), overwrite=True) as f: | ||
f.write('hoho') | ||
|
||
with pytest.raises(OSError) as excinfo: | ||
with atomic_write(str(fname), overwrite=False) as f: | ||
f.write('haha') | ||
|
||
assert excinfo.value.errno == errno.EEXIST | ||
|
||
assert open(fname).read() == 'hoho' | ||
assert len(tmpdir.listdir()) == 1 | ||
finally: | ||
os.chdir(orig_curdir) |