Skip to content

Commit

Permalink
Merge pull request apache#14216: [BEAM-7093] Support Spark 3 in Spark…
Browse files Browse the repository at this point in the history
… runner
  • Loading branch information
iemejia committed Mar 13, 2021
2 parents c925ff8 + d9f3157 commit 153876f
Show file tree
Hide file tree
Showing 118 changed files with 394 additions and 60 deletions.
4 changes: 2 additions & 2 deletions .test-infra/jenkins/CommonTestProperties.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ class CommonTestProperties {
JAVA: [
DATAFLOW: ":runners:google-cloud-dataflow-java",
TEST_DATAFLOW: ":runners:google-cloud-dataflow-java",
SPARK: ":runners:spark",
SPARK_STRUCTURED_STREAMING: ":runners:spark",
SPARK: ":runners:spark:2",
SPARK_STRUCTURED_STREAMING: ":runners:spark:2",
FLINK: ":runners:flink:${CommonTestProperties.getFlinkVersion()}",
DIRECT: ":runners:direct-java"
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ PostcommitJobBuilder.postCommitJob('beam_PostCommit_XVR_Spark',
shell("echo \"*** RUN CROSS-LANGUAGE SPARK USING PYTHON ${pythonVersion} ***\"")
gradle {
rootBuildScriptDir(commonJobProperties.checkoutDir)
tasks(':runners:spark:job-server:validatesCrossLanguageRunner')
tasks(':runners:spark:2:job-server:validatesCrossLanguageRunner')
tasks(':runners:spark:3:job-server:validatesCrossLanguageRunner')
commonJobProperties.setGradleSwitches(delegate)
switches("-PpythonVersion=${pythonVersion}")
}
Expand Down
8 changes: 4 additions & 4 deletions .test-infra/jenkins/job_PostCommit_Java_Nexmark_Spark.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ NoPhraseTriggeringPostCommitBuilder.postCommitJob('beam_PostCommit_Java_Nexmark_
rootBuildScriptDir(commonJobProperties.checkoutDir)
tasks(':sdks:java:testing:nexmark:run')
commonJobProperties.setGradleSwitches(delegate)
switches('-Pnexmark.runner=":runners:spark"' +
switches('-Pnexmark.runner=":runners:spark:2"' +
' -Pnexmark.args="' +
[
commonJobProperties.mapToArgString(nexmarkBigQueryArgs),
Expand All @@ -62,7 +62,7 @@ NoPhraseTriggeringPostCommitBuilder.postCommitJob('beam_PostCommit_Java_Nexmark_
rootBuildScriptDir(commonJobProperties.checkoutDir)
tasks(':sdks:java:testing:nexmark:run')
commonJobProperties.setGradleSwitches(delegate)
switches('-Pnexmark.runner=":runners:spark"' +
switches('-Pnexmark.runner=":runners:spark:2"' +
' -Pnexmark.args="' +
[
commonJobProperties.mapToArgString(nexmarkBigQueryArgs),
Expand All @@ -81,7 +81,7 @@ NoPhraseTriggeringPostCommitBuilder.postCommitJob('beam_PostCommit_Java_Nexmark_
rootBuildScriptDir(commonJobProperties.checkoutDir)
tasks(':sdks:java:testing:nexmark:run')
commonJobProperties.setGradleSwitches(delegate)
switches('-Pnexmark.runner=":runners:spark"' +
switches('-Pnexmark.runner=":runners:spark:2"' +
' -Pnexmark.args="' +
[
commonJobProperties.mapToArgString(nexmarkBigQueryArgs),
Expand All @@ -101,7 +101,7 @@ NoPhraseTriggeringPostCommitBuilder.postCommitJob('beam_PostCommit_Java_Nexmark_
rootBuildScriptDir(commonJobProperties.checkoutDir)
tasks(':sdks:java:testing:nexmark:run')
commonJobProperties.setGradleSwitches(delegate)
switches('-Pnexmark.runner=":runners:spark"' +
switches('-Pnexmark.runner=":runners:spark:2"' +
' -Pnexmark.args="' +
[
commonJobProperties.mapToArgString(nexmarkBigQueryArgs),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ PostcommitJobBuilder.postCommitJob('beam_PostCommit_Java_PVR_Spark_Batch',
steps {
gradle {
rootBuildScriptDir(commonJobProperties.checkoutDir)
tasks(':runners:spark:job-server:validatesPortableRunnerBatch')
tasks(':runners:spark:2:job-server:validatesPortableRunnerBatch')
tasks(':runners:spark:3:job-server:validatesPortableRunnerBatch')
commonJobProperties.setGradleSwitches(delegate)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ PostcommitJobBuilder.postCommitJob('beam_PostCommit_Java_PVR_Spark_Streaming',
steps {
gradle {
rootBuildScriptDir(commonJobProperties.checkoutDir)
tasks(':runners:spark:job-server:validatesPortableRunnerStreaming')
tasks(':runners:spark:2:job-server:validatesPortableRunnerStreaming')
tasks(':runners:spark:3:job-server:validatesPortableRunnerStreaming')
commonJobProperties.setGradleSwitches(delegate)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ PostcommitJobBuilder.postCommitJob('beam_PostCommit_Java_ValidatesRunner_Spark',
steps {
gradle {
rootBuildScriptDir(commonJobProperties.checkoutDir)
tasks(':runners:spark:validatesRunner')
tasks(':runners:spark:2:validatesRunner')
tasks(':runners:spark:3:validatesRunner')
commonJobProperties.setGradleSwitches(delegate)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ PostcommitJobBuilder.postCommitJob('beam_PostCommit_Java_ValidatesRunner_SparkSt
steps {
gradle {
rootBuildScriptDir(commonJobProperties.checkoutDir)
tasks(':runners:spark:validatesStructuredStreamingRunnerBatch')
tasks(':runners:spark:2:validatesStructuredStreamingRunnerBatch')
commonJobProperties.setGradleSwitches(delegate)
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import CommonJobProperties as commonJobProperties
import PostcommitJobBuilder


PostcommitJobBuilder.postCommitJob('beam_PostCommit_Java_ValidatesRunner_Spark_Java11',
'Run Spark ValidatesRunner Java 11', 'Apache Spark Runner ValidatesRunner Tests On Java 11', this) {

description('Runs the ValidatesRunner suite on the Spark runner with Java 11.')

def JAVA_11_HOME = '/usr/lib/jvm/java-11-openjdk-amd64'
def JAVA_8_HOME = '/usr/lib/jvm/java-8-openjdk-amd64'

commonJobProperties.setTopLevelMainJobProperties(delegate, 'master', 270)
publishers {
archiveJunit('**/build/test-results/**/*.xml')
}

steps {
gradle {
rootBuildScriptDir(commonJobProperties.checkoutDir)
tasks(':runners:spark:3:jar')
tasks(':runners:spark:3:testJar')
switches("-Dorg.gradle.java.home=${JAVA_8_HOME}")
}

gradle {
rootBuildScriptDir(commonJobProperties.checkoutDir)
tasks(':runners:spark:3:validatesRunner')
switches('-x shadowJar')
switches('-x shadowTestJar')
switches('-x compileJava')
switches('-x compileTestJava')
switches('-x jar')
switches('-x testJar')
switches('-x classes')
switches('-x testClasses')
switches("-Dorg.gradle.java.home=${JAVA_11_HOME}")

commonJobProperties.setGradleSwitches(delegate, 3 * Runtime.runtime.availableProcessors())
}
}
}
5 changes: 3 additions & 2 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ task("javaHadoopVersionsTest") {
dependsOn(":sdks:java:io:hcatalog:hadoopVersionsTest")
dependsOn(":sdks:java:io:parquet:hadoopVersionsTest")
dependsOn(":sdks:java:extensions:sorter:hadoopVersionsTest")
dependsOn(":runners:spark:hadoopVersionsTest")
dependsOn(":runners:spark:2:hadoopVersionsTest")
}

task("sqlPostCommit") {
Expand Down Expand Up @@ -329,7 +329,8 @@ task("typescriptPreCommit") {
}

task("pushAllDockerImages") {
dependsOn(":runners:spark:job-server:container:dockerPush")
dependsOn(":runners:spark:2:job-server:container:dockerPush")
dependsOn(":runners:spark:3:job-server:container:dockerPush")
dependsOn(":sdks:java:container:pushAll")
dependsOn(":sdks:python:container:pushAll")
for (version in project.ext.get("allFlinkVersions") as Array<*>) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,6 @@ class BeamModulePlugin implements Plugin<Project> {
jackson_dataformat_xml : "com.fasterxml.jackson.dataformat:jackson-dataformat-xml:$jackson_version",
jackson_dataformat_yaml : "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:$jackson_version",
jackson_datatype_joda : "com.fasterxml.jackson.datatype:jackson-datatype-joda:$jackson_version",
jackson_module_scala : "com.fasterxml.jackson.module:jackson-module-scala_2.11:$jackson_version",
jaxb_api : "jakarta.xml.bind:jakarta.xml.bind-api:$jaxb_api_version",
jaxb_impl : "com.sun.xml.bind:jaxb-impl:$jaxb_api_version",
joda_time : "joda-time:joda-time:2.10.10",
Expand Down Expand Up @@ -1632,7 +1631,7 @@ class BeamModulePlugin implements Plugin<Project> {
}

if (runner?.equalsIgnoreCase('spark')) {
testRuntime it.project(path: ":runners:spark", configuration: 'testRuntime')
testRuntime it.project(path: ":runners:spark:2", configuration: 'testRuntime')
testRuntime project.library.java.spark_core
testRuntime project.library.java.spark_streaming

Expand Down Expand Up @@ -2338,7 +2337,7 @@ class BeamModulePlugin implements Plugin<Project> {
dependsOn = ['installGcpTest']
mustRunAfter = [
":runners:flink:${project.ext.latestFlinkVersion}:job-server:shadowJar",
':runners:spark:job-server:shadowJar',
':runners:spark:2:job-server:shadowJar',
':sdks:python:container:py36:docker',
':sdks:python:container:py37:docker',
':sdks:python:container:py38:docker',
Expand All @@ -2353,7 +2352,7 @@ class BeamModulePlugin implements Plugin<Project> {
"--parallelism=2",
"--sdk_worker_parallelism=1",
"--flink_job_server_jar=${project.project(flinkJobServerProject).shadowJar.archivePath}",
"--spark_job_server_jar=${project.project(':runners:spark:job-server').shadowJar.archivePath}",
"--spark_job_server_jar=${project.project(':runners:spark:2:job-server').shadowJar.archivePath}",
]
if (isStreaming)
options += [
Expand Down
2 changes: 1 addition & 1 deletion examples/java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ dependencies {
// TODO: Make the netty version used configurable, we add netty-all 4.1.17.Final so it appears on the classpath
// before 4.1.8.Final defined by Apache Beam
sparkRunnerPreCommit "io.netty:netty-all:4.1.17.Final"
sparkRunnerPreCommit project(":runners:spark")
sparkRunnerPreCommit project(":runners:spark:2")
sparkRunnerPreCommit project(":sdks:java:io:hadoop-file-system")
sparkRunnerPreCommit library.java.spark_streaming
sparkRunnerPreCommit library.java.spark_core
Expand Down
2 changes: 1 addition & 1 deletion examples/kotlin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ dependencies {
// TODO: Make the netty version used configurable, we add netty-all 4.1.17.Final so it appears on the classpath
// before 4.1.8.Final defined by Apache Beam
sparkRunnerPreCommit "io.netty:netty-all:4.1.17.Final"
sparkRunnerPreCommit project(":runners:spark")
sparkRunnerPreCommit project(":runners:spark:2")
sparkRunnerPreCommit project(":sdks:java:io:hadoop-file-system")
sparkRunnerPreCommit library.java.spark_streaming
sparkRunnerPreCommit library.java.spark_core
Expand Down
2 changes: 1 addition & 1 deletion release/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ task("runJavaExamplesValidationTask") {
description = "Run the Beam quickstart across all Java runners"
dependsOn(":runners:direct-java:runQuickstartJavaDirect")
dependsOn(":runners:google-cloud-dataflow-java:runQuickstartJavaDataflow")
dependsOn(":runners:spark:runQuickstartJavaSpark")
dependsOn(":runners:spark:2:runQuickstartJavaSpark")
dependsOn(":runners:flink:1.10:runQuickstartJavaFlinkLocal")
dependsOn(":runners:direct-java:runMobileGamingJavaDirect")
dependsOn(":runners:google-cloud-dataflow-java:runMobileGamingJavaDataflow")
Expand Down
2 changes: 1 addition & 1 deletion release/src/main/scripts/run_rc_validation.sh
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ if [[ "$java_quickstart_spark_local" = true ]]; then
echo "*************************************************************"
echo "* Running Java Quickstart with Spark local runner"
echo "*************************************************************"
./gradlew :runners:spark:runQuickstartJavaSpark \
./gradlew :runners:spark:2:runQuickstartJavaSpark \
-Prepourl=${REPO_URL} \
-Pver=${RELEASE_VER}
else
Expand Down
3 changes: 3 additions & 0 deletions runners/core-construction-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ dependencies {
compile library.java.slf4j_api
compile library.java.jackson_annotations
compile library.java.avro
// Avro 1.8 leaks an older version of paranamer that conflicts in runtime with the dependencies
// of some runners so we need to fix it to a more recent but still compatible version.
runtimeOnly "com.thoughtworks.paranamer:paranamer:2.8"
testCompile library.java.junit
testCompile library.java.mockito_core
testCompile library.java.jackson_annotations
Expand Down
35 changes: 35 additions & 0 deletions runners/spark/2/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

def basePath = '..'
/* All properties required for loading the Spark build script */
project.ext {
// Set the version of all Spark-related dependencies here.
spark_version = '2.4.7'
spark_scala_version = '2.11'

// Version specific code overrides.
main_source_overrides = ['./src/main/java']
test_source_overrides = ['./src/test/java']
main_resources_overrides = []
test_resources_overrides = []
archives_base_name = 'beam-runners-spark'
}

// Load the main build script which contains all build logic.
apply from: "$basePath/spark_runner.gradle"
31 changes: 31 additions & 0 deletions runners/spark/2/job-server/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

def basePath = '../../job-server'

project.ext {
// Look for the source code in the parent module
main_source_dirs = ["$basePath/src/main/java"]
test_source_dirs = ["$basePath/src/test/java"]
main_resources_dirs = ["$basePath/src/main/resources"]
test_resources_dirs = ["$basePath/src/test/resources"]
archives_base_name = 'beam-runners-spark-job-server'
}

// Load the main build script which contains all build logic.
apply from: "$basePath/spark_job_server.gradle"
27 changes: 27 additions & 0 deletions runners/spark/2/job-server/container/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

def basePath = '../../../job-server/container'

project.ext {
resource_path = basePath
spark_job_server_image = 'spark_job_server'
}

// Load the main build script which contains all build logic.
apply from: "$basePath/spark_job_server_container.gradle"
35 changes: 35 additions & 0 deletions runners/spark/3/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

def basePath = '..'
/* All properties required for loading the Spark build script */
project.ext {
// Set the version of all Spark-related dependencies here.
spark_version = '3.0.1'
spark_scala_version = '2.12'

// Version specific code overrides.
main_source_overrides = ['./src/main/java']
test_source_overrides = ['./src/test/java']
main_resources_overrides = []
test_resources_overrides = []
archives_base_name = 'beam-runners-spark3'
}

// Load the main build script which contains all build logic.
apply from: "$basePath/spark_runner.gradle"

0 comments on commit 153876f

Please sign in to comment.