Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement actions waiter #407

Merged
merged 7 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
116 changes: 116 additions & 0 deletions hcloud/action_waiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package hcloud

import (
"context"
"fmt"
"maps"
"slices"
"time"
)

type ActionWaiter interface {
WaitForFunc(ctx context.Context, handleUpdate func(update *Action) error, actions ...*Action) error
WaitFor(ctx context.Context, actions ...*Action) error
}

var _ ActionWaiter = (*ActionClient)(nil)

// WaitForFunc waits until all actions are completed by polling the API at the interval
// defined by [WithPollBackoffFunc]. An action is considered as complete when its status is
// either [ActionStatusSuccess] or [ActionStatusError].
//
// The handleUpdate callback is called every time an action is updated.
func (c *ActionClient) WaitForFunc(ctx context.Context, handleUpdate func(update *Action) error, actions ...*Action) error {
running := make(map[int64]struct{}, len(actions))
for _, action := range actions {
if action.Status == ActionStatusRunning {
running[action.ID] = struct{}{}
} else if handleUpdate != nil {
// We filter out already completed actions from the API polling loop; while
// this isn't a real update, the caller should be notified about the new
// state.
if err := handleUpdate(action); err != nil {
return err
}
}
}

retries := 0
for {
if len(running) == 0 {
break
}

select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(c.action.client.pollBackoffFunc(retries)):
retries++
}

opts := ActionListOpts{
Sort: []string{"status", "id"},
ID: make([]int64, 0, len(running)),
}
for actionID := range running {
opts.ID = append(opts.ID, actionID)
}
slices.Sort(opts.ID)
apricote marked this conversation as resolved.
Show resolved Hide resolved

updates, err := c.AllWithOpts(ctx, opts)
if err != nil {
return err
}

if len(updates) != len(running) {
// Some actions may not exist in the API, also fail early to prevent an
// infinite loop when updates == 0.

notFound := maps.Clone(running)
for _, update := range updates {
delete(notFound, update.ID)
}
notFoundIDs := make([]int64, 0, len(notFound))
for unknownID := range notFound {
notFoundIDs = append(notFoundIDs, unknownID)
}

return fmt.Errorf("actions not found: %v", notFoundIDs)
}

for _, update := range updates {
if update.Status != ActionStatusRunning {
delete(running, update.ID)
}

if handleUpdate != nil {
if err := handleUpdate(update); err != nil {
return err
}
}
}
}

return nil
}

// WaitFor waits until all actions succeed by polling the API at the interval defined by
// [WithPollBackoffFunc]. An action is considered as succeeded when its status is either
// [ActionStatusSuccess].
//
// If a single action fails, the function will stop waiting and the error set in the
// action will be returned as an [ActionError].
//
// For more flexibility, see the [WaitForFunc] function.
func (c *ActionClient) WaitFor(ctx context.Context, actions ...*Action) error {
return c.WaitForFunc(
ctx,
func(update *Action) error {
if update.Status == ActionStatusError {
return update.Error()
}
return nil
},
actions...,
)
}
169 changes: 169 additions & 0 deletions hcloud/action_waiter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package hcloud

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
)

