Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remote Sampling - XRay (Part 1) (Fetching Sampling Rules) #1536

Closed
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
dcc01de
added failure scenario when getting container fails
bhautikpip Feb 5, 2021
1dd54d9
fix test case failure
bhautikpip Feb 5, 2021
03f6fb4
add changelog
bhautikpip Feb 5, 2021
5919be5
Merge branch 'main' into main
bhautikpip Feb 5, 2021
ebd45b4
Merge branch 'main' into main
Aneurysm9 Feb 5, 2021
e1ff7d0
fix ecs resource detector bug
bhautikpip Feb 7, 2021
041d9b8
Merge branch 'main' of https://github.com/bhautikpip/opentelemetry-go…
bhautikpip Feb 7, 2021
e12a4b1
fix struct name as per golint suggestion
bhautikpip Feb 7, 2021
e906d2a
fix merge conflict
bhautikpip Feb 7, 2021
86c04d1
minor changes
bhautikpip Feb 7, 2021
047f5d0
added NewResourceDetector func and interface assertions
bhautikpip Feb 9, 2021
21db8fa
fix golint failure
bhautikpip Feb 9, 2021
5204d27
minor changes to address review comments
bhautikpip Feb 10, 2021
c9c1bca
Merge branch 'main' into main
MrAlias Feb 10, 2021
c956d4b
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
bhautikpip Feb 10, 2021
12c6c74
Merge branch 'main' of https://github.com/bhautikpip/opentelemetry-go…
bhautikpip Feb 10, 2021
e042a6f
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
bhautikpip Jun 3, 2021
1f6b745
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
bhautikpip Jun 15, 2021
ada0137
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
bhautikpip Aug 12, 2021
9c6a430
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
bhautikpip Aug 20, 2021
79d6d15
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
bhautikpip Aug 24, 2021
e21f9a6
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
bhautikpip Nov 3, 2021
2e0f29a
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
bhautikpip Nov 5, 2021
7aedf46
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
bhautikpip Nov 22, 2021
741a285
fetching sampling rules from X-Ray service
bhautikpip Nov 30, 2021
839cff9
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
bhautikpip Nov 30, 2021
ab3d7b0
Merge branch 'main' of https://github.com/bhautikpip/opentelemetry-go…
bhautikpip Nov 30, 2021
17d24f3
updated the code to do some best practices based on review
bhautikpip Dec 6, 2021
cdc58fa
added time.ticker
bhautikpip Dec 7, 2021
d5902d9
added unmarshaling mthod to stor json from service API and minor changes
bhautikpip Dec 10, 2021
36fa11f
addressed review comments
bhautikpip Dec 14, 2021
c4ed209
added no-op logging, sampler config options and minor changes
bhautikpip Dec 18, 2021
f423e01
modified config options for sample implementation
bhautikpip Dec 21, 2021
5224286
Merge pull request #987 from bhautikpip/centralized-sampling-part-1
bhautikpip Dec 21, 2021
9fe987d
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
bhautikpip Dec 21, 2021
70eb25f
Merge branch 'main' of https://github.com/bhautikpip/opentelemetry-go…
bhautikpip Dec 21, 2021
8e1f48f
added unit tests
bhautikpip Feb 3, 2022
c20df37
fix race in tests
bhautikpip Feb 3, 2022
fdbebb0
refactor PR based on review
bhautikpip Feb 3, 2022
09475dc
improved tests
bhautikpip Feb 3, 2022
eec96d0
ran precommit
bhautikpip Feb 3, 2022
43a2ae7
remove noop log and added logr
bhautikpip Feb 3, 2022
bd59636
fix test failures
bhautikpip Feb 4, 2022
d094d58
refactor manifest logic and added tests
bhautikpip Feb 5, 2022
01a8995
fix linter error
bhautikpip Feb 5, 2022
7b1987e
fix golangci-lint
bhautikpip Feb 7, 2022
bb2fb94
added lock in rs struct
bhautikpip Feb 7, 2022
92740aa
Merge branch 'main' into resolve-conflict-xray-sampler
bhautikpip Feb 7, 2022
86c5aec
minor changes
bhautikpip Feb 8, 2022
3dea789
minor changes
bhautikpip Feb 8, 2022
30e6c79
refactor PR to address comments
bhautikpip Feb 8, 2022
ee415a7
fix test failures
bhautikpip Feb 8, 2022
c923616
fix test failures
bhautikpip Feb 8, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions samplers/aws/go.mod
Expand Up @@ -3,8 +3,8 @@ module go.opentelemetry.io/contrib/samplers/aws
go 1.16

