docs: add airflow integration page #2990

Merged
merged 2 commits into from Sep 13, 2022
15 changes: 13 additions & 2 deletions bentoml/bentos.py
@@ -14,6 +14,7 @@
 from simple_di import Provide
 
 from bentoml.exceptions import InvalidArgument
+from bentoml.exceptions import BentoMLException
 
 from ._internal.tag import Tag
 from ._internal.bento import Bento
@@ -222,18 +223,28 @@ def export_bento(
 def push(
     tag: t.Union[Tag, str],
     *,
+    force: bool = False,
     _bento_store: "BentoStore" = Provide[BentoMLContainer.bento_store],
 ):
-    raise NotImplementedError
+    """Push Bento to a yatai server."""
+    from bentoml._internal.yatai_client import yatai_client
+
+    bento = _bento_store.get(tag)
+    if not bento:
+        raise BentoMLException(f"Bento {tag} not found in local store")
+    yatai_client.push_bento(bento, force=force)
 
 
 @inject
 def pull(
     tag: t.Union[Tag, str],
     *,
+    force: bool = False,
     _bento_store: "BentoStore" = Provide[BentoMLContainer.bento_store],
 ):
-    raise NotImplementedError
+    from bentoml._internal.yatai_client import yatai_client
+
+    yatai_client.pull_bento(tag, force=force, bento_store=_bento_store)
 
 
 @inject
16 changes: 14 additions & 2 deletions bentoml/models.py
@@ -8,6 +8,7 @@
 from simple_di import inject
 from simple_di import Provide
 
+from .exceptions import BentoMLException
 from ._internal.tag import Tag
 from ._internal.utils import calc_dir_size
 from ._internal.models import Model
@@ -204,20 +205,31 @@ def export_model(
 )
 
 
+@inject
 def push(
     tag: t.Union[Tag, str],
     *,
+    force: bool = False,
+    _model_store: "ModelStore" = Provide[BentoMLContainer.model_store],
 ):
-    raise NotImplementedError
+    from bentoml._internal.yatai_client import yatai_client
+
+    model_obj = _model_store.get(tag)
+    if not model_obj:
+        raise BentoMLException(f"Model {tag} not found in local store")
+    yatai_client.push_model(model_obj, force=force)
 
 
 @inject
 def pull(
     tag: t.Union[Tag, str],
     *,
+    force: bool = False,
     _model_store: "ModelStore" = Provide[BentoMLContainer.model_store],
 ) -> Model:
-    raise NotImplementedError
+    from bentoml._internal.yatai_client import yatai_client
+
+    yatai_client.pull_model(tag, force=force)
 
 
 @inject
169 changes: 168 additions & 1 deletion docs/source/integrations/airflow.rst
@@ -1,3 +1,170 @@
=======
Airflow
=======

Apache Airflow is a platform to programmatically author, schedule, and monitor
workflows. It is a commonly used framework for building model training pipelines in ML
projects. BentoML provides a flexible set of APIs for integrating natively with Apache
Airflow. Users can use Airflow to schedule their model training pipelines and use
BentoML to keep track of trained model artifacts and optionally deploy them to
production in an automated fashion.

This is especially useful for teams that benefit from frequently retraining models on
newly arrived data and want to update their production models regularly with
confidence.

For more in-depth Airflow tutorials, please visit `the Airflow documentation <https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html>`_.


Overview
--------

A typical Airflow pipeline with a BentoML serving & deployment workflow looks like
this (a minimal DAG sketch follows the list):

1. Fetch new data batches from a data source
2. Split the data into train and test sets
3. Perform feature extraction on the training data set
4. Train a new model using the training data set
5. Perform model evaluation and validation
6. :doc:`Save model with BentoML </concepts/model>`
7. :ref:`Push saved model to Yatai registry (or export model to S3) <concepts/model:Managing Models>`
8. :doc:`Build a new Bento using the newly trained model </concepts/bento>`
9. Run integration tests on the Bento to verify the entire serving pipeline
10. :ref:`Push the Bento to Yatai (or export bento to S3) <concepts/bento:Managing Bentos>`
11. (Optional) Trigger a redeployment via Yatai, bentoctl, or a custom deploy script
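
Expressed as an Airflow DAG, the skeleton of this workflow might look like the sketch
below. This is a minimal, illustrative sketch only: a toy scikit-learn model stands in
for steps 1-5, the ``service.py:svc`` service definition is assumed to exist in the
DAG's working directory, and the ``push`` calls assume a Yatai endpoint has been
configured.

.. code-block:: python

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.decorators import task

    with DAG(
        dag_id='example_bentoml_workflow',
        schedule_interval=timedelta(days=1),
        start_date=datetime(2021, 1, 1),
        catchup=False,
    ) as dag:

        @task()
        def train_and_save() -> str:
            # Steps 1-6: fetch data, train, and evaluate a model, then save it
            # to the local BentoML model store. A toy iris classifier is used
            # here as a stand-in for a real training pipeline.
            import bentoml
            from sklearn import datasets, svm

            X, y = datasets.load_iris(return_X_y=True)
            model = svm.SVC(gamma='scale').fit(X, y)
            saved_model = bentoml.sklearn.save_model('iris_clf', model)
            return str(saved_model.tag)

        @task()
        def build_and_push(model_tag: str) -> None:
            # Steps 7-10: push the saved model, build a new Bento from the
            # service definition, and push the Bento to the registry.
            import bentoml

            bentoml.models.push(model_tag)
            bento = bentoml.bentos.build('service.py:svc', include=['*'])
            bentoml.push(str(bento.tag))

        build_and_push(train_and_save())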


Pro Tips
--------

Pipeline Dependencies
~~~~~~~~~~~~~~~~~~~~~

The default ``PythonOperator`` requires all dependencies to be installed in the
Airflow workers' environment. This can be challenging to manage when the pipeline runs
on a remote Airflow deployment that executes a mix of different tasks.

To avoid this, we recommend managing the dependencies of your ML pipeline with the
`PythonVirtualenvOperator <https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html#pythonvirtualenvoperator>`_,
which runs your code in a virtual environment. This allows you to define your Bento's
dependencies in a ``requirements.txt`` file and use it across both the training
pipeline and the Bento build process. For example:

.. code-block:: python

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.decorators import task

    with DAG(
        dag_id='example_bento_build_operator',
        description='A simple tutorial DAG with BentoML',
        schedule_interval=timedelta(days=1),
        start_date=datetime(2021, 1, 1),
        catchup=False,
        tags=['example'],
    ) as dag:

        @task.virtualenv(
            task_id="bento_build",
            requirements='./requirements.txt',
            system_site_packages=False,
        )
        def build_bento(run_id: str):
            """
            Perform the Bento build in a virtual environment.
            """
            import bentoml

            bentoml.bentos.build(
                "service.py:svc",
                labels={"job_id": run_id},
                python={"requirements_txt": "./requirements.txt"},
                include=["*"],
            )

        # The Airflow task context is not available inside a virtualenv task,
        # so run_id is passed in explicitly as a templated argument.
        build_bento_task = build_bento(run_id="{{ run_id }}")



Artifact Management
~~~~~~~~~~~~~~~~~~~

Since Airflow is a distributed system, it is important to save the
:doc:`Models </concepts/model>` and :doc:`Bentos </concepts/bento>` produced in your
Airflow pipeline to a central location that is accessible by all the nodes in the
Airflow cluster, and also by the workers in your production deployment environment.

For a simple setup, we recommend using the Import/Export API for
:ref:`Model <concepts/model:Managing Models>` and
:ref:`Bento <concepts/bento:Managing Bentos>`. This allows you to export the model files
directly to cloud storage, and import them from the same location when needed. For example:

.. code-block:: python

    bentoml.models.export_model('iris_clf:latest', 's3://my_bucket/folder/')
    bentoml.models.import_model('s3://my_bucket/folder/iris_clf-3vl5n7qkcwqe5uqj.bentomodel')

    bentoml.export_bento('iris_classifier:latest', 's3://my_bucket/bentos/')
    bentoml.import_bento('s3://my_bucket/bentos/iris_classifier-7soszfq53sv6huqj.bento')

For a more advanced setup, we recommend using the Model and Bento registry features
provided by `Yatai <https://github.com/bentoml/Yatai>`_, which offers additional
management features such as filtering, labels, and a web UI for browsing and managing
models. For example:

.. code-block:: python

    bentoml.models.push("iris_clf:latest")
    bentoml.models.pull("iris_clf:3vl5n7qkcwqe5uqj")

    bentoml.push("iris_classifier:latest")
    bentoml.pull("iris_classifier:mcjbijq6j2yhiusu")
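
Note that ``push`` and ``pull`` operate against a Yatai deployment, and assume that
your local BentoML client has already been authenticated with it (e.g. via the
``bentoml yatai login`` CLI command).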


Python API or CLI
~~~~~~~~~~~~~~~~~

BentoML provides both Python APIs and CLI commands for most workflow management tasks,
such as building Bento, managing Models/Bentos, and deploying to production.

When using the Python APIs, you can organize your code in an Airflow ``PythonOperator``
task. For CLI commands, you can use the
`BashOperator <https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/bash.html>`_
instead.
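
For example, a minimal sketch of the CLI approach with the ``BashOperator`` (the
project path and Bento tag below are illustrative placeholders):

.. code-block:: python

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.bash import BashOperator

    with DAG(
        dag_id='example_bento_cli_workflow',
        schedule_interval=None,
        start_date=datetime(2021, 1, 1),
        catchup=False,
    ) as dag:
        # Build a new Bento from the project directory, then push it to Yatai.
        build_bento = BashOperator(
            task_id='build_bento',
            bash_command='cd /path/to/my_project && bentoml build',
        )
        push_bento = BashOperator(
            task_id='push_bento',
            bash_command='bentoml push iris_classifier:latest',
        )
        build_bento >> push_bento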


Validating new Bento
~~~~~~~~~~~~~~~~~~~~

It is important to validate a new Bento before deploying it to production. The
``bentoml.testing`` module provides a set of utility functions for building behavior
tests for your BentoML Service by launching the API server in a Docker container and
sending test requests to it.
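
Until a standard utility lands (see below), a simple hand-rolled smoke test can follow
the same pattern. The sketch below assumes the Bento has already been containerized as
an ``iris_classifier:latest`` image (e.g. with ``bentoml containerize``) and exposes a
``/classify`` JSON endpoint; adapt the image name, port, and payload to your own
service:

.. code-block:: python

    import subprocess
    import time

    import requests

    IMAGE = 'iris_classifier:latest'  # assumed image built via `bentoml containerize`

    # Launch the containerized API server on BentoML's default port 3000.
    container_id = subprocess.check_output(
        ['docker', 'run', '-d', '--rm', '-p', '3000:3000', IMAGE], text=True
    ).strip()
    try:
        time.sleep(10)  # crude wait for the server to become ready
        response = requests.post(
            'http://localhost:3000/classify',
            headers={'content-type': 'application/json'},
            data='[[5.1, 3.5, 1.4, 0.2]]',
        )
        assert response.status_code == 200, response.text
    finally:
        subprocess.run(['docker', 'stop', container_id], check=False)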

The BentoML community is also building a standardized way of defining and running
test cases for your Bento that can be easily integrated with your CI/CD pipeline or
an Airflow job. See `#2967 <https://github.com/bentoml/BentoML/issues/2967>`_ for the
latest progress.

Saving model metadata
~~~~~~~~~~~~~~~~~~~~~

When saving a model with BentoML, you can pass in a dictionary of metadata to be saved
together with the model. This can be useful for tracking model evaluation metrics and
training context, such as the training dataset timestamp, training code version, or
training parameters.
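
For example, a sketch using the scikit-learn framework module, with purely
illustrative metadata values:

.. code-block:: python

    import bentoml
    from sklearn import datasets, svm

    X, y = datasets.load_iris(return_X_y=True)
    model = svm.SVC(gamma='scale').fit(X, y)

    # The metadata dictionary is stored alongside the model and can be
    # inspected later, e.g. via bentoml.models.get("iris_clf:latest").info.metadata
    bentoml.sklearn.save_model(
        'iris_clf',
        model,
        metadata={
            'dataset_timestamp': '2022-09-01T00:00:00Z',
            'code_version': '4f2a8be',
            'accuracy': 0.985,
        },
    )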


Sample Project
--------------

The following is a sample project created by BentoML community member Sarah Floris
that demonstrates how to use BentoML with Airflow:

* 📖 `Deploying BentoML using Airflow <https://medium.com/codex/deploying-bentoml-using-airflow-28972343ac68>`_
* 💻 `Source Code <https://github.com/sdf94/bentoml-airflow>`_

1 change: 1 addition & 0 deletions docs/source/integrations/index.rst
@@ -7,5 +7,6 @@ Integrations
    :maxdepth: 1
    :titlesonly:
 
+   airflow
    flink
    mlflow