Open MarcoRizk opened 5 years ago
Is your agent function async or sync? Or a mix?
It's sync. Do you think the problem is that I am using a blocking code block in the agent's async iterator? I am currently using `run_in_executor` inside the agent to call my sync function.
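For reference, a minimal sketch of how a blocking call can be handed to a thread pool from inside a Faust agent; the names here (`process_frame`, the pool size) are illustrative and not taken from the original project, while the app, topic, and agent names are borrowed from the logs later in this thread:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

import faust

app = faust.App('consumer_app', broker='kafka://localhost:9092')
frames_topic = app.topic('camera_frames')

# Long-lived pool for the blocking work (size chosen arbitrarily here).
executor = ThreadPoolExecutor(max_workers=2)


def process_frame(frame):
    """Placeholder for the synchronous, CPU-bound vision code."""
    return frame


@app.agent(frames_topic)
async def run_vision_module(frames):
    async for frame in frames:
        loop = asyncio.get_event_loop()
        # Hand the blocking call to a thread so the event loop keeps running.
        # Note: this does not help much if the work holds the GIL for long stretches.
        result = await loop.run_in_executor(executor, process_frame, frame)
        print(result)
```

Even with this pattern, GIL-holding CPU work in the pool can still delay the event loop, which is what the later comments in this thread describe.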
Hi,
That may be possible. Did you try consuming messages without doing any processing?
Yes, I still get the same error!
Can you post the full log output of faust worker?
```
/media/data/marco/azka-vision/venv/lib/python3.6/site-packages/faust/fixups/django.py:71: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
  warnings.warn(WARN_DEBUG_ENABLED)
/home/azka/.pyxbld/temp.linux-x86_64-3.6/pyrex/aiokafka/record/_crecords/cutil.c:605:10: fatal error: crc32c.h: No such file or directory
          ^~~~~~~~~~
compilation terminated.
/home/azka/.pyxbld/temp.linux-x86_64-3.6/pyrex/aiokafka/record/_crecords/cutil.c:605:10: fatal error: crc32c.h: No such file or directory
          ^~~~~~~~~~
compilation terminated.
/home/azka/.pyxbld/temp.linux-x86_64-3.6/pyrex/aiokafka/record/_crecords/cutil.c:605:10: fatal error: crc32c.h: No such file or directory
          ^~~~~~~~~~
compilation terminated.
/home/azka/.pyxbld/temp.linux-x86_64-3.6/pyrex/aiokafka/record/_crecords/cutil.c:605:10: fatal error: crc32c.h: No such file or directory
          ^~~~~~~~~~
compilation terminated.
┌ƒaµS† v1.7.4─┬───────────────────────────────────────────────────────────────────┐
│ id          │ consumer_app                                                       │
│ transport   │ [URL('kafka://localhost:9092')]                                    │
│ store       │ memory:                                                            │
│ web         │ http://localhost:6067/                                             │
│ log         │ -stderr- (info)                                                    │
│ pid         │ 9049                                                               │
│ hostname    │ azka                                                               │
│ platform    │ CPython 3.6.8 (Linux x86_64)                                       │
│           + │ Cython (GCC 8.0.1 20180414 (experimental) [trunk revision 259383)  │
│ drivers     │                                                                    │
│   transport │ aiokafka=1.0.3                                                     │
│   web       │ aiohttp=3.5.4                                                      │
│ datadir     │ /media/data/marco/azka-vision/consumer_app-data                    │
│ appdir      │ /media/data/marco/azka-vision/consumer_app-data/v1                 │
└─────────────┴────────────────────────────────────────────────────────────────────┘
[2019-09-04 11:45:40,629: INFO]: [^--Producer]: Creating topic 'consumer_app-assignor-leader'
[2019-09-04 11:45:40,640: INFO]: [^--Producer]: Creating topic 'camera_frames-faustapp.agents.run_vision_module-consumer_app-Frame.cam_id-repartition'
[2019-09-04 11:45:41,643: INFO]: [^--Producer]: Creating topic 'consumer_app-counts_table-changelog'
[2019-09-04 11:45:42,641: INFO]: [^--Producer]: Creating topic 'consumer_app-counts_table-changelog'
[2019-09-04 11:45:42,642: INFO]: [^--Producer]: Creating topic 'consumer_app-assignor-leader'
[2019-09-04 11:45:42,642: INFO]: [^--Producer]: Creating topic 'camera_frames-faustapp.agents.run_vision_module-consumer_app-Frame.cam_id-repartition'
[2019-09-04 11:45:42,651: INFO]: Updating subscribed topics to: frozenset({'camera_frames', 'consumer_app-assignor-leader', 'camera_frames-faustapp.agents.run_vision_module-consumer_app-Frame.cam_id-repartition', 'consumer_app-counts_table-changelog'})
[2019-09-04 11:45:42,654: INFO]: Subscribed to topic(s): {'camera_frames', 'consumer_app-assignor-leader', 'camera_frames-faustapp.agents.run_vision_module-consumer_app-Frame.cam_id-repartition', 'consumer_app-counts_table-changelog'}
[2019-09-04 11:45:42,661: INFO]: Discovered coordinator 1 for group consumer_app
[2019-09-04 11:45:42,663: INFO]: Revoking previously assigned partitions set() for group consumer_app
[2019-09-04 11:45:43,652: INFO]: (Re-)joining group consumer_app
[2019-09-04 11:45:46,656: INFO]: Joined group 'consumer_app' (generation 157) with member_id faust-1.7.4-0f6149bc-8170-4e89-b4bd-21124ee7fb44
[2019-09-04 11:45:46,656: INFO]: Elected group leader -- performing partition assignments using faust
[2019-09-04 11:45:46,663: INFO]: Successfully synced group consumer_app with generation 157
[2019-09-04 11:45:46,664: INFO]: Setting newly assigned partitions {TopicPartition(topic='camera_frames-faustapp.agents.run_vision_module-consumer_app-Frame.cam_id-repartition', partition=0), TopicPartition(topic='consumer_app-counts_table-changelog', partition=6), TopicPartition(topic='camera_frames-faustapp.agents.run_vision_module-consumer_app-Frame.cam_id-repartition', partition=2), TopicPartition(topic='consumer_app-counts_table-changelog', partition=4), TopicPartition(topic='camera_frames-faustapp.agents.run_vision_module-consumer_app-Frame.cam_id-repartition', partition=4), TopicPartition(topic='camera_frames-faustapp.agents.run_vision_module-consumer_app-Frame.cam_id-repartition', partition=6), TopicPartition(topic='consumer_app-counts_table-changelog', partition=3), TopicPartition(topic='camera_frames-faustapp.agents.run_vision_module-consumer_app-Frame.cam_id-repartition', partition=1), TopicPartition(topic='consumer_app-counts_table-changelog', partition=1), TopicPartition(topic='camera_frames-faustapp.agents.run_vision_module-consumer_app-Frame.cam_id-repartition', partition=3), TopicPartition(topic='consumer_app-counts_table-changelog', partition=7), TopicPartition(topic='camera_frames-faustapp.agents.run_vision_module-consumer_app-Frame.cam_id-repartition', partition=5), TopicPartition(topic='consumer_app-counts_table-changelog', partition=5), TopicPartition(topic='camera_frames-faustapp.agents.run_vision_module-consumer_app-Frame.cam_id-repartition', partition=7), TopicPartition(topic='camera_frames', partition=0), TopicPartition(topic='consumer_app-counts_table-changelog', partition=2), TopicPartition(topic='consumer_app-assignor-leader', partition=0), TopicPartition(topic='consumer_app-counts_table-changelog', partition=0)} for group consumer_app
[2019-09-04 11:45:46,678: INFO]: Fetch offset 118 is out of range for partition TopicPartition(topic='camera_frames-faustapp.agents.run_vision_module-consumer_app-Frame.cam_id-repartition', partition=1), resetting offset
[2019-09-04 11:45:46,680: INFO]: Fetch offset 10425 is out of range for partition TopicPartition(topic='camera_frames-faustapp.agents.run_vision_module-consumer_app-Frame.cam_id-repartition', partition=0), resetting offset
[2019-09-04 11:45:46,680: INFO]: Fetch offset 16207 is out of range for partition TopicPartition(topic='camera_frames-faustapp.agents.run_vision_module-consumer_app-Frame.cam_id-repartition', partition=7), resetting offset
[2019-09-04 11:45:48,635: INFO]: [^---Recovery]: Highwater for active changelog partitions:
┌Highwater - Active───────────────────┬───────────┬───────────┐
│ topic                               │ partition │ highwater │
├─────────────────────────────────────┼───────────┼───────────┤
│ consumer_app-counts_table-changelog │ 0         │ -1        │
│ consumer_app-counts_table-changelog │ 1         │ -1        │
│ consumer_app-counts_table-changelog │ 2         │ -1        │
│ consumer_app-counts_table-changelog │ 3         │ -1        │
│ consumer_app-counts_table-changelog │ 4         │ -1        │
│ consumer_app-counts_table-changelog │ 5         │ -1        │
│ consumer_app-counts_table-changelog │ 6         │ -1        │
│ consumer_app-counts_table-changelog │ 7         │ -1        │
└─────────────────────────────────────┴───────────┴───────────┘
[2019-09-04 11:45:50,638: INFO]: [^---Recovery]: active offsets at start of reading:
┌Reading Starts At - Active───────────┬───────────┬────────┐
│ topic                               │ partition │ offset │
├─────────────────────────────────────┼───────────┼────────┤
│ consumer_app-counts_table-changelog │ 6         │ -1     │
│ consumer_app-counts_table-changelog │ 7         │ -1     │
│ consumer_app-counts_table-changelog │ 4         │ -1     │
│ consumer_app-counts_table-changelog │ 5         │ -1     │
│ consumer_app-counts_table-changelog │ 3         │ -1     │
│ consumer_app-counts_table-changelog │ 2         │ -1     │
│ consumer_app-counts_table-changelog │ 0         │ -1     │
│ consumer_app-counts_table-changelog │ 1         │ -1     │
└─────────────────────────────────────┴───────────┴────────┘
[2019-09-04 11:45:50,839: INFO]: [^---Recovery]: standby offsets at start of reading:
┌Reading Starts At - Standby─┐
│ topic │ partition │ offset │
└───────┴───────────┴────────┘
[2019-09-04 11:45:51,638: INFO]: [^---Recovery]: Resuming flow...
[2019-09-04 11:45:51,639: INFO]: [^---Recovery]: Recovery complete
[2019-09-04 11:45:51,740: INFO]: [^---Recovery]: Restore complete!
[2019-09-04 11:45:51,741: INFO]: [^---Recovery]: Seek stream partitions to committed offsets.
[2019-09-04 11:45:52,641: INFO]: [^---Recovery]: Worker ready
[2019-09-04 11:45:52,642: WARNING]: Network Initialized
[2019-09-04 11:45:52,791: INFO]: Fetch offset 16207 is out of range for partition TopicPartition(topic='camera_frames-faustapp.agents.run_vision_module-consumer_app-Frame.cam_id-repartition', partition=7), resetting offset
[2019-09-04 11:45:52,792: INFO]: Fetch offset 118 is out of range for partition TopicPartition(topic='camera_frames-faustapp.agents.run_vision_module-consumer_app-Frame.cam_id-repartition', partition=1), resetting offset
[2019-09-04 11:45:52,792: INFO]: Fetch offset 10425 is out of range for partition TopicPartition(topic='camera_frames-faustapp.agents.run_vision_module-consumer_app-Frame.cam_id-repartition', partition=0), resetting offset
2019-09-04 11:47:25.350744: I tensorflow/stream_executor/platform/default/dso_loader.cc:42] Successfully opened dynamic library libcudnn.so.7
[2019-09-04 11:47:26,047: INFO]: Timer _thread_keepalive woke up too late, with a drift of +0.3455595959967468
2019-09-04 11:47:26.372190: W tensorflow/core/common_runtime/bfc_allocator.cc:237] Allocator (GPU_0_bfc) ran out of memory trying to allocate 2.13GiB with freed_by_count=0. The caller indicates that this is not a failure, but may mean that there could be performance gains if more memory were available.
2019-09-04 11:47:26.729264: I tensorflow/stream_executor/platform/default/dso_loader.cc:42] Successfully opened dynamic library libcublas.so.10.0
Processed frame from Camera: 2 ---- FPS: 0.45925764675463293
[2019-09-04 11:47:26,997: INFO]: Processed frame from Camera: 2 ---- FPS: 0.45925764675463293
[2019-09-04 11:47:28,065: WARNING]: /media/data/marco/azka-vision/venv/lib/python3.6/site-packages/sklearn/utils/linearassignment.py:127: DeprecationWarning: The linear_assignment function is deprecated in 0.21 and will be removed from 0.23. Use scipy.optimize.linear_sum_assignment instead. DeprecationWarning)
Processed frame from Camera: 2 ---- FPS: 0.9180982879303127
[2019-09-04 11:47:28,155: INFO]: Timer _thread_keepalive woke up too late, with a drift of +1.1065389930008678
[2019-09-04 11:47:28,161: INFO]: Processed frame from Camera: 2 ---- FPS: 0.9180982879303127
[2019-09-04 11:47:28,515: INFO]: Timer commit woke up too late, with a drift of +2.4193572739997764
[2019-09-04 11:47:29,360: WARNING]: /media/data/marco/azka-vision/venv/lib/python3.6/site-packages/sklearn/utils/linearassignment.py:127: DeprecationWarning: The linear_assignment function is deprecated in 0.21 and will be removed from 0.23. Use scipy.optimize.linear_sum_assignment instead. DeprecationWarning)
Processed frame from Camera: 2 ---- FPS: 0.7098541959481522
[2019-09-04 11:47:29,930: INFO]: Processed frame from Camera: 2 ---- FPS: 0.7098541959481522
[2019-09-04 11:47:30,517: INFO]: Timer _thread_keepalive woke up too late, with a drift of +1.3456225049958448
[2019-09-04 11:47:30,882: WARNING]: /media/data/marco/azka-vision/venv/lib/python3.6/site-packages/sklearn/utils/linearassignment.py:127: DeprecationWarning: The linear_assignment function is deprecated in 0.21 and will be removed from 0.23. Use scipy.optimize.linear_sum_assignment instead. DeprecationWarning)
Processed frame from Camera: 2 ---- FPS: 0.9396288466055909
[2019-09-04 11:47:31,100: INFO]: Processed frame from Camera: 2 ---- FPS: 0.9396288466055909
[2019-09-04 11:47:32,382: WARNING]: /media/data/marco/azka-vision/venv/lib/python3.6/site-packages/sklearn/utils/linearassignment.py:127: DeprecationWarning: The linear_assignment function is deprecated in 0.21 and will be removed from 0.23. Use scipy.optimize.linear_sum_assignment instead. DeprecationWarning)
Processed frame from Camera: 2 ---- FPS: 0.7058253886803959
[2019-09-04 11:47:32,794: INFO]: Processed frame from Camera: 2 ---- FPS: 0.7058253886803959
[2019-09-04 11:47:33,775: INFO]: Timer _thread_keepalive woke up too late, with a drift of +2.2477690800005803
```
Did you try reducing the number of messages you get at each `poll()`, following my comment from another issue?
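For anyone looking for the setting being referenced here: as far as I understand, the per-poll batch size is controlled by the app-level `broker_max_poll_records` option. A hedged sketch (the value 10 is arbitrary, and the assumption is that this is the knob the comment refers to):

```python
import faust

# Assumption: broker_max_poll_records limits how many records a single
# consumer poll() hands over to the stream at once.
app = faust.App(
    'consumer_app',
    broker='kafka://localhost:9092',
    broker_max_poll_records=10,
)
```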
I tried different values but it didn't work for me! I think the problem is related to the message size: since it's an encoded bytes array, the messages are large, and that somehow causes the issue. I tried just decoding and printing the id of the decoded message (no processing done) and I still get the same thing.
From what I was told, this is totally expected... Here is a workaround which might work for you: https://github.com/robinhood/faust/issues/404#issuecomment-523580724
This should be documented because doing any kind of sync work will break Faust.
Can someone explain the severity of this issue: is it just a warning that we can live with, or something that has to be fixed?
It's very hard to run code that mixes async and sync parts using `run_in_executor`. Also, I had some issues using `run_in_executor` with a pool created inside a `with` statement when stopping the agents:
```
[2019-10-14 12:14:47,565: ERROR]: exception calling callback for <Future at 0x10c33ca90 state=finished returned list>
Traceback (most recent call last):
  File "/usr/local/Cellar/python/3.7.4_1/Frameworks/Python.framework/Versions/3.7/lib/python3.7/concurrent/futures/_base.py", line 324, in _invoke_callbacks
    callback(self)
  File "/usr/local/Cellar/python/3.7.4_1/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/futures.py", line 362, in _call_set_state
    dest_loop.call_soon_threadsafe(_set_state, destination, source)
  File "/usr/local/Cellar/python/3.7.4_1/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 728, in call_soon_threadsafe
    self._check_closed()
  File "/usr/local/Cellar/python/3.7.4_1/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 475, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
```
Thanks in advance
I found the issue to be quite severe if the timers hang for long enough (2-3 minutes), because you can get into a situation where you hit timeouts, which can cause the commit thread, and potentially others, to stop working.
I have seen this a couple of times in production but wasn't able to reproduce it in a controlled environment.
Running on a different thread isn't good enough to avoid this completely. Doing anything on the CPU will lead to these messages; I consistently get them just starting up Faust without actually doing anything: connecting to the broker and the subsequent rebalancing are enough.
I added the logs to see how often it would happen, and it turns out it happens a lot more frequently than I expected.
I'm not sure it means anything unless the latency is really long and it also happens frequently.
Given how many things need to happen in an asyncio event loop iteration, it makes sense that some timers will be late.
> Also, I had some issues using `run_in_executor` with a pool created inside a `with` statement when stopping the agents: `RuntimeError: Event loop is closed`
It's hard to say what this error is caused by without context. If you can paste the code that runs in the thread this would help.
When using `run_in_executor` you need to make sure a new event loop is created for that thread. Do you create and stop an event loop for every thread? Perhaps this was scheduled to run, but you had already closed the loop.
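One way to avoid the "Event loop is closed" callback error above is to keep the executor alive for the lifetime of the worker, rather than creating it in a `with` block that exits while futures are still delivering results. A minimal sketch under that assumption (the `run_blocking` helper name is hypothetical):

```python
import asyncio
import atexit
from concurrent.futures import ThreadPoolExecutor

# Module-level pool: it is not torn down by a `with` block while pending
# futures may still be posting callbacks back onto the event loop.
executor = ThreadPoolExecutor(max_workers=4)
atexit.register(executor.shutdown)


async def run_blocking(func, *args):
    # Push sync work onto the shared, long-lived pool.
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(executor, func, *args)
```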
> It's hard to say what this error is caused by without context.
The example that I posted was just a test using `run_in_executor`. In my actual implementation, I think it will require some effort to move all the processing code into functions to be run with `run_in_executor`.
To give more context: I'm using async libraries for PostgreSQL (asyncpg), Redis (aredis) and Boto (aioboto3 and aiobotocore).
@ask, which recommendations can you give in order to minimize this issue? Do you think faust can be used with agents that make heavy use of CPU?
Thanks in advance
Also ran into this issue; I managed to replicate it with the minimal code below and also found one cause and a workaround.
It appears that aiokafka's CRC code is a bottleneck when reading large messages. After setting `broker_check_crcs = False`, the issue no longer occurs without external influences (message sends, other code).
Digging further, on my machine even with `faust[fast]` installed it appears to be using the Python CRC implementation and not the Cython version (this also looks like an issue in @MarcoRizk's logs above).
Am I missing something in the setup process to utilize Cython?
```python
import random
import string
import faust

app = faust.App('faustbenchmark%f' % (random.random()),
                broker='kafka://localhost',
                broker_check_crcs=False)
parser_in_topic = app.topic('test')  # , value_serializer='raw'

@app.agent(parser_in_topic)  # , concurrency=1, sink=[parser_out_topic]
async def perf_stub(incoming_msgs):
    i = 0
    msg_count = 10000
    async for msg in incoming_msgs:
        i += 1
        if i >= msg_count:
            print('%d len:%d' % (i, len(msg)))
            i = 0

def generate_message(length):
    msg = ''
    charset = string.ascii_letters
    for i in range(length):
        msg += random.choice(charset)
    return msg

@app.timer(10)
async def produce():
    msg = generate_message(1000)
    for i in range(10000):
        await parser_in_topic.send(value=msg)
```
@zakin Is setting `broker_check_crcs = False` recommended in a production environment? I'm not familiar enough with Kafka to know what checking the checksum achieves (data corruption in transit?), but turning it off feels like a rather hacky solution.

@joekohlsdorf Does your workaround work when the task is CPU-intensive, as opposed to being a toy `sleep` function? If the CPU is a bottleneck, surely spinning off new threads (instead of blocking the agent's async loop) is going to induce lag? Unless you're suggesting that it is preferable to run `n` tasks concurrently rather than `n` tasks serially, even if they both ultimately take the same time to complete due to the lag in the former.
Note that setup.py for robinhood-aiokafka, the fork of aiokafka used by Faust, will happily build itself into a wheel even if it failed to compile some of the C extensions.
If your aiokafka is using the pure Python version of the CRC check, try building robinhood-aiokafka in your environment with `python setup.py bdist_wheel -v` and see if any of the C compilations fail. In my case, it was a missing `libz-dev` Debian package that caused the failure.
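A quick way to check which implementation you ended up with, assuming the module layout matches the build paths shown in the compile errors earlier in this thread (`aiokafka/record/_crecords/...`):

```python
# Heuristic check: the _crecords package should only be importable when the
# C extensions compiled successfully.
try:
    from aiokafka.record import _crecords  # noqa: F401
except ImportError:
    print("aiokafka is falling back to the pure-Python record/CRC code")
else:
    print("aiokafka C extensions are available")
```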
I have tried to implement all the methods async, but I am still getting this message in the production environment. The methods don't seem very CPU-intensive: they process the large messages and push them into the DB, fetch data from the DB at particular intervals and push it to an API, and copy the DB into S3. I have also used async libraries for all of these tasks, yet I still get this message on a daily basis.
I am sending camera frames to Kafka streams and processing them through a Faust agent. The agent processes around 10 messages, then pauses for a few seconds with a log message of:
**Expected behavior**
Messages get processed without interruption.
**Actual behavior**
Worker interruption after every 10 messages.