Skip to content

Commit

Permalink
only do compression if requested explictly when forwarding/fetching (#…
Browse files Browse the repository at this point in the history
…625)

* only do compression if requested explictly when forwarding/fetching (fixes #624) also fix a data race and also enable delay,close and header query args for debug endpoint

* allow content-type (and content-length though that can create invalid data) to be passed as echo/debug header= to override the default

* readme update for debug handler features

* add (indirect) test for gzip compression on debug handler (and restore coverage)

* actually exercise gzip handling in the test + catch the missing payload earlier than webtest
  • Loading branch information
ldemailly committed Sep 19, 2022
1 parent c72b231 commit c9eaf5e
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 38 deletions.
2 changes: 2 additions & 0 deletions README.md
Expand Up @@ -360,6 +360,8 @@ Fortio `server` has the following feature for the http listening on 8080 (all pa
| header | header(s) to add to the reply e.g. `&header=Foo:Bar&header=X:Y` |
| gzip | If `Accept-Encoding: gzip` is passed in headers by the caller/client; and `gzip=true` is in the query args, all response will be gzipped; or if `gzip=42.7` is passed, approximately 42.7% will|

`delay`, `close` and `header` query arguments are also supported for the `debug` endpoint which echoes back the request (gzip is always done if `Accept-Encoding: gzip` is present, status is always 200, and the payload is the echo back debug information).

You can set a default value for all these by passing `-echo-server-default-params` to the server command line, for instance:
`fortio server -echo-server-default-params="delay=0.5s:50,1s:40&status=418"` will make the server respond with http 418 and a delay of either 0.5s half of the time, 1s 40% and no delay in 10% of the calls; unless any `?` query args is passed by the client. Note that the quotes (") are for the shell to escape the ampersand (&) but should not be put in a yaml nor the dynamicflag url for instance.

Expand Down
3 changes: 3 additions & 0 deletions fhttp/http_forwarder.go
Expand Up @@ -247,6 +247,9 @@ func CreateProxyClient() *http.Client {
// TODO make configurable, should be fine for now for most but extreme -c values
MaxIdleConnsPerHost: 128, // must be more than incoming parallelization; divided by number of fan out if using parallel mode
MaxIdleConns: 256,
// This avoids Accept-Encoding: gzip being added to outgoing requests when no encoding accept is specified
// yet if passed by request, it will do gzip end to end. Issue #624.
DisableCompression: true,
},
}
return client
Expand Down
4 changes: 4 additions & 0 deletions fhttp/http_forwarder_test.go
Expand Up @@ -43,6 +43,10 @@ func TestMultiProxy(t *testing.T) {
if !bytes.Contains(data, []byte(payload)) {
t.Errorf("Result %s doesn't contain expected payload echo back %q", DebugSummary(data, 1024), payload)
}
// Issue #624
if bytes.Contains(data, []byte("gzip")) {
t.Errorf("Result %s contains unexpected gzip (accept encoding)", DebugSummary(data, 1024))
}
if !bytes.Contains(data, []byte("X-Fortio-Multi-Id: 1")) {
t.Errorf("Result %s doesn't contain expected X-Fortio-Multi-Id: 1", DebugSummary(data, 1024))
}
Expand Down
70 changes: 39 additions & 31 deletions fhttp/http_server.go
Expand Up @@ -33,6 +33,7 @@ import (

"fortio.org/fortio/dflag"
"fortio.org/fortio/fnet"
"fortio.org/fortio/jrpc"
"fortio.org/fortio/log"
"fortio.org/fortio/version"
"golang.org/x/net/http2"
Expand Down Expand Up @@ -81,46 +82,20 @@ func EchoHandler(w http.ResponseWriter, r *http.Request) {
return
}
log.Debugf("Read %d", len(data))
dur := generateDelay(r.FormValue("delay"))
if dur > 0 {
log.LogVf("Sleeping for %v", dur)
time.Sleep(dur)
}
handleCommonArgs(w, r)
statusStr := r.FormValue("status")
var status int
if statusStr != "" {
status = generateStatus(statusStr)
} else {
status = http.StatusOK
}
if log.LogDebug() {
// TODO: this easily lead to contention - use 'thread local'
rqNum := atomic.AddInt64(&EchoRequests, 1)
log.Debugf("Request # %v", rqNum)
}
if generateClose(r.FormValue("close")) {
log.Debugf("Adding Connection:close / will close socket")
w.Header().Set("Connection", "close")
}
gzip := strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") && generateGzip(r.FormValue("gzip"))
if gzip {
gwz := NewGzipHTTPResponseWriter(w)
defer gwz.Close()
w = gwz
}
// process header(s) args, must be before size to compose properly
for _, hdr := range r.Form["header"] {
log.LogVf("Adding requested header %s", hdr)
if len(hdr) == 0 {
continue
}
s := strings.SplitN(hdr, ":", 2)
if len(s) != 2 {
log.Errf("invalid extra header '%s', expecting Key: Value", hdr)
continue
}
w.Header().Add(s[0], s[1])
}
size := generateSize(r.FormValue("size"))
if size >= 0 {
log.LogVf("Writing %d size with %d status", size, status)
Expand All @@ -130,7 +105,7 @@ func EchoHandler(w http.ResponseWriter, r *http.Request) {
// echo back the Content-Type and Content-Length in the response
for _, k := range []string{"Content-Type", "Content-Length"} {
if v := r.Header.Get(k); v != "" {
w.Header().Set(k, v)
jrpc.SetHeaderIfMissing(w.Header(), k, v)
}
}
w.WriteHeader(status)
Expand All @@ -139,8 +114,40 @@ func EchoHandler(w http.ResponseWriter, r *http.Request) {
}
}

// handleCommonArgs common flags for debug and echo handlers.
// Must be called after body is read.
func handleCommonArgs(w http.ResponseWriter, r *http.Request) {
dur := generateDelay(r.FormValue("delay"))
if dur > 0 {
log.LogVf("Sleeping for %v", dur)
time.Sleep(dur)
}
if log.LogDebug() {
// Note this easily lead to contention, debug mode only (or low qps).
rqNum := atomic.AddInt64(&EchoRequests, 1)
log.Debugf("Request # %v", rqNum)
}
if generateClose(r.FormValue("close")) {
log.Debugf("Adding Connection:close / will close socket")
w.Header().Set("Connection", "close")
}
// process header(s) args, must be before size to compose properly
for _, hdr := range r.Form["header"] {
log.LogVf("Adding requested header %s", hdr)
if len(hdr) == 0 {
continue
}
s := strings.SplitN(hdr, ":", 2)
if len(s) != 2 {
log.Errf("invalid extra header '%s', expecting Key: Value", hdr)
continue
}
w.Header().Add(s[0], s[1])
}
}

func writePayload(w http.ResponseWriter, status int, size int) {
w.Header().Set("Content-Type", "application/octet-stream")
jrpc.SetHeaderIfMissing(w.Header(), "Content-Type", "application/octet-stream")
w.Header().Set("Content-Length", strconv.Itoa(size))
w.WriteHeader(status)
n, err := w.Write(fnet.Payload[:size])
Expand Down Expand Up @@ -291,7 +298,7 @@ func DebugHandler(w http.ResponseWriter, r *http.Request) {
buf.WriteString("Host: ")
buf.WriteString(r.Host)

var keys []string //nolint:prealloc // header is multi valued map,...
keys := make([]string, 0, len(r.Header))
for k := range r.Header {
keys = append(keys, k)
}
Expand Down Expand Up @@ -337,7 +344,8 @@ func DebugHandler(w http.ResponseWriter, r *http.Request) {
buf.WriteByte('\n')
}
}
w.Header().Set("Content-Type", "text/plain; charset=UTF-8")
handleCommonArgs(w, r)
jrpc.SetHeaderIfMissing(w.Header(), "Content-Type", "text/plain; charset=UTF-8")
if _, err = w.Write(buf.Bytes()); err != nil {
log.Errf("Error writing response %v to %v", err, r.RemoteAddr)
}
Expand Down
22 changes: 17 additions & 5 deletions fhttp/http_test.go
Expand Up @@ -1107,34 +1107,46 @@ func TestBadQueryUUIDClient(t *testing.T) {
}
}

// TestDebugHandlerSortedHeaders tests the headers are sorted but
// also tests post echo back and gzip handling.
func TestDebugHandlerSortedHeaders(t *testing.T) {
m, a := DynamicHTTPServer(false)
m.HandleFunc("/debug", DebugHandler)
url := fmt.Sprintf("http://localhost:%d/debug", a.Port)
o := HTTPOptions{URL: url, DisableFastClient: true}
m.Handle("/debug", Gzip(http.HandlerFunc(DebugHandler))) // same as in Serve()
// Debug handler does respect the delay arg but not status, status is always 200
url := fmt.Sprintf("http://localhost:%d/debug?delay=500ms&status=555", a.Port)
// Trigger transparent compression (which will add Accept-Encoding: gzip header)
o := HTTPOptions{URL: url, DisableFastClient: true, Compression: true, Payload: []byte("abcd")}
o.AddAndValidateExtraHeader("BBB: bbb")
o.AddAndValidateExtraHeader("CCC: ccc")
o.AddAndValidateExtraHeader("ZZZ: zzz")
o.AddAndValidateExtraHeader("AAA: aaa")
client, _ := NewClient(&o)
now := time.Now()
code, data, header := client.Fetch() // used to panic/bug #127
duration := time.Since(now)
t.Logf("TestDebugHandlerSortedHeaders result code %d, data len %d, headerlen %d", code, len(data), header)
if code != http.StatusOK {
t.Errorf("Got %d instead of 200", code)
}
if duration < 500*time.Millisecond {
t.Errorf("Got %s instead of 500ms", duration)
}
// remove the first line ('Φορτίο version...') from the body
body := string(data)
i := strings.Index(body, "\n")
body = body[i+1:]
expected := fmt.Sprintf("\nGET /debug HTTP/1.1\n\n"+
expected := fmt.Sprintf("\nPOST /debug?delay=500ms&status=555 HTTP/1.1\n\n"+
"headers:\n\n"+
"Host: localhost:%d\n"+
"Aaa: aaa\n"+
"Accept-Encoding: gzip\n"+
"Bbb: bbb\n"+
"Ccc: ccc\n"+
"Content-Length: 4\n"+
"Content-Type: application/octet-stream\n"+
"User-Agent: %s\n"+
"Zzz: zzz\n\n"+
"body:\n\n\n", a.Port, jrpc.UserAgent)
"body:\n\nabcd\n", a.Port, jrpc.UserAgent)
if body != expected {
t.Errorf("Get body: %s not as expected: %s", body, expected)
}
Expand Down
5 changes: 3 additions & 2 deletions periodic/periodic.go
Expand Up @@ -105,12 +105,13 @@ func (a *Aborter) Abort(wait bool) {
return
}
a.stopRequested = true
if a.hasStarted || !wait {
started := a.hasStarted
if started || !wait {
log.LogVf("ABORT Closing %v", a)
close(a.StopChan)
a.StopChan = nil
a.Unlock()
if a.hasStarted {
if started {
log.LogVf("ABORT reading start channel")
// shouldn't block/hang, just purging/resetting
<-a.StartChan
Expand Down

0 comments on commit c9eaf5e

Please sign in to comment.