Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify custom Writable #32247

Merged
merged 7 commits into from Dec 8, 2021
Merged
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
96 changes: 38 additions & 58 deletions packages/next/server/render.tsx
Expand Up @@ -1439,87 +1439,67 @@ function renderToNodeStream(
generateStaticHTML: boolean
): Promise<NodeWritablePiper> {
return new Promise((resolve, reject) => {
let underlyingStream: {
resolve: (error?: Error) => void
writable: WritableType
queuedCallbacks: Array<() => void>
} | null = null

const stream = new Writable({
// Use the buffer from the underlying stream
highWaterMark: 0,
writev(chunks, callback) {
let str = ''
for (let { chunk } of chunks) {
str += chunk.toString()
}

let underlyingStream: WritableType | null = null
let queuedCallbacks: Array<(error?: Error | null) => void> = []

// Based on the suggestion here:
// https://github.com/reactwg/react-18/discussions/110
class NextWritable extends Writable {
_write(
chunk: any,
encoding: string,
callback: (error?: Error | null) => void
) {
if (!underlyingStream) {
throw new Error(
'invariant: write called without an underlying stream. This is a bug in Next.js'
)
}

if (!underlyingStream.writable.write(str)) {
underlyingStream.queuedCallbacks.push(() => callback())
// The compression module (https://github.com/expressjs/compression) doesn't
// support callbacks, so we have to wait for a drain event.
if (!underlyingStream.write(chunk, encoding)) {
queuedCallbacks.push(callback)
} else {
callback()
}
},
})
stream.once('finish', () => {
if (!underlyingStream) {
throw new Error(
'invariant: finish called without an underlying stream. This is a bug in Next.js'
)
}
underlyingStream.resolve()
})
stream.once('error', (err) => {
if (!underlyingStream) {
throw new Error(
'invariant: error called without an underlying stream. This is a bug in Next.js'
)
}
underlyingStream.resolve(err)
})
// React uses `flush` to prevent stream middleware like gzip from buffering to the
// point of harming streaming performance, so we make sure to expose it and forward it.
// See: https://github.com/reactwg/react-18/discussions/91
Object.defineProperty(stream, 'flush', {
value: () => {

flush() {
if (!underlyingStream) {
throw new Error(
'invariant: flush called without an underlying stream. This is a bug in Next.js'
)
}
if (typeof (underlyingStream.writable as any).flush === 'function') {
;(underlyingStream.writable as any).flush()

const anyWritable = underlyingStream as any
if (typeof anyWritable.flush === 'function') {
anyWritable.flush()
}
},
enumerable: true,
}
}

const stream = new NextWritable()
stream.on('drain', () => {
const callbacks = queuedCallbacks
queuedCallbacks = []
callbacks.forEach((callback) => callback())
})

let resolved = false
const doResolve = (startWriting: any) => {
if (!resolved) {
resolved = true
resolve((res, next) => {
const drainHandler = () => {
const prevCallbacks = underlyingStream!.queuedCallbacks
underlyingStream!.queuedCallbacks = []
prevCallbacks.forEach((callback) => callback())
}
res.on('drain', drainHandler)
underlyingStream = {
resolve: (err) => {
underlyingStream = null
res.removeListener('drain', drainHandler)
next(err)
},
writable: res,
queuedCallbacks: [],
const doNext = (err?: Error) => {
underlyingStream = null
queuedCallbacks = []
next(err)
}

stream.once('error', (err) => doNext(err))
stream.once('finish', () => doNext())

underlyingStream = res
startWriting()
})
}
Expand Down