From ba7adf605f25e712d0204f4dccca0e5ec46951b6 Mon Sep 17 00:00:00 2001 From: Samuel Berthe Date: Thu, 14 Apr 2022 21:59:07 +0200 Subject: [PATCH 1/5] feat(channels): add ToChannel + Generator + Batch + BatchWithTimeout (WIP) --- channel.go | 72 +++++++++++++++++++++++++++++++++++++ channel_test.go | 96 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 168 insertions(+) diff --git a/channel.go b/channel.go index 3036e32b..59fa862f 100644 --- a/channel.go +++ b/channel.go @@ -149,3 +149,75 @@ func DispatchingStrategyMost[T any](msg T, index uint64, channels []<-chan T) in return len(channels[item]) > len(channels[max]) && channelIsNotFull(channels[item]) }) } + +// ToChannel returns a read-only channels of collection elements. +func ToChannel[T any](bufferSize int, collection []T) <-chan T { + ch := make(chan T, bufferSize) + + go func() { + for _, item := range collection { + ch <- item + } + + close(ch) + }() + + return ch +} + +// Generator implements the generator design pattern. +func Generator[T any](bufferSize int, generator func(yield func(T))) <-chan T { + ch := make(chan T, bufferSize) + + go func() { + // WARNING: infinite loop + generator(func(t T) { + ch <- t + }) + + close(ch) + }() + + return ch +} + +// Batch creates a slice of n elements from a channel. Returns the slice and the slice length. +func Batch[T any](ch <-chan T, size int) (collection []T, length int) { + buffer := make([]T, 0, size) + index := 0 + + for ; index < size; index++ { + item, ok := <-ch + if !ok { + return buffer, index + } + + buffer = append(buffer, item) + } + + return buffer, index +} + +// BatchWithTimeout creates a slice of n elements from a channel, with timeout. Returns the slice and the slice length. +func BatchWithTimeout[T any](ch <-chan T, size int, timeout time.Duration) (collection []T, length int) { + expire := time.After(timeout) + + buffer := make([]T, 0, size) + index := 0 + + for ; index < size; index++ { + select { + case item, ok := <-ch: + if !ok { + return buffer, index + } + + buffer = append(buffer, item) + + case <-expire: + return buffer, index + } + } + + return buffer, index +} diff --git a/channel_test.go b/channel_test.go index eb74bfdd..88ee5d85 100644 --- a/channel_test.go +++ b/channel_test.go @@ -187,3 +187,99 @@ func TestDispatchingStrategyMost(t *testing.T) { children[1] <- 1 is.Equal(0, DispatchingStrategyMost(42, 0, rochildren)) } + +func TestToChannel(t *testing.T) { + t.Parallel() + testWithTimeout(t, 10*time.Millisecond) + is := assert.New(t) + + ch := ToChannel[int](2, []int{1, 2, 3}) + + r1, ok1 := <-ch + r2, ok2 := <-ch + r3, ok3 := <-ch + is.True(ok1) + is.Equal(1, r1) + is.True(ok2) + is.Equal(2, r2) + is.True(ok3) + is.Equal(3, r3) + + _, ok4 := <-ch + is.False(ok4) +} + +func TestGenerate(t *testing.T) { + t.Parallel() + testWithTimeout(t, 10*time.Millisecond) + is := assert.New(t) + + gen := func(yield func(int)) { + yield(0) + yield(1) + yield(2) + yield(3) + } + + i := 0 + + for v := range Generator(2, gen) { + is.Equal(i, v) + i++ + } + + is.Equal(i, 4) +} + +func TestBatch(t *testing.T) { + t.Parallel() + testWithTimeout(t, 10*time.Millisecond) + is := assert.New(t) + + ch := ToChannel(2, []int{1, 2, 3}) + + items1, length1 := Batch(ch, 2) + items2, length2 := Batch(ch, 2) + items3, length3 := Batch(ch, 2) + + is.Equal([]int{1, 2}, items1) + is.Equal(2, length1) + is.Equal([]int{3}, items2) + is.Equal(1, length2) + is.Equal([]int{}, items3) + is.Equal(0, length3) +} + +func TestBatchWithTimeout(t *testing.T) { + t.Parallel() + testWithTimeout(t, 200*time.Millisecond) + is := assert.New(t) + + ch := make(chan int) + go func() { + for i := 0; i < 5; i++ { + ch <- i + time.Sleep(10 * time.Millisecond) + } + }() + + items1, length1 := BatchWithTimeout(ch, 20, 15*time.Millisecond) + is.Equal([]int{0, 1}, items1) + is.Equal(2, length1) + + items2, length2 := BatchWithTimeout(ch, 20, 2*time.Millisecond) + is.Equal([]int{}, items2) + is.Equal(0, length2) + + items3, length3 := BatchWithTimeout(ch, 1, 30*time.Millisecond) + is.Equal([]int{2}, items3) + is.Equal(1, length3) + + items4, length4 := BatchWithTimeout(ch, 2, 25*time.Millisecond) + is.Equal([]int{3, 4}, items4) + is.Equal(2, length4) + + items5, length5 := BatchWithTimeout(ch, 3, 25*time.Millisecond) + is.Equal([]int{}, items5) + is.Equal(0, length5) +} From eb382242d89ef1b15e8b380516d69ae698a1d381 Mon Sep 17 00:00:00 2001 From: Samuel Berthe Date: Fri, 7 Oct 2022 00:16:24 +0200 Subject: [PATCH 2/5] feat: return duration of Batch**** helpers --- CHANGELOG.md | 9 +++++ README.md | 103 ++++++++++++++++++++++++++++++++++++++++++++++++ channel.go | 20 +++++----- channel_test.go | 42 ++++++++++++-------- 4 files changed, 148 insertions(+), 26 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a8b8ed27..9ba32f1b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,15 @@ @samber: I sometimes forget to update this file. Ping me on [Twitter](https://twitter.com/samuelberthe) or open an issue in case of error. We need to keep a clear changelog for easier lib upgrade. +## 1.31.0 (2022-10-06) + +Adding: + +- lo.SliceToChannel +- lo.Generator +- lo.Batch +- lo.BatchWithTimeout + ## 1.30.1 (2022-10-06) Fix: diff --git a/README.md b/README.md index 7c27c02b..1b09860f 100644 --- a/README.md +++ b/README.md @@ -135,6 +135,10 @@ Supported helpers for tuples: Supported helpers for channels: - [ChannelDispatcher](#channeldispatcher) +- [SliceToChannel](#slicetochannel) +- [Generator](#generator) +- [Batch](#batch) +- [BatchWithTimeout](#batchwithtimeout) Supported intersection helpers: @@ -1262,6 +1266,105 @@ children := lo.ChannelDispatcher(ch, 5, 10, customStrategy) ... ``` +### SliceToChannel + +Returns a read-only channels of collection elements. Channel is closed after last element. Channel capacity can be customized. + +```go +list := []int{1, 2, 3, 4, 5} + +for v := range lo.SliceToChannel(2, list) { + println(v) +} +// prints 1, then 2, then 3, then 4, then 5 +``` + +### Generator + +Implements the generator design pattern. Channel is closed after last element. Channel capacity can be customized. + +```go +generator := func(yield func(int)) { + yield(1) + yield(2) + yield(3) +} + +for v := range lo.Generator(2, generator) { + println(v) +} +// prints 1, then 2, then 3 +``` + +### Batch + +Creates a slice of n elements from a channel. Returns the slice, the slice length, the read time and the channel status (opened/closed). + +```go +ch := lo.SliceToChannel(2, []int{1, 2, 3, 4, 5}) + +items1, length1, duration1, ok1 := lo.Batch(ch, 3) +// []int{1, 2, 3}, 3, 0s, true +items2, length2, duration2, ok2 := lo.Batch(ch, 3) +// []int{4, 5}, 2, 0s, false +``` + +Example: RabbitMQ consumer 👇 + +```go +ch := readFromQueue() + +for { + // read 1k items + items, length, _, ok := lo.Batch(ch, 1000) + + // do batching stuff + + if !ok { + break + } +} +``` + +### BatchWithTimeout + +Creates a slice of n elements from a channel, with timeout. Returns the slice, the slice length, the read time and the channel status (opened/closed). + +```go +generator := func(yield func(int)) { + for i := 0; i < 5; i++ { + yield(i) + time.Sleep(35*time.Millisecond) + } +} + +ch := lo.Generator(0, generator) + +items1, length1, duration1, ok1 := lo.BatchWithTimeout(ch, 3, 100*time.Millisecond) +// []int{1, 2}, 2, 100ms, true +items2, length2, duration2, ok2 := lo.BatchWithTimeout(ch, 3, 100*time.Millisecond) +// []int{3, 4, 5}, 3, 75ms, true +items3, length3, duration2, ok3 := lo.BatchWithTimeout(ch, 3, 100*time.Millisecond) +// []int{}, 0, 10ms, false +``` + +Example: RabbitMQ consumer 👇 + +```go +ch := readFromQueue() + +for { + // read 1k items up to 1 second + items, length, _, ok := lo.BatchWithTimeout(ch, 1000, 1*time.Second) + + // do batching stuff + + if !ok { + break + } +} +``` + ### Contains Returns true if an element is present in a collection. diff --git a/channel.go b/channel.go index 59fa862f..5846f6a8 100644 --- a/channel.go +++ b/channel.go @@ -150,8 +150,8 @@ func DispatchingStrategyMost[T any](msg T, index uint64, channels []<-chan T) in }) } -// ToChannel returns a read-only channels of collection elements. -func ToChannel[T any](bufferSize int, collection []T) <-chan T { +// SliceToChannel returns a read-only channels of collection elements. +func SliceToChannel[T any](bufferSize int, collection []T) <-chan T { ch := make(chan T, bufferSize) go func() { @@ -182,42 +182,44 @@ func Generator[T any](bufferSize int, generator func(yield func(T))) <-chan T { } // Batch creates a slice of n elements from a channel. Returns the slice and the slice length. -func Batch[T any](ch <-chan T, size int) (collection []T, length int) { +func Batch[T any](ch <-chan T, size int) (collection []T, length int, readTime time.Duration, ok bool) { buffer := make([]T, 0, size) index := 0 + now := time.Now() for ; index < size; index++ { item, ok := <-ch if !ok { - return buffer, index + return buffer, index, time.Since(now), false } buffer = append(buffer, item) } - return buffer, index + return buffer, index, time.Since(now), true } // BatchWithTimeout creates a slice of n elements from a channel, with timeout. Returns the slice and the slice length. -func BatchWithTimeout[T any](ch <-chan T, size int, timeout time.Duration) (collection []T, length int) { +func BatchWithTimeout[T any](ch <-chan T, size int, timeout time.Duration) (collection []T, length int, readTime time.Duration, ok bool) { expire := time.After(timeout) buffer := make([]T, 0, size) index := 0 + now := time.Now() for ; index < size; index++ { select { case item, ok := <-ch: if !ok { - return buffer, index + return buffer, index, time.Since(now), false } buffer = append(buffer, item) case <-expire: - return buffer, index + return buffer, index, time.Since(now), true } } - return buffer, index + return buffer, index, time.Since(now), true } diff --git a/channel_test.go b/channel_test.go index 88ee5d85..b100c86f 100644 --- a/channel_test.go +++ b/channel_test.go @@ -188,12 +188,12 @@ func TestDispatchingStrategyMost(t *testing.T) { is.Equal(0, DispatchingStrategyMost(42, 0, rochildren)) } -func TestToChannel(t *testing.T) { +func TestSliceToChannel(t *testing.T) { t.Parallel() testWithTimeout(t, 10*time.Millisecond) is := assert.New(t) - ch := ToChannel[int](2, []int{1, 2, 3}) + ch := SliceToChannel[int](2, []int{1, 2, 3}) r1, ok1 := <-ch r2, ok2 := <-ch @@ -214,7 +214,7 @@ func TestGenerate(t *testing.T) { testWithTimeout(t, 10*time.Millisecond) is := assert.New(t) - gen := func(yield func(int)) { + generator := func(yield func(int)) { yield(0) yield(1) yield(2) @@ -223,7 +223,7 @@ func TestGenerate(t *testing.T) { i := 0 - for v := range Generator(2, gen) { + for v := range Generator(2, generator) { is.Equal(i, v) i++ } @@ -236,18 +236,21 @@ func TestBatch(t *testing.T) { testWithTimeout(t, 10*time.Millisecond) is := assert.New(t) - ch := ToChannel(2, []int{1, 2, 3}) + ch := SliceToChannel(2, []int{1, 2, 3}) - items1, length1 := Batch(ch, 2) - items2, length2 := Batch(ch, 2) - items3, length3 := Batch(ch, 2) + items1, length1, _, ok1 := Batch(ch, 2) + items2, length2, _, ok2 := Batch(ch, 2) + items3, length3, _, ok3 := Batch(ch, 2) is.Equal([]int{1, 2}, items1) is.Equal(2, length1) + is.True(ok1) is.Equal([]int{3}, items2) is.Equal(1, length2) + is.False(ok2) is.Equal([]int{}, items3) is.Equal(0, length3) + is.False(ok3) } func TestBatchWithTimeout(t *testing.T) { @@ -255,31 +258,36 @@ func TestBatchWithTimeout(t *testing.T) { testWithTimeout(t, 200*time.Millisecond) is := assert.New(t) - ch := make(chan int) - go func() { + generator := func(yield func(int)) { for i := 0; i < 5; i++ { - ch <- i + yield(i) time.Sleep(10 * time.Millisecond) } - }() + } + ch := Generator(0, generator) - items1, length1 := BatchWithTimeout(ch, 20, 15*time.Millisecond) + items1, length1, _, ok1 := BatchWithTimeout(ch, 20, 15*time.Millisecond) is.Equal([]int{0, 1}, items1) is.Equal(2, length1) + is.True(ok1) - items2, length2 := BatchWithTimeout(ch, 20, 2*time.Millisecond) + items2, length2, _, ok2 := BatchWithTimeout(ch, 20, 2*time.Millisecond) is.Equal([]int{}, items2) is.Equal(0, length2) + is.True(ok2) - items3, length3 := BatchWithTimeout(ch, 1, 30*time.Millisecond) + items3, length3, _, ok3 := BatchWithTimeout(ch, 1, 30*time.Millisecond) is.Equal([]int{2}, items3) is.Equal(1, length3) + is.True(ok3) - items4, length4 := BatchWithTimeout(ch, 2, 25*time.Millisecond) + items4, length4, _, ok4 := BatchWithTimeout(ch, 2, 25*time.Millisecond) is.Equal([]int{3, 4}, items4) is.Equal(2, length4) + is.True(ok4) - items5, length5 := BatchWithTimeout(ch, 3, 25*time.Millisecond) + items5, length5, _, ok5 := BatchWithTimeout(ch, 3, 25*time.Millisecond) is.Equal([]int{}, items5) is.Equal(0, length5) + is.False(ok5) } From c4b203e7652f8f87585535193099874c9f07b78d Mon Sep 17 00:00:00 2001 From: Samuel Berthe Date: Fri, 7 Oct 2022 00:26:03 +0200 Subject: [PATCH 3/5] doc: improve BatchWithTimeout doc --- README.md | 31 ++++++++++++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 1b09860f..49cbe49d 100644 --- a/README.md +++ b/README.md @@ -1354,7 +1354,8 @@ Example: RabbitMQ consumer 👇 ch := readFromQueue() for { - // read 1k items up to 1 second + // read 1k items + // wait up to 1 second items, length, _, ok := lo.BatchWithTimeout(ch, 1000, 1*time.Second) // do batching stuff @@ -1365,6 +1366,34 @@ for { } ``` +Example: Multithreaded RabbitMQ consumer 👇 + +```go +ch := readFromQueue() + +// 5 workers +// prefetch 1k messages per worker +children := lo.ChannelDispatcher(ch, 5, 1000, DispatchingStrategyFirst[int]) + +consumer := func(c <-chan int) { + for { + // read 1k items + // wait up to 1 second + items, length, _, ok := lo.BatchWithTimeout(ch, 1000, 1*time.Second) + + // do batching stuff + + if !ok { + break + } + } +} + +for i := range children { + go consumer(children[i]) +} +``` + ### Contains Returns true if an element is present in a collection. From 01cf836914a59383d0b5ed29fb2b8d77dd56b042 Mon Sep 17 00:00:00 2001 From: Samuel Berthe Date: Fri, 7 Oct 2022 00:32:11 +0200 Subject: [PATCH 4/5] fix(BatchWithTimeout): replace time.After by time.NewTimer in order to prevent memory leak --- channel.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/channel.go b/channel.go index 5846f6a8..65046864 100644 --- a/channel.go +++ b/channel.go @@ -201,7 +201,8 @@ func Batch[T any](ch <-chan T, size int) (collection []T, length int, readTime t // BatchWithTimeout creates a slice of n elements from a channel, with timeout. Returns the slice and the slice length. func BatchWithTimeout[T any](ch <-chan T, size int, timeout time.Duration) (collection []T, length int, readTime time.Duration, ok bool) { - expire := time.After(timeout) + expire := time.NewTimer(timeout) + defer expire.Stop() buffer := make([]T, 0, size) index := 0 @@ -216,7 +217,7 @@ func BatchWithTimeout[T any](ch <-chan T, size int, timeout time.Duration) (coll buffer = append(buffer, item) - case <-expire: + case <-expire.C: return buffer, index, time.Since(now), true } } From 8e5bf27cccb3d2b30d4c72c16e133cdecf749d00 Mon Sep 17 00:00:00 2001 From: Samuel Berthe Date: Fri, 7 Oct 2022 00:35:32 +0200 Subject: [PATCH 5/5] doc: improve BatchWithTimeout doc --- channel.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/channel.go b/channel.go index 65046864..ccbd3602 100644 --- a/channel.go +++ b/channel.go @@ -182,6 +182,7 @@ func Generator[T any](bufferSize int, generator func(yield func(T))) <-chan T { } // Batch creates a slice of n elements from a channel. Returns the slice and the slice length. +// @TODO: we should probaby provide an helper that reuse the same buffer. func Batch[T any](ch <-chan T, size int) (collection []T, length int, readTime time.Duration, ok bool) { buffer := make([]T, 0, size) index := 0 @@ -200,6 +201,7 @@ func Batch[T any](ch <-chan T, size int) (collection []T, length int, readTime t } // BatchWithTimeout creates a slice of n elements from a channel, with timeout. Returns the slice and the slice length. +// @TODO: we should probaby provide an helper that reuse the same buffer. func BatchWithTimeout[T any](ch <-chan T, size int, timeout time.Duration) (collection []T, length int, readTime time.Duration, ok bool) { expire := time.NewTimer(timeout) defer expire.Stop()