Skip to content

Commit

Permalink
Speed testZip64LargeFile up by checking in test data.
Browse files Browse the repository at this point in the history
The code to generate the test data is included in the test itself and
will run when testdata is not found.
  • Loading branch information
gpshead committed May 4, 2024
1 parent 5d1f88d commit a1af9d0
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 10 deletions.
87 changes: 77 additions & 10 deletions Lib/test/test_zipimport.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import sys
import os
import marshal
import glob
import importlib
import importlib.util
import re
import struct
import time
import unittest
Expand Down Expand Up @@ -54,6 +56,7 @@ def module_path_to_dotted_name(path):
TESTPACK2 = "ziptestpackage2"
# Absolute paths (resolved against the working directory at import time)
# of the directory and zip archive the tests create.
TEMP_DIR = os.path.abspath("junk95142")
TEMP_ZIP = os.path.abspath("junk95142.zip")
# Directory holding checked-in test data; it sits next to this test module.
TEST_DATA_DIR = os.path.join(os.path.dirname(__file__), "zipimport_data")

# Path of the cached bytecode file corresponding to TESTMOD's source file.
pyc_file = importlib.util.cache_from_source(TESTMOD + '.py')
pyc_ext = '.pyc'
Expand Down Expand Up @@ -818,18 +821,82 @@ def testZip64LargeFile(self):
f"test generates files >{0xFFFFFFFF} bytes and takes a long time "
"to run"
)
self.addCleanup(os_helper.unlink, os_helper.TESTFN)
with open(os_helper.TESTFN, "wb") as f:
f.write(b"data")
f.write(os.linesep.encode())
f.seek(0xFFFFFFFF, os.SEEK_CUR)
f.write(os.linesep.encode())

parts_glob = f"sparse-zip64-c{int(self.compression)}-0x*.part"
full_parts_glob = os.path.join(TEST_DATA_DIR, parts_glob)
pre_built_zip_parts = glob.glob(full_parts_glob)

self.addCleanup(os_helper.unlink, TEMP_ZIP)
with ZipFile(TEMP_ZIP, "w", compression=self.compression) as z:
z.write(os_helper.TESTFN, "data1")
z.writestr("module.py", test_src)
z.write(os_helper.TESTFN, "data2")
if not pre_built_zip_parts:
if self.compression != ZIP_STORED:
support.requires(
"cpu",
"test requires a lot of CPU for compression."
)
self.addCleanup(os_helper.unlink, os_helper.TESTFN)
with open(os_helper.TESTFN, "wb") as f:
f.write(b"data")
f.write(os.linesep.encode())
f.seek(0xFFFFFFFF, os.SEEK_CUR)
f.write(os.linesep.encode())
with ZipFile(TEMP_ZIP, "w", compression=self.compression) as z:
z.write(os_helper.TESTFN, "data1")
z.writestr("module.py", test_src)
z.write(os_helper.TESTFN, "data2")

# This "works" only because a zip file always ends with a non-empty
# page (the trailing central directory): writing that final part at its
# offset is what gives the reassembled file its correct total length.
def make_sparse_zip_parts(name):
    """Split the file *name* into ".part" files holding its non-empty pages.

    The file is read in 4096-byte pages.  Every run of consecutive pages
    that differ from a full page of zero bytes is written to its own file
    in TEST_DATA_DIR.  The file name encodes the compression method and
    the offset at which the run starts.  Pages consisting solely of zero
    bytes are not stored.
    """
    empty_page = b"\0" * 4096
    # The data directory may not exist yet when parts are generated for
    # the first time; without it, opening a part for writing would raise
    # FileNotFoundError.
    os.makedirs(TEST_DATA_DIR, exist_ok=True)
    with open(name, "rb") as f:
        part = None
        try:
            while True:
                # Remember where this page starts before reading it: the
                # offset names the part if the page begins a new run.
                offset = f.tell()
                data = f.read(len(empty_page))
                if not data:
                    break
                if data != empty_page:
                    if not part:
                        # First non-empty page after a gap: start a new part.
                        part_fullname = os.path.join(
                            TEST_DATA_DIR,
                            "sparse-zip64-c%d-0x%09x.part" % (self.compression, offset)
                        )
                        part = open(part_fullname, "wb")
                        print("Created", part_fullname)
                    part.write(data)
                else:
                    # An empty page ends the current run, if any.
                    if part:
                        part.close()
                    part = None
        finally:
            # Do not leak the last part's file handle, even on error.
            if part:
                part.close()

if self.compression == ZIP_STORED:
print(f"Creating sparse parts to check in into {TEST_DATA_DIR}:")
make_sparse_zip_parts(TEMP_ZIP)

else:
def extract_offset(name):
    """Return the integer offset encoded in a part file name.

    The name must end in "-0x" followed by nine lowercase hexadecimal
    digits and ".part"; otherwise ValueError is raised.
    """
    found = re.search(r"-(0x[0-9a-f]{9})\.part$", name)
    if found is None:
        raise ValueError(f"{name=} does not fit expected pattern.")
    hex_offset = found.group(1)
    return int(hex_offset, base=16)
offset_parts = [(extract_offset(n), n) for n in pre_built_zip_parts]
with open(TEMP_ZIP, "wb") as f:
for offset, part_fn in sorted(offset_parts):
with open(part_fn, "rb") as part:
f.seek(offset, os.SEEK_SET)
f.write(part.read())
# Confirm that the reconstructed zip file works and looks right.
with ZipFile(TEMP_ZIP, "r") as z:
self.assertEqual(
z.read("module.py"), test_src.encode(),
msg=f"Recreate {full_parts_glob}, unexpected contents."
)
self.assertGreater(z.getinfo("data1").file_size, 0xffff_ffff)
self.assertGreater(z.getinfo("data2").file_size, 0xffff_ffff)

self.doTestWithPreBuiltZip(".py", "module")

Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.

0 comments on commit a1af9d0

Please sign in to comment.