Closed: ripose-jp closed this issue 10 months ago.
Here's a "minimal" example I could create.
```cpp
#include <coro/coro.hpp>
#include <iostream>
#include <span>

int main()
{
    auto scheduler = std::make_shared<coro::io_scheduler>(coro::io_scheduler::options{
        .execution_strategy = coro::io_scheduler::execution_strategy_t::process_tasks_inline});

    constexpr size_t REQUEST_SIZE{100 * 1024 * 1024};

    // Polls the socket for reads while the server task is busy writing.
    auto make_bogus_read_task = [](std::shared_ptr<coro::net::tcp_client> client) -> coro::task<void>
    {
        using namespace std::chrono_literals;
        co_await client->poll(coro::poll_op::read, 10s);
    };

    auto make_server_task = [&]() -> coro::task<std::string>
    {
        coro::net::tcp_server server{scheduler};
        co_await scheduler->schedule();

        auto poll_status = co_await server.poll();
        if (poll_status != coro::poll_status::event)
        {
            co_return {};
        }

        std::shared_ptr<coro::net::tcp_client> client =
            std::make_shared<coro::net::tcp_client>(server.accept());
        if (!client->socket().is_valid())
        {
            co_return {};
        }

        // Concurrently poll for reads on the same socket; removing this line
        // makes the example work.
        scheduler->schedule(make_bogus_read_task(client));

        std::string request(REQUEST_SIZE, 0xAA);
        std::span<const char> remaining{request};
        do
        {
            auto [send_status, r] = client->send(remaining);
            if (send_status != coro::net::send_status::ok)
            {
                co_return {};
            }
            if (r.empty())
            {
                break;
            }
            remaining = r;

            auto pstatus = co_await client->poll(coro::poll_op::write);
            if (pstatus != coro::poll_status::event)
            {
                co_return {};
            }
        } while (true);

        co_return request;
    };

    auto make_client_task = [&]() -> coro::task<std::string>
    {
        co_await scheduler->schedule();
        coro::net::tcp_client client{scheduler};

        auto status = co_await client.connect();
        if (status != coro::net::connect_status::connected)
        {
            co_return {};
        }

        std::string response(REQUEST_SIZE, '\0');
        std::span<char> remaining{response};
        do
        {
            auto poll_status = co_await client.poll(coro::poll_op::read);
            if (poll_status != coro::poll_status::event)
            {
                co_return {};
            }

            auto [recv_status, r] = client.recv(remaining);
            if (recv_status != coro::net::recv_status::ok)
            {
                co_return {};
            }
            remaining = remaining.subspan(r.size_bytes());
        } while (!remaining.empty());

        co_return response;
    };

    auto result = coro::sync_wait(coro::when_all(make_server_task(), make_client_task()));
    std::string request  = std::move(std::get<0>(result).return_value());
    std::string response = std::move(std::get<1>(result).return_value());

    if (request.empty())
    {
        std::cout << "Request was empty\n";
    }
    else if (response.empty())
    {
        std::cout << "Response was empty\n";
    }
    else if (request != response)
    {
        std::cout << "Request != Response\n";
    }
    else
    {
        std::cout << "Request == Response\n";
    }
}
```
I think I understand: it is two coroutines trying to register a read and a write concurrently, and since one is already registered, the second one fails, correct? I hadn't really envisioned this use case; I always pretty much assumed one thread would "own" the socket or file descriptor and issue the read or write as needed (or both with the read/write flag).
To allow for this behavior, libcoro could track the state of each file descriptor and its current epoll registration, and check that before it calls `epoll_ctl`. I had thought of this before but didn't do it, since it incurs a lookup cost to see what the current state is. On the flip side, as you noticed, the current implementation always de-registers unconditionally, which is another cost, since it's a kernel call. I think more shared state is possibly the right way to solve this, but it will require some synchronization in the io_scheduler to do so.
These are my initial thoughts on a possible fix. I'll give it some more thought, though, since this is a use case I hadn't really envisioned yet.
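That shared-state idea could look roughly like the sketch below. All names here are hypothetical, not libcoro API: the scheduler keeps a per-fd record of the currently registered events mask and issues `EPOLL_CTL_MOD` to merge interest, instead of a second `EPOLL_CTL_ADD` that would fail with `EEXIST`.

```cpp
#include <sys/epoll.h>
#include <unistd.h>

#include <cstdint>
#include <mutex>
#include <unordered_map>

// Hypothetical helper: tracks the events mask currently registered for each
// fd, so a concurrent reader and writer merge into one epoll registration
// instead of the second EPOLL_CTL_ADD failing with EEXIST.
class epoll_registry
{
public:
    explicit epoll_registry(int epoll_fd) : m_epoll_fd(epoll_fd) {}

    // Returns 0 on success, -1 on epoll_ctl failure (errno is set).
    int arm(int fd, uint32_t events)
    {
        std::scoped_lock lk{m_mutex};
        epoll_event ev{};
        ev.data.fd = fd;

        auto it = m_registered.find(fd);
        if (it == m_registered.end())
        {
            // First waiter on this fd: plain ADD.
            ev.events = events;
            if (epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, fd, &ev) == -1)
            {
                return -1;
            }
            m_registered[fd] = events;
        }
        else
        {
            // fd already registered: merge read + write interest via MOD.
            ev.events = it->second | events;
            if (epoll_ctl(m_epoll_fd, EPOLL_CTL_MOD, fd, &ev) == -1)
            {
                return -1;
            }
            it->second = ev.events;
        }
        return 0;
    }

private:
    int m_epoll_fd;
    std::mutex m_mutex;
    std::unordered_map<int, uint32_t> m_registered;
};
```

This is exactly the lookup-cost trade-off mentioned above: one hash-map probe under a lock per poll, in exchange for skipping the redundant kernel calls.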
> I think I understand: it is two coroutines trying to register a read and a write concurrently, and since one is already registered, the second one fails, correct?
I believe that's the issue, since the code works perfectly without this line:

```cpp
scheduler->schedule(make_bogus_read_task(client));
```
The bullet points sound correct, though it would be useful if `poll` only returned `coro::poll_status::event` when the event it was actually waiting for occurred. For example, `EPOLLIN | EPOLLOUT` is set, but one thread is waiting on `poll(coro::poll_op::read)` and the other is waiting on `poll(coro::poll_op::write)`.
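For illustration, the filtering described here mostly amounts to masking the delivered events against the interest the waiter registered. A minimal sketch with a hypothetical `poll_result` enum (not the libcoro `poll_status` type):

```cpp
#include <sys/epoll.h>

#include <cstdint>

// Hypothetical status enum illustrating the suggested read/write split.
enum class poll_result { none, read, write, read_write };

// Intersect the events epoll actually delivered with the events the caller
// registered interest in, so a reader waiting on EPOLLIN is not resumed with
// a generic "event" when only EPOLLOUT fired.
inline poll_result classify(uint32_t delivered, uint32_t interested)
{
    switch (delivered & interested & (EPOLLIN | EPOLLOUT))
    {
        case EPOLLIN:            return poll_result::read;
        case EPOLLOUT:           return poll_result::write;
        case EPOLLIN | EPOLLOUT: return poll_result::read_write;
        default:                 return poll_result::none;
    }
}
```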
I'll have some time to kill tomorrow, so I might work on a fix based on your outline. Thanks for the detailed and quick reply.
Another option:
> "What happens if you register the same file descriptor on an epoll instance twice? You will probably get EEXIST. However, it is possible to add a duplicate ([dup(2)](https://man7.org/linux/man-pages/man2/dup.2.html), [dup2(2)](https://man7.org/linux/man-pages/man2/dup2.2.html), [fcntl(2)](https://man7.org/linux/man-pages/man2/fcntl.2.html) F_DUPFD) file descriptor to the same epoll instance. This can be a useful technique for filtering events, if the duplicate file descriptors are registered with different events masks."
So it might be possible for you to `dup` your file descriptor and register the read and the write in the two coroutines without any libcoro modifications.
It's probably still worthwhile for the `io_scheduler` to track polls at the event level and allow for your use case without `dup`, but I'd be curious whether this works around the problem?
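The man-page technique is easy to check in isolation, without libcoro: register a fd with `EPOLLIN`, confirm that a second `EPOLL_CTL_ADD` of the same fd fails with `EEXIST`, then add a `dup()`'d descriptor with a different mask. A standalone sketch:

```cpp
#include <sys/epoll.h>
#include <unistd.h>

#include <cerrno>

// Demonstrates the epoll(7) FAQ point quoted above: adding the *same* fd
// twice fails with EEXIST, but adding a dup()'d descriptor with its own
// events mask succeeds, so two coroutines could each poll one descriptor.
bool dup_registration_works(int fd)
{
    int ep = epoll_create1(0);
    if (ep == -1)
    {
        return false;
    }

    epoll_event ev{};
    ev.events  = EPOLLIN;
    ev.data.fd = fd;
    if (epoll_ctl(ep, EPOLL_CTL_ADD, fd, &ev) == -1)
    {
        close(ep);
        return false;
    }

    // Registering the same fd a second time fails with EEXIST...
    ev.events = EPOLLOUT;
    bool same_fd_fails = epoll_ctl(ep, EPOLL_CTL_ADD, fd, &ev) == -1 && errno == EEXIST;

    // ...but a duplicate descriptor registers fine with a different mask.
    int  fd2    = dup(fd);
    bool dup_ok = false;
    if (fd2 != -1)
    {
        ev.data.fd = fd2;
        dup_ok = epoll_ctl(ep, EPOLL_CTL_ADD, fd2, &ev) == 0;
        close(fd2);
    }

    close(ep);
    return same_fd_fails && dup_ok;
}
```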
It sounds like it might solve the problem, so long as fd1 and fd2 don't share event masks. I don't have access to a Linux machine right now, so I can't actually build this code, but here's a patch that might solve the issue. The idea is to un-delete the copy constructor of `tcp_client` and have it `dup` the file descriptor for the socket. This solves the issue of two coroutines sharing a `tcp_client`, since a `shared_ptr` no longer needs to be allocated. Though it does decrease safety, since it makes it easier for a user to do multiple concurrent reads or multiple concurrent writes, which will never be safe. It also makes it easier to accidentally copy the client when you don't mean to.
```diff
diff --git a/include/coro/net/tcp_client.hpp b/include/coro/net/tcp_client.hpp
index 1d91ac2..2f7b1a4 100644
--- a/include/coro/net/tcp_client.hpp
+++ b/include/coro/net/tcp_client.hpp
@@ -53,9 +53,9 @@ public:
         .ssl_ctx = nullptr
 #endif
     });
-    tcp_client(const tcp_client&) = delete;
+    tcp_client(const tcp_client&);
     tcp_client(tcp_client&& other);
-    auto operator=(const tcp_client&) noexcept -> tcp_client& = delete;
+    auto operator=(const tcp_client&) noexcept -> tcp_client&;
     auto operator=(tcp_client&& other) noexcept -> tcp_client&;

     ~tcp_client();
diff --git a/src/net/tcp_client.cpp b/src/net/tcp_client.cpp
index 4ce70b3..46b055d 100644
--- a/src/net/tcp_client.cpp
+++ b/src/net/tcp_client.cpp
@@ -32,6 +32,18 @@ tcp_client::tcp_client(std::shared_ptr<io_scheduler> scheduler, net::socket sock
     m_socket.blocking(coro::net::socket::blocking_t::no);
 }

+tcp_client::tcp_client(const tcp_client& other)
+    : m_io_scheduler(other.m_io_scheduler),
+      m_options(other.m_options),
+      m_socket(dup(other.m_socket.native_handle())),
+      m_connect_status(other.m_connect_status)
+#ifdef LIBCORO_FEATURE_SSL
+      ,
+      m_ssl_info(other.m_ssl_info)
+#endif
+{
+}
+
 tcp_client::tcp_client(tcp_client&& other)
     : m_io_scheduler(std::move(other.m_io_scheduler)),
       m_options(std::move(other.m_options)),
@@ -59,6 +71,21 @@ tcp_client::~tcp_client()
 #endif
 }

+auto tcp_client::operator=(const tcp_client& other) noexcept -> tcp_client&
+{
+    if (std::addressof(other) != this)
+    {
+        m_io_scheduler   = other.m_io_scheduler;
+        m_options        = other.m_options;
+        m_socket         = {dup(other.m_socket.native_handle())};
+        m_connect_status = other.m_connect_status;
+#ifdef LIBCORO_FEATURE_SSL
+        m_ssl_info = other.m_ssl_info;
+#endif
+    }
+    return *this;
+}
+
 auto tcp_client::operator=(tcp_client&& other) noexcept -> tcp_client&
 {
     if (std::addressof(other) != this)
```
I'll mess around with some ideas for the first solution you mentioned as well.
I think we should maybe just make the socket class copyable and have it call `dup`? Then each class that uses it can leverage that to make itself copyable as well.
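A rough sketch of what a copyable socket could look like (simplified; `socket_t` here is a stand-in, not the actual libcoro `net::socket` class):

```cpp
#include <unistd.h>

#include <utility>

// Simplified sketch of a copyable socket: the copy operations dup() the
// underlying descriptor so each copy owns (and closes) its own fd, while the
// move operations just transfer ownership.
class socket_t
{
public:
    explicit socket_t(int fd = -1) : m_fd(fd) {}

    socket_t(const socket_t& other) : m_fd(other.m_fd != -1 ? dup(other.m_fd) : -1) {}
    socket_t(socket_t&& other) noexcept : m_fd(std::exchange(other.m_fd, -1)) {}

    socket_t& operator=(const socket_t& other)
    {
        if (this != &other)
        {
            close_fd();
            m_fd = other.m_fd != -1 ? dup(other.m_fd) : -1;
        }
        return *this;
    }

    socket_t& operator=(socket_t&& other) noexcept
    {
        if (this != &other)
        {
            close_fd();
            m_fd = std::exchange(other.m_fd, -1);
        }
        return *this;
    }

    ~socket_t() { close_fd(); }

    int  native_handle() const { return m_fd; }
    bool is_valid() const { return m_fd != -1; }

private:
    void close_fd()
    {
        if (m_fd != -1)
        {
            close(m_fd);
            m_fd = -1;
        }
    }

    int m_fd{-1};
};
```

Both descriptors refer to the same open file description, so two coroutines can each register their own copy on the same epoll instance with different event masks.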
I'm going to try and roll the socket copy-able change into this: https://github.com/jbaldwin/libcoro/pull/216
I've been mulling on cleaning up the network stuff for a bit, so I think it'll naturally fit in here.
I've merged the network layout refactor (sorry, it'll probably require you to update your namespaces, but I think it's cleaner for the `tls` section at least).
I've also added a test in a new PR (#226), if you want to read through it and make sure it covers the behavior you were trying to use.
Thanks for the quick work. I had written a patch earlier today based on your copyable socket idea and it seemed to work. I tested the latest 4f3684ba7193322a1f741fe86b3112997e21ad30 and everything worked the same after changing the namespaces. All in all LGTM.
Regarding #226, the test looks fine in principle, but you have to send more data to get the problem to occur. I only noticed the issue when sending ~4MB or more of data. I'm sure this size is generally OS dependent though.
I had assumed the larger amount was to make sure they both triggered at the same time, so I added a wait to make sure the bogus read was definitely registered in epoll; if I remove the wait, it doesn't trigger consistently.
Getting some weird linker error on the PR though so I'll need to work through that before I merge it. Glad the copyable socket is working for you though.
I'm writing a system where a `coro::net::tcp_server` listens for incoming connections and does two things when receiving an incoming connection:

1. Spawns a coroutine that calls `poll(coro::poll_op::read)` and `recv()`, and puts structured data in a `ring_buffer`. This coroutine lives in its own `task_container`.
2. Spawns a coroutine that calls `poll(coro::poll_op::write)` and `send()` when outgoing data is ready.

The issue arises at the `poll` call. I believe both coroutines share the `coro::net::tcp_client`'s file descriptor on the same epoll instance, so when the two coroutines concurrently try to poll for EPOLLIN and EPOLLOUT separately, it is possible for `epoll_ctl` to fail with `EEXIST` right here. I don't think that epolling for data concurrently should cause error conditions, but I'm having trouble thinking of ways to fix the current API design.

My first thought was separating `poll_status::event` into `poll_status::read` and `poll_status::write` so that two coroutines could concurrently poll. If a reader coroutine gets a `poll_status::write`, it can ignore it, and vice versa. The problem with this approach is that after `tcp_client::poll()` returns, I believe it unregisters that file descriptor from epoll, which will leave the other coroutine in an undefined state. Some sort of ref count would need to be included in `poll_info` or something to prevent this. It is also a breaking API change.

Currently I've modified this code, which seems to fix the issue in the case of a `coro::io_scheduler` using inline task processing. I haven't tested it using a thread pool yet. Regardless, I have misgivings about its correctness, so I'm interested in finding a better solution to the problem.