Skip to content

Commit

Permalink
progress
Browse files Browse the repository at this point in the history
  • Loading branch information
Theofilos Mouratidis authored and thmour committed Apr 22, 2021
1 parent bbc9385 commit e876db7
Show file tree
Hide file tree
Showing 5 changed files with 592 additions and 135 deletions.
183 changes: 73 additions & 110 deletions pkg/storage/fs/cephfs/cephfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,24 @@ package cephfs
import (
"context"
cephfs2 "github.com/ceph/go-ceph/cephfs"
userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
"github.com/ceph/go-ceph/cephfs/admin"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
typesv1beta1 "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
"github.com/cs3org/reva/pkg/errtypes"
"github.com/cs3org/reva/pkg/mime"
"github.com/cs3org/reva/pkg/storage"
"github.com/cs3org/reva/pkg/storage/fs/registry"
"github.com/cs3org/reva/pkg/storage/utils/chunking"
"github.com/pkg/errors"
"io"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
)

type cephfs struct{
conf *Options
chunkHandler *chunking.ChunkHandler
}

func init() {
Expand All @@ -31,145 +32,95 @@ func New(m map[string]interface{}) (storage.FS, error) {
}

func (fs *cephfs) GetHome(ctx context.Context) (string, error) {
if fs.conf.DisableHome || fs.conf.UserLayout == "" {
if fs.conf.DisableHome {
return "", errtypes.NotSupported("cephfs: GetHome() home supported disabled")
}

return fs.MakeUser(ctx).homePath, nil
u := fs.MakeUser(ctx)

return FSadm.SubVolumePath("cephfs", "reva", u.Username)
}

func (fs *cephfs) CreateHome(ctx context.Context) (err error) {
if fs.conf.DisableHome || fs.conf.UserLayout == "" {
if fs.conf.DisableHome {
return errtypes.NotSupported("cephfs: GetHome() home supported disabled")
}

u := fs.MakeUser(ctx)

err = Admin.MakeDir(u.homePath, fs.conf.DirMode); if err != nil { return }
err = Admin.Chown(u.homePath, uint32(u.UidNumber), uint32(u.GidNumber))
err = FSadm.CreateSubVolume("cephfs", "reva", u.Username, &admin.SubVolumeOptions{
Size: admin.ByteCount(fs.conf.UserQuotaBytes),
Uid: int(u.UidNumber),
Gid: int(u.GidNumber),
Mode: 0755,
})

return
}

func (fs *cephfs) CreateDir(ctx context.Context, fn string) error {
u := fs.MakeUser(ctx)
path := filepath.Join(u.homePath, fn)

return u.exec(func(mt Mount) error {
return mt.MakeDir(path, fs.conf.DirMode)
return mt.MakeDir(fn, fs.conf.DirMode)
})
}

func (fs *cephfs) Delete(ctx context.Context, ref *provider.Reference) (err error) {
var path string
u := fs.MakeUser(ctx)
path, err := ResolveRef(ref); if err != nil { return }
path, err = ResolveRef(ref); if err != nil { return }

return u.exec(func(mt Mount) error {
return mt.Unlink(path)
})
}

func (fs *cephfs) Move(ctx context.Context, oldRef, newRef *provider.Reference) (err error) {
var oldPath, newPath string
u := fs.MakeUser(ctx)
oldPath, err := ResolveRef(oldRef); if err != nil { return }
newPath, err := ResolveRef(oldRef); if err != nil { return }
oldPath, err = ResolveRef(oldRef); if err != nil { return }
newPath, err = ResolveRef(oldRef); if err != nil { return }

return u.exec(func(mt Mount) error {
return mt.Rename(oldPath, newPath)
})
}

func (fs *cephfs) GetMD(ctx context.Context, ref *provider.Reference, mdKeys []string) (*provider.ResourceInfo, error) {
panic("implement me")
u := fs.MakeUser(ctx)
path, err := ResolveRef(ref); if err != nil { return nil, err }

var iri interface{}
iri, err = u.exec2(func(mt Mount) (i interface{}, e error) {
var stat *cephfs2.CephStatx
stat, e = mt.Statx(path, cephfs2.StatxAllStats, 0)

return fileAsResourceInfo(ctx, mt, path, stat, mdKeys)
})

return iri.(*provider.ResourceInfo), err
}

func (fs *cephfs) ListFolder(ctx context.Context, ref *provider.Reference, mdKeys []string) (files []*provider.ResourceInfo, err error) {
var path string
u := fs.MakeUser(ctx)
path, err := ResolveRef(ref); if err != nil { return }
path, err = ResolveRef(ref); if err != nil { return }

var dir *cephfs2.Directory
err = u.exec(func(mt Mount) (e error) {
dir, e = mt.OpenDir(path)
defer dir.Close()
if e != nil { return }
defer dir.Close()

var entry *cephfs2.DirEntryPlus
var ri *provider.ResourceInfo
for entry, e = dir.ReadDirPlus(cephfs2.StatxAllStats, 0); entry != nil && e == nil;
entry, e = dir.ReadDirPlus(cephfs2.StatxAllStats, 0) {
if entry.Name() == "." || entry.Name() == ".." { continue } //TODO: Maybe remove?
var (
_type provider.ResourceType
target string
size uint64
oid, idp, buf []byte
ie error
)

stat := entry.Statx()
etype := entry.DType()
entryPath := filepath.Join(path, entry.Name())
switch etype {
case cephfs2.DTypeDir:
_type = provider.ResourceType_RESOURCE_TYPE_CONTAINER
if buf, ie = mt.GetXattr(entryPath, "ceph.dir.rbytes"); ie == nil {
size, ie = strconv.ParseUint(string(buf), 10, 64)
}
case cephfs2.DTypeLnk:
_type = provider.ResourceType_RESOURCE_TYPE_SYMLINK
target, e = mt.Readlink(entryPath)
case cephfs2.DTypeReg:
_type = provider.ResourceType_RESOURCE_TYPE_FILE
size = stat.Size
default:
continue
}
if e != nil { e = nil; continue }

if oid, ie = mt.GetXattr(entryPath, "reva.owner.oid"); ie != nil { continue }
if idp, ie = mt.GetXattr(entryPath, "reva.owner.idp"); ie != nil { continue }
uid := userv1beta1.UserId{
Idp: string(idp),
OpaqueId: string(oid),
}

var xattrs []string
mx := make(map[string]string)
if xattrs, e = mt.ListXattr(entryPath); e == nil {
for _, xattr := range xattrs {
if buf, e := mt.GetXattr(entryPath, xattr); e == nil {
mx[xattr] = string(buf)
}
}
}

etag, ok := mx["reva.stat.etag"]; if ok == false {
//TODO: Recalculate Etag
continue
}
fid, ok := mx["reva.stat.fid"]; if ok == false { continue }
id := &provider.ResourceId{ OpaqueId: fid }

mtime := &typesv1beta1.Timestamp{
Seconds: uint64(stat.Mtime.Sec),
Nanos: uint32(stat.Mtime.Nsec),
}

perms := GetPermissionSet(ctx, stat, mt, entryPath)

ri := &provider.ResourceInfo{
Type: _type,
Id: id,
Checksum: nil,
Etag: etag,
MimeType: mime.Detect(entry.DType() == cephfs2.DTypeDir, entryPath),
Mtime: mtime,
Path: entryPath,
PermissionSet: perms,
Size: size,
Owner: &uid,
Target: target,
ArbitraryMetadata: &provider.ArbitraryMetadata{ Metadata: mx },
}
if entry.Name() == "." || entry.Name() == ".." { continue } //TODO: Maybe include?

ri, err = fileAsResourceInfo(ctx, mt, filepath.Join(path, entry.Name()), entry.Statx(), mdKeys)
if ri == nil || err != nil { continue }

files = append(files, ri)
}
Expand All @@ -180,16 +131,22 @@ func (fs *cephfs) ListFolder(ctx context.Context, ref *provider.Reference, mdKey
return
}

func (fs *cephfs) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata map[string]string) (map[string]string, error) {
panic("implement me")
}
func (fs *cephfs) Download(ctx context.Context, ref *provider.Reference) (io.ReadCloser, error) {
u := fs.MakeUser(ctx)
path, err := ResolveRef(ref)
if err != nil {
return nil, errors.Wrap(err, "cephfs: error resolving ref")
}

func (fs *cephfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser) error {
panic("implement me")
}
if strings.HasPrefix(path, fs.conf.ShareFolder) {
return nil, errtypes.PermissionDenied("cephfs: cannot download under the virtual share folder")
}

func (fs *cephfs) Download(ctx context.Context, ref *provider.Reference) (io.ReadCloser, error) {
panic("implement me")
ir, err := u.exec2(func(mt Mount) (interface{}, error) {
return mt.Open(path, os.O_RDONLY, defaultFilePerm)
})

return ir.(*cephfs2.File), err
}

func (fs *cephfs) ListRevisions(ctx context.Context, ref *provider.Reference) ([]*provider.FileVersion, error) {
Expand Down Expand Up @@ -221,12 +178,13 @@ func (fs *cephfs) EmptyRecycle(ctx context.Context) error {
}

func (fs *cephfs) GetPathByID(ctx context.Context, id *provider.ResourceId) (string, error) {
panic("implement me")
return "", errors.New("cephfs: file ids not implemented")
}

func (fs *cephfs) AddGrant(ctx context.Context, ref *provider.Reference, g *provider.Grant) (err error) {
var path string
u := fs.MakeUser(ctx)
path, err := ResolveRef(ref); if err != nil { return }
path, err = ResolveRef(ref); if err != nil { return }

err = u.exec(func(mt Mount) (e error) {
return changePerms(mt, g, path, UpdateGrant)
Expand All @@ -236,8 +194,9 @@ func (fs *cephfs) AddGrant(ctx context.Context, ref *provider.Reference, g *prov
}

func (fs *cephfs) RemoveGrant(ctx context.Context, ref *provider.Reference, g *provider.Grant) (err error) {
var path string
u := fs.MakeUser(ctx)
path, err := ResolveRef(ref); if err != nil { return }
path, err = ResolveRef(ref); if err != nil { return }

err = u.exec(func(mt Mount) (e error) {
return changePerms(mt, g, path, RemoveGrant)
Expand All @@ -247,8 +206,9 @@ func (fs *cephfs) RemoveGrant(ctx context.Context, ref *provider.Reference, g *p
}

func (fs *cephfs) UpdateGrant(ctx context.Context, ref *provider.Reference, g *provider.Grant) (err error) {
var path string
u := fs.MakeUser(ctx)
path, err := ResolveRef(ref); if err != nil { return }
path, err = ResolveRef(ref); if err != nil { return }

err = u.exec(func(mt Mount) (e error) {
return changePerms(mt, g, path, UpdateGrant)
Expand All @@ -265,14 +225,14 @@ func (fs *cephfs) GetQuota(ctx context.Context) (total uint64, used uint64, err
u := fs.MakeUser(ctx)

err = u.exec(func(mt Mount) error {
buf, e := mt.GetXattr(u.homePath, "ceph.quota.max_bytes")
buf, e := mt.GetXattr("/", "ceph.quota.max_bytes")
if e != nil {
total = 0
} else {
total, e = strconv.ParseUint(string(buf), 10, 64)
}

buf, e = mt.GetXattr(u.homePath, "ceph.dir.rbytes")
buf, e = mt.GetXattr("/", "ceph.dir.rbytes")
if e == nil {
used, e = strconv.ParseUint(string(buf), 10, 64)
}
Expand All @@ -288,30 +248,32 @@ func (fs *cephfs) CreateReference(ctx context.Context, path string, targetURI *u
return errors.New("cephfs: can't create reference outside a share folder")
}
u := fs.MakeUser(ctx)
p := filepath.Join(u.homePath, path)

err = u.exec(func(mt Mount) error {
return mt.MakeDir(p, fs.conf.DirMode)
return mt.MakeDir(path, fs.conf.DirMode)
})
if err != nil { return }

err = u.exec(func(mt Mount) error {
return mt.SetXattr(p, "reva.stat.ref", []byte(targetURI.String()), 0)
return mt.SetXattr(path, "reva.stat.ref", []byte(targetURI.String()), 0)
})

return
}

func (fs *cephfs) Shutdown(ctx context.Context) (err error) {
Admin.Release()
if Admin != nil {
_ = Admin.Release()
}
ClearCache()

return
}

func (fs *cephfs) SetArbitraryMetadata(ctx context.Context, ref *provider.Reference, md *provider.ArbitraryMetadata) (err error) {
var path string
u := fs.MakeUser(ctx)
path, err := ResolveRef(ref); if err != nil { return }
path, err = ResolveRef(ref); if err != nil { return }

for k, v := range md.Metadata {
e := u.exec(func(mt Mount) error {
Expand All @@ -324,8 +286,9 @@ func (fs *cephfs) SetArbitraryMetadata(ctx context.Context, ref *provider.Refere
}

func (fs *cephfs) UnsetArbitraryMetadata(ctx context.Context, ref *provider.Reference, keys []string) (err error) {
var path string
u := fs.MakeUser(ctx)
path, err := ResolveRef(ref); if err != nil { return }
path, err = ResolveRef(ref); if err != nil { return }

for _, key := range keys {
e := u.exec(func(mt Mount) error {
Expand Down

0 comments on commit e876db7

Please sign in to comment.