MihaelIsaev / FCM

⚡️ PushNotifications through FireBase for Vapor 3 and 4.
MIT License
120 stars 33 forks source link

Using FCM Causing Many Database Connections (Vapor 3) #28

Closed rlziii closed 3 years ago

rlziii commented 3 years ago

When using the Vapor 3 version of the FCM library (version 1.2.0) on a Heroku instance configured with a Hobby Dev Postgre add-on (with 20 database connections), I am experiencing many of the following errors:

[ ERROR ] PostgreSQLError.server.fatal.InitializeSessionUserId: too many connections for role "jmrwhxwhcnapde" (ErrorMiddleware.swift:26)

When I disable the FCM route (I'm only using a single one), the errors go away. Generally the app will sit comfortably with around 17 of 20 database connections being used at any given time (note that I'm not using any special NIOServerConfig or DatabaseConnectionPoolConfig for the app). One thing to note is that the push notifications work fine up until I start receiving the "too many connections" errors, in which case Heroku starts having problems resolving most of the requests sent to the server until I restart the Vapor app manually. Any help would be appreciated. Thanks. 👍

Here are the relevant pieces of code for my FCM setup:

In configure.swift I have the following in my configure(_:_:_:) method (note that FCMCredentials is just a struct that provides static strings for the email, projectId, and key values):

let fcm = FCM(email: FCMCredentials.email,
                  projectId: FCMCredentials.projectId,
                  key: FCMCredentials.key)
fcm.apnsDefaultConfig = FCMApnsConfig(headers: [:], aps: FCMApnsApsObject(sound: "default"))
services.register(fcm, as: FCM.self)

My entire FirebaseToken.swift file:

import FCM
import FluentPostgreSQL
import Vapor

final class FirebaseToken: PostgreSQLUUIDModel, Content {
    var id: UUID?
    var token: String
    var userID: User.ID

    init(token: String, userID: User.ID) {
        self.token = token
        self.userID = userID
    }
}

extension FirebaseToken: PostgreSQLMigration {
    static func prepare(on connection: PostgreSQLConnection) -> Future<Void> {
        return Database.create(self, on: connection) { builder in
            try addProperties(to: builder)
            builder.reference(from: \.userID, to: \User.id)
            builder.unique(on: \.token)
        }
    }
}

extension FirebaseToken {
    func sendPush(title: String, message: String, on req: Container) throws -> Future<Void> {
        return try FirebaseToken.sendPush(title: title, message: message, token: token, on: req)
    }

    static func sendPush(title: String, message: String, token: String, on container: Container) throws -> Future<Void> {
        let fcm = try container.make(FCM.self)
        let message = FCMMessage(token: token, notification: FCMNotification(title: title, body: message))

        return try fcm.sendMessage(container.make(Client.self), message: message).transform(to: ()).catchFlatMap { error in
            guard let googleError = error as? GoogleError, let fcmError = googleError.fcmError else {
                return container.eventLoop.newSucceededFuture(result: ())
            }

            switch fcmError.errorCode {
            // Delete the token if it is unregistered.
            case .unregistered:
                return container.requestPooledConnection(to: .psql).flatMap { conn in
                    return FirebaseToken.query(on: conn).filter(\.token == token).first().flatMap { firebaseToken in
                        defer {
                            try? container.releasePooledConnection(conn, to: .psql)
                        }

                        guard let firebaseToken = firebaseToken else {
                            return container.eventLoop.newSucceededFuture(result: ())
                        }

                        return firebaseToken.delete(on: conn).transform(to: ())
                    }.always {
                        try? container.releasePooledConnection(conn, to: .psql)
                    }
                }
            default:
                return container.eventLoop.newSucceededFuture(result: ())
            }
        }
    }
}

extension Array where Element: FirebaseToken {
    func sendPush(title: String, message: String, on container: Container) throws -> Future<Void> {
        return try map { try $0.sendPush(title: title, message: message, on: container) }.flatten(on: container)
    }
}

My route that I'm using to send push notifications:

func sendNotificationToUser(_ req: Request, payload: UserNotification) throws -> Future<HTTPStatus> {
    return User.find(payload.userID, on: req).flatMap { user in
        guard let user = user, let userID = user.id else {
            throw Abort(.badRequest, reason: "User not found.")
        }

        return FirebaseToken.query(on: req).filter(\.userID == userID).all().flatMap { firebaseTokens in
            return try firebaseTokens.sendPush(title: payload.title, message: payload.body, on: req)
                .transform(to: .ok)
        }
    }
}
wibed commented 3 years ago

you are using flatten on futures opening a connection to a database. futures wont wait for each other so it opens all connections simultaniously, which causes crashes as soon as you have opened too many connections. the only workaround i found to work is to gather the futures up and then resolve them one by one

you could also chunk them into working sizes and then flatten them, you'd have to bake some intelligence into it to check your resources.

in my case it was more practical to push the futures into an array and dispatch them in a background thread or similar

rlziii commented 3 years ago

