vapor / postgres-nio

🐘 Non-blocking, event-driven Swift client for PostgreSQL.
https://api.vapor.codes/postgresnio/documentation/postgresnio/
MIT License
304 stars 70 forks source link

PostgresNIO/ListenStateMachine.swift:182: Fatal error: Invalid state: initialized #458

Open CrownedPhoenix opened 4 months ago

CrownedPhoenix commented 4 months ago

Description

ListenStateMachine reaches an invalid state during concurrent operation of addListener on a pool that is shutting down. I'm not sure if this behavior is expected given the obvious concurrent execution described in the reproducer below.

At any rate, a fatalError here seems out of place unless there's something I'm misunderstanding about the ListenStateMachine.

To Reproduce

Simply run the following:

import NIO
import AsyncKit
import PostgresNIO
import PostgresKit

let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)

let configuration = SQLPostgresConfiguration(
    hostname: "localhost",
    port: 5432,
    username: "postgres",
    password: "postgres",
    database: "test",
    tls: .disable
)
for _ in 0..<1000 {
    let pool = EventLoopGroupConnectionPool(
        source: PostgresConnectionSource(sqlConfiguration: configuration),
        on:  group
    )

    Task {
        for _ in 0..<100 {
            _ = pool.withConnection({ conn in
                let channel = "default"
                _ = conn.addListener(channel: channel) { _, notifications in
                    assert(notifications.channel == channel, "Notification channel mismatch")
                }

                return group.future()
            })
        }
    }

    let promise: EventLoopPromise<Void> = group.next().makePromise()

    pool.shutdownGracefully({ _ in
        promise.succeed(())

    })
    try promise.futureResult.wait()
}

with the following dependencies:

        .package(url: "https://github.com/apple/swift-nio.git",  from: "2.54.0"),
        .package(url: "https://github.com/vapor/async-kit.git", exact: "1.17.0"),
        .package(url: "https://github.com/vapor/postgres-nio.git", exact: "1.20.0"),
        .package(url: "https://github.com/vapor/postgres-kit.git", exact: "2.11.3"),

and the following for the database setup:

docker run \
    --detach \
    --name timescaledb \
    --publish 5432:5432 \
    --env POSTGRES_HOST_AUTH_METHOD=trust \
    --env TIMESCALEDB_TELEMETRY=off \
    timescale/timescaledb:latest-pg16;

psql -h localhost -U postgres -w -c 'create database test'

the result will be:

PostgresNIO/ListenStateMachine.swift:182: Fatal error: Invalid state: initialized

Expected behavior

My assumption about the behavior for the code would simply be that once the EventLoopGroupConnectionPool gracefully shuts down, all outstanding listeners created with addListener would be cleaned up or closed since the connections they are listening on have been shutdown.

Environment

fabianfett commented 4 months ago

@CrownedPhoenix Thank you for the bug report. In order to prioritize this, what do you want to achieve? Above code very much looks like it is geared towards reproducing the issue.

To achieve your use-case can you use the new async listen API? Maybe in connection with the new PostgresClient? Those APIs should be way more stable as well.

The code should look like this:

@_spi(ConnectionPool) import PostgresNIO
import Logging

@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
enum App {
    static func main() async throws {
        let logger = Logger(label: "test")
        let configuration = PostgresClient.Configuration(
            host: "localhost",
            username: "postgres",
            password: "postgres",
            database: "test",
            tls: .disable
        )
        let client = PostgresClient(configuration: configuration, backgroundLogger: logger)

        await withTaskGroup(of: Void.self) { taskGroup in
            taskGroup.addTask {
                await client.run()
            }

            do {
                try await client.withConnection { connection in
                    for try await message in try await connection.listen("default") {
                        // you will stay forever in this loop, until you break out of it!
                        logger.info("Message received", metadata: ["message": "\(message)"])
                    }
                }
            } catch {
                logger.error("Error happened", metadata: ["error": "String(reflecting: error)"])
            }

            taskGroup.cancelAll() // shutdown postgres client when done
        }
    }
}

