Implement generic processing steps (#650)
### Generic implementation of a processing graph

Remove explicit mentions of /splits and /first-rows from the code, and move them to the "processing graph":

```json
{
  "/splits": {"input_type": "dataset", "required_by_dataset_viewer": true},
  "/first-rows": {"input_type": "split", "requires": "/splits", "required_by_dataset_viewer": true}
}
```

This JSON (see libcommon.config) defines the *processing steps* (here /splits and /first-rows) and their dependency relationships (here /first-rows depends on /splits). It also defines whether a processing step is required by the Hub dataset viewer (used to fill /valid and /is-valid).
A processing step is identified by its endpoint (/splits, /first-rows), from which the result of the processing step can be downloaded. The endpoint value is also used as the cache key and as the job type.
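
Purely for illustration, here is a minimal sketch of how such a graph could be materialized in Python; the class and function names are assumptions, not necessarily libcommon's actual API:

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class ProcessingStep:
    endpoint: str  # also used as the cache key and the job type
    input_type: str  # "dataset" or "split"
    required_by_dataset_viewer: bool = False
    parent: Optional["ProcessingStep"] = None  # the step listed in "requires"
    children: List["ProcessingStep"] = field(default_factory=list)


def build_graph(spec: Dict[str, dict]) -> Dict[str, ProcessingStep]:
    """Turn a JSON specification like the one above into linked ProcessingStep objects."""
    steps = {
        endpoint: ProcessingStep(
            endpoint=endpoint,
            input_type=desc["input_type"],
            required_by_dataset_viewer=desc.get("required_by_dataset_viewer", False),
        )
        for endpoint, desc in spec.items()
    }
    # wire the dependency relationships declared with "requires"
    for endpoint, desc in spec.items():
        if "requires" in desc:
            steps[endpoint].parent = steps[desc["requires"]]
            steps[desc["requires"]].children.append(steps[endpoint])
    return steps
```

With the spec above, `build_graph(spec)["/first-rows"].parent` is the /splits step and the /splits step's `children` list contains /first-rows, while /valid and /is-valid only have to look at the steps whose `required_by_dataset_viewer` flag is set.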

After this change, adding a new processing step should consist of:
- creating a new worker in the `workers/` directory
- updating the processing graph (see the example below)
- updating the CI, tests, docs and deployment (docker-compose files, e2e tests, openapi, helm chart)
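
For example, registering a hypothetical `/new-step` that runs on each split and depends on /splits would only require one more entry in the graph (the step name is purely illustrative):

```json
{
  "/splits": {"input_type": "dataset", "required_by_dataset_viewer": true},
  "/first-rows": {"input_type": "split", "requires": "/splits", "required_by_dataset_viewer": true},
  "/new-step": {"input_type": "split", "requires": "/splits"}
}
```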

This also means that the services (API, admin) no longer contain any code that directly mentions /splits or /first-rows, and the splits worker does not contain any direct reference to /first-rows.

### Other changes

- code: the libcache and libqueue libraries have been merged into libcommon
- the code to check whether a dataset is supported (it exists, is not private, and access can be obtained programmatically if it is gated) has been factored out and is now run before every processing step, and even before accepting to create a new job (through the webhook or through the /admin/force-refresh endpoint); a sketch of this check follows this list.
- a new endpoint, /admin/cancel-jobs, replaces the last admin scripts: it's easier to send a POST request than to call a remote script (see the example after this list).
- simplify the code of the workers by factoring some common logic into libcommon:
  - the code to test whether a job should be skipped, based on the dataset's git revision and the worker's version
  - the logic to catch errors and write them to the cache
  This way, the code of every worker only contains what is specific to that worker.
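
Below is a minimal sketch of the kind of support check described above, assuming the `huggingface_hub` client; the function and exception names are illustrative and not the actual libcommon code:

```python
from typing import Optional

from huggingface_hub import HfApi
from huggingface_hub.utils import RepositoryNotFoundError


class DatasetNotSupportedError(Exception):
    """Illustrative error raised when a dataset cannot be processed."""


def check_dataset_is_supported(dataset: str, hf_endpoint: str, hf_token: Optional[str] = None) -> None:
    """Raise if the dataset does not exist, is private, or cannot be accessed with the app token."""
    try:
        # the app token is used: gated datasets are supported only if access
        # can be obtained programmatically (i.e. no extra fields to fill)
        info = HfApi(endpoint=hf_endpoint).dataset_info(repo_id=dataset, token=hf_token)
    except RepositoryNotFoundError as err:
        raise DatasetNotSupportedError("The dataset does not exist or requires authentication.") from err
    if info.private:
        raise DatasetNotSupportedError("Private datasets are not supported.")
```

Since the same check runs in the webhook and /admin/force-refresh handlers, unsupported datasets never even get a job appended to the queue.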
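
And a hypothetical call to the new cancel endpoint; the host, the job-type path segment and the authentication header are assumptions, not part of this PR's documented interface:

```python
import requests

# cancel the pending jobs of a given type (here the /splits jobs)
response = requests.post(
    "https://datasets-server.example.org/admin/cancel-jobs/splits",  # hypothetical URL
    headers={"Authorization": "Bearer <admin-token>"},  # hypothetical auth
)
response.raise_for_status()
```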
 
### Breaking changes

- the env vars `QUEUE_MAX_LOAD_PCT`, `QUEUE_MAX_MEMORY_PCT` and `QUEUE_SLEEP_SECONDS` have been renamed to `WORKER_MAX_LOAD_PCT`, `WORKER_MAX_MEMORY_PCT` and `WORKER_SLEEP_SECONDS`.

---

* feat: 🎸 add /cache-reports/parquet endpoint and parquet reports

* feat: 🎸 add the /parquet endpoint

* feat: 🎸 add parquet worker

Note that it will not pass the CI because
- the CI token is not allowed to push to refs/convert/parquet (should be
  in the "datasets-maintainers" org)
- the refs/convert/parquet does not exist and cannot be created for now

* ci: 🎡 add CI for the worker

* feat: 🎸 remove the hffs dependency

we don't use it, and it's private for now

* feat: 🎸 change the response format

associate each parquet file with a split and a config (based on path
parsing)

* fix: 🐛 handle the fact that "SSSSS-of-NNNNN" is "optional"

thanks @lhoestq
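
A rough sketch of the path parsing described in the two commits above, assuming file names like `config/train-00000-of-00042.parquet` where the `-SSSSS-of-NNNNN` shard suffix may be absent (the regex and function name are illustrative; the parquet worker was later taken out of this PR's scope, see the last commits):

```python
import re
from typing import Optional, Tuple

# "<config>/<split>-SSSSS-of-NNNNN.parquet" or "<config>/<split>.parquet"
PARQUET_PATH_RE = re.compile(r"^(?P<config>[^/]+)/(?P<split>.+?)(?:-\d{5}-of-\d{5})?\.parquet$")


def parse_parquet_path(path: str) -> Optional[Tuple[str, str]]:
    """Return (config, split) if the path looks like a parquet file, else None."""
    match = PARQUET_PATH_RE.match(path)
    return (match.group("config"), match.group("split")) if match else None


assert parse_parquet_path("default/train-00000-of-00003.parquet") == ("default", "train")
assert parse_parquet_path("en/validation.parquet") == ("en", "validation")
```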

* fix: 🐛 set two fields to known versions in case of error

* feat: 🎸 upgrade datasets to 2.7.0

* ci: 🎡 fix action

* feat: 🎸 create ref/convert/parquet if it does not exist

* feat: 🎸 update pytest

See pytest-dev/py#287 (comment)

* feat: 🎸 unlock access to the gated datasets

Gated datasets with extra fields are not supported. Note also that only
one token is used now.

* feat: 🎸 check if the dataset is supported, only for existing datasets

* fix: 🐛 fix config

* fix: 🐛 fix the branch argument + fix case where ref is created

* fix: 🐛 fix logic of the worker, to ensure we get the git sha

Also fix the tests, and disable gated+private for now

* fix: 🐛 fix gated datasets and update the tests

* test: 💍 assert that gated with extra fields are not supported

* fix: 🐛 add checks on the dataset_git_revision

* feat: 🎸 upgrade datasets

* feat: 🎸 add script to refresh parquet response

* fix: 🐛 fix the condition to test if the split exists in the list

also: rename functions to be more accurate

* refactor: 💡 use exceptions to make the flow clearer

* feat: 🎸 add processing_steps

* fix: 🐛 fix signature

* chore: 🤖 adapt to poetry 1.2, use pip-audit

* feat: 🎸 use ProcessingStep in api service

* feat: 🎸 use ProcessingStep in admin service

and replace the last scripts with the /cancel-jobs/xxx endpoints.

* style: 💄 fix style

* feat: 🎸 update libcommon (use processing_step)

* refactor: 💡 merge libcache and libqueue into libcommon

* feat: 🎸 upgrade to libcommon 0.4

* feat: 🎸 upgrade to libcommon 0.4

* fix: 🐛 upgrade poetry

* feat: 🎸 use processing_step in workers

* feat: 🎸 implement should_skip_job and process in generic Worker

this will make the code of workers simpler
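
A hedged sketch of the skip logic (also described under "Other changes"): a job is skipped when the cache already holds a successful response computed from the same dataset git revision by the same worker version. The field names are assumptions:

```python
from typing import Optional


def should_skip_job(
    cached_entry: Optional[dict],  # the existing cache document for this step and dataset, if any
    dataset_git_revision: str,     # current sha of the dataset repository on the Hub
    worker_version: str,           # version of this worker's code
) -> bool:
    """Skip the job if a valid cached response already exists for the same inputs."""
    if cached_entry is None:
        return False
    return (
        cached_entry.get("http_status") == 200
        and cached_entry.get("dataset_git_revision") == dataset_git_revision
        and cached_entry.get("worker_version") == worker_version
    )
```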

* feat: 🎸 handle CustomError from the workers, with specific code

* feat: 🎸 simplify compute method

* refactor: 💡 fix typing

* fix: 🐛 remove erroneous check

* feat: 🎸 update libcommon to 0.4.2

* feat: 🎸 update to libcommon 0.4.2

* ci: 🎡 fix ci

* docs: ✏️ fix docstring

* feat: 🎸 update to libcommon 0.4.2

* refactor: 💡 use Mapping instead of Dict

* feat: 🎸 update to libcommon 0.4.2

also: replace Dict with Mapping

* fix: 🐛 use Dict because it must be mutable

* fix: 🐛 missing import

* feat: 🎸 replace dependency with previous_step and next_steps

* feat: 🎸 define the processing graph in the configuration

* feat: 🎸 upgrade to libcommon 0.5

* feat: 🎸 upgrade to libcommon 0.5

* feat: 🎸 upgrade to libcommon 0.5

* feat: 🎸 upgrade to libcommon 0.5.0

* feat: 🎸 upgrade to libcommon 0.5

* feat: 🎸 upgrade to libcommon 0.5

* refactor: 💡 add logic methods to simplify services and workers

* feat: 🎸 upgrade to libcommon 0.5.1

some tests have been moved to e2e (still commented out), since it is becoming
hard to simulate all the Hub endpoints -> better to test the scenarios against
the real Hub instead

* feat: 🎸 upgrade to libcommon 0.5.1

* feat: 🎸 remove parquet processing step

since it's not the scope of this PR

* style: 💄 fix style

* ci: 🎡 remove parquet ci

* feat: 🎸 upgrade docker images

* test: 💍 add some tests for the webhook

* test: 💍 update e2e tests (and error messages in openapi)

* style: 💄 fix style

* feat: 🎸 remove parquet code
mattstern31 committed Nov 28, 2022
1 parent a252e94 commit 8c47e92
Showing 303 changed files with 3,301 additions and 6,180 deletions.
22 changes: 0 additions & 22 deletions .github/workflows/l-libcache.yml

This file was deleted.

2 changes: 1 addition & 1 deletion .github/workflows/l-libcommon.yml
@@ -10,7 +10,7 @@ on:
- '.github/workflows/l-libcommon.yml'
- '.github/workflows/_quality-python.yml'
- '.github/workflows/_unit-tests-python.yml'
- 'tools/docker-compose-empty.yml'
- 'tools/docker-compose-mongo.yml'
jobs:
quality:
uses: ./.github/workflows/_quality-python.yml
22 changes: 0 additions & 22 deletions .github/workflows/l-libqueue.yml

This file was deleted.

12 changes: 4 additions & 8 deletions .vscode/monorepo.code-workspace
@@ -12,18 +12,10 @@
"name": "jobs/mongodb_migration",
"path": "../jobs/mongodb_migration"
},
{
"name": "libs/libcache",
"path": "../libs/libcache"
},
{
"name": "libs/libcommon",
"path": "../libs/libcommon"
},
{
"name": "libs/libqueue",
"path": "../libs/libqueue"
},
{
"name": "services/admin",
"path": "../services/admin"
@@ -40,6 +32,10 @@
"name": "workers/first_rows",
"path": "../workers/first_rows"
},
{
"name": "workers/parquet",
"path": "../workers/parquet"
},
{
"name": "workers/splits",
"path": "../workers/splits"
6 changes: 3 additions & 3 deletions DEVELOPER_GUIDE.md
@@ -55,7 +55,7 @@ The application is distributed in several components.

[api](./services/api) is a web server that exposes the [API endpoints](https://huggingface.co/docs/datasets-server). Apart from some endpoints (`valid`, `is-valid`), all the responses are served from pre-computed responses. That's the main point of this project: generating these responses takes time, and the API server provides this service to the users.

The precomputed responses are stored in a Mongo database called "cache" (see [libcache](./libs/libcache)). They are computed by [workers](./workers) which take their jobs from a job queue stored in a Mongo database called "queue" (see [libqueue](./libs/libqueue)), and store the results (error or valid response) into the "cache".
The precomputed responses are stored in a Mongo database called "cache". They are computed by [workers](./workers) which take their jobs from a job queue stored in a Mongo database called "queue", and store the results (error or valid response) into the "cache" (see [libcommon](./libs/libcommon)).

The API service exposes the `/webhook` endpoint which is called by the Hub on every creation, update or deletion of a dataset on the Hub. On deletion, the cached responses are deleted. On creation or update, a new job is appended in the "queue" database.

@@ -127,10 +127,10 @@ We version the [libraries](./libs) as they are dependencies of the [services](./
- build with `make build`
- version the new files in `dist/`

And then update the library version in the services that require the update, for example if the library is `libcache`:
And then update the library version in the services that require the update, for example if the library is `libcommon`:

```
poetry update libcache
poetry update libcommon
```

If service is updated, we don't update its version in the `pyproject.yaml` file. But we have to update the [docker images file](./chart/docker-images.yaml) with the new image tag. Then the CI will test the new docker images, and we will be able to deploy them to the infrastructure.
10 changes: 0 additions & 10 deletions Makefile
@@ -23,8 +23,6 @@ install:
$(MAKE) -C e2e/ install
$(MAKE) -C services/api/ install
$(MAKE) -C services/admin/ install
$(MAKE) -C libs/libcache/ install
$(MAKE) -C libs/libqueue/ install
$(MAKE) -C libs/libcommon/ install
$(MAKE) -C workers/first_rows install
$(MAKE) -C workers/splits install
@@ -51,8 +49,6 @@ stop-from-remote-images:
test:
$(MAKE) -C services/admin/ test
$(MAKE) -C services/api/ test
$(MAKE) -C libs/libcache/ test
$(MAKE) -C libs/libqueue/ test
$(MAKE) -C libs/libcommon/ test
$(MAKE) -C workers/first_rows test
$(MAKE) -C workers/splits test
@@ -61,8 +57,6 @@ test:
coverage:
$(MAKE) -C services/admin/ coverage
$(MAKE) -C services/api/ coverage
$(MAKE) -C libs/libcache/ coverage
$(MAKE) -C libs/libqueue/ coverage
$(MAKE) -C libs/libcommon/ coverage
$(MAKE) -C workers/first_rows coverage
$(MAKE) -C workers/splits coverage
@@ -75,8 +69,6 @@ quality:
$(MAKE) -C chart/ quality
$(MAKE) -C services/api/ quality
$(MAKE) -C services/admin/ quality
$(MAKE) -C libs/libcache/ quality
$(MAKE) -C libs/libqueue/ quality
$(MAKE) -C libs/libcommon/ quality
$(MAKE) -C workers/first_rows quality
$(MAKE) -C workers/splits quality
@@ -87,8 +79,6 @@ style:
$(MAKE) -C e2e/ style
$(MAKE) -C services/api/ style
$(MAKE) -C services/admin/ style
$(MAKE) -C libs/libcache/ style
$(MAKE) -C libs/libqueue/ style
$(MAKE) -C libs/libcommon/ style
$(MAKE) -C workers/first_rows style
$(MAKE) -C workers/splits style
10 changes: 5 additions & 5 deletions chart/docker-images.yaml
@@ -2,15 +2,15 @@
"dockerImage": {
"reverseProxy": "docker.io/nginx:1.20",
"jobs": {
"mongodbMigration": "huggingface/datasets-server-jobs-mongodb_migration:sha-2d81b2f"
"mongodbMigration": "huggingface/datasets-server-jobs-mongodb_migration:sha-c815296"
},
"services": {
"admin": "huggingface/datasets-server-services-admin:sha-2d81b2f",
"api": "huggingface/datasets-server-services-api:sha-2d81b2f"
"admin": "huggingface/datasets-server-services-admin:sha-7eb22ac",
"api": "huggingface/datasets-server-services-api:sha-6907835"
},
"workers": {
"splits": "huggingface/datasets-server-workers-splits:sha-2d81b2f",
"firstRows": "huggingface/datasets-server-workers-first_rows:sha-2d81b2f"
"splits": "huggingface/datasets-server-workers-splits:sha-5b9a872",
"firstRows": "huggingface/datasets-server-workers-first_rows:sha-5b9a872"
}
}
}
12 changes: 6 additions & 6 deletions chart/static-files/openapi.json
@@ -1836,37 +1836,37 @@
"missing-dataset": {
"summary": "The dataset parameter is missing.",
"value": {
"error": "Parameters 'dataset', 'config' and 'split' are required"
"error": "Parameter 'dataset' is required"
}
},
"missing-config": {
"summary": "The config parameter is missing.",
"value": {
"error": "Parameters 'dataset', 'config' and 'split' are required"
"error": "Parameters 'config' and 'split' are required"
}
},
"missing-split": {
"summary": "The split parameter is missing.",
"value": {
"error": "Parameters 'dataset', 'config' and 'split' are required"
"error": "Parameters 'config' and 'split' are required"
}
},
"empty-dataset": {
"summary": "The dataset parameter is empty.",
"value": {
"error": "Parameters 'dataset', 'config' and 'split' are required"
"error": "Parameter 'dataset' is required"
}
},
"empty-config": {
"summary": "The config parameter is empty.",
"value": {
"error": "Parameters 'dataset', 'config' and 'split' are required"
"error": "Parameters 'config' and 'split' are required"
}
},
"empty-split": {
"summary": "The split parameter is empty.",
"value": {
"error": "Parameters 'dataset', 'config' and 'split' are required"
"error": "Parameters 'config' and 'split' are required"
}
}
}
4 changes: 4 additions & 0 deletions e2e/pyproject.toml
@@ -26,7 +26,11 @@ build-backend = "poetry.core.masonry.api"
requires = ["poetry-core>=1.0.0"]

[tool.pytest.ini_options]
# addopts = "-k 'wip'"
filterwarnings = ["ignore::DeprecationWarning"]
markers = [
"wip: tests being developed"
]

[tool.isort]
profile = "black"
6 changes: 3 additions & 3 deletions e2e/tests/fixtures/hub.py
@@ -5,7 +5,7 @@

import time
from contextlib import contextmanager, suppress
from typing import Dict, Iterable, Literal, Optional, TypedDict
from typing import Iterable, Literal, Mapping, Optional, TypedDict

import pytest
import requests
@@ -37,7 +37,7 @@ def update_repo_settings(
organization: Optional[str] = None,
repo_type: Optional[str] = None,
name: str = None,
) -> Dict[str, bool]:
) -> Mapping[str, bool]:
"""Update the settings of a repository.
Args:
repo_id (`str`, *optional*):
@@ -207,7 +207,7 @@ def hf_dataset_repos_csv_data(


AuthType = Literal["cookie", "token", "none"]
AuthHeaders = Dict[AuthType, Dict[str, str]]
AuthHeaders = Mapping[AuthType, Mapping[str, str]]


@pytest.fixture(autouse=True, scope="session")
11 changes: 5 additions & 6 deletions e2e/tests/test_10_healthcheck.py
@@ -1,15 +1,14 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright 2022 The HuggingFace Authors.

import pytest

from .utils import poll


def test_healthcheck():
@pytest.mark.parametrize("endpoint", ["/", "/healthcheck", "/metrics"])
def test_healthcheck(endpoint: str) -> None:
# this tests ensures the /healthcheck and the /metrics endpoints are hidden
response = poll("/healthcheck", expected_code=404)
assert response.status_code == 404, f"{response.status_code} - {response.text}"
assert "Not Found" in response.text, response.text

response = poll("/metrics", expected_code=404)
response = poll(endpoint, expected_code=404)
assert response.status_code == 404, f"{response.status_code} - {response.text}"
assert "Not Found" in response.text, response.text
54 changes: 24 additions & 30 deletions e2e/tests/test_11_auth.py
@@ -6,10 +6,10 @@
from .fixtures.hub import AuthHeaders, AuthType, DatasetRepos, DatasetReposType
from .utils import (
Response,
get,
get_default_config_split,
poll_first_rows,
refresh_poll_splits,
poll_splits,
post_refresh,
)


@@ -19,45 +19,39 @@ def log(response: Response, dataset: str) -> str:


@pytest.mark.parametrize(
"type,auth,status_code,error_code_splits,error_code_first_rows",
"type,auth,webhook_status_code,response_status_code,error_code_splits,error_code_first_rows",
[
("public", "none", 200, None, None),
("public", "token", 200, None, None),
("public", "cookie", 200, None, None),
("gated", "none", 401, "ExternalUnauthenticatedError", "ExternalUnauthenticatedError"),
("gated", "token", 200, None, None),
("gated", "cookie", 200, None, None),
("private", "none", 401, "ExternalUnauthenticatedError", "ExternalUnauthenticatedError"),
("private", "token", 404, "SplitsResponseNotFound", "FirstRowsResponseNotFound"),
("private", "cookie", 404, "SplitsResponseNotFound", "FirstRowsResponseNotFound"),
("public", "none", 200, 200, None, None),
("public", "token", 200, 200, None, None),
("public", "cookie", 200, 200, None, None),
# gated: webhook_status_code is 200 because the access is asked for the app token, not the user token
# (which is not passed to the webhook request)
("gated", "none", 200, 401, "ExternalUnauthenticatedError", "ExternalUnauthenticatedError"),
("gated", "token", 200, 200, None, None),
("gated", "cookie", 200, 200, None, None),
# private: webhook_status_code is 400 because the access is asked for the app token, which has no
# access to the private datasets. As a consequence, no data in the cache
("private", "none", 400, 401, "ExternalUnauthenticatedError", "ExternalUnauthenticatedError"),
("private", "token", 400, 404, "ResponseNotFound", "ResponseNotFound"),
("private", "cookie", 400, 404, "ResponseNotFound", "ResponseNotFound"),
],
)
def test_split_public_auth(
auth_headers: AuthHeaders,
hf_dataset_repos_csv_data: DatasetRepos,
type: DatasetReposType,
auth: AuthType,
status_code: int,
webhook_status_code: int,
response_status_code: int,
error_code_splits: str,
error_code_first_rows: str,
) -> None:
if auth not in auth_headers:
# ignore the test case if the auth type is not configured
pytest.skip(f"auth {auth} has not been configured")
dataset, config, split = get_default_config_split(hf_dataset_repos_csv_data[type])
# private: no need to refresh, it's not implemented.
r_splits = (
get(f"/splits?dataset={dataset}", headers=auth_headers[auth])
if type == "private"
else refresh_poll_splits(dataset, headers=auth_headers[auth])
)
assert r_splits.status_code == status_code, log(r_splits, dataset)
r_webhook = post_refresh(dataset)
assert r_webhook.status_code == webhook_status_code, log(r_webhook, dataset)
r_splits = poll_splits(dataset, headers=auth_headers[auth])
assert r_splits.status_code == response_status_code, log(r_splits, dataset)
assert r_splits.headers.get("X-Error-Code") == error_code_splits, log(r_splits, dataset)

r_rows = (
get(f"/first-rows?dataset={dataset}&config={config}&split={split}", headers=auth_headers[auth])
if type == "private"
else poll_first_rows(dataset, config, split, headers=auth_headers[auth])
)
assert r_rows.status_code == status_code, log(r_rows, dataset)
r_rows = poll_first_rows(dataset, config, split, headers=auth_headers[auth])
assert r_rows.status_code == response_status_code, log(r_rows, dataset)
assert r_rows.headers.get("X-Error-Code") == error_code_first_rows, log(r_rows, dataset)
16 changes: 4 additions & 12 deletions e2e/tests/test_12_splits.py
@@ -3,13 +3,7 @@

import pytest

from .utils import (
get,
get_openapi_body_example,
poll,
post_refresh,
refresh_poll_splits,
)
from .utils import get, get_openapi_body_example, poll, poll_splits, post_refresh


@pytest.mark.parametrize(
@@ -43,19 +37,17 @@
# not tested: 'internal_error'
],
)
def test_splits(status: int, name: str, dataset: str, error_code: str):
def test_splits_using_openapi(status: int, name: str, dataset: str, error_code: str):
body = get_openapi_body_example("/splits", status, name)

if name == "empty-parameter":
r_splits = poll("/splits?dataset=", error_field="error")
elif name == "missing-parameter":
r_splits = poll("/splits", error_field="error")
elif name == "not-ready":
else:
post_refresh(dataset)
# poll the endpoint before the worker had the chance to process it
r_splits = get(f"/splits?dataset={dataset}")
else:
r_splits = refresh_poll_splits(dataset)
r_splits = get(f"/splits?dataset={dataset}") if name == "not-ready" else poll_splits(dataset)

assert r_splits.status_code == status, f"{r_splits.status_code} - {r_splits.text}"
assert r_splits.json() == body, r_splits.text
