faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/

Error when creating a topic using aiokafka 0.11.0 #630

Closed hgalytoby closed 2 months ago

hgalytoby commented 2 months ago

poetry add faust-streaming

import faust

class Greeting(faust.Record):
    from_name: str
    to_name: str

app = faust.App('hello-app', broker='kafka://localhost')
topic = app.topic('hello-topic', value_type=Greeting)

@app.agent(topic)
async def hello(greetings):
    async for greeting in greetings:
        print(f'Hello from {greeting.from_name} to {greeting.to_name}')

@app.timer(interval=1.0)
async def example_sender(app):
    await hello.send(
        value=Greeting(from_name='Faust', to_name='you'),
    )

if __name__ == '__main__':
    app.main()

error

AttributeError: 'CreateTopicsRequest_v1' object has no attribute 'build_request_header'

No problems with aiokafka 0.10.0

fonty422 commented 2 months ago

Do you need the Faust application to create the topic? Can you try to add the topic manually with the Kafka topic scripts and see if that helps?

hgalytoby commented 2 months ago

Usually I don't need Faust to create a topic myself; normally Faust just creates the leader topic for me.

I know there are other ways to create topics. I just ran into this problem and wanted to report it!

fonty422 commented 2 months ago

I have an issue with my applications where using Faust to create the topics often results in a topic not being created (something about the topic not being usable while it is being created), followed by the error message ignoring missing topic: . I think this happens because a large number of applications start at the same time and the CPU is heavily overloaded.

To get around this, I manually create the topics beforehand using kafka-topics.sh (or .bat if using Windows), which is part of your Kafka download and found in the bin directory.

For me, whenever I start my application I first run: kafka-topics.sh --create --if-not-exists --topic <topic name> --bootstrap-server=<address> --partitions=<partitions>, replacing topic name, address, and partitions with the appropriate values.

The --if-not-exists argument will only create the topic if it doesn't already exist.
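
The pre-creation step described above could be scripted so it runs before the Faust app starts. This is only a minimal sketch: it assumes kafka-topics.sh is on the PATH, the helper names build_create_topic_cmd and ensure_topic are hypothetical, and it uses only the flags quoted in the command above.

```python
import subprocess
from typing import List


def build_create_topic_cmd(
    topic: str,
    bootstrap_server: str,
    partitions: int,
    script: str = "kafka-topics.sh",  # assumption: the script is on PATH
) -> List[str]:
    """Build the argument list for the kafka-topics.sh call quoted above."""
    if partitions < 1:
        raise ValueError("partitions must be a positive integer")
    return [
        script,
        "--create",
        "--if-not-exists",  # no-op when the topic already exists
        "--topic", topic,
        f"--bootstrap-server={bootstrap_server}",
        f"--partitions={partitions}",
    ]


def ensure_topic(
    topic: str,
    bootstrap_server: str,
    partitions: int,
    script: str = "kafka-topics.sh",
) -> None:
    """Run the command and raise CalledProcessError if the script fails."""
    cmd = build_create_topic_cmd(topic, bootstrap_server, partitions, script)
    subprocess.run(cmd, check=True)
```

For example, calling ensure_topic('hello-topic', 'localhost:9092', 1) before app.main() would create the topic from the original report if it is missing; the address and partition count here are placeholder values, not ones given in this thread.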

jjwwiodyne commented 2 months ago

I'm seeing the same issue with the combination of faust 0.11.0 and aiokafka 0.11.0. Forcing aiokafka==0.10.0 fixes the issue.
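
Until a fix lands, an application could check the installed aiokafka release at startup and warn before Faust tries to create a topic. The following is a rough sketch, not part of Faust or aiokafka: the names REPORTED_BROKEN, parse_release, is_reported_broken, and installed_aiokafka_version are hypothetical, and only 0.11.0 is listed because that is the only release reported as failing in this thread.

```python
import itertools
from importlib.metadata import PackageNotFoundError, version
from typing import Optional, Tuple

# Assumption: only the release reported in this thread is listed here.
REPORTED_BROKEN = {(0, 11, 0)}


def parse_release(version_string: str) -> Tuple[int, ...]:
    """Turn '0.11.0' into (0, 11, 0), keeping only leading digits per part."""
    parts = []
    for piece in version_string.split(".")[:3]:
        leading = "".join(itertools.takewhile(str.isdigit, piece))
        if not leading:
            break
        parts.append(int(leading))
    return tuple(parts)


def is_reported_broken(version_string: str) -> bool:
    """True when the release matches one reported as failing in this thread."""
    return parse_release(version_string) in REPORTED_BROKEN


def installed_aiokafka_version() -> Optional[str]:
    """Return the installed aiokafka version, or None if it is not installed."""
    try:
        return version("aiokafka")
    except PackageNotFoundError:
        return None


found = installed_aiokafka_version()
if found is not None and is_reported_broken(found):
    print(f"aiokafka {found} is reported to break topic creation; "
          "consider pinning aiokafka==0.10.0")
```

The check is deliberately narrow: it does not guess whether later aiokafka releases are affected.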

somnam commented 2 months ago

Added a PR for this https://github.com/faust-streaming/faust/pull/631, can someone have a look?