diff --git a/CHANGELOG.md b/CHANGELOG.md index a8b8ed27..9ba32f1b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,15 @@ @samber: I sometimes forget to update this file. Ping me on [Twitter](https://twitter.com/samuelberthe) or open an issue in case of error. We need to keep a clear changelog for easier lib upgrade. +## 1.31.0 (2022-10-06) + +Adding: + +- lo.SliceToChannel +- lo.Generator +- lo.Batch +- lo.BatchWithTimeout + ## 1.30.1 (2022-10-06) Fix: diff --git a/README.md b/README.md index 7c27c02b..49cbe49d 100644 --- a/README.md +++ b/README.md @@ -135,6 +135,10 @@ Supported helpers for tuples: Supported helpers for channels: - [ChannelDispatcher](#channeldispatcher) +- [SliceToChannel](#slicetochannel) +- [Generator](#generator) +- [Batch](#batch) +- [BatchWithTimeout](#batchwithtimeout) Supported intersection helpers: @@ -1262,6 +1266,134 @@ children := lo.ChannelDispatcher(ch, 5, 10, customStrategy) ... ``` +### SliceToChannel + +Returns a read-only channels of collection elements. Channel is closed after last element. Channel capacity can be customized. + +```go +list := []int{1, 2, 3, 4, 5} + +for v := range lo.SliceToChannel(2, list) { + println(v) +} +// prints 1, then 2, then 3, then 4, then 5 +``` + +### Generator + +Implements the generator design pattern. Channel is closed after last element. Channel capacity can be customized. + +```go +generator := func(yield func(int)) { + yield(1) + yield(2) + yield(3) +} + +for v := range lo.Generator(2, generator) { + println(v) +} +// prints 1, then 2, then 3 +``` + +### Batch + +Creates a slice of n elements from a channel. Returns the slice, the slice length, the read time and the channel status (opened/closed). + +```go +ch := lo.SliceToChannel(2, []int{1, 2, 3, 4, 5}) + +items1, length1, duration1, ok1 := lo.Batch(ch, 3) +// []int{1, 2, 3}, 3, 0s, true +items2, length2, duration2, ok2 := lo.Batch(ch, 3) +// []int{4, 5}, 2, 0s, false +``` + +Example: RabbitMQ consumer 👇 + +```go +ch := readFromQueue() + +for { + // read 1k items + items, length, _, ok := lo.Batch(ch, 1000) + + // do batching stuff + + if !ok { + break + } +} +``` + +### BatchWithTimeout + +Creates a slice of n elements from a channel, with timeout. Returns the slice, the slice length, the read time and the channel status (opened/closed). + +```go +generator := func(yield func(int)) { + for i := 0; i < 5; i++ { + yield(i) + time.Sleep(35*time.Millisecond) + } +} + +ch := lo.Generator(0, generator) + +items1, length1, duration1, ok1 := lo.BatchWithTimeout(ch, 3, 100*time.Millisecond) +// []int{1, 2}, 2, 100ms, true +items2, length2, duration2, ok2 := lo.BatchWithTimeout(ch, 3, 100*time.Millisecond) +// []int{3, 4, 5}, 3, 75ms, true +items3, length3, duration2, ok3 := lo.BatchWithTimeout(ch, 3, 100*time.Millisecond) +// []int{}, 0, 10ms, false +``` + +Example: RabbitMQ consumer 👇 + +```go +ch := readFromQueue() + +for { + // read 1k items + // wait up to 1 second + items, length, _, ok := lo.BatchWithTimeout(ch, 1000, 1*time.Second) + + // do batching stuff + + if !ok { + break + } +} +``` + +Example: Multithreaded RabbitMQ consumer 👇 + +```go +ch := readFromQueue() + +// 5 workers +// prefetch 1k messages per worker +children := lo.ChannelDispatcher(ch, 5, 1000, DispatchingStrategyFirst[int]) + +consumer := func(c <-chan int) { + for { + // read 1k items + // wait up to 1 second + items, length, _, ok := lo.BatchWithTimeout(ch, 1000, 1*time.Second) + + // do batching stuff + + if !ok { + break + } + } +} + +for i := range children { + go consumer(children[i]) +} +``` + ### Contains Returns true if an element is present in a collection. diff --git a/channel.go b/channel.go index 3036e32b..ccbd3602 100644 --- a/channel.go +++ b/channel.go @@ -149,3 +149,80 @@ func DispatchingStrategyMost[T any](msg T, index uint64, channels []<-chan T) in return len(channels[item]) > len(channels[max]) && channelIsNotFull(channels[item]) }) } + +// SliceToChannel returns a read-only channels of collection elements. +func SliceToChannel[T any](bufferSize int, collection []T) <-chan T { + ch := make(chan T, bufferSize) + + go func() { + for _, item := range collection { + ch <- item + } + + close(ch) + }() + + return ch +} + +// Generator implements the generator design pattern. +func Generator[T any](bufferSize int, generator func(yield func(T))) <-chan T { + ch := make(chan T, bufferSize) + + go func() { + // WARNING: infinite loop + generator(func(t T) { + ch <- t + }) + + close(ch) + }() + + return ch +} + +// Batch creates a slice of n elements from a channel. Returns the slice and the slice length. +// @TODO: we should probaby provide an helper that reuse the same buffer. +func Batch[T any](ch <-chan T, size int) (collection []T, length int, readTime time.Duration, ok bool) { + buffer := make([]T, 0, size) + index := 0 + now := time.Now() + + for ; index < size; index++ { + item, ok := <-ch + if !ok { + return buffer, index, time.Since(now), false + } + + buffer = append(buffer, item) + } + + return buffer, index, time.Since(now), true +} + +// BatchWithTimeout creates a slice of n elements from a channel, with timeout. Returns the slice and the slice length. +// @TODO: we should probaby provide an helper that reuse the same buffer. +func BatchWithTimeout[T any](ch <-chan T, size int, timeout time.Duration) (collection []T, length int, readTime time.Duration, ok bool) { + expire := time.NewTimer(timeout) + defer expire.Stop() + + buffer := make([]T, 0, size) + index := 0 + now := time.Now() + + for ; index < size; index++ { + select { + case item, ok := <-ch: + if !ok { + return buffer, index, time.Since(now), false + } + + buffer = append(buffer, item) + + case <-expire.C: + return buffer, index, time.Since(now), true + } + } + + return buffer, index, time.Since(now), true +} diff --git a/channel_test.go b/channel_test.go index eb74bfdd..b100c86f 100644 --- a/channel_test.go +++ b/channel_test.go @@ -187,3 +187,107 @@ func TestDispatchingStrategyMost(t *testing.T) { children[1] <- 1 is.Equal(0, DispatchingStrategyMost(42, 0, rochildren)) } + +func TestSliceToChannel(t *testing.T) { + t.Parallel() + testWithTimeout(t, 10*time.Millisecond) + is := assert.New(t) + + ch := SliceToChannel[int](2, []int{1, 2, 3}) + + r1, ok1 := <-ch + r2, ok2 := <-ch + r3, ok3 := <-ch + is.True(ok1) + is.Equal(1, r1) + is.True(ok2) + is.Equal(2, r2) + is.True(ok3) + is.Equal(3, r3) + + _, ok4 := <-ch + is.False(ok4) +} + +func TestGenerate(t *testing.T) { + t.Parallel() + testWithTimeout(t, 10*time.Millisecond) + is := assert.New(t) + + generator := func(yield func(int)) { + yield(0) + yield(1) + yield(2) + yield(3) + } + + i := 0 + + for v := range Generator(2, generator) { + is.Equal(i, v) + i++ + } + + is.Equal(i, 4) +} + +func TestBatch(t *testing.T) { + t.Parallel() + testWithTimeout(t, 10*time.Millisecond) + is := assert.New(t) + + ch := SliceToChannel(2, []int{1, 2, 3}) + + items1, length1, _, ok1 := Batch(ch, 2) + items2, length2, _, ok2 := Batch(ch, 2) + items3, length3, _, ok3 := Batch(ch, 2) + + is.Equal([]int{1, 2}, items1) + is.Equal(2, length1) + is.True(ok1) + is.Equal([]int{3}, items2) + is.Equal(1, length2) + is.False(ok2) + is.Equal([]int{}, items3) + is.Equal(0, length3) + is.False(ok3) +} + +func TestBatchWithTimeout(t *testing.T) { + t.Parallel() + testWithTimeout(t, 200*time.Millisecond) + is := assert.New(t) + + generator := func(yield func(int)) { + for i := 0; i < 5; i++ { + yield(i) + time.Sleep(10 * time.Millisecond) + } + } + ch := Generator(0, generator) + + items1, length1, _, ok1 := BatchWithTimeout(ch, 20, 15*time.Millisecond) + is.Equal([]int{0, 1}, items1) + is.Equal(2, length1) + is.True(ok1) + + items2, length2, _, ok2 := BatchWithTimeout(ch, 20, 2*time.Millisecond) + is.Equal([]int{}, items2) + is.Equal(0, length2) + is.True(ok2) + + items3, length3, _, ok3 := BatchWithTimeout(ch, 1, 30*time.Millisecond) + is.Equal([]int{2}, items3) + is.Equal(1, length3) + is.True(ok3) + + items4, length4, _, ok4 := BatchWithTimeout(ch, 2, 25*time.Millisecond) + is.Equal([]int{3, 4}, items4) + is.Equal(2, length4) + is.True(ok4) + + items5, length5, _, ok5 := BatchWithTimeout(ch, 3, 25*time.Millisecond) + is.Equal([]int{}, items5) + is.Equal(0, length5) + is.False(ok5) +}