
Add PySpark Integration #519

Merged: 37 commits, Oct 16, 2019
Conversation

AbhiPrasad (Member) commented Oct 1, 2019

This is a draft PR for the purposes of reviewing the work I have so far. Much of this is still in flux and has to be extensively tested in different execution environments like Databricks, Dataproc, and YARN.

We make the integration available for both the Spark driver and the Spark workers (which may be in different clusters depending on the execution environment). To use the integration, you must install the sentry-sdk package.

To start sending Sentry errors from the main Spark driver, simply init Sentry before you init your SparkContext when writing your job.

import sentry_sdk
from sentry_sdk.integrations.spark import SparkIntegration
from pyspark.sql import SparkSession

if __name__ == "__main__":
    sentry_sdk.init("___PUBLIC_DSN___", integrations=[SparkIntegration()])
    spark = SparkSession\
        .builder\
        .appName("ExampleApp")\
        .getOrCreate()
    ...

You can also use a StreamingContext (for Spark Streaming) or a plain SparkContext in the same way.

This will log errors to Sentry, building breadcrumbs using
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala#L216

To instrument workers, additional steps have to be taken. The configuration options used to instrument the workers are only available in Spark 2.4 and above.

First, you must install the sentry-sdk on your workers. This can be done by sending the sentry-python egg or zip to Spark using the --py-files command-line option or with sc.addPyFile().

Next, we have to monkey patch the Spark daemon to initialize Sentry on each worker. This can be done by creating a file with the following code:

import sentry_sdk
from sentry_sdk.integrations.spark import SparkWorkerIntegration
import pyspark.daemon as original_daemon

if __name__ == '__main__':
    sentry_sdk.init("___PUBLIC_DSN___", integrations=[SparkWorkerIntegration()])
    original_daemon.manager()

To tell Spark to replace the daemon, you must set some properties, either with a custom SparkConf or using the --conf command-line option: spark.python.use.daemon=true and spark.python.daemon.module=sentry_patched_worker_file_name. You must also use --py-files to ship sentry_patched_worker_file_name.py to each of the workers.

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L56-L63
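Putting the pieces together, a submission could look like this (a sketch only: my_job.py is a hypothetical job script, and sentry_patched_worker_file_name.py stands for the patched daemon file described above):

```shell
# Sketch of a spark-submit invocation; my_job.py is a hypothetical job script,
# and sentry_patched_worker_file_name.py is the patched daemon module above.
spark-submit \
    --py-files sentry_patched_worker_file_name.py \
    --conf spark.python.use.daemon=true \
    --conf spark.python.daemon.module=sentry_patched_worker_file_name \
    my_job.py
```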

Both the workers and drivers share a common application_id, so we can easily filter for which workers are associated with what drivers.

Next Steps:

from sentry_sdk.integrations.spark.spark_driver import SparkIntegration
from sentry_sdk.integrations.spark.spark_worker import sentry_worker_main

__all__ = ["SparkIntegration", "sentry_worker_main"]
Member:
Why do we need to export sentry_worker_main for users to patch in? Why can't we patch main ourselves?

AbhiPrasad (Member Author) commented Oct 2, 2019:
We can't patch main ourselves because the daemon code has already been sent to the distributed executors by the time we patch SparkContext (which is the earliest we can do anything). As a result, we opt to set it when Sentry.init is called in the daemon.

AbhiPrasad (Member Author):
Do you think we should also have a SparkWorkerIntegration in this case?

AbhiPrasad (Member Author):

Update: This is probably ready for another look through.

I went through and refactored some of the driver/worker code. I also added tests for the driver integration.

For some more context - here is a WIP branch for the docs https://github.com/getsentry/sentry-docs/compare/abhi/pyspark-docs.

I'm not sure if I should be using spark or pyspark, thoughts?

untitaker (Member):

@AbhiPrasad spark is probably fine


sparkContext = SparkContext._active_spark_context
if sparkContext:
    sparkContext.setLocalProperty("app_name", sparkContext.appName)
Member:
Can we safely do this without interfering with user-set properties? Why do we need this?

AbhiPrasad (Member Author):
We set a local property so that the worker has access to "app_name" and "application_id".

if "app_name" in taskContext._localProperties:
    scope.set_tag("app_name", taskContext._localProperties["app_name"])
    scope.set_tag(
        "application_id", taskContext._localProperties["application_id"]
    )

It might be a good idea to rename it to something more Sentry-specific so that it won't get changed by the user: sentry_integration_app_name?

Member:
I don't understand why this is necessary. Couldn't you set them on the scope directly?

(also I would prefer it if you used an event processor here like in the other code sample I posted)

AbhiPrasad (Member Author):
https://github.com/apache/spark/blob/master/python/pyspark/context.py#L1015-L1020

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L668-L685

The driver propagates these local properties to the workers; I'm essentially using this as a way to communicate between the driver process and the worker processes (because they are on different clusters). We set the local property in the driver integration so that the worker integration can access it.

If we don't use setLocalProperty in the driver, the worker integration cannot set tags for application_id and app_name (which makes it much harder to pair worker errors with driver errors by tag in Sentry itself). I have not found any other way to access this information from just a worker.

Did change to use an event processor 👍
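The mechanism being discussed can be sketched in plain Python. This is an illustrative sketch only, not the actual integration code: a worker-side event processor copies the driver-set local properties onto the event as tags; here a plain dict stands in for pyspark's taskContext._localProperties so the idea runs anywhere.

```python
# Illustrative sketch: a worker-side event processor that copies the
# driver-set Spark local properties onto a Sentry event as tags.
# A plain dict stands in for pyspark's taskContext._localProperties.

def make_spark_event_processor(local_properties):
    def event_processor(event, hint):
        tags = event.setdefault("tags", {})
        if "app_name" in local_properties:
            tags["app_name"] = local_properties["app_name"]
            tags["application_id"] = local_properties["application_id"]
        return event
    return event_processor

# Example: properties the driver would have set via setLocalProperty().
props = {"app_name": "ExampleApp", "application_id": "app-20191016-0001"}
processor = make_spark_event_processor(props)
event = processor({"message": "boom"}, None)
print(event["tags"])  # → {'app_name': 'ExampleApp', 'application_id': 'app-20191016-0001'}
```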

Member:
Makes sense!

Member:

Hmm, could you add a short comment that this is how you transport data over the wire?

untitaker (Member) left a comment:
Conceptually this looks fine to me now. I don't know if it actually works, but as far as I am concerned we can release a new version if it does for you.

codecov-io commented Oct 11, 2019

Codecov Report

Merging #519 into master will decrease coverage by 6.09%.
The diff coverage is 17.11%.


@@            Coverage Diff            @@
##           master     #519     +/-   ##
=========================================
- Coverage   85.91%   79.81%   -6.1%     
=========================================
  Files         103       99      -4     
  Lines        8155     7823    -332     
  Branches      864      819     -45     
=========================================
- Hits         7006     6244    -762     
- Misses        831     1299    +468     
+ Partials      318      280     -38
Impacted Files Coverage Δ
sentry_sdk/integrations/spark/__init__.py 100% <100%> (ø)
sentry_sdk/integrations/spark/spark_worker.py 21.42% <21.42%> (ø)
tests/integrations/spark/test_spark.py 3.42% <3.42%> (ø)
sentry_sdk/integrations/spark/spark_driver.py 31.34% <31.34%> (ø)
tests/integrations/django/myapp/routing.py 0% <0%> (-100%) ⬇️
tests/integrations/django/myapp/asgi.py 0% <0%> (-100%) ⬇️
tests/integrations/tornado/test_tornado.py 0% <0%> (-99.01%) ⬇️
tests/integrations/aws_lambda/test_aws.py 14.7% <0%> (-82.36%) ⬇️
sentry_sdk/integrations/aws_lambda.py 0% <0%> (-19.83%) ⬇️
tests/integrations/django/test_transactions.py 66.66% <0%> (-13.34%) ⬇️
... and 22 more

Continue to review full report at Codecov.

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update ecbe592...91fee09.

@AbhiPrasad AbhiPrasad marked this pull request as ready for review October 14, 2019 18:53
AbhiPrasad force-pushed the abhi/spark-integration branch 2 times, most recently from 91fee09 to 1e56a2c on October 15, 2019 18:31
AbhiPrasad (Member Author) commented Oct 15, 2019

Update: addressed review comments, fixed lint errors.

Seems like the pyspark tests are timing out, not sure why though.

Edit:

Seems like tests/test_transport.py is stalling.

$ .venv/bin/tox -e py3.7-pyspark                                        
py3.7-pyspark develop-inst-noop: /Users/abhijeetprasad/workspace/sentry-python
py3.7-pyspark installed: apipkg==1.5,atomicwrites==1.3.0,attrs==19.2.0,certifi==2019.9.11,coverage==4.5.4,dnspython==1.16.0,eventlet==0.25.1,execnet==1.7.1,filelock==3.0.12,gevent==1.4.0,greenlet==0.4.15,hypothesis==3.69.9,importlib-metadata==0.23,monotonic==1.5,more-itertools==7.2.0,pluggy==0.13.0,py==1.8.0,py4j==0.10.7,pyspark==2.4.4,pytest==3.7.3,pytest-cov==2.6.0,pytest-forked==1.0.2,pytest-localserver==0.4.1,pytest-xdist==1.23.0,-e git+git@github.com:getsentry/sentry-python.git@c995c1c96627c24d382c1454b6d94d1f2b54f706#egg=sentry_sdk,six==1.12.0,toml==0.10.0,tox==3.7.0,urllib3==1.25.6,virtualenv==16.7.5,Werkzeug==0.15.3,zipp==0.6.0
py3.7-pyspark run-test-pre: PYTHONHASHSEED='1834852915'
py3.7-pyspark run-test: commands[0] | py.test tests
============================================================================================================================================= test session starts ==============================================================================================================================================
platform darwin -- Python 3.7.4, pytest-3.7.3, py-1.8.0, pluggy-0.13.0
rootdir: /Users/abhijeetprasad/workspace/sentry-python, inifile: pytest.ini
plugins: forked-1.0.2, cov-2.6.0, xdist-1.23.0, hypothesis-3.69.9, localserver-0.4.1
collected 411 items / 18 skipped                                                                                                                                                                                                                                                                               

tests/test_basics.py ............................                                                                                                                                                                                                                                                        [  6%]
tests/test_client.py .......................................................................                                                                                                                                                                                                             [ 24%]
tests/test_scope.py .                                                                                                                                                                                                                                                                                    [ 24%]
tests/test_serializer.py s                                                                                                                                                                                                                                                                               [ 24%]
tests/test_tracing.py ..................                                                                                                                                                                                                                                                                 [ 28%]
tests/test_transport.py ........                                                                                                                                                                                                                                                                         [ 30%]
tests/utils/test_contextvars.py ..                                                                                                                                                                                                                                                                       [ 31%]
tests/test_transport.py 

untitaker (Member):

@AbhiPrasad you had both pyspark and spark in the tox.ini, which caused the main test suite to run within the pyspark build. It's a mistake that test_transport even ran as part of the pyspark build.

harpaj commented Oct 16, 2019

Hi guys, I'm right now building this branch to try it on one of our pipelines. I don't have much time for testing unfortunately, it's a busy week, but I could at least tell you if I manage to set it up initially.

harpaj commented Oct 16, 2019

The general setup worked flawlessly! I managed to capture an error in an Arrow UDF with a proper Python stack trace - perfect, that's exactly what I was hoping for.

I didn't have much time to look into details, but I noticed one thing:
An event that gets created on the workers does not include any of the tags set on the driver.
We are setting tags like this, after the initialisation of Sentry on the driver:

    with sentry_sdk.configure_scope() as scope:
        scope.set_tag("source", args["source"])
        scope.set_tag("target", args["target"])
        scope.set_tag("run-id", args["run_id"])

In the event from the worker, these tags are missing:
(screenshot: spark_worker_event)

In the event from the driver, these tags (and also some additional ones that this integration is setting) are present:
(screenshot: spark_driver_event)

I guess this might just be a technical limitation by this setup, but I think it warrants a note in the documentation at least.

AbhiPrasad (Member Author) commented Oct 16, 2019

@harpaj thank you for your feedback!

Any code executed in the main Spark job outside of Spark actions runs only on the driver; that's why only the driver event got the tags. As you said, this is just a technical limitation of this approach (there is not much more we can do, as the driver process and worker processes are running on different clusters).

You might be able to tag inside the UDF itself, because that is serialized and sent to the worker, but I have not tested that yet. This might also add a lot of overhead and slow down the UDF too much.

Edit: tested setting a tag inside a Spark action function:

import sentry_sdk
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Testing123").getOrCreate()

def bad_func(a, b):
    with sentry_sdk.configure_scope() as scope:
        scope.set_tag("source", "This should be in worker")
    a = b / 0
    return a + b

this_breaks = spark.sparkContext.parallelize(range(1, 20), 2).reduce(bad_func)

spark.stop()

This set the source tag for the worker, as the worker executes the reduce function. A UDF should work in a similar way.

@@ -153,7 +153,7 @@ deps =
sqlalchemy-1.2: sqlalchemy>=1.2,<1.3
sqlalchemy-1.3: sqlalchemy>=1.3,<1.4

-spark: spark==2.4.4
+spark: pyspark==2.4.4
Member:
Sorry, yes, that was me.

AbhiPrasad merged commit 958d078 into master on Oct 16, 2019
AbhiPrasad deleted the abhi/spark-integration branch on October 16, 2019 at 22:56