Open dantodor opened 1 year ago
facing same issue here
import faust
from faust.types.auth import AuthProtocol, SASLMechanism
from aiokafka.helpers import create_ssl_context

broker_credentials = faust.SASLCredentials(
    mechanism=SASLMechanism.PLAIN,
    ssl_context=create_ssl_context(),
    username='XXX',
    password='XXXX'
)
broker_credentials.protocol = AuthProtocol.SASL_SSL

app = faust.App(
    "greetings",
    broker="kafka://XXXX.us-east-2.aws.confluent.cloud:9092",
    value_serializer="json",
    broker_credentials=broker_credentials
)

greetings_topic = app.topic('greetings', replicas=3)

@app.agent(greetings_topic)
async def greet(greetings):
    async for greeting in greetings:
        print(greeting)
ERROR : Cannot create topic: greetings-assignor-leader (44): Topic replication factor must be 3
A common issue is that your credentials lack permissions to create topics in your broker configuration. I suggest checking those and following up.
My application works with Confluent Cloud. Try setting topic_replication_factor to 3. https://faust.readthedocs.io/en/latest/userguide/settings.html#topic-replication-factor
If that does not work, try using this trick:
import os
import faust

class MyTopic(faust.Topic):
    """
    A workaround to change the replication factor
    https://github.com/faust-streaming/faust/issues/76
    """
    def __init__(self, *args, **kwargs):
        # Take the replication factor from the environment (defaults to 1).
        kwargs["replicas"] = int(os.environ.get("TOPIC_REPLICATION_FACTOR", 1))
        super().__init__(*args, **kwargs)
and instantiate your topic from MyTopic. You can set the env var TOPIC_REPLICATION_FACTOR=3.
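A minimal sketch of how the environment lookup in that workaround could be made a bit more defensive. The variable name TOPIC_REPLICATION_FACTOR comes from the comment above; the helper name replication_factor_from_env is hypothetical and not part of Faust.

```python
import os


def replication_factor_from_env(default: int = 1) -> int:
    """Read TOPIC_REPLICATION_FACTOR from the environment.

    Hypothetical helper (not a Faust API): falls back to `default` when the
    variable is unset or empty, and rejects values below 1.
    """
    raw = os.environ.get("TOPIC_REPLICATION_FACTOR", "").strip()
    if not raw:
        return default
    value = int(raw)  # raises ValueError if the value is not an integer
    if value < 1:
        raise ValueError("TOPIC_REPLICATION_FACTOR must be at least 1")
    return value
```

Inside MyTopic.__init__ the result could then be assigned with kwargs["replicas"] = replication_factor_from_env(), so a typo in the variable fails loudly instead of silently producing an invalid topic configuration.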
I got the idea from https://github.com/faust-streaming/faust/issues/76 (thx @ihor-rud )
Thanks @daigotanaka! I'll go ahead and close this.
I'm a little bit perplexed by the closing of this issue without a proper analysis.
From my personal experience, not too many people give full rights to a consumer, especially on Confluent Cloud. You know, the minimum permission principle. I fail to understand why a consumer should be able to create topics in order to read from a topic.
Long story short, the solution is to set topic_allow_declare to false, because it defaults to true and probably tries to create some utility topics in the Kafka cluster, an action which would obviously fail if you gave only consume permissions.
@dantodor I feel I'm still new to Faust, but I see this in the doc:
This setting disables the creation of internal topics. Faust will only create topics that it considers to be fully owned and managed, such as intermediate repartition topics, table changelog topics etc. Some Kafka managers does not allow services to create topics, in that case you should set this to False. https://faust-streaming.github.io/faust/userguide/settings.html#topic-allow-declare
This expectation seems reasonable to me as Faust needs to create internal topics to persist the state in case of failure and leader/partition assignment changes, etc.
I ran into the same error. Setting topic_allow_declare to False worked, but resulted in a warning about the missing leader topic.
Setting topic_disable_leader to True worked, and silenced the warning:
app = faust.App(
    "hello-world",
    broker="xxxxxxxxxxx:9092",
    value_serializer="raw",
    broker_credentials=broker_credentials,
    topic_disable_leader=True)
I realize this is just a workaround (see @daigotanaka's comment), but it allowed me to continue with the tutorials on Confluent Cloud.
At the moment, I'm using a Confluent Cloud Basic cluster. If I'm reading this thread and the Confluent Cloud docs correctly, the settings that enable internal topic creation aren't available to Basic and Standard clusters:
You cannot edit cluster settings on Confluent Cloud on Basic or Standard clusters
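A rough sketch of how the settings mentioned so far in this thread might be chosen depending on whether the credentials are allowed to create topics. The setting names (topic_allow_declare, topic_disable_leader, topic_replication_factor) are the ones discussed above; the helper confluent_app_settings and its can_create_topics flag are hypothetical, and the default of 3 is an assumption taken from the error message in this issue.

```python
from typing import Any, Dict


def confluent_app_settings(
    can_create_topics: bool, replication_factor: int = 3
) -> Dict[str, Any]:
    """Pick keyword arguments for faust.App (hypothetical helper).

    - With topic-creation rights: let Faust create its internal topics,
      using the replication factor the cluster policy requires.
    - Without them: stop Faust from declaring topics and skip the
      leader topic, as described in the comments above.
    """
    if can_create_topics:
        return {"topic_replication_factor": replication_factor}
    return {"topic_allow_declare": False, "topic_disable_leader": True}


# Usage sketch (assumes faust is installed and broker_credentials exists):
# app = faust.App(
#     "hello-world",
#     broker="xxxxxxxxxxx:9092",
#     broker_credentials=broker_credentials,
#     **confluent_app_settings(can_create_topics=False),
# )
```

Keeping the choice in one place makes it easier to switch once the account gets ACLs that permit topic creation.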
I forget why I closed this... but I'll reopen it and try to add this solution into #418. It only makes sense to add these settings for Confluent Cloud brokers.
As mentioned by @wbarnha above, this issue happens when the credentials used are missing ACLs or when there are cluster policy violations.
Here are 2 possible solutions:
Looking specifically at the error above, it seems there is a cluster policy violation: the topic must be created with a replication factor of 3. Option 1 proposed above could therefore fix the issue, because a replication factor of 3 can be set when creating the topic. In the case of option 2, TOPIC_REPLICATION_FACTOR must be set to 3.
I also had this issue using Confluent Cloud and I fixed it by setting topic_replication_factor=3 in the faust app initializer:
app = faust.App(
    app_name,
    broker=f"kafka://{config['bootstrap.servers']}",
    broker_credentials=broker_credentials,
    topic_replication_factor=3,  # <--- Set topic replication here
    web_port=web_port,
)
Not sure if this is recommended but it solved it for me and I didn't have to disable the creation of faust topics. I can see that the faust leader topics were created successfully in my confluent dashboard.
This needs to be set on the app itself and not the topic replicas field, because this is the setting that the faust app will use when creating these leader topics.
Checklist
master branch of Faust.
Steps to reproduce
I'm trying to use Faust to do some processing on Kafka messages in Confluent Cloud. Here's the example code I'm using:
Topic is created and has some messages, just trying to read them and print them out.
Expected behavior
I would expect the messages to be printed out in the console
Actual behavior
It tries for a few times, then stops with this message:
I also tried to set TOPIC_PARTITIONS=1 and TOPIC_REPLICATION_FACTOR=1.
What am I doing wrong here? Thank you!
Full traceback
Versions