forked from grpc/grpc-go
/
controller.go
182 lines (161 loc) · 6.19 KB
/
controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Package controller contains implementation to connect to the control plane.
// Including starting the ClientConn, starting the xDS stream, and
// sending/receiving messages.
//
// All the messages are parsed by the resource package (e.g.
// UnmarshalListener()) and sent to the Pubsub watchers.
package controller
import (
"context"
"errors"
"fmt"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/buffer"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/controller/version"
"google.golang.org/grpc/xds/internal/xdsclient/pubsub"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)
// Controller manages the connection and stream to the control plane.
//
// It keeps track of what resources are being watched, and send new requests
// when new watches are added.
//
// It takes a pubsub (as an interface) as input. When a response is received,
// it's parsed, and the updates are sent to the pubsub.
type Controller struct {
config *bootstrap.ServerConfig
updateHandler pubsub.UpdateHandler
updateValidator xdsresource.UpdateValidatorFunc
logger *grpclog.PrefixLogger
cc *grpc.ClientConn // Connection to the management server.
vClient version.VersionedClient
stopRunGoroutine context.CancelFunc
backoff func(int) time.Duration
streamCh chan grpc.ClientStream
sendCh *buffer.Unbounded
mu sync.Mutex
// Message specific watch infos, protected by the above mutex. These are
// written to, after successfully reading from the update channel, and are
// read from when recovering from a broken stream to resend the xDS
// messages. When the user of this client object cancels a watch call,
// these are set to nil. All accesses to the map protected and any value
// inside the map should be protected with the above mutex.
watchMap map[xdsresource.ResourceType]map[string]bool
// versionMap contains the version that was acked (the version in the ack
// request that was sent on wire). The key is rType, the value is the
// version string, becaues the versions for different resource types should
// be independent.
versionMap map[xdsresource.ResourceType]string
// nonceMap contains the nonce from the most recent received response.
nonceMap map[xdsresource.ResourceType]string
// Changes to map lrsClients and the lrsClient inside the map need to be
// protected by lrsMu.
//
// TODO: after LRS refactoring, each controller should only manage the LRS
// stream to its server. LRS streams to other servers should be managed by
// other controllers.
lrsMu sync.Mutex
lrsClients map[string]*lrsClient
}
var grpcDial = grpc.Dial
// SetGRPCDial sets the dialer for the controller. The dial can be used to
// manipulate the dial options or change the target if needed.
// The SetGRPCDial must be called before gRPC initialization to make sure it
// affects all the controllers created.
// To reset any dialer set, pass in grpc.Dial as the parameter.
func SetGRPCDial(dialer func(target string, opts ...grpc.DialOption) (*grpc.ClientConn, error)) {
grpcDial = dialer
}
// New creates a new controller.
func New(config *bootstrap.ServerConfig, updateHandler pubsub.UpdateHandler, validator xdsresource.UpdateValidatorFunc, logger *grpclog.PrefixLogger, boff func(int) time.Duration) (_ *Controller, retErr error) {
switch {
case config == nil:
return nil, errors.New("xds: no xds_server provided")
case config.ServerURI == "":
return nil, errors.New("xds: no xds_server name provided in options")
case config.Creds == nil:
return nil, errors.New("xds: no credentials provided in options")
case config.NodeProto == nil:
return nil, errors.New("xds: no node_proto provided in options")
}
dopts := []grpc.DialOption{
config.Creds,
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 5 * time.Minute,
Timeout: 20 * time.Second,
}),
}
if boff == nil {
boff = backoff.DefaultExponential.Backoff
}
ret := &Controller{
config: config,
updateValidator: validator,
updateHandler: updateHandler,
backoff: boff,
streamCh: make(chan grpc.ClientStream, 1),
sendCh: buffer.NewUnbounded(),
watchMap: make(map[xdsresource.ResourceType]map[string]bool),
versionMap: make(map[xdsresource.ResourceType]string),
nonceMap: make(map[xdsresource.ResourceType]string),
lrsClients: make(map[string]*lrsClient),
}
defer func() {
if retErr != nil {
ret.Close()
}
}()
cc, err := grpcDial(config.ServerURI, dopts...)
if err != nil {
// An error from a non-blocking dial indicates something serious.
return nil, fmt.Errorf("xds: failed to dial control plane {%s}: %v", config.ServerURI, err)
}
ret.cc = cc
builder := version.GetAPIClientBuilder(config.TransportAPI)
if builder == nil {
return nil, fmt.Errorf("no client builder for xDS API version: %v", config.TransportAPI)
}
apiClient, err := builder(version.BuildOptions{NodeProto: config.NodeProto, Logger: logger})
if err != nil {
return nil, err
}
ret.vClient = apiClient
ctx, cancel := context.WithCancel(context.Background())
ret.stopRunGoroutine = cancel
go ret.run(ctx)
return ret, nil
}
// Close closes the controller.
func (t *Controller) Close() {
// Note that Close needs to check for nils even if some of them are always
// set in the constructor. This is because the constructor defers Close() in
// error cases, and the fields might not be set when the error happens.
if t.stopRunGoroutine != nil {
t.stopRunGoroutine()
}
if t.cc != nil {
t.cc.Close()
}
}