Skip to content

Commit

Permalink
Merge pull request #32 from paulcbetts/channel-change-rtm
Browse files Browse the repository at this point in the history
Update channel unreads via RTM
  • Loading branch information
anaisbetts committed Mar 2, 2017
2 parents 0e535be + 340bc34 commit 16eea68
Show file tree
Hide file tree
Showing 10 changed files with 138 additions and 29 deletions.
20 changes: 17 additions & 3 deletions src/channel-header.tsx
Expand Up @@ -15,7 +15,7 @@ export class ChannelHeaderViewModel extends Model {
@fromObservable selectedChannel: ChannelBase;
@fromObservable channelInfo: Channel;
@fromObservable members: Array<string>;
@fromObservable topic: string;
@fromObservable topic: { value: string};
@fromObservable isDrawerOpen: boolean;
toggleDrawer: Action<boolean>;

Expand All @@ -32,7 +32,21 @@ export class ChannelHeaderViewModel extends Model {
when(this, x => x.selectedChannel)
.filter(c => !!c)
.switchMap(c => this.store.channels.listen(c.id, c.api))
.do(x => console.log(`New ChannelInfo! ${JSON.stringify(x)}`))
.toProperty(this, 'channelInfo');

// NB: This works but it's too damn clever
this.innerDisp.add(when(this, x => x.channelInfo)
.filter(x => x && !x.topic)
.subscribe(x => this.store.channels.listen(x.id, x.api).invalidate()));

when(this, x => x.channelInfo.members)
.startWith([])
.toProperty(this, 'members');

when(this, x => x.channelInfo.topic)
.startWith({value: ''})
.toProperty(this, 'topic');
}
}