My latest iteration on the route is this:

func sendNotificationToUser(_ req: Request, payload: UserNotification) throws -> Future<HTTPStatus> {
    return User.find(payload.userID, on: req).flatMap { user in
        guard let user = user, let userID = user.id else {
            throw Abort(.badRequest, reason: "User not found.")
        }

        guard let fcm = try? req.make(FCM.self) else {
            throw Abort(.internalServerError, reason: "Could not make FCM service.")
        }

        DispatchQueue.global(qos: .background).async {
            let firebaseTokens = try? FirebaseToken.query(on: req).filter(\.userID == userID).all().wait()

            firebaseTokens?.forEach { firebaseToken in
                let message = FCMMessage(token: firebaseToken.token, notification: FCMNotification(title: payload.title, body: payload.body))
                _ = try? fcm.sendMessage(req.client(), message: message).wait()
            }
        }

        return req.future(.ok)
    }
}

But I'm still seeing the same "too many connections" issues that I was having before, unfortunately.

MihaelIsaev commented 3 years ago

I don't have Vapor3 projects anymore so can't write working code for you, but it looks like @wibed is right, you should open one connection to database before calling sendPush and pass it into it.

Something like this:

extension FirebaseToken {
    func sendPush(title: String, message: String, on req: Container) throws -> Future<Void> {
        return container.requestPooledConnection(to: .psql).flatMap { conn in
            return try FirebaseToken.sendPush(title: title, message: message, token: token, conn : conn, on: req).always {
                try? container.releasePooledConnection(conn, to: .psql)
            }
        }
    }

    static func sendPush(title: String, message: String, token: String, conn: PostgresConnection, on container: Container) throws -> Future<Void> {
        let fcm = try container.make(FCM.self)
        let message = FCMMessage(token: token, notification: FCMNotification(title: title, body: message))

        return try fcm.sendMessage(container.make(Client.self), message: message).transform(to: ()).catchFlatMap { error in
            guard let googleError = error as? GoogleError, let fcmError = googleError.fcmError else {
                return container.eventLoop.newSucceededFuture(result: ())
            }
            switch fcmError.errorCode {
            // Delete the token if it is unregistered.
            case .unregistered:
                guard let firebaseToken = firebaseToken else {
                    return container.eventLoop.newSucceededFuture(result: ())
                }
                return firebaseToken.delete(on: conn).transform(to: ())
            default:
                return container.eventLoop.newSucceededFuture(result: ())
            }
        }
    }
}

extension Array where Element: FirebaseToken {
    func sendPush(title: String, message: String, conn: PostgresConnection, on container: Container) throws -> Future<Void> {
        return try map { try $0.sendPush(title: title, message: message, conn: conn, on: container) }.flatten(on: container)
    }
}
rlziii commented 3 years ago

Thanks for the reply, @MihaelIsaev. I tried both of the following setups with no luck unfortunately. Still getting the same "too many connections" errors.

Adding the requestPooledConnection/releasePooledConnection to the sendPush(title:message:on:) method (as suggested above).

extension FirebaseToken {
    func sendPush(title: String, message: String, on req: Container) throws -> Future<Void> {
        let token = self.token

        return req.requestPooledConnection(to: .psql).flatMap { conn in
            return try FirebaseToken.sendPush(title: title, message: message, token: token, conn: conn, on: req).always {
                try? req.releasePooledConnection(conn, to: .psql)
            }
        }
    }

    static func sendPush(title: String, message: String, token: String, conn: PostgreSQLConnection, on container: Container) throws -> Future<Void> {
        let fcm = try container.make(FCM.self)
        let message = FCMMessage(token: token, notification: FCMNotification(title: title, body: message))

        return try fcm.sendMessage(container.make(Client.self), message: message).transform(to: ()).catchFlatMap { error in
            guard let googleError = error as? GoogleError, let fcmError = googleError.fcmError else {
                return container.eventLoop.newSucceededFuture(result: ())
            }

            switch fcmError.errorCode {
            // Delete the token if it is unregistered.
            case .unregistered:
                return FirebaseToken.query(on: conn).filter(\.token == token).first().flatMap { firebaseToken in
                    guard let firebaseToken = firebaseToken else {
                        return container.eventLoop.newSucceededFuture(result: ())
                    }

                    return firebaseToken.delete(on: conn).transform(to: ())
                }
            default:
                return container.eventLoop.newSucceededFuture(result: ())
            }
        }
    }
}

extension Array where Element: FirebaseToken {
    func sendPush(title: String, message: String, on container: Container) throws -> Future<Void> {
        return try map { try $0.sendPush(title: title, message: message, on: container) }.flatten(on: container)
    }
}

Adding the requestPooledConnection/releasePooledConnection to the Array extension's sendPush(title:message:conn:on:) method, as it seems like maybe this was the intention to only create one connection and pass it into each subsequent call to sendPush(...).

