p5-RedisDB / RedisDB

Perl extension to access Redis

Unable to detect a down server and move on #14

Closed mburgoon closed 10 years ago

mburgoon commented 10 years ago

I'm using Redis as a queue utilizing lpush and [b]lpop to get items in and out of the queue. I'm going to be running several instances of redis to provide high availability, scalability as well as redundancy on multiple machines.

However, with RedisDB, I am unable to get it to break out of the redis call upon server failure. For this module to work for me, I need to be able to detect an error when talking to a redis server, mark that connection as bad so that I will not use it for some time, and then try to connect back to that server later.

Here's the basic script that I'm using to validate behavior. Regardless of what I put in raise_error, reconnect_attempts, etc., the RedisDB module never returns until the redis server that I stopped comes back online. Whenever I stop an instance of redis, I just get a stream of dots, showing that the on_connect_error callback is being executed over and over again.

Is there something basic that I'm missing or an approach that I haven't seen?

#!/usr/bin/perl
$|=1;
use RedisDB;
use strict;
my(@queues);
foreach my $i (6371,6372,6373) {
  my($hash)={
    conn=> undef,
    server=>"redis:$i",
    connected=>1
  };
  my($conn)=RedisDB->new(
    host=>"redis",                     
    port=>$i,     
    raise_error=>1,
    reconnect_attempts=>1,
    on_connect_error=>sub {
      $hash->{'connected'}=0;
      $hash->{'disconnectedAt'}=time();
      print "."
    }          
  );  
  $hash->{'conn'}=$conn;
  push(@queues, $hash);
}

my($start)=time();
my($last);

while (1) {        
  my($total,$len)=0;
  print "len:";
  foreach my $q (@queues) {
    $len=undef;
    if ($q->{'connected'}) {
      $len=$q->{conn}->llen("queue_opsstats");
      $total+=$len;                           
    }              
    printf " %d",(defined($len) ? $len : -1);
  }
  printf " (%d), queue diff: %d\n", $total, $total-$last;
  $last=$total;
  sleep(1);
}

trinitum commented 10 years ago

The on_connect_error callback lets you change the host/port when a connection to redis has failed. The default on_connect_error callback confesses. If the callback does not die, the reconnection loop continues indefinitely. If you just want one connection to any of the three servers, you could use this callback:

use 5.010;
use strict;
use warnings;

use RedisDB;
my @ports = (6371,6372,6373);
my $redis = RedisDB->new(
    port => $ports[0],
    on_connect_error => sub {
        push @ports, shift @ports;
        warn "Disconnected from $_[0]->{port}, connecting to $ports[0]";
        $_[0]->{port} = $ports[0];
    },
);

while (1) {        
    my $len = $redis->llen("queue_opsstats");
    printf "%d\n", $len;
    sleep 1;
}
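
Since the reconnection loop only ends when the callback dies, a callback that rotates ports may also want a way to give up. The following is a minimal sketch of that idea, not something taken from RedisDB itself: make_failover_callback is a hypothetical helper, the limit of three attempts is an arbitrary assumption, and a plain hash stands in for the RedisDB object so the logic can be run without a server. Note that the failure counter here is never reset after a successful connection.

```perl
use strict;
use warnings;

# Build an on_connect_error callback that rotates through @$ports and
# dies once $max_failures connection attempts have failed.
# make_failover_callback is a hypothetical helper, not part of RedisDB.
sub make_failover_callback {
    my ( $ports, $max_failures ) = @_;
    my @ports    = @$ports;    # private copy, rotated on each failure
    my $failures = 0;
    return sub {
        my ( $redis, $error ) = @_;
        $failures++;
        die "Giving up after $failures failed attempts: $error\n"
          if $failures >= $max_failures;
        push @ports, shift @ports;     # move the failed port to the back
        $redis->{port} = $ports[0];    # next attempt uses the new front
        return;
    };
}

# Exercise the callback with a plain hash standing in for the RedisDB object.
my $fake_redis = { port => 6371 };
my $callback   = make_failover_callback( [ 6371, 6372, 6373 ], 3 );
$callback->( $fake_redis, 'Connection refused' );    # port becomes 6372
$callback->( $fake_redis, 'Connection refused' );    # port becomes 6373
my $gave_up = !eval { $callback->( $fake_redis, 'Connection refused' ); 1 };
print "port=$fake_redis->{port} gave_up=", ( $gave_up ? 1 : 0 ), "\n";
```

With a real connection the returned sub would be passed as on_connect_error => make_failover_callback(\@ports, 3), and the caller would need to trap the resulting exception.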

In your case you seem to want to read from all the servers. Set raise_error to false and do not define an on_connect_error callback. Then, when the connection fails, the redis operation ('llen' in your case) will return a RedisDB::Error::DISCONNECTED object, and based on that you can skip the reply from that server.

use 5.010;
use strict;
use warnings;

use RedisDB;
use Scalar::Util qw(blessed);
my @queues;
foreach my $i ( 6371, 6372, 6373 ) {
    my $redis = RedisDB->new(
        host        => "redis",
        port        => $i,
        raise_error => 0,
    );
    push @queues, $redis;
}

my $last = 0;

while (1) {
    my $total = 0;
    print "len:";
    foreach my $redis (@queues) {
        my $len = $redis->llen("queue_opsstats");
        if ( blessed $len ) {

            # error object returned: report -1 and skip this server
            print " -1";
        }
        else {
            $total += $len;
            printf " %d", $len;
        }
    }
    printf " (%d), queue diff: %d\n", $total, $total - $last;
    $last = $total;
    sleep(1);
}

Note that each time you send a new request to redis, it will try to reconnect, which blocks for some time. You may therefore want to stop sending requests to that server for a couple of minutes before giving it another chance. Alternatively, use an AnyEvent-based module, e.g. AnyEvent::Redis.
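
The "give it a rest for a couple of minutes" idea can be sketched roughly as follows. This is only an illustration under stated assumptions: poll_queue, FakeConn and FakeError are hypothetical names, the 120-second cool-down is an arbitrary value, and FakeConn stands in for a RedisDB connection created with raise_error set to false, so the snippet runs without a server. The current time is passed in explicitly so the logic can be checked without sleeping.

```perl
use 5.014;
use strict;
use warnings;
use Scalar::Util qw(blessed);

my $COOLDOWN = 120;    # seconds to leave a failed server alone (assumed value)

# Query one server entry unless it is still cooling down.
# $entry is { conn => object with llen(), last_fail => epoch seconds }.
# Returns the length, or undef if the server was skipped or the call failed.
sub poll_queue {
    my ( $entry, $key, $now ) = @_;
    return if $now - $entry->{last_fail} < $COOLDOWN;
    my $len = $entry->{conn}->llen($key);
    if ( blessed $len ) {    # error objects are blessed, lengths are not
        $entry->{last_fail} = $now;
        return;
    }
    return $len;
}

# Stand-in connection for illustration only: fails while {down} is true.
package FakeConn {
    sub new { my ( $class, %args ) = @_; return bless { %args }, $class }

    sub llen {
        my $self = shift;
        return $self->{down} ? bless( {}, 'FakeError' ) : 7;
    }
}

my $entry = { conn => FakeConn->new( down => 1 ), last_fail => 0 };
my @seen;
push @seen, poll_queue( $entry, 'queue_opsstats', 1000 ) // 'skip';  # call fails
$entry->{conn}{down} = 0;                                            # server is back
push @seen, poll_queue( $entry, 'queue_opsstats', 1060 ) // 'skip';  # cooling down
push @seen, poll_queue( $entry, 'queue_opsstats', 1121 ) // 'skip';  # retried
print "@seen\n";
```

In a real loop $now would come from time(), and each entry would hold one RedisDB object per server.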

mburgoon commented 10 years ago

Thanks for the response. I'm trying your approach; however, with the example script you gave, it still appears to confess on error.

Couldn't connect to the redis server at redis:6372: Connection refused at /usr/share/perl5/vendor_perl/RedisDB.pm line 202
        RedisDB::_on_connect_error('RedisDB=HASH(0xcb9558)', 'Connection refused') called at /usr/share/perl5/vendor_perl/RedisDB.pm line 261
        RedisDB::_connect('RedisDB=HASH(0xcb9558)') called at /usr/share/perl5/vendor_perl/RedisDB.pm line 407
        RedisDB::_recv_data_nb('RedisDB=HASH(0xcb9558)') called at /usr/share/perl5/vendor_perl/RedisDB.pm line 486
        RedisDB::send_command('RedisDB=HASH(0xcb9558)', 'LLEN', 'queue_opsstats') called at /usr/share/perl5/vendor_perl/RedisDB.pm line 195
        RedisDB::execute('RedisDB=HASH(0xcb9558)', 'LLEN', 'queue_opsstats') called at /usr/share/perl5/vendor_perl/RedisDB.pm line 816
        RedisDB::__ANON__('RedisDB=HASH(0xcb9558)', 'queue_opsstats') called at monitor2.pl line 24

199: sub _on_connect_error {
200:    my ( $self, $err ) = @_;
201:    my $server = $self->{path} || ("$self->{host}:$self->{port}");
202:    confess "Couldn't connect to the redis server at $server: $!";

Your explanation of what should happen (return an error object) is exactly what I need. The problem is that if confess is called, it's always fatal.

I have not checked out AnyEvent::Redis yet. I've avoided it because I've read that you need to run it in an event loop, which won't work for the actual code that I will be running. But if I can't get perl-Redis or perl-RedisDB to do what I need (perl-Redis always confesses on any error too), then I will check it out, and I may end up having to hack these base perl modules to do what I need them to do.

trinitum commented 10 years ago

Ah, sorry, I didn't test my recipe myself. Now I see that I was wrong: the module doesn't behave as I expected, or as it should. It is a bug and I'm working on it.

trinitum commented 10 years ago

Unfortunately it will take some time to fix, as the problem is quite deep. Meanwhile, as a workaround, you can wrap calls to redis in eval blocks (this example uses Try::Tiny):

use 5.010;
use strict;
use warnings;

# RedisServer is loaded from the t/ directory (hence the use lib line)
use lib qw(t ../t);
use RedisServer;
use RedisDB;
use Scalar::Util qw(blessed);
use Try::Tiny;

my @servers;
my @queues;
for ( 1 .. 3 ) {
    my $srv = RedisServer->start;
    push @servers, $srv;
    my $redis = RedisDB->new(
        host        => "127.0.0.1",
        port        => $srv->{port},
        raise_error => 0,
    );
    push @queues, $redis;
}

while (1) {
    my $n = 0;
    foreach my $redis (@queues) {
        try {
            my $len = $redis->incr("error_test");
            say "Server $n: $len";
            # randomly stop server
            $servers[$n]->stop if rand > 0.9;
        } catch {
            say "Got an error: $_";
            $redis->reset_connection;
            $servers[$n] = RedisServer->start(port => $redis->{port});
        };
        $n++;
    }
    sleep(1);
}
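
The workaround above relies on Try::Tiny. For setups where that module is not available, a core-only equivalent using a plain eval block might look like the sketch below. try_call is a hypothetical helper name, and the two anonymous subs stand in for real calls such as sub { $redis->llen("queue_opsstats") } so that the snippet runs on its own.

```perl
use strict;
use warnings;

# Run $code and trap any exception it throws.
# Returns ( 1, $result ) on success or ( 0, $error ) on failure.
# try_call is a hypothetical helper, not part of RedisDB.
sub try_call {
    my ($code) = @_;
    my $result;
    my $ok = eval { $result = $code->(); 1 };
    return $ok ? ( 1, $result ) : ( 0, $@ || 'unknown error' );
}

# Stand-ins for a successful call and for a call that confesses/dies.
my ( $ok1, $len ) = try_call( sub { 42 } );
my ( $ok2, $err ) = try_call( sub { die "Connection refused\n" } );
print "ok1=$ok1 len=$len ok2=$ok2 err=$err";
```

Returning an explicit success flag avoids confusing a legitimate false result (such as a length of 0) with a failure.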

mburgoon commented 10 years ago

Thanks! At least I'm not crazy for being unable to get it to work. Let me know when the fix is in. I understand it's fairly deep; I think I ran into the same 'wow, I need to change a lot of stuff' when I tried changing all of the confess calls to warns to see what happened. An eval won't work for me: this is a very, very busy subsystem, and an eval will bring it to its knees.

Thanks again!

trinitum commented 10 years ago

I just uploaded 2.21_01 to CPAN, which should fix the problem. Please check whether it works for you. The distribution includes an example, eg/no_raise_error_1.pl, that establishes three connections that randomly fail.

mburgoon commented 10 years ago

Thanks for the quick turnaround! I'm at a meeting for the next couple of days; I will let you know on Friday how it goes.

mburgoon commented 10 years ago

I've been testing it, and it looks good so far. I've also set timeout=>1 and suspended/continued the process; the script detects the failure and moves on. Here's my current proof of concept code:

use 5.010;
use strict;
use warnings;

$|=1;

use RedisDB;
use Scalar::Util qw(blessed);

my @queues;
foreach my $i ( 6371, 6372, 6373 ) {
  my($hash)={
    conn=>undef,
    connected=>1,
    last_fail=>0,
    name=>"redis:$i"
  };

  my $redis = RedisDB->new(
    host        => 'redis',
    port => $i,
    raise_error => undef,
    timeout=>1
  );
  $hash->{'conn'}=$redis;
  push @queues, $hash;
}

my $last = 0;    # initialised so the first diff does not warn
my $now;

while (1) {
  my $total = 0;
  print "len:";
  $now=time();
  foreach my $redis (@queues) {
    my $len;
    if ($redis->{'connected'}) {
      $len = $redis->{'conn'}->llen("queue_opsstats");
      if ( blessed $len) {
        $redis->{'connected'}=0;
        $redis->{'last_fail'}=$now;
      } else {
        $total += $len;
        printf " %d",( defined($len) ? $len : -1 );
      }
    } elsif ( ($now-$redis->{'last_fail'}) > 10) {
      $redis->{'connected'}=1;
    }
  }
  printf " (%d), queue diff: %d\n", $total, $total - $last;
  $last = $total;
  sleep(1);
}