From 6230ffbaec9398ba04312427b804c1506fae91ad Mon Sep 17 00:00:00 2001 From: George Teo Date: Wed, 15 Aug 2018 01:38:00 -0700 Subject: [PATCH] Fix PartitionConsumer race condition on Close. (#258) * Use Sarama AsyncClose for Sarama Cluster AsyncClose * Async Close PartitionConsumer in Consumer.Close() --- consumer.go | 2 +- partitions.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/consumer.go b/consumer.go index e7a67da..8b4830e 100644 --- a/consumer.go +++ b/consumer.go @@ -295,7 +295,7 @@ func (c *Consumer) Close() (err error) { for range c.errors { } for p := range c.partitions { - _ = p.Close() + p.AsyncClose() } for range c.notifications { } diff --git a/partitions.go b/partitions.go index bfaa587..16bdc67 100644 --- a/partitions.go +++ b/partitions.go @@ -86,7 +86,7 @@ func (c *partitionConsumer) InitialOffset() int64 { return c.initialOffset } // AsyncClose implements PartitionConsumer func (c *partitionConsumer) AsyncClose() { c.closeOnce.Do(func() { - c.closeErr = c.PartitionConsumer.Close() + c.PartitionConsumer.AsyncClose() close(c.dying) }) }