func TestWaitFor(t *testing.T) {
RunMockedTestCases(t,
[]MockedTestCase{
{
Name: "succeed",
WantRequests: []MockedRequest{
{"GET", "/actions?id=1509772237&page=1&sort=status&sort=id", nil, 200,
`{
"actions": [
{ "id": 1509772237, "status": "running", "progress": 0 }
],
"meta": { "pagination": { "page": 1 }}
}`},
{"GET", "/actions?id=1509772237&page=1&sort=status&sort=id", nil, 200,
`{
"actions": [
{ "id": 1509772237, "status": "success", "progress": 100 }
],
"meta": { "pagination": { "page": 1 }}
}`},
},
Run: func(env testEnv) {
actions := []*Action{{ID: 1509772237, Status: ActionStatusRunning}}

err := env.Client.Action.WaitFor(context.Background(), actions...)
assert.NoError(t, err)
},
},
{
Name: "succeed with already succeeded action",
Run: func(env testEnv) {
actions := []*Action{{ID: 1509772237, Status: ActionStatusSuccess}}

err := env.Client.Action.WaitFor(context.Background(), actions...)
assert.NoError(t, err)
},
},
{
Name: "fail with unknown action",
WantRequests: []MockedRequest{
{"GET", "/actions?id=1509772237&page=1&sort=status&sort=id", nil, 200,
`{
"actions": [],
"meta": { "pagination": { "page": 1 }}
}`},
},
Run: func(env testEnv) {
actions := []*Action{{ID: 1509772237, Status: ActionStatusRunning}}

err := env.Client.Action.WaitFor(context.Background(), actions...)
assert.Error(t, err)
assert.Equal(t, "actions not found: [1509772237]", err.Error())
},
},
{
Name: "fail with canceled context",
Run: func(env testEnv) {
actions := []*Action{{ID: 1509772237, Status: ActionStatusRunning}}

ctx, cancelFunc := context.WithCancel(context.Background())
cancelFunc()
err := env.Client.Action.WaitFor(ctx, actions...)
assert.Error(t, err)
},
},
{
Name: "fail with api error",
WantRequests: []MockedRequest{
{"GET", "/actions?id=1509772237&page=1&sort=status&sort=id", nil, 503, ""},
},
Run: func(env testEnv) {
actions := []*Action{{ID: 1509772237, Status: ActionStatusRunning}}

err := env.Client.Action.WaitFor(context.Background(), actions...)
assert.Error(t, err)
assert.Equal(t, "hcloud: server responded with status code 503", err.Error())
},
},
},
)
}

func TestWaitForFunc(t *testing.T) {
RunMockedTestCases(t,
[]MockedTestCase{
{
Name: "succeed",
WantRequests: []MockedRequest{
{"GET", "/actions?id=1509772237&id=1509772238&page=1&sort=status&sort=id", nil, 200,
`{
"actions": [
{ "id": 1509772237, "status": "running", "progress": 40 },
{ "id": 1509772238, "status": "running", "progress": 0 }
],
"meta": { "pagination": { "page": 1 }}
}`},
{"GET", "/actions?id=1509772237&id=1509772238&page=1&sort=status&sort=id", nil, 200,
`{
"actions": [
{ "id": 1509772237, "status": "running", "progress": 60 },
{ "id": 1509772238, "status": "running", "progress": 50 }
],
"meta": { "pagination": { "page": 1 }}
}`},
{"GET", "/actions?id=1509772237&id=1509772238&page=1&sort=status&sort=id", nil, 200,
`{
"actions": [
{ "id": 1509772237, "status": "success", "progress": 100 },
{ "id": 1509772238, "status": "running", "progress": 75 }
],
"meta": { "pagination": { "page": 1 }}
}`},
{"GET", "/actions?id=1509772238&page=1&sort=status&sort=id", nil, 200,
`{
"actions": [
{ "id": 1509772238, "status": "error", "progress": 75,
"error": {
"code": "action_failed",
"message": "Something went wrong with the action"
}
}
],
"meta": { "pagination": { "page": 1 }}
}`},
},
Run: func(env testEnv) {
actions := []*Action{
{ID: 1509772236, Status: ActionStatusSuccess},
{ID: 1509772237, Status: ActionStatusRunning},
{ID: 1509772238, Status: ActionStatusRunning},
}
progress := make([]int, 0)

progressByAction := make(map[int64]int, len(actions))
err := env.Client.Action.WaitForFunc(context.Background(), func(update *Action) error {
switch update.Status {
case ActionStatusRunning:
progressByAction[update.ID] = update.Progress
case ActionStatusSuccess:
progressByAction[update.ID] = 100
case ActionStatusError:
progressByAction[update.ID] = 100
}

sum := 0
for _, value := range progressByAction {
sum += value
}
progress = append(progress, sum/len(actions))

return nil
}, actions...)

assert.Nil(t, err)
assert.Equal(t, []int{33, 46, 46, 53, 70, 83, 91, 100}, progress)
},
},
},
)
}