forked from intel/dffml
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
operation: compression: gz, bz2, and xz formats
- Loading branch information
1 parent
c16f6fa
commit 5edb093
Showing
3 changed files
with
215 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
import sys | ||
import bz2 | ||
import gzip | ||
import lzma | ||
import shutil | ||
|
||
from ..df.base import op | ||
from ..df.types import Definition | ||
|
||
|
||
def make_compress(extension, compression_cls):
    """
    Build a coroutine function that compresses a file into one format.

    Parameters
    ----------
    extension : str
        Short name of the compression format (for example ``"gz"``). It is
        only used to fill in the generated function's docstring.
    compression_cls
        Object exposing ``open(path, mode)`` which returns a writable binary
        file object usable as a context manager (the ``gzip``, ``bz2`` and
        ``lzma`` modules all fit).

    Returns
    -------
    coroutine function
        ``compress(input_file_path, output_file_path)``.
    """

    async def compress(
        input_file_path: str, output_file_path: str,
    ):
        # The plain file is opened first and the compressed file second; both
        # are closed in reverse order when the block exits. The copy itself
        # is synchronous, nothing in this body awaits.
        with open(input_file_path, "rb") as f_in, compression_cls.open(
            output_file_path, "wb"
        ) as f_out:
            shutil.copyfileobj(f_in, f_out)

    # An f-string placed as the first statement of a function is evaluated
    # and thrown away on every call; it is never stored as the docstring.
    # Assign __doc__ explicitly so the format name shows up in help().
    compress.__doc__ = (
        f"A simple function to compress a file into the {extension} format.\n"
        "\n"
        "Parameters\n"
        "----------\n"
        "input_file_path : str\n"
        "    Path of the file to be compressed.\n"
        "output_file_path : str\n"
        "    Path where the output should be saved (should include file name).\n"
    )

    return compress
|
||
|
||
def make_decompress(extension, compression_cls):
    """
    Build a coroutine function that decompresses a file of one format.

    Parameters
    ----------
    extension : str
        Short name of the compression format (for example ``"gz"``). It is
        only used to fill in the generated function's docstring.
    compression_cls
        Object exposing ``open(path, mode)`` which returns a readable binary
        file object usable as a context manager (the ``gzip``, ``bz2`` and
        ``lzma`` modules all fit).

    Returns
    -------
    coroutine function
        ``decompress(input_file_path, output_file_path)``.
    """

    async def decompress(input_file_path: str, output_file_path: str):
        # The compressed file is opened first and the plain output second;
        # both are closed in reverse order when the block exits. The copy
        # itself is synchronous, nothing in this body awaits.
        with compression_cls.open(input_file_path, "rb") as f_in, open(
            output_file_path, "wb"
        ) as f_out:
            shutil.copyfileobj(f_in, f_out)

    # An f-string placed as the first statement of a function is evaluated
    # and thrown away on every call; it is never stored as the docstring.
    # Assign __doc__ explicitly so the format name shows up in help().
    decompress.__doc__ = (
        f"A simple function to decompress a {extension} file.\n"
        "\n"
        "Parameters\n"
        "----------\n"
        "input_file_path : str\n"
        "    Path of the file to be decompressed.\n"
        "output_file_path : str\n"
        "    Path where the output should be saved (should include file name).\n"
    )

    return decompress
|
||
|
||
# File extension -> object whose ``open()`` reads/writes that format.
SUPPORTED_COMPRESSION_FORMATS = {"gz": gzip, "bz2": bz2, "xz": lzma}

# Generate one compress and one decompress operation per supported format and
# publish them on this module as ``<extension>_compress`` and
# ``<extension>_decompress``.
for extension in SUPPORTED_COMPRESSION_FORMATS:
    compression_cls = SUPPORTED_COMPRESSION_FORMATS[extension]

    # Definitions for the compressed and decompressed paths of this format
    compressed_file_path = Definition(
        name=f"compressed_{extension}_file_path", primitive="str"
    )
    decompressed_file_path = Definition(
        name=f"decompressed_{extension}_file_path", primitive="str"
    )

    # Compressing reads the decompressed path and writes the compressed one
    compress = make_compress(extension, compression_cls)
    compress = op(
        inputs={
            "input_file_path": decompressed_file_path,
            "output_file_path": compressed_file_path,
        },
        outputs={},
    )(compress)

    # Decompressing goes the other way around
    decompress = make_decompress(extension, compression_cls)
    decompress = op(
        inputs={
            "input_file_path": compressed_file_path,
            "output_file_path": decompressed_file_path,
        },
        outputs={},
    )(decompress)

    # The names are only known at run time, so attach them to the module
    # object instead of binding them with ordinary assignments.
    setattr(sys.modules[__name__], f"{extension}_compress", compress)
    setattr(sys.modules[__name__], f"{extension}_decompress", decompress)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,136 @@ | ||
