Skip to content

Commit

Permalink
Add docstring and use tmp_path in tests
Browse files Browse the repository at this point in the history
Related to pytest-dev#10114
  • Loading branch information
nicoddemus committed Jul 8, 2022
1 parent 2c1effe commit 57945bd
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 80 deletions.
102 changes: 56 additions & 46 deletions src/_pytest/atomic_writes.py
@@ -1,5 +1,10 @@
"""
Module copied over from https://github.com/untitaker/python-atomicwrites, which has become
unmaintained.
Since then, we have made changes to simplify the code, focusing on pytest's use-case.
"""
import contextlib
import io
import os
import sys
import tempfile
Expand All @@ -15,7 +20,7 @@
except ImportError:
fspath = None

__version__ = '1.4.1'
__version__ = "1.4.1"


PY2 = sys.version_info[0] == 2
Expand All @@ -35,8 +40,9 @@ def _path_to_unicode(x):
_proper_fsync = os.fsync


if sys.platform != 'win32':
if hasattr(fcntl, 'F_FULLFSYNC'):
if sys.platform != "win32":
if hasattr(fcntl, "F_FULLFSYNC"):

def _proper_fsync(fd):
# https://lists.apple.com/archives/darwin-dev/2005/Feb/msg00072.html
# https://developer.apple.com/library/mac/documentation/Darwin/Reference/ManPages/man2/fsync.2.html
Expand Down Expand Up @@ -64,6 +70,7 @@ def _move_atomic(src, dst):
_sync_directory(dst_dir)
if src_dir != dst_dir:
_sync_directory(src_dir)

else:
from ctypes import windll, WinError

Expand All @@ -76,43 +83,47 @@ def _handle_errors(rv):
raise WinError()

def _replace_atomic(src, dst):
_handle_errors(windll.kernel32.MoveFileExW(
_path_to_unicode(src), _path_to_unicode(dst),
_windows_default_flags | _MOVEFILE_REPLACE_EXISTING
))
_handle_errors(
windll.kernel32.MoveFileExW(
_path_to_unicode(src),
_path_to_unicode(dst),
_windows_default_flags | _MOVEFILE_REPLACE_EXISTING,
)
)

def _move_atomic(src, dst):
_handle_errors(windll.kernel32.MoveFileExW(
_path_to_unicode(src), _path_to_unicode(dst),
_windows_default_flags
))
_handle_errors(
windll.kernel32.MoveFileExW(
_path_to_unicode(src), _path_to_unicode(dst), _windows_default_flags
)
)


def replace_atomic(src, dst):
'''
"""
Move ``src`` to ``dst``. If ``dst`` exists, it will be silently
overwritten.
Both paths must reside on the same filesystem for the operation to be
atomic.
'''
"""
return _replace_atomic(src, dst)


def move_atomic(src, dst):
'''
"""
Move ``src`` to ``dst``. There might a timewindow where both filesystem
entries exist. If ``dst`` already exists, :py:exc:`FileExistsError` will be
raised.
Both paths must reside on the same filesystem for the operation to be
atomic.
'''
"""
return _move_atomic(src, dst)


class AtomicWriter(object):
'''
class AtomicWriter:
"""
A helper class for performing atomic writes. Usage::
with AtomicWriter(path).open() as f:
Expand All @@ -130,21 +141,20 @@ class AtomicWriter(object):
If you need further control over the exact behavior, you are encouraged to
subclass.
'''
"""

def __init__(self, path, mode=DEFAULT_MODE, overwrite=False,
**open_kwargs):
if 'a' in mode:
def __init__(self, path, mode=DEFAULT_MODE, overwrite=False, **open_kwargs):
if "a" in mode:
raise ValueError(
'Appending to an existing file is not supported, because that '
'would involve an expensive `copy`-operation to a temporary '
'file. Open the file in normal `w`-mode and copy explicitly '
'if that\'s what you\'re after.'
"Appending to an existing file is not supported, because that "
"would involve an expensive `copy`-operation to a temporary "
"file. Open the file in normal `w`-mode and copy explicitly "
"if that's what you're after."
)
if 'x' in mode:
raise ValueError('Use the `overwrite`-parameter instead.')
if 'w' not in mode:
raise ValueError('AtomicWriters can only be written to.')
if "x" in mode:
raise ValueError("Use the `overwrite`-parameter instead.")
if "w" not in mode:
raise ValueError("AtomicWriters can only be written to.")

