diff --git a/.github/workflows/cross-version-tests.yml b/.github/workflows/cross-version-tests.yml
index 52441ff0c9af1..5dfb6e7c4f716 100644
--- a/.github/workflows/cross-version-tests.yml
+++ b/.github/workflows/cross-version-tests.yml
@@ -103,11 +103,19 @@ jobs:
         with:
           repository: ${{ github.event.inputs.repository }}
           ref: ${{ github.event.inputs.ref }}
+      - name: Get Java version
+        id: get-java-version
+        run: |
+          if [ "${{ matrix.package }}" = "mleap" ]
+          then
+            java_version=8
+          else
+            java_version=11
+          fi
+          echo "::set-output name=version::$java_version"
       - uses: actions/setup-java@v2
         with:
-          # GitHub Actions' Ubuntu 20.04 image uses Java 11 (which is incompatible with Spark 2.4.x) by default:
-          # https://github.com/actions/virtual-environments/blob/main/images/linux/Ubuntu2004-README.md#java
-          java-version: 8
+          java-version: ${{ steps.get-java-version.outputs.version }}
           distribution: "adopt"
       - name: Get python version
         id: get-python-version
diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml
index f379f318b7bc2..f0d6a309f4665 100644
--- a/.github/workflows/master.yml
+++ b/.github/workflows/master.yml
@@ -57,9 +57,7 @@ jobs:
       - uses: actions/checkout@master
       - uses: actions/setup-java@v2
         with:
-          # GitHub Actions' Ubuntu 20.04 image uses Java 11 (which is incompatible with Spark 2.4.x) by default:
-          # https://github.com/actions/virtual-environments/blob/main/images/linux/Ubuntu2004-README.md#java
-          java-version: 8
+          java-version: 11
           distribution: 'adopt'
       - name: Re-configure dynamic linker run-time bindings for adoptopenjdk-8-hotspot-amd64
         run: |
@@ -197,9 +195,7 @@ jobs:
           python-version: 3.6
       - uses: actions/setup-java@v2
         with:
-          # GitHub Actions' Ubuntu 20.04 image uses Java 11 (which is incompatible with Spark 2.4.x) by default:
-          # https://github.com/actions/virtual-environments/blob/main/images/linux/Ubuntu2004-README.md#java
-          java-version: 8
+          java-version: 11
           distribution: 'adopt'
       - uses: ./.github/actions/cache-pip
       - name: Install dependencies
@@ -245,7 +241,7 @@ jobs:
       - name: Set up Java
         uses: actions/setup-java@v2
         with:
-          java-version: 8
+          java-version: 11
           distribution: 'adopt'
       - name: Install dependencies
         run: |
@@ -314,7 +310,7 @@ jobs:
           python-version: 3.6
       - uses: actions/setup-java@v2
         with:
-          java-version: 8
+          java-version: 11
           distribution: 'adopt'
       - uses: ./.github/actions/cache-pip
       - name: Install dependencies
@@ -342,7 +338,7 @@ jobs:
           python-version: 3.6
       - uses: actions/setup-java@v2
         with:
-          java-version: 8
+          java-version: 11
           distribution: 'adopt'
       - name: Install dependencies
         env:
@@ -387,7 +383,7 @@ jobs:
           python-version: 3.6
       - uses: actions/setup-java@v2
         with:
-          java-version: 8
+          java-version: 11
           distribution: 'adopt'
       - name: Install dependencies
         env:
diff --git a/mlflow/mleap.py b/mlflow/mleap.py
index 99429be8250cd..c99aecabc3034 100644
--- a/mlflow/mleap.py
+++ b/mlflow/mleap.py
@@ -211,6 +211,9 @@ def add_to_model(mlflow_model, path, spark_model, sample_input):
     from pyspark.ml.pipeline import PipelineModel
     from pyspark.sql import DataFrame
     import mleap.version
+
+    # This import statement adds `serializeToBundle` and `deserializeFromBundle` to `Transformer`:
+    # https://github.com/combust/mleap/blob/37f6f61634798118e2c2eb820ceeccf9d234b810/python/mleap/pyspark/spark_support.py#L32-L33
     from mleap.pyspark.spark_support import SimpleSparkSerializer  # pylint: disable=unused-import
     from py4j.protocol import Py4JError
 
diff --git a/mlflow/spark.py b/mlflow/spark.py
index 71ef06cd13335..c96626d2d2ee3 100644
--- a/mlflow/spark.py
+++ b/mlflow/spark.py
@@ -23,7 +23,6 @@
 import posixpath
 import re
 import shutil
