diff --git a/internal/impl/io/output_dynamic_fan_out.go b/internal/impl/io/output_dynamic_fan_out.go index 53dbee9bd..085a07369 100644 --- a/internal/impl/io/output_dynamic_fan_out.go +++ b/internal/impl/io/output_dynamic_fan_out.go @@ -257,6 +257,7 @@ func (d *dynamicFanOutOutputBroker) loop() { _ = atomic.AddInt64(&ackPending, 1) pendingResponses := int64(len(d.outputs)) + outputsLoop: for _, output := range d.outputs { select { case output.tsChan <- message.NewTransactionFunc(ts.Payload.ShallowCopy(), func(ctx context.Context, err error) error { @@ -273,7 +274,7 @@ func (d *dynamicFanOutOutputBroker) loop() { return nil }): case <-d.shutSig.CloseAtLeisureChan(): - break // This signal will be caught again in the next loop + break outputsLoop // This signal will be caught again in the next loop } } d.outputsMut.RUnlock()