Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Synchronous execution of disposer #7230

Open
ronag opened this issue Mar 23, 2023 · 6 comments
Open

Synchronous execution of disposer #7230

ronag opened this issue Mar 23, 2023 · 6 comments

Comments

@ronag
Copy link

ronag commented Mar 23, 2023

Describe the bug

We implement an operator called combineMap which is a more efficient variant of the following pattern:

values$.pipe(rxjs.switchMap(values => value.length > 0 ? rxjs.combineMap(values.map(selector)) : rxjs.of([]) 

Instead, we can do the following which will re-use the results and subscriptions for unchanged values:

values$.pipe(combineMap(selector))

This works fine most of the time, but sometimes in production we get the following error:

    TypeError: Cannot read properties of null (reading 'unsubscribe')
        at /home/jesper/nxtedition/nxt/asset/node_modules/@nxtedition/lib/rxjs/combineMap.js:136:28
        at execFinalizer (/home/jesper/nxtedition/nxt/asset/node_modules/rxjs/dist/cjs/internal/Subscription.js:172:9)
        at Subscription.unsubscribe (/home/jesper/nxtedition/nxt/asset/node_modules/rxjs/dist/cjs/internal/Subscription.js:89:29)
        at Subscriber.unsubscribe (/home/jesper/nxtedition/nxt/asset/node_modules/rxjs/dist/cjs/internal/Subscriber.js:75:42)
        at OperatorSubscriber.unsubscribe (/home/jesper/nxtedition/nxt/asset/node_modules/rxjs/dist/cjs/internal/operators/OperatorSubscriber.js:72:42)
        at execFinalizer (/home/jesper/nxtedition/nxt/asset/node_modules/rxjs/dist/cjs/internal/Subscription.js:175:19)
        at Subscription.unsubscribe (/home/jesper/nxtedition/nxt/asset/node_modules/rxjs/dist/cjs/internal/Subscription.js:89:29)
        at Subscriber.unsubscribe (/home/jesper/nxtedition/nxt/asset/node_modules/rxjs/dist/cjs/internal/Subscriber.js:75:42)
        at OperatorSubscriber.unsubscribe (/home/jesper/nxtedition/nxt/asset/node_modules/rxjs/dist/cjs/internal/operators/OperatorSubscriber.js:72:42)
        at execFinalizer (/home/jesper/nxtedition/nxt/asset/node_modules/rxjs/dist/cjs/internal/Subscription.js:175:19)

Give the following code:

const EMPTY = Object.freeze([])

function combineMap(project, equals = (a, b) => a === b) {
  const self = this
  return new rxjs.Observable((o) => {
    let curr = EMPTY
    let scheduled = false
    let dirty = false
    let active = 0
    let empty = 0

    const _error = (err) => o.error(err)

    function _update() {
      scheduled = false

      if (empty) {
        return
      }

      if (dirty) {
        dirty = false
        o.next(curr.map((context) => context.value))
      }

      if (!active) {
        o.complete()
      }
    }

    function update() {
      if (!scheduled) {
        scheduled = true
        queueMicrotask(_update)
      }
    }

    active += 1
    const subscription = self.subscribe({
      next(keys) {
        if (!Array.isArray(keys)) {
          keys = EMPTY
        }
        // TODO (perf): Avoid array allocation & copy if nothing has updated.

        const prev = curr
        curr = new Array(keys.length)

        const prevLen = prev.length
        const currLen = curr.length

        if (currLen !== prevLen || prev === EMPTY) {
          dirty = true
          update()
        }

        for (let n = 0; n < currLen; ++n) {
          const key = keys[n]

          if (n < prevLen && prev[n] && equals(prev[n].key, key)) {
            curr[n] = prev[n]
            prev[n] = null
            continue
          }

          dirty = true
          update()

          // TODO (perf): Guess start index based on n, e.g. n - 1 and n + 1 to check if
          // a key has simply been added or removed.
          const i = prev.findIndex((entry) => entry && equals(entry.key, key))

          if (i !== -1) {
            curr[n] = prev[i]
            prev[i] = null
          } else {
            const entry = (curr[n] = {
              key,
              value: EMPTY,
              subscription: null,
            })

            let observable
            try {
              observable = rxjs.from(project(keys[n]))
            } catch (err) {
              observable = rxjs.throwError(() => err)
            }

            empty += 1
            active += 1

            const subscription = observable.subscribe({
              next(value) {
                if (entry.value === EMPTY) {
                  empty -= 1
                }

                entry.value = value

                dirty = true
                update()
              },
              error: _error,
            })
            // ** outer disposer runs before this assignment is done
            entry.subscription = subscription
            entry.subscription.add(() => {
              if (entry.value === EMPTY) {
                empty -= 1
              }

              active -= 1

              dirty = true
              update()
            })
          }
        }

        // TODO (perf): start from index where prev[n] is not null.
        for (let n = 0; n < prevLen; n++) {
          // ** this will crash since subscription is null
          prev[n]?.subscription.unsubscribe()
        }
      },
      error: _error,
      complete() {
        active -= 1
        if (!active) {
          update()
        }
      },
    })

    return () => {
      for (const entry of curr) {
        entry?.subscription.unsubscribe()
      }
      subscription.unsubscribe()
    }
  })
}

For some reason the disposer:

    return () => {
      for (const entry of curr) {
        entry?.subscription.unsubscribe()
      }
      subscription.unsubscribe()
    }

Runs before subscription has been assigned above:

entry.subscription = subscription

I have no idea how this can occur and have been unable to reproduce it.

Expected behavior

The outer disposer does not run concurrently with the inner subscription's next invocation.

Reproduction code

Unable to reproduce outside of production code.

Reproduction URL

No response

Version

7.8.0

Environment

No response

Additional context

No response

@voliva
Copy link
Contributor

voliva commented Mar 23, 2023

My suspicion is that this happens after a an inner observable throws an error syncrhonously, or the project function also throws an error.

All of this code happens synchronously:

            const entry = (curr[n] = {
              key,
              value: EMPTY,
              subscription: null,
            })

            let observable
            try {
              observable = rxjs.from(project(keys[n]))
            } catch (err) {
              observable = rxjs.throwError(() => err)
            }

            empty += 1
            active += 1

            const subscription = observable.subscribe({
              next(value) {
                if (entry.value === EMPTY) {
                  empty -= 1
                }

                entry.value = value

                dirty = true
                update()
              },
              error: _error,
            })
            // ** outer disposer runs before this assignment is done
            entry.subscription = subscription

the only way the outer disposer can run before the asignment is done is if the consumer of this observable unsubscribes from it, also synchronously.

Initially I thought it was due to update() and maybe the consumer doing a take() an unsubscribing after N amounts of updates, but I see that's not possible because update() schedules a microtask.

But if the inner observable observable = rxjs.from(project(keys[n])) throws synchronously (or als the throwError( you use in the catch clause), then when subscribing to that observable you will synchronoulsy receive a call on the error subscriber, which calls _error, which calls o.error.

This o.error causes the consumer of the observable to immediately unsubscribe, which runs the cleanup function before you could assign the subscription to the entry

@ronag
Copy link
Author

ronag commented Mar 23, 2023

But if the inner observable observable = rxjs.from(project(keys[n])) throws synchronously (or als the throwError( you use in the catch clause), then when subscribing to that observable you will synchronoulsy receive a call on the error subscriber, which calls _error, which calls o.error.

This o.error causes the consumer of the observable to immediately unsubscribe, which runs the cleanup function before you could assign the subscription to the entry

I suspected this as well. However, when trying to make a test case the disposer seems to run afterwards.

@ronag
Copy link
Author

ronag commented Mar 23, 2023

new rxjs.Observable(o => {
  process.nextTick(() => {
    console.log('tick')
  })
  queueMicrotask(() => {
    console.log('task')
  })
  console.log('pre subscribe')
  rxjs
    .throwError(() => new Error('asd'))
    .subscribe({
      error: (err) => o.error(err)
    })
  console.log('post subscribe')
  return () => {
    console.log('disposer')
  }
}).subscribe(() => {})

Will print:

pre subscribe
post subscribe
disposer
task
tick

@voliva
Copy link
Contributor

voliva commented Mar 23, 2023

@ronag this is also expected.

If you think about it, there's no way RxJS can call the disposer function at that point, because you haven't even returned it yet, it's all happening synchronously.

If the inner subscription happens asynchronously though (such as your original case) then you get exactly the behaviour I explained: https://stackblitz.com/edit/rxjs-kwustg?file=index.ts

new rxjs.Observable((o) => {
  queueMicrotask(() => {
    console.log('task');
  });
  console.log('pre subscribe');
  queueMicrotask(() => {
    rxjs
      .throwError(() => new Error('asd'))
      .subscribe({
        error: (err) => o.error(err),
      });
    console.log('post subscribe');
  });
  return () => {
    console.log('disposer');
  };
}).subscribe(() => {});

/* Logs
pre subscribe
task
disposer
post subscribe
*/

@ronag
Copy link
Author

ronag commented Mar 23, 2023

Would it make sense to ensure e.g. throw error occurs asynchronously?

@mounilKshah
Copy link

In case no one else is working on this issue, is it fine if I work on it? Can this be assigned to me?

@benlesh benlesh self-assigned this Apr 16, 2024
@benlesh benlesh added bug Confirmed bug 7.x Issues and PRs for version 6.x 8.x Issues and PRs for version 8.x and removed bug Confirmed bug 7.x Issues and PRs for version 6.x 8.x Issues and PRs for version 8.x labels Apr 16, 2024
@benlesh benlesh removed their assignment Apr 16, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants