From 14f12dd17db88bf68bc69bc6ee309116d879caa3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?V=C3=ADctor=20Oliva?= Date: Fri, 11 Nov 2022 10:44:51 +0100 Subject: [PATCH 1/2] fix: manage serializer throwing exceptions. --- spec/observables/dom/webSocket-spec.ts | 46 +++++++++++++++++++ .../observable/dom/WebSocketSubject.ts | 23 ++++++---- 2 files changed, 61 insertions(+), 8 deletions(-) diff --git a/spec/observables/dom/webSocket-spec.ts b/spec/observables/dom/webSocket-spec.ts index a121e3f0e2..4621424932 100644 --- a/spec/observables/dom/webSocket-spec.ts +++ b/spec/observables/dom/webSocket-spec.ts @@ -416,6 +416,52 @@ describe('webSocket', () => { subject.unsubscribe(); }); + it('should take a serializer', () => { + const subject = webSocket<{ data: string}>({ + url: 'ws://mysocket', + serializer: (e: any) => { + return e.data + '!'; + } + }); + + subject.subscribe(); + + const socket = MockWebSocket.lastSocket; + socket.open(); + + + ['ahoy', 'yarr', 'shove off'].forEach(x => { + subject.next({ data: x }); + expect(socket.lastMessageSent).to.equal(x + '!'); + }); + + subject.unsubscribe(); + }); + + it('if the serializer fails it should close the connection and rethrow the error', () => { + const subject = webSocket({ + url: 'ws://mysocket', + serializer: (e: any) => { + throw new Error('I am a bad error'); + } + }); + + subject.subscribe({ next: (x: any) => { + expect(x).to.equal('this should not happen'); + }, error: (err: any) => { + expect(err).to.equal('this should not happen'); + } }); + + const socket = MockWebSocket.lastSocket; + socket.open(); + + expect(() => subject.next('weee!')).to.throw('I am a bad error'); + expect(socket.readyState).to.equal(WebSocketState.CLOSING); + expect(socket.closeCode).to.equal(1000); + + subject.unsubscribe(); + }); + it('should accept a closingObserver', () => { let calls = 0; const subject = webSocket({ diff --git a/src/internal/observable/dom/WebSocketSubject.ts b/src/internal/observable/dom/WebSocketSubject.ts index 9eecbf5ec5..3abc7a2497 100644 --- a/src/internal/observable/dom/WebSocketSubject.ts +++ b/src/internal/observable/dom/WebSocketSubject.ts @@ -296,18 +296,25 @@ export class WebSocketSubject extends AnonymousSubject { const queue = this.destination; - this.destination = Subscriber.create( - (x) => { + this.destination = new Subscriber({ + next: (x: T) => { if (socket!.readyState === 1) { try { const { serializer } = this._config; socket!.send(serializer!(x!)); - } catch (e) { - this.destination!.error(e); + } catch (e: any) { + if (e && e.code) { + this.destination!.error(e); + } else { + this.destination!.error({ + code: 1000, + }); + throw e; + } } } }, - (err) => { + error: (err: any) => { const { closingObserver } = this._config; if (closingObserver) { closingObserver.next(undefined); @@ -319,15 +326,15 @@ export class WebSocketSubject extends AnonymousSubject { } this._resetState(); }, - () => { + complete: () => { const { closingObserver } = this._config; if (closingObserver) { closingObserver.next(undefined); } socket!.close(); this._resetState(); - } - ) as Subscriber; + }, + }) as Subscriber; if (queue && queue instanceof ReplaySubject) { subscription.add((queue as ReplaySubject).subscribe(this.destination)); From 51008de0e72452066ba1423cba184faea6d40446 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?V=C3=ADctor=20Oliva?= Date: Thu, 15 Dec 2022 12:51:36 +0100 Subject: [PATCH 2/2] fix: manage serializer throwing exceptions. --- spec/observables/dom/webSocket-spec.ts | 10 +++---- .../observable/dom/WebSocketSubject.ts | 26 ++++++++----------- 2 files changed, 16 insertions(+), 20 deletions(-) diff --git a/spec/observables/dom/webSocket-spec.ts b/spec/observables/dom/webSocket-spec.ts index 4621424932..cc367cec59 100644 --- a/spec/observables/dom/webSocket-spec.ts +++ b/spec/observables/dom/webSocket-spec.ts @@ -438,7 +438,7 @@ describe('webSocket', () => { subject.unsubscribe(); }); - it('if the serializer fails it should close the connection and rethrow the error', () => { + it('if the serializer fails it should go down the error path', () => { const subject = webSocket({ url: 'ws://mysocket', serializer: (e: any) => { @@ -446,16 +446,16 @@ describe('webSocket', () => { } }); + const error = sinon.spy(); subject.subscribe({ next: (x: any) => { expect(x).to.equal('this should not happen'); - }, error: (err: any) => { - expect(err).to.equal('this should not happen'); - } }); + }, error }); const socket = MockWebSocket.lastSocket; socket.open(); - expect(() => subject.next('weee!')).to.throw('I am a bad error'); + subject.next('weee!'); + expect(error).to.have.been.calledWithMatch({ message: 'I am a bad error' }); expect(socket.readyState).to.equal(WebSocketState.CLOSING); expect(socket.closeCode).to.equal(1000); diff --git a/src/internal/observable/dom/WebSocketSubject.ts b/src/internal/observable/dom/WebSocketSubject.ts index 3abc7a2497..ab62b4ece2 100644 --- a/src/internal/observable/dom/WebSocketSubject.ts +++ b/src/internal/observable/dom/WebSocketSubject.ts @@ -296,25 +296,21 @@ export class WebSocketSubject extends AnonymousSubject { const queue = this.destination; - this.destination = new Subscriber({ - next: (x: T) => { + this.destination = Subscriber.create( + (x) => { if (socket!.readyState === 1) { try { const { serializer } = this._config; socket!.send(serializer!(x!)); - } catch (e: any) { - if (e && e.code) { - this.destination!.error(e); - } else { - this.destination!.error({ - code: 1000, - }); - throw e; - } + } catch (e) { + this.destination!.error({ + code: 1000, + }); + observer.error(e); } } }, - error: (err: any) => { + (err) => { const { closingObserver } = this._config; if (closingObserver) { closingObserver.next(undefined); @@ -326,15 +322,15 @@ export class WebSocketSubject extends AnonymousSubject { } this._resetState(); }, - complete: () => { + () => { const { closingObserver } = this._config; if (closingObserver) { closingObserver.next(undefined); } socket!.close(); this._resetState(); - }, - }) as Subscriber; + } + ) as Subscriber; if (queue && queue instanceof ReplaySubject) { subscription.add((queue as ReplaySubject).subscribe(this.destination));