Rxnet / eventstore-client

EventStore asynchronous PHP client with reactiveX flavours
Apache License 2.0

Add an auto reconnect #31

Open Th3Mouk opened 6 years ago

Th3Mouk commented 6 years ago

Hello!

The old reconnect implementation in persistentSubscription wasn't working.

One current workaround for handling NotMasterException is:

// You only need to set the default scheduler once
Scheduler::setDefaultFactory(
    function () {
        // getLoop() starts the loop automatically
        return new Scheduler\EventLoopScheduler(EventLoop::getLoop());
    }
);

$dsn = 'tcp://admin:changeit@toto.com:1113';

$observer = new CallbackObserver(
    function($e) {
        var_dump($e);
    }
);

$this->eventStore = new EventStore(EventLoop::getLoop());

$connect = function($dsn, $stream, $group) {
    return $this->eventStore
        ->connect($dsn)
        ->flatMapTo($this->eventStore->persistentSubscription($stream, $group));
};

$connect($dsn, $stream, $group)
    ->catch(function (\Throwable $e) use ($connect, $dsn, $stream, $group) {
        $this->eventStore = new EventStore(EventLoop::getLoop());
        $credentials = parse_url($dsn);
        if ($e instanceof NotMasterException) {
            $dsn = 'tcp://'.$credentials['user'].':'.$credentials['pass'].'@'.$e->getMasterIp().':'.$e->getMasterPort();
            return $connect($dsn, $stream, $group);
        }
        throw $e;
    })
    ->flatMap($this->adapter)
    ->subscribe($observer);

But this isn't a solution when you need to handle multiple \Throwable instances before message consumption starts, because the catch handler is only executed once. The connect method could probably handle this logic internally, but at this point I'm a little lost.
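To make the "handle it more than once" requirement concrete, here is a minimal sketch in plain synchronous PHP, without Rx. It only illustrates the control flow: follow each master redirect, rebuild the DSN with the original credentials, and try again up to a limit. `FakeNotMasterException`, `dsnForMaster`, and `connectFollowingMaster` are hypothetical names made up for this illustration and are not part of eventstore-client. In the Rx version the same shape would presumably be a catch handler that re-applies itself to the observable it returns.

```php
<?php
// Hypothetical stand-in for the library's NotMasterException (assumption).
final class FakeNotMasterException extends RuntimeException
{
    private $masterIp;
    private $masterPort;

    public function __construct(string $masterIp, int $masterPort)
    {
        parent::__construct("Not master, try {$masterIp}:{$masterPort}");
        $this->masterIp = $masterIp;
        $this->masterPort = $masterPort;
    }

    public function getMasterIp(): string { return $this->masterIp; }
    public function getMasterPort(): int { return $this->masterPort; }
}

// Rebuild the DSN so it points at the master, keeping scheme and credentials.
function dsnForMaster(string $dsn, string $ip, int $port): string
{
    $parts = parse_url($dsn);
    $auth = isset($parts['user'])
        ? $parts['user'] . (isset($parts['pass']) ? ':' . $parts['pass'] : '') . '@'
        : '';

    return ($parts['scheme'] ?? 'tcp') . '://' . $auth . $ip . ':' . $port;
}

// Call $connect($dsn) and follow master redirects until it succeeds
// or $maxRedirects redirects have been followed.
function connectFollowingMaster(callable $connect, string $dsn, int $maxRedirects = 3)
{
    for ($attempt = 0; ; $attempt++) {
        try {
            return $connect($dsn);
        } catch (FakeNotMasterException $e) {
            if ($attempt >= $maxRedirects) {
                throw $e; // give up after too many redirects
            }
            $dsn = dsnForMaster($dsn, $e->getMasterIp(), $e->getMasterPort());
        }
    }
}

// Usage: a fake connect that is redirected twice before succeeding.
$calls = [];
$connect = function (string $dsn) use (&$calls) {
    $calls[] = $dsn;
    if (count($calls) === 1) {
        throw new FakeNotMasterException('10.0.0.2', 1113);
    }
    if (count($calls) === 2) {
        throw new FakeNotMasterException('10.0.0.3', 1113);
    }
    return 'connected to ' . $dsn;
};

$result = connectFollowingMaster($connect, 'tcp://admin:changeit@toto.com:1113');
```

The redirect limit is a design choice of this sketch: it avoids looping forever if two nodes keep pointing at each other during an election.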

Vinceveve commented 6 years ago

Connect is an independent stream, so the best solution is to handle reconnection on that stream directly.

I made an example in a previous release that can be adapted:

<?php
use EventLoop\EventLoop;
use Ramsey\Uuid\Uuid;
use Rx\Observer\CallbackObserver;
use Rx\Scheduler\EventLoopScheduler;
use Rxnet\EventStore\Data\WriteEventsCompleted;
use Rxnet\EventStore\EventRecord;
use Rxnet\EventStore\EventStore;
use Rxnet\EventStore\NewEvent\JsonEvent;
use Rxnet\Httpd\Httpd;
use Rxnet\Httpd\HttpdEvent;
use Rxnet\Operator\OnBackPressureBuffer;
require '../vendor/autoload.php';
$eventStore = new EventStore();
// Wait for the connection to be up before starting
\Rxnet\awaitOnce(
    $eventStore->connect()
        ->doOnError(function (\Exception $e) {
            echo "got an error {$e->getMessage()} \n";
        })
        // on error, try to reconnect until the end of the world
        ->retryWhen(function (\Rx\Observable $errors) {
            // Wait 5s between reconnection attempts
            return $errors->delay(5000)
                ->doOnNext(function () {
                    echo "Disconnected, retry\n";
                });
        })
);
// When http is faster than write
// buffer in memory, could have been redis buffer
$memoryBuffer = new OnBackPressureBuffer(
    10000,
    function () {
        echo "Buffer overflow !";
    },
    OnBackPressureBuffer::OVERFLOW_STRATEGY_ERROR
);
// Write to this stream id
// API part :
// Forward to event store when an event is received
$httpd = new Httpd();
$httpd->listen(8082)
    // Don't filter anything and answer ok
    // in real life that would be
    ->map(function (HttpdEvent $event) use ($memoryBuffer) {
        $id = Uuid::uuid4()->toString();
        echo "Received HTTP request {$id} on /test, save an event \n";
        // Answer we handle it
        $event->getResponse()
            ->json(compact('id'));
        // Transform it to an event (do what you want)
        return new JsonEvent(
            '/test/asked',
            ['i' => microtime(true)],
            $id
        );
    })
    // here we buffer until commit finished
    ->lift($memoryBuffer->operator())
    // Write to event store
    ->flatMap(function ($data) use ($eventStore) {
        return $eventStore->write('category-test', $data);
    })
    // Ask for next element in buffer
    ->doOnNext([$memoryBuffer, 'request'])
    // Output some debug when write is complete
    ->subscribe(
        new CallbackObserver(
            function (WriteEventsCompleted $eventsCompleted) {
                echo "Event saved with number {$eventsCompleted->getLastEventNumber()} on commit position {$eventsCompleted->getCommitPosition()} \n";
            }
        ),
        new EventLoopScheduler(EventLoop::getLoop())
    );
echo "HTTPD server listening on http://localhost:8082/test\n";
// Listener part :
// listen for events on given stream and to projection
// persistent subscription is better here
$eventStore->volatileSubscription('category-test', true)
    ->subscribeCallback(function (EventRecord $record) {
        echo "Event received {$record->getNumber()}@{$record->getStreamId()} {$record->getType()} with ID {$record->getId()}\n";
    });
// Make the event loop run
EventLoop::getLoop()->run();

Vinceveve commented 6 years ago

For the await, you can use https://github.com/RxPHP/RxAwait instead.

Vinceveve commented 6 years ago

@Th3Mouk does this example work for you?

Th3Mouk commented 6 years ago

Yes, thank you. I didn't want to use an await, but for this kind of thing it's acceptable. The timeout will be handled after the connect, and the NotMasterException after the subscription.

The problem with NotMasterException is that it currently requires a brand new connection and a new EventStore object. I'll probably take a look and propose a solution.