Closed: JWCook closed this 2 years ago
This works a lot better, but it seems it can still break. It's rare, but in one test I did get an extra call through over the limits: running two different instances of the script below was able to make 61 calls instead of 60 within a minute.
import datetime
import time

from pyrate_limiter import BucketFullException, Duration, Limiter, RequestRate
from pyrate_limiter.sqlite_bucket import SQLiteBucket

limiter = Limiter(
    RequestRate(2, Duration.SECOND),
    RequestRate(60, Duration.MINUTE),
    bucket_class=SQLiteBucket,
    bucket_kwargs={'path': '/tmp/pyrate_limiter.sqlite'},
)

i = 0

def limitMe2():
    global limiter, i
    while True:
        try:
            limiter.try_acquire("Test2")
            i += 1
            print(i, datetime.datetime.now())
            return
        except BucketFullException as err:
            time.sleep(err.meta_info['remaining_time'])

while True:
    limitMe2()
1 2022-01-18 09:51:31.536435 1 2022-01-18 09:51:33.538127
2 2022-01-18 09:51:31.536507 2 2022-01-18 09:51:33.540045
3 2022-01-18 09:51:32.536823 3 2022-01-18 09:51:34.540482
4 2022-01-18 09:51:32.538525 4 2022-01-18 09:51:34.542329
5 2022-01-18 09:51:37.546645 5 2022-01-18 09:51:35.542841
6 2022-01-18 09:51:37.548498 6 2022-01-18 09:51:35.544794
7 2022-01-18 09:51:38.549025 7 2022-01-18 09:51:36.544453
8 2022-01-18 09:51:38.550807 8 2022-01-18 09:51:36.546201
9 2022-01-18 09:51:45.562282 9 2022-01-18 09:51:39.550162
10 2022-01-18 09:51:45.564106 10 2022-01-18 09:51:39.552159
11 2022-01-18 09:51:46.564638 11 2022-01-18 09:51:40.551759
12 2022-01-18 09:51:46.566303 12 2022-01-18 09:51:40.553565
13 2022-01-18 09:51:47.566965 13 2022-01-18 09:51:41.554177
14 2022-01-18 09:51:47.568626 14 2022-01-18 09:51:41.556119
15 2022-01-18 09:51:48.569230 15 2022-01-18 09:51:42.556145
16 2022-01-18 09:51:48.570923 16 2022-01-18 09:51:42.558058
17 2022-01-18 09:51:49.571550 17 2022-01-18 09:51:43.558493
18 2022-01-18 09:51:49.573491 18 2022-01-18 09:51:43.560216
19 2022-01-18 09:51:50.573997 19 2022-01-18 09:51:44.560446
20 2022-01-18 09:51:50.575949 20 2022-01-18 09:51:44.562185
21 2022-01-18 09:51:51.575613 21 2022-01-18 09:51:54.582857
22 2022-01-18 09:51:51.577463 22 2022-01-18 09:51:54.584566
23 2022-01-18 09:51:52.578033 23 2022-01-18 09:51:55.583582
24 2022-01-18 09:51:52.579793 24 2022-01-18 09:51:55.585518
25 2022-01-18 09:51:53.579603 25 2022-01-18 09:52:00.595022
26 2022-01-18 09:51:53.581356 26 2022-01-18 09:52:00.596920
27 2022-01-18 09:51:56.584739
28 2022-01-18 09:51:56.586404
29 2022-01-18 09:51:57.587229
30 2022-01-18 09:51:57.589168
31 2022-01-18 09:51:58.589656
32 2022-01-18 09:51:58.591284
33 2022-01-18 09:51:59.591581
34 2022-01-18 09:51:59.593294
35 2022-01-18 09:52:00.602044
Perhaps you also need to move the lines below inside the transaction in limiter.py:
volume = bucket.size()
self.bucket_group[item_id].put(now)
I have not understood the actual logic of how everything works, and I don't know much about SQLite (I have not used it in a decade), but the code below works OK for my use case of using a single bucket.

lock() is perhaps not needed since you changed 'isolation_level'. With the fix below, I think there might be deadlock issues if one program tries to use multiple SQL buckets, say A then B, while another tries to access B then A, as both might end up waiting for the other to release. So it's probably wrong.

Thanks
def try_acquire(self, *identities) -> None:
    """Acquire an item, or reject it if the rate limit has been exceeded"""
    self._init_buckets(identities)

    # Workaround fix: acquire locks on all buckets up front
    for item_id in identities:
        self.bucket_group[item_id].lock()

    now = self.time_function()
    for idx, rate in enumerate(self._rates):
        for item_id in identities:
            bucket = self.bucket_group[item_id]
            volume = bucket.size()
            if volume < rate.limit:
                continue

            # Determine rate's time-window starting point
            start_time = now - rate.interval
            item_count, remaining_time = bucket.inspect_expired_items(start_time)
            if item_count >= rate.limit:
                # Workaround fix: release locks through rollback
                # (separate loop variable, so item_id isn't shadowed in the raise below)
                for locked_id in identities:
                    self.bucket_group[locked_id].rollback()
                raise BucketFullException(item_id, rate, remaining_time)

            if idx == len(self._rates) - 1:
                # We remove items based on the request rate with the max limit
                bucket.get(volume - item_count)

    for item_id in identities:
        self.bucket_group[item_id].put(now)

    # Workaround fix: release locks through commit
    for item_id in identities:
        self.bucket_group[item_id].commit()
In sqlite_bucket.py:

def lock(self):
    self.connection.execute('BEGIN EXCLUSIVE')

def commit(self):
    self.connection.commit()

def rollback(self):
    self.connection.rollback()
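For anyone unfamiliar with SQLite's locking, here is a minimal standalone sketch (not part of the PR; the path and table are made up for the demo) of how BEGIN EXCLUSIVE serializes two connections to the same database file:

import sqlite3

# isolation_level=None disables Python's implicit transaction handling,
# so BEGIN/COMMIT must be issued explicitly
conn_a = sqlite3.connect('/tmp/lock_demo.sqlite', isolation_level=None)
conn_b = sqlite3.connect('/tmp/lock_demo.sqlite', isolation_level=None, timeout=1)

conn_a.execute('CREATE TABLE IF NOT EXISTS items (ts REAL)')
conn_a.execute('BEGIN EXCLUSIVE')  # takes an exclusive lock on the database

try:
    # Blocks while conn_a holds the lock, then raises
    # sqlite3.OperationalError('database is locked') after the 1s timeout
    conn_b.execute('BEGIN EXCLUSIVE')
except sqlite3.OperationalError as err:
    print('second connection blocked:', err)
finally:
    conn_a.rollback()  # release the lock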
One extra call is quite rare; I only got it once after many runs. I also see that we are sometimes able to get all 60 calls done a few seconds earlier than the expected 30 seconds. One example below:
1 2022-01-18 11:25:04.467394 1 2022-01-18 11:25:06.469235
2 2022-01-18 11:25:04.467436 2 2022-01-18 11:25:06.471081
3 2022-01-18 11:25:05.468024 3 2022-01-18 11:25:07.471528
4 2022-01-18 11:25:05.470016 4 2022-01-18 11:25:07.473374
5 2022-01-18 11:25:10.477771 5 2022-01-18 11:25:08.473998
6 2022-01-18 11:25:10.479472 6 2022-01-18 11:25:08.475710
7 2022-01-18 11:25:11.480083 7 2022-01-18 11:25:09.475544
8 2022-01-18 11:25:11.481802 8 2022-01-18 11:25:09.477153
9 2022-01-18 11:25:12.482566 9 2022-01-18 11:25:15.492735
10 2022-01-18 11:25:12.484302 10 2022-01-18 11:25:15.494780
11 2022-01-18 11:25:13.484905 11 2022-01-18 11:25:16.495104
12 2022-01-18 11:25:13.493864 12 2022-01-18 11:25:16.496853
13 2022-01-18 11:25:14.491597 13 2022-01-18 11:25:17.497412
14 2022-01-18 11:25:14.493340 14 2022-01-18 11:25:17.499261
15 2022-01-18 11:25:15.498006 15 2022-01-18 11:25:18.499737
16 2022-01-18 11:25:16.503290 16 2022-01-18 11:25:18.501453
17 2022-01-18 11:25:19.500953 17 2022-01-18 11:25:21.503579
18 2022-01-18 11:25:19.502629 18 2022-01-18 11:25:21.505668
19 2022-01-18 11:25:20.503356 19 2022-01-18 11:25:22.506082
20 2022-01-18 11:25:20.505085 20 2022-01-18 11:25:22.507937
21 2022-01-18 11:25:24.510874 21 2022-01-18 11:25:23.507487
22 2022-01-18 11:25:24.512596 22 2022-01-18 11:25:23.509070
23 2022-01-18 11:25:25.511571 23 2022-01-18 11:25:26.512737
24 2022-01-18 11:25:25.513332 24 2022-01-18 11:25:26.514486
25 2022-01-18 11:25:32.525899 25 2022-01-18 11:25:27.515075
26 2022-01-18 11:25:32.527847 26 2022-01-18 11:25:27.517122
27 2022-01-18 11:25:28.517630
28 2022-01-18 11:25:28.519548
29 2022-01-18 11:25:29.519605
30 2022-01-18 11:25:29.521338
31 2022-01-18 11:25:30.522052
32 2022-01-18 11:25:30.524316
33 2022-01-18 11:25:31.523590
34 2022-01-18 11:25:31.525311
@JWCook is this ready? :D
@vutran1710 Sorry for the delay, I started this and then got busy with other projects. I think this is about 80% done. It still needs tests, and I think I previously got stuck trying to figure out how an extra (unexpected) call can still occasionally be made, as in SpiffSpaceman's example above.
I'll take another look at it this week.
@vutran1710 Okay, I found another thing that needs to change.
Here is how try_acquire() is currently structured (simplified, with just the bucket operations):
for rate in rates:
    for bucket in buckets:
        # Begin transaction
        bucket.size()
        bucket.inspect_expired_items()
        bucket.get()  # Last rate only
        # End transaction

for bucket in buckets:
    # Begin transaction
    bucket.put()
    # End transaction
Those final bucket.put() calls are a problem, since they're not part of the same transaction. If we switch the inner and outer for loops, we can put all operations for a given bucket within a single transaction:
for bucket in buckets:
    # Begin transaction
    bucket.size()
    for rate in rates:
        bucket.inspect_expired_items()
        bucket.get()  # Last rate only
    bucket.put()
    # End transaction
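For concreteness, here is a minimal sketch of what that loop order could look like in try_acquire(), reusing the bucket methods from the workaround earlier in this thread (size(), inspect_expired_items(), get(), put(), lock()/commit()/rollback()). This is an illustration of the structure, not the PR's actual implementation:

def try_acquire(self, *identities) -> None:
    """Sketch: one transaction per bucket, with all rates checked inside it"""
    self._init_buckets(identities)
    now = self.time_function()
    for item_id in identities:
        bucket = self.bucket_group[item_id]
        bucket.lock()  # Begin transaction
        try:
            volume = bucket.size()
            for idx, rate in enumerate(self._rates):
                if volume < rate.limit:
                    continue
                start_time = now - rate.interval
                item_count, remaining_time = bucket.inspect_expired_items(start_time)
                if item_count >= rate.limit:
                    raise BucketFullException(item_id, rate, remaining_time)
                if idx == len(self._rates) - 1:
                    # Remove expired items based on the largest rate window
                    bucket.get(volume - item_count)
            bucket.put(now)
        except BucketFullException:
            bucket.rollback()  # End transaction (failure)
            raise
        else:
            bucket.commit()  # End transaction (success)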
Does that look reasonable to you?
I remember thinking about this already when I was writing this part, meaning there is something about it that made me go the other way - which I obviously forgot. But anyway, if this change can get past the tests then I think it is still valid.
So, yeah, I agree it looks reasonable.
I remember thinking about this already when I was writing this part, meaning there is something about it that made me go the other way
I was wondering about that. All the tests do still pass. But I did just think of one case where behavior would differ: if two different buckets each exceed a different rate limit within a single call of try_acquire(). For example, if we have:
hourly_rate = RequestRate(500, Duration.HOUR)
daily_rate = RequestRate(1000, Duration.DAY)
limiter = Limiter(hourly_rate, daily_rate)

# Many requests later....
limiter.try_acquire(
    'bucket_1',  # This bucket has exceeded the daily_rate
    'bucket_2',  # This bucket has exceeded the hourly_rate
)
With the current behavior (iterating over rates first), the BucketFullException will always be for the smaller rate (hourly). With the proposed change (iterating over buckets first), the BucketFullException would be for the first bucket, and in this case the daily rate. I believe that's not what we want. If we're adding delays based on remaining_time, we will want to apply the smallest delay first.
That also makes me realize the current behavior might not be ideal either! The smallest rate isn't guaranteed to have the smallest remaining_time. The daily_rate for bucket_1 could have only 5 minutes remaining, while the hourly_rate for bucket_2 could have 59 minutes remaining.
So I'll need to think about this a bit more.
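To make "smallest delay first" concrete, here is a sketch of that idea. check_bucket is a hypothetical helper that returns a BucketFullException (or None) for one bucket/rate pair; only err.meta_info['remaining_time'] is the library's actual API:

def raise_smallest_delay_first(buckets, rates, check_bucket):
    # Collect every violation instead of failing fast
    violations = [
        err
        for bucket in buckets
        for rate in rates
        if (err := check_bucket(bucket, rate)) is not None
    ]
    if violations:
        # Surface the exception whose delay expires soonest
        raise min(violations, key=lambda e: e.meta_info['remaining_time'])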
Okay, so here are some changes I made:
- Lock buckets in a consistent (sorted) order, so that try_acquire('id_1', 'id_2') and try_acquire('id_2', 'id_1') can't deadlock each other
- Collect all possible BucketFullExceptions for a request, and delay raising until all buckets have been processed
- Moved the main logic of try_acquire() into a separate method, since it's a bit more code now

@vutran1710 What do you think so far? Still needs a few more tests, but I think this may be getting close to done.
Collect all possible BucketFullExceptions for a request, and delay raising until all buckets have been processed
@JWCook can you explain the idea behind this? Originally it was my intention to make it fail fast, which would save some computing time too.
For the rest I think it's ok to proceed.
That was to handle the case I described above (https://github.com/vutran1710/PyrateLimiter/pull/61#issuecomment-1076500269), in which multiple buckets are full, but the first one to raise an error has a significantly longer remaining_time.

That may be a fairly obscure case, though, so I'd be happy to revert it back to the fail-fast approach.
I reverted it back to the way it was before: iterate over rates first, then buckets, and fail on the first BucketFullException. The case I mentioned above could be handled in a separate issue and PR, if/when someone actually encounters it.

Now I'm seeing a different issue: the tests I added with ThreadPoolExecutor are passing locally (and were previously passing in CI), but are now failing in CI. Looking into that now.
I agree.

I haven't yet had enough time to wrap my head around all this, but it just came to my mind that, if we worry about the remaining time, can't we just process all the raises in one pass and save the remaining-time info somewhere, so that on the next check we won't have to process every bucket again?
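One way that caching idea could look in code (purely a sketch; none of these names are part of the library): record when each bucket's limit will reset, and skip re-checking it until that time has passed.

import time

class RemainingTimeCache:
    """Remember when each bucket unblocks, so a known-limited bucket
    can be skipped without re-inspecting all of its items."""

    def __init__(self):
        self._ready_at = {}  # bucket id -> monotonic time when it unblocks

    def record_violation(self, item_id, remaining_time):
        self._ready_at[item_id] = time.monotonic() + remaining_time

    def known_blocked(self, item_id):
        return time.monotonic() < self._ready_at.get(item_id, 0.0)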
Merging #61 (a1e042f) into master (54dd04f) will increase coverage by 0.12%. The diff coverage is 100.00%.
@@ Coverage Diff @@
## master #61 +/- ##
==========================================
+ Coverage 97.82% 97.95% +0.12%
==========================================
Files 8 8
Lines 322 342 +20
Branches 30 32 +2
==========================================
+ Hits 315 335 +20
Misses 4 4
Partials 3 3
Impacted Files | Coverage Δ
---|---
pyrate_limiter/bucket.py | 93.63% <100.00%> (+0.11%) :arrow_up:
pyrate_limiter/limit_context_decorator.py | 100.00% <100.00%> (ø)
pyrate_limiter/limiter.py | 100.00% <100.00%> (ø)
pyrate_limiter/sqlite_bucket.py | 100.00% <100.00%> (ø)
Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Okay, I think I finally figured out the remaining issues:
- bucket.get() writes to the bucket and leaves the transaction open. Later, bucket.put() needs to both read (to get the size) and write to the bucket.
- A get() from one thread/bucket can potentially block a put() from another thread/bucket.

Solution:
- Hold each bucket's lock until try_acquire() is complete.
- This makes the EXCLUSIVE transaction mode unnecessary, which is good, because that means try_acquire() only needs to lock the buckets it's processing, not the entire database.

@vutran1710 This is now ready for review and merge.
if we worry about the remaining time, cant we just process all raises on 1 turn and save the remaining time info somewhere, so the next time checking we wont have to process every bucket again?
Yeah, good idea!
Hello, I tried taking the latest code and testing it against the same tester code I used earlier. I just ran the script twice, at the same time, from different terminals (or copy the script to another file and run them both).

Maybe I made a mistake, but it seems the latest code does not work. As soon as I start the second script, something happens and both of them run too fast and do not honor either limit correctly. They did stop, but at 60/54 instead of x + y = 60. In another run it was 60/56.

It also seems the scripts run progressively faster: 17:21:59 has the most prints, followed by 17:21:58, and so on.

Test code and output below. Please run the tester code to confirm. Thanks
limiter = Limiter(
    RequestRate(2, Duration.SECOND),
    RequestRate(60, Duration.MINUTE),
    bucket_class=SQLiteBucket,
    bucket_kwargs={'path': '/tmp/pyrate_limiter.sqlite'},
)

i = 0

def limitMe2():
    global limiter, i
    while True:
        try:
            limiter.try_acquire("Test2")
            i += 1
            print(i, datetime.datetime.now())
            return
        except BucketFullException as err:
            time.sleep(err.meta_info['remaining_time'])

while True:
    limitMe2()
% python3 test2.py % python3 test2.py
1 2022-04-05 17:21:52.560871 1 2022-04-05 17:21:55.561757
2 2022-04-05 17:21:52.562150 2 2022-04-05 17:21:55.563307
3 2022-04-05 17:21:53.559914 3 2022-04-05 17:21:56.570909
4 2022-04-05 17:21:53.562332 4 2022-04-05 17:21:56.572378
5 2022-04-05 17:21:54.561165 5 2022-04-05 17:21:56.573816
6 2022-04-05 17:21:54.562763 6 2022-04-05 17:21:56.575248
7 2022-04-05 17:21:55.565014 7 2022-04-05 17:21:57.565068
8 2022-04-05 17:21:55.566479 8 2022-04-05 17:21:57.566559
9 2022-04-05 17:21:56.562808 9 2022-04-05 17:21:57.568086
10 2022-04-05 17:21:56.564300 10 2022-04-05 17:21:57.569617
11 2022-04-05 17:21:56.565673 11 2022-04-05 17:21:57.571232
12 2022-04-05 17:21:56.567017 12 2022-04-05 17:21:57.572666
13 2022-04-05 17:21:57.581827 13 2022-04-05 17:21:57.574043
14 2022-04-05 17:21:57.583374 14 2022-04-05 17:21:57.575435
15 2022-04-05 17:21:57.584952 15 2022-04-05 17:21:58.582520
16 2022-04-05 17:21:57.586452 16 2022-04-05 17:21:58.584102
17 2022-04-05 17:21:57.588142 17 2022-04-05 17:21:58.585747
18 2022-04-05 17:21:57.589612 18 2022-04-05 17:21:58.587358
19 2022-04-05 17:21:57.591097 19 2022-04-05 17:21:58.589024
20 2022-04-05 17:21:57.592591 20 2022-04-05 17:21:58.590476
21 2022-04-05 17:21:58.564324 21 2022-04-05 17:21:58.591944
22 2022-04-05 17:21:58.566865 22 2022-04-05 17:21:58.593431
23 2022-04-05 17:21:58.568479 23 2022-04-05 17:21:58.594786
24 2022-04-05 17:21:58.569981 24 2022-04-05 17:21:58.596123
25 2022-04-05 17:21:58.571458 25 2022-04-05 17:21:58.597505
26 2022-04-05 17:21:58.573026 26 2022-04-05 17:21:58.604229
27 2022-04-05 17:21:58.574479 27 2022-04-05 17:21:58.605780
28 2022-04-05 17:21:58.576014 28 2022-04-05 17:21:58.607130
29 2022-04-05 17:21:58.577503 29 2022-04-05 17:21:58.608651
30 2022-04-05 17:21:58.616874 30 2022-04-05 17:21:58.610103
31 2022-04-05 17:21:58.618368 31 2022-04-05 17:21:59.565607
32 2022-04-05 17:21:58.620001 32 2022-04-05 17:21:59.567509
33 2022-04-05 17:21:58.621470 33 2022-04-05 17:21:59.569247
34 2022-04-05 17:21:58.622980 34 2022-04-05 17:21:59.570983
35 2022-04-05 17:21:58.624472 35 2022-04-05 17:21:59.572687
36 2022-04-05 17:21:58.626095 36 2022-04-05 17:21:59.574326
37 2022-04-05 17:21:59.583375 37 2022-04-05 17:21:59.576024
38 2022-04-05 17:21:59.585016 38 2022-04-05 17:21:59.577728
39 2022-04-05 17:21:59.586578 39 2022-04-05 17:21:59.579375
40 2022-04-05 17:21:59.588255 40 2022-04-05 17:21:59.581154
41 2022-04-05 17:21:59.589799 41 2022-04-05 17:21:59.637688
42 2022-04-05 17:21:59.591232 42 2022-04-05 17:21:59.639177
43 2022-04-05 17:21:59.592720 43 2022-04-05 17:21:59.640748
44 2022-04-05 17:21:59.594143 44 2022-04-05 17:21:59.642209
45 2022-04-05 17:21:59.595820 45 2022-04-05 17:21:59.643726
46 2022-04-05 17:21:59.597225 46 2022-04-05 17:21:59.645163
47 2022-04-05 17:21:59.598665 47 2022-04-05 17:21:59.646677
48 2022-04-05 17:21:59.600135 48 2022-04-05 17:21:59.648083
49 2022-04-05 17:21:59.601523 49 2022-04-05 17:21:59.649542
50 2022-04-05 17:21:59.602936 50 2022-04-05 17:21:59.650941
51 2022-04-05 17:21:59.604316 51 2022-04-05 17:21:59.652468
52 2022-04-05 17:21:59.605674 52 2022-04-05 17:21:59.653876
53 2022-04-05 17:21:59.607064 53 2022-04-05 17:21:59.655338
54 2022-04-05 17:21:59.608529 54 2022-04-05 17:21:59.656739
55 2022-04-05 17:21:59.609955
56 2022-04-05 17:21:59.611579
57 2022-04-05 17:21:59.613147
58 2022-04-05 17:21:59.614564
59 2022-04-05 17:21:59.616092
60 2022-04-05 17:21:59.617534
Fixes #60

- Each bucket accessed within try_acquire() will be locked until the call is complete.
- isolation_level="EXCLUSIVE" locks the entire database, but a per-bucket lock allows any other buckets to safely be processed in parallel.
- Buckets are locked in a consistent order, so that try_acquire('id_1', 'id_2') and try_acquire('id_2', 'id_1') can't deadlock each other.
- Added lock_acquire() and lock_release() methods to AbstractBucket, since these need to be called from Limiter.
- These are currently only used by SQLiteBucket, but this could potentially apply to other future backends.

A rough in-process illustration of the per-bucket locking plus consistent ordering follows below.
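This sketch uses threading.Lock purely for illustration (the real SQLiteBucket presumably needs a lock that also works across processes); only the lock_acquire()/lock_release() names come from the PR description above:

import threading

class LockableBucket:
    """Sketch of the lock_acquire()/lock_release() interface"""

    def __init__(self):
        self._lock = threading.Lock()

    def lock_acquire(self):
        self._lock.acquire()

    def lock_release(self):
        self._lock.release()

def lock_all(bucket_group, identities):
    # Locking in sorted order means try_acquire('id_1', 'id_2') and
    # try_acquire('id_2', 'id_1') request the same locks in the same
    # sequence, so they can't deadlock each other
    for item_id in sorted(identities):
        bucket_group[item_id].lock_acquire()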