Skip to content

Commit

Permalink
feat: Make async upload_dir non-blocking. #982
Browse files Browse the repository at this point in the history
  • Loading branch information
mturoci committed Jan 20, 2023
1 parent 4d5ee7e commit 1adc040
Showing 1 changed file with 7 additions and 7 deletions.
14 changes: 7 additions & 7 deletions py/h2o_wave/core.py
Expand Up @@ -922,10 +922,7 @@ async def upload_dir(self, directory: str) -> str:
else:
args = ['rsync', '-a', os.path.join(directory, '.'), dst]

_, err = subprocess.Popen(args, stderr=subprocess.PIPE).communicate()
if err:
raise ValueError(err.decode())
return [f'{_base_url}_f/{uuid}']
return [await _copy_in_subprocess(args, uuid)]
except Exception as e:
print(f'Error during local copy, falling back to HTTP upload: {e}')

Expand Down Expand Up @@ -976,7 +973,7 @@ async def upload(self, files: List[str]) -> List[str]:
else:
args = ['cp', f, dst]

tasks.append(asyncio.create_task(copy_in_subprocess(args, uuid, f)))
tasks.append(asyncio.create_task(_copy_in_subprocess(args, uuid, f)))

return await asyncio.gather(*tasks)
except Exception as e:
Expand Down Expand Up @@ -1079,14 +1076,17 @@ async def proxy(self, method: str, url: str, headers: Optional[Dict[str, List[st
raise ServiceError(f'Proxy request failed (code={res.status_code}): {res.text}')


async def copy_in_subprocess(args: List[str], uuid: str, f: str) -> str:
async def _copy_in_subprocess(args: List[str], uuid: str, f='') -> str:
p = await asyncio.create_subprocess_exec(*args, stderr=subprocess.PIPE, stdout=subprocess.DEVNULL)
_, err = await p.communicate()

if err:
raise ValueError(err.decode())

return f'{_base_url}_f/{uuid}/{os.path.basename(f)}'
if f:
return f'{_base_url}_f/{uuid}/{os.path.basename(f)}'
else:
return f'{_base_url}_f/{uuid}'


def _get_files_in_directory(directory: str, files: List[str]) -> List[str]:
Expand Down

0 comments on commit 1adc040

Please sign in to comment.