Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update polyfill of web streams #35571

Merged
merged 16 commits into from Apr 7, 2022
14 changes: 6 additions & 8 deletions packages/next/server/next-server.ts
@@ -1,4 +1,6 @@
import './node-polyfill-fetch'
import './node-polyfill-web-streams'

import type { Params, Route } from './router'
import type { CacheFs } from '../shared/lib/utils'
import type { MiddlewareManifest } from '../build/webpack/plugins/middleware-plugin'
Expand All @@ -10,19 +12,15 @@ import type { Rewrite } from '../lib/load-custom-routes'
import type { BaseNextRequest, BaseNextResponse } from './base-http'
import type { PagesManifest } from '../build/webpack/plugins/pages-manifest-plugin'
import type { PayloadOptions } from './send-payload'

import { execOnce } from '../shared/lib/utils'
import {
addRequestMeta,
getRequestMeta,
NextParsedUrlQuery,
NextUrlWithParsedQuery,
} from './request-meta'
import type { NextParsedUrlQuery, NextUrlWithParsedQuery } from './request-meta'

import fs from 'fs'
import { join, relative, resolve, sep } from 'path'
import { IncomingMessage, ServerResponse } from 'http'

import { execOnce } from '../shared/lib/utils'
import { addRequestMeta, getRequestMeta } from './request-meta'

import {
PAGES_MANIFEST,
BUILD_ID_FILE,
Expand Down
10 changes: 7 additions & 3 deletions packages/next/server/node-polyfill-web-streams.js
@@ -1,8 +1,12 @@
import { TransformStream } from 'next/dist/compiled/web-streams-polyfill'
import { ReadableStream } from './web/sandbox/readable-stream'
import {
ReadableStream,
TransformStream,
} from 'next/dist/compiled/web-streams-polyfill'

// Polyfill Web Streams in the Node.js environment
// Polyfill Web Streams for the Node.js runtime.
if (!global.ReadableStream) {
global.ReadableStream = ReadableStream
}
if (!global.TransformStream) {
global.TransformStream = TransformStream
}
124 changes: 7 additions & 117 deletions packages/next/server/node-web-streams-helper.ts
Expand Up @@ -24,43 +24,6 @@ export function readableStreamTee<T = any>(
return [transformStream.readable, transformStream2.readable]
}

export function pipeTo<T>(
readable: ReadableStream<T>,
writable: WritableStream<T>,
options?: { preventClose: boolean }
) {
let resolver: () => void
const promise = new Promise<void>((resolve) => (resolver = resolve))

const reader = readable.getReader()
const writer = writable.getWriter()
function process() {
reader.read().then(({ done, value }) => {
if (done) {
if (options?.preventClose) {
writer.releaseLock()
} else {
writer.close()
}
resolver()
} else {
writer.write(value)
process()
}
})
}
process()
return promise
}

export function pipeThrough<Input, Output>(
readable: ReadableStream<Input>,
transformStream: TransformStream<Input, Output>
) {
pipeTo(readable, transformStream.writable)
return transformStream.readable
}

export function chainStreams<T>(
streams: ReadableStream<T>[]
): ReadableStream<T> {
Expand All @@ -69,9 +32,7 @@ export function chainStreams<T>(
let promise = Promise.resolve()
for (let i = 0; i < streams.length; ++i) {
promise = promise.then(() =>
pipeTo(streams[i], writable, {
preventClose: i + 1 < streams.length,
})
streams[i].pipeTo(writable, { preventClose: i + 1 < streams.length })
)
}

Expand Down Expand Up @@ -119,77 +80,6 @@ export function decodeText(input?: Uint8Array, textDecoder?: TextDecoder) {
: new TextDecoder().decode(input)
}

export function createTransformStream<Input, Output>({
flush,
transform,
}: {
flush?: (
controller: TransformStreamDefaultController<Output>
) => Promise<void> | void
transform?: (
chunk: Input,
controller: TransformStreamDefaultController<Output>
) => Promise<void> | void
}): TransformStream<Input, Output> {
const source = new TransformStream()
const sink = new TransformStream()
const reader = source.readable.getReader()
const writer = sink.writable.getWriter()

const controller = {
enqueue(chunk: Output) {
writer.write(chunk)
},

error(reason: Error) {
writer.abort(reason)
reader.cancel()
},

terminate() {
writer.close()
reader.cancel()
},

get desiredSize() {
return writer.desiredSize
},
}

;(async () => {
try {
while (true) {
const { done, value } = await reader.read()

if (done) {
const maybePromise = flush?.(controller)
if (maybePromise) {
await maybePromise
}
writer.close()
return
}

if (transform) {
const maybePromise = transform(value, controller)
if (maybePromise) {
await maybePromise
}
} else {
controller.enqueue(value)
}
}
} catch (err) {
writer.abort(err)
}
})()

return {
readable: sink.readable,
writable: source.writable,
}
}

export function createBufferedTransformStream(): TransformStream<
Uint8Array,
Uint8Array
Expand All @@ -213,7 +103,7 @@ export function createBufferedTransformStream(): TransformStream<

const textDecoder = new TextDecoder()

return createTransformStream({
return new TransformStream({
transform(chunk, controller) {
bufferedString += decodeText(chunk, textDecoder)
flushBuffer(controller)
Expand All @@ -230,7 +120,7 @@ export function createBufferedTransformStream(): TransformStream<
export function createFlushEffectStream(
handleFlushEffect: () => Promise<string>
): TransformStream<Uint8Array, Uint8Array> {
return createTransformStream({
return new TransformStream({
async transform(chunk, controller) {
const flushedChunk = encodeText(await handleFlushEffect())

Expand Down Expand Up @@ -285,7 +175,7 @@ export async function continueFromInitialStream({
].filter(Boolean) as any

return transforms.reduce(
(readable, transform) => pipeThrough(readable, transform),
(readable, transform) => readable.pipeThrough(transform),
renderStream
)
}
Expand Down Expand Up @@ -318,7 +208,7 @@ export async function renderToStream({
export function createSuffixStream(
suffix: string
): TransformStream<Uint8Array, Uint8Array> {
return createTransformStream({
return new TransformStream({
flush(controller) {
if (suffix) {
controller.enqueue(encodeText(suffix))
Expand All @@ -332,7 +222,7 @@ export function createPrefixStream(
): TransformStream<Uint8Array, Uint8Array> {
let prefixFlushed = false
let prefixPrefixFlushFinished: Promise<void> | null = null
return createTransformStream({
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk)
if (!prefixFlushed && prefix) {
Expand Down Expand Up @@ -362,7 +252,7 @@ export function createInlineDataStream(
dataStream: ReadableStream<Uint8Array>
): TransformStream<Uint8Array, Uint8Array> {
let dataStreamFinished: Promise<void> | null = null
return createTransformStream({
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk)

Expand Down
18 changes: 7 additions & 11 deletions packages/next/server/render.tsx
Expand Up @@ -68,7 +68,6 @@ import {
readableStreamTee,
encodeText,
decodeText,
pipeThrough,
streamFromArray,
streamToString,
chainStreams,
Expand Down Expand Up @@ -1225,16 +1224,13 @@ export async function renderToHTML(

if (renderServerComponentData) {
return new RenderResult(
pipeThrough(
renderToReadableStream(
renderFlight(AppMod, ComponentMod, {
...props.pageProps,
...serverComponentProps,
}),
serverComponentManifest
),
createBufferedTransformStream()
)
renderToReadableStream(
renderFlight(AppMod, ComponentMod, {
...props.pageProps,
...serverComponentProps,
}),
serverComponentManifest
).pipeThrough(createBufferedTransformStream())
)
}

Expand Down
8 changes: 4 additions & 4 deletions packages/next/server/web/sandbox/context.ts
Expand Up @@ -2,7 +2,7 @@ import type { Context } from 'vm'
import { Blob, File, FormData } from 'next/dist/compiled/formdata-node'
import { readFileSync, promises as fs } from 'fs'
import { requireDependencies } from './require'
import { TransformStream } from 'next/dist/compiled/web-streams-polyfill'
import '../../node-polyfill-web-streams'
import cookie from 'next/dist/compiled/cookie'
import * as polyfills from './polyfills'
import {
Expand Down Expand Up @@ -202,8 +202,8 @@ function createContext(options: {
timeLog: console.timeLog.bind(console),
warn: console.warn.bind(console),
},
AbortController: AbortController,
AbortSignal: AbortSignal,
AbortController,
AbortSignal,
CryptoKey: polyfills.CryptoKey,
Crypto: polyfills.Crypto,
crypto: new polyfills.Crypto(),
Expand All @@ -213,7 +213,7 @@ function createContext(options: {
...polyfills.process,
env: buildEnvironmentVariablesFrom(options.env),
},
ReadableStream: polyfills.ReadableStream,
ReadableStream,
setInterval,
setTimeout,
TextDecoder,
Expand Down
3 changes: 1 addition & 2 deletions packages/next/server/web/sandbox/polyfills.ts
Expand Up @@ -2,7 +2,6 @@ import { Crypto as WebCrypto } from 'next/dist/compiled/@peculiar/webcrypto'
import { CryptoKey } from 'next/dist/compiled/@peculiar/webcrypto'
import { v4 as uuid } from 'next/dist/compiled/uuid'
import processPolyfill from 'next/dist/compiled/process'
import { ReadableStream } from './readable-stream'

import crypto from 'crypto'

Expand All @@ -14,7 +13,7 @@ export function btoa(str: string) {
return Buffer.from(str, 'binary').toString('base64')
}

export { CryptoKey, ReadableStream, processPolyfill as process }
export { CryptoKey, processPolyfill as process }

export class Crypto extends WebCrypto {
// @ts-ignore Remove once types are updated and we deprecate node 12
Expand Down
81 changes: 0 additions & 81 deletions packages/next/server/web/sandbox/readable-stream.ts

This file was deleted.