Merge branch 'upmaster' into json4s-2-jackson
LuciferYang committed Jun 7, 2023
2 parents e386fbf + 5021638 · commit 9fce83c
Showing 1,139 changed files with 53,757 additions and 19,303 deletions.
4 changes: 2 additions & 2 deletions .github/labeler.yml
@@ -108,8 +108,8 @@ AVRO:
DSTREAM:
- "streaming/**/*"
- "data/streaming/**/*"
- "external/kinesis*"
- "external/kafka*"
- "connector/kinesis*"
- "connector/kafka*"
- "python/pyspark/streaming/**/*"
GRAPHX:
- "graphx/**/*"
41 changes: 20 additions & 21 deletions .github/workflows/build_and_test.yml
@@ -59,7 +59,7 @@ jobs:
required: ${{ steps.set-outputs.outputs.required }}
image_url: >-
${{
((inputs.branch == 'branch-3.2' || inputs.branch == 'branch-3.3') && 'dongjoon/apache-spark-github-action-image:20220207')
(inputs.branch == 'branch-3.3' && 'dongjoon/apache-spark-github-action-image:20220207')
|| steps.infra-image-outputs.outputs.image_url
}}
steps:
@@ -80,16 +80,13 @@ jobs:
id: set-outputs
run: |
if [ -z "${{ inputs.jobs }}" ]; then
# is-changed.py is missing in branch-3.2, and it might run in scheduled build, see also SPARK-39517
pyspark=true; sparkr=true; tpcds=true; docker=true;
if [ -f "./dev/is-changed.py" ]; then
pyspark_modules=`cd dev && python -c "import sparktestsupport.modules as m; print(','.join(m.name for m in m.all_modules if m.name.startswith('pyspark')))"`
pyspark=`./dev/is-changed.py -m $pyspark_modules`
sparkr=`./dev/is-changed.py -m sparkr`
tpcds=`./dev/is-changed.py -m sql`
docker=`./dev/is-changed.py -m docker-integration-tests`
fi
# 'build', 'scala-213', and 'java-11-17' are always true for now.
pyspark_modules=`cd dev && python -c "import sparktestsupport.modules as m; print(','.join(m.name for m in m.all_modules if m.name.startswith('pyspark')))"`
pyspark=`./dev/is-changed.py -m $pyspark_modules`
sparkr=`./dev/is-changed.py -m sparkr`
tpcds=`./dev/is-changed.py -m sql`
docker=`./dev/is-changed.py -m docker-integration-tests`
# 'build', 'scala-212', and 'java-11-17' are always true for now.
# It does not save significant time and most of PRs trigger the build.
precondition="
{
@@ -98,7 +95,7 @@
\"sparkr\": \"$sparkr\",
\"tpcds-1g\": \"$tpcds\",
\"docker-integration-tests\": \"$docker\",
\"scala-213\": \"true\",
\"scala-212\": \"true\",
\"java-11-17\": \"true\",
\"lint\" : \"true\",
\"k8s-integration-tests\" : \"true\",
@@ -278,7 +275,7 @@ jobs:
(fromJson(needs.precondition.outputs.required).pyspark == 'true' ||
fromJson(needs.precondition.outputs.required).lint == 'true' ||
fromJson(needs.precondition.outputs.required).sparkr == 'true') &&
(inputs.branch != 'branch-3.2' && inputs.branch != 'branch-3.3')
(inputs.branch != 'branch-3.3')
runs-on: ubuntu-latest
permissions:
packages: write
@@ -344,6 +341,8 @@ jobs:
pyspark-connect, pyspark-errors
- >-
pyspark-pandas-connect
- >-
pyspark-pandas-slow-connect
env:
MODULES_TO_TEST: ${{ matrix.modules }}
HADOOP_PROFILE: ${{ inputs.hadoop }}
@@ -602,7 +601,7 @@ jobs:
- name: Java linter
run: ./dev/lint-java
- name: Spark connect jvm client mima check
if: inputs.branch != 'branch-3.2' && inputs.branch != 'branch-3.3'
if: inputs.branch != 'branch-3.3'
run: ./dev/connect-jvm-client-mima-check
- name: Install Python linter dependencies
run: |
@@ -617,7 +616,7 @@ jobs:
- name: Install dependencies for Python code generation check
run: |
# See more in "Installation" https://docs.buf.build/installation#tarball
curl -LO https://github.com/bufbuild/buf/releases/download/v1.17.0/buf-Linux-x86_64.tar.gz
curl -LO https://github.com/bufbuild/buf/releases/download/v1.20.0/buf-Linux-x86_64.tar.gz
mkdir -p $HOME/buf
tar -xvzf buf-Linux-x86_64.tar.gz -C $HOME/buf --strip-components 1
python3.9 -m pip install 'protobuf==3.19.5' 'mypy-protobuf==3.3.0'
@@ -729,10 +728,10 @@ jobs:
./build/mvn $MAVEN_CLI_OPTS -DskipTests -Pyarn -Pmesos -Pkubernetes -Pvolcano -Phive -Phive-thriftserver -Phadoop-cloud -Djava.version=${JAVA_VERSION/-ea} install
rm -rf ~/.m2/repository/org/apache/spark
scala-213:
scala-212:
needs: precondition
if: fromJson(needs.precondition.outputs.required).scala-213 == 'true'
name: Scala 2.13 build with SBT
if: fromJson(needs.precondition.outputs.required).scala-212 == 'true'
name: Scala 2.12 build with SBT
runs-on: ubuntu-22.04
steps:
- name: Checkout Spark repository
@@ -762,18 +761,18 @@
uses: actions/cache@v3
with:
path: ~/.cache/coursier
key: scala-213-coursier-${{ hashFiles('**/pom.xml', '**/plugins.sbt') }}
key: scala-212-coursier-${{ hashFiles('**/pom.xml', '**/plugins.sbt') }}
restore-keys: |
scala-213-coursier-
scala-212-coursier-
- name: Install Java 8
uses: actions/setup-java@v3
with:
distribution: temurin
java-version: 8
- name: Build with SBT
run: |
./dev/change-scala-version.sh 2.13
./build/sbt -Pyarn -Pmesos -Pkubernetes -Pvolcano -Phive -Phive-thriftserver -Phadoop-cloud -Pkinesis-asl -Pdocker-integration-tests -Pkubernetes-integration-tests -Pspark-ganglia-lgpl -Pscala-2.13 compile Test/compile
./dev/change-scala-version.sh 2.12
./build/sbt -Pyarn -Pmesos -Pkubernetes -Pvolcano -Phive -Phive-thriftserver -Phadoop-cloud -Pkinesis-asl -Pdocker-integration-tests -Pkubernetes-integration-tests -Pspark-ganglia-lgpl -Pscala-2.12 compile Test/compile
# Any TPC-DS related updates on this job need to be applied to tpcds-1g-gen job of benchmark.yml as well
tpcds-1g:
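Taken together, these workflow edits drop the branch-3.2 special cases: the precondition step no longer guards on dev/is-changed.py being present, and the infra-image condition and the Spark Connect JVM client MiMa check now exempt only branch-3.3. That is consistent with the removal of the dedicated branch-3.2 workflow immediately below.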
49 changes: 0 additions & 49 deletions .github/workflows/build_branch32.yml

This file was deleted.

2 changes: 1 addition & 1 deletion LICENSE-binary
@@ -431,7 +431,6 @@ javolution:javolution
com.esotericsoftware:kryo-shaded
com.esotericsoftware:minlog
com.esotericsoftware:reflectasm
com.google.protobuf:protobuf-java
org.codehaus.janino:commons-compiler
org.codehaus.janino:janino
jline:jline
@@ -443,6 +442,7 @@ pl.edu.icm:JLargeArrays
BSD 3-Clause
------------

com.google.protobuf:protobuf-java
dk.brics.automaton:automaton
org.antlr:antlr-runtime
org.antlr:ST4
4 changes: 3 additions & 1 deletion R/pkg/R/SQLContext.R
@@ -153,7 +153,9 @@ writeToFileInArrow <- function(fileName, rdf, numPartitions) {
numPartitions <- if (!is.null(numPartitions)) {
numToInt(numPartitions)
} else {
1
# If numPartitions is not set, chunk the R DataFrame based on the batch size.
ceiling(
nrow(rdf) / as.numeric(sparkR.conf("spark.sql.execution.arrow.maxRecordsPerBatch")[[1]]))
}

rdf_slices <- if (numPartitions > 1) {
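With this change SparkR no longer writes the whole R data.frame as a single slice when numPartitions is unspecified; the slice count becomes ceiling(nrow(rdf) / spark.sql.execution.arrow.maxRecordsPerBatch). For example, assuming the default batch size of 10,000 records, a 25,000-row data.frame yields ceiling(25000 / 10000) = 3 slices.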
15 changes: 15 additions & 0 deletions R/pkg/tests/fulltests/test_sparkSQL_arrow.R
@@ -249,4 +249,19 @@ test_that("SPARK-32478: gapply() Arrow optimization - error message for schema m
"expected IntegerType, IntegerType, got IntegerType, StringType")
})

test_that("SPARK-43789: Automatically pick the number of partitions based on Arrow batch size", {
skip_if_not_installed("arrow")

conf <- callJMethod(sparkSession, "conf")
maxRecordsPerBatch <- sparkR.conf("spark.sql.execution.arrow.maxRecordsPerBatch")[[1]]

callJMethod(conf, "set", "spark.sql.execution.arrow.maxRecordsPerBatch", "10")
tryCatch({
expect_equal(getNumPartitionsRDD(toRDD(createDataFrame(mtcars))), 4)
},
finally = {
callJMethod(conf, "set", "spark.sql.execution.arrow.maxRecordsPerBatch", maxRecordsPerBatch)
})
})

sparkR.session.stop()
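For reference, mtcars has 32 rows, so with spark.sql.execution.arrow.maxRecordsPerBatch forced to 10 the expected partition count in the test above is ceiling(32 / 10) = 4.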

@@ -139,8 +139,4 @@ protected boolean doAuthChallenge(
return true;
}

@Override
public MergedBlockMetaReqHandler getMergedBlockMetaReqHandler() {
return saslHandler.getMergedBlockMetaReqHandler();
}
}
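The override removed here merely delegated to the wrapped SASL handler's getMergedBlockMetaReqHandler(). A test added later in this commit (testValidMergedBlockMetaReqHandler) sets up an authenticated client/server pair and asserts that authRpcHandler.getMergedBlockMetaReqHandler() still returns a non-null handler, guarding against a regression from this removal.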

@@ -151,7 +151,7 @@ private void processStreamRequest(final StreamRequest req) {
streamManager.streamSent(req.streamId);
});
} else {
// org.apache.spark.repl.ExecutorClassLoader.STREAM_NOT_FOUND_REGEX should also be updated
// org.apache.spark.executor.ExecutorClassLoader.STREAM_NOT_FOUND_REGEX should also be updated
// when the following error message is changed.
respond(new StreamFailure(req.streamId, String.format(
"Stream '%s' was not found.", req.streamId)));

@@ -149,6 +149,14 @@ public StreamManager getStreamManager() {
assertEquals(testErrorMessageLength, messageEnd - messageStart);
}

@Test
public void testValidMergedBlockMetaReqHandler() throws Exception {
ctx = new AuthTestCtx();
ctx.createServer("secret");
ctx.createClient("secret");
assertNotNull(ctx.authRpcHandler.getMergedBlockMetaReqHandler());
}

private static class AuthTestCtx {

private final String appId = "testAppId";
4 changes: 2 additions & 2 deletions common/network-yarn/pom.xml
@@ -65,12 +65,12 @@
<!-- Provided dependencies -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>${hadoop-client-api.artifact}</artifactId>
<artifactId>hadoop-client-api</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>${hadoop-client-runtime.artifact}</artifactId>
<artifactId>hadoop-client-runtime</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>

@@ -1410,7 +1410,8 @@ public boolean equals(final Object other) {
* substitutions) that are required to change one of the strings into the other.
*/
public int levenshteinDistance(UTF8String other) {
// Implementation adopted from org.apache.common.lang3.StringUtils.getLevenshteinDistance
// Implementation adopted from
// org.apache.commons.text.similarity.LevenshteinDistance.unlimitedCompare

int n = numChars();
int m = other.numChars();
@@ -1468,6 +1469,96 @@ public int levenshteinDistance(UTF8String other) {
return p[n];
}

public int levenshteinDistance(UTF8String other, int threshold) {
// Implementation adopted from
// org.apache.commons.text.similarity.LevenshteinDistance.limitedCompare

int n = numChars();
int m = other.numChars();

if (n == 0) {
return m <= threshold ? m : -1;
}
if (m == 0) {
return n <= threshold ? n : -1;
}

UTF8String s, t;

if (n <= m) {
s = this;
t = other;
} else {
s = other;
t = this;
int swap;
swap = n;
n = m;
m = swap;
}

if (m - n > threshold) {
return -1;
}

int[] p = new int[n + 1];
int[] d = new int[n + 1];
int[] swap;

int i, i_bytes, num_bytes_i, j, j_bytes, num_bytes_j;

final int boundary = Math.min(n, threshold) + 1;
for (i = 0; i < boundary; i++) { p[i] = i; }
Arrays.fill(p, boundary, p.length, Integer.MAX_VALUE);
Arrays.fill(d, Integer.MAX_VALUE);

for (j = 0, j_bytes = 0; j < m; j_bytes += num_bytes_j, j++) {
num_bytes_j = numBytesForFirstByte(t.getByte(j_bytes));

d[0] = j + 1;

final int min = Math.max(1, j + 1 - threshold);
final int max = j + 1 > Integer.MAX_VALUE - threshold ? n : Math.min(n, j + 1 + threshold);
if (min > 1) {
d[min - 1] = Integer.MAX_VALUE;
}

int lowerBound = Integer.MAX_VALUE;

for (i = 0, i_bytes = 0; i <= max; i_bytes += num_bytes_i, i++) {
if (i < min - 1) {
num_bytes_i = numBytesForFirstByte(s.getByte(i_bytes));
} else if (i == min - 1) {
num_bytes_i = 0;
} else {
if (ByteArrayMethods.arrayEquals(t.base, t.offset + j_bytes,
s.base, s.offset + i_bytes, num_bytes_j)) {
d[i] = p[i - 1];
} else {
d[i] = 1 + Math.min(Math.min(d[i - 1], p[i]), p[i - 1]);
}
lowerBound = Math.min(lowerBound, d[i]);
num_bytes_i = numBytesForFirstByte(s.getByte(i_bytes));
}
}

if (lowerBound > threshold) {
return -1;
}

swap = p;
p = d;
d = swap;
}

// if p[n] is greater than the threshold, there's no guarantee on it
// being the correct distance
if (p[n] <= threshold) {
return p[n];
}
return -1;
}

@Override
public int hashCode() {
return Murmur3_x86_32.hashUnsafeBytes(base, offset, numBytes, 42);
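The new levenshteinDistance(UTF8String other, int threshold) follows Commons Text's limitedCompare: each row of the dynamic-programming table is restricted to a band of roughly 2 * threshold + 1 cells, and the method returns -1 as soon as every cell in the band exceeds the threshold (or at the end, if p[n] does). To illustrate the contract only — distance when it is at most the threshold, -1 otherwise — here is a minimal, hypothetical Java sketch on plain Strings; the class and method names (LevenshteinSketch, distanceWithin) are illustrative and not part of Spark, and it computes the full distance rather than using the banded early exit.

// Hypothetical helper, not part of Spark: same return contract as the
// threshold overload above, but without the banded optimization.
public final class LevenshteinSketch {
  // Returns the Levenshtein distance between s and t if it is <= threshold, else -1.
  public static int distanceWithin(String s, String t, int threshold) {
    int n = s.length();
    int m = t.length();
    // The distance is at least the difference in lengths, so refuse early.
    if (Math.abs(n - m) > threshold) {
      return -1;
    }
    int[] prev = new int[m + 1];
    int[] curr = new int[m + 1];
    for (int j = 0; j <= m; j++) {
      prev[j] = j;  // distance from the empty prefix of s to t[0..j)
    }
    for (int i = 1; i <= n; i++) {
      curr[0] = i;  // distance from s[0..i) to the empty prefix of t
      for (int j = 1; j <= m; j++) {
        int cost = s.charAt(i - 1) == t.charAt(j - 1) ? 0 : 1;
        curr[j] = Math.min(Math.min(curr[j - 1] + 1, prev[j] + 1), prev[j - 1] + cost);
      }
      int[] swap = prev;
      prev = curr;
      curr = swap;
    }
    return prev[m] <= threshold ? prev[m] : -1;
  }

  public static void main(String[] args) {
    System.out.println(distanceWithin("kitten", "sitting", 3)); // prints 3
    System.out.println(distanceWithin("kitten", "sitting", 2)); // prints -1
  }
}

Unlike this sketch, the UTF8String version walks the underlying UTF-8 bytes (via numBytesForFirstByte) so that edits are counted per code point rather than per Java char.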
