Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New helper collection: channel #95

Merged
merged 5 commits into from Oct 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Expand Up @@ -2,6 +2,15 @@

@samber: I sometimes forget to update this file. Ping me on [Twitter](https://twitter.com/samuelberthe) or open an issue in case of error. We need to keep a clear changelog for easier lib upgrade.

## 1.31.0 (2022-10-06)

Adding:

- lo.SliceToChannel
- lo.Generator
- lo.Batch
- lo.BatchWithTimeout

## 1.30.1 (2022-10-06)

Fix:
Expand Down
132 changes: 132 additions & 0 deletions README.md
Expand Up @@ -135,6 +135,10 @@ Supported helpers for tuples:
Supported helpers for channels:

- [ChannelDispatcher](#channeldispatcher)
- [SliceToChannel](#slicetochannel)
- [Generator](#generator)
- [Batch](#batch)
- [BatchWithTimeout](#batchwithtimeout)

Supported intersection helpers:

Expand Down Expand Up @@ -1262,6 +1266,134 @@ children := lo.ChannelDispatcher(ch, 5, 10, customStrategy)
...
```

### SliceToChannel

Returns a read-only channels of collection elements. Channel is closed after last element. Channel capacity can be customized.

```go
list := []int{1, 2, 3, 4, 5}

for v := range lo.SliceToChannel(2, list) {
println(v)
}
// prints 1, then 2, then 3, then 4, then 5
```

### Generator

Implements the generator design pattern. Channel is closed after last element. Channel capacity can be customized.

```go
generator := func(yield func(int)) {
yield(1)
yield(2)
yield(3)
}

for v := range lo.Generator(2, generator) {
println(v)
}
// prints 1, then 2, then 3
```

### Batch

Creates a slice of n elements from a channel. Returns the slice, the slice length, the read time and the channel status (opened/closed).

```go
ch := lo.SliceToChannel(2, []int{1, 2, 3, 4, 5})

items1, length1, duration1, ok1 := lo.Batch(ch, 3)
// []int{1, 2, 3}, 3, 0s, true
items2, length2, duration2, ok2 := lo.Batch(ch, 3)
// []int{4, 5}, 2, 0s, false
```

Example: RabbitMQ consumer 👇

```go
ch := readFromQueue()

for {
// read 1k items
items, length, _, ok := lo.Batch(ch, 1000)

// do batching stuff

if !ok {
break
}
}
```

### BatchWithTimeout

Creates a slice of n elements from a channel, with timeout. Returns the slice, the slice length, the read time and the channel status (opened/closed).

```go
generator := func(yield func(int)) {
for i := 0; i < 5; i++ {
yield(i)
time.Sleep(35*time.Millisecond)
}
}

ch := lo.Generator(0, generator)

items1, length1, duration1, ok1 := lo.BatchWithTimeout(ch, 3, 100*time.Millisecond)
// []int{1, 2}, 2, 100ms, true
items2, length2, duration2, ok2 := lo.BatchWithTimeout(ch, 3, 100*time.Millisecond)
// []int{3, 4, 5}, 3, 75ms, true
items3, length3, duration2, ok3 := lo.BatchWithTimeout(ch, 3, 100*time.Millisecond)
// []int{}, 0, 10ms, false
```

Example: RabbitMQ consumer 👇

```go
ch := readFromQueue()

for {
// read 1k items
// wait up to 1 second
items, length, _, ok := lo.BatchWithTimeout(ch, 1000, 1*time.Second)

// do batching stuff

if !ok {
break
}
}
```

Example: Multithreaded RabbitMQ consumer 👇

```go
ch := readFromQueue()

// 5 workers
// prefetch 1k messages per worker
children := lo.ChannelDispatcher(ch, 5, 1000, DispatchingStrategyFirst[int])

consumer := func(c <-chan int) {
for {
// read 1k items
// wait up to 1 second
items, length, _, ok := lo.BatchWithTimeout(ch, 1000, 1*time.Second)

// do batching stuff

if !ok {
break
}
}
}

for i := range children {
go consumer(children[i])
}
```

### Contains

Returns true if an element is present in a collection.
Expand Down
77 changes: 77 additions & 0 deletions channel.go
Expand Up @@ -149,3 +149,80 @@ func DispatchingStrategyMost[T any](msg T, index uint64, channels []<-chan T) in
return len(channels[item]) > len(channels[max]) && channelIsNotFull(channels[item])
})
}

// SliceToChannel returns a read-only channels of collection elements.
func SliceToChannel[T any](bufferSize int, collection []T) <-chan T {
ch := make(chan T, bufferSize)

go func() {
for _, item := range collection {
ch <- item
}

close(ch)
}()

return ch
}

// Generator implements the generator design pattern.
func Generator[T any](bufferSize int, generator func(yield func(T))) <-chan T {
ch := make(chan T, bufferSize)

go func() {
// WARNING: infinite loop
generator(func(t T) {
ch <- t
})

close(ch)
}()

return ch
}

// Batch creates a slice of n elements from a channel. Returns the slice and the slice length.
// @TODO: we should probaby provide an helper that reuse the same buffer.
func Batch[T any](ch <-chan T, size int) (collection []T, length int, readTime time.Duration, ok bool) {
buffer := make([]T, 0, size)
index := 0
now := time.Now()

for ; index < size; index++ {
item, ok := <-ch
if !ok {
return buffer, index, time.Since(now), false
}

buffer = append(buffer, item)
}

return buffer, index, time.Since(now), true
}

// BatchWithTimeout creates a slice of n elements from a channel, with timeout. Returns the slice and the slice length.
// @TODO: we should probaby provide an helper that reuse the same buffer.
func BatchWithTimeout[T any](ch <-chan T, size int, timeout time.Duration) (collection []T, length int, readTime time.Duration, ok bool) {
expire := time.NewTimer(timeout)
defer expire.Stop()

buffer := make([]T, 0, size)
index := 0
now := time.Now()

for ; index < size; index++ {
select {
case item, ok := <-ch:
if !ok {
return buffer, index, time.Since(now), false
}

buffer = append(buffer, item)

case <-expire.C:
return buffer, index, time.Since(now), true
}
}

return buffer, index, time.Since(now), true
}
104 changes: 104 additions & 0 deletions channel_test.go
Expand Up @@ -187,3 +187,107 @@ func TestDispatchingStrategyMost(t *testing.T) {
children[1] <- 1
is.Equal(0, DispatchingStrategyMost(42, 0, rochildren))
}

func TestSliceToChannel(t *testing.T) {
t.Parallel()
testWithTimeout(t, 10*time.Millisecond)
is := assert.New(t)

ch := SliceToChannel[int](2, []int{1, 2, 3})

r1, ok1 := <-ch
r2, ok2 := <-ch
r3, ok3 := <-ch
is.True(ok1)
is.Equal(1, r1)
is.True(ok2)
is.Equal(2, r2)
is.True(ok3)
is.Equal(3, r3)

_, ok4 := <-ch
is.False(ok4)
}

func TestGenerate(t *testing.T) {
t.Parallel()
testWithTimeout(t, 10*time.Millisecond)
is := assert.New(t)

generator := func(yield func(int)) {
yield(0)
yield(1)
yield(2)
yield(3)
}

i := 0

for v := range Generator(2, generator) {
is.Equal(i, v)
i++
}

is.Equal(i, 4)
}

func TestBatch(t *testing.T) {
t.Parallel()
testWithTimeout(t, 10*time.Millisecond)
is := assert.New(t)

ch := SliceToChannel(2, []int{1, 2, 3})

items1, length1, _, ok1 := Batch(ch, 2)
items2, length2, _, ok2 := Batch(ch, 2)
items3, length3, _, ok3 := Batch(ch, 2)

is.Equal([]int{1, 2}, items1)
is.Equal(2, length1)
is.True(ok1)
is.Equal([]int{3}, items2)
is.Equal(1, length2)
is.False(ok2)
is.Equal([]int{}, items3)
is.Equal(0, length3)
is.False(ok3)
}

func TestBatchWithTimeout(t *testing.T) {
t.Parallel()
testWithTimeout(t, 200*time.Millisecond)
is := assert.New(t)

generator := func(yield func(int)) {
for i := 0; i < 5; i++ {
yield(i)
time.Sleep(10 * time.Millisecond)
}
}
ch := Generator(0, generator)

items1, length1, _, ok1 := BatchWithTimeout(ch, 20, 15*time.Millisecond)
is.Equal([]int{0, 1}, items1)
is.Equal(2, length1)
is.True(ok1)

items2, length2, _, ok2 := BatchWithTimeout(ch, 20, 2*time.Millisecond)
is.Equal([]int{}, items2)
is.Equal(0, length2)
is.True(ok2)

items3, length3, _, ok3 := BatchWithTimeout(ch, 1, 30*time.Millisecond)
is.Equal([]int{2}, items3)
is.Equal(1, length3)
is.True(ok3)

items4, length4, _, ok4 := BatchWithTimeout(ch, 2, 25*time.Millisecond)
is.Equal([]int{3, 4}, items4)
is.Equal(2, length4)
is.True(ok4)

items5, length5, _, ok5 := BatchWithTimeout(ch, 3, 25*time.Millisecond)
is.Equal([]int{}, items5)
is.Equal(0, length5)
is.False(ok5)
}