Skip to content

Commit

Permalink
use int32 instead of bool when checking whether to handle requests (e…
Browse files Browse the repository at this point in the history
  • Loading branch information
renaynay committed Jul 21, 2020
1 parent e932486 commit 21c1c67
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 108 deletions.
65 changes: 0 additions & 65 deletions graphql/graphql_test.go
Expand Up @@ -37,40 +37,6 @@ func TestBuildSchema(t *testing.T) {
}
}

// Tests that a graphql handler can be added to an existing HTTPServer
func TestGQLAllowed(t *testing.T) {
stack := createNode(t, true)
defer stack.Close()
// start node
if err := stack.Start(); err != nil {
t.Fatalf("could not start node: %v", err)
}
// check that server was created
server := stack.ExistingHTTPServer("127.0.0.1:9393")
if server == nil {
t.Errorf("server was not created on the given endpoint")
}
// assert that server allows GQL requests
assert.True(t, server.GQLAllowed)
}

// Tests to make sure an HTTPServer is created that handles for http, ws, and graphQL
func TestMultiplexedServer(t *testing.T) {
stack := createNode(t, true)
defer stack.Close()
// start the node
if err := stack.Start(); err != nil {
t.Error("could not start http service on node ", err)
}
server := stack.ExistingHTTPServer("127.0.0.1:9393")
if server == nil {
t.Fatalf("server was not configured on the given endpoint")
}
assert.True(t, server.RPCAllowed)
assert.True(t, server.WSAllowed)
assert.True(t, server.GQLAllowed)
}

