chore: just build to fix whitespace issues #498

Merged · 1 commit · Nov 20, 2022
3 changes: 2 additions & 1 deletion lib/_stream_duplex.js
@@ -1,3 +1,4 @@
'use strict' // Keep this file as an alias for the full stream module.
'use strict'

// Keep this file as an alias for the full stream module.
module.exports = require('./stream').Duplex
3 changes: 2 additions & 1 deletion lib/_stream_passthrough.js
@@ -1,3 +1,4 @@
'use strict' // Keep this file as an alias for the full stream module.
'use strict'

// Keep this file as an alias for the full stream module.
module.exports = require('./stream').PassThrough
3 changes: 2 additions & 1 deletion lib/_stream_readable.js
@@ -1,3 +1,4 @@
'use strict' // Keep this file as an alias for the full stream module.
'use strict'

// Keep this file as an alias for the full stream module.
module.exports = require('./stream').Readable
3 changes: 2 additions & 1 deletion lib/_stream_transform.js
@@ -1,3 +1,4 @@
'use strict' // Keep this file as an alias for the full stream module.
'use strict'

// Keep this file as an alias for the full stream module.
module.exports = require('./stream').Transform
3 changes: 2 additions & 1 deletion lib/_stream_writable.js
@@ -1,3 +1,4 @@
'use strict' // Keep this file as an alias for the full stream module.
'use strict'

// Keep this file as an alias for the full stream module.
module.exports = require('./stream').Writable
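
For context, each of these _stream_* files is a thin legacy alias that only re-exports the corresponding class from the bundled stream module; the diff merely moves the trailing comment onto its own line. A minimal repo-local sketch of what the alias guarantees (the lib/stream path is assumed from this layout):

// Both requires should resolve to the same constructor, since the alias is a plain re-export.
const LegacyDuplex = require('./lib/_stream_duplex')
const { Duplex } = require('./lib/stream')
console.log(LegacyDuplex === Duplex) // true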
13 changes: 2 additions & 11 deletions lib/internal/streams/add-abort-signal.js
@@ -1,52 +1,43 @@
'use strict'

const { AbortError, codes } = require('../../ours/errors')

const eos = require('./end-of-stream')
const { ERR_INVALID_ARG_TYPE } = codes

const { ERR_INVALID_ARG_TYPE } = codes // This method is inlined here for readable-stream
// This method is inlined here for readable-stream
// It also does not allow for signal to not exist on the stream
// https://github.com/nodejs/node/pull/36061#discussion_r533718029

const validateAbortSignal = (signal, name) => {
if (typeof signal !== 'object' || !('aborted' in signal)) {
throw new ERR_INVALID_ARG_TYPE(name, 'AbortSignal', signal)
}
}

function isNodeStream(obj) {
return !!(obj && typeof obj.pipe === 'function')
}

module.exports.addAbortSignal = function addAbortSignal(signal, stream) {
validateAbortSignal(signal, 'signal')

if (!isNodeStream(stream)) {
throw new ERR_INVALID_ARG_TYPE('stream', 'stream.Stream', stream)
}

return module.exports.addAbortSignalNoValidate(signal, stream)
}

module.exports.addAbortSignalNoValidate = function (signal, stream) {
if (typeof signal !== 'object' || !('aborted' in signal)) {
return stream
}

const onAbort = () => {
stream.destroy(
new AbortError(undefined, {
cause: signal.reason
})
)
}

if (signal.aborted) {
onAbort()
} else {
signal.addEventListener('abort', onAbort)
eos(stream, () => signal.removeEventListener('abort', onAbort))
}

return stream
}
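
The file above is the whole abort-signal helper; for orientation, here is a hedged usage sketch (it assumes the package re-exports addAbortSignal at the top level, mirroring node:stream):

// Destroy a readable with an AbortError once the signal fires.
const { Readable, addAbortSignal } = require('readable-stream')

const controller = new AbortController()
const readable = addAbortSignal(controller.signal, Readable.from(['a', 'b', 'c']))

readable.on('error', (err) => {
  console.log(err.name) // 'AbortError': the stream was destroyed via the signal
})

controller.abort()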
43 changes: 10 additions & 33 deletions lib/internal/streams/buffer_list.js
@@ -1,18 +1,14 @@
'use strict'

const { StringPrototypeSlice, SymbolIterator, TypedArrayPrototypeSet, Uint8Array } = require('../../ours/primordials')

const { Buffer } = require('buffer')

const { inspect } = require('../../ours/util')

module.exports = class BufferList {
constructor() {
this.head = null
this.tail = null
this.length = 0
}

push(v) {
const entry = {
data: v,
@@ -23,7 +19,6 @@ module.exports = class BufferList {
this.tail = entry
++this.length
}

unshift(v) {
const entry = {
data: v,
@@ -33,7 +28,6 @@ module.exports = class BufferList {
this.head = entry
++this.length
}

shift() {
if (this.length === 0) return
const ret = this.head.data
@@ -42,73 +36,62 @@ module.exports = class BufferList {
--this.length
return ret
}

clear() {
this.head = this.tail = null
this.length = 0
}

join(s) {
if (this.length === 0) return ''
let p = this.head
let ret = '' + p.data

while ((p = p.next) !== null) ret += s + p.data

return ret
}

concat(n) {
if (this.length === 0) return Buffer.alloc(0)
const ret = Buffer.allocUnsafe(n >>> 0)
let p = this.head
let i = 0

while (p) {
TypedArrayPrototypeSet(ret, p.data, i)
i += p.data.length
p = p.next
}

return ret
} // Consumes a specified amount of bytes or characters from the buffered data.
}

// Consumes a specified amount of bytes or characters from the buffered data.
consume(n, hasStrings) {
const data = this.head.data

if (n < data.length) {
// `slice` is the same for buffers and strings.
const slice = data.slice(0, n)
this.head.data = data.slice(n)
return slice
}

if (n === data.length) {
// First chunk is a perfect match.
return this.shift()
} // Result spans more than one buffer.

}
// Result spans more than one buffer.
return hasStrings ? this._getString(n) : this._getBuffer(n)
}

first() {
return this.head.data
}

*[SymbolIterator]() {
for (let p = this.head; p; p = p.next) {
yield p.data
}
} // Consumes a specified amount of characters from the buffered data.
}

// Consumes a specified amount of characters from the buffered data.
_getString(n) {
let ret = ''
let p = this.head
let c = 0

do {
const str = p.data

if (n > str.length) {
ret += str
n -= str.length
@@ -123,26 +106,22 @@ module.exports = class BufferList {
this.head = p
p.data = StringPrototypeSlice(str, n)
}

break
}

++c
} while ((p = p.next) !== null)

this.length -= c
return ret
} // Consumes a specified amount of bytes from the buffered data.
}

// Consumes a specified amount of bytes from the buffered data.
_getBuffer(n) {
const ret = Buffer.allocUnsafe(n)
const retLen = n
let p = this.head
let c = 0

do {
const buf = p.data

if (n > buf.length) {
TypedArrayPrototypeSet(ret, buf, retLen - n)
n -= buf.length
@@ -157,17 +136,15 @@ module.exports = class BufferList {
this.head = p
p.data = buf.slice(n)
}

break
}

++c
} while ((p = p.next) !== null)

this.length -= c
return ret
} // Make sure the linked list only shows the minimal necessary information.
}

// Make sure the linked list only shows the minimal necessary information.
[Symbol.for('nodejs.util.inspect.custom')](_, options) {
return inspect(this, {
...options,
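
BufferList is an internal helper, but a short sketch clarifies what push/join/consume do (the deep require path is an assumption; this class is not part of the public API):

// Two chunks are stored as linked-list entries; length counts chunks, not bytes.
const BufferList = require('./lib/internal/streams/buffer_list')

const list = new BufferList()
list.push(Buffer.from('hello '))
list.push(Buffer.from('world'))

console.log(list.length)            // 2
console.log(list.join(''))          // 'hello world'
console.log(list.consume(5, false)) // first 5 bytes ('hello'), sliced off the first chunk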
32 changes: 3 additions & 29 deletions lib/internal/streams/compose.js
@@ -1,63 +1,48 @@
'use strict'

const { pipeline } = require('./pipeline')

const Duplex = require('./duplex')

const { destroyer } = require('./destroy')

const { isNodeStream, isReadable, isWritable } = require('./utils')

const {
AbortError,
codes: { ERR_INVALID_ARG_VALUE, ERR_MISSING_ARGS }
} = require('../../ours/errors')

module.exports = function compose(...streams) {
if (streams.length === 0) {
throw new ERR_MISSING_ARGS('streams')
}

if (streams.length === 1) {
return Duplex.from(streams[0])
}

const orgStreams = [...streams]

if (typeof streams[0] === 'function') {
streams[0] = Duplex.from(streams[0])
}

if (typeof streams[streams.length - 1] === 'function') {
const idx = streams.length - 1
streams[idx] = Duplex.from(streams[idx])
}

for (let n = 0; n < streams.length; ++n) {
if (!isNodeStream(streams[n])) {
// TODO(ronag): Add checks for non streams.
continue
}

if (n < streams.length - 1 && !isReadable(streams[n])) {
throw new ERR_INVALID_ARG_VALUE(`streams[${n}]`, orgStreams[n], 'must be readable')
}

if (n > 0 && !isWritable(streams[n])) {
throw new ERR_INVALID_ARG_VALUE(`streams[${n}]`, orgStreams[n], 'must be writable')
}
}

let ondrain
let onfinish
let onreadable
let onclose
let d

function onfinished(err) {
const cb = onclose
onclose = null

if (cb) {
cb(err)
} else if (err) {
@@ -66,22 +51,21 @@ module.exports = function compose(...streams) {
d.destroy()
}
}

const head = streams[0]
const tail = pipeline(streams, onfinished)
const writable = !!isWritable(head)
const readable = !!isReadable(tail) // TODO(ronag): Avoid double buffering.
const readable = !!isReadable(tail)

// TODO(ronag): Avoid double buffering.
// Implement Writable/Readable/Duplex traits.
// See, https://github.com/nodejs/node/pull/33515.

d = new Duplex({
// TODO (ronag): highWaterMark?
writableObjectMode: !!(head !== null && head !== undefined && head.writableObjectMode),
readableObjectMode: !!(tail !== null && tail !== undefined && tail.writableObjectMode),
writable,
readable
})

if (writable) {
d._write = function (chunk, encoding, callback) {
if (head.write(chunk, encoding)) {
@@ -90,12 +74,10 @@ module.exports = function compose(...streams) {
ondrain = callback
}
}

d._final = function (callback) {
head.end()
onfinish = callback
}

head.on('drain', function () {
if (ondrain) {
const cb = ondrain
@@ -111,7 +93,6 @@ module.exports = function compose(...streams) {
}
})
}

if (readable) {
tail.on('readable', function () {
if (onreadable) {
@@ -123,39 +104,32 @@ module.exports = function compose(...streams) {
tail.on('end', function () {
d.push(null)
})

d._read = function () {
while (true) {
const buf = tail.read()

if (buf === null) {
onreadable = d._read
return
}

if (!d.push(buf)) {
return
}
}
}
}

d._destroy = function (err, callback) {
if (!err && onclose !== null) {
err = new AbortError()
}

onreadable = null
ondrain = null
onfinish = null

if (onclose === null) {
callback(err)
} else {
onclose = callback
destroyer(tail, err)
}
}

return d
}
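
To make the composed Duplex behaviour above concrete, here is a hedged usage sketch (it assumes compose is re-exported at the package top level, as stream.compose is in Node.js):

// Two transforms are fused into one duplex: writes go into the first, reads come from the last.
const { compose, Transform } = require('readable-stream')

const upper = new Transform({
  transform (chunk, encoding, callback) {
    callback(null, String(chunk).toUpperCase())
  }
})

const exclaim = new Transform({
  transform (chunk, encoding, callback) {
    callback(null, String(chunk) + '!')
  }
})

const combined = compose(upper, exclaim)
combined.on('data', (chunk) => console.log(String(chunk))) // 'HI!'
combined.end('hi')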