Skip to content

Commit

Permalink
Changeset values should honor comparators (#238)
Browse files Browse the repository at this point in the history
* Alter change column behavior so that it honors comparators when determining what fields have changed

* Address requested changes

* Linter updates
  • Loading branch information
ets committed Apr 24, 2024
1 parent 9ef0526 commit d08a4c3
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 4 deletions.
12 changes: 8 additions & 4 deletions src/main/scala/uk/co/gresearch/spark/diff/Diff.scala
Expand Up @@ -19,6 +19,7 @@ package uk.co.gresearch.spark.diff
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{ArrayType, StringType}
import uk.co.gresearch.spark.diff.comparator.DiffComparator
import uk.co.gresearch.spark.{backticks, distinctPrefixFor}

import scala.collection.JavaConverters
Expand Down Expand Up @@ -144,20 +145,22 @@ class Differ(options: DiffOptions) {

private def getChangeColumn(
existsColumnName: String,
valueColumns: Seq[String],
valueVolumnsWithComparator: Seq[(String, DiffComparator)],
left: Dataset[_],
right: Dataset[_]
): Option[Column] = {
options.changeColumn
.map(changeColumn =>
when(left(existsColumnName).isNull || right(existsColumnName).isNull, lit(null))
.otherwise(
Some(valueColumns.toSeq)
Some(valueVolumnsWithComparator)
.filter(_.nonEmpty)
.map(columns =>
concat(
columns
.map(c => when(left(backticks(c)) <=> right(backticks(c)), array()).otherwise(array(lit(c)))): _*
.map { case (c, cmp) =>
when(cmp.equiv(left(backticks(c)), right(backticks(c))), array()).otherwise(array(lit(c)))
}: _*
)
)
.getOrElse(
Expand Down Expand Up @@ -282,6 +285,7 @@ class Differ(options: DiffOptions) {
cmp.equiv(leftWithExists(backticks(c)), rightWithExists(backticks(c)))
}
.reduceOption(_ && _)

val changeCondition = not(unChanged.getOrElse(lit(true)))

val diffActionColumn =
Expand All @@ -292,7 +296,7 @@ class Differ(options: DiffOptions) {
.as(options.diffColumn)

val diffColumns = getDiffColumns(pkColumns, valueColumns, left, right, ignoreColumns).map(_._2)
val changeColumn = getChangeColumn(existsColumnName, valueColumns, leftWithExists, rightWithExists)
val changeColumn = getChangeColumn(existsColumnName, valueVolumnsWithComparator, leftWithExists, rightWithExists)
// turn this column into a sequence of one or none column so we can easily concat it below with diffActionColumn and diffColumns
.map(Seq(_))
.getOrElse(Seq.empty[Column])
Expand Down
Expand Up @@ -33,6 +33,7 @@ import uk.co.gresearch.spark.diff.comparator._

import java.sql.{Date, Timestamp}
import java.time.Duration
import java.util

case class Numbers(
id: Int,
Expand Down Expand Up @@ -412,6 +413,36 @@ class DiffComparatorSuite extends AnyFunSuite with SparkTestSession {
DiffOptions.default.withComparator(DiffComparators.duration(Duration.ofSeconds(61)).asExclusive(), "time")
doTest(optionsWithTightComparator, optionsWithRelaxedComparator, leftTimes.toDF, rightTimes.toDF)
}

// Verifies that the change column ("changeset") is computed with the configured
// comparators rather than plain null-safe equality: a column whose values are
// considered equal by its comparator must not be listed as changed.
test("changeset accounts for comparators") {
  // longValue differences of up to 10 (absolute, inclusive) are treated as equal
  val changesetOptions = DiffOptions.default
    .withComparator(DiffComparators.epsilon(10).asAbsolute().asInclusive(), "longValue")
    .withChangeColumn("changeset")

  val left: Dataset[Numbers] = Seq(
    Numbers(1, 1L, 1.0f, 1.0, Decimal(10, 8, 3), None, None),
    Numbers(2, 2L, 2.0f, 2.0, Decimal(20, 8, 3), Some(2), Some(2L)),
    Numbers(3, 3L, 3.0f, 3.0, Decimal(30, 8, 3), Some(3), Some(3L)),
    Numbers(4, 4L, 4.0f, 4.0, Decimal(40, 8, 3), Some(4), None),
    Numbers(5, 5L, 5.0f, 5.0, Decimal(50, 8, 3), None, Some(5L)),
  ).toDS()

  // every longValue differs from left by at most 6 (within epsilon);
  // only row id=3 additionally differs in floatValue (3.0f vs 6.0f)
  val right: Dataset[Numbers] = Seq(
    Numbers(1, 1L, 1.0f, 1.0, Decimal(10, 8, 3), None, None),
    Numbers(2, 8L, 2.0f, 2.0, Decimal(20, 8, 3), Some(2), Some(2L)),
    Numbers(3, 9L, 6.0f, 3.0, Decimal(30, 8, 3), Some(3), Some(3L)),
    Numbers(4, 10L, 4.0f, 4.0, Decimal(40, 8, 3), Some(4), None),
    Numbers(5, 11L, 5.0f, 5.0, Decimal(50, 8, 3), None, Some(5L)),
  ).toDS()

  // collect once so that the row count and the row content come from the same evaluation
  val changedRows = left.diff(right, changesetOptions, "id").where($"diff" === "C").collect()
  assert(changedRows.length == 1, "Only one row should differ with the numeric comparator applied")

  // look the change column up by name instead of relying on its position in the result
  val changedRow = changedRows.head
  val changesInDifferingRow: util.List[String] =
    changedRow.getList[String](changedRow.fieldIndex("changeset"))

  // compare the whole list: this pins both the size (exactly one entry) and the content
  assert(
    changesInDifferingRow == util.Collections.singletonList("floatValue"),
    "Only floatValue differs after considering the comparators so the changeset should be exactly [floatValue]"
  )
}
}

Seq(true, false).foreach { sensitive =>
Expand Down

0 comments on commit d08a4c3

Please sign in to comment.