forked from pytest-dev/pyfakefs
-
Notifications
You must be signed in to change notification settings - Fork 0
/
fake_filesystem.py
6295 lines (5437 loc) · 231 KB
/
fake_filesystem.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Copyright 2009 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A fake filesystem implementation for unit testing.
:Includes:
* :py:class:`FakeFile`: Provides the appearance of a real file.
* :py:class:`FakeDirectory`: Provides the appearance of a real directory.
* :py:class:`FakeFilesystem`: Provides the appearance of a real directory
hierarchy.
* :py:class:`FakeOsModule`: Uses :py:class:`FakeFilesystem` to provide a
fake :py:mod:`os` module replacement.
* :py:class:`FakeIoModule`: Uses :py:class:`FakeFilesystem` to provide a
fake ``io`` module replacement.
* :py:class:`FakePathModule`: Faked ``os.path`` module replacement.
* :py:class:`FakeFileOpen`: Faked ``file()`` and ``open()`` function
replacements.
:Usage:
>>> from pyfakefs import fake_filesystem
>>> filesystem = fake_filesystem.FakeFilesystem()
>>> os_module = fake_filesystem.FakeOsModule(filesystem)
>>> pathname = '/a/new/dir/new-file'
Create a new file object, creating parent directory objects as needed:
>>> os_module.path.exists(pathname)
False
>>> new_file = filesystem.create_file(pathname)
File objects can't be overwritten:
>>> os_module.path.exists(pathname)
True
>>> try:
... filesystem.create_file(pathname)
... except OSError as e:
... assert e.errno == errno.EEXIST, 'unexpected errno: %d' % e.errno
... assert e.strerror == 'File exists in the fake filesystem'
Remove a file object:
>>> filesystem.remove_object(pathname)
>>> os_module.path.exists(pathname)
False
Create a new file object at the previous path:
>>> beatles_file = filesystem.create_file(pathname,
... contents='Dear Prudence\\nWon\\'t you come out to play?\\n')
>>> os_module.path.exists(pathname)
True
Use the FakeFileOpen class to read fake file objects:
>>> file_module = fake_filesystem.FakeFileOpen(filesystem)
>>> for line in file_module(pathname):
... print(line.rstrip())
...
Dear Prudence
Won't you come out to play?
File objects cannot be treated like directory objects:
>>> try:
... os_module.listdir(pathname)
... except OSError as e:
... assert e.errno == errno.ENOTDIR, 'unexpected errno: %d' % e.errno
... assert e.strerror == 'Not a directory in the fake filesystem'
The FakeOsModule can list fake directory objects:
>>> os_module.listdir(os_module.path.dirname(pathname))
['new-file']
The FakeOsModule also supports stat operations:
>>> import stat
>>> stat.S_ISREG(os_module.stat(pathname).st_mode)
True
>>> stat.S_ISDIR(os_module.stat(os_module.path.dirname(pathname)).st_mode)
True
"""
import errno
import functools
import heapq
import inspect
import io
import locale
import os
import random
import sys
import traceback
import uuid
from collections import namedtuple, OrderedDict
from contextlib import contextmanager
from doctest import TestResults
from enum import Enum
from stat import (
S_IFREG,
S_IFDIR,
S_ISLNK,
S_IFMT,
S_ISDIR,
S_IFLNK,
S_ISREG,
S_IFSOCK,
)
from types import ModuleType, TracebackType
from typing import (
List,
Optional,
Callable,
Union,
Any,
Dict,
Tuple,
cast,
AnyStr,
overload,
NoReturn,
ClassVar,
IO,
Iterator,
TextIO,
Type,
)
from pyfakefs.extra_packages import use_scandir
from pyfakefs.fake_scandir import scandir, walk, ScanDirIter
from pyfakefs.helpers import (
FakeStatResult,
BinaryBufferIO,
TextBufferIO,
is_int_type,
is_byte_string,
is_unicode_string,
make_string_path,
IS_PYPY,
to_string,
matching_string,
real_encoding,
now,
AnyPath,
to_bytes,
)
from pyfakefs import __version__ # noqa: F401 for upwards compatibility
PERM_READ = 0o400 # Read permission bit.
PERM_WRITE = 0o200 # Write permission bit.
PERM_EXE = 0o100 # Execute permission bit.
PERM_DEF = 0o777 # Default permission bits.
PERM_DEF_FILE = 0o666 # Default permission bits (regular file)
PERM_ALL = 0o7777 # All permission bits.
_OpenModes = namedtuple(
"_OpenModes",
"must_exist can_read can_write truncate append must_not_exist",
)
_OPEN_MODE_MAP = {
# mode name:(file must exist, can read, can write,
# truncate, append, must not exist)
"r": (True, True, False, False, False, False),
"w": (False, False, True, True, False, False),
"a": (False, False, True, False, True, False),
"r+": (True, True, True, False, False, False),
"w+": (False, True, True, True, False, False),
"a+": (False, True, True, False, True, False),
"x": (False, False, True, False, False, True),
"x+": (False, True, True, False, False, True),
}
AnyFileWrapper = Union[
"FakeFileWrapper",
"FakeDirWrapper",
"StandardStreamWrapper",
"FakePipeWrapper",
]
AnyString = Union[str, bytes]
AnyFile = Union["FakeFile", "FakeDirectory"]
if sys.platform.startswith("linux"):
# on newer Linux system, the default maximum recursion depth is 40
# we ignore older systems here
_MAX_LINK_DEPTH = 40
else:
# on MacOS and Windows, the maximum recursion depth is 32
_MAX_LINK_DEPTH = 32
NR_STD_STREAMS = 3
if sys.platform == "win32":
USER_ID = 1
GROUP_ID = 1
else:
USER_ID = os.getuid()
GROUP_ID = os.getgid()
class OSType(Enum):
"""Defines the real or simulated OS of the underlying file system."""
LINUX = "linux"
MACOS = "macos"
WINDOWS = "windows"
class PatchMode(Enum):
"""Defines if patching shall be on, off, or in automatic mode.
Currently only used for `patch_open_code` option.
"""
OFF = 1
AUTO = 2
ON = 3
def set_uid(uid: int) -> None:
"""Set the global user id. This is used as st_uid for new files
and to differentiate between a normal user and the root user (uid 0).
For the root user, some permission restrictions are ignored.
Args:
uid: (int) the user ID of the user calling the file system functions.
"""
global USER_ID
USER_ID = uid
def set_gid(gid: int) -> None:
"""Set the global group id. This is only used to set st_gid for new files,
no permission checks are performed.
Args:
gid: (int) the group ID of the user calling the file system functions.
"""
global GROUP_ID
GROUP_ID = gid
def reset_ids() -> None:
"""Set the global user ID and group ID back to default values."""
if sys.platform == "win32":
set_uid(1)
set_gid(1)
else:
set_uid(os.getuid())
set_gid(os.getgid())
def is_root() -> bool:
"""Return True if the current user is the root user."""
return USER_ID == 0
class FakeLargeFileIoException(Exception):
"""Exception thrown on unsupported operations for fake large files.
Fake large files have a size with no real content.
"""
def __init__(self, file_path: str) -> None:
super(FakeLargeFileIoException, self).__init__(
"Read and write operations not supported for "
"fake large file: %s" % file_path
)
def _copy_module(old: ModuleType) -> ModuleType:
"""Recompiles and creates new module object."""
saved = sys.modules.pop(old.__name__, None)
new = __import__(old.__name__)
if saved is not None:
sys.modules[old.__name__] = saved
return new
class FakeFile:
"""Provides the appearance of a real file.
Attributes currently faked out:
* `st_mode`: user-specified, otherwise S_IFREG
* `st_ctime`: the time.time() timestamp of the file change time (updated
each time a file's attributes is modified).
* `st_atime`: the time.time() timestamp when the file was last accessed.
* `st_mtime`: the time.time() timestamp when the file was last modified.
* `st_size`: the size of the file
* `st_nlink`: the number of hard links to the file
* `st_ino`: the inode number - a unique number identifying the file
* `st_dev`: a unique number identifying the (fake) file system device
the file belongs to
* `st_uid`: always set to USER_ID, which can be changed globally using
`set_uid`
* `st_gid`: always set to GROUP_ID, which can be changed globally using
`set_gid`
.. note:: The resolution for `st_ctime`, `st_mtime` and `st_atime` in the
real file system depends on the used file system (for example it is
only 1s for HFS+ and older Linux file systems, but much higher for
ext4 and NTFS). This is currently ignored by pyfakefs, which uses
the resolution of `time.time()`.
Under Windows, `st_atime` is not updated for performance reasons by
default. pyfakefs never updates `st_atime` under Windows, assuming
the default setting.
"""
stat_types = (
"st_mode",
"st_ino",
"st_dev",
"st_nlink",
"st_uid",
"st_gid",
"st_size",
"st_atime",
"st_mtime",
"st_ctime",
"st_atime_ns",
"st_mtime_ns",
"st_ctime_ns",
)
def __init__(
self,
name: AnyStr,
st_mode: int = S_IFREG | PERM_DEF_FILE,
contents: Optional[AnyStr] = None,
filesystem: Optional["FakeFilesystem"] = None,
encoding: Optional[str] = None,
errors: Optional[str] = None,
side_effect: Optional[Callable[["FakeFile"], None]] = None,
):
"""
Args:
name: Name of the file/directory, without parent path information
st_mode: The stat.S_IF* constant representing the file type (i.e.
stat.S_IFREG, stat.S_IFDIR), and the file permissions.
If no file type is set (e.g. permission flags only), a
regular file type is assumed.
contents: The contents of the filesystem object; should be a string
or byte object for regular files, and a dict of other
FakeFile or FakeDirectory objects wih the file names as
keys for FakeDirectory objects
filesystem: The fake filesystem where the file is created.
encoding: If contents is a unicode string, the encoding used
for serialization.
errors: The error mode used for encoding/decoding errors.
side_effect: function handle that is executed when file is written,
must accept the file object as an argument.
"""
# to be backwards compatible regarding argument order, we raise on None
if filesystem is None:
raise ValueError("filesystem shall not be None")
self.filesystem: FakeFilesystem = filesystem
self._side_effect: Optional[Callable] = side_effect
self.name: AnyStr = name # type: ignore[assignment]
self.stat_result = FakeStatResult(
filesystem.is_windows_fs, USER_ID, GROUP_ID, now()
)
if st_mode >> 12 == 0:
st_mode |= S_IFREG
self.stat_result.st_mode = st_mode
self.st_size: int = 0
self.encoding: Optional[str] = real_encoding(encoding)
self.errors: str = errors or "strict"
self._byte_contents: Optional[bytes] = self._encode_contents(contents)
self.stat_result.st_size = (
len(self._byte_contents) if self._byte_contents is not None else 0
)
self.epoch: int = 0
self.parent_dir: Optional[FakeDirectory] = None
# Linux specific: extended file system attributes
self.xattr: Dict = {}
self.opened_as: AnyString = ""
@property
def byte_contents(self) -> Optional[bytes]:
"""Return the contents as raw byte array."""
return self._byte_contents
@property
def contents(self) -> Optional[str]:
"""Return the contents as string with the original encoding."""
if isinstance(self.byte_contents, bytes):
return self.byte_contents.decode(
self.encoding or locale.getpreferredencoding(False),
errors=self.errors,
)
return None
@property
def st_ctime(self) -> float:
"""Return the creation time of the fake file."""
return self.stat_result.st_ctime
@st_ctime.setter
def st_ctime(self, val: float) -> None:
"""Set the creation time of the fake file."""
self.stat_result.st_ctime = val
@property
def st_atime(self) -> float:
"""Return the access time of the fake file."""
return self.stat_result.st_atime
@st_atime.setter
def st_atime(self, val: float) -> None:
"""Set the access time of the fake file."""
self.stat_result.st_atime = val
@property
def st_mtime(self) -> float:
"""Return the modification time of the fake file."""
return self.stat_result.st_mtime
@st_mtime.setter
def st_mtime(self, val: float) -> None:
"""Set the modification time of the fake file."""
self.stat_result.st_mtime = val
def set_large_file_size(self, st_size: int) -> None:
"""Sets the self.st_size attribute and replaces self.content with None.
Provided specifically to simulate very large files without regards
to their content (which wouldn't fit in memory).
Note that read/write operations with such a file raise
:py:class:`FakeLargeFileIoException`.
Args:
st_size: (int) The desired file size
Raises:
OSError: if the st_size is not a non-negative integer,
or if st_size exceeds the available file system space
"""
self._check_positive_int(st_size)
if self.st_size:
self.size = 0
if self.filesystem:
self.filesystem.change_disk_usage(st_size, self.name, self.st_dev)
self.st_size = st_size
self._byte_contents = None
def _check_positive_int(self, size: int) -> None:
# the size should be an positive integer value
if not is_int_type(size) or size < 0:
self.filesystem.raise_os_error(errno.ENOSPC, self.name)
def is_large_file(self) -> bool:
"""Return `True` if this file was initialized with size
but no contents.
"""
return self._byte_contents is None
def _encode_contents(self, contents: Union[str, bytes, None]) -> Optional[bytes]:
if is_unicode_string(contents):
contents = bytes(
cast(str, contents),
self.encoding or locale.getpreferredencoding(False),
self.errors,
)
return cast(bytes, contents)
def set_initial_contents(self, contents: AnyStr) -> bool:
"""Sets the file contents and size.
Called internally after initial file creation.
Args:
contents: string, new content of file.
Returns:
True if the contents have been changed.
Raises:
OSError: if the st_size is not a non-negative integer,
or if st_size exceeds the available file system space
"""
byte_contents = self._encode_contents(contents)
changed = self._byte_contents != byte_contents
st_size = len(byte_contents) if byte_contents else 0
current_size = self.st_size or 0
self.filesystem.change_disk_usage(
st_size - current_size, self.name, self.st_dev
)
self._byte_contents = byte_contents
self.st_size = st_size
self.epoch += 1
return changed
def set_contents(self, contents: AnyStr, encoding: Optional[str] = None) -> bool:
"""Sets the file contents and size and increases the modification time.
Also executes the side_effects if available.
Args:
contents: (str, bytes) new content of file.
encoding: (str) the encoding to be used for writing the contents
if they are a unicode string.
If not given, the locale preferred encoding is used.
Returns:
True if the contents have been changed.
Raises:
OSError: if `st_size` is not a non-negative integer,
or if it exceeds the available file system space.
"""
self.encoding = real_encoding(encoding)
changed = self.set_initial_contents(contents)
if self._side_effect is not None:
self._side_effect(self)
return changed
@property
def size(self) -> int:
"""Return the size in bytes of the file contents."""
return self.st_size
@size.setter
def size(self, st_size: int) -> None:
"""Resizes file content, padding with nulls if new size exceeds the
old size.
Args:
st_size: The desired size for the file.
Raises:
OSError: if the st_size arg is not a non-negative integer
or if st_size exceeds the available file system space
"""
self._check_positive_int(st_size)
current_size = self.st_size or 0
self.filesystem.change_disk_usage(
st_size - current_size, self.name, self.st_dev
)
if self._byte_contents:
if st_size < current_size:
self._byte_contents = self._byte_contents[:st_size]
else:
self._byte_contents += b"\0" * (st_size - current_size)
self.st_size = st_size
self.epoch += 1
@property
def path(self) -> AnyStr:
"""Return the full path of the current object."""
names: List[AnyStr] = []
obj: Optional[FakeFile] = self
while obj:
names.insert(0, matching_string(self.name, obj.name)) # type: ignore
obj = obj.parent_dir
sep = self.filesystem.get_path_separator(names[0])
if names[0] == sep:
names.pop(0)
dir_path = sep.join(names)
drive = self.filesystem.splitdrive(dir_path)[0]
# if a Windows path already starts with a drive or UNC path,
# no extra separator is needed
if not drive:
dir_path = sep + dir_path
else:
dir_path = sep.join(names)
return self.filesystem.absnormpath(dir_path)
if sys.version_info >= (3, 12):
@property
def is_junction(self) -> bool:
return self.filesystem.isjunction(self.path)
def __getattr__(self, item: str) -> Any:
"""Forward some properties to stat_result."""
if item in self.stat_types:
return getattr(self.stat_result, item)
return super().__getattribute__(item)
def __setattr__(self, key: str, value: Any) -> None:
"""Forward some properties to stat_result."""
if key in self.stat_types:
return setattr(self.stat_result, key, value)
return super().__setattr__(key, value)
def __str__(self) -> str:
return "%r(%o)" % (self.name, self.st_mode)
class FakeNullFile(FakeFile):
def __init__(self, filesystem: "FakeFilesystem") -> None:
devnull = "nul" if filesystem.is_windows_fs else "/dev/null"
super(FakeNullFile, self).__init__(devnull, filesystem=filesystem, contents="")
@property
def byte_contents(self) -> bytes:
return b""
def set_initial_contents(self, contents: AnyStr) -> bool:
return False
class FakeFileFromRealFile(FakeFile):
"""Represents a fake file copied from the real file system.
The contents of the file are read on demand only.
"""
def __init__(
self,
file_path: str,
filesystem: "FakeFilesystem",
side_effect: Optional[Callable] = None,
) -> None:
"""
Args:
file_path: Path to the existing file.
filesystem: The fake filesystem where the file is created.
Raises:
OSError: if the file does not exist in the real file system.
OSError: if the file already exists in the fake file system.
"""
super().__init__(
name=os.path.basename(file_path),
filesystem=filesystem,
side_effect=side_effect,
)
self.contents_read = False
@property
def byte_contents(self) -> Optional[bytes]:
if not self.contents_read:
self.contents_read = True
with io.open(self.file_path, "rb") as f:
self._byte_contents = f.read()
# On MacOS and BSD, the above io.open() updates atime on the real file
self.st_atime = os.stat(self.file_path).st_atime
return self._byte_contents
def set_contents(self, contents, encoding=None):
self.contents_read = True
super(FakeFileFromRealFile, self).set_contents(contents, encoding)
def is_large_file(self):
"""The contents are never faked."""
return False
class FakeDirectory(FakeFile):
"""Provides the appearance of a real directory."""
def __init__(
self,
name: str,
perm_bits: int = PERM_DEF,
filesystem: Optional["FakeFilesystem"] = None,
):
"""
Args:
name: name of the file/directory, without parent path information
perm_bits: permission bits. defaults to 0o777.
filesystem: if set, the fake filesystem where the directory
is created
"""
FakeFile.__init__(self, name, S_IFDIR | perm_bits, "", filesystem=filesystem)
# directories have the link count of contained entries,
# including '.' and '..'
self.st_nlink += 1
self._entries: Dict[str, AnyFile] = {}
def set_contents(self, contents: AnyStr, encoding: Optional[str] = None) -> bool:
raise self.filesystem.raise_os_error(errno.EISDIR, self.path)
@property
def entries(self) -> Dict[str, FakeFile]:
"""Return the list of contained directory entries."""
return self._entries
@property
def ordered_dirs(self) -> List[str]:
"""Return the list of contained directory entry names ordered by
creation order.
"""
return [
item[0]
for item in sorted(self._entries.items(), key=lambda entry: entry[1].st_ino)
]
def add_entry(self, path_object: FakeFile) -> None:
"""Adds a child FakeFile to this directory.
Args:
path_object: FakeFile instance to add as a child of this directory.
Raises:
OSError: if the directory has no write permission (Posix only)
OSError: if the file or directory to be added already exists
"""
if (
not is_root()
and not self.st_mode & PERM_WRITE
and not self.filesystem.is_windows_fs
):
raise OSError(errno.EACCES, "Permission Denied", self.path)
path_object_name: str = to_string(path_object.name)
if path_object_name in self.entries:
self.filesystem.raise_os_error(errno.EEXIST, self.path)
self._entries[path_object_name] = path_object
path_object.parent_dir = self
if path_object.st_ino is None:
self.filesystem.last_ino += 1
path_object.st_ino = self.filesystem.last_ino
self.st_nlink += 1
path_object.st_nlink += 1
path_object.st_dev = self.st_dev
if path_object.st_nlink == 1:
self.filesystem.change_disk_usage(
path_object.size, path_object.name, self.st_dev
)
def get_entry(self, pathname_name: str) -> AnyFile:
"""Retrieves the specified child file or directory entry.
Args:
pathname_name: The basename of the child object to retrieve.
Returns:
The fake file or directory object.
Raises:
KeyError: if no child exists by the specified name.
"""
pathname_name = self._normalized_entryname(pathname_name)
return self.entries[to_string(pathname_name)]
def _normalized_entryname(self, pathname_name: str) -> str:
if not self.filesystem.is_case_sensitive:
matching_names = [
name for name in self.entries if name.lower() == pathname_name.lower()
]
if matching_names:
pathname_name = matching_names[0]
return pathname_name
def remove_entry(self, pathname_name: str, recursive: bool = True) -> None:
"""Removes the specified child file or directory.
Args:
pathname_name: Basename of the child object to remove.
recursive: If True (default), the entries in contained directories
are deleted first. Used to propagate removal errors
(e.g. permission problems) from contained entries.
Raises:
KeyError: if no child exists by the specified name.
OSError: if user lacks permission to delete the file,
or (Windows only) the file is open.
"""
pathname_name = self._normalized_entryname(pathname_name)
entry = self.get_entry(pathname_name)
if self.filesystem.is_windows_fs:
if entry.st_mode & PERM_WRITE == 0:
self.filesystem.raise_os_error(errno.EACCES, pathname_name)
if self.filesystem.has_open_file(entry):
self.filesystem.raise_os_error(errno.EACCES, pathname_name)
else:
if not is_root() and (
self.st_mode & (PERM_WRITE | PERM_EXE) != PERM_WRITE | PERM_EXE
):
self.filesystem.raise_os_error(errno.EACCES, pathname_name)
if recursive and isinstance(entry, FakeDirectory):
while entry.entries:
entry.remove_entry(list(entry.entries)[0])
elif entry.st_nlink == 1:
self.filesystem.change_disk_usage(-entry.size, pathname_name, entry.st_dev)
self.st_nlink -= 1
entry.st_nlink -= 1
assert entry.st_nlink >= 0
del self.entries[to_string(pathname_name)]
@property
def size(self) -> int:
"""Return the total size of all files contained
in this directory tree.
"""
return sum([item[1].size for item in self.entries.items()])
@size.setter
def size(self, st_size: int) -> None:
"""Setting the size is an error for a directory."""
raise self.filesystem.raise_os_error(errno.EISDIR, self.path)
def has_parent_object(self, dir_object: "FakeDirectory") -> bool:
"""Return `True` if dir_object is a direct or indirect parent
directory, or if both are the same object."""
obj: Optional[FakeDirectory] = self
while obj:
if obj == dir_object:
return True
obj = obj.parent_dir
return False
def __str__(self) -> str:
description = super(FakeDirectory, self).__str__() + ":\n"
for item in self.entries:
item_desc = self.entries[item].__str__()
for line in item_desc.split("\n"):
if line:
description = description + " " + line + "\n"
return description
class FakeDirectoryFromRealDirectory(FakeDirectory):
"""Represents a fake directory copied from the real file system.
The contents of the directory are read on demand only.
"""
def __init__(
self,
source_path: AnyPath,
filesystem: "FakeFilesystem",
read_only: bool,
target_path: Optional[AnyPath] = None,
):
"""
Args:
source_path: Full directory path.
filesystem: The fake filesystem where the directory is created.
read_only: If set, all files under the directory are treated
as read-only, e.g. a write access raises an exception;
otherwise, writing to the files changes the fake files
only as usually.
target_path: If given, the target path of the directory,
otherwise the target is the same as `source_path`.
Raises:
OSError: if the directory does not exist in the real file system
"""
target_path = target_path or source_path
real_stat = os.stat(source_path)
super(FakeDirectoryFromRealDirectory, self).__init__(
name=to_string(os.path.split(target_path)[1]),
perm_bits=real_stat.st_mode,
filesystem=filesystem,
)
self.st_ctime = real_stat.st_ctime
self.st_atime = real_stat.st_atime
self.st_mtime = real_stat.st_mtime
self.st_gid = real_stat.st_gid
self.st_uid = real_stat.st_uid
self.source_path = source_path # type: ignore
self.read_only = read_only
self.contents_read = False
@property
def entries(self) -> Dict[str, FakeFile]:
"""Return the list of contained directory entries, loading them
if not already loaded."""
if not self.contents_read:
self.contents_read = True
base = self.path
for entry in os.listdir(self.source_path):
source_path = os.path.join(self.source_path, entry)
target_path = os.path.join(base, entry) # type: ignore
if os.path.islink(source_path):
self.filesystem.add_real_symlink(source_path, target_path)
elif os.path.isdir(source_path):
self.filesystem.add_real_directory(
source_path, self.read_only, target_path=target_path
)
else:
self.filesystem.add_real_file(
source_path, self.read_only, target_path=target_path
)
return self._entries
@property
def size(self) -> int:
# we cannot get the size until the contents are loaded
if not self.contents_read:
return 0
return super(FakeDirectoryFromRealDirectory, self).size
@size.setter
def size(self, st_size: int) -> None:
raise self.filesystem.raise_os_error(errno.EISDIR, self.path)
class FakeFilesystem:
"""Provides the appearance of a real directory tree for unit testing.
Attributes:
path_separator: The path separator, corresponds to `os.path.sep`.
alternative_path_separator: Corresponds to `os.path.altsep`.
is_windows_fs: `True` in a real or faked Windows file system.
is_macos: `True` under MacOS, or if we are faking it.
is_case_sensitive: `True` if a case-sensitive file system is assumed.
root: The root :py:class:`FakeDirectory` entry of the file system.
umask: The umask used for newly created files, see `os.umask`.
patcher: Holds the Patcher object if created from it. Allows access
to the patcher object if using the pytest fs fixture.
patch_open_code: Defines how `io.open_code` will be patched;
patching can be on, off, or in automatic mode.
shuffle_listdir_results: If `True`, `os.listdir` will not sort the
results to match the real file system behavior.
"""
def __init__(
self,
path_separator: str = os.path.sep,
total_size: Optional[int] = None,
patcher: Any = None,
) -> None:
"""
Args:
path_separator: optional substitute for os.path.sep
total_size: if not None, the total size in bytes of the
root filesystem.
Example usage to use the same path separator under all systems:
>>> filesystem = FakeFilesystem(path_separator='/')
"""
self.path_separator: str = path_separator
self.alternative_path_separator: Optional[str] = os.path.altsep
self.patcher = patcher
if path_separator != os.sep:
self.alternative_path_separator = None
# is_windows_fs can be used to test the behavior of pyfakefs under
# Windows fs on non-Windows systems and vice verse;
# is it used to support drive letters, UNC paths and some other
# Windows-specific features
self._is_windows_fs = sys.platform == "win32"
# can be used to test some MacOS-specific behavior under other systems
self._is_macos = sys.platform == "darwin"
# is_case_sensitive can be used to test pyfakefs for case-sensitive
# file systems on non-case-sensitive systems and vice verse
self.is_case_sensitive: bool = not (self.is_windows_fs or self._is_macos)
self.root = FakeDirectory(self.path_separator, filesystem=self)
self._cwd = ""
# We can't query the current value without changing it:
self.umask = os.umask(0o22)
os.umask(self.umask)
# A list of open file objects. Their position in the list is their
# file descriptor number
self.open_files: List[Optional[List[AnyFileWrapper]]] = []
# A heap containing all free positions in self.open_files list
self._free_fd_heap: List[int] = []
# last used numbers for inodes (st_ino) and devices (st_dev)
self.last_ino: int = 0
self.last_dev: int = 0
self.mount_points: Dict[AnyString, Dict] = OrderedDict()
self._add_root_mount_point(total_size)
self._add_standard_streams()
self.dev_null = FakeNullFile(self)
# set from outside if needed
self.patch_open_code = PatchMode.OFF
self.shuffle_listdir_results = False
@property
def is_linux(self) -> bool:
return not self.is_windows_fs and not self.is_macos
@property
def is_windows_fs(self) -> bool:
return self._is_windows_fs
@is_windows_fs.setter
def is_windows_fs(self, value: bool) -> None:
if self._is_windows_fs != value:
self._is_windows_fs = value
self.reset()
FakePathModule.reset(self)