Skip to content

Commit

Permalink
Spark 3.5: Only traverse ancestors of current snapshot when building …
Browse files Browse the repository at this point in the history
…changelog scan
  • Loading branch information
manuzhang committed Apr 30, 2024
1 parent 426818b commit debc745
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,59 @@ public void testMetadataColumns() {
rows);
}

@TestTemplate
public void testQueryWithRollback() {
  createTable();

  // First insert: produces the snapshot that the table is later rolled back to.
  sql("INSERT INTO %s VALUES (1, 'a')", tableName);
  Table table = validationCatalog.loadTable(tableIdent);
  Snapshot firstSnapshot = table.currentSnapshot();
  long firstId = firstSnapshot.snapshotId();
  long afterFirst = waitUntilAfter(firstSnapshot.timestampMillis());

  // Second insert: the rollback below moves the current snapshot back off of this one.
  sql("INSERT INTO %s VALUES (2, 'b')", tableName);
  table.refresh();
  Snapshot secondSnapshot = table.currentSnapshot();
  long afterSecond = waitUntilAfter(secondSnapshot.timestampMillis());

  sql(
      "CALL %s.system.rollback_to_snapshot('%s', %d)",
      catalogName, tableIdent, firstId);
  table.refresh();
  assertThat(table.currentSnapshot()).isEqualTo(firstSnapshot);

  // Overwrite issued after the rollback; it becomes the new current snapshot.
  sql("INSERT OVERWRITE %s VALUES (-2, 'a')", tableName);
  table.refresh();
  Snapshot thirdSnapshot = table.currentSnapshot();
  long thirdId = thirdSnapshot.snapshotId();
  long thirdTimestamp = thirdSnapshot.timestampMillis();
  long afterThird = waitUntilAfter(thirdTimestamp);

  // No lower bound, upper bound after snapshot 3: rows from snapshots 1 and 3 are expected.
  assertEquals(
      "Should have expected changed rows up to snapshot 3",
      ImmutableList.of(
          row(1, "a", "INSERT", 0, firstId),
          row(1, "a", "DELETE", 1, thirdId),
          row(-2, "a", "INSERT", 1, thirdId)),
      changelogRecords(null, afterThird));

  // Upper bound between snapshots 2 and 3: only the snapshot 1 row is expected, i.e. the
  // rolled-back insert of (2, 'b') must not appear.
  assertEquals(
      "Should have expected changed rows up to snapshot 2",
      ImmutableList.of(row(1, "a", "INSERT", 0, firstId)),
      changelogRecords(null, afterSecond));

  // Lower bound after snapshot 1, upper bound at snapshot 3's own timestamp.
  assertEquals(
      "Should have expected changed rows from snapshot 2 and 3",
      ImmutableList.of(
          row(1, "a", "DELETE", 0, thirdId),
          row(-2, "a", "INSERT", 0, thirdId)),
      changelogRecords(afterFirst, thirdTimestamp));

  // Lower bound after snapshot 2, no upper bound.
  assertEquals(
      "Should have expected changed rows from snapshot 3",
      ImmutableList.of(
          row(1, "a", "DELETE", 0, thirdId),
          row(-2, "a", "INSERT", 0, thirdId)),
      changelogRecords(afterSecond, null));
}

private void createTableWithDefaultRows() {
createTable();
insertDefaultRows();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ public Scan buildChangelogScan() {
}

if (endTimestamp != null) {
endSnapshotId = SnapshotUtil.nullableSnapshotIdAsOfTime(table, endTimestamp);
endSnapshotId = getEndSnapshotId(endTimestamp);
if ((startSnapshotId == null && endSnapshotId == null)
|| (startSnapshotId != null && startSnapshotId.equals(endSnapshotId))) {
emptyScan = true;
Expand Down Expand Up @@ -589,6 +589,24 @@ private Long getStartSnapshotId(Long startTimestamp) {
}
}

/**
 * Resolves the end snapshot ID for a changelog scan bounded by {@code endTimestamp}.
 *
 * <p>Only the current snapshot and the snapshots yielded by {@code
 * SnapshotUtil.currentAncestors(table)} are considered.
 *
 * @param endTimestamp upper bound in milliseconds, inclusive; unboxed for comparison whenever
 *     the table has a current snapshot, so it must be non-null in that case
 * @return the ID of the current snapshot if its timestamp is at or before {@code endTimestamp};
 *     otherwise the ID of the first ancestor, in iteration order, whose timestamp is at or
 *     before {@code endTimestamp}; {@code null} if the table has no current snapshot or no
 *     snapshot qualifies
 */
private Long getEndSnapshotId(Long endTimestamp) {
  Snapshot head = table.currentSnapshot();
  if (head == null) {
    return null;
  }

  // Fast path: the current snapshot already satisfies the bound, so no traversal is needed.
  if (head.timestampMillis() <= endTimestamp) {
    return head.snapshotId();
  }

  // Otherwise walk the current snapshot's lineage and stop at the first match.
  for (Snapshot candidate : SnapshotUtil.currentAncestors(table)) {
    if (candidate.timestampMillis() <= endTimestamp) {
      return candidate.snapshotId();
    }
  }

  return null;
}

public Scan buildMergeOnReadScan() {
Preconditions.checkArgument(
readConf.snapshotId() == null && readConf.asOfTimestamp() == null && readConf.tag() == null,
Expand Down

0 comments on commit debc745

Please sign in to comment.