squeaky-pl / japronto

Screaming-fast Python 3.5+ HTTP toolkit integrated with pipelining HTTP server based on uvloop and picohttpparser.
MIT License
8.61k stars 581 forks

Using Python-Kafka with Japronto #49

Closed gwthm-in closed 7 years ago

gwthm-in commented 7 years ago

I really don't understand what is causing the issue here. I have initialised the producer as shown below, but when the handler is called, the request fails. The same code works fine when run in a terminal, and the same structure works fine in Sanic.

Any ideas or suggestions to fix this?

# General Settings
import json

# Web Server specific imports
from japronto import Application
from json import JSONDecodeError
from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=["35.154.159.4:9092", "35.154.159.4:9093", "35.154.159.4:9094"], retries=5, value_serializer=lambda m: json.dumps(m).encode('ascii'))

def visuG(request):
    print(producer.config)
    print(producer)
    jstore = request.json
    print(jstore)
    jevents = json.loads(jstore['e'])
    print(jevents)
    raw_data = {'topic': 'vnk-desd', 'routes': 'clst/desd', 'messages': jevents}
    f = producer.send("vnk-raw", raw_data)
    print(f)
    r = f.get(timeout=1)
    return request.Response(code=200,json=r)

app = Application()

# The Router instance lets you register your handlers and execute
# them depending on the url path and methods
r = app.router
r.add_route('/user-activity-poc', visuG, methods=["POST"])
r.add_route('/spcb', visuG, methods=["POST"])

# Finally start our server and handle requests until termination is
# requested. Enabling debug lets you see request logs and stack traces.
app.run(debug=True)

Error

Traceback (most recent call last):
  File "jap_server.py", line 22, in visuG
    r = f.get(timeout=1)
  File "/usr/local/lib/python3.6/site-packages/kafka/producer/future.py", line 60, in get
    "Timeout after waiting for %s secs." % timeout)
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Timeout after waiting for 1 secs.

I don't think it is because of the timeout, as I also tried 10 seconds and it still didn't work. Any ideas why it is failing?

imbolc commented 7 years ago

Sorry for the off-topic, but it's better to use the async version of the Kafka client here (https://github.com/aio-libs/aiokafka). About the error: does your Kafka-related code work without any framework?

gwthm-in commented 7 years ago

@imbolc

Yes, it works without any framework. I also recently found that it does not work reliably with Sanic: sometimes it works and sometimes it doesn't.

Thanks for suggesting it, I'll try that and get back to you.

gwthm-in commented 7 years ago

@imbolc I think the problem is with the event loops. Kafka uses its own loop, and so does Japronto.

imbolc commented 7 years ago

So, aiokafka works well?

gwthm-in commented 7 years ago

@imbolc I haven't tested it yet, as I don't have any benchmarks for aiokafka, but I'll certainly try it. Do you have any benchmarks?

gwthm-in commented 7 years ago

@squeaky-pl

Hey, I guess this might be the issue, though I'm really not sure. The problem is that async libraries use their own loops and Japronto uses its own loop. So I think the object created early is not reachable from within the server code.

I was testing it with Sanic and found the same issue. I debugged a little there and, instead of the approach above, I created the producer before the server starts and attached it to the app object. Boom! Now it is accessible from all workers via app.producer.

I think Japronto needs some way to handle this: middleware, before-start and after-start hooks, or even a way to get or share the event loop. Meanwhile, it works like a charm with non-async modules.

Anyway thanks for such a great python beast! 👍

squeaky-pl commented 7 years ago

@Gowtham95india Getting many different event loops to cooperate is non-trivial (I don't know what the Kafka driver uses there). You can always overcome those issues with threads and thread executors, but this makes the code harder to follow. I plan on providing before/after fork hooks and custom worker subclasses to address those things, but I consider this "end-user sugar candy". While this is important to have, I first need to get the inner guts right, because once you start adding those convenience things on top, tweaking the inner guts will no longer be possible.
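A rough sketch of the thread-executor approach mentioned above, using only the standard library. `blocking_send` is a stand-in for a blocking call such as `producer.send(...).get(timeout=...)`, and `handler` is a hypothetical coroutine, not a Japronto API. It needs Python 3.7+ for `asyncio.get_running_loop`.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

# Small pool of worker threads reserved for blocking calls.
executor = ThreadPoolExecutor(max_workers=4)


def blocking_send(payload):
    """Stand-in for a blocking client call (assumption, not real Kafka)."""
    time.sleep(0.01)  # simulate waiting for the broker
    return {"sent": payload}


async def handler(payload):
    """Run the blocking call in a worker thread so the loop stays free."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, blocking_send, payload)
```

The event loop keeps serving other requests while the worker thread waits, at the cost of the extra indirection described above.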

gwthm-in commented 7 years ago

@squeaky-pl Yup, that's true. I agree with that.