IteratorObservables never terminate when within Observable.merge #2476

Closed
severest/retrobot
#221
@memelet

Description

RxJS version:
5.2.0

Code to reproduce:
unit tests added to IteratorObservable-spec.js

  it('should finalize generators when merged if the subscription ends', () => {
    const iterator1 = {
      finalized: false,
      next() {
        return { value: 'duck', done: false };
      },
      return() {
        this.finalized = true;
      }
    };

    const iterable1 = {
      [Rx.Symbol.iterator]() {
        return iterator1;
      }
    };

    const iterator2 = {
      finalized: false,
      next() {
        return { value: 'duck', done: false };
      },
      return() {
        this.finalized = true;
      }
    };

    const iterable2 = {
      [Rx.Symbol.iterator]() {
        return iterator2;
      }
    };

    const results = [];

    const i1 = IteratorObservable.create(iterable1);
    const i2 = IteratorObservable.create(iterable2);
    Rx.Observable.merge(i1, i2)
      .take(3)
      .subscribe(
        x => results.push(x),
        null,
        () => results.push('GOOSE!')
      );

    // never even get here
    expect(results).to.deep.equal(['duck', 'duck', 'duck', 'GOOSE!']);
    expect(iterator1.finalized).to.be.true;
    expect(iterator2.finalized).to.be.true;
  });

Expected behavior:

See test

Actual behavior:

The test never terminates (until V8 runs out of memory, that is).

A simpler example
Given:

const myInfiniteIterator1 = ...
const myInfiniteIterator2 = ...

This terminates:

Rx.Observable.from(myInfiniteIterator1).take(3)

But this never terminates:

Rx.Observable.merge(Rx.Observable.from(myInfiniteIterator1), Rx.Observable.from(myInfiniteIterator2)).take(3)

Activity

mpodlasin (Contributor) commented on Mar 18, 2017

There seems to be a problem with the order in which mergeAll subscribes to sources and adds these subscriptions to its own subscription:
https://github.com/ReactiveX/rxjs/blob/master/src/operator/mergeAll.ts#L86

Note how it subscribes first and only then adds the returned subscription to the root subscription. Because the iterator starts emitting synchronously when subscribeToResult is called, the unsubscribe call made by take has no effect on the subscription to the iterator: that subscription has not yet been added to the subscriber of mergeAll!

trxcllnt (Member) commented on Mar 18, 2017

Yeah, that's the fix. We need to create a wrapper subscription first, add it to the subscriptions list, then add the subscription to the source Observable to the composite subscription.
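
To make the ordering problem concrete, here is a minimal sketch that does not use RxJS at all. The `Sub` class, `subscribeSync`, `run`, and `SAFETY_CAP` are hypothetical stand-ins invented for illustration: `parent` plays the role of the mergeAll subscriber, `inner` plays the subscription to a synchronous source, and the cap replaces "forever" so the demo terminates.

```typescript
// Hypothetical minimal model of a composite subscription; NOT the RxJS implementation.
class Sub {
  closed = false;
  private children: Sub[] = [];

  add(child: Sub): void {
    // Adding to an already closed parent tears the child down immediately.
    if (this.closed) { child.unsubscribe(); return; }
    this.children.push(child);
  }

  unsubscribe(): void {
    if (this.closed) return;
    this.closed = true;
    for (const c of this.children) c.unsubscribe();
  }
}

const SAFETY_CAP = 1000; // assumption: stands in for "forever" so the demo ends

// A synchronous "infinite" source: emits until its own subscription is closed.
function subscribeSync(inner: Sub, next: (v: string) => void): number {
  let pulled = 0;
  while (!inner.closed && pulled < SAFETY_CAP) {
    pulled++;
    next('duck');
  }
  return pulled; // how many values the source produced
}

function run(addBeforeSubscribe: boolean): number {
  const parent = new Sub(); // role of the mergeAll subscriber
  const inner = new Sub();  // role of the subscription to the inner source
  let taken = 0;
  const next = (_v: string) => {
    // Role of take(3): unsubscribe the parent after three values.
    if (++taken === 3) parent.unsubscribe();
  };

  if (addBeforeSubscribe) parent.add(inner);  // wrapper registered first
  const pulled = subscribeSync(inner, next);  // source emits synchronously here
  if (!addBeforeSubscribe) parent.add(inner); // too late: emission already happened
  return pulled;
}

const pulledSubscribeFirst = run(false); // source runs until the safety cap
const pulledAddFirst = run(true);        // source stops after three values
```

With subscribe-then-add, the parent's unsubscribe cannot reach `inner`, so the source keeps producing until the cap. With add-then-subscribe, the third value closes `inner` and the loop exits.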

jooyunghan (Contributor) commented on Jun 29, 2017

This problem is caused by this.add(subscribeToResult(....)), so there are many more places that show this behavior.

As @mpodlasin pointed out, synchronous observables (e.g. Array, Iterable, Range...) that are subscribed to by an InnerSubscriber in subscribeToResult can't be unsubscribed by a downstream unsubscribe(), because their subscriptions have not yet been added to the parent subscriber (the OuterSubscriber).

A quick grep reveals more than 20 places where this.add(subscribeToResult(..)) is used. Of course, not all of them are problematic, but I guess many of them have the same problem.

  • buffer(closingNotifier) - closingNotifier is subscribed to via subscribeToResult. If the notifier is an infinite stream, this will hang even if we limit it by take(2).
const notifier = Observable.range(0, Infinity);
Observable.range(0, 3).buffer(notifier).take(2).subscribe();
// should be: [[], []]
  • using(resourceFactory, observableFactory) - The observable created by observableFactory is also subscribed to via subscribeToResult. If it is a synchronous observable, it can't be unsubscribed by a downstream unsubscribe().
Observable.using(..., (resource) => Observable.range(0, Infinity)).take(2).subscribe();
  • The list goes on... combineLatest, if, defer...
Observable.combineLatest(Observable.of('a'), Observable.range(0, Infinity)).take(1).subscribe();
Observable.if(() => true, Observable.range(0, Infinity), ..).take(1).subscribe();
Observable.defer(() => Observable.range(0, Infinity)).take(1).subscribe();

I was trying to look over all occurrences, but I then found @mpodlasin's PR #2479 and decided to help him work on his PR instead.

The patch I made has subscribeToResult accept an additional argument, just like #2749, but it differs a little.

export function subscribeToResult<T>(outerSubscriber: OuterSubscriber<any, any>,
                                     result: ObservableInput<T>,
                                     outerValue?: T,
                                     outerIndex?: number,
                                     handleInnerSubscriber?: (s: Subscriber<any>) => void): Subscription {
...
      if (handleInnerSubscriber) handleInnerSubscriber(destination);
      return result.subscribe(destination);

With this, merge can be modified as follows:

      subscribeToResult<Observable<T>, T>(this, observable, undefined, undefined, (s) => this.add(s));

I chose a callback because subscribeToResult is used in too many places, and I didn't want to create an InnerSubscriber and pass it in every time.
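
As a rough illustration of why an optional callback keeps existing call sites untouched, here is a small sketch that models the idea without RxJS. Everything in it (`Teardown`, `subscribeToResultSketch`, `outer`, `CAP`) is a hypothetical stand-in, not the actual patch: callers that pass the hook can register the inner subscription before any value is emitted, and callers that omit it behave as before.

```typescript
// Hypothetical stand-ins for illustration; these are NOT the RxJS types.
type Teardown = { closed: boolean };

const CAP = 100; // assumption: stands in for "forever" so the sketch terminates

// Models subscribeToResult with an optional hook that runs before subscribing.
function subscribeToResultSketch(
  onValue: () => void,
  handleInner?: (inner: Teardown) => void
): number {
  const inner: Teardown = { closed: false };
  if (handleInner) handleInner(inner); // outer may register `inner` before any emission

  let emitted = 0;
  while (!inner.closed && emitted < CAP) { // synchronous source
    emitted++;
    onValue();
  }
  return emitted;
}

// Models an outer subscriber that wants to stop after two values.
function outer(useHook: boolean): number {
  const registered: Teardown[] = [];
  let seen = 0;
  const onValue = () => {
    // On the second value, close everything the outer subscriber knows about.
    if (++seen === 2) registered.forEach(t => { t.closed = true; });
  };
  return useHook
    ? subscribeToResultSketch(onValue, inner => registered.push(inner))
    : subscribeToResultSketch(onValue); // old call shape still compiles and runs
}

const withHook = outer(true);     // inner is registered up front, so emission stops early
const withoutHook = outer(false); // nothing registered, so the source runs to the cap
```

The trade-off sketched here is that each call site has to opt in by passing the hook; call sites that do not pass it keep the old behavior.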

In #2560 I posted my suggestion on "creating custom operators". The problem I was trying to avoid there was also a synchronous observable with an early unsubscribe.

added a commit that references this issue on Jul 26, 2018
40852ff
locked as resolved and limited conversation to collaborators on Aug 25, 2018

    Labels

    bug (Confirmed bug)


      Participants

      @memelet, @trxcllnt, @jooyunghan, @kwonoj, @mpodlasin
