MARTIMM / raku-mongodb-driver

MongoDB driver for Raku
Artistic License 2.0

Advice / Idea on how to Improve Insert / Update performance #33

Open Zer0-Tolerance opened 1 year ago

Zer0-Tolerance commented 1 year ago

Hi, I'd like to get your opinion about this. I'm trying to increase the insert speed in my program, but when I create a channel and then create multiple promises to dequeue it, the program seems to block and I don't understand why. Here is my small code creating promises to dequeue the Mongo channel.

my $mongoc = {
  react {
    whenever $mongoq {
#      say $_;
      $k++;
      mongoStore($_);
    }
  }
};
my @mongop;
do { @mongop.push: Promise.start($mongoc) for 1..1 };

Below is my code to insert data in MongoDB, setting a specific field depending on whether the entry already exists. $hr contains the final hash of data to be stored in MongoDB. My problem is that all of this works fine with one promise, but as soon as I start two or more, only one entry is stored. After some debugging it seems this behavior is only triggered when mongoStore is called, so there might be some issue when run-command is used in parallel?

sub mongoStore (Hash $h) {
  my $sub=callframe.code.name;
  my $host=$h<host>;
  my $port=$h<port>;
  my $start=DateTime.now;
  my ($countperf,$packperf,$insertperf);
  $req .= new: (count => $col.name, query => (:$host, :$port),);
  my $doc;
  try {
    $doc=$db.run-command($req);
    $countperf=DateTime.now - $start;
    if ($doc<n> eq 0) {
      $h<ctime>=DateTime.now;
    }
    else {
      $h<mtime>=DateTime.now;
    }
    CATCH {
      default {
        say "$sub count exception: ",.^name, '→ ', .Str , " host: $h<host> port: $h<port>";
      }
    }
  }
  my $hr = {
    :host($h<host>),
    :port($h<port>),
  };
  $hr<ip> = $h<ip> if $h<ip>;
  my $uq = (
    q => (:$host, :$port,),
    u => ('$set' => @($hr)),
    upsert => True,
  );
  # say $uq;
  $req .= new: (update => $col.name, updates => [$uq]);
  try {
    $db.run-command($req);
    CATCH {
      default {
        say "$sub update exception: ", .^name, '→ ', .Str, " ip: $h<ip> port: $h<port>";
      }
    }
  }
  $insertperf = DateTime.now - $start;
}

Do you have any idea how to improve the performance or solve the concurrency issue?

Zer0-Tolerance commented 1 year ago

I don't think I was super clear in my explanation, so I've created this snippet that triggers the issue:

#!/usr/bin/env raku
use BSON::Document;
use MongoDB::Client;
use MongoDB::Database;
use MongoDB::Collection;
# Native MongoDB driver
my MongoDB::Client $client .= new(:uri("mongodb://"));
my MongoDB::Database $db = $client.database('Test');
my MongoDB::Collection $col = $db.collection('test');
my BSON::Document $req;
my $mongoq = Channel.new;
my $k;

my $mongoc = {
  react {
    whenever $mongoq {
      my Array $docs = [];
      $docs.push: $_;
#      dd $docs;
      insert($docs);
    }
  }
};
my @mongop;
do { @mongop.push: Promise.start($mongoc) for 1..2 };

for ^10 -> $i {
  $mongoq.send: (
  code                => "n$i",
  name                => "name $i and lastname $i",
  address             => "address $i",
  test_record         => "tr$i"
  );
}
$mongoq.close;
await Promise.allof(@mongop);
say '';
say now - INIT now;

sub insert ($docs) {
  $req .= new: (
    insert => $col.name,
    documents => $docs
  );
  my BSON::Document $doc = $db.run-command($req);
  $k++;
  print "\r\e[K","inserted $k docs";
}

Let me know if you're able to reproduce the issue; this code should get stuck after one or two inserts.

MARTIMM commented 1 year ago

Thanks, then I don't have to fill in the details. I was already thinking about a plan to test the code. Yesterday I had to read up on some of the Raku docs about react, whenever, etc., which I haven't used much. So I'm making the program now, setting up a simple server to talk to, and on we go...

Zer0-Tolerance commented 1 year ago

Hopefully we can fix this and enable full concurrency to increase the performance. If you were not using a react block, how were you doing the parallelization? With `loop { while $q.poll { # do some stuff } }`? Let me know if you need more details.
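For reference, one way to fan work out to several workers without a react block is to let each worker iterate the Channel directly; a minimal sketch (the queue name, worker count, and loop body are illustrative, not code from this thread):

```raku
my Channel $queue .= new;

# Each worker iterates the channel; .list blocks until items
# arrive, and the loop ends once the channel is closed and drained.
my @workers = (^2).map: {
    start {
        for $queue.list -> $item {
            # do some stuff with $item
        }
    }
}

$queue.send($_) for ^10;
$queue.close;
await @workers;
```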

MARTIMM commented 1 year ago

The parallelization is done just by using Promise.start, and that's it. When this was made there wasn't a react or whenever yet.

Zer0-Tolerance commented 1 year ago

OK, but you need at least a queue (Channel) to dispatch between promises? I'm curious how you did mass inserts before.

MARTIMM commented 1 year ago

True, I forgot to mention that. There is some code humming in the background that polls server status and sends it over a channel to anyone hooked up to it. The Client module does things on threads to be able to return quickly. BSON also uses threads to convert data into binary form in parallel, but there is no need to send it elsewhere.

MARTIMM commented 1 year ago

About the code you've sent above, there are a few things to keep in mind, of which the first is the most important.

All the remarks are minor except for the first one, although in my eyes it will nevertheless aid in tackling errors.

MARTIMM commented 1 year ago

Oops...

MARTIMM commented 1 year ago

Cleaned-up program with several things from your code:

use BSON::Document;

use MongoDB;
use MongoDB::Client;
use MongoDB::Database;
use MongoDB::Collection;

#-------------------------------------------------------------------------------
my Str $host = 'localhost';
my Str $port = '65010';

my $mongoq = Channel.new;
my $mongoc = {
  react {
    whenever $mongoq -> Hash $h {
      mongoStore($h);
      insert( $h, [ BSON::Document.new( (
              :code($h<code>), :name($h<name>), :address($h<address>),
              :test_record($h<test_record>)
            )
          )
        ]
      )
    }
  }
};

my @mongop;
do { @mongop.push: Promise.start($mongoc) for ^5; };

for ^10 -> $i {
  $mongoq.send: %(
    code                => "n$i",
    name                => "name $i and lastname $i",
    address             => "address $i",
    test_record         => "tr$i",

    :$host, :$port#, :$client  #, :$db, :$col
  );
}

$mongoq.close;
await Promise.allof(@mongop);

say '';
say now - INIT now;

#-------------------------------------------------------------------------------
sub insert ( Hash $h, Array $docs ) {

  my $host=$h<host>;
  my $port=$h<port>;

  my MongoDB::Client $client .= new(:uri("mongodb://$host:$port"));
  my MongoDB::Database $db = $client.database('Issue33');
  my MongoDB::Collection $col = $db.collection('concurrency');

  my BSON::Document $req .= new: (
    insert => $col.name,
    documents => $docs
  );

  my BSON::Document $doc = $db.run-command($req);
}

#-------------------------------------------------------------------------------
sub mongoStore (Hash $h) {
note $?LINE;

  my $sub=callframe.code.name;
  my $host=$h<host>;
  my $port=$h<port>;
  my BSON::Document ( $req, $doc);

  my MongoDB::Client $client .= new(:uri("mongodb://$host:$port"));
  my MongoDB::Database $db = $client.database('Issue33');
  my MongoDB::Collection $col = $db.collection('concurrency');

  $req .= new: (
    count => $col.name, query => ( :$host, :$port),
  );

  $doc = $db.run-command($req);

  if ($doc<n> eq 0) {
    $h<ctime>=DateTime.now;
  }

  else {
    $h<mtime>=DateTime.now;
  }

  CATCH {
    default {
      say "$sub count exception: ", .^name, '→ ', .Str,
        " host: $h<host> port: $h<port>";
    }
  }

  my $hr = {
    :host($h<host>),
    :port($h<port>),
  };

  $hr<ip>=$h<ip> if $h<ip>;

  my $uq=(
    q => (:$host, :$port,),
    u => ('$set' => @($hr)),
    upsert => True,
  );

  $req .= new: (update => $col.name, updates => [$uq]);
  try {

    $db.run-command($req);
    CATCH {
      default {
        say "$sub update exception: ",.^name, '→ ', .Str , " ip: $h<ip> port: $h<port>";
      }
    }
  }
}

The code runs perfectly now, but I've seen new errors from my own code that I have to look into. Errors like Cannot look up attributes in a MongoDB::ServerPool::Server type object. Did you forget a '.new'? Perhaps it is some sort of race condition.

Zer0-Tolerance commented 1 year ago

Thanks for this review. I've just done some testing with your updated code, but it's not really faster with multiple promises, and when you get the error the entry is not inserted in the DB. Doing a connection per insert is really not ideal. Here is a simplified version which works with multiple threads but doesn't really perform better than a single thread:

use BSON::Document;
use MongoDB;
use MongoDB::Client;
use MongoDB::Database;
use MongoDB::Collection;

#-------------------------------------------------------------------------------
my Str $host = 'localhost';
my Str $port = '27017';
my MongoDB::Client $client .= new(:uri("mongodb://$host:$port"));
my MongoDB::Database $db = $client.database('Test');
my MongoDB::Collection $col = $db.collection('test');
my $k;
my $mongoq = Channel.new;
my $lock = Lock.new;
my $c;
my $mongoc = {
  react {
    whenever $mongoq -> Hash $h {
#      mongoStore($h);
      $lock.protect: {mongoStore($h)};
#      insert( $h, [ BSON::Document.new((:host($h<host>), :port($h<port>)))])
    }
  }
};

my @mongop;
do { @mongop.push: Promise.start($mongoc) && sleep .001 for ^@*ARGS[0]; };

#dd @*ARGS;
for ^10 -> $i {
  $mongoq.send: %(
    host => "192.168.1.$i",
    port => $i,
  );
}

$mongoq.close;
await Promise.allof(@mongop);

say '';
say now - INIT now;

#-------------------------------------------------------------------------------
sub insert ( Hash $h, Array $docs ) {

  my $host=$h<host>;
  my $port=$h<port>;
#  dd @$h;
  my MongoDB::Client $client .= new(:uri("mongodb://$host:$port"));
  my MongoDB::Database $db = $client.database('Test');
  my MongoDB::Collection $col = $db.collection('test');

  my BSON::Document $req .= new: (
    insert => $col.name,
    documents => @$h
  );

  my BSON::Document $doc = $db.run-command($req);
  $k++;
  print "\r\e[K","inserted $k docs";
}

#-------------------------------------------------------------------------------
sub mongoStore (Hash $h) {
#  note $?LINE;
  my $sub = callframe.code.name;
  my BSON::Document ( $req, $doc);
#  my MongoDB::Client $client .= new(:uri("mongodb://$host:$port"));
#  my MongoDB::Database $db = $client.database('Test');
#  my MongoDB::Collection $col = $db.collection('test');

  $req .= new: (
    count => $col.name, query => (:host($h<host>), :port($h<port>)),
  );

  $doc = $db.run-command($req);

  if ($doc<n> eq 0) {
    $h<ctime> = DateTime.now;
  }
  else {
    $h<mtime> = DateTime.now;
  }

  CATCH {
    default {
      say "$sub count exception: ", .^name, '→ ', .Str,
        " host: $h<host> port: $h<port>";
    }
  }
  my $hr = {
    :host($h<host>),
    :port($h<port>),
  };
  $hr<ctime> = $h<ctime> if $h<ctime>;
  $hr<mtime> = $h<mtime> if $h<mtime>;
  my $uq = (
    q => (:host($h<host>), :port($h<port>)),
    u => ('$set' => @($hr)),
    upsert => True,
  );
  $req .= new: (update => $col.name, updates => [$uq]);
  try {
    $db.run-command($req);
    $k++;
    print "\r\e[K", "inserted $k docs";
    CATCH {
      default {
        say "$sub update exception: ", .^name, '→ ', .Str, " host: $h<host> port: $h<port>";
      }
    }
  }
}

MARTIMM commented 1 year ago

The first intention was to get it working; from there you can look into speeding it up. First I want to get rid of the error.

You could try to make it faster by using semaphores to protect variables, or by gathering the docs and adding them to the insert request before sending the lot in one go.

To use semaphores you can try Semaphore::ReadersWriters which I use everywhere in the package.
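The second suggestion, gathering the docs and sending them in one request, can be sketched like this (a minimal sketch assuming the $db and $col handles and the document fields from the earlier snippets):

```raku
use BSON::Document;

# Build all documents up front ...
my @docs = (^100).map: -> $i {
    BSON::Document.new: ( :code("n$i"), :name("name $i") )
};

# ... then send them to the server with a single insert command
# instead of issuing one run-command per document.
my BSON::Document $req .= new: (
  insert    => $col.name,
  documents => [@docs],
);
$db.run-command($req);
```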

MARTIMM commented 1 year ago

I see in your code the use of Lock. Interesting, I didn't know about that one. I need to read up on it and benchmark it against my Semaphore::ReadersWriters.

Zer0-Tolerance commented 1 year ago

I understand the strategy, which makes total sense. The tricky part is that this error makes the program skip the data to be inserted in the DB, which is a big issue. With my version nothing is lost, but the lock approach makes it as slow as single-threaded code, so it's a bit pointless if we're talking about concurrency :) I think most drivers usually use connection pools to reuse already-open connections to the DB in order to avoid an open/close for each insert. Maybe we should ask David Golden (@dagolden) for ideas (author of the P5 driver, which is now deprecated but still working fine...)?

MARTIMM commented 1 year ago

Found my error and made some corrections. Of course you want every record to be sent to the database; otherwise you're working with incomplete data.

Btw, if you have looked a bit in the code, you will see that there are pools of servers, so there is no problem there. The problem is that you must either a) protect your data in threads, or b) recreate the connections per thread. Otherwise you get into trouble.

A small test program I've created shows two methods to handle data, and both show that concurrency really helps speed things up, although the last test shows that a single insert of 100 docs makes more difference than using threads. One could gather docs using Supplies in a thread and send them off every 50 or 100 or so; it all depends on the program.

use BSON::Document;

use MongoDB;
use MongoDB::Client;
use MongoDB::Database;
use MongoDB::Collection;

#-------------------------------------------------------------------------------
my Str $host = 'localhost';
my Str $port = '65010';

my MongoDB::Client $client .= new(:uri("mongodb://$host:$port"));
my MongoDB::Database $db = $client.database('Issue33');
my MongoDB::Collection $col = $db.collection('concurrency');

note "\n1st test; separate insert commands";
$db.run-command(BSON::Document.new((:1dropDatabase,))).raku;

my Lock $lock .= new;
my Int $n = 100;
my Num $total-i0 = 0e0;

my Instant $t0 = now;
await ^$n .map: {
  start {
    my Instant $ti0 = now;
    my BSON::Document $req .= new: (
      insert => $col.name,
      documents => [ BSON::Document.new( (
            :code('c'), :name('n'), :address('a'), :test_record('t')
          )
        ),
      ]
    );

    $lock.protect: {
      $db.run-command($req);
      $total-i0 += (now - $ti0).Num; 
    };
  }
}

my Num $t0n = (now - $t0).Num;
note "\nTotal run time 1st test: ", $t0n.fmt('%5.3f');
note "Divide by nbr of inserts ($n): ", ($t0n/$n).fmt('%5.3f');
note 'Time per insert in thread: ', ($total-i0/$n).fmt('%5.3f');
#note $db.run-command(BSON::Document.new((:count($col.name),))).raku;

#------------------------------------------------------------------------------
note "\n\n2nd test; gather all in one array, then send to database";
$db.run-command(BSON::Document.new((:1dropDatabase,))).raku;

my Num $total-i1 = 0e0;
my Instant $t1 = now;
my Array $docs = [];

await ^$n .map: {
  start {
    my Instant $ti1 = now;
    $lock.protect: {
        $docs.push: BSON::Document.new( (
          :code('c'), :name('n'), :address('a'), :test_record('t')
        )
      );

      $total-i1 += (now - $ti1).Num; 
    };
  }
}

my Instant $t2 = now;
my BSON::Document $req .= new: (
  insert => $col.name,
  documents => $docs
);

my BSON::Document $doc = $db.run-command($req);
my Num $t2n = (now - $t2).Num; 

my Num $t1n = (now - $t1).Num; 
note "\nTotal run time 2nd test: ", $t1n.fmt('%5.3f');
note "Divide by nbr of records ($n): ", ($t1n/$n).fmt('%5.3f');
note 'Time per push in thread: ', ($total-i1/$n).fmt('%5.3f');
note "Time of single insert of $n documents: ", $t2n.fmt('%5.3f');
#note $db.run-command(BSON::Document.new((:count($col.name),))).raku;

With example result output:

1st test; separate insert commands

Total run time 1st test: 3.180
Divide by nbr of inserts (100): 0.032
Time per insert in thread: 1.086

2nd test; gather all in one array, then send to database

Total run time 2nd test: 0.501
Divide by nbr of records (100): 0.005
Time per push in thread: 0.036
Time of single insert of 100 documents: 0.412

Now, why is the mean time of an insert in a thread slower than the final single insert of 100 docs? A small investigation showed me that it starts off at about 0.3 sec per insert but slows down while sending the following records, ending at even 1.6 sec per insert. I believe this means the server cannot cope with that many separate inserts; it is all I/O, with interrupts and all that. That also means that making the program faster will not help you; only the handling of your data will! The conclusion is therefore that the 2nd test is the best solution to your problem.
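The "send every 50 or 100" idea mentioned above can be sketched with Supply's batch method, which groups emitted values into chunks (insert-batch and @documents are hypothetical placeholders, not code from this thread; insert-batch would wrap the insert run-command shown earlier):

```raku
my Supplier $feed .= new;

# Group incoming documents into batches of 50; each batch then
# costs only one insert command instead of one per document.
$feed.Supply.batch(:elems(50)).tap: -> @batch {
    insert-batch(@batch);   # hypothetical helper around run-command
}

$feed.emit($_) for @documents;   # @documents: data gathered elsewhere
$feed.done;
```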

Zer0-Tolerance commented 1 year ago

Hi, thanks a lot for spending time on this. I'll take a look at it to see how I can improve my program. Would it work the same for updates rather than inserts?