Skip to content

Commit

Permalink
[hailtop.batch] eliminate unittest; use single, session-wide event lo…
Browse files Browse the repository at this point in the history
…op (#14097)

Fixes #14130.

We pervasively assume:
1. That our entire system is used within a single Python thread.
2. That once an event loop is created that's the only event loop that
will exist forever.

Pytest (and newer versions of IPython, afaict) violate this pretty
liberally.

~~pytest_asyncio has [explicit instructions on how to run every test in
the same event
loop](https://pytest-asyncio.readthedocs.io/en/latest/how-to-guides/run_session_tests_in_same_loop.html).
I've implemented those here.~~ [These instructions don't
work](pytest-dev/pytest-asyncio#744). It seems
that the reliable way to ensure we're using one event loop everywhere is
to use pytest-asyncio < 0.23 and to define an event_loop fixture with
scope `'session'`.

I also switched test_batch.py into pytest-only style. This allows me to
use session-scoped fixtures so that they exist exactly once for the
entire test suite execution.

Also:
- `RouterAsyncFS` methods must either be a static method or an async
method. We must not create an FS in a sync method. Both `parse_url` and
`copy_part_size` now avoid allocating an FS.
- `httpx.py` now eagerly errors if the running event loop in `request`
differs from that at allocation time. Annoying but much better error
message than this nonsense about timeout context managers.
- `hail_event_loop` either gets the current thread's event loop (running
or not, doesn't matter to us) or creates a fresh event loop and sets it
as the current thread's event loop. The previous code didn't guarantee
we'd get an event loop b/c `get_event_loop` fails if `set_event_loop`
was previously called.
- `conftest.py` is inherited downward, so I lifted fixtures out of
test_copy.py and friends and into a common `hailtop/conftest.py`
- I added `make -C hail pytest-inter-cloud` for testing the inter cloud
directory. You still need appropriate permissions and authn.
- I removed extraneous pytest.mark.asyncio since we use auto mode
everywhere.
- `FailureInjectingClientSession` creates an `aiohttp.ClientSession` and
therefore must be used while an event loop is running. Easiest fix was
to make the test async.
  • Loading branch information
danking committed Jan 9, 2024
1 parent 422edf6 commit d1823fa
Show file tree
Hide file tree
Showing 23 changed files with 1,871 additions and 1,666 deletions.
10 changes: 10 additions & 0 deletions batch/test/conftest.py
@@ -1,3 +1,4 @@
import asyncio
import hashlib
import logging
import os
Expand All @@ -9,6 +10,15 @@
log = logging.getLogger(__name__)


@pytest.fixture(scope="session")
def event_loop():
    """Session-scoped event loop shared by every async test in the suite.

    Overrides pytest-asyncio's default function-scoped ``event_loop`` fixture
    so all tests run on a single loop (relies on pytest-asyncio < 0.23
    honoring a user-defined ``event_loop`` fixture).
    """
    # NOTE(review): get_event_loop() returns the current thread's loop,
    # creating one if needed; deprecated outside a running loop on newer
    # Pythons — confirm this stays compatible with the pinned pytest-asyncio.
    loop = asyncio.get_event_loop()
    try:
        yield loop
    finally:
        # Close at session teardown even if teardown itself raises.
        loop.close()


@pytest.fixture(autouse=True)
def log_before_after():
log.info('starting test')
Expand Down
7 changes: 3 additions & 4 deletions batch/test/failure_injecting_client_session.py
@@ -1,7 +1,6 @@
import aiohttp

from hailtop import httpx
from hailtop.utils import async_to_blocking


class FailureInjectingClientSession(httpx.ClientSession):
Expand All @@ -10,11 +9,11 @@ def __init__(self, should_fail):
self.should_fail = should_fail
self.real_session = httpx.client_session()

def __enter__(self):
async def __aenter__(self):
return self

def __exit__(self, exc_type, exc_value, traceback):
async_to_blocking(self.real_session.close())
async def __aexit__(self, exc_type, exc_value, traceback):
await self.real_session.close()

def maybe_fail(self, method, path, headers):
if self.should_fail():
Expand Down
19 changes: 10 additions & 9 deletions batch/test/test_batch.py
Expand Up @@ -11,6 +11,7 @@
from hailtop.auth import hail_credentials
from hailtop.batch.backend import HAIL_GENETICS_HAILTOP_IMAGE
from hailtop.batch_client import BatchNotCreatedError, JobNotSubmittedError
from hailtop.batch_client.aioclient import BatchClient as AioBatchClient
from hailtop.batch_client.client import Batch, BatchClient
from hailtop.config import get_deploy_config
from hailtop.test_utils import skip_in_azure
Expand Down Expand Up @@ -1009,7 +1010,7 @@ def test_client_max_size(client: BatchClient):
b.submit()


def test_restartable_insert(client: BatchClient):
async def test_restartable_insert():
i = 0

def every_third_time():
Expand All @@ -1019,19 +1020,19 @@ def every_third_time():
return True
return False

with FailureInjectingClientSession(every_third_time) as session:
client = BatchClient('test', session=session)
async with FailureInjectingClientSession(every_third_time) as session:
client = await AioBatchClient.create('test', session=session)
b = create_batch(client)

for _ in range(9):
b.create_job(DOCKER_ROOT_IMAGE, ['echo', 'a'])

b.submit(max_bunch_size=1)
b = client.get_batch(b.id) # get a batch untainted by the FailureInjectingClientSession
status = b.wait()
assert status['state'] == 'success', str((status, b.debug_info()))
jobs = list(b.jobs())
assert len(jobs) == 9, str((jobs, b.debug_info()))
await b.submit(max_bunch_size=1)
b = await client.get_batch(b.id) # get a batch untainted by the FailureInjectingClientSession
status = await b.wait()
assert status['state'] == 'success', str((status, await b.debug_info()))
jobs = [x async for x in b.jobs()]
assert len(jobs) == 9, str((jobs, await b.debug_info()))


def test_create_idempotence(client: BatchClient):
Expand Down
63 changes: 47 additions & 16 deletions hail/Makefile
Expand Up @@ -27,7 +27,8 @@ JAVAC ?= javac
JAR ?= jar
endif

PYTEST_TARGET ?= test/hail
PYTEST_TARGET ?= test/hail test/hailtop
PYTEST_INTER_CLOUD_TARGET ?= test/hailtop/inter_cloud

# not perfect, not robust to simdpp changes, but probably fine
BUILD_DEBUG_PREFIX := build/classes/scala/debug
Expand Down Expand Up @@ -57,6 +58,7 @@ WHEEL := build/deploy/dist/hail-$(HAIL_PIP_VERSION)-py3-none-any.whl
GRADLE_ARGS += -Dscala.version=$(SCALA_VERSION) -Dspark.version=$(SPARK_VERSION) -Delasticsearch.major-version=$(ELASTIC_MAJOR_VERSION)

TEST_STORAGE_URI = $(shell kubectl get secret global-config --template={{.data.test_storage_uri}} | base64 --decode)
HAIL_TEST_GCS_BUCKET = $(shell kubectl get secret global-config --template={{.data.hail_test_gcs_bucket}} | base64 --decode)
GCP_PROJECT = $(shell kubectl get secret global-config --template={{.data.gcp_project}} | base64 --decode)
CLOUD_HAIL_TEST_RESOURCES_PREFIX = $(TEST_STORAGE_URI)/$(shell whoami)/hail-test-resources
CLOUD_HAIL_TEST_RESOURCES_DIR = $(CLOUD_HAIL_TEST_RESOURCES_PREFIX)/test/resources/
Expand Down Expand Up @@ -118,7 +120,7 @@ services-jvm-test: $(SCALA_BUILD_INFO) $(JAR_SOURCES) $(JAR_TEST_SOURCES)
ifdef HAIL_COMPILE_NATIVES
fs-jvm-test: native-lib-prebuilt
endif
fs-jvm-test: $(SCALA_BUILD_INFO) $(JAR_SOURCES) $(JAR_TEST_SOURCES) upload-qob-test-resources
fs-jvm-test: $(SCALA_BUILD_INFO) $(JAR_SOURCES) $(JAR_TEST_SOURCES) upload-remote-test-resources
! [ -z $(HAIL_CLOUD) ] # call like make fs-jvm-test HAIL_CLOUD=gcp or azure
! [ -z $(NAMESPACE) ] # call like make fs-jvm-test NAMESPACE=default
HAIL_CLOUD=$(HAIL_CLOUD) \
Expand Down Expand Up @@ -180,7 +182,8 @@ python-jar: $(PYTHON_JAR)
.PHONY: pytest
pytest: $(PYTHON_VERSION_INFO) $(INIT_SCRIPTS)
pytest: python/README.md $(FAST_PYTHON_JAR) $(FAST_PYTHON_JAR_EXTRA_CLASSPATH)
cd python && $(HAIL_PYTHON3) -m pytest \
cd python && \
$(HAIL_PYTHON3) -m pytest \
-Werror:::hail -Werror:::hailtop -Werror::ResourceWarning \
--log-cli-level=INFO \
-s \
Expand All @@ -191,20 +194,49 @@ pytest: python/README.md $(FAST_PYTHON_JAR) $(FAST_PYTHON_JAR_EXTRA_CLASSPATH)
--self-contained-html \
--html=../build/reports/pytest.html \
--timeout=120 \
--ignore $(PYTEST_INTER_CLOUD_TARGET) \
$(PYTEST_TARGET) \
$(PYTEST_ARGS)

# NOTE: Look at upload-qob-test-resources target if test resources are missing

# NOTE: Look at upload-remote-test-resources target if test resources are missing
.PHONY: pytest-inter-cloud
pytest-inter-cloud: $(PYTHON_VERSION_INFO) $(INIT_SCRIPTS)
pytest-inter-cloud: python/README.md $(FAST_PYTHON_JAR) $(FAST_PYTHON_JAR_EXTRA_CLASSPATH)
pytest-inter-cloud: upload-remote-test-resources
cd python && \
HAIL_TEST_STORAGE_URI=$(TEST_STORAGE_URI) \
HAIL_TEST_GCS_BUCKET=$(HAIL_TEST_GCS_BUCKET) \
HAIL_TEST_S3_BUCKET=hail-test-dy5rg \
HAIL_TEST_AZURE_ACCOUNT=hailtest \
HAIL_TEST_AZURE_CONTAINER=hail-test-4nxei \
$(HAIL_PYTHON3) -m pytest \
-Werror:::hail -Werror:::hailtop -Werror::ResourceWarning \
--log-cli-level=INFO \
-s \
-vv \
-r A \
--instafail \
--durations=50 \
--self-contained-html \
--html=../build/reports/pytest.html \
--timeout=120 \
$(PYTEST_INTER_CLOUD_TARGET) \
$(PYTEST_ARGS)


# NOTE: Look at upload-remote-test-resources target if test resources are missing
.PHONY: pytest-qob
pytest-qob: upload-qob-jar upload-qob-test-resources install-editable
pytest-qob: upload-qob-jar upload-remote-test-resources install-editable
! [ -z $(NAMESPACE) ] # call this like: make pytest-qob NAMESPACE=default
cd python && \
HAIL_QUERY_BACKEND=batch \
HAIL_QUERY_JAR_URL=$$(cat ../upload-qob-jar) \
HAIL_DEFAULT_NAMESPACE=$(NAMESPACE) \
HAIL_TEST_RESOURCES_DIR='$(CLOUD_HAIL_TEST_RESOURCES_DIR)' \
HAIL_DOCTEST_DATA_DIR='$(HAIL_DOCTEST_DATA_DIR)' \
$(HAIL_PYTHON3) -m pytest \
HAIL_TEST_STORAGE_URI=$(TEST_STORAGE_URI) \
HAIL_QUERY_BACKEND=batch \
HAIL_QUERY_JAR_URL=$$(cat ../upload-qob-jar) \
HAIL_DEFAULT_NAMESPACE=$(NAMESPACE) \
HAIL_TEST_RESOURCES_DIR='$(CLOUD_HAIL_TEST_RESOURCES_DIR)' \
HAIL_DOCTEST_DATA_DIR='$(HAIL_DOCTEST_DATA_DIR)' \
$(HAIL_PYTHON3) -m pytest \
-Werror:::hail -Werror:::hailtop -Werror::ResourceWarning \
--log-cli-level=INFO \
-s \
Expand Down Expand Up @@ -312,18 +344,17 @@ upload-artifacts: $(WHEEL)

# NOTE: 1-day expiration of the test bucket means that this
# target must be run at least once a day. To trigger this target to re-run,
# > rm upload-qob-test-resources
upload-qob-test-resources: $(shell git ls-files src/test/resources)
upload-qob-test-resources: $(shell git ls-files python/hail/docs/data)
! [ -z $(NAMESPACE) ] # call this like: make upload-qob-test-resources NAMESPACE=default
# > rm upload-remote-test-resources
upload-remote-test-resources: $(shell git ls-files src/test/resources)
upload-remote-test-resources: $(shell git ls-files python/hail/docs/data)
gcloud storage cp -r src/test/resources/\* $(CLOUD_HAIL_TEST_RESOURCES_DIR)
gcloud storage cp -r python/hail/docs/data/\* $(CLOUD_HAIL_DOCTEST_DATA_DIR)
# # In Azure, use the following instead of gcloud storage cp
# python3 -m hailtop.aiotools.copy -vvv 'null' '[\
# {"from":"src/test/resources","to":"$(CLOUD_HAIL_TEST_RESOURCES_DIR)"},\
# {"from":"python/hail/docs/data","to":"$(CLOUD_HAIL_DOCTEST_DATA_DIR)"}\
# ]'
# touch $@
touch $@

# NOTE: 1-day expiration of the test bucket means that this
# target must be run at least once a day if using a dev NAMESPACE.
Expand Down
3 changes: 2 additions & 1 deletion hail/python/dev/requirements.txt
Expand Up @@ -11,7 +11,8 @@ pytest>=7.1.3,<8
pytest-html>=1.20.0,<2
pytest-xdist>=2.2.1,<3
pytest-instafail>=0.4.2,<1
pytest-asyncio>=0.14.0,<1
# https://github.com/hail-is/hail/issues/14130
pytest-asyncio>=0.14.0,<0.23
pytest-timestamper>=0.0.9,<1
pytest-timeout>=2.1,<3
pyright>=1.1.324<1.2
Expand Down
16 changes: 9 additions & 7 deletions hail/python/hailtop/aiocloud/aioaws/fs.py
Expand Up @@ -353,12 +353,19 @@ def __init__(
)
self._s3 = boto3.client('s3', config=config)

@staticmethod
def copy_part_size(url: str) -> int:  # pylint: disable=unused-argument
    """Return the multipart-copy part size in bytes for S3 destinations (32 MiB).

    Static so callers can query the part size without allocating an FS.
    """
    # Because the S3 upload_part API call requires the entire part
    # be loaded into memory, use a smaller part size.
    return 32 * 1024 * 1024

@staticmethod
def valid_url(url: str) -> bool:
return url.startswith('s3://')

def parse_url(self, url: str) -> S3AsyncFSURL:
return S3AsyncFSURL(*self.get_bucket_and_name(url))
@staticmethod
def parse_url(url: str) -> S3AsyncFSURL:
    """Parse an S3 URL into an :class:`S3AsyncFSURL`.

    Static (no ``self``) so URLs can be parsed without allocating an FS.
    """
    return S3AsyncFSURL(*S3AsyncFS.get_bucket_and_name(url))

@staticmethod
def get_bucket_and_name(url: str) -> Tuple[str, str]:
Expand Down Expand Up @@ -565,8 +572,3 @@ async def remove(self, url: str) -> None:

async def close(self) -> None:
del self._s3

def copy_part_size(self, url: str) -> int: # pylint: disable=unused-argument
# Because the S3 upload_part API call requires the entire part
# be loaded into memory, use a smaller part size.
return 32 * 1024 * 1024
Expand Up @@ -636,8 +636,9 @@ async def is_hot_storage(self, location: str) -> bool:
def valid_url(url: str) -> bool:
return url.startswith('gs://')

def parse_url(self, url: str) -> GoogleStorageAsyncFSURL:
return GoogleStorageAsyncFSURL(*self.get_bucket_and_name(url))
@staticmethod
def parse_url(url: str) -> GoogleStorageAsyncFSURL:
    """Parse a ``gs://`` URL into a :class:`GoogleStorageAsyncFSURL`.

    Static (no ``self``) so URLs can be parsed without allocating an FS.
    """
    return GoogleStorageAsyncFSURL(*GoogleStorageAsyncFS.get_bucket_and_name(url))

@staticmethod
def get_bucket_and_name(url: str) -> Tuple[str, str]:
Expand Down
14 changes: 8 additions & 6 deletions hail/python/hailtop/aiotools/fs/fs.py
Expand Up @@ -154,13 +154,20 @@ class AsyncFS(abc.ABC):
def schemes(self) -> Set[str]:
pass

@staticmethod
def copy_part_size(url: str) -> int:  # pylint: disable=unused-argument
    '''Part size when copying using multi-part uploads. The part size of
    the destination filesystem is used.

    Defaults to 128 MiB; subclasses may override (e.g. the S3 FS uses a
    smaller part size). Static so no FS instance is needed to query it.
    '''
    return 128 * 1024 * 1024

@staticmethod
@abc.abstractmethod
def valid_url(url: str) -> bool:
pass

@staticmethod
@abc.abstractmethod
def parse_url(self, url: str) -> AsyncFSURL:
def parse_url(url: str) -> AsyncFSURL:
pass

@abc.abstractmethod
Expand Down Expand Up @@ -326,11 +333,6 @@ async def __aexit__(
) -> None:
await self.close()

def copy_part_size(self, url: str) -> int: # pylint: disable=unused-argument
'''Part size when copying using multi-part uploads. The part size of
the destination filesystem is used.'''
return 128 * 1024 * 1024


T = TypeVar('T', bound=AsyncFS)

Expand Down
5 changes: 3 additions & 2 deletions hail/python/hailtop/aiotools/local_fs.py
Expand Up @@ -236,8 +236,9 @@ def __init__(self, thread_pool: Optional[ThreadPoolExecutor] = None, max_workers
def valid_url(url: str) -> bool:
return url.startswith('file://') or '://' not in url

def parse_url(self, url: str) -> LocalAsyncFSURL:
return LocalAsyncFSURL(self._get_path(url))
@staticmethod
def parse_url(url: str) -> LocalAsyncFSURL:
    """Parse a ``file://`` or bare-path URL into a :class:`LocalAsyncFSURL`.

    Static (no ``self``) so URLs can be parsed without allocating an FS.
    """
    return LocalAsyncFSURL(LocalAsyncFS._get_path(url))

@staticmethod
def _get_path(url):
Expand Down

0 comments on commit d1823fa

Please sign in to comment.