Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add AWS SNS receiver #2615

Merged
merged 35 commits into from Jul 26, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
19e74f9
WIP - SNS receiver
Jun 10, 2021
5dcf4f5
ARN Auth start
Jun 11, 2021
74d1527
Add support for role arn, truncation, dedupe key and env auth
Jun 11, 2021
009f8b1
Use 1024 rather than 1000 for KB size, fix target arn, handle large S…
Jun 14, 2021
72d63a5
Remove isFifo config option; use template strings; use retier; other …
Jun 14, 2021
6519c39
Add some tests for sns receiver
Jun 15, 2021
af8406a
Check error type before unpacking awserr.requestFailure
Jun 15, 2021
68fa1bf
Add string length check to fifo check
Jun 15, 2021
b509a5b
Add subject template for subject field. Better check for supplied cre…
Jun 15, 2021
8d3b1b5
Add config docs
Jun 15, 2021
889fa96
Remove isFifoTopic test
Jun 15, 2021
c48b54b
Fix gosmpl linter issues
Jun 16, 2021
3a63cc2
Updated assets
pracucci Jun 16, 2021
6ada9a6
Cache fifo bool in the notifier
Jun 16, 2021
756cdda
Fix for golangci-lint warning
Jun 16, 2021
9d37d6c
More gofmt fixes
Jun 16, 2021
3446b35
Code review fixes: copy attributes, truncate all the messages, fix lo…
Jun 16, 2021
a56305a
Fix spacing from removing default api version
Jun 16, 2021
d4ff90b
Add missing template for aws region
Jun 16, 2021
b9b53f1
Code review fixes
Jun 17, 2021
63f9082
Fix docs spacing
Jun 17, 2021
4ebcaf9
Merge remote-tracking branch 'upstream/master' into sns-reciever
Jun 17, 2021
8911051
Make API URL optional, clear up credential logic
Jun 21, 2021
dfb4d1f
Fix linter error
Jun 21, 2021
9ff4ac3
Create new session if needed to get STS Creds
Jun 21, 2021
30a83f7
Use supplied user creds when creating an STS client
Jun 22, 2021
bd82f70
Fix spacing for client config
Jun 22, 2021
25e6d4e
Add common/sigv4 with the sigv4 config
Jun 23, 2021
208bed6
Update config docs to clarify fifo SNS deduplication strategy. Remove…
Jun 28, 2021
1322abd
Remove unused checkTopicFifoAttribute function
Jun 28, 2021
077b20d
Add error check when creating sns session
Jul 1, 2021
7ecb6bc
Check Error in unit test and clean up docs
Jul 6, 2021
4c2a5f1
Add sigv4 as a global config option
Jul 7, 2021
51b9368
Revert "Add sigv4 as a global config option"
Jul 9, 2021
a1260af
Break notify into submethods to create the session then create the pu…
Jul 9, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 15 additions & 1 deletion config/config_test.go
Expand Up @@ -26,7 +26,7 @@ import (
commoncfg "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
yaml "gopkg.in/yaml.v2"
"gopkg.in/yaml.v2"
)

func TestLoadEmptyString(t *testing.T) {
Expand Down Expand Up @@ -904,6 +904,20 @@ func TestSlackGlobalAPIURLFile(t *testing.T) {
}
}

func TestValidSNSConfig(t *testing.T) {
_, err := LoadFile("testdata/conf.sns-topic-arn.yml")
if err != nil {
t.Fatalf("Error parsing %s: %s", "testdata/conf.sns-topic-arn.yml\"", err)
}
}

func TestInvalidSNSConfig(t *testing.T) {
_, err := LoadFile("testdata/conf.sns-invalid.yml")
if err == nil {
treid314 marked this conversation as resolved.
Show resolved Hide resolved
t.Fatalf("expected error with missing fields on SNS config")
}
}

func TestUnmarshalHostPort(t *testing.T) {
for _, tc := range []struct {
in string
Expand Down
4 changes: 1 addition & 3 deletions config/notifiers.go
Expand Up @@ -135,7 +135,6 @@ var (
},
APIVersion: "sns.default.api_version",
treid314 marked this conversation as resolved.
Show resolved Hide resolved
Message: `{{ template "sns.default.message" . }}`,
IsFIFOTopic: false,
}
)

Expand Down Expand Up @@ -590,11 +589,11 @@ func (c *PushoverConfig) UnmarshalYAML(unmarshal func(interface{}) error) error
return nil
}

// TODO: Move to common?

// SigV4Config is the configuration for signing remote write requests with
// AWS's SigV4 verification process. Empty values will be retrieved using the
// AWS default credentials chain.
// TODO: Move to common.
treid314 marked this conversation as resolved.
Show resolved Hide resolved
type SigV4Config struct {
Region string `yaml:"region,omitempty"`
AccessKey string `yaml:"access_key,omitempty"`
Expand All @@ -614,7 +613,6 @@ type SNSConfig struct {
TopicARN string `yaml:"topic_arn,omitempty" json:"topic_arn,omitempty"`
PhoneNumber string `yaml:"phone_number,omitempty" json:"phone_number,omitempty"`
TargetARN string `yaml:"target_arn,omitempty" json:"target_arn,omitempty"`
IsFIFOTopic bool `yaml:"is_fifo_topic,omitempty" json:"is_fifo_topic,omitempty"`
Subject string `yaml:"subject,omitempty" json:"subject,omitempty"`
Message string `yaml:"message,omitempty" json:"message,omitempty"`
Attributes map[string]string `yaml:"attributes,omitempty" json:"attributes,omitempty"`
Expand Down
14 changes: 14 additions & 0 deletions config/testdata/conf.sns-invalid.yml
@@ -0,0 +1,14 @@
route:
receiver: 'sns-api-notifications'
group_by: [alertname]

receivers:
- name: 'sns-api-notifications'
sns_configs:
- api_url: https://sns.us-east-2.amazonaws.com
sigv4:
region: us-east-2
access_key: access_key
secret_key: secret_ket
attributes:
severity: Sev2
Expand Up @@ -7,7 +7,6 @@ receivers:
sns_configs:
- api_url: https://sns.us-east-2.amazonaws.com
topic_arn: arn:aws:sns:us-east-2:123456789012:My-Topic
is_fifo_topic: true
sigv4:
region: us-east-2
access_key: access_key
Expand Down
105 changes: 57 additions & 48 deletions notify/sns/sns.go
Expand Up @@ -17,9 +17,11 @@ import (
"context"
"fmt"
"net/http"
"strings"
"unicode/utf8"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
"github.com/aws/aws-sdk-go/aws/session"
Expand All @@ -41,17 +43,38 @@ type Notifier struct {
retrier *notify.Retrier
treid314 marked this conversation as resolved.
Show resolved Hide resolved
}

// New returns a new SNS notification handler.
func New(c *config.SNSConfig, t *template.Template, l log.Logger, httpOpts ...commoncfg.HTTPClientOption) (*Notifier, error) {
client, err := commoncfg.NewClientFromConfig(*c.HTTPConfig, "sns", append(httpOpts, commoncfg.WithHTTP2Disabled())...)
if err != nil {
return nil, err
}

return &Notifier{
conf: c,
tmpl: t,
logger: l,
client: client,
retrier: &notify.Retrier{},
}, nil
}

func (n Notifier) Notify(ctx context.Context, alert ...*types.Alert) (bool, error) {
credentials := credentials.NewStaticCredentials(n.conf.Sigv4.AccessKey, string(n.conf.Sigv4.SecretKey), "")
if n.conf.Sigv4.AccessKey == "" {
credentials = nil
var(
err error
data = notify.GetTemplateData(ctx, n.tmpl, alert, n.logger)
tmpl = notify.TmplText(n.tmpl, data, &err)
creds *credentials.Credentials = nil
)
if n.conf.Sigv4.AccessKey != "" && n.conf.Sigv4.SecretKey != "" {
treid314 marked this conversation as resolved.
Show resolved Hide resolved
creds = credentials.NewStaticCredentials(n.conf.Sigv4.AccessKey, string(n.conf.Sigv4.SecretKey), "")
}

sess, err := session.NewSessionWithOptions(session.Options{
treid314 marked this conversation as resolved.
Show resolved Hide resolved
Config: aws.Config{
Region: aws.String(n.conf.Sigv4.Region),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leaving the comment here, but applies to many config options. If you look at the specs in #2559, you can see many config options should actually be tmpl_string. The string is not evaluated as a template here (correct me if I'm wrong), but I think you should. As an example, look at tmplText() usage in the Slack integration.

Credentials: credentials,
Endpoint: aws.String(n.conf.APIUrl),
Credentials: creds,
Endpoint: aws.String(tmpl(n.conf.APIUrl)),
treid314 marked this conversation as resolved.
Show resolved Hide resolved
},
Profile: n.conf.Sigv4.Profile,
})
Expand All @@ -64,37 +87,44 @@ func (n Notifier) Notify(ctx context.Context, alert ...*types.Alert) (bool, erro
sess.Config.Credentials = stscreds.NewCredentials(sess, n.conf.Sigv4.RoleARN)
}

data := notify.GetTemplateData(ctx, n.tmpl, alert, n.logger)
tmpl := notify.TmplText(n.tmpl, data, &err)
message := tmpl(n.conf.Message)

client := sns.New(sess, &aws.Config{Credentials: credentials})
client := sns.New(sess, &aws.Config{Credentials: creds})
publishInput := &sns.PublishInput{}

if n.conf.TopicARN != "" {
publishInput.SetTopicArn(n.conf.TopicARN)
messageToSend, isTrunc, err := validateAndTruncateMessage(message)
publishInput.SetTopicArn(tmpl(n.conf.TopicARN))
messageToSend, isTrunc, err := validateAndTruncateMessage(tmpl(n.conf.Message))
if err != nil {
return false, err
}
if isTrunc {
n.conf.Attributes["truncated"] = "true"
treid314 marked this conversation as resolved.
Show resolved Hide resolved
}

// Deduplication key and Message Group ID are only added if it's a FIFO SNS Topic.
if isFIFOTopic(n.conf.TopicARN) {
key, err := notify.ExtractGroupKey(ctx)
if err != nil {
return false, err
}
publishInput.SetMessageDeduplicationId(key.Hash())
publishInput.SetMessageGroupId(key.Hash())
}

publishInput.SetMessage(messageToSend)
}
if n.conf.PhoneNumber != "" {
publishInput.SetPhoneNumber(n.conf.PhoneNumber)
publishInput.SetPhoneNumber(tmpl(n.conf.PhoneNumber))
// If SMS message is over 1600 chars, SNS will reject the message.
_, isTruncated := notify.Truncate(message, 1600)
_, isTruncated := notify.Truncate(tmpl(n.conf.Message), 1600)
if isTruncated {
return false, fmt.Errorf("SMS message exeeds length of 1600 charactors")
treid314 marked this conversation as resolved.
Show resolved Hide resolved
} else {
publishInput.SetMessage(message)
publishInput.SetMessage(tmpl(n.conf.Message))
}
}
if n.conf.TargetARN != "" {
publishInput.SetTargetArn(n.conf.TargetARN)
messageToSend, isTrunc, err := validateAndTruncateMessage(message)
publishInput.SetTargetArn(tmpl(n.conf.TargetARN))
messageToSend, isTrunc, err := validateAndTruncateMessage(tmpl(n.conf.Message))
if err != nil {
return false, err
}
Expand All @@ -107,39 +137,35 @@ func (n Notifier) Notify(ctx context.Context, alert ...*types.Alert) (bool, erro
if len(n.conf.Attributes) > 0 {
attributes := map[string]*sns.MessageAttributeValue{}
treid314 marked this conversation as resolved.
Show resolved Hide resolved
for k, v := range n.conf.Attributes {
attributes[k] = &sns.MessageAttributeValue{DataType: aws.String("String"), StringValue: aws.String(v)}
attributes[tmpl(k)] = &sns.MessageAttributeValue{DataType: aws.String("String"), StringValue: aws.String(tmpl(v))}
}
publishInput.SetMessageAttributes(attributes)
}

if n.conf.Subject != "" {
publishInput.SetSubject(n.conf.Subject)
}

// Deduplication key is only added if it's a FIFO SNS Topic.
if n.conf.IsFIFOTopic {
key, err := notify.ExtractGroupKey(ctx)
if err != nil {
return false, err
}
publishInput.SetMessageDeduplicationId(key.Hash())
publishInput.SetSubject(tmpl(n.conf.Subject))
}

publishOutput, err := client.Publish(publishInput)
if err != nil {
// AWS Response is bad, probably a config issue.
return false, err
return n.retrier.Check(err.(awserr.RequestFailure).StatusCode(), strings.NewReader(err.(awserr.RequestFailure).Message()))
treid314 marked this conversation as resolved.
Show resolved Hide resolved
}

err = n.logger.Log(publishOutput.String())
treid314 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return false, err
}

// Response is good and does not need to be retried.
return false, nil
}

func isFIFOTopic(topicARN string) bool {
if topicARN[len(topicARN)-5:] == ".fifo" {
treid314 marked this conversation as resolved.
Show resolved Hide resolved
return true
}
return false
}

func validateAndTruncateMessage(message string) (string, bool, error) {
if utf8.ValidString(message) {
treid314 marked this conversation as resolved.
Show resolved Hide resolved
// if the message is larger than 256KB we have to truncate.
Expand All @@ -152,20 +178,3 @@ func validateAndTruncateMessage(message string) (string, bool, error) {
}
return "", false, fmt.Errorf("non utf8 encoded message string")
}

// New returns a new SNS notification handler.
func New(c *config.SNSConfig, t *template.Template, l log.Logger, httpOpts ...commoncfg.HTTPClientOption) (*Notifier, error) {

client, err := commoncfg.NewClientFromConfig(*c.HTTPConfig, "sns", append(httpOpts, commoncfg.WithHTTP2Disabled())...)
if err != nil {
return nil, err
}

return &Notifier{
conf: c,
tmpl: t,
logger: l,
client: client,
retrier: &notify.Retrier{},
}, nil
}
65 changes: 25 additions & 40 deletions notify/sns/sns_test.go
Expand Up @@ -15,51 +15,36 @@ package sns

import (
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/prometheus/alertmanager/config"
"github.com/prometheus/alertmanager/notify/test"
"github.com/prometheus/alertmanager/types"
commoncfg "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
)

func TestNotifier_Notify(t *testing.T) {
ctx, _, fn := test.GetContextWithCancelingURL()
defer fn()
attrTest := map[string]string{}
attrTest["key"] = "testVal"
// These are fake values
notifier, err := New(
&config.SNSConfig{
HTTPConfig: &commoncfg.HTTPClientConfig{},
Message: `{{ template "sns.default.message" . }}`,
TopicARN: "arn:aws:sns:us-east-2:123456789012:My-Topic",
Sigv4: config.SigV4Config{
Region: "us-east-2",
AccessKey: "access_key",
SecretKey: "secret_key",
},
Attributes: attrTest,
},
test.CreateTmpl(t),
log.NewNopLogger(),
)
func TestIsFIFO(t *testing.T) {
require.True(t, isFIFOTopic("arn:aws:sns:us-east-2:624413706616:snsTestTopic.fifo"))
require.False(t, isFIFOTopic("arn:aws:sns:us-east-2:624413706616:snsTestTopic"))
}

func TestValidateAndTruncateMessage(t *testing.T) {
sBuff := make([]byte, 257*1024, 257*1024)
for i := range sBuff {
sBuff[i] = byte(33)
}
truncatedMessage, isTruncated, err := validateAndTruncateMessage(string(sBuff))
require.True(t, isTruncated)
require.NoError(t, err)
require.NotEqual(t, sBuff, truncatedMessage)
require.Equal(t, len(truncatedMessage), 256*1024)

ok, err := notifier.Notify(ctx, []*types.Alert{
&types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{
"lbl1": "val1",
},
StartsAt: time.Now(),
EndsAt: time.Now().Add(time.Hour),
},
},
}...)
sBuff = make([]byte, 100, 100)
for i := range sBuff {
sBuff[i] = byte(33)
}
truncatedMessage, isTruncated, err = validateAndTruncateMessage(string(sBuff))
require.False(t, isTruncated)
require.NoError(t, err)
require.False(t, ok)
require.Equal(t, string(sBuff), truncatedMessage)

invalidUtf8String := "\xc3\x28"
_, _, err = validateAndTruncateMessage(invalidUtf8String)
require.Error(t, err)
}