Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Flight][Fizz] schedule flushing independently from performing work #28900

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 13 additions & 5 deletions packages/react-dom/src/__tests__/ReactDOMFizzServer-test.js
Expand Up @@ -3703,14 +3703,16 @@ describe('ReactDOMFizzServer', () => {
});

// https://github.com/facebook/react/issues/27540
// This test is not actually asserting much because there is possibly a bug in the closeing logic for the
// Node implementation of Fizz. The close leads to an abort which sets the destination to null before the Float
// method has an opportunity to schedule a write. We should fix this probably and once we do this test will start
// to fail if the underyling issue of writing after stream completion isn't fixed
it('does not try to write to the stream after it has been closed', async () => {
let resolve;
const promise = new Promise(res => {
resolve = res;
});
async function preloadLate() {
await 1;
ReactDOM.preconnect('foo');
await promise;
ReactDOM.preconnect('bar');
}

function Preload() {
Expand All @@ -3732,9 +3734,15 @@ describe('ReactDOMFizzServer', () => {
renderToPipeableStream(<App />).pipe(writable);
});

await act(() => {
resolve();
});

expect(getVisibleChildren(document)).toEqual(
<html>
<head />
<head>
<link rel="preconnect" href="foo" />
</head>
Comment on lines +3743 to +3745
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This demonstrates a small semantic difference in this PR. the foo preconnect resolves in a microtask which now runs before the flush. I updated the test to perform an additional preconnect new a new macrotask and it demonstrates that there is no attempted write to the stream after the render has completed.

<body>
<main>hello</main>
</body>
Expand Down
53 changes: 37 additions & 16 deletions packages/react-server/src/ReactFizzServer.js
Expand Up @@ -296,9 +296,14 @@ const OPEN = 0;
const CLOSING = 1;
const CLOSED = 2;

type ScheduleState = 10 | 11 | 12;
const IDLE = 10;
const WORK = 11;
const FLUSH = 12;

export opaque type Request = {
destination: null | Destination,
flushScheduled: boolean,
schedule: ScheduleState,
+resumableState: ResumableState,
+renderState: RenderState,
+rootFormatContext: FormatContext,
Expand Down Expand Up @@ -380,7 +385,7 @@ export function createRequest(
const abortSet: Set<Task> = new Set();
const request: Request = {
destination: null,
flushScheduled: false,
schedule: IDLE,
resumableState,
renderState,
rootFormatContext,
Expand Down Expand Up @@ -492,7 +497,7 @@ export function resumeRequest(
const abortSet: Set<Task> = new Set();
const request: Request = {
destination: null,
flushScheduled: false,
schedule: IDLE,
resumableState: postponedState.resumableState,
renderState,
rootFormatContext: postponedState.rootFormatContext,
Expand Down Expand Up @@ -593,9 +598,8 @@ export function resolveRequest(): null | Request {
function pingTask(request: Request, task: Task): void {
const pingedTasks = request.pingedTasks;
pingedTasks.push(task);
if (request.pingedTasks.length === 1) {
request.flushScheduled = request.destination !== null;
scheduleWork(() => performWork(request));
if (pingedTasks.length === 1) {
startPerformingWork(request);
}
}

Expand Down Expand Up @@ -3815,9 +3819,6 @@ export function performWork(request: Request): void {
retryTask(request, task);
}
pingedTasks.splice(0, i);
if (request.destination !== null) {
flushCompletedQueues(request, request.destination);
}
} catch (error) {
const errorInfo: ThrownInfo = {};
logRecoverableError(request, error, errorInfo);
Expand Down Expand Up @@ -4279,7 +4280,6 @@ function flushCompletedQueues(
// We don't need to check any partially completed segments because
// either they have pending task or they're complete.
) {
request.flushScheduled = false;
// We write the trailing tags but only if don't have any data to resume.
// If we need to resume we'll write the postamble in the resume instead.
if (!enablePostpone || request.trackedPostpones === null) {
Expand All @@ -4306,13 +4306,28 @@ function flushCompletedQueues(
}
}

export function startWork(request: Request): void {
request.flushScheduled = request.destination !== null;
function flushWork(request: Request) {
request.schedule = IDLE;
const destination = request.destination;
if (destination) {
flushCompletedQueues(request, destination);
}
}

function startPerformingWork(request: Request): void {
request.schedule = WORK;
if (supportsRequestStorage) {
scheduleWork(() => requestStorage.run(request, performWork, request));
} else {
scheduleWork(() => performWork(request));
}
scheduleWork(() => {
flushWork(request);
});
}

export function startWork(request: Request): void {
startPerformingWork(request);
if (request.trackedPostpones === null) {
// this is either a regular render or a resume. For regular render we want
// to call emitEarlyPreloads after the first performWork because we want
Expand Down Expand Up @@ -4344,22 +4359,28 @@ function enqueueEarlyPreloadsAfterInitialWork(request: Request) {

function enqueueFlush(request: Request): void {
if (
request.flushScheduled === false &&
request.schedule === IDLE &&
// If there are pinged tasks we are going to flush anyway after work completes
request.pingedTasks.length === 0 &&
// If there is no destination there is nothing we can flush to. A flush will
// happen when we start flowing again
request.destination !== null
) {
request.flushScheduled = true;
request.schedule = FLUSH;
scheduleWork(() => {
if (request.schedule !== FLUSH) {
// We already flushed or we started a new render and will let that finish first
// which will end up flushing so we have nothing to do here.
return;
}

request.schedule = IDLE;

// We need to existence check destination again here because it might go away
// in between the enqueueFlush call and the work execution
const destination = request.destination;
if (destination) {
flushCompletedQueues(request, destination);
} else {
request.flushScheduled = false;
}
});
}
Expand Down
57 changes: 43 additions & 14 deletions packages/react-server/src/ReactFlightServer.js
Expand Up @@ -279,9 +279,14 @@ type Task = {

interface Reference {}

type ScheduleState = 10 | 11 | 12;
const IDLE = 10;
const WORK = 11;
const FLUSH = 12;

export type Request = {
status: 0 | 1 | 2,
flushScheduled: boolean,
schedule: ScheduleState,
fatalError: mixed,
destination: null | Destination,
bundlerConfig: ClientManifest,
Expand Down Expand Up @@ -378,7 +383,7 @@ export function createRequest(
const hints = createHints();
const request: Request = ({
status: OPEN,
flushScheduled: false,
schedule: IDLE,
fatalError: null,
destination: null,
bundlerConfig,
Expand Down Expand Up @@ -1236,8 +1241,7 @@ function pingTask(request: Request, task: Task): void {
const pingedTasks = request.pingedTasks;
pingedTasks.push(task);
if (pingedTasks.length === 1) {
request.flushScheduled = request.destination !== null;
scheduleWork(() => performWork(request));
startPerformingWork(request);
}
}

Expand Down Expand Up @@ -3003,9 +3007,6 @@ function performWork(request: Request): void {
const task = pingedTasks[i];
retryTask(request, task);
}
if (request.destination !== null) {
flushCompletedChunks(request, request.destination);
}
} catch (error) {
logRecoverableError(request, error);
fatalError(request, error);
Expand Down Expand Up @@ -3093,7 +3094,6 @@ function flushCompletedChunks(
}
errorChunks.splice(0, i);
} finally {
request.flushScheduled = false;
completeWriting(destination);
}
flushBuffered(destination);
Expand All @@ -3107,27 +3107,56 @@ function flushCompletedChunks(
}
}

export function startWork(request: Request): void {
request.flushScheduled = request.destination !== null;
function flushWork(request: Request) {
request.schedule = IDLE;
const destination = request.destination;
if (destination) {
flushCompletedChunks(request, destination);
}
}

function startPerformingWork(request: Request): void {
request.schedule = WORK;
if (supportsRequestStorage) {
scheduleWork(() => requestStorage.run(request, performWork, request));
} else {
scheduleWork(() => performWork(request));
}
scheduleWork(() => {
flushWork(request);
});
}

export function startWork(request: Request): void {
startPerformingWork(request);
}

function enqueueFlush(request: Request): void {
if (
request.flushScheduled === false &&
request.schedule === IDLE &&
// If there are pinged tasks we are going to flush anyway after work completes
request.pingedTasks.length === 0 &&
// If there is no destination there is nothing we can flush to. A flush will
// happen when we start flowing again
request.destination !== null
) {
const destination = request.destination;
request.flushScheduled = true;
scheduleWork(() => flushCompletedChunks(request, destination));
request.schedule = FLUSH;
scheduleWork(() => {
if (request.schedule !== FLUSH) {
// We already flushed or we started a new render and will let that finish first
// which will end up flushing so we have nothing to do here.
return;
}

request.schedule = IDLE;

// We need to existence check destination again here because it might go away
// in between the enqueueFlush call and the work execution
const destination = request.destination;
if (destination) {
flushCompletedChunks(request, destination);
}
});
}
}

Expand Down