Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Loki: Add gzip compression to resource calls #59059

Merged
merged 3 commits into from Nov 22, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 20 additions & 5 deletions pkg/tsdb/loki/api.go
Expand Up @@ -24,6 +24,11 @@ type LokiAPI struct {
headers map[string]string
}

type RawLokiResponse struct {
Body []byte
Encoding string
}

func newLokiAPI(client *http.Client, url string, log log.Logger, headers map[string]string) *LokiAPI {
return &LokiAPI{client: client, url: url, log: log, headers: headers}
}
Expand Down Expand Up @@ -204,15 +209,15 @@ func makeRawRequest(ctx context.Context, lokiDsUrl string, resourcePath string,
return req, nil
}

func (api *LokiAPI) RawQuery(ctx context.Context, resourcePath string) ([]byte, error) {
func (api *LokiAPI) RawQuery(ctx context.Context, resourcePath string) (RawLokiResponse, error) {
req, err := makeRawRequest(ctx, api.url, resourcePath, api.headers)
if err != nil {
return nil, err
return RawLokiResponse{}, err
}

resp, err := api.client.Do(req)
if err != nil {
return nil, err
return RawLokiResponse{}, err
}

defer func() {
Expand All @@ -222,8 +227,18 @@ func (api *LokiAPI) RawQuery(ctx context.Context, resourcePath string) ([]byte,
}()

if resp.StatusCode/100 != 2 {
return nil, makeLokiError(resp.Body)
return RawLokiResponse{}, makeLokiError(resp.Body)
}

body, err := io.ReadAll(resp.Body)
if err != nil {
return RawLokiResponse{}, err
}

encodedBytes := RawLokiResponse{
Body: body,
Encoding: resp.Header.Get("Content-Encoding"),
}

return io.ReadAll(resp.Body)
return encodedBytes, nil
}
31 changes: 31 additions & 0 deletions pkg/tsdb/loki/api_mock.go
Expand Up @@ -32,6 +32,29 @@ func (mockedRT *mockedRoundTripper) RoundTrip(req *http.Request) (*http.Response
}, nil
}

type mockedCompressedRoundTripper struct {
statusCode int
responseBytes []byte
contentType string
requestCallback mockRequestCallback
}

func (mockedRT *mockedCompressedRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
requestCallback := mockedRT.requestCallback
if requestCallback != nil {
requestCallback(req)
}

header := http.Header{}
header.Add("Content-Type", mockedRT.contentType)
header.Add("Content-Encoding", "gzip")
return &http.Response{
StatusCode: mockedRT.statusCode,
Header: header,
Body: io.NopCloser(bytes.NewReader(mockedRT.responseBytes)),
}, nil
}

func makeMockedAPI(statusCode int, contentType string, responseBytes []byte, requestCallback mockRequestCallback) *LokiAPI {
return makeMockedAPIWithUrl("http://localhost:9999", statusCode, contentType, responseBytes, requestCallback)
}
Expand All @@ -43,3 +66,11 @@ func makeMockedAPIWithUrl(url string, statusCode int, contentType string, respon

return newLokiAPI(&client, url, log.New("test"), nil)
}

func makeCompressedMockedAPIWithUrl(url string, statusCode int, contentType string, responseBytes []byte, requestCallback mockRequestCallback) *LokiAPI {
client := http.Client{
Transport: &mockedCompressedRoundTripper{statusCode: statusCode, contentType: contentType, responseBytes: responseBytes, requestCallback: requestCallback},
}

return newLokiAPI(&client, url, log.New("test"), nil)
}
17 changes: 16 additions & 1 deletion pkg/tsdb/loki/api_test.go
Expand Up @@ -135,7 +135,7 @@ func TestApiUrlHandling(t *testing.T) {
require.True(t, called)
})
}

for _, test := range queryTestData {
t.Run("Loki should build the metadata query URL correctly when "+test.name, func(t *testing.T) {
called := false
Expand All @@ -150,3 +150,18 @@ func TestApiUrlHandling(t *testing.T) {
})
}
}

func TestApiReturnValues(t *testing.T) {
t.Run("Loki should return the right encoding", func(t *testing.T) {
called := false
api := makeCompressedMockedAPIWithUrl("http://localhost:3100", 200, "application/json", []byte("foo"), func(req *http.Request) {
called = true
})

encodedBytes, err := api.RawQuery(context.Background(), "/loki/api/v1/labels?start=1&end=2")
require.NoError(t, err)
require.True(t, called)
require.Equal(t, "gzip", encodedBytes.Encoding)
require.Equal(t, []byte("foo"), encodedBytes.Body)
})
}
24 changes: 16 additions & 8 deletions pkg/tsdb/loki/loki.go
Expand Up @@ -119,7 +119,7 @@ func (s *Service) CallResource(ctx context.Context, req *backend.CallResourceReq
return callResource(ctx, req, sender, dsInfo, logger.FromContext(ctx))
}

func getAuthHeadersForCallResource(headers map[string][]string) map[string]string {
func getHeadersForCallResource(headers map[string][]string) map[string]string {
data := make(map[string]string)

if auth := arrayHeaderFirstValue(headers["Authorization"]); auth != "" {
Expand All @@ -134,6 +134,10 @@ func getAuthHeadersForCallResource(headers map[string][]string) map[string]strin
data["X-ID-Token"] = idToken
}

if encType := arrayHeaderFirstValue(headers["Accept-Encoding"]); encType != "" {
data["Accept-Encoding"] = encType
}

return data
}

Expand All @@ -151,19 +155,23 @@ func callResource(ctx context.Context, req *backend.CallResourceRequest, sender
}
lokiURL := fmt.Sprintf("/loki/api/v1/%s", url)

api := newLokiAPI(dsInfo.HTTPClient, dsInfo.URL, plog, getAuthHeadersForCallResource(req.Headers))
bytes, err := api.RawQuery(ctx, lokiURL)
api := newLokiAPI(dsInfo.HTTPClient, dsInfo.URL, plog, getHeadersForCallResource(req.Headers))
encodedBytes, err := api.RawQuery(ctx, lokiURL)

if err != nil {
return err
}

respHeaders := map[string][]string{
"content-type": {"application/json"},
}
if encodedBytes.Encoding != "" {
respHeaders["content-encoding"] = []string{encodedBytes.Encoding}
}
return sender.Send(&backend.CallResourceResponse{
Status: http.StatusOK,
Headers: map[string][]string{
"content-type": {"application/json"},
},
Body: bytes,
Status: http.StatusOK,
Headers: respHeaders,
Body: encodedBytes.Body,
})
}

Expand Down