From 5f37cb6454878294ae004d13a5fcf62eef6adbbe Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Thu, 13 Oct 2022 17:26:38 +0200 Subject: [PATCH] fix(NODE-4475): make interrupted message more specific (#3437) --- src/cursor/abstract_cursor.ts | 12 +- .../change-streams/change_stream.test.ts | 116 +++++++++++++----- 2 files changed, 92 insertions(+), 36 deletions(-) diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index 79ec34d4d3..dfa3bf07f4 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -897,14 +897,22 @@ class ReadableCursorStream extends Readable { } // NOTE: This is also perhaps questionable. The rationale here is that these errors tend - // to be "operation interrupted", where a cursor has been closed but there is an + // to be "operation was interrupted", where a cursor has been closed but there is an // active getMore in-flight. This used to check if the cursor was killed but once // that changed to happen in cleanup legitimate errors would not destroy the // stream. There are change streams test specifically test these cases. - if (err.message.match(/interrupted/)) { + if (err.message.match(/operation was interrupted/)) { return this.push(null); } + // NOTE: The two above checks on the message of the error will cause a null to be pushed + // to the stream, thus closing the stream before the destroy call happens. This means + // that either of those error messages on a change stream will not get a proper + // 'error' event to be emitted (the error passed to destroy). Change stream resumability + // relies on that error event to be emitted to create its new cursor and thus was not + // working on 4.4 servers because the error emitted on failover was "interrupted at + // shutdown" while on 5.0+ it is "The server is in quiesce mode and will shut down". + // See NODE-4475. return this.destroy(err); } diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index cf00987d0a..a22e4bcf11 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -1674,22 +1674,30 @@ describe('ChangeStream resumability', function () { }; const resumableErrorCodes = [ - { error: 'HostUnreachable', code: 6 }, - { error: 'HostNotFound', code: 7 }, - { error: 'NetworkTimeout', code: 89 }, - { error: 'ShutdownInProgress', code: 91 }, - { error: 'PrimarySteppedDown', code: 189 }, - { error: 'ExceededTimeLimit', code: 262 }, - { error: 'SocketException', code: 9001 }, - { error: 'NotWritablePrimary', code: 10107 }, - { error: 'InterruptedAtShutdown', code: 11600 }, - { error: 'InterruptedDueToReplStateChange', code: 11602 }, - { error: 'NotPrimaryNoSecondaryOk', code: 13435 }, - { error: 'StaleShardVersion', code: 63 }, - { error: 'StaleEpoch', code: 150 }, - { error: 'RetryChangeStream', code: 234 }, - { error: 'FailedToSatisfyReadPreference', code: 133 }, - { error: 'CursorNotFound', code: 43 } + { error: 'HostUnreachable', code: 6, message: 'host unreachable' }, + { error: 'HostNotFound', code: 7, message: 'hot not found' }, + { error: 'NetworkTimeout', code: 89, message: 'network timeout' }, + { error: 'ShutdownInProgress', code: 91, message: 'shutdown in progress' }, + { error: 'PrimarySteppedDown', code: 189, message: 'primary stepped down' }, + { error: 'ExceededTimeLimit', code: 262, message: 'operation exceeded time limit' }, + { error: 'SocketException', code: 9001, message: 'socket exception' }, + { error: 'NotWritablePrimary', code: 10107, message: 'not writable primary' }, + { error: 'InterruptedAtShutdown', code: 11600, message: 'interrupted at shutdown' }, + { + error: 'InterruptedDueToReplStateChange', + code: 11602, + message: 'interrupted due to state change' + }, + { error: 'NotPrimaryNoSecondaryOk', code: 13435, message: 'not primary and no secondary ok' }, + { error: 'StaleShardVersion', code: 63, message: 'stale shard version' }, + { error: 'StaleEpoch', code: 150, message: 'stale epoch' }, + { error: 'RetryChangeStream', code: 234, message: 'retry change stream' }, + { + error: 'FailedToSatisfyReadPreference', + code: 133, + message: 'failed to satisfy read preference' + }, + { error: 'CursorNotFound', code: 43, message: 'cursor not found' } ]; const is4_2Server = (serverVersion: string) => @@ -1731,7 +1739,7 @@ describe('ChangeStream resumability', function () { context('iterator api', function () { context('#next', function () { - for (const { error, code } of resumableErrorCodes) { + for (const { error, code, message } of resumableErrorCodes) { it( `resumes on error code ${code} (${error})`, { requires: { topology: '!single', mongodb: '>=4.2' } }, @@ -1746,7 +1754,8 @@ describe('ChangeStream resumability', function () { mode: { times: 1 }, data: { failCommands: ['getMore'], - errorCode: code + errorCode: code, + errmsg: message } } as FailPoint); @@ -1759,7 +1768,7 @@ describe('ChangeStream resumability', function () { } ); } - for (const { error, code } of resumableErrorCodes) { + for (const { error, code, message } of resumableErrorCodes) { it( `resumes on error code ${code} (${error})`, { requires: { topology: '!single', mongodb: '<4.2' } }, @@ -1778,7 +1787,7 @@ describe('ChangeStream resumability', function () { .stub(changeStream.cursor, '_getMore') .callsFake((_batchSize, callback) => { mock.restore(); - const error = new MongoServerError({ message: 'Something went wrong' }); + const error = new MongoServerError({ message: message }); error.code = code; callback(error); }); @@ -1807,7 +1816,8 @@ describe('ChangeStream resumability', function () { mode: { times: 1 }, data: { failCommands: ['getMore'], - errorCode: resumableErrorCodes[0].code + errorCode: resumableErrorCodes[0].code, + errmsg: resumableErrorCodes[0].message } } as FailPoint); @@ -1858,7 +1868,7 @@ describe('ChangeStream resumability', function () { }); context('#hasNext', function () { - for (const { error, code } of resumableErrorCodes) { + for (const { error, code, message } of resumableErrorCodes) { it( `resumes on error code ${code} (${error})`, { requires: { topology: '!single', mongodb: '>=4.2' } }, @@ -1873,7 +1883,8 @@ describe('ChangeStream resumability', function () { mode: { times: 1 }, data: { failCommands: ['getMore'], - errorCode: code + errorCode: code, + errmsg: message } } as FailPoint); @@ -1887,7 +1898,7 @@ describe('ChangeStream resumability', function () { ); } - for (const { error, code } of resumableErrorCodes) { + for (const { error, code, message } of resumableErrorCodes) { it( `resumes on error code ${code} (${error})`, { requires: { topology: '!single', mongodb: '<4.2' } }, @@ -1906,7 +1917,7 @@ describe('ChangeStream resumability', function () { .stub(changeStream.cursor, '_getMore') .callsFake((_batchSize, callback) => { mock.restore(); - const error = new MongoServerError({ message: 'Something went wrong' }); + const error = new MongoServerError({ message: message }); error.code = code; callback(error); }); @@ -1935,7 +1946,8 @@ describe('ChangeStream resumability', function () { mode: { times: 1 }, data: { failCommands: ['getMore'], - errorCode: resumableErrorCodes[0].code + errorCode: resumableErrorCodes[0].code, + errmsg: resumableErrorCodes[0].message } } as FailPoint); @@ -1986,7 +1998,7 @@ describe('ChangeStream resumability', function () { }); context('#tryNext', function () { - for (const { error, code } of resumableErrorCodes) { + for (const { error, code, message } of resumableErrorCodes) { it( `resumes on error code ${code} (${error})`, { requires: { topology: '!single', mongodb: '>=4.2' } }, @@ -2001,7 +2013,8 @@ describe('ChangeStream resumability', function () { mode: { times: 1 }, data: { failCommands: ['getMore'], - errorCode: code + errorCode: code, + errmsg: message } } as FailPoint); @@ -2022,7 +2035,7 @@ describe('ChangeStream resumability', function () { ); } - for (const { error, code } of resumableErrorCodes) { + for (const { error, code, message } of resumableErrorCodes) { it( `resumes on error code ${code} (${error})`, { requires: { topology: '!single', mongodb: '<4.2' } }, @@ -2041,7 +2054,7 @@ describe('ChangeStream resumability', function () { .stub(changeStream.cursor, '_getMore') .callsFake((_batchSize, callback) => { mock.restore(); - const error = new MongoServerError({ message: 'Something went wrong' }); + const error = new MongoServerError({ message: message }); error.code = code; callback(error); }); @@ -2077,7 +2090,8 @@ describe('ChangeStream resumability', function () { mode: { times: 1 }, data: { failCommands: ['getMore'], - errorCode: resumableErrorCodes[0].code + errorCode: resumableErrorCodes[0].code, + errmsg: resumableErrorCodes[0].message } } as FailPoint); @@ -2127,7 +2141,7 @@ describe('ChangeStream resumability', function () { }); describe('event emitter based iteration', function () { - for (const { error, code } of resumableErrorCodes) { + for (const { error, code, message } of resumableErrorCodes) { it( `resumes on error code ${code} (${error})`, { requires: { topology: '!single', mongodb: '>=4.2' } }, @@ -2141,7 +2155,8 @@ describe('ChangeStream resumability', function () { mode: { times: 1 }, data: { failCommands: ['getMore'], - errorCode: code + errorCode: code, + errmsg: message } } as FailPoint); @@ -2171,7 +2186,8 @@ describe('ChangeStream resumability', function () { mode: { times: 1 }, data: { failCommands: ['getMore'], - errorCode: resumableErrorCodes[0].code + errorCode: resumableErrorCodes[0].code, + errmsg: resumableErrorCodes[0].message } } as FailPoint); @@ -2222,6 +2238,38 @@ describe('ChangeStream resumability', function () { } ); }); + + context('when the error is operation was interrupted', function () { + it( + 'does not resume', + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + changeStream = collection.watch([]); + + const unresumableErrorCode = 237; + await client.db('admin').command({ + configureFailPoint: is4_2Server(this.configuration.version) + ? 'failCommand' + : 'failGetMoreAfterCursorCheckout', + mode: { times: 1 }, + data: { + failCommands: ['getMore'], + errorCode: unresumableErrorCode, + errmsg: 'operation was interrupted' + } + } as FailPoint); + + const willBeError = once(changeStream, 'change').catch(error => error); + await once(changeStream.cursor, 'init'); + await collection.insertOne({ name: 'bailey' }); + + const error = await willBeError; + + expect(error).to.be.instanceOf(MongoServerError); + expect(aggregateEvents).to.have.lengthOf(1); + } + ); + }); }); it(