Skip to content

Commit

Permalink
Lambda Promtail - Add functions to ungzip incoming data from Cross-Ac…
Browse files Browse the repository at this point in the history
…count AWS Kinesis/CloudWatch (#10077)

**What this PR does / why we need it**:
When using Lambda Promtail to receive logs via Kinesis that have come
from a CloudWatch Logs Subscription Filter in another account, the data
is gzipped. This PR add a check for gzipped content and if detected
unzips and sends to Loki.

[AWS Docs - Cross Account Subscriptions -
Kinesis](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CrossAccountSubscriptions-Kinesis.html)

**Which issue(s) this PR fixes**:
n/a

**Special notes for your reviewer**:

**Checklist**
- [ ] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [ ] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e)

Co-authored-by: Michel Hollands <42814411+MichelHollands@users.noreply.github.com>
  • Loading branch information
aglees and MichelHollands committed Jul 28, 2023
1 parent e97e53f commit de77777
Showing 1 changed file with 36 additions and 5 deletions.
41 changes: 36 additions & 5 deletions tools/lambda-promtail/lambda-promtail/kinesis.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package main

import (
"bytes"
"compress/gzip"
"context"
"io"
"time"

"github.com/aws/aws-lambda-go/events"
Expand All @@ -25,10 +28,22 @@ func parseKinesisEvent(ctx context.Context, b batchIf, ev *events.KinesisEvent)

labels = applyExtraLabels(labels)

b.add(ctx, entry{labels, logproto.Entry{
Line: string(record.Kinesis.Data),
Timestamp: timestamp,
}})
// Check if the data is gzipped by inspecting the 'data' field
if isGzipped(record.Kinesis.Data) {
uncompressedData, err := ungzipData(record.Kinesis.Data)
if err != nil {
return err
}
b.add(ctx, entry{labels, logproto.Entry{
Line: string(uncompressedData),
Timestamp: timestamp,
}})
} else {
b.add(ctx, entry{labels, logproto.Entry{
Line: string(record.Kinesis.Data),
Timestamp: timestamp,
}})
}
}

return nil
Expand All @@ -47,4 +62,20 @@ func processKinesisEvent(ctx context.Context, ev *events.KinesisEvent, pClient C
return err
}
return nil
}
}

// isGzipped checks if the input data is gzipped
func isGzipped(data []byte) bool {
return len(data) >= 2 && data[0] == 0x1F && data[1] == 0x8B
}

// unzipData decompress the gzipped data
func ungzipData(data []byte) ([]byte, error) {
reader, err := gzip.NewReader(bytes.NewBuffer(data))
if err != nil {
return nil, err
}
defer reader.Close()

return io.ReadAll(reader)
}

0 comments on commit de77777

Please sign in to comment.