
Example of Orchestrating Notebook Executions on Dataproc Serverless via Cloud Composer #1006

Closed
wants to merge 5 commits

Conversation

kristin-kim (Contributor)


This PR adds an example of orchestrating notebook runs on Dataproc Serverless via Cloud Composer using a specific Airflow operator, DataprocCreateBatchOperator(). It contains a wrapper file for notebook execution via a PySpark job, Composer DAGs and sample input resources, and a sample Spark notebook with the datasets used in the sample Spark session.

The typical customer scenario for this example is to 1) migrate and stage Spark notebooks from a legacy data lake to GCS, and then 2) set up orchestration to deploy the staged notebooks on Dataproc Serverless as Spark batch jobs.

@pull-request-size bot added the size/XXL label (denotes a PR that changes 1000+ lines) on Mar 21, 2023
@kristin-kim kristin-kim marked this pull request as ready for review March 21, 2023 19:25
@NiloFreitas (Member) left a comment


Nice work @kristin-kim :)

" .appName(\"Spark Session for Electric Vehicle Population\") \\\n",
" .getOrCreate()\n",
"\n",
"gcs_bucket = \"kristin-0105\"\n",

Don't hard-code personal buckets. Try to think of another way when developing this Spark code. You could, for example, have the notebook read a YAML file with the values of these parameters, and keep them in this repo as <generic_placeholders>
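As an illustration of that suggestion (not code from this PR), here is a minimal sketch assuming a hypothetical params.yaml committed alongside the notebook with placeholder values:

```python
import yaml

# Hypothetical params.yaml kept in the repo, e.g.:
#   gcs_bucket: <your-gcs-bucket>
#   dataset_path: notebooks/datasets/electric_vehicle_population.csv
with open("params.yaml") as f:
    params = yaml.safe_load(f)

# The notebook then builds its paths from the loaded values instead of a personal bucket.
gcs_bucket = params["gcs_bucket"]
dataset_uri = f"gs://{gcs_bucket}/{params['dataset_path']}"
```

Since the notebook is already executed through papermill, another option would be to tag a parameters cell and inject these values at execution time (e.g. `papermill ... -p gcs_bucket <your-gcs-bucket>`).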

" .appName(\"Spark Session for Electric Vehicle Population\") \\\n",
" .getOrCreate()\n",
"\n",
"gcs_bucket = \"kristin-0105\"\n",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't hard code personal buckets. Try to think of another way when developing these Spark codes. You could for example, from the notebook, read a yaml file with the value of these parameters. And they are kept in this repo as <generic_placeholders>

"metadata": {},
"outputs": [],
"source": [
"# !spark-shell"

You don't need this, do you?


phs_region = Variable.get('phs_region')
phs = Variable.get('phs')

# Arguments to pass to Cloud Dataproc job.

Actually, these arguments are passed to your papermill wrapper, right? Via Dataproc, because you are running the Python file using Dataproc.
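For readers of this thread, a hedged sketch of how those arguments flow from the DAG through the Dataproc Serverless batch to the wrapper; bucket names, IDs, and argument order are placeholders rather than the exact values in this PR:

```python
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.providers.google.cloud.operators.dataproc import DataprocCreateBatchOperator

# Hypothetical Airflow Variable; the PR reads similar values (phs, phs_region) the same way.
gcs_bucket = Variable.get("gcs_bucket")

with DAG(
    dag_id="run_notebooks_dataproc",
    start_date=datetime(2023, 3, 1),
    schedule_interval=None,
) as dag:
    # The PySpark batch runs wrapper_papermill.py; everything in "args" is received
    # by that wrapper (via sys.argv / argparse), not interpreted by Dataproc itself.
    run_notebook = DataprocCreateBatchOperator(
        task_id="run_notebook_on_dataproc_serverless",
        project_id="<project-id>",
        region="<region>",
        batch_id="<batch-id>",
        batch={
            "pyspark_batch": {
                "main_python_file_uri": f"gs://{gcs_bucket}/composer_input/wrapper_papermill.py",
                "args": [
                    f"gs://{gcs_bucket}/notebooks/jupyter/spark_notebook.ipynb",
                    f"gs://{gcs_bucket}/notebooks/jupyter/output/spark_notebook_output.ipynb",
                ],
            },
        },
    )
```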

# gcp-dataproc_serverless-running-notebooks

## Objective
Orchestrator to run Notebooks on Dataproc Serverless via Cloud Composer

Orchestrate data pipelines built using PySpark and Jupyter Notebooks from Airflow/Cloud Composer, leveraging Dataproc Serverless

## File Details
### composer_input
* **wrapper_papermill.py**: runs a papermill execution of the input notebook and writes the output file to the assigned location
* **serverless_airflow.py**: orchestrates the workflow

Change filename to something like: dag_run_notebooks_dataproc.py
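For context on the wrapper_papermill.py described above, a rough sketch of what such a papermill wrapper typically looks like; the argument names are illustrative and may differ from the file in this PR, and gs:// paths assume gcsfs is available in the batch runtime:

```python
import argparse

import papermill as pm


def main() -> None:
    # The DAG passes these values via the "args" of the Dataproc Serverless PySpark batch.
    parser = argparse.ArgumentParser(description="Run a notebook with papermill.")
    parser.add_argument("input_notebook", help="gs:// path of the notebook to execute")
    parser.add_argument("output_notebook", help="gs:// path for the executed copy")
    args = parser.parse_args()

    # papermill executes the notebook cell by cell and writes the executed copy
    # (with outputs) to the assigned location.
    pm.execute_notebook(args.input_notebook, args.output_notebook)


if __name__ == "__main__":
    main()
```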


1. Make sure to modify the GCS paths for the datasets in the notebook

2. Create [Persistent History Server](https://cloud.google.com/dataproc/docs/concepts/jobs/history-server)

Wouldn't this be optional? Why did you set it as a necessary step? Removing this step and pointing to the documentation if the user wants to use PHS would increase simplicity and possibly adoption.
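If PHS becomes optional, one hedged sketch of wiring it in only when the corresponding Airflow Variables are set (field names follow the Dataproc Batches API; project, bucket, and notebook paths are placeholders):

```python
from airflow.models import Variable

# Optional PHS: attach a Spark History Server to the batch only if configured.
phs = Variable.get("phs", default_var=None)
phs_region = Variable.get("phs_region", default_var=None)

batch = {
    "pyspark_batch": {
        "main_python_file_uri": "gs://<bucket>/composer_input/wrapper_papermill.py",
        "args": [
            "gs://<bucket>/notebooks/jupyter/spark_notebook.ipynb",
            "gs://<bucket>/notebooks/jupyter/output/spark_notebook_output.ipynb",
        ],
    },
}
if phs and phs_region:
    batch["environment_config"] = {
        "peripherals_config": {
            "spark_history_server_config": {
                "dataproc_cluster": f"projects/<project-id>/regions/{phs_region}/clusters/{phs}",
            }
        }
    }
```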

├── notebooks
│ ├── datasets/ electric_vehicle_population.csv
│ ├── jupyter/ spark_notebook.ipynb
│ ├── jupyter/output spark_notebook_outbook.ipynb

Remove this if you accept the other comment.

3. Find the DAGs folder in the Composer environment and add serverless_airflow.py (the DAG file) to it in order to trigger DAG execution:
(Screenshot: DAG folder in the Cloud Composer console)

4. Have all the files available in a GCS bucket, except the DAG file, which should go into your Composer DAGs folder

  • Manually, or via Continuous Integration, copy the notebook source code files to the appropriate GCS bucket

  • Manually, or via Continuous Integration, copy the Airflow DAG files to your DAGs folder of your Cloud Composer environment GCS bucket
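A minimal sketch of that copy step using the google-cloud-storage client, with placeholder bucket and path names (a gsutil cp in a CI pipeline would do the same job):

```python
from google.cloud import storage

client = storage.Client(project="<project-id>")

# Notebook sources and the papermill wrapper go to the working GCS bucket...
work_bucket = client.bucket("<your-gcs-bucket>")
for local_path, remote_path in [
    ("composer_input/wrapper_papermill.py", "composer_input/wrapper_papermill.py"),
    ("notebooks/jupyter/spark_notebook.ipynb", "notebooks/jupyter/spark_notebook.ipynb"),
]:
    work_bucket.blob(remote_path).upload_from_filename(local_path)

# ...while the DAG file goes to the dags/ folder of the Composer environment bucket.
dags_bucket = client.bucket("<composer-environment-bucket>")
dags_bucket.blob("dags/dag_run_notebooks_dataproc.py").upload_from_filename(
    "composer_input/serverless_airflow.py"
)
```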

@agold-rh (Contributor)

@kristin-kim If you can address the review questions, I can try to get this merged.

@agold-rh (Contributor)

Closed as stale. Please re-open if I'm wrong.

@agold-rh agold-rh closed this May 17, 2024