/
SupabaseRealtimeClient.ts
78 lines (66 loc) · 2.3 KB
/
SupabaseRealtimeClient.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import { RealtimeSubscription, RealtimeClient, Transformers } from '@supabase/realtime-js'
import { SupabaseEventTypes, SupabaseRealtimePayload } from './types'
/**
 * Thin wrapper around a single realtime channel subscription.
 *
 * The constructor opens a channel on the given socket for a schema (or one
 * table of that schema); `on` registers change listeners and `subscribe`
 * joins the channel and reports status changes through a callback.
 */
export class SupabaseRealtimeClient {
  /** The channel returned by `socket.channel(...)` for this schema/table. */
  subscription: RealtimeSubscription

  /**
   * @param socket The realtime socket used to create the channel.
   * @param headers Request headers. When an `Authorization` entry is present, the
   *   second space-separated segment of its value (presumably the token of a
   *   `<scheme> <token>` pair) is forwarded as the `user_token` channel parameter.
   * @param schema The database schema to listen to.
   * @param tableName The table to listen to, or `'*'` for the whole schema.
   */
  constructor(
    socket: RealtimeClient,
    headers: { [key: string]: string },
    schema: string,
    tableName: string
  ) {
    const chanParams: { [key: string]: string } = {}
    const topic = tableName === '*' ? `realtime:${schema}` : `realtime:${schema}:${tableName}`

    // The header may be absent (e.g. no auth configured); guard the lookup so a
    // missing `Authorization` entry does not throw while splitting `undefined`.
    const authorization = headers['Authorization']
    const userToken = authorization ? authorization.split(' ')[1] : undefined
    if (userToken) {
      chanParams['user_token'] = userToken
    }

    this.subscription = socket.channel(topic, chanParams)
  }

  /**
   * Converts the raw records of a change payload with
   * `Transformers.convertChangeData`.
   *
   * `new` is populated for INSERT and UPDATE, `old` for UPDATE and DELETE;
   * for any other payload type both stay empty objects.
   *
   * @param payload The raw payload received from the channel.
   * @returns An object holding the converted `new` and `old` records.
   */
  private getPayloadRecords(payload: any) {
    const records = {
      new: {},
      old: {},
    }

    if (payload.type === 'INSERT' || payload.type === 'UPDATE') {
      records.new = Transformers.convertChangeData(payload.columns, payload.record)
    }

    if (payload.type === 'UPDATE' || payload.type === 'DELETE') {
      records.old = Transformers.convertChangeData(payload.columns, payload.old_record)
    }

    return records
  }

  /**
   * Registers a listener for an event on the underlying subscription.
   *
   * @param event The event to listen to.
   * @param callback Called whenever the event occurs, with the payload's
   *   schema, table, commit timestamp and event type plus the converted
   *   `new` / `old` records.
   * @returns This client, so calls can be chained.
   */
  on(event: SupabaseEventTypes, callback: (payload: SupabaseRealtimePayload<any>) => void) {
    this.subscription.on(event, (payload: any) => {
      // getPayloadRecords always returns both `new` and `old`, so the payload
      // can be assembled in a single literal.
      const enrichedPayload: SupabaseRealtimePayload<any> = {
        schema: payload.schema,
        table: payload.table,
        commit_timestamp: payload.commit_timestamp,
        eventType: payload.type,
        ...this.getPayloadRecords(payload),
      }

      callback(enrichedPayload)
    })
    return this
  }

  /**
   * Enables the subscription.
   *
   * @param callback Invoked with a status string, and the error where one is
   *   available: `'SUBSCRIBED'` when the join is acknowledged,
   *   `'SUBSCRIPTION_ERROR'` on a join or channel error, `'CLOSED'` when the
   *   channel closes and `'RETRYING_AFTER_TIMEOUT'` when the join times out.
   *   Defaults to a no-op.
   * @returns The underlying subscription.
   */
  subscribe(callback: Function = () => {}) {
    this.subscription.onError((e: Error) => callback('SUBSCRIPTION_ERROR', e))
    this.subscription.onClose(() => callback('CLOSED'))
    this.subscription
      .subscribe()
      .receive('ok', () => callback('SUBSCRIBED'))
      .receive('error', (e: Error) => callback('SUBSCRIPTION_ERROR', e))
      .receive('timeout', () => callback('RETRYING_AFTER_TIMEOUT'))
    return this.subscription
  }
}