faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/

Commit structure validation failure: TP != TopicPartition #539

Closed richardhundt closed 1 year ago

richardhundt commented 1 year ago

I found a puzzling issue.

Both aiokafka and faust define named tuples for (topic, partition), namely TopicPartition and TP respectively. The faust one doesn't satisfy the isinstance check in aiokafka's commit_structure_validate and so the stack trace below occurs. What I don't understand is how this ever worked. Why do I only see this now? Is this some unusual commit path?

Either way, a faust TP is not an aiokafka TopicPartition according to isinstance, so this must be a bug.
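The mismatch is easy to reproduce in isolation. A minimal sketch, using local stand-ins for faust's TP and aiokafka's TopicPartition (both are plain NamedTuples with the same fields):

```python
from typing import NamedTuple

class TP(NamedTuple):              # stand-in for faust.types.tuples.TP
    topic: str
    partition: int

class TopicPartition(NamedTuple):  # stand-in for kafka.structs.TopicPartition
    topic: str
    partition: int

tp = TP("test-events", 0)

# NamedTuples compare like plain tuples, by value, so the two types
# compare equal...
print(tp == TopicPartition("test-events", 0))  # True

# ...but isinstance checks the concrete class, which is what a check
# like aiokafka's commit_structure_validate relies on.
print(isinstance(tp, TopicPartition))          # False
```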

ERROR:faust.transport.drivers.aiokafka:[^--AIOKafkaConsumerThread]: Got exception: ValueError('Key should be TopicPartition instance')
Current assignment: {TP(topic='test-uploads', partition=0), TP(topic='test-dgtal-cases-documents-list-changelog', partition=0), TP(topic='test-dgtal.worker.agents.health_agent', partition=0), TP(topic='test-transactions', partition=0), TP(topic='dgtal-reply-to', partition=0), TP(topic='test-actions', partition=0), TP(topic='test-dgtal-documents-changelog', partition=0), TP(topic='test-__assignor-__leader', partition=0), TP(topic='test-documents', partition=0), TP(topic='test-events', partition=0), TP(topic='test-cases', partition=0), TP(topic='test-dgtal-transactions-changelog', partition=0), TP(topic='test-dgtal-cases-changelog', partition=0), TP(topic='test-cases-index', partition=0), TP(topic='test-dgtal-cases-transactions-list-changelog', partition=0)}, detail: {TP(topic='test-events', partition=0): OffsetAndMetadata(offset=3, metadata='')}
Traceback (most recent call last):
  File "/home/richard/Work/dgtal/.venv/lib/python3.10/site-packages/faust/transport/drivers/aiokafka.py", line 721, in _commit
    await consumer.commit(aiokafka_offsets)
  File "/home/richard/Work/dgtal/.venv/lib/python3.10/site-packages/aiokafka/consumer/consumer.py", line 562, in commit
    offsets = commit_structure_validate(offsets)
  File "/home/richard/Work/dgtal/.venv/lib/python3.10/site-packages/aiokafka/util.py", line 63, in commit_structure_validate
    raise ValueError("Key should be TopicPartition instance")
ValueError: Key should be TopicPartition instance
ERROR:faust.transport.drivers.aiokafka:[^--AIOKafkaConsumerThread]: Crashed reason=ValueError('Key should be TopicPartition instance')
Traceback (most recent call last):
  File "/home/richard/Work/dgtal/.venv/lib/python3.10/site-packages/faust/transport/drivers/aiokafka.py", line 721, in _commit
    await consumer.commit(aiokafka_offsets)
  File "/home/richard/Work/dgtal/.venv/lib/python3.10/site-packages/aiokafka/consumer/consumer.py", line 562, in commit
    offsets = commit_structure_validate(offsets)
  File "/home/richard/Work/dgtal/.venv/lib/python3.10/site-packages/aiokafka/util.py", line 63, in commit_structure_validate
    raise ValueError("Key should be TopicPartition instance")
ValueError: Key should be TopicPartition instance
richardhundt commented 1 year ago

If I change this https://github.com/faust-streaming/faust/blob/4a234204ee26bb28472a6640ccab286b807a3681/faust/transport/drivers/aiokafka.py#L715 from

            aiokafka_offsets = {
                tp: OffsetAndMetadata(offset, "")
                for tp, offset in offsets.items()
                if tp in self.assignment()
            }
            self.tp_last_committed_at.update({tp: now for tp in aiokafka_offsets})
            await consumer.commit(aiokafka_offsets)

to this:

            aiokafka_offsets = {
                TopicPartition(tp.topic, tp.partition): OffsetAndMetadata(offset, "")
                for tp, offset in offsets.items()
                if tp in self.assignment()
            }
            self.tp_last_committed_at.update({TP(tp.topic, tp.partition): now for tp in aiokafka_offsets})
            await consumer.commit(aiokafka_offsets)

it works.

wbarnha commented 1 year ago

Interesting, let's get a PR opened with these changes and talk about this further there. I've never seen this issue before, so I'm curious to know what code you're running that triggered this.

dada-engineer commented 1 year ago

@richardhundt as far as I can see, a consumer has the function _new_topicpartition, which should avoid exactly this for aiokafka drivers: https://github.com/faust-streaming/faust/blob/87a80a968f73220d5ac6190fb7df70b85427bdae/faust/transport/drivers/aiokafka.py#L258

But it doesn't seem to be used anywhere 🤔
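For what it's worth, the normalization that _new_topicpartition presumably exists for is a one-liner. A hypothetical sketch (the stand-in class and the helper name here are illustrative, not faust's actual API):

```python
from typing import NamedTuple

class TopicPartition(NamedTuple):  # stand-in for kafka.structs.TopicPartition
    topic: str
    partition: int

def ensure_aiokafka_tp(tp) -> TopicPartition:
    """Coerce any (topic, partition)-shaped named tuple to the aiokafka type."""
    if isinstance(tp, TopicPartition):
        return tp
    return TopicPartition(tp.topic, tp.partition)
```

Calling something like this on every key before handing the dict to consumer.commit would make the isinstance check in aiokafka a non-issue.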

richardhundt commented 1 year ago

@dabdada also, when it fetches the assignment, it makes sure they're not the aiokafka ones:

https://github.com/faust-streaming/faust/blob/4a234204ee26bb28472a6640ccab286b807a3681/faust/transport/consumer.py#L137

dada-engineer commented 1 year ago

I suspect that is simply for internal type usage.

Can you provide a minimal example of code that raises the error? Like @wbarnha, I haven't experienced anything like this and now wonder why 😁

richardhundt commented 1 year ago

@dabdada Are you saying that _commit is only for internal use?

https://github.com/faust-streaming/faust/blob/4a234204ee26bb28472a6640ccab286b807a3681/faust/transport/drivers/aiokafka.py#L711

Because you can just inspect it: it calls assignment, and assignment calls ensure_TPset, which gives you a set of faust's TP types. And there's definitely an isinstance assertion in aiokafka, so you simply cannot pass it anything other than aiokafka's TopicPartition types.

Surely seeing that _commit is broken doesn't require anything other than looking at it.

The real question is why has this ever worked? Does _commit not get called unless we're in some strange code path, because we're usually relying on Kafka's autocommit?

EDIT: also the stack trace proves my point ;) It actually logs the dictionary which causes the error:

Current assignment: {TP(topic='test-uploads', partition=0), TP(topic='test-dgtal-cases-documents-list-changelog', partition=0), TP(topic='test-dgtal.worker.agents.health_agent', partition=0), TP(topic='test-transactions', partition=0), TP(topic='dgtal

Those are TP instances, clearly not TopicPartition instances so you'd expect passing them to aiokafka to fail.

richardhundt commented 1 year ago

I've added comments inline to show what's going on:

# we're about to build a `Dict[TP, OffsetAndMetadata]`...
aiokafka_offsets = {
    tp: OffsetAndMetadata(offset, "")
    for tp, offset in offsets.items() # offsets is `Mapping[TP, int]`
    # if `assignment` returns `TP` instances (which it does)
    # then this builds a `Dict[TP, OffsetAndMetadata]`
    if tp in self.assignment()
}

# the following is okay, we can work with `TP` instances
self.tp_last_committed_at.update({tp: now for tp in aiokafka_offsets})

# however the following calls into `aiokafka` and passes the *same* `Dict[TP, OffsetAndMetadata]`.
# It cannot possibly be correct because aiokafka wants `TopicPartition` instances!
await consumer.commit(aiokafka_offsets) # BOOM!

EDIT: I'm now thinking that there are cases where the offsets: Mapping[TP, int] parameter is actually a Mapping[TopicPartition, int], but that doesn't explain why the if tp in self.assignment() filter works, because equality should fail if the tps are different types.

dada-engineer commented 1 year ago

I think I finally figured it out after digging deeper into faust-streaming.

First of all, yes, _commit is an internal function by pythonic convention (prefixed with an underscore). You should be careful when using it, although in this case its documentation is not optimal and the type hints suggest that faust's TP is used here. Secondly, the reason the in self.assignment() filter works is simply that faust.types.tuples.TP and kafka.structs.TopicPartition are NamedTuples with the same attributes, so their __eq__ compares them attribute by attribute, and those attributes are the same.
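The set-membership filter works for the same reason: NamedTuples hash and compare like plain tuples, by value, so `in` matches across the two types. A quick sketch with local stand-ins:

```python
from typing import NamedTuple

class TP(NamedTuple):              # stand-in for faust.types.tuples.TP
    topic: str
    partition: int

class TopicPartition(NamedTuple):  # stand-in for kafka.structs.TopicPartition
    topic: str
    partition: int

# What assignment() returns: a set of faust TPs.
assignment = {TP("test-events", 0)}

# Same tuple hash, same value-based __eq__, so the
# `if tp in self.assignment()` filter passes even when
# tp is an aiokafka TopicPartition.
print(TopicPartition("test-events", 0) in assignment)  # True
```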

Now for the initial question, how this all did work anyways:

  1. The aiokafka consumer started by faust naturally delivers messages with kafka.structs.TopicPartition info
  2. The Conductor defines an on_message consumer callback that is called on new messages and is handed the message from the aiokafka consumer (probably somewhere here: https://github.com/faust-streaming/faust/blob/87a80a968f73220d5ac6190fb7df70b85427bdae/faust/transport/conductor.py#L274C43-L274C43)
  3. Now this message gets assigned to an event and delivered to the agent.
  4. The agent acks the event, and the event's message.tp is appended to the acked tps in the consumer.
  5. When the consumer's commit background task runs (periodically or after x messages, see the configuration reference) it gets the tps from the _acked attributes and calls the commit method of the consumer.
  6. The commit method hands those to aiokafka, completing the full lifecycle of the event/message.
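The upshot of the steps above can be modelled in a few lines (toy stand-in classes, not faust code): the objects that arrive from aiokafka are the same ones handed back at commit time, so the isinstance check normally passes.

```python
from typing import Dict, NamedTuple

class TopicPartition(NamedTuple):  # stand-in for kafka.structs.TopicPartition
    topic: str
    partition: int

class OffsetAndMetadata(NamedTuple):  # stand-in for the aiokafka struct
    offset: int
    metadata: str

acked: Dict[TopicPartition, int] = {}

def on_ack(tp: TopicPartition, offset: int) -> None:
    # Steps 4-5: the acked tp is still the aiokafka TopicPartition that
    # came in with the message, merely type-hinted as a faust TP.
    acked[tp] = max(acked.get(tp, -1), offset)

def build_commit_offsets() -> Dict[TopicPartition, OffsetAndMetadata]:
    # Step 6: the very same objects go back to aiokafka's commit,
    # so its isinstance validation is satisfied.
    return {tp: OffsetAndMetadata(off + 1, "") for tp, off in acked.items()}

on_ack(TopicPartition("test-events", 0), 2)
print(build_commit_offsets())
```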

This means the TopicPartition is never really changed to a faust-internal TP when using the aiokafka driver; it's simply typed as one.

Why do you want to call commit directly anyway? Can't you reduce the commit interval if you want more frequent commits?

Hope this explanation helps you understand the inner workings, and also resolves your question. This is actually not a bug, just somewhat loose type hinting and weak enforcement of the correct classes being passed to aiokafka. Plus no docs about this.

Edit 1: Actually that's bad wording; it is not badly typed, it's the only way we get a coherent way of typing internally.

richardhundt commented 1 year ago

That's the thing, I'm not calling commit or _commit manually. Some internal machinery is doing it.

That means that something is passing offsets of the type actually shown in the method signature.

dada-engineer commented 1 year ago

Ah okay, sorry for the misunderstanding. Looks like if you start in client mode, the on_message callback provides Faust TPs. You still haven't shared your config, so it's hard to say how this happens.

https://github.com/faust-streaming/faust/blob/master/faust/transport/conductor.py#L268

dada-engineer commented 1 year ago

Either way, it might be low-hanging fruit to ensure aiokafka TopicPartition instances are what gets passed into the aiokafka lib.

richardhundt commented 1 year ago

I'm trying to create a minimal example, but I'm running a 32 hour processing job at the moment, so I don't want to interrupt it. I'll see what I can dig up.

Aren't the commits a no-op in client-only mode because the consumer doesn't have a consumer group?

dada-engineer commented 1 year ago

I guess client-only mode would not reach this commit method here, as it is intended as a dev mode that does not require a Kafka broker but has a simple reply_consumer.

Edit: so yes what you said.