Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert nhooyr.io/websocket #357

Merged
merged 1 commit into from Mar 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
56 changes: 30 additions & 26 deletions v2/delivery/websocket.go
@@ -1,9 +1,9 @@
package delivery

import (
"context"
"nhooyr.io/websocket"
"time"

"github.com/gorilla/websocket"
)

// WsHandler handle raw websocket message
Expand All @@ -24,10 +24,8 @@ func newWsConfig(endpoint string) *WsConfig {
}

var wsServe = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (doneC, stopC chan struct{}, err error) {
ctx, cancel := context.WithCancel(context.Background())
c, _, err := websocket.Dial(ctx, cfg.Endpoint, nil)
c, _, err := websocket.DefaultDialer.Dial(cfg.Endpoint, nil)
if err != nil {
cancel()
return nil, nil, err
}
c.SetReadLimit(655350)
Expand All @@ -38,9 +36,8 @@ var wsServe = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (don
// websocket.Conn.ReadMessage or when the stopC channel is
// closed by the client.
defer close(doneC)
defer cancel()
if WebsocketKeepalive {
go keepAlive(ctx, c, WebsocketTimeout)
keepAlive(c, WebsocketTimeout)
}
// Wait for the stopC channel to be closed. We do that in a
// separate goroutine because ReadMessage is a blocking
Expand All @@ -52,13 +49,13 @@ var wsServe = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (don
silent = true
case <-doneC:
}
_ = c.Close(websocket.StatusNormalClosure, "normal closure")
c.Close()
}()
for {
_, message, readErr := c.Read(ctx)
if readErr != nil {
_, message, err := c.ReadMessage()
if err != nil {
if !silent {
errHandler(readErr)
errHandler(err)
}
return
}
Expand All @@ -68,21 +65,28 @@ var wsServe = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (don
return
}

func keepAlive(ctx context.Context, c *websocket.Conn, d time.Duration) {
t := time.NewTimer(d)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
}
func keepAlive(c *websocket.Conn, timeout time.Duration) {
ticker := time.NewTicker(timeout)

err := c.Ping(ctx)
if err != nil {
return
}
lastResponse := time.Now()
c.SetPongHandler(func(msg string) error {
lastResponse = time.Now()
return nil
})

t.Reset(d)
}
go func() {
defer ticker.Stop()
for {
deadline := time.Now().Add(10 * time.Second)
err := c.WriteControl(websocket.PingMessage, []byte{}, deadline)
if err != nil {
return
}
<-ticker.C
if time.Since(lastResponse) > timeout {
c.Close()
return
}
}
}()
}
56 changes: 30 additions & 26 deletions v2/futures/websocket.go
@@ -1,9 +1,9 @@
package futures

import (
"context"
"nhooyr.io/websocket"
"time"

"github.com/gorilla/websocket"
)

// WsHandler handle raw websocket message
Expand All @@ -24,10 +24,8 @@ func newWsConfig(endpoint string) *WsConfig {
}

var wsServe = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (doneC, stopC chan struct{}, err error) {
ctx, cancel := context.WithCancel(context.Background())
c, _, err := websocket.Dial(ctx, cfg.Endpoint, nil)
c, _, err := websocket.DefaultDialer.Dial(cfg.Endpoint, nil)
if err != nil {
cancel()
return nil, nil, err
}
c.SetReadLimit(655350)
Expand All @@ -38,9 +36,8 @@ var wsServe = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (don
// websocket.Conn.ReadMessage or when the stopC channel is
// closed by the client.
defer close(doneC)
defer cancel()
if WebsocketKeepalive {
go keepAlive(ctx, c, WebsocketTimeout)
keepAlive(c, WebsocketTimeout)
}
// Wait for the stopC channel to be closed. We do that in a
// separate goroutine because ReadMessage is a blocking
Expand All @@ -52,13 +49,13 @@ var wsServe = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (don
silent = true
case <-doneC:
}
_ = c.Close(websocket.StatusNormalClosure, "normal closure")
c.Close()
}()
for {
_, message, readErr := c.Read(ctx)
if readErr != nil {
_, message, err := c.ReadMessage()
if err != nil {
if !silent {
errHandler(readErr)
errHandler(err)
}
return
}
Expand All @@ -68,21 +65,28 @@ var wsServe = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (don
return
}

func keepAlive(ctx context.Context, c *websocket.Conn, d time.Duration) {
t := time.NewTimer(d)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
}
func keepAlive(c *websocket.Conn, timeout time.Duration) {
ticker := time.NewTicker(timeout)

err := c.Ping(ctx)
if err != nil {
return
}
lastResponse := time.Now()
c.SetPongHandler(func(msg string) error {
lastResponse = time.Now()
return nil
})

t.Reset(d)
}
go func() {
defer ticker.Stop()
for {
deadline := time.Now().Add(10 * time.Second)
err := c.WriteControl(websocket.PingMessage, []byte{}, deadline)
if err != nil {
return
}
<-ticker.C
if time.Since(lastResponse) > timeout {
c.Close()
return
}
}
}()
}
3 changes: 1 addition & 2 deletions v2/go.mod
Expand Up @@ -5,8 +5,7 @@ go 1.13
require (
github.com/bitly/go-simplejson v0.5.0
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
github.com/klauspost/compress v1.13.1 // indirect
github.com/gorilla/websocket v1.5.0
github.com/kr/pretty v0.2.0 // indirect
github.com/stretchr/testify v1.4.0
nhooyr.io/websocket v1.8.7
)
62 changes: 4 additions & 58 deletions v2/go.sum
Expand Up @@ -2,76 +2,22 @@ github.com/bitly/go-simplejson v0.5.0 h1:6IH+V8/tVMab511d5bn4M7EwGXZf9Hj6i2xSwkN
github.com/bitly/go-simplejson v0.5.0/go.mod h1:cXHtHw4XUPsvGaxgjIAn8PhEWG9NfngEKAMDJEczWVA=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
github.com/gin-gonic/gin v1.6.3 h1:ahKqKTFpO5KTPHxWZjEdPScmYaGtLo8Y4DMHoEsnp14=
github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M=
github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A=
github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q=
github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8=
github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD876Lmtgy7VtROAbHHXk8no=
github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA=
github.com/go-playground/validator/v10 v10.2.0 h1:KgJ0snyC2R9VXYN2rneOtQcw5aHQB1Vv0sFl1UcHBOY=
github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI=
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee h1:s+21KNqlpePfkah2I+gwHF8xmJWRjooY+5248k6m4A0=
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo=
github.com/gobwas/pool v0.2.0 h1:QEmUOlnSjWtnpRGHF3SauEiOsy82Cup83Vf2LcMlnc8=
github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.0.2 h1:CoAavW/wd/kulfZmSIBt6p24n4j7tHgNVCjsfHVNUbo=
github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM=
github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
github.com/golang/protobuf v1.3.5 h1:F768QJ1E9tib+q5Sc8MkdJi1RxLTbRcTf8LJV56aRls=
github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM=
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/json-iterator/go v1.1.9 h1:9yzud/Ht36ygwatGx56VwCZtlI/2AD15T1X2sjSuGns=
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.13.1 h1:wXr2uRxZTJXHLly6qhJabee5JqIhTRoLBhDOA74hDEQ=
github.com/klauspost/compress v1.13.1/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs=
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y=
github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII=
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42 h1:vEOn+mP2zCOVzKckCZy6YsCtDblrpj/w7B9nxGNELpg=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
nhooyr.io/websocket v1.8.7 h1:usjR2uOr/zjjkVMy0lW+PPohFok7PCow5sDjLgX4P4g=
nhooyr.io/websocket v1.8.7/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0=
56 changes: 30 additions & 26 deletions v2/websocket.go
@@ -1,9 +1,9 @@
package binance

import (
"context"
"nhooyr.io/websocket"
"time"

"github.com/gorilla/websocket"
)

// WsHandler handle raw websocket message
Expand All @@ -24,10 +24,8 @@ func newWsConfig(endpoint string) *WsConfig {
}

var wsServe = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (doneC, stopC chan struct{}, err error) {
ctx, cancel := context.WithCancel(context.Background())
c, _, err := websocket.Dial(ctx, cfg.Endpoint, nil)
c, _, err := websocket.DefaultDialer.Dial(cfg.Endpoint, nil)
if err != nil {
cancel()
return nil, nil, err
}
c.SetReadLimit(655350)
Expand All @@ -38,9 +36,8 @@ var wsServe = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (don
// websocket.Conn.ReadMessage or when the stopC channel is
// closed by the client.
defer close(doneC)
defer cancel()
if WebsocketKeepalive {
go keepAlive(ctx, c, WebsocketTimeout)
keepAlive(c, WebsocketTimeout)
}
// Wait for the stopC channel to be closed. We do that in a
// separate goroutine because ReadMessage is a blocking
Expand All @@ -52,13 +49,13 @@ var wsServe = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (don
silent = true
case <-doneC:
}
_ = c.Close(websocket.StatusNormalClosure, "normal closure")
c.Close()
}()
for {
_, message, readErr := c.Read(ctx)
if readErr != nil {
_, message, err := c.ReadMessage()
if err != nil {
if !silent {
errHandler(readErr)
errHandler(err)
}
return
}
Expand All @@ -68,21 +65,28 @@ var wsServe = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (don
return
}

func keepAlive(ctx context.Context, c *websocket.Conn, d time.Duration) {
t := time.NewTimer(d)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
}
func keepAlive(c *websocket.Conn, timeout time.Duration) {
ticker := time.NewTicker(timeout)

err := c.Ping(ctx)
if err != nil {
return
}
lastResponse := time.Now()
c.SetPongHandler(func(msg string) error {
lastResponse = time.Now()
return nil
})

t.Reset(d)
}
go func() {
defer ticker.Stop()
for {
deadline := time.Now().Add(10 * time.Second)
err := c.WriteControl(websocket.PingMessage, []byte{}, deadline)
if err != nil {
return
}
<-ticker.C
if time.Since(lastResponse) > timeout {
c.Close()
return
}
}
}()
}