Skip to content

1pakch/bpcqueue

Repository files navigation

bpcqueue

A producer-aware MPMC bounded queue on top of rigtorp/MPMCQueue inspired by cgaebel/pipe.

Example

Generator, transformer and sink functions:

void produce(Output<int> dst, int n) {
    for (int i=0; i<n; ++i)
	dst.push(i);
}

void count(Input<int> src, Output<int> dst) {
    int x, count=0;
    while (src.pop(x))
        ++count;
    dst.push(count);
}

void print_sum(Input<int> src) {
    int x, sum=0;
    while (src.pop(x))
        sum += x;
    printf("%d\n", sum);
}

Assembling the transformations graph and starting/finishing the threads:

int main() {
    // Arguments are capacities
    Queue<int> numbers(7), counts(4);
    const int n_producers = 1;
    const int n_counters = 1;

    // Start producers before consumers
    std::vector<std::thread> producers;
    for (int i=0; i < n_producers; ++i)
        producers.emplace_back(produce, output(numbers), 100);
    
    // Continue in topological order
    std::vector<std::thread> counters;
    for (int i=0; i < n_counters; ++i)
        counters.emplace_back(count, input(numbers), output(counts));

    // Sinks go last
    std::thread printer(print_sum, input(counts));

    // Now join in the same order
    for (int i=0; i < n_producers; ++i) producers[i].join();
    for (int i=0; i < n_counters; ++i) counters[i].join();
    printer.join();
}

About

A producer-aware MPMC bounded queue in C++11

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published