Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add FanOut() and rename ChannelMerge() to FanIn() #262

Merged
merged 2 commits into from Nov 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 16 additions & 4 deletions README.md
Expand Up @@ -161,7 +161,8 @@ Supported helpers for channels:
- [Generator](#generator)
- [Batch](#batch)
- [BatchWithTimeout](#batchwithtimeout)
- [ChannelMerge](#channelmerge)
- [FanIn](#fanin)
- [FanOut](#fanout)

Supported intersection helpers:

Expand Down Expand Up @@ -1535,16 +1536,27 @@ for i := range children {
}
```

### ChannelMerge
### FanIn

Collects messages from multiple input channels into a single buffered channel. Output messages has no priority.
Collects messages from multiple input channels into a single buffered channel. Output messages has no priority. When all upstream channels reach EOF, downstream channel closes.

```go
stream1 := make(chan int, 42)
stream2 := make(chan int, 42)
stream3 := make(chan int, 42)

all := lo.ChannelMerge(100, stream1, stream2, stream3)
all := lo.FanIn(100, stream1, stream2, stream3)
```

### FanOut

Broadcasts all the upstream messages to multiple downstream channels. When upstream channel reach EOF, downstream channels close. If any downstream channels is full, broadcasting is paused.

```go
stream := make(chan int, 42)

all := lo.FanOut(5, 42, stream)
// [5]<-chan int
```

### Contains
Expand Down
35 changes: 32 additions & 3 deletions channel.go
Expand Up @@ -239,9 +239,9 @@ func BatchWithTimeout[T any](ch <-chan T, size int, timeout time.Duration) (coll
return buffer, index, time.Since(now), true
}

// ChannelMerge collects messages from multiple input channels into a single buffered channel.
// Output messages has no priority.
func ChannelMerge[T any](channelBufferCap int, upstreams ...<-chan T) <-chan T {
// FanIn collects messages from multiple input channels into a single buffered channel.
// Output messages has no priority. When all upstream channels reach EOF, downstream channel closes.
func FanIn[T any](channelBufferCap int, upstreams ...<-chan T) <-chan T {
out := make(chan T, channelBufferCap)
var wg sync.WaitGroup

Expand All @@ -263,3 +263,32 @@ func ChannelMerge[T any](channelBufferCap int, upstreams ...<-chan T) <-chan T {
}()
return out
}

// ChannelMerge collects messages from multiple input channels into a single buffered channel.
// Output messages has no priority. When all upstream channels reach EOF, downstream channel closes.
// Deprecated: Use lo.FanIn instead.
func ChannelMerge[T any](channelBufferCap int, upstreams ...<-chan T) <-chan T {
return FanIn(channelBufferCap, upstreams...)
}

// FanOut broadcasts all the upstream messages to multiple downstream channels.
// When upstream channel reach EOF, downstream channels close. If any downstream
// channels is full, broadcasting is paused.
func FanOut[T any](count int, channelsBufferCap int, upstream <-chan T) []<-chan T {
downstreams := createChannels[T](count, channelsBufferCap)

go func() {
for msg := range upstream {
for i := range downstreams {
downstreams[i] <- msg
}
}

// Close out once all the output goroutines are done.
for i := range downstreams {
close(downstreams[i])
}
}()

return channelsToReadOnly(downstreams)
}
35 changes: 33 additions & 2 deletions channel_test.go
Expand Up @@ -318,7 +318,7 @@ func TestBatchWithTimeout(t *testing.T) {
is.False(ok5)
}

func TestChannelMerge(t *testing.T) {
func TestFanIn(t *testing.T) {
t.Parallel()
testWithTimeout(t, 100*time.Millisecond)
is := assert.New(t)
Expand All @@ -332,7 +332,7 @@ func TestChannelMerge(t *testing.T) {
close(upstreams[i])
}(i)
}
out := ChannelMerge(10, roupstreams...)
out := FanIn(10, roupstreams...)
time.Sleep(10 * time.Millisecond)

// check input channels
Expand All @@ -357,3 +357,34 @@ func TestChannelMerge(t *testing.T) {
is.Equal(false, ok0)
is.Equal(0, msg0)
}

func TestFanOut(t *testing.T) {
t.Parallel()
testWithTimeout(t, 100*time.Millisecond)
is := assert.New(t)

upstream := SliceToChannel(10, []int{0, 1, 2, 3, 4, 5})
rodownstreams := FanOut(3, 10, upstream)

time.Sleep(10 * time.Millisecond)

// check output channels
is.Equal(3, len(rodownstreams))

// check channels allocation
for i := range rodownstreams {
is.Equal(6, len(rodownstreams[i]))
is.Equal(10, cap(rodownstreams[i]))
is.Equal([]int{0, 1, 2, 3, 4, 5}, ChannelToSlice(rodownstreams[i]))
}

// check it is closed
time.Sleep(10 * time.Millisecond)

// check channels allocation
for i := range rodownstreams {
msg, ok := <-rodownstreams[i]
is.Equal(false, ok)
is.Equal(0, msg)
}
}