From 2ebbcb6e4c0c717f006cb9bbd51c08761a0a9d4a Mon Sep 17 00:00:00 2001 From: Samuel Berthe Date: Thu, 14 Apr 2022 21:59:07 +0200 Subject: [PATCH] feat(channels): add ToChannel + Generator + Batch + BatchWithTimeout (WIP) --- channel.go | 82 +++++++++++++++++++++++++++++++++++++++++++++ channel_test.go | 88 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 170 insertions(+) create mode 100644 channel.go create mode 100644 channel_test.go diff --git a/channel.go b/channel.go new file mode 100644 index 00000000..27af8cad --- /dev/null +++ b/channel.go @@ -0,0 +1,82 @@ +package lo + +import ( + "time" +) + +// ToChannel returns a read-only channels of collection elements. +func ToChannel[T any](collection []T) <-chan T { + ch := make(chan T) + + go func() { + for _, item := range collection { + ch <- item + } + + close(ch) + }() + + return ch +} + +// Generator implements the generator design pattern. +func Generator[T any](bufferSize int, generator func(int64) T) <-chan T { + ch := make(chan T, bufferSize) + + go func() { + var i int64 = 0 + + // WARNING: infinite loop + for { + ch <- generator(i) + i++ + } + + close(ch) + }() + + return ch +} + +// Batch creates a slice of n elements from a channel. Returns the slice and the slice length. +func Batch[T any](ch <-chan T, size int) (collection []T, length int) { + buffer := make([]T, 0, size) + index := 0 + + for ; index < size; index++ { + select { + case item, ok := <-ch: + if !ok { + return buffer, index + } + + buffer = append(buffer, item) + } + } + + return buffer, index +} + +// BatchWithTimeout creates a slice of n elements from a channel, with timeout. Returns the slice and the slice length. +func BatchWithTimeout[T any](ch <-chan T, size int, timeout time.Duration) (collection []T, length int) { + expire := time.After(timeout) + + buffer := make([]T, 0, size) + index := 0 + + for ; index < size; index++ { + select { + case item, ok := <-ch: + if !ok { + return buffer, index + } + + buffer = append(buffer, item) + + case <-expire: + return buffer, index + } + } + + return buffer, index +} diff --git a/channel_test.go b/channel_test.go new file mode 100644 index 00000000..01d9b697 --- /dev/null +++ b/channel_test.go @@ -0,0 +1,88 @@ +package lo + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestToChannel(t *testing.T) { + is := assert.New(t) + + ch := ToChannel[int]([]int{1, 2, 3}) + + r1, ok1 := <-ch + r2, ok2 := <-ch + r3, ok3 := <-ch + is.True(ok1) + is.Equal(1, r1) + is.True(ok2) + is.Equal(2, r2) + is.True(ok3) + is.Equal(3, r3) + + _, ok4 := <-ch + is.False(ok4) +} + +func TestGenerate(t *testing.T) { + // is := assert.New(t) + + // next := func(i int64) int64 { + // return i + // } + + // for item := range Generator[int64](10, next) { + // println(item) + // } +} + +func TestBatch(t *testing.T) { + is := assert.New(t) + + ch := ToChannel[int]([]int{1, 2, 3}) + + items1, length1 := Batch[int](ch, 2) + items2, length2 := Batch[int](ch, 2) + items3, length3 := Batch[int](ch, 2) + + is.Equal([]int{1, 2}, items1) + is.Equal(2, length1) + is.Equal([]int{3}, items2) + is.Equal(1, length2) + is.Equal([]int{}, items3) + is.Equal(0, length3) +} + +func TestBatchWithTimeout(t *testing.T) { + is := assert.New(t) + + ch := make(chan int) + go func() { + for i := 0; i < 5; i++ { + ch <- i + time.Sleep(10 * time.Millisecond) + } + }() + + items1, length1 := BatchWithTimeout[int](ch, 20, 15*time.Millisecond) + is.Equal([]int{0, 1}, items1) + is.Equal(2, length1) + + items2, length2 := BatchWithTimeout[int](ch, 20, 2*time.Millisecond) + is.Equal([]int{}, items2) + is.Equal(0, length2) + + items3, length3 := BatchWithTimeout[int](ch, 1, 30*time.Millisecond) + is.Equal([]int{2}, items3) + is.Equal(1, length3) + + items4, length4 := BatchWithTimeout[int](ch, 2, 25*time.Millisecond) + is.Equal([]int{3, 4}, items4) + is.Equal(2, length4) + + items5, length5 := BatchWithTimeout[int](ch, 3, 25*time.Millisecond) + is.Equal([]int{}, items5) + is.Equal(0, length5) +}