Skip to content

Commit

Permalink
rpcc: Allow the compression level to be changed
Browse files Browse the repository at this point in the history
  • Loading branch information
mafredri committed Jun 19, 2017
1 parent f6190f6 commit bcfae00
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 7 deletions.
25 changes: 21 additions & 4 deletions rpcc/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ func WithWriteBufferSize(n int) DialOption {
}

// WithCompression returns a DialOption that enables compression for the
// underlying websocket connection.
// underlying websocket connection. Use SetCompressionLevel on Conn to
// change the default compression level for subsequent writes.
func WithCompression() DialOption {
return func(o *dialOptions) {
o.wsDialer.EnableCompression = true
Expand Down Expand Up @@ -140,6 +141,11 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
if err != nil {
return nil, err
}

if ws.EnableCompression {
c.compressionLevel = wsConn.SetCompressionLevel
}

return &wsNetConn{conn: wsConn}, nil
}
}
Expand Down Expand Up @@ -224,9 +230,7 @@ type Conn struct {
dialOpts dialOptions
conn net.Conn

// Codec encodes and decodes JSON onto conn. There is only one
// active decoder (recv) and encoder (guaranteed via reqMu).
codec Codec
compressionLevel func(level int) error

mu sync.Mutex // Protects following.
closed bool
Expand All @@ -235,6 +239,9 @@ type Conn struct {

reqMu sync.Mutex // Protects following.
req Request
// Encodes and decodes JSON onto conn. Encoding is
// guarded by mutex and decoding is done by recv.
codec Codec

streamMu sync.Mutex // Protects following.
streams map[string]*streamClients
Expand Down Expand Up @@ -463,6 +470,16 @@ func (c *Conn) close(err error) error {
return err
}

// SetCompressionLevel sets the flate compressions level for writes. Valid level
// range is [-2, 9]. Returns error if compression is not eanbled for Conn. See
// package compress/flate for a description of compression levels.
func (c *Conn) SetCompressionLevel(level int) error {
if c.compressionLevel == nil {
return errors.New("rpcc: compression is not enabled for Conn")
}
return c.compressionLevel(level)
}

// Close closes the connection.
func (c *Conn) Close() error {
return c.close(nil)
Expand Down
9 changes: 6 additions & 3 deletions rpcc/conn_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package rpcc

import (
"compress/flate"
"context"
"encoding/json"
"errors"
Expand Down Expand Up @@ -33,12 +34,13 @@ func newTestServer(t testing.TB, respond func(*websocket.Conn, *Request) error)
var err error
ts := &testServer{}
upgrader := &websocket.Upgrader{
HandshakeTimeout: timeout,
HandshakeTimeout: timeout,
EnableCompression: true,
}

setupDone := make(chan struct{})
ts.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, r.Header)
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
t.Fatal(err)
}
Expand All @@ -63,10 +65,11 @@ func newTestServer(t testing.TB, respond func(*websocket.Conn, *Request) error)
}
}))

ts.conn, err = Dial("ws" + strings.TrimPrefix(ts.srv.URL, "http"))
ts.conn, err = Dial("ws"+strings.TrimPrefix(ts.srv.URL, "http"), WithCompression())
if err != nil {
t.Fatal(err)
}
ts.conn.SetCompressionLevel(flate.BestSpeed)

<-setupDone
return ts
Expand Down

0 comments on commit bcfae00

Please sign in to comment.