forked from gazette/core
/
server.go
190 lines (166 loc) · 6.73 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
package server
import (
"context"
"fmt"
"net"
"net/http"
"time"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/soheilhy/cmux"
pb "go.gazette.dev/core/broker/protocol"
"go.gazette.dev/core/task"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// Server bundles gRPC & HTTP servers, multiplexed over a single bound TCP
// socket (using CMux). Additional protocols may be added to the Server by
// interacting directly with its provided CMux.
type Server struct {
// RawListener is the bound TCP listener of the Server.
RawListener *net.TCPListener
// CMux wraps RawListener to provide connection protocol multiplexing over
// a single bound socket. gRPC and HTTP Listeners are provided by default.
// Additional Listeners may be added directly via CMux.Match() -- though
// it is then the user's responsibility to Serve the resulting Listeners.
CMux cmux.CMux
// GRPCListener is a CMux Listener for gRPC connections.
GRPCListener net.Listener
// HTTPListener is a CMux Listener for HTTP connections.
HTTPListener net.Listener
// HTTPMux is the http.ServeMux which is served by Serve().
HTTPMux *http.ServeMux
// GRPCServer is the gRPC server mux which is served by Serve().
GRPCServer *grpc.Server
// GRPCLoopback is a dialed connection to this GRPCServer.
GRPCLoopback *grpc.ClientConn
httpServer http.Server
}
// New builds and returns a Server of the given TCP network interface |iface|
// and |port|. |port| may be zero, in which case a random free port is assigned.
func New(iface string, port uint16) (*Server, error) {
var addr = fmt.Sprintf("%s:%d", iface, port)
var raw, err = net.Listen("tcp", addr)
if err != nil {
return nil, errors.Wrapf(err, "failed to bind service address (%s)", addr)
}
var srv = &Server{
HTTPMux: http.DefaultServeMux,
GRPCServer: grpc.NewServer(
grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
),
RawListener: raw.(*net.TCPListener),
}
srv.CMux = cmux.New(srv.RawListener)
srv.CMux.HandleError(func(err error) bool {
if _, ok := err.(net.Error); !ok {
log.WithField("err", err).Warn("failed to CMux client connection to a listener")
}
return true // Continue serving RawListener.
})
// CMux ReadTimeout controls how long we'll wait for an opening send from
// the client which allows CMux to sniff a matching listening mux. It has
// no effect once the connection has been matched to a mux.
// See: https://github.com/soheilhy/cmux/issues/76
srv.CMux.SetReadTimeout(GracefulStopTimeout / 2)
// GRPCListener sniffs for HTTP/2 in-the-clear connections which have
// "Content-Type: application/grpc". Note this matcher will send an initial
// empty SETTINGS frame to the client, as gRPC clients delay the first
// request until the HTTP/2 handshake has completed.
srv.GRPCListener = srv.CMux.MatchWithWriters(
cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))
srv.GRPCLoopback, err = grpc.DialContext(
context.Background(),
srv.RawListener.Addr().String(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, pb.DispatcherGRPCBalancerName)),
// This grpc.ClientConn connects to this server's loopback, and also
// to peer server addresses via the dispatch balancer. It has particular
// knowledge of what addresses *should* be reach-able (from Etcd
// advertisements). Use an aggressive back-off for server-to-server
// connections, as it's crucial for quick cluster recovery from
// partitions, etc.
grpc.WithBackoffMaxDelay(time.Millisecond*500),
// Instrument client for gRPC metric collection.
grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
)
if err != nil {
return nil, errors.Wrapf(err, "failed to dial gRPC loopback")
}
// Connections sending HTTP/1 verbs (GET, PUT, POST etc) are assumed to be HTTP.
srv.HTTPListener = srv.CMux.Match(cmux.HTTP1Fast())
return srv, nil
}
// MustLoopback builds and returns a new Server instance bound to a random
// port on the loopback interface. It panics on error.
func MustLoopback() *Server {
if srv, err := New("127.0.0.1", 0); err != nil {
log.WithField("err", err).Panic("failed to build Server")
panic("not reached")
} else {
return srv
}
}
// Endpoint of the Server.
func (s *Server) Endpoint() pb.Endpoint {
return pb.Endpoint("http://" + s.RawListener.Addr().String())
}
// QueueTasks serving the CMux, HTTP, and gRPC component servers onto the task.Group.
// If additional Listeners are derived from the Server.CMux, attempts to Accept
// will block until the CMux itself begins serving.
func (s *Server) QueueTasks(tg *task.Group) {
tg.Queue("server.ServeCMux", func() error {
if err := s.CMux.Serve(); err != nil && tg.Context().Err() == nil {
return err
}
return nil // Swallow error on cancellation.
})
tg.Queue("server.ServeHTTP", func() error {
// Disable Close() of the HTTPListener, because http.Server Shutdown()
// is invoked after grpc.Server GracefulStop(), which has already closed
// the underlying listener.
var ln = noopCloser{s.HTTPListener}
s.httpServer.Handler = s.HTTPMux
if err := s.httpServer.Serve(ln); err != nil && tg.Context().Err() == nil {
return err
}
return nil // Swallow error on cancellation.
})
tg.Queue("server.ServeGRPC", func() error {
if err := s.GRPCServer.Serve(s.GRPCListener); err != grpc.ErrServerStopped {
return err
}
return nil
})
}
// BoundedGracefulStop attempts to perform a graceful stop of the server,
// but falls back to a hard stop if the graceful stop doesn't complete
// reasonably quickly.
func (s *Server) BoundedGracefulStop() {
var ctx, cancel = context.WithCancel(context.Background())
var timer = time.AfterFunc(GracefulStopTimeout, func() {
log.Error("grpc.GracefulStop took too long, issuing a hard Stop")
// Close loopback even though the server isn't stopped, to unblock any
// requests which may be wedged sending to an unresponsive peer.
_ = s.GRPCLoopback.Close()
s.GRPCServer.Stop()
cancel()
})
// GracefulStop immediately closes the underlying RawListener.
s.GRPCServer.GracefulStop()
// Shutdown causes httpServer.Serve to return immediately.
if err := s.httpServer.Shutdown(ctx); err != nil {
log.WithField("err", err).Error("http.Server Shutdown finished with error")
}
timer.Stop()
}
type noopCloser struct {
net.Listener
}
func (noopCloser) Close() error { return nil }
// GracefulStopTimeout is the amount of time BoundedGracefulStop will wait
// before performing a hard server Stop.
var GracefulStopTimeout = 15 * time.Second