From 51559f5c645c6ad8ba1bf2da74bac284015f887e Mon Sep 17 00:00:00 2001 From: Gerald Monaco Date: Tue, 17 Aug 2021 20:29:43 -0700 Subject: [PATCH] Use `zen-observable` library (#28214) Our `Observable` use has gotten sufficiently complex that it makes sense to just use a 3rd party implementation and not worry about maintaining it ourselves. As a bonus, it doesn't rely on Node APIs. --- .../next-serverless-loader/page-handler.ts | 8 +- packages/next/compiled/zen-observable/LICENSE | 18 ++ packages/next/compiled/zen-observable/esm.js | 1 + .../next/compiled/zen-observable/package.json | 1 + packages/next/export/worker.ts | 15 +- packages/next/package.json | 4 +- packages/next/server/next-server.ts | 23 +-- packages/next/server/render.tsx | 178 +++++------------- packages/next/server/response-cache.ts | 7 +- packages/next/server/send-payload.ts | 16 +- packages/next/server/utils.ts | 57 ++---- packages/next/taskfile.js | 11 ++ packages/next/types/misc.d.ts | 4 + yarn.lock | 10 + 14 files changed, 142 insertions(+), 211 deletions(-) create mode 100644 packages/next/compiled/zen-observable/LICENSE create mode 100644 packages/next/compiled/zen-observable/esm.js create mode 100644 packages/next/compiled/zen-observable/package.json diff --git a/packages/next/build/webpack/loaders/next-serverless-loader/page-handler.ts b/packages/next/build/webpack/loaders/next-serverless-loader/page-handler.ts index b88398bb5314fb5..db761f6871d3e7b 100644 --- a/packages/next/build/webpack/loaders/next-serverless-loader/page-handler.ts +++ b/packages/next/build/webpack/loaders/next-serverless-loader/page-handler.ts @@ -11,7 +11,7 @@ import { setLazyProp, getCookieParser } from '../../../../server/api-utils' import { getRedirectStatus } from '../../../../lib/load-custom-routes' import getRouteNoAssetPath from '../../../../shared/lib/router/utils/get-route-from-asset-path' import { PERMANENT_REDIRECT_STATUS } from '../../../../shared/lib/constants' -import { resultToChunks } from '../../../../server/utils' +import { resultsToString } from '../../../../server/utils' export function getPageHandler(ctx: ServerlessHandlerCtx) { const { @@ -334,7 +334,7 @@ export function getPageHandler(ctx: ServerlessHandlerCtx) { defaultLocale: i18n?.defaultLocale, }) ) - const html = result2 ? (await resultToChunks(result2)).join('') : '' + const html = result2 ? await resultsToString([result2]) : '' sendPayload( req, res, @@ -402,7 +402,7 @@ export function getPageHandler(ctx: ServerlessHandlerCtx) { } if (renderMode) return { html: result, renderOpts } - return result ? (await resultToChunks(result)).join('') : null + return result ? await resultsToString([result]) : null } catch (err) { if (!parsedUrl!) { parsedUrl = parseUrl(req.url!, true) @@ -464,7 +464,7 @@ export function getPageHandler(ctx: ServerlessHandlerCtx) { err: res.statusCode === 404 ? undefined : err, }) ) - return result2 ? (await resultToChunks(result2)).join('') : null + return result2 ? await resultsToString([result2]) : null } } diff --git a/packages/next/compiled/zen-observable/LICENSE b/packages/next/compiled/zen-observable/LICENSE new file mode 100644 index 000000000000000..d850f52720232cb --- /dev/null +++ b/packages/next/compiled/zen-observable/LICENSE @@ -0,0 +1,18 @@ +Copyright (c) 2018 zenparsing (Kevin Smith) + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies +of the Software, and to permit persons to whom the Software is furnished to do +so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/packages/next/compiled/zen-observable/esm.js b/packages/next/compiled/zen-observable/esm.js new file mode 100644 index 000000000000000..141e5f414a3a331 --- /dev/null +++ b/packages/next/compiled/zen-observable/esm.js @@ -0,0 +1 @@ +module.exports=(()=>{"use strict";var e={343:(e,t,r)=>{r.r(t);r.d(t,{Observable:()=>Observable,combineLatest:()=>combineLatest,default:()=>l,merge:()=>merge,zip:()=>zip});const n=()=>typeof Symbol==="function";const o=e=>n()&&Boolean(Symbol[e]);const i=e=>o(e)?Symbol[e]:"@@"+e;if(n()&&!o("observable")){Symbol.observable=Symbol("observable")}const s=i("iterator");const u=i("observable");const c=i("species");function getMethod(e,t){let r=e[t];if(r==null)return undefined;if(typeof r!=="function")throw new TypeError(r+" is not a function");return r}function getSpecies(e){let t=e.constructor;if(t!==undefined){t=t[c];if(t===null){t=undefined}}return t!==undefined?t:Observable}function isObservable(e){return e instanceof Observable}function hostReportError(e){if(hostReportError.log){hostReportError.log(e)}else{setTimeout(()=>{throw e})}}function enqueue(e){Promise.resolve().then(()=>{try{e()}catch(e){hostReportError(e)}})}function cleanupSubscription(e){let t=e._cleanup;if(t===undefined)return;e._cleanup=undefined;if(!t){return}try{if(typeof t==="function"){t()}else{let e=getMethod(t,"unsubscribe");if(e){e.call(t)}}}catch(e){hostReportError(e)}}function closeSubscription(e){e._observer=undefined;e._queue=undefined;e._state="closed"}function flushSubscription(e){let t=e._queue;if(!t){return}e._queue=undefined;e._state="ready";for(let r=0;rflushSubscription(e));return}notifySubscription(e,t,r)}class Subscription{constructor(e,t){this._cleanup=undefined;this._observer=e;this._queue=undefined;this._state="initializing";let r=new SubscriptionObserver(this);try{this._cleanup=t.call(undefined,r)}catch(e){r.error(e)}if(this._state==="initializing")this._state="ready"}get closed(){return this._state==="closed"}unsubscribe(){if(this._state!=="closed"){closeSubscription(this);cleanupSubscription(this)}}}class SubscriptionObserver{constructor(e){this._subscription=e}get closed(){return this._subscription._state==="closed"}next(e){onNotify(this._subscription,"next",e)}error(e){onNotify(this._subscription,"error",e)}complete(){onNotify(this._subscription,"complete")}}class Observable{constructor(e){if(!(this instanceof Observable))throw new TypeError("Observable cannot be called as a function");if(typeof e!=="function")throw new TypeError("Observable initializer must be a function");this._subscriber=e}subscribe(e){if(typeof e!=="object"||e===null){e={next:e,error:arguments[1],complete:arguments[2]}}return new Subscription(e,this._subscriber)}forEach(e){return new Promise((t,r)=>{if(typeof e!=="function"){r(new TypeError(e+" is not a function"));return}function done(){n.unsubscribe();t()}let n=this.subscribe({next(t){try{e(t,done)}catch(e){r(e);n.unsubscribe()}},error:r,complete:t})})}map(e){if(typeof e!=="function")throw new TypeError(e+" is not a function");let t=getSpecies(this);return new t(t=>this.subscribe({next(r){try{r=e(r)}catch(e){return t.error(e)}t.next(r)},error(e){t.error(e)},complete(){t.complete()}}))}filter(e){if(typeof e!=="function")throw new TypeError(e+" is not a function");let t=getSpecies(this);return new t(t=>this.subscribe({next(r){try{if(!e(r))return}catch(e){return t.error(e)}t.next(r)},error(e){t.error(e)},complete(){t.complete()}}))}reduce(e){if(typeof e!=="function")throw new TypeError(e+" is not a function");let t=getSpecies(this);let r=arguments.length>1;let n=false;let o=arguments[1];let i=o;return new t(t=>this.subscribe({next(o){let s=!n;n=true;if(!s||r){try{i=e(i,o)}catch(e){return t.error(e)}}else{i=o}},error(e){t.error(e)},complete(){if(!n&&!r)return t.error(new TypeError("Cannot reduce an empty sequence"));t.next(i);t.complete()}}))}concat(...e){let t=getSpecies(this);return new t(r=>{let n;let o=0;function startNext(i){n=i.subscribe({next(e){r.next(e)},error(e){r.error(e)},complete(){if(o===e.length){n=undefined;r.complete()}else{startNext(t.from(e[o++]))}}})}startNext(this);return()=>{if(n){n.unsubscribe();n=undefined}}})}flatMap(e){if(typeof e!=="function")throw new TypeError(e+" is not a function");let t=getSpecies(this);return new t(r=>{let n=[];let o=this.subscribe({next(o){if(e){try{o=e(o)}catch(e){return r.error(e)}}let i=t.from(o).subscribe({next(e){r.next(e)},error(e){r.error(e)},complete(){let e=n.indexOf(i);if(e>=0)n.splice(e,1);completeIfDone()}});n.push(i)},error(e){r.error(e)},complete(){completeIfDone()}});function completeIfDone(){if(o.closed&&n.length===0)r.complete()}return()=>{n.forEach(e=>e.unsubscribe());o.unsubscribe()}})}[u](){return this}static from(e){let t=typeof this==="function"?this:Observable;if(e==null)throw new TypeError(e+" is not an object");let r=getMethod(e,u);if(r){let n=r.call(e);if(Object(n)!==n)throw new TypeError(n+" is not an object");if(isObservable(n)&&n.constructor===t)return n;return new t(e=>n.subscribe(e))}if(o("iterator")){r=getMethod(e,s);if(r){return new t(t=>{enqueue(()=>{if(t.closed)return;for(let n of r.call(e)){t.next(n);if(t.closed)return}t.complete()})})}}if(Array.isArray(e)){return new t(t=>{enqueue(()=>{if(t.closed)return;for(let r=0;r{enqueue(()=>{if(t.closed)return;for(let r=0;r{if(e.length===0)return Observable.from([]);let r=e.length;let n=e.map(e=>Observable.from(e).subscribe({next(e){t.next(e)},error(e){t.error(e)},complete(){if(--r===0)t.complete()}}));return()=>n.forEach(e=>e.unsubscribe())})}function combineLatest(...e){return new Observable(t=>{if(e.length===0)return Observable.from([]);let r=e.length;let n=new Set;let o=false;let i=e.map(()=>undefined);let s=e.map((s,u)=>Observable.from(s).subscribe({next(r){i[u]=r;if(!o){n.add(u);if(n.size!==e.length)return;n=null;o=true}t.next(Array.from(i))},error(e){t.error(e)},complete(){if(--r===0)t.complete()}}));return()=>s.forEach(e=>e.unsubscribe())})}function zip(...e){return new Observable(t=>{if(e.length===0)return Observable.from([]);let r=e.map(()=>[]);function done(){return r.some((e,t)=>e.length===0&&n[t].closed)}let n=e.map((e,n)=>Observable.from(e).subscribe({next(e){r[n].push(e);if(r.every(e=>e.length>0)){t.next(r.map(e=>e.shift()));if(done())t.complete()}},error(e){t.error(e)},complete(){if(done())t.complete()}}));return()=>n.forEach(e=>e.unsubscribe())})}const l=Observable}};var t={};function __nccwpck_require__(r){if(t[r]){return t[r].exports}var n=t[r]={exports:{}};var o=true;try{e[r](n,n.exports,__nccwpck_require__);o=false}finally{if(o)delete t[r]}return n.exports}(()=>{__nccwpck_require__.d=((e,t)=>{for(var r in t){if(__nccwpck_require__.o(t,r)&&!__nccwpck_require__.o(e,r)){Object.defineProperty(e,r,{enumerable:true,get:t[r]})}}})})();(()=>{__nccwpck_require__.o=((e,t)=>Object.prototype.hasOwnProperty.call(e,t))})();(()=>{__nccwpck_require__.r=(e=>{if(typeof Symbol!=="undefined"&&Symbol.toStringTag){Object.defineProperty(e,Symbol.toStringTag,{value:"Module"})}Object.defineProperty(e,"__esModule",{value:true})})})();__nccwpck_require__.ab=__dirname+"/";return __nccwpck_require__(343)})(); \ No newline at end of file diff --git a/packages/next/compiled/zen-observable/package.json b/packages/next/compiled/zen-observable/package.json new file mode 100644 index 000000000000000..13ba577d97d99aa --- /dev/null +++ b/packages/next/compiled/zen-observable/package.json @@ -0,0 +1 @@ +{"name":"zen-observable","main":"esm.js","license":"MIT"} diff --git a/packages/next/export/worker.ts b/packages/next/export/worker.ts index 10fb4dbee8ce039..1dafceff2189cbb 100644 --- a/packages/next/export/worker.ts +++ b/packages/next/export/worker.ts @@ -3,6 +3,7 @@ import { extname, join, dirname, sep } from 'path' import { renderToHTML } from '../server/render' import { promises } from 'fs' import AmpHtmlValidator from 'next/dist/compiled/amphtml-validator' +import Observable from 'next/dist/compiled/zen-observable' import { loadComponents } from '../server/load-components' import { isDynamicRoute } from '../shared/lib/router/utils/is-dynamic' import { getRouteMatcher } from '../shared/lib/router/utils/route-matcher' @@ -18,7 +19,7 @@ import { FontManifest } from '../server/font-utils' import { normalizeLocalePath } from '../shared/lib/i18n/normalize-locale-path' import { trace } from '../telemetry/trace' import { isInAmpMode } from '../shared/lib/amp' -import { resultFromChunks, resultToChunks } from '../server/utils' +import { resultsToString } from '../server/utils' import { NextConfigComplete } from '../server/config-shared' import { setHttpAgentOptions } from '../server/config' @@ -274,7 +275,7 @@ export default async function exportPage({ // if it was auto-exported the HTML is loaded here if (typeof mod === 'string') { - renderResult = resultFromChunks([mod]) + renderResult = Observable.of(mod) queryWithAutoExportWarn() } else { // for non-dynamic SSG pages we should have already @@ -352,7 +353,7 @@ export default async function exportPage({ } if (typeof components.Component === 'string') { - renderResult = resultFromChunks([components.Component]) + renderResult = Observable.of(components.Component) queryWithAutoExportWarn() } else { /** @@ -417,8 +418,7 @@ export default async function exportPage({ } } - const htmlChunks = renderResult ? await resultToChunks(renderResult) : [] - const html = htmlChunks.join('') + const html = renderResult ? await resultsToString([renderResult]) : '' if (inAmpMode && !curRenderOpts.ampSkipValidation) { if (!results.ssgNotFound) { await validateAmp(html, path, curRenderOpts.ampValidatorPath) @@ -460,8 +460,9 @@ export default async function exportPage({ ) } - const ampChunks = await resultToChunks(ampRenderResult) - const ampHtml = ampChunks.join('') + const ampHtml = ampRenderResult + ? await resultsToString([ampRenderResult]) + : '' if (!curRenderOpts.ampSkipValidation) { await validateAmp(ampHtml, page + '?amp=1') } diff --git a/packages/next/package.json b/packages/next/package.json index 68ed590bd308daa..4559184fe8b25d3 100644 --- a/packages/next/package.json +++ b/packages/next/package.json @@ -185,6 +185,7 @@ "@types/text-table": "0.2.1", "@types/webpack": "5.28.0", "@types/webpack-sources": "0.1.5", + "@types/zen-observable": "0.8.3", "@vercel/ncc": "0.27.0", "@vercel/nft": "0.12.2", "amphtml-validator": "1.0.33", @@ -248,7 +249,8 @@ "unistore": "3.4.1", "web-vitals": "2.1.0", "webpack": "4.44.1", - "webpack-sources": "1.4.3" + "webpack-sources": "1.4.3", + "zen-observable": "0.8.15" }, "engines": { "node": ">=12.0.0" diff --git a/packages/next/server/next-server.ts b/packages/next/server/next-server.ts index 1943ae65a27acbb..63e280f2375834d 100644 --- a/packages/next/server/next-server.ts +++ b/packages/next/server/next-server.ts @@ -10,6 +10,7 @@ import { ParsedUrlQuery, } from 'querystring' import { format as formatUrl, parse as parseUrl, UrlWithParsedQuery } from 'url' +import Observable from 'next/dist/compiled/zen-observable' import { PrerenderManifest } from '../build' import { getRedirectStatus, @@ -76,12 +77,7 @@ import { sendRenderResult, setRevalidateHeaders } from './send-payload' import { serveStatic } from './serve-static' import { IncrementalCache } from './incremental-cache' import { execOnce } from '../shared/lib/utils' -import { - isBlockedPage, - RenderResult, - resultFromChunks, - resultToChunks, -} from './utils' +import { isBlockedPage, RenderResult, resultsToString } from './utils' import { loadEnvConfig } from '@next/env' import './node-polyfill-fetch' import { PagesManifest } from '../build/webpack/plugins/pages-manifest-plugin' @@ -1267,7 +1263,7 @@ export default class Server { req, res, resultOrPayload: requireStaticHTML - ? (await resultToChunks(body)).join('') + ? await resultsToString([body]) : body, type, generateEtags, @@ -1296,8 +1292,7 @@ export default class Server { if (payload === null) { return null } - const chunks = await resultToChunks(payload.body) - return chunks.join('') + return resultsToString([payload.body]) } public async render( @@ -1472,7 +1467,7 @@ export default class Server { return { type: 'html', // TODO: Static pages should be written as chunks - body: resultFromChunks([components.Component]), + body: Observable.of(components.Component), } } @@ -1751,7 +1746,7 @@ export default class Server { return { value: { kind: 'PAGE', - html: resultFromChunks([html]), + html: Observable.of(html), pageData: {}, }, } @@ -1830,7 +1825,7 @@ export default class Server { if (isDataReq) { return { type: 'json', - body: resultFromChunks([JSON.stringify(cachedData.props)]), + body: Observable.of(JSON.stringify(cachedData.props)), revalidateOptions, } } else { @@ -1841,7 +1836,7 @@ export default class Server { return { type: isDataReq ? 'json' : 'html', body: isDataReq - ? resultFromChunks([JSON.stringify(cachedData.pageData)]) + ? Observable.of(JSON.stringify(cachedData.pageData)) : cachedData.html, revalidateOptions, } @@ -2076,7 +2071,7 @@ export default class Server { } return { type: 'html', - body: resultFromChunks(['Internal Server Error']), + body: Observable.of('Internal Server Error'), } } } diff --git a/packages/next/server/render.tsx b/packages/next/server/render.tsx index a94a5104d964647..762a18b7c1c0a4d 100644 --- a/packages/next/server/render.tsx +++ b/packages/next/server/render.tsx @@ -3,6 +3,7 @@ import { ParsedUrlQuery } from 'querystring' import { PassThrough } from 'stream' import React from 'react' import * as ReactDOMServer from 'react-dom/server' +import Observable from 'next/dist/compiled/zen-observable' import { warn } from '../build/output/log' import { UnwrapPromise } from '../lib/coalesced-function' import { @@ -63,12 +64,7 @@ import { Redirect, } from '../lib/load-custom-routes' import { DomainLocale } from './config' -import { - Observer, - RenderResult, - resultFromChunks, - resultToChunks, -} from './utils' +import { mergeResults, RenderResult, resultsToString } from './utils' function noRouter() { const message = @@ -966,7 +962,7 @@ export async function renderToHTML( // Avoid rendering page un-necessarily for getServerSideProps data request // and getServerSideProps/getStaticProps redirects if ((isDataReq && !isSSG) || (renderOpts as any).isRedirect) { - return resultFromChunks([JSON.stringify(props)]) + return Observable.of(JSON.stringify(props)) } // We don't call getStaticProps or getServerSideProps while generating @@ -1013,19 +1009,22 @@ export async function renderToHTML( const doResolve = () => { if (!resolved) { resolved = true - resolve(({ complete, next }) => { - stream.on('data', (chunk) => { - next(chunk.toString('utf-8')) - }) - stream.once('end', () => { - complete() - }) - startWriting() - return () => { - abort() - } - }) + resolve( + new Observable((observer) => { + stream.on('data', (chunk) => { + observer.next(chunk.toString('utf-8')) + }) + stream.once('end', () => { + observer.complete() + }) + + startWriting() + return () => { + abort() + } + }) + ) } } @@ -1188,18 +1187,18 @@ export async function renderToHTML( let results: Array = [] const renderTargetIdx = documentHTML.indexOf(BODY_RENDER_TARGET) results.push( - resultFromChunks([ - '' + documentHTML.substring(0, renderTargetIdx), - ]) + Observable.of( + '' + documentHTML.substring(0, renderTargetIdx) + ) ) if (inAmpMode) { - results.push(resultFromChunks([''])) + results.push(Observable.of('')) } - results.push(resultFromChunks([docProps.html])) + results.push(Observable.of(docProps.html)) results.push( - resultFromChunks([ - documentHTML.substring(renderTargetIdx + BODY_RENDER_TARGET.length), - ]) + Observable.of( + documentHTML.substring(renderTargetIdx + BODY_RENDER_TARGET.length) + ) ) const postProcessors: Array<((html: string) => Promise) | null> = [ @@ -1254,68 +1253,20 @@ export async function renderToHTML( html = await postProcessor(html) } } - results = [resultFromChunks([html])] + results = [Observable.of(html)] } return mergeResults(results) } -async function resultsToString(chunks: Array): Promise { - const result = await resultToChunks(mergeResults(chunks)) - return result.join('') -} - -function mergeResults(chunks: Array): RenderResult { - return ({ next, complete, error }) => { - let idx = 0 - let canceled = false - let unsubscribe = () => {} - - const subscribeNext = () => { - if (canceled) { - return - } - - if (idx < chunks.length) { - const result = chunks[idx++] - unsubscribe = result({ - next, - complete() { - unsubscribe() - subscribeNext() - }, - error(err) { - unsubscribe() - if (!canceled) { - canceled = true - error(err) - } - }, - }) - } else { - if (!canceled) { - canceled = true - complete() - } - } - } - subscribeNext() - - return () => { - if (!canceled) { - canceled = true - unsubscribe() - } - } - } -} - function multiplexResult(result: RenderResult): RenderResult { const chunks: Array = [] - const subscribers: Set> = new Set() - let terminator: ((subscriber: Observer) => void) | null = null + const subscribers: Set> = new Set() + let terminator: + | ((subscriber: ZenObservable.SubscriptionObserver) => void) + | null = null - result({ + result.subscribe({ next(chunk) { chunks.push(chunk) subscribers.forEach((subscriber) => subscriber.next(chunk)) @@ -1336,59 +1287,24 @@ function multiplexResult(result: RenderResult): RenderResult { }, }) - return (innerSubscriber) => { - let completed = false - let cleanup = () => {} - const subscriber: Observer = { - next(chunk) { - if (!completed) { - try { - innerSubscriber.next(chunk) - } catch (err) { - subscriber.error(err) - } - } - }, - complete() { - if (!completed) { - cleanup() - try { - innerSubscriber.complete() - } catch {} - } - }, - error(err) { - if (!completed) { - cleanup() - try { - innerSubscriber.error(err) - } catch {} - } - }, - } - cleanup = () => { - completed = true - subscribers.delete(subscriber) + return new Observable((observer) => { + for (const chunk of chunks) { + if (observer.closed) { + return + } + observer.next(chunk) } - process.nextTick(() => { - for (const chunk of chunks) { - if (completed) { - return - } - subscriber.next(chunk) - } + if (terminator) { + terminator(observer) + return + } - if (!completed) { - if (!terminator) { - subscribers.add(subscriber) - } else { - terminator(subscriber) - } - } - }) - return () => cleanup() - } + subscribers.add(observer) + return () => { + subscribers.delete(observer) + } + }) } function errorToJSON(err: Error): Error { diff --git a/packages/next/server/response-cache.ts b/packages/next/server/response-cache.ts index 50442d26efe8ecd..2cc37e1d89528d2 100644 --- a/packages/next/server/response-cache.ts +++ b/packages/next/server/response-cache.ts @@ -1,5 +1,6 @@ +import Observable from 'next/dist/compiled/zen-observable' import { IncrementalCache } from './incremental-cache' -import { RenderResult, resultFromChunks, resultToChunks } from './utils' +import { RenderResult, resultsToString } from './utils' interface CachedRedirectValue { kind: 'REDIRECT' @@ -78,7 +79,7 @@ export default class ResponseCache { cachedResponse.value?.kind === 'PAGE' ? { kind: 'PAGE', - html: resultFromChunks([cachedResponse.value.html]), + html: Observable.of(cachedResponse.value.html), pageData: cachedResponse.value.pageData, } : cachedResponse.value, @@ -99,7 +100,7 @@ export default class ResponseCache { cacheEntry.value?.kind === 'PAGE' ? { kind: 'PAGE', - html: (await resultToChunks(cacheEntry.value.html)).join(''), + html: await resultsToString([cacheEntry.value.html]), pageData: cacheEntry.value.pageData, } : cacheEntry.value, diff --git a/packages/next/server/send-payload.ts b/packages/next/server/send-payload.ts index 894fad848b5d210..8686f4639a94e37 100644 --- a/packages/next/server/send-payload.ts +++ b/packages/next/server/send-payload.ts @@ -58,7 +58,7 @@ export function sendPayload( }) } -export function sendRenderResult({ +export async function sendRenderResult({ req, res, resultOrPayload, @@ -74,7 +74,7 @@ export function sendRenderResult({ generateEtags: boolean poweredByHeader: boolean options?: PayloadOptions -}): void { +}): Promise { if (isResSent(res)) { return } @@ -117,11 +117,15 @@ export function sendRenderResult({ } else if (isPayload) { res.end(resultOrPayload as string) } else { - ;(resultOrPayload as RenderResult)({ - next: (chunk) => res.write(chunk), - error: (_) => res.end(), - complete: () => res.end(), + const maybeFlush = + typeof (res as any).flush === 'function' + ? () => (res as any).flush() + : () => {} + await (resultOrPayload as RenderResult).forEach((chunk) => { + res.write(chunk) + maybeFlush() }) + res.end() } } diff --git a/packages/next/server/utils.ts b/packages/next/server/utils.ts index 1334915ffc56232..9de3c0080534354 100644 --- a/packages/next/server/utils.ts +++ b/packages/next/server/utils.ts @@ -1,3 +1,4 @@ +import Observable from 'next/dist/compiled/zen-observable' import { BLOCKED_PAGES } from '../shared/lib/constants' export function isBlockedPage(pathname: string): boolean { @@ -15,53 +16,19 @@ export function cleanAmpPath(pathname: string): string { return pathname } -export type Disposable = () => void -export type Observer = { - next(chunk: T): void - error(error: Error): void - complete(): void -} -export type RenderResult = (observer: Observer) => Disposable - -export function resultFromChunks(chunks: string[]): RenderResult { - return ({ next, complete, error }) => { - let canceled = false - process.nextTick(() => { - try { - for (const chunk of chunks) { - if (canceled) { - return - } - next(chunk) - } - } catch (err) { - if (!canceled) { - canceled = true - error(err) - } - } +export type RenderResult = Observable - if (!canceled) { - canceled = true - complete() - } - }) - - return () => { - canceled = true - } - } +export function mergeResults(results: Array): RenderResult { + // @ts-ignore + return Observable.prototype.concat.call(...results) } -export function resultToChunks(result: RenderResult): Promise { - return new Promise((resolve, reject) => { - const chunks: string[] = [] - result({ - next: (chunk) => { - chunks.push(chunk) - }, - error: (error) => reject(error), - complete: () => resolve(chunks), - }) +export async function resultsToString( + results: Array +): Promise { + const chunks: string[] = [] + await mergeResults(results).forEach((chunk: string) => { + chunks.push(chunk) }) + return chunks.join('') } diff --git a/packages/next/taskfile.js b/packages/next/taskfile.js index dd4c1bb9636b456..5d1d84572ad0a1f 100644 --- a/packages/next/taskfile.js +++ b/packages/next/taskfile.js @@ -641,6 +641,16 @@ export async function ncc_web_vitals(task, opts) { .target('compiled/web-vitals') } // eslint-disable-next-line camelcase +externals['zen-observable'] = 'next/dist/compiled/zen-observable' +export async function ncc_zen_observable(task, opts) { + await task + .source( + opts.src || relative(__dirname, require.resolve('zen-observable/esm')) + ) + .ncc({ packageName: 'zen-observable', externals }) + .target('compiled/zen-observable') +} +// eslint-disable-next-line camelcase externals['webpack-sources'] = 'next/dist/compiled/webpack-sources' export async function ncc_webpack_sources(task, opts) { await task @@ -863,6 +873,7 @@ export async function ncc(task, opts) { 'ncc_text_table', 'ncc_unistore', 'ncc_web_vitals', + 'ncc_zen_observable', 'ncc_webpack_bundle4', 'ncc_webpack_bundle5', 'ncc_webpack_bundle_packages', diff --git a/packages/next/types/misc.d.ts b/packages/next/types/misc.d.ts index 877c48d4294eadd..646f729ae822558 100644 --- a/packages/next/types/misc.d.ts +++ b/packages/next/types/misc.d.ts @@ -223,6 +223,10 @@ declare module 'next/dist/compiled/web-vitals' { import m from 'web-vitals' export = m } +declare module 'next/dist/compiled/zen-observable' { + import m from 'zen-observable' + export = m +} declare module 'next/dist/compiled/comment-json' { import m from 'comment-json' diff --git a/yarn.lock b/yarn.lock index 9426440e9b41682..34d8a99571d4c37 100644 --- a/yarn.lock +++ b/yarn.lock @@ -4919,6 +4919,11 @@ dependencies: "@types/yargs-parser" "*" +"@types/zen-observable@0.8.3": + version "0.8.3" + resolved "https://registry.yarnpkg.com/@types/zen-observable/-/zen-observable-0.8.3.tgz#781d360c282436494b32fe7d9f7f8e64b3118aa3" + integrity sha512-fbF6oTd4sGGy0xjHPKAt+eS2CrxJ3+6gQ3FGcBoIJR2TLAyCkCyI8JqZNy+FeON0AhVgNJoUumVoZQjBFUqHkw== + "@typescript-eslint/eslint-plugin@4.29.1": version "4.29.1" resolved "https://registry.yarnpkg.com/@typescript-eslint/eslint-plugin/-/eslint-plugin-4.29.1.tgz#808d206e2278e809292b5de752a91105da85860b" @@ -20610,6 +20615,11 @@ yocto-queue@^0.1.0: resolved "https://registry.yarnpkg.com/yocto-queue/-/yocto-queue-0.1.0.tgz#0294eb3dee05028d31ee1a5fa2c556a6aaf10a1b" integrity sha512-rVksvsnNCdJ/ohGc6xgPwyN8eheCxsiLM8mxuE/t/mOVqJewPuO1miLpTHQiRgTKCLexL4MeAFVagts7HmNZ2Q== +zen-observable@0.8.15: + version "0.8.15" + resolved "https://registry.yarnpkg.com/zen-observable/-/zen-observable-0.8.15.tgz#96415c512d8e3ffd920afd3889604e30b9eaac15" + integrity sha512-PQ2PC7R9rslx84ndNBZB/Dkv8V8fZEpk83RLgXtYd0fwUgEjseMn1Dgajh2x6S8QbZAFa9p2qVCEuYZNgve0dQ== + zwitch@^1.0.0: version "1.0.5" resolved "https://registry.yarnpkg.com/zwitch/-/zwitch-1.0.5.tgz#d11d7381ffed16b742f6af7b3f223d5cd9fe9920"