diff --git a/packages/next/server/render.tsx b/packages/next/server/render.tsx index 1f98e28c862b492..7d88ecd1a806a3b 100644 --- a/packages/next/server/render.tsx +++ b/packages/next/server/render.tsx @@ -63,7 +63,12 @@ import { Redirect, } from '../lib/load-custom-routes' import { DomainLocale } from './config' -import { RenderResult, resultFromChunks, resultToChunks } from './utils' +import { + Observer, + RenderResult, + resultFromChunks, + resultToChunks, +} from './utils' function noRouter() { const message = @@ -416,6 +421,7 @@ export async function renderToHTML( previewProps, basePath, devOnlyCacheBusterQueryString, + requireStaticHTML, concurrentFeatures, } = renderOpts @@ -1002,32 +1008,57 @@ export async function renderToHTML( } } - // TODO: Support SSR streaming of Suspense. - const renderToString = concurrentFeatures - ? (element: React.ReactElement) => - new Promise((resolve, reject) => { - const stream = new PassThrough() - const buffers: Buffer[] = [] - stream.on('data', (chunk) => { - buffers.push(chunk) - }) - stream.once('end', () => { - resolve(Buffer.concat(buffers).toString('utf-8')) - }) - - const { - abort, - startWriting, - } = (ReactDOMServer as any).pipeToNodeWritable(element, stream, { - onError(error: Error) { + const generateStaticHTML = requireStaticHTML || inAmpMode + const renderToStream = (element: React.ReactElement) => + new Promise((resolve, reject) => { + const stream = new PassThrough() + let resolved = false + const doResolve = () => { + if (!resolved) { + resolved = true + resolve(({ complete, next }) => { + stream.on('data', (chunk) => { + next(chunk.toString('utf-8')) + }) + stream.once('end', () => { + complete() + }) + + startWriting() + return () => { abort() - reject(error) - }, - onCompleteAll() { - startWriting() - }, + } }) - }) + } + } + + const { + abort, + startWriting, + } = (ReactDOMServer as any).pipeToNodeWritable(element, stream, { + onError(error: Error) { + if (!resolved) { + resolved = true + reject(error) + } + abort() + }, + onReadyToStream() { + if (!generateStaticHTML) { + doResolve() + } + }, + onCompleteAll() { + doResolve() + }, + }) + }).then(multiplexResult) + + const renderToString = concurrentFeatures + ? async (element: React.ReactElement) => { + const result = await renderToStream(element) + return await resultsToString([result]) + } : ReactDOMServer.renderToString const renderPage: RenderPage = ( @@ -1285,6 +1316,87 @@ function mergeResults(chunks: Array): RenderResult { } } +function multiplexResult(result: RenderResult): RenderResult { + const chunks: Array = [] + const subscribers: Set> = new Set() + let terminator: ((subscriber: Observer) => void) | null = null + + result({ + next(chunk) { + chunks.push(chunk) + subscribers.forEach((subscriber) => subscriber.next(chunk)) + }, + error(error) { + if (!terminator) { + terminator = (subscriber) => subscriber.error(error) + subscribers.forEach(terminator) + subscribers.clear() + } + }, + complete() { + if (!terminator) { + terminator = (subscriber) => subscriber.complete() + subscribers.forEach(terminator) + subscribers.clear() + } + }, + }) + + return (innerSubscriber) => { + let completed = false + let cleanup = () => {} + const subscriber: Observer = { + next(chunk) { + if (!completed) { + try { + innerSubscriber.next(chunk) + } catch (err) { + subscriber.error(err) + } + } + }, + complete() { + if (!completed) { + cleanup() + try { + innerSubscriber.complete() + } catch {} + } + }, + error(err) { + if (!completed) { + cleanup() + try { + innerSubscriber.error(err) + } catch {} + } + }, + } + cleanup = () => { + completed = true + subscribers.delete(subscriber) + } + + process.nextTick(() => { + for (const chunk of chunks) { + if (completed) { + return + } + subscriber.next(chunk) + } + + if (!completed) { + if (!terminator) { + subscribers.add(subscriber) + } else { + terminator(subscriber) + } + } + }) + return () => cleanup() + } +} + function errorToJSON(err: Error): Error { const { name, message, stack } = err return { name, message, stack } diff --git a/packages/next/server/utils.ts b/packages/next/server/utils.ts index e898038ecb47d97..1334915ffc56232 100644 --- a/packages/next/server/utils.ts +++ b/packages/next/server/utils.ts @@ -16,12 +16,12 @@ export function cleanAmpPath(pathname: string): string { } export type Disposable = () => void -// TODO: Consider just using an actual Observable here -export type RenderResult = (observer: { - next(chunk: string): void +export type Observer = { + next(chunk: T): void error(error: Error): void complete(): void -}) => Disposable +} +export type RenderResult = (observer: Observer) => Disposable export function resultFromChunks(chunks: string[]): RenderResult { return ({ next, complete, error }) => {