NerdWalletOSS / kinesis-python

Low level, multiprocessing based AWS Kinesis producer & consumer library
Other
118 stars, 50 forks

Added improvements, better error handling, max queue size to prevent memory runaway, etc. #15

Closed: keithjjones closed this 3 years ago

keithjjones commented 6 years ago

Thanks for making a native Python library!

I had two issues using it, so I took the liberty of creating fixes and this PR. The first issue was stopping the subprocesses: they would often hang, so it worked out better to terminate them. Second, memory usage would run away on very active streams, which often killed the Docker container it was running in. I added a max size to the larger queues to cap overall memory use; the original unbounded behavior is still available if you leave the max size set to None. I also added a lot more logging in the critical areas that were causing errors for me.
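A minimal sketch of the bounded-queue idea, assuming records flow through a plain multiprocessing.Queue between the reader and the worker process. make_record_queue and put_with_backpressure are hypothetical names for illustration, not this library's or this PR's API:

```python
import logging
import multiprocessing
import queue

log = logging.getLogger(__name__)


def make_record_queue(max_size=None):
    """Create the queue shared between the reader and the worker (hypothetical helper).

    max_size=None keeps the unbounded behavior; an integer caps how many
    records can be buffered, so memory use stays bounded.
    """
    # multiprocessing.Queue treats maxsize <= 0 as "no limit".
    return multiprocessing.Queue(maxsize=0 if max_size is None else max_size)


def put_with_backpressure(q, record, timeout=1.0, max_attempts=None):
    """Put a record, waiting while the queue is full (hypothetical helper).

    Waiting here slows the reader down to the worker's pace instead of
    letting records pile up in memory. Returns the number of attempts
    taken; re-raises queue.Full once max_attempts is reached.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            q.put(record, block=True, timeout=timeout)
            return attempts
        except queue.Full:
            log.warning("record queue full; waiting for worker (attempt %d)", attempts)
            if max_attempts is not None and attempts >= max_attempts:
                raise
```

A full queue makes the reader wait (logging each stall) instead of buffering more records, trading some throughput on a hot stream for a memory ceiling.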

I tried pytest, but the tests were looking for a "mocker" fixture, which I didn't see in the code. Therefore, I don't know whether the tests pass, but they should, because I'm using this code successfully on a Kinesis stream right now.

Thanks!

codecov-io commented 6 years ago

Codecov Report

Merging #15 into master will decrease coverage by 3.68%. The diff coverage is 20.9%.


@@            Coverage Diff             @@
##           master      #15      +/-   ##
==========================================
- Coverage   34.02%   30.33%   -3.69%     
==========================================
  Files           4        4              
  Lines         288      412     +124     
==========================================
+ Hits           98      125      +27     
- Misses        190      287      +97

Impacted Files             Coverage Δ
src/kinesis/state.py       0% <0%> (ø)
src/kinesis/consumer.py    32.98% <21.21%> (-5.32%)
src/kinesis/producer.py    39.35% <23.95%> (-9.51%)


Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data. Powered by Codecov. Last update 5305f7b...f9b7b58.
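The coverage percentages in this report are hits divided by lines. The small mismatch between the summary (3.68%) and the diff table (-3.69%) appears consistent with the figures being truncated to two decimals rather than rounded: truncating the exact difference gives 3.68, while subtracting the already-truncated percentages gives 3.69. A quick check, where the truncation rule is an inference from these numbers rather than documented Codecov behavior:

```python
from decimal import Decimal, ROUND_DOWN

CENT = Decimal("0.01")


def truncate(value):
    """Cut a Decimal to two decimal places without rounding."""
    return value.quantize(CENT, rounding=ROUND_DOWN)


def coverage(hits, lines):
    """Exact coverage percentage: hits / lines * 100."""
    return Decimal(hits) * 100 / Decimal(lines)


# Figures taken from the Coverage Diff above.
master = coverage(98, 288)   # 34.0277...
pr_15 = coverage(125, 412)   # 30.3398...

print("master:", truncate(master))                          # 34.02
print("#15:", truncate(pr_15))                              # 30.33
print("table delta:", truncate(master) - truncate(pr_15))   # 3.69
print("summary delta:", truncate(master - pr_15))           # 3.68
print("misses:", 288 - 98, 412 - 125)                       # 190 287
```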

borgstrom commented 6 years ago

Thanks for the PR @keithjjones!

I'll do a full review of this in the next couple of days; just wanted to let you know it's in the queue.

mwarkentin commented 5 years ago

Any updates on this? This PR looks like it answers a couple of questions we had when we first reviewed this library for use as we start using Kinesis. :)

hampsterx commented 5 years ago

Sorry to +1, but it would be great to get this merged in! @borgstrom

hampsterx commented 5 years ago

OK, just tested this out and it seems awesome!

The README needs to be updated to state that the producer must be started with start() and stopped with shutdown() afterwards. Also, on Linux/Python 3.6 I get:

AttributeError: 'Process' object has no attribute 'kill' (at producer.py, line 193)

Probably

self.async_producer.kill()

should instead be:

self.async_producer.terminate()

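For context, multiprocessing.Process.kill() was added in Python 3.7, so it does not exist on 3.6, while terminate() does. A minimal sketch of a version-tolerant shutdown, assuming the background work runs in a plain multiprocessing.Process that watches a stop event; stop_process, worker and hung_worker are hypothetical names, not code from the PR or the library:

```python
import multiprocessing
import time


def stop_process(proc, timeout=5.0):
    """Stop a child process, escalating only when it does not exit in time.

    Returns the child's exit code (a negative signal number on POSIX if it
    had to be terminated or killed).
    """
    proc.join(timeout)  # first give it a chance to exit cleanly
    if proc.is_alive():
        proc.terminate()  # SIGTERM on POSIX; available on Python 3.6
        proc.join(timeout)
    if proc.is_alive() and hasattr(proc, "kill"):
        proc.kill()  # SIGKILL on POSIX; Process.kill() needs Python 3.7+
        proc.join()
    return proc.exitcode


def worker(stop_event):
    """Hypothetical worker loop that exits promptly once asked to stop."""
    while not stop_event.is_set():
        stop_event.wait(0.05)


def hung_worker(stop_event):
    """Simulates a child that ignores the stop request (the 'hang' case)."""
    while True:
        time.sleep(0.05)
```

Usage: set the stop event (or enqueue a sentinel), then call stop_process(proc). Escalating from join() to terminate() to kill() keeps the clean exit as the default path, which matters because the Python docs warn that terminating a process while it uses a pipe or queue can leave that pipe or queue corrupted.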
hampsterx commented 5 years ago

@borgstrom Sorry to nag, but any thoughts on this? It would be great to get it merged in so this project can move forward :)

hampsterx commented 5 years ago

@keithjjones Pretty please, can you rebase/fix the conflict? :)

keithjjones commented 5 years ago

Hi, I haven't touched this in quite a while. Can you work with the code I submitted?

hampsterx commented 5 years ago

Hi @keithjjones, I'm just starting to use it for a project now (it got delayed for a while) and am just getting beyond the basics; it seems to work so far. I guess I could fork your fork (actually, I already did) and submit a new PR that fixes the conflicts and applies any other tweaks?

Update: never mind, I wrote my own library: https://github.com/hampsterx/async-kinesis

keithjjones commented 5 years ago

No complaints from me. I figure I won't be able to get to it for at least a month.

borgstrom commented 5 years ago

@keithjjones -- Can you share some info on why you chose to use multiprocessing directly instead of the offspring library?

This PR is very large and hard to review because so many different changes are lumped into the same change set.

I'm very open to incorporating some of your improvements, but the changes need to be done in smaller pieces so that they are easier to review.