bug: using structuredClone with ReadableStream prevents process from exiting #44985

Closed
KhafraDev opened this issue Oct 13, 2022 · 11 comments · Fixed by #51255 or #51526

Comments

@KhafraDev
Member

KhafraDev commented Oct 13, 2022

Version

v18.10.0

Platform

Microsoft Windows NT 10.0.19043.0 x64

Subsystem

No response

What steps will reproduce the bug?

const rs = new ReadableStream({
  start (controller) {
    controller.enqueue(new Uint8Array([65]))
    controller.close()
  }
})

const cloned = structuredClone(rs, { transfer: [rs] })

After this, the process stays open indefinitely.

How often does it reproduce? Is there a required condition?

Always.

What is the expected behavior?

The process should exit.

What do you see instead?

The process stays open indefinitely.

Additional information

Exporting readableStreamTee would also solve my use case.

function readableStreamTee(stream, cloneForBranch2) {

@KhafraDev
Member Author

A workaround is to tee the cloned stream.
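
The comment does not show how the tee was applied. As a rough illustration of the public API involved, here is a minimal sketch of `ReadableStream.prototype.tee()` splitting one stream into two independently readable branches. The `readAll` helper is hypothetical, and the sketch deliberately leaves out the `structuredClone` step, so it demonstrates `tee()` only, not the workaround itself.

```javascript
// Hypothetical helper: drain a stream and collect its chunks.
async function readAll(stream) {
  const chunks = [];
  const reader = stream.getReader();
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    chunks.push(value);
  }
  return chunks;
}

const rs = new ReadableStream({
  start(controller) {
    controller.enqueue(new Uint8Array([65]));
    controller.close();
  },
});

// tee() locks the original stream and returns two branches
// that each receive every chunk.
const [branch1, branch2] = rs.tee();

const result = Promise.all([readAll(branch1), readAll(branch2)]);
result.then(([a, b]) => console.log(a.length, b.length));
```

Both branches have to be read (or cancelled) separately; an unread branch buffers chunks for as long as the other one keeps pulling.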

@ronag
Member

ronag commented Oct 14, 2022

@jasnell @addaleax

@ronag
Member

ronag commented Oct 14, 2022

Ref: nodejs/undici#1700

@KhafraDev
Member Author

It looks like two ports aren't unref'd:

[
  MessagePort [EventTarget] {
    active: true,
    refed: true,
    [Symbol(kEvents)]: SafeMap(4) [Map] {
      'newListener' => [Object],
      'removeListener' => [Object],
      'message' => [Object],
      'messageerror' => [Object]
    },
    [Symbol(events.maxEventTargetListeners)]: 10,
    [Symbol(events.maxEventTargetListenersWarned)]: false,
    [Symbol(kNewListener)]: [Function (anonymous)],
    [Symbol(kRemoveListener)]: [Function (anonymous)],
    [Symbol(nodejs.internal.kCurrentlyReceivingPorts)]: undefined,
    [Symbol(khandlers)]: SafeMap(2) [Map] {
      'message' => [Function],
      'messageerror' => [Function]
    }
  },
  MessagePort [EventTarget] {
    active: true,
    refed: true,
    [Symbol(kEvents)]: SafeMap(4) [Map] {
      'newListener' => [Object],
      'removeListener' => [Object],
      'message' => [Object],
      'messageerror' => [Object]
    },
    [Symbol(events.maxEventTargetListeners)]: 10,
    [Symbol(events.maxEventTargetListenersWarned)]: false,
    [Symbol(kNewListener)]: [Function (anonymous)],
    [Symbol(kRemoveListener)]: [Function (anonymous)],
    [Symbol(nodejs.internal.kCurrentlyReceivingPorts)]: undefined,
    [Symbol(khandlers)]: SafeMap(2) [Map] {
      'message' => [Function],
      'messageerror' => [Function]
    }
  }
]

My best guess is that they are these:

this[kState].transfer.port1 = port1;
this[kState].transfer.port2 = port2;
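
The thread does not say how the dump above was produced. One way to get a similar, if less detailed, picture is the public `process.getActiveResourcesInfo()` API, which returns the type names of the resources currently keeping the event loop alive. The sketch below is an assumption about how one might investigate, using a plain `MessageChannel` as a stand-in for the stream's internal ports.

```javascript
const { MessageChannel } = require('node:worker_threads');

// Snapshot of the resource types alive before the channel is in use.
const before = process.getActiveResourcesInfo();

const { port1, port2 } = new MessageChannel();
// Attaching a 'message' listener with .on() refs the port.
port1.on('message', () => {});

// Second snapshot, taken while the listening port is still open.
const after = process.getActiveResourcesInfo();
console.log({ before, after });

// Close the channel so this script itself can exit.
port1.close();
port2.close();
```

Comparing the two arrays shows which resource types appeared in between; unlike the dump above, it does not expose the port objects themselves.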

@ronag
Member

ronag commented Dec 10, 2023

@joyeecheung

@tsctx
Member

tsctx commented Dec 10, 2023

@ronag
I have been able to solve this problem.

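// Note: "internal/..." modules are only loadable from within Node.js core
// or when node is started with --expose-internals.
const { kTransfer } = require("internal/worker/js_transferable");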

const { readableStreamPipeTo } = require("internal/webstreams/readablestream");

const { setPromiseHandled, kState } = require("internal/webstreams/util");

const {
  CrossRealmTransformWritableSink,
} = require("internal/webstreams/transfer");

function newCrossRealmWritableSink(readable, port) {
  const source = new CrossRealmTransformWritableSink(port);
  const writable = new WritableStream(source);
  const promise = readableStreamPipeTo(readable, writable, false, false, false);
  setPromiseHandled(promise);
  return {
    writable,
    source,
    promise,
  };
}

const registry = new FinalizationRegistry(({ source }) => {
  source.close();
});

ReadableStream.prototype[kTransfer] = function () {
  if (this.locked) {
    this[kState].transfer.port1?.close();
    this[kState].transfer.port1 = undefined;
    this[kState].transfer.port2 = undefined;
    throw new DOMException(
      "Cannot transfer a locked ReadableStream",
      "DataCloneError"
    );
  }

  const { port1, port2 } = this[kState].transfer;
  this[kState].transfer.port2 = undefined;

  const { writable, promise, source } = newCrossRealmWritableSink(this, port1);

  this[kState].transfer.writable = writable;
  this[kState].transfer.promise = promise;

  registry.register(port2, { source });

  return {
    data: { port: port2 },
    deserializeInfo:
      "internal/webstreams/readablestream:TransferredReadableStream",
  };
};

const stack = [
  new Uint8Array([65]),
  new Uint8Array([65]),
  new Uint8Array([65]),
];

const rs = new ReadableStream({
  pull(controller) {
    const data = stack.shift();
    if (data) {
      controller.enqueue(data);
    } else {
      controller.close();
    }
  },
});

const cloned = structuredClone(rs, { transfer: [rs] });

@tsctx
Member

tsctx commented Dec 10, 2023

@ronag
This problem occurs not only with ReadableStream but also with WritableStream.
I am stuck on this point.
Could you please work on this instead of me?

@ronag
Member

ronag commented Dec 10, 2023

Sorry. Webstreams are beyond my expertise and frankly my interest...

@tsctx
Member

tsctx commented Dec 10, 2023

I apologize for the inconvenience that I have caused you.

@ronag
Member

ronag commented Dec 10, 2023

No worries. No inconvenience at all. 🤗

jasnell added a commit to jasnell/node that referenced this issue Dec 22, 2023
When cloning a `ReadableStream` and `WritableStream`, both use an
internal `MessageChannel` to communicate with the original stream.
Those, however, previously were not unref'd which would lead to the
process not exiting if the stream was not fully consumed.

Fixes: nodejs#44985
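
The mechanism the commit message describes can be seen with a plain `MessageChannel` from `node:worker_threads`. This is a minimal sketch of the ref/unref behaviour only, not the actual patch: the stream internals are not shown, and the empty listeners are stand-ins for the handlers the streams install.

```javascript
const { MessageChannel } = require('node:worker_threads');

const { port1, port2 } = new MessageChannel();

// Attaching 'message' listeners with .on() refs each port, so an idle
// channel like this one would normally keep the process running.
port1.on('message', () => {});
port2.on('message', () => {});

// unref() tells the event loop not to wait on these ports.
port1.unref();
port2.unref();

console.log("ports unref'd; nothing else is keeping the process alive");
```

Commenting out the two `unref()` calls should leave the script hanging after the log line, which is the symptom reported in this issue.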
lutzroeder added a commit to lutzroeder/netron that referenced this issue Dec 24, 2023
nodejs-github-bot pushed a commit that referenced this issue Dec 24, 2023
When cloning a `ReadableStream` and `WritableStream`, both use an
internal `MessageChannel` to communicate with the original stream.
Those, however, previously were not unref'd which would lead to the
process not exiting if the stream was not fully consumed.

Fixes: #44985
PR-URL: #51255
Reviewed-By: Matthew Aitken <maitken033380023@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Debadree Chatterjee <debadree333@gmail.com>
RafaelGSS pushed a commit that referenced this issue Jan 2, 2024
@mcollina mcollina reopened this Jan 19, 2024
@mcollina
Member

Reopening after #51491
