diff --git a/src/channel-header.tsx b/src/channel-header.tsx index a18a614..4a26679 100644 --- a/src/channel-header.tsx +++ b/src/channel-header.tsx @@ -15,7 +15,7 @@ export class ChannelHeaderViewModel extends Model { @fromObservable selectedChannel: ChannelBase; @fromObservable channelInfo: Channel; @fromObservable members: Array; - @fromObservable topic: string; + @fromObservable topic: { value: string}; @fromObservable isDrawerOpen: boolean; toggleDrawer: Action; @@ -32,7 +32,21 @@ export class ChannelHeaderViewModel extends Model { when(this, x => x.selectedChannel) .filter(c => !!c) .switchMap(c => this.store.channels.listen(c.id, c.api)) + .do(x => console.log(`New ChannelInfo! ${JSON.stringify(x)}`)) .toProperty(this, 'channelInfo'); + + // NB: This works but it's too damn clever + this.innerDisp.add(when(this, x => x.channelInfo) + .filter(x => x && !x.topic) + .subscribe(x => this.store.channels.listen(x.id, x.api).invalidate())); + + when(this, x => x.channelInfo.members) + .startWith([]) + .toProperty(this, 'members'); + + when(this, x => x.channelInfo.topic) + .startWith({value: ''}) + .toProperty(this, 'topic'); } } @@ -52,7 +66,7 @@ export class ChannelHeaderView extends SimpleView { tabs.push( ); @@ -60,7 +74,7 @@ export class ChannelHeaderView extends SimpleView { tabs.push( ); diff --git a/src/lib/models/api-shapes.ts b/src/lib/models/api-shapes.ts index 6ff495f..e43b5af 100644 --- a/src/lib/models/api-shapes.ts +++ b/src/lib/models/api-shapes.ts @@ -91,7 +91,7 @@ export interface Message { ephemeral_msg_type: number; // based on EphemeralMsgType.id - user?: string; // nullable userId + user?: string | User; // nullable userId username: string; topic: string; purpose: string; diff --git a/src/lib/sparse-map.ts b/src/lib/sparse-map.ts index 6068d57..c2c8017 100644 --- a/src/lib/sparse-map.ts +++ b/src/lib/sparse-map.ts @@ -1,5 +1,5 @@ import { Observable } from 'rxjs/Observable'; -import { Updatable } from './updatable'; +import { Updatable, MergeStrategy } from './updatable'; import './standard-operators'; @@ -48,10 +48,12 @@ export class SparseMapMixins { class InMemorySparseMap implements SparseMap { private _latest: Map>; private _factory: ((key: K, hint?: any) => Observable) | undefined; + private _strategy: MergeStrategy; - constructor(factory: ((key: K, hint?: any) => Observable) | undefined = undefined) { + constructor(factory: ((key: K, hint?: any) => Observable) | undefined = undefined, strategy: MergeStrategy = 'overwrite') { this._latest = new Map(); this._factory = factory; + this._strategy = strategy; } listen(key: K, hint?: any): Updatable { @@ -60,9 +62,9 @@ class InMemorySparseMap implements SparseMap { if (this._factory) { let fact = this._factory.bind(this); - ret = new Updatable(() => fact(key, hint)); + ret = new Updatable(() => fact(key, hint), this._strategy); } else { - ret = new Updatable(); + ret = new Updatable(undefined, this._strategy); } this._latest.set(key, ret); @@ -94,6 +96,7 @@ class InMemorySparseMap implements SparseMap { invalidate(key: K): Promise { let val = this._latest.get(key); if (val) { + // Release whatever subscription val's playOnto is holding currently val.playOnto(Observable.empty()); this._latest.delete(key); } diff --git a/src/lib/standard-operators.ts b/src/lib/standard-operators.ts index 4534414..b7baffc 100644 --- a/src/lib/standard-operators.ts +++ b/src/lib/standard-operators.ts @@ -17,4 +17,5 @@ import 'rxjs/add/operator/retry'; import 'rxjs/add/operator/startWith'; import 'rxjs/add/operator/switch'; import 'rxjs/add/operator/switchMap'; -import 'rxjs/add/operator/takeUntil'; \ No newline at end of file +import 'rxjs/add/operator/takeUntil'; +import 'rxjs/add/operator/throttleTime'; \ No newline at end of file diff --git a/src/lib/updatable.ts b/src/lib/updatable.ts index cbcd4d0..5ac0976 100644 --- a/src/lib/updatable.ts +++ b/src/lib/updatable.ts @@ -8,6 +8,7 @@ import { Subject } from 'rxjs/Subject'; import './standard-operators'; export type Pair = { Key: K, Value: V }; +export type MergeStrategy = 'overwrite' | 'merge'; export class Updatable extends Subject { private _value: T; @@ -15,12 +16,21 @@ export class Updatable extends Subject { private _factory: () => Observable; private _playOnto: SerialSubscription; - constructor(factory?: () => Observable) { + constructor(factory?: () => Observable, strategy?: MergeStrategy) { super(); this._hasPendingValue = false; this._factory = factory ? factory : () => Observable.empty(); this._playOnto = new SerialSubscription(); + + switch (strategy || 'overwrite') { + case 'overwrite': + this.next = this.nextOverwrite; + break; + case 'merge': + this.next = this.nextMerge; + break; + } } get value(): T { @@ -40,7 +50,6 @@ export class Updatable extends Subject { let shouldNext = true; if (!this._hasPendingValue) { - this._hasPendingValue = true; this.playOnto(this._factory()); shouldNext = false; } @@ -52,14 +61,21 @@ export class Updatable extends Subject { return subscription; } - next(value: T): void { + nextOverwrite(value: T): void { this._hasPendingValue = true; super.next(this._value = value); } + nextMerge(value: T): void { + this._hasPendingValue = true; + this._value = Object.assign(this._value || {}, value || {}); + super.next(this._value); + } + invalidate() { this._hasPendingValue = false; - this._playOnto.set(Subscription.EMPTY); + this._value = undefined; + this.playOnto(this._factory()); } playOnto(source: Observable) { diff --git a/src/lib/utils.ts b/src/lib/utils.ts new file mode 100644 index 0000000..721e0a0 --- /dev/null +++ b/src/lib/utils.ts @@ -0,0 +1,7 @@ +export function isObject(o: any): boolean { + return o === Object(o); +} + +export function isFunction(o: any): boolean { + return !!(o && o.constructor && o.call && o.apply); +}; \ No newline at end of file diff --git a/src/lib/when.ts b/src/lib/when.ts index c877b75..95ffea6 100644 --- a/src/lib/when.ts +++ b/src/lib/when.ts @@ -1,5 +1,6 @@ import { Observable } from 'rxjs/Observable'; import { ChangeNotification, Model, TypedChangeNotification } from './model'; +import { isFunction, isObject } from './utils'; import * as LRU from 'lru-cache'; import { Updatable } from './updatable'; @@ -8,14 +9,6 @@ const proxyCache = LRU(64); import './standard-operators'; -function isObject(o: any): boolean { - return o === Object(o); -} - -function isFunction(o: any): boolean { - return !!(o && o.constructor && o.call && o.apply); -}; - const identifier = /^[$A-Z_][0-9A-Z_$]*$/i; export function whenPropertyInternal(target: any, valueOnly: boolean, ...propsAndSelector: Array): Observable { diff --git a/src/store.ts b/src/store.ts index 4ac8759..f4af351 100644 --- a/src/store.ts +++ b/src/store.ts @@ -9,7 +9,7 @@ import { asyncMap } from './lib/promise-extras'; import { isChannel, isGroup, isDM } from './channel-utils'; import './lib/standard-operators'; -import 'rxjs/Observable/dom/webSocket'; +import 'rxjs/add/observable/dom/webSocket'; export type ChannelList = Array>; @@ -26,21 +26,31 @@ export class Store { this.channels = new InMemorySparseMap((channel, api: Api) => { return this.infoApiForModel(channel, api)(); - }); + }, 'merge'); this.users = new InMemorySparseMap((user, api: Api) => { return api.users.info({ user }).map(({ user }: { user: User }) => { user.api = api; return user; }); - }); + }, 'merge'); this.joinedChannels = new Updatable(() => Observable.of([])); this.events = new InMemorySparseMap(); this.events.listen('user_change') - .do(({user}) => console.log(`Updating a user!!! ${JSON.stringify(user)}`)) - .subscribe(({user}) => this.users.listen(user.id).playOnto(Observable.of(user))); + .subscribe(msg => this.users.listen((msg.user! as User).id, msg.api).playOnto(Observable.of(msg.user))); + + // NB: This is the lulzy way to update channel counts when marks + // change, but we should definitely remove this code later + let somethingMarked = Observable.merge( + this.events.listen('channel_marked'), + this.events.listen('im_marked'), + this.events.listen('group_marked') + ); + + somethingMarked.throttleTime(3000) + .subscribe(x => this.fetchSingleInitialChannelList(x.api)); this.connectToRtm() .groupBy(x => x.type) @@ -51,8 +61,10 @@ export class Store { connectToRtm(): Observable { return Observable.merge( - ...this.api.map(x => this.createRtmConnection(x)) - ); + ...this.api.map(x => this.createRtmConnection(x).retry(5).catch(e => { + console.log(`Failed to connect via token ${x} - ${e.message}`); + return Observable.empty(); + }))); } async fetchInitialChannelList(): Promise { @@ -86,7 +98,8 @@ export class Store { private makeUpdatableForModel(model: ChannelBase & Api, api: Api) { model.api = api; - const updater = new Updatable(this.infoApiForModel(model.id, api)); + + const updater = this.channels.listen(model.id, api); updater.playOnto(Observable.of(model)); return updater; } diff --git a/test/lib/sparse-map-spec.ts b/test/lib/sparse-map-spec.ts new file mode 100644 index 0000000..16bbca7 --- /dev/null +++ b/test/lib/sparse-map-spec.ts @@ -0,0 +1,37 @@ +import { Observable } from 'rxjs/Observable'; + +import { MergeStrategy } from '../../src/lib/updatable'; +import { InMemorySparseMap, SparseMap } from '../../src/lib/sparse-map'; +import { TestClass, expect } from '../support'; + +export type ValueFactoryFunction = + ((key: string, hint?: any) => Observable); +export type CreateFixtureFunction = + ((factory: ValueFactoryFunction, strategy: MergeStrategy) => SparseMap); + +function testsForClass(Klass: Function, createFixture: CreateFixtureFunction) { + const name = Klass.name; + + describe(`The ${name} class interface implementation`, function() { + it ('smoke tests successfully', function() { + let fixture = createFixture(() => Observable.of(new TestClass()), 'overwrite'); + + let result = fixture.listen('foo'); + expect((result.value as TestClass).derived).to.equal(42); + }); + + it ('creates Updatables with Merge Strategy semantics', function() { + let fixture = createFixture(() => Observable.of({a: 1}), 'merge'); + + let result = fixture.listen('foo'); + expect(result.value).to.deep.equal({a: 1}); + + result.next({b: 2}); + expect(result.value).to.deep.equal({a: 1, b: 2}); + }); + }); +} + +testsForClass(InMemorySparseMap, (factory, strategy) => { + return new InMemorySparseMap(factory, strategy); +}); \ No newline at end of file diff --git a/test/lib/updatable-spec.ts b/test/lib/updatable-spec.ts index 043ec0a..c2e8891 100644 --- a/test/lib/updatable-spec.ts +++ b/test/lib/updatable-spec.ts @@ -105,4 +105,29 @@ describe('The Updatable class', function() { fixture.subscribe(x => latest = x); expect(latest).to.equal(42); }); + + it('shallow merges objects when used with the merge strategy', function() { + let fixture = new Updatable(() => Observable.of({a: 1}), 'merge'); + expect(fixture.value).to.deep.equal({a: 1}); + + fixture.next({b: 2}); + expect(fixture.value).to.deep.equal({a: 1, b: 2}); + + fixture.next({a: 5}); + expect(fixture.value).to.deep.equal({a: 5, b: 2}); + }); + + it('drops the current value on invalidate', function() { + let fixture = new Updatable(() => Observable.of({a: 1}), 'merge'); + expect(fixture.value).to.deep.equal({a: 1}); + + fixture.next({b: 2}); + expect(fixture.value).to.deep.equal({a: 1, b: 2}); + + fixture.next({a: 5}); + expect(fixture.value).to.deep.equal({a: 5, b: 2}); + + fixture.invalidate(); + expect(fixture.value).to.deep.equal({a: 1}); + }); }); \ No newline at end of file