Skip to content

Commit

Permalink
Add backoff mechanism to profiler agent exporter (#1541)
Browse files Browse the repository at this point in the history
  • Loading branch information
Qard committed Aug 26, 2021
1 parent e2eb1d8 commit 8999d7a
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 55 deletions.
6 changes: 5 additions & 1 deletion packages/dd-trace/src/profiling/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ const {
DD_VERSION,
DD_TRACE_AGENT_URL,
DD_AGENT_HOST,
DD_TRACE_AGENT_PORT
DD_TRACE_AGENT_PORT,
DD_PROFILING_UPLOAD_TIMEOUT
} = process.env

class Config {
Expand All @@ -31,6 +32,8 @@ class Config {
const version = coalesce(options.version, DD_VERSION)
// Must be longer than one minute so pad with five seconds
const flushInterval = coalesce(options.interval, 65 * 1000)
const uploadTimeout = coalesce(options.uploadTimeout,
DD_PROFILING_UPLOAD_TIMEOUT, 60 * 1000)

this.enabled = String(enabled) !== 'false'
this.service = service
Expand All @@ -45,6 +48,7 @@ class Config {
)
this.logger = ensureLogger(options.logger)
this.flushInterval = flushInterval
this.uploadTimeout = uploadTimeout

const hostname = coalesce(options.hostname, DD_AGENT_HOST, 'localhost')
const port = coalesce(options.port, DD_TRACE_AGENT_PORT, 8126)
Expand Down
117 changes: 86 additions & 31 deletions packages/dd-trace/src/profiling/exporters/agent.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,51 @@
'use strict'

const retry = require('retry')
const { request } = require('http')
const FormData = require('form-data')

function sendRequest (options, body, callback) {
const req = request(options, res => {
if (res.statusCode >= 400) {
const error = new Error(`HTTP Error ${res.statusCode}`)
error.status = res.statusCode
callback(error)
} else {
callback(null, res)
}
})
req.on('error', callback)
if (body) req.write(body)
req.end()
}

function getBody (stream, callback) {
const chunks = []
stream.on('error', callback)
stream.on('data', chunk => chunks.push(chunk))
stream.on('end', () => {
callback(null, Buffer.concat(chunks))
})
}

function computeRetries (uploadTimeout) {
let tries = 0
while (tries < 2 || uploadTimeout > 1000) {
tries++
uploadTimeout /= 2
}
return [tries, uploadTimeout]
}

class AgentExporter {
constructor ({ url, logger } = {}) {
constructor ({ url, logger, uploadTimeout } = {}) {
this._url = url
this._logger = logger

const [backoffTries, backoffTime] = computeRetries(uploadTimeout)

this._backoffTime = backoffTime
this._backoffTries = backoffTries
}

export ({ profiles, start, end, tags }) {
Expand Down Expand Up @@ -51,41 +91,56 @@ class AgentExporter {
})
}

return new Promise((resolve, reject) => {
const options = {
method: 'POST',
path: '/profiling/v1/input',
timeout: 10 * 1000
}

if (this._url.protocol === 'unix:') {
options.socketPath = this._url.pathname
} else {
options.protocol = this._url.protocol
options.hostname = this._url.hostname
options.port = this._url.port
}
const body = form.getBuffer()
const options = {
method: 'POST',
path: '/profiling/v1/input',
headers: form.getHeaders()
}

this._logger.debug(() => {
return `Submitting agent report to: ${JSON.stringify(options)}`
})
if (this._url.protocol === 'unix:') {
options.socketPath = this._url.pathname
} else {
options.protocol = this._url.protocol
options.hostname = this._url.hostname
options.port = this._url.port
}

form.submit(options, (err, res) => {
if (err || !res) return reject(err)
this._logger.debug(() => {
return `Submitting agent report to: ${JSON.stringify(options)}`
})

const chunks = []
res.on('data', chunk => chunks.push(chunk))
res.on('end', () => {
this._logger.debug(() => {
return `Agent export response: ${Buffer.concat(chunks)}`
})
})
return new Promise((resolve, reject) => {
const operation = retry.operation({
randomize: true,
minTimeout: this._backoffTime,
retries: this._backoffTries
})

if (res.statusCode >= 400) {
return reject(new Error(`Error from the agent: ${res.statusCode}`))
}
operation.attempt((attempt) => {
const timeout = Math.pow(this._backoffTime, attempt)
sendRequest({ ...options, timeout }, body, (err, response) => {
if (operation.retry(err)) {
this._logger.error(`Error from the agent: ${err.message}`)
return
} else if (err) {
reject(new Error('Profiler agent export back-off period expired'))
return
}

getBody(response, (err, body) => {
if (err) {
this._logger.error(`Error reading agent response: ${err.message}`)
} else {
this._logger.debug(() => {
const bytes = (body.toString('hex').match(/../g) || []).join(' ')
return `Agent export response: ${bytes}`
})
}
})

resolve()
resolve()
})
})
})
}
Expand Down
107 changes: 84 additions & 23 deletions packages/dd-trace/test/profiling/exporters/agent.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ describe('exporters/agent', () => {
})

it('should send profiles as pprof to the intake', async () => {
const exporter = new AgentExporter({ url, logger })
const exporter = new AgentExporter({ url, logger, uploadTimeout: 100 })
const start = new Date()
const end = new Date()
const tags = {
Expand Down Expand Up @@ -139,21 +139,79 @@ describe('exporters/agent', () => {
})
})

it('should log exports and handle http errors gracefully', async () => {
it('should backoff up to the uploadTimeout', async () => {
const exporter = new AgentExporter({
url,
logger,
uploadTimeout: 100
})

const start = new Date()
const end = new Date()
const tags = { foo: 'bar' }

const [ cpu, heap ] = await Promise.all([
createProfile(['wall', 'microseconds']),
createProfile(['space', 'bytes'])
])

const profiles = {
cpu,
heap
}

let attempt = 0
app.post('/profiling/v1/input', upload.any(), (req, res) => {
attempt++
if (attempt % 2) {
res.writeHead(500)
res.end()
} else {
res.destroy()
}
})

let failed = false
try {
await exporter.export({ profiles, start, end, tags })
} catch (err) {
expect(err.message).to.match(/^Profiler agent export back-off period expired$/)
failed = true
}
expect(failed).to.be.true
expect(attempt).to.be.greaterThan(0)
})

it('should log exports and handle http errors gracefully', async function () {
this.timeout(10000)
const expectedLogs = [
/^Building agent export report: (\n {2}[a-z-]+(\[\])?: [a-z0-9-TZ:.]+)+$/,
/^Adding cpu profile to agent export:( [0-9a-f]{2})+$/,
/^Adding heap profile to agent export:( [0-9a-f]{2})+$/,
/^Submitting agent report to: {"[a-z]+":"[a-z0-9/.:]+"(,"[a-z]+":([0-9]+|"[a-z0-9/.:]+"))*}$/i,
/^Agent export response: {"error":"some error"}$/
/^Submitting agent report to:/i,
/^Error from the agent: HTTP Error 400$/,
/^Agent export response: ([0-9a-f]{2}( |$))*/
]

let doneLogs
const waitForResponse = new Promise((resolve) => {
doneLogs = resolve
})

function onMessage (message) {
const expected = expectedLogs[index++ % expectedLogs.length]
expect(typeof message === 'function' ? message() : message)
.to.match(expected)
if (index >= expectedLogs.length) doneLogs()
}

let index = 0
const exporter = new AgentExporter({
url,
uploadTimeout: 100,
logger: {
debug (message) {
expect(typeof message === 'function' ? message() : message)
.to.match(expectedLogs.shift())
}
debug: onMessage,
error: onMessage
}
})
const start = new Date()
Expand All @@ -170,22 +228,25 @@ describe('exporters/agent', () => {
heap
}

await new Promise((resolve, reject) => {
const json = JSON.stringify({ error: 'some error' })
app.post('/profiling/v1/input', upload.any(), (req, res) => {
const data = Buffer.from(json)
res.writeHead(400, {
'content-type': 'application/json',
'content-length': data.length
})
res.end(data)
})

exporter.export({ profiles, start, end, tags }).catch(error => {
expect(error.message).to.equal('Error from the agent: 400')
resolve()
let tries = 0
const json = JSON.stringify({ error: 'some error' })
app.post('/profiling/v1/input', upload.any(), (req, res) => {
if (++tries > 1) {
res.end()
return
}
const data = Buffer.from(json)
res.writeHead(400, {
'content-type': 'application/json',
'content-length': data.length
})
res.end(data)
})

await Promise.all([
exporter.export({ profiles, start, end, tags }),
waitForResponse
])
})
})

Expand All @@ -205,7 +266,7 @@ describe('exporters/agent', () => {
})

it('should support Unix domain sockets', async () => {
const exporter = new AgentExporter({ url: new URL(`unix://${url}`), logger })
const exporter = new AgentExporter({ url: new URL(`unix://${url}`), logger, uploadTimeout: 100 })
const start = new Date()
const end = new Date()
const tags = { foo: 'bar' }
Expand Down

0 comments on commit 8999d7a

Please sign in to comment.