Skip to content

Commit

Permalink
Spark 3.5: Skip rolled back snapshot when building changelog scan
Browse files Browse the repository at this point in the history
  • Loading branch information
manuzhang committed Apr 30, 2024
1 parent 426818b commit 13715b0
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 4 deletions.
13 changes: 9 additions & 4 deletions core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.Objects;
import java.util.function.Function;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.HistoryEntry;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
Expand Down Expand Up @@ -349,9 +348,15 @@ public static long snapshotIdAsOfTime(Table table, long timestampMillis) {

public static Long nullableSnapshotIdAsOfTime(Table table, long timestampMillis) {
Long snapshotId = null;
for (HistoryEntry logEntry : table.history()) {
if (logEntry.timestampMillis() <= timestampMillis) {
snapshotId = logEntry.snapshotId();
Snapshot current = table.currentSnapshot();
if (current.timestampMillis() <= timestampMillis) {
snapshotId = current.snapshotId();
} else {
for (Snapshot ancestor : currentAncestors(table)) {
if (ancestor.timestampMillis() <= timestampMillis) {
snapshotId = ancestor.snapshotId();
break;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,59 @@ public void testMetadataColumns() {
rows);
}

@TestTemplate
public void testQueryWithRollback() {
createTable();

sql("INSERT INTO %s VALUES (1, 'a')", tableName);
Table table = validationCatalog.loadTable(tableIdent);
Snapshot snap1 = table.currentSnapshot();
long rightAfterSnap1 = waitUntilAfter(snap1.timestampMillis());

sql("INSERT INTO %s VALUES (2, 'b')", tableName);
table.refresh();
Snapshot snap2 = table.currentSnapshot();
long rightAfterSnap2 = waitUntilAfter(snap2.timestampMillis());

sql(
"CALL %s.system.rollback_to_snapshot('%s', %d)",
catalogName, tableIdent, snap1.snapshotId());
table.refresh();
assertThat(table.currentSnapshot()).isEqualTo(snap1);

sql("INSERT OVERWRITE %s VALUES (-2, 'a')", tableName);
table.refresh();
Snapshot snap3 = table.currentSnapshot();
long rightAfterSnap3 = waitUntilAfter(snap3.timestampMillis());

assertEquals(
"Should have expected changed rows up to snapshot 3",
ImmutableList.of(
row(1, "a", "INSERT", 0, snap1.snapshotId()),
row(1, "a", "DELETE", 1, snap3.snapshotId()),
row(-2, "a", "INSERT", 1, snap3.snapshotId())),
changelogRecords(null, rightAfterSnap3));

assertEquals(
"Should have expected changed rows up to snapshot 2",
ImmutableList.of(row(1, "a", "INSERT", 0, snap1.snapshotId())),
changelogRecords(null, rightAfterSnap2));

assertEquals(
"Should have expected changed rows from snapshot 2 and 3",
ImmutableList.of(
row(1, "a", "DELETE", 0, snap3.snapshotId()),
row(-2, "a", "INSERT", 0, snap3.snapshotId())),
changelogRecords(rightAfterSnap1, snap3.timestampMillis()));

assertEquals(
"Should have expected changed rows from snapshot 3",
ImmutableList.of(
row(1, "a", "DELETE", 0, snap3.snapshotId()),
row(-2, "a", "INSERT", 0, snap3.snapshotId())),
changelogRecords(rightAfterSnap2, null));
}

private void createTableWithDefaultRows() {
createTable();
insertDefaultRows();
Expand Down

0 comments on commit 13715b0

Please sign in to comment.