Expand All @@ -52,15 +66,15 @@ export class ChannelHeaderView extends SimpleView<ChannelHeaderViewModel> {
tabs.push(
<Tab
key='members'
label={`Members: ${this.viewModel.channelInfo.members.length}`}
label={`Members: ${this.viewModel.members.length}`}
style={tabStyle}
/>
);

tabs.push(
<Tab
key='topic'
label={this.viewModel.channelInfo.topic.value}
label={this.viewModel.topic.value}
style={tabStyle}
/>
);
Expand Down
2 changes: 1 addition & 1 deletion src/lib/models/api-shapes.ts
Expand Up @@ -91,7 +91,7 @@ export interface Message {

ephemeral_msg_type: number; // based on EphemeralMsgType.id

user?: string; // nullable userId
user?: string | User; // nullable userId
username: string;
topic: string;
purpose: string;
Expand Down
11 changes: 7 additions & 4 deletions src/lib/sparse-map.ts
@@ -1,5 +1,5 @@
import { Observable } from 'rxjs/Observable';
import { Updatable } from './updatable';
import { Updatable, MergeStrategy } from './updatable';

import './standard-operators';

Expand Down Expand Up @@ -48,10 +48,12 @@ export class SparseMapMixins {
class InMemorySparseMap<K, V> implements SparseMap<K, V> {
private _latest: Map<K, Updatable<V>>;
private _factory: ((key: K, hint?: any) => Observable<V>) | undefined;
private _strategy: MergeStrategy;

constructor(factory: ((key: K, hint?: any) => Observable<V>) | undefined = undefined) {
constructor(factory: ((key: K, hint?: any) => Observable<V>) | undefined = undefined, strategy: MergeStrategy = 'overwrite') {
this._latest = new Map();
this._factory = factory;
this._strategy = strategy;
}

listen(key: K, hint?: any): Updatable<V> {
Expand All @@ -60,9 +62,9 @@ class InMemorySparseMap<K, V> implements SparseMap<K, V> {

if (this._factory) {
let fact = this._factory.bind(this);
ret = new Updatable<V>(() => fact(key, hint));
ret = new Updatable<V>(() => fact(key, hint), this._strategy);
} else {
ret = new Updatable<V>();
ret = new Updatable<V>(undefined, this._strategy);
}

this._latest.set(key, ret);
Expand Down Expand Up @@ -94,6 +96,7 @@ class InMemorySparseMap<K, V> implements SparseMap<K, V> {
invalidate(key: K): Promise<void> {
let val = this._latest.get(key);
if (val) {
// Release whatever subscription val's playOnto is holding currently
val.playOnto(Observable.empty());
this._latest.delete(key);
}
Expand Down
3 changes: 2 additions & 1 deletion src/lib/standard-operators.ts
Expand Up @@ -17,4 +17,5 @@ import 'rxjs/add/operator/retry';
import 'rxjs/add/operator/startWith';
import 'rxjs/add/operator/switch';
import 'rxjs/add/operator/switchMap';
import 'rxjs/add/operator/takeUntil';
import 'rxjs/add/operator/takeUntil';
import 'rxjs/add/operator/throttleTime';
24 changes: 20 additions & 4 deletions src/lib/updatable.ts
Expand Up @@ -8,19 +8,29 @@ import { Subject } from 'rxjs/Subject';
import './standard-operators';

export type Pair<K, V> = { Key: K, Value: V };
export type MergeStrategy = 'overwrite' | 'merge';

export class Updatable<T> extends Subject<T> {
private _value: T;
private _hasPendingValue: boolean;
private _factory: () => Observable<T>;
private _playOnto: SerialSubscription;

constructor(factory?: () => Observable<T>) {
constructor(factory?: () => Observable<T>, strategy?: MergeStrategy) {
super();

this._hasPendingValue = false;
this._factory = factory ? factory : () => Observable.empty();
this._playOnto = new SerialSubscription();

switch (strategy || 'overwrite') {
case 'overwrite':
this.next = this.nextOverwrite;
break;
case 'merge':
this.next = this.nextMerge;
break;
}
}

get value(): T {
Expand All @@ -40,7 +50,6 @@ export class Updatable<T> extends Subject<T> {

let shouldNext = true;
if (!this._hasPendingValue) {
this._hasPendingValue = true;
this.playOnto(this._factory());
shouldNext = false;
}
Expand All @@ -52,14 +61,21 @@ export class Updatable<T> extends Subject<T> {
return subscription;
}

next(value: T): void {
nextOverwrite(value: T): void {
this._hasPendingValue = true;
super.next(this._value = value);
}

nextMerge(value: T): void {
this._hasPendingValue = true;
this._value = Object.assign(this._value || {}, value || {});
super.next(this._value);
}

invalidate() {
this._hasPendingValue = false;
this._playOnto.set(Subscription.EMPTY);
this._value = undefined;
this.playOnto(this._factory());
}

playOnto(source: Observable<T>) {
Expand Down
7 changes: 7 additions & 0 deletions src/lib/utils.ts
@@ -0,0 +1,7 @@
export function isObject(o: any): boolean {
return o === Object(o);
}

export function isFunction(o: any): boolean {
return !!(o && o.constructor && o.call && o.apply);
};
9 changes: 1 addition & 8 deletions src/lib/when.ts
@@ -1,5 +1,6 @@
import { Observable } from 'rxjs/Observable';
import { ChangeNotification, Model, TypedChangeNotification } from './model';
import { isFunction, isObject } from './utils';

import * as LRU from 'lru-cache';
import { Updatable } from './updatable';
Expand All @@ -8,14 +9,6 @@ const proxyCache = LRU(64);

import './standard-operators';

function isObject(o: any): boolean {
return o === Object(o);
}

function isFunction(o: any): boolean {
return !!(o && o.constructor && o.call && o.apply);
};

const identifier = /^[$A-Z_][0-9A-Z_$]*$/i;

export function whenPropertyInternal(target: any, valueOnly: boolean, ...propsAndSelector: Array<string|Function|string[]>): Observable<any> {
Expand Down
29 changes: 21 additions & 8 deletions src/store.ts
Expand Up @@ -9,7 +9,7 @@ import { asyncMap } from './lib/promise-extras';
import { isChannel, isGroup, isDM } from './channel-utils';

import './lib/standard-operators';
import 'rxjs/Observable/dom/webSocket';
import 'rxjs/add/observable/dom/webSocket';

export type ChannelList = Array<Updatable<ChannelBase|null>>;

Expand All @@ -26,21 +26,31 @@ export class Store {

this.channels = new InMemorySparseMap<string, ChannelBase>((channel, api: Api) => {
return this.infoApiForModel(channel, api)();
});
}, 'merge');

this.users = new InMemorySparseMap<string, User>((user, api: Api) => {
return api.users.info({ user }).map(({ user }: { user: User }) => {
user.api = api;
return user;
});
});
}, 'merge');

this.joinedChannels = new Updatable<ChannelList>(() => Observable.of([]));

this.events = new InMemorySparseMap<EventType, Message>();
this.events.listen('user_change')
.do(({user}) => console.log(`Updating a user!!! ${JSON.stringify(user)}`))
.subscribe(({user}) => this.users.listen(user.id).playOnto(Observable.of(user)));
.subscribe(msg => this.users.listen((msg.user! as User).id, msg.api).playOnto(Observable.of(msg.user)));

// NB: This is the lulzy way to update channel counts when marks
// change, but we should definitely remove this code later
let somethingMarked = Observable.merge(
this.events.listen('channel_marked'),
this.events.listen('im_marked'),
this.events.listen('group_marked')
);

somethingMarked.throttleTime(3000)
.subscribe(x => this.fetchSingleInitialChannelList(x.api));

this.connectToRtm()
.groupBy(x => x.type)
Expand All @@ -51,8 +61,10 @@ export class Store {

connectToRtm(): Observable<Message> {
return Observable.merge(
...this.api.map(x => this.createRtmConnection(x))
);
...this.api.map(x => this.createRtmConnection(x).retry(5).catch(e => {
console.log(`Failed to connect via token ${x} - ${e.message}`);
return Observable.empty();
})));
}

async fetchInitialChannelList(): Promise<void> {
Expand Down Expand Up @@ -86,7 +98,8 @@ export class Store {

private makeUpdatableForModel(model: ChannelBase & Api, api: Api) {
model.api = api;
const updater = new Updatable(this.infoApiForModel(model.id, api));

const updater = this.channels.listen(model.id, api);
updater.playOnto(Observable.of(model));
return updater;
}
Expand Down
37 changes: 37 additions & 0 deletions test/lib/sparse-map-spec.ts
@@ -0,0 +1,37 @@
import { Observable } from 'rxjs/Observable';

import { MergeStrategy } from '../../src/lib/updatable';
import { InMemorySparseMap, SparseMap } from '../../src/lib/sparse-map';
import { TestClass, expect } from '../support';

export type ValueFactoryFunction =
((key: string, hint?: any) => Observable<Object>);
export type CreateFixtureFunction =
((factory: ValueFactoryFunction, strategy: MergeStrategy) => SparseMap<string, Object>);

function testsForClass(Klass: Function, createFixture: CreateFixtureFunction) {
const name = Klass.name;

describe(`The ${name} class interface implementation`, function() {
it ('smoke tests successfully', function() {
let fixture = createFixture(() => Observable.of(new TestClass()), 'overwrite');

let result = fixture.listen('foo');
expect((result.value as TestClass).derived).to.equal(42);
});

it ('creates Updatables with Merge Strategy semantics', function() {
let fixture = createFixture(() => Observable.of({a: 1}), 'merge');

let result = fixture.listen('foo');
expect(result.value).to.deep.equal({a: 1});

result.next({b: 2});
expect(result.value).to.deep.equal({a: 1, b: 2});
});
});
}

testsForClass(InMemorySparseMap, (factory, strategy) => {
return new InMemorySparseMap(factory, strategy);
});
25 changes: 25 additions & 0 deletions test/lib/updatable-spec.ts
Expand Up @@ -105,4 +105,29 @@ describe('The Updatable class', function() {
fixture.subscribe(x => latest = x);
expect(latest).to.equal(42);
});

it('shallow merges objects when used with the merge strategy', function() {
let fixture = new Updatable<Object>(() => Observable.of({a: 1}), 'merge');
expect(fixture.value).to.deep.equal({a: 1});

fixture.next({b: 2});
expect(fixture.value).to.deep.equal({a: 1, b: 2});

fixture.next({a: 5});
expect(fixture.value).to.deep.equal({a: 5, b: 2});
});

it('drops the current value on invalidate', function() {
let fixture = new Updatable<Object>(() => Observable.of({a: 1}), 'merge');
expect(fixture.value).to.deep.equal({a: 1});

fixture.next({b: 2});
expect(fixture.value).to.deep.equal({a: 1, b: 2});

fixture.next({a: 5});
expect(fixture.value).to.deep.equal({a: 5, b: 2});

fixture.invalidate();
expect(fixture.value).to.deep.equal({a: 1});
});
});

0 comments on commit 16eea68

Please sign in to comment.