You're a Machine Learning Engineer who has just arrived at an existing project. The Data Scientist has handed you a seemingly working notebook, and it's now your task to refactor it so that the analysis is more reproducible and the process is easier to deploy to production. To that end, you will first verify that everything works, and then convert the notebook into a Kedro project.
What's the starting point?
- Some data
- A notebook analyzing it
- The needed requirements to get it working
- Download the data. Fill in the form at https://openrepair.org/open-data/downloads/ or use the direct URL. Extract all the files and place them under the `data/` directory.
- Set up your development environment. Create a conda/mamba environment called `repair311` with Python 3.11, activate it, and install the dependencies from `requirements.txt` using pip.
- Verify that everything works. Run the notebook top to bottom. How many rows does the repair events dataset have? (See the quick check below.)
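A quick way to answer that last question, using the `df` variable the notebook already defines for the events data:

```python
# polars DataFrames expose the row count as `.height` (same as df.shape[0]).
print(df.height)
```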
Success! 🎉 You can now start working on the project for real. The fun starts now.
- Add `kedro~=0.18.13` and `kedro-datasets[polars]~=1.7` to `requirements.txt` and install them. Verify that `kedro info` works.
- Create a `catalog.yml` file, and register the categories dataset as follows:
```yaml
# catalog.yml
openrepair-0_3-categories:
  type: polars.CSVDataSet
  filepath: data/OpenRepairData_v0.3_Product_Categories.csv
```
- Add the following code snippet at the beginning of the notebook to use the Kedro catalog for data loading:
```python
from kedro.config import OmegaConfigLoader
from kedro.io import DataCatalog

conf_loader = OmegaConfigLoader(".", base_env=".", default_run_env=".")
conf_catalog = conf_loader.get("catalog")
catalog = DataCatalog.from_config(conf_catalog)
```
- Finally, replace the `categories = pl.read_csv(...)` call with `categories = catalog.load("openrepair-0_3-categories")`. Verify that everything works.
- Do the same with the events `DataFrame`: register it in `catalog.yml` and replace the `df = pl.read_csv(...)` call with the appropriate `df = catalog.load("...")` one. For that, use the following dataset definition:
```yaml
# conf/base/catalog.yml
openrepair-0_3-events-raw:
  type: polars.CSVDataSet
  filepath: data/OpenRepairData_v0.3_aggregate_202210.csv
  load_args:
    dtypes:
      product_age: ${pl:Float64}
      group_identifier: ${pl:Utf8}
    try_parse_dates: true
```
and modify the configuration loader code as follows:
```python
conf_loader = OmegaConfigLoader(
    ".", base_env=".", default_run_env=".",
    custom_resolvers={
        "pl": lambda obj: getattr(pl, obj),
    },
)
```
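The `pl:` resolver simply maps a string like `Float64` to the attribute of the same name on the `polars` module, so the `${pl:...}` entries in the catalog become real dtypes at load time. An illustration (not part of the project code):

```python
import polars as pl

# This mirrors the "pl" resolver registered above.
def resolve_pl(obj: str):
    return getattr(pl, obj)

assert resolve_pl("Float64") is pl.Float64  # ${pl:Float64} -> pl.Float64
assert resolve_pl("Utf8") is pl.Utf8        # ${pl:Utf8}    -> pl.Utf8
```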
Success! 🎉 After adding some Kedro boilerplate, you decoupled the Jupyter notebook from the actual file locations and loading options. From this point, let's start making use of the full power of the Kedro framework.
- Install `flit` using `pip`, and run `flit init` to create a `pyproject.toml` file with the appropriate project metadata. Answer the wizard as follows:
```
Module name: openrepair
Author: (Your name)
Author email: (Your email)
Home page: (Anything, or blank)
Choose a license (see http://choosealicense.com/ for more info)
1. MIT - simple and permissive
2. Apache - explicitly grants patent rights
3. GPL - ensures that code based on this is shared with the same terms
4. Skip - choose a license later
Enter 1-4 [4]: 4
```
- Create a `src/openrepair` directory, and place these contents in `src/openrepair/__init__.py`:
```python
# src/openrepair/__init__.py
"""
OpenRepair data analysis library code.
"""

__version__ = "0.1.0"
```
- Verify that the project can be installed as a Python library by running `pip install --editable .`.
- Add the Kedro build configuration to `pyproject.toml`:
```toml
# pyproject.toml
[tool.kedro]
package_name = "openrepair"
project_name = "openrepair"
kedro_init_version = "0.18.13"
```
Verify that `kedro --help` shows a new section called "Project specific commands from Kedro".
- Add the Kedro application configuration to a new file, `src/openrepair/settings.py`:
```python
# src/openrepair/settings.py
import polars as pl
from kedro.config import OmegaConfigLoader

CONFIG_LOADER_CLASS = OmegaConfigLoader
CONFIG_LOADER_ARGS = {
    "custom_resolvers": {
        "pl": lambda obj: getattr(pl, obj),
    }
}
```
- Go back to the notebook, and replace the catalog initialization with this single line: `%load_ext kedro.ipython`. Verify that everything works.
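Once loaded, the extension bootstraps the project and injects `catalog`, `context`, `session`, and `pipelines` into the notebook namespace, so data loading reduces to:

```python
%load_ext kedro.ipython

# `catalog` is injected by the extension; no manual configuration needed.
categories = catalog.load("openrepair-0_3-categories")
df = catalog.load("openrepair-0_3-events-raw")
```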
Success! 🎉 You created a basic Python library, which will serve as the blueprint for all the reusable code you will write. You are done with the boilerplate and are ready to start leveraging the power of Kedro.
- Create a new data processing pipeline by running `kedro pipeline create data_processing`.
- Turn the code that joins the two `DataFrame`s and cleans the result into Python functions by adding this to `src/openrepair/pipelines/data_processing/nodes.py`:
```python
# src/openrepair/pipelines/data_processing/nodes.py
import polars as pl


def join_events_categories(
    events: pl.DataFrame, categories: pl.DataFrame
) -> pl.DataFrame:
    # Drop the redundant free-text category column and join on the identifier
    df_clean = events.select(pl.all().exclude("product_category")).join(
        categories, on="product_category_id"
    )
    return df_clean


def consolidate_barriers(df_clean: pl.DataFrame) -> pl.DataFrame:
    # Collapse two spellings of the same end-of-life barrier into one label
    return df_clean.with_columns(
        pl.col("repair_barrier_if_end_of_life").map_dict(
            {"Item too worn out": "Product too worn out"},
            default=pl.col("repair_barrier_if_end_of_life"),
        )
    )
```
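Because nodes are plain functions, you can sanity-check them in the notebook before wiring any pipeline. A quick sketch, using the datasets registered earlier:

```python
from openrepair.pipelines.data_processing.nodes import (
    consolidate_barriers,
    join_events_categories,
)

# Load the raw inputs through the catalog and run the nodes by hand.
events = catalog.load("openrepair-0_3-events-raw")
categories = catalog.load("openrepair-0_3-categories")
combined = consolidate_barriers(join_events_categories(events, categories))
print(combined.height)
```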
- Craft the pipeline by modifying `src/openrepair/pipelines/data_processing/pipeline.py` as follows:
```python
# src/openrepair/pipelines/data_processing/pipeline.py
from kedro.pipeline import Pipeline, node, pipeline

from .nodes import join_events_categories, consolidate_barriers


def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=join_events_categories,
                inputs=["openrepair-0_3-events-raw", "openrepair-0_3-categories"],
                outputs="openrepair-0_3-combined",
                name="join_events_categories_node",
            ),
            node(
                func=consolidate_barriers,
                inputs="openrepair-0_3-combined",
                outputs="openrepair-0_3",
                name="consolidate_barriers_node",
            ),
        ]
    )
```
- Register the pipeline by creating a `src/openrepair/pipeline_registry.py` module with these contents:
```python
# src/openrepair/pipeline_registry.py
"""Project pipelines."""
from kedro.framework.project import find_pipelines
from kedro.pipeline import Pipeline


def register_pipelines() -> dict[str, Pipeline]:
    """Register the project's pipelines.

    Returns:
        A mapping from pipeline names to ``Pipeline`` objects.
    """
    pipelines = find_pipelines()
    pipelines["__default__"] = sum(pipelines.values())
    return pipelines
```
Verify that `kedro registry list` shows a `__default__` pipeline as well as the data processing one. (The `__default__` entry works because `Pipeline` objects support addition, so `sum(pipelines.values())` concatenates all discovered pipelines into one.)
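With the registry in place, `kedro run` works from the terminal. If you ever need to trigger the same run from Python (for example, from a test), a minimal sketch using the public session API:

```python
from pathlib import Path

from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project

# Roughly equivalent to `kedro run --pipeline data_processing`;
# assumes the current working directory is the project root.
bootstrap_project(Path.cwd())
with KedroSession.create() as session:
    session.run(pipeline_name="data_processing")
```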
- Add `kedro-viz` to `requirements.txt` and install it. After that, run `kedro viz` and wait for the web interface to open.
Success! 🎉 You just created your first Kedro pipeline and now you can see it as a beautiful directed acyclic graph (DAG). Now it's time to actually save those intermediate results to disk.
### Open ended: Refine your pipeline and visualize artifacts
- Register the intermediate datasets used in the data processing nodes by adding these contents to `conf/base/catalog.yml`:
```yaml
# conf/base/catalog.yml
openrepair-0_3-combined:
  type: polars.GenericDataset
  file_format: parquet
  filepath: data/02_intermediate/openrepairdata_v0.3_combined.pq

openrepair-0_3:
  type: polars.GenericDataset
  file_format: parquet
  filepath: data/03_primary/openrepairdata_v0.3_clean.pq
```
- Run the pipeline with `kedro run`. Verify that a `data/03_primary/openrepairdata_v0.3_clean.pq` file appeared on the filesystem.
- Create a `notebooks` directory, and move the EDA notebook there.
- Add a new `notebooks/data-science.ipynb` notebook and, using the `kedro.ipython` extension and the `catalog`, load the `openrepair-0_3` dataset and extract insights from it. For example, here is a plot of the repair statuses by year (see the sketch below):
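A minimal sketch of such a plot, assuming the clean dataset keeps `event_date` (parsed thanks to `try_parse_dates`) and `repair_status` columns, and that pandas and matplotlib are installed for the final rendering:

```python
import polars as pl

df = catalog.load("openrepair-0_3")
counts = (
    df.with_columns(pl.col("event_date").dt.year().alias("year"))
    .group_by("year", "repair_status")  # `groupby` on older polars versions
    .agg(pl.count().alias("n"))
    .sort("year")
)
# Pivot to one column per repair status and draw a stacked bar chart.
(
    counts.to_pandas()
    .pivot(index="year", columns="repair_status", values="n")
    .plot.bar(stacked=True, xlabel="Year", ylabel="Repair events")
)
```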
- Kedro documentation: https://docs.kedro.org/
```bash
# Create a mamba environment using micromamba (you can replace micromamba with mamba or conda)
# Replace $VARIABLES with actual parameters or give them a value
$ micromamba create -n $ENV_NAME python=$PY_VERSION -c conda-forge

# Activate a mamba environment using micromamba (you can replace micromamba with mamba or conda)
$ micromamba activate $ENV_NAME

# Install dependencies from the `requirements.txt` file using pip
$ pip install -r requirements.txt
```
## Extra
The MinIO client and S3FS are pre-installed in the image. You can use the MinIO play environment as follows:
```bash
$ mc alias ls play  # To see credentials
$ mc mb play/openrepair
$ mc cp data/OpenRepairData_v0.3_aggregate_202210.csv play/openrepair
$ export FSSPEC_S3_ENDPOINT_URL=https://play.min.io
$ export FSSPEC_S3_KEY=...
$ export FSSPEC_S3_SECRET=...
$ sed -i 's;filepath: data/01_raw/OpenRepairData_v0.3_aggregate_202210.csv;filepath: s3://openrepair/OpenRepairData_v0.3_aggregate_202210.csv;g' conf/base/catalog.yml  # Replace path
$ kedro ipython  # Test access
```