From 0df32c6a3500d2541451846c6a152ff991a2f2ff Mon Sep 17 00:00:00 2001 From: David Worms Date: Fri, 8 Dec 2023 17:51:58 +0100 Subject: [PATCH] fix(csv-parse): call destroy on end (fix #410) --- packages/csv-parse/dist/cjs/index.cjs | 13 +++++++--- packages/csv-parse/dist/esm/index.js | 13 +++++++--- packages/csv-parse/dist/iife/index.js | 13 +++++++--- packages/csv-parse/dist/umd/index.js | 13 +++++++--- packages/csv-parse/lib/index.js | 13 +++++++--- .../csv-parse/test/api.stream.finished.coffee | 23 ++++++++++++++++- .../csv-parse/test/api.stream.iterator.coffee | 25 +++++++++++++++++++ packages/csv-parse/test/option.to_line.coffee | 13 ---------- 8 files changed, 97 insertions(+), 29 deletions(-) create mode 100644 packages/csv-parse/test/api.stream.iterator.coffee diff --git a/packages/csv-parse/dist/cjs/index.cjs b/packages/csv-parse/dist/cjs/index.cjs index 879060f2..7e6e2a00 100644 --- a/packages/csv-parse/dist/cjs/index.cjs +++ b/packages/csv-parse/dist/cjs/index.cjs @@ -1333,9 +1333,16 @@ class Parser extends stream.Transform { }, () => { this.push(null); this.end(); - this.destroy(); - // Note 231005, end wasnt used and destroy was called as: - // this.on('end', this.destroy); + // Fix #333 and break #410 + // ko: api.stream.iterator.coffee + // ko with v21.4.0, ok with node v20.5.1: api.stream.finished # aborted (with generate()) + // ko: api.stream.finished # aborted (with Readable) + // this.destroy() + // Fix #410 and partially break #333 + // ok: api.stream.iterator.coffee + // ok: api.stream.finished # aborted (with generate()) + // broken: api.stream.finished # aborted (with Readable) + this.on('end', this.destroy); }); if(err !== undefined){ this.state.stop = true; diff --git a/packages/csv-parse/dist/esm/index.js b/packages/csv-parse/dist/esm/index.js index b302226d..2beed0f6 100644 --- a/packages/csv-parse/dist/esm/index.js +++ b/packages/csv-parse/dist/esm/index.js @@ -6455,9 +6455,16 @@ class Parser extends Transform { }, () => { this.push(null); this.end(); - this.destroy(); - // Note 231005, end wasnt used and destroy was called as: - // this.on('end', this.destroy); + // Fix #333 and break #410 + // ko: api.stream.iterator.coffee + // ko with v21.4.0, ok with node v20.5.1: api.stream.finished # aborted (with generate()) + // ko: api.stream.finished # aborted (with Readable) + // this.destroy() + // Fix #410 and partially break #333 + // ok: api.stream.iterator.coffee + // ok: api.stream.finished # aborted (with generate()) + // broken: api.stream.finished # aborted (with Readable) + this.on('end', this.destroy); }); if(err !== undefined){ this.state.stop = true; diff --git a/packages/csv-parse/dist/iife/index.js b/packages/csv-parse/dist/iife/index.js index c16ecf8a..c86d31cd 100644 --- a/packages/csv-parse/dist/iife/index.js +++ b/packages/csv-parse/dist/iife/index.js @@ -6458,9 +6458,16 @@ var csv_parse = (function (exports) { }, () => { this.push(null); this.end(); - this.destroy(); - // Note 231005, end wasnt used and destroy was called as: - // this.on('end', this.destroy); + // Fix #333 and break #410 + // ko: api.stream.iterator.coffee + // ko with v21.4.0, ok with node v20.5.1: api.stream.finished # aborted (with generate()) + // ko: api.stream.finished # aborted (with Readable) + // this.destroy() + // Fix #410 and partially break #333 + // ok: api.stream.iterator.coffee + // ok: api.stream.finished # aborted (with generate()) + // broken: api.stream.finished # aborted (with Readable) + this.on('end', this.destroy); }); if(err !== undefined){ this.state.stop = true; diff --git a/packages/csv-parse/dist/umd/index.js b/packages/csv-parse/dist/umd/index.js index be39fd59..38072204 100644 --- a/packages/csv-parse/dist/umd/index.js +++ b/packages/csv-parse/dist/umd/index.js @@ -6461,9 +6461,16 @@ }, () => { this.push(null); this.end(); - this.destroy(); - // Note 231005, end wasnt used and destroy was called as: - // this.on('end', this.destroy); + // Fix #333 and break #410 + // ko: api.stream.iterator.coffee + // ko with v21.4.0, ok with node v20.5.1: api.stream.finished # aborted (with generate()) + // ko: api.stream.finished # aborted (with Readable) + // this.destroy() + // Fix #410 and partially break #333 + // ok: api.stream.iterator.coffee + // ok: api.stream.finished # aborted (with generate()) + // broken: api.stream.finished # aborted (with Readable) + this.on('end', this.destroy); }); if(err !== undefined){ this.state.stop = true; diff --git a/packages/csv-parse/lib/index.js b/packages/csv-parse/lib/index.js index d57f80e0..3ad632d9 100644 --- a/packages/csv-parse/lib/index.js +++ b/packages/csv-parse/lib/index.js @@ -33,9 +33,16 @@ class Parser extends Transform { }, () => { this.push(null); this.end(); - this.destroy(); - // Note 231005, end wasnt used and destroy was called as: - // this.on('end', this.destroy); + // Fix #333 and break #410 + // ko: api.stream.iterator.coffee + // ko with v21.4.0, ok with node v20.5.1: api.stream.finished # aborted (with generate()) + // ko: api.stream.finished # aborted (with Readable) + // this.destroy() + // Fix #410 and partially break #333 + // ok: api.stream.iterator.coffee + // ok: api.stream.finished # aborted (with generate()) + // broken: api.stream.finished # aborted (with Readable) + this.on('end', this.destroy); }); if(err !== undefined){ this.state.stop = true; diff --git a/packages/csv-parse/test/api.stream.finished.coffee b/packages/csv-parse/test/api.stream.finished.coffee index d5f1a1bb..23ac98f2 100644 --- a/packages/csv-parse/test/api.stream.finished.coffee +++ b/packages/csv-parse/test/api.stream.finished.coffee @@ -1,4 +1,5 @@ +import { Readable } from 'node:stream' import * as stream from 'node:stream/promises' import { generate } from 'csv-generate' import { parse } from '../lib/index.js' @@ -15,8 +16,10 @@ describe 'API stream.finished', -> await stream.finished parser records.length.should.eql 10 - it 'resolved with `to_line`', -> + it 'aborted (with generate())', -> # See https://github.com/adaltas/node-csv/issues/333 + # See https://github.com/adaltas/node-csv/issues/410 + # Prevent `Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close` records = [] parser = generate(length: 10).pipe parse to_line: 3 parser.on 'readable', () => @@ -25,6 +28,24 @@ describe 'API stream.finished', -> await stream.finished parser records.length.should.eql 3 + it.skip 'aborted (with Readable)', -> + # See https://github.com/adaltas/node-csv/issues/333 + # See https://github.com/adaltas/node-csv/issues/410 + # Prevent `Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close` + records = [] + reader = new Readable + highWaterMark: 10 + read: (size) -> + for i in [0...size] + this.push "#{size},#{i}\n" + parser = reader.pipe parse to_line: 3 + parser.on 'readable', () => + while (record = parser.read()) isnt null + records.push record + await stream.finished parser + console.log records + records.length.should.eql 3 + it 'rejected on error', -> parser = parse to_line: 3 parser.write 'a,b,c\n' diff --git a/packages/csv-parse/test/api.stream.iterator.coffee b/packages/csv-parse/test/api.stream.iterator.coffee new file mode 100644 index 00000000..196b0e12 --- /dev/null +++ b/packages/csv-parse/test/api.stream.iterator.coffee @@ -0,0 +1,25 @@ + +import * as stream from 'node:stream/promises' +import { generate } from 'csv-generate' +import { parse } from '../lib/index.js' + +describe 'API stream.iterator', -> + + it 'classic', -> + parser = generate(length: 10).pipe parse() + records = [] + for await record from parser + records.push record + records.length.should.eql 10 + + it 'with iteractor stoped in between', -> + # See https://github.com/adaltas/node-csv/issues/333 + # See https://github.com/adaltas/node-csv/issues/410 + # Prevent `Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close` + records = [] + parser = generate(length: 10).pipe parse + to_line: 2 + records = [] + for await record from parser + records.push record + records.length.should.eql 2 diff --git a/packages/csv-parse/test/option.to_line.coffee b/packages/csv-parse/test/option.to_line.coffee index 21440340..e9f98905 100644 --- a/packages/csv-parse/test/option.to_line.coffee +++ b/packages/csv-parse/test/option.to_line.coffee @@ -100,16 +100,3 @@ describe 'Option `to_line`', -> [ 'd','e','f' ] ] unless err next err - - it 'resolved with `to_line`', -> - # Prevent `Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close` - reader = new Readable - highWaterMark: 100 - read: (size) -> - setImmediate => - for i in [0...size] - this.push "#{size},#{i}\n" - parser = reader.pipe parse to_line: 3 - parser.on 'readable', () => - while parser.read() isnt null then true - await finished parser