Skip to content

Commit

Permalink
feat(channels): add ToChannel + Generator + Batch + BatchWithTimeout …
Browse files Browse the repository at this point in the history
…(WIP)
  • Loading branch information
samber committed Apr 14, 2022
1 parent 1dbdfc2 commit 2ebbcb6
Show file tree
Hide file tree
Showing 2 changed files with 170 additions and 0 deletions.
82 changes: 82 additions & 0 deletions channel.go
@@ -0,0 +1,82 @@
package lo

import (
"time"
)

// ToChannel returns a read-only channels of collection elements.
func ToChannel[T any](collection []T) <-chan T {
ch := make(chan T)

go func() {
for _, item := range collection {
ch <- item
}

close(ch)
}()

return ch
}

// Generator implements the generator design pattern.
func Generator[T any](bufferSize int, generator func(int64) T) <-chan T {
ch := make(chan T, bufferSize)

go func() {
var i int64 = 0

// WARNING: infinite loop
for {
ch <- generator(i)
i++
}

close(ch)
}()

return ch
}

// Batch creates a slice of n elements from a channel. Returns the slice and the slice length.
func Batch[T any](ch <-chan T, size int) (collection []T, length int) {
buffer := make([]T, 0, size)
index := 0

for ; index < size; index++ {
select {
case item, ok := <-ch:
if !ok {
return buffer, index
}

buffer = append(buffer, item)
}
}

return buffer, index
}

// BatchWithTimeout creates a slice of n elements from a channel, with timeout. Returns the slice and the slice length.
func BatchWithTimeout[T any](ch <-chan T, size int, timeout time.Duration) (collection []T, length int) {
expire := time.After(timeout)

buffer := make([]T, 0, size)
index := 0

for ; index < size; index++ {
select {
case item, ok := <-ch:
if !ok {
return buffer, index
}

buffer = append(buffer, item)

case <-expire:
return buffer, index
}
}

return buffer, index
}
88 changes: 88 additions & 0 deletions channel_test.go
@@ -0,0 +1,88 @@
package lo

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestToChannel(t *testing.T) {
is := assert.New(t)

ch := ToChannel[int]([]int{1, 2, 3})

r1, ok1 := <-ch
r2, ok2 := <-ch
r3, ok3 := <-ch
is.True(ok1)
is.Equal(1, r1)
is.True(ok2)
is.Equal(2, r2)
is.True(ok3)
is.Equal(3, r3)

_, ok4 := <-ch
is.False(ok4)
}

func TestGenerate(t *testing.T) {
// is := assert.New(t)

// next := func(i int64) int64 {
// return i
// }

// for item := range Generator[int64](10, next) {
// println(item)
// }
}

func TestBatch(t *testing.T) {
is := assert.New(t)

ch := ToChannel[int]([]int{1, 2, 3})

items1, length1 := Batch[int](ch, 2)
items2, length2 := Batch[int](ch, 2)
items3, length3 := Batch[int](ch, 2)

is.Equal([]int{1, 2}, items1)
is.Equal(2, length1)
is.Equal([]int{3}, items2)
is.Equal(1, length2)
is.Equal([]int{}, items3)
is.Equal(0, length3)
}

func TestBatchWithTimeout(t *testing.T) {
is := assert.New(t)

ch := make(chan int)
go func() {
for i := 0; i < 5; i++ {
ch <- i
time.Sleep(10 * time.Millisecond)
}
}()

items1, length1 := BatchWithTimeout[int](ch, 20, 15*time.Millisecond)
is.Equal([]int{0, 1}, items1)
is.Equal(2, length1)

items2, length2 := BatchWithTimeout[int](ch, 20, 2*time.Millisecond)
is.Equal([]int{}, items2)
is.Equal(0, length2)

items3, length3 := BatchWithTimeout[int](ch, 1, 30*time.Millisecond)
is.Equal([]int{2}, items3)
is.Equal(1, length3)

items4, length4 := BatchWithTimeout[int](ch, 2, 25*time.Millisecond)
is.Equal([]int{3, 4}, items4)
is.Equal(2, length4)

items5, length5 := BatchWithTimeout[int](ch, 3, 25*time.Millisecond)
is.Equal([]int{}, items5)
is.Equal(0, length5)
}

0 comments on commit 2ebbcb6

Please sign in to comment.