Skip to content

Commit

Permalink
stream: switched from backlog to traffic metric;
Browse files Browse the repository at this point in the history
  • Loading branch information
Jan Kamieth committed Feb 1, 2021
1 parent 14050ba commit cf6274e
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 79 deletions.
2 changes: 1 addition & 1 deletion pkg/application/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func Default(options ...Option) kernel.Kernel {
WithConfigEnvKeyReplacer(strings.NewReplacer(".", "_", "-", "_")),
WithConfigSanitizers(cfg.TimeSanitizer),
WithConfigServer,
WithConsumerBacklogMetrics,
WithConsumerTrafficMetrics,
WithKernelSettingsFromConfig,
WithLoggerFormat(mon.FormatGelfFields),
WithLoggerApplicationTag,
Expand Down
4 changes: 2 additions & 2 deletions pkg/application/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,9 @@ func WithConfigSetting(key string, settings interface{}) Option {
}
}

func WithConsumerBacklogMetrics(app *App) {
func WithConsumerTrafficMetrics(app *App) {
app.addKernelOption(func(config cfg.GosoConf, kernel kernelPkg.GosoKernel) error {
kernel.AddFactory(stream.BacklogMetricWriterFactory)
kernel.AddFactory(stream.TrafficMetricWriterFactory)
return nil
})
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/stream/consumer_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

const (
metricNameConsumerBacklog = "Backlog"
metricNameConsumerTraffic = "Traffic"
metricNameConsumerDuration = "Duration"
metricNameConsumerProcessedCount = "ProcessedCount"
metricNameConsumerRunnerCount = "RunnerCount"
Expand All @@ -35,7 +35,7 @@ type ConsumerSettings struct {
RunnerCount int `cfg:"runner_count" default:"10" validate:"min=1"`
Encoding string `cfg:"encoding" default:"application/json"`
IdleTimeout time.Duration `cfg:"idle_timeout" default:"10s"`
BacklogMetric BacklogMetricWriterSettings `cfg:"backlog_metric"`
TrafficMetric TrafficMetricWriterSettings `cfg:"traffic_metric"`
}

type baseConsumer struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,29 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
"sort"
"time"
)

func BacklogMetricWriterFactory(config cfg.Config, logger mon.Logger) (map[string]kernel.ModuleFactory, error) {
func TrafficMetricWriterFactory(config cfg.Config, logger mon.Logger) (map[string]kernel.ModuleFactory, error) {
modules := map[string]kernel.ModuleFactory{}
consumerSettings := readAllConsumerSettings(config)

for consumerName := range consumerSettings {
settings := consumerSettings[consumerName]

if !settings.BacklogMetric.Enabled {
if !settings.TrafficMetric.Enabled {
continue
}

moduleName := fmt.Sprintf("consumer-%s-backlog-metric", consumerName)
modules[moduleName] = NewBacklogMetricWriter(consumerName, &settings.BacklogMetric)
moduleName := fmt.Sprintf("consumer-%s-traffic-metric", consumerName)
modules[moduleName] = NewTrafficMetricWriter(consumerName, &settings.TrafficMetric)
}

return modules, nil
}

type BacklogMetricWriterSettings struct {
type TrafficMetricWriterSettings struct {
Enabled bool `cfg:"enabled" default:"false"`
Period time.Duration `cfg:"period" default:"1m"`
AppId cfg.AppId
Expand All @@ -44,7 +45,7 @@ type BacklogMetricWriterSettings struct {
RunnerCount int
}

type BacklogMetricWriter struct {
type TrafficMetricWriter struct {
kernel.EssentialModule
kernel.ServiceStage

Expand All @@ -53,12 +54,12 @@ type BacklogMetricWriter struct {
cwClient cloudwatchiface.CloudWatchAPI
metricWriter mon.MetricWriter
clock clock.Clock
settings *BacklogMetricWriterSettings
settings *TrafficMetricWriterSettings
}

func NewBacklogMetricWriter(consumerName string, settings *BacklogMetricWriterSettings) kernel.ModuleFactory {
func NewTrafficMetricWriter(consumerName string, settings *TrafficMetricWriterSettings) kernel.ModuleFactory {
return func(ctx context.Context, config cfg.Config, logger mon.Logger) (kernel.Module, error) {
channelName := fmt.Sprintf("consumer-%s-backlog-metric", consumerName)
channelName := fmt.Sprintf("consumer-%s-traffic-metric", consumerName)
logger = logger.WithChannel(channelName)

appId := &cfg.AppId{}
Expand All @@ -68,7 +69,7 @@ func NewBacklogMetricWriter(consumerName string, settings *BacklogMetricWriterSe
inputType := readInputType(config, consumerSettings.Input)

if inputType != InputTypeSqs {
return nil, fmt.Errorf("can not create backlog metric writer as consumer input is not of type SQS")
return nil, fmt.Errorf("can not create traffic metric writer as consumer input is not of type SQS")
}

inputSettings := readSqsInputSettings(config, consumerSettings.Input)
Expand All @@ -79,7 +80,7 @@ func NewBacklogMetricWriter(consumerName string, settings *BacklogMetricWriterSe
settings.MemberId = uuid.New().NewV4()
settings.RunnerCount = consumerSettings.RunnerCount

tableName := fmt.Sprintf("%s-%s-%s-backlog-metric-writer-leaders", appId.Project, appId.Environment, appId.Family)
tableName := fmt.Sprintf("%s-%s-%s-traffic-metric-writer-leaders", appId.Project, appId.Environment, appId.Family)
groupId := fmt.Sprintf("%s-%s", appId.Application, consumerName)

leaderElection, err := conc.NewDdbLeaderElection(config, logger, &conc.DdbLeaderElectionSettings{
Expand All @@ -89,18 +90,18 @@ func NewBacklogMetricWriter(consumerName string, settings *BacklogMetricWriterSe
})

if err != nil {
return nil, fmt.Errorf("can not create leader election for backlog metric writer of consumer %s: %w", consumerName, err)
return nil, fmt.Errorf("can not create leader election for traffic metric writer of consumer %s: %w", consumerName, err)
}

cwClient := mon.ProvideCloudWatchClient(config)
metricWriter := mon.NewMetricDaemonWriter()

return NewBacklogMetricWriterWithInterfaces(logger, leaderElection, cwClient, metricWriter, clock.Provider, settings)
return NewTrafficMetricWriterWithInterfaces(logger, leaderElection, cwClient, metricWriter, clock.Provider, settings)
}
}

func NewBacklogMetricWriterWithInterfaces(logger mon.Logger, leaderElection conc.LeaderElection, cwClient cloudwatchiface.CloudWatchAPI, metricWriter mon.MetricWriter, clock clock.Clock, settings *BacklogMetricWriterSettings) (*BacklogMetricWriter, error) {
writer := &BacklogMetricWriter{
func NewTrafficMetricWriterWithInterfaces(logger mon.Logger, leaderElection conc.LeaderElection, cwClient cloudwatchiface.CloudWatchAPI, metricWriter mon.MetricWriter, clock clock.Clock, settings *TrafficMetricWriterSettings) (*TrafficMetricWriter, error) {
writer := &TrafficMetricWriter{
logger: logger,
leaderElection: leaderElection,
cwClient: cwClient,
Expand All @@ -112,9 +113,9 @@ func NewBacklogMetricWriterWithInterfaces(logger mon.Logger, leaderElection conc
return writer, nil
}

func (u *BacklogMetricWriter) Run(ctx context.Context) error {
func (u *TrafficMetricWriter) Run(ctx context.Context) error {
u.writeRunnerCountMetric(ctx)
u.writeBacklogMetric(ctx)
u.writeTrafficMetric(ctx)

ticker := clock.NewRealTicker(u.settings.Period)

Expand All @@ -124,12 +125,12 @@ func (u *BacklogMetricWriter) Run(ctx context.Context) error {
return nil
case <-ticker.Tick():
u.writeRunnerCountMetric(ctx)
u.writeBacklogMetric(ctx)
u.writeTrafficMetric(ctx)
}
}
}

func (u *BacklogMetricWriter) writeRunnerCountMetric(ctx context.Context) {
func (u *TrafficMetricWriter) writeRunnerCountMetric(ctx context.Context) {
u.metricWriter.WriteOne(&mon.MetricDatum{
Priority: mon.PriorityHigh,
Timestamp: u.clock.Now(),
Expand All @@ -142,10 +143,10 @@ func (u *BacklogMetricWriter) writeRunnerCountMetric(ctx context.Context) {
})
}

func (u *BacklogMetricWriter) writeBacklogMetric(ctx context.Context) {
func (u *TrafficMetricWriter) writeTrafficMetric(ctx context.Context) {
var err error
var isLeader bool
var backlog float64
var traffic float64

if isLeader, err = u.leaderElection.IsLeader(ctx, u.settings.MemberId); err != nil {
u.logger.Warnf("will assume leader role as election failed: %s", err)
Expand All @@ -157,62 +158,63 @@ func (u *BacklogMetricWriter) writeBacklogMetric(ctx context.Context) {
return
}

if backlog, err = u.calculateBacklog(); err != nil {
u.logger.Warnf("can not calculate backlog: %s", err)
if traffic, err = u.calculateTraffic(); err != nil {
u.logger.Warnf("can not calculate traffic: %s", err)
return
}

u.metricWriter.WriteOne(&mon.MetricDatum{
Priority: mon.PriorityHigh,
Timestamp: u.clock.Now(),
MetricName: metricNameConsumerBacklog,
MetricName: metricNameConsumerTraffic,
Dimensions: map[string]string{
"Consumer": u.settings.ConsumerName,
},
Unit: mon.UnitSeconds,
Value: backlog,
Unit: mon.UnitCountAverage,
Value: traffic,
})
}

func (u *BacklogMetricWriter) calculateBacklog() (float64, error) {
func (u *TrafficMetricWriter) calculateTraffic() (float64, error) {
var err error
var runnerCount, consumeDuration, messagesNotVisible, messagesVisible, messagesInQueue, backlog float64
var runnerCount, messagesSent, messagesVisible, traffic float64

if messagesNotVisible, err = u.getMessagesInQueue("ApproximateNumberOfMessagesNotVisible"); err != nil {
return 0, fmt.Errorf("can not get number of messages not visible: %w", err)
if messagesSent, err = u.getMessagesSent(); err != nil {
return 0, fmt.Errorf("can not get number of messages sent: %w", err)
}

if messagesVisible, err = u.getMessagesInQueue("ApproximateNumberOfMessagesVisible"); err != nil {
if messagesVisible, err = u.getMessagesVisible(); err != nil {
return 0, fmt.Errorf("can not get number of messages visible: %w", err)
}

messagesInQueue = messagesNotVisible + messagesVisible

if messagesInQueue == 0 {
return 0, nil
}

if runnerCount, err = u.getRunnerCount(); err != nil {
return 0, fmt.Errorf("can not get runner count: %w", err)
}

if consumeDuration, err = u.getConsumeDuration(); err != nil {
return 0, fmt.Errorf("can not get consume duration: %w", err)
if runnerCount == 0 {
return 0, fmt.Errorf("runner count is zero")
}

backlog = messagesInQueue / runnerCount * consumeDuration
traffic = (messagesSent + messagesVisible) / runnerCount

u.logger.WithFields(mon.Fields{
"messagesSent": messagesSent,
"messagesVisible": messagesVisible,
"runnerCount": runnerCount,
"traffic": traffic,
}).Infof("traffic is at %f", traffic)

return backlog, nil
return traffic, nil
}

func (u *BacklogMetricWriter) getMessagesInQueue(metricName string) (float64, error) {
func (u *TrafficMetricWriter) getMessagesSent() (float64, error) {
startTime := u.clock.Now().Add(-1 * u.settings.Period * 5)
endTime := u.clock.Now()
endTime := u.clock.Now().Add(-1 * u.settings.Period)
period := int64(u.settings.Period.Seconds())

input := &cloudwatch.GetMetricStatisticsInput{
Namespace: aws.String("AWS/SQS"),
MetricName: aws.String(metricName),
MetricName: aws.String("NumberOfMessagesSent"),
Dimensions: []*cloudwatch.Dimension{
{
Name: aws.String("QueueName"),
Expand All @@ -222,7 +224,7 @@ func (u *BacklogMetricWriter) getMessagesInQueue(metricName string) (float64, er
StartTime: aws.Time(startTime),
EndTime: aws.Time(endTime),
Period: aws.Int64(period),
Statistics: []*string{aws.String(cloudwatch.StatisticMaximum)},
Statistics: []*string{aws.String(cloudwatch.StatisticSum)},
Unit: aws.String(cloudwatch.StandardUnitCount),
}

Expand All @@ -236,65 +238,64 @@ func (u *BacklogMetricWriter) getMessagesInQueue(metricName string) (float64, er
return 0, fmt.Errorf("no data points available")
}

consumeDuration := *out.Datapoints[len(out.Datapoints)-1].Maximum
sort.Slice(out.Datapoints, func(i, j int) bool {
return out.Datapoints[i].Timestamp.After(*out.Datapoints[j].Timestamp)
})
messagesSent := *out.Datapoints[0].Sum

return consumeDuration, nil
return messagesSent, nil
}

func (u *BacklogMetricWriter) getRunnerCount() (float64, error) {
appId := u.settings.AppId
namespace := fmt.Sprintf("%s/%s/%s/%s", appId.Project, appId.Environment, appId.Family, appId.Application)

func (u *TrafficMetricWriter) getMessagesVisible() (float64, error) {
startTime := u.clock.Now().Add(-1 * u.settings.Period * 5)
endTime := u.clock.Now()
endTime := u.clock.Now().Add(-1 * u.settings.Period)
period := int64(u.settings.Period.Seconds())

input := &cloudwatch.GetMetricStatisticsInput{
Namespace: aws.String(namespace),
MetricName: aws.String(metricNameConsumerRunnerCount),
Namespace: aws.String("AWS/SQS"),
MetricName: aws.String("ApproximateNumberOfMessagesVisible"),
Dimensions: []*cloudwatch.Dimension{
{
Name: aws.String("Consumer"),
Value: aws.String(u.settings.ConsumerName),
Name: aws.String("QueueName"),
Value: aws.String(u.settings.QueueName),
},
},
StartTime: aws.Time(startTime),
EndTime: aws.Time(endTime),
Period: aws.Int64(period),
Statistics: []*string{aws.String(cloudwatch.StatisticSum)},
Statistics: []*string{aws.String(cloudwatch.StatisticMaximum)},
Unit: aws.String(cloudwatch.StandardUnitCount),
}

out, err := u.cwClient.GetMetricStatistics(input)

if err != nil {
return 0, fmt.Errorf("can not get metric: %w", err)
return 0, fmt.Errorf("can not get metric statistics: %w", err)
}

if len(out.Datapoints) == 0 {
return 0, fmt.Errorf("no data points available")
}

runnerCount := *out.Datapoints[len(out.Datapoints)-1].Sum

if runnerCount == 0 {
return 0, fmt.Errorf("invalid runner count of 0")
}
sort.Slice(out.Datapoints, func(i, j int) bool {
return out.Datapoints[i].Timestamp.After(*out.Datapoints[j].Timestamp)
})
messagesVisible := *out.Datapoints[0].Maximum

return runnerCount, nil
return messagesVisible, nil
}

func (u *BacklogMetricWriter) getConsumeDuration() (float64, error) {
func (u *TrafficMetricWriter) getRunnerCount() (float64, error) {
appId := u.settings.AppId
namespace := fmt.Sprintf("%s/%s/%s/%s", appId.Project, appId.Environment, appId.Family, appId.Application)

startTime := u.clock.Now().Add(-1 * u.settings.Period * 5)
endTime := u.clock.Now()
endTime := u.clock.Now().Add(-1 * u.settings.Period)
period := int64(u.settings.Period.Seconds())

input := &cloudwatch.GetMetricStatisticsInput{
Namespace: aws.String(namespace),
MetricName: aws.String(metricNameConsumerDuration),
MetricName: aws.String(metricNameConsumerRunnerCount),
Dimensions: []*cloudwatch.Dimension{
{
Name: aws.String("Consumer"),
Expand All @@ -304,25 +305,28 @@ func (u *BacklogMetricWriter) getConsumeDuration() (float64, error) {
StartTime: aws.Time(startTime),
EndTime: aws.Time(endTime),
Period: aws.Int64(period),
Statistics: []*string{aws.String(cloudwatch.StatisticAverage)},
Unit: aws.String(cloudwatch.StandardUnitMilliseconds),
Statistics: []*string{aws.String(cloudwatch.StatisticSum)},
Unit: aws.String(cloudwatch.StandardUnitCount),
}

out, err := u.cwClient.GetMetricStatistics(input)

if err != nil {
return 0, fmt.Errorf("can not get metric for consumer runner count: %w", err)
return 0, fmt.Errorf("can not get metric: %w", err)
}

if len(out.Datapoints) == 0 {
return 0, fmt.Errorf("no data points available")
}

consumeDuration := *out.Datapoints[len(out.Datapoints)-1].Average / 1000
sort.Slice(out.Datapoints, func(i, j int) bool {
return out.Datapoints[i].Timestamp.After(*out.Datapoints[j].Timestamp)
})
runnerCount := *out.Datapoints[0].Sum

if consumeDuration == 0 {
return 0, fmt.Errorf("invalid consume duration of 0")
if runnerCount == 0 {
return 0, fmt.Errorf("invalid runner count of 0")
}

return consumeDuration, nil
return runnerCount, nil
}

0 comments on commit cf6274e

Please sign in to comment.