swiftlang / swift

The Swift Programming Language
https://swift.org
Apache License 2.0

Segmentation fault when misusing runSynchronously by passing serial executor when enqueueing as task executor #71307

Open Joannis opened 5 months ago

Joannis commented 5 months ago

Description

I've created a task executor based on a SwiftNIO EventLoop, and spawned tasks to a discarding task group for handling new clients connecting to my TCP server. Once I attach the executor, the example crashes.

Reproduction

import NIOCore
import NIOPosix

final class EventLoopTaskExecutor: Executor, @unchecked Sendable, TaskExecutor {
    let eventLoop: EventLoop

    init(eventLoop: EventLoop) {
        self.eventLoop = eventLoop
    }

    func enqueue(_ job: UnownedJob) {
        job.runSynchronously(on: eventLoop.executor.asUnownedSerialExecutor())
    }

    func enqueue(_ job: consuming ExecutorJob) {
        eventLoop.enqueue(job)
    }

    func asUnownedTaskExecutor() -> UnownedTaskExecutor {
        .init(ordinary: self)
    }
}

let server = try await ServerBootstrap(group: NIOSingletons.posixEventLoopGroup)
    .bind(
        host: "0.0.0.0",
        port: 2048
    ) { channel in
        channel.eventLoop.makeCompletedFuture {
            // Add any handlers for parsing or serializing messages here
            // We don't need any for this echo example

            return try NIOAsyncChannel(
                wrappingChannelSynchronously: channel,
                configuration: NIOAsyncChannel.Configuration(
                    inboundType: ByteBuffer.self, // We'll read the raw bytes from the socket
                    outboundType: ByteBuffer.self // We'll also write raw bytes to the socket
                )
            )
        }
    }

// Needed to retain the executor, unfortunately
// But we only call this code with one client at a time for now, just to test executor preferences
var _executor: EventLoopTaskExecutor?

// We create a task group to manage the lifetime of our client connections
// Each client is handled by its own structured task
try await withThrowingDiscardingTaskGroup { group in
    try await server.executeThenClose { clients in
        // Iterate over the clients as an async sequence
        for try await client in clients {
            // Every time we get a new client, we add a new task to the group
            let executor = EventLoopTaskExecutor(eventLoop: client.channel.eventLoop)
            _executor = executor
            group.addTask(executorPreference: executor) {
                // We handle the client in a separate function to keep the main loop clean
                try await handleClient(client)
            }
        }
    }
}

func handleClient(_ client: NIOAsyncChannel<ByteBuffer, ByteBuffer>) async throws {
    try await client.executeThenClose { inboundMessages, outbound in
        // We'll read from the client until the connection closes
        for try await inboundMessage in inboundMessages {
            // Write the message back to the client
            try await outbound.write(inboundMessage)
            return
        }
    }
}

Stack dump

-

Expected behavior

Task runs on the EventLoop

Environment

Apple Swift version 5.11-dev (LLVM 0c7051c13ba8526, Swift 192d275b78109ff) Target: arm64-apple-macosx14.0

Additional information

No response

ktoso commented 5 months ago

Thanks for filing this after our chat earlier. I'll investigate.

ktoso commented 2 weeks ago

This job is being executed by a task executor, so runSynchronously must be passed self as the task executor, not some arbitrary other object as a serial executor.

The eventLoop.enqueue(job) call currently does:

//@available(macOS 15.0, *)
//extension EventLoop {
//    @inlinable
//    public func enqueue(_ job: consuming ExecutorJob) {
//        let unownedJob = UnownedJob(job)
//        self.execute {
//            unownedJob.runSynchronously(on: self.asUnownedSerialExecutor())
//        }
//    }
//}

but the task executor's enqueue should do this instead:

    func enqueue(_ job: consuming ExecutorJob) {
//        eventLoop.enqueue(job)

        let unownedJob = UnownedJob(job)
        print("Job = \(unownedJob)")
        self.eventLoop.execute {
            unownedJob.runSynchronously(on: self.asUnownedTaskExecutor())
        }
    }

This works correctly, but we should diagnose the misuse better.

I'll investigate this more and follow up by improving the docs and diagnostics.
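
For readers without a SwiftNIO setup, the same pattern can be sketched with a plain DispatchQueue standing in for the EventLoop. QueueTaskExecutor, runOnQueue, and the queue label are hypothetical names chosen for illustration and are not part of the issue. This is a minimal sketch that assumes a toolchain with TaskExecutor support (macOS 15 / Swift 6 or later), not the fix that landed in SwiftNIO.

```swift
import Dispatch

// Hypothetical executor for illustration: a DispatchQueue stands in for the EventLoop.
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, *)
final class QueueTaskExecutor: TaskExecutor {
    let queue: DispatchQueue

    init(queue: DispatchQueue) {
        self.queue = queue
    }

    func enqueue(_ job: consuming ExecutorJob) {
        // Convert to UnownedJob so the job can be captured by the escaping closure.
        let unownedJob = UnownedJob(job)
        queue.async {
            // The job was enqueued on a task executor, so pass self as the
            // task executor rather than some unrelated serial executor.
            unownedJob.runSynchronously(on: self.asUnownedTaskExecutor())
        }
    }
}

// Hypothetical usage: run a closure with the executor as its preference.
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, *)
func runOnQueue() async -> Int {
    let executor = QueueTaskExecutor(queue: DispatchQueue(label: "example.task-executor"))
    // The local `executor` reference keeps the executor alive for the whole call.
    return await withTaskExecutorPreference(executor) {
        21 * 2
    }
}
```

Because the executor is held by a local variable for the duration of withTaskExecutorPreference, this sketch does not need the global _executor workaround from the reproduction.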

FranzBusch commented 2 weeks ago

I just thought about this, and it is going to make it almost impossible to implement both a serial and a task executor on a single type, right? The protocol requirement for enqueue is exactly the same, so how do we know whether we need to pass self as an unowned serial executor or as an unowned task executor?