Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Capture Log Attributes in the Agent #900

Merged
merged 7 commits into from May 1, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 19 additions & 1 deletion v3/newrelic/attributes_from_internal.go
Expand Up @@ -5,10 +5,12 @@ package newrelic

import (
"bytes"
"encoding/json"
"fmt"
"math"
"net/http"
"net/url"
"reflect"
"sort"
"strconv"
"strings"
Expand All @@ -19,6 +21,9 @@ const (
// listed as span attributes to simplify code. It is not listed in the
// public attributes.go file for this reason to prevent confusion.
spanAttributeQueryParameters = "query_parameters"

// The collector can only allow attributes to be a maximum of 256 bytes
maxAttributeLengthBytes = 256
)

var (
Expand Down Expand Up @@ -452,6 +457,9 @@ func addUserAttribute(a *attributes, key string, val interface{}, d destinationS
func writeAttributeValueJSON(w *jsonFieldsWriter, key string, val interface{}) {
switch v := val.(type) {
case string:
if len(v) > maxAttributeLengthBytes {
v = v[:maxAttributeLengthBytes]
}
w.stringField(key, v)
case bool:
if v {
Expand Down Expand Up @@ -486,7 +494,17 @@ func writeAttributeValueJSON(w *jsonFieldsWriter, key string, val interface{}) {
case float64:
w.floatField(key, v)
default:
w.stringField(key, fmt.Sprintf("%T", v))
// attempt to construct a JSON string
kind := reflect.ValueOf(v).Kind()
if kind == reflect.Struct || kind == reflect.Map || kind == reflect.Slice || kind == reflect.Array {
bytes, _ := json.Marshal(v)
if len(bytes) > maxAttributeLengthBytes {
bytes = bytes[:maxAttributeLengthBytes]
}
w.stringField(key, string(bytes))
} else {
w.stringField(key, fmt.Sprintf("%T", v))
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions v3/newrelic/harvest_test.go
Expand Up @@ -315,6 +315,7 @@ func TestHarvestLogEventsReady(t *testing.T) {
})

logEvent := logEvent{
nil,
0.5,
123456,
"INFO",
Expand Down Expand Up @@ -576,6 +577,7 @@ func TestMergeFailedHarvest(t *testing.T) {
}, 0)

logEvent := logEvent{
nil,
0.5,
123456,
"INFO",
Expand Down
21 changes: 18 additions & 3 deletions v3/newrelic/log_event.go
Expand Up @@ -19,6 +19,7 @@ const (
)

type logEvent struct {
atributes map[string]any
iamemilio marked this conversation as resolved.
Show resolved Hide resolved
priority priority
timestamp int64
severity string
Expand All @@ -28,10 +29,15 @@ type logEvent struct {
}

// LogData contains data fields that are needed to generate log events.
// Note: if you are passing a struct, map, slice, or array as an attribute, try to pass it as a JSON string generated by the logging framework if possible.
// The collector can parse that into an object on New Relic's side.
// This is preferable because the json.Marshal method used in the agent to create the string log JSON is usually less efficient than the tools built into
// logging products for creating stringified JSON for complex objects and data structures.
type LogData struct {
Timestamp int64 // Optional: Unix Millisecond Timestamp; A timestamp will be generated if unset
Severity string // Optional: Severity of log being consumed
Message string // Optional: Message of log being consumed; Maximum size: 32768 Bytes.
Timestamp int64 // Optional: Unix Millisecond Timestamp; A timestamp will be generated if unset
Severity string // Optional: Severity of log being consumed
Message string // Optional: Message of log being consumed; Maximum size: 32768 Bytes.
Attributes map[string]any // Optional: a key value pair with a string key, and any value. This can be used for categorizing logs in the UI.
}

// writeJSON prepares JSON in the format expected by the collector.
Expand All @@ -51,6 +57,14 @@ func (e *logEvent) WriteJSON(buf *bytes.Buffer) {
w.needsComma = false
buf.WriteByte(',')
w.intField(logcontext.LogTimestampFieldName, e.timestamp)
if e.atributes != nil && len(e.atributes) > 0 {
buf.WriteString(`,"attributes":{`)
w := jsonFieldsWriter{buf: buf}
for key, val := range e.atributes {
writeAttributeValueJSON(&w, key, val)
}
buf.WriteByte('}')
}
buf.WriteByte('}')
}

Expand Down Expand Up @@ -88,6 +102,7 @@ func (data *LogData) toLogEvent() (logEvent, error) {
message: data.Message,
severity: data.Severity,
timestamp: data.Timestamp,
atributes: data.Attributes,
}

return event, nil
Expand Down
2 changes: 1 addition & 1 deletion v3/newrelic/log_events.go
Expand Up @@ -60,7 +60,7 @@ type logEventHeap []logEvent

// TODO: when go 1.18 becomes the minimum supported version, re-write to make a generic heap implementation
// for all event heaps, to de-duplicate this code
//func (events *logEvents)
// func (events *logEvents)
func (h logEventHeap) Len() int { return len(h) }
func (h logEventHeap) Less(i, j int) bool { return h[i].priority.isLowerPriority(h[j].priority) }
func (h logEventHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
Expand Down
123 changes: 86 additions & 37 deletions v3/newrelic/log_events_test.go
Expand Up @@ -36,19 +36,20 @@ func loggingConfigEnabled(limit int) loggingConfig {
}
}

func sampleLogEvent(priority priority, severity, message string) *logEvent {
func sampleLogEvent(priority priority, severity, message string, attributes map[string]any) *logEvent {
return &logEvent{
priority: priority,
severity: severity,
message: message,
atributes: attributes,
timestamp: 123456,
}
}

func TestBasicLogEvents(t *testing.T) {
events := newLogEvents(testCommonAttributes, loggingConfigEnabled(5))
events.Add(sampleLogEvent(0.5, infoLevel, "message1"))
events.Add(sampleLogEvent(0.5, infoLevel, "message2"))
events.Add(sampleLogEvent(0.5, infoLevel, "message1", nil))
events.Add(sampleLogEvent(0.5, infoLevel, "message2", nil))

json, err := events.CollectorJSON(agentRunID)
if nil != err {
Expand All @@ -70,6 +71,53 @@ func TestBasicLogEvents(t *testing.T) {
}
}

type testStruct struct {
A string
B int
C c
}

type c struct {
D string
}

func TestBasicLogEventWithAttributes(t *testing.T) {
st := testStruct{
A: "a",
B: 1,
C: c{"hello"},
}

events := newLogEvents(testCommonAttributes, loggingConfigEnabled(5))
events.Add(sampleLogEvent(0.5, infoLevel, "message1", map[string]any{"two": "hi"}))
events.Add(sampleLogEvent(0.5, infoLevel, "message2", map[string]any{"struct": st}))
events.Add(sampleLogEvent(0.5, infoLevel, "message3", map[string]any{"map": map[string]string{"hi": "hello"}}))
events.Add(sampleLogEvent(0.5, infoLevel, "message4", map[string]any{"slice": []string{"hi", "hello", "test"}}))
events.Add(sampleLogEvent(0.5, infoLevel, "message5", map[string]any{"array": [2]int{1, 2}}))

json, err := events.CollectorJSON(agentRunID)
if nil != err {
t.Fatal(err)
}

expected := commonJSON +
`{"level":"INFO","message":"message1","timestamp":123456,"attributes":{"two":"hi"}},` +
`{"level":"INFO","message":"message2","timestamp":123456,"attributes":{"struct":"{\"A\":\"a\",\"B\":1,\"C\":{\"D\":\"hello\"}}"}},` +
`{"level":"INFO","message":"message3","timestamp":123456,"attributes":{"map":"{\"hi\":\"hello\"}"}},` +
`{"level":"INFO","message":"message4","timestamp":123456,"attributes":{"slice":"[\"hi\",\"hello\",\"test\"]"}},` +
`{"level":"INFO","message":"message5","timestamp":123456,"attributes":{"array":"[1,2]"}}]}]`

if string(json) != expected {
t.Error("actual not equal to expected:\n", string(json), "\n", expected)
}
if events.numSeen != 5 {
t.Error(events.numSeen)
}
if events.NumSaved() != 5 {
t.Error(events.NumSaved())
}
}

func TestEmptyLogEvents(t *testing.T) {
events := newLogEvents(testCommonAttributes, loggingConfigEnabled(10))
json, err := events.CollectorJSON(agentRunID)
Expand All @@ -79,10 +127,10 @@ func TestEmptyLogEvents(t *testing.T) {
if nil != json {
t.Error(string(json))
}
if 0 != events.numSeen {
if events.numSeen != 0 {
t.Error(events.numSeen)
}
if 0 != events.NumSaved() {
if events.NumSaved() != 0 {
t.Error(events.NumSaved())
}
}
Expand All @@ -91,12 +139,12 @@ func TestEmptyLogEvents(t *testing.T) {
func TestSamplingLogEvents(t *testing.T) {
events := newLogEvents(testCommonAttributes, loggingConfigEnabled(3))

events.Add(sampleLogEvent(0.999999, infoLevel, "a"))
events.Add(sampleLogEvent(0.1, infoLevel, "b"))
events.Add(sampleLogEvent(0.9, infoLevel, "c"))
events.Add(sampleLogEvent(0.2, infoLevel, "d"))
events.Add(sampleLogEvent(0.8, infoLevel, "e"))
events.Add(sampleLogEvent(0.3, infoLevel, "f"))
events.Add(sampleLogEvent(0.999999, infoLevel, "a", nil))
events.Add(sampleLogEvent(0.1, infoLevel, "b", nil))
events.Add(sampleLogEvent(0.9, infoLevel, "c", nil))
events.Add(sampleLogEvent(0.2, infoLevel, "d", nil))
events.Add(sampleLogEvent(0.8, infoLevel, "e", nil))
events.Add(sampleLogEvent(0.3, infoLevel, "f", nil))

json, err := events.CollectorJSON(agentRunID)
if nil != err {
Expand Down Expand Up @@ -141,14 +189,14 @@ func TestMergeFullLogEvents(t *testing.T) {
e1 := newLogEvents(testCommonAttributes, loggingConfigEnabled(2))
e2 := newLogEvents(testCommonAttributes, loggingConfigEnabled(3))

e1.Add(sampleLogEvent(0.1, infoLevel, "a"))
e1.Add(sampleLogEvent(0.15, infoLevel, "b"))
e1.Add(sampleLogEvent(0.25, infoLevel, "c"))
e1.Add(sampleLogEvent(0.1, infoLevel, "a", nil))
e1.Add(sampleLogEvent(0.15, infoLevel, "b", nil))
e1.Add(sampleLogEvent(0.25, infoLevel, "c", nil))

e2.Add(sampleLogEvent(0.06, infoLevel, "d"))
e2.Add(sampleLogEvent(0.12, infoLevel, "e"))
e2.Add(sampleLogEvent(0.18, infoLevel, "f"))
e2.Add(sampleLogEvent(0.24, infoLevel, "g"))
e2.Add(sampleLogEvent(0.06, infoLevel, "d", nil))
e2.Add(sampleLogEvent(0.12, infoLevel, "e", nil))
e2.Add(sampleLogEvent(0.18, infoLevel, "f", nil))
e2.Add(sampleLogEvent(0.24, infoLevel, "g", nil))

e1.Merge(e2)
json, err := e1.CollectorJSON(agentRunID)
Expand Down Expand Up @@ -176,14 +224,14 @@ func TestLogEventMergeFailedSuccess(t *testing.T) {
e1 := newLogEvents(testCommonAttributes, loggingConfigEnabled(2))
e2 := newLogEvents(testCommonAttributes, loggingConfigEnabled(3))

e1.Add(sampleLogEvent(0.1, infoLevel, "a"))
e1.Add(sampleLogEvent(0.15, infoLevel, "b"))
e1.Add(sampleLogEvent(0.25, infoLevel, "c"))
e1.Add(sampleLogEvent(0.1, infoLevel, "a", nil))
e1.Add(sampleLogEvent(0.15, infoLevel, "b", nil))
e1.Add(sampleLogEvent(0.25, infoLevel, "c", nil))

e2.Add(sampleLogEvent(0.06, infoLevel, "d"))
e2.Add(sampleLogEvent(0.12, infoLevel, "e"))
e2.Add(sampleLogEvent(0.18, infoLevel, "f"))
e2.Add(sampleLogEvent(0.24, infoLevel, "g"))
e2.Add(sampleLogEvent(0.06, infoLevel, "d", nil))
e2.Add(sampleLogEvent(0.12, infoLevel, "e", nil))
e2.Add(sampleLogEvent(0.18, infoLevel, "f", nil))
e2.Add(sampleLogEvent(0.24, infoLevel, "g", nil))

e1.mergeFailed(e2)

Expand Down Expand Up @@ -214,14 +262,14 @@ func TestLogEventMergeFailedLimitReached(t *testing.T) {
e1 := newLogEvents(testCommonAttributes, loggingConfigEnabled(2))
e2 := newLogEvents(testCommonAttributes, loggingConfigEnabled(3))

e1.Add(sampleLogEvent(0.1, infoLevel, "a"))
e1.Add(sampleLogEvent(0.15, infoLevel, "b"))
e1.Add(sampleLogEvent(0.25, infoLevel, "c"))
e1.Add(sampleLogEvent(0.1, infoLevel, "a", nil))
e1.Add(sampleLogEvent(0.15, infoLevel, "b", nil))
e1.Add(sampleLogEvent(0.25, infoLevel, "c", nil))

e2.Add(sampleLogEvent(0.06, infoLevel, "d"))
e2.Add(sampleLogEvent(0.12, infoLevel, "e"))
e2.Add(sampleLogEvent(0.18, infoLevel, "f"))
e2.Add(sampleLogEvent(0.24, infoLevel, "g"))
e2.Add(sampleLogEvent(0.06, infoLevel, "d", nil))
e2.Add(sampleLogEvent(0.12, infoLevel, "e", nil))
e2.Add(sampleLogEvent(0.18, infoLevel, "f", nil))
e2.Add(sampleLogEvent(0.24, infoLevel, "g", nil))

e2.failedHarvests = failedEventsAttemptsLimit

Expand Down Expand Up @@ -253,7 +301,7 @@ func TestLogEventsSplitFull(t *testing.T) {
events := newLogEvents(testCommonAttributes, loggingConfigEnabled(10))
for i := 0; i < 15; i++ {
priority := priority(float32(i) / 10.0)
events.Add(sampleLogEvent(priority, "INFO", fmt.Sprint(priority)))
events.Add(sampleLogEvent(priority, "INFO", fmt.Sprint(priority), nil))
}
// Test that the capacity cannot exceed the max.
if 10 != events.capacity() {
Expand Down Expand Up @@ -292,7 +340,7 @@ func TestLogEventsSplitNotFullOdd(t *testing.T) {
events := newLogEvents(testCommonAttributes, loggingConfigEnabled(10))
for i := 0; i < 7; i++ {
priority := priority(float32(i) / 10.0)
events.Add(sampleLogEvent(priority, "INFO", fmt.Sprint(priority)))
events.Add(sampleLogEvent(priority, "INFO", fmt.Sprint(priority), nil))
}
e1, e2 := events.split()
j1, err1 := e1.CollectorJSON(agentRunID)
Expand Down Expand Up @@ -322,7 +370,7 @@ func TestLogEventsSplitNotFullEven(t *testing.T) {
events := newLogEvents(testCommonAttributes, loggingConfigEnabled(10))
for i := 0; i < 8; i++ {
priority := priority(float32(i) / 10.0)
events.Add(sampleLogEvent(priority, "INFO", fmt.Sprint(priority)))
events.Add(sampleLogEvent(priority, "INFO", fmt.Sprint(priority), nil))
}
e1, e2 := events.split()
j1, err1 := e1.CollectorJSON(agentRunID)
Expand Down Expand Up @@ -356,7 +404,7 @@ func TestLogEventsZeroCapacity(t *testing.T) {
if 0 != events.NumSeen() || 0 != events.NumSaved() || 0 != events.capacity() {
t.Error(events.NumSeen(), events.NumSaved(), events.capacity())
}
events.Add(sampleLogEvent(0.5, "INFO", "TEST"))
events.Add(sampleLogEvent(0.5, "INFO", "TEST", nil))
if 1 != events.NumSeen() || 0 != events.NumSaved() || 0 != events.capacity() {
t.Error(events.NumSeen(), events.NumSaved(), events.capacity())
}
Expand All @@ -375,7 +423,7 @@ func TestLogEventCollectionDisabled(t *testing.T) {
if 0 != events.NumSeen() || 0 != len(events.severityCount) || 0 != events.NumSaved() || 5 != events.capacity() {
t.Error(events.NumSeen(), len(events.severityCount), events.NumSaved(), events.capacity())
}
events.Add(sampleLogEvent(0.5, "INFO", "TEST"))
events.Add(sampleLogEvent(0.5, "INFO", "TEST", nil))
if 1 != events.NumSeen() || 1 != len(events.severityCount) || 0 != events.NumSaved() || 5 != events.capacity() {
t.Error(events.NumSeen(), len(events.severityCount), events.NumSaved(), events.capacity())
}
Expand Down Expand Up @@ -467,6 +515,7 @@ func BenchmarkRecordLoggingMetrics(b *testing.B) {

for i := 0; i < internal.MaxLogEvents; i++ {
logEvent := logEvent{
nil,
newPriority(),
123456,
"INFO",
Expand Down