/
worker.js
65 lines (58 loc) · 1.59 KB
/
worker.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
'use strict'
const pino = require('../pino.js')
const build = require('pino-abstract-transport')
const { realImport, realRequire } = require('real-require')
// This file is not checked by the code coverage tool,
// as it is not reliable.
/* istanbul ignore file */
module.exports = async function ({ targets }) {
targets = await Promise.all(targets.map(async (t) => {
let fn
try {
const toLoad = 'file://' + t.target
fn = (await realImport(toLoad)).default
} catch (error) {
// See this PR for details: https://github.com/pinojs/thread-stream/pull/34
if ((error.code === 'ENOTDIR' || error.code === 'ERR_MODULE_NOT_FOUND')) {
fn = realRequire(t.target)
} else {
throw error
}
}
const stream = await fn(t.options)
return {
level: t.level,
stream
}
}))
return build(process, {
parse: 'lines',
metadata: true,
close (err, cb) {
let expected = 0
for (const transport of targets) {
expected++
transport.stream.on('close', closeCb)
transport.stream.end()
}
function closeCb () {
if (--expected === 0) {
cb(err)
}
}
}
})
function process (stream) {
const multi = pino.multistream(targets)
// TODO manage backpressure
stream.on('data', function (chunk) {
const { lastTime, lastMsg, lastObj, lastLevel } = this
multi.lastLevel = lastLevel
multi.lastTime = lastTime
multi.lastMsg = lastMsg
multi.lastObj = lastObj
// TODO handle backpressure
multi.write(chunk + '\n')
})
}
}