From a6e7caff4d44e6314e4cc225c1f952cb7fc65d0e Mon Sep 17 00:00:00 2001 From: Thomas Reggi Date: Tue, 20 Oct 2020 13:46:59 -0400 Subject: [PATCH] fix: correctly re-establishes pipe destinations NODE-2172 --- lib/change_stream.js | 2 +- test/functional/change_stream.test.js | 32 +++++++++++---------------- 2 files changed, 14 insertions(+), 20 deletions(-) diff --git a/lib/change_stream.js b/lib/change_stream.js index f3f0ed7a38..b226702ed8 100644 --- a/lib/change_stream.js +++ b/lib/change_stream.js @@ -437,7 +437,7 @@ function createChangeStreamCursor(self, options) { if (self.pipeDestinations) { const cursorStream = changeStreamCursor.stream(self.streamOptions); - for (let pipeDestination in self.pipeDestinations) { + for (let pipeDestination of self.pipeDestinations) { cursorStream.pipe(pipeDestination); } } diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index e68aa536b2..5ce7996256 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -1,4 +1,5 @@ 'use strict'; +const path = require('path'); const assert = require('assert'); const Transform = require('stream').Transform; const MongoNetworkError = require('../../lib/core').MongoNetworkError; @@ -1474,7 +1475,7 @@ describe('Change Streams', function() { } }); - it.skip('should resume piping of Change Streams when a resumable error is encountered', { + it('should resume piping of Change Streams when a resumable error is encountered', { metadata: { requires: { generators: true, @@ -1483,14 +1484,13 @@ describe('Change Streams', function() { } }, test: function(done) { + const filename = path.join(__dirname, '_nodemongodbnative_resumepipe.txt'); + this.defer(() => fs.unlinkSync(filename)); const configuration = this.configuration; const ObjectId = configuration.require.ObjectId; const Timestamp = configuration.require.Timestamp; const Long = configuration.require.Long; - // Contain mock server - let primaryServer = null; - // Default message fields const defaultFields = { setName: 'rs', @@ -1506,9 +1506,8 @@ describe('Change Streams', function() { hosts: ['localhost:32000', 'localhost:32001', 'localhost:32002'] }; - co(function*() { - primaryServer = yield mock.createServer(); - + mock.createServer(32000, 'localhost').then(primaryServer => { + this.defer(() => mock.cleanup()); let counter = 0; primaryServer.setMessageHandler(request => { const doc = request.document; @@ -1594,31 +1593,26 @@ describe('Change Streams', function() { client.connect((err, client) => { expect(err).to.not.exist; + this.defer(() => client.close()); const database = client.db('integration_tests5'); const collection = database.collection('MongoNetworkErrorTestPromises'); const changeStream = collection.watch(pipeline); - const filename = '/tmp/_nodemongodbnative_resumepipe.txt'; const outStream = fs.createWriteStream(filename); changeStream.stream({ transform: JSON.stringify }).pipe(outStream); - + this.defer(() => changeStream.close()); // Listen for changes to the file - const watcher = fs.watch(filename, function(eventType) { - assert.equal(eventType, 'change'); + const watcher = fs.watch(filename, eventType => { + this.defer(() => watcher.close()); + expect(eventType).to.equal('change'); const fileContents = fs.readFileSync(filename, 'utf8'); const parsedFileContents = JSON.parse(fileContents); - assert.equal(parsedFileContents.fullDocument.a, 1); - - watcher.close(); + expect(parsedFileContents).to.have.nested.property('fullDocument.a', 1); - changeStream.close(err => { - expect(err).to.not.exist; - - mock.cleanup(() => done()); - }); + done(); }); }); });