If you have any questions please feel free to reach out.

CrownedPhoenix commented 4 months ago

Yeah - the reproducer I provided was mainly to simulate an error that I'm running into in practice in my test setup. I think it is concurrency related based on the reproducer and all - but I haven't been able to decide if it's something I need to change in my code or if it's a PGNIO thing.

It just depends on whether the fatalError described above is expected behavior or if it is a state that shouldn't even be reachable under any scenario.

All said, I'll give the code changes you suggest here a try with the reproducer and then see if replicating them in my own test cases will resolve the issue for me.

fabianfett commented 4 months ago

It just depends on whether the fatalError described above is expected behavior or if it is a state that shouldn't even be reachable under any scenario.

Generally I thought the state should not be reachable. For this reason, I consider this a PostgresNIO issue, which is why I haven't closed this issue. However, to be honest, I never imagined that PostgresNIO was used in the way you do.

There is lot's to unpack in the code you provided and certain issues are foreseeable. This is why I provided you the code to show you, how I recommend to use PostgresNIO's listening support.

CrownedPhoenix commented 4 months ago

Totally understand that - if I can find a solution that doesn't require you to change anything that will be great for me as well. I'm going to see if I can refine the reproducer a little more.

In the meantime, do you have any idea why PostgresClient wouldn't be able to be found? I see the @available but I'm on Sonoma so I wouldn't expect that to be an issue. I'm getting Cannot find ``PostgresClient`` in scope while other types from PostgresNIO are found just fine. I get the same issue building on linux. I'm just adding it straight to the code in the reproducer so no other dependencies or environment things have changed.

CrownedPhoenix commented 4 months ago

Oh I need to do the @_spi thing I bet. Sorry i missed that.

fabianfett commented 4 months ago

Yeah. We intend to drop the requirement to use @_spi very soon.

fabianfett commented 4 months ago

Yeah. We intend to drop the requirement to use @_spi very soon.

https://github.com/vapor/postgres-nio/pull/460

CrownedPhoenix commented 4 months ago

Alright, it looks like using PostgresClient resolves this issue. However, it's not something I'm going to be able to use in the near-term because of sweeping the scope of the change that would be required to swap it out for EventLoopGroupConnectionPool. The main reason being that I can't get an SQLDatabase from PostgresClient synchronously like I can from EventLoopGroupConnectionPool with:

extension EventLoopGroupConnectionPool where Source == PostgresConnectionSource {
    public func database(logger: Logger) -> any PostgresDatabase {
        _EventLoopGroupConnectionPoolPostgresDatabase(pool: self, logger: logger)
    }
}

The analogous code with PostgresClient being:

try await client.withConnection({ $0.sql() })

Because of this there's a bunch of places where I depend on synchronous behavior that I'd have to change to async - which is doable just large scale in my case unfortunately.

I am interested in pursuing the use of PostgresClient though. So in the meantime I'm willing to consider this issue closed and leave a ticket in my backlog for swapping over to use PostgresClient.

NeedleInAJayStack commented 3 months ago

I've been diving into this issue a bit recently as well, and I think the root of the issue is that @CrownedPhoenix is using let channel = "default", which is a Postgres reserved keyword. I've ensured LISTEN quoting in this MR: https://github.com/vapor/postgres-nio/pull/466

CrownedPhoenix commented 3 months ago

@NeedleInAJayStack makes an important point. At any rate, I would expect different error behavior if PGNIO wasn't able to fulfill the listen request.

fabianfett commented 3 months ago

@CrownedPhoenix This is why I removed that #466, fixes this issue. But I have now an easy way to replicate it :)

NeedleInAJayStack commented 3 months ago

Oh good call - thanks for un-fixing. I totally agree that we ought to better protect against the fatalError case too.