Skip to content

Commit

Permalink
return 503 on unhealthy response
Browse files Browse the repository at this point in the history
  • Loading branch information
nibty committed Mar 25, 2024
1 parent 41a0126 commit 6104134
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 16 deletions.
2 changes: 1 addition & 1 deletion rpc/client.go
Expand Up @@ -567,7 +567,7 @@ func (c *Client) dispatch(codec ServerCodec) {
if op.batch {
conn.handler.handleBatch(op.msgs)
} else {
conn.handler.handleMsg(op.msgs[0])
conn.handler.handleMsg(op.msgs[0], nil)
}

case err := <-c.readErr:
Expand Down
30 changes: 18 additions & 12 deletions rpc/handler.go
Expand Up @@ -19,6 +19,7 @@ package rpc
import (
"context"
"encoding/json"
"net/http"
"reflect"
"strconv"
"strings"
Expand All @@ -43,21 +44,20 @@ func SetExecutionTimeLimit(limit time.Duration) {
//
// The entry points for incoming messages are:
//
// h.handleMsg(message)
// h.handleBatch(message)
// h.handleMsg(message)
// h.handleBatch(message)
//
// Outgoing calls use the requestOp struct. Register the request before sending it
// on the connection:
//
// op := &requestOp{ids: ...}
// h.addRequestOp(op)
// op := &requestOp{ids: ...}
// h.addRequestOp(op)
//
// Now send the request, then wait for the reply to be delivered through handleMsg:
//
// if err := op.wait(...); err != nil {
// h.removeRequestOp(op) // timeout, etc.
// }
//
// if err := op.wait(...); err != nil {
// h.removeRequestOp(op) // timeout, etc.
// }
type handler struct {
reg *serviceRegistry
unsubscribeCb *callback
Expand Down Expand Up @@ -125,7 +125,7 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) {
h.startCallProc(func(cp *callProc) {
answers := make([]*jsonrpcMessage, 0, len(msgs))
for _, msg := range calls {
if answer := h.handleCallMsg(cp, msg); answer != nil {
if answer := h.handleCallMsg(cp, msg, nil); answer != nil {
answers = append(answers, answer)
}
}
Expand All @@ -140,12 +140,12 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) {
}

// handleMsg handles a single message.
func (h *handler) handleMsg(msg *jsonrpcMessage) {
func (h *handler) handleMsg(msg *jsonrpcMessage, w http.ResponseWriter) {
if ok := h.handleImmediate(msg); ok {
return
}
h.startCallProc(func(cp *callProc) {
answer := h.handleCallMsg(cp, msg)
answer := h.handleCallMsg(cp, msg, w)
h.addSubscriptions(cp.notifiers)
if answer != nil {
h.conn.writeJSON(cp.ctx, answer)
Expand Down Expand Up @@ -299,7 +299,7 @@ func (h *handler) handleResponse(msg *jsonrpcMessage) {
}

// handleCallMsg executes a call message and returns the answer.
func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage) *jsonrpcMessage {
func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage, w http.ResponseWriter) *jsonrpcMessage {
start := time.Now()
switch {
case msg.isNotification():
Expand All @@ -319,6 +319,12 @@ func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage) *jsonrpcMess
} else {
h.log.Debug("Served "+msg.Method, ctx...)
}

// return 503 if eth_health is called and the node is not healthy
if w != nil && msg.Method == "eth_health" && resp.Error != nil {
w.WriteHeader(http.StatusServiceUnavailable)
}

return resp
case msg.hasValidID():
return msg.errorResponse(&invalidRequestError{"invalid request"})
Expand Down
2 changes: 1 addition & 1 deletion rpc/http.go
Expand Up @@ -253,7 +253,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Header().Set("content-type", contentType)
codec := newHTTPServerConn(r, w)
defer codec.close()
s.serveSingleRequest(ctx, codec)
s.serveSingleRequest(ctx, codec, w)
}

// validateRequest returns a non-zero response code and error message if the
Expand Down
5 changes: 3 additions & 2 deletions rpc/server.go
Expand Up @@ -19,6 +19,7 @@ package rpc
import (
"context"
"io"
"net/http"
"sync/atomic"

mapset "github.com/deckarep/golang-set"
Expand Down Expand Up @@ -91,7 +92,7 @@ func (s *Server) ServeCodec(codec ServerCodec, options CodecOption) {
// serveSingleRequest reads and processes a single RPC request from the given codec. This
// is used to serve HTTP connections. Subscriptions and reverse calls are not allowed in
// this mode.
func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) {
func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec, w http.ResponseWriter) {
// Don't serve if server is stopped.
if atomic.LoadInt32(&s.run) == 0 {
return
Expand All @@ -111,7 +112,7 @@ func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) {
if batch {
h.handleBatch(reqs)
} else {
h.handleMsg(reqs[0])
h.handleMsg(reqs[0], w)
}
}

Expand Down

0 comments on commit 6104134

Please sign in to comment.