Skip to content

Commit

Permalink
Map PV access modes to CSI access modes
Browse files Browse the repository at this point in the history
  • Loading branch information
chrishenzie committed Jun 29, 2021
1 parent 8db83c8 commit b7d732d
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 3 deletions.
58 changes: 55 additions & 3 deletions pkg/volume/csi/csi_client.go
Expand Up @@ -82,6 +82,7 @@ type csiClient interface {
NodeSupportsStageUnstage(ctx context.Context) (bool, error)
NodeSupportsNodeExpand(ctx context.Context) (bool, error)
NodeSupportsVolumeStats(ctx context.Context) (bool, error)
NodeSupportsSingleNodeMultiWriterAccessMode(ctx context.Context) (bool, error)
}

// Strongly typed address
Expand Down Expand Up @@ -120,6 +121,8 @@ type nodeV1ClientCreator func(addr csiAddr, metricsManager *MetricsManager) (
err error,
)

type nodeV1AccessModeMapper func(am api.PersistentVolumeAccessMode) csipbv1.VolumeCapability_AccessMode_Mode

// newV1NodeClient creates a new NodeClient with the internally used gRPC
// connection set up. It also returns a closer which must to be called to close
// the gRPC connection when the NodeClient is not used anymore.
Expand Down Expand Up @@ -217,7 +220,11 @@ func (c *csiDriverClient) NodePublishVolume(

if c.nodeV1ClientCreator == nil {
return errors.New("failed to call NodePublishVolume. nodeV1ClientCreator is nil")
}

accessModeMapper, err := c.getNodeV1AccessModeMapper(ctx)
if err != nil {
return err
}

nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
Expand All @@ -235,7 +242,7 @@ func (c *csiDriverClient) NodePublishVolume(
Secrets: secrets,
VolumeCapability: &csipbv1.VolumeCapability{
AccessMode: &csipbv1.VolumeCapability_AccessMode{
Mode: asCSIAccessModeV1(accessMode),
Mode: accessModeMapper(accessMode),
},
},
}
Expand Down Expand Up @@ -279,6 +286,11 @@ func (c *csiDriverClient) NodeExpandVolume(ctx context.Context, opts csiResizeOp
return opts.newSize, errors.New("size can not be less than 0")
}

accessModeMapper, err := c.getNodeV1AccessModeMapper(ctx)
if err != nil {
return opts.newSize, err
}

nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
if err != nil {
return opts.newSize, err
Expand All @@ -291,7 +303,7 @@ func (c *csiDriverClient) NodeExpandVolume(ctx context.Context, opts csiResizeOp
CapacityRange: &csipbv1.CapacityRange{RequiredBytes: opts.newSize.Value()},
VolumeCapability: &csipbv1.VolumeCapability{
AccessMode: &csipbv1.VolumeCapability_AccessMode{
Mode: asCSIAccessModeV1(opts.accessMode),
Mode: accessModeMapper(opts.accessMode),
},
},
}
Expand Down Expand Up @@ -371,6 +383,11 @@ func (c *csiDriverClient) NodeStageVolume(ctx context.Context,
return errors.New("nodeV1ClientCreate is nil")
}

accessModeMapper, err := c.getNodeV1AccessModeMapper(ctx)
if err != nil {
return err
}

nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
if err != nil {
return err
Expand All @@ -383,7 +400,7 @@ func (c *csiDriverClient) NodeStageVolume(ctx context.Context,
StagingTargetPath: stagingTargetPath,
VolumeCapability: &csipbv1.VolumeCapability{
AccessMode: &csipbv1.VolumeCapability_AccessMode{
Mode: asCSIAccessModeV1(accessMode),
Mode: accessModeMapper(accessMode),
},
},
Secrets: secrets,
Expand Down Expand Up @@ -446,6 +463,17 @@ func (c *csiDriverClient) NodeSupportsStageUnstage(ctx context.Context) (bool, e
return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME)
}

func (c *csiDriverClient) getNodeV1AccessModeMapper(ctx context.Context) (nodeV1AccessModeMapper, error) {
supported, err := c.NodeSupportsSingleNodeMultiWriterAccessMode(ctx)
if err != nil {
return nil, err
}
if supported {
return asSingleNodeMultiWriterCapableCSIAccessModeV1, nil
}
return asCSIAccessModeV1, nil
}

func asCSIAccessModeV1(am api.PersistentVolumeAccessMode) csipbv1.VolumeCapability_AccessMode_Mode {
switch am {
case api.ReadWriteOnce:
Expand All @@ -454,6 +482,25 @@ func asCSIAccessModeV1(am api.PersistentVolumeAccessMode) csipbv1.VolumeCapabili
return csipbv1.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY
case api.ReadWriteMany:
return csipbv1.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER
// This mapping exists to enable CSI drivers that lack the
// SINGLE_NODE_MULTI_WRITER capability to work with the
// ReadWriteOncePod access mode.
case api.ReadWriteOncePod:
return csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_WRITER
}
return csipbv1.VolumeCapability_AccessMode_UNKNOWN
}

func asSingleNodeMultiWriterCapableCSIAccessModeV1(am api.PersistentVolumeAccessMode) csipbv1.VolumeCapability_AccessMode_Mode {
switch am {
case api.ReadWriteOnce:
return csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_MULTI_WRITER
case api.ReadOnlyMany:
return csipbv1.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY
case api.ReadWriteMany:
return csipbv1.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER
case api.ReadWriteOncePod:
return csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_SINGLE_WRITER
}
return csipbv1.VolumeCapability_AccessMode_UNKNOWN
}
Expand Down Expand Up @@ -510,6 +557,11 @@ func (c *csiDriverClient) NodeSupportsVolumeStats(ctx context.Context) (bool, er
return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_GET_VOLUME_STATS)
}

func (c *csiDriverClient) NodeSupportsSingleNodeMultiWriterAccessMode(ctx context.Context) (bool, error) {
klog.V(4).Info(log("calling NodeGetCapabilities rpc to determine if NodeSupportsSingleNodeMultiWriterAccessMode"))
return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER)
}

func (c *csiDriverClient) NodeGetVolumeStats(ctx context.Context, volID string, targetPath string) (*volume.Metrics, error) {
klog.V(4).Info(log("calling NodeGetVolumeStats rpc: [volid=%s, target_path=%s", volID, targetPath))
if volID == "" {
Expand Down
85 changes: 85 additions & 0 deletions pkg/volume/csi/csi_client_test.go
Expand Up @@ -301,6 +301,11 @@ func (c *fakeCsiDriverClient) nodeSupportsVolumeCondition(ctx context.Context) (
return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_VOLUME_CONDITION)
}

func (c *fakeCsiDriverClient) NodeSupportsSingleNodeMultiWriterAccessMode(ctx context.Context) (bool, error) {
c.t.Log("calling fake.NodeSupportsSingleNodeMultiWriterAccessMode...")
return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER)
}

func (c *fakeCsiDriverClient) nodeSupportsCapability(ctx context.Context, capabilityType csipbv1.NodeServiceCapability_RPC_Type) (bool, error) {
capabilities, err := c.nodeGetCapabilities(ctx)
if err != nil {
Expand Down Expand Up @@ -865,3 +870,83 @@ func TestVolumeStats(t *testing.T) {
}

}

func TestAccessModeMapping(t *testing.T) {
tests := []struct {
name string
singleNodeMultiWriterSet bool
accessMode api.PersistentVolumeAccessMode
expectedMappedAccessMode csipbv1.VolumeCapability_AccessMode_Mode
}{
{
name: "with ReadWriteOnce and incapable driver",
singleNodeMultiWriterSet: false,
accessMode: api.ReadWriteOnce,
expectedMappedAccessMode: csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
},
{
name: "with ReadOnlyMany and incapable driver",
singleNodeMultiWriterSet: false,
accessMode: api.ReadOnlyMany,
expectedMappedAccessMode: csipbv1.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY,
},
{
name: "with ReadWriteMany and incapable driver",
singleNodeMultiWriterSet: false,
accessMode: api.ReadWriteMany,
expectedMappedAccessMode: csipbv1.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER,
},
{
name: "with ReadWriteOncePod and incapable driver",
singleNodeMultiWriterSet: false,
accessMode: api.ReadWriteOncePod,
expectedMappedAccessMode: csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
},
{
name: "with ReadWriteOnce and capable driver",
singleNodeMultiWriterSet: true,
accessMode: api.ReadWriteOnce,
expectedMappedAccessMode: csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_MULTI_WRITER,
},
{
name: "with ReadOnlyMany and capable driver",
singleNodeMultiWriterSet: true,
accessMode: api.ReadOnlyMany,
expectedMappedAccessMode: csipbv1.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY,
},
{
name: "with ReadWriteMany and capable driver",
singleNodeMultiWriterSet: true,
accessMode: api.ReadWriteMany,
expectedMappedAccessMode: csipbv1.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER,
},
{
name: "with ReadWriteOncePod and capable driver",
singleNodeMultiWriterSet: true,
accessMode: api.ReadWriteOncePod,
expectedMappedAccessMode: csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_SINGLE_WRITER,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
fakeCloser := fake.NewCloser(t)
client := &csiDriverClient{
driverName: "Fake Driver Name",
nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) (csipbv1.NodeClient, io.Closer, error) {
nodeClient := fake.NewNodeClientWithSingleNodeMultiWriter(tc.singleNodeMultiWriterSet)
return nodeClient, fakeCloser, nil
},
}

accessModeMapper, err := client.getNodeV1AccessModeMapper(context.Background())
if err != nil {
t.Error(err)
}

mappedAccessMode := accessModeMapper(tc.accessMode)
if mappedAccessMode != tc.expectedMappedAccessMode {
t.Errorf("expected access mode: %v; got: %v", tc.expectedMappedAccessMode, mappedAccessMode)
}
})
}
}
21 changes: 21 additions & 0 deletions pkg/volume/csi/fake/fake_client.go
Expand Up @@ -85,6 +85,7 @@ type NodeClient struct {
expansionSet bool
volumeStatsSet bool
volumeConditionSet bool
singleNodeMultiWriterSet bool
nodeGetInfoResp *csipb.NodeGetInfoResponse
nodeVolumeStatsResp *csipb.NodeGetVolumeStatsResponse
FakeNodeExpansionRequest *csipb.NodeExpandVolumeRequest
Expand Down Expand Up @@ -123,6 +124,16 @@ func NewNodeClientWithVolumeStatsAndCondition(volumeStatsSet, volumeConditionSet
}
}

func NewNodeClientWithSingleNodeMultiWriter(singleNodeMultiWriterSet bool) *NodeClient {
return &NodeClient{
nodePublishedVolumes: make(map[string]CSIVolume),
nodeStagedVolumes: make(map[string]CSIVolume),
stageUnstageSet: true,
volumeStatsSet: true,
singleNodeMultiWriterSet: singleNodeMultiWriterSet,
}
}

// SetNextError injects next expected error
func (f *NodeClient) SetNextError(err error) {
f.nextErr = err
Expand Down Expand Up @@ -364,6 +375,16 @@ func (f *NodeClient) NodeGetCapabilities(ctx context.Context, in *csipb.NodeGetC
},
})
}

if f.singleNodeMultiWriterSet {
resp.Capabilities = append(resp.Capabilities, &csipb.NodeServiceCapability{
Type: &csipb.NodeServiceCapability_Rpc{
Rpc: &csipb.NodeServiceCapability_RPC{
Type: csipb.NodeServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER,
},
},
})
}
return resp, nil
}

Expand Down

0 comments on commit b7d732d

Please sign in to comment.