Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: FindInBatches with offset limit #5255

Merged
merged 3 commits into from Apr 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 24 additions & 0 deletions finisher_api.go
Expand Up @@ -181,6 +181,21 @@ func (db *DB) FindInBatches(dest interface{}, batchSize int, fc func(tx *DB, bat
batch int
)

// user specified offset or limit
var totalSize int
if c, ok := tx.Statement.Clauses["LIMIT"]; ok {
if limit, ok := c.Expression.(clause.Limit); ok {
totalSize = limit.Limit

if totalSize > 0 && batchSize > totalSize {
batchSize = totalSize
}

// reset to offset to 0 in next batch
tx = tx.Offset(-1).Session(&Session{})
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we extract this call to another check? The offset is also buggy when using w/o limit.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we extract this call to another check? The offset is also buggy when using w/o limit.

What does offset is also buggy mean ? FindInBatches query condition is greater than primary key and limit.

Copy link

@lxdlam lxdlam Apr 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we extract this call to another check? The offset is also buggy when using w/o limit.

What does offset is also buggy mean ? FindInBatches query condition is greater than primary key and limit.

If we only using offset, which scenario will be like select all users which uid is larger than 100, then using dbInstance.Offset(100).OrderBy("user_id").FindInBatches(), it will skip 100 records in each of all subsequent queries.

The corresponding test is TestOffsetClausedFindInBatches in go-gorm/playground#466.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it will skip 100 records in first sub query.
here is reset offset to zero after first sub query that we can use id > ? and limit = ? to query.

Copy link

@lxdlam lxdlam Apr 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it will skip 100 records in first sub query. here is reset offset to zero after first sub query that we can use id > ? and limit = ? to query.

That's correct. But this call has been embedded in the limit check. I mean, we may need to extract it to another check that whether we have a Offset clause and the argument is large than 0 so we can both support only using Offset or togethered with Limit clause.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Offest and Limit use the same clause.Limit, so it support for only using Offset or togethered
https://github.com/go-gorm/gorm/blob/master/chainable_api.go#L245

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only Offset cannot be used is caused by here, thank you.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only Offset cannot be used is caused by here, thank you.

Got it. You're nice :).

}
}

for {
result := queryDB.Limit(batchSize).Find(dest)
rowsAffected += result.RowsAffected
Expand All @@ -196,6 +211,15 @@ func (db *DB) FindInBatches(dest interface{}, batchSize int, fc func(tx *DB, bat
break
}

if totalSize > 0 {
if totalSize <= int(rowsAffected) {
break
}
if totalSize/batchSize == batch {
batchSize = totalSize % batchSize
}
}

// Optimize for-break
resultsValue := reflect.Indirect(reflect.ValueOf(dest))
if result.Statement.Schema.PrioritizedPrimaryField == nil {
Expand Down
62 changes: 62 additions & 0 deletions tests/query_test.go
Expand Up @@ -292,6 +292,68 @@ func TestFindInBatches(t *testing.T) {
}
}

func TestFindInBatchesWithOffsetLimit(t *testing.T) {
users := []User{
*GetUser("find_in_batches_with_offset_limit", Config{}),
*GetUser("find_in_batches_with_offset_limit", Config{}),
*GetUser("find_in_batches_with_offset_limit", Config{}),
*GetUser("find_in_batches_with_offset_limit", Config{}),
*GetUser("find_in_batches_with_offset_limit", Config{}),
*GetUser("find_in_batches_with_offset_limit", Config{}),
*GetUser("find_in_batches_with_offset_limit", Config{}),
*GetUser("find_in_batches_with_offset_limit", Config{}),
*GetUser("find_in_batches_with_offset_limit", Config{}),
*GetUser("find_in_batches_with_offset_limit", Config{}),
}

DB.Create(&users)

var (
sub, results []User
lastBatch int
)

// offset limit
if result := DB.Offset(3).Limit(5).Where("name = ?", users[0].Name).FindInBatches(&sub, 2, func(tx *gorm.DB, batch int) error {
results = append(results, sub...)
lastBatch = batch
return nil
}); result.Error != nil || result.RowsAffected != 5 {
t.Errorf("Failed to batch find, got error %v, rows affected: %v", result.Error, result.RowsAffected)
}
if lastBatch != 3 {
t.Fatalf("incorrect last batch, expected: %v, got: %v", 3, lastBatch)
}

targetUsers := users[3:8]
for i := 0; i < len(targetUsers); i++ {
AssertEqual(t, results[i], targetUsers[i])
}

var sub1 []User
// limit < batchSize
if result := DB.Limit(5).Where("name = ?", users[0].Name).FindInBatches(&sub1, 10, func(tx *gorm.DB, batch int) error {
return nil
}); result.Error != nil || result.RowsAffected != 5 {
t.Errorf("Failed to batch find, got error %v, rows affected: %v", result.Error, result.RowsAffected)
}

var sub2 []User
// only offset
if result := DB.Offset(3).Where("name = ?", users[0].Name).FindInBatches(&sub2, 2, func(tx *gorm.DB, batch int) error {
return nil
}); result.Error != nil || result.RowsAffected != 7 {
t.Errorf("Failed to batch find, got error %v, rows affected: %v", result.Error, result.RowsAffected)
}

var sub3 []User
if result := DB.Limit(4).Where("name = ?", users[0].Name).FindInBatches(&sub3, 2, func(tx *gorm.DB, batch int) error {
return nil
}); result.Error != nil || result.RowsAffected != 4 {
t.Errorf("Failed to batch find, got error %v, rows affected: %v", result.Error, result.RowsAffected)
}
}

func TestFindInBatchesWithError(t *testing.T) {
if name := DB.Dialector.Name(); name == "sqlserver" {
t.Skip("skip sqlserver due to it will raise data race for invalid sql")
Expand Down