From eead7a7eea559050da0b3c92af895659f0f36912 Mon Sep 17 00:00:00 2001 From: Timothy Pansino <11214426+TimPansino@users.noreply.github.com> Date: Wed, 23 Nov 2022 17:43:02 -0800 Subject: [PATCH] Fix Confluent Kafka Producer Arguments (#699) * Add confluentkafka test for posargs/kwargs * Fix confluent kafka topic argument bug * More sensible producer arguments --- .../hooks/messagebroker_confluentkafka.py | 2 +- .../test_producer.py | 54 ++++++++++++++----- 2 files changed, 43 insertions(+), 13 deletions(-) diff --git a/newrelic/hooks/messagebroker_confluentkafka.py b/newrelic/hooks/messagebroker_confluentkafka.py index e735b8ade..81d9fa59a 100644 --- a/newrelic/hooks/messagebroker_confluentkafka.py +++ b/newrelic/hooks/messagebroker_confluentkafka.py @@ -55,7 +55,7 @@ def wrap_Producer_produce(wrapped, instance, args, kwargs): topic = args[0] args = args[1:] else: - topic = kwargs.get("topic", None) + topic = kwargs.pop("topic", None) transaction.add_messagebroker_info("Confluent-Kafka", get_package_version("confluent-kafka")) diff --git a/tests/messagebroker_confluentkafka/test_producer.py b/tests/messagebroker_confluentkafka/test_producer.py index b5dcff020..2b3e74e7a 100644 --- a/tests/messagebroker_confluentkafka/test_producer.py +++ b/tests/messagebroker_confluentkafka/test_producer.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import time import threading import pytest @@ -36,34 +37,63 @@ ) @background_task() def test_produce_arguments(topic, producer, client_type, serialize, headers): - callback_called = threading.Event() + callback1_called = threading.Event() + callback2_called = threading.Event() + ts = int(time.time()) - def producer_callback(err, msg): - callback_called.set() + def producer_callback1(err, msg): + callback1_called.set() + + def producer_callback2(err, msg): + callback2_called.set() if client_type == "cimpl": + # Keyword Args producer.produce( - topic, + topic=topic, value=serialize({"foo": 1}), key=serialize("my-key"), - callback=producer_callback, - partition=1, - timestamp=1, + partition=0, + callback=producer_callback2, + timestamp=ts, headers=headers, ) - else: + # Positional Args producer.produce( topic, + serialize({"foo": 1}), + serialize("my-key"), + 0, + producer_callback1, + None, + ts, + headers, + ) + else: + # Keyword Args + producer.produce( + topic=topic, value=serialize({"foo": 1}), key=serialize("my-key"), - partition=1, - on_delivery=producer_callback, - timestamp=1, + partition=0, + on_delivery=producer_callback2, + timestamp=ts, headers=headers, ) + # Positional Args + producer.produce( + topic, + serialize("my-key"), + serialize({"foo": 1}), + 0, + producer_callback1, + ts, + headers, + ) producer.flush() - assert callback_called.wait(5), "Callback never called." + assert callback1_called.wait(5), "Callback never called." + assert callback2_called.wait(5), "Callback never called." def test_trace_metrics(topic, send_producer_message):