Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Copy files instead of sending over HTTP if Wave app and Wave server are running on the same machine. #982 #1765

Merged
merged 23 commits into from
Jan 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
382925a
perf: Copy files instead of sending over HTTP if Wave app and Wave se…
mturoci Dec 15, 2022
1b965b2
feat: Wait for cp process to finish before returning. #982
mturoci Dec 15, 2022
1358e7c
feat: Add trycatch boundary and allow turning off local uploads. #982
mturoci Jan 10, 2023
2acd770
docs: Document H2O_WAVE_SKIP_LOCAL_UPLOAD. #982
mturoci Jan 10, 2023
4961c5c
feat: Make local upload work on Windows as well. #982
mturoci Jan 10, 2023
666fc7e
feat: Make upload_dir copy files during upload if possible. #982
mturoci Jan 10, 2023
7977bca
chore: Minor refactor. #982
mturoci Jan 10, 2023
c106ba7
feat: Make local upload work with baseurl as well. #982
mturoci Jan 10, 2023
f3df20f
chore: Uncomment forgotten tests. #982
mturoci Jan 10, 2023
8f8ef8d
feat: Make sure local copy works on Windows as well. #982
mturoci Jan 11, 2023
bbe16f7
feat: Make upload dir use local copy - Windows. #982
mturoci Jan 11, 2023
1a49268
feat: Make sure the env var is read fresh for wave run to work out of…
mturoci Jan 12, 2023
5f32ae9
fix: Extract filename from header due to newer golang adhering to HTT…
mturoci Jan 12, 2023
4d5ee7e
feat: Make async upload non-blocking. #982
mturoci Jan 12, 2023
1adc040
feat: Make async upload_dir non-blocking. #982
mturoci Jan 12, 2023
61a9055
fix: Add forgotten loopback check. #982
mturoci Jan 12, 2023
e3bdb82
docs: Add a note about H2O_WAVE_WAVED_DIR. #982
mturoci Jan 12, 2023
d4e0fb6
chore: Swallow local copy exception if any to not bother users. #982
mturoci Jan 13, 2023
7eceb15
chore: Rename env var. #982
mturoci Jan 13, 2023
e5a27c6
chore: Refactor. #982
mturoci Jan 13, 2023
c6b2c22
Update website/docs/configuration.md
mturoci Jan 19, 2023
7cb9994
Update website/docs/configuration.md
mturoci Jan 19, 2023
1c05827
chore: Do not print upload_dir copy to stdout. #982
mturoci Jan 20, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 10 additions & 1 deletion file_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"errors"
"fmt"
"io"
"mime"
"mime/multipart"
"net/http"
"os"
Expand Down Expand Up @@ -182,7 +183,15 @@ func (fs *FileServer) storeFilesInSingleDir(files []*multipart.FileHeader) ([]st
}
defer src.Close()

dir, file := filepath.Split(file.Filename)
// Parse the filename from the Content-Disposition header ourselves: newer Go versions follow the HTTP standard
// and expose only the base name via FileHeader.Filename, which would drop the directory part we need below.
// https://github.com/golang/go/blob/8dbf3e9393400d72d313e5616c88873e07692c70/src/mime/multipart/multipart.go#L82-L84
_, params, _ := mime.ParseMediaType(file.Header.Get("Content-Disposition"))
filename := params["filename"]
if filename == "" {
filename = file.Filename
}

dir, file := filepath.Split(filename)
uploadPath := filepath.Join(uploadDir, dir)

if err := os.MkdirAll(uploadPath, 0700); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions py/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ docs: ## Build API docs
test:
./venv/bin/python -m tests
echo "Testing using BASE_URL" && H2O_WAVE_BASE_URL="/foo/" ./venv/bin/python -m tests
echo "Testing using LOCAL UPLOAD" && H2O_WAVE_WAVED_DIR=".." ./venv/bin/python -m tests
echo "Testing using LOCAL UPLOAD AND BASE URL" && H2O_WAVE_WAVED_DIR=".." H2O_WAVE_BASE_URL="/foo/" ./venv/bin/python -m tests

purge: ## Purge previous build
rm -rf build dist h2o_wave.egg-info
Expand Down
2 changes: 2 additions & 0 deletions py/h2o_wave/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ def run(app: str, no_reload: bool, no_autostart: bool):
finally:
if not server_not_running:
try:
if not os.environ.get('H2O_WAVE_WAVED_DIR') and is_waved_present:
os.environ['H2O_WAVE_WAVED_DIR'] = sys.exec_prefix
uvicorn.run(f'{app}:main', host=_localhost, port=port, reload=not no_reload)
except Exception as e:
if waved_process:
Expand Down
171 changes: 151 additions & 20 deletions py/h2o_wave/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,15 @@
# limitations under the License.

from io import BufferedReader
import asyncio
import ipaddress
import json
import platform
import secrets
import shutil
import subprocess
from urllib.parse import urlparse
from uuid import uuid4
import warnings
import logging
import os
Expand All @@ -40,6 +47,7 @@ def _get_env(key: str, value: Any):


_default_internal_address = 'http://127.0.0.1:8000'
_base_url = _get_env('BASE_URL', '/')


class _Config:
Expand Down Expand Up @@ -657,17 +665,47 @@ def upload(self, files: List[str]) -> List[str]:
Returns:
A list of remote URLs for the uploaded files, in order.
"""
upload_files = []
file_handles: List[BufferedReader] = []
for f in files:
file_handle = open(f, 'rb')
upload_files.append(('files', (os.path.basename(f), file_handle)))
file_handles.append(file_handle)
if not os.path.isfile(f):
raise ValueError(f'{f} is not a file.')

waved_dir = _get_env('WAVED_DIR', None)
data_dir = _get_env('DATA_DIR', 'data')
skip_local_upload = _get_env('NO_COPY_UPLOAD', 'false').lower() in ['true', '1', 't']

# If we know where waved lives and the app is running on the same machine as waved,
# we can simply copy the files instead of making an HTTP request.
if _is_loopback_address() and not skip_local_upload and waved_dir and data_dir:
try:
uploaded_files = []
for f in files:
uuid = str(uuid4())
dst = os.path.join(waved_dir, data_dir, 'f', uuid)
os.makedirs(dst, exist_ok=True)

if 'Windows' in platform.system():
src = os.path.dirname(f) or os.getcwd()
args = ['robocopy', src, dst, os.path.basename(f), '/J', '/W:0']
else:
args = ['cp', f, dst]

_, err = subprocess.Popen(args, stderr=subprocess.PIPE, stdout=subprocess.DEVNULL).communicate()
if err:
raise ValueError(err.decode())

uploaded_files.append(f'{_base_url}_f/{uuid}/{os.path.basename(f)}')
return uploaded_files
except:
pass

uploaded_files = []
for f in files:
uploaded_files.append(('files', (os.path.basename(f), open(f, 'rb'))))

res = self._http.post(f'{_config.hub_address}_f/', files=upload_files)
res = self._http.post(f'{_config.hub_address}_f/', files=uploaded_files)

for h in file_handles:
h.close()
for _, f in uploaded_files:
f[1].close()

if res.status_code == 200:
return json.loads(res.text)['files']
Expand All @@ -687,17 +725,39 @@ def upload_dir(self, directory: str) -> str:
if not os.path.isdir(directory):
raise ValueError(f'{directory} is not a directory.')

waved_dir = _get_env('WAVED_DIR', None)
data_dir = _get_env('DATA_DIR', 'data')
skip_local_upload = _get_env('NO_COPY_UPLOAD', 'false').lower() in ['true', '1', 't']

# If we know where waved lives and the app is running on the same machine as waved,
# we can simply copy the files instead of making an HTTP request.
if _is_loopback_address() and not skip_local_upload and waved_dir and data_dir:
try:
uuid = str(uuid4())
dst = os.path.join(waved_dir, data_dir, 'f', uuid)
os.makedirs(dst, exist_ok=True)

if 'Windows' in platform.system():
args = ['robocopy', directory, dst, '/S', '/J', '/W:0', '*.*']
else:
args = ['rsync', '-a', os.path.join(directory, '.'), dst]

_, err = subprocess.Popen(args, stderr=subprocess.PIPE, stdout=subprocess.DEVNULL).communicate()
if err:
raise ValueError(err.decode())

return [f'{_base_url}_f/{uuid}']
except:
pass

upload_files = []
file_handles: List[BufferedReader] = []
for f in _get_files_in_directory(directory, []):
file_handle = open(f, 'rb')
upload_files.append(('files', (os.path.relpath(f, directory), file_handle)))
file_handles.append(file_handle)
upload_files.append(('files', (os.path.relpath(f, directory), open(f, 'rb'))))

res = self._http.post(f'{_config.hub_address}_f/', headers={'Wave-Directory-Upload': "True"}, files=upload_files)

for h in file_handles:
h.close()
for _, f in upload_files:
f[1].close()

if res.status_code == 200:
return json.loads(res.text)['files']
Expand Down Expand Up @@ -841,17 +901,35 @@ async def upload_dir(self, directory: str) -> str:
if not os.path.isdir(directory):
raise ValueError(f'{directory} is not a directory.')

waved_dir = _get_env('WAVED_DIR', None)
data_dir = _get_env('DATA_DIR', 'data')
skip_local_upload = _get_env('NO_COPY_UPLOAD', 'false').lower() in ['true', '1', 't']

# If we know where waved lives and the app is running on the same machine as waved,
# we can simply copy the files instead of making an HTTP request.
if _is_loopback_address() and not skip_local_upload and waved_dir and data_dir:
try:
uuid = str(uuid4())
dst = os.path.join(waved_dir, data_dir, 'f', uuid)
os.makedirs(dst, exist_ok=True)

if 'Windows' in platform.system():
args = ['robocopy', directory, dst, '/S', '/J', '/W:0', '*.*']
else:
args = ['rsync', '-a', os.path.join(directory, '.'), dst]

return [await _copy_in_subprocess(args, uuid)]
except:
pass

upload_files = []
file_handles: List[BufferedReader] = []
for f in _get_files_in_directory(directory, []):
file_handle = open(f, 'rb')
upload_files.append(('files', (os.path.relpath(f, directory), file_handle)))
file_handles.append(file_handle)
upload_files.append(('files', (os.path.relpath(f, directory), open(f, 'rb'))))

res = await self._http.post(f'{_config.hub_address}_f/', headers={'Wave-Directory-Upload': "True"}, files=upload_files)

for h in file_handles:
h.close()
for _, f in upload_files:
f[1].close()

if res.status_code == 200:
return json.loads(res.text)['files']
Expand All @@ -867,6 +945,36 @@ async def upload(self, files: List[str]) -> List[str]:
Returns:
A list of remote URLs for the uploaded files, in order.
"""
for f in files:
if not os.path.isfile(f):
raise ValueError(f'{f} is not a file.')

waved_dir = _get_env('WAVED_DIR', None)
data_dir = _get_env('DATA_DIR', 'data')
skip_local_upload = _get_env('NO_COPY_UPLOAD', 'false').lower() in ['true', '1', 't']

# If we know where waved lives and the app is running on the same machine as waved,
# we can simply copy the files instead of making an HTTP request.
if _is_loopback_address() and not skip_local_upload and waved_dir and data_dir:
try:
tasks = []
for f in files:
uuid = str(uuid4())
dst = os.path.join(waved_dir, data_dir, 'f', uuid)
os.makedirs(dst, exist_ok=True)

if 'Windows' in platform.system():
src = os.path.dirname(f) or os.getcwd()
args = ['robocopy', src, dst, os.path.basename(f), '/J', '/W:0']
else:
args = ['cp', f, dst]

tasks.append(asyncio.create_task(_copy_in_subprocess(args, uuid, f)))

return await asyncio.gather(*tasks)
except:
pass

upload_files = []
file_handles: List[BufferedReader] = []
for f in files:
Expand Down Expand Up @@ -896,6 +1004,7 @@ async def download(self, url: str, path: str) -> str:
path = os.path.abspath(path)
# If path is a directory, get basename from url
filepath = os.path.join(path, os.path.basename(url)) if os.path.isdir(path) else path

