Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature detect diff-based bandwidth-optimized PATCH protocol for saving checkpoints #11421

Merged
merged 13 commits into from Nov 28, 2022
@@ -0,0 +1,4 @@
changes:
- type: feat
scope: backend/service
description: Allows the service to opt into a bandwidth-optimized DIFF protocol for storing checkpoints. Previously this required setting the PULUMI_OPTIMIZED_CHECKPOINT_PATCH env variable on the client. This env variable is now deprecated.
67 changes: 66 additions & 1 deletion pkg/backend/httpstate/backend.go
Expand Up @@ -18,6 +18,7 @@ import (
"context"
cryptorand "crypto/rand"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
Expand All @@ -29,6 +30,7 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"time"

opentracing "github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -119,6 +121,7 @@ type cloudBackend struct {
url string
client *client.Client
currentProject *workspace.Project
capabilities func(context.Context) capabilities
}

// Assert we implement the backend.Backend and backend.SpecificDeploymentExporter interfaces.
Expand All @@ -139,11 +142,15 @@ func New(d diag.Sink, cloudURL string) (Backend, error) {
currentProject = nil
}

client := client.NewClient(cloudURL, apiToken, d)
capabilities := detectCapabilities(d, client)

return &cloudBackend{
d: d,
url: cloudURL,
client: client.NewClient(cloudURL, apiToken, d),
client: client,
currentProject: currentProject,
capabilities: capabilities,
}, nil
}

Expand Down Expand Up @@ -767,6 +774,10 @@ func (b *cloudBackend) GetStack(ctx context.Context, stackRef backend.StackRefer
return nil, err
}

// GetStack is typically the first in a series of calls to the backend. Although logically unrelated,
// this is a good time to start detecting capabilities, so that the capability request is not on the
// critical path.
go b.capabilities(ctx)

stack, err := b.client.GetStack(ctx, stackID)
if err != nil {
// If this was a 404, return nil, nil as per this method's contract.
Expand Down Expand Up @@ -1753,3 +1764,57 @@ func (c httpstateBackendClient) GetStackResourceOutputs(
ctx context.Context, name string) (resource.PropertyMap, error) {
return backend.NewBackendClient(c.backend).GetStackResourceOutputs(ctx, name)
}

// capabilities represents the feature-detected capabilities of the service the backend is
// connected to. The zero value means that no optional capabilities were detected; it is also
// what doDetectCapabilities returns when probing or decoding fails.
type capabilities struct {
// If non-nil, indicates that delta checkpoint updates are supported. The pointed-to value is
// the configuration decoded from the service's apitype.DeltaCheckpointUploads capability entry.
deltaCheckpointUpdates *apitype.DeltaCheckpointUploadsConfigV1
}

// detectCapabilities builds a lazy, memoizing wrapper around doDetectCapabilities.
//
// The returned function probes the service at most once: the first invocation runs
// doDetectCapabilities with that caller's context, and every invocation (including concurrent
// ones) returns the same cached value. Because doDetectCapabilities reports failures as an empty
// capabilities value, a failed first probe is cached as "no capabilities" as well.
func detectCapabilities(d diag.Sink, client *client.Client) func(ctx context.Context) capabilities {
	var (
		once sync.Once
		caps capabilities
	)
	return func(ctx context.Context) capabilities {
		// sync.Once.Do does not return to any caller until the first call's function has
		// completed, so no extra signalling is needed before reading caps. Relying on Once alone
		// also avoids blocking later callers forever if the probe panics before it can signal.
		once.Do(func() {
			caps = doDetectCapabilities(ctx, d, client)
		})
		return caps
	}
}

// doDetectCapabilities queries the service for its capabilities and decodes the response.
//
// It never fails: if the request or the decoding returns an error, a warning is written to d and
// an empty capabilities value is returned instead.
func doDetectCapabilities(ctx context.Context, d diag.Sink, client *client.Client) capabilities {
	response, probeErr := client.GetCapabilities(ctx)
	if probeErr != nil {
		d.Warningf(diag.Message("" /*urn*/, "failed to get capabilities: %v"), probeErr)
		return capabilities{}
	}

	detected, decodeErr := decodeCapabilities(response.Capabilities)
	if decodeErr != nil {
		d.Warningf(diag.Message("" /*urn*/, "failed to decode capabilities: %v"), decodeErr)
		return capabilities{}
	}

	return detected
}

// decodeCapabilities translates the wire-level capability entries into a capabilities value.
//
// Only apitype.DeltaCheckpointUploads entries are interpreted; entries for any other capability
// are skipped. If the configuration of a recognized entry cannot be unmarshalled, an empty
// capabilities value and a wrapped error are returned.
func decodeCapabilities(wireLevel []apitype.APICapabilityConfig) (capabilities, error) {
	var result capabilities
	for _, item := range wireLevel {
		// Capabilities this client does not know about are ignored.
		if item.Capability != apitype.DeltaCheckpointUploads {
			continue
		}

		cfg := new(apitype.DeltaCheckpointUploadsConfigV1)
		if err := json.Unmarshal(item.Configuration, cfg); err != nil {
			return capabilities{}, fmt.Errorf("decoding DeltaCheckpointUploadsConfigV1 returned %w", err)
		}
		result.deltaCheckpointUpdates = cfg
	}
	return result, nil
}
2 changes: 2 additions & 0 deletions pkg/backend/httpstate/client/api_endpoints.go
Expand Up @@ -78,6 +78,8 @@ func init() {
routes.Path(path).Methods(method).Name(name)
}

addEndpoint("GET", "/api/capabilities", "getCapabilities")

addEndpoint("GET", "/api/user", "getCurrentUser")
addEndpoint("GET", "/api/user/stacks", "listUserStacks")
addEndpoint("GET", "/api/stacks/{orgName}", "listOrganizationStacks")
Expand Down
34 changes: 33 additions & 1 deletion pkg/backend/httpstate/client/client.go
Expand Up @@ -49,6 +49,9 @@ type Client struct {
apiOrgs []string
diag diag.Sink
client restClient

// If true, do not probe the backend with GET /api/capabilities and assume no capabilities.
DisableCapabilityProbing bool
}

// newClient creates a new Pulumi API client with the given URL and API token. It is a variable instead of a regular
Expand Down Expand Up @@ -321,7 +324,7 @@ func (pc *Client) GetLatestConfiguration(ctx context.Context, stackID StackIdent
func (pc *Client) DoesProjectExist(ctx context.Context, owner string, projectName string) (bool, error) {
if err := pc.restCall(ctx, "HEAD", getProjectPath(owner, projectName), nil, nil, nil); err != nil {
// If this was a 404, return false - project not found.
if errResp, ok := err.(*apitype.ErrorResponse); ok && errResp.Code == http.StatusNotFound {
if is404(err) {
return false, nil
}

Expand Down Expand Up @@ -1070,3 +1073,32 @@ func (pc *Client) GetDeploymentUpdates(ctx context.Context, stack StackIdentifie
}
return resp, nil
}

func (pc *Client) GetCapabilities(ctx context.Context) (*apitype.CapabilitiesResponse, error) {
if pc.DisableCapabilityProbing {
return &apitype.CapabilitiesResponse{}, nil
}

var resp apitype.CapabilitiesResponse
err := pc.restCall(ctx, http.MethodGet, "/api/capabilities", nil, nil, &resp)
if is404(err) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect a customer maintaining a fork of the CLI may not even want the client to call a nonexistent /api/capabilities endpoint. We could ask.

I wonder if we should have a disableCapabilities bool field (or better name) on the Client struct, that a customer could set to true from their own implementation of newClient, that when set would avoid making the request to /api/capabilities at all, and have this method return early with &apitype.CapabilitiesResponse{} like the 404 case.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right; I was also worried about the 404s tripping some alarms. I thought we had cleared it already, but I'll double-check.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added DisableCapabilityProbing on Client struct.

// The client continues to support legacy backends. They do not support /api/capabilities and are
// assumed here to have no additional capabilities.
return &apitype.CapabilitiesResponse{}, nil
}
if err != nil {
return nil, fmt.Errorf("querying capabilities failed: %w", err)
}
return &resp, nil
}

// is404 reports whether err is, or wraps, an *apitype.ErrorResponse whose Code is
// http.StatusNotFound. A nil err yields false.
func is404(err error) bool {
	var notFound *apitype.ErrorResponse
	if !errors.As(err, &notFound) {
		// errors.As also returns false when err is nil.
		return false
	}
	return notFound.Code == http.StatusNotFound
}
43 changes: 43 additions & 0 deletions pkg/backend/httpstate/client/client_test.go
Expand Up @@ -179,3 +179,46 @@ func TestPatchUpdateCheckpointVerbatimPreservesIndent(t *testing.T) {

assert.Equal(t, string(indented), string(request.UntypedDeployment))
}

// TestGetCapabilities exercises Client.GetCapabilities against a mock server in two scenarios:
// a server that responds with 404 (treated as a legacy service with no capabilities), and a
// server that advertises the delta checkpoint uploads capability.
func TestGetCapabilities(t *testing.T) {
	t.Parallel()

	t.Run("legacy-service-404", func(t *testing.T) {
		t.Parallel()
		s := newMockServer(404, "NOT FOUND")
		defer s.Close()

		c := newMockClient(s)
		resp, err := c.GetCapabilities(context.Background())
		// Use require for preconditions: the assertion below dereferences resp, so continuing
		// after a failure would panic instead of reporting a clean test failure.
		require.NoError(t, err)
		require.NotNil(t, resp)
		assert.Empty(t, resp.Capabilities)
	})

	t.Run("updated-service-with-delta-checkpoint-capability", func(t *testing.T) {
		t.Parallel()
		cfg := apitype.DeltaCheckpointUploadsConfigV1{
			CheckpointCutoffSizeBytes: 1024 * 1024 * 4,
		}
		cfgJSON, err := json.Marshal(cfg)
		require.NoError(t, err)
		actualResp := apitype.CapabilitiesResponse{
			Capabilities: []apitype.APICapabilityConfig{{
				Version:       3,
				Capability:    apitype.DeltaCheckpointUploads,
				Configuration: json.RawMessage(cfgJSON),
			}},
		}
		respJSON, err := json.Marshal(actualResp)
		require.NoError(t, err)
		s := newMockServer(200, string(respJSON))
		defer s.Close()

		c := newMockClient(s)
		resp, err := c.GetCapabilities(context.Background())
		// Stop immediately if the response is unusable; the checks below index into
		// resp.Capabilities and would otherwise panic.
		require.NoError(t, err)
		require.NotNil(t, resp)
		require.Len(t, resp.Capabilities, 1)
		assert.Equal(t, apitype.DeltaCheckpointUploads, resp.Capabilities[0].Capability)
		assert.Equal(t, `{"checkpointCutoffSizeBytes":4194304}`,
			string(resp.Capabilities[0].Configuration))
	})
}
29 changes: 12 additions & 17 deletions pkg/backend/httpstate/diffs.go
Expand Up @@ -33,8 +33,6 @@ import (
type deploymentDiffState struct {
lastSavedDeployment json.RawMessage
sequenceNumber int
noChecksums bool
strictMode bool
minimalDiffSize int
}

Expand All @@ -44,8 +42,11 @@ type deploymentDiff struct {
deploymentDelta json.RawMessage
}

func newDeploymentDiffState() *deploymentDiffState {
return &deploymentDiffState{sequenceNumber: 1}
func newDeploymentDiffState(minimalDiffSize int) *deploymentDiffState {
return &deploymentDiffState{
sequenceNumber: 1,
minimalDiffSize: minimalDiffSize,
}
}

func (dds *deploymentDiffState) SequenceNumber() int {
Expand All @@ -62,14 +63,10 @@ func (dds *deploymentDiffState) ShouldDiff(new *apitype.UntypedDeployment) bool
if !dds.CanDiff() {
return false
}
small := dds.minimalDiffSize
if small == 0 {
small = 1024 * 32
}
if len(dds.lastSavedDeployment) < small {
if len(dds.lastSavedDeployment) < dds.minimalDiffSize {
return false
}
if len(new.Deployment) < small {
if len(new.Deployment) < dds.minimalDiffSize {
return false
}
return true
Expand Down Expand Up @@ -99,13 +96,11 @@ func (dds *deploymentDiffState) Diff(ctx context.Context,
var checkpointHash string
checkpointHashReady := &sync.WaitGroup{}

if !dds.noChecksums {
checkpointHashReady.Add(1)
go func() {
defer checkpointHashReady.Done()
checkpointHash = dds.computeHash(childCtx, after)
}()
}
checkpointHashReady.Add(1)
go func() {
defer checkpointHashReady.Done()
checkpointHash = dds.computeHash(childCtx, after)
}()

delta, err := dds.computeEdits(childCtx, string(before), string(after))
if err != nil {
Expand Down
20 changes: 4 additions & 16 deletions pkg/backend/httpstate/snapshot.go
Expand Up @@ -18,15 +18,13 @@ import (
"context"
"encoding/json"
"fmt"
"os"

"github.com/pulumi/pulumi/pkg/v3/backend"
"github.com/pulumi/pulumi/pkg/v3/backend/httpstate/client"
"github.com/pulumi/pulumi/pkg/v3/resource/deploy"
"github.com/pulumi/pulumi/pkg/v3/resource/stack"
"github.com/pulumi/pulumi/pkg/v3/secrets"
"github.com/pulumi/pulumi/sdk/v3/go/common/apitype"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/cmdutil"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/logging"
)

Expand Down Expand Up @@ -82,9 +80,6 @@ func (persister *cloudSnapshotPersister) Save(snapshot *deploy.Snapshot) error {
return err
}
if err := persister.saveDiff(ctx, diff, token); err != nil {
if differ.strictMode {
return err
}
if logging.V(3) {
logging.V(3).Infof("ignoring error saving checkpoint "+
"with PatchUpdateCheckpointDelta, falling back to "+
Expand Down Expand Up @@ -139,17 +134,10 @@ func (cb *cloudBackend) newSnapshotPersister(ctx context.Context, update client.
sm: sm,
}

if cmdutil.IsTruthy(os.Getenv("PULUMI_OPTIMIZED_CHECKPOINT_PATCH")) {
p.deploymentDiffState = newDeploymentDiffState()

if cmdutil.IsTruthy(os.Getenv("PULUMI_OPTIMIZED_CHECKPOINT_PATCH_STRICT")) {
p.deploymentDiffState.strictMode = true
}

if cmdutil.IsTruthy(os.Getenv("PULUMI_OPTIMIZED_CHECKPOINT_PATCH_NO_CHECKSUMS")) {
p.deploymentDiffState.noChecksums = true
}
caps := cb.capabilities(ctx)
deltaCaps := caps.deltaCheckpointUpdates
if deltaCaps != nil {
p.deploymentDiffState = newDeploymentDiffState(deltaCaps.CheckpointCutoffSizeBytes)
}

return p
}
21 changes: 16 additions & 5 deletions pkg/backend/httpstate/snapshot_test.go
Expand Up @@ -94,8 +94,18 @@ func TestCloudSnapshotPersisterUseOfDiffProtocol(t *testing.T) {
}

newMockServer := func() *httptest.Server {
return httptest.NewServer(
http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
return httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
switch req.URL.Path {
case "/api/capabilities":
resp := apitype.CapabilitiesResponse{Capabilities: []apitype.APICapabilityConfig{{
Capability: apitype.DeltaCheckpointUploads,
Configuration: json.RawMessage(`{"checkpointCutoffSizeBytes":1}`),
}}}
err := json.NewEncoder(rw).Encode(resp)
assert.NoError(t, err)
return
case "/api/stacks/owner/project/stack/update/update-id/checkpointverbatim",
"/api/stacks/owner/project/stack/update/update-id/checkpointdelta":
lastRequest = req
rw.WriteHeader(200)
message := `{}`
Expand All @@ -107,7 +117,10 @@ func TestCloudSnapshotPersisterUseOfDiffProtocol(t *testing.T) {
_, err = rw.Write([]byte(message))
assert.NoError(t, err)
req.Body = io.NopCloser(bytes.NewBuffer(rbytes))
}))
default:
panic(fmt.Sprintf("Path not supported: %v", req.URL.Path))
}
}))
}

newMockTokenSource := func() tokenSourceCapability {
Expand All @@ -126,8 +139,6 @@ func TestCloudSnapshotPersisterUseOfDiffProtocol(t *testing.T) {
UpdateKind: apitype.UpdateUpdate,
UpdateID: updateID,
}, newMockTokenSource(), nil)
persister.deploymentDiffState = newDeploymentDiffState()
persister.deploymentDiffState.minimalDiffSize = 1
return persister
}

Expand Down