buffer #330

Open · wants to merge 10 commits into master

Conversation

@ronag commented Jun 29, 2015

Adds a buffer operator and implements parallel in terms of buffer, which gives a simpler and more powerful solution.

Needs tests and review.

@vqvu (Collaborator) commented Jun 29, 2015

This doesn't work:

var s = _([1, 2, 3]);
_([s.fork(), s.fork()])
    .parallel(2)
    .toArray(_.log);
// => prints nothing.

@ronag (Author) commented Jun 29, 2015

I see. However, I don't quite understand why it doesn't work.

Does fork require something special?

@ronag (Author) commented Jun 29, 2015

Actually, I seem to have the same problem with sequence.

@vqvu (Collaborator) commented Jun 30, 2015

It doesn't work with sequence because sequence will consume from each stream in order. Since the two forks share backpressure and sequence won't consume from the second one until the first one is done, you'll deadlock.

In contrast, if you run parallel(2), both forks should be consumed at the same time, and you're ok. It's a good way to test if the streams are actually being consumed in parallel. In this case, since it doesn't work, I assume you're not actually consuming in parallel.
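A minimal sketch of the deadlock described above, reusing the forked-stream setup from the earlier example (assuming _ is Highland):

var s = _([1, 2, 3]);
// The forks share backpressure: the source only emits a value once every
// fork has asked for it. sequence drains the inner streams strictly in
// order, so it never pulls from the second fork, and the first fork never
// receives anything.
_([s.fork(), s.fork()])
    .sequence()
    .toArray(_.log);
// => never prints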

@ronag (Author) commented Jun 30, 2015

OK, I've fixed that test case. I'm not sure if it is lazy enough right now. How would you want it to work? Should it start buffering items right away, or only after the first request?

@vqvu (Collaborator) commented Jun 30, 2015

It should start buffering after the first request, since that's the current behavior of parallel.

A few other things:

  • We can't change the signature of parallel, so we can't have the k parameter. We don't allow optional arguments in transforms because they're curried.
  • buffer needs docs.
  • buffer needs tests.
  • buffer should be implemented with consume and not pull. pull is super slow with the 2.x engine. You can run _([_(arrayOf1000000Elems)]).parallel(1).toArray(_.log) to see the problem (a rough sketch follows below).
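For reference, a rough sketch of that throughput check (assuming _ is Highland; arrayOf1000000Elems stands in for any large array):

var arrayOf1000000Elems = [];
for (var i = 0; i < 1000000; i++) {
    arrayOf1000000Elems.push(i);
}

// If buffer (and hence parallel) is implemented with pull, this crawls on
// the 2.x engine; a consume-based implementation stays fast.
console.time('parallel(1)');
_([_(arrayOf1000000Elems)])
    .parallel(1)
    .toArray(function (xs) {
        console.timeEnd('parallel(1)');
    });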

I have a working version of buffer (based off yours) that uses consume that I can post.

Though since parallel works now, you only need to do all of this if you still need buffer specifically, and not just a working version of parallel. Let me know if you want to proceed.

@vqvu (Collaborator) commented Jun 30, 2015

Oh, and also

  • This needs to be rebased on top of master.

@ronag (Author) commented Jun 30, 2015

I would like to continue with this when I have spare time. Please do post your version. Even if we don't allow extra params with parallel, it would be possible to achieve the same thing relatively simply using buffer.

Do you think it would be a good idea to simplify the parallel implementation this way or would you rather leave that part as is?

@vqvu (Collaborator) commented Jun 30, 2015

Let's leave parallel as-is for now. We can always change it later.

function buffer(n, startImmediately) {
    return function (s) {
        var buffer = [];        // queued [err, x] pairs, at most n of them
        var yieldRead = null;   // parked `next` (or initial resume) for the upstream consume
        var yieldWrite = null;  // parked `next` for the downstream generator

        // Upstream side: consume the source and queue values until the
        // buffer is full, then park `next` until the reader drains an item.
        s = s.consume(function (err, x, push, next) {
            buffer.push([err, x]);

            if (x !== _.nil) {
                if (buffer.length < n) {
                    next();
                }
                else {
                    yieldRead = next;
                }
            }

            // Wake the downstream generator if it was waiting for data.
            if (yieldWrite) {
                var yield = yieldWrite;
                yieldWrite = null;
                yield();
            }
        });

        if (startImmediately) {
            s.resume();
        }
        else {
            // Stay lazy: only start filling the buffer on the first read.
            yieldRead = s.resume.bind(s);
        }

        // Downstream side: emit queued items, or park until one arrives.
        var ss = _(function (push, next) {
            if (buffer.length > 0) {
                var elem = buffer.shift();
                push.apply(null, elem);
                if (elem[1] !== _.nil) {
                    next();
                }
            } else {
                yieldWrite = next;
            }

            // There is room in the buffer again; wake the upstream consume.
            if (yieldRead) {
                var yield = yieldRead;
                yieldRead = null;
                yield();
            }
        });

        return ss;
    };
}
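A hypothetical way to apply that draft (source and work are placeholder names), plugging the returned transform into a pipeline with Highland's through:

_(source)
    .through(buffer(16, true))  // keep up to 16 items pre-fetched
    .map(work)
    .each(_.log);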

One thing about the start parameter: one side effect of not being able to use optional parameters is that we tend not to add extra parameters that most people won't use, and start seems like one such parameter.

I think we'll be better off just dropping the parameter completely and always starting immediately. There's a case to be made for being as lazy as possible, but since one of the main uses of buffer is to improve throughput on consume, it probably makes sense to start buffering as soon as possible.

Do you have a use-case for setting start = false?

@vqvu (Collaborator) commented Jun 30, 2015

Hmm...yield is a keyword in ES6, so we'll probably want to change that.
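For example, the wake-up temporary could use an ordinary name instead (continueWrite is just a placeholder):

// write side, same logic as in the draft above
if (yieldWrite) {
    var continueWrite = yieldWrite;
    yieldWrite = null;
    continueWrite();
}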

@ronag (Author) commented Jun 30, 2015

> Let's leave parallel as-is for now. We can always change it later.

Sounds reasonable. The only comment I have is that parallel currently seems a bit overly complex and a little brittle. I tried to figure out the previous bug we encountered myself and quickly got a bit lost.

> Do you have a use-case for setting start = false?

Not really. As you say, it doesn't make much sense. The reason I added it is the general practice of keeping everything lazy.

@vqvu (Collaborator) commented Jun 30, 2015

Yeah, we try to keep everything as lazy as possible, but there are exceptions where it makes sense. For example, latest.

@Sebmaster commented

I'd actually have voted for startImmediately = true when I originally read the issue; however, it makes it somewhat more difficult to put pipelines together, and I'm leaning towards the other way now. For example, in my application I had:

function generateStream2() { return _(data).buffer(10); }
generateStream1().concat(generateStream2());

This confused me a bit when data from stream2 got processed before stream1 was done, so that's just my $0.02 for consideration.
