Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support for mix&match pipelines #1954

Merged
merged 20 commits into from May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
ed61451
added tests
dbacarel Jul 2, 2022
f7079d7
Merge branch 'pinojs:master' into mix-target-pipeline
dbacarel Jul 7, 2022
9dfb58d
Merge branch 'pinojs:master' into mix-target-pipeline
dbacarel Jul 8, 2022
36a71dc
Added test
dbacarel Jul 8, 2022
a193f28
Merge branch 'pinojs:master' into mix-target-pipeline
dbacarel Oct 3, 2022
65b29c2
Merge branch 'pinojs:master' into mix-target-pipeline
dbacarel Oct 18, 2022
c70b729
Merge branch 'pinojs:master' into mix-target-pipeline
dbacarel Jan 24, 2023
83c1cbe
Merge branch 'pinojs:master' into mix-target-pipeline
dbacarel Sep 7, 2023
867b45b
Merge branch 'pinojs:master' into mix-target-pipeline
dbacarel Mar 12, 2024
aeb677a
Merge branch 'pinojs:master' into mix-target-pipeline
dbacarel Apr 25, 2024
cb63778
Implemented support for named 'pipelines' property in pino.transport
dbacarel Apr 27, 2024
0110235
Merge branch 'pinojs:master' into mix-target-pipeline
dbacarel Apr 27, 2024
db239f5
Updated pino.d.ts and api.md to include pipeline within targets
dbacarel May 4, 2024
c3e9fec
- Reverted changes related to the support of named pipelines
dbacarel May 5, 2024
dbb40d5
- Removed `worker-pipeline.js`
dbacarel May 5, 2024
ddbbafb
added a simple flow schema to worker.js
dbacarel May 5, 2024
31faf4e
added a simple flow schema to worker.js
dbacarel May 5, 2024
b1e0bb1
Merge branch 'pinojs:main' into mix-target-pipeline
dbacarel May 5, 2024
40297e3
Added a special case in worker.js to skip the multistream instance wh…
dbacarel May 11, 2024
57bcd7d
- Added optional 'level' property to TransportPipelineOptions interface
dbacarel May 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
40 changes: 32 additions & 8 deletions docs/api.md
Expand Up @@ -241,13 +241,13 @@ child.info('this will have both `foo: 1` and `bar: 2`')
logger.info('this will still only have `foo: 1`')
```

As of pino 7.x, when the `mixin` is used with the [`nestedKey` option](#opt-nestedkey),
the object returned from the `mixin` method will also be nested. Prior versions would mix
this object into the root.
As of pino 7.x, when the `mixin` is used with the [`nestedKey` option](#opt-nestedkey),
the object returned from the `mixin` method will also be nested. Prior versions would mix
this object into the root.

```js
const logger = pino({
nestedKey: 'payload',
nestedKey: 'payload',
mixin() {
return { requestId: requestId.currentId() }
}
Expand Down Expand Up @@ -590,7 +590,7 @@ when using the `transport` option. In this case, an `Error` will be thrown.

#### `onChild` (Function)

The `onChild` function is a synchronous callback that will be called on each creation of a new child, passing the child instance as its first argument.
The `onChild` function is a synchronous callback that will be called on each creation of a new child, passing the child instance as its first argument.
Any error thrown inside the callback will be uncaught and should be handled inside the callback.
```js
const parent = require('pino')({ onChild: (instance) => {
Expand All @@ -609,7 +609,7 @@ Default: `pino.destination(1)` (STDOUT)
The `destination` parameter can be a file descriptor, a file path, or an
object with `dest` property pointing to a fd or path.
An ordinary Node.js `stream` file descriptor can be passed as the
destination (such as the result
destination (such as the result
of `fs.createWriteStream`) but for peak log writing performance, it is strongly
recommended to use `pino.destination` to create the destination stream.
Note that the `destination` parameter can be the result of `pino.transport()`.
Expand Down Expand Up @@ -1001,7 +1001,7 @@ Adds to the bindings of this logger instance.
**Note:** Does not overwrite bindings. Can potentially result in duplicate keys in
log lines.

* See [`bindings` parameter in `logger.child`](#logger-child-bindings)
* See [`bindings` parameter in `logger.child`](#logger-child-bindings)

<a id="flush"></a>
### `logger.flush([cb])`
Expand Down Expand Up @@ -1239,6 +1239,30 @@ const transport = pino.transport({
pino(transport)
```

Multiple transports can now be defined to include pipelines:

```js
const pino = require('pino')
const transport = pino.transport({
targets: [{
level: 'info',
target: 'pino-pretty' // must be installed separately
}, {
level: 'trace',
target: 'pino/file',
options: { destination: '/path/to/store/logs' }
}, {
pipeline: [{
target: 'pino-syslog' // must be installed separately
}, {
target: 'pino-socket' // must be installed separately
}]
}
]
})
pino(transport)
```

If `WeakRef`, `WeakMap`, and `FinalizationRegistry` are available in the current runtime (v14.5.0+), then the thread
will be automatically terminated in case the stream or logger goes out of scope.
The `transport()` function adds a listener to `process.on('beforeExit')` and `process.on('exit')` to ensure the worker
Expand Down Expand Up @@ -1276,7 +1300,7 @@ For more on transports, how they work, and how to create them see the [`Transpor
* `target`: The transport to pass logs through. This may be an installed module name or an absolute path.
* `options`: An options object which is serialized (see [Structured Clone Algorithm](https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm)), passed to the worker thread, parsed and then passed to the exported transport function.
* `worker`: [Worker thread](https://nodejs.org/api/worker_threads.html#worker_threads_new_worker_filename_options) configuration options. Additionally, the `worker` option supports `worker.autoEnd`. If this is set to `false` logs will not be flushed on process exit. It is then up to the developer to call `transport.end()` to flush logs.
* `targets`: May be specified instead of `target`. Must be an array of transport configurations. Transport configurations include the aforementioned `options` and `target` options plus a `level` option which will send only logs above a specified level to a transport.
* `targets`: May be specified instead of `target`. Must be an array of transport configurations and/or pipelines. Transport configurations include the aforementioned `options` and `target` options plus a `level` option which will send only logs above a specified level to a transport.
* `pipeline`: May be specified instead of `target`. Must be an array of transport configurations. Transport configurations include the aforementioned `options` and `target` options. All intermediate steps in the pipeline _must_ be `Transform` streams and not `Writable`.
* `dedupe`: See [pino.multistream options](#pino-multistream)

Expand Down
4 changes: 1 addition & 3 deletions docs/bundling.md
Expand Up @@ -7,7 +7,6 @@ In particular, a bundler must ensure that the following files are also bundled s
* `lib/worker.js` from the `thread-stream` dependency
* `file.js`
* `lib/worker.js`
* `lib/worker-pipeline.js`
* Any transport used by the user (like `pino-pretty`)

Once the files above have been generated, the bundler must also add information about the files above by injecting a code that sets `__bundlerPathsOverrides` in the `globalThis` object.
Expand All @@ -22,12 +21,11 @@ globalThis.__bundlerPathsOverrides = {
'thread-stream-worker': pinoWebpackAbsolutePath('./thread-stream-worker.js')
'pino/file': pinoWebpackAbsolutePath('./pino-file.js'),
'pino-worker': pinoWebpackAbsolutePath('./pino-worker.js'),
'pino-pipeline-worker': pinoWebpackAbsolutePath('./pino-pipeline-worker.js'),
'pino-pretty': pinoWebpackAbsolutePath('./pino-pretty.js'),
};
```

Note that `pino/file`, `pino-worker`, `pino-pipeline-worker`, and `thread-stream-worker` are required identifiers. Other identifiers are possible based on the user configuration.
Note that `pino/file`, `pino-worker` and `thread-stream-worker` are required identifiers. Other identifiers are possible based on the user configuration.

## Webpack Plugin

Expand Down
17 changes: 13 additions & 4 deletions lib/transport.js
Expand Up @@ -87,20 +87,29 @@ function transport (fullOptions) {

if (targets) {
target = bundlerOverrides['pino-worker'] || join(__dirname, 'worker.js')
options.targets = targets.map((dest) => {
options.targets = targets.filter(dest => dest.target).map((dest) => {
return {
...dest,
target: fixTarget(dest.target)
}
})
options.pipelines = targets.filter(dest => dest.pipeline).map((dest) => {
return dest.pipeline.map((t) => {
return {
...t,
level: dest.level, // duplicate the pipeline `level` property defined in the upper level
target: fixTarget(t.target)
}
})
})
} else if (pipeline) {
target = bundlerOverrides['pino-pipeline-worker'] || join(__dirname, 'worker-pipeline.js')
options.targets = pipeline.map((dest) => {
target = bundlerOverrides['pino-worker'] || join(__dirname, 'worker.js')
options.pipelines = [pipeline.map((dest) => {
return {
...dest,
target: fixTarget(dest.target)
}
})
})]
}

if (levels) {
Expand Down
38 changes: 0 additions & 38 deletions lib/worker-pipeline.js

This file was deleted.

188 changes: 164 additions & 24 deletions lib/worker.js
@@ -1,5 +1,7 @@
'use strict'

const EE = require('events')
const { pipeline, PassThrough } = require('stream')
const pino = require('../pino.js')
const build = require('pino-abstract-transport')
const loadTransportStreamBuilder = require('./transport-stream')
Expand All @@ -9,36 +11,144 @@ const loadTransportStreamBuilder = require('./transport-stream')

/* istanbul ignore file */

module.exports = async function ({ targets, levels, dedupe }) {
targets = await Promise.all(targets.map(async (t) => {
const fn = await loadTransportStreamBuilder(t.target)
const stream = await fn(t.options)
return {
level: t.level,
stream
}
}))
return build(process, {
parse: 'lines',
metadata: true,
close (err, cb) {
let expected = 0
for (const transport of targets) {
expected++
transport.stream.on('close', closeCb)
transport.stream.end()
/*
* > Multiple targets & pipelines
*
*
* ┌─────────────────────────────────────────────────┐ ┌─────┐
* │ │ │ p │
* │ │ │ i │
* │ target │ │ n │
* │ │ ────────────────────────────────┼────┤ o │
* │ targets │ target │ │ . │
* │ ────────────► │ ────────────────────────────────┼────┤ m │ source
* │ │ target │ │ u │ │
* │ │ ────────────────────────────────┼────┤ l │ │write
* │ │ │ │ t │ ▼
* │ │ pipeline ┌───────────────┐ │ │ i │ ┌────────┐
* │ │ ──────────► │ PassThrough ├───┼────┤ s ├──────┤ │
* │ │ └───────────────┘ │ │ t │ write│ Thread │
* │ │ │ │ r │◄─────┤ Stream │
* │ │ pipeline ┌───────────────┐ │ │ e │ │ │
* │ │ ──────────► │ PassThrough ├───┼────┤ a │ └────────┘
* │ └───────────────┘ │ │ m │
* │ │ │ │
* └─────────────────────────────────────────────────┘ └─────┘
*
*
*
* > One single pipeline or target
*
*
* source
* │
* ┌────────────────────────────────────────────────┐ │write
* │ │ ▼
* │ │ ┌────────┐
* │ targets │ target │ │ │
* │ ────────────► │ ──────────────────────────────┤ │ │
* │ │ │ │ │
* │ ├──────┤ │
* │ │ │ │
* │ │ │ │
* │ OR │ │ │
* │ │ │ │
* │ │ │ │
* │ ┌──────────────┐ │ │ │
* │ targets │ pipeline │ │ │ │ Thread │
* │ ────────────► │ ────────────►│ PassThrough ├─┤ │ Stream │
* │ │ │ │ │ │ │
* │ └──────────────┘ │ │ │
* │ │ │ │
* │ OR │ write│ │
* │ │◄─────┤ │
* │ │ │ │
* │ ┌──────────────┐ │ │ │
* │ pipeline │ │ │ │ │
* │ ──────────────►│ PassThrough ├────────────────┤ │ │
* │ │ │ │ │ │
* │ └──────────────┘ │ └────────┘
* │ │
* │ │
* └────────────────────────────────────────────────┘
*/

module.exports = async function ({ targets, pipelines, levels, dedupe }) {
const targetStreams = []

// Process targets
if (targets && targets.length) {
targets = await Promise.all(targets.map(async (t) => {
const fn = await loadTransportStreamBuilder(t.target)
const stream = await fn(t.options)
return {
level: t.level,
stream
}
}))

targetStreams.push(...targets)
}

// Process pipelines
if (pipelines && pipelines.length) {
pipelines = await Promise.all(
pipelines.map(async (p) => {
let level
const pipeDests = await Promise.all(
p.map(async (t) => {
// level assigned to pipeline is duplicated over all its targets, just store it
level = t.level
const fn = await loadTransportStreamBuilder(t.target)
const stream = await fn(t.options)
return stream
}
))

function closeCb () {
if (--expected === 0) {
cb(err)
return {
level,
stream: createPipeline(pipeDests)
}
})
)
targetStreams.push(...pipelines)
}

// Skip building the multistream step if either one single pipeline or target is defined and
// return directly the stream instance back to TreadStream.
// This is equivalent to define either:
//
// pino.transport({ target: ... })
//
// OR
//
// pino.transport({ pipeline: ... })
if (targetStreams.length === 1) {
return targetStreams[0].stream
} else {
return build(process, {
parse: 'lines',
metadata: true,
close (err, cb) {
let expected = 0
for (const transport of targetStreams) {
expected++
transport.stream.on('close', closeCb)
transport.stream.end()
}

function closeCb () {
if (--expected === 0) {
cb(err)
}
}
}
}
})
})
}

// TODO: Why split2 was not used for pipelines?
function process (stream) {
const multi = pino.multistream(targets, { levels, dedupe })
const multi = pino.multistream(targetStreams, { levels, dedupe })
// TODO manage backpressure
stream.on('data', function (chunk) {
const { lastTime, lastMsg, lastObj, lastLevel } = this
Expand All @@ -51,4 +161,34 @@ module.exports = async function ({ targets, levels, dedupe }) {
multi.write(chunk + '\n')
})
}

/**
* Creates a pipeline using the provided streams and return an instance of `PassThrough` stream
* as a source for the pipeline.
*
* @param {(TransformStream|WritableStream)[]} streams An array of streams.
* All intermediate streams in the array *MUST* be `Transform` streams and only the last one `Writable`.
* @returns A `PassThrough` stream instance representing the source stream of the pipeline
*/
function createPipeline (streams) {
const ee = new EE()
const stream = new PassThrough({
autoDestroy: true,
destroy (_, cb) {
ee.on('error', cb)
ee.on('closed', cb)
}
})

pipeline(stream, ...streams, function (err) {
if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
ee.emit('error', err)
return
}

ee.emit('closed')
})

return stream
}
}