-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
daemon.go
188 lines (161 loc) · 4.69 KB
/
daemon.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
// Copyright (c) 2020 Gitpod GmbH. All rights reserved.
// Licensed under the GNU Affero General Public License (AGPL).
// See License-AGPL.txt in the project root for license information.
package daemon
import (
	"context"
	"net/http"
	"os"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"golang.org/x/xerrors"
	"google.golang.org/grpc"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"

	"github.com/gitpod-io/gitpod/common-go/log"
	"github.com/gitpod-io/gitpod/ws-daemon/api"
	"github.com/gitpod-io/gitpod/ws-daemon/pkg/container"
	"github.com/gitpod-io/gitpod/ws-daemon/pkg/content"
	"github.com/gitpod-io/gitpod/ws-daemon/pkg/diskguard"
	"github.com/gitpod-io/gitpod/ws-daemon/pkg/dispatch"
	"github.com/gitpod-io/gitpod/ws-daemon/pkg/hosts"
	"github.com/gitpod-io/gitpod/ws-daemon/pkg/iws"
	"github.com/gitpod-io/gitpod/ws-daemon/pkg/resources"
)
// NewDaemon produces a new daemon.
//
// It assembles the Kubernetes clientset, the container runtime, the workspace
// dispatch, the content service, the disk guards and the hosts controller from
// config and bundles them into a Daemon. Call Start on the result to run it.
//
// The node name is taken from the NODENAME environment variable and must not
// be empty. reg is passed on to the resource dispatch listener and to the
// content service.
//
// Failures are returned with context naming the component that could not be
// created; the underlying error stays reachable through unwrapping.
func NewDaemon(config Config, reg prometheus.Registerer) (*Daemon, error) {
	// Validate the environment first: it is cheap and, when it fails, nothing
	// has been created yet that would need cleaning up.
	nodename := os.Getenv("NODENAME")
	if nodename == "" {
		return nil, xerrors.Errorf("NODENAME env var isn't set")
	}

	// newClientSet adds its own "cannot create clientset" context.
	clientset, err := newClientSet(config.Runtime.Kubeconfig)
	if err != nil {
		return nil, err
	}

	containerRuntime, err := container.FromConfig(config.Runtime.Container)
	if err != nil {
		return nil, xerrors.Errorf("cannot create container runtime: %w", err)
	}
	if containerRuntime == nil {
		return nil, xerrors.Errorf("no container runtime configured")
	}
	go func() {
		// TODO(cw): handle this case more gracefully
		// Until then, the first error reported by the runtime is logged via Fatal.
		err := <-containerRuntime.Error()
		log.WithError(err).Fatal("container runtime interface error")
	}()

	dsptch, err := dispatch.NewDispatch(containerRuntime, clientset, config.Runtime.KubernetesNamespace, nodename,
		resources.NewDispatchListener(&config.Resources, reg),
	)
	if err != nil {
		return nil, xerrors.Errorf("cannot create dispatch: %w", err)
	}

	contentService, err := content.NewWorkspaceService(
		context.Background(),
		config.Content,
		config.Runtime.KubernetesNamespace,
		containerRuntime,
		dsptch.WorkspaceExistsOnNode,
		&iws.Uidmapper{Config: config.Uidmapper, Runtime: containerRuntime},
		reg,
	)
	if err != nil {
		return nil, xerrors.Errorf("cannot create content service: %w", err)
	}

	dsk := diskguard.FromConfig(config.DiskSpaceGuard, clientset, nodename)

	hsts, err := hosts.FromConfig(config.Hosts, clientset, config.Runtime.KubernetesNamespace)
	if err != nil {
		return nil, xerrors.Errorf("cannot create hosts controller: %w", err)
	}

	return &Daemon{
		Config:     config,
		dispatch:   dsptch,
		content:    contentService,
		diskGuards: dsk,
		hosts:      hsts,
	}, nil
}
// newClientSet creates a Kubernetes clientset.
//
// If kubeconfig is non-empty it is handed to clientcmd.BuildConfigFromFlags
// (with an empty master URL) to obtain the client configuration; otherwise
// the in-cluster configuration is used.
//
// Any error, whether from loading the configuration or from constructing the
// clientset, is wrapped with "cannot create clientset" by the deferred
// function below, which is why the results are named.
func newClientSet(kubeconfig string) (res *kubernetes.Clientset, err error) {
	defer func() {
		if err != nil {
			err = xerrors.Errorf("cannot create clientset: %w", err)
		}
	}()

	// Assign to the named err instead of declaring new variables with := so
	// that neither named result is shadowed.
	var cfg *rest.Config
	if kubeconfig != "" {
		cfg, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
	} else {
		cfg, err = rest.InClusterConfig()
	}
	if err != nil {
		return nil, err
	}

	return kubernetes.NewForConfig(cfg)
}
// Daemon connects all the individual bits and bobs that make up the workspace daemon.
// Instances are created with NewDaemon, run with Start and shut down with Stop.
type Daemon struct {
// Config is the configuration the daemon was created with. Start and the
// readiness endpoint read their ReadinessSignal settings from it.
Config Config
// dispatch is started first by Start and closed by Stop.
dispatch *dispatch.Dispatch
// content is the workspace content service; Register exposes it via gRPC,
// Start runs it in a goroutine and Stop closes it.
content *content.WorkspaceService
// diskGuards are each started in their own goroutine by Start.
// Stop does not touch them.
diskGuards []*diskguard.Guard
// hosts may be nil; Start, Stop and the readiness endpoint check for nil
// before using it.
hosts hosts.Controller
}
// Start brings up all parts of the daemon.
//
// The dispatch is started synchronously; if that fails, the error is wrapped
// and returned and nothing else is launched. The content service, each disk
// guard, the hosts controller (when one is set) and the readiness signal
// (when enabled in the config) are then launched in goroutines of their own,
// so Start returns without waiting for them.
func (d *Daemon) Start() error {
	if err := d.dispatch.Start(); err != nil {
		return xerrors.Errorf("cannot start dispatch: %w", err)
	}

	go d.content.Start()

	for i := range d.diskGuards {
		go d.diskGuards[i].Start()
	}

	if d.hosts != nil {
		go d.hosts.Start()
	}

	if d.Config.ReadinessSignal.Enabled {
		go d.startReadinessSignal()
	}

	return nil
}
// Register attaches the gRPC services offered by this daemon to srv.
// At present that is the workspace content service only.
func (d *Daemon) Register(srv *grpc.Server) {
	contentService := d.content
	api.RegisterWorkspaceContentServiceServer(srv, contentService)
}
// startReadinessSignal serves the HTTP readiness endpoint and blocks for as
// long as the server is running; Start invokes it in its own goroutine.
//
// The endpoint is mounted at Config.ReadinessSignal.Path ("/" if that is
// empty) and listens on Config.ReadinessSignal.Addr. It answers 200 OK,
// except while a hosts controller is set and reports that it did not update
// yet, in which case it answers 425 Too Early.
//
// An error from the HTTP server is logged rather than returned.
func (d *Daemon) startReadinessSignal() {
	// Bound how long a single probe request may take so that slow or stalled
	// clients cannot keep connections to the endpoint open indefinitely.
	const probeTimeout = 10 * time.Second

	path := d.Config.ReadinessSignal.Path
	if path == "" {
		path = "/"
	}

	mux := http.NewServeMux()
	mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
		if d.hosts != nil && !d.hosts.DidUpdate() {
			http.Error(w, "host controller not ready yet", http.StatusTooEarly)
			return
		}

		w.WriteHeader(http.StatusOK)
	})

	srv := &http.Server{
		Addr:              d.Config.ReadinessSignal.Addr,
		Handler:           mux,
		ReadHeaderTimeout: probeTimeout,
		ReadTimeout:       probeTimeout,
		WriteTimeout:      probeTimeout,
	}

	log.WithField("addr", srv.Addr).Info("started readiness signal")
	if err := srv.ListenAndServe(); err != nil {
		log.WithError(err).Error("cannot start readiness probe")
	}
}
// Stop gracefully shuts down the daemon. Once stopped, it
// cannot be started again.
//
// The dispatch, the content service and, when set, the hosts controller are
// closed in that order. All of them are closed even if an earlier one fails.
// Only the first error encountered is returned; later errors are discarded.
func (d *Daemon) Stop() error {
	var firstErr error
	record := func(err error) {
		if firstErr == nil && err != nil {
			firstErr = err
		}
	}

	record(d.dispatch.Close())
	record(d.content.Close())
	if d.hosts != nil {
		record(d.hosts.Close())
	}

	return firstErr
}