diff --git a/async_producer.go b/async_producer.go index f1ffc8f92..209fd2d34 100644 --- a/async_producer.go +++ b/async_producer.go @@ -267,6 +267,10 @@ func (pe ProducerError) Error() string { return fmt.Sprintf("kafka: Failed to produce message to topic %s: %s", pe.Msg.Topic, pe.Err) } +func (pe ProducerError) Unwrap() error { + return pe.Err +} + // ProducerErrors is a type that wraps a batch of "ProducerError"s and implements the Error interface. // It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel // when closing a producer. diff --git a/consumer.go b/consumer.go index 5d9922344..fbdbff23b 100644 --- a/consumer.go +++ b/consumer.go @@ -35,6 +35,10 @@ func (ce ConsumerError) Error() string { return fmt.Sprintf("kafka: error while consuming %s/%d: %s", ce.Topic, ce.Partition, ce.Err) } +func (ce ConsumerError) Unwrap() error { + return ce.Err +} + // ConsumerErrors is a type that wraps a batch of errors and implements the Error interface. // It can be returned from the PartitionConsumer's Close methods to avoid the need to manually drain errors // when stopping.