Skip to content

Commit

Permalink
feat: return duration of Batch**** helpers
Browse files Browse the repository at this point in the history
  • Loading branch information
samber committed Oct 6, 2022
1 parent ba7adf6 commit eb38224
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 26 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Expand Up @@ -2,6 +2,15 @@

@samber: I sometimes forget to update this file. Ping me on [Twitter](https://twitter.com/samuelberthe) or open an issue in case of error. We need to keep a clear changelog for easier lib upgrade.

## 1.31.0 (2022-10-06)

Adding:

- lo.SliceToChannel
- lo.Generator
- lo.Batch
- lo.BatchWithTimeout

## 1.30.1 (2022-10-06)

Fix:
Expand Down
103 changes: 103 additions & 0 deletions README.md
Expand Up @@ -135,6 +135,10 @@ Supported helpers for tuples:
Supported helpers for channels:

- [ChannelDispatcher](#channeldispatcher)
- [SliceToChannel](#slicetochannel)
- [Generator](#generator)
- [Batch](#batch)
- [BatchWithTimeout](#batchwithtimeout)

Supported intersection helpers:

Expand Down Expand Up @@ -1262,6 +1266,105 @@ children := lo.ChannelDispatcher(ch, 5, 10, customStrategy)
...
```

### SliceToChannel

Returns a read-only channels of collection elements. Channel is closed after last element. Channel capacity can be customized.

```go
list := []int{1, 2, 3, 4, 5}

for v := range lo.SliceToChannel(2, list) {
println(v)
}
// prints 1, then 2, then 3, then 4, then 5
```

### Generator

Implements the generator design pattern. Channel is closed after last element. Channel capacity can be customized.

```go
generator := func(yield func(int)) {
yield(1)
yield(2)
yield(3)
}

for v := range lo.Generator(2, generator) {
println(v)
}
// prints 1, then 2, then 3
```

### Batch

Creates a slice of n elements from a channel. Returns the slice, the slice length, the read time and the channel status (opened/closed).

```go
ch := lo.SliceToChannel(2, []int{1, 2, 3, 4, 5})

items1, length1, duration1, ok1 := lo.Batch(ch, 3)
// []int{1, 2, 3}, 3, 0s, true
items2, length2, duration2, ok2 := lo.Batch(ch, 3)
// []int{4, 5}, 2, 0s, false
```

Example: RabbitMQ consumer 馃憞

```go
ch := readFromQueue()

for {
// read 1k items
items, length, _, ok := lo.Batch(ch, 1000)

// do batching stuff

if !ok {
break
}
}
```

### BatchWithTimeout

Creates a slice of n elements from a channel, with timeout. Returns the slice, the slice length, the read time and the channel status (opened/closed).

```go
generator := func(yield func(int)) {
for i := 0; i < 5; i++ {
yield(i)
time.Sleep(35*time.Millisecond)
}
}

ch := lo.Generator(0, generator)

items1, length1, duration1, ok1 := lo.BatchWithTimeout(ch, 3, 100*time.Millisecond)
// []int{1, 2}, 2, 100ms, true
items2, length2, duration2, ok2 := lo.BatchWithTimeout(ch, 3, 100*time.Millisecond)
// []int{3, 4, 5}, 3, 75ms, true
items3, length3, duration2, ok3 := lo.BatchWithTimeout(ch, 3, 100*time.Millisecond)
// []int{}, 0, 10ms, false
```

Example: RabbitMQ consumer 馃憞

```go
ch := readFromQueue()

for {
// read 1k items up to 1 second
items, length, _, ok := lo.BatchWithTimeout(ch, 1000, 1*time.Second)

// do batching stuff

if !ok {
break
}
}
```

### Contains

Returns true if an element is present in a collection.
Expand Down
20 changes: 11 additions & 9 deletions channel.go
Expand Up @@ -150,8 +150,8 @@ func DispatchingStrategyMost[T any](msg T, index uint64, channels []<-chan T) in
})
}

// ToChannel returns a read-only channels of collection elements.
func ToChannel[T any](bufferSize int, collection []T) <-chan T {
// SliceToChannel returns a read-only channels of collection elements.
func SliceToChannel[T any](bufferSize int, collection []T) <-chan T {
ch := make(chan T, bufferSize)

go func() {
Expand Down Expand Up @@ -182,42 +182,44 @@ func Generator[T any](bufferSize int, generator func(yield func(T))) <-chan T {
}

// Batch creates a slice of n elements from a channel. Returns the slice and the slice length.
func Batch[T any](ch <-chan T, size int) (collection []T, length int) {
func Batch[T any](ch <-chan T, size int) (collection []T, length int, readTime time.Duration, ok bool) {
buffer := make([]T, 0, size)
index := 0
now := time.Now()

for ; index < size; index++ {
item, ok := <-ch
if !ok {
return buffer, index
return buffer, index, time.Since(now), false
}

buffer = append(buffer, item)
}

return buffer, index
return buffer, index, time.Since(now), true
}

// BatchWithTimeout creates a slice of n elements from a channel, with timeout. Returns the slice and the slice length.
func BatchWithTimeout[T any](ch <-chan T, size int, timeout time.Duration) (collection []T, length int) {
func BatchWithTimeout[T any](ch <-chan T, size int, timeout time.Duration) (collection []T, length int, readTime time.Duration, ok bool) {
expire := time.After(timeout)

buffer := make([]T, 0, size)
index := 0
now := time.Now()

for ; index < size; index++ {
select {
case item, ok := <-ch:
if !ok {
return buffer, index
return buffer, index, time.Since(now), false
}

buffer = append(buffer, item)

case <-expire:
return buffer, index
return buffer, index, time.Since(now), true
}
}

return buffer, index
return buffer, index, time.Since(now), true
}
42 changes: 25 additions & 17 deletions channel_test.go
Expand Up @@ -188,12 +188,12 @@ func TestDispatchingStrategyMost(t *testing.T) {
is.Equal(0, DispatchingStrategyMost(42, 0, rochildren))
}

func TestToChannel(t *testing.T) {
func TestSliceToChannel(t *testing.T) {
t.Parallel()
testWithTimeout(t, 10*time.Millisecond)
is := assert.New(t)

ch := ToChannel[int](2, []int{1, 2, 3})
ch := SliceToChannel[int](2, []int{1, 2, 3})

r1, ok1 := <-ch
r2, ok2 := <-ch
Expand All @@ -214,7 +214,7 @@ func TestGenerate(t *testing.T) {
testWithTimeout(t, 10*time.Millisecond)
is := assert.New(t)

gen := func(yield func(int)) {
generator := func(yield func(int)) {
yield(0)
yield(1)
yield(2)
Expand All @@ -223,7 +223,7 @@ func TestGenerate(t *testing.T) {

i := 0

for v := range Generator(2, gen) {
for v := range Generator(2, generator) {
is.Equal(i, v)
i++
}
Expand All @@ -236,50 +236,58 @@ func TestBatch(t *testing.T) {
testWithTimeout(t, 10*time.Millisecond)
is := assert.New(t)

ch := ToChannel(2, []int{1, 2, 3})
ch := SliceToChannel(2, []int{1, 2, 3})

items1, length1 := Batch(ch, 2)
items2, length2 := Batch(ch, 2)
items3, length3 := Batch(ch, 2)
items1, length1, _, ok1 := Batch(ch, 2)
items2, length2, _, ok2 := Batch(ch, 2)
items3, length3, _, ok3 := Batch(ch, 2)

is.Equal([]int{1, 2}, items1)
is.Equal(2, length1)
is.True(ok1)
is.Equal([]int{3}, items2)
is.Equal(1, length2)
is.False(ok2)
is.Equal([]int{}, items3)
is.Equal(0, length3)
is.False(ok3)
}

func TestBatchWithTimeout(t *testing.T) {
t.Parallel()
testWithTimeout(t, 200*time.Millisecond)
is := assert.New(t)

ch := make(chan int)
go func() {
generator := func(yield func(int)) {
for i := 0; i < 5; i++ {
ch <- i
yield(i)
time.Sleep(10 * time.Millisecond)
}
}()
}
ch := Generator(0, generator)

items1, length1 := BatchWithTimeout(ch, 20, 15*time.Millisecond)
items1, length1, _, ok1 := BatchWithTimeout(ch, 20, 15*time.Millisecond)
is.Equal([]int{0, 1}, items1)
is.Equal(2, length1)
is.True(ok1)

items2, length2 := BatchWithTimeout(ch, 20, 2*time.Millisecond)
items2, length2, _, ok2 := BatchWithTimeout(ch, 20, 2*time.Millisecond)
is.Equal([]int{}, items2)
is.Equal(0, length2)
is.True(ok2)

items3, length3 := BatchWithTimeout(ch, 1, 30*time.Millisecond)
items3, length3, _, ok3 := BatchWithTimeout(ch, 1, 30*time.Millisecond)
is.Equal([]int{2}, items3)
is.Equal(1, length3)
is.True(ok3)

items4, length4 := BatchWithTimeout(ch, 2, 25*time.Millisecond)
items4, length4, _, ok4 := BatchWithTimeout(ch, 2, 25*time.Millisecond)
is.Equal([]int{3, 4}, items4)
is.Equal(2, length4)
is.True(ok4)

items5, length5 := BatchWithTimeout(ch, 3, 25*time.Millisecond)
items5, length5, _, ok5 := BatchWithTimeout(ch, 3, 25*time.Millisecond)
is.Equal([]int{}, items5)
is.Equal(0, length5)
is.False(ok5)
}

0 comments on commit eb38224

Please sign in to comment.