from pathlib import PosixPath | ||
from unittest.mock import patch, mock_open, call | ||
|
||
from .test_archive import create_dataflow | ||
|
||
from dffml import run | ||
from dffml.util.asynctestcase import AsyncTestCase | ||
from dffml.operation.compression import ( | ||
gz_compress, | ||
gz_decompress, | ||
bz2_compress, | ||
bz2_decompress, | ||
xz_compress, | ||
xz_decompress, | ||
) | ||
|
||
|
||
class TestCompressionOperations(AsyncTestCase):
    """
    Run each generated compression / decompression operation in a dataflow
    with ``open`` mocked out, and check the sequence of open calls made.

    No real files are touched: ``builtins.open``, the format module's
    ``open`` and ``shutil.copyfileobj`` are all patched for each run.
    """

    # Attribute and method names (including the "uncomressed" / "pth"
    # spellings) are kept as they were so existing references keep working.
    uncomressed_file_pth = "test/path/to/uncompressed_file.ext"

    def compressed_file_pth(self, file_format):
        """Return the fake compressed path for ``file_format`` (e.g. ".gz")."""
        return f"test/path/to/compressed_file{file_format}"

    @staticmethod
    def _expected_open_calls(source_path, destination_path):
        """
        Build the call sequence expected on the shared ``mock_open`` object.

        The two leading ``call()`` entries come from ``m_open()`` being
        invoked once per ``patch``; the remaining entries are the opens of
        the source ("rb") and destination ("wb") followed by the context
        manager enters and exits.
        """
        return [
            call(),
            call(),
            call()(source_path, "rb"),
            call()().__enter__(),
            call()(destination_path, "wb"),
            call()().__enter__(),
            call()().__exit__(None, None, None),
            call()().__exit__(None, None, None),
        ]

    def get_creation_mock_calls(self, file_format):
        """Expected calls when compressing into ``file_format``."""
        return self._expected_open_calls(
            self.uncomressed_file_pth, self.compressed_file_pth(file_format)
        )

    def get_inflation_mock_calls(self, file_format):
        """Expected calls when decompressing from ``file_format``."""
        return self._expected_open_calls(
            self.compressed_file_pth(file_format), self.uncomressed_file_pth
        )

    async def _run_and_check(
        self,
        operation,
        opener,
        input_file_path,
        output_file_path,
        expected_calls,
    ):
        """
        Run ``operation`` in a dataflow and verify the mocked open calls.

        Parameters
        ----------
        operation
            Operation under test, passed to ``create_dataflow``.
        opener : str
            Patch target of the format specific open, e.g. ``"gzip.open"``.
        input_file_path : str
            Value seeded for the operation's ``input_file_path``.
        output_file_path : str
            Value seeded for the operation's ``output_file_path``.
        expected_calls : list
            Calls that must appear on the shared ``mock_open`` object.
        """
        dataflow = create_dataflow(
            operation,
            {
                "input_file_path": input_file_path,
                "output_file_path": output_file_path,
            },
        )
        m_open = mock_open()
        results_seen = 0
        with patch("builtins.open", m_open()), patch(
            opener, m_open()
        ), patch("shutil.copyfileobj"):
            async for _, _ in run(dataflow):
                results_seen += 1
                m_open.assert_has_calls(expected_calls)
        # The assertion above lives inside the loop body; without this check
        # a dataflow that yields nothing would pass without asserting.
        if not results_seen:
            raise AssertionError("dataflow produced no results to check")

    async def _check_compress(self, operation, opener, file_format):
        """Check a compress operation writing ``file_format`` output."""
        await self._run_and_check(
            operation,
            opener,
            self.uncomressed_file_pth,
            self.compressed_file_pth(file_format),
            self.get_creation_mock_calls(file_format),
        )

    async def _check_decompress(self, operation, opener, file_format):
        """Check a decompress operation reading ``file_format`` input."""
        await self._run_and_check(
            operation,
            opener,
            self.compressed_file_pth(file_format),
            self.uncomressed_file_pth,
            self.get_inflation_mock_calls(file_format),
        )

    async def test_create_gz(self):
        await self._check_compress(gz_compress, "gzip.open", ".gz")

    async def test_create_bz2(self):
        await self._check_compress(bz2_compress, "bz2.open", ".bz2")

    async def test_create_xz(self):
        await self._check_compress(xz_compress, "lzma.open", ".xz")

    async def test_inflate_gz(self):
        await self._check_decompress(gz_decompress, "gzip.open", ".gz")

    async def test_inflate_bz2(self):
        await self._check_decompress(bz2_decompress, "bz2.open", ".bz2")

    async def test_inflate_xz(self):
        await self._check_decompress(xz_decompress, "lzma.open", ".xz")