Skip to content

Commit

Permalink
only do compression if requested explictly when forwarding/fetching (f…
Browse files Browse the repository at this point in the history
…ixes #624) also fix a data race and also enable delay,close and header query args for debug endpoint
  • Loading branch information
ldemailly committed Sep 18, 2022
1 parent 0fa8982 commit d4a12d1
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 32 deletions.
3 changes: 3 additions & 0 deletions fhttp/http_forwarder.go
Expand Up @@ -247,6 +247,9 @@ func CreateProxyClient() *http.Client {
// TODO make configurable, should be fine for now for most but extreme -c values
MaxIdleConnsPerHost: 128, // must be more than incoming parallelization; divided by number of fan out if using parallel mode
MaxIdleConns: 256,
// This avoids Accept-Encoding: gzip being added to outgoing requests when no encoding accept is specified
// yet if passed by request, it will do gzip end to end. Issue #624.
DisableCompression: true,
},
}
return client
Expand Down
4 changes: 4 additions & 0 deletions fhttp/http_forwarder_test.go
Expand Up @@ -43,6 +43,10 @@ func TestMultiProxy(t *testing.T) {
if !bytes.Contains(data, []byte(payload)) {
t.Errorf("Result %s doesn't contain expected payload echo back %q", DebugSummary(data, 1024), payload)
}
// Issue #624
if bytes.Contains(data, []byte("gzip")) {
t.Errorf("Result %s contains unexpected gzip (accept encoding)", DebugSummary(data, 1024))
}
if !bytes.Contains(data, []byte("X-Fortio-Multi-Id: 1")) {
t.Errorf("Result %s doesn't contain expected X-Fortio-Multi-Id: 1", DebugSummary(data, 1024))
}
Expand Down
62 changes: 34 additions & 28 deletions fhttp/http_server.go
Expand Up @@ -81,46 +81,20 @@ func EchoHandler(w http.ResponseWriter, r *http.Request) {
return
}
log.Debugf("Read %d", len(data))
dur := generateDelay(r.FormValue("delay"))
if dur > 0 {
log.LogVf("Sleeping for %v", dur)
time.Sleep(dur)
}
handleCommonArgs(w, r)
statusStr := r.FormValue("status")
var status int
if statusStr != "" {
status = generateStatus(statusStr)
} else {
status = http.StatusOK
}
if log.LogDebug() {
// TODO: this easily lead to contention - use 'thread local'
rqNum := atomic.AddInt64(&EchoRequests, 1)
log.Debugf("Request # %v", rqNum)
}
if generateClose(r.FormValue("close")) {
log.Debugf("Adding Connection:close / will close socket")
w.Header().Set("Connection", "close")
}
gzip := strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") && generateGzip(r.FormValue("gzip"))
if gzip {
gwz := NewGzipHTTPResponseWriter(w)
defer gwz.Close()
w = gwz
}
// process header(s) args, must be before size to compose properly
for _, hdr := range r.Form["header"] {
log.LogVf("Adding requested header %s", hdr)
if len(hdr) == 0 {
continue
}
s := strings.SplitN(hdr, ":", 2)
if len(s) != 2 {
log.Errf("invalid extra header '%s', expecting Key: Value", hdr)
continue
}
w.Header().Add(s[0], s[1])
}
size := generateSize(r.FormValue("size"))
if size >= 0 {
log.LogVf("Writing %d size with %d status", size, status)
Expand All @@ -139,6 +113,37 @@ func EchoHandler(w http.ResponseWriter, r *http.Request) {
}
}

// handleCommonArgs common flags for debug and echo handlers.
func handleCommonArgs(w http.ResponseWriter, r *http.Request) {
dur := generateDelay(r.FormValue("delay"))
if dur > 0 {
log.LogVf("Sleeping for %v", dur)
time.Sleep(dur)
}
if log.LogDebug() {
// Note this easily lead to contention, debug mode only (or low qps).
rqNum := atomic.AddInt64(&EchoRequests, 1)
log.Debugf("Request # %v", rqNum)
}
if generateClose(r.FormValue("close")) {
log.Debugf("Adding Connection:close / will close socket")
w.Header().Set("Connection", "close")
}
// process header(s) args, must be before size to compose properly
for _, hdr := range r.Form["header"] {
log.LogVf("Adding requested header %s", hdr)
if len(hdr) == 0 {
continue
}
s := strings.SplitN(hdr, ":", 2)
if len(s) != 2 {
log.Errf("invalid extra header '%s', expecting Key: Value", hdr)
continue
}
w.Header().Add(s[0], s[1])
}
}

func writePayload(w http.ResponseWriter, status int, size int) {
w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("Content-Length", strconv.Itoa(size))
Expand Down Expand Up @@ -269,6 +274,7 @@ environment:

// DebugHandler returns debug/useful info to http client.
func DebugHandler(w http.ResponseWriter, r *http.Request) {
handleCommonArgs(w, r)
LogRequest(r, "Debug")
var buf bytes.Buffer
buf.WriteString("Φορτίο version ")
Expand All @@ -291,7 +297,7 @@ func DebugHandler(w http.ResponseWriter, r *http.Request) {
buf.WriteString("Host: ")
buf.WriteString(r.Host)

var keys []string //nolint:prealloc // header is multi valued map,...
keys := make([]string, 0, len(r.Header))
for k := range r.Header {
keys = append(keys, k)
}
Expand Down
10 changes: 8 additions & 2 deletions fhttp/http_test.go
Expand Up @@ -1110,23 +1110,29 @@ func TestBadQueryUUIDClient(t *testing.T) {
func TestDebugHandlerSortedHeaders(t *testing.T) {
m, a := DynamicHTTPServer(false)
m.HandleFunc("/debug", DebugHandler)
url := fmt.Sprintf("http://localhost:%d/debug", a.Port)
// Debug handler does respect the delay arg but not status, status is always 200
url := fmt.Sprintf("http://localhost:%d/debug?delay=500ms&status=555", a.Port)
o := HTTPOptions{URL: url, DisableFastClient: true}
o.AddAndValidateExtraHeader("BBB: bbb")
o.AddAndValidateExtraHeader("CCC: ccc")
o.AddAndValidateExtraHeader("ZZZ: zzz")
o.AddAndValidateExtraHeader("AAA: aaa")
client, _ := NewClient(&o)
now := time.Now()
code, data, header := client.Fetch() // used to panic/bug #127
duration := time.Since(now)
t.Logf("TestDebugHandlerSortedHeaders result code %d, data len %d, headerlen %d", code, len(data), header)
if code != http.StatusOK {
t.Errorf("Got %d instead of 200", code)
}
if duration < 500*time.Millisecond {
t.Errorf("Got %s instead of 500ms", duration)
}
// remove the first line ('Φορτίο version...') from the body
body := string(data)
i := strings.Index(body, "\n")
body = body[i+1:]
expected := fmt.Sprintf("\nGET /debug HTTP/1.1\n\n"+
expected := fmt.Sprintf("\nGET /debug?delay=500ms&status=555 HTTP/1.1\n\n"+
"headers:\n\n"+
"Host: localhost:%d\n"+
"Aaa: aaa\n"+
Expand Down
5 changes: 3 additions & 2 deletions periodic/periodic.go
Expand Up @@ -105,12 +105,13 @@ func (a *Aborter) Abort(wait bool) {
return
}
a.stopRequested = true
if a.hasStarted || !wait {
started := a.hasStarted
if started || !wait {
log.LogVf("ABORT Closing %v", a)
close(a.StopChan)
a.StopChan = nil
a.Unlock()
if a.hasStarted {
if started {
log.LogVf("ABORT reading start channel")
// shouldn't block/hang, just purging/resetting
<-a.StartChan
Expand Down

0 comments on commit d4a12d1

Please sign in to comment.