forked from ansible/ansible-runner
-
Notifications
You must be signed in to change notification settings - Fork 0
/
streaming.py
101 lines (86 loc) · 4.17 KB
/
streaming.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import time
import tempfile
import zipfile
import os
import json
import sys
import stat
from .base64io import Base64IO
from pathlib import Path
def stream_dir(source_directory, stream):
    """Zip up *source_directory* and stream it out over *stream*.

    Writes a one-line JSON header ``{"zipfile": <size-in-bytes>}`` followed by
    the zip payload, base64-encoded through ``Base64IO``. Symlinks are stored
    as symlink entries (their target path is the member payload) rather than
    being followed, so they survive the round trip with ``unstream_dir``.

    :param source_directory: Directory to archive; falsy values produce an
        empty archive.
    :param stream: Writable binary stream; if its ``name`` is ``"<stdout>"``,
        output goes to ``sys.stdout.buffer`` instead.
    """
    with tempfile.NamedTemporaryFile() as tmp:
        with zipfile.ZipFile(
            tmp.name, "w", compression=zipfile.ZIP_DEFLATED, allowZip64=True, strict_timestamps=False
        ) as archive:
            if source_directory:
                for dirpath, dirs, files in os.walk(source_directory):
                    relpath = os.path.relpath(dirpath, source_directory)
                    if relpath == ".":
                        relpath = ""
                    for fname in files + dirs:
                        full_path = os.path.join(dirpath, fname)
                        # Magic to preserve symlinks: store the link *target*
                        # as the member data and mark the entry as a symlink
                        # via the external attributes.
                        if os.path.islink(full_path):
                            # Reuse the normalized relpath ("" at the top
                            # level) so symlink arcnames are consistent with
                            # regular-file arcnames (no leading "./").
                            zip_info = zipfile.ZipInfo(os.path.join(relpath, fname))
                            zip_info.create_system = 3  # Unix host, so external_attr carries mode bits
                            permissions = 0o777
                            permissions |= 0xA000  # S_IFLNK: mark entry as a symbolic link
                            zip_info.external_attr = permissions << 16
                            archive.writestr(zip_info, os.readlink(full_path))
                        else:
                            archive.write(
                                os.path.join(dirpath, fname), arcname=os.path.join(relpath, fname)
                            )
        # The context manager closes the archive; the explicit close() the
        # original carried here was redundant.
        zip_size = Path(tmp.name).stat().st_size
        with open(tmp.name, "rb") as source:
            if stream.name == "<stdout>":
                target = sys.stdout.buffer
            else:
                target = stream
            # Header first so the receiver knows how many payload bytes follow.
            target.write(json.dumps({"zipfile": zip_size}).encode("utf-8") + b"\n")
            with Base64IO(target) as encoded_target:
                for line in source:
                    encoded_target.write(line)
def unstream_dir(stream, length, target_directory):
    """Receive a base64-encoded zip of *length* bytes from *stream* and
    extract it into *target_directory*.

    Counterpart of ``stream_dir``: restores permissions, modification times
    and symlinks for each extracted entry.

    :param stream: Readable stream wrapped by ``Base64IO``.
    :param length: Exact decoded payload size in bytes (from the JSON header).
    :param target_directory: Existing directory to extract into.

    NOTE: caller needs to process exceptions.
    """
    with tempfile.NamedTemporaryFile() as tmp:
        with open(tmp.name, "wb") as target:
            with Base64IO(stream) as source:
                remaining = length
                chunk_size = 1024 * 1000  # 1 MB
                while remaining != 0:
                    if chunk_size >= remaining:
                        chunk_size = remaining
                    data = source.read(chunk_size)
                    if not data:
                        # Stream ended before `length` bytes arrived; bail out
                        # instead of looping forever on EOF.
                        break
                    target.write(data)
                    # Decrement by what was actually read — read() may return
                    # fewer bytes than requested (short read), and assuming a
                    # full chunk would truncate the payload.
                    remaining -= len(data)
        with zipfile.ZipFile(tmp.name, "r") as archive:
            # Fancy extraction in order to preserve permissions
            # AWX relies on the execution bit, in particular, for inventory
            # https://www.burgundywall.com/post/preserving-file-perms-with-python-zipfile-module
            for info in archive.infolist():
                out_path = os.path.join(target_directory, info.filename)
                # Unix mode bits live in the high 16 bits of external_attr.
                perms = info.external_attr >> 16
                mode = stat.filemode(perms)
                is_symlink = mode[:1] == 'l'
                if os.path.exists(out_path):
                    if is_symlink:
                        os.remove(out_path)
                    elif os.path.isdir(out_path):
                        # Special case, the important dirs were pre-created so don't try to chmod them
                        continue
                archive.extract(info.filename, path=target_directory)
                # Fancy logic to preserve modification times
                # AWX uses modification times to determine if new facts were written for a host
                # https://stackoverflow.com/questions/9813243/extract-files-from-zip-file-and-retain-mod-date
                date_time = time.mktime(info.date_time + (0, 0, -1))
                os.utime(out_path, times=(date_time, date_time))
                if is_symlink:
                    # The extracted "file" contains the link target; replace it
                    # with a real symlink. Use a context manager so the handle
                    # is not leaked.
                    with open(out_path) as link_file:
                        link = link_file.read()
                    os.remove(out_path)
                    os.symlink(link, out_path)
                else:
                    os.chmod(out_path, perms)