From a731ccf8248977cc4d3e7f17388e215340f6736d Mon Sep 17 00:00:00 2001 From: Tim Pansino Date: Wed, 23 Nov 2022 21:33:42 +0000 Subject: [PATCH 1/3] Add confluentkafka test for posargs/kwargs --- .../test_producer.py | 44 +++++++++++++++---- 1 file changed, 36 insertions(+), 8 deletions(-) diff --git a/tests/messagebroker_confluentkafka/test_producer.py b/tests/messagebroker_confluentkafka/test_producer.py index b5dcff020..129b12ef0 100644 --- a/tests/messagebroker_confluentkafka/test_producer.py +++ b/tests/messagebroker_confluentkafka/test_producer.py @@ -36,34 +36,62 @@ ) @background_task() def test_produce_arguments(topic, producer, client_type, serialize, headers): - callback_called = threading.Event() + callback1_called = threading.Event() + callback2_called = threading.Event() - def producer_callback(err, msg): - callback_called.set() + def producer_callback1(err, msg): + callback1_called.set() + + def producer_callback2(err, msg): + callback2_called.set() if client_type == "cimpl": + # Keyword Args producer.produce( - topic, + topic=topic, value=serialize({"foo": 1}), key=serialize("my-key"), - callback=producer_callback, partition=1, + callback=producer_callback2, timestamp=1, headers=headers, ) - else: + # Positional Args producer.produce( topic, + serialize({"foo": 1}), + serialize("my-key"), + 1, + producer_callback1, + None, + 1, + headers, + ) + else: + # Keyword Args + producer.produce( + topic=topic, value=serialize({"foo": 1}), key=serialize("my-key"), partition=1, - on_delivery=producer_callback, + on_delivery=producer_callback2, timestamp=1, headers=headers, ) + # Positional Args + producer.produce( + topic, + serialize("my-key"), + serialize({"foo": 1}), + 1, + producer_callback1, + 1, + headers, + ) producer.flush() - assert callback_called.wait(5), "Callback never called." + assert callback1_called.wait(5), "Callback never called." + assert callback2_called.wait(5), "Callback never called." def test_trace_metrics(topic, send_producer_message): From a9197e049e1c1ddebf140a44342fe07a17980780 Mon Sep 17 00:00:00 2001 From: Tim Pansino Date: Wed, 23 Nov 2022 21:37:34 +0000 Subject: [PATCH 2/3] Fix confluent kafka topic argument bug --- newrelic/hooks/messagebroker_confluentkafka.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/newrelic/hooks/messagebroker_confluentkafka.py b/newrelic/hooks/messagebroker_confluentkafka.py index e735b8ade..81d9fa59a 100644 --- a/newrelic/hooks/messagebroker_confluentkafka.py +++ b/newrelic/hooks/messagebroker_confluentkafka.py @@ -55,7 +55,7 @@ def wrap_Producer_produce(wrapped, instance, args, kwargs): topic = args[0] args = args[1:] else: - topic = kwargs.get("topic", None) + topic = kwargs.pop("topic", None) transaction.add_messagebroker_info("Confluent-Kafka", get_package_version("confluent-kafka")) From 82ac999d1b6d12e675e98f13473f57854104a660 Mon Sep 17 00:00:00 2001 From: Tim Pansino Date: Thu, 24 Nov 2022 00:28:12 +0000 Subject: [PATCH 3/3] More sensible producer arguments --- .../test_producer.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/tests/messagebroker_confluentkafka/test_producer.py b/tests/messagebroker_confluentkafka/test_producer.py index 129b12ef0..2b3e74e7a 100644 --- a/tests/messagebroker_confluentkafka/test_producer.py +++ b/tests/messagebroker_confluentkafka/test_producer.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import time import threading import pytest @@ -38,6 +39,7 @@ def test_produce_arguments(topic, producer, client_type, serialize, headers): callback1_called = threading.Event() callback2_called = threading.Event() + ts = int(time.time()) def producer_callback1(err, msg): callback1_called.set() @@ -51,9 +53,9 @@ def producer_callback2(err, msg): topic=topic, value=serialize({"foo": 1}), key=serialize("my-key"), - partition=1, + partition=0, callback=producer_callback2, - timestamp=1, + timestamp=ts, headers=headers, ) # Positional Args @@ -61,10 +63,10 @@ def producer_callback2(err, msg): topic, serialize({"foo": 1}), serialize("my-key"), - 1, + 0, producer_callback1, None, - 1, + ts, headers, ) else: @@ -73,9 +75,9 @@ def producer_callback2(err, msg): topic=topic, value=serialize({"foo": 1}), key=serialize("my-key"), - partition=1, + partition=0, on_delivery=producer_callback2, - timestamp=1, + timestamp=ts, headers=headers, ) # Positional Args @@ -83,9 +85,9 @@ def producer_callback2(err, msg): topic, serialize("my-key"), serialize({"foo": 1}), - 1, + 0, producer_callback1, - 1, + ts, headers, ) producer.flush()