
Error in NewRelic on publishing to kafka in Python #1045

Open
mishikaraj opened this issue Jan 24, 2024 · 3 comments

@mishikaraj

Description
While publishing a message to Apache Kafka, I get an error from New Relic's message transaction instrumentation: 'MessageTransaction' object has no attribute 'destination_name'.

Code

import json
from kafka import KafkaProducer

# bootstrap_servers, topic, and event_data are defined elsewhere in the application
producer = KafkaProducer(
    bootstrap_servers=bootstrap_servers,
    value_serializer=lambda x: json.dumps(x).encode("utf-8"),
)
producer.send(topic, value=event_data).get()

Error Stack Trace

File "/Users/mishika/supply-metrics/app/kafka_manager/base_consumer_v2.py", line 55, in produce
    self.producer.send(topic, value=event_data, key=key).get()
  File "/Users/mishika/miniconda3/envs/myenv/lib/python3.10/site-packages/newrelic/hooks/messagebroker_kafkapython.py", line 68, in wrap_KafkaProducer_send
    return wrapped(
  File "/Users/mishika/miniconda3/envs/myenv/lib/python3.10/site-packages/kafka/producer/kafka.py", line 581, in send
    value_bytes = self._serialize(
  File "/Users/mishika/miniconda3/envs/myenv/lib/python3.10/site-packages/kafka/producer/kafka.py", line 714, in _serialize
    return f(data)
  File "/Users/mishika/miniconda3/envs/myenv/lib/python3.10/site-packages/newrelic/hooks/messagebroker_kafkapython.py", line 203, in _wrap_serializer
    topic = transaction.destination_name
AttributeError: 'MessageTransaction' object has no attribute 'destination_name'
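
Reading the traceback (an inference, not confirmed elsewhere in this report), the agent's serializer wrapper reads transaction.destination_name, so the error would surface whenever send() runs while a consumer-initiated MessageTransaction is the current transaction. A minimal sketch of that pattern, reusing the producer above and assuming an instrumented kafka.KafkaConsumer named consumer:

# Sketch only: the pattern implied by the traceback, assuming the New Relic
# agent has instrumented both the consumer iterator and the producer.
for record in consumer:  # each iteration runs inside a MessageTransaction
    # send() invokes the wrapped value_serializer, which reads
    # transaction.destination_name and raises the AttributeError shown above
    producer.send(topic, value=record.value).get()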

newrelic.ini file

[newrelic]
log_level = debug
high_security = false
transaction_tracer.enabled = true
transaction_tracer.transaction_threshold = apdex_f
transaction_tracer.record_sql = obfuscated
transaction_tracer.stack_trace_threshold = 0.5
transaction_tracer.explain_enabled = true
transaction_tracer.explain_threshold = 0.5
transaction_tracer.function_trace =
error_collector.enabled = true
error_collector.ignore_errors = pycommon.exceptions.common_exceptions:DuplicateError rest_framework.exceptions:ValidationError rest_framework.exceptions:NotFound rest_framework.exceptions:ParseError
browser_monitoring.auto_instrument = true
thread_profiler.enabled = true
distributed_tracing.enabled = false
app_name = 
monitor_mode = true
license_key =  

Expected Behavior
The message should be sent to the Kafka topic without the New Relic instrumentation raising an error.

Steps to Reproduce

Your Environment

  • Application Python version : 3.10
  • kafka-python==2.0.2
  • newrelic==9.4.0
@lrafeei
Contributor

lrafeei commented Jan 24, 2024

Hi @mishikaraj -- out of curiosity, how are you setting up the consumer? My concern with the proposed solution is that some other issue is being masked; destination_name should be set to the topic from the ConsumerRecord (which is returned by the consumer iterator).

This is the sample app I am using, so I want to make sure I am not missing an obvious implementation:

import json
import time
from datetime import datetime
from threading import Thread

import kafka

TOPIC = "test-topic-%d" % datetime.now().timestamp()
BROKERS = ["localhost:9092"]


consumer = kafka.KafkaConsumer(
    TOPIC,
    bootstrap_servers=BROKERS,
    client_id="whatsup",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    auto_offset_reset="earliest",
    consumer_timeout_ms=5000,
    fetch_max_wait_ms=304999,
)
producer = kafka.KafkaProducer(
    bootstrap_servers=BROKERS, value_serializer=lambda v: json.dumps(v).encode("utf-8")
)

def consume():
    print("Starting consumer...")
    for message in consumer:
        print(f"Recieved {message.value}")
    print("Consumer finished.")

def produce():
    print("Starting producer...")
    for json_message in [
        {"foo": "bar"},
        {"baz": "bat"},
        {"user1": "Hello!"},
        {"user2": "Hola!"},
    ]:  
        time.sleep(1)
        producer.send(TOPIC, value=json_message).get()
    producer.flush()
    print("Producer finished.")

def main():
    t1 = Thread(target=produce)
    t2 = Thread(target=consume)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print("Finished.")
    consumer.close()
    producer.close()


if __name__ == "__main__":
    main()

@mishikaraj
Author

Hi @lrafeei, as a temporary fix I disabled the Kafka producer instrumentation in New Relic by adding the section below to the newrelic.ini file, and I was then able to send the message to Kafka:

[import-hook:kafka.producer.kafka]
enabled = false

Stack Overflow link
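
A quick way to confirm the producer path works with the hook disabled (a sketch, reusing the producer, topic, and event_data from the snippet above):

metadata = producer.send(topic, value=event_data).get(timeout=10)
# RecordMetadata carries the topic, partition, and offset of the written message
print(metadata.topic, metadata.partition, metadata.offset)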

And to answer your question above: yes, I have initialized my consumer in the same way:

self.consumer = KafkaConsumer(
    self.topic,
    group_id=self.group_id,
    bootstrap_servers=bootstrap_servers,
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    enable_auto_commit=False,
    auto_offset_reset="latest",
    max_poll_records=500,
    **consumer_kwargs,
)

Please let me know if you need anything else from me, and what fix we should look at so that the Kafka producer transaction can be re-enabled in New Relic.

@lrafeei
Contributor

lrafeei commented Mar 28, 2024

Sorry about the delay, I thought I'd get an alert when you replied! Looking at this now.
