Skip to content

Commit

Permalink
support resp3 protocol
Browse files Browse the repository at this point in the history
Signed-off-by: monkey <golang@88.com>
  • Loading branch information
monkey92t committed Apr 22, 2021
1 parent 06183e6 commit c6ab55a
Show file tree
Hide file tree
Showing 15 changed files with 883 additions and 372 deletions.
18 changes: 15 additions & 3 deletions bench_decode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@ type ClientStub struct {
resp []byte
}

var initHello = []byte("%1\r\n+proto\r\n:3\r\n")

func NewClientStub(resp []byte) *ClientStub {
stub := &ClientStub{
resp: resp,
}

stub.Cmdable = NewClient(&Options{
PoolSize: 128,
Dialer: func(ctx context.Context, network, addr string) (net.Conn, error) {
return stub.stubConn(), nil
return stub.stubConn(initHello), nil
},
})
return stub
Expand All @@ -40,7 +43,7 @@ func NewClusterClientStub(resp []byte) *ClientStub {
PoolSize: 128,
Addrs: []string{"127.0.0.1:6379"},
Dialer: func(ctx context.Context, network, addr string) (net.Conn, error) {
return stub.stubConn(), nil
return stub.stubConn(initHello), nil
},
ClusterSlots: func(_ context.Context) ([]ClusterSlot, error) {
return []ClusterSlot{
Expand All @@ -65,18 +68,27 @@ func NewClusterClientStub(resp []byte) *ClientStub {
return stub
}

func (c *ClientStub) stubConn() *ConnStub {
func (c *ClientStub) stubConn(init []byte) *ConnStub {
return &ConnStub{
init: init,
resp: c.resp,
}
}

type ConnStub struct {
init []byte
resp []byte
pos int
}

func (c *ConnStub) Read(b []byte) (n int, err error) {
// Return conn.init()
if len(c.init) > 0 {
n = copy(b, c.init)
c.init = c.init[n:]
return n, nil
}

if len(c.resp) == 0 {
return 0, io.EOF
}
Expand Down
9 changes: 2 additions & 7 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1384,20 +1384,15 @@ func (c *ClusterClient) txPipelineReadQueued(
}

// Parse number of replies.
line, err := rd.ReadLine()
line, err := rd.Pathfinder()
if err != nil {
if err == Nil {
err = TxFailedErr
}
return err
}

switch line[0] {
case proto.ErrorReply:
return proto.ParseErrorReply(line)
case proto.ArrayReply:
// ok
default:
if line[0] != proto.RespArray {
return fmt.Errorf("redis: expected '*', but got line %q", line)
}

Expand Down
11 changes: 6 additions & 5 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1182,16 +1182,17 @@ var _ = Describe("ClusterClient with unavailable Cluster", func() {
var client *redis.ClusterClient

BeforeEach(func() {
for _, node := range cluster.clients {
err := node.ClientPause(ctx, 5*time.Second).Err()
Expect(err).NotTo(HaveOccurred())
}

opt := redisClusterOptions()
opt.ReadTimeout = 250 * time.Millisecond
opt.WriteTimeout = 250 * time.Millisecond
opt.MaxRedirects = 1
client = cluster.newClusterClientUnstable(opt)
Expect(client.Ping(ctx).Err()).NotTo(HaveOccurred())

for _, node := range cluster.clients {
err := node.ClientPause(ctx, 5*time.Second).Err()
Expect(err).NotTo(HaveOccurred())
}
})

AfterEach(func() {
Expand Down

0 comments on commit c6ab55a

Please sign in to comment.