RxPHP / RxHttp

Http Client for RxPHP
MIT License

Limit number of concurrent requests #6

Closed: ameoba32 closed this issue 7 years ago

ameoba32 commented 7 years ago

Here is the thing: RxPHP allows making concurrent requests, which is good. So the following code will create all connections in parallel. What would be the way to limit the number of concurrent connections?

I am trying to think of the RxPHP way of doing it, but I have no idea.

It seems that this needs to be a feature of RxPHP itself, so that it keeps an internal counter and does not create a new connection while the pool is full.

\Rx\Observable::fromArray(
    [
        'https://www.example.com/',
        'https://www.example.com/',
        'https://www.example.com/',
    ]
)->flatMap(
    function ($url) {
        return \Rx\React\Http::get($url);
    }
);

mbonneau commented 7 years ago

@ameoba32 - You are wading into the waters of Rx backpressure.

Because Rx is a "push" model, it is conceivable that the producer could outrun the capacity of the system (or consume more resources than you wish).

There are two possible problems that need to be solved:

  1. There is simply too much information coming in to ever be able to process it all.
  2. A temporary spike on the producer side (such as a large array input).

Problem 1 is probably not what you are interested in (you would just need a strategy for dropping items, such as throttle).

For solving problem 2, we can use the following solution to limit concurrent subscriptions:

use Rx\Observable;

Observable::fromArray(
    [
        'http://www.example.com/',
        // .... lots more urls in here
        'http://www.example.com/',
    ]
)
    ->map(
        function ($url) {
            // Only build the request Observable here. The request is
            // expected to start on subscription, which is what lets
            // concatAll() below hold it back.
            return \Rx\React\Http::get($url);
        })

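    // Distribute the incoming Observables round-robin over 4 groups.
    // Each group is concatenated, so it runs one request at a time,
    // giving at most 4 concurrent requests overall.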
    ->groupBy(function () {
        static $index = 0;
        return $index++ % 4;
    })
    ->flatMap(function (Observable\GroupedObservable $go) {
        return $go->concatAll();
    })

    ->subscribe(function ($x) {
        echo "$x\n";
    });

This solution works reasonably well for its simplicity.

One issue with this solution is that Observables are assigned to a group up front and are "queued" there. If an Observable is waiting behind a slow request in one group when another group becomes idle, it cannot switch to the idle group. Everything will still be processed fine, but the pipeline may not always be running at "full capacity".
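To get a feel for how much capacity can be lost, here is a rough simulation in plain PHP. It does not use RxPHP at all: the request durations and both function names are made up for illustration. It only compares the finish time of fixed round-robin groups (as in the groupBy approach above) with a hypothetical shared queue where each request takes whichever slot frees up first.

```php
<?php
// Hypothetical request durations in seconds (illustrative values only).
$durations = [9, 1, 1, 1, 9, 1, 1, 1];
$slots = 4;

/**
 * Fixed groups: request $i always goes to group ($i % $slots) and runs
 * after everything that was queued in that group before it.
 */
function staticGroupsFinishTime(array $durations, int $slots): int
{
    $busyUntil = array_fill(0, $slots, 0);
    foreach ($durations as $i => $d) {
        $busyUntil[$i % $slots] += $d;
    }
    return max($busyUntil);
}

/**
 * Shared queue: requests are dispatched in order, each one to the slot
 * that becomes free first.
 */
function sharedQueueFinishTime(array $durations, int $slots): int
{
    $busyUntil = array_fill(0, $slots, 0);
    foreach ($durations as $d) {
        $next = array_search(min($busyUntil), $busyUntil, true);
        $busyUntil[$next] += $d;
    }
    return max($busyUntil);
}

$static = staticGroupsFinishTime($durations, $slots);
$shared = sharedQueueFinishTime($durations, $slots);

// Prints: static groups: 18s, shared queue: 10s
echo "static groups: {$static}s, shared queue: {$shared}s\n";
```

In this made-up case both slow requests land in group 0 and run back to back while the other three groups sit idle. With evenly sized requests the two strategies finish at about the same time, which is why the groupBy approach is usually good enough.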

Let me know if this answers your question.

ameoba32 commented 7 years ago

It works like a charm! Thank you Matt. Never thought of groupBy here!