Skip to content

Commit

Permalink
feat: add config object to connectable (#6267)
Browse files Browse the repository at this point in the history
* feat: add config object to connectable

* chore: update api_guardian

* chore: typo

BREAKING CHANGE: Our very new api, `connectable`, now takes a configuration object instead of just the `Subject` instance. This was necessary to make sure it covered all use cases for what we were trying to replace in the deprecated multicasting operators. Apologies for the late-in-the-game change, but we know it's not widely used yet (it's new in v7), and we want to get it right.

Co-authored-by: Ben Lesh <ben@benlesh.com>
  • Loading branch information
cartant and benlesh committed Apr 27, 2021
1 parent f802843 commit 4d98b40
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 9 deletions.
2 changes: 1 addition & 1 deletion api_guard/dist/types/index.d.ts
Expand Up @@ -77,7 +77,7 @@ export declare const config: {
useDeprecatedNextContext: boolean;
};

export declare function connectable<T>(source: ObservableInput<T>, connector?: Subject<T>): ConnectableObservableLike<T>;
export declare function connectable<T>(source: ObservableInput<T>, config?: ConnectableConfig<T>): ConnectableObservableLike<T>;

export declare class ConnectableObservable<T> extends Observable<T> {
protected _connection: Subscription | null;
Expand Down
79 changes: 79 additions & 0 deletions spec/observables/connectable-spec.ts
@@ -0,0 +1,79 @@
/** @prettier */
import { expect } from 'chai';
import { connectable, of, ReplaySubject } from 'rxjs';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from '../helpers/observableMatcher';

describe('connectable', () => {
let testScheduler: TestScheduler;

beforeEach(() => {
testScheduler = new TestScheduler(observableMatcher);
});

it('should mirror a simple source Observable', () => {
testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => {
const source = cold('--1-2---3-4--5-|');
const sourceSubs = ' ^--------------!';
const expected = ' --1-2---3-4--5-|';

const obs = connectable(source);

expectObservable(obs).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);

obs.connect();
});
});

it('should do nothing if connect is not called, despite subscriptions', () => {
testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => {
const source = cold('--1-2---3-4--5-|');
const sourceSubs: string[] = [];
const expected = ' -';

const obs = connectable(source);

expectObservable(obs).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});
});

it('should support resetOnDisconnect = true', () => {
const values: number[] = [];
const source = of(1, 2, 3);
const obs = connectable(source, {
connector: () => new ReplaySubject(1),
resetOnDisconnect: true,
});

obs.subscribe((value) => values.push(value));
const connection = obs.connect();
expect(values).to.deep.equal([1, 2, 3]);

connection.unsubscribe();

obs.subscribe((value) => values.push(value));
obs.connect();
expect(values).to.deep.equal([1, 2, 3, 1, 2, 3]);
});

it('should support resetOnDisconnect = false', () => {
const values: number[] = [];
const source = of(1, 2, 3);
const obs = connectable(source, {
connector: () => new ReplaySubject(1),
resetOnDisconnect: false,
});

obs.subscribe((value) => values.push(value));
const connection = obs.connect();
expect(values).to.deep.equal([1, 2, 3]);

connection.unsubscribe();

obs.subscribe((value) => values.push(value));
obs.connect();
expect(values).to.deep.equal([1, 2, 3, 3]);
});
});
43 changes: 36 additions & 7 deletions src/internal/observable/connectable.ts
@@ -1,4 +1,4 @@
import { ObservableInput } from '../types';
import { ObservableInput, SubjectLike } from '../types';
import { Subject } from '../Subject';
import { Subscription } from '../Subscription';
import { Observable } from '../Observable';
Expand All @@ -18,29 +18,58 @@ export interface ConnectableObservableLike<T> extends Observable<T> {
connect(): Subscription;
}

export interface ConnectableConfig<T> {
/**
* A factory function used to create the Subject through which the source
* is multicast. By default this creates a {@link Subject}.
*/
connector: () => SubjectLike<T>;
/**
* If true, the resulting observable will reset internal state upon disconnetion
* and return to a "cold" state. This allows the resulting observable to be
* reconnected.
* If false, upon disconnection, the connecting subject will remain the
* connecting subject, meaning the resulting observable will not go "cold" again,
* and subsequent repeats or resubscriptions will resubscribe to that same subject.
*/
resetOnDisconnect?: boolean;
}

/**
* The default configuration for `connectable`.
*/
const DEFAULT_CONFIG: ConnectableConfig<unknown> = {
connector: () => new Subject<unknown>(),
resetOnDisconnect: true,
};

/**
* Creates an observable that multicasts once `connect()` is called on it.
*
* @param source The observable source to make connectable.
* @param connector The subject to used to multicast the source observable to all subscribers.
* Defaults to a new {@link Subject}.
* @param config The configuration object for `connectable`.
* @returns A "connectable" observable, that has a `connect()` method, that you must call to
* connect the source to all consumers through the subject provided as the connector.
*/
export function connectable<T>(source: ObservableInput<T>, connector: Subject<T> = new Subject<T>()): ConnectableObservableLike<T> {
export function connectable<T>(source: ObservableInput<T>, config: ConnectableConfig<T> = DEFAULT_CONFIG): ConnectableObservableLike<T> {
// The subscription representing the connection.
let connection: Subscription | null = null;
const { connector, resetOnDisconnect = true } = config;
let subject = connector();

const result: any = new Observable<T>((subscriber) => {
return connector.subscribe(subscriber);
return subject.subscribe(subscriber);
});

// Define the `connect` function. This is what users must call
// in order to "connect" the source to the subject that is
// multicasting it.
result.connect = () => {
if (!connection) {
connection = defer(() => source).subscribe(connector);
if (!connection || connection.closed) {
connection = defer(() => source).subscribe(subject);
if (resetOnDisconnect) {
connection.add(() => (subject = connector()));
}
}
return connection;
};
Expand Down
2 changes: 1 addition & 1 deletion src/internal/operators/connect.ts
Expand Up @@ -92,7 +92,7 @@ const DEFAULT_CONFIG: ConnectConfig<unknown> = {
* and Observable, that when subscribed to, will utilize the multicast observable.
* After this function is executed -- and its return value subscribed to -- the
* the operator will subscribe to the source, and the connection will be made.
* @param param0 The configuration object for `connect`.
* @param config The configuration object for `connect`.
*/
export function connect<T, O extends ObservableInput<unknown>>(
selector: (shared: Observable<T>) => O,
Expand Down

0 comments on commit 4d98b40

Please sign in to comment.