wahern / cqueues

Continuation Queues: Embeddable asynchronous networking, threading, and notification framework for Lua on Unix.
http://25thandclement.com/~william/projects/cqueues.html
MIT License

Errors thrown when sockets are closed. #9

Open · daurnimator opened 10 years ago

daurnimator commented 10 years ago

When you have a :read() going on in one coroutine, and :close() the socket in another, :read() will throw an error:

/usr/local/share/lua/5.1/cqueues/socket.lua:397: calling 'recv' on bad self (socket closed)
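
A minimal sketch of the scenario (a hypothetical repro script, using the same socket.pair() idiom that appears later in this thread):

local cqueues = require"cqueues"
local socket = require"cqueues.socket"

local loop = cqueues.new()
local snd, rcv = socket.pair()

loop:wrap(function ()
    rcv:read() --> yields here, polling for data
end)

loop:wrap(function ()
    cqueues.sleep(0) --> let the reader start polling
    rcv:close() --> the pending :read() now throws rather than returning nil
end)

print(loop:loop()) --> expect a failure result carrying the error above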
wahern commented 10 years ago

I copied the semantics that Lua uses for file objects. Lua does the same thing: lua -e 'f = io.open("/dev/null", "r") f:close() f:read()'.

You can use :shutdown to terminate the connection cleanly, and then subsequent read operations should fail normally.
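
For example (a sketch, assuming a connected cqueues socket so):

so:shutdown"r" --> half-close the read side; the descriptor stays valid
local data, why = so:read() --> behaves as if the peer closed the connection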

daurnimator commented 10 years ago

I copied the semantics that Lua uses for file objects.

In normal Lua, you can't have more than one operation going on at once.

What I'm asking for is that if I :close(), any pending :read()s return nil, "closed" or similar.

You can use :shutdown to terminate the connection cleanly, and then subsequent read operations should fail normally.

Will :shutdown wake up my yielded :read? Or do I need to follow it up with a cq:cancel(sock:pollfd())?

What about the case where I'm not sure whether there's an ongoing read, and I want to ensure the socket is closed promptly?

wahern commented 10 years ago

On Tue, Oct 21, 2014 at 04:13:30PM -0700, daurnimator wrote:

I copied the semantics that Lua uses for file objects.

In normal Lua, you can't have more than one operation going on at once.

What I'm asking for is that if I :close(), any pending :read()s return nil, "closed" or similar.

I considered that at the time. But I also couldn't think of any reason for one thread to close a socket currently being used by another. The only exception I thought of was cancellation. But asynchronous cancellation is such an ugly and rarely used pattern that I just figured it wasn't worth putting in the effort to make it built-in.

I think the proper way to do cancellations is to use the built-in condition variable object. User applications can do this by creating a socket proxy object that polls on both the socket and the CV when performing I/O. I actually recently implemented this pattern for listening sockets, because I needed to cancel sock:clients() and put the daemon into a standby mode, waiting to either exit or resume accepting new connections. I've attached that code.

I'm hesitant to build cancellations into the core API until I have an idea about the different patterns that it might be used with and how it might be implemented without costing too much in the 99% of cases that don't need asynchronous cancellation. So far I have two examples: yours and mine. But they both seemed trivially solved by the existing components.
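
A rough sketch of that pattern for a single I/O wait (a hypothetical helper, not part of cqueues; it assumes only that condition variables are pollable and that cqueues.poll returns the objects that polled ready):

local cqueues = require"cqueues"
local condition = require"cqueues.condition"

-- Wait until `so` polls ready or `cv` is signalled from another
-- coroutine; returns false if the wait was cancelled.
local function wait_or_cancel(so, cv)
    for _, ready in ipairs{ cqueues.poll(so, cv) } do
        if ready == cv then
            return false --> cancelled via cv:signal()
        end
    end
    return true --> safe to attempt a non-blocking I/O operation
end

A proxy object would loop this around a non-blocking attempt, which is what the attached accept.lua below does with accept(0.0).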

You can use :shutdown to terminate the connection cleanly, and then subsequent read operations should fail normally.

Will :shutdown wake up my yielded :read?

sock:shutdown"r" works as expected on OS X and Linux (just tested). sock:read returns as if the other end closed the socket.

I don't remember if I tested the behavior elsewhere before, but it would be odd if it didn't work the same way.

Or do I need to follow it up with a cq:cancel(sock:pollfd())?

No need to cancel the descriptor because shutdown doesn't invalidate the descriptor number.

What about the case where I'm not sure whether there's an ongoing read, and I want to ensure the socket is closed promptly?

I think the following hack should suffice, although it's scarcely more elegant than async cancellation would be. I tested it on Linux and OS X. (No time to fire up all my VMs right now.)

local loop = cqueues.new()
local snd, rcv = socket.pair()

loop:wrap(function ()
    sleep(0) --> make sure the reader is polling

    rcv:shutdown"rw" --> should wakeup all readers and writers

    --
    -- Sleep twice because the queueing order (LIFO or FIFO) of
    -- coroutines is unspecified. The current behavior is actually LIFO.
    --
    sleep(0)
    sleep(0)
    rcv:close()
end)

assert(loop:wrap(function ()
    print(fileresult(rcv:read()))
end):loop())
wahern commented 10 years ago

Ugh. Github messed up the e-mail. The Lua comments made it indent half-way through the snippet. And then the attached accept.lua code was just prepended.

The solution for your particular problem was:

local loop = cqueues.new()
local snd, rcv = socket.pair()

loop:wrap(function ()
        sleep(0)
        rcv:shutdown"rw"
        sleep(0)
        sleep(0)
        rcv:close()
end)

assert(loop:wrap(function ()
        print(fileresult(rcv:read()))
end):loop())
daurnimator commented 10 years ago

The formatting is still messed up. Could you edit your github comment(s) to look correct? Surround the code with ```lua … ``` fences to get syntax highlighting.

After you fix it up I'll try replying in more detail :)


I considered that at the time. But I also couldn't think of any reason for one thread to close a socket currently being used by another. The only exception I thought of was cancellation

My use case was that when a :write fails, I want to :close(). Close rather than :shutdown, as file descriptors are a bit precious in this program :) If there was a :read going on concurrently, it was throwing an error.
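
In sketch form (a hypothetical fragment, assuming a connected socket sock):

local ok, why = sock:write(msg)
if not ok then
    --> with a concurrent :read() pending, this threw instead of
    --> waking the reader with nil, "closed"
    sock:close()
end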

wahern commented 10 years ago

Proof-of-concept fix for your current situation.

#!/usr/bin/env lua5.2

local cqueues = require"cqueues"
local socket = require"cqueues.socket"
local auxlib = require"cqueues.auxlib"

local sleep = cqueues.sleep
local assert, fileresult = auxlib.assert, auxlib.fileresult

local loop = cqueues.new()
local snd, rcv = socket.pair()

loop:wrap(function ()
    sleep(0) --> make sure rcv:read is polling
    rcv:shutdown"rw"
    sleep(0) --> allow other thread to wake up
    sleep(0) --> wait one more cycle because we may have been scheduled before the other thread
    rcv:close()
end)

assert(loop:wrap(function ()
    print(fileresult(rcv:read()))
end):loop())

It occurs to me that theoretically this might not work if more than KPOLL_MAXWAIT descriptor events are signaled and the reading thread isn't scheduled to run on the next step through the event loop. KPOLL_MAXWAIT is a constant in the C code and is only 32. However, in my benchmarks on a modern Xeon machine with 5,000+ active connections pushing enough data to saturate a 10Gb/s connection I've never seen the limit reached. CPUs are just too fast and the network just too slow. However, it does make me think I can't push this issue off into the future as much as I'd like. (OTOH, because of the current LIFO scheduling ordering, I think we'd have to see 64 descriptor events polling true before the one we're interested in, which is even more unlikely.)

wahern commented 10 years ago

Example use of condition variables and a proxy object to add cancellation support to socket I/O.

I wrote this module to be able to pause acceptance of incoming connections. It's part of a daemon restart feature: on SIGHUP the daemon re-executes itself, allowing the new instance to inherit the listening socket so that service is never disrupted. The existing instance pauses accepting new connections, allowing them to queue in the kernel. If the new instance starts up successfully, the old instance stops listening entirely and finishes servicing its existing connections before exiting; otherwise it resumes accepting new connections.

--
-- This module wraps a listening cqueues socket (specifically the :clients
-- iterator method), and allows asynchronous pausing and stopping of
-- connection retrieval.
--
local cqueues = require"cqueues"
local errno = require"cqueues.errno"
local condition = require"cqueues.condition"
local monotime = cqueues.monotime
local poll = cqueues.poll
local ETIMEDOUT = errno.ETIMEDOUT

local accept = {}

function accept.new(so)
    return setmetatable({
        socket = so,
        condvar = condition.new(),
        paused = false,
        stopped = false,
    }, {
        __index = accept
    })
end -- accept.new

-- proxy the socket:clients iterator factory
function accept:clients()
    return function ()
        local con, why

        repeat
            if self.stopped then
                return
            elseif self.paused then
                self.condvar:wait()
            else
                con, why = self.socket:accept(0.0)

                if not con then
                    if why ~= ETIMEDOUT then
                        return error(errno.strerror(why), 3)
                    end

                    poll(self.socket, self.condvar)
                end
            end
        until con

        return con
    end
end -- accept:clients

function accept:pause()
    self.paused = true
    self.stopped = false
    self.condvar:signal()
end -- accept:pause

function accept:stop()
    self.stopped = true
    self.paused = false
    self.condvar:signal()
end -- accept:stop

function accept:resume()
    self.paused = false
    self.stopped = false
    self.condvar:signal()
end -- accept:resume

return accept
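
A hypothetical usage sketch of the module above (the daemon wiring and names are invented for illustration):

local cqueues = require"cqueues"
local socket = require"cqueues.socket"
local accept = require"accept" --> the module above

local loop = cqueues.new()
local proxy = accept.new(socket.listen{ host = "127.0.0.1", port = "8000" })

loop:wrap(function ()
    for con in proxy:clients() do
        loop:wrap(function ()
            --> ... service the connection ...
            con:close()
        end)
    end
    --> :clients() returns once proxy:stop() is called
end)

loop:wrap(function ()
    cqueues.sleep(1)
    proxy:pause() --> new connections queue in the kernel
    cqueues.sleep(1)
    proxy:stop() --> end the iterator; or :resume() to continue accepting
end)

assert(loop:loop())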
wahern commented 10 years ago

Updated proof-of-concept which avoids the issue with event dequeueing. Even though the shutdown will eventually wake up the thread, explicitly canceling the descriptor schedules it to run immediately, so you were on the right track originally.

#!/usr/bin/env lua5.2

local cqueues = require"cqueues"
local socket = require"cqueues.socket"
local auxlib = require"cqueues.auxlib"

local sleep = cqueues.sleep
local assert, fileresult = auxlib.assert, auxlib.fileresult

local loop = cqueues.new()
local snd, rcv = socket.pair()

loop:wrap(function ()
    sleep(0) --> make sure rcv:read is polling

    rcv:shutdown"rw" --> ensures future I/O attempts fail immediately
    cqueues.cancel(rcv) --> ensures pollers are immediately scheduled to run
    sleep(0) --> allow other thread(s) to wake up
    sleep(0) --> wait one more cycle because we may have been scheduled before the other thread(s)
    rcv:close()
end)

assert(loop:wrap(function ()
    print(fileresult(rcv:read()))
end):loop())
daurnimator commented 9 years ago

I had to use this hack again today.... I'm starting to think it may be a common use case we need to support better?

wahern commented 9 years ago

I think you're right. I'll try to commit a fix this week, similar to the way you suggested. Although it can't simply return "closed", at least not until the error protocol is revamped so it's string + error code rather than just error code. Maybe a new socket-specific error code, like SO_ECLOSED?

daurnimator commented 9 years ago

Although it can't simply return "closed", at least not until the error protocol is revamped so it's string + error code rather than just error code. Maybe a new socket-specific error code, like SO_ECLOSED?

Ugh. I think I'd rather it just returned nil for now; I already check for eof() when read returns nil, nil.
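
Roughly the calling pattern being described (a fragment; the handler names are hypothetical):

local data, why = sock:read("*l")
if data == nil then
    if why == nil and sock:eof() then
        handle_eof() --> clean EOF: read returned nil, nil and eof() is true
    else
        handle_error(why) --> the hoped-for close behavior: nil plus a reason
    end
end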

jprjr commented 3 years ago

Hi there, just wanted to mention I hit this issue also.

I've got an app where one coroutine calls read on a UDP socket in a loop; from another coroutine I close the socket. I expected the read to return nil.

jprjr commented 3 years ago

I tried the work-around mentioned before; I'm not sure whether the issue is that this is a UDP socket, but it still seems to throw an error:

local cqueues = require'cqueues'
local socket = require'cqueues.socket'
local sleep = cqueues.sleep

local cq = cqueues.new()

local sock = socket.listen({
  host = '127.0.0.1',
  port = '1234',
  type = socket.SOCK_DGRAM
})

sock:setmode('b','bn')
assert(sock:listen())

cq:wrap(function()
  local packet, err
  while not err do
    print('read')
    packet, err = sock:read('*a')
    print('done')
  end
end)

cq:wrap(function()
  cqueues.sleep(1) -- wait a second before closing

  -- perform work-around from github
  sleep(0)
  sock:shutdown('rw')
  cqueues.cancel(sock)
  sleep(0)
  sleep(0)
  sock:close()
end)

assert(cq:loop())

Results:

$ lua test.lua 
read
lua: test.lua:37: test.lua:20: socket:read: Transport endpoint is not connected
stack traceback:
    [C]: in function 'assert'
    test.lua:37: in main chunk
    [C]: in ?