Skip to content

Commit

Permalink
Simplify custom Writable (#32247)
Browse files Browse the repository at this point in the history
* Simplify custom writable

* Fix draining
  • Loading branch information
devknoll committed Dec 8, 2021
1 parent 4ce49b7 commit aa32deb
Showing 1 changed file with 38 additions and 58 deletions.
96 changes: 38 additions & 58 deletions packages/next/server/render.tsx
Expand Up @@ -1439,87 +1439,67 @@ function renderToNodeStream(
generateStaticHTML: boolean
): Promise<NodeWritablePiper> {
return new Promise((resolve, reject) => {
let underlyingStream: {
resolve: (error?: Error) => void
writable: WritableType
queuedCallbacks: Array<() => void>
} | null = null

const stream = new Writable({
// Use the buffer from the underlying stream
highWaterMark: 0,
writev(chunks, callback) {
let str = ''
for (let { chunk } of chunks) {
str += chunk.toString()
}

let underlyingStream: WritableType | null = null
let queuedCallbacks: Array<(error?: Error | null) => void> = []

// Based on the suggestion here:
// https://github.com/reactwg/react-18/discussions/110
class NextWritable extends Writable {
_write(
chunk: any,
encoding: string,
callback: (error?: Error | null) => void
) {
if (!underlyingStream) {
throw new Error(
'invariant: write called without an underlying stream. This is a bug in Next.js'
)
}

if (!underlyingStream.writable.write(str)) {
underlyingStream.queuedCallbacks.push(() => callback())
// The compression module (https://github.com/expressjs/compression) doesn't
// support callbacks, so we have to wait for a drain event.
if (!underlyingStream.write(chunk, encoding)) {
queuedCallbacks.push(callback)
} else {
callback()
}
},
})
stream.once('finish', () => {
if (!underlyingStream) {
throw new Error(
'invariant: finish called without an underlying stream. This is a bug in Next.js'
)
}
underlyingStream.resolve()
})
stream.once('error', (err) => {
if (!underlyingStream) {
throw new Error(
'invariant: error called without an underlying stream. This is a bug in Next.js'
)
}
underlyingStream.resolve(err)
})
// React uses `flush` to prevent stream middleware like gzip from buffering to the
// point of harming streaming performance, so we make sure to expose it and forward it.
// See: https://github.com/reactwg/react-18/discussions/91
Object.defineProperty(stream, 'flush', {
value: () => {

flush() {
if (!underlyingStream) {
throw new Error(
'invariant: flush called without an underlying stream. This is a bug in Next.js'
)
}
if (typeof (underlyingStream.writable as any).flush === 'function') {
;(underlyingStream.writable as any).flush()

const anyWritable = underlyingStream as any
if (typeof anyWritable.flush === 'function') {
anyWritable.flush()
}
},
enumerable: true,
}
}

const stream = new NextWritable()
stream.on('drain', () => {
const callbacks = queuedCallbacks
queuedCallbacks = []
callbacks.forEach((callback) => callback())
})

let resolved = false
const doResolve = (startWriting: any) => {
if (!resolved) {
resolved = true
resolve((res, next) => {
const drainHandler = () => {
const prevCallbacks = underlyingStream!.queuedCallbacks
underlyingStream!.queuedCallbacks = []
prevCallbacks.forEach((callback) => callback())
}
res.on('drain', drainHandler)
underlyingStream = {
resolve: (err) => {
underlyingStream = null
res.removeListener('drain', drainHandler)
next(err)
},
writable: res,
queuedCallbacks: [],
const doNext = (err?: Error) => {
underlyingStream = null
queuedCallbacks = []
next(err)
}

stream.once('error', (err) => doNext(err))
stream.once('finish', () => doNext())

underlyingStream = res
startWriting()
})
}
Expand Down

0 comments on commit aa32deb

Please sign in to comment.