Skip to content

Commit

Permalink
rbd: check volume details from original volumeID
Browse files Browse the repository at this point in the history
Checking volume details for the existing volumeID
first. if details like OMAP, RBD Image, Pool doesnot
exists try to use clusterIDMapping to look for the
correct informations.

fixes: #2929

Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
  • Loading branch information
Madhu-1 authored and mergify[bot] committed Nov 4, 2022
1 parent 6bba64c commit 07e9ded
Showing 1 changed file with 12 additions and 30 deletions.
42 changes: 12 additions & 30 deletions internal/rbd/nodeserver.go
Expand Up @@ -25,7 +25,6 @@ import (
"strings"

csicommon "github.com/ceph/ceph-csi/internal/csi-common"
"github.com/ceph/ceph-csi/internal/journal"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/fscrypt"
"github.com/ceph/ceph-csi/internal/util/log"
Expand Down Expand Up @@ -144,14 +143,12 @@ func healerStageTransaction(ctx context.Context, cr *util.Credentials, volOps *r
// this function also receive the credentials and secrets args as it differs in its data.
// The credentials are used directly by functions like voljournal.Connect() and other functions
// like genVolFromVolumeOptions() make use of secrets.
// nolint:gocyclo,cyclop // reduce complexity
func populateRbdVol(
ctx context.Context,
req *csi.NodeStageVolumeRequest,
cr *util.Credentials,
) (*rbdVolume, error) {
var err error
var j *journal.Connection
volID := req.GetVolumeId()
isBlock := req.GetVolumeCapability().GetBlock() != nil
disableInUseChecks := false
Expand All @@ -173,11 +170,7 @@ func populateRbdVol(

disableInUseChecks = true
}

rv, err := genVolFromVolumeOptions(ctx, req.GetVolumeContext(), disableInUseChecks, true)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
var rv *rbdVolume

isStaticVol := parseBoolOption(ctx, req.GetVolumeContext(), staticVol, false)
// get rbd image name from the volume journal
Expand All @@ -187,35 +180,24 @@ func populateRbdVol(
// if migration static volume, use imageName as volID
volID = req.GetVolumeContext()["imageName"]
}
rv.RbdImageName = volID
} else {
var vi util.CSIIdentifier
var imageAttributes *journal.ImageAttributes
err = vi.DecomposeCSIID(volID)
rv, err = genVolFromVolumeOptions(ctx, req.GetVolumeContext(), disableInUseChecks, true)
if err != nil {
err = fmt.Errorf("error decoding volume ID (%s): %w", volID, err)

return nil, status.Error(codes.Internal, err.Error())
}

j, err = volJournal.Connect(rv.Monitors, rv.RadosNamespace, cr)
rv.RbdImageName = volID
} else {
rv, err = GenVolFromVolID(ctx, volID, cr, req.GetSecrets())
if err != nil {
log.ErrorLog(ctx, "failed to establish cluster connection: %v", err)
rv.Destroy()
log.ErrorLog(ctx, "error generating volume %s: %v", volID, err)

return nil, status.Error(codes.Internal, err.Error())
return nil, status.Errorf(codes.Internal, "error generating volume %s: %v", volID, err)
}
defer j.Destroy()

imageAttributes, err = j.GetImageAttributes(
ctx, rv.Pool, vi.ObjectUUID, false)
if err != nil {
err = fmt.Errorf("error fetching image attributes for volume ID (%s): %w", volID, err)

return nil, status.Error(codes.Internal, err.Error())
rv.DataPool = req.GetVolumeContext()["dataPool"]
var ok bool
if rv.Mounter, ok = req.GetVolumeContext()["mounter"]; !ok {
rv.Mounter = rbdDefaultMounter
}
rv.RbdImageName = imageAttributes.ImageName
// set owner after extracting the owner name from the journal
rv.Owner = imageAttributes.Owner
}

err = rv.Connect(cr)
Expand Down

0 comments on commit 07e9ded

Please sign in to comment.