Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark 3.5: Only traverse ancestors of current snapshot when building changelog scan #10252

Merged
merged 1 commit into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,69 @@ public void testMetadataColumns() {
rows);
}

@TestTemplate
public void testQueryWithRollback() {
createTable();

sql("INSERT INTO %s VALUES (1, 'a')", tableName);
Table table = validationCatalog.loadTable(tableIdent);
Snapshot snap1 = table.currentSnapshot();
long rightAfterSnap1 = waitUntilAfter(snap1.timestampMillis());

sql("INSERT INTO %s VALUES (2, 'b')", tableName);
table.refresh();
Snapshot snap2 = table.currentSnapshot();
long rightAfterSnap2 = waitUntilAfter(snap2.timestampMillis());

sql(
"CALL %s.system.rollback_to_snapshot('%s', %d)",
catalogName, tableIdent, snap1.snapshotId());
table.refresh();
assertThat(table.currentSnapshot()).isEqualTo(snap1);

sql("INSERT OVERWRITE %s VALUES (-2, 'a')", tableName);
table.refresh();
Snapshot snap3 = table.currentSnapshot();
long rightAfterSnap3 = waitUntilAfter(snap3.timestampMillis());

assertEquals(
"Should have expected changed rows up to snapshot 3",
ImmutableList.of(
row(1, "a", "INSERT", 0, snap1.snapshotId()),
row(1, "a", "DELETE", 1, snap3.snapshotId()),
row(-2, "a", "INSERT", 1, snap3.snapshotId())),
changelogRecords(null, rightAfterSnap3));

assertEquals(
"Should have expected changed rows up to snapshot 2",
ImmutableList.of(row(1, "a", "INSERT", 0, snap1.snapshotId())),
changelogRecords(null, rightAfterSnap2));

assertEquals(
"Should have expected changed rows from snapshot 3 only since snapshot 2 is on a different branch.",
ImmutableList.of(
row(1, "a", "DELETE", 0, snap3.snapshotId()),
row(-2, "a", "INSERT", 0, snap3.snapshotId())),
changelogRecords(rightAfterSnap1, snap3.timestampMillis()));

assertEquals(
"Should have expected changed rows from snapshot 3",
ImmutableList.of(
row(1, "a", "DELETE", 0, snap3.snapshotId()),
row(-2, "a", "INSERT", 0, snap3.snapshotId())),
changelogRecords(rightAfterSnap2, null));
manuzhang marked this conversation as resolved.
Show resolved Hide resolved

sql(
"CALL %s.system.set_current_snapshot('%s', %d)",
catalogName, tableIdent, snap2.snapshotId());
table.refresh();
assertThat(table.currentSnapshot()).isEqualTo(snap2);
assertEquals(
"Should have expected changed rows from snapshot 2 only since snapshot 3 is on a different branch.",
ImmutableList.of(row(2, "b", "INSERT", 0, snap2.snapshotId())),
changelogRecords(rightAfterSnap1, null));
}

private void createTableWithDefaultRows() {
createTable();
insertDefaultRows();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ public Scan buildChangelogScan() {
}

if (endTimestamp != null) {
endSnapshotId = SnapshotUtil.nullableSnapshotIdAsOfTime(table, endTimestamp);
endSnapshotId = getEndSnapshotId(endTimestamp);
if ((startSnapshotId == null && endSnapshotId == null)
|| (startSnapshotId != null && startSnapshotId.equals(endSnapshotId))) {
emptyScan = true;
Expand Down Expand Up @@ -589,6 +589,17 @@ private Long getStartSnapshotId(Long startTimestamp) {
}
}

private Long getEndSnapshotId(Long endTimestamp) {
Long endSnapshotId = null;
for (Snapshot snapshot : SnapshotUtil.currentAncestors(table)) {
if (snapshot.timestampMillis() <= endTimestamp) {
endSnapshotId = snapshot.snapshotId();
break;
}
}
return endSnapshotId;
}

public Scan buildMergeOnReadScan() {
Preconditions.checkArgument(
readConf.snapshotId() == null && readConf.asOfTimestamp() == null && readConf.tag() == null,
Expand Down