Skip to content

Commit

Permalink
Merge pull request #8585 from NVIDIA/branch-23.06
Browse files Browse the repository at this point in the history
Merge branch-23.06 to main [skip ci]
  • Loading branch information
pxLi committed Jun 19, 2023
2 parents 5806439 + a84e13a commit 643370d
Show file tree
Hide file tree
Showing 9 changed files with 13 additions and 11 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Change log
Generated on 2023-06-13
Generated on 2023-06-19

## Release 23.06

Expand Down Expand Up @@ -115,6 +115,8 @@ Generated on 2023-06-13
### PRs
|||
|:---|:---|
|[#8581](https://github.com/NVIDIA/spark-rapids/pull/8581)|Fix 321db 330db shims build errors caused by DB updates|
|[#8570](https://github.com/NVIDIA/spark-rapids/pull/8570)|Update changelog to latest [skip ci]|
|[#8567](https://github.com/NVIDIA/spark-rapids/pull/8567)|Fixed a link in config doc[skip ci]|
|[#8562](https://github.com/NVIDIA/spark-rapids/pull/8562)|Update changelog to latest 230612 [skip ci]|
|[#8560](https://github.com/NVIDIA/spark-rapids/pull/8560)|Fix relative path in config doc [skip ci]|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ case class GpuMergeIntoCommand(
deltaTxn.commit(
deltaActions,
DeltaOperations.Merge(
Option(condition.sql),
Option(condition),
matchedClauses.map(DeltaOperations.MergePredicate(_)),
notMatchedClauses.map(DeltaOperations.MergePredicate(_))))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ case class GpuDeleteCommand(
gpuDeltaLog.withNewTransaction { txn =>
val deleteActions = performDelete(sparkSession, deltaLog, txn)
if (deleteActions.nonEmpty) {
txn.commit(deleteActions, DeltaOperations.Delete(condition.map(_.sql).toSeq))
txn.commit(deleteActions, DeltaOperations.Delete(condition.toSeq))
}
}
// Re-cache all cached plans(including this relation itself, if it's cached) that refer to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class GpuOptimizeExecutor(
val addedFiles = updates.collect { case a: AddFile => a }
val removedFiles = updates.collect { case r: RemoveFile => r }
if (addedFiles.nonEmpty) {
val operation = DeltaOperations.Optimize(partitionPredicate.map(_.sql), zOrderByColumns)
val operation = DeltaOperations.Optimize(partitionPredicate, zOrderByColumns)
val metrics = createMetrics(sparkSession.sparkContext, addedFiles, removedFiles)
commitAndRetry(txn, operation, updates, metrics) { newTxn =>
val newPartitionSchema = newTxn.metadata.partitionSchema
Expand Down Expand Up @@ -398,4 +398,4 @@ class GpuOptimizeExecutor(
"numRemovedBytes" ->
setAndReturnMetric("total number of bytes removed", totalSize(removedFiles)))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ case class GpuUpdateCommand(
metrics("numTouchedRows").value - metrics("numUpdatedRows").value)
}
txn.registerSQLMetrics(sparkSession, metrics)
txn.commit(totalActions, DeltaOperations.Update(condition.map(_.toString)))
txn.commit(totalActions, DeltaOperations.Update(condition))
// This is needed to make the SQL metrics visible in the Spark UI
val executionId = sparkSession.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ case class GpuDeleteCommand(
gpuDeltaLog.withNewTransaction { txn =>
val deleteActions = performDelete(sparkSession, deltaLog, txn)
if (deleteActions.nonEmpty) {
txn.commit(deleteActions, DeltaOperations.Delete(condition.map(_.sql).toSeq))
txn.commit(deleteActions, DeltaOperations.Delete(condition.toSeq))
}
}
// Re-cache all cached plans(including this relation itself, if it's cached) that refer to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class GpuOptimizeExecutor(
val addedFiles = updates.collect { case a: AddFile => a }
val removedFiles = updates.collect { case r: RemoveFile => r }
if (addedFiles.nonEmpty) {
val operation = DeltaOperations.Optimize(partitionPredicate.map(_.sql), zOrderByColumns)
val operation = DeltaOperations.Optimize(partitionPredicate, zOrderByColumns)
val metrics = createMetrics(sparkSession.sparkContext, addedFiles, removedFiles)
commitAndRetry(txn, operation, updates, metrics) { newTxn =>
val newPartitionSchema = newTxn.metadata.partitionSchema
Expand Down Expand Up @@ -402,4 +402,4 @@ class GpuOptimizeExecutor(
"numRemovedBytes" ->
setAndReturnMetric("total number of bytes removed", totalSize(removedFiles)))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ case class GpuUpdateCommand(
metrics("numTouchedRows").value - metrics("numUpdatedRows").value)
}
txn.registerSQLMetrics(sparkSession, metrics)
txn.commit(totalActions, DeltaOperations.Update(condition.map(_.toString)))
txn.commit(totalActions, DeltaOperations.Update(condition))
// This is needed to make the SQL metrics visible in the Spark UI
val executionId = sparkSession.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(
Expand Down
2 changes: 1 addition & 1 deletion jenkins/databricks/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ set_sw_versions()
sw_versions[JSON4S_AST]="3.7.0-M11"
sw_versions[JSON4S_CORE]="3.7.0-M11"
sw_versions[KRYO]="4.0.2"
sw_versions[ORC]="1.6.13"
sw_versions[ORC]="1.6.14"
sw_versions[PARQUET]="1.12.0"
sw_versions[PROTOBUF]="2.6.1"
;;
Expand Down

0 comments on commit 643370d

Please sign in to comment.