diff --git a/changelog/pending/20221121--backend-service--allows-the-service-to-opt-into-bandwidth-optimized-diff-protocol.yaml b/changelog/pending/20221121--backend-service--allows-the-service-to-opt-into-bandwidth-optimized-diff-protocol.yaml new file mode 100644 index 000000000000..f929c8632edf --- /dev/null +++ b/changelog/pending/20221121--backend-service--allows-the-service-to-opt-into-bandwidth-optimized-diff-protocol.yaml @@ -0,0 +1,4 @@ +changes: +- type: feat + scope: backend/service + description: Allows the service to opt into a bandwidth-optimized DIFF protocol for storing checkpoints. Previously this required setting the PULUMI_OPTIMIZED_CHECKPOINT_PATCH env variable on the client. This env variable is now deprecated. diff --git a/pkg/backend/httpstate/backend.go b/pkg/backend/httpstate/backend.go index b3a80586431b..44d17efc3d14 100644 --- a/pkg/backend/httpstate/backend.go +++ b/pkg/backend/httpstate/backend.go @@ -18,6 +18,7 @@ import ( "context" cryptorand "crypto/rand" "encoding/hex" + "encoding/json" "errors" "fmt" "io" @@ -29,6 +30,7 @@ import ( "regexp" "strconv" "strings" + "sync" "time" opentracing "github.com/opentracing/opentracing-go" @@ -119,6 +121,7 @@ type cloudBackend struct { url string client *client.Client currentProject *workspace.Project + capabilities func(context.Context) capabilities } // Assert we implement the backend.Backend and backend.SpecificDeploymentExporter interfaces. @@ -139,11 +142,15 @@ func New(d diag.Sink, cloudURL string) (Backend, error) { currentProject = nil } + client := client.NewClient(cloudURL, apiToken, d) + capabilities := detectCapabilities(d, client) + return &cloudBackend{ d: d, url: cloudURL, - client: client.NewClient(cloudURL, apiToken, d), + client: client, currentProject: currentProject, + capabilities: capabilities, }, nil } @@ -767,6 +774,10 @@ func (b *cloudBackend) GetStack(ctx context.Context, stackRef backend.StackRefer return nil, err } + // GetStack is typically the initial call to a series of calls to the backend. Although logically unrelated, + // this is a good time to start detecting capabilities so that capability request is not on the critical path. + go b.capabilities(ctx) + stack, err := b.client.GetStack(ctx, stackID) if err != nil { // If this was a 404, return nil, nil as per this method's contract. @@ -1753,3 +1764,57 @@ func (c httpstateBackendClient) GetStackResourceOutputs( ctx context.Context, name string) (resource.PropertyMap, error) { return backend.NewBackendClient(c.backend).GetStackResourceOutputs(ctx, name) } + +// Represents feature-detected capabilities of the service the backend is connected to. +type capabilities struct { + // If non-nil, indicates that delta checkpoint updates are supported. + deltaCheckpointUpdates *apitype.DeltaCheckpointUploadsConfigV1 +} + +// Builds a lazy wrapper around doDetectCapabilities. +func detectCapabilities(d diag.Sink, client *client.Client) func(ctx context.Context) capabilities { + var once sync.Once + var caps capabilities + done := make(chan struct{}) + get := func(ctx context.Context) capabilities { + once.Do(func() { + caps = doDetectCapabilities(ctx, d, client) + close(done) + }) + <-done + return caps + } + return get +} + +func doDetectCapabilities(ctx context.Context, d diag.Sink, client *client.Client) capabilities { + resp, err := client.GetCapabilities(ctx) + if err != nil { + d.Warningf(diag.Message("" /*urn*/, "failed to get capabilities: %v"), err) + return capabilities{} + } + caps, err := decodeCapabilities(resp.Capabilities) + if err != nil { + d.Warningf(diag.Message("" /*urn*/, "failed to decode capabilities: %v"), err) + return capabilities{} + } + return caps +} + +func decodeCapabilities(wireLevel []apitype.APICapabilityConfig) (capabilities, error) { + var parsed capabilities + for _, entry := range wireLevel { + switch entry.Capability { + case apitype.DeltaCheckpointUploads: + var cap apitype.DeltaCheckpointUploadsConfigV1 + if err := json.Unmarshal(entry.Configuration, &cap); err != nil { + msg := "decoding DeltaCheckpointUploadsConfigV1 returned %w" + return capabilities{}, fmt.Errorf(msg, err) + } + parsed.deltaCheckpointUpdates = &cap + default: + continue + } + } + return parsed, nil +} diff --git a/pkg/backend/httpstate/client/api_endpoints.go b/pkg/backend/httpstate/client/api_endpoints.go index 718ca9723ec1..ebfc88eb0dc2 100644 --- a/pkg/backend/httpstate/client/api_endpoints.go +++ b/pkg/backend/httpstate/client/api_endpoints.go @@ -78,6 +78,8 @@ func init() { routes.Path(path).Methods(method).Name(name) } + addEndpoint("GET", "/api/capabilities", "getCapabilities") + addEndpoint("GET", "/api/user", "getCurrentUser") addEndpoint("GET", "/api/user/stacks", "listUserStacks") addEndpoint("GET", "/api/stacks/{orgName}", "listOrganizationStacks") diff --git a/pkg/backend/httpstate/client/client.go b/pkg/backend/httpstate/client/client.go index eaeb483a1450..3b41d9192cc3 100644 --- a/pkg/backend/httpstate/client/client.go +++ b/pkg/backend/httpstate/client/client.go @@ -49,6 +49,9 @@ type Client struct { apiOrgs []string diag diag.Sink client restClient + + // If true, do not probe the backend with GET /api/capabilities and assume no capabilities. + DisableCapabilityProbing bool } // newClient creates a new Pulumi API client with the given URL and API token. It is a variable instead of a regular @@ -321,7 +324,7 @@ func (pc *Client) GetLatestConfiguration(ctx context.Context, stackID StackIdent func (pc *Client) DoesProjectExist(ctx context.Context, owner string, projectName string) (bool, error) { if err := pc.restCall(ctx, "HEAD", getProjectPath(owner, projectName), nil, nil, nil); err != nil { // If this was a 404, return false - project not found. - if errResp, ok := err.(*apitype.ErrorResponse); ok && errResp.Code == http.StatusNotFound { + if is404(err) { return false, nil } @@ -1070,3 +1073,32 @@ func (pc *Client) GetDeploymentUpdates(ctx context.Context, stack StackIdentifie } return resp, nil } + +func (pc *Client) GetCapabilities(ctx context.Context) (*apitype.CapabilitiesResponse, error) { + if pc.DisableCapabilityProbing { + return &apitype.CapabilitiesResponse{}, nil + } + + var resp apitype.CapabilitiesResponse + err := pc.restCall(ctx, http.MethodGet, "/api/capabilities", nil, nil, &resp) + if is404(err) { + // The client continues to support legacy backends. They do not support /api/capabilities and are + // assumed here to have no additional capabilities. + return &apitype.CapabilitiesResponse{}, nil + } + if err != nil { + return nil, fmt.Errorf("querying capabilities failed: %w", err) + } + return &resp, nil +} + +func is404(err error) bool { + if err == nil { + return false + } + var errResp *apitype.ErrorResponse + if errors.As(err, &errResp) && errResp.Code == http.StatusNotFound { + return true + } + return false +} diff --git a/pkg/backend/httpstate/client/client_test.go b/pkg/backend/httpstate/client/client_test.go index 6e6d95018032..30eaef55b35d 100644 --- a/pkg/backend/httpstate/client/client_test.go +++ b/pkg/backend/httpstate/client/client_test.go @@ -179,3 +179,46 @@ func TestPatchUpdateCheckpointVerbatimPreservesIndent(t *testing.T) { assert.Equal(t, string(indented), string(request.UntypedDeployment)) } + +func TestGetCapabilities(t *testing.T) { + t.Parallel() + t.Run("legacy-service-404", func(t *testing.T) { + t.Parallel() + s := newMockServer(404, "NOT FOUND") + defer s.Close() + + c := newMockClient(s) + resp, err := c.GetCapabilities(context.Background()) + assert.NoError(t, err) + assert.NotNil(t, resp) + assert.Empty(t, resp.Capabilities) + }) + t.Run("updated-service-with-delta-checkpoint-capability", func(t *testing.T) { + t.Parallel() + cfg := apitype.DeltaCheckpointUploadsConfigV1{ + CheckpointCutoffSizeBytes: 1024 * 1024 * 4, + } + cfgJSON, err := json.Marshal(cfg) + require.NoError(t, err) + actualResp := apitype.CapabilitiesResponse{ + Capabilities: []apitype.APICapabilityConfig{{ + Version: 3, + Capability: apitype.DeltaCheckpointUploads, + Configuration: json.RawMessage(cfgJSON), + }}, + } + respJSON, err := json.Marshal(actualResp) + require.NoError(t, err) + s := newMockServer(200, string(respJSON)) + defer s.Close() + + c := newMockClient(s) + resp, err := c.GetCapabilities(context.Background()) + assert.NoError(t, err) + assert.NotNil(t, resp) + assert.Len(t, resp.Capabilities, 1) + assert.Equal(t, apitype.DeltaCheckpointUploads, resp.Capabilities[0].Capability) + assert.Equal(t, `{"checkpointCutoffSizeBytes":4194304}`, + string(resp.Capabilities[0].Configuration)) + }) +} diff --git a/pkg/backend/httpstate/diffs.go b/pkg/backend/httpstate/diffs.go index af13601b0095..19a40ca6af8f 100644 --- a/pkg/backend/httpstate/diffs.go +++ b/pkg/backend/httpstate/diffs.go @@ -33,8 +33,6 @@ import ( type deploymentDiffState struct { lastSavedDeployment json.RawMessage sequenceNumber int - noChecksums bool - strictMode bool minimalDiffSize int } @@ -44,8 +42,11 @@ type deploymentDiff struct { deploymentDelta json.RawMessage } -func newDeploymentDiffState() *deploymentDiffState { - return &deploymentDiffState{sequenceNumber: 1} +func newDeploymentDiffState(minimalDiffSize int) *deploymentDiffState { + return &deploymentDiffState{ + sequenceNumber: 1, + minimalDiffSize: minimalDiffSize, + } } func (dds *deploymentDiffState) SequenceNumber() int { @@ -62,14 +63,10 @@ func (dds *deploymentDiffState) ShouldDiff(new *apitype.UntypedDeployment) bool if !dds.CanDiff() { return false } - small := dds.minimalDiffSize - if small == 0 { - small = 1024 * 32 - } - if len(dds.lastSavedDeployment) < small { + if len(dds.lastSavedDeployment) < dds.minimalDiffSize { return false } - if len(new.Deployment) < small { + if len(new.Deployment) < dds.minimalDiffSize { return false } return true @@ -99,13 +96,11 @@ func (dds *deploymentDiffState) Diff(ctx context.Context, var checkpointHash string checkpointHashReady := &sync.WaitGroup{} - if !dds.noChecksums { - checkpointHashReady.Add(1) - go func() { - defer checkpointHashReady.Done() - checkpointHash = dds.computeHash(childCtx, after) - }() - } + checkpointHashReady.Add(1) + go func() { + defer checkpointHashReady.Done() + checkpointHash = dds.computeHash(childCtx, after) + }() delta, err := dds.computeEdits(childCtx, string(before), string(after)) if err != nil { diff --git a/pkg/backend/httpstate/snapshot.go b/pkg/backend/httpstate/snapshot.go index 3892391e5c11..a2801a4216b9 100644 --- a/pkg/backend/httpstate/snapshot.go +++ b/pkg/backend/httpstate/snapshot.go @@ -18,7 +18,6 @@ import ( "context" "encoding/json" "fmt" - "os" "github.com/pulumi/pulumi/pkg/v3/backend" "github.com/pulumi/pulumi/pkg/v3/backend/httpstate/client" @@ -26,7 +25,6 @@ import ( "github.com/pulumi/pulumi/pkg/v3/resource/stack" "github.com/pulumi/pulumi/pkg/v3/secrets" "github.com/pulumi/pulumi/sdk/v3/go/common/apitype" - "github.com/pulumi/pulumi/sdk/v3/go/common/util/cmdutil" "github.com/pulumi/pulumi/sdk/v3/go/common/util/logging" ) @@ -82,9 +80,6 @@ func (persister *cloudSnapshotPersister) Save(snapshot *deploy.Snapshot) error { return err } if err := persister.saveDiff(ctx, diff, token); err != nil { - if differ.strictMode { - return err - } if logging.V(3) { logging.V(3).Infof("ignoring error saving checkpoint "+ "with PatchUpdateCheckpointDelta, falling back to "+ @@ -139,17 +134,10 @@ func (cb *cloudBackend) newSnapshotPersister(ctx context.Context, update client. sm: sm, } - if cmdutil.IsTruthy(os.Getenv("PULUMI_OPTIMIZED_CHECKPOINT_PATCH")) { - p.deploymentDiffState = newDeploymentDiffState() - - if cmdutil.IsTruthy(os.Getenv("PULUMI_OPTIMIZED_CHECKPOINT_PATCH_STRICT")) { - p.deploymentDiffState.strictMode = true - } - - if cmdutil.IsTruthy(os.Getenv("PULUMI_OPTIMIZED_CHECKPOINT_PATCH_NO_CHECKSUMS")) { - p.deploymentDiffState.noChecksums = true - } + caps := cb.capabilities(ctx) + deltaCaps := caps.deltaCheckpointUpdates + if deltaCaps != nil { + p.deploymentDiffState = newDeploymentDiffState(deltaCaps.CheckpointCutoffSizeBytes) } - return p } diff --git a/pkg/backend/httpstate/snapshot_test.go b/pkg/backend/httpstate/snapshot_test.go index 0e57e5fd7f1c..1d49b6b13afa 100644 --- a/pkg/backend/httpstate/snapshot_test.go +++ b/pkg/backend/httpstate/snapshot_test.go @@ -94,8 +94,18 @@ func TestCloudSnapshotPersisterUseOfDiffProtocol(t *testing.T) { } newMockServer := func() *httptest.Server { - return httptest.NewServer( - http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + return httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + switch req.URL.Path { + case "/api/capabilities": + resp := apitype.CapabilitiesResponse{Capabilities: []apitype.APICapabilityConfig{{ + Capability: apitype.DeltaCheckpointUploads, + Configuration: json.RawMessage(`{"checkpointCutoffSizeBytes":1}`), + }}} + err := json.NewEncoder(rw).Encode(resp) + assert.NoError(t, err) + return + case "/api/stacks/owner/project/stack/update/update-id/checkpointverbatim", + "/api/stacks/owner/project/stack/update/update-id/checkpointdelta": lastRequest = req rw.WriteHeader(200) message := `{}` @@ -107,7 +117,10 @@ func TestCloudSnapshotPersisterUseOfDiffProtocol(t *testing.T) { _, err = rw.Write([]byte(message)) assert.NoError(t, err) req.Body = io.NopCloser(bytes.NewBuffer(rbytes)) - })) + default: + panic(fmt.Sprintf("Path not supported: %v", req.URL.Path)) + } + })) } newMockTokenSource := func() tokenSourceCapability { @@ -126,8 +139,6 @@ func TestCloudSnapshotPersisterUseOfDiffProtocol(t *testing.T) { UpdateKind: apitype.UpdateUpdate, UpdateID: updateID, }, newMockTokenSource(), nil) - persister.deploymentDiffState = newDeploymentDiffState() - persister.deploymentDiffState.minimalDiffSize = 1 return persister }