ross / requests-futures

Asynchronous Python HTTP Requests for Humans using Futures

How would a "retrying" decorator work on a FutureSession?! #26

Closed · spex66 closed this 2 years ago

spex66 commented 9 years ago

scope

Using FuturesSession works pretty well for me, but due to the parallel requests I run into the typical status code 429 "Too Many Requests".

I've handled this before with the "retrying" library.

simplified synchronous code example (w/o FuturesSession)

from functools import partial

from retrying import retry
import requests

def retry_if_result(response, retry_status_codes=[]):
    """Return True if we should retry (in this case when the status_code is 429), False otherwise"""
    # DEBUG to see this in action
    if response.status_code in retry_status_codes:
        print('RETRY %d: %s' % (response.status_code, response.url))
        return True
    else:
        return False

# https://github.com/rholder/retrying/issues/26
def never_retry_on_exception(response):
    """Return always False to raise on Exception (will not happen by default!)"""
    return False

# https://github.com/rholder/retrying/issues/25
def create_retry_decorator(retry_status_codes=[]):
    return retry(
        # create specific "retry_if_result" functions per status codes
        retry_on_result=partial(retry_if_result, retry_status_codes=retry_status_codes), 
        wait_exponential_multiplier=1000, 
        wait_exponential_max=10000,
        retry_on_exception=never_retry_on_exception
        )

# create specific decorators per status codes
retry429_decorator = create_retry_decorator(retry_status_codes=[429])

s = requests.session()

s.auth = (user, password)  # credentials defined elsewhere

# decorate them with the retry / throttling logic
s.get    = retry429_decorator(s.get)
s.put    = retry429_decorator(s.put)
s.post   = retry429_decorator(s.post)
s.delete = retry429_decorator(s.delete)
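With this in place, calls through the session block and retry transparently whenever the server answers 429; the endpoint here is a made-up placeholder:

response = s.get('https://api.example.com/resource')  # hypothetical endpoint; retried with exponential backoff on 429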

issue

After switching to

from requests_futures.sessions import FuturesSession

s = FuturesSession()
...
# decorate them with the retry / throttling logic
s.get    = retry429_decorator(s.get)
s.put    = retry429_decorator(s.put)
s.post   = retry429_decorator(s.post)
s.delete = retry429_decorator(s.delete)

# non-blocking approach
future = s.post('https://api.cxense.com/traffic', data=json.dumps(payload))

This descends into obvious chaos, because the retry decorator around post() wants to inspect response.status_code, which is not (yet) available on the deferred Future object:

    231     # DEBUG to see this in action
--> 232     if response.status_code in retry_status_codes:
    233         print('RETRY %d: %s' % (response.status_code, response.url))
    234         return True
AttributeError: 'Future' object has no attribute 'status_code'

question

What is the pattern for applying retry logic based on status_code inspection in this deferred / async approach? Is this something that can be applied with the background callback?

It would be awesome if someone could at least point me in the right direction. At the moment my brain is in "async flow deadlock" once again...

(or do you think this question is better suited for Stack Overflow?)

ross commented 9 years ago

What is the pattern to apply a "retry" logic based on the status_code inspection for this defered / async approach?

I don't have one off-hand; it's not a situation I've needed to handle before, so I'd have to sit down and think about it to come up with something. Ideally you'd pace the requests you queue up to avoid hitting the rate limits in the first place, assuming you know what the limits are. You could potentially do something with background callbacks that checks whether a call was rate limited and records that it needs to be resent later, after things have had time to cool off, but that won't be particularly easy to get right if you're firing off lots of requests and don't have a solid idea of what's getting sent when.
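A rough sketch of that callback idea, using requests' standard response hooks (which pass straight through FuturesSession); the URL and the bookkeeping list are illustrative only:

from requests_futures.sessions import FuturesSession

session = FuturesSession()
needs_resend = []  # URLs that came back rate-limited

def note_rate_limit(response, *args, **kwargs):
    # response hooks run in the worker thread once the response arrives,
    # before the future resolves
    if response.status_code == 429:
        needs_resend.append(response.request.url)

future = session.get('https://httpbin.org/status/429',
                     hooks={'response': note_rate_limit})
future.result()  # once all futures finish, drain needs_resend and re-queue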

(or do you think, that this question is better suited for stackoverflow?)

Probably. I don't know whether anyone has done it with requests-futures specifically, but I'd imagine someone has a pattern for solving this sort of thing.

Honestly, I'd just pace the stuff that I queued up. You could potentially create a subclass of FuturesSession that manages the rate of starting requests by queuing things up and sending fewer than N of them per second, or whatever you need; a rough sketch of that idea follows. It sounds like an interesting extension of what's here. I won't have spare cycles to look at it in the near future, but feel free to leave this open and I may get around to it at some point.
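A minimal sketch of that pacing idea, under the assumption that overriding Session.send is the right hook: send() runs inside the executor's worker thread, so sleeping there delays the HTTP call without blocking the caller. The class name and the requests_per_second parameter are invented for illustration:

import threading
import time

from requests_futures.sessions import FuturesSession

class ThrottledFuturesSession(FuturesSession):
    """Hypothetical sketch: start at most requests_per_second requests."""

    def __init__(self, *args, requests_per_second=5.0, **kwargs):
        super().__init__(*args, **kwargs)
        self._interval = 1.0 / requests_per_second
        self._lock = threading.Lock()
        self._next_slot = time.monotonic()

    def send(self, request, **kwargs):
        # reserve the next send slot under the lock, then sleep (in the
        # worker thread) until that slot arrives
        with self._lock:
            now = time.monotonic()
            wait = max(0.0, self._next_slot - now)
            self._next_slot = max(now, self._next_slot) + self._interval
        if wait:
            time.sleep(wait)
        return super().send(request, **kwargs)

session = ThrottledFuturesSession(max_workers=10, requests_per_second=2)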

spex66 commented 9 years ago

Thx @ross for your feedback. I don't have a solution available, nor the time to dig into it as needed, so I'll keep you posted if I stumble over something relevant.

And yes, let's keep the issue open for a while (=PA=)

mikeage commented 7 years ago

I'm looking at doing something similar -- not rate limiting, but simply an unreliable server. Did you ever find a good solution?

ross commented 7 years ago

I'm looking at doing something similar -- not rate limiting, but simply an unreliable server. Did you ever find a good solution?

For an unreliable server I'd probably look to just code things up normally and have a check, as I was working through the responses, that re-requests things that fail. A contrived example might look something like:

#!/usr/bin/env python

from requests_futures.sessions import FuturesSession

sess = FuturesSession()

def make_request(status_code):
    # httpbin echoes back whichever status code you ask for
    return sess.get('http://httpbin.org/status/{}'.format(status_code))

requests = []
for arg in (200, 200, 201, 418, 502):
    # store (args, tries remaining, future) so failures can be rebuilt
    requests.append((arg, 3, make_request(arg)))

while requests:
    arg, tries, request = requests.pop(0)
    resp = request.result()
    print('arg={}, tries={}, resp={}'.format(arg, tries, resp))
    if resp.status_code > 299 and tries > 1:
        # failed with tries remaining: queue up a fresh request
        requests.append((arg, tries - 1, make_request(arg)))

This wouldn't make much sense for the rate-limit case, as there you'd just hammer things to death, getting a rate-limit message for each request before knowing to back off and retry it.

ross commented 7 years ago
(env) sweetums:requests-futures ross$ ./retry.py
arg=200, tries=3, resp=<Response [200]>
arg=200, tries=3, resp=<Response [200]>
arg=201, tries=3, resp=<Response [201]>
arg=418, tries=3, resp=<Response [418]>
arg=502, tries=3, resp=<Response [502]>
arg=418, tries=2, resp=<Response [418]>
arg=502, tries=2, resp=<Response [502]>
arg=418, tries=1, resp=<Response [418]>
arg=502, tries=1, resp=<Response [502]>
mikeage commented 7 years ago

I'm a bit new to futures (and requests-futures) in general, but can you help me understand how that would fit into this pattern? My code is roughly:

session = FuturesSession(max_workers=10)
futures = []
futures.append(session.get(url1))
futures.append(session.get(url2))
...
for future in cf.as_completed(futures):
    r = future.result()
    assert r.ok
    # Do something with r, possibly including r.url

My problem is that if future.result() throws an exception, I never get r, so I can't recover the url of the original request. Is there a way I can continue to use cf.as_completed (which, if I understand correctly, I need in order to process whichever request finishes first)?

ross commented 7 years ago

futures.append(session.get(url1))

Yeah, my example above assumed you'd need more than just the response/request to re-request things, so rather than having a list of futures I stored a list of args and the future object. That way, if I needed to re-request something, the necessary args to build that request would be available.

    requests.append((arg, 3, make_request(arg)))
session = FuturesSession(max_workers=10)
requests = []
requests.append((url1, session.get(url1)))
requests.append((url2, session.get(url2)))
...
# Not familiar with whatever this is `cf.as_completed(futures)`; guess maybe
# it gives you back the ones that have completed? If so that's pretty cool.
# What's below will work, but wouldn't have that behavior.
for url, future in requests:
    r = future.result()
    assert r.ok
    # you now have url, which tells you what to re-request if necessary.

You could potentially keep the as_completed bit by doing something slightly hacky like:

future = session.get(url1)
future.url = url1
requests.append(future)

Then the url should be accessible off the future object:

for future in cf.as_completed(requests):
    r = future.result()
    assert r.ok
    # future.url has the url you need to re-request
mikeage commented 7 years ago

Sorry, forgot to mention that I had import concurrent.futures as cf.

ross commented 7 years ago

Sorry, forgot to mention that I had import concurrent.futures as cf.

Yeah, just hadn't seen the as_completed before. Seems pretty cool/useful.

mikeage commented 7 years ago

Just as a followup, I have this working, but my original idea didn't work. I was hoping to be able to do something like:

session = FuturesSession(max_workers=10)
futures = []
futures.append(session.get(url1))
futures.append(session.get(url2))
for future in concurrent.futures.as_completed(futures):
    r = future.result()
    try:
        assert r.ok
    except AssertionError:
        futures.append(session.get(r.url))  # add some code to prevent infinite loops

But concurrent.futures.as_completed doesn't pick up items appended to the list while iterating, so the first pass only loops through the original futures, even though inspecting the list afterwards does show three elements.

Instead, I rescheduled them into a separate list; it's not quite as elegant as having one ongoing loop, but it works.
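A sketch of that reschedule-in-waves pattern, with placeholder URLs and an assumed cap of three waves:

import concurrent.futures as cf

from requests_futures.sessions import FuturesSession

session = FuturesSession(max_workers=10)
urls = ['https://httpbin.org/status/200', 'https://httpbin.org/status/502']

pending = [session.get(url) for url in urls]
for _ in range(3):  # bounded number of waves prevents infinite loops
    retries = []
    for future in cf.as_completed(pending):
        r = future.result()
        if not r.ok:
            retries.append(session.get(r.url))  # reschedule the failure
        # else: process r here
    if not retries:
        break
    pending = retries  # the next wave iterates only over the failures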

llamafilm commented 5 years ago

I made a list of requests and removed each item once it got a successful response, then retried the remainder after checking the Retry-After header.

https://stackoverflow.com/a/58261477/1997852
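A sketch of that approach, assuming Retry-After comes back as a number of seconds (it can also be an HTTP date, which this ignores); the URLs and the three-round cap are placeholders:

import time

from requests_futures.sessions import FuturesSession

session = FuturesSession()
remaining = ['https://httpbin.org/status/200', 'https://httpbin.org/status/429']

for _ in range(3):  # cap the rounds so a stubborn 429 can't loop forever
    futures = [(url, session.get(url)) for url in remaining]
    remaining = []
    wait = 1.0
    for url, future in futures:
        r = future.result()
        if r.status_code == 429:
            # assumes a numeric Retry-After; the date form would need parsing
            wait = max(wait, float(r.headers.get('Retry-After', 1)))
            remaining.append(url)
        # else: process r
    if not remaining:
        break
    time.sleep(wait)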

willitsw commented 2 years ago

Has anyone tried passing a Session that has been configured with a retry policy from urllib3.util.retry to accomplish configurable retries, something like this?

import requests
from requests.adapters import HTTPAdapter
from requests_futures.sessions import FuturesSession
from urllib3.util.retry import Retry

session = requests.Session()

retry = Retry(
    total=3,
    read=3,
    connect=3,
    backoff_factor=1,
    allowed_methods=False,  # False means retry on any HTTP method
    status_forcelist=[500, 502, 503, 504],
)
adapter = HTTPAdapter(max_retries=retry)

session.mount("http://", adapter)
session.mount("https://", adapter)

async_session = FuturesSession(session=session)
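A hypothetical way to check this against httpbin: the mounted adapters (and their Retry policy) should apply to every future, since FuturesSession runs the wrapped Session's machinery in worker threads. If I'm reading the urllib3 defaults right, result() raises requests.exceptions.RetryError once the adapter exhausts its retries:

future = async_session.get('https://httpbin.org/status/503')
try:
    response = future.result()  # the adapter retries inside the worker thread
except Exception as exc:
    print(type(exc).__name__, exc)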
github-actions[bot] commented 2 years ago

This issue is stale because it has been open 90 days with no activity. Remove stale label or comment or this will be closed in 7 days.