extension FirebaseToken {
    func sendPush(title: String, message: String, conn: PostgreSQLConnection, on req: Container) throws -> Future<Void> {
        return try FirebaseToken.sendPush(title: title, message: message, token: token, conn: conn, on: req)
    }

    static func sendPush(title: String, message: String, token: String, conn: PostgreSQLConnection, on req: Container) throws -> Future<Void> {
        let fcm = try req.make(FCM.self)
        let message = FCMMessage(token: token, notification: FCMNotification(title: title, body: message))

        return try fcm.sendMessage(req.make(Client.self), message: message).transform(to: ()).catchFlatMap { error in
            guard let googleError = error as? GoogleError, let fcmError = googleError.fcmError else {
                return conn.eventLoop.newSucceededFuture(result: ())
            }

            switch fcmError.errorCode {
            // Delete the token if it is unregistered.
            case .unregistered:
                return FirebaseToken.query(on: conn).filter(\.token == token).first().flatMap { firebaseToken in
                    guard let firebaseToken = firebaseToken else {
                        return conn.eventLoop.newSucceededFuture(result: ())
                    }

                    return firebaseToken.delete(on: conn).transform(to: ())
                }
            default:
                return conn.eventLoop.newSucceededFuture(result: ())
            }
        }
    }
}

extension Array where Element: FirebaseToken {
    func sendPush(title: String, message: String, on req: Container) throws -> Future<Void> {
        return req.requestPooledConnection(to: .psql).flatMap { conn in
            return try self.map { token in
                try token.sendPush(title: title, message: message, conn: conn, on: req)
            }.flatten(on: req).always {
                try? req.releasePooledConnection(conn, to: .psql)
            }
        }
    }
}
MihaelIsaev commented 3 years ago

Yeah, I definitely wanted to add it into extension Array but it was late night 😅 Ok, if it shows too many connections even when you open connection inside array extension then you need to open it somewhere before that since you're calling it multiple times in a row.

rlziii commented 3 years ago

@MihaelIsaev, thanks again for your help! I took in account what you said and moved the connection pool request into the route itself. I ended up with the following solution that has been working, thanks to help from @vzsg from the Vapor Discord server as well (who helped me work through some other smaller errors I had been making and helped with the NIOServerConfig and DatabaseConnectionPoolConfig setup):

In FirebaseToken.swift...

extension FirebaseToken {
    func sendPush(client: Client, fcm: FCM, title: String, message: String, on conn: PostgreSQLConnection) throws -> Future<Void> {
        return try FirebaseToken.sendPush(client: client, fcm: fcm, title: title, message: message, token: token, on: conn)
    }

    static func sendPush(client: Client, fcm: FCM, title: String, message: String, token: String, on conn: PostgreSQLConnection) throws -> Future<Void> {
        let message = FCMMessage(token: token, notification: FCMNotification(title: title, body: message))

        return try fcm.sendMessage(client, message: message).transform(to: ()).catchFlatMap { error in
            guard let googleError = error as? GoogleError, let fcmError = googleError.fcmError else {
                return conn.eventLoop.newSucceededFuture(result: ())
            }

            switch fcmError.errorCode {
            // Delete the token if it is unregistered.
            case .unregistered:
                return FirebaseToken.query(on: conn).filter(\.token == token).first().flatMap { firebaseToken in
                    guard let firebaseToken = firebaseToken else {
                        return conn.eventLoop.newSucceededFuture(result: ())
                    }

                    return firebaseToken.delete(on: conn).transform(to: ())
                }
            default:
                return conn.eventLoop.newSucceededFuture(result: ())
            }
        }
    }
}

extension Array where Element: FirebaseToken {
    func sendPush(client: Client, fcm: FCM, title: String, message: String, on conn: PostgreSQLConnection) throws -> Future<Void> {
        try map {
            try $0.sendPush(client: client, fcm: fcm, title: title, message: message, on: conn)
        }.flatten(on: conn)
    }
}

In NotificationController.swift...

func sendNotificationToUser(_ req: Request, payload: UserNotification) throws -> Future<HTTPStatus> {
    return User.find(payload.userID, on: req).flatMap { user in
        guard let user = user, let userID = user.id else {
            throw Abort(.badRequest, reason: "User not found.")
        }

        return req.withPooledConnection(to: .psql) { conn in
            return FirebaseToken.query(on: conn).filter(\.userID == userID).all().flatMap { firebaseTokens in
                return try firebaseTokens.sendPush(client: req.client(), fcm: req.make(FCM.self), title: payload.title, message: payload.body, on: conn)
                    .transform(to: .ok)
            }
        }
    }
}

In configure.swift...

services.register(NIOServerConfig.default(workerCount: 4))
services.register(DatabaseConnectionPoolConfig(maxConnections: 4))

With these changes I am no longer receiving "too many connections" errors or H12 "Request timeout" errors. 🎉

MihaelIsaev commented 3 years ago

I'm glad you figured out how to do it the best way! 🚀 Hope your experience will help other people as well 😌

P.S. Yep @vzsg is awesome guy 😎