-
Notifications
You must be signed in to change notification settings - Fork 6
/
store.ts
126 lines (99 loc) · 4.15 KB
/
store.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import { Observable } from 'rxjs/Observable';
import { InMemorySparseMap, SparseMap } from './lib/sparse-map';
import { Updatable } from './lib/updatable';
import { Api, createApi } from './lib/models/api-call';
import { ChannelBase, Message, UsersCounts, User } from './lib/models/api-shapes';
import { EventType } from './lib/models/event-type';
import { asyncMap } from './lib/promise-extras';
import { isChannel, isGroup, isDM } from './channel-utils';
import './lib/standard-operators';
import 'rxjs/add/observable/dom/webSocket';
export type ChannelList = Array<Updatable<ChannelBase|null>>;
export class Store {
api: Api[];
channels: SparseMap<string, ChannelBase>;
users: SparseMap<string, User>;
joinedChannels: Updatable<ChannelList>;
events: SparseMap<EventType, Message>;
constructor(tokenList: string[] = []) {
this.api = tokenList.map(x => createApi(x));
this.channels = new InMemorySparseMap<string, ChannelBase>((channel, api: Api) => {
return this.infoApiForModel(channel, api)();
}, 'merge');
this.users = new InMemorySparseMap<string, User>((user, api: Api) => {
return api.users.info({ user }).map(({ user }: { user: User }) => {
user.api = api;
return user;
});
}, 'merge');
this.joinedChannels = new Updatable<ChannelList>(() => Observable.of([]));
this.events = new InMemorySparseMap<EventType, Message>();
this.events.listen('user_change')
.subscribe(msg => this.users.listen((msg.user! as User).id, msg.api).playOnto(Observable.of(msg.user)));
// NB: This is the lulzy way to update channel counts when marks
// change, but we should definitely remove this code later
let somethingMarked = Observable.merge(
this.events.listen('channel_marked'),
this.events.listen('im_marked'),
this.events.listen('group_marked')
);
somethingMarked.throttleTime(3000)
.subscribe(x => this.fetchSingleInitialChannelList(x.api));
this.connectToRtm()
.groupBy(x => x.type)
.publish().refCount()
.retry()
.subscribe(x => this.events.listen(x.key).playOnto(x));
}
connectToRtm(): Observable<Message> {
return Observable.merge(
...this.api.map(x => this.createRtmConnection(x).retry(5).catch(e => {
console.log(`Failed to connect via token ${x} - ${e.message}`);
return Observable.empty();
})));
}
async fetchInitialChannelList(): Promise<void> {
const results = await asyncMap(this.api, (api) => this.fetchSingleInitialChannelList(api));
const allJoinedChannels = Array.from(results.values())
.reduce((acc, x) => acc.concat(x), []);
this.joinedChannels.next(allJoinedChannels);
}
private async fetchSingleInitialChannelList(api: Api): Promise<ChannelList> {
const joinedChannels: ChannelList = [];
const result: UsersCounts = await api.users.counts({ simple_unreads: true }).toPromise();
result.channels.forEach((c) => {
joinedChannels.push(this.makeUpdatableForModel(c, api));
});
result.groups.forEach((g) => {
joinedChannels.push(this.makeUpdatableForModel(g, api));
});
result.ims.forEach((dm) => {
joinedChannels.push(this.makeUpdatableForModel(dm, api));
});
return joinedChannels;
}
private makeUpdatableForModel(model: ChannelBase & Api, api: Api) {
model.api = api;
const updater = this.channels.listen(model.id, api);
updater.playOnto(Observable.of(model));
return updater;
}
private infoApiForModel(id: string, api: Api): () => Observable<ChannelBase|null> {
if (isChannel(id)) {
return () => api.channels.info({ channel: id })
.map((response: any) => Object.assign(response.channel, { api }));
} else if (isGroup(id)) {
return () => api.groups.info({ channel: id })
.map((response: any) => Object.assign(response.group, { api }));
} else if (isDM(id)) {
return () => Observable.of(null);
} else {
throw new Error(`Unsupported model: ${id}`);
}
}
private createRtmConnection(api: Api): Observable<Message> {
return api.rtm.connect()
.flatMap(({url}) => Observable.webSocket(url))
.map(msg => { msg.api = api; return msg; });
}
}