diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 121cd2726ab..6d50c91fd50 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -77,6 +77,16 @@ updates: schedule: interval: "weekly" day: "sunday" + - + package-ecosystem: "gomod" + directory: "/samplers/aws/xray" + labels: + - dependencies + - go + - "Skip Changelog" + schedule: + interval: "weekly" + day: "sunday" - package-ecosystem: "gomod" directory: "/exporters/metric/cortex" diff --git a/samplers/aws/go.mod b/samplers/aws/go.mod new file mode 100644 index 00000000000..a650922fad0 --- /dev/null +++ b/samplers/aws/go.mod @@ -0,0 +1,10 @@ +module go.opentelemetry.io/contrib/samplers/aws + +go 1.16 + +require ( + github.com/go-logr/logr v1.2.1 + github.com/go-logr/stdr v1.2.0 + github.com/stretchr/testify v1.7.0 + go.opentelemetry.io/otel/sdk v1.3.0 +) diff --git a/samplers/aws/go.sum b/samplers/aws/go.sum new file mode 100644 index 00000000000..cc354a8aa45 --- /dev/null +++ b/samplers/aws/go.sum @@ -0,0 +1,27 @@ +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.1 h1:DX7uPQ4WgAWfoh+NGGlbJQswnYIVvz0SRlLS3rPZQDA= +github.com/go-logr/logr v1.2.1/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/stdr v1.2.0 h1:j4LrlVXgrbIWO83mmQUnK0Hi+YnbD+vzrE1z/EphbFE= +github.com/go-logr/stdr v1.2.0/go.mod h1:YkVgnZu1ZjjL7xTxrfm/LLZBfkhTqSR1ydtm6jTKKwI= +github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +go.opentelemetry.io/otel v1.3.0 h1:APxLf0eiBwLl+SOXiJJCVYzA1OOJNyAoV8C5RNRyy7Y= +go.opentelemetry.io/otel v1.3.0/go.mod h1:PWIKzi6JCp7sM0k9yZ43VX+T345uNbAkDKwHVjb2PTs= +go.opentelemetry.io/otel/sdk v1.3.0 h1:3278edCoH89MEJ0Ky8WQXVmDQv3FX4ZJ3Pp+9fJreAI= +go.opentelemetry.io/otel/sdk v1.3.0/go.mod h1:rIo4suHNhQwBIPg9axF8V9CA72Wz2mKF1teNrup8yzs= +go.opentelemetry.io/otel/trace v1.3.0 h1:doy8Hzb1RJ+I3yFhtDmwNc7tIyw1tNMOIsyPzp1NOGY= +go.opentelemetry.io/otel/trace v1.3.0/go.mod h1:c/VDhno8888bvQYmbYLqe41/Ldmr/KKunbvWM4/fEjk= +golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7 h1:iGu644GcxtEcrInvDsQRCwJjtCIOlT2V7IRt6ah2Whw= +golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/samplers/aws/xray/client.go b/samplers/aws/xray/client.go new file mode 100644 index 00000000000..f61b223deda --- /dev/null +++ b/samplers/aws/xray/client.go @@ -0,0 +1,73 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package xray + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "net/url" +) + +type xrayClient struct { + // http client for sending sampling requests to the collector + httpClient *http.Client + + endpoint *url.URL +} + +// newClient returns an HTTP client with proxy endpoint +func newClient(addr string) (client *xrayClient, err error) { + endpoint := "http://" + addr + + endpointURL, err := url.Parse(endpoint) + if err != nil { + return nil, err + } + + return &xrayClient{ + httpClient: &http.Client{}, + endpoint: endpointURL, + }, nil +} + +// getSamplingRules calls the collector(aws proxy enabled) for sampling rules +func (c *xrayClient) getSamplingRules(ctx context.Context) (*getSamplingRulesOutput, error) { + samplingRulesInput, err := json.Marshal(getSamplingRulesInput{}) + if err != nil { + return nil, err + } + body := bytes.NewReader(samplingRulesInput) + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.endpoint.String()+"/GetSamplingRules", body) + if err != nil { + return nil, fmt.Errorf("xray client: failed to create http request: %w", err) + } + + output, err := c.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("xray client: unable to retrieve sampling settings: %w", err) + } + defer output.Body.Close() + + var samplingRulesOutput *getSamplingRulesOutput + if err := json.NewDecoder(output.Body).Decode(&samplingRulesOutput); err != nil { + return nil, fmt.Errorf("xray client: unable to unmarshal the response body: %w", err) + } + + return samplingRulesOutput, nil +} diff --git a/samplers/aws/xray/client_test.go b/samplers/aws/xray/client_test.go new file mode 100644 index 00000000000..34bcfdd0e70 --- /dev/null +++ b/samplers/aws/xray/client_test.go @@ -0,0 +1,190 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package xray + +import ( + "context" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestGetSamplingRules(t *testing.T) { + body := []byte(`{ + "NextToken": null, + "SamplingRuleRecords": [ + { + "CreatedAt": 0, + "ModifiedAt": 1639517389, + "SamplingRule": { + "Attributes": {}, + "FixedRate": 0.5, + "HTTPMethod": "*", + "Host": "*", + "Priority": 10000, + "ReservoirSize": 60, + "ResourceARN": "*", + "RuleARN": "arn:aws:xray:us-west-2:xxxxxxx:sampling-rule/Default", + "RuleName": "Default", + "ServiceName": "*", + "ServiceType": "*", + "URLPath": "*", + "Version": 1 + } + }, + { + "CreatedAt": 1637691613, + "ModifiedAt": 1643748669, + "SamplingRule": { + "Attributes": {}, + "FixedRate": 0.09, + "HTTPMethod": "GET", + "Host": "*", + "Priority": 1, + "ReservoirSize": 3, + "ResourceARN": "*", + "RuleARN": "arn:aws:xray:us-west-2:xxxxxxx:sampling-rule/test-rule", + "RuleName": "test-rule", + "ServiceName": "test-rule", + "ServiceType": "local", + "URLPath": "/aws-sdk-call", + "Version": 1 + } + }, + { + "CreatedAt": 1639446197, + "ModifiedAt": 1639446197, + "SamplingRule": { + "Attributes": {}, + "FixedRate": 0.09, + "HTTPMethod": "*", + "Host": "*", + "Priority": 100, + "ReservoirSize": 100, + "ResourceARN": "*", + "RuleARN": "arn:aws:xray:us-west-2:xxxxxxx:sampling-rule/test-rule-1", + "RuleName": "test-rule-1", + "ServiceName": "*", + "ServiceType": "*", + "URLPath": "*", + "Version": 1 + } + } + ] +}`) + ctx := context.Background() + + // generate a test server so we can capture and inspect the request + testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { + _, err := res.Write([]byte(body)) + require.NoError(t, err) + })) + + u, err := url.Parse(testServer.URL) + require.NoError(t, err) + + client, err := newClient(u.Host) + require.NoError(t, err) + + samplingRules, err := client.getSamplingRules(ctx) + require.NoError(t, err) + + assert.Equal(t, *samplingRules.SamplingRuleRecords[0].SamplingRule.RuleName, "Default") + assert.Equal(t, *samplingRules.SamplingRuleRecords[0].SamplingRule.ServiceType, "*") + assert.Equal(t, *samplingRules.SamplingRuleRecords[0].SamplingRule.Host, "*") + assert.Equal(t, *samplingRules.SamplingRuleRecords[0].SamplingRule.URLPath, "*") + assert.Equal(t, *samplingRules.SamplingRuleRecords[0].SamplingRule.ReservoirSize, int64(60)) + assert.Equal(t, *samplingRules.SamplingRuleRecords[0].SamplingRule.FixedRate, 0.5) + + assert.Equal(t, *samplingRules.SamplingRuleRecords[1].SamplingRule.RuleName, "test-rule") + assert.Equal(t, *samplingRules.SamplingRuleRecords[1].SamplingRule.ServiceType, "local") + assert.Equal(t, *samplingRules.SamplingRuleRecords[1].SamplingRule.Host, "*") + assert.Equal(t, *samplingRules.SamplingRuleRecords[1].SamplingRule.URLPath, "/aws-sdk-call") + assert.Equal(t, *samplingRules.SamplingRuleRecords[1].SamplingRule.ReservoirSize, int64(3)) + assert.Equal(t, *samplingRules.SamplingRuleRecords[1].SamplingRule.FixedRate, 0.09) + + assert.Equal(t, *samplingRules.SamplingRuleRecords[2].SamplingRule.RuleName, "test-rule-1") + assert.Equal(t, *samplingRules.SamplingRuleRecords[2].SamplingRule.ServiceType, "*") + assert.Equal(t, *samplingRules.SamplingRuleRecords[2].SamplingRule.Host, "*") + assert.Equal(t, *samplingRules.SamplingRuleRecords[2].SamplingRule.URLPath, "*") + assert.Equal(t, *samplingRules.SamplingRuleRecords[2].SamplingRule.ReservoirSize, int64(100)) + assert.Equal(t, *samplingRules.SamplingRuleRecords[2].SamplingRule.FixedRate, 0.09) +} + +func TestGetSamplingRulesWithMissingValues(t *testing.T) { + body := []byte(`{ + "NextToken": null, + "SamplingRuleRecords": [ + { + "CreatedAt": 0, + "ModifiedAt": 1639517389, + "SamplingRule": { + "Attributes": {}, + "FixedRate": 0.5, + "HTTPMethod": "*", + "Host": "*", + "ResourceARN": "*", + "RuleARN": "arn:aws:xray:us-west-2:xxxxxxx:sampling-rule/Default", + "RuleName": "Default", + "ServiceName": "*", + "ServiceType": "*", + "URLPath": "*", + "Version": 1 + } + } + ] +}`) + ctx := context.Background() + + // generate a test server so we can capture and inspect the request + testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { + _, err := res.Write([]byte(body)) + require.NoError(t, err) + })) + + u, err := url.Parse(testServer.URL) + require.NoError(t, err) + + client, err := newClient(u.Host) + require.NoError(t, err) + + samplingRules, err := client.getSamplingRules(ctx) + require.NoError(t, err) + + // Priority and ReservoirSize are missing in API response so they are assigned as nil + assert.Nil(t, samplingRules.SamplingRuleRecords[0].SamplingRule.Priority) + assert.Nil(t, samplingRules.SamplingRuleRecords[0].SamplingRule.ReservoirSize) + + // other values are stored as expected + assert.Equal(t, *samplingRules.SamplingRuleRecords[0].SamplingRule.RuleName, "Default") +} + +func TestNewClient(t *testing.T) { + xrayClient, err := newClient("127.0.0.1:2020") + require.NoError(t, err) + + assert.Equal(t, xrayClient.endpoint.String(), "http://127.0.0.1:2020") +} + +func TestEndpointIsNotReachable(t *testing.T) { + client, err := newClient("127.0.0.1:2020") + require.NoError(t, err) + _, err = client.getSamplingRules(context.Background()) + assert.Error(t, err) +} diff --git a/samplers/aws/xray/clock.go b/samplers/aws/xray/clock.go new file mode 100644 index 00000000000..d04e4c72ae9 --- /dev/null +++ b/samplers/aws/xray/clock.go @@ -0,0 +1,32 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package xray + +import ( + "time" +) + +// clock provides an interface to implement method for getting current time. +type clock interface { + now() time.Time +} + +// defaultClock is an implementation of Clock interface. +type defaultClock struct{} + +// now returns current time. +func (t *defaultClock) now() time.Time { + return time.Now() +} diff --git a/samplers/aws/xray/rand.go b/samplers/aws/xray/rand.go new file mode 100644 index 00000000000..88c2796e238 --- /dev/null +++ b/samplers/aws/xray/rand.go @@ -0,0 +1,114 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package xray + +import ( + crand "crypto/rand" + "encoding/binary" + "math/rand" + "sync" + "time" +) + +var _ rand.Source = (*lockedSource)(nil) +var _ rand.Source64 = (*lockedSource64)(nil) + +type lockedSource struct { + mu sync.Mutex + src rand.Source +} + +func (src *lockedSource) Int63() int64 { + src.mu.Lock() + defer src.mu.Unlock() + return src.src.Int63() +} + +func (src *lockedSource) Seed(seed int64) { + src.mu.Lock() + defer src.mu.Unlock() + src.src.Seed(seed) +} + +type lockedSource64 struct { + mu sync.Mutex + src rand.Source64 +} + +func (src *lockedSource64) Int63() int64 { + src.mu.Lock() + defer src.mu.Unlock() + return src.src.Int63() +} + +func (src *lockedSource64) Uint64() uint64 { + src.mu.Lock() + defer src.mu.Unlock() + return src.src.Uint64() +} + +func (src *lockedSource64) Seed(seed int64) { + src.mu.Lock() + defer src.mu.Unlock() + src.src.Seed(seed) +} + +func newSeed() int64 { + var seed int64 + if err := binary.Read(crand.Reader, binary.BigEndian, &seed); err != nil { + // fallback to timestamp + seed = time.Now().UnixNano() + } + return seed +} + +func newGlobalRand() *rand.Rand { + src := rand.NewSource(newSeed()) + if src64, ok := src.(rand.Source64); ok { + return rand.New(&lockedSource64{src: src64}) + } + return rand.New(&lockedSource{src: src}) +} + +// Rand is an interface for a set of methods that return random value. +type Rand interface { + Int63n(n int64) int64 + Intn(n int) int + Float64() float64 +} + +// DefaultRand is an implementation of Rand interface. +// It is safe for concurrent use by multiple goroutines. +type defaultRand struct{} + +var globalRand = newGlobalRand() + +// Int63n returns, as an int64, a non-negative pseudo-random number in [0,n) +// from the default Source. +func (r *defaultRand) Int63n(n int64) int64 { + return globalRand.Int63n(n) +} + +// Intn returns, as an int, a non-negative pseudo-random number in [0,n) +// from the default Source. +func (r *defaultRand) Intn(n int) int { + return globalRand.Intn(n) +} + +// Float64 returns, as a float64, a pseudo-random number in [0.0,1.0) +// from the default Source. +func (r *defaultRand) Float64() float64 { + return globalRand.Float64() +} diff --git a/samplers/aws/xray/remote_sampler.go b/samplers/aws/xray/remote_sampler.go new file mode 100644 index 00000000000..f61f53b0aec --- /dev/null +++ b/samplers/aws/xray/remote_sampler.go @@ -0,0 +1,253 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package xray + +import ( + "context" + crypto "crypto/rand" + "fmt" + "sync" + "time" + + "github.com/go-logr/logr" + + sdktrace "go.opentelemetry.io/otel/sdk/trace" +) + +// global variable for logging +var globalLogger logr.Logger + +// remoteSampler is a sampler for AWS X-Ray which polls sampling rules and sampling targets +// to make a sampling decision based on rules set by users on AWS X-Ray console +type remoteSampler struct { + // manifest is the list of known centralized sampling rules. + manifest *manifest + + // xrayClient is used for getting quotas and sampling rules. + xrayClient *xrayClient + + // pollerStarted, if true represents rule and target pollers are started. + pollerStarted bool + + // samplingRulesPollingInterval, default is 300 seconds. + samplingRulesPollingInterval time.Duration + + // Unique ID used by XRay service to identify this client. + clientID string + + // Provides time. + clock clock + + mu sync.RWMutex +} + +// Compile time assertion that remoteSampler implements the Sampler interface. +var _ sdktrace.Sampler = (*remoteSampler)(nil) + +// NewRemoteSampler returns a sampler which decides to sample a given request or not +// based on the sampling rules set by users on AWS X-Ray console. Sampler also periodically polls +// sampling rules and sampling targets. +func NewRemoteSampler(ctx context.Context, opts ...Option) (sdktrace.Sampler, error) { + cfg := newConfig(opts...) + + // validate config + err := validateConfig(cfg) + if err != nil { + return nil, err + } + + // Generate clientID + var r [12]byte + + _, err = crypto.Read(r[:]) + if err != nil { + return nil, fmt.Errorf("unable to generate client ID: %w", err) + } + + id := fmt.Sprintf("%02x", r) + + clock := &defaultClock{} + + m := &manifest{ + rules: []*rule{}, + index: map[string]*rule{}, + clock: clock, + } + + client, err := newClient(cfg.endpoint) + if err != nil { + return nil, err + } + + remoteSampler := &remoteSampler{ + clock: clock, + manifest: m, + clientID: id, + xrayClient: client, + samplingRulesPollingInterval: cfg.samplingRulesPollingInterval, + } + + // starts the rule poller + remoteSampler.start(ctx) + + return remoteSampler, nil +} + +func (rs *remoteSampler) ShouldSample(parameters sdktrace.SamplingParameters) sdktrace.SamplingResult { + // ToDo: add business logic for remote sampling + + return sdktrace.TraceIDRatioBased(0.05).ShouldSample(parameters) +} + +func (rs *remoteSampler) Description() string { + return "AwsXrayRemoteSampler{" + rs.getDescription() + "}" +} + +func (rs *remoteSampler) getDescription() string { + return "remote sampling with AWS X-Ray" +} + +func (rs *remoteSampler) start(ctx context.Context) { + if !rs.pollerStarted { + rs.pollerStarted = true + rs.startPoller(ctx) + } +} + +func (rs *remoteSampler) startPoller(ctx context.Context) { + // ToDo: add logic for getting sampling targets + go func() { + // Period = 300s, Jitter = 5s + t := newTicker(rs.samplingRulesPollingInterval, 5*time.Second) + + // Periodic manifest refresh + for { + if err := rs.refreshManifest(ctx); err != nil { + globalLogger.Error(err, "Error occurred while refreshing sampling rules") + } else { + globalLogger.V(1).Info("Successfully fetched sampling rules") + } + select { + case _, more := <-t.C(): + if !more { + return + } + continue + case <-ctx.Done(): + return + } + } + }() +} + +func (rs *remoteSampler) refreshManifest(ctx context.Context) (err error) { + // Compute 'now' before calling GetSamplingRules to avoid marking manifest as + // fresher than it actually is. + now := rs.clock.now() + + // Get sampling rules from proxy. + rules, err := rs.xrayClient.getSamplingRules(ctx) + if err != nil { + return + } + + // temporary manifest declaration. + tempManifest := &manifest{ + rules: []*rule{}, + index: map[string]*rule{}, + clock: &defaultClock{}, + } + + for _, records := range rules.SamplingRuleRecords { + if records.SamplingRule.RuleName == nil { + globalLogger.V(1).Info("Sampling rule without rule name is not supported") + continue + } + + // Only sampling rule with version 1 is valid + if records.SamplingRule.Version == nil { + globalLogger.V(1).Info("Sampling rule without Version is not supported", "RuleName", *records.SamplingRule.RuleName) + continue + } + + if *records.SamplingRule.Version != int64(1) { + globalLogger.V(1).Info("Sampling rule without Version 1 is not supported", "RuleName", *records.SamplingRule.RuleName) + continue + } + + if records.SamplingRule.ServiceName == nil { + globalLogger.V(1).Info("Sampling rule without ServiceName is not supported", "RuleName", *records.SamplingRule.RuleName) + continue + } + + if records.SamplingRule.ServiceType == nil { + globalLogger.V(1).Info("Sampling rule without ServiceType is not supported", "RuleName", *records.SamplingRule.RuleName) + continue + } + + if records.SamplingRule.Host == nil { + globalLogger.V(1).Info("Sampling rule without Host is not supported", "RuleName", *records.SamplingRule.RuleName) + continue + } + + if records.SamplingRule.HTTPMethod == nil { + globalLogger.V(1).Info("Sampling rule without HTTPMethod is not supported", "RuleName", *records.SamplingRule.RuleName) + continue + } + + if records.SamplingRule.URLPath == nil { + globalLogger.V(1).Info("Sampling rule without URLPath is not supported", "RuleName", *records.SamplingRule.RuleName) + continue + } + + if records.SamplingRule.ReservoirSize == nil { + globalLogger.V(1).Info("Sampling rule without ReservoirSize is not supported", "RuleName", *records.SamplingRule.RuleName) + continue + } + + if records.SamplingRule.FixedRate == nil { + globalLogger.V(1).Info("Sampling rule without FixedRate is not supported", "RuleName", *records.SamplingRule.RuleName) + continue + } + + if records.SamplingRule.ResourceARN == nil { + globalLogger.V(1).Info("Sampling rule without ResourceARN is not applicable", "RuleName", *records.SamplingRule.RuleName) + continue + } + + if records.SamplingRule.Priority == nil { + globalLogger.V(1).Info("Sampling rule without version number is not supported", "RuleName", *records.SamplingRule.RuleName) + continue + } + + // create rule and store it in temporary manifest to avoid locking issues. + createErr := tempManifest.createRule(records.SamplingRule) + if createErr != nil { + globalLogger.Error(createErr, "Error occurred creating/updating rule") + } + } + + // Re-sort to fix matching priorities. + tempManifest.sort() + // Update refreshedAt timestamp + tempManifest.refreshedAt = now + + // assign temp manifest to original copy/one sync refresh. + rs.mu.Lock() + rs.manifest = tempManifest + rs.mu.Unlock() + + return +} diff --git a/samplers/aws/xray/remote_sampler_config.go b/samplers/aws/xray/remote_sampler_config.go new file mode 100644 index 00000000000..e80017f4713 --- /dev/null +++ b/samplers/aws/xray/remote_sampler_config.go @@ -0,0 +1,112 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package xray + +import ( + "fmt" + "log" + "math" + "os" + "regexp" + "strconv" + "strings" + "time" + + "github.com/go-logr/logr" + "github.com/go-logr/stdr" +) + +const ( + defaultProxyEndpoint = "127.0.0.1:2000" + defaultPollingInterval = 300 +) + +// SamplerOption is a function that sets config on the sampler +type Option func(options *config) + +type config struct { + endpoint string + samplingRulesPollingInterval time.Duration + logger logr.Logger +} + +// sets custom proxy endpoint +func WithEndpoint(endpoint string) Option { + return func(o *config) { + o.endpoint = endpoint + } +} + +// sets polling interval for sampling rules +func WithSamplingRulesPollingInterval(polingInterval time.Duration) Option { + return func(o *config) { + o.samplingRulesPollingInterval = polingInterval + } +} + +// sets custom logging for remote sampling implementation +func WithLogger(l logr.Logger) Option { + return func(o *config) { + o.logger = l + } +} + +func newConfig(opts ...Option) *config { + cfg := &config{ + endpoint: defaultProxyEndpoint, + samplingRulesPollingInterval: defaultPollingInterval * time.Second, + logger: stdr.NewWithOptions(log.New(os.Stderr, "", log.LstdFlags|log.Lshortfile), stdr.Options{LogCaller: stdr.Error}), + } + + for _, option := range opts { + option(cfg) + } + + // setting global logger + globalLogger = cfg.logger + + return cfg +} + +func validateConfig(cfg *config) (err error) { + // check endpoint follows certain format + split := strings.Split(cfg.endpoint, ":") + + if len(split) > 2 { + return fmt.Errorf("endpoint validation error: expected format is 127.0.0.1:8080") + } + + // validate host name + r, err := regexp.Compile("[^A-Za-z0-9.]") + if err != nil { + return err + } + + if r.MatchString(split[0]) { + return fmt.Errorf("endpoint validation error: expected format is 127.0.0.1:8080") + } + + // validate port + if _, err := strconv.Atoi(split[1]); err != nil { + return fmt.Errorf("endpoint validation error: expected format is 127.0.0.1:8080") + } + + // validate polling interval is positive + if math.Signbit(float64(cfg.samplingRulesPollingInterval)) { + return fmt.Errorf("endpoint validation error: samplingRulesPollingInterval should be positive number") + } + + return +} diff --git a/samplers/aws/xray/remote_sampler_config_test.go b/samplers/aws/xray/remote_sampler_config_test.go new file mode 100644 index 00000000000..5470387de41 --- /dev/null +++ b/samplers/aws/xray/remote_sampler_config_test.go @@ -0,0 +1,95 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package xray + +import ( + "log" + "os" + "testing" + "time" + + "github.com/go-logr/logr" + "github.com/go-logr/stdr" + "github.com/stretchr/testify/assert" +) + +// assert that user provided values are tied to config +func TestNewConfig(t *testing.T) { + cfg := newConfig(WithSamplingRulesPollingInterval(400*time.Second), WithEndpoint("127.0.0.1:5000"), WithLogger(logr.Logger{})) + + assert.Equal(t, cfg.samplingRulesPollingInterval, 400*time.Second) + assert.Equal(t, cfg.endpoint, "127.0.0.1:5000") + assert.Equal(t, cfg.logger, logr.Logger{}) +} + +// assert that when user did not provide values are then config would be picked up from default values +func TestDefaultConfig(t *testing.T) { + cfg := newConfig() + + assert.Equal(t, cfg.samplingRulesPollingInterval, 300*time.Second) + assert.Equal(t, cfg.endpoint, "127.0.0.1:2000") + assert.Equal(t, cfg.logger, stdr.NewWithOptions(log.New(os.Stderr, "", log.LstdFlags|log.Lshortfile), stdr.Options{LogCaller: stdr.Error})) +} + +// asset when some config is provided by user then other config will be picked up from default config +func TestPartialUserProvidedConfig(t *testing.T) { + cfg := newConfig(WithSamplingRulesPollingInterval(500 * time.Second)) + + assert.Equal(t, cfg.samplingRulesPollingInterval, 500*time.Second) + assert.Equal(t, cfg.endpoint, "127.0.0.1:2000") + assert.Equal(t, cfg.logger, stdr.NewWithOptions(log.New(os.Stderr, "", log.LstdFlags|log.Lshortfile), stdr.Options{LogCaller: stdr.Error})) +} + +func TestValidateConfigIncorrectEndpoint(t *testing.T) { + cfg := newConfig(WithEndpoint("http://127.0.0.1:2000")) + + err := validateConfig(cfg) + assert.Error(t, err) +} + +func TestValidateConfigSpecialCharacterEndpoint(t *testing.T) { + cfg := newConfig(WithEndpoint("@127.0.0.1:2000")) + + err := validateConfig(cfg) + assert.Error(t, err) +} + +func TestValidateConfigLocalHost(t *testing.T) { + cfg := newConfig(WithEndpoint("localhost:2000")) + + err := validateConfig(cfg) + assert.NoError(t, err) +} + +func TestValidateConfigInvalidPort(t *testing.T) { + cfg := newConfig(WithEndpoint("127.0.0.1:abcd")) + + err := validateConfig(cfg) + assert.Error(t, err) +} + +func TestValidateConfigNegativeDuration(t *testing.T) { + cfg := newConfig(WithSamplingRulesPollingInterval(-300 * time.Second)) + + err := validateConfig(cfg) + assert.Error(t, err) +} + +func TestValidateConfigPositiveDuration(t *testing.T) { + cfg := newConfig(WithSamplingRulesPollingInterval(300 * time.Second)) + + err := validateConfig(cfg) + assert.NoError(t, err) +} diff --git a/samplers/aws/xray/remote_sampler_test.go b/samplers/aws/xray/remote_sampler_test.go new file mode 100644 index 00000000000..aba9784703a --- /dev/null +++ b/samplers/aws/xray/remote_sampler_test.go @@ -0,0 +1,821 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package xray + +import ( + "context" + "net/http" + "net/http/httptest" + "net/url" + "reflect" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestRefreshManifest(t *testing.T) { + ctx := context.Background() + + body := []byte(`{ + "NextToken": null, + "SamplingRuleRecords": [ + { + "CreatedAt": 0, + "ModifiedAt": 1639517389, + "SamplingRule": { + "Attributes": {}, + "FixedRate": 0.5, + "HTTPMethod": "*", + "Host": "*", + "Priority": 10000, + "ReservoirSize": 60, + "ResourceARN": "*", + "RuleARN": "arn:aws:xray:us-west-2:xxxxxxx:sampling-rule/r1", + "RuleName": "r1", + "ServiceName": "*", + "ServiceType": "*", + "URLPath": "*", + "Version": 1 + } + }, + { + "CreatedAt": 1637691613, + "ModifiedAt": 1643748669, + "SamplingRule": { + "Attributes": {}, + "FixedRate": 0.09, + "HTTPMethod": "GET", + "Host": "*", + "Priority": 1, + "ReservoirSize": 3, + "ResourceARN": "*", + "RuleARN": "arn:aws:xray:us-west-2:xxxxxxx:sampling-rule/r2", + "RuleName": "r2", + "ServiceName": "test-rule", + "ServiceType": "*", + "URLPath": "/aws-sdk-call", + "Version": 1 + } + }, + { + "CreatedAt": 1639446197, + "ModifiedAt": 1639446197, + "SamplingRule": { + "Attributes": {}, + "FixedRate": 0.09, + "HTTPMethod": "*", + "Host": "*", + "Priority": 100, + "ReservoirSize": 100, + "ResourceARN": "*", + "RuleARN": "arn:aws:xray:us-west-2:xxxxxxx:sampling-rule/r3", + "RuleName": "r3", + "ServiceName": "*", + "ServiceType": "local", + "URLPath": "*", + "Version": 1 + } + } + ] +}`) + + // generate a test server so we can capture and inspect the request + testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { + _, err := res.Write([]byte(body)) + require.NoError(t, err) + })) + defer testServer.Close() + + u, err := url.Parse(testServer.URL) + require.NoError(t, err) + + clock := &defaultClock{} + + m := &manifest{ + rules: []*rule{}, + index: map[string]*rule{}, + clock: clock, + } + + client, err := newClient(u.Host) + require.NoError(t, err) + + rs := &remoteSampler{ + xrayClient: client, + clock: clock, + manifest: m, + } + + err = rs.refreshManifest(ctx) + require.NoError(t, err) + + // Rule 'r1' + r1 := &rule{ + ruleProperties: &ruleProperties{ + RuleName: getStringPointer("r1"), + Priority: getIntPointer(10000), + Host: getStringPointer("*"), + HTTPMethod: getStringPointer("*"), + URLPath: getStringPointer("*"), + ReservoirSize: getIntPointer(60), + Version: getIntPointer(1), + FixedRate: getFloatPointer(0.5), + ServiceName: getStringPointer("*"), + ResourceARN: getStringPointer("*"), + ServiceType: getStringPointer("*"), + }, + } + + // Rule 'r2' + r2 := &rule{ + ruleProperties: &ruleProperties{ + RuleName: getStringPointer("r2"), + Priority: getIntPointer(1), + Host: getStringPointer("*"), + HTTPMethod: getStringPointer("GET"), + URLPath: getStringPointer("/aws-sdk-call"), + ReservoirSize: getIntPointer(3), + FixedRate: getFloatPointer(0.09), + Version: getIntPointer(1), + ServiceName: getStringPointer("test-rule"), + ResourceARN: getStringPointer("*"), + ServiceType: getStringPointer("*"), + }, + } + + // Rule 'r3' + r3 := &rule{ + ruleProperties: &ruleProperties{ + RuleName: getStringPointer("r3"), + Priority: getIntPointer(100), + Host: getStringPointer("*"), + HTTPMethod: getStringPointer("*"), + URLPath: getStringPointer("*"), + ReservoirSize: getIntPointer(100), + FixedRate: getFloatPointer(0.09), + Version: getIntPointer(1), + ServiceName: getStringPointer("*"), + ResourceARN: getStringPointer("*"), + ServiceType: getStringPointer("local"), + }, + } + // Assert on sorting order + assert.Equal(t, r2.ruleProperties.RuleName, rs.manifest.rules[0].ruleProperties.RuleName) + assert.Equal(t, r2.ruleProperties.Priority, rs.manifest.rules[0].ruleProperties.Priority) + assert.Equal(t, r2.ruleProperties.Host, rs.manifest.rules[0].ruleProperties.Host) + assert.Equal(t, r2.ruleProperties.HTTPMethod, rs.manifest.rules[0].ruleProperties.HTTPMethod) + assert.Equal(t, r2.ruleProperties.URLPath, rs.manifest.rules[0].ruleProperties.URLPath) + assert.Equal(t, r2.ruleProperties.ReservoirSize, rs.manifest.rules[0].ruleProperties.ReservoirSize) + assert.Equal(t, r2.ruleProperties.FixedRate, rs.manifest.rules[0].ruleProperties.FixedRate) + assert.Equal(t, r2.ruleProperties.Version, rs.manifest.rules[0].ruleProperties.Version) + assert.Equal(t, r2.ruleProperties.ServiceName, rs.manifest.rules[0].ruleProperties.ServiceName) + assert.Equal(t, r2.ruleProperties.ResourceARN, rs.manifest.rules[0].ruleProperties.ResourceARN) + assert.Equal(t, r2.ruleProperties.ServiceType, rs.manifest.rules[0].ruleProperties.ServiceType) + + assert.Equal(t, r3.ruleProperties.RuleName, rs.manifest.rules[1].ruleProperties.RuleName) + assert.Equal(t, r3.ruleProperties.Priority, rs.manifest.rules[1].ruleProperties.Priority) + assert.Equal(t, r3.ruleProperties.Host, rs.manifest.rules[1].ruleProperties.Host) + assert.Equal(t, r3.ruleProperties.HTTPMethod, rs.manifest.rules[1].ruleProperties.HTTPMethod) + assert.Equal(t, r3.ruleProperties.URLPath, rs.manifest.rules[1].ruleProperties.URLPath) + assert.Equal(t, r3.ruleProperties.ReservoirSize, rs.manifest.rules[1].ruleProperties.ReservoirSize) + assert.Equal(t, r3.ruleProperties.FixedRate, rs.manifest.rules[1].ruleProperties.FixedRate) + assert.Equal(t, r3.ruleProperties.Version, rs.manifest.rules[1].ruleProperties.Version) + assert.Equal(t, r3.ruleProperties.ServiceName, rs.manifest.rules[1].ruleProperties.ServiceName) + assert.Equal(t, r3.ruleProperties.ResourceARN, rs.manifest.rules[1].ruleProperties.ResourceARN) + assert.Equal(t, r3.ruleProperties.ServiceType, rs.manifest.rules[1].ruleProperties.ServiceType) + + assert.Equal(t, r1.ruleProperties.RuleName, rs.manifest.rules[2].ruleProperties.RuleName) + assert.Equal(t, r1.ruleProperties.Priority, rs.manifest.rules[2].ruleProperties.Priority) + assert.Equal(t, r1.ruleProperties.Host, rs.manifest.rules[2].ruleProperties.Host) + assert.Equal(t, r1.ruleProperties.HTTPMethod, rs.manifest.rules[2].ruleProperties.HTTPMethod) + assert.Equal(t, r1.ruleProperties.URLPath, rs.manifest.rules[2].ruleProperties.URLPath) + assert.Equal(t, r1.ruleProperties.ReservoirSize, rs.manifest.rules[2].ruleProperties.ReservoirSize) + assert.Equal(t, r1.ruleProperties.FixedRate, rs.manifest.rules[2].ruleProperties.FixedRate) + assert.Equal(t, r1.ruleProperties.Version, rs.manifest.rules[2].ruleProperties.Version) + assert.Equal(t, r1.ruleProperties.ServiceName, rs.manifest.rules[2].ruleProperties.ServiceName) + assert.Equal(t, r1.ruleProperties.ResourceARN, rs.manifest.rules[2].ruleProperties.ResourceARN) + assert.Equal(t, r1.ruleProperties.ServiceType, rs.manifest.rules[2].ruleProperties.ServiceType) + + // Assert on size of manifest + assert.Equal(t, 3, len(rs.manifest.rules)) + assert.Equal(t, 3, len(rs.manifest.index)) +} + +// assert that rule with nil ServiceName does not update to the manifest +func TestRefreshManifestMissingServiceName(t *testing.T) { + ctx := context.Background() + + // to enable logging + newConfig() + + // invalid rule due to ResourceARN + body := []byte(`{ + "NextToken": null, + "SamplingRuleRecords": [ + { + "CreatedAt": 0, + "ModifiedAt": 1639517389, + "SamplingRule": { + "Attributes": {}, + "FixedRate": 0.5, + "HTTPMethod": "*", + "Host": "*", + "Priority": 10000, + "ReservoirSize": 60, + "ResourceARN": "XYZ", + "RuleARN": "arn:aws:xray:us-west-2:xxxxxxx:sampling-rule/r1", + "RuleName": "r1", + "ServiceType": "*", + "URLPath": "*", + "Version": 1 + } + } + ] +}`) + + // generate a test server so we can capture and inspect the request + testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { + _, err := res.Write([]byte(body)) + require.NoError(t, err) + + })) + defer testServer.Close() + + u, err := url.Parse(testServer.URL) + require.NoError(t, err) + + clock := &defaultClock{} + + m := &manifest{ + rules: []*rule{}, + index: map[string]*rule{}, + clock: clock, + } + + client, err := newClient(u.Host) + require.NoError(t, err) + + rs := &remoteSampler{ + xrayClient: client, + clock: clock, + manifest: m, + } + + err = rs.refreshManifest(ctx) + require.NoError(t, err) + + // Refresh manifest with updates from mock proxy + assert.Equal(t, 0, len(rs.manifest.rules)) // Rule not added +} + +// assert that rule with nil ServiceType does not update to the manifest +func TestRefreshManifestMissingServiceType(t *testing.T) { + ctx := context.Background() + + // to enable logging + newConfig() + + // invalid rule due to ResourceARN + body := []byte(`{ + "NextToken": null, + "SamplingRuleRecords": [ + { + "CreatedAt": 0, + "ModifiedAt": 1639517389, + "SamplingRule": { + "Attributes": {}, + "FixedRate": 0.5, + "HTTPMethod": "*", + "Host": "*", + "Priority": 10000, + "ReservoirSize": 60, + "ResourceARN": "XYZ", + "RuleARN": "arn:aws:xray:us-west-2:xxxxxxx:sampling-rule/r1", + "RuleName": "r1", + "ServiceName": "test", + "URLPath": "*", + "Version": 1 + } + } + ] +}`) + + // generate a test server so we can capture and inspect the request + testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { + _, err := res.Write([]byte(body)) + require.NoError(t, err) + + })) + defer testServer.Close() + + u, err := url.Parse(testServer.URL) + require.NoError(t, err) + + clock := &defaultClock{} + + m := &manifest{ + rules: []*rule{}, + index: map[string]*rule{}, + clock: clock, + } + + client, err := newClient(u.Host) + require.NoError(t, err) + + rs := &remoteSampler{ + xrayClient: client, + clock: clock, + manifest: m, + } + + err = rs.refreshManifest(ctx) + require.NoError(t, err) + + // Refresh manifest with updates from mock proxy + assert.Equal(t, 0, len(rs.manifest.rules)) // Rule not added +} + +// assert that rule with nil ReservoirSize does not update to the manifest +func TestRefreshManifestMissingReservoirSize(t *testing.T) { + ctx := context.Background() + + // to enable logging + newConfig() + + // invalid rule due to ResourceARN + body := []byte(`{ + "NextToken": null, + "SamplingRuleRecords": [ + { + "CreatedAt": 0, + "ModifiedAt": 1639517389, + "SamplingRule": { + "Attributes": {}, + "FixedRate": 0.5, + "HTTPMethod": "*", + "Host": "*", + "Priority": 10000, + "ResourceARN": "XYZ", + "RuleARN": "arn:aws:xray:us-west-2:xxxxxxx:sampling-rule/r1", + "RuleName": "r1", + "ServiceName": "test", + "ServiceType": "*", + "URLPath": "*", + "Version": 1 + } + } + ] +}`) + + // generate a test server so we can capture and inspect the request + testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { + _, err := res.Write([]byte(body)) + require.NoError(t, err) + + })) + defer testServer.Close() + + u, err := url.Parse(testServer.URL) + require.NoError(t, err) + + clock := &defaultClock{} + + m := &manifest{ + rules: []*rule{}, + index: map[string]*rule{}, + clock: clock, + } + + client, err := newClient(u.Host) + require.NoError(t, err) + + rs := &remoteSampler{ + xrayClient: client, + clock: clock, + manifest: m, + } + + err = rs.refreshManifest(ctx) + require.NoError(t, err) + + // Refresh manifest with updates from mock proxy + assert.Equal(t, 0, len(rs.manifest.rules)) // Rule not added +} + +// assert that rule with version greater than one does not update to the manifest +func TestRefreshManifestIncorrectPriority(t *testing.T) { + ctx := context.Background() + + // to enable logging + newConfig() + + // invalid rule due to ResourceARN + body := []byte(`{ + "NextToken": null, + "SamplingRuleRecords": [ + { + "CreatedAt": 0, + "ModifiedAt": 1639517389, + "SamplingRule": { + "Attributes": {}, + "FixedRate": 0.5, + "HTTPMethod": "*", + "Host": "*", + "Priority": 10000, + "ReservoirSize": 60, + "ResourceARN": "XYZ", + "RuleARN": "arn:aws:xray:us-west-2:xxxxxxx:sampling-rule/r1", + "RuleName": "r1", + "ServiceName": "test", + "ServiceType": "*", + "URLPath": "*", + "Version": 5 + } + } + ] +}`) + + // generate a test server so we can capture and inspect the request + testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { + _, err := res.Write([]byte(body)) + require.NoError(t, err) + + })) + defer testServer.Close() + + u, err := url.Parse(testServer.URL) + require.NoError(t, err) + + clock := &defaultClock{} + + m := &manifest{ + rules: []*rule{}, + index: map[string]*rule{}, + clock: clock, + } + + client, err := newClient(u.Host) + require.NoError(t, err) + + rs := &remoteSampler{ + xrayClient: client, + clock: clock, + manifest: m, + } + + err = rs.refreshManifest(ctx) + require.NoError(t, err) + + // Refresh manifest with updates from mock proxy + assert.Equal(t, 0, len(rs.manifest.rules)) // Rule not added +} + +// assert that rule nil attributes does update the manifest +func TestRefreshManifestNilAttributes(t *testing.T) { + ctx := context.Background() + + // to enable logging + newConfig() + + // invalid rule due to ResourceARN + body := []byte(`{ + "NextToken": null, + "SamplingRuleRecords": [ + { + "CreatedAt": 0, + "ModifiedAt": 1639517389, + "SamplingRule": { + "Attributes": {}, + "FixedRate": 0.5, + "HTTPMethod": "*", + "Host": "*", + "Priority": 10000, + "ResourceARN": "XYZ", + "RuleARN": "arn:aws:xray:us-west-2:xxxxxxx:sampling-rule/r1", + "RuleName": "r1", + "ReservoirSize": 60, + "ServiceName": "test", + "ServiceType": "*", + "URLPath": "*", + "Version": 1 + } + } + ] +}`) + + // generate a test server so we can capture and inspect the request + testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { + _, err := res.Write([]byte(body)) + require.NoError(t, err) + + })) + defer testServer.Close() + + u, err := url.Parse(testServer.URL) + require.NoError(t, err) + + clock := &defaultClock{} + + m := &manifest{ + rules: []*rule{}, + index: map[string]*rule{}, + clock: clock, + } + + client, err := newClient(u.Host) + require.NoError(t, err) + + rs := &remoteSampler{ + xrayClient: client, + clock: clock, + manifest: m, + } + + err = rs.refreshManifest(ctx) + require.NoError(t, err) + + // Refresh manifest with updates from mock proxy + assert.Equal(t, 1, len(rs.manifest.rules)) // Rule added +} + +// assert that 1 valid and 1 invalid rule update only valid rule gets stored to the manifest +func TestRefreshManifestAddOneInvalidRule(t *testing.T) { + ctx := context.Background() + + // to enable logging + newConfig() + + // host is missing from r2 + body := []byte(`{ + "NextToken": null, + "SamplingRuleRecords": [ + { + "CreatedAt": 0, + "ModifiedAt": 1639517389, + "SamplingRule": { + "Attributes": {}, + "FixedRate": 0.5, + "HTTPMethod": "*", + "Host": "*", + "Priority": 10000, + "ReservoirSize": 60, + "ResourceARN": "*", + "RuleARN": "arn:aws:xray:us-west-2:xxxxxxx:sampling-rule/r1", + "RuleName": "r1", + "ServiceName": "*", + "ServiceType": "*", + "URLPath": "*", + "Version": 1 + } + }, + { + "CreatedAt": 0, + "ModifiedAt": 1639517389, + "SamplingRule": { + "Attributes": {"a":"b"}, + "FixedRate": 0.5, + "HTTPMethod": "*", + "Priority": 10000, + "ReservoirSize": 60, + "ResourceARN": "*", + "RuleARN": "arn:aws:xray:us-west-2:xxxxxxx:sampling-rule/r2", + "RuleName": "r2", + "ServiceName": "*", + "ServiceType": "*", + "URLPath": "*", + "Version": 1 + } + } + ] +}`) + // Rule 'r1' + r1 := &rule{ + ruleProperties: &ruleProperties{ + RuleName: getStringPointer("r1"), + Priority: getIntPointer(10000), + Host: getStringPointer("*"), + HTTPMethod: getStringPointer("*"), + URLPath: getStringPointer("*"), + ReservoirSize: getIntPointer(60), + FixedRate: getFloatPointer(0.5), + Version: getIntPointer(1), + ServiceName: getStringPointer("*"), + ResourceARN: getStringPointer("*"), + ServiceType: getStringPointer("*"), + }, + } + + // generate a test server so we can capture and inspect the request + testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { + _, err := res.Write([]byte(body)) + require.NoError(t, err) + })) + defer testServer.Close() + + u, err := url.Parse(testServer.URL) + require.NoError(t, err) + + clock := &defaultClock{} + + m := &manifest{ + rules: []*rule{}, + index: map[string]*rule{}, + clock: clock, + } + + client, err := newClient(u.Host) + require.NoError(t, err) + + rs := &remoteSampler{ + xrayClient: client, + clock: clock, + manifest: m, + } + + err = rs.refreshManifest(ctx) + require.NoError(t, err) + + assert.Equal(t, 1, len(rs.manifest.rules)) + + assert.Equal(t, r1.ruleProperties.RuleName, rs.manifest.rules[0].ruleProperties.RuleName) + assert.Equal(t, r1.ruleProperties.Priority, rs.manifest.rules[0].ruleProperties.Priority) + assert.Equal(t, r1.ruleProperties.Host, rs.manifest.rules[0].ruleProperties.Host) + assert.Equal(t, r1.ruleProperties.HTTPMethod, rs.manifest.rules[0].ruleProperties.HTTPMethod) + assert.Equal(t, r1.ruleProperties.URLPath, rs.manifest.rules[0].ruleProperties.URLPath) + assert.Equal(t, r1.ruleProperties.ReservoirSize, rs.manifest.rules[0].ruleProperties.ReservoirSize) + assert.Equal(t, r1.ruleProperties.FixedRate, rs.manifest.rules[0].ruleProperties.FixedRate) + assert.Equal(t, r1.ruleProperties.Version, rs.manifest.rules[0].ruleProperties.Version) + assert.Equal(t, r1.ruleProperties.ServiceName, rs.manifest.rules[0].ruleProperties.ServiceName) + assert.Equal(t, r1.ruleProperties.ResourceARN, rs.manifest.rules[0].ruleProperties.ResourceARN) + assert.Equal(t, r1.ruleProperties.ServiceType, rs.manifest.rules[0].ruleProperties.ServiceType) +} + +// assert that manifest rules and index correctly updates from temporary manifest with each update +func TestManifestRulesAndIndexUpdate(t *testing.T) { + ctx := context.Background() + count := 0 + + // to enable logging + newConfig() + + // first update + body1 := []byte(`{ + "NextToken": null, + "SamplingRuleRecords": [ + { + "CreatedAt": 0, + "ModifiedAt": 1639517389, + "SamplingRule": { + "Attributes": {}, + "FixedRate": 0.5, + "HTTPMethod": "*", + "Host": "*", + "Priority": 100000, + "ReservoirSize": 60, + "ResourceARN": "*", + "RuleARN": "arn:aws:xray:us-west-2:xxxxxxx:sampling-rule/r1", + "RuleName": "r1", + "ServiceName": "*", + "ServiceType": "*", + "URLPath": "*", + "Version": 1 + } + }, + { + "CreatedAt": 0, + "ModifiedAt": 1639517389, + "SamplingRule": { + "Attributes": {}, + "FixedRate": 0.5, + "HTTPMethod": "*", + "Host": "*", + "Priority": 10000, + "ReservoirSize": 60, + "ResourceARN": "*", + "RuleARN": "arn:aws:xray:us-west-2:xxxxxxx:sampling-rule/r2", + "RuleName": "r2", + "ServiceName": "*", + "ServiceType": "*", + "URLPath": "*", + "Version": 1 + } + } + ] +}`) + // second update + body2 := []byte(`{ + "NextToken": null, + "SamplingRuleRecords": [ + { + "CreatedAt": 0, + "ModifiedAt": 1639517389, + "SamplingRule": { + "Attributes": {}, + "FixedRate": 0.5, + "HTTPMethod": "*", + "Host": "*", + "Priority": 100000, + "ReservoirSize": 60, + "ResourceARN": "*", + "RuleARN": "arn:aws:xray:us-west-2:xxxxxxx:sampling-rule/r1", + "RuleName": "r1", + "ServiceName": "*", + "ServiceType": "*", + "URLPath": "*", + "Version": 1 + } + } + ] +}`) + + // generate a test server so we can capture and inspect the request + testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { + if count == 0 { + // first update + _, err := res.Write([]byte(body1)) + require.NoError(t, err) + } else { + // second update + _, err := res.Write([]byte(body2)) + require.NoError(t, err) + } + })) + defer testServer.Close() + + u, err := url.Parse(testServer.URL) + require.NoError(t, err) + + clock := &defaultClock{} + + m := &manifest{ + rules: []*rule{}, + index: map[string]*rule{}, + clock: clock, + } + + client, err := newClient(u.Host) + require.NoError(t, err) + + rs := &remoteSampler{ + xrayClient: client, + clock: clock, + manifest: m, + } + + err = rs.refreshManifest(ctx) + require.NoError(t, err) + + // assert that manifest has 2 rules and indexes currently + assert.Equal(t, 2, len(rs.manifest.rules)) + assert.Equal(t, 2, len(rs.manifest.index)) + + assert.Equal(t, rs.manifest.rules[0].ruleProperties.RuleName, getStringPointer("r2")) + assert.Equal(t, rs.manifest.rules[1].ruleProperties.RuleName, getStringPointer("r1")) + + // assert that both the rules are available in manifest index + _, okRule1 := rs.manifest.index[*rs.manifest.rules[0].ruleProperties.RuleName] + _, okRule2 := rs.manifest.index[*rs.manifest.rules[1].ruleProperties.RuleName] + + assert.True(t, okRule1) + assert.True(t, okRule2) + + // second update + count++ + err = rs.refreshManifest(ctx) + require.NoError(t, err) + + // assert that manifest has 1 "r1" rule and index currently + assert.Equal(t, 1, len(rs.manifest.rules)) + assert.Equal(t, 1, len(rs.manifest.index)) + + assert.Equal(t, rs.manifest.rules[0].ruleProperties.RuleName, getStringPointer("r1")) + + // assert that "r1" rule available in index + _, okRule := rs.manifest.index[*rs.manifest.rules[0].ruleProperties.RuleName] + assert.True(t, okRule) +} + +// assert that NewRemoteSampler returns a sampler with *xray.remoteSampler type +func TestNewRemoteSampler(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + rs, err := NewRemoteSampler(ctx) + require.NoError(t, err) + + s := &remoteSampler{} + assert.Equal(t, reflect.TypeOf(rs), reflect.TypeOf(s)) +} diff --git a/samplers/aws/xray/reservoir.go b/samplers/aws/xray/reservoir.go new file mode 100644 index 00000000000..c9848bc3b10 --- /dev/null +++ b/samplers/aws/xray/reservoir.go @@ -0,0 +1,43 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package xray + +// ToDo: other fields will be used in business logic for remote sampling +// reservoir is a reservoir distributed among all running instances of the SDK +type reservoir struct { + // Quota assigned to client + //quota int64 + // + // Quota refresh timestamp + //refreshedAt int64 + // + // Quota expiration timestamp + //expiresAt int64 + // + // Polling interval for quota + interval int64 + // + // True if reservoir has been borrowed from this epoch + //borrowed bool + + // Total size of reservoir + capacity int64 + // + // Reservoir consumption for current epoch + //used int64 + // + // Unix epoch. Reservoir usage is reset every second. + //currentEpoch int64 +} diff --git a/samplers/aws/xray/rule_manifest.go b/samplers/aws/xray/rule_manifest.go new file mode 100644 index 00000000000..92cfc44c8b5 --- /dev/null +++ b/samplers/aws/xray/rule_manifest.go @@ -0,0 +1,73 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package xray + +import ( + "sort" + "strings" + "time" +) + +const defaultInterval = int64(10) + +// manifest represents a full sampling ruleset, with a list of +// custom rules and default values for incoming requests that do +// not match any of the provided rules. +type manifest struct { + rules []*rule + index map[string]*rule + refreshedAt time.Time + clock clock +} + +// createRule creates a user-defined rule, appends it to the sorted array, +// adds it to the index, and returns the newly created rule. +func (m *manifest) createRule(ruleProp *ruleProperties) (err error) { + clock := &defaultClock{} + rand := &defaultRand{} + + cr := &reservoir{ + capacity: *ruleProp.ReservoirSize, + interval: defaultInterval, + } + + csr := &rule{ + reservoir: cr, + ruleProperties: ruleProp, + clock: clock, + rand: rand, + } + + // Update sorted array + m.rules = append(m.rules, csr) + + // Update index + m.index[*ruleProp.RuleName] = csr + + return +} + +// sort sorts the rule array first by priority and then by rule name. +func (m *manifest) sort() { + // Comparison function + less := func(i, j int) bool { + if *m.rules[i].ruleProperties.Priority == *m.rules[j].ruleProperties.Priority { + return strings.Compare(*m.rules[i].ruleProperties.RuleName, *m.rules[j].ruleProperties.RuleName) < 0 + } + return *m.rules[i].ruleProperties.Priority < *m.rules[j].ruleProperties.Priority + } + + sort.Slice(m.rules, less) +} diff --git a/samplers/aws/xray/rule_manifest_test.go b/samplers/aws/xray/rule_manifest_test.go new file mode 100644 index 00000000000..c10c7fce1b9 --- /dev/null +++ b/samplers/aws/xray/rule_manifest_test.go @@ -0,0 +1,195 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package xray + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +// utility functions to get pointers value +func getIntPointer(val int64) *int64 { + return &val +} + +func getStringPointer(val string) *string { + return &val +} + +func getFloatPointer(val float64) *float64 { + return &val +} + +// Assert that createRule() creates a new rule and adds to manifest +func TestCreateRule(t *testing.T) { + resARN := "*" + r1 := &rule{ + ruleProperties: &ruleProperties{ + RuleName: getStringPointer("r1"), + Priority: getIntPointer(5), + }, + } + + r3 := &rule{ + ruleProperties: &ruleProperties{ + RuleName: getStringPointer("r3"), + Priority: getIntPointer(7), + }, + } + + rules := []*rule{r1, r3} + + index := map[string]*rule{ + "r1": r1, + "r3": r3, + } + + m := &manifest{ + rules: rules, + index: index, + } + + // Output of GetSamplingRules API and Input to putRule(). + serviceName := "www.foo.com" + httpMethod := "POST" + urlPath := "/bar/*" + reservoirSize := int64(10) + fixedRate := 0.05 + ruleName := "r2" + host := "local" + priority := int64(6) + serviceTye := "*" + + ruleProperties := &ruleProperties{ + ServiceName: &serviceName, + HTTPMethod: &httpMethod, + URLPath: &urlPath, + ReservoirSize: &reservoirSize, + FixedRate: &fixedRate, + RuleName: &ruleName, + Priority: &priority, + Host: &host, + ServiceType: &serviceTye, + ResourceARN: &resARN, + } + + // Expected centralized sampling rule + clock := &defaultClock{} + rand := &defaultRand{} + + cr := &reservoir{ + capacity: 10, + interval: 10, + } + + exp := &rule{ + reservoir: cr, + ruleProperties: ruleProperties, + clock: clock, + rand: rand, + } + + // Add to manifest, index + err := m.createRule(ruleProperties) + assert.Nil(t, err) + + // Assert new rule is present in index + r2, ok := m.index["r2"] + assert.True(t, ok) + assert.Equal(t, exp, r2) + + // Assert new rule present at end of array. putRule() does not preserve order. + r2 = m.rules[2] + assert.Equal(t, exp, r2) +} + +// Assert that sorting an unsorted array results in a sorted array - check priority +func TestSort1(t *testing.T) { + r1 := &rule{ + ruleProperties: &ruleProperties{ + RuleName: getStringPointer("r1"), + Priority: getIntPointer(5), + }, + } + + r2 := &rule{ + ruleProperties: &ruleProperties{ + RuleName: getStringPointer("r2"), + Priority: getIntPointer(6), + }, + } + + r3 := &rule{ + ruleProperties: &ruleProperties{ + RuleName: getStringPointer("r3"), + Priority: getIntPointer(7), + }, + } + + // Unsorted rules array + rules := []*rule{r2, r1, r3} + + m := &manifest{ + rules: rules, + } + + // Sort array + m.sort() + + // Assert on order + assert.Equal(t, r1, m.rules[0]) + assert.Equal(t, r2, m.rules[1]) + assert.Equal(t, r3, m.rules[2]) +} + +// Assert that sorting an unsorted array results in a sorted array - check priority and rule name +func TestSort2(t *testing.T) { + r1 := &rule{ + ruleProperties: &ruleProperties{ + RuleName: getStringPointer("r1"), + Priority: getIntPointer(5), + }, + } + + r2 := &rule{ + ruleProperties: &ruleProperties{ + RuleName: getStringPointer("r2"), + Priority: getIntPointer(6), + }, + } + + r3 := &rule{ + ruleProperties: &ruleProperties{ + RuleName: getStringPointer("r3"), + Priority: getIntPointer(7), + }, + } + + // Unsorted rules array + rules := []*rule{r2, r1, r3} + + m := &manifest{ + rules: rules, + } + + // Sort array + m.sort() // r1 should precede r2 + + // Assert on order + assert.Equal(t, r1, m.rules[0]) + assert.Equal(t, r2, m.rules[1]) + assert.Equal(t, r3, m.rules[2]) +} diff --git a/samplers/aws/xray/sampling_rule.go b/samplers/aws/xray/sampling_rule.go new file mode 100644 index 00000000000..400109c1519 --- /dev/null +++ b/samplers/aws/xray/sampling_rule.go @@ -0,0 +1,72 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package xray + +// ToDo: other fields will be used in business logic for remote sampling +// rule represents a centralized sampling rule +type rule struct { + // Centralized reservoir for keeping track of reservoir usage + reservoir *reservoir + + // sampling rule properties + ruleProperties *ruleProperties + + // Number of requests matched against this rule + //matchedRequests int64 + // + // Number of requests sampled using this rule + //sampledRequests int64 + // + // Number of requests burrowed + //borrowedRequests int64 + + // Provides system time + clock clock + + // Provides random numbers + rand Rand + + //mu sync.RWMutex +} + +// properties is the base set of properties that define a sampling rule. +type ruleProperties struct { + RuleName *string `json:"RuleName"` + ServiceType *string `json:"ServiceType"` + ResourceARN *string `json:"ResourceARN"` + Attributes map[string]*string `json:"Attributes"` + ServiceName *string `json:"ServiceName"` + Host *string `json:"Host"` + HTTPMethod *string `json:"HTTPMethod"` + URLPath *string `json:"URLPath"` + ReservoirSize *int64 `json:"ReservoirSize"` + FixedRate *float64 `json:"FixedRate"` + Priority *int64 `json:"Priority"` + Version *int64 `json:"Version"` +} + +// getSamplingRulesInput is used to store +type getSamplingRulesInput struct { + NextToken *string `json:"NextToken"` +} + +type samplingRuleRecords struct { + SamplingRule *ruleProperties `json:"SamplingRule"` +} + +// getSamplingRulesOutput is used to store parsed json sampling rules +type getSamplingRulesOutput struct { + SamplingRuleRecords []*samplingRuleRecords `json:"SamplingRuleRecords"` +} diff --git a/samplers/aws/xray/timer.go b/samplers/aws/xray/timer.go new file mode 100644 index 00000000000..da735a72aed --- /dev/null +++ b/samplers/aws/xray/timer.go @@ -0,0 +1,50 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package xray + +import ( + "time" +) + +// Ticker is the same as time.Ticker except that it has jitters. +// A Ticker must be created with NewTicker. +type ticker struct { + t *time.Ticker + d time.Duration + jitter time.Duration +} + +// NewTicker creates a new Ticker that will send the current time on its channel. +func newTicker(d, jitter time.Duration) *ticker { + t := time.NewTicker(d - time.Duration(globalRand.Int63n(int64(jitter)))) + + jitteredTicker := ticker{ + t: t, + d: d, + jitter: jitter, + } + + return &jitteredTicker +} + +// C is channel. +func (j *ticker) C() <-chan time.Time { + return j.t.C +} + +// Reset resets the timer. +func (j *ticker) Reset() { + j.t.Reset(j.d - time.Duration(globalRand.Int63n(int64(j.jitter)))) +} diff --git a/samplers/aws/xray/utils_test.go b/samplers/aws/xray/utils_test.go new file mode 100644 index 00000000000..64fa5e16cea --- /dev/null +++ b/samplers/aws/xray/utils_test.go @@ -0,0 +1,50 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package xray + +import ( + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +// MockClock is a struct to record current time. +type MockClock struct { + NowTime int64 + NowNanos int64 +} + +// Now function returns NowTime value. +func (c *MockClock) Now() time.Time { + return time.Unix(c.NowTime, c.NowNanos) +} + +// Increment is a method to increase current time. +func (c *MockClock) Increment(s int64, ns int64) time.Time { + sec := atomic.AddInt64(&c.NowTime, s) + nSec := atomic.AddInt64(&c.NowNanos, ns) + + return time.Unix(sec, nSec) +} + +func TestNewTicker(t *testing.T) { + ticker := newTicker(300*time.Second, 5*time.Second) + + assert.Equal(t, ticker.d, 5*time.Minute) + assert.NotEmpty(t, ticker.t) + assert.NotEmpty(t, ticker.jitter) +}