Skip to content

Commit

Permalink
Merge pull request klaytn#1652 from JayChoi1736/bidir
Browse files Browse the repository at this point in the history
Implement Bi-directional Communication
  • Loading branch information
JayChoi1736 committed Nov 10, 2022
2 parents eff54f4 + 41c7a0d commit f0aa7a3
Show file tree
Hide file tree
Showing 34 changed files with 2,307 additions and 2,062 deletions.
8 changes: 3 additions & 5 deletions go.mod
Expand Up @@ -7,11 +7,12 @@ require (
github.com/VictoriaMetrics/fastcache v1.6.0
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf
github.com/aristanetworks/goarista v0.0.0-20191001182449-186a6201b8ef
github.com/bt51/ntpclient v0.0.0-20140310165113-3045f71e2530 // indirect
github.com/aws/aws-sdk-go v1.34.28
github.com/bt51/ntpclient v0.0.0-20140310165113-3045f71e2530
github.com/cespare/cp v1.0.0
github.com/clevergo/websocket v1.0.0
github.com/davecgh/go-spew v1.1.1
github.com/deckarep/golang-set v1.8.0
github.com/dgraph-io/badger v1.6.0
github.com/docker/docker v1.13.1
github.com/edsrzf/mmap-go v1.0.0
Expand All @@ -24,15 +25,14 @@ require (
github.com/golang/mock v1.4.4
github.com/golang/protobuf v1.5.2
github.com/golang/snappy v0.0.4
github.com/gorilla/websocket v1.5.0
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d
github.com/holiman/uint256 v1.2.0
github.com/huin/goupnp v1.0.3-0.20220313090229-ca81a64b4204
github.com/influxdata/influxdb v1.8.3
github.com/jackpal/go-nat-pmp v1.0.2
github.com/jinzhu/gorm v1.9.15
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/julienschmidt/httprouter v1.2.0
github.com/kr/text v0.2.0 // indirect
github.com/mattn/go-colorable v0.1.11
github.com/mattn/go-runewidth v0.0.9 // indirect
github.com/naoina/go-stringutil v0.1.0 // indirect
Expand Down Expand Up @@ -71,7 +71,5 @@ require (
gopkg.in/olebedev/go-duktape.v3 v3.0.0-20200619000410-60c24ae608a6
gopkg.in/sourcemap.v1 v1.0.5 // indirect
gopkg.in/urfave/cli.v1 v1.20.0
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
gotest.tools v2.2.0+incompatible
)
55 changes: 23 additions & 32 deletions go.sum

Large diffs are not rendered by default.

10 changes: 6 additions & 4 deletions networks/grpc/gServer.go
Expand Up @@ -27,6 +27,7 @@ import (
"encoding/json"
"io"
"net"
"time"

"github.com/klaytn/klaytn/common"
"github.com/klaytn/klaytn/log"
Expand All @@ -49,6 +50,8 @@ type grpcReadWriteNopCloser struct {
io.Writer
}

func (t *grpcReadWriteNopCloser) SetWriteDeadline(time.Time) error { return nil }

// Close does nothing and returns always nil.
func (t *grpcReadWriteNopCloser) Close() error {
return nil
Expand Down Expand Up @@ -126,7 +129,7 @@ func (kns *klaytnServer) BiCall(stream KlaytnNode_BiCallServer) error {
ctx := context.Background()

reader := bufio.NewReaderSize(preader, common.MaxRequestContentLength)
kns.handler.ServeSingleRequest(ctx, rpc.NewCodec(&grpcReadWriteNopCloser{reader, &grpcWriter{stream, nil}}, encoder, decoder), rpc.OptionMethodInvocation|rpc.OptionSubscriptions)
kns.handler.ServeSingleRequest(ctx, rpc.NewFuncCodec(&grpcReadWriteNopCloser{reader, &grpcWriter{stream, nil}}, encoder, decoder))
}
}

Expand Down Expand Up @@ -166,7 +169,7 @@ func (kns *klaytnServer) Subscribe(request *RPCRequest, stream KlaytnNode_Subscr
ctx := context.Background()

reader := bufio.NewReaderSize(preader, common.MaxRequestContentLength)
kns.handler.ServeSingleRequest(ctx, rpc.NewCodec(&grpcReadWriteNopCloser{reader, &grpcWriter{stream, writeErr}}, encoder, decoder), rpc.OptionMethodInvocation|rpc.OptionSubscriptions)
kns.handler.ServeSingleRequest(ctx, rpc.NewFuncCodec(&grpcReadWriteNopCloser{reader, &grpcWriter{stream, writeErr}}, encoder, decoder))

var err error
loop:
Expand Down Expand Up @@ -222,8 +225,7 @@ func (kns *klaytnServer) Call(ctx context.Context, request *RPCRequest) (*RPCRes
}

reader := bufio.NewReaderSize(preader, common.MaxRequestContentLength)
kns.handler.ServeSingleRequest(ctx, rpc.NewCodec(&grpcReadWriteNopCloser{reader, writer}, encoder, decoder), rpc.OptionMethodInvocation)

kns.handler.ServeSingleRequest(ctx, rpc.NewFuncCodec(&grpcReadWriteNopCloser{reader, writer}, encoder, decoder))
loop:
for {
select {
Expand Down
40 changes: 23 additions & 17 deletions networks/p2p/simulations/adapters/exec.go
Expand Up @@ -39,12 +39,12 @@ import (
"time"

"github.com/docker/docker/pkg/reexec"
"github.com/gorilla/websocket"
"github.com/klaytn/klaytn/log"
"github.com/klaytn/klaytn/networks/p2p"
"github.com/klaytn/klaytn/networks/p2p/discover"
"github.com/klaytn/klaytn/networks/rpc"
"github.com/klaytn/klaytn/node"
"golang.org/x/net/websocket"
)

var logger = log.NewModuleLogger(log.NetworksP2PSimulationsAdapters)
Expand Down Expand Up @@ -288,31 +288,37 @@ func (n *ExecNode) NodeInfo() *p2p.NodeInfo {

// ServeRPC serves RPC requests over the given connection by dialling the
// node's WebSocket address and joining the two connections
func (n *ExecNode) ServeRPC(clientConn net.Conn) error {
conn, err := websocket.Dial(n.wsAddr, "", "http://localhost")
func (n *ExecNode) ServeRPC(clientConn *websocket.Conn) error {
conn, _, err := websocket.DefaultDialer.Dial(n.wsAddr, nil)
if err != nil {
return err
}
var wg sync.WaitGroup
wg.Add(2)
join := func(src, dst net.Conn) {
defer wg.Done()
io.Copy(dst, src)
// close the write end of the destination connection
if cw, ok := dst.(interface {
CloseWrite() error
}); ok {
cw.CloseWrite()
} else {
dst.Close()
}
}
go join(conn, clientConn)
go join(clientConn, conn)
go wsCopy(&wg, conn, clientConn)
go wsCopy(&wg, clientConn, conn)
wg.Wait()
conn.Close()
return nil
}

func wsCopy(wg *sync.WaitGroup, src, dst *websocket.Conn) {
defer wg.Done()
for {
msgType, r, err := src.NextReader()
if err != nil {
return
}
w, err := dst.NextWriter(msgType)
if err != nil {
return
}
if _, err = io.Copy(w, r); err != nil {
return
}
}
}

// Snapshots creates snapshots of the services by calling the
// simulation_snapshot RPC method
func (n *ExecNode) Snapshots() (map[string][]byte, error) {
Expand Down
6 changes: 4 additions & 2 deletions networks/p2p/simulations/adapters/inproc.go
Expand Up @@ -27,6 +27,7 @@ import (
"net"
"sync"

"github.com/gorilla/websocket"
"github.com/klaytn/klaytn/event"
"github.com/klaytn/klaytn/networks/p2p"
"github.com/klaytn/klaytn/networks/p2p/discover"
Expand Down Expand Up @@ -202,12 +203,13 @@ func (sn *SimNode) Client() (*rpc.Client, error) {

// ServeRPC serves RPC requests over the given connection by creating an
// in-memory client to the node's RPC server
func (sn *SimNode) ServeRPC(conn net.Conn) error {
func (sn *SimNode) ServeRPC(conn *websocket.Conn) error {
handler, err := sn.node.RPCHandler()
if err != nil {
return err
}
handler.ServeCodec(rpc.NewJSONCodec(conn), rpc.OptionMethodInvocation|rpc.OptionSubscriptions)
codec := rpc.NewFuncCodec(conn, conn.WriteJSON, conn.ReadJSON)
handler.ServeCodec(codec, 0)
return nil
}

Expand Down
6 changes: 4 additions & 2 deletions networks/p2p/simulations/adapters/inproc_cn.go
Expand Up @@ -23,6 +23,7 @@ import (
"net"
"sync"

"github.com/gorilla/websocket"
"github.com/klaytn/klaytn/event"
"github.com/klaytn/klaytn/networks/p2p"
"github.com/klaytn/klaytn/networks/p2p/discover"
Expand Down Expand Up @@ -191,12 +192,13 @@ func (sn *CnNode) Client() (*rpc.Client, error) {

// ServeRPC serves RPC requests over the given connection by creating an
// in-memory client to the node's RPC server
func (sn *CnNode) ServeRPC(conn net.Conn) error {
func (sn *CnNode) ServeRPC(conn *websocket.Conn) error {
handler, err := sn.node.RPCHandler()
if err != nil {
return err
}
handler.ServeCodec(rpc.NewJSONCodec(conn), rpc.OptionMethodInvocation|rpc.OptionSubscriptions)
codec := rpc.NewFuncCodec(conn, conn.WriteJSON, conn.ReadJSON)
handler.ServeCodec(codec, 0)
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion networks/p2p/simulations/adapters/types.go
Expand Up @@ -30,6 +30,7 @@ import (
"strconv"

"github.com/docker/docker/pkg/reexec"
"github.com/gorilla/websocket"
"github.com/klaytn/klaytn/crypto"
"github.com/klaytn/klaytn/networks/p2p"
"github.com/klaytn/klaytn/networks/p2p/discover"
Expand All @@ -53,7 +54,7 @@ type Node interface {
Client() (*rpc.Client, error)

// ServeRPC serves RPC requests over the given connection
ServeRPC(net.Conn) error
ServeRPC(*websocket.Conn) error

// Start starts the node with the given snapshots
Start(snapshots map[string][]byte) error
Expand Down
18 changes: 11 additions & 7 deletions networks/p2p/simulations/http.go
Expand Up @@ -33,13 +33,13 @@ import (
"strings"
"sync"

"github.com/gorilla/websocket"
"github.com/julienschmidt/httprouter"
"github.com/klaytn/klaytn/event"
"github.com/klaytn/klaytn/networks/p2p"
"github.com/klaytn/klaytn/networks/p2p/discover"
"github.com/klaytn/klaytn/networks/p2p/simulations/adapters"
"github.com/klaytn/klaytn/networks/rpc"
"golang.org/x/net/websocket"
)

// DefaultClient is the default simulation API client which expects the API
Expand Down Expand Up @@ -704,16 +704,20 @@ func (s *Server) Options(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusOK)
}

var wsUpgrade = websocket.Upgrader{
CheckOrigin: func(*http.Request) bool { return true },
}

// NodeRPC forwards RPC requests to a node in the network via a WebSocket
// connection
func (s *Server) NodeRPC(w http.ResponseWriter, req *http.Request) {
node := req.Context().Value("node").(*Node)

handler := func(conn *websocket.Conn) {
node.ServeRPC(conn)
conn, err := wsUpgrade.Upgrade(w, req, nil)
if err != nil {
return
}

websocket.Server{Handler: handler}.ServeHTTP(w, req)
defer conn.Close()
node := req.Context().Value("node").(*Node)
node.ServeRPC(conn)
}

// ServeHTTP implements the http.Handler interface by delegating to the
Expand Down

0 comments on commit f0aa7a3

Please sign in to comment.