async with self._http.stream('GET', f'{_config.hub_host_address}{url}') as res:
if res.status_code != 200:
await res.aread()
Expand Down Expand Up @@ -963,6 +1072,19 @@ async def proxy(self, method: str, url: str, headers: Optional[Dict[str, List[st
raise ServiceError(f'Proxy request failed (code={res.status_code}): {res.text}')


async def _copy_in_subprocess(args: List[str], uuid: str, f='') -> str:
    """
    Run a copy command as an asyncio subprocess and return the URL path of the copied content.

    Args:
        args: The command to execute followed by its arguments, e.g. `['cp', src, dst]`.
        uuid: The name of the upload directory the command copies into; used to build the returned path.
        f: The path of the file being copied. Leave empty when a whole directory is copied.
    Returns:
        The base URL followed by `_f/<uuid>/<basename of f>` for a file, or `_f/<uuid>` when `f` is empty.
    Raises:
        ValueError: If the command wrote anything to stderr.
    """
    proc = await asyncio.create_subprocess_exec(*args, stderr=subprocess.PIPE, stdout=subprocess.DEVNULL)
    stderr_output = (await proc.communicate())[1]

    # Any stderr output counts as a failure; the exit code itself is not inspected.
    if stderr_output:
        raise ValueError(stderr_output.decode())

    if not f:
        return f'{_base_url}_f/{uuid}'
    return f'{_base_url}_f/{uuid}/{os.path.basename(f)}'


def _get_files_in_directory(directory: str, files: List[str]) -> List[str]:
for f in os.listdir(directory):
path = os.path.join(directory, f)
Expand All @@ -972,6 +1094,7 @@ def _get_files_in_directory(directory: str, files: List[str]) -> List[str]:
_get_files_in_directory(path, files)
return files


def marshal(d: Any) -> str:
"""
Marshal to JSON.
Expand Down Expand Up @@ -1008,3 +1131,11 @@ def pack(data: Any) -> str:
The object or value compressed into a string.
"""
return 'data:' + marshal(_dump(data))


def _is_loopback_address() -> bool:
    """
    Tell whether the configured hub address points at the local machine.

    Returns:
        True if the host part of `_config.hub_address` is `localhost` or a loopback IP literal
        (e.g. `127.0.0.1` or `::1`), False otherwise, including when the address cannot be parsed.
    """
    try:
        hostname = urlparse(_config.hub_address).hostname
        # "localhost" is not an IP literal, so ip_address() would reject it even though
        # the name is reserved for the loopback interface.
        if hostname == 'localhost':
            return True
        return ipaddress.ip_address(hostname).is_loopback
    except ValueError:
        # Raised for a malformed URL, a missing host, or a host that is a regular DNS name.
        return False
1 change: 1 addition & 0 deletions py/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
__arch__ = "{arch}"
''')


def get_data_files():
data_dict = dict()
data_dict['project_templates'] = [os.path.join('project_templates', f) for f in os.listdir('project_templates')]
Expand Down
File renamed without changes.
10 changes: 2 additions & 8 deletions py/tests/test_python_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,6 @@ def test_cyc_buf_write(self):
i=2,
))))


def test_proxy(self):
# waved -proxy must be set
url = 'https://wave.h2o.ai'
Expand All @@ -342,7 +341,6 @@ def test_proxy(self):
assert result.code == 400
assert len(result.headers) > 0


def test_file_server(self):
f1 = 'temp_file1.txt'
with open(f1, 'w') as f:
Expand All @@ -356,14 +354,12 @@ def test_file_server(self):
os.remove(f2)
assert s1 == s2


def test_public_dir(self):
    # Fetch a known asset from the public directory and verify that it is SVG markup.
    local_copy = site.download(f'{base_url}assets/brand/h2o.svg', 'h2o.svg')
    contents = read_file(local_copy)
    # Remove the downloaded copy before asserting so a failed assertion leaves no stray file behind.
    os.remove(local_copy)
    assert contents.index('<svg') == 0


def test_cache(self):
d1 = dict(foo='bar', qux=42)
site.cache.set('test', 'data', d1)
Expand All @@ -375,7 +371,6 @@ def test_cache(self):
assert d2['foo'] == d1['foo']
assert d2['qux'] == d1['qux']


def test_multipart_server(self):
file_handle = open('../assets/brand/wave.svg', 'rb')
p = site.uplink('test_stream', 'image/svg+xml', file_handle)
Expand All @@ -385,14 +380,13 @@ def test_multipart_server(self):

def test_upload_dir(self):
upload_path, = site.upload_dir(os.path.join('tests', 'test_folder'))
download_path = site.download(f'{upload_path}/test.txt', 'test.txt')
download_path = site.download(f'{upload_path}/dir1/test.txt', 'test.txt')
txt = read_file(download_path)
os.remove(download_path)
assert len(txt) > 0


def test_deleting_files(self):
upload_path, = site.upload([os.path.join('tests', 'test_folder', 'test.txt')])
upload_path, = site.upload([os.path.join('tests', 'test_folder', 'dir1', 'test.txt')])
res = httpx.get(f'http://localhost:10101{upload_path}')
assert res.status_code == 200
site.unload(upload_path)
Expand Down