From 6104134b9d02884f42d7b040d9560d058a3ec867 Mon Sep 17 00:00:00 2001 From: Nicholas Pettas Date: Mon, 25 Mar 2024 15:14:07 -0700 Subject: [PATCH] return 503 on unhealthy response --- rpc/client.go | 2 +- rpc/handler.go | 30 ++++++++++++++++++------------ rpc/http.go | 2 +- rpc/server.go | 5 +++-- 4 files changed, 23 insertions(+), 16 deletions(-) diff --git a/rpc/client.go b/rpc/client.go index 198ce6357359c..088d3450b1e75 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -567,7 +567,7 @@ func (c *Client) dispatch(codec ServerCodec) { if op.batch { conn.handler.handleBatch(op.msgs) } else { - conn.handler.handleMsg(op.msgs[0]) + conn.handler.handleMsg(op.msgs[0], nil) } case err := <-c.readErr: diff --git a/rpc/handler.go b/rpc/handler.go index 65180ef055ea4..9934387b2988d 100644 --- a/rpc/handler.go +++ b/rpc/handler.go @@ -19,6 +19,7 @@ package rpc import ( "context" "encoding/json" + "net/http" "reflect" "strconv" "strings" @@ -43,21 +44,20 @@ func SetExecutionTimeLimit(limit time.Duration) { // // The entry points for incoming messages are: // -// h.handleMsg(message) -// h.handleBatch(message) +// h.handleMsg(message) +// h.handleBatch(message) // // Outgoing calls use the requestOp struct. Register the request before sending it // on the connection: // -// op := &requestOp{ids: ...} -// h.addRequestOp(op) +// op := &requestOp{ids: ...} +// h.addRequestOp(op) // // Now send the request, then wait for the reply to be delivered through handleMsg: // -// if err := op.wait(...); err != nil { -// h.removeRequestOp(op) // timeout, etc. -// } -// +// if err := op.wait(...); err != nil { +// h.removeRequestOp(op) // timeout, etc. +// } type handler struct { reg *serviceRegistry unsubscribeCb *callback @@ -125,7 +125,7 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) { h.startCallProc(func(cp *callProc) { answers := make([]*jsonrpcMessage, 0, len(msgs)) for _, msg := range calls { - if answer := h.handleCallMsg(cp, msg); answer != nil { + if answer := h.handleCallMsg(cp, msg, nil); answer != nil { answers = append(answers, answer) } } @@ -140,12 +140,12 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) { } // handleMsg handles a single message. -func (h *handler) handleMsg(msg *jsonrpcMessage) { +func (h *handler) handleMsg(msg *jsonrpcMessage, w http.ResponseWriter) { if ok := h.handleImmediate(msg); ok { return } h.startCallProc(func(cp *callProc) { - answer := h.handleCallMsg(cp, msg) + answer := h.handleCallMsg(cp, msg, w) h.addSubscriptions(cp.notifiers) if answer != nil { h.conn.writeJSON(cp.ctx, answer) @@ -299,7 +299,7 @@ func (h *handler) handleResponse(msg *jsonrpcMessage) { } // handleCallMsg executes a call message and returns the answer. -func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage) *jsonrpcMessage { +func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage, w http.ResponseWriter) *jsonrpcMessage { start := time.Now() switch { case msg.isNotification(): @@ -319,6 +319,12 @@ func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage) *jsonrpcMess } else { h.log.Debug("Served "+msg.Method, ctx...) } + + // return 503 if eth_health is called and the node is not healthy + if w != nil && msg.Method == "eth_health" && resp.Error != nil { + w.WriteHeader(http.StatusServiceUnavailable) + } + return resp case msg.hasValidID(): return msg.errorResponse(&invalidRequestError{"invalid request"}) diff --git a/rpc/http.go b/rpc/http.go index 32f4e7d90a259..a45568a795d6d 100644 --- a/rpc/http.go +++ b/rpc/http.go @@ -253,7 +253,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.Header().Set("content-type", contentType) codec := newHTTPServerConn(r, w) defer codec.close() - s.serveSingleRequest(ctx, codec) + s.serveSingleRequest(ctx, codec, w) } // validateRequest returns a non-zero response code and error message if the diff --git a/rpc/server.go b/rpc/server.go index f9cee0b9c486c..4753902fdbeef 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -19,6 +19,7 @@ package rpc import ( "context" "io" + "net/http" "sync/atomic" mapset "github.com/deckarep/golang-set" @@ -91,7 +92,7 @@ func (s *Server) ServeCodec(codec ServerCodec, options CodecOption) { // serveSingleRequest reads and processes a single RPC request from the given codec. This // is used to serve HTTP connections. Subscriptions and reverse calls are not allowed in // this mode. -func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) { +func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec, w http.ResponseWriter) { // Don't serve if server is stopped. if atomic.LoadInt32(&s.run) == 0 { return @@ -111,7 +112,7 @@ func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) { if batch { h.handleBatch(reqs) } else { - h.handleMsg(reqs[0]) + h.handleMsg(reqs[0], w) } }