stablekernel / postgresql-dart

Dart PostgreSQL driver: supports extended query format, binary protocol and statement reuse.
https://www.dartdocs.org/documentation/postgres/latest
BSD 3-Clause "New" or "Revised" License

Long running queries / connection leak #104

Open jimmyff opened 5 years ago

jimmyff commented 5 years ago

I'm running into a strange issue using this package with GCP Cloud SQL.

If I generate a lot of traffic, I start to see timeout errors in places where I really wouldn't expect them (e.g. a simple insert statement). After running under high load for a while, I then start getting this error: Caught connect error, PostgreSQLSeverity.fatal 53300: remaining connection slots are reserved for non-replication superuser connections, #0 PostgreSQLConnection.open (package:postgres/src/connection.dart:133:7)

When I look at pg_stat_activity I see many queries doing the insert with wait_event = "ClientRead" and state = "idle in transaction". I'm not sure how I could be failing to close a transaction.

I added a timeoutInSeconds to both of the queries in my file; however, now the connection pool is getting filled up with the delete queries, which show wait_event = "tuple" and state = "active".

It sounds like I have a connection leak, but I'm not sure how. I've included my code below.


/// Request should provide the AccountReplicateMessage object as its body
Future<shelf.Response> accountReplicateService(shelf.Request request) async {
  final requestString = await request.readAsString();
  if (requestString.isEmpty) {
    return shelf.Response(400,
        body: '400: Bad Request', headers: {'Cache-Control': 'no-store'});
  }
  print('accountReplicateService request: $requestString');
  final message = AccountNewMessage.fromJsonMap(json.decode(utf8.decode(base64
      .decode(json.decode(requestString)['message']['data'].toString()))));

  final db = GetIt.instance<DatabaseService>().db;
  try {
    await db.open();
  } catch (e, s) {
    print(['Caught connect error', e, s]);
    return shelf.Response.internalServerError(
        body: 'Failed to connect to Database.',
        headers: {'Cache-Control': 'no-store'});
  }

  try {
    final account = message.account;
    var result = await db.transaction((PostgreSQLExecutionContext ctx) async {

      await ctx.query("DELETE FROM public.profile where id = @idParam",
          substitutionValues: {"idParam": account.uid}, timeoutInSeconds: 10);

      await ctx.query("""
    INSERT INTO public.profile (
        id, dob, visible
    ) VALUES (
      @idParam, @dobParam, @visibleParam
    )
  """, substitutionValues: {
        "idParam": account.uid,
        "dobParam": account.dob,
        "visibleParam": account.profile.visible,
      }, timeoutInSeconds: 30);

      // Calculate the bloom filter bits
      final List<int> bitIndexes = BloomFilter.hashIndexesWithSize<String>(
          ServiceConfig.bloomFilterBitCount,
          ServiceConfig.bloomFilterExpectedItems,
          account.uid);

      final List<String> profileIds = await executeSearch(
          ctx, bitIndexes.first, account.profile, account.uid);

      final replicatedMessage = AccountReplicatedMessage((b) => b
        ..uid = account.uid
        ..bloomFilterBits = BuiltList<int>.from(bitIndexes).toBuilder()
        ..bloomFilterBitLastSearched = bitIndexes.first
        ..profiles = BuiltList<String>.from(profileIds).toBuilder());

      // publish account_replicated message
      final gapi = await GetIt.instance<GapiService>().gapiClient;

      final PubsubApi pubsubApi =
          GetIt.instance<PubSubService>().pubsubApi(gapi);
      final response = await pubsubApi.projects.topics.publish(
          PublishRequest.fromJson({
            "messages": [
              {
                "attributes": <String, String>{
                  "uid": account.uid,
                },
                "data": base64Encode(
                    utf8.encode(json.encode(replicatedMessage.toJsonMap()))),
                "publishTime": DateTime.now().toUtc().toIso8601String()
              }
            ]
          }),
          'projects/onesceneapp/topics/account_replicated');

      print('Inserted: ${account.uid}.  x${profileIds.length} search results.');

    });
    await db.close();

    return shelf.Response.ok('', headers: {'Cache-Control': 'no-store'});
  } catch (e, s) {
    print(e);
    print(s);
    await db.close();
    return shelf.Response.internalServerError(
        headers: {'Cache-Control': 'no-store'});
  }
}
itsjoeconway commented 5 years ago

I'm not 100% sure of what the exact cause of your issue is, but you need a query queue if you are running into the connection limit of your database server - regardless of whether or not you are leaking connections. You can accomplish this easily by just reusing the database connection - there is a queue built into it. Creating a new database connection and closing it for every HTTP request is more expensive than it needs to be.

You can see how Aqueduct does this here, or use some other pooling package available on pub. FWIW, you could also use Aqueduct, cut all of this code in half, push the test surface to the framework instead of your code, and avoid difficult-to-debug statements like this one: final message = AccountNewMessage.fromJsonMap(json.decode(utf8.decode(base64.decode(json.decode(requestString)['message']['data'].toString()))));
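
For illustration, here is a minimal sketch of that approach with shelf, assuming the postgres 2.x API used in this thread; the connection details and the initDb/accountHandler names are placeholders, not part of the original code:

import 'package:postgres/postgres.dart';
import 'package:shelf/shelf.dart' as shelf;

// One long-lived connection shared by all handlers. The driver queues
// queries internally, so concurrent requests wait their turn instead of
// exhausting the server's connection slots.
final _connection = PostgreSQLConnection('localhost', 5432, 'mydb',
    username: 'postgres', password: '123');

/// Call once at startup, before serving requests.
Future<void> initDb() => _connection.open();

Future<shelf.Response> accountHandler(shelf.Request request) async {
  // No open()/close() per request: just use the shared connection.
  final rows = await _connection.query(
      'SELECT id FROM public.profile WHERE id = @idParam',
      substitutionValues: {'idParam': 'some-uid'});
  return shelf.Response.ok('found ${rows.length} rows',
      headers: {'Cache-Control': 'no-store'});
}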

jimmyff commented 5 years ago

Okay, I'll have a go at switching over to Aqueduct. I initially thought it was maybe a little overkill for what I needed, but as I'm already running into issues using shelf, maybe I should just make the switch. Is it relatively straightforward to switch from shelf to Aqueduct?

How would it avoid those difficult to debug statements?

Thanks

itsjoeconway commented 5 years ago

Not too sure what it takes to switch, but it can't be too bad: the method you've shown here would be pretty close to valid just by swapping the shelf. prefixes for aqueduct. ones. The routing and application structure are different, but that's the easy part anyway.

bubnenkoff commented 3 years ago

It seems that I'm facing a very similar issue!

At some point, requests start being processed extremely slowly, and the speed doesn't recover until I kill the connection:

https://stackoverflow.com/questions/68375709/why-can-a-query-periodically-take-10-times-longer-to-execute

isoos commented 3 years ago

@bubnenkoff: I've created package:postgres_pool for this exact reason: it allows you to have periodically reconnected connections based on configured settings like query counts, connection age, total session duration...

bubnenkoff commented 3 years ago

@isoos oh cool! Thanks for the fast answer. I was just thinking about how to contact you; I even wrote to someone on Telegram (not sure if that was you or not). Am I right in understanding that postgres_pool is basically a workaround to prevent such a leak?

isoos commented 3 years ago

I have no Telegram, so that's not me :)

Am I right in understanding that postgres_pool is basically a workaround to prevent such a leak?

It also has retry logic for transactions and concurrency control.
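
For example, something along these lines (just a sketch; runTx and the query API are assumed from the postgres_pool package, and replaceProfile is a made-up helper, so check the docs for the exact signatures):

import 'package:postgres_pool/postgres_pool.dart';

Future<void> replaceProfile(PgPool pg, String uid) async {
  // runTx wraps the callback in a transaction and, depending on the pool's
  // retry configuration, can retry the whole transaction on transient errors.
  await pg.runTx((c) async {
    await c.query('DELETE FROM public.profile WHERE id = @idParam',
        substitutionValues: {'idParam': uid});
    await c.query(
        'INSERT INTO public.profile (id, visible) VALUES (@idParam, @visibleParam)',
        substitutionValues: {'idParam': uid, 'visibleParam': true});
  });
}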

bubnenkoff commented 3 years ago

@isoos but is there any chance that the driver will get fixed any time soon? I can try postgres_pool, but I could also wait for a fix in the driver.

isoos commented 3 years ago

@bubnenkoff stablekernel's repo is no longer active; I maintain a fork and have uploader rights to the package. If somebody wants to work on this, I'll be happy to accept PRs, but for me periodic reconnection solves the pain, so I won't put much effort into it myself... I'm not even sure if it is something the Dart package should fix, or just some kind of resource accumulation on the connection that would happen anyway...

bubnenkoff commented 3 years ago

@isoos thanks! Is it possible to do something like:

void main() {
  // ...
  Timer.periodic(Duration(hours: 1), (timer) async {
    await connection.close();
    await connection.open();
  });
}

I don't want to rewrite the code for now and just need a simple hack. Is it a good idea to do this?

isoos commented 3 years ago

I think postgres_pool will serve you well.

bubnenkoff commented 3 years ago

@isoos Could you help me? I don't write Dart very often. How do I pass these connection settings?

PgPoolSettings? settings;

void main() async {
  final pg = PgPool(
    PgEndpoint(
        host: 'localhost',
        port: 5432,
        database: 'mydb',
        username: 'postgres',
        password: '123'),
    settings: settings, // what should be here?
  );
}

isoos commented 3 years ago

@bubnenkoff: I've updated the example here: https://github.com/agilord/postgres_pool/blob/master/example/example.dart#L4-L15
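
In case the link moves, here is a rough sketch of what that example does; the field names (maxConnectionAge, maxQueryCount, concurrency) are assumptions based on the postgres_pool API at the time and may differ in newer versions:

import 'package:postgres_pool/postgres_pool.dart';

void main() async {
  final pg = PgPool(
    PgEndpoint(
        host: 'localhost',
        port: 5432,
        database: 'mydb',
        username: 'postgres',
        password: '123'),
    settings: PgPoolSettings()
      ..maxConnectionAge = Duration(hours: 1) // recycle connections by age
      ..maxQueryCount = 100000 // recycle after this many queries
      ..concurrency = 4, // open at most 4 connections at once
  );

  final rows = await pg.query('SELECT 1');
  print(rows);

  await pg.close();
}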

bubnenkoff commented 3 years ago

I found an interesting setting in newer PostgreSQL versions. Is it related to this issue? https://www.postgresql.org/docs/14/runtime-config-client.html#GUC-IDLE-SESSION-TIMEOUT
