[BUG] significant slow down with VectorUDT and ParquetCachedBatchSerializer #8474

Open
eordentlich opened this issue Jun 1, 2023 · 9 comments
Labels
bug Something isn't working

Comments

@eordentlich
Contributor

Describe the bug
The evaluation step in this example notebook: https://github.com/NVIDIA/spark-rapids-examples/blob/main/examples/XGBoost-Examples/mortgage/notebooks/python/MortgageETL%2BXGBoost.ipynb?short_path=f801328#L1035
is about 600x slower when ParquetCachedBatchSerializer is enabled.
The result DataFrame being processed here has several columns of type VectorUDT. If these columns are either dropped or converted to array type using pyspark.ml.functions.vector_to_array before caching and then converted back upon read using array_to_vector, the slowdown can be avoided. This indicates that ParquetCachedBatchSerializer has an issue with processing VectorUDT columns on the read side.
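The workaround described above can be sketched roughly as follows. The helper names `cache_vectors_as_arrays` and `restore_vectors` are hypothetical and only illustrate the idea; `vector_to_array` and `array_to_vector` are the PySpark functions named in the report (`array_to_vector` requires Spark 3.1 or later).

```python
from typing import Iterable


def cache_vectors_as_arrays(df, vector_cols: Iterable[str]):
    """Replace each VectorUDT column with an array<double> column, then cache.

    Hypothetical helper: the cache then only has to store plain array
    columns instead of a user-defined type.
    """
    # Imported lazily so the helper can be defined without a Spark install.
    from pyspark.ml.functions import vector_to_array
    from pyspark.sql.functions import col

    for name in vector_cols:
        df = df.withColumn(name, vector_to_array(col(name)))
    return df.cache()


def restore_vectors(cached_df, vector_cols: Iterable[str]):
    """Convert the array columns back to vectors when reading from the cache."""
    from pyspark.ml.functions import array_to_vector
    from pyspark.sql.functions import col

    for name in vector_cols:
        cached_df = cached_df.withColumn(name, array_to_vector(col(name)))
    return cached_df
```

A possible usage, assuming a DataFrame `result` with a vector column named `vec`: `cached = cache_vectors_as_arrays(result, ["vec"])`, and later `restore_vectors(cached, ["vec"])` wherever the vector type is needed again.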

Steps/Code to reproduce bug
Run the above example notebook with ParquetCachedBatchSerializer enabled.

Expected behavior
No 600x slow down.

Environment details (please complete the following information)
Standalone.

pyspark \
--master ${SPARK_URL} \
--jars ${PWD}/rapids-4-spark_2.12-23.04.0.jar \
--conf spark.plugins=com.nvidia.spark.SQLPlugin \
--conf spark.rapids.memory.gpu.allocFraction=0.5 \
--conf spark.rapids.memory.pinnedPool.size=2g \
--conf spark.executor.resource.gpu.amount=1 \
--conf spark.executor.cores=1 \
--conf spark.task.resource.gpu.amount=1 \
--conf spark.sql.execution.arrow.maxRecordsPerBatch=200000 \
--conf spark.executor.resource.gpu.discoveryScript=./getGpusResources.sh \
--files $SPARK_HOME/examples/src/main/scripts/getGpusResources.sh \
--conf spark.executor.memory=10g \
--conf spark.sql.cache.serializer=com.nvidia.spark.ParquetCachedBatchSerializer

Additional context
Related issue, with likely same underlying problem: #5975

@eordentlich added the "? - Needs Triage" (Need team to review and classify) and "bug" (Something isn't working) labels Jun 1, 2023
@revans2
Collaborator

revans2 commented Jun 1, 2023

I have not run this, so I am just guessing. The GPU does not support UserDefinedTypes right now, and VectorUDT is a user defined type, so we are going to fall back to the CPU to serialize and deserialize them. This might not be ideal because the CPU is really bad at writing parquet in many cases compared to the GPU. Still, a 600x difference is hard to believe, so I need to do some testing.

@revans2
Collaborator

revans2 commented Jun 1, 2023

@eordentlich do you have any instructions on how to get an environment set up to do this? I tried to use conda to set up an environment following the instructions at https://xgboost.readthedocs.io/en/stable/tutorials/spark_estimator.html

conda create -y -n xgboost_env -c conda-forge conda-pack python=3.9
conda activate xgboost_env
# use conda when the supported version of xgboost (1.7) is released on conda-forge
pip install xgboost
conda install cudf pyarrow pandas -c rapids -c nvidia -c conda-forge

But it didn't work and I had to change the last command to

conda install cudf pyarrow pandas -c rapidsai -c nvidia -c conda-forge

Then when I tried to import xgboost in the notebook

from xgboost.spark import SparkXGBClassifier, SparkXGBClassifierModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

I got an error about no sklearn so I installed it

conda install scikit-learn

And now I am getting what appears to be a CUDA mismatch of some kind.

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "spark_3.4.0/python/lib/pyspark.zip/pyspark/worker.py", line 830, in main
    process()
  File "spark_3.4.0/python/lib/pyspark.zip/pyspark/worker.py", line 822, in process
    serializer.dump_stream(out_iter, outfile)
  File "spark_3.4.0/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 345, in dump_stream
    return ArrowStreamSerializer.dump_stream(self, init_stream_yield_batches(), stream)
  File "spark_3.4.0/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 86, in dump_stream
    for batch in iterator:
  File "spark_3.4.0/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 338, in init_stream_yield_batches
    for series in iterator:
  File "spark_3.4.0/python/lib/pyspark.zip/pyspark/worker.py", line 519, in func
    for result_batch, result_type in result_iter:
  File "xgboost_env/lib/python3.9/site-packages/xgboost/spark/core.py", line 795, in _train_booster
    use_qdm = use_hist and is_cudf_available()
  File "xgboost_env/lib/python3.9/site-packages/xgboost/compat.py", line 83, in is_cudf_available
    import cudf
  File "xgboost_env/lib/python3.9/site-packages/cudf/__init__.py", line 5, in <module>
    validate_setup()
  File "xgboost_env/lib/python3.9/site-packages/cudf/utils/gpu_utils.py", line 20, in validate_setup
    from rmm._cuda.gpu import (
  File "xgboost_env/lib/python3.9/site-packages/rmm/__init__.py", line 16, in <module>
    from rmm import mr
  File "xgboost_env/lib/python3.9/site-packages/rmm/mr.py", line 14, in <module>
    from rmm._lib.memory_resource import (
  File "xgboost_env/lib/python3.9/site-packages/rmm/_lib/__init__.py", line 15, in <module>
    from .device_buffer import DeviceBuffer
  File "device_buffer.pyx", line 1, in init rmm._lib.device_buffer
TypeError: C function cuda.ccudart.cudaStreamSynchronize has wrong signature (expected __pyx_t_4cuda_7ccudart_cudaError_t (__pyx_t_4cuda_7ccudart_cudaStream_t), got cudaError_t (cudaStream_t))

@eordentlich
Contributor Author

Indeed, looks like those instructions can use some work. I think a conda cudatoolkit package needs to be added to your conda environment create command, e.g. cudatoolkit=11.5, with a version (>= 11.2, <= 11.8) that matches the one installed on your host. If you are running on a single node, you can activate the conda environment and run in either local mode or standalone, with master and worker started in the environment.

That said, I think you can replicate the key issue via the following running in a pyspark shell started as

pyspark --master local[4] --conf spark.driver.memory=20g --jars rapids-4-spark_2.12-23.04.0.jar --conf spark.plugins=com.nvidia.spark.SQLPlugin --conf spark.sql.cache.serializer=com.nvidia.spark.ParquetCachedBatchSerializer

And then in the shell, paste

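# Note: this import of sum shadows Python's built-in sum in the shell session.
from pyspark.sql.functions import rand, element_at, sum
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.functions import vector_to_array
import timeit

# 10M rows with two columns of random doubles.
df = spark.range(10000000)
df = df.select(rand().alias("r0"), rand().alias("r1"))

# Assemble the doubles into a single VectorUDT column and drop the inputs.
df_vec = VectorAssembler(inputCols=["r0", "r1"], outputCol="vec").transform(df).drop("r0", "r1")
# Baseline: sum the first vector element without caching.
timeit.timeit(lambda: print(df_vec.select(sum(element_at(vector_to_array("vec"), 1))).collect()), number=1)
# cache() is lazy, so the next run both populates the cache and reads from it.
df_vec.cache()
timeit.timeit(lambda: print(df_vec.select(sum(element_at(vector_to_array("vec"), 1))).collect()), number=1)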

The first timeit call finishes reasonably fast, while the second, after the cache(), takes "forever". You can try dialing down the range size to compare running times.
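To compare running times at several range sizes, the repro could be wrapped along these lines. This is only a sketch: `time_once` and `compare_cached_vs_uncached` are hypothetical helper names, and the function assumes it runs in a session started with the same plugin and serializer settings as the pyspark command above.

```python
import time


def time_once(action):
    """Run `action` once and return (elapsed_seconds, result)."""
    start = time.perf_counter()
    result = action()
    return time.perf_counter() - start, result


def compare_cached_vs_uncached(spark, num_rows):
    """Time the same aggregation before and after cache() for `num_rows` rows."""
    # Imported lazily so the timing helper above works without Spark.
    from pyspark.ml.feature import VectorAssembler
    from pyspark.ml.functions import vector_to_array
    from pyspark.sql import functions as F

    df = spark.range(num_rows).select(F.rand().alias("r0"), F.rand().alias("r1"))
    df_vec = (
        VectorAssembler(inputCols=["r0", "r1"], outputCol="vec")
        .transform(df)
        .drop("r0", "r1")
    )

    def query():
        first_element = F.element_at(vector_to_array(F.col("vec")), 1)
        return df_vec.select(F.sum(first_element)).collect()

    uncached_s, _ = time_once(query)
    df_vec.cache()  # lazy: the next run both writes and reads the cache
    try:
        cached_s, _ = time_once(query)
    finally:
        df_vec.unpersist()  # release the cache before the next size is tried
    return uncached_s, cached_s
```

In the pyspark shell this could be driven with, for example, `for n in (10_000, 100_000, 1_000_000): print(n, compare_cached_vs_uncached(spark, n))`, starting small so that the cached run finishes in reasonable time.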

@revans2
Collaborator

revans2 commented Jun 2, 2023

Thanks for the simplified setup. I was able to reproduce the caching issue. At least I was able to get Spark to crash with a timeout when using the parquet cached batch serializer for your initial request (I made the data bigger because I was using more cores, but I guess I made it too big!!!).

@revans2
Collaborator

revans2 commented Jun 2, 2023

I found at least one really bad problem where we were doing code generation for each row in a specific code path. I need to do some more profiling to see what else might be bad about it.
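The comment does not show the affected code path, so the following is only a generic analogy in plain Python, not the plugin's code. It illustrates why generating code once per row is costly compared with generating it once and reusing it; `make_projection`, `project_slow`, and `project_fast` are hypothetical names.

```python
def make_projection(field_names):
    """Stand-in for an expensive code generation step.

    Builds Python source for a function that extracts the given fields
    from a row (a dict), compiles it, and returns the function.
    """
    body = "".join(f"row[{name!r}], " for name in field_names)
    source = f"lambda row: ({body})"
    return eval(compile(source, "<generated>", "eval"))


def project_slow(rows, field_names):
    # Anti-pattern: the projector is regenerated and recompiled for every row.
    return [make_projection(field_names)(row) for row in rows]


def project_fast(rows, field_names):
    # Generate the projector once, then reuse it for all rows.
    projector = make_projection(field_names)
    return [projector(row) for row in rows]
```

Both functions return the same tuples; the difference is only how often the generation step runs, which is what dominates once the number of rows is large.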

@revans2
Collaborator

revans2 commented Jun 5, 2023

@eordentlich do you have the ability to try out #8495? It is not going to solve all of your problems but it would be good to know if it is good enough for now or if we have to start looking at some of the other optimizations too.

@eordentlich
Contributor Author

Thanks. I'll have to build the jar (unless it is already in cicd somewhere) and give it a try on that notebook.

@eordentlich
Contributor Author

@revans2 I tested the PR and it is a huge improvement. Still 4x slower than mapping vector to array type and back, with PCBS, and about 6x slower than regular non-PCBS caching for the notebook example eval stage.

@revans2
Collaborator

revans2 commented Jun 6, 2023

> @revans2 I tested the PR and it is a huge improvement. Still 4x slower than mapping vector to array type and back, with PCBS, and about 6x slower than regular non-PCBS caching for the notebook example eval stage.

@eordentlich glad to hear that it is helping. I'll see if we can get some help in improving the performance even more.

@sameerz looks like we should spend some time on the other issues I filed, especially #8496. I think we can make it work without too much difficulty.
