vapor / postgres-nio

🐘 Non-blocking, event-driven Swift client for PostgreSQL.
https://api.vapor.codes/postgresnio/documentation/postgresnio/
MIT License
305 stars 70 forks source link

Error "How can we receive a read, if the connection is closed" #435

Closed notcome closed 8 months ago

notcome commented 8 months ago

Describe the bug

I am very new to Swift Server Side and Vapor, and I am confused by this migration to Swift concurrency. I saw AsyncKit being deprecated and decided to give the new ConnectionPool a try.

I found that seconds after I read the database, I saw this preconditionFailure:

PostgresNIO/ConnectionStateMachine.swift:657: Fatal error: How can we receive a read, if the connection is closed

To Reproduce

I have done a little bit wrapping around Vapor. My reading activity mostly happens inside this extension:

extension Request {
    func readingDB<Result>(closure: @escaping @Sendable (PostgresConnectionSource.Connection) async throws -> Result) async throws -> Result {
        try await application.postgresClient.withConnection { connection in
            try await connection.sql().raw("BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").run()
            do {
                let result = try await closure(connection)
                try await connection.sql().raw("COMMIT").run()
                return result
            } catch {
                try await connection.sql().raw("COMMIT").run()
                throw error
            }
        }
    }
}

and helpers:

enum PostgresClientKey: StorageKey {
    typealias Value = PostgresClient
}

extension Application {
    var postgresClient: PostgresClient! {
        get { self.storage[PostgresClientKey.self] }
        set { self.storage[PostgresClientKey.self] = newValue }
    }
}

The client starts with:

 let postgresConfiguration = PostgresClient.Configuration(
    host: hostname,
    port: port,
    username: username,
    password: password,
    database: databaseName,
    tls: .require(tlsConfiguration))
let postgresClient = PostgresClient(configuration: postgresConfiguration, backgroundLogger: Logger(label: "postgres"))
Task {
    await postgresClient.run()
}

Expected behavior

No error.

Environment

notcome commented 8 months ago

Oh when trying to reproduce more instances of this, I found this line to crash:

https://github.com/vapor/postgres-nio/blob/c8269926eb3b705b70aff1975860e357760123c8/Sources/ConnectionPoolModule/PoolStateMachine%2BConnectionGroup.swift#L441

self.stats.availableStreams is 0 and closeAction.maxStreams is 1.

I only encountered this once.

fabianfett commented 8 months ago

@notcome How many requests do you need for this? Is a single request sufficient?

fabianfett commented 8 months ago

@notcome I just created this simple testcase based on what you are describing:

    func testKeepOpenAfterRequest() async throws {
        var mlogger = Logger(label: "test")
        mlogger.logLevel = .debug
        let logger = mlogger
        let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 8)
        self.addTeardownBlock {
            try await eventLoopGroup.shutdownGracefully()
        }

        var tlsConfiguration = TLSConfiguration.makeClientConfiguration()
        tlsConfiguration.certificateVerification = .none
        let clientConfig = PostgresClient.Configuration(
            host: env("POSTGRES_HOSTNAME") ?? "localhost",
            port: env("POSTGRES_PORT").flatMap({ Int($0) }) ?? 5432,
            username: env("POSTGRES_USER") ?? "test_username",
            password: env("POSTGRES_PASSWORD") ?? "test_password",
            database: env("POSTGRES_DB") ?? "test_database",
            tls: .prefer(tlsConfiguration)
        )
        let client = PostgresClient(configuration: clientConfig, eventLoopGroup: eventLoopGroup, backgroundLogger: logger)

        try await withThrowingTaskGroup(of: Void.self) { taskGroup in
            taskGroup.addTask {
                await client.run()
            }

            for i in 0..<1 {
                taskGroup.addTask {
                    try await client.withConnection() { connection in
                        _ = try await connection.query("SELECT 1", logger: logger)
                    }
                    print("done: \(i)")
                }
            }

            for _ in 0..<1 {
                _ = await taskGroup.nextResult()!
            }

            try await Task.sleep(for: clientConfig.options.connectionIdleTimeout +  .seconds(5))

            taskGroup.cancelAll()
        }
    }

Sadly that doesn't surface the issue. Can you please help me here? That would be really appreciated.

notcome commented 8 months ago

First of all I would be really happy to help you, but I am not sure what to do next.

I have three complications here:

  1. I must point out that I am really new to backend development and I am not even using a local Postgres for testing. Instead I connect directly to AWS west-1 RDS instance all the way from China, and in case you don't know China <-> US connection can be extremely unstable. I am not sure if this is ever related.
  2. I realize that I don't correctly assign an EventLoopGroup to my PostgresClient.
  3. I realize that there is a bug in my query code: I opened another withConnection inside a withConnection closure.

During the process of me inspecting my code again and fixing 2 and 3, I found that:

How many requests do you need for this? Is a single request sufficient?

If you mean connections, it seems to require two nested connections as I said above. If you mean requests inside my connections, my query wrapper opens a transaction, executes a closure which does at least one request, and commits the transaction.

Please let me know if you want any additional info.

notcome commented 8 months ago

I now consistently hit the issue of self.stats.availableStreams -= closeAction.maxStreams. It appears to happen whenever a connection went idle.

notcome commented 8 months ago

Okay, now I have read lots of code I have a better understanding of what happened. The issue self.stats.availableStreams -= closeAction.maxStreams happens because keep alive action is triggered twice for a very short period of time. Since each keep alive action reduces availableStreams by 1, if we have one available connection, we get an overflow error.

That means something is wrong with the state machine I suppose? Maybe I can make it to be easier to surface by setting a very short keep alive period.

notcome commented 8 months ago

I should probably move this issue to a separate discussion right? I think the original issue disappeared after I properly assign an event loop and avoid opening a connection inside a connection.