require (
github.com/go-logr/logr v1.2.1 // indirect
github.com/go-logr/stdr v1.2.0 // indirect
github.com/go-logr/logr v1.2.1
github.com/go-logr/stdr v1.2.0
github.com/stretchr/testify v1.7.0
go.opentelemetry.io/otel/sdk v1.3.0
)
41 changes: 14 additions & 27 deletions samplers/aws/xray/client.go
Expand Up @@ -15,65 +15,52 @@
package xray

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
)

type xrayClient struct {
// http client for sending unsigned proxied requests to the collector
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// http client for sending unsigned proxied requests to the collector
// http client for sending sampling requests to the collector

httpClient *http.Client

endpoint string
endpoint *url.URL
}

// newClient returns a http client with proxy endpoint
// newClient returns an HTTP client with proxy endpoint
func newClient(d string) *xrayClient {
bhautikpip marked this conversation as resolved.
Show resolved Hide resolved
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func newClient(d string) *xrayClient {
func newClient(addr string) *xrayClient {

endpoint := "http://" + d
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably check if the address already has a scheme. If that should be strictly forbidden, then there should be some validation closer to the user I guess.

Note that while not the first version, eventually this sampler needs to be able to support more configuration, for example TLS or auth headers, as users don't necessarily run collectors as plaintext sidecars.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I think we strictly expect string like "127.0.0.1:8080" but yeah you're right I will add some validation around this to double sure.


endpointURL, err := url.Parse(endpoint)
if err != nil {
globalLogger.Error(err, "unable to parse endpoint from string")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

newClient needs to return error this is a fatal issue

}

return &xrayClient{
httpClient: &http.Client{},
endpoint: endpoint,
endpoint: endpointURL,
}
}

// getSamplingRules calls the collector(aws proxy enabled) for sampling rules
func (p *xrayClient) getSamplingRules(ctx context.Context) (*getSamplingRulesOutput, error) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit c or cl or client seems more appropriate than p

rulesInput := getSamplingRulesInput{}

statisticsByte, _ := json.Marshal(rulesInput)
body := bytes.NewReader(statisticsByte)

req, err := http.NewRequestWithContext(ctx, http.MethodPost, p.endpoint+"/GetSamplingRules", body)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, p.endpoint.String()+"/GetSamplingRules", nil)
if err != nil {
globalLogger.Error(err, "failed to create http request")
return nil, fmt.Errorf("xray client: failed to create http request: %w", err)
}

output, err := p.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("xray client: unable to retrieve sampling settings: %w", err)
}
defer output.Body.Close()

buf := new(bytes.Buffer)
_, err = buf.ReadFrom(output.Body)
if err != nil {
return nil, fmt.Errorf("xray client: unable to read response body: %w", err)
}

// Unmarshalling json data to populate getSamplingTargetsOutput struct
var samplingRulesOutput getSamplingRulesOutput
err = json.Unmarshal(buf.Bytes(), &samplingRulesOutput)
if err != nil {
var samplingRulesOutput *getSamplingRulesOutput
if err := json.NewDecoder(output.Body).Decode(&samplingRulesOutput); err != nil {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be passing a pointer to a pointer - I'm surprised this seems to actually work , but think you don't need &, alternatively you might not need to return a pointer here since it should get moved to the caller

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried removing & and it throwed the unmarshal error.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe json.Unmarshal() handles this correctly, but it would be nicer to see

var samplingRulesOutput getSamplingRulesOutput

i.e., drop the *, not the &

return nil, fmt.Errorf("xray client: unable to unmarshal the response body: %w", err)
}

err = output.Body.Close()
if err != nil {
globalLogger.Error(err, "failed to close http response body")
}

return &samplingRulesOutput, nil
return samplingRulesOutput, nil
}
128 changes: 124 additions & 4 deletions samplers/aws/xray/client_test.go
Expand Up @@ -17,25 +17,92 @@ package xray
import (
"context"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"net/http"
"net/http/httptest"
"net/url"
"testing"
)

func TestGetSamplingRules(t *testing.T) {
body := "{\"NextToken\":null,\"SamplingRuleRecords\":[{\"CreatedAt\":0.0,\"ModifiedAt\":1.639517389E9,\"SamplingRule\":{\"Attributes\":{},\"FixedRate\":0.5,\"HTTPMethod\":\"*\",\"Host\":\"*\",\"Priority\":10000,\"ReservoirSize\":60,\"ResourceARN\":\"*\",\"RuleARN\":\"arn:aws:xray:us-west-2:836082170990:sampling-rule/Default\",\"RuleName\":\"Default\",\"ServiceName\":\"*\",\"ServiceType\":\"*\",\"URLPath\":\"*\",\"Version\":1}},{\"CreatedAt\":1.637691613E9,\"ModifiedAt\":1.643748669E9,\"SamplingRule\":{\"Attributes\":{},\"FixedRate\":0.09,\"HTTPMethod\":\"GET\",\"Host\":\"*\",\"Priority\":1,\"ReservoirSize\":3,\"ResourceARN\":\"*\",\"RuleARN\":\"arn:aws:xray:us-west-2:836082170990:sampling-rule/test-rule\",\"RuleName\":\"test-rule\",\"ServiceName\":\"test-rule\",\"ServiceType\":\"local\",\"URLPath\":\"/aws-sdk-call\",\"Version\":1}},{\"CreatedAt\":1.639446197E9,\"ModifiedAt\":1.639446197E9,\"SamplingRule\":{\"Attributes\":{},\"FixedRate\":0.09,\"HTTPMethod\":\"*\",\"Host\":\"*\",\"Priority\":100,\"ReservoirSize\":100,\"ResourceARN\":\"*\",\"RuleARN\":\"arn:aws:xray:us-west-2:836082170990:sampling-rule/test-rule-1\",\"RuleName\":\"test-rule-1\",\"ServiceName\":\"*\",\"ServiceType\":\"*\",\"URLPath\":\"*\",\"Version\":1}}]}"
body := []byte(`{
"NextToken": null,
"SamplingRuleRecords": [
{
"CreatedAt": 0,
"ModifiedAt": 1639517389,
"SamplingRule": {
"Attributes": {},
"FixedRate": 0.5,
"HTTPMethod": "*",
"Host": "*",
"Priority": 10000,
"ReservoirSize": 60,
"ResourceARN": "*",
"RuleARN": "arn:aws:xray:us-west-2:xxxxxxx:sampling-rule/Default",
"RuleName": "Default",
"ServiceName": "*",
"ServiceType": "*",
"URLPath": "*",
"Version": 1
}
},
{
"CreatedAt": 1637691613,
"ModifiedAt": 1643748669,
"SamplingRule": {
"Attributes": {},
"FixedRate": 0.09,
"HTTPMethod": "GET",
"Host": "*",
"Priority": 1,
"ReservoirSize": 3,
"ResourceARN": "*",
"RuleARN": "arn:aws:xray:us-west-2:xxxxxxx:sampling-rule/test-rule",
"RuleName": "test-rule",
"ServiceName": "test-rule",
"ServiceType": "local",
"URLPath": "/aws-sdk-call",
"Version": 1
}
},
{
"CreatedAt": 1639446197,
"ModifiedAt": 1639446197,
"SamplingRule": {
"Attributes": {},
"FixedRate": 0.09,
"HTTPMethod": "*",
"Host": "*",
"Priority": 100,
"ReservoirSize": 100,
"ResourceARN": "*",
"RuleARN": "arn:aws:xray:us-west-2:xxxxxxx:sampling-rule/test-rule-1",
"RuleName": "test-rule-1",
"ServiceName": "*",
"ServiceType": "*",
"URLPath": "*",
"Version": 1
}
}
]
}`)
ctx := context.Background()

// generate a test server so we can capture and inspect the request
testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
_, _ = res.Write([]byte(body))
_, err := res.Write([]byte(body))
require.NoError(t, err)
}))

u, _ := url.Parse(testServer.URL)
u, err := url.Parse(testServer.URL)
require.NoError(t, err)

client := newClient(u.Host)

samplingRules, _ := client.getSamplingRules(ctx)
samplingRules, err := client.getSamplingRules(ctx)
require.NoError(t, err)

assert.Equal(t, *samplingRules.SamplingRuleRecords[0].SamplingRule.RuleName, "Default")
assert.Equal(t, *samplingRules.SamplingRuleRecords[0].SamplingRule.ServiceType, "*")
assert.Equal(t, *samplingRules.SamplingRuleRecords[0].SamplingRule.Host, "*")
Expand All @@ -57,3 +124,56 @@ func TestGetSamplingRules(t *testing.T) {
assert.Equal(t, *samplingRules.SamplingRuleRecords[2].SamplingRule.ReservoirSize, int64(100))
assert.Equal(t, *samplingRules.SamplingRuleRecords[2].SamplingRule.FixedRate, 0.09)
}

func TestGetSamplingRulesWithMissingValues(t *testing.T) {
body := []byte(`{
"NextToken": null,
"SamplingRuleRecords": [
{
"CreatedAt": 0,
"ModifiedAt": 1639517389,
"SamplingRule": {
"Attributes": {},
"FixedRate": 0.5,
"HTTPMethod": "*",
"Host": "*",
"ResourceARN": "*",
"RuleARN": "arn:aws:xray:us-west-2:xxxxxxx:sampling-rule/Default",
"RuleName": "Default",
"ServiceName": "*",
"ServiceType": "*",
"URLPath": "*",
"Version": 1
}
}
]
}`)
ctx := context.Background()

// generate a test server so we can capture and inspect the request
testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
_, err := res.Write([]byte(body))
require.NoError(t, err)
}))

u, err := url.Parse(testServer.URL)
require.NoError(t, err)

client := newClient(u.Host)

samplingRules, err := client.getSamplingRules(ctx)
require.NoError(t, err)

// Priority and ReservoirSize are missing in API response so they are assigned as nil
assert.Nil(t, samplingRules.SamplingRuleRecords[0].SamplingRule.Priority)
assert.Nil(t, samplingRules.SamplingRuleRecords[0].SamplingRule.ReservoirSize)

// other values are stored as expected
assert.Equal(t, *samplingRules.SamplingRuleRecords[0].SamplingRule.RuleName, "Default")
}

func TestNewClient(t *testing.T) {
xrayClient := newClient("127.0.0.1:2020")

assert.Equal(t, xrayClient.endpoint.String(), "http://127.0.0.1:2020")
}
14 changes: 7 additions & 7 deletions samplers/aws/xray/clock.go
Expand Up @@ -18,15 +18,15 @@ import (
"time"
)

// Clock provides an interface to implement method for getting current time.
type Clock interface {
Now() time.Time
// clock provides an interface to implement method for getting current time.
type clock interface {
now() time.Time
}

// DefaultClock is an implementation of Clock interface.
type DefaultClock struct{}
// defaultClock is an implementation of Clock interface.
type defaultClock struct{}

// Now returns current time.
func (t *DefaultClock) Now() time.Time {
// now returns current time.
func (t *defaultClock) now() time.Time {
return time.Now()
}
8 changes: 4 additions & 4 deletions samplers/aws/xray/rand.go
Expand Up @@ -91,24 +91,24 @@ type Rand interface {

// DefaultRand is an implementation of Rand interface.
// It is safe for concurrent use by multiple goroutines.
type DefaultRand struct{}
type defaultRand struct{}

var globalRand = newGlobalRand()

// Int63n returns, as an int64, a non-negative pseudo-random number in [0,n)
// from the default Source.
func (r *DefaultRand) Int63n(n int64) int64 {
func (r *defaultRand) Int63n(n int64) int64 {
return globalRand.Int63n(n)
}

// Intn returns, as an int, a non-negative pseudo-random number in [0,n)
// from the default Source.
func (r *DefaultRand) Intn(n int) int {
func (r *defaultRand) Intn(n int) int {
return globalRand.Intn(n)
}

// Float64 returns, as a float64, a pseudo-random number in [0.0,1.0)
// from the default Source.
func (r *DefaultRand) Float64() float64 {
func (r *defaultRand) Float64() float64 {
return globalRand.Float64()
}