Skip to content

Commit

Permalink
Loki: Add gzip compression to resource calls (#59059)
Browse files Browse the repository at this point in the history
* Loki: Add compression to `callResource`

* add missing tests

* fix formatting

(cherry picked from commit 08e87a2)
  • Loading branch information
svennergr authored and grafanabot committed Nov 22, 2022
1 parent 1bcdaeb commit 90fbae2
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 13 deletions.
25 changes: 20 additions & 5 deletions pkg/tsdb/loki/api.go
Expand Up @@ -24,6 +24,11 @@ type LokiAPI struct {
headers map[string]string
}

type RawLokiResponse struct {
Body []byte
Encoding string
}

func newLokiAPI(client *http.Client, url string, log log.Logger, headers map[string]string) *LokiAPI {
return &LokiAPI{client: client, url: url, log: log, headers: headers}
}
Expand Down Expand Up @@ -204,15 +209,15 @@ func makeRawRequest(ctx context.Context, lokiDsUrl string, resourcePath string,
return req, nil
}

func (api *LokiAPI) RawQuery(ctx context.Context, resourcePath string) ([]byte, error) {
func (api *LokiAPI) RawQuery(ctx context.Context, resourcePath string) (RawLokiResponse, error) {
req, err := makeRawRequest(ctx, api.url, resourcePath, api.headers)
if err != nil {
return nil, err
return RawLokiResponse{}, err
}

resp, err := api.client.Do(req)
if err != nil {
return nil, err
return RawLokiResponse{}, err
}

defer func() {
Expand All @@ -222,8 +227,18 @@ func (api *LokiAPI) RawQuery(ctx context.Context, resourcePath string) ([]byte,
}()

if resp.StatusCode/100 != 2 {
return nil, makeLokiError(resp.Body)
return RawLokiResponse{}, makeLokiError(resp.Body)
}

body, err := io.ReadAll(resp.Body)
if err != nil {
return RawLokiResponse{}, err
}

encodedBytes := RawLokiResponse{
Body: body,
Encoding: resp.Header.Get("Content-Encoding"),
}

return io.ReadAll(resp.Body)
return encodedBytes, nil
}
31 changes: 31 additions & 0 deletions pkg/tsdb/loki/api_mock.go
Expand Up @@ -32,6 +32,29 @@ func (mockedRT *mockedRoundTripper) RoundTrip(req *http.Request) (*http.Response
}, nil
}

type mockedCompressedRoundTripper struct {
statusCode int
responseBytes []byte
contentType string
requestCallback mockRequestCallback
}

func (mockedRT *mockedCompressedRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
requestCallback := mockedRT.requestCallback
if requestCallback != nil {
requestCallback(req)
}

header := http.Header{}
header.Add("Content-Type", mockedRT.contentType)
header.Add("Content-Encoding", "gzip")
return &http.Response{
StatusCode: mockedRT.statusCode,
Header: header,
Body: io.NopCloser(bytes.NewReader(mockedRT.responseBytes)),
}, nil
}

func makeMockedAPI(statusCode int, contentType string, responseBytes []byte, requestCallback mockRequestCallback) *LokiAPI {
return makeMockedAPIWithUrl("http://localhost:9999", statusCode, contentType, responseBytes, requestCallback)
}
Expand All @@ -43,3 +66,11 @@ func makeMockedAPIWithUrl(url string, statusCode int, contentType string, respon

return newLokiAPI(&client, url, log.New("test"), nil)
}

func makeCompressedMockedAPIWithUrl(url string, statusCode int, contentType string, responseBytes []byte, requestCallback mockRequestCallback) *LokiAPI {
client := http.Client{
Transport: &mockedCompressedRoundTripper{statusCode: statusCode, contentType: contentType, responseBytes: responseBytes, requestCallback: requestCallback},
}

return newLokiAPI(&client, url, log.New("test"), nil)
}
15 changes: 15 additions & 0 deletions pkg/tsdb/loki/api_test.go
Expand Up @@ -150,3 +150,18 @@ func TestApiUrlHandling(t *testing.T) {
})
}
}

func TestApiReturnValues(t *testing.T) {
t.Run("Loki should return the right encoding", func(t *testing.T) {
called := false
api := makeCompressedMockedAPIWithUrl("http://localhost:3100", 200, "application/json", []byte("foo"), func(req *http.Request) {
called = true
})

encodedBytes, err := api.RawQuery(context.Background(), "/loki/api/v1/labels?start=1&end=2")
require.NoError(t, err)
require.True(t, called)
require.Equal(t, "gzip", encodedBytes.Encoding)
require.Equal(t, []byte("foo"), encodedBytes.Body)
})
}
24 changes: 16 additions & 8 deletions pkg/tsdb/loki/loki.go
Expand Up @@ -119,7 +119,7 @@ func (s *Service) CallResource(ctx context.Context, req *backend.CallResourceReq
return callResource(ctx, req, sender, dsInfo, logger.FromContext(ctx))
}

func getAuthHeadersForCallResource(headers map[string][]string) map[string]string {
func getHeadersForCallResource(headers map[string][]string) map[string]string {
data := make(map[string]string)

if auth := arrayHeaderFirstValue(headers["Authorization"]); auth != "" {
Expand All @@ -134,6 +134,10 @@ func getAuthHeadersForCallResource(headers map[string][]string) map[string]strin
data["X-ID-Token"] = idToken
}

if encType := arrayHeaderFirstValue(headers["Accept-Encoding"]); encType != "" {
data["Accept-Encoding"] = encType
}

return data
}

Expand All @@ -151,19 +155,23 @@ func callResource(ctx context.Context, req *backend.CallResourceRequest, sender
}
lokiURL := fmt.Sprintf("/loki/api/v1/%s", url)

api := newLokiAPI(dsInfo.HTTPClient, dsInfo.URL, plog, getAuthHeadersForCallResource(req.Headers))
bytes, err := api.RawQuery(ctx, lokiURL)
api := newLokiAPI(dsInfo.HTTPClient, dsInfo.URL, plog, getHeadersForCallResource(req.Headers))
encodedBytes, err := api.RawQuery(ctx, lokiURL)

if err != nil {
return err
}

respHeaders := map[string][]string{
"content-type": {"application/json"},
}
if encodedBytes.Encoding != "" {
respHeaders["content-encoding"] = []string{encodedBytes.Encoding}
}
return sender.Send(&backend.CallResourceResponse{
Status: http.StatusOK,
Headers: map[string][]string{
"content-type": {"application/json"},
},
Body: bytes,
Status: http.StatusOK,
Headers: respHeaders,
Body: encodedBytes.Body,
})
}

Expand Down

0 comments on commit 90fbae2

Please sign in to comment.