-import traceback
 import uuid
 
 import yaml
@@ -71,10 +70,6 @@
 _logger = logging.getLogger(__name__)
 
 
-def _format_exception(ex):
-    return "".join(traceback.format_exception(type(ex), ex, ex.__traceback__))
-
-
 def get_default_pip_requirements():
     """
     :return: A list of default pip requirements for MLflow Models produced by this flavor.
diff --git a/tests/mleap/test_mleap_model_export.py b/tests/mleap/test_mleap_model_export.py
index e4da9050c0096..927cc79ee8a05 100644
--- a/tests/mleap/test_mleap_model_export.py
+++ b/tests/mleap/test_mleap_model_export.py
@@ -189,3 +189,20 @@ def test_mleap_module_model_save_with_invalid_sample_input_type_raises_exception
         mlflow.spark.save_model(
             spark_model=spark_model_iris.model, path=model_path, sample_input=invalid_input
         )
+
+
+@pytest.mark.large
+def test_spark_module_model_save_with_mleap_and_unsupported_transformer_raises_exception(
+    spark_model_iris, model_path
+):
+    class CustomTransformer(JavaModel):
+        def _transform(self, dataset):
+            return dataset
+
+    unsupported_pipeline = Pipeline(stages=[CustomTransformer()])
+    unsupported_model = unsupported_pipeline.fit(spark_model_iris.spark_df)
+
+    with pytest.raises(ValueError, match="CustomTransformer"):
+        mlflow.spark.save_model(
+            spark_model=unsupported_model, path=model_path, sample_input=spark_model_iris.spark_df
+        )
diff --git a/tests/spark/test_spark_model_export.py b/tests/spark/test_spark_model_export.py
index 8b4d95edb8407..17c29f6e2770b 100644
--- a/tests/spark/test_spark_model_export.py
+++ b/tests/spark/test_spark_model_export.py
@@ -9,12 +9,12 @@
 from pyspark.ml.classification import LogisticRegression
 from pyspark.ml.feature import VectorAssembler
 from pyspark.ml.pipeline import Pipeline
-from pyspark.ml.wrapper import JavaModel
 import pytest
 from sklearn import datasets
 import shutil
 from collections import namedtuple
 import yaml
+from packaging.version import Version
 
 import mlflow
 import mlflow.pyfunc.scoring_server as pyfunc_scoring_server
@@ -62,6 +62,22 @@ def spark_custom_env(tmpdir):
 # other tests.
 @pytest.fixture(scope="session", autouse=True)
 def spark_context():
+    if Version(pyspark.__version__) < Version("3.1"):
+        # A workaround for this issue:
+        # https://stackoverflow.com/questions/62109276/errorjava-lang-unsupportedoperationexception-for-pyspark-pandas-udf-documenta
+        spark_home = (
+            os.environ.get("SPARK_HOME")
+            if "SPARK_HOME" in os.environ
+            else os.path.dirname(pyspark.__file__)
+        )
+        conf_dir = os.path.join(spark_home, "conf")
+        os.makedirs(conf_dir, exist_ok=True)
+        with open(os.path.join(conf_dir, "spark-defaults.conf"), "w") as f:
+            conf = """
+spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true"
+spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true"
+"""
+            f.write(conf)
     conf = pyspark.SparkConf()
     max_tries = 3
     for num_tries in range(max_tries):
@@ -635,23 +651,6 @@ def test_pyspark_version_is_logged_without_dev_suffix(spark_model_iris):
     assert any(x == f"pyspark=={unaffected_version}" for x in pip_deps)
 
 
-@pytest.mark.large
-def test_spark_module_model_save_with_mleap_and_unsupported_transformer_raises_exception(
-    spark_model_iris, model_path
-):
-    class CustomTransformer(JavaModel):
-        def _transform(self, dataset):
-            return dataset
-
-    unsupported_pipeline = Pipeline(stages=[CustomTransformer()])
-    unsupported_model = unsupported_pipeline.fit(spark_model_iris.spark_df)
-
-    with pytest.raises(ValueError, match="CustomTransformer"):
-        sparkm.save_model(
-            spark_model=unsupported_model, path=model_path, sample_input=spark_model_iris.spark_df
-        )
-
-
 def test_shutil_copytree_without_file_permissions(tmpdir):
     src_dir = tmpdir.mkdir("src-dir")
     dst_dir = tmpdir.mkdir("dst-dir")