apple / swift-nio

Event-driven network application framework for high performance protocol servers & clients, non-blocking.
https://swiftpackageindex.com/apple/swift-nio/documentation
Apache License 2.0
7.93k stars 644 forks source link

Is it possible to wait for and get the result of a UDP response using NIO promise and future? #1255

Closed liuxuan30 closed 4 years ago

liuxuan30 commented 4 years ago

I'm fairly new to NIO and promise/future, and trying to pracise to develop a framework or tool to interact with a remote-control toy which has a built-in UDP server. Imagine when the WIFI signal is poor, the command could timeout or take longer to execute.

The purpose is that I could send a 'command' to the UDP server, and the UDP server will return the executed result, or timeout.

class Commander {
    public var serverAddress = "192.168.10.1"
    // Server IP: 192.168.10.1 UDP PORT:8889 <<- ->> iOS/Mac
    public var serverPort = 8889

    private let group: EventLoopGroup
    private let bootstrap: DatagramBootstrap
    private let channel: Channel

    public func sendCommand(cmd: String)  -> EventLoopFuture<String> {
        var buffer = channel.allocator.buffer(capacity: 128)
        buffer.writeString(cmd)
        let remoteAddr = try! SocketAddress.makeAddressResolvingHost(serverAddress, port: serverPort)
        let envelope = AddressedEnvelope(remoteAddress: remoteAddr, data: buffer)
        channel.writeAndFlush(envelope, promise: nil)
        // return a future so the caller could wait() on it
    }
    public init(localAddr: String, localPort: Int) {
        group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)
        bootstrap = DatagramBootstrap(group: group)
            .channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
            .channelInitializer({ channel in
                channel.pipeline.addHandler(CommandHandler())
            })
        channel = try! bootstrap.bind(host: localAddr, port: localPort).wait()

    }
}

I have a InBoundHandler that could print out the server response,

class CommandHandler: ChannelInboundHandler {
    public typealias InboundIn = AddressedEnvelope<ByteBuffer>
    public typealias OutboundOut = AddressedEnvelope<ByteBuffer>
    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        let envelope = unwrapInboundIn(data)
        var buffer = envelope.data
        if let received = buffer.readString(length: buffer.readableBytes) {
            print("[Server] Reply: '\(received)'")
        }
    }
}

Now I'm struggling with the part of waiting for the server response and get the result, and the timeout part.

It seems I'm looking for something like

let response = channel.read(size, timeout = 3.0)

however there seems isn't, which I could understand

I wanted to ask, is it possible to achieve this using Promise and Future, or what's the best pracise for doing this? Thanks

liuxuan30 commented 4 years ago

After looking at massive examples, I found https://github.com/apple/swift-nio-examples/blob/master/http2-client/Sources/http2-client/main.swift seems giving an idea is to have a responsePromise.

I managed to write something like:

class Commander {
    public init(localAddr: String, localPort: Int) {
        group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)
        self.localAddr = localAddr
        self.localPort = localPort
    }

    public func setupChannel() -> EventLoopFuture<Void> {
        self.bootstrap = DatagramBootstrap(group: self.group!)
            .channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)

        self.channel = try! bootstrap.bind(host: localAddr, port: localPort).wait()
        self.commandPromise = channel.eventLoop.makePromise(of: String.self)
        return channel.pipeline.addHandler(CommandHandler(responsePromise: commandPromise!))
    }

    public func sendCommand(cmd: String) -> String {
        let configured = setupChannel()
        try! configured.wait()

        var buffer = channel.allocator.buffer(capacity: 1024)
        buffer.writeString(cmd)
        let remoteAddr = try! SocketAddress.makeAddressResolvingHost(serverAddress, port: serverPort)
        let envelope = AddressedEnvelope(remoteAddress: remoteAddr, data: buffer)
        channel.writeAndFlush(envelope, promise: nil)
        let result = try! commandPromise!.futureResult.wait()
        return result
    }
}

and in the handler:

class CommandHandler: ChannelInboundHandler {
    public typealias InboundIn = AddressedEnvelope<ByteBuffer>
    public typealias OutboundOut = AddressedEnvelope<ByteBuffer>

    private var responsePromise: EventLoopPromise<String>
    private var responseString: String = ""

    init(responsePromise: EventLoopPromise<String>) {
        self.responsePromise = responsePromise
    }
    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        let envelope = unwrapInboundIn(data)
        var buffer = envelope.data
        if let received = buffer.readString(length: buffer.readableBytes) {
            print("[Server] Reply: '\(received)'")
            responseString.append(received)
        }
        context.fireChannelRead(data)
    }

    func channelReadComplete(context: ChannelHandlerContext) {
        responsePromise.succeed(responseString)
        context.fireChannelReadComplete()
    }

though there are some ugly try! future.wait(), this seems a minimum working prototype.

So could any expert share some best practise around this?

Lukasa commented 4 years ago

Yes, there are some ways to do this.

In the simplest case, if you are going to send one UDP message and get one UDP message back, you can use the RequestResponseHandler from swift-nio-extras. You'd use it like this:

class Commander {
    public var serverAddress = "192.168.10.1"
    // Server IP: 192.168.10.1 UDP PORT:8889 <<- ->> iOS/Mac
    public var serverPort = 8889

    private let group: EventLoopGroup
    private let bootstrap: DatagramBootstrap
    private let channel: Channel

    public func sendCommand(cmd: String)  -> EventLoopFuture<String> {
        var buffer = channel.allocator.buffer(capacity: 128)
        buffer.writeString(cmd)
        let remoteAddr = try! SocketAddress.makeAddressResolvingHost(serverAddress, port: serverPort)
        let envelope = AddressedEnvelope(remoteAddress: remoteAddr, data: buffer)
        let responsePromise = channel.eventLoop.makePromise(of: AddressedEnvelope<ByteBuffer>.self)
        channel.writeAndFlush((envelope, responsePromise), promise: nil)
        return responsePromise.futureResult
    }
    public init(localAddr: String, localPort: Int) {
        group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)
        bootstrap = DatagramBootstrap(group: group)
            .channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
            .channelInitializer({ channel in
                channel.pipeline.addHandler(RequestResponseHandler<AddressedEnvelope<ByteBuffer>, AddressedEnvelope<ByteBuffer>>())
            })
        channel = try! bootstrap.bind(host: localAddr, port: localPort).wait()
    }
}

However, if you have a slightly more complex system then this gets harder, so please do say if you need more than that.

liuxuan30 commented 4 years ago

ah thanks @Lukasa for this extra. Do you mind to take a look at my https://github.com/apple/swift-nio/issues/1255#issuecomment-555904421 to give some advice?

liuxuan30 commented 4 years ago

besides, can we reuse promise as a class member?

Lukasa commented 4 years ago

Promises can only complete once. You need a new promise for every message you'd like to receive.

liuxuan30 commented 4 years ago

Thanks! I think I have my answer.

Last but not least before closing, is there any better place to ask questions? Like Stack overflow? Not sure if SO can have answers in time. But your support here is amazing!

Lukasa commented 4 years ago

The Swift forums is probably the best place: we have a label there that we pay pretty decent attention to.

liuxuan30 commented 4 years ago

@Lukasa sorry, but I am confusing about what you said

Promises can only complete once. You need a new promise for every message you'd like to receive.

in your exmaple https://github.com/apple/swift-nio-examples/blob/master/http2-client/Sources/http2-client/main.swift

there is responseReceivedPromise as a class member in SendRequestHandler. So IMO it's like reusing the promise here? I suppose only one SendRequestHandler in the pipeline, so the responseReceivedPromise is being reused.

Lukasa commented 4 years ago

That's correct, but a HTTP/2 client can only receive one response on a given stream, so only one promise is required. If you look at RequestResponseHandler you can see an example of using new promises for every message.

liuxuan30 commented 4 years ago

@Lukasa I kind of understanding now. Sorry but this is really the last question 😂 I notice in the RequestResponseHandler

public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
        let (request, responsePromise) = self.unwrapOutboundIn(data)

how can I write (request, responsePromise) into a buffer in my sendCommand()? This seems the last missing puzzle.

Lukasa commented 4 years ago

This is just a tuple, so you can simply write it:

    public func sendCommand(cmd: String)  -> EventLoopFuture<String> {
        var buffer = channel.allocator.buffer(capacity: 128)
        buffer.writeString(cmd)
        let remoteAddr = try! SocketAddress.makeAddressResolvingHost(serverAddress, port: serverPort)
        let envelope = AddressedEnvelope(remoteAddress: remoteAddr, data: buffer)
        let responsePromise = channel.eventLoop.makePromise(of: AddressedEnvelope<ByteBuffer>.self)
        channel.writeAndFlush((envelope, responsePromise), promise: nil)
        return responsePromise.futureResult
liuxuan30 commented 4 years ago

oh that's sweet, I didn't realize writeAndFlush can takes any T. Thanks so much!

nonara commented 2 years ago

Hello!

I'm encountering an issue with similar code and am not certain what to do.

I implemented a timeout to cancel the promise. This works well, but it seems that when I call my send function with data that won't ever receive a response, the next time I call the function with data that does get a response, it times out as though it never got one.

Code

let channel = DatagramBootstrap(group: eventGroup)
  .channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEPORT), value: 1)
  .channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_BROADCAST), value: 1)
  .channelInitializer { channel in
    channel.pipeline.addHandler(RequestResponseHandler<AddressedEnvelope<ByteBuffer>, AddressedEnvelope<ByteBuffer>>())
  }
  .bind(to: serverAddress).wait()

func sendPacketAndGetResponse(
  destAddr: SocketAddress,
  data: Data,
  timeout: TimeAmount = .seconds(5)
) async throws -> AddressedEnvelope<ByteBuffer> {
  let envelope = AddressedEnvelope(remoteAddress: destAddr, data: ByteBuffer(data: data))

  let responsePromise = channel!.eventLoop.makePromise(of: AddressedEnvelope<ByteBuffer>.self)
  let timeoutSchedule = channel!.eventLoop.scheduleTask(in: timeout) {
    responsePromise.fail(Errors.packetResponseTimeout)
  }

  channel!.writeAndFlush((envelope, responsePromise), promise: nil)
  let res = try await responsePromise.futureResult.get()

  timeoutSchedule.cancel()

  return res
}

Behaviour

Lukasa commented 2 years ago

RequestResponseHandler does not remove promises from its internal buffer simply because you failed them. This is probably a deficiency in this type: if the responsePromise is failed elsewhere then we probably ought to remove it from our list. But it's not clear that that's right.

The easiest thing to do in your case may be to copy the RequestResponseHandler into your own code and tweak it so that it handles timeouts directly, and can therefore remove the appropriate promise when it hits a timeout.

nonara commented 2 years ago

@Lukasa Thank you. I suspected as much. I appreciate the direction!