# Attempt to convert `path` to `str` or `bytes`
if fspath is not None:
Expand All @@ -156,9 +166,9 @@ def __init__(self, path, mode=DEFAULT_MODE, overwrite=False,
self._open_kwargs = open_kwargs

def open(self):
'''
"""
Open the temporary file.
'''
"""
return self._open(self.get_fileobject)

@contextlib.contextmanager
Expand All @@ -178,41 +188,41 @@ def _open(self, get_fileobject):
except Exception:
pass

def get_fileobject(self, suffix="", prefix=tempfile.gettempprefix(),
dir=None, **kwargs):
'''Return the temporary file to use.'''
def get_fileobject(
self, suffix="", prefix=tempfile.gettempprefix(), dir=None, **kwargs
):
"""Return the temporary file to use."""
if dir is None:
dir = os.path.normpath(os.path.dirname(self._path))
descriptor, name = tempfile.mkstemp(suffix=suffix, prefix=prefix,
dir=dir)
descriptor, name = tempfile.mkstemp(suffix=suffix, prefix=prefix, dir=dir)
# io.open() will take either the descriptor or the name, but we need
# the name later for commit()/replace_atomic() and couldn't find a way
# to get the filename from the descriptor.
os.close(descriptor)
kwargs['mode'] = self._mode
kwargs['file'] = name
return io.open(**kwargs)
kwargs["mode"] = self._mode
kwargs["file"] = name
return open(**kwargs)

def sync(self, f):
'''responsible for clearing as many file caches as possible before
commit'''
"""responsible for clearing as many file caches as possible before
commit"""
f.flush()
_proper_fsync(f.fileno())

def commit(self, f):
'''Move the temporary file to the target location.'''
"""Move the temporary file to the target location."""
if self._overwrite:
replace_atomic(f.name, self._path)
else:
move_atomic(f.name, self._path)

def rollback(self, f):
'''Clean up all temporary resources.'''
"""Clean up all temporary resources."""
os.unlink(f.name)


def atomic_write(path, writer_cls=AtomicWriter, **cls_kwargs):
'''
"""
Simple atomic writes. This wraps :py:class:`AtomicWriter`::
with atomic_write(path) as f:
Expand All @@ -225,5 +235,5 @@ def atomic_write(path, writer_cls=AtomicWriter, **cls_kwargs):
Additional keyword arguments are passed to the writer class. See
:py:class:`AtomicWriter`.
'''
"""
return writer_cls(path, **cls_kwargs).open()
75 changes: 41 additions & 34 deletions testing/test_atomic_writes.py
@@ -1,63 +1,69 @@
"""
Module copied over from https://github.com/untitaker/python-atomicwrites, which has become
unmaintained.
Since then, we have made changes to simplify the code, focusing on pytest's use-case.
"""
import errno
import os

from atomicwrites import atomic_write
from pathlib import Path

import pytest
from _pytest.atomic_writes import atomic_write


def test_atomic_write(tmpdir):
fname = tmpdir.join('ha')
def test_atomic_write(tmp_path: Path) -> None:
fname = tmp_path.joinpath("ha")
for i in range(2):
with atomic_write(str(fname), overwrite=True) as f:
f.write('hoho')
f.write("hoho")

with pytest.raises(OSError) as excinfo:
with atomic_write(str(fname), overwrite=False) as f:
f.write('haha')
f.write("haha")

assert excinfo.value.errno == errno.EEXIST

assert fname.read() == 'hoho'
assert len(tmpdir.listdir()) == 1
assert fname.read_text() == "hoho"
assert len(list(tmp_path.iterdir())) == 1


def test_teardown(tmpdir):
fname = tmpdir.join('ha')
def test_teardown(tmp_path: Path) -> None:
fname = tmp_path.joinpath("ha")
with pytest.raises(AssertionError):
with atomic_write(str(fname), overwrite=True):
assert False

assert not tmpdir.listdir()
assert not list(tmp_path.iterdir())


def test_replace_simultaneously_created_file(tmpdir):
fname = tmpdir.join('ha')
def test_replace_simultaneously_created_file(tmp_path: Path) -> None:
fname = tmp_path.joinpath("ha")
with atomic_write(str(fname), overwrite=True) as f:
f.write('hoho')
fname.write('harhar')
assert fname.read() == 'harhar'
assert fname.read() == 'hoho'
assert len(tmpdir.listdir()) == 1
f.write("hoho")
fname.write_text("harhar")
assert fname.read_text() == "harhar"
assert fname.read_text() == "hoho"
assert len(list(tmp_path.iterdir())) == 1


def test_dont_remove_simultaneously_created_file(tmpdir):
fname = tmpdir.join('ha')
def test_dont_remove_simultaneously_created_file(tmp_path: Path) -> None:
fname = tmp_path.joinpath("ha")
with pytest.raises(OSError) as excinfo:
with atomic_write(str(fname), overwrite=False) as f:
f.write('hoho')
fname.write('harhar')
assert fname.read() == 'harhar'
f.write("hoho")
fname.write_text("harhar")
assert fname.read_text() == "harhar"

assert excinfo.value.errno == errno.EEXIST
assert fname.read() == 'harhar'
assert len(tmpdir.listdir()) == 1
assert fname.read_text() == "harhar"
assert len(list(tmp_path.iterdir())) == 1


# Verify that nested exceptions during rollback do not overwrite the initial
# exception that triggered a rollback.
def test_open_reraise(tmpdir):
fname = tmpdir.join('ha')
def test_open_reraise(tmp_path: Path) -> None:
fname = tmp_path.joinpath("ha")
with pytest.raises(AssertionError):
aw = atomic_write(str(fname), overwrite=False)
with aw:
Expand All @@ -70,22 +76,23 @@ def test_open_reraise(tmpdir):
assert False, "Intentional failure for testing purposes"


def test_atomic_write_in_pwd(tmpdir):
def test_atomic_write_in_pwd(tmp_path: Path) -> None:
orig_curdir = os.getcwd()
try:
os.chdir(str(tmpdir))
fname = 'ha'
os.chdir(str(tmp_path))
fname = "ha"
for i in range(2):
with atomic_write(str(fname), overwrite=True) as f:
f.write('hoho')
f.write("hoho")

with pytest.raises(OSError) as excinfo:
with atomic_write(str(fname), overwrite=False) as f:
f.write('haha')
f.write("haha")

assert excinfo.value.errno == errno.EEXIST

assert open(fname).read() == 'hoho'
assert len(tmpdir.listdir()) == 1
with open(fname) as f:
assert f.read() == "hoho"
assert len(list(tmp_path.iterdir())) == 1
finally:
os.chdir(orig_curdir)

0 comments on commit 57945bd

Please sign in to comment.