Extend dataproc example dag (#21091)
MaksYermak committed Jan 25, 2022
1 parent aac5a1d commit 623163f
Showing 4 changed files with 78 additions and 1 deletion.
49 changes: 48 additions & 1 deletion airflow/providers/google/cloud/example_dags/example_dataproc.py
@@ -22,6 +22,7 @@

import os
from datetime import datetime
from uuid import uuid4

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
@@ -178,13 +179,38 @@
},
"jobs": [{"step_id": "pig_job_1", "pig_job": PIG_JOB["pig_job"]}],
}
BATCH_ID = "test-batch-id"
BATCH_ID = f"test-batch-id-{str(uuid4())}"
BATCH_CONFIG = {
"spark_batch": {
"jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
"main_class": "org.apache.spark.examples.SparkPi",
},
}
CLUSTER_GENERATOR_CONFIG_FOR_PHS = ClusterGenerator(
project_id=PROJECT_ID,
region=REGION,
master_machine_type="n1-standard-4",
worker_machine_type="n1-standard-4",
num_workers=0,
properties={
"spark:spark.history.fs.logDirectory": f"gs://{BUCKET}/logging",
},
enable_component_gateway=True,
).make()
CLUSTER_NAME_FOR_PHS = "phs-cluster-name"
BATCH_CONFIG_WITH_PHS = {
"spark_batch": {
"jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
"main_class": "org.apache.spark.examples.SparkPi",
},
"environment_config": {
"peripherals_config": {
"spark_history_server_config": {
"dataproc_cluster": f"projects/{PROJECT_ID}/regions/{REGION}/clusters/{CLUSTER_NAME_FOR_PHS}"
}
}
},
}


with models.DAG(
@@ -310,6 +336,26 @@
)
# [END how_to_cloud_dataproc_create_batch_operator]

# [START how_to_cloud_dataproc_create_cluster_for_persistent_history_server]
create_cluster_for_phs = DataprocCreateClusterOperator(
task_id="create_cluster_for_phs",
project_id=PROJECT_ID,
cluster_config=CLUSTER_GENERATOR_CONFIG_FOR_PHS,
region=REGION,
cluster_name=CLUSTER_NAME_FOR_PHS,
)
# [END how_to_cloud_dataproc_create_cluster_for_persistent_history_server]

# [START how_to_cloud_dataproc_create_batch_operator_with_persistent_history_server]
create_batch_with_phs = DataprocCreateBatchOperator(
task_id="create_batch_with_phs",
project_id=PROJECT_ID,
region=REGION,
batch=BATCH_CONFIG_WITH_PHS,
batch_id=BATCH_ID,
)
# [END how_to_cloud_dataproc_create_batch_operator_with_persistent_history_server]

# [START how_to_cloud_dataproc_get_batch_operator]
get_batch = DataprocGetBatchOperator(
task_id="get_batch", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID
@@ -331,3 +377,4 @@
# [END how_to_cloud_dataproc_delete_batch_operator]

create_batch >> get_batch >> list_batches >> delete_batch
create_cluster_for_phs >> create_batch_with_phs
7 changes: 7 additions & 0 deletions airflow/providers/google/cloud/operators/dataproc.py
@@ -161,6 +161,8 @@ class ClusterGenerator:
A duration in seconds. (If auto_delete_time is set this parameter will be ignored)
:param customer_managed_key: The customer-managed key used for disk encryption
``projects/[PROJECT_STORING_KEYS]/locations/[LOCATION]/keyRings/[KEY_RING_NAME]/cryptoKeys/[KEY_NAME]`` # noqa
:param enable_component_gateway: Provides access to the web interfaces of default and selected optional
components on the cluster.
"""

def __init__(
@@ -197,6 +199,7 @@ def __init__(
auto_delete_time: Optional[datetime] = None,
auto_delete_ttl: Optional[int] = None,
customer_managed_key: Optional[str] = None,
enable_component_gateway: Optional[bool] = False,
**kwargs,
) -> None:

@@ -232,6 +235,7 @@ def __init__(
self.auto_delete_time = auto_delete_time
self.auto_delete_ttl = auto_delete_ttl
self.customer_managed_key = customer_managed_key
self.enable_component_gateway = enable_component_gateway
self.single_node = num_workers == 0

if self.custom_image and self.image_version:
@@ -339,6 +343,7 @@ def _build_cluster_data(self):
'lifecycle_config': {},
'encryption_config': {},
'autoscaling_config': {},
'endpoint_config': {},
}
if self.num_preemptible_workers > 0:
cluster_data['secondary_worker_config'] = {
@@ -401,6 +406,8 @@ def _build_cluster_data(self):
cluster_data['encryption_config'] = {'gce_pd_kms_key_name': self.customer_managed_key}
if self.autoscaling_policy:
cluster_data['autoscaling_config'] = {'policy_uri': self.autoscaling_policy}
if self.enable_component_gateway:
cluster_data['endpoint_config'] = {'enable_http_port_access': self.enable_component_gateway}

return cluster_data
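
As a quick illustration of the new flag, here is a minimal sketch (not part of the commit; project and region values are hypothetical) showing how enable_component_gateway=True surfaces in the generated cluster config. It mirrors the expectation added in tests/providers/google/cloud/operators/test_dataproc.py further down.

# Minimal sketch, assuming hypothetical project/region values; demonstrates
# that enable_component_gateway=True produces an endpoint_config entry.
from airflow.providers.google.cloud.operators.dataproc import ClusterGenerator

generator = ClusterGenerator(
    project_id="example-project",      # hypothetical
    region="europe-west1",             # hypothetical
    master_machine_type="n1-standard-4",
    worker_machine_type="n1-standard-4",
    num_workers=2,
    enable_component_gateway=True,     # new flag introduced in this commit
)
cluster_config = generator.make()
# Mirrors the assertion added to the test config below.
assert cluster_config["endpoint_config"] == {"enable_http_port_access": True}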

18 changes: 18 additions & 0 deletions docs/apache-airflow-providers-google/operators/cloud/dataproc.rst
@@ -226,6 +226,24 @@ A batch can be created using:
:start-after: [START how_to_cloud_dataproc_create_batch_operator]
:end-before: [END how_to_cloud_dataproc_create_batch_operator]

To create a batch with a Persistent History Server, you must first create a Dataproc cluster
with specific parameters. Documentation on how to create such a cluster can be found here:
https://cloud.google.com/dataproc/docs/concepts/jobs/history-server#setting_up_a_persistent_history_server

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_dataproc_create_cluster_for_persistent_history_server]
:end-before: [END how_to_cloud_dataproc_create_cluster_for_persistent_history_server]
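
The included snippet only shows the operator call; the cluster configuration it references is built with ClusterGenerator, roughly as in the following sketch (project, region and bucket names are placeholders, the rest mirrors the example DAG):

# Sketch of a Persistent History Server cluster configuration; "my-project-id",
# "my-region" and "my-bucket" are placeholders.
from airflow.providers.google.cloud.operators.dataproc import ClusterGenerator

phs_cluster_config = ClusterGenerator(
    project_id="my-project-id",
    region="my-region",
    master_machine_type="n1-standard-4",
    worker_machine_type="n1-standard-4",
    num_workers=0,  # a single-node cluster is sufficient for the history server
    properties={
        # GCS location from which the history server reads Spark event logs
        "spark:spark.history.fs.logDirectory": "gs://my-bucket/logging",
    },
    enable_component_gateway=True,  # exposes the Spark History Server web UI
).make()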

After the cluster has been created, add it to the batch configuration.

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_dataproc_create_batch_operator_with_persistent_history_server]
:end-before: [END how_to_cloud_dataproc_create_batch_operator_with_persistent_history_server]
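
The batch referenced above attaches the history server through peripherals_config; a sketch of that configuration (project, region and cluster name are placeholders) looks like this:

# Sketch of a batch configuration with a Persistent History Server attached;
# dataproc_cluster must be the full resource name of the PHS cluster
# (placeholder project, region and cluster name shown here).
batch_config_with_phs = {
    "spark_batch": {
        "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
        "main_class": "org.apache.spark.examples.SparkPi",
    },
    "environment_config": {
        "peripherals_config": {
            "spark_history_server_config": {
                "dataproc_cluster": "projects/my-project-id/regions/my-region/clusters/my-phs-cluster"
            }
        }
    },
}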

Get a Batch
-----------

5 changes: 5 additions & 0 deletions tests/providers/google/cloud/operators/test_dataproc.py
@@ -109,6 +109,7 @@
"initialization_actions": [
{"executable_file": "init_actions_uris", "execution_timeout": {'seconds': 600}}
],
"endpoint_config": {},
}

CONFIG_WITH_CUSTOM_IMAGE_FAMILY = {
@@ -153,6 +154,9 @@
"initialization_actions": [
{"executable_file": "init_actions_uris", "execution_timeout": {'seconds': 600}}
],
"endpoint_config": {
"enable_http_port_access": True,
},
}

LABELS = {"labels": "data", "airflow-version": AIRFLOW_VERSION}
@@ -367,6 +371,7 @@ def test_build_with_custom_image_family(self):
auto_delete_time=datetime(2019, 9, 12),
auto_delete_ttl=250,
customer_managed_key="customer_managed_key",
enable_component_gateway=True,
)
cluster = generator.make()
assert CONFIG_WITH_CUSTOM_IMAGE_FAMILY == cluster
