System test for EMR Serverless (#25559)
* System test for EMR Serverless following the template in #24643 (AIP-47)

* Remove example_emr_serverless.py from example_dags
syedahsn committed Aug 6, 2022
1 parent 439d9ba commit 33fbe75
Showing 2 changed files with 65 additions and 25 deletions.
@@ -41,7 +41,7 @@ Create an EMR Serverless Application
You can use :class:`~airflow.providers.amazon.aws.operators.emr.EmrServerlessCreateApplicationOperator` to
create a new EMR Serverless Application.

-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr_serverless.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_emr_serverless.py
:language: python
:dedent: 4
:start-after: [START howto_operator_emr_serverless_create_application]
@@ -55,7 +55,7 @@ Start an EMR Serverless Job
You can use :class:`~airflow.providers.amazon.aws.operators.emr.EmrServerlessStartJobOperator` to
start an EMR Serverless Job.

-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr_serverless.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_emr_serverless.py
:language: python
:dedent: 4
:start-after: [START howto_operator_emr_serverless_start_job]
@@ -69,7 +69,7 @@ Delete an EMR Serverless Application
You can use :class:`~airflow.providers.amazon.aws.operators.emr.EmrServerlessDeleteApplicationOperator` to
delete an EMR Serverless Application.

-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr_serverless.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_emr_serverless.py
:language: python
:dedent: 4
:start-after: [START howto_operator_emr_serverless_delete_application]
@@ -86,7 +86,7 @@ Wait on an EMR Serverless Job state
To monitor the state of an EMR Serverless Job you can use
:class:`~airflow.providers.amazon.aws.sensors.emr.EmrServerlessJobSensor`.

-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr_serverless.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_emr_serverless.py
:language: python
:dedent: 4
:start-after: [START howto_sensor_emr_serverless_job]
@@ -100,7 +100,7 @@ Wait on an EMR Serverless Application state
To monitor the state of an EMR Serverless Application you can use
:class:`~airflow.providers.amazon.aws.sensors.emr.EmrServerlessApplicationSensor`.

-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_emr_serverless.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_emr_serverless.py
:language: python
:dedent: 4
:start-after: [START howto_sensor_emr_serverless_application]
@@ -15,40 +15,55 @@
# specific language governing permissions and limitations
# under the License.


from datetime import datetime
from os import getenv

from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.models.dag import DAG
from airflow.providers.amazon.aws.operators.emr import (
EmrServerlessCreateApplicationOperator,
EmrServerlessDeleteApplicationOperator,
EmrServerlessStartJobOperator,
)
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator
from airflow.providers.amazon.aws.sensors.emr import EmrServerlessApplicationSensor, EmrServerlessJobSensor
from airflow.utils.trigger_rule import TriggerRule
from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder

DAG_ID = 'example_emr_serverless'

# Externally fetched variables:
ROLE_ARN_KEY = 'ROLE_ARN'

EXECUTION_ROLE_ARN = getenv('EXECUTION_ROLE_ARN', 'execution_role_arn')
EMR_EXAMPLE_BUCKET = getenv('EMR_EXAMPLE_BUCKET', 'emr_example_bucket')
SPARK_JOB_DRIVER = {
"sparkSubmit": {
"entryPoint": "s3://us-east-1.elasticmapreduce/emr-containers/samples/wordcount/scripts/wordcount.py",
"entryPointArguments": [f"s3://{EMR_EXAMPLE_BUCKET}/output"],
"sparkSubmitParameters": "--conf spark.executor.cores=1 --conf spark.executor.memory=4g\
--conf spark.driver.cores=1 --conf spark.driver.memory=4g --conf spark.executor.instances=1",
}
}

SPARK_CONFIGURATION_OVERRIDES = {
"monitoringConfiguration": {"s3MonitoringConfiguration": {"logUri": f"s3://{EMR_EXAMPLE_BUCKET}/logs"}}
}
sys_test_context_task = SystemTestContextBuilder().add_variable(ROLE_ARN_KEY).build()

with DAG(
dag_id='example_emr_serverless',
schedule_interval=None,
dag_id=DAG_ID,
schedule_interval='@once',
start_date=datetime(2021, 1, 1),
tags=['example'],
catchup=False,
) as emr_serverless_dag:
) as dag:
test_context = sys_test_context_task()
env_id = test_context[ENV_ID_KEY]
role_arn = test_context[ROLE_ARN_KEY]
bucket_name = f'{env_id}-emr-serverless-bucket'
entryPoint = "s3://us-east-1.elasticmapreduce/emr-containers/samples/wordcount/scripts/wordcount.py"
create_s3_bucket = S3CreateBucketOperator(task_id='create_s3_bucket', bucket_name=bucket_name)

SPARK_JOB_DRIVER = {
"sparkSubmit": {
"entryPoint": entryPoint,
"entryPointArguments": [f"s3://{bucket_name}/output"],
"sparkSubmitParameters": "--conf spark.executor.cores=1 --conf spark.executor.memory=4g\
--conf spark.driver.cores=1 --conf spark.driver.memory=4g --conf spark.executor.instances=1",
}
}

SPARK_CONFIGURATION_OVERRIDES = {
"monitoringConfiguration": {"s3MonitoringConfiguration": {"logUri": f"s3://{bucket_name}/logs"}}
}

# [START howto_operator_emr_serverless_create_application]
emr_serverless_app = EmrServerlessCreateApplicationOperator(
@@ -70,7 +85,7 @@
start_job = EmrServerlessStartJobOperator(
task_id='start_emr_serverless_job',
application_id=emr_serverless_app.output,
execution_role_arn=EXECUTION_ROLE_ARN,
execution_role_arn=role_arn,
job_driver=SPARK_JOB_DRIVER,
configuration_overrides=SPARK_CONFIGURATION_OVERRIDES,
)
@@ -84,14 +99,39 @@

# [START howto_operator_emr_serverless_delete_application]
delete_app = EmrServerlessDeleteApplicationOperator(
task_id='delete_application', application_id=emr_serverless_app.output, trigger_rule="all_done"
task_id='delete_application',
application_id=emr_serverless_app.output,
trigger_rule=TriggerRule.ALL_DONE,
)
# [END howto_operator_emr_serverless_delete_application]

delete_s3_bucket = S3DeleteBucketOperator(
task_id='delete_s3_bucket',
bucket_name=bucket_name,
force_delete=True,
trigger_rule=TriggerRule.ALL_DONE,
)
chain(
# TEST SETUP
test_context,
create_s3_bucket,
# TEST BODY
emr_serverless_app,
wait_for_app_creation,
start_job,
wait_for_job,
# TEST TEARDOWN
delete_app,
delete_s3_bucket,
)

from tests.system.utils.watcher import watcher

# This test needs watcher in order to properly mark success/failure
# when "tearDown" task with trigger rule is part of the DAG
list(dag.tasks) >> watcher()

from tests.system.utils import get_test_run # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)
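
The watcher comment in the test file can be illustrated with a small simulation. This is a minimal sketch in plain Python, not Airflow code: the `State` enum and the `watcher_state`, `dag_run_state`, and `simulate` helpers are hypothetical names introduced only for this illustration. It assumes that a DAG run's final state is derived from its leaf tasks, that a teardown task with `TriggerRule.ALL_DONE` runs and succeeds even when upstream tasks failed, and that the watcher behaves like a task with a one-failed trigger rule that fails on purpose.

```python
from enum import Enum


class State(str, Enum):
    """Simplified task states (hypothetical; not Airflow's own State class)."""

    SUCCESS = "success"
    FAILED = "failed"
    UPSTREAM_FAILED = "upstream_failed"
    SKIPPED = "skipped"


FAILURE_STATES = (State.FAILED, State.UPSTREAM_FAILED)


def watcher_state(upstream_states):
    """Model a watcher with a one-failed trigger rule.

    Assumption: it fails deliberately if any upstream task failed,
    and is skipped when nothing upstream failed.
    """
    if any(state in FAILURE_STATES for state in upstream_states):
        return State.FAILED
    return State.SKIPPED


def dag_run_state(leaf_states):
    """Assumption: the run is marked failed if any leaf task failed."""
    if any(state in FAILURE_STATES for state in leaf_states):
        return "failed"
    return "success"


def simulate(body_states, with_watcher):
    """Return the run state for a DAG whose last task is an ALL_DONE teardown."""
    # The teardown runs regardless of upstream results; assume it succeeds.
    teardown = State.SUCCESS
    all_states = list(body_states) + [teardown]
    if with_watcher:
        # Every task is upstream of the watcher, so it is the only leaf.
        leaves = [watcher_state(all_states)]
    else:
        # Without a watcher, the successful teardown is the only leaf.
        leaves = [teardown]
    return dag_run_state(leaves)


if __name__ == "__main__":
    failing_body = [State.SUCCESS, State.FAILED, State.UPSTREAM_FAILED]
    print(simulate(failing_body, with_watcher=False))
    print(simulate(failing_body, with_watcher=True))
```

Under these assumptions, a failed test body is hidden by the successful teardown unless the watcher is attached, which is the situation `list(dag.tasks) >> watcher()` is meant to address.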
