
Memory usage issue with stream-transform #361

Open
justinvdm opened this issue Sep 21, 2022 · 8 comments

@justinvdm

Describe the bug

When using stream-transform to process large datasets with the `parallel` option set to a value greater than 1, we're seeing high memory usage.

To Reproduce

const fs = require('fs')
const memwatch = require('@airbnb/node-memwatch')
const { pipeline } = require('stream/promises')
const { transform } = require('stream-transform')

let maxUsedHeap = 0

async function main() {
  memwatch.on('stats', (stats) => {
    maxUsedHeap = Math.max(maxUsedHeap, stats.used_heap_size)
  })

  await pipeline(
    function* () {
      let i = -1
      const n = 9999999
      while (++i < n) {
        yield { i }
      }
    },
    transform({ parallel: +process.env.PARALLEL }, (chunk, next) =>
      // stringify so the non-objectMode fs write stream accepts the chunk
      next(null, String(chunk.i))
    ),
    fs.createWriteStream('/tmp/output')
  )

  console.log(`${maxUsedHeap / (1000 * 1000)}mb`)
}

main()

// $ PARALLEL=1 node example.js
// 6.009856mb

// $ PARALLEL=2 node example.js
// 320.684144mb

Additional context

@brianbegy

Did you resolve this? I'm seeing similar issues.

@wdavidw
Member

wdavidw commented Dec 8, 2022

I didn't have the time to look at the issue yet.

@dmurvihill

dmurvihill commented Jul 28, 2023

I have on my fork a minimal working example of this bug. It looks like the transform breaks Node's built-in backpressure logic by ignoring the return value of push. This causes the transform buffer to grow without bound whenever the source is faster than the sink. More info.

To see this in action, check out the branch linked above, change directory to packages/stream-transform, and run npm test. You will get an out of memory error within a few minutes. But if you comment out the do-nothing transform in samples/backpressure.js, the test runs for an hour with no issue.

wdavidw added a commit that referenced this issue Oct 5, 2023
@wdavidw
Member

wdavidw commented Oct 5, 2023

I ran your code sample. With the latest source code, memory usage stays between 20 MB and 30 MB on a 30 GB generated CSV file. Perhaps a change since your report fixed the issue. Can you confirm?

@dmurvihill

Still not working on my repro:

import {transform} from 'stream-transform';
import {pipeline, Readable, Writable} from "stream";

class DummyData extends Readable {

  constructor() {
    super();
    this.numReads = 0;
  }

  _read() {
    // Push incrementing values forever
    this.push(JSON.stringify({'string': 'read_' + this.numReads}));
    this.numReads++;
  }
}

class Stopper extends Writable {
  constructor() {
    super({
      objectMode: true,
      highWaterMark: 1, // Allow just one item in buffer; apply backpressure otherwise
    });
  }

  // Accept chunks extremely slowly; discard the chunk data
  _write(chunk, encoding, callback) {
    console.log('wrote one out');
    setTimeout(callback, 1000);
  }
}

pipeline(
  new DummyData(),
  transform(data => data), // Comment out this line and the test runs forever; leave it in and it runs out of memory pretty quickly.
  new Stopper(),
  () => { },
);

@wdavidw
Member

wdavidw commented Oct 9, 2023

@dmurvihill Could you have a look at the latest release, stream-transform version 3.2.10? It now takes the return value of push into account to throttle execution. Based on your feedback, I created a new script to replicate the issue.

@dmurvihill

dmurvihill commented Oct 11, 2023

Hey, thanks for being so attentive to this issue! It looks like that does respect backpressure and pause the stream, but how does it get unpaused?

@wdavidw
Member

wdavidw commented Oct 11, 2023

I couldn't reproduce the pausing behavior. I will need to dig more into it. Any chance you could reproduce the pausing in my sample?
