Skip to content

Commit

Permalink
fix(csv-parse): call destroy on end (fix #410)
Browse files Browse the repository at this point in the history
  • Loading branch information
wdavidw committed Dec 8, 2023
1 parent 01e9061 commit 0df32c6
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 29 deletions.
13 changes: 10 additions & 3 deletions packages/csv-parse/dist/cjs/index.cjs
Expand Up @@ -1333,9 +1333,16 @@ class Parser extends stream.Transform {
}, () => {
this.push(null);
this.end();
this.destroy();
// Note 231005, end wasnt used and destroy was called as:
// this.on('end', this.destroy);
// Fix #333 and break #410
// ko: api.stream.iterator.coffee
// ko with v21.4.0, ok with node v20.5.1: api.stream.finished # aborted (with generate())
// ko: api.stream.finished # aborted (with Readable)
// this.destroy()
// Fix #410 and partially break #333
// ok: api.stream.iterator.coffee
// ok: api.stream.finished # aborted (with generate())
// broken: api.stream.finished # aborted (with Readable)
this.on('end', this.destroy);
});
if(err !== undefined){
this.state.stop = true;
Expand Down
13 changes: 10 additions & 3 deletions packages/csv-parse/dist/esm/index.js
Expand Up @@ -6455,9 +6455,16 @@ class Parser extends Transform {
}, () => {
this.push(null);
this.end();
this.destroy();
// Note 231005, end wasnt used and destroy was called as:
// this.on('end', this.destroy);
// Fix #333 and break #410
// ko: api.stream.iterator.coffee
// ko with v21.4.0, ok with node v20.5.1: api.stream.finished # aborted (with generate())
// ko: api.stream.finished # aborted (with Readable)
// this.destroy()
// Fix #410 and partially break #333
// ok: api.stream.iterator.coffee
// ok: api.stream.finished # aborted (with generate())
// broken: api.stream.finished # aborted (with Readable)
this.on('end', this.destroy);
});
if(err !== undefined){
this.state.stop = true;
Expand Down
13 changes: 10 additions & 3 deletions packages/csv-parse/dist/iife/index.js
Expand Up @@ -6458,9 +6458,16 @@ var csv_parse = (function (exports) {
}, () => {
this.push(null);
this.end();
this.destroy();
// Note 231005, end wasnt used and destroy was called as:
// this.on('end', this.destroy);
// Fix #333 and break #410
// ko: api.stream.iterator.coffee
// ko with v21.4.0, ok with node v20.5.1: api.stream.finished # aborted (with generate())
// ko: api.stream.finished # aborted (with Readable)
// this.destroy()
// Fix #410 and partially break #333
// ok: api.stream.iterator.coffee
// ok: api.stream.finished # aborted (with generate())
// broken: api.stream.finished # aborted (with Readable)
this.on('end', this.destroy);
});
if(err !== undefined){
this.state.stop = true;
Expand Down
13 changes: 10 additions & 3 deletions packages/csv-parse/dist/umd/index.js
Expand Up @@ -6461,9 +6461,16 @@
}, () => {
this.push(null);
this.end();
this.destroy();
// Note 231005, end wasnt used and destroy was called as:
// this.on('end', this.destroy);
// Fix #333 and break #410
// ko: api.stream.iterator.coffee
// ko with v21.4.0, ok with node v20.5.1: api.stream.finished # aborted (with generate())
// ko: api.stream.finished # aborted (with Readable)
// this.destroy()
// Fix #410 and partially break #333
// ok: api.stream.iterator.coffee
// ok: api.stream.finished # aborted (with generate())
// broken: api.stream.finished # aborted (with Readable)
this.on('end', this.destroy);
});
if(err !== undefined){
this.state.stop = true;
Expand Down
13 changes: 10 additions & 3 deletions packages/csv-parse/lib/index.js
Expand Up @@ -33,9 +33,16 @@ class Parser extends Transform {
}, () => {
this.push(null);
this.end();
this.destroy();
// Note 231005, end wasnt used and destroy was called as:
// this.on('end', this.destroy);
// Fix #333 and break #410
// ko: api.stream.iterator.coffee
// ko with v21.4.0, ok with node v20.5.1: api.stream.finished # aborted (with generate())
// ko: api.stream.finished # aborted (with Readable)
// this.destroy()
// Fix #410 and partially break #333
// ok: api.stream.iterator.coffee
// ok: api.stream.finished # aborted (with generate())
// broken: api.stream.finished # aborted (with Readable)
this.on('end', this.destroy);
});
if(err !== undefined){
this.state.stop = true;
Expand Down
23 changes: 22 additions & 1 deletion packages/csv-parse/test/api.stream.finished.coffee
@@ -1,4 +1,5 @@

import { Readable } from 'node:stream'
import * as stream from 'node:stream/promises'
import { generate } from 'csv-generate'
import { parse } from '../lib/index.js'
Expand All @@ -15,8 +16,10 @@ describe 'API stream.finished', ->
await stream.finished parser
records.length.should.eql 10

it 'resolved with `to_line`', ->
it 'aborted (with generate())', ->
# See https://github.com/adaltas/node-csv/issues/333
# See https://github.com/adaltas/node-csv/issues/410
# Prevent `Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close`
records = []
parser = generate(length: 10).pipe parse to_line: 3
parser.on 'readable', () =>
Expand All @@ -25,6 +28,24 @@ describe 'API stream.finished', ->
await stream.finished parser
records.length.should.eql 3

it.skip 'aborted (with Readable)', ->
# See https://github.com/adaltas/node-csv/issues/333
# See https://github.com/adaltas/node-csv/issues/410
# Prevent `Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close`
records = []
reader = new Readable
highWaterMark: 10
read: (size) ->
for i in [0...size]
this.push "#{size},#{i}\n"
parser = reader.pipe parse to_line: 3
parser.on 'readable', () =>
while (record = parser.read()) isnt null
records.push record
await stream.finished parser
console.log records
records.length.should.eql 3

it 'rejected on error', ->
parser = parse to_line: 3
parser.write 'a,b,c\n'
Expand Down
25 changes: 25 additions & 0 deletions packages/csv-parse/test/api.stream.iterator.coffee
@@ -0,0 +1,25 @@

import * as stream from 'node:stream/promises'
import { generate } from 'csv-generate'
import { parse } from '../lib/index.js'

describe 'API stream.iterator', ->

it 'classic', ->
parser = generate(length: 10).pipe parse()
records = []
for await record from parser
records.push record
records.length.should.eql 10

it 'with iteractor stoped in between', ->
# See https://github.com/adaltas/node-csv/issues/333
# See https://github.com/adaltas/node-csv/issues/410
# Prevent `Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close`
records = []
parser = generate(length: 10).pipe parse
to_line: 2
records = []
for await record from parser
records.push record
records.length.should.eql 2
13 changes: 0 additions & 13 deletions packages/csv-parse/test/option.to_line.coffee
Expand Up @@ -100,16 +100,3 @@ describe 'Option `to_line`', ->
[ 'd','e','f' ]
] unless err
next err

it 'resolved with `to_line`', ->
# Prevent `Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close`
reader = new Readable
highWaterMark: 100
read: (size) ->
setImmediate =>
for i in [0...size]
this.push "#{size},#{i}\n"
parser = reader.pipe parse to_line: 3
parser.on 'readable', () =>
while parser.read() isnt null then true
await finished parser

0 comments on commit 0df32c6

Please sign in to comment.