
option.to breaks the stream and exits the program #333

Open
dnafication opened this issue Apr 8, 2022 · 13 comments

Comments

@dnafication

Describe the bug

Using the csv-parse package to parse a readable stream, essentially this example from your documentation. Introducing the to or to_line option seems to break the stream: nothing after the await finished(parser) call below ever executes:

import assert from 'assert'
import fs from 'fs'
import os from 'os'
import { parse } from 'csv-parse'
import { finished } from 'stream/promises'

// Prepare the dataset
await fs.promises.writeFile(
  `${os.tmpdir()}/input.csv`,
  ['a,b,c', '1,2,3', '3,4,5'].join('\n')
)
// Read and process the CSV file
const processFile = async () => {
  const records = []
  const parser = fs.createReadStream(`${os.tmpdir()}/input.csv`).pipe(
    parse({
      to: 2,   // <-- add this option
    })
  )
  parser.on('readable', function () {
    let record
    while ((record = parser.read()) !== null) {
      // Work with each record
      records.push(record)
    }
  })
  await finished(parser)
  // nothing gets executed after this
  console.log(records) // <-- this does not print anything
  return records
}
// Parse the CSV content
const records = await processFile()
console.log(records) // <-- this doesn't get executed

// Validate the records
assert.deepStrictEqual(records, [
  ['a', 'b', 'c'],
  ['1', '2', '3'],
])

To Reproduce

The example is provided in the description above.


@foghina

foghina commented Jan 31, 2023

I'm having the same issue, here's my slightly shorter repro:

import * as fs from "node:fs";
import * as stream from "node:stream/promises";

import { parse } from "csv-parse";

const parser = fs.createReadStream("data.csv").pipe(
  parse({
    // to_line: 3
  })
);
parser.on("readable", () => {
  let record;
  while ((record = parser.read()) !== null) {
    console.log(record);
  }
});
console.info("waiting for stream to finish...");
await stream.finished(parser);
console.info("this won't get logged if you uncomment the to_line option.");

@foghina

foghina commented Jan 31, 2023

FWIW, parser.on('end', callback) still gets invoked correctly, so that's one potential workaround for now, but the bug is still quite a blocker.
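A minimal sketch of this 'end'-event workaround, with a stdlib Readable standing in for the csv-parse stream (the data here is purely illustrative):

```javascript
import { Readable } from "node:stream";

// Stand-in for the parser: any object-mode readable stream.
const parser = Readable.from([["a", "b", "c"], ["1", "2", "3"]]);

const records = [];
parser.on("data", (record) => records.push(record));

// Wait on the 'end' event directly instead of stream.finished(parser):
// 'end' still fires even when the stream never fully closes.
await new Promise((resolve) => parser.once("end", resolve));

console.log(records); // reachable, unlike the code after stream.finished
```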

@foghina

foghina commented Jan 31, 2023

Follow-up from my previous comment, one way around it is adding:

parser.on('end', parser.destroy);

This will correctly resolve the promise and the code after await gets executed.

However, now you might get an error due to the stream being closed prematurely. I fixed it with:

await stream.finished(parser).catch((e) => {
  if (e && e.code === "ERR_STREAM_PREMATURE_CLOSE") {
    // 🔥 this is fine 🔥
  } else {
    console.error("Parser error", e);
  }
});

EDIT: See discussion below. Using parser.on("end", parser.end) instead avoids the premature close error completely.

@wdavidw
Member

wdavidw commented Jan 31, 2023

I am looking at it as well, trying to read the official doc and extract some sense.

Using the more robust strategy of try-and-fail, I reach the same conclusion: destroy triggers stream.finished, and it must be called after end to avoid ERR_STREAM_PREMATURE_CLOSE. I am not sure why, nor about the correct way to handle this behavior.

@wdavidw
Member

wdavidw commented Jan 31, 2023

Instead of

parser.on('end', parser.destroy);

Try

parser.end()
parser.on('end', parser.destroy);

This seems to prevent the ERR_STREAM_PREMATURE_CLOSE error in my test

@wdavidw
Member

wdavidw commented Jan 31, 2023

It is also associated with the close event, which is not emitted unless destroy is called or an error occurs.

@foghina

foghina commented Jan 31, 2023

Calling parser.end() before parser.on() seems to end the parser before any records get processed, though, since the await stream.finished() has to come after both of those lines.

@foghina

foghina commented Jan 31, 2023

OK, parser.on("end", parser.end) seems to fix the premature error quite nicely.
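The parser.on("end", parser.end) workaround can be sketched with stdlib streams only. The Transform below is a hypothetical stand-in for csv-parse with to: 2 (it ends its readable side after two records); without the on("end", ...) line, stream.finished would never settle here because the writable side never finishes:

```javascript
import { Transform } from "node:stream";
import { finished } from "node:stream/promises";

// Hypothetical stand-in for csv-parse with `to: 2`: passes the first
// two records through, then ends its readable side early.
let count = 0;
const parser = new Transform({
  objectMode: true,
  transform(chunk, _enc, cb) {
    count += 1;
    if (count <= 2) this.push(chunk);
    if (count === 2) this.push(null); // readable side ends early
    cb();
  },
});

// The workaround from this thread: when the readable side ends early,
// also end the writable side so stream.finished can settle.
parser.on("end", parser.end);

const records = [];
parser.on("data", (record) => records.push(record));

parser.write(["a", "b"]);
parser.write(["1", "2"]);
parser.write(["3", "4"]); // dropped, like rows past `to: 2`

await finished(parser);
console.log(records); // only the first two records
```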

@foghina

foghina commented Jan 31, 2023

Thanks for the fix! Just for posterity, before a new release is made: I realized that using for await also seemingly fixes the problem (though I am unsure whether any dangling streams are left around). My code is now:

const parser = fs.createReadStream("data.csv").pipe(
  parse({
    to_line: 2,
  })
);
for await (const record of parser) {
  console.log(record);
}
console.info("done.");

Which seems to work correctly without any errors/exceptions.

@wdavidw
Member

wdavidw commented Jan 31, 2023

The whole stream API is hard to grasp. I am not even sure whether the close event is expected to be emitted or not.
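For reference, the lifecycle events can be probed on a plain stdlib PassThrough (default autoDestroy; this is not csv-parse itself). In my understanding, stream.finished only settles once close has fired on streams that are expected to emit it:

```javascript
import { PassThrough } from "node:stream";
import { finished } from "node:stream/promises";

const events = [];
const s = new PassThrough();
for (const name of ["finish", "end", "close"]) {
  s.on(name, () => events.push(name));
}

s.on("data", () => {}); // keep the readable side flowing
s.end("x"); // write one chunk and end the writable side

await finished(s);
// With autoDestroy (the default), close fires after both finish and end.
console.log(events);
```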

Anyway, new versions are published:

  • csv-parse: 5.3.3 => 5.3.4
  • csv: 6.2.5 => 6.2.6

@Yoshiji

Yoshiji commented Apr 6, 2023

I am updating my npm dependencies, and this change in ca3f55b is breaking my CSV parsing done client-side in the browser. It seems that parser.destroy is undefined, leading to a type error.

Here is a code sandbox with the error: https://codesandbox.io/s/jolly-matan-5y0er4?file=/src/index.js

My actual usage differs from this simple example. I am reading the first line of a file selected by the user to be uploaded, and I parse the CSV headers to ensure each are valid, something like this:

const csvParser = parse({ delimiter: ",", fromLine: 1, toLine: 1 }).on("data", onParseData).on("error", onParseOrReadError);
// csvParser.destroy = () => csvParser; // <=== adding this solves the TypeError
const fileSlice = file.slice(0, 1024 * 4); // reading the first 4kb just in case it contains many long headers
fileSlice.text().then((v) => {
  csvParser.write(v);
  csvParser.end();
}).catch(onParseOrReadError);

@wdavidw
Member

wdavidw commented Apr 6, 2023

Can you go inside the source code of the parser and add a condition, for example

if (this.destroy !== undefined) {
   this.on('end', this.destroy);
}

I would also need something in ./demo to replicate the issue.

@wdavidw
Member

wdavidw commented Dec 8, 2023

I am re-opening this issue. I did some testing after issue #410.

Issue #333 is fixed with this.end(); this.destroy();
Issue #410 is fixed with this.end(); this.on('end', this.destroy);

  1. It happens that fixing #410 ("Option to and to_line results in ERR_STREAM_PREMATURE_CLOSE"), which uses an async iterator, requires partially breaking this issue (#333), which uses stream.finished.
  2. With #410 fixed, we now have two very similar tests with different outcomes. One uses a readable stream implemented with csv-generate: it works. The other uses a readable stream implemented with stream.Readable: it breaks.
  3. The code that fixed this issue (#333, "option.to breaks the stream and exits the program") no longer works with the latest Node.js version, 21.4.0.

Here is the note I left inside the source code. I have the feeling that this could come from Node.js and be out of grasp. We welcome anyone who would stop by with the expertise to help us.

this.end()
// Fix #333 and break #410
//   ko: api.stream.iterator.coffee
//   ko with v21.4.0, ok with node v20.5.1: api.stream.finished # aborted (with generate())
//   ko: api.stream.finished # aborted (with Readable)
// this.destroy()
// Fix #410 and partially break #333
//   ok: api.stream.iterator.coffee
//   ok: api.stream.finished # aborted (with generate())
//   broken: api.stream.finished # aborted (with Readable)
this.on('end', this.destroy);

I took the decision to fix issue #410, re-open this issue, and release the patch version 5.5.3. The motivation for a patch SemVer bump is that this issue had not been fixed properly and was broken with upcoming Node.js versions. I acknowledge this is a very controversial decision in terms of SemVer.
