-
Notifications
You must be signed in to change notification settings - Fork 3.3k
/
client.go
164 lines (134 loc) · 5.69 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
package storage
import (
"context"
"io"
"path"
"strings"
"time"
"github.com/grafana/loki/pkg/storage/chunk/client"
)
// delimiter is the path separator handed to the object client's List calls; it is also
// appended to table and user prefixes before listing, and used to recognise keys that
// name a directory rather than a file.
const delimiter = "/"
// UserIndexClient groups the object store operations that act on the index files
// of a single user within a table.
type UserIndexClient interface {
	// ListUserFiles lists the index files stored for userID in tableName.
	// bypassCache is forwarded to the underlying listing call.
	ListUserFiles(ctx context.Context, tableName, userID string, bypassCache bool) ([]IndexFile, error)

	// GetUserFile opens fileName belonging to userID in tableName for reading.
	GetUserFile(ctx context.Context, tableName, userID, fileName string) (io.ReadCloser, error)

	// PutUserFile stores the contents of file as fileName for userID in tableName.
	PutUserFile(ctx context.Context, tableName, userID, fileName string, file io.ReadSeeker) error

	// DeleteUserFile removes fileName belonging to userID from tableName.
	DeleteUserFile(ctx context.Context, tableName, userID, fileName string) error
}
// CommonIndexClient groups the object store operations that act on the common
// (not user specific) index files of a table.
type CommonIndexClient interface {
	// ListFiles lists the index files stored directly under tableName. The second
	// return value is a list of strings; indexStorageClient fills it with the base
	// names of the sub-prefixes found under the table and treats them as user IDs.
	// bypassCache is forwarded to the underlying listing call.
	ListFiles(ctx context.Context, tableName string, bypassCache bool) ([]IndexFile, []string, error)

	// GetFile opens fileName in tableName for reading.
	GetFile(ctx context.Context, tableName, fileName string) (io.ReadCloser, error)

	// PutFile stores the contents of file as fileName in tableName.
	PutFile(ctx context.Context, tableName, fileName string, file io.ReadSeeker) error

	// DeleteFile removes fileName from tableName.
	DeleteFile(ctx context.Context, tableName, fileName string) error
}
// Client manages boltdb index files kept in object storage when boltdb-shipper is
// in use. It combines the common and the per-user index operations with table
// listing, cache refresh and lifecycle methods.
type Client interface {
	CommonIndexClient
	UserIndexClient

	// RefreshIndexTableNamesCache refreshes the cache of index table names.
	RefreshIndexTableNamesCache(ctx context.Context)

	// ListTables returns the names of the index tables.
	ListTables(ctx context.Context) ([]string, error)

	// RefreshIndexTableCache refreshes the cache for the table called tableName.
	RefreshIndexTableCache(ctx context.Context, tableName string)

	// IsFileNotFoundErr reports whether err means that a requested file does not exist.
	IsFileNotFoundErr(err error) bool

	// Stop stops the client.
	Stop()
}
// indexStorageClient holds the cached object client that all of its methods
// delegate to.
type indexStorageClient struct {
	// objectClient is the cached object client used for every storage call.
	objectClient *cachedObjectClient
}
// IndexFile describes one index file returned by a listing.
type IndexFile struct {
	// Name is the base name of the file's object key.
	Name string
	// ModifiedAt is the modification time reported for the object.
	ModifiedAt time.Time
}
// NewIndexStorageClient returns a Client backed by origObjectClient. The object
// client is wrapped with client.NewPrefixedObjectClient using storagePrefix, and
// the result is wrapped again by newCachedObjectClient.
func NewIndexStorageClient(origObjectClient client.ObjectClient, storagePrefix string) Client {
	prefixed := client.NewPrefixedObjectClient(origObjectClient, storagePrefix)
	return &indexStorageClient{
		objectClient: newCachedObjectClient(prefixed),
	}
}
// RefreshIndexTableNamesCache forwards the request to the cached object client's
// method of the same name.
func (s *indexStorageClient) RefreshIndexTableNamesCache(ctx context.Context) {
	s.objectClient.RefreshIndexTableNamesCache(ctx)
}
// RefreshIndexTableCache forwards the request for tableName to the cached object
// client's method of the same name.
func (s *indexStorageClient) RefreshIndexTableCache(ctx context.Context, tableName string) {
	s.objectClient.RefreshIndexTableCache(ctx, tableName)
}
// ListTables lists the storage root (empty prefix) with the cache enabled and
// returns the base name of every common prefix that comes back, one per table.
// Any listing error is returned unchanged.
func (s *indexStorageClient) ListTables(ctx context.Context) ([]string, error) {
	_, prefixes, err := s.objectClient.List(ctx, "", delimiter, false)
	if err != nil {
		return nil, err
	}

	names := make([]string, len(prefixes))
	for i, prefix := range prefixes {
		// path.Base drops any trailing delimiter and leading directories.
		names[i] = path.Base(string(prefix))
	}
	return names, nil
}
// ListFiles lists the contents of tableName. It returns the files found directly
// under the table and the base names of the common prefixes under it, which are
// returned as user IDs. Any listing error is returned unchanged.
func (s *indexStorageClient) ListFiles(ctx context.Context, tableName string, bypassCache bool) ([]IndexFile, []string, error) {
	// Object stores have no real directories, so the trailing delimiter is required:
	// without it hosted stores hand back the directory name itself instead of
	// its contents.
	tablePrefix := tableName + delimiter

	objects, userPrefixes, err := s.objectClient.List(ctx, tablePrefix, delimiter, bypassCache)
	if err != nil {
		return nil, nil, err
	}

	files := make([]IndexFile, 0, len(objects))
	for _, obj := range objects {
		// The s3 client may include the directory itself in the listing; only keys
		// that do not end in the delimiter are files.
		if !strings.HasSuffix(obj.Key, delimiter) {
			files = append(files, IndexFile{Name: path.Base(obj.Key), ModifiedAt: obj.ModifiedAt})
		}
	}

	userIDs := make([]string, len(userPrefixes))
	for i, prefix := range userPrefixes {
		userIDs[i] = path.Base(string(prefix))
	}

	return files, userIDs, nil
}
// ListUserFiles lists the files stored under tableName/userID. Common prefixes
// returned by the listing are ignored. Any listing error is returned unchanged.
func (s *indexStorageClient) ListUserFiles(ctx context.Context, tableName, userID string, bypassCache bool) ([]IndexFile, error) {
	// Object stores have no real directories, so the trailing delimiter is required:
	// without it hosted stores hand back the directory name itself instead of
	// its contents.
	userPrefix := path.Join(tableName, userID) + delimiter

	objects, _, err := s.objectClient.List(ctx, userPrefix, delimiter, bypassCache)
	if err != nil {
		return nil, err
	}

	files := make([]IndexFile, 0, len(objects))
	for _, obj := range objects {
		// The s3 client may include the directory itself in the listing; only keys
		// that do not end in the delimiter are files.
		if !strings.HasSuffix(obj.Key, delimiter) {
			files = append(files, IndexFile{Name: path.Base(obj.Key), ModifiedAt: obj.ModifiedAt})
		}
	}

	return files, nil
}
// GetFile fetches the object stored at tableName/fileName and returns its reader
// together with any error from the object client. The second value returned by
// GetObject is discarded.
func (s *indexStorageClient) GetFile(ctx context.Context, tableName, fileName string) (io.ReadCloser, error) {
	objectKey := path.Join(tableName, fileName)
	rc, _, err := s.objectClient.GetObject(ctx, objectKey)
	return rc, err
}
// GetUserFile fetches the object stored at tableName/userID/fileName and returns
// its reader together with any error from the object client. The second value
// returned by GetObject is discarded.
func (s *indexStorageClient) GetUserFile(ctx context.Context, tableName, userID, fileName string) (io.ReadCloser, error) {
	objectKey := path.Join(tableName, userID, fileName)
	rc, _, err := s.objectClient.GetObject(ctx, objectKey)
	return rc, err
}
// PutFile uploads file to the object key tableName/fileName and returns the
// object client's error, if any.
func (s *indexStorageClient) PutFile(ctx context.Context, tableName, fileName string, file io.ReadSeeker) error {
	objectKey := path.Join(tableName, fileName)
	return s.objectClient.PutObject(ctx, objectKey, file)
}
// PutUserFile uploads file to the object key tableName/userID/fileName and
// returns the object client's error, if any.
func (s *indexStorageClient) PutUserFile(ctx context.Context, tableName, userID, fileName string, file io.ReadSeeker) error {
	objectKey := path.Join(tableName, userID, fileName)
	return s.objectClient.PutObject(ctx, objectKey, file)
}
// DeleteFile deletes the object stored at tableName/fileName and returns the
// object client's error, if any.
func (s *indexStorageClient) DeleteFile(ctx context.Context, tableName, fileName string) error {
	objectKey := path.Join(tableName, fileName)
	return s.objectClient.DeleteObject(ctx, objectKey)
}
// DeleteUserFile deletes the object stored at tableName/userID/fileName and
// returns the object client's error, if any.
func (s *indexStorageClient) DeleteUserFile(ctx context.Context, tableName, userID, fileName string) error {
	objectKey := path.Join(tableName, userID, fileName)
	return s.objectClient.DeleteObject(ctx, objectKey)
}
// IsFileNotFoundErr reports whether the cached object client classifies err as an
// object-not-found error.
func (s *indexStorageClient) IsFileNotFoundErr(err error) bool {
	return s.objectClient.IsObjectNotFoundErr(err)
}
// Stop stops the underlying cached object client.
func (s *indexStorageClient) Stop() {
	s.objectClient.Stop()
}