// Tests that a graphQL request is successfully handled when graphql is enabled on the specified endpoint
func TestGraphQLHTTPOnSamePort_GQLRequest_Successful(t *testing.T) {
stack := createNode(t, true)
Expand Down Expand Up @@ -109,7 +75,6 @@ func TestGraphQLHTTPOnSamePort_GQLRequest_Unsuccessful(t *testing.T) {
if server == nil {
t.Fatalf("server was not created on the given endpoint")
}
assert.False(t, server.GQLAllowed)
// create http request
body := strings.NewReader("{\"query\": \"{block{number}}\",\"variables\": null}")
gqlReq, err := http.NewRequest(http.MethodPost, fmt.Sprintf("http://%s/graphql", "127.0.0.1:9393"), body)
Expand All @@ -128,36 +93,6 @@ func TestGraphQLHTTPOnSamePort_GQLRequest_Unsuccessful(t *testing.T) {
assert.Equal(t, string(bodyBytes), expected)
}

// Tests that graphql can be successfully enabled on a separate port than rpc and ws.
func TestGraphqlOnSeparatePort(t *testing.T) {
stack := createNode(t, false)
defer stack.Close()

separateTestEndpoint := "127.0.0.1:7474"

createGQLService(t, stack, separateTestEndpoint)
// start node
if err := stack.Start(); err != nil {
t.Fatalf("could not start node: %v", err)
}
// create http request
body := strings.NewReader("{\"query\": \"{block{number}}\",\"variables\": null}")
gqlReq, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://%s/graphql", separateTestEndpoint), body)
if err != nil {
t.Error("could not issue new http request ", err)
}
gqlReq.Header.Set("Content-Type", "application/json")
// read from response
resp := doHTTPRequest(t, gqlReq)
bodyBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatalf("could not read from response body: %v", err)
}
expected := "{\"data\":{\"block\":{\"number\":\"0x0\"}}}"
assert.Equal(t, expected, string(bodyBytes))

}

func createNode(t *testing.T, gqlEnabled bool) *node.Node {
stack, err := node.New(&node.Config{
HTTPHost: "127.0.0.1",
Expand Down
8 changes: 4 additions & 4 deletions graphql/service.go
Expand Up @@ -17,13 +17,12 @@
package graphql

import (
"net/http"

"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc"
"github.com/graph-gophers/graphql-go"
"github.com/graph-gophers/graphql-go/relay"
"net/http"
)

// New constructs a new GraphQL service instance.
Expand All @@ -34,7 +33,7 @@ func New(stack *node.Node, backend ethapi.Backend, endpoint string, cors, vhosts
// check if http server with given endpoint exists and enable graphQL on it
server := stack.ExistingHTTPServer(endpoint)
if server != nil {
server.GQLAllowed = true
// set vhosts, cors and timeouts
server.Vhosts = append(server.Vhosts, vhosts...)
server.CorsAllowedOrigins = append(server.CorsAllowedOrigins, cors...)
server.Timeouts = timeouts
Expand All @@ -54,10 +53,11 @@ func New(stack *node.Node, backend ethapi.Backend, endpoint string, cors, vhosts
}
// create the http server
gqlServer := &node.HTTPServer{
RPCAllowed: 0,
WSAllowed: 0,
Vhosts: vhosts,
CorsAllowedOrigins: cors,
Timeouts: timeouts,
GQLAllowed: true,
GQLHandler: handler,
Srv: rpc.NewServer(),
}
Expand Down
19 changes: 10 additions & 9 deletions node/api.go
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"strings"
"sync/atomic"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
Expand Down Expand Up @@ -160,7 +161,7 @@ func (api *PrivateAdminAPI) StartRPC(host *string, port *int, cors *string, apis
endpoint := fmt.Sprintf("%s:%d", *host, *port)
// check if HTTP server already exists
if server, exists := api.node.httpServers[endpoint]; exists {
if server.RPCAllowed {
if atomic.LoadInt32(&server.RPCAllowed) == 1 {
return false, fmt.Errorf("HTTP RPC already running on %v", server.Listener.Addr())
}
}
Expand Down Expand Up @@ -221,7 +222,7 @@ func (api *PrivateAdminAPI) StopRPC() (bool, error) {
defer api.node.lock.Unlock()

for _, httpServer := range api.node.httpServers {
if httpServer.RPCAllowed {
if atomic.LoadInt32(&httpServer.RPCAllowed) == 1 {
api.node.stopServer(httpServer)
return true, nil
}
Expand All @@ -236,7 +237,7 @@ func (api *PrivateAdminAPI) StartWS(host *string, port *int, allowedOrigins *str
defer api.node.lock.Unlock()
// check if an existing WS server already exists
for _, server := range api.node.httpServers {
if server.WSAllowed {
if atomic.LoadInt32(&server.WSAllowed) == 1 {
return false, fmt.Errorf("WebSocket RPC already running on %v", server.Listener.Addr())
}
}
Expand All @@ -255,7 +256,7 @@ func (api *PrivateAdminAPI) StartWS(host *string, port *int, allowedOrigins *str
// check if there is an existing server on the specified port, and if there is, enable ws on it
if server, exists := api.node.httpServers[endpoint]; exists {
// else configure ws on the existing server
server.WSAllowed = true
atomic.AddInt32(&server.WSAllowed, 1)
// configure origins
origins := api.node.config.WSOrigins
if allowedOrigins != nil {
Expand All @@ -280,7 +281,7 @@ func (api *PrivateAdminAPI) StartWS(host *string, port *int, allowedOrigins *str
// check if an HTTP server exists on the given endpoint, and if so, enable websocket on that HTTP server
existingServer := api.node.ExistingHTTPServer(endpoint)
if existingServer != nil {
existingServer.WSAllowed = true
atomic.AddInt32(&existingServer.WSAllowed, 1)
existingServer.WsOrigins = origins

}
Expand All @@ -300,7 +301,7 @@ func (api *PrivateAdminAPI) StartWS(host *string, port *int, allowedOrigins *str
port: *port,
Whitelist: modules,
WsOrigins: origins,
WSAllowed: true,
WSAllowed: 1,
}
// create handler
wsServer.handler = wsServer.Srv.WebsocketHandler(wsServer.WsOrigins)
Expand All @@ -323,10 +324,10 @@ func (api *PrivateAdminAPI) StopWS() (bool, error) {
defer api.node.lock.Unlock()

for _, httpServer := range api.node.httpServers {
if httpServer.WSAllowed {
httpServer.WSAllowed = false
if atomic.LoadInt32(&httpServer.WSAllowed) == 1 {
atomic.AddInt32(&httpServer.WSAllowed, int32(-1))
// if RPC is not enabled on the WS http server, shut it down
if !httpServer.RPCAllowed && !httpServer.GQLAllowed { // TODO is the gql check necessary? Can GQL ever be on a WS server that doesn't also support regular http?
if atomic.LoadInt32(&httpServer.RPCAllowed) == 0 {
api.node.stopServer(httpServer)
return true, nil
}
Expand Down
35 changes: 14 additions & 21 deletions node/node.go
Expand Up @@ -29,6 +29,7 @@ import (
"runtime"
"strings"
"sync"
"sync/atomic"

"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/core/rawdb"
Expand Down Expand Up @@ -142,11 +143,11 @@ func New(conf *Config) (*Node, error) {
endpoint: conf.HTTPEndpoint(),
host: conf.HTTPHost,
port: conf.HTTPPort,
RPCAllowed: true,
RPCAllowed: 1,
}
// check if ws is enabled and if ws port is the same as http port
if conf.WSHost != "" && conf.WSPort == conf.HTTPPort {
httpServ.WSAllowed = true
httpServ.WSAllowed = 1
httpServ.WsOrigins = conf.WSOrigins
httpServ.Whitelist = append(httpServ.Whitelist, conf.WSModules...)

Expand All @@ -157,13 +158,13 @@ func New(conf *Config) (*Node, error) {
}
if conf.WSHost != "" {
node.httpServers[conf.WSEndpoint()] = &HTTPServer{
WsOrigins: conf.WSOrigins,
Whitelist: conf.WSModules,
Srv: rpc.NewServer(),
endpoint: conf.WSEndpoint(),
host: conf.WSHost,
port: conf.WSPort,
WSAllowed: true,
WsOrigins: conf.WSOrigins,
Whitelist: conf.WSModules,
Srv: rpc.NewServer(),
endpoint: conf.WSEndpoint(),
host: conf.WSHost,
port: conf.WSPort,
WSAllowed: 1,
}
}

Expand Down Expand Up @@ -352,29 +353,21 @@ func (n *Node) configureRPC() error {

for _, server := range n.httpServers {
// configure the handlers
if server.RPCAllowed {
if atomic.LoadInt32(&server.RPCAllowed) == 1 {
server.handler = NewHTTPHandlerStack(server.Srv, server.CorsAllowedOrigins, server.Vhosts)
// wrap ws handler just in case ws is enabled through the console after start-up
wsHandler := server.Srv.WebsocketHandler(server.WsOrigins)
server.handler = server.NewWebsocketUpgradeHandler(server.handler, wsHandler)

n.log.Info("HTTP configured on endpoint ", "endpoint", server.endpoint)
if server.WSAllowed {
if atomic.LoadInt32(&server.WSAllowed) == 1 {
n.log.Info("Websocket configured on endpoint ", "endpoint", server.endpoint)
}
}
if server.WSAllowed && server.handler == nil {
if (atomic.LoadInt32(&server.WSAllowed) == 1) && server.handler == nil {
server.handler = server.Srv.WebsocketHandler(server.WsOrigins)
n.log.Info("Websocket configured on endpoint ", "endpoint", server.endpoint)
}
if server.GQLAllowed {
if server.handler == nil {
server.handler = server.GQLHandler
} else {
server.handler = NewGQLUpgradeHandler(server.handler, server.GQLHandler)
}
n.log.Info("GraphQL configured on endpoint ", "endpoint", server.endpoint)
}
// create the HTTP server
if err := n.CreateHTTPServer(server, false); err != nil {
return err
Expand Down Expand Up @@ -580,7 +573,7 @@ func (n *Node) WSEndpoint() string {
defer n.lock.Unlock()

for _, httpServer := range n.httpServers {
if httpServer.WSAllowed {
if atomic.LoadInt32(&httpServer.WSAllowed) == 1 {
if httpServer.Listener != nil {
return httpServer.Listener.Addr().String()
}
Expand Down
6 changes: 4 additions & 2 deletions node/node_test.go
Expand Up @@ -23,6 +23,7 @@ import (
"net/http"
"os"
"reflect"
"sync/atomic"
"testing"

"github.com/ethereum/go-ethereum/crypto"
Expand Down Expand Up @@ -423,7 +424,8 @@ func TestHTTPServerCreateAndStop(t *testing.T) {
}
// check to make sure http servers are configured properly
for _, server := range node1.httpServers {
if !(server.WSAllowed && server.RPCAllowed) {

if atomic.LoadInt32(&server.WSAllowed) == 0 && atomic.LoadInt32(&server.RPCAllowed) == 0 {
t.Fatalf("node's http server is not configured to handle both rpc and ws")
}
node1.stopServer(server)
Expand All @@ -450,7 +452,7 @@ func TestHTTPServerCreateAndStop(t *testing.T) {
}
// check that neither http server has both ws and rpc enabled
for _, server := range node2.httpServers {
if server.WSAllowed && server.RPCAllowed {
if atomic.LoadInt32(&server.WSAllowed) == 1 && atomic.LoadInt32(&server.RPCAllowed) == 1 {
t.Fatalf("both rpc and ws allowed on a single http server")
}
node2.stopServer(server)
Expand Down
8 changes: 4 additions & 4 deletions node/rpcstack.go
Expand Up @@ -26,6 +26,7 @@ import (
"net/http"
"strings"
"sync"
"sync/atomic"

"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
Expand All @@ -52,9 +53,8 @@ type HTTPServer struct {
WsOrigins []string
Timeouts rpc.HTTPTimeouts

RPCAllowed bool
WSAllowed bool // TODO discuss this later bc possible race condition
GQLAllowed bool
RPCAllowed int32
WSAllowed int32

GQLHandler http.Handler
}
Expand Down Expand Up @@ -224,7 +224,7 @@ func newGzipHandler(next http.Handler) http.Handler {
func (hs *HTTPServer) NewWebsocketUpgradeHandler(h http.Handler, ws http.Handler) http.Handler {
// TODO make sure you protect the pointer
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if hs.WSAllowed && isWebsocket(r) {
if atomic.LoadInt32(&hs.WSAllowed) == 1 && isWebsocket(r) {
ws.ServeHTTP(w, r)
log.Debug("serving websocket request")
return
Expand Down
7 changes: 4 additions & 3 deletions node/rpcstack_test.go
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"sync/atomic"
"testing"

"github.com/ethereum/go-ethereum/rpc"
Expand All @@ -13,7 +14,7 @@ import (
func TestNewWebsocketUpgradeHandler_websocket(t *testing.T) {
h := &HTTPServer{
Srv: rpc.NewServer(),
WSAllowed: true,
WSAllowed: 1,
}
handler := h.NewWebsocketUpgradeHandler(nil, h.Srv.WebsocketHandler([]string{}))
ts := httptest.NewServer(handler)
Expand Down Expand Up @@ -63,6 +64,6 @@ func TestWSAllowed(t *testing.T) {
t.Fatalf("server was not started on the given endpoint: %v", err)
}
// assert that both RPC and WS are allowed on the HTTP Server
assert.True(t, server.RPCAllowed)
assert.True(t, server.WSAllowed)
assert.Equal(t, atomic.LoadInt32(&server.RPCAllowed), int32(1))
assert.Equal(t, atomic.LoadInt32(&server.WSAllowed), int32(1))
}

0 comments on commit 21c1c67

Please sign in to comment.