Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

only do compression if requested explictly when forwarding/fetching #625

Merged
merged 7 commits into from Sep 19, 2022
2 changes: 2 additions & 0 deletions README.md
Expand Up @@ -360,6 +360,8 @@ Fortio `server` has the following feature for the http listening on 8080 (all pa
| header | header(s) to add to the reply e.g. `&header=Foo:Bar&header=X:Y` |
| gzip | If `Accept-Encoding: gzip` is passed in headers by the caller/client; and `gzip=true` is in the query args, all response will be gzipped; or if `gzip=42.7` is passed, approximately 42.7% will|

`delay`, `close` and `header` query arguments are also supported for the `debug` endpoint which echoes back the request (gzip is always done if `Accept-Encoding: gzip` is present, status is always 200, and the payload is the echo back debug information).

You can set a default value for all these by passing `-echo-server-default-params` to the server command line, for instance:
`fortio server -echo-server-default-params="delay=0.5s:50,1s:40&status=418"` will make the server respond with http 418 and a delay of either 0.5s half of the time, 1s 40% and no delay in 10% of the calls; unless any `?` query args is passed by the client. Note that the quotes (") are for the shell to escape the ampersand (&) but should not be put in a yaml nor the dynamicflag url for instance.

Expand Down
3 changes: 3 additions & 0 deletions fhttp/http_forwarder.go
Expand Up @@ -247,6 +247,9 @@ func CreateProxyClient() *http.Client {
// TODO make configurable, should be fine for now for most but extreme -c values
MaxIdleConnsPerHost: 128, // must be more than incoming parallelization; divided by number of fan out if using parallel mode
MaxIdleConns: 256,
// This avoids Accept-Encoding: gzip being added to outgoing requests when no encoding accept is specified
// yet if passed by request, it will do gzip end to end. Issue #624.
DisableCompression: true,
},
}
return client
Expand Down
4 changes: 4 additions & 0 deletions fhttp/http_forwarder_test.go
Expand Up @@ -43,6 +43,10 @@ func TestMultiProxy(t *testing.T) {
if !bytes.Contains(data, []byte(payload)) {
t.Errorf("Result %s doesn't contain expected payload echo back %q", DebugSummary(data, 1024), payload)
}
// Issue #624
if bytes.Contains(data, []byte("gzip")) {
t.Errorf("Result %s contains unexpected gzip (accept encoding)", DebugSummary(data, 1024))
}
if !bytes.Contains(data, []byte("X-Fortio-Multi-Id: 1")) {
t.Errorf("Result %s doesn't contain expected X-Fortio-Multi-Id: 1", DebugSummary(data, 1024))
}
Expand Down
70 changes: 39 additions & 31 deletions fhttp/http_server.go
Expand Up @@ -33,6 +33,7 @@ import (

"fortio.org/fortio/dflag"
"fortio.org/fortio/fnet"
"fortio.org/fortio/jrpc"
"fortio.org/fortio/log"
"fortio.org/fortio/version"
"golang.org/x/net/http2"
Expand Down Expand Up @@ -81,46 +82,20 @@ func EchoHandler(w http.ResponseWriter, r *http.Request) {
return
}
log.Debugf("Read %d", len(data))
dur := generateDelay(r.FormValue("delay"))
if dur > 0 {
log.LogVf("Sleeping for %v", dur)
time.Sleep(dur)
}
handleCommonArgs(w, r)
statusStr := r.FormValue("status")
var status int
if statusStr != "" {
status = generateStatus(statusStr)
} else {
status = http.StatusOK
}
if log.LogDebug() {
// TODO: this easily lead to contention - use 'thread local'
rqNum := atomic.AddInt64(&EchoRequests, 1)
log.Debugf("Request # %v", rqNum)
}
if generateClose(r.FormValue("close")) {
log.Debugf("Adding Connection:close / will close socket")
w.Header().Set("Connection", "close")
}
gzip := strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") && generateGzip(r.FormValue("gzip"))
if gzip {
gwz := NewGzipHTTPResponseWriter(w)
defer gwz.Close()
w = gwz
}
// process header(s) args, must be before size to compose properly
for _, hdr := range r.Form["header"] {
log.LogVf("Adding requested header %s", hdr)
if len(hdr) == 0 {
continue
}
s := strings.SplitN(hdr, ":", 2)
if len(s) != 2 {
log.Errf("invalid extra header '%s', expecting Key: Value", hdr)
continue
}
w.Header().Add(s[0], s[1])
}
size := generateSize(r.FormValue("size"))
if size >= 0 {
log.LogVf("Writing %d size with %d status", size, status)
Expand All @@ -130,7 +105,7 @@ func EchoHandler(w http.ResponseWriter, r *http.Request) {
// echo back the Content-Type and Content-Length in the response
for _, k := range []string{"Content-Type", "Content-Length"} {
if v := r.Header.Get(k); v != "" {
w.Header().Set(k, v)
jrpc.SetHeaderIfMissing(w.Header(), k, v)
}
}
w.WriteHeader(status)
Expand All @@ -139,8 +114,40 @@ func EchoHandler(w http.ResponseWriter, r *http.Request) {
}
}

// handleCommonArgs common flags for debug and echo handlers.
// Must be called after body is read.
func handleCommonArgs(w http.ResponseWriter, r *http.Request) {
dur := generateDelay(r.FormValue("delay"))
if dur > 0 {
log.LogVf("Sleeping for %v", dur)
time.Sleep(dur)
}
if log.LogDebug() {
// Note this easily lead to contention, debug mode only (or low qps).
rqNum := atomic.AddInt64(&EchoRequests, 1)
log.Debugf("Request # %v", rqNum)
}
if generateClose(r.FormValue("close")) {
log.Debugf("Adding Connection:close / will close socket")
w.Header().Set("Connection", "close")
}
// process header(s) args, must be before size to compose properly
for _, hdr := range r.Form["header"] {
log.LogVf("Adding requested header %s", hdr)
if len(hdr) == 0 {
continue
}
s := strings.SplitN(hdr, ":", 2)
if len(s) != 2 {
log.Errf("invalid extra header '%s', expecting Key: Value", hdr)
continue
}
w.Header().Add(s[0], s[1])
}
}

func writePayload(w http.ResponseWriter, status int, size int) {
w.Header().Set("Content-Type", "application/octet-stream")
jrpc.SetHeaderIfMissing(w.Header(), "Content-Type", "application/octet-stream")
w.Header().Set("Content-Length", strconv.Itoa(size))
w.WriteHeader(status)
n, err := w.Write(fnet.Payload[:size])
Expand Down Expand Up @@ -291,7 +298,7 @@ func DebugHandler(w http.ResponseWriter, r *http.Request) {
buf.WriteString("Host: ")
buf.WriteString(r.Host)

var keys []string //nolint:prealloc // header is multi valued map,...
keys := make([]string, 0, len(r.Header))
for k := range r.Header {
keys = append(keys, k)
}
Expand Down Expand Up @@ -337,7 +344,8 @@ func DebugHandler(w http.ResponseWriter, r *http.Request) {
buf.WriteByte('\n')
}
}
w.Header().Set("Content-Type", "text/plain; charset=UTF-8")
handleCommonArgs(w, r)
jrpc.SetHeaderIfMissing(w.Header(), "Content-Type", "text/plain; charset=UTF-8")
if _, err = w.Write(buf.Bytes()); err != nil {
log.Errf("Error writing response %v to %v", err, r.RemoteAddr)
}
Expand Down
22 changes: 17 additions & 5 deletions fhttp/http_test.go
Expand Up @@ -1107,34 +1107,46 @@ func TestBadQueryUUIDClient(t *testing.T) {
}
}

// TestDebugHandlerSortedHeaders tests the headers are sorted but
// also tests post echo back and gzip handling.
func TestDebugHandlerSortedHeaders(t *testing.T) {
m, a := DynamicHTTPServer(false)
m.HandleFunc("/debug", DebugHandler)
url := fmt.Sprintf("http://localhost:%d/debug", a.Port)
o := HTTPOptions{URL: url, DisableFastClient: true}
m.Handle("/debug", Gzip(http.HandlerFunc(DebugHandler))) // same as in Serve()
// Debug handler does respect the delay arg but not status, status is always 200
url := fmt.Sprintf("http://localhost:%d/debug?delay=500ms&status=555", a.Port)
// Trigger transparent compression (which will add Accept-Encoding: gzip header)
o := HTTPOptions{URL: url, DisableFastClient: true, Compression: true, Payload: []byte("abcd")}
o.AddAndValidateExtraHeader("BBB: bbb")
o.AddAndValidateExtraHeader("CCC: ccc")
o.AddAndValidateExtraHeader("ZZZ: zzz")
o.AddAndValidateExtraHeader("AAA: aaa")
client, _ := NewClient(&o)
now := time.Now()
code, data, header := client.Fetch() // used to panic/bug #127
duration := time.Since(now)
t.Logf("TestDebugHandlerSortedHeaders result code %d, data len %d, headerlen %d", code, len(data), header)
if code != http.StatusOK {
t.Errorf("Got %d instead of 200", code)
}
if duration < 500*time.Millisecond {
t.Errorf("Got %s instead of 500ms", duration)
}
// remove the first line ('Φορτίο version...') from the body
body := string(data)
i := strings.Index(body, "\n")
body = body[i+1:]
expected := fmt.Sprintf("\nGET /debug HTTP/1.1\n\n"+
expected := fmt.Sprintf("\nPOST /debug?delay=500ms&status=555 HTTP/1.1\n\n"+
"headers:\n\n"+
"Host: localhost:%d\n"+
"Aaa: aaa\n"+
"Accept-Encoding: gzip\n"+
"Bbb: bbb\n"+
"Ccc: ccc\n"+
"Content-Length: 4\n"+
"Content-Type: application/octet-stream\n"+
"User-Agent: %s\n"+
"Zzz: zzz\n\n"+
"body:\n\n\n", a.Port, jrpc.UserAgent)
"body:\n\nabcd\n", a.Port, jrpc.UserAgent)
if body != expected {
t.Errorf("Get body: %s not as expected: %s", body, expected)
}
Expand Down
5 changes: 3 additions & 2 deletions periodic/periodic.go
Expand Up @@ -105,12 +105,13 @@ func (a *Aborter) Abort(wait bool) {
return
}
a.stopRequested = true
if a.hasStarted || !wait {
started := a.hasStarted
if started || !wait {
log.LogVf("ABORT Closing %v", a)
close(a.StopChan)
a.StopChan = nil
a.Unlock()
if a.hasStarted {
if started {
log.LogVf("ABORT reading start channel")
// shouldn't block/hang, just purging/resetting
<-a.StartChan
Expand Down