-
Notifications
You must be signed in to change notification settings - Fork 386
/
provider.go
244 lines (212 loc) · 6.76 KB
/
provider.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
// Package provider is heavily based on fsSyncProvider in github.com/moby/buildkit/session/filesync.
// The key difference between BuildContextProvider and fsSyncProvider is that in
// BuildContextProvider, the dirs can be added incrementally after the construction.
package provider
import (
"os"
"path"
"strings"
"sync"
"time"
"github.com/earthly/earthly/conslogging"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/session/filesync"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/tonistiigi/fsutil"
fstypes "github.com/tonistiigi/fsutil/types"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
const (
keyOverrideExcludes = "override-excludes"
keyIncludePatterns = "include-patterns"
keyExcludePatterns = "exclude-patterns"
keyFollowPaths = "followpaths"
keyDirName = "dir-name"
keyExporterMetaPrefix = "exporter-md-"
)
var _ session.Attachable = (*BuildContextProvider)(nil)
var _ filesync.FileSyncServer = (*BuildContextProvider)(nil)
// BuildContextProvider is a BuildKit attachable which provides local files as part
// of the build context.
type BuildContextProvider struct {
p progressCb
doneCh chan error
mu sync.Mutex
dirs map[string]SyncedDir
console conslogging.ConsoleLogger
}
// SyncedDir is a directory to be synced across.
type SyncedDir struct {
Name string
Dir string
Excludes []string
Map func(string, *fstypes.Stat) bool
}
// NewBuildContextProvider creates a new provider for sending build context files from client.
func NewBuildContextProvider(console conslogging.ConsoleLogger) *BuildContextProvider {
return &BuildContextProvider{
dirs: map[string]SyncedDir{},
console: console,
}
}
// AddDirs adds local directories to the context.
func (bcp *BuildContextProvider) AddDirs(dirs map[string]string) {
bcp.mu.Lock()
defer bcp.mu.Unlock()
resetUIDAndGID := func(p string, st *fstypes.Stat) bool {
st.Uid = 0
st.Gid = 0
return true
}
sds := make([]SyncedDir, 0, len(dirs))
for dirName, dir := range dirs {
sds = append(sds, SyncedDir{
Name: dirName,
Dir: dir,
Map: resetUIDAndGID,
})
}
for _, sd := range sds {
bcp.dirs[sd.Name] = sd
}
}
// Register registers the attachable.
func (bcp *BuildContextProvider) Register(server *grpc.Server) {
filesync.RegisterFileSyncServer(server, bcp)
}
// DiffCopy implements the DiffCopy attachable.
func (bcp *BuildContextProvider) DiffCopy(stream filesync.FileSync_DiffCopyServer) error {
return bcp.handle("diffcopy", stream)
}
// TarStream implements the DiffCopy attachable.
func (bcp *BuildContextProvider) TarStream(stream filesync.FileSync_TarStreamServer) error {
return bcp.handle("tarstream", stream)
}
func (bcp *BuildContextProvider) handle(method string, stream grpc.ServerStream) (retErr error) {
var pr *protocol
for _, p := range supportedProtocols {
if method == p.name && isProtoSupported(p.name) {
pr = &p
break
}
}
if pr == nil {
return errors.New("failed to negotiate protocol")
}
opts, _ := metadata.FromIncomingContext(stream.Context()) // if no metadata continue with empty object
dirName := ""
name, ok := opts[keyDirName]
if ok && len(name) > 0 {
dirName = name[0]
}
dir, err := bcp.getDir(dirName)
if err != nil {
return err
}
excludes := opts[keyExcludePatterns]
if len(dir.Excludes) != 0 && (len(opts[keyOverrideExcludes]) == 0 || opts[keyOverrideExcludes][0] != "true") {
excludes = dir.Excludes
}
includes := opts[keyIncludePatterns]
followPaths := opts[keyFollowPaths]
console := bcp.console.WithPrefixAndSalt("context", dir.Dir)
verboseProgressCB := func(relPath string, status fsutil.VerboseProgressStatus, numBytes int) {
fullPath := path.Join(dir.Dir, relPath)
switch status {
case fsutil.StatusStat:
console.VerbosePrintf("sent file stat for %s\n", fullPath)
case fsutil.StatusSent:
console.VerbosePrintf("sent data for %s (%d bytes)\n", fullPath, numBytes)
case fsutil.StatusFailed:
console.VerbosePrintf("sent data for %s failed\n", fullPath)
case fsutil.StatusSkipped:
console.VerbosePrintf("ignoring file %s\n", fullPath)
default:
console.Warnf("unhandled progress status %v (path=%s, numBytes=%d)\n", status, fullPath, numBytes)
}
}
progress := func(numBytes int, last bool) {
if last {
console.Printf("sent %d bytes in context %s", numBytes, dir.Dir)
}
}
var doneCh chan error
if bcp.doneCh != nil {
doneCh = bcp.doneCh
bcp.doneCh = nil
}
err = pr.sendFn(stream, fsutil.NewFS(dir.Dir, &fsutil.WalkOpt{
ExcludePatterns: excludes,
IncludePatterns: includes,
FollowPaths: followPaths,
Map: dir.Map,
VerboseProgressCB: verboseProgressCB,
}), progress, verboseProgressCB)
if doneCh != nil {
if err != nil {
doneCh <- err
}
close(doneCh)
}
return err
}
func (bcp *BuildContextProvider) getDir(dirName string) (SyncedDir, error) {
bcp.mu.Lock()
defer bcp.mu.Unlock()
dir, ok := bcp.dirs[dirName]
if !ok {
return SyncedDir{}, status.Errorf(codes.NotFound, "no access allowed to dir %q", dirName)
}
return dir, nil
}
// SetNextProgressCallback sets the progress callback function.
func (bcp *BuildContextProvider) SetNextProgressCallback(f func(int, bool), doneCh chan error) {
bcp.p = f
bcp.doneCh = doneCh
}
type progressCb func(int, bool)
type protocol struct {
name string
sendFn func(stream filesync.Stream, fs fsutil.FS, progress progressCb, verboseProgress fsutil.VerboseProgressCB) error
recvFn func(stream grpc.ClientStream, destDir string, cu filesync.CacheUpdater, progress progressCb, mapFunc func(string, *fstypes.Stat) bool) error
}
func isProtoSupported(p string) bool {
// TODO: this should be removed after testing if stability is confirmed
if override := os.Getenv("BUILD_STREAM_PROTOCOL"); override != "" {
return strings.EqualFold(p, override)
}
return true
}
var supportedProtocols = []protocol{
{
name: "diffcopy",
sendFn: sendDiffCopy,
recvFn: recvDiffCopy,
},
}
func sendDiffCopy(stream filesync.Stream, fs fsutil.FS, progress progressCb, verboseProgress fsutil.VerboseProgressCB) error {
return errors.WithStack(fsutil.Send(stream.Context(), stream, fs, progress, verboseProgress))
}
func recvDiffCopy(ds grpc.ClientStream, dest string, cu filesync.CacheUpdater, progress progressCb, filter func(string, *fstypes.Stat) bool) error {
st := time.Now()
defer func() {
logrus.Debugf("diffcopy took: %v", time.Since(st))
}()
var cf fsutil.ChangeFunc
var ch fsutil.ContentHasher
if cu != nil {
cu.MarkSupported(true)
cf = cu.HandleChange
ch = cu.ContentHasher()
}
return errors.WithStack(fsutil.Receive(ds.Context(), ds, dest, fsutil.ReceiveOpt{
NotifyHashed: cf,
ContentHasher: ch,
ProgressCb: progress,
Filter: fsutil.FilterFunc(filter),
}))
}