
writing-a-plugin "Call the callback function only when the current file (stream/buffer) is completely consumed" #2380

Open
craigphicks opened this issue Sep 28, 2019 · 11 comments

@craigphicks commented Sep 28, 2019

What were you expecting to happen?

Following the verbal guidance here -

"Call the callback function only when the current file (stream/buffer) is completely consumed"

then, in the case of file.isStream(), I expected to need to wait for the 'end' event from the inner transform before calling callback().

What actually happened?

As shown in the code example on that man page, it only works if callback() is called immediately.

Please post a sample of your gulpfile (preferably reduced to just the bit that's not working)

    innerTransform.on('end', () => {
      cb(null, file) // gulp writes an empty file doing it this way
    })

Actually implementing the above code (instead of calling the callback immediately), resulted in an empty file being written, although the inner transform had dutifully pushed all the data.

What version of gulp are you using?
gulp 4

What versions of npm and node are you using?
npm 6.9.0
gulp 4.0.2


I am unable to convince myself that

when the current file (stream/buffer) is completely consumed

could mean anything other than that the 'end' event has been emitted. Furthermore, the wording seems right from a design point of view, and it fits with all the callback usage I know.

How to reconcile the words and the actual design?

@yocontra (Member)

Can you post the full plugin code?

@phated (Member) commented Oct 1, 2019

@craigphicks I agree with @contra - there might be some weirdness here that we can only determine by seeing all the code.

@craigphicks (Author) commented Oct 6, 2019

Hi @contra @phated -

This is part of a repo for a JavaScript pre-processor, reversible-preproc. Part of that project is a Node transform stream, with _transform and _flush defined to suit the function. It is a completely standard Node transform stream.

Note: the currently published npm version (1.x.x) of reversible-preproc is a much older version, which I plan to update shortly.

The following code is part of a dependent repo, gulp-reversible-preproc, with the aim of providing a gulp transform adapter. The argument InnerTransform passed to gulpCreateOuterTransform is a standard Node transform stream, e.g., as it is called here.

It's written as a general gulp interface in order to provide a clean interface within the gulp-reversible-preproc project.

import through from 'through2'
import PluginError from 'plugin-error'

function makePluginError(PLUGIN_NAME, err) {
  if (err instanceof PluginError) {
    return err
  } else if (err instanceof Error) {
    let pe = new PluginError(PLUGIN_NAME, err.message)
    pe.stack = err.stack
    return pe
  } else {
    return new PluginError(PLUGIN_NAME, err.toString())
  }
}

// The purpose of this "makeOverrides" is only to convert InnerTransform errors
// to PluginError.  Necessary because the outer on-'error' handler's errors are
// ignored in favor of another listener's errors.
function makeOverrides(PLUGIN_NAME, InnerTransform) {
  return {
    transform(chunk, enc, cb) {
      InnerTransform.prototype._transform.call(this, chunk, enc, (err, data) => {
        if (err)
          cb(makePluginError(PLUGIN_NAME, err), null)
        else
          cb(null, data)
      })
    },
    flush(cb) {
      InnerTransform.prototype._flush.call(this, (err, data) => {
        if (err)
          cb(makePluginError(PLUGIN_NAME, err), null)
        else
          cb(null, data)
      })
    },
  }
}

function gulpCreateOuterTransform(
  PLUGIN_NAME, InnerTransform,
  innerArgs, transformOpts) {
  // Creating a stream through which each file will pass
  return through.obj(function (file, enc, cb) {
    if (file.isNull()) {
      // return empty file
      return cb(null, file)
    }
    let outChunks = []
    let rppt = new InnerTransform(
      ...innerArgs,
      Object.assign((transformOpts || {}), makeOverrides(PLUGIN_NAME, InnerTransform))
    )

    rppt.on('end', () => {
      /// https://stackoverflow.com/a/52303073
      let date = new Date()
      file.stat.atime = date
      file.stat.mtime = date
      ///
      if (file.isBuffer())
        file.contents = Buffer.concat(outChunks)
      // putting the callback here will result in Gulp writing an empty file
      //cb(null, file) 
    })
    rppt.on('error', (err) => {
      // In the case where file.isStream() === true,
      // there is another listener on 'error' which does not
      // convert the err to a PluginError and passes that unconverted
      // err to the destroy function of the next stage, resulting
      // in the following PluginError not being used.
      // To work around that, the transform/flush overrides above
      // convert the err after every call to _transform and _flush.
      // THIS emit CODE IS THEREFORE REDUNDANT, but harmless.
      this.emit('error',
        makePluginError(PLUGIN_NAME, err)
      )
      // cb was called earlier, so can't call it here
    })

    if (file.isBuffer()) {
      rppt.on('data', (chunk) => {
        if (!(chunk instanceof Buffer))
          chunk = Buffer.from(chunk)
        outChunks.push(chunk)
      })
      rppt.end(file.contents, 'utf8') // 'end()' is equiv write() + signal end 
    }
    if (file.isStream()) {
      file.contents = file.contents
        .pipe(rppt)
      rppt.resume()
    }
    cb(null, file) // return early - seems anti intuitive but written file is empty otherwise
  })
}
export default gulpCreateOuterTransform

Main Issue:

Calling the callback cb(null, file) immediately seems counterintuitive. I expected to call it from inside the 'end' and 'error' handlers, but when I tried that, the written file was empty. According to the man page's example code, calling the callback immediately is correct (see below). However, according to the man page's verbal explanation, "Call the callback function only when the current file (stream/buffer) is completely consumed", I feel my expected method (calling from 'end' and 'error') should be correct.

Secondary issue

As recommended in some gulp tutorials, the code uses emit to signal the error, properly converted to a gulp PluginError. This has to be done because, having called the callback early, we cannot use it to return an error later. However, at the node transform stream level, another listener is registered for 'error', and even though it is second in the listener list, it is the first to pass the error (an unconverted Error) up to the outer gulp transform level. The workaround is the overrides for transform and flush near the top, which convert the error that will be passed up by that second listener.

Caveat

The code as-is works for the case of a single file opened with src and written with dest. I haven't tested multiple files.

Excerpt from the gulp man page showing the immediate callback

module.exports = function() {
    return through.obj(function(file, encoding, callback) {
        if (file.isNull()) {
            // nothing to do
            return callback(null, file);
        }

        if (file.isStream()) {
            // file.contents is a Stream - https://nodejs.org/api/stream.html
            this.emit('error', new PluginError(PLUGIN_NAME, 'Streams not supported!'));

            // or, if you can handle Streams:
            //file.contents = file.contents.pipe(...
            //return callback(null, file);
        } else if (file.isBuffer()) {
            // file.contents is a Buffer - https://nodejs.org/api/buffer.html
            this.emit('error', new PluginError(PLUGIN_NAME, 'Buffers not supported!'));

            // or, if you can handle Buffers:
            //file.contents = ...
            //return callback(null, file);
        }
    });
};

@donpedro commented Oct 6, 2019

I concur; I have observed the exact behavior described by @craigphicks. I didn't realize that this conflicted with the instructions, but my notes show that I am seeing the same issue:

        .on('end', function () {

          // DON'T CALL THIS HERE. It MAY work, if the job is small enough. But it needs to 
          // be called after the stream is SET UP, not when the streaming is DONE.
          // Calling the callback here instead of below may result in data hanging in the 
          // stream--not sure of the technical term, but dest() creates no file, or the file
          // is blank

          // cb(returnErr, file);

        })

@donpedro commented Oct 6, 2019

Per @craigphicks's caveat:

The code as is works now for the case of a single file opened with src and written with dest . I haven't tested multiple files.

I too can write one file successfully, but when I write two files, only one writes correctly in some circumstances. The behavior noted below is consistent given the test data I'm using, but it may well depend on timing and other factors, such as the size of the data being moved.

...
    else if (file.isStream()) {
      let newStream = through2.obj(function (this: any, file: Vinyl, encoding: string, cb: Function) {
        cb(null, file)
      })

      let newFile = new Vinyl({path: newFileName, contents:newStream})

      file.contents
        .pipe(request(configObj))
        .pipe(newStream)


      // In this order, both files are written correctly by dest();
      // this.push(file)
      // this.push(newFile)

      // in THIS order newFile is still written correctly, and file
      // is written but is blank. If either line is commented 
      // out, the other line writes correctly
      this.push(newFile)
      this.push(file)


      // after our stream is set up (not necessarily finished) we call the callback
      log.debug('calling callback')    
      cb(returnErr);
    }
...

Full source file is here, and working project is here

@craigphicks (Author)
@donpedro - Your project is very interesting. I have a question about

let newStream = through2.obj(function (this: any, file: Vinyl, encoding: string, cb: Function) {
        cb(null, file)
      })

When does this function (passed as an argument) get called, and what entity receives the data?

@donpedro commented Oct 7, 2019

@craigphicks that's just an empty transformFunction; I could have just left the function out completely, as I'm not actually doing any transformation; I'm really just using newStream as a passthrough so I can use it for newFile. More here: https://www.npmjs.com/package/through2#transformfunction

Thanks for the good word! Be aware that gulp-api-adapter is very much under construction; it's being developed to work with our gulp-etl project, but hopefully it'll have some utility in the wider gulp world.

Hopefully it'll be usable and stable later this week...

@jonatanlinden commented Apr 29, 2021

I might have a smaller example of when the vinyl stream becomes empty (though I'm not that experienced with streams, so there might be a user error in the following example).

const {dest} = require('gulp');
const source = require('vinyl-source-stream');
const size   = require('gulp-size');

function sizeOfStream(cb) {
  const stream = source('bogusname');
  stream.write('a');
  stream.end();
  stream.pipe(size())
    .pipe(dest('.'));
  cb();
}

exports.default = sizeOfStream;

The output file of the above task is empty; the expected content was 'a'.
The solution mentioned above, calling the callback immediately (in gulp-size) without waiting for the subtask to finish, works. Interestingly enough, however, it breaks the logic in the flush function (gulp-size uses through2).

@yocontra (Member)

@jonatanlinden You're ending the stream before you piped it anywhere - you probably want to do this:

import { pipeline } from 'stream'

// later on in your code
const stream = source('bogusname');

pipeline(
  stream,
  size(),
  dest('.'),
  cb
)
stream.write('a');
stream.end();

@jonatanlinden commented Apr 29, 2021

Have you actually tried your code and gotten a different result? When I run my take on it (wrapping it in a function with cb as a parameter), I get the same behaviour as with my example: with size(), bogusname is empty; without it, it is not.

(I believe stream.end() indicates that no more data will be written to the stream, but it does not really affect the readable part of the stream. Maybe?)

The point was that gulp-size also seems to be written with the understanding that the callback function should be called after the vinyl contents' stream has been processed, like this:

if (file.isStream()) {
...
  file.contents.pipe(new StreamCounter())
    .on('finish', function () {
      finish(null, this.bytes);
    });
}

Where the function finish calls the callback function.

I guess it could be possible to construct an even smaller example exposing the behaviour, using gulp-size as a starting point.

@joeyparrish commented Feb 2, 2023

I was having a very similar issue with a plugin that invokes child_process.spawn. The gulp task was "finished" in a few ms, even though the child process hadn't started yet and nobody had invoked the callback.

import {spawn} from 'node:child_process';
import process from 'node:process';

import gulp from 'gulp';
import PluginError from 'plugin-error';
import through from 'through2';

function generateSources() {
  const nodeExecutable = process.argv[0];

  return through.obj((file, encoding, callback) => {
    if (file.isNull()) {
      callback(null, file);
      return;
    }

    // stdin and stdout are pipes, and stderr is inherited from this process.
    const child = spawn(nodeExecutable, ['dist/tools/code-gen.js'], {
      stdio: ['pipe', 'pipe', 'inherit'],
    });

    if (file.isStream()) {
      console.log('Is stream.');
      // file.contents is a ReadableStream
      file.contents.pipe(child.stdin);
    } else if (file.isBuffer()) {
      console.log('Is buffer.');
      // file.contents is a Buffer
      child.stdin.write(file.contents);
      child.stdin.end();
    }

    // Now file.contents is a ReadableStream from the child process's stdout.
    file.contents = child.stdout;

    // The filename should end in .ts.
    file.extname = '.ts';

    // If the child fails, this pipe fails.
    child.on('error', (error) => {
      console.log('On error.', error);
      callback(error, null);
    });

    // Wait for the child to complete.
    // Note that 'exit' fires before the pipes close, so we use 'close' to make
    // sure we have all the data.
    child.on('close', (code) => {
      console.log('On close.', code);
      if (code == 0) {
        // Exit code zero, this pipe succeeds.
        callback(null, file);
      } else {
        // Exit code non-zero, this pipe fails.
        const error = new PluginError(
            'GenerateSources', `Failed with exit code ${code}`);
        callback(error, null);
      }
    });
  });
}

All my logs would occur after the task "finished" "successfully", even if I deliberately broke the command to trigger ENOENT.

I finally found the cause. It was my task:

gulp.task('generate-sources', gulp.series('build-tools', async () => {
  return gulp.src('src/gen/*.yaml')
    .pipe(generateSources())
    .pipe(gulp.dest('generated/gen/'));
}));

The problem was the async in my task definition: the async function returns a promise that resolves as soon as the stream object is created, so gulp stops waiting before the stream completes. The fixed version works correctly:

gulp.task('generate-sources', gulp.series('build-tools', () => {
  return gulp.src('src/gen/*.yaml')
    .pipe(generateSources())
    .pipe(gulp.dest('generated/gen/'));
}));
