Skip to content

Commit

Permalink
Merge #11421
Browse files Browse the repository at this point in the history
11421: Feature detect diff-based bandwidth-optimized PATCH protocol for saving checkpoints r=t0yv0 a=t0yv0

<!--- 
Thanks so much for your contribution! If this is your first time contributing, please ensure that you have read the [CONTRIBUTING](https://github.com/pulumi/pulumi/blob/master/CONTRIBUTING.md) documentation.
-->

# Description

<!--- Please include a summary of the change and which issue is fixed. Please also include relevant motivation and context. -->

Starts using diff-based PATCH protocol for saving checkpoints for backends that indicate that they support it via `GET /api/capabilities`. Removes flags that opted into the feature. Specifically:

- call GET /api/capabilities in parallel with GetStack call so as not to add to critical path latency
- interpret 404 Not Found as "no capabilities" for backwards compatibility and not forcing a simultaneous backend update
- let the response from /api/capabilities configure a minimal stack size cutoff for opting into the diff-based protocol
- opens the possibility to tune the optimal cutoff with service experimentation

Fixes https://github.com/pulumi/home/issues/2425

I've tested the change against the mainline backend that does not support capabilities yet (404 path) and the experimental backend that does, including both sides of the size cutoff, and it seems to work as expected.

## Checklist

<!--- Please provide details if the checkbox below is to be left unchecked. -->
- [ ] I have added tests that prove my fix is effective or that my feature works
<!--- 
User-facing changes require a CHANGELOG entry.
-->
- [ ] I have run `make changelog` and committed the `changelog/pending/<file>` documenting my change
<!--
If the change(s) in this PR is a modification of an existing call to the Pulumi Service,
then the service should honor older versions of the CLI where this change would not exist.
You must then bump the API version in /pkg/backend/httpstate/client/api.go, as well as add
it to the service.
-->
- [ ] Yes, there are changes in this PR that warrants bumping the Pulumi Service API version
  <!-- `@Pulumi` employees: If yes, you must submit corresponding changes in the service repo. -->


Co-authored-by: Anton Tayanovskyy <anton@pulumi.com>
  • Loading branch information
bors[bot] and t0yv0 committed Nov 28, 2022
2 parents 40cadef + 1e4aa31 commit 377982b
Show file tree
Hide file tree
Showing 8 changed files with 180 additions and 40 deletions.
@@ -0,0 +1,4 @@
changes:
- type: feat
scope: backend/service
description: Allows the service to opt into a bandwidth-optimized DIFF protocol for storing checkpoints. Previously this required setting the PULUMI_OPTIMIZED_CHECKPOINT_PATCH env variable on the client. This env variable is now deprecated.
67 changes: 66 additions & 1 deletion pkg/backend/httpstate/backend.go
Expand Up @@ -18,6 +18,7 @@ import (
"context"
cryptorand "crypto/rand"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
Expand All @@ -29,6 +30,7 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"time"

opentracing "github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -119,6 +121,7 @@ type cloudBackend struct {
url string
client *client.Client
currentProject *workspace.Project
capabilities func(context.Context) capabilities
}

// Assert we implement the backend.Backend and backend.SpecificDeploymentExporter interfaces.
Expand All @@ -139,11 +142,15 @@ func New(d diag.Sink, cloudURL string) (Backend, error) {
currentProject = nil
}

client := client.NewClient(cloudURL, apiToken, d)
capabilities := detectCapabilities(d, client)

return &cloudBackend{
d: d,
url: cloudURL,
client: client.NewClient(cloudURL, apiToken, d),
client: client,
currentProject: currentProject,
capabilities: capabilities,
}, nil
}

Expand Down Expand Up @@ -767,6 +774,10 @@ func (b *cloudBackend) GetStack(ctx context.Context, stackRef backend.StackRefer
return nil, err
}

// GetStack is typically the initial call to a series of calls to the backend. Although logically unrelated,
// this is a good time to start detecting capabilities so that capability request is not on the critical path.
go b.capabilities(ctx)

stack, err := b.client.GetStack(ctx, stackID)
if err != nil {
// If this was a 404, return nil, nil as per this method's contract.
Expand Down Expand Up @@ -1753,3 +1764,57 @@ func (c httpstateBackendClient) GetStackResourceOutputs(
ctx context.Context, name string) (resource.PropertyMap, error) {
return backend.NewBackendClient(c.backend).GetStackResourceOutputs(ctx, name)
}

// Represents feature-detected capabilities of the service the backend is connected to.
type capabilities struct {
// If non-nil, indicates that delta checkpoint updates are supported.
deltaCheckpointUpdates *apitype.DeltaCheckpointUploadsConfigV1
}

// Builds a lazy wrapper around doDetectCapabilities.
func detectCapabilities(d diag.Sink, client *client.Client) func(ctx context.Context) capabilities {
var once sync.Once
var caps capabilities
done := make(chan struct{})
get := func(ctx context.Context) capabilities {
once.Do(func() {
caps = doDetectCapabilities(ctx, d, client)
close(done)
})
<-done
return caps
}
return get
}

func doDetectCapabilities(ctx context.Context, d diag.Sink, client *client.Client) capabilities {
resp, err := client.GetCapabilities(ctx)
if err != nil {
d.Warningf(diag.Message("" /*urn*/, "failed to get capabilities: %v"), err)
return capabilities{}
}
caps, err := decodeCapabilities(resp.Capabilities)
if err != nil {
d.Warningf(diag.Message("" /*urn*/, "failed to decode capabilities: %v"), err)
return capabilities{}
}
return caps
}

func decodeCapabilities(wireLevel []apitype.APICapabilityConfig) (capabilities, error) {
var parsed capabilities
for _, entry := range wireLevel {
switch entry.Capability {
case apitype.DeltaCheckpointUploads:
var cap apitype.DeltaCheckpointUploadsConfigV1
if err := json.Unmarshal(entry.Configuration, &cap); err != nil {
msg := "decoding DeltaCheckpointUploadsConfigV1 returned %w"
return capabilities{}, fmt.Errorf(msg, err)
}
parsed.deltaCheckpointUpdates = &cap
default:
continue
}
}
return parsed, nil
}
2 changes: 2 additions & 0 deletions pkg/backend/httpstate/client/api_endpoints.go
Expand Up @@ -78,6 +78,8 @@ func init() {
routes.Path(path).Methods(method).Name(name)
}

addEndpoint("GET", "/api/capabilities", "getCapabilities")

addEndpoint("GET", "/api/user", "getCurrentUser")
addEndpoint("GET", "/api/user/stacks", "listUserStacks")
addEndpoint("GET", "/api/stacks/{orgName}", "listOrganizationStacks")
Expand Down
34 changes: 33 additions & 1 deletion pkg/backend/httpstate/client/client.go
Expand Up @@ -49,6 +49,9 @@ type Client struct {
apiOrgs []string
diag diag.Sink
client restClient

// If true, do not probe the backend with GET /api/capabilities and assume no capabilities.
DisableCapabilityProbing bool
}

// newClient creates a new Pulumi API client with the given URL and API token. It is a variable instead of a regular
Expand Down Expand Up @@ -321,7 +324,7 @@ func (pc *Client) GetLatestConfiguration(ctx context.Context, stackID StackIdent
func (pc *Client) DoesProjectExist(ctx context.Context, owner string, projectName string) (bool, error) {
if err := pc.restCall(ctx, "HEAD", getProjectPath(owner, projectName), nil, nil, nil); err != nil {
// If this was a 404, return false - project not found.
if errResp, ok := err.(*apitype.ErrorResponse); ok && errResp.Code == http.StatusNotFound {
if is404(err) {
return false, nil
}

Expand Down Expand Up @@ -1070,3 +1073,32 @@ func (pc *Client) GetDeploymentUpdates(ctx context.Context, stack StackIdentifie
}
return resp, nil
}

func (pc *Client) GetCapabilities(ctx context.Context) (*apitype.CapabilitiesResponse, error) {
if pc.DisableCapabilityProbing {
return &apitype.CapabilitiesResponse{}, nil
}

var resp apitype.CapabilitiesResponse
err := pc.restCall(ctx, http.MethodGet, "/api/capabilities", nil, nil, &resp)
if is404(err) {
// The client continues to support legacy backends. They do not support /api/capabilities and are
// assumed here to have no additional capabilities.
return &apitype.CapabilitiesResponse{}, nil
}
if err != nil {
return nil, fmt.Errorf("querying capabilities failed: %w", err)
}
return &resp, nil
}

func is404(err error) bool {
if err == nil {
return false
}
var errResp *apitype.ErrorResponse
if errors.As(err, &errResp) && errResp.Code == http.StatusNotFound {
return true
}
return false
}
43 changes: 43 additions & 0 deletions pkg/backend/httpstate/client/client_test.go
Expand Up @@ -179,3 +179,46 @@ func TestPatchUpdateCheckpointVerbatimPreservesIndent(t *testing.T) {

assert.Equal(t, string(indented), string(request.UntypedDeployment))
}

func TestGetCapabilities(t *testing.T) {
t.Parallel()
t.Run("legacy-service-404", func(t *testing.T) {
t.Parallel()
s := newMockServer(404, "NOT FOUND")
defer s.Close()

c := newMockClient(s)
resp, err := c.GetCapabilities(context.Background())
assert.NoError(t, err)
assert.NotNil(t, resp)
assert.Empty(t, resp.Capabilities)
})
t.Run("updated-service-with-delta-checkpoint-capability", func(t *testing.T) {
t.Parallel()
cfg := apitype.DeltaCheckpointUploadsConfigV1{
CheckpointCutoffSizeBytes: 1024 * 1024 * 4,
}
cfgJSON, err := json.Marshal(cfg)
require.NoError(t, err)
actualResp := apitype.CapabilitiesResponse{
Capabilities: []apitype.APICapabilityConfig{{
Version: 3,
Capability: apitype.DeltaCheckpointUploads,
Configuration: json.RawMessage(cfgJSON),
}},
}
respJSON, err := json.Marshal(actualResp)
require.NoError(t, err)
s := newMockServer(200, string(respJSON))
defer s.Close()

c := newMockClient(s)
resp, err := c.GetCapabilities(context.Background())
assert.NoError(t, err)
assert.NotNil(t, resp)
assert.Len(t, resp.Capabilities, 1)
assert.Equal(t, apitype.DeltaCheckpointUploads, resp.Capabilities[0].Capability)
assert.Equal(t, `{"checkpointCutoffSizeBytes":4194304}`,
string(resp.Capabilities[0].Configuration))
})
}
29 changes: 12 additions & 17 deletions pkg/backend/httpstate/diffs.go
Expand Up @@ -33,8 +33,6 @@ import (
type deploymentDiffState struct {
lastSavedDeployment json.RawMessage
sequenceNumber int
noChecksums bool
strictMode bool
minimalDiffSize int
}

Expand All @@ -44,8 +42,11 @@ type deploymentDiff struct {
deploymentDelta json.RawMessage
}

func newDeploymentDiffState() *deploymentDiffState {
return &deploymentDiffState{sequenceNumber: 1}
func newDeploymentDiffState(minimalDiffSize int) *deploymentDiffState {
return &deploymentDiffState{
sequenceNumber: 1,
minimalDiffSize: minimalDiffSize,
}
}

func (dds *deploymentDiffState) SequenceNumber() int {
Expand All @@ -62,14 +63,10 @@ func (dds *deploymentDiffState) ShouldDiff(new *apitype.UntypedDeployment) bool
if !dds.CanDiff() {
return false
}
small := dds.minimalDiffSize
if small == 0 {
small = 1024 * 32
}
if len(dds.lastSavedDeployment) < small {
if len(dds.lastSavedDeployment) < dds.minimalDiffSize {
return false
}
if len(new.Deployment) < small {
if len(new.Deployment) < dds.minimalDiffSize {
return false
}
return true
Expand Down Expand Up @@ -99,13 +96,11 @@ func (dds *deploymentDiffState) Diff(ctx context.Context,
var checkpointHash string
checkpointHashReady := &sync.WaitGroup{}

if !dds.noChecksums {
checkpointHashReady.Add(1)
go func() {
defer checkpointHashReady.Done()
checkpointHash = dds.computeHash(childCtx, after)
}()
}
checkpointHashReady.Add(1)
go func() {
defer checkpointHashReady.Done()
checkpointHash = dds.computeHash(childCtx, after)
}()

delta, err := dds.computeEdits(childCtx, string(before), string(after))
if err != nil {
Expand Down
20 changes: 4 additions & 16 deletions pkg/backend/httpstate/snapshot.go
Expand Up @@ -18,15 +18,13 @@ import (
"context"
"encoding/json"
"fmt"
"os"

"github.com/pulumi/pulumi/pkg/v3/backend"
"github.com/pulumi/pulumi/pkg/v3/backend/httpstate/client"
"github.com/pulumi/pulumi/pkg/v3/resource/deploy"
"github.com/pulumi/pulumi/pkg/v3/resource/stack"
"github.com/pulumi/pulumi/pkg/v3/secrets"
"github.com/pulumi/pulumi/sdk/v3/go/common/apitype"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/cmdutil"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/logging"
)

Expand Down Expand Up @@ -82,9 +80,6 @@ func (persister *cloudSnapshotPersister) Save(snapshot *deploy.Snapshot) error {
return err
}
if err := persister.saveDiff(ctx, diff, token); err != nil {
if differ.strictMode {
return err
}
if logging.V(3) {
logging.V(3).Infof("ignoring error saving checkpoint "+
"with PatchUpdateCheckpointDelta, falling back to "+
Expand Down Expand Up @@ -139,17 +134,10 @@ func (cb *cloudBackend) newSnapshotPersister(ctx context.Context, update client.
sm: sm,
}

if cmdutil.IsTruthy(os.Getenv("PULUMI_OPTIMIZED_CHECKPOINT_PATCH")) {
p.deploymentDiffState = newDeploymentDiffState()

if cmdutil.IsTruthy(os.Getenv("PULUMI_OPTIMIZED_CHECKPOINT_PATCH_STRICT")) {
p.deploymentDiffState.strictMode = true
}

if cmdutil.IsTruthy(os.Getenv("PULUMI_OPTIMIZED_CHECKPOINT_PATCH_NO_CHECKSUMS")) {
p.deploymentDiffState.noChecksums = true
}
caps := cb.capabilities(ctx)
deltaCaps := caps.deltaCheckpointUpdates
if deltaCaps != nil {
p.deploymentDiffState = newDeploymentDiffState(deltaCaps.CheckpointCutoffSizeBytes)
}

return p
}
21 changes: 16 additions & 5 deletions pkg/backend/httpstate/snapshot_test.go
Expand Up @@ -94,8 +94,18 @@ func TestCloudSnapshotPersisterUseOfDiffProtocol(t *testing.T) {
}

newMockServer := func() *httptest.Server {
return httptest.NewServer(
http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
return httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
switch req.URL.Path {
case "/api/capabilities":
resp := apitype.CapabilitiesResponse{Capabilities: []apitype.APICapabilityConfig{{
Capability: apitype.DeltaCheckpointUploads,
Configuration: json.RawMessage(`{"checkpointCutoffSizeBytes":1}`),
}}}
err := json.NewEncoder(rw).Encode(resp)
assert.NoError(t, err)
return
case "/api/stacks/owner/project/stack/update/update-id/checkpointverbatim",
"/api/stacks/owner/project/stack/update/update-id/checkpointdelta":
lastRequest = req
rw.WriteHeader(200)
message := `{}`
Expand All @@ -107,7 +117,10 @@ func TestCloudSnapshotPersisterUseOfDiffProtocol(t *testing.T) {
_, err = rw.Write([]byte(message))
assert.NoError(t, err)
req.Body = io.NopCloser(bytes.NewBuffer(rbytes))
}))
default:
panic(fmt.Sprintf("Path not supported: %v", req.URL.Path))
}
}))
}

newMockTokenSource := func() tokenSourceCapability {
Expand All @@ -126,8 +139,6 @@ func TestCloudSnapshotPersisterUseOfDiffProtocol(t *testing.T) {
UpdateKind: apitype.UpdateUpdate,
UpdateID: updateID,
}, newMockTokenSource(), nil)
persister.deploymentDiffState = newDeploymentDiffState()
persister.deploymentDiffState.minimalDiffSize = 1
return persister
}

Expand Down

0 comments on commit 377982b

Please sign in to comment.