Skip to content

Commit

Permalink
Support categorized labels in Tailing (grafana#11079)
Browse files Browse the repository at this point in the history
**What this PR does / why we need it**:

This is a follow-up PR for grafana#10419
adding support for tailing.

I tested it on a dev cell and works fine.
<img width="1296" alt="image"
src="https://github.com/grafana/loki/assets/8354290/6177e0ca-02ce-48cd-a17f-0739dc3caa0a">


**Note**: With these changes, the JSON marshal unmarshal functions for
the tail are no longer used ([example][1]) so I think we can remove
them. Also, the new Tail response is no longer used, so we can also make
it an alias to the _legacy_ one. Let's do it on a follow-up PR to avoid
making this one bigger.

[1]:
https://github.com/grafana/loki/blob/52a3f16039dd5ff655fc3681257d99794f620ec4/pkg/loghttp/entry.go#L210-L238
  • Loading branch information
salvacorts authored and rhnasc committed Apr 12, 2024
1 parent 194aaa4 commit 925b5e3
Show file tree
Hide file tree
Showing 17 changed files with 803 additions and 108 deletions.
8 changes: 5 additions & 3 deletions pkg/ingester/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (t *tailer) processStream(stream logproto.Stream, lbs labels.Labels) []*log

sp := t.pipeline.ForStream(lbs)
for _, e := range stream.Entries {
newLine, parsedLbs, ok := sp.ProcessString(e.Timestamp.UnixNano(), e.Line)
newLine, parsedLbs, ok := sp.ProcessString(e.Timestamp.UnixNano(), e.Line, logproto.FromLabelAdaptersToLabels(e.StructuredMetadata)...)
if !ok {
continue
}
Expand All @@ -163,8 +163,10 @@ func (t *tailer) processStream(stream logproto.Stream, lbs labels.Labels) []*log
streams[parsedLbs.Hash()] = stream
}
stream.Entries = append(stream.Entries, logproto.Entry{
Timestamp: e.Timestamp,
Line: newLine,
Timestamp: e.Timestamp,
Line: newLine,
StructuredMetadata: logproto.FromLabelsToLabelAdapters(parsedLbs.StructuredMetadata()),
Parsed: logproto.FromLabelsToLabelAdapters(parsedLbs.Parsed()),
})
}
streamsResult := make([]*logproto.Stream, 0, len(streams))
Expand Down
144 changes: 141 additions & 3 deletions pkg/ingester/tailer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,25 @@ func Test_dropstream(t *testing.T) {
}
}

type fakeTailServer struct{}
type fakeTailServer struct {
responses []logproto.TailResponse
}

func (f *fakeTailServer) Send(response *logproto.TailResponse) error {
f.responses = append(f.responses, *response)
return nil

}

func (f *fakeTailServer) Context() context.Context { return context.Background() }

func (f *fakeTailServer) Send(*logproto.TailResponse) error { return nil }
func (f *fakeTailServer) Context() context.Context { return context.Background() }
func (f *fakeTailServer) GetResponses() []logproto.TailResponse {
return f.responses
}

func (f *fakeTailServer) Reset() {
f.responses = f.responses[:0]
}

func Test_TailerSendRace(t *testing.T) {
tail, err := newTailer("foo", `{app="foo"} |= "foo"`, &fakeTailServer{}, 10)
Expand Down Expand Up @@ -137,3 +152,126 @@ func Test_IsMatching(t *testing.T) {
})
}
}

func Test_StructuredMetadata(t *testing.T) {
lbs := makeRandomLabels()

for _, tc := range []struct {
name string
query string
sentStream logproto.Stream
expectedResponses []logproto.TailResponse
}{
{
// Optimization will make the same stream to be returned regardless of structured metadata.
name: "noop pipeline",
query: `{app="foo"}`,
sentStream: logproto.Stream{
Labels: lbs.String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 1),
Line: "foo=1",
},
{
Timestamp: time.Unix(0, 2),
Line: "foo=2",
StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")),
},
},
},
expectedResponses: []logproto.TailResponse{
{
Stream: &logproto.Stream{
Labels: lbs.String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 1),
Line: "foo=1",
},
{
Timestamp: time.Unix(0, 2),
Line: "foo=2",
StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")),
},
},
},
DroppedStreams: nil,
},
},
},
{
name: "parse pipeline labels",
query: `{app="foo"} | logfmt`,
sentStream: logproto.Stream{
Labels: lbs.String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 1),
Line: "foo=1",
},
{
Timestamp: time.Unix(0, 2),
Line: "foo=2",
StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")),
},
},
},
expectedResponses: []logproto.TailResponse{
{
Stream: &logproto.Stream{
Labels: labels.NewBuilder(lbs).Set("foo", "1").Labels().String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 1),
Line: "foo=1",
Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "1")),
},
},
},
DroppedStreams: nil,
},
{
Stream: &logproto.Stream{
Labels: labels.NewBuilder(lbs).Set("traceID", "123").Set("foo", "2").Labels().String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 2),
Line: "foo=2",
StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")),
Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "2")),
},
},
},
DroppedStreams: nil,
},
},
},
} {
t.Run(tc.name, func(t *testing.T) {
var server fakeTailServer
tail, err := newTailer("foo", tc.query, &server, 10)
require.NoError(t, err)

var wg sync.WaitGroup
wg.Add(1)
go func() {
tail.loop()
wg.Done()
}()

tail.send(tc.sentStream, lbs)

// Wait for the stream to be received by the server.
require.Eventually(t, func() bool {
return len(server.GetResponses()) > 0
}, 30*time.Second, 1*time.Second, "stream was not received")

responses := server.GetResponses()
require.ElementsMatch(t, tc.expectedResponses, responses)

tail.close()
wg.Wait()
})
}
}
71 changes: 71 additions & 0 deletions pkg/iter/categorized_labels_iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package iter

import (
"fmt"

"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
)

type categorizeLabelsIterator struct {
EntryIterator
currEntry logproto.Entry
currStreamLabels string
currHash uint64
currErr error
}

func NewCategorizeLabelsIterator(wrap EntryIterator) EntryIterator {
return &categorizeLabelsIterator{
EntryIterator: wrap,
}
}

func (c *categorizeLabelsIterator) Next() bool {
if !c.EntryIterator.Next() {
return false
}

c.currEntry = c.Entry()
if len(c.currEntry.StructuredMetadata) == 0 && len(c.currEntry.Parsed) == 0 {
c.currStreamLabels = c.EntryIterator.Labels()
c.currHash = c.EntryIterator.StreamHash()
return true
}

// We need to remove the structured metadata labels and parsed labels from the stream labels.
streamLabels := c.EntryIterator.Labels()
lbls, err := syntax.ParseLabels(streamLabels)
if err != nil {
c.currErr = fmt.Errorf("failed to parse series labels to categorize labels: %w", err)
return false
}

builder := labels.NewBuilder(lbls)
for _, label := range c.currEntry.StructuredMetadata {
builder.Del(label.Name)
}
for _, label := range c.currEntry.Parsed {
builder.Del(label.Name)
}

newLabels := builder.Labels()
c.currStreamLabels = newLabels.String()
c.currHash = newLabels.Hash()

return true
}

func (c *categorizeLabelsIterator) Error() error {
return c.currErr
}

func (c *categorizeLabelsIterator) Labels() string {
return c.currStreamLabels
}

func (c *categorizeLabelsIterator) StreamHash() uint64 {
return c.currHash
}
145 changes: 145 additions & 0 deletions pkg/iter/categorized_labels_iterator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package iter

import (
"testing"
"time"

"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/logproto"
)

func TestNewCategorizeLabelsIterator(t *testing.T) {
for _, tc := range []struct {
name string
inner EntryIterator
expectedStreams []logproto.Stream
}{
{
name: "no structured metadata nor parsed labels",
inner: NewSortEntryIterator([]EntryIterator{
NewStreamIterator(logproto.Stream{
Labels: labels.FromStrings("namespace", "default").String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 1),
Line: "foo=1",
},
{
Timestamp: time.Unix(0, 2),
Line: "foo=2",
},
},
}),
}, logproto.FORWARD),
expectedStreams: []logproto.Stream{
{
Labels: labels.FromStrings("namespace", "default").String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 1),
Line: "foo=1",
},
{
Timestamp: time.Unix(0, 2),
Line: "foo=2",
},
},
},
},
},
{
name: "structured metadata and parsed labels",
inner: NewSortEntryIterator([]EntryIterator{
NewStreamIterator(logproto.Stream{
Labels: labels.FromStrings("namespace", "default").String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 1),
Line: "foo=1",
},
},
}),
NewStreamIterator(logproto.Stream{
Labels: labels.FromStrings("namespace", "default", "traceID", "123").String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 2),
Line: "foo=2",
StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")),
},
},
}),
NewStreamIterator(logproto.Stream{
Labels: labels.FromStrings("namespace", "default", "foo", "3").String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 3),
Line: "foo=3",
Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "3")),
},
},
}),
NewStreamIterator(logproto.Stream{
Labels: labels.FromStrings("namespace", "default", "traceID", "123", "foo", "4").String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 4),
Line: "foo=4",
StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")),
Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "4")),
},
},
}),
}, logproto.FORWARD),
expectedStreams: []logproto.Stream{
{
Labels: labels.FromStrings("namespace", "default").String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 1),
Line: "foo=1",
},
{
Timestamp: time.Unix(0, 2),
Line: "foo=2",
StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")),
},
{
Timestamp: time.Unix(0, 3),
Line: "foo=3",
Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "3")),
},
{
Timestamp: time.Unix(0, 4),
Line: "foo=4",
StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")),
Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "4")),
},
},
},
},
},
} {
t.Run(tc.name, func(t *testing.T) {
itr := NewCategorizeLabelsIterator(tc.inner)

streamsEntries := make(map[string][]logproto.Entry)
for itr.Next() {
streamsEntries[itr.Labels()] = append(streamsEntries[itr.Labels()], itr.Entry())
require.NoError(t, itr.Error())
}

var streams []logproto.Stream
for lbls, entries := range streamsEntries {
streams = append(streams, logproto.Stream{
Labels: lbls,
Entries: entries,
})
}

require.ElementsMatch(t, tc.expectedStreams, streams)
})
}
}

0 comments on commit 925b5e3

Please sign in to comment.