
bulk queue with timer - how to construct? cargo? timeout? #1490

Closed
deitch opened this issue Nov 3, 2017 · 12 comments

Comments

@deitch

deitch commented Nov 3, 2017

I am trying to figure out how to use async to create a "cargo-timeout-queue".

Essentially, I am trying to cache and throttle downstream i/o behaviour. Think of it as "cargo with minimum batch size and timeout to override."

const q = async.cargo_min_with_timeout(handler, 10, 5000);

The above would:

  • create a cargo-like queue
  • that would not call handler (first arg) until it has 10 (second arg) items to send together
  • but if at least 5000 (last arg) milliseconds have passed since the handler was last called and there are items on the queue, call the handler with whatever we have

This allows me to combine input objects for efficient downstream i/o processing by the handler, but ensures that it doesn't wait too long. There is a tension between batch size (larger can be better) and delay (I don't want to wait too long).

Is there some way to combine async functions to get this behaviour? Or do I need to build it?

Thanks. Once again, async is a mainstay in my JS work. :-)

@deitch
Author

deitch commented Nov 3, 2017

I guess I could implement this by using cargo and having the handler maintain a "time since last run" counter, and then refuse to run unless count(items passed in queue) >= 10 or time since last run > 5000. But is there a native way?

@aearly
Collaborator

aearly commented Nov 3, 2017

The idea of a "throttled queue" has been discussed many times before (#1314), but we have punted on it. It is a surprisingly hard problem -- usually you use something like Redis or a message queue to manage it. It's hard to do it properly in Node.

We've decided we don't want to build or support throttling in Async; instead, we just offer the concurrency-limited methods. You can approximate the effective throttle rate from the concurrency and the average I/O response time (roughly concurrency divided by average response time).
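As a rough illustration of the concurrency-limited approach (a minimal sketch; fetchItem, the item list, and the timings are placeholders, not anything from this thread):

const async = require('async');

const items = Array.from({ length: 1000 }, (_, i) => i);

// hypothetical I/O call that takes ~100 ms on average
const fetchItem = (item, callback) => {
  setTimeout(() => callback(null), 100);
};

// at most 10 requests in flight; with ~100 ms per request that works out
// to roughly 10 / 0.1 = ~100 requests per second
async.eachLimit(items, 10, fetchItem, (err) => {
  if (err) throw err;
  console.log('all items processed');
});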

@aearly
Collaborator

aearly commented Nov 3, 2017

Also, we are missing a cargoQueue function -- where you can set both the payload size and concurrency. I believe it is already implemented internally, just not exposed publicly.

@deitch
Author

deitch commented Nov 4, 2017

It is a surprisingly hard problem

I was actually thinking the reverse. I don't want my cargo/queue to process one at a time, but instead batch them up. I am not so much throttling as I am providing minimum batch size. I am perfectly fine if I pass through 1MM per sec, as long as each handler:

  • waits until it has a minimum number of items in its cargo - e.g. min=10 means, "do not run until you have 10 things to give to the handler"
  • doesn't wait beyond a certain threshold, so things don't get too delayed - e.g. timeout=5000 means, "even if you don't have 10 things, run if it has been 5000 milliseconds since you last ran the handler"

It sort of looks like this (if ugly):

const tmpQ = [], maxTime = 5000, minCargo = 10;
let lastRun = 0;
const handler = (cargo, callback) => {
  tmpQ.push(...cargo);
  const now = new Date().getTime();
  if (tmpQ.length >= minCargo || now - lastRun > maxTime) {
    // process tmpQ contents here
    // then empty the queue
    tmpQ.splice(0);
    // and reset time since last run
    lastRun = now;
  }
  callback();
};
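That handler would then just be handed to a regular cargo; a minimal, untested sketch (the task shape is made up):

const q = async.cargo(handler); // optional second arg caps the payload per batch
q.push({ id: 1 });
q.push({ id: 2 });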

@deitch
Author

deitch commented Nov 4, 2017

Also, we are missing a cargoQueue function -- where you can set both the payload size and concurrency

Interesting. So normally cargo just runs lots in parallel? Or just 1?

@aearly
Collaborator

aearly commented Nov 6, 2017

This animation from the docs explains it best: a cargo with payload 2.

[cargo animation]

There is only ever one worker, but internally, it is implemented so there could be many workers. (A queue is like a cargo with payload 1).

But it seems like I misunderstood what you needed -- a buffering cargo with a time limit. That's a little bit different from a throttled queue. Do you really need the buffering? Seems like a basic cargo would accomplish most of what you need.
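In code, that distinction looks roughly like this (a sketch; the workers are placeholders):

const async = require('async');

// queue: the worker gets ONE task at a time, up to `concurrency` in parallel
const q = async.queue((task, callback) => {
  console.log('processing', task);
  callback();
}, 2);

// cargo: a single worker gets an ARRAY of up to `payload` tasks at once
const c = async.cargo((tasks, callback) => {
  console.log('processing a batch of', tasks.length);
  callback();
}, 2);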

@deitch
Author

deitch commented Nov 7, 2017

Seems like a basic cargo would accomplish most of what you need.

It would, mostly. For now, that is exactly what I used. The difference, as you pointed out, is the buffering. If the worker is ready and only one task is in the queue, then I want it to wait up to the specified time limit.

Essentially:

  • queue processes one task at a time
  • cargo processes as many tasks as are available at the moment a worker is ready with no min but a max
  • what I am discussing maximizes the number of tasks a worker will handle, with an actual min but no max (although a max could be added)

Now that I think of it, you probably could implement this as a cargo with a min and timeout.

@deitch
Author

deitch commented Dec 6, 2017

Seems like a basic cargo would accomplish most of what you need

Back at this. We went with it, and when it went from test environment to production load, the downstream i/o became an issue. The particular case in point has about 100 tasks/second. The downstream receiver is far more efficient processing a batch of 100 than 5 batches of 20. When I just use cargo, I get 5 batches of 20. I didn't think it would matter; it does.

Is there any way to create a "buffering cargo" (I like your term) with minimum buffer size and max wait to flush?

@deitch
Author

deitch commented Dec 6, 2017

In the end, my code looks something like this.

  const bufferedHandler = (minBuffer, timeout, handler) => {
    let buffer = [];
    return (tasks, callback) => {
      let timeoutFn = null;
      const processor = () => {
        timeoutFn = null;
        handler(buffer, (err) => {
          buffer = [];
          callback(err);
        });
      };
      buffer = buffer.concat(tasks);
      // flush immediately once the buffer is big enough...
      if (minBuffer < 1 || buffer.length >= minBuffer) {
        processor();
      // ...otherwise wait up to `timeout` ms for more tasks to arrive
      } else if (timeoutFn === null) {
        timeoutFn = setTimeout(processor, timeout);
      }
    };
  },

    // set bufferSize and bufferTimeout separately
    cargo = async.cargo(bufferedHandler(bufferSize, bufferTimeout, (tasks, callback) => {
      // do processing here as if a regular queue/cargo handler
      callback();
    }));

Would like it to be more elegant, but this is what I have.
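Pushing work into it is the same as with any cargo (the task shape is just for illustration):

cargo.push({ id: 1 });
cargo.push({ id: 2 });
// the inner handler only fires once at least bufferSize tasks have been
// buffered, or bufferTimeout ms have passed, whichever comes first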

@justinmchase
Contributor

I would like to 👍 adding the cargoQueue API discussed above.

In my use case I'm streaming in data from a file very rapidly and then sending it into a Kinesis stream. The Kinesis stream API prefers bulk item inserts, but I also need to run more than one batch at a time. So I'm nabbing the internal queue and doing:

import queue from 'async/internal/queue'

// putItems(items, callback) receives an array of up to `payload` records
const workers = 10
const payload = 100
const q = queue(putItems, workers, payload)
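For illustration, feeding that queue from a file could look something like this (the readline wiring and file name are assumptions, not from the comment above; q.drain here uses the async v2 property style):

import readline from 'readline'
import fs from 'fs'

const rl = readline.createInterface({ input: fs.createReadStream('records.ndjson') })
// each line becomes one task; the queue hands putItems batches of up to 100
rl.on('line', (line) => q.push(JSON.parse(line)))
// fires once every queued record has been flushed downstream
q.drain = () => console.log('all records sent')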

@aearly
Collaborator

aearly commented Mar 2, 2018

The implementation is already there, it just needs to be exposed publicly, and docs and tests.

@aearly
Collaborator

aearly commented Jul 8, 2018

Closing in favor of #1555

@aearly aearly closed this as completed Jul 8, 2018