newrelic / newrelic-python-agent

New Relic Python Agent
https://docs.newrelic.com/docs/agents/python-agent
Apache License 2.0

Error in NewRelic on publishing to kafka in Python #1045

Open mishikaraj opened 6 months ago

mishikaraj commented 6 months ago

Description

While publishing a message to Apache Kafka, I get an error from the New Relic message transaction: 'MessageTransaction' object has no attribute 'destination_name'.

Code

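import json

from kafka import KafkaProducer

# bootstrap_servers, topic, and event_data are defined elsewhere in the application.
producer = KafkaProducer(
    bootstrap_servers=bootstrap_servers,
    value_serializer=lambda x: json.dumps(x).encode("utf-8"),
)
producer.send(topic, value=event_data).get()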

Error Stack Trace

File "/Users/mishika/supply-metrics/app/kafka_manager/base_consumer_v2.py", line 55, in produce
    self.producer.send(topic, value=event_data, key=key).get()
  File "/Users/mishika/miniconda3/envs/myenv/lib/python3.10/site-packages/newrelic/hooks/messagebroker_kafkapython.py", line 68, in wrap_KafkaProducer_send
    return wrapped(
  File "/Users/mishika/miniconda3/envs/myenv/lib/python3.10/site-packages/kafka/producer/kafka.py", line 581, in send
    value_bytes = self._serialize(
  File "/Users/mishika/miniconda3/envs/myenv/lib/python3.10/site-packages/kafka/producer/kafka.py", line 714, in _serialize
    return f(data)
  File "/Users/mishika/miniconda3/envs/myenv/lib/python3.10/site-packages/newrelic/hooks/messagebroker_kafkapython.py", line 203, in _wrap_serializer
    topic = transaction.destination_name
AttributeError: 'MessageTransaction' object has no attribute 'destination_name'

newrelic.ini file

[newrelic]
log_level = debug
high_security = false
transaction_tracer.enabled = true
transaction_tracer.transaction_threshold = apdex_f
transaction_tracer.record_sql = obfuscated
transaction_tracer.stack_trace_threshold = 0.5
transaction_tracer.explain_enabled = true
transaction_tracer.explain_threshold = 0.5
transaction_tracer.function_trace =
error_collector.enabled = true
error_collector.ignore_errors = pycommon.exceptions.common_exceptions:DuplicateError rest_framework.exceptions:ValidationError rest_framework.exceptions:NotFound rest_framework.exceptions:ParseError
browser_monitoring.auto_instrument = true
thread_profiler.enabled = true
distributed_tracing.enabled = false
app_name = 
monitor_mode = true
license_key =  

Expected Behavior

The message should have been sent to the Kafka topic.

Steps to Reproduce

Your Environment

lrafeei commented 6 months ago

Hi @mishikaraj, out of curiosity, how are you setting up the consumer? My concern with the proposed solution is that it may mask some other issue: destination_name should be set to the topic from the ConsumerRecord (which is returned from the consumer iterator).
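To illustrate the masking concern, here is a minimal sketch. It uses a hypothetical stand-in class (`FakeMessageTransaction`) and hypothetical helper names, not the agent's real code; the only detail taken from the stack trace is the `transaction.destination_name` attribute access. A defensive lookup avoids the crash, but it also hides the fact that the attribute was never set:

```python
class FakeMessageTransaction:
    """Hypothetical stand-in for a transaction that lacks destination_name."""

    def __init__(self, name):
        self.name = name


def topic_strict(transaction):
    # Same style of attribute access as the failing line in the stack trace:
    # raises AttributeError when the attribute is missing.
    return transaction.destination_name


def topic_defensive(transaction, default="Unknown"):
    # Falls back to a default instead of raising. This stops the crash, but
    # the question of why the attribute was never set remains unanswered.
    return getattr(transaction, "destination_name", default)


if __name__ == "__main__":
    txn = FakeMessageTransaction("example")
    print(topic_defensive(txn))
    try:
        topic_strict(txn)
    except AttributeError as exc:
        print(f"strict lookup failed: {exc}")
```

Whether a fallback like this is appropriate depends on why the attribute is missing in the first place, which is what the consumer setup should help narrow down.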

This is the sample app I am using, so I want to make sure I am not missing an obvious implementation:

import json
import time
from datetime import datetime
from threading import Thread

import kafka

TOPIC = "test-topic-%d" % datetime.now().timestamp()
BROKERS = ["localhost:9092"]

consumer = kafka.KafkaConsumer(
    TOPIC,
    bootstrap_servers=BROKERS,
    client_id="whatsup",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    auto_offset_reset="earliest",
    consumer_timeout_ms=5000,
    fetch_max_wait_ms=304999,
)
producer = kafka.KafkaProducer(
    bootstrap_servers=BROKERS, value_serializer=lambda v: json.dumps(v).encode("utf-8")
)

def consume():
    print("Starting consumer...")
    for message in consumer:
        print(f"Received {message.value}")
    print("Consumer finished.")

def produce():
    print("Starting producer...")
    for json_message in [
        {"foo": "bar"},
        {"baz": "bat"},
        {"user1": "Hello!"},
        {"user2": "Hola!"},
    ]:  
        time.sleep(1)
        producer.send(TOPIC, value=json_message).get()
    producer.flush()
    print("Producer finished.")

def main():
    t1 = Thread(target=produce)
    t2 = Thread(target=consume)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print("Finished.")
    consumer.close()
    producer.close()

if __name__ == "__main__":
    main()

mishikaraj commented 6 months ago

Hi @lrafeei, as a temporary fix I disabled the New Relic instrumentation of the Kafka producer by adding the following to my newrelic.ini file. After that I was able to send the message to Kafka:

[import-hook:kafka.producer.kafka]
enabled = false
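For anyone applying the same workaround, one way to confirm that the override is really present in the configuration text is to parse it with Python's standard `configparser`. This is only a sketch: the helper name `producer_hook_disabled` and the sample text are made up for illustration, and it checks what the file says, not what the agent actually did at startup.

```python
import configparser

# Section name taken from the workaround above.
HOOK_SECTION = "import-hook:kafka.producer.kafka"

# Illustrative sample; a real newrelic.ini would contain more settings.
SAMPLE_INI = """\
[newrelic]
log_level = debug

[import-hook:kafka.producer.kafka]
enabled = false
"""


def producer_hook_disabled(ini_text):
    """Return True if the import-hook section exists and sets enabled = false."""
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(ini_text)
    if not parser.has_section(HOOK_SECTION):
        return False
    # Treat a missing "enabled" key as enabled.
    return not parser.getboolean(HOOK_SECTION, "enabled", fallback=True)


if __name__ == "__main__":
    print(producer_hook_disabled(SAMPLE_INI))
```

To check a file on disk, read its contents and pass the text to the helper.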

StackOverFlow Link

To answer your question above: yes, I initialize my consumer in the same way.

self.consumer = KafkaConsumer(
    self.topic,
    group_id=self.group_id,
    bootstrap_servers=bootstrap_servers,
    value_deserializer=lambda x: json.loads(x.decode("utf-8")),
    enable_auto_commit=False,
    auto_offset_reset="latest",
    max_poll_records=500,
    **consumer_kwargs,
)

Please let me know if you need anything else from me, and what fix we should look for so that we can re-enable the New Relic instrumentation of the Kafka producer.

lrafeei commented 4 months ago

Sorry about the delay, I thought I'd get an alert when you replied! Looking at this now.