diff --git a/packages/next/server/next-server.ts b/packages/next/server/next-server.ts index c0e8a2b9b3cc349..2291f6514f08d42 100644 --- a/packages/next/server/next-server.ts +++ b/packages/next/server/next-server.ts @@ -1,4 +1,6 @@ import './node-polyfill-fetch' +import './node-polyfill-web-streams' + import type { Params, Route } from './router' import type { CacheFs } from '../shared/lib/utils' import type { MiddlewareManifest } from '../build/webpack/plugins/middleware-plugin' @@ -10,19 +12,15 @@ import type { Rewrite } from '../lib/load-custom-routes' import type { BaseNextRequest, BaseNextResponse } from './base-http' import type { PagesManifest } from '../build/webpack/plugins/pages-manifest-plugin' import type { PayloadOptions } from './send-payload' - -import { execOnce } from '../shared/lib/utils' -import { - addRequestMeta, - getRequestMeta, - NextParsedUrlQuery, - NextUrlWithParsedQuery, -} from './request-meta' +import type { NextParsedUrlQuery, NextUrlWithParsedQuery } from './request-meta' import fs from 'fs' import { join, relative, resolve, sep } from 'path' import { IncomingMessage, ServerResponse } from 'http' +import { execOnce } from '../shared/lib/utils' +import { addRequestMeta, getRequestMeta } from './request-meta' + import { PAGES_MANIFEST, BUILD_ID_FILE, diff --git a/packages/next/server/node-polyfill-web-streams.js b/packages/next/server/node-polyfill-web-streams.js index 8c022a6fe4425d2..4eab8096e5f122f 100644 --- a/packages/next/server/node-polyfill-web-streams.js +++ b/packages/next/server/node-polyfill-web-streams.js @@ -1,8 +1,12 @@ -import { TransformStream } from 'next/dist/compiled/web-streams-polyfill' -import { ReadableStream } from './web/sandbox/readable-stream' +import { + ReadableStream, + TransformStream, +} from 'next/dist/compiled/web-streams-polyfill' -// Polyfill Web Streams in the Node.js environment +// Polyfill Web Streams for the Node.js runtime. if (!global.ReadableStream) { global.ReadableStream = ReadableStream +} +if (!global.TransformStream) { global.TransformStream = TransformStream } diff --git a/packages/next/server/node-web-streams-helper.ts b/packages/next/server/node-web-streams-helper.ts index 9bd2f6a72fa3142..9ef5e747d6813de 100644 --- a/packages/next/server/node-web-streams-helper.ts +++ b/packages/next/server/node-web-streams-helper.ts @@ -24,43 +24,6 @@ export function readableStreamTee( return [transformStream.readable, transformStream2.readable] } -export function pipeTo( - readable: ReadableStream, - writable: WritableStream, - options?: { preventClose: boolean } -) { - let resolver: () => void - const promise = new Promise((resolve) => (resolver = resolve)) - - const reader = readable.getReader() - const writer = writable.getWriter() - function process() { - reader.read().then(({ done, value }) => { - if (done) { - if (options?.preventClose) { - writer.releaseLock() - } else { - writer.close() - } - resolver() - } else { - writer.write(value) - process() - } - }) - } - process() - return promise -} - -export function pipeThrough( - readable: ReadableStream, - transformStream: TransformStream -) { - pipeTo(readable, transformStream.writable) - return transformStream.readable -} - export function chainStreams( streams: ReadableStream[] ): ReadableStream { @@ -69,9 +32,7 @@ export function chainStreams( let promise = Promise.resolve() for (let i = 0; i < streams.length; ++i) { promise = promise.then(() => - pipeTo(streams[i], writable, { - preventClose: i + 1 < streams.length, - }) + streams[i].pipeTo(writable, { preventClose: i + 1 < streams.length }) ) } @@ -119,77 +80,6 @@ export function decodeText(input?: Uint8Array, textDecoder?: TextDecoder) { : new TextDecoder().decode(input) } -export function createTransformStream({ - flush, - transform, -}: { - flush?: ( - controller: TransformStreamDefaultController - ) => Promise | void - transform?: ( - chunk: Input, - controller: TransformStreamDefaultController - ) => Promise | void -}): TransformStream { - const source = new TransformStream() - const sink = new TransformStream() - const reader = source.readable.getReader() - const writer = sink.writable.getWriter() - - const controller = { - enqueue(chunk: Output) { - writer.write(chunk) - }, - - error(reason: Error) { - writer.abort(reason) - reader.cancel() - }, - - terminate() { - writer.close() - reader.cancel() - }, - - get desiredSize() { - return writer.desiredSize - }, - } - - ;(async () => { - try { - while (true) { - const { done, value } = await reader.read() - - if (done) { - const maybePromise = flush?.(controller) - if (maybePromise) { - await maybePromise - } - writer.close() - return - } - - if (transform) { - const maybePromise = transform(value, controller) - if (maybePromise) { - await maybePromise - } - } else { - controller.enqueue(value) - } - } - } catch (err) { - writer.abort(err) - } - })() - - return { - readable: sink.readable, - writable: source.writable, - } -} - export function createBufferedTransformStream(): TransformStream< Uint8Array, Uint8Array @@ -213,7 +103,7 @@ export function createBufferedTransformStream(): TransformStream< const textDecoder = new TextDecoder() - return createTransformStream({ + return new TransformStream({ transform(chunk, controller) { bufferedString += decodeText(chunk, textDecoder) flushBuffer(controller) @@ -230,7 +120,7 @@ export function createBufferedTransformStream(): TransformStream< export function createFlushEffectStream( handleFlushEffect: () => Promise ): TransformStream { - return createTransformStream({ + return new TransformStream({ async transform(chunk, controller) { const flushedChunk = encodeText(await handleFlushEffect()) @@ -285,7 +175,7 @@ export async function continueFromInitialStream({ ].filter(Boolean) as any return transforms.reduce( - (readable, transform) => pipeThrough(readable, transform), + (readable, transform) => readable.pipeThrough(transform), renderStream ) } @@ -318,7 +208,7 @@ export async function renderToStream({ export function createSuffixStream( suffix: string ): TransformStream { - return createTransformStream({ + return new TransformStream({ flush(controller) { if (suffix) { controller.enqueue(encodeText(suffix)) @@ -332,7 +222,7 @@ export function createPrefixStream( ): TransformStream { let prefixFlushed = false let prefixPrefixFlushFinished: Promise | null = null - return createTransformStream({ + return new TransformStream({ transform(chunk, controller) { controller.enqueue(chunk) if (!prefixFlushed && prefix) { @@ -362,7 +252,7 @@ export function createInlineDataStream( dataStream: ReadableStream ): TransformStream { let dataStreamFinished: Promise | null = null - return createTransformStream({ + return new TransformStream({ transform(chunk, controller) { controller.enqueue(chunk) diff --git a/packages/next/server/render.tsx b/packages/next/server/render.tsx index 337b07ed618589a..6b2b8648257a9b6 100644 --- a/packages/next/server/render.tsx +++ b/packages/next/server/render.tsx @@ -68,7 +68,6 @@ import { readableStreamTee, encodeText, decodeText, - pipeThrough, streamFromArray, streamToString, chainStreams, @@ -1225,16 +1224,13 @@ export async function renderToHTML( if (renderServerComponentData) { return new RenderResult( - pipeThrough( - renderToReadableStream( - renderFlight(AppMod, ComponentMod, { - ...props.pageProps, - ...serverComponentProps, - }), - serverComponentManifest - ), - createBufferedTransformStream() - ) + renderToReadableStream( + renderFlight(AppMod, ComponentMod, { + ...props.pageProps, + ...serverComponentProps, + }), + serverComponentManifest + ).pipeThrough(createBufferedTransformStream()) ) } diff --git a/packages/next/server/web/sandbox/context.ts b/packages/next/server/web/sandbox/context.ts index 7159bea301386a3..71c221a6078c321 100644 --- a/packages/next/server/web/sandbox/context.ts +++ b/packages/next/server/web/sandbox/context.ts @@ -2,7 +2,7 @@ import type { Context } from 'vm' import { Blob, File, FormData } from 'next/dist/compiled/formdata-node' import { readFileSync, promises as fs } from 'fs' import { requireDependencies } from './require' -import { TransformStream } from 'next/dist/compiled/web-streams-polyfill' +import '../../node-polyfill-web-streams' import cookie from 'next/dist/compiled/cookie' import * as polyfills from './polyfills' import { @@ -202,8 +202,8 @@ function createContext(options: { timeLog: console.timeLog.bind(console), warn: console.warn.bind(console), }, - AbortController: AbortController, - AbortSignal: AbortSignal, + AbortController, + AbortSignal, CryptoKey: polyfills.CryptoKey, Crypto: polyfills.Crypto, crypto: new polyfills.Crypto(), @@ -213,7 +213,7 @@ function createContext(options: { ...polyfills.process, env: buildEnvironmentVariablesFrom(options.env), }, - ReadableStream: polyfills.ReadableStream, + ReadableStream, setInterval, setTimeout, TextDecoder, diff --git a/packages/next/server/web/sandbox/polyfills.ts b/packages/next/server/web/sandbox/polyfills.ts index ce083474526504d..85d8c42cfe52715 100644 --- a/packages/next/server/web/sandbox/polyfills.ts +++ b/packages/next/server/web/sandbox/polyfills.ts @@ -2,7 +2,6 @@ import { Crypto as WebCrypto } from 'next/dist/compiled/@peculiar/webcrypto' import { CryptoKey } from 'next/dist/compiled/@peculiar/webcrypto' import { v4 as uuid } from 'next/dist/compiled/uuid' import processPolyfill from 'next/dist/compiled/process' -import { ReadableStream } from './readable-stream' import crypto from 'crypto' @@ -14,7 +13,7 @@ export function btoa(str: string) { return Buffer.from(str, 'binary').toString('base64') } -export { CryptoKey, ReadableStream, processPolyfill as process } +export { CryptoKey, processPolyfill as process } export class Crypto extends WebCrypto { // @ts-ignore Remove once types are updated and we deprecate node 12 diff --git a/packages/next/server/web/sandbox/readable-stream.ts b/packages/next/server/web/sandbox/readable-stream.ts deleted file mode 100644 index 7c0a1a43bd48568..000000000000000 --- a/packages/next/server/web/sandbox/readable-stream.ts +++ /dev/null @@ -1,81 +0,0 @@ -class ReadableStream { - constructor(opts: UnderlyingSource = {}) { - let closed = false - let pullPromise: any - - let transformController: TransformStreamDefaultController - const { readable, writable } = new TransformStream( - { - start: (controller: TransformStreamDefaultController) => { - transformController = controller - }, - }, - undefined, - { - highWaterMark: 1, - } - ) - - const writer = writable.getWriter() - const encoder = new TextEncoder() - const controller: ReadableStreamController = { - get desiredSize() { - return transformController.desiredSize - }, - close: () => { - if (!closed) { - closed = true - writer.close() - } - }, - enqueue: (chunk: T) => { - writer.write(typeof chunk === 'string' ? encoder.encode(chunk) : chunk) - pull() - }, - error: (reason: any) => { - transformController.error(reason) - }, - } - - const pull = () => { - if (opts.pull) { - const shouldPull = - controller.desiredSize !== null && controller.desiredSize > 0 - if (!pullPromise && shouldPull) { - pullPromise = Promise.resolve().then(() => { - pullPromise = 0 - opts.pull!(controller) - }) - return pullPromise - } - } - return Promise.resolve() - } - - if (opts.cancel) { - readable.cancel = (reason: any) => { - opts.cancel!(reason) - return readable.cancel(reason) - } - } - - function registerPull() { - const getReader = readable.getReader.bind(readable) - readable.getReader = () => { - pull() - return getReader() - } - } - - const started = opts.start && opts.start(controller) - if (started && typeof started.then === 'function') { - started.then(() => registerPull()) - } else { - registerPull() - } - - return readable - } -} - -export { ReadableStream }