diff --git a/src/_pytest/atomic_writes.py b/src/_pytest/atomic_writes.py index 669191bb5fb..e5ce0193207 100644 --- a/src/_pytest/atomic_writes.py +++ b/src/_pytest/atomic_writes.py @@ -1,5 +1,10 @@ +""" +Module copied over from https://github.com/untitaker/python-atomicwrites, which has become +unmaintained. + +Since then, we have made changes to simplify the code, focusing on pytest's use-case. +""" import contextlib -import io import os import sys import tempfile @@ -15,7 +20,7 @@ except ImportError: fspath = None -__version__ = '1.4.1' +__version__ = "1.4.1" PY2 = sys.version_info[0] == 2 @@ -35,8 +40,9 @@ def _path_to_unicode(x): _proper_fsync = os.fsync -if sys.platform != 'win32': - if hasattr(fcntl, 'F_FULLFSYNC'): +if sys.platform != "win32": + if hasattr(fcntl, "F_FULLFSYNC"): + def _proper_fsync(fd): # https://lists.apple.com/archives/darwin-dev/2005/Feb/msg00072.html # https://developer.apple.com/library/mac/documentation/Darwin/Reference/ManPages/man2/fsync.2.html @@ -64,6 +70,7 @@ def _move_atomic(src, dst): _sync_directory(dst_dir) if src_dir != dst_dir: _sync_directory(src_dir) + else: from ctypes import windll, WinError @@ -76,43 +83,47 @@ def _handle_errors(rv): raise WinError() def _replace_atomic(src, dst): - _handle_errors(windll.kernel32.MoveFileExW( - _path_to_unicode(src), _path_to_unicode(dst), - _windows_default_flags | _MOVEFILE_REPLACE_EXISTING - )) + _handle_errors( + windll.kernel32.MoveFileExW( + _path_to_unicode(src), + _path_to_unicode(dst), + _windows_default_flags | _MOVEFILE_REPLACE_EXISTING, + ) + ) def _move_atomic(src, dst): - _handle_errors(windll.kernel32.MoveFileExW( - _path_to_unicode(src), _path_to_unicode(dst), - _windows_default_flags - )) + _handle_errors( + windll.kernel32.MoveFileExW( + _path_to_unicode(src), _path_to_unicode(dst), _windows_default_flags + ) + ) def replace_atomic(src, dst): - ''' + """ Move ``src`` to ``dst``. If ``dst`` exists, it will be silently overwritten. Both paths must reside on the same filesystem for the operation to be atomic. - ''' + """ return _replace_atomic(src, dst) def move_atomic(src, dst): - ''' + """ Move ``src`` to ``dst``. There might a timewindow where both filesystem entries exist. If ``dst`` already exists, :py:exc:`FileExistsError` will be raised. Both paths must reside on the same filesystem for the operation to be atomic. - ''' + """ return _move_atomic(src, dst) -class AtomicWriter(object): - ''' +class AtomicWriter: + """ A helper class for performing atomic writes. Usage:: with AtomicWriter(path).open() as f: @@ -130,21 +141,20 @@ class AtomicWriter(object): If you need further control over the exact behavior, you are encouraged to subclass. - ''' + """ - def __init__(self, path, mode=DEFAULT_MODE, overwrite=False, - **open_kwargs): - if 'a' in mode: + def __init__(self, path, mode=DEFAULT_MODE, overwrite=False, **open_kwargs): + if "a" in mode: raise ValueError( - 'Appending to an existing file is not supported, because that ' - 'would involve an expensive `copy`-operation to a temporary ' - 'file. Open the file in normal `w`-mode and copy explicitly ' - 'if that\'s what you\'re after.' + "Appending to an existing file is not supported, because that " + "would involve an expensive `copy`-operation to a temporary " + "file. Open the file in normal `w`-mode and copy explicitly " + "if that's what you're after." ) - if 'x' in mode: - raise ValueError('Use the `overwrite`-parameter instead.') - if 'w' not in mode: - raise ValueError('AtomicWriters can only be written to.') + if "x" in mode: + raise ValueError("Use the `overwrite`-parameter instead.") + if "w" not in mode: + raise ValueError("AtomicWriters can only be written to.") # Attempt to convert `path` to `str` or `bytes` if fspath is not None: @@ -156,9 +166,9 @@ def __init__(self, path, mode=DEFAULT_MODE, overwrite=False, self._open_kwargs = open_kwargs def open(self): - ''' + """ Open the temporary file. - ''' + """ return self._open(self.get_fileobject) @contextlib.contextmanager @@ -178,41 +188,41 @@ def _open(self, get_fileobject): except Exception: pass - def get_fileobject(self, suffix="", prefix=tempfile.gettempprefix(), - dir=None, **kwargs): - '''Return the temporary file to use.''' + def get_fileobject( + self, suffix="", prefix=tempfile.gettempprefix(), dir=None, **kwargs + ): + """Return the temporary file to use.""" if dir is None: dir = os.path.normpath(os.path.dirname(self._path)) - descriptor, name = tempfile.mkstemp(suffix=suffix, prefix=prefix, - dir=dir) + descriptor, name = tempfile.mkstemp(suffix=suffix, prefix=prefix, dir=dir) # io.open() will take either the descriptor or the name, but we need # the name later for commit()/replace_atomic() and couldn't find a way # to get the filename from the descriptor. os.close(descriptor) - kwargs['mode'] = self._mode - kwargs['file'] = name - return io.open(**kwargs) + kwargs["mode"] = self._mode + kwargs["file"] = name + return open(**kwargs) def sync(self, f): - '''responsible for clearing as many file caches as possible before - commit''' + """responsible for clearing as many file caches as possible before + commit""" f.flush() _proper_fsync(f.fileno()) def commit(self, f): - '''Move the temporary file to the target location.''' + """Move the temporary file to the target location.""" if self._overwrite: replace_atomic(f.name, self._path) else: move_atomic(f.name, self._path) def rollback(self, f): - '''Clean up all temporary resources.''' + """Clean up all temporary resources.""" os.unlink(f.name) def atomic_write(path, writer_cls=AtomicWriter, **cls_kwargs): - ''' + """ Simple atomic writes. This wraps :py:class:`AtomicWriter`:: with atomic_write(path) as f: @@ -225,5 +235,5 @@ def atomic_write(path, writer_cls=AtomicWriter, **cls_kwargs): Additional keyword arguments are passed to the writer class. See :py:class:`AtomicWriter`. - ''' + """ return writer_cls(path, **cls_kwargs).open() diff --git a/testing/test_atomic_writes.py b/testing/test_atomic_writes.py index b3128c5b4f1..9eda632ab14 100644 --- a/testing/test_atomic_writes.py +++ b/testing/test_atomic_writes.py @@ -1,63 +1,69 @@ +""" +Module copied over from https://github.com/untitaker/python-atomicwrites, which has become +unmaintained. + +Since then, we have made changes to simplify the code, focusing on pytest's use-case. +""" import errno import os - -from atomicwrites import atomic_write +from pathlib import Path import pytest +from _pytest.atomic_writes import atomic_write -def test_atomic_write(tmpdir): - fname = tmpdir.join('ha') +def test_atomic_write(tmp_path: Path) -> None: + fname = tmp_path.joinpath("ha") for i in range(2): with atomic_write(str(fname), overwrite=True) as f: - f.write('hoho') + f.write("hoho") with pytest.raises(OSError) as excinfo: with atomic_write(str(fname), overwrite=False) as f: - f.write('haha') + f.write("haha") assert excinfo.value.errno == errno.EEXIST - assert fname.read() == 'hoho' - assert len(tmpdir.listdir()) == 1 + assert fname.read_text() == "hoho" + assert len(list(tmp_path.iterdir())) == 1 -def test_teardown(tmpdir): - fname = tmpdir.join('ha') +def test_teardown(tmp_path: Path) -> None: + fname = tmp_path.joinpath("ha") with pytest.raises(AssertionError): with atomic_write(str(fname), overwrite=True): assert False - assert not tmpdir.listdir() + assert not list(tmp_path.iterdir()) -def test_replace_simultaneously_created_file(tmpdir): - fname = tmpdir.join('ha') +def test_replace_simultaneously_created_file(tmp_path: Path) -> None: + fname = tmp_path.joinpath("ha") with atomic_write(str(fname), overwrite=True) as f: - f.write('hoho') - fname.write('harhar') - assert fname.read() == 'harhar' - assert fname.read() == 'hoho' - assert len(tmpdir.listdir()) == 1 + f.write("hoho") + fname.write_text("harhar") + assert fname.read_text() == "harhar" + assert fname.read_text() == "hoho" + assert len(list(tmp_path.iterdir())) == 1 -def test_dont_remove_simultaneously_created_file(tmpdir): - fname = tmpdir.join('ha') +def test_dont_remove_simultaneously_created_file(tmp_path: Path) -> None: + fname = tmp_path.joinpath("ha") with pytest.raises(OSError) as excinfo: with atomic_write(str(fname), overwrite=False) as f: - f.write('hoho') - fname.write('harhar') - assert fname.read() == 'harhar' + f.write("hoho") + fname.write_text("harhar") + assert fname.read_text() == "harhar" assert excinfo.value.errno == errno.EEXIST - assert fname.read() == 'harhar' - assert len(tmpdir.listdir()) == 1 + assert fname.read_text() == "harhar" + assert len(list(tmp_path.iterdir())) == 1 # Verify that nested exceptions during rollback do not overwrite the initial # exception that triggered a rollback. -def test_open_reraise(tmpdir): - fname = tmpdir.join('ha') +def test_open_reraise(tmp_path: Path) -> None: + fname = tmp_path.joinpath("ha") with pytest.raises(AssertionError): aw = atomic_write(str(fname), overwrite=False) with aw: @@ -70,22 +76,23 @@ def test_open_reraise(tmpdir): assert False, "Intentional failure for testing purposes" -def test_atomic_write_in_pwd(tmpdir): +def test_atomic_write_in_pwd(tmp_path: Path) -> None: orig_curdir = os.getcwd() try: - os.chdir(str(tmpdir)) - fname = 'ha' + os.chdir(str(tmp_path)) + fname = "ha" for i in range(2): with atomic_write(str(fname), overwrite=True) as f: - f.write('hoho') + f.write("hoho") with pytest.raises(OSError) as excinfo: with atomic_write(str(fname), overwrite=False) as f: - f.write('haha') + f.write("haha") assert excinfo.value.errno == errno.EEXIST - assert open(fname).read() == 'hoho' - assert len(tmpdir.listdir()) == 1 + with open(fname) as f: + assert f.read() == "hoho" + assert len(list(tmp_path.iterdir())) == 1 finally: os.chdir(orig_curdir)