Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add config object to connectable #6267

Merged
merged 3 commits into from Apr 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion api_guard/dist/types/index.d.ts
Expand Up @@ -77,7 +77,7 @@ export declare const config: {
useDeprecatedNextContext: boolean;
};

export declare function connectable<T>(source: ObservableInput<T>, connector?: Subject<T>): ConnectableObservableLike<T>;
export declare function connectable<T>(source: ObservableInput<T>, config?: ConnectableConfig<T>): ConnectableObservableLike<T>;

export declare class ConnectableObservable<T> extends Observable<T> {
protected _connection: Subscription | null;
Expand Down
79 changes: 79 additions & 0 deletions spec/observables/connectable-spec.ts
@@ -0,0 +1,79 @@
/** @prettier */
import { expect } from 'chai';
import { connectable, of, ReplaySubject } from 'rxjs';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from '../helpers/observableMatcher';

describe('connectable', () => {
let testScheduler: TestScheduler;

beforeEach(() => {
testScheduler = new TestScheduler(observableMatcher);
});

it('should mirror a simple source Observable', () => {
testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => {
const source = cold('--1-2---3-4--5-|');
const sourceSubs = ' ^--------------!';
const expected = ' --1-2---3-4--5-|';

const obs = connectable(source);

expectObservable(obs).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);

obs.connect();
});
});

it('should do nothing if connect is not called, despite subscriptions', () => {
testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => {
const source = cold('--1-2---3-4--5-|');
const sourceSubs: string[] = [];
const expected = ' -';

const obs = connectable(source);

expectObservable(obs).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});
});

it('should support resetOnDisconnect = true', () => {
const values: number[] = [];
const source = of(1, 2, 3);
const obs = connectable(source, {
connector: () => new ReplaySubject(1),
resetOnDisconnect: true,
});

obs.subscribe((value) => values.push(value));
const connection = obs.connect();
expect(values).to.deep.equal([1, 2, 3]);

connection.unsubscribe();

obs.subscribe((value) => values.push(value));
obs.connect();
expect(values).to.deep.equal([1, 2, 3, 1, 2, 3]);
});

it('should support resetOnDisconnect = false', () => {
const values: number[] = [];
const source = of(1, 2, 3);
const obs = connectable(source, {
connector: () => new ReplaySubject(1),
resetOnDisconnect: false,
});

obs.subscribe((value) => values.push(value));
const connection = obs.connect();
expect(values).to.deep.equal([1, 2, 3]);

connection.unsubscribe();

obs.subscribe((value) => values.push(value));
obs.connect();
expect(values).to.deep.equal([1, 2, 3, 3]);
});
});
43 changes: 36 additions & 7 deletions src/internal/observable/connectable.ts
@@ -1,4 +1,4 @@
import { ObservableInput } from '../types';
import { ObservableInput, SubjectLike } from '../types';
import { Subject } from '../Subject';
import { Subscription } from '../Subscription';
import { Observable } from '../Observable';
Expand All @@ -18,29 +18,58 @@ export interface ConnectableObservableLike<T> extends Observable<T> {
connect(): Subscription;
}

export interface ConnectableConfig<T> {
/**
* A factory function used to create the Subject through which the source
* is multicast. By default this creates a {@link Subject}.
*/
connector: () => SubjectLike<T>;
/**
* If true, the resulting observable will reset internal state upon disconnetion
* and return to a "cold" state. This allows the resulting observable to be
* reconnected.
* If false, upon disconnection, the connecting subject will remain the
* connecting subject, meaning the resulting observable will not go "cold" again,
* and subsequent repeats or resubscriptions will resubscribe to that same subject.
*/
resetOnDisconnect?: boolean;
}

/**
* The default configuration for `connectable`.
*/
const DEFAULT_CONFIG: ConnectableConfig<unknown> = {
connector: () => new Subject<unknown>(),
resetOnDisconnect: true,
};

/**
* Creates an observable that multicasts once `connect()` is called on it.
*
* @param source The observable source to make connectable.
* @param connector The subject to used to multicast the source observable to all subscribers.
* Defaults to a new {@link Subject}.
* @param config The configuration object for `connectable`.
* @returns A "connectable" observable, that has a `connect()` method, that you must call to
* connect the source to all consumers through the subject provided as the connector.
*/
export function connectable<T>(source: ObservableInput<T>, connector: Subject<T> = new Subject<T>()): ConnectableObservableLike<T> {
export function connectable<T>(source: ObservableInput<T>, config: ConnectableConfig<T> = DEFAULT_CONFIG): ConnectableObservableLike<T> {
// The subscription representing the connection.
let connection: Subscription | null = null;
const { connector, resetOnDisconnect = true } = config;
let subject = connector();

const result: any = new Observable<T>((subscriber) => {
return connector.subscribe(subscriber);
return subject.subscribe(subscriber);
});

// Define the `connect` function. This is what users must call
// in order to "connect" the source to the subject that is
// multicasting it.
result.connect = () => {
if (!connection) {
connection = defer(() => source).subscribe(connector);
if (!connection || connection.closed) {
connection = defer(() => source).subscribe(subject);
if (resetOnDisconnect) {
connection.add(() => (subject = connector()));
}
}
return connection;
};
Expand Down
2 changes: 1 addition & 1 deletion src/internal/operators/connect.ts
Expand Up @@ -92,7 +92,7 @@ const DEFAULT_CONFIG: ConnectConfig<unknown> = {
* and Observable, that when subscribed to, will utilize the multicast observable.
* After this function is executed -- and its return value subscribed to -- the
* the operator will subscribe to the source, and the connection will be made.
* @param param0 The configuration object for `connect`.
* @param config The configuration object for `connect`.
*/
export function connect<T, O extends ObservableInput<unknown>>(
selector: (shared: Observable<T>) => O,
Expand Down