CSI: serialize controller RPCs per plugin
The CSI specification says that we "SHOULD" send no more than one in-flight
request per *volume* at a time, with an allowance for losing state
(ex. leadership transitions) which the plugins "SHOULD" handle gracefully. We
mostly successfully serialize node and controller RPCs for the same volume,
except when Nomad clients are lost.
(See also container-storage-interface/spec#512)

These concurrency requirements in the spec fall short because Storage Provider
APIs aren't necessarily safe to call concurrently on the same host. For example,
concurrently attaching AWS EBS volumes to an EC2 instance results in a race for
device names, which causes attachment failures and confusing results when
releasing claims. So in practice many CSI plugins rely on k8s-specific sidecars
for serializing storage provider API calls globally. As a result, we have to be
much more conservative about concurrency in Nomad than the spec allows.

This changeset includes two major changes to fix this:
* Add a serializer method to the CSI volume RPC handler. When the
  RPC handler makes a destructive CSI Controller RPC, we send it through this
  serializer so that only one such RPC is in flight per plugin at a time. Any
  other RPCs for that plugin block until it completes. (A minimal sketch of the
  pattern appears after this list.)
* Ensure that requests go to the same controller plugin instance whenever
  possible by sorting the healthy plugin instances by client ID and preferring
  the lowest.
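
The serialization boils down to a process-wide map of plugin IDs to in-flight
"futures" (cancellable contexts), guarded by a mutex: the first caller for a
plugin publishes a future and runs its RPC; later callers find the future,
block on its Done channel, and retry once it is cancelled. Below is a minimal,
self-contained sketch of that pattern; the names (serialized, futures) and the
standalone shape are illustrative only, not the exact code in this changeset:

    package main

    import (
    	"context"
    	"fmt"
    	"sync"
    	"time"
    )

    var (
    	futures     = map[string]context.Context{} // pluginID -> in-flight marker
    	futuresLock sync.Mutex
    )

    // serialized runs fn, allowing only one call per pluginID at a time. Other
    // callers for the same pluginID block until the running call finishes.
    func serialized(ctx context.Context, pluginID string, fn func() error) error {
    	for {
    		futuresLock.Lock()
    		if future, ok := futures[pluginID]; ok {
    			// another RPC for this plugin is in flight: wait, then retry
    			futuresLock.Unlock()
    			select {
    			case <-future.Done():
    				continue
    			case <-ctx.Done():
    				return ctx.Err()
    			}
    		}
    		// claim the slot by publishing a cancellable context
    		slot, done := context.WithCancel(ctx)
    		futures[pluginID] = slot
    		futuresLock.Unlock()

    		err := fn()

    		// cancel and clear the slot under the lock so a woken waiter
    		// never observes a stale entry
    		futuresLock.Lock()
    		done()
    		delete(futures, pluginID)
    		futuresLock.Unlock()
    		return err
    	}
    }

    func main() {
    	var wg sync.WaitGroup
    	for i := 0; i < 3; i++ {
    		i := i
    		wg.Add(1)
    		go func() {
    			defer wg.Done()
    			_ = serialized(context.Background(), "ebs-plugin", func() error {
    				fmt.Println("controller RPC", i, "running")
    				time.Sleep(100 * time.Millisecond)
    				return nil
    			})
    		}()
    	}
    	wg.Wait()
    }

Using a cancellable context as the "future" lets waiters park on a channel
rather than spin or hold the mutex while the RPC runs; cancelling it and
deleting the map entry under the same lock hold means a woken waiter always
finds the slot already cleared.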

Fixes: #15415
tgross committed Jul 19, 2023
1 parent 1e7726c commit 43348d2
Showing 2 changed files with 89 additions and 17 deletions.
21 changes: 10 additions & 11 deletions nomad/client_csi_endpoint.go
@@ -5,7 +5,7 @@ package nomad

import (
"fmt"
"math/rand"
"sort"
"strings"
"time"

@@ -262,9 +262,9 @@ func (a *ClientCSI) clientIDsForController(pluginID string) ([]string, error) {

ws := memdb.NewWatchSet()

// note: plugin IDs are not scoped to region/DC but volumes are.
// so any node we get for a controller is already in the same
// region/DC for the volume.
// note: plugin IDs are not scoped to region but volumes are. so any Nomad
// client we get for a controller is already in the same region for the
// volume.
plugin, err := snap.CSIPluginByID(ws, pluginID)
if err != nil {
return nil, fmt.Errorf("error getting plugin: %s, %v", pluginID, err)
@@ -273,13 +273,10 @@ func (a *ClientCSI) clientIDsForController(pluginID string) ([]string, error) {
return nil, fmt.Errorf("plugin missing: %s", pluginID)
}

// iterating maps is "random" but unspecified and isn't particularly
// random with small maps, so not well-suited for load balancing.
// so we shuffle the keys and iterate over them.
clientIDs := []string{}

for clientID, controller := range plugin.Controllers {
if !controller.IsController() {
if !controller.IsController() || !controller.Healthy {
// we don't have separate types for CSIInfo depending on
// whether it's a controller or node. this error shouldn't
// make it to production but is to aid developers during
@@ -295,9 +292,11 @@ func (a *ClientCSI) clientIDsForController(pluginID string) ([]string, error) {
return nil, fmt.Errorf("failed to find clients running controller plugin %q", pluginID)
}

rand.Shuffle(len(clientIDs), func(i, j int) {
clientIDs[i], clientIDs[j] = clientIDs[j], clientIDs[i]
})
// Many plugins don't handle concurrent requests as described in the spec,
// and have undocumented expectations of using k8s-specific sidecars to
// leader elect. Sort the client IDs so that we prefer sending requests to
// the same controller to hack around this.
sort.Strings(clientIDs)

return clientIDs, nil
}
85 changes: 79 additions & 6 deletions nomad/csi_endpoint.go
@@ -4,9 +4,11 @@
package nomad

import (
"context"
"fmt"
"net/http"
"strings"
"sync"
"time"

"github.com/armon/go-metrics"
@@ -37,6 +39,15 @@ const (
csiPluginTable = "csi_plugins"
)

// controllerFutures is a map of plugin IDs to pending controller RPCs, shared
// across all CSIVolume instances (that is, global so that all client
// connections use the same map). If no RPC is pending for a given plugin, this
// may be nil.
var controllerFutures = map[string]context.Context{}

// controllerLock synchronizes access to the controllerFutures map
var controllerLock sync.Mutex

// replySetIndex sets the reply with the last index that modified the table
func (s *Server) replySetIndex(table string, reply *structs.QueryMeta) error {
fmsState := s.fsm.State()
@@ -549,7 +560,9 @@ func (v *CSIVolume) controllerPublishVolume(req *structs.CSIVolumeClaimRequest,
cReq.PluginID = plug.ID
cResp := &cstructs.ClientCSIControllerAttachVolumeResponse{}

err = v.srv.RPC(method, cReq, cResp)
err = v.serializedControllerRPC(plug.ID, func() error {
return v.srv.RPC(method, cReq, cResp)
})
if err != nil {
if strings.Contains(err.Error(), "FailedPrecondition") {
return fmt.Errorf("%v: %v", structs.ErrCSIClientRPCRetryable, err)
@@ -586,6 +599,57 @@ func (v *CSIVolume) volAndPluginLookup(namespace, volID string) (*structs.CSIPlu
return plug, vol, nil
}

// serializedControllerRPC ensures we're only sending a single controller RPC to
// a given plugin if the RPC can cause conflicting state changes.
//
// The CSI specification says that we SHOULD send no more than one in-flight
// request per *volume* at a time, with an allowance for losing state
// (ex. leadership transitions) which the plugins SHOULD handle gracefully.
//
// In practice many CSI plugins rely on k8s-specific sidecars for serializing
// storage provider API calls globally (ex. concurrently attaching EBS volumes
// to an EC2 instance results in a race for device names). So we have to be much
// more conservative about concurrency in Nomad than the spec allows.
func (v *CSIVolume) serializedControllerRPC(pluginID string, fn func() error) error {

for {
controllerLock.Lock()
future := controllerFutures[pluginID]
if future == nil {
future, futureDone := context.WithCancel(v.srv.shutdownCtx)
controllerFutures[pluginID] = future
controllerLock.Unlock()

err := fn()

// close the future while holding the lock and not in a defer so
// that we can ensure we've cleared it from the map before allowing
// anyone else to take the lock and write a new one
controllerLock.Lock()
futureDone()
delete(controllerFutures, pluginID)
controllerLock.Unlock()

return err
} else {
controllerLock.Unlock()

select {
case <-future.Done():
continue
case <-v.srv.shutdownCh:
// The csi_hook publish workflow on the client will retry if it
// gets this error. On unpublish, we don't want to block client
// shutdown so we give up on error. The new leader's
// volumewatcher will iterate all the claims at startup to
// detect this and mop up any claims in the NodeDetached state
// (volume GC will run periodically as well)
return structs.ErrNoLeader
}
}
}
}

// allowCSIMount is called on Job register to check mount permission
func allowCSIMount(aclObj *acl.ACL, namespace string) bool {
return aclObj.AllowPluginRead() &&
@@ -863,8 +927,11 @@ func (v *CSIVolume) controllerUnpublishVolume(vol *structs.CSIVolume, claim *str
Secrets: vol.Secrets,
}
req.PluginID = vol.PluginID
err = v.srv.RPC("ClientCSI.ControllerDetachVolume", req,
&cstructs.ClientCSIControllerDetachVolumeResponse{})

err = v.serializedControllerRPC(vol.PluginID, func() error {
return v.srv.RPC("ClientCSI.ControllerDetachVolume", req,
&cstructs.ClientCSIControllerDetachVolumeResponse{})
})
if err != nil {
return fmt.Errorf("could not detach from controller: %v", err)
}
@@ -1139,7 +1206,9 @@ func (v *CSIVolume) deleteVolume(vol *structs.CSIVolume, plugin *structs.CSIPlug
cReq.PluginID = plugin.ID
cResp := &cstructs.ClientCSIControllerDeleteVolumeResponse{}

return v.srv.RPC(method, cReq, cResp)
return v.serializedControllerRPC(plugin.ID, func() error {
return v.srv.RPC(method, cReq, cResp)
})
}

func (v *CSIVolume) ListExternal(args *structs.CSIVolumeExternalListRequest, reply *structs.CSIVolumeExternalListResponse) error {
@@ -1286,7 +1355,9 @@ func (v *CSIVolume) CreateSnapshot(args *structs.CSISnapshotCreateRequest, reply
}
cReq.PluginID = pluginID
cResp := &cstructs.ClientCSIControllerCreateSnapshotResponse{}
err = v.srv.RPC(method, cReq, cResp)
err = v.serializedControllerRPC(pluginID, func() error {
return v.srv.RPC(method, cReq, cResp)
})
if err != nil {
multierror.Append(&mErr, fmt.Errorf("could not create snapshot: %v", err))
continue
@@ -1360,7 +1431,9 @@ func (v *CSIVolume) DeleteSnapshot(args *structs.CSISnapshotDeleteRequest, reply
cReq := &cstructs.ClientCSIControllerDeleteSnapshotRequest{ID: snap.ID}
cReq.PluginID = plugin.ID
cResp := &cstructs.ClientCSIControllerDeleteSnapshotResponse{}
err = v.srv.RPC(method, cReq, cResp)
err = v.serializedControllerRPC(plugin.ID, func() error {
return v.srv.RPC(method, cReq, cResp)
})
if err != nil {
multierror.Append(&mErr, fmt.Errorf("could not delete %q: %v", snap.ID, err))
}
