faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/

Tried hello_world app - Got "Error processing append operation on partition" in Kafka logs #365

Closed: drawal1 closed this issue 2 years ago

drawal1 commented 2 years ago

Checklist

Steps to reproduce

  1. Follow the steps in the quickstart: https://faust-streaming.github.io/faust/playbooks/quickstart.html
  2. Run faust -A hello_world send greetings "Hello Kafka topic" and see faust crash.

Expected behavior

I expected to see the greeting printed on the console.

Actual behavior

Crash!

FYI

Full traceback

  File "/home/drawal/streaming-analytics/Analytics/.venv-Analytics/lib/python3.8/site-packages/mode/worker.py", line 67, in exiting
    yield
  File "/home/drawal/streaming-analytics/Analytics/.venv-Analytics/lib/python3.8/site-packages/mode/worker.py", line 273, in execute_from_commandline
    self.loop.run_until_complete(self._starting_fut)
  File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/home/drawal/streaming-analytics/Analytics/.venv-Analytics/lib/python3.8/site-packages/mode/services.py", line 736, in start
    await self._default_start()
  File "/home/drawal/streaming-analytics/Analytics/.venv-Analytics/lib/python3.8/site-packages/mode/services.py", line 743, in _default_start
    await self._actually_start()
  File "/home/drawal/streaming-analytics/Analytics/.venv-Analytics/lib/python3.8/site-packages/mode/services.py", line 767, in _actually_start
    await child.maybe_start()
  File "/home/drawal/streaming-analytics/Analytics/.venv-Analytics/lib/python3.8/site-packages/mode/services.py", line 795, in maybe_start
    await self.start()
  File "/home/drawal/streaming-analytics/Analytics/.venv-Analytics/lib/python3.8/site-packages/mode/services.py", line 736, in start
    await self._default_start()
  File "/home/drawal/streaming-analytics/Analytics/.venv-Analytics/lib/python3.8/site-packages/mode/services.py", line 743, in _default_start
    await self._actually_start()
  File "/home/drawal/streaming-analytics/Analytics/.venv-Analytics/lib/python3.8/site-packages/mode/services.py", line 760, in _actually_start
    await self.on_start()
  File "/home/drawal/streaming-analytics/Analytics/.venv-Analytics/lib/python3.8/site-packages/mode/services.py", line 1073, in on_start
    await self._fut
  File "/home/drawal/streaming-analytics/Analytics/.venv-Analytics/lib/python3.8/site-packages/faust/cli/base.py", line 598, in execute
    await self.run(*args, **kwargs)
  File "/home/drawal/streaming-analytics/Analytics/.venv-Analytics/lib/python3.8/site-packages/faust/cli/send.py", line 78, in run
    meta: RecordMetadata = await fut_send_complete
  File "/home/drawal/streaming-analytics/Analytics/.venv-Analytics/lib/python3.8/site-packages/faust/topics.py", line 442, in _on_published
    res: RecordMetadata = fut.result()
kafka.errors.UnknownError: [Error -1] UnknownError

Versions

wbarnha commented 2 years ago

Sorry to see you're running into this. I just ran faust -A hello_world worker -l info on my end and didn't hit the problem. Looking at your Faust version: Faust 1.10.4 is from the old Robinhood fork. Try pip3 uninstall faust followed by pip3 install faust-streaming; that should fix your issue.

wbarnha commented 2 years ago

This fork uses aiokafka; support for kafka-python and the Confluent Kafka Python library has been dropped, but I'd be open to reviewing PRs if anyone created them.

drawal1 commented 2 years ago

@wbarnha

When I ran pip3 install faust-streaming, I got the output below. Is pip3 uninstall faust not cleaning up properly? BTW, I also ran pip3 uninstall kafka-python.

Collecting kafka-python>=2.0.0
  Using cached kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
ERROR: robinhood-aiokafka 1.1.6 has requirement kafka-python<1.5,>=1.4.6, but you'll have kafka-python 2.0.2 which is incompatible.
Installing collected packages: faust-streaming, kafka-python
Successfully installed faust-streaming-0.8.10 kafka-python-2.0.2

wbarnha commented 2 years ago

Something's odd with your dev environment. We don't specify kafka-python in our requirements. Try --force-reinstall? I'm not 100% clear on how your environment is configured, but I'd purge everything the Robinhood fork installed to avoid any issues.

Make sure you also uninstall robinhood-aiokafka and install the original aiokafka.
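A minimal sketch for auditing the environment before and after this cleanup, using only the standard library (importlib.metadata, Python 3.8+). The distribution names are the ones mentioned in this thread: faust is the Robinhood fork's PyPI name (the 1.10.4 release above), and robinhood-aiokafka should give way to aiokafka. kafka-python is deliberately not flagged, because the pip output above shows it being collected during the faust-streaming install itself. Extend the lists for your own setup.

```python
# Audit which Faust / Kafka client distributions are installed (stdlib only, Python 3.8+).
from importlib import metadata
from typing import Dict, List, Optional

# Distributions this thread says should be present for faust-streaming.
WANTED = ["faust-streaming", "aiokafka"]
# Robinhood-fork distributions that should be gone after the cleanup.
LEFTOVERS = ["faust", "robinhood-aiokafka"]


def installed_versions(names: List[str]) -> Dict[str, Optional[str]]:
    """Map each distribution name to its installed version, or None if it is absent."""
    versions: Dict[str, Optional[str]] = {}
    for name in names:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


def problems(versions: Dict[str, Optional[str]]) -> List[str]:
    """List missing wanted distributions and leftover Robinhood-fork ones."""
    issues = []
    for name in WANTED:
        if versions.get(name) is None:
            issues.append(f"missing: {name}")
    for name in LEFTOVERS:
        if versions.get(name) is not None:
            issues.append(f"leftover: {name} {versions[name]}")
    return issues


if __name__ == "__main__":
    found = installed_versions(WANTED + LEFTOVERS)
    for name, version in found.items():
        print(f"{name:20} {version or 'not installed'}")
    for issue in problems(found):
        print("WARNING:", issue)
```

Run it inside the same virtualenv that runs the faust command; any "leftover" line points at a package to uninstall.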

drawal1 commented 2 years ago

@wbarnha

Still the same error! I purged everything, made sure robinhood-aiokafka is no longer in the environment, and ran pip3 install --upgrade --force-reinstall faust-streaming[rocksdb].

FYI: faust now reports "faust, version Faust 0.8.10".

wbarnha commented 2 years ago

Can you verify that the greetings topic exists on your broker? I'm surprised you still get the same error as at the start. Updated logs would be appreciated.

drawal1 commented 2 years ago

The greetings topic is created properly; see below.

(.venv-Analytics) drawal@L-RLI22-drawal:~/streaming-analytics/Analytics$ docker exec -it kafka-sa kafka-topics.sh --list --bootstrap-server localhost:9092
__consumer_offsets
greetings
hello-world-__assignor-__leader

Here is the log from the Kafka broker: kafka broker logs.txt
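The topic check above can also be scripted. A sketch, assuming the broker container is named kafka-sa as in this thread and that kafka-topics.sh is on the container's PATH (as the docker exec command above implies). The -it flags are dropped because no terminal is needed when capturing output from a script.

```python
# Check that expected topics exist, via kafka-topics.sh inside the broker container.
import subprocess
from typing import Iterable, List, Set


def parse_topic_list(output: str) -> Set[str]:
    """kafka-topics.sh --list prints one topic name per line; ignore blank lines."""
    return {line.strip() for line in output.splitlines() if line.strip()}


def list_topics(container: str = "kafka-sa",
                bootstrap: str = "localhost:9092") -> Set[str]:
    """Run the same --list command as above (without -it) and parse its output."""
    result = subprocess.run(
        ["docker", "exec", container, "kafka-topics.sh", "--list",
         "--bootstrap-server", bootstrap],
        check=True, capture_output=True, text=True,
    )
    return parse_topic_list(result.stdout)


def missing_topics(expected: Iterable[str], existing: Set[str]) -> List[str]:
    """Return the expected topic names that are not in the existing set, sorted."""
    return sorted(set(expected) - existing)
```

For example, missing_topics(["greetings"], list_topics()) should return an empty list when the topic exists.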

wbarnha commented 2 years ago
[2022-09-20 17:13:02,934] INFO [GroupCoordinator 0]: Member[group.instance.id None, member.id faust-0.8.10-4d3857de-07e8-4692-95ae-b7363f590204] in group hello-world has left, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2022-09-20 17:13:02,934] INFO [GroupCoordinator 0]: Preparing to rebalance group hello-world in state PreparingRebalance with old generation 1 (__consumer_offsets-31) (reason: removing member faust-0.8.10-4d3857de-07e8-4692-95ae-b7363f590204 on LeaveGroup) (kafka.coordinator.group.GroupCoordinator)
[2022-09-20 17:13:02,934] INFO [GroupCoordinator 0]: Group hello-world with generation 2 is now empty (__consumer_offsets-31) (kafka.coordinator.group.GroupCoordinator)
[2022-09-20 17:13:49,469] INFO [GroupCoordinator 0]: Dynamic Member with unknown member id joins group hello-world in Empty state. Created a new member id faust-0.8.10-02de864b-1efc-4e4b-ad69-1e1f3039be1a for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-09-20 17:13:49,470] INFO [GroupCoordinator 0]: Preparing to rebalance group hello-world in state PreparingRebalance with old generation 2 (__consumer_offsets-31) (reason: Adding new member faust-0.8.10-02de864b-1efc-4e4b-ad69-1e1f3039be1a with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2022-09-20 17:13:49,470] INFO [GroupCoordinator 0]: Stabilized group hello-world generation 3 (__consumer_offsets-31) with 1 members (kafka.coordinator.group.GroupCoordinator)
[2022-09-20 17:13:49,472] INFO [GroupCoordinator 0]: Assignment received from leader for group hello-world for generation 3. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
[2022-09-20 17:14:02,782] ERROR [ReplicaManager broker=0] Error processing append operation on partition greetings-0 (kafka.server.ReplicaManager)
org.apache.kafka.common.InvalidRecordException: One or more records have been rejected
[2022-09-20 17:47:44,372] ERROR [ReplicaManager broker=0] Error processing append operation on partition greetings-0 (kafka.server.ReplicaManager)
org.apache.kafka.common.InvalidRecordException: One or more records have been rejected

I'm wondering whether the Robinhood version of Faust did something to the greetings topic. I'd delete the topic and try again from scratch.

You may want to try running another example, just to be sure.
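Deleting the topics can be scripted as well. A sketch, assuming the kafka-sa container from this thread; deleting a topic discards all of its data. The topic names are the two the hello_world app uses according to the broker logs in this thread, which also show both being recreated ("Creating topic ...") when the app runs again.

```python
# Delete the hello_world app's topics so they are recreated on the next run.
# WARNING: deleting a topic discards all of its data.
import subprocess
from typing import Iterable, List

# Topic names as they appear in the broker logs in this thread.
HELLO_WORLD_TOPICS = ["greetings", "hello-world-__assignor-__leader"]


def delete_command(topic: str, container: str = "kafka-sa",
                   bootstrap: str = "localhost:9092") -> List[str]:
    """Build the docker exec + kafka-topics.sh --delete argv for one topic."""
    return ["docker", "exec", container, "kafka-topics.sh", "--delete",
            "--topic", topic, "--bootstrap-server", bootstrap]


def delete_topics(topics: Iterable[str], container: str = "kafka-sa") -> None:
    """Delete each topic in turn, stopping at the first failure."""
    for topic in topics:
        subprocess.run(delete_command(topic, container), check=True)
```

Calling delete_topics(HELLO_WORLD_TOPICS) runs one delete per topic.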

drawal1 commented 2 years ago

I deleted the greetings and hello-world topics and reran. The error is unchanged. New log entries below:

[2022-09-20 18:22:44,894] INFO Deleted log /kafka/kafka-logs-ea9653d48f85/greetings-0.c0da803367884bdab590081130cd7363-delete/00000000000000000000.log.deleted. (kafka.log.LogSegment)
[2022-09-20 18:22:44,894] INFO Deleted offset index /kafka/kafka-logs-ea9653d48f85/greetings-0.c0da803367884bdab590081130cd7363-delete/00000000000000000000.index.deleted. (kafka.log.LogSegment)
[2022-09-20 18:22:44,894] INFO Deleted time index /kafka/kafka-logs-ea9653d48f85/greetings-0.c0da803367884bdab590081130cd7363-delete/00000000000000000000.timeindex.deleted. (kafka.log.LogSegment)
[2022-09-20 18:22:44,895] INFO Deleted log for partition greetings-0 in /kafka/kafka-logs-ea9653d48f85/greetings-0.c0da803367884bdab590081130cd7363-delete. (kafka.log.LogManager)
[2022-09-20 18:23:29,893] INFO [Log partition=hello-world-__assignor-__leader-0, dir=/kafka/kafka-logs-ea9653d48f85] Deleting segments as the log has been deleted: LogSegment(baseOffset=0, size=0, lastModifiedTime=1663682783672, largestRecordTimestamp=None) (kafka.log.Log)
[2022-09-20 18:23:29,894] INFO [Log partition=hello-world-__assignor-__leader-0, dir=/kafka/kafka-logs-ea9653d48f85] Deleting segment files LogSegment(baseOffset=0, size=0, lastModifiedTime=1663682783672, largestRecordTimestamp=None) (kafka.log.Log)
[2022-09-20 18:23:29,894] INFO Deleted log /kafka/kafka-logs-ea9653d48f85/hello-world-__assignor-__leader-0.0abb0c79ad4e49ffb637da5e1fd5571b-delete/00000000000000000000.log.deleted. (kafka.log.LogSegment)
[2022-09-20 18:23:29,894] INFO Deleted offset index /kafka/kafka-logs-ea9653d48f85/hello-world-__assignor-__leader-0.0abb0c79ad4e49ffb637da5e1fd5571b-delete/00000000000000000000.index.deleted. (kafka.log.LogSegment)
[2022-09-20 18:23:29,894] INFO Deleted time index /kafka/kafka-logs-ea9653d48f85/hello-world-__assignor-__leader-0.0abb0c79ad4e49ffb637da5e1fd5571b-delete/00000000000000000000.timeindex.deleted. (kafka.log.LogSegment)
[2022-09-20 18:23:29,895] INFO Deleted log for partition hello-world-__assignor-__leader-0 in /kafka/kafka-logs-ea9653d48f85/hello-world-__assignor-__leader-0.0abb0c79ad4e49ffb637da5e1fd5571b-delete. (kafka.log.LogManager)
[2022-09-20 18:25:47,858] INFO Creating topic hello-world-__assignor-__leader with configuration {} and initial partition assignment HashMap(0 -> ArrayBuffer(0)) (kafka.zk.AdminZkClient)
[2022-09-20 18:25:47,886] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions Set(hello-world-__assignor-__leader-0) (kafka.server.ReplicaFetcherManager)
[2022-09-20 18:25:47,891] INFO [Log partition=hello-world-__assignor-__leader-0, dir=/kafka/kafka-logs-ea9653d48f85] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2022-09-20 18:25:47,892] INFO Created log for partition hello-world-__assignor-__leader-0 in /kafka/kafka-logs-ea9653d48f85/hello-world-__assignor-__leader-0 with properties {} (kafka.log.LogManager)
[2022-09-20 18:25:47,892] INFO [Partition hello-world-__assignor-__leader-0 broker=0] No checkpointed highwatermark is found for partition hello-world-__assignor-__leader-0 (kafka.cluster.Partition)
[2022-09-20 18:25:47,892] INFO [Partition hello-world-__assignor-__leader-0 broker=0] Log loaded for partition hello-world-__assignor-__leader-0 with initial high watermark 0 (kafka.cluster.Partition)
[2022-09-20 18:25:48,952] INFO Creating topic greetings with configuration {} and initial partition assignment HashMap(0 -> ArrayBuffer(0)) (kafka.zk.AdminZkClient)
[2022-09-20 18:25:48,955] INFO [GroupCoordinator 0]: Dynamic Member with unknown member id joins group hello-world in Empty state. Created a new member id faust-0.8.10-0df2e10c-25ed-4c1b-9586-93d7d54c9ac8 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2022-09-20 18:25:48,955] INFO [GroupCoordinator 0]: Preparing to rebalance group hello-world in state PreparingRebalance with old generation 0 (__consumer_offsets-31) (reason: Adding new member faust-0.8.10-0df2e10c-25ed-4c1b-9586-93d7d54c9ac8 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2022-09-20 18:25:48,956] INFO [GroupCoordinator 0]: Stabilized group hello-world generation 1 (__consumer_offsets-31) with 1 members (kafka.coordinator.group.GroupCoordinator)
[2022-09-20 18:25:48,966] INFO [GroupCoordinator 0]: Assignment received from leader for group hello-world for generation 1. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
[2022-09-20 18:25:48,978] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions Set(greetings-0) (kafka.server.ReplicaFetcherManager)
[2022-09-20 18:25:48,981] INFO [Log partition=greetings-0, dir=/kafka/kafka-logs-ea9653d48f85] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2022-09-20 18:25:48,981] INFO Created log for partition greetings-0 in /kafka/kafka-logs-ea9653d48f85/greetings-0 with properties {} (kafka.log.LogManager)
[2022-09-20 18:25:48,982] INFO [Partition greetings-0 broker=0] No checkpointed highwatermark is found for partition greetings-0 (kafka.cluster.Partition)
[2022-09-20 18:25:48,982] INFO [Partition greetings-0 broker=0] Log loaded for partition greetings-0 with initial high watermark 0 (kafka.cluster.Partition)
[2022-09-20 18:27:01,881] ERROR [ReplicaManager broker=0] Error processing append operation on partition greetings-0 (kafka.server.ReplicaManager)
org.apache.kafka.common.InvalidRecordException: One or more records have been rejected
[2022-09-20 18:27:45,573] ERROR [ReplicaManager broker=0] Error processing append operation on partition greetings-0 (kafka.server.ReplicaManager)
org.apache.kafka.common.InvalidRecordException: One or more records have been rejected

drawal1 commented 2 years ago

When I start the Faust worker with "faust -A page_views worker -l info", I get ModuleNotFoundError: No module named 'page_views'. Can you confirm that this should be "faust -A views worker -l info" instead? (views.py is the file name, and page_views is the Faust app id.)

Regardless, I get the same Kafka error as with the hello_world app:

[2022-09-20 18:46:46,623] ERROR [ReplicaManager broker=0] Error processing append operation on partition page_views-0 (kafka.server.ReplicaManager)
org.apache.kafka.common.InvalidRecordException: One or more records have been rejected
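The ModuleNotFoundError comes from Python's import system: -A page_views asks for a module named page_views, so a file called views.py does not satisfy it. A standard-library sketch for checking which of the two names is importable from the current directory before starting the worker:

```python
# Check which module names `faust -A <name>` could import from the current directory.
import importlib.util
import os
import sys


def module_importable(name: str) -> bool:
    """True if Python can locate a top-level module with this name on sys.path."""
    return importlib.util.find_spec(name) is not None


if __name__ == "__main__":
    # Search the current working directory first, as when running from the project folder.
    sys.path.insert(0, os.getcwd())
    for name in ("page_views", "views"):
        status = "importable" if module_importable(name) else "NOT importable"
        print(f"{name}: {status}")
```

If only views is importable, either rename the file to page_views.py or pass the module name that actually exists to -A.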

wbarnha commented 2 years ago

faust -A page_views worker -l info does work, but unfortunately nobody ever included the file it needs (page_views.py):

import faust

app = faust.App(
    'page_views',
    broker="kafka://localhost:9092",
    topic_partitions=4,
)

class PageView(faust.Record):
    id: str
    user: str

page_view_topic = app.topic('page_views', value_type=PageView)
page_views = app.Table('page_views', default=int)

@app.agent(page_view_topic)
async def count_page_views(views):
    async for view in views.group_by(PageView.id):
        page_views[view.id] += 1

I manually created the topic with the Kafka quickstart tarball from the Kafka website by running:

bin/kafka-topics.sh --create --topic page_views --bootstrap-server localhost:9092 --partitions 4
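The same command as a small Python helper, with the partition count tied to topic_partitions=4 in the app above. A sketch: kafka_home is an assumed path to the unpacked Kafka tarball, and keeping the two counts equal is a precaution rather than something established in this thread.

```python
# Build the kafka-topics.sh --create command with a partition count tied to the app config.
from typing import List

# Keep equal to topic_partitions=4 in the faust.App(...) above.
TOPIC_PARTITIONS = 4


def create_command(topic: str, partitions: int = TOPIC_PARTITIONS,
                   kafka_home: str = ".",
                   bootstrap: str = "localhost:9092") -> List[str]:
    """Return the argv for creating `topic`; kafka_home is the unpacked tarball (assumed)."""
    if partitions < 1:
        raise ValueError("partitions must be at least 1")
    return [f"{kafka_home}/bin/kafka-topics.sh", "--create", "--topic", topic,
            "--bootstrap-server", bootstrap, "--partitions", str(partitions)]
```

Running subprocess.run(create_command("page_views"), check=True) from the Kafka directory issues the same command as above.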

drawal1 commented 2 years ago

OK, it works with the Kafka quickstart tarball (Kafka 3.2.1), but not with a Docker container created from the wurstmeister/kafka:latest image (Kafka 2.8.1). I'll test with a Docker container running Kafka 3.2.1; hopefully that works too.

drawal1 commented 2 years ago

Confirmed: it works with Docker containers using bitnami/zookeeper:latest and bitnami/kafka:3.2.1.
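Since the outcome here depended on the broker version (2.8.1 failed, 3.2.1 worked), it can help to check which Kafka version a container actually runs. A sketch, assuming kafka-topics.sh is on the container's PATH and supports --version (as far as I know, Kafka's command-line tools have had it since 2.0). The tool version is taken as a stand-in for the broker version in a single-image setup. 3.2.1 is only the version confirmed working in this thread, not a known minimum.

```python
# Report the Kafka version in a broker container and compare it with this thread's results.
import re
import subprocess
from typing import Tuple

# Kafka 2.8.1 (wurstmeister/kafka:latest) failed in this thread; 3.2.1 worked.
# 3.2.1 is only the version confirmed working here, not a known minimum.
KNOWN_GOOD = (3, 2, 1)


def parse_version(text: str) -> Tuple[int, int, int]:
    """Extract the first MAJOR.MINOR.PATCH triple from the tool's output."""
    match = re.search(r"(\d+)\.(\d+)\.(\d+)", text)
    if match is None:
        raise ValueError(f"no version found in {text!r}")
    return int(match.group(1)), int(match.group(2)), int(match.group(3))


def container_kafka_version(container: str) -> Tuple[int, int, int]:
    """Run kafka-topics.sh --version in the container; parse stdout and stderr together."""
    result = subprocess.run(
        ["docker", "exec", container, "kafka-topics.sh", "--version"],
        check=True, capture_output=True, text=True,
    )
    return parse_version(result.stdout + result.stderr)


def at_least_known_good(version: Tuple[int, int, int]) -> bool:
    """True if the version is at or above the one confirmed working in this thread."""
    return version >= KNOWN_GOOD
```

For the wurstmeister container in this thread (Kafka 2.8.1), at_least_known_good(container_kafka_version("kafka-sa")) should come out False.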