Skip to content

Commit

Permalink
Update polyfill of web streams (#35571)
Browse files Browse the repository at this point in the history
* attach pipeTo and pipeThrough polyfills to instance

* remove transformer polyfill

* remove polyfill

* fix missing polyfill in sandbox

* always polyfill the runtime

* always polyfill web streams in renderer

* fix missing AbortController and AbortSignal

* type fix

* fix type generation

* use global

Co-authored-by: Shu Ding <shu@shus-mac-studio.localdomain>
  • Loading branch information
shuding and Shu Ding committed Apr 7, 2022
1 parent 9110b5a commit 049bb22
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 226 deletions.
14 changes: 6 additions & 8 deletions packages/next/server/next-server.ts
@@ -1,4 +1,6 @@
import './node-polyfill-fetch'
import './node-polyfill-web-streams'

import type { Params, Route } from './router'
import type { CacheFs } from '../shared/lib/utils'
import type { MiddlewareManifest } from '../build/webpack/plugins/middleware-plugin'
Expand All @@ -10,19 +12,15 @@ import type { Rewrite } from '../lib/load-custom-routes'
import type { BaseNextRequest, BaseNextResponse } from './base-http'
import type { PagesManifest } from '../build/webpack/plugins/pages-manifest-plugin'
import type { PayloadOptions } from './send-payload'

import { execOnce } from '../shared/lib/utils'
import {
addRequestMeta,
getRequestMeta,
NextParsedUrlQuery,
NextUrlWithParsedQuery,
} from './request-meta'
import type { NextParsedUrlQuery, NextUrlWithParsedQuery } from './request-meta'

import fs from 'fs'
import { join, relative, resolve, sep } from 'path'
import { IncomingMessage, ServerResponse } from 'http'

import { execOnce } from '../shared/lib/utils'
import { addRequestMeta, getRequestMeta } from './request-meta'

import {
PAGES_MANIFEST,
BUILD_ID_FILE,
Expand Down
10 changes: 7 additions & 3 deletions packages/next/server/node-polyfill-web-streams.js
@@ -1,8 +1,12 @@
import { TransformStream } from 'next/dist/compiled/web-streams-polyfill'
import { ReadableStream } from './web/sandbox/readable-stream'
import {
ReadableStream,
TransformStream,
} from 'next/dist/compiled/web-streams-polyfill'

// Polyfill Web Streams in the Node.js environment
// Polyfill Web Streams for the Node.js runtime.
if (!global.ReadableStream) {
global.ReadableStream = ReadableStream
}
if (!global.TransformStream) {
global.TransformStream = TransformStream
}
124 changes: 7 additions & 117 deletions packages/next/server/node-web-streams-helper.ts
Expand Up @@ -24,43 +24,6 @@ export function readableStreamTee<T = any>(
return [transformStream.readable, transformStream2.readable]
}

export function pipeTo<T>(
readable: ReadableStream<T>,
writable: WritableStream<T>,
options?: { preventClose: boolean }
) {
let resolver: () => void
const promise = new Promise<void>((resolve) => (resolver = resolve))

const reader = readable.getReader()
const writer = writable.getWriter()
function process() {
reader.read().then(({ done, value }) => {
if (done) {
if (options?.preventClose) {
writer.releaseLock()
} else {
writer.close()
}
resolver()
} else {
writer.write(value)
process()
}
})
}
process()
return promise
}

export function pipeThrough<Input, Output>(
readable: ReadableStream<Input>,
transformStream: TransformStream<Input, Output>
) {
pipeTo(readable, transformStream.writable)
return transformStream.readable
}

export function chainStreams<T>(
streams: ReadableStream<T>[]
): ReadableStream<T> {
Expand All @@ -69,9 +32,7 @@ export function chainStreams<T>(
let promise = Promise.resolve()
for (let i = 0; i < streams.length; ++i) {
promise = promise.then(() =>
pipeTo(streams[i], writable, {
preventClose: i + 1 < streams.length,
})
streams[i].pipeTo(writable, { preventClose: i + 1 < streams.length })
)
}

Expand Down Expand Up @@ -119,77 +80,6 @@ export function decodeText(input?: Uint8Array, textDecoder?: TextDecoder) {
: new TextDecoder().decode(input)
}

export function createTransformStream<Input, Output>({
flush,
transform,
}: {
flush?: (
controller: TransformStreamDefaultController<Output>
) => Promise<void> | void
transform?: (
chunk: Input,
controller: TransformStreamDefaultController<Output>
) => Promise<void> | void
}): TransformStream<Input, Output> {
const source = new TransformStream()
const sink = new TransformStream()
const reader = source.readable.getReader()
const writer = sink.writable.getWriter()

const controller = {
enqueue(chunk: Output) {
writer.write(chunk)
},

error(reason: Error) {
writer.abort(reason)
reader.cancel()
},

terminate() {
writer.close()
reader.cancel()
},

get desiredSize() {
return writer.desiredSize
},
}

;(async () => {
try {
while (true) {
const { done, value } = await reader.read()

if (done) {
const maybePromise = flush?.(controller)
if (maybePromise) {
await maybePromise
}
writer.close()
return
}

if (transform) {
const maybePromise = transform(value, controller)
if (maybePromise) {
await maybePromise
}
} else {
controller.enqueue(value)
}
}
} catch (err) {
writer.abort(err)
}
})()

return {
readable: sink.readable,
writable: source.writable,
}
}

export function createBufferedTransformStream(): TransformStream<
Uint8Array,
Uint8Array
Expand All @@ -213,7 +103,7 @@ export function createBufferedTransformStream(): TransformStream<

const textDecoder = new TextDecoder()

return createTransformStream({
return new TransformStream({
transform(chunk, controller) {
bufferedString += decodeText(chunk, textDecoder)
flushBuffer(controller)
Expand All @@ -230,7 +120,7 @@ export function createBufferedTransformStream(): TransformStream<
export function createFlushEffectStream(
handleFlushEffect: () => Promise<string>
): TransformStream<Uint8Array, Uint8Array> {
return createTransformStream({
return new TransformStream({
async transform(chunk, controller) {
const flushedChunk = encodeText(await handleFlushEffect())

Expand Down Expand Up @@ -285,7 +175,7 @@ export async function continueFromInitialStream({
].filter(Boolean) as any

return transforms.reduce(
(readable, transform) => pipeThrough(readable, transform),
(readable, transform) => readable.pipeThrough(transform),
renderStream
)
}
Expand Down Expand Up @@ -318,7 +208,7 @@ export async function renderToStream({
export function createSuffixStream(
suffix: string
): TransformStream<Uint8Array, Uint8Array> {
return createTransformStream({
return new TransformStream({
flush(controller) {
if (suffix) {
controller.enqueue(encodeText(suffix))
Expand All @@ -332,7 +222,7 @@ export function createPrefixStream(
): TransformStream<Uint8Array, Uint8Array> {
let prefixFlushed = false
let prefixPrefixFlushFinished: Promise<void> | null = null
return createTransformStream({
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk)
if (!prefixFlushed && prefix) {
Expand Down Expand Up @@ -362,7 +252,7 @@ export function createInlineDataStream(
dataStream: ReadableStream<Uint8Array>
): TransformStream<Uint8Array, Uint8Array> {
let dataStreamFinished: Promise<void> | null = null
return createTransformStream({
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk)

Expand Down
18 changes: 7 additions & 11 deletions packages/next/server/render.tsx
Expand Up @@ -68,7 +68,6 @@ import {
readableStreamTee,
encodeText,
decodeText,
pipeThrough,
streamFromArray,
streamToString,
chainStreams,
Expand Down Expand Up @@ -1225,16 +1224,13 @@ export async function renderToHTML(

if (renderServerComponentData) {
return new RenderResult(
pipeThrough(
renderToReadableStream(
renderFlight(AppMod, ComponentMod, {
...props.pageProps,
...serverComponentProps,
}),
serverComponentManifest
),
createBufferedTransformStream()
)
renderToReadableStream(
renderFlight(AppMod, ComponentMod, {
...props.pageProps,
...serverComponentProps,
}),
serverComponentManifest
).pipeThrough(createBufferedTransformStream())
)
}

Expand Down
8 changes: 4 additions & 4 deletions packages/next/server/web/sandbox/context.ts
Expand Up @@ -2,7 +2,7 @@ import type { Context } from 'vm'
import { Blob, File, FormData } from 'next/dist/compiled/formdata-node'
import { readFileSync, promises as fs } from 'fs'
import { requireDependencies } from './require'
import { TransformStream } from 'next/dist/compiled/web-streams-polyfill'
import '../../node-polyfill-web-streams'
import cookie from 'next/dist/compiled/cookie'
import * as polyfills from './polyfills'
import {
Expand Down Expand Up @@ -202,8 +202,8 @@ function createContext(options: {
timeLog: console.timeLog.bind(console),
warn: console.warn.bind(console),
},
AbortController: AbortController,
AbortSignal: AbortSignal,
AbortController,
AbortSignal,
CryptoKey: polyfills.CryptoKey,
Crypto: polyfills.Crypto,
crypto: new polyfills.Crypto(),
Expand All @@ -213,7 +213,7 @@ function createContext(options: {
...polyfills.process,
env: buildEnvironmentVariablesFrom(options.env),
},
ReadableStream: polyfills.ReadableStream,
ReadableStream,
setInterval,
setTimeout,
TextDecoder,
Expand Down
3 changes: 1 addition & 2 deletions packages/next/server/web/sandbox/polyfills.ts
Expand Up @@ -2,7 +2,6 @@ import { Crypto as WebCrypto } from 'next/dist/compiled/@peculiar/webcrypto'
import { CryptoKey } from 'next/dist/compiled/@peculiar/webcrypto'
import { v4 as uuid } from 'next/dist/compiled/uuid'
import processPolyfill from 'next/dist/compiled/process'
import { ReadableStream } from './readable-stream'

import crypto from 'crypto'

Expand All @@ -14,7 +13,7 @@ export function btoa(str: string) {
return Buffer.from(str, 'binary').toString('base64')
}

export { CryptoKey, ReadableStream, processPolyfill as process }
export { CryptoKey, processPolyfill as process }

export class Crypto extends WebCrypto {
// @ts-ignore Remove once types are updated and we deprecate node 12
Expand Down
81 changes: 0 additions & 81 deletions packages/next/server/web/sandbox/readable-stream.ts

This file was deleted.

0 comments on commit 049bb22

Please sign in to comment.