Closed lijh8 closed 1 year ago
Suppose I need to handle use case similar to Instant Messager program.
A user can read messages from a friend without writing to the friend first.
user1 --> server --> user2
How can I do this in asio?
The following is my equivalent example in libevent.
Thanks
// client.cpp
// ./libevent/sample/hello-world.c
/*
This example program provides a trivial server program that listens for TCP
connections on port 9995. When they arrive, it writes a short message to
each client connection, and closes each connection once it is flushed.
Where possible, it exits cleanly in response to a SIGINT (ctrl-c).
*/
#include <string.h>
#include <errno.h>
#include <stdio.h>
#include <signal.h>
#ifndef _WIN32
#include <netinet/in.h>
# ifdef _XOPEN_SOURCE_EXTENDED
# include <arpa/inet.h>
# endif
#include <sys/socket.h>
#endif
#include <unistd.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <event2/listener.h>
#include <event2/util.h>
#include <event2/event.h>
static const char MESSAGE[] = "Hello, World! ";
const char *message2;
static int PORT = 9995;
// static void listener_cb(struct evconnlistener *, evutil_socket_t,
// struct sockaddr *, int socklen, void *);
static void conn_readcb(struct bufferevent *, void *);
static void conn_writecb(struct bufferevent *, void *);
static void conn_eventcb(struct bufferevent *, short, void *);
static void signal_cb(evutil_socket_t, short, void *);
int
main(int argc, char **argv)
{
struct event_base *base;
// struct evconnlistener *listener;
struct event *signal_event;
struct sockaddr_in sin = {0};
#ifdef _WIN32
WSADATA wsa_data;
WSAStartup(0x0201, &wsa_data);
#endif
if (argc != 4) {
printf("Usage: client <ip> <port> <msg>\n");
return 1;
}
base = event_base_new();
if (!base) {
fprintf(stderr, "Could not initialize libevent!\n");
return 1;
}
const char *host = argv[1];
const int port = atoi(argv[2]);
message2 = argv[3];
sin.sin_family = AF_INET;
sin.sin_port = htons(port);
inet_aton(host, (struct in_addr*)&sin.sin_addr.s_addr);
// listener = evconnlistener_new_bind(base, listener_cb, (void *)base,
// LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE, -1,
// (struct sockaddr*)&sin,
// sizeof(sin));
// if (!listener) {
// fprintf(stderr, "Could not create a listener!\n");
// return 1;
// }
struct bufferevent * bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE);
if (!bev) {
fprintf(stderr, "Error constructing bufferevent!\n");
event_base_loopbreak(base);
return 1;
}
bufferevent_setcb(bev, conn_readcb, conn_writecb, conn_eventcb, NULL);
if (bufferevent_socket_connect(
bev, (struct sockaddr *)&sin, sizeof(sin)) < 0)
{
bufferevent_free(bev);
printf("bufferevent_socket_connect failed\n");
return 1;
}
bufferevent_enable(bev, EV_WRITE | EV_READ);
// signal_event = evsignal_new(base, SIGINT, signal_cb, (void *)base);
// if (!signal_event || event_add(signal_event, NULL)<0) {
// fprintf(stderr, "Could not create/add a signal event!\n");
// return 1;
// }
event_base_dispatch(base);
// evconnlistener_free(listener);
// event_free(signal_event);
event_base_free(base);
printf("done\n");
return 0;
}
// static void
// listener_cb(struct evconnlistener *listener, evutil_socket_t fd,
// struct sockaddr *sa, int socklen, void *user_data)
// {
// struct sockaddr_in *in = (struct sockaddr_in *)sa;
// printf("accept connection: %s:%u \n",
// inet_ntoa(in->sin_addr), in->sin_port);
// struct event_base *base = (event_base *)user_data;
// struct bufferevent *bev;
// bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
// if (!bev) {
// fprintf(stderr, "Error constructing bufferevent!\n");
// event_base_loopbreak(base);
// return;
// }
// bufferevent_setcb(bev, conn_readcb, conn_writecb, conn_eventcb, NULL);
// bufferevent_enable(bev, EV_WRITE | EV_READ);
// }
static void
conn_readcb(struct bufferevent *bev, void *user_data)
{
char buf[1024] = {'\0'};
struct evbuffer *input = bufferevent_get_input(bev);
size_t length = evbuffer_get_length(input);
if (length > 0) {
bufferevent_read(bev, buf, sizeof(buf) - 1);
printf("%s\n", buf);
}
}
static void
conn_writecb(struct bufferevent *bev, void *user_data)
{
// struct evbuffer *output = bufferevent_get_output(bev);
// size_t length = evbuffer_get_length(output);
// if (length == 0) {
// printf("flushed answer\n");
// // bufferevent_free(bev);
// }
sleep(1); //test
static int cnt = 1;
char buf[1000] = {'\0'};
snprintf(buf, sizeof(buf),"hello from client %s %d ", message2, cnt++);
bufferevent_write(bev, buf, strlen(buf));
}
static void
conn_eventcb(struct bufferevent *bev, short events, void *user_data)
{
int finished = 0;
if (events & BEV_EVENT_EOF) {
printf("Connection closed.\n");
finished = 1;
} else if (events & BEV_EVENT_ERROR) {
printf("Got an error on the connection: %s\n",
// strerror(errno));/*XXX win32*/
evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
finished = 1;
} else if (events & BEV_EVENT_CONNECTED) {
// ./libevent/ChangeLog-2.0
// Changes in 2.0.3-alpha (20 Nov 2009):
// When we send a BEV_EVENT_CONNECTED to indicate connected
// status, we no longer invoke the write callback as well unless
// we actually wrote data too.
// ./libevent/sample/ssl-client-mbedtls.c
const char *buf = "Hello, World! "; //new
bufferevent_write(bev, buf, strlen(buf)); //new
//or,
// ./libevent/test/test-ratelim.c
// conn_writecb(bev, user_data); //
}
/* None of the other events can happen here, since we haven't enabled
* timeouts */
if (finished) {
struct timeval delay = { 2, 0 };
free(user_data);
bufferevent_free(bev);
event_base_loopexit((event_base *)user_data, &delay);
}
}
// static void
// signal_cb(evutil_socket_t sig, short events, void *user_data)
// {
// struct event_base *base = (event_base *)user_data;
// struct timeval delay = { 2, 0 };
// printf("Caught an interrupt signal; exiting cleanly in two seconds.\n");
// event_base_loopexit(base, &delay);
// }
// server.cpp
// ./libevent/sample/hello-world.c
/*
This example program provides a trivial server program that listens for TCP
connections on port 9995. When they arrive, it writes a short message to
each client connection, and closes each connection once it is flushed.
Where possible, it exits cleanly in response to a SIGINT (ctrl-c).
*/
#include <string.h>
#include <string>
#include <errno.h>
#include <stdio.h>
#include <signal.h>
#ifndef _WIN32
#include <netinet/in.h>
# ifdef _XOPEN_SOURCE_EXTENDED
# include <arpa/inet.h>
# endif
#include <sys/socket.h>
#endif
#include <unistd.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <event2/listener.h>
#include <event2/util.h>
#include <event2/event.h>
static const char MESSAGE[] = "Hello, World! ";
static int PORT = 9995;
static void listener_cb(struct evconnlistener *, evutil_socket_t,
struct sockaddr *, int socklen, void *);
static void conn_readcb(struct bufferevent *, void *);
static void conn_writecb(struct bufferevent *, void *);
static void conn_eventcb(struct bufferevent *, short, void *);
static void signal_cb(evutil_socket_t, short, void *);
int
main(int argc, char **argv)
{
struct event_base *base;
struct evconnlistener *listener;
struct event *signal_event;
struct sockaddr_in sin = {0};
#ifdef _WIN32
WSADATA wsa_data;
WSAStartup(0x0201, &wsa_data);
#endif
signal(SIGPIPE, SIG_IGN);
base = event_base_new();
if (!base) {
fprintf(stderr, "Could not initialize libevent!\n");
return 1;
}
printf("Usage: server [port]\n");
if (argc == 2) {
PORT = atoi(argv[1]);
}
sin.sin_family = AF_INET;
sin.sin_port = htons(PORT);
sin.sin_addr.s_addr = htons(INADDR_ANY);
listener = evconnlistener_new_bind(base, listener_cb, (void *)base,
LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE, -1,
(struct sockaddr*)&sin, sizeof(sin));
if (!listener) {
fprintf(stderr, "Could not create a listener!\n");
return 1;
}
printf("listen on port: %d\n", PORT);
signal_event = evsignal_new(base, SIGINT, signal_cb, (void *)base);
if (!signal_event || event_add(signal_event, NULL)<0) {
fprintf(stderr, "Could not create/add a signal event!\n");
return 1;
}
event_base_dispatch(base);
evconnlistener_free(listener);
event_free(signal_event);
event_base_free(base);
printf("done\n");
return 0;
}
static void
listener_cb(struct evconnlistener *listener, evutil_socket_t fd,
struct sockaddr *sa, int socklen, void *user_data)
{
struct sockaddr_in *in = (struct sockaddr_in *)sa;
printf("accept connection: %s:%u \n",
inet_ntoa(in->sin_addr), in->sin_port);
struct event_base *base = (event_base *)user_data;
struct bufferevent *bev;
bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
if (!bev) {
fprintf(stderr, "Error constructing bufferevent!\n");
event_base_loopbreak(base);
return;
}
bufferevent_setcb(bev, conn_readcb, conn_writecb, conn_eventcb, NULL);
bufferevent_enable(bev, EV_WRITE | EV_READ);
// bufferevent_write(bev, MESSAGE, strlen(MESSAGE)); //
}
static void
conn_readcb(struct bufferevent *bev, void *user_data)
{
char buf[1024] = {'\0'};
struct evbuffer *input = bufferevent_get_input(bev);
size_t length = evbuffer_get_length(input);
if (length > 0) {
bufferevent_read(bev, buf, sizeof(buf) - 1);
printf("%s\n", buf);
}
}
static void
conn_writecb(struct bufferevent *bev, void *user_data)
{
// struct evbuffer *output = bufferevent_get_output(bev);
// size_t length = evbuffer_get_length(output);
// if (length == 0) {
// printf("flushed answer\n");
// // bufferevent_free(bev);
// }
sleep(1); //test
static int cnt = 1;
char buf[1000] = {'\0'};
snprintf(buf, sizeof(buf),"hello from server %d ", cnt++);
bufferevent_write(bev, buf, strlen(buf));
}
static void
conn_eventcb(struct bufferevent *bev, short events, void *user_data)
{
int finished = 0;
if (events & BEV_EVENT_EOF) {
printf("Connection closed.\n");
finished = 1;
} else if (events & BEV_EVENT_ERROR) {
printf("Got an error on the connection: %s\n",
// strerror(errno));/*XXX win32*/
evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
finished = 1;
}
/* None of the other events can happen here, since we haven't enabled
* timeouts */
if (finished) {
free(user_data); // ?
bufferevent_free(bev);
}
}
static void
signal_cb(evutil_socket_t sig, short events, void *user_data)
{
struct event_base *base = (event_base *)user_data;
struct timeval delay = { 2, 0 };
printf("Caught an interrupt signal; exiting cleanly in two seconds.\n");
event_base_loopexit(base, &delay);
}
It seems that I fixed my example.
I changed the calls to async_read and async_write in server side, following the code in client side. server side read and write do not call each other.
handle_read --> start_read --> async_read handle_write --> start_write --> async_write
It seems that start_read can not call itself to perform async_read. The additional wrapper handle_read is needed. The same applies to start_write too.
Thanks for the great library.
// server.cpp
// ./boost_1_81_0/doc/html/boost_asio/example/cpp11/echo/async_tcp_echo_server.cpp
#include <cstdlib>
#include <iostream>
#include <memory>
#include <utility>
#include <boost/asio.hpp>
#include <sanitizer/lsan_interface.h>
using boost::asio::ip::tcp;
void handlerCont(int signum){
if (signum == SIGCONT) {
printf("Got SIGCONT\n");
}
#ifndef NDEBUG
__lsan_do_recoverable_leak_check();
#endif
}
struct session
: public std::enable_shared_from_this<session>
{
session(tcp::socket socket)
: socket(std::move(socket)) { }
void start() {
start_read();
start_write();
}
void start_read() {
auto self(shared_from_this());
memset(data, 0, sizeof(data));
socket.async_read_some(boost::asio::buffer(data, max_length),
[this, self](boost::system::error_code ec, std::size_t length) {
if (!ec) {
handle_read(ec, length);
} else {
std::cout << ec.message() << "\n";
}
});
}
void handle_read(const boost::system::error_code& error,
std::size_t n) {
if (!error) {
std::cout << data;
start_read();
} else {
std::cout << error.message() << "\n";
}
}
void start_write() {
auto self(shared_from_this());
boost::asio::async_write(socket,
boost::asio::buffer(msg + (std::to_string(cnt++) + "\n").c_str()),
[this, self](boost::system::error_code ec, std::size_t /*length*/)
{
if (!ec) {
handle_write(ec);
} else {
std::cout << ec.message() << "\n";
}
});
}
void handle_write(const boost::system::error_code& error) {
if (!error) {
sleep(1); //test
start_write();
} else {
std::cout << error.message() << "\n";
}
}
tcp::socket socket;
enum { max_length = 1024 };
char data[max_length];
std::string msg = "hello client ";
size_t cnt = 0;
};
struct server {
server(boost::asio::io_context& io_context, short port)
: acceptor(io_context, tcp::endpoint(tcp::v4(), port))
{
std::cout << "listen on port: " << port << " \n";
do_accept();
}
void do_accept() {
acceptor.async_accept(
[this](boost::system::error_code ec, tcp::socket socket) {
if (!ec) {
std::cout << "accept connection: "
<< socket.remote_endpoint() << "\n";
std::make_shared<session>(std::move(socket))->start();
} else {
std::cout << ec.message() << ", " <<
socket.remote_endpoint() << "\n";
}
do_accept();
});
}
tcp::acceptor acceptor;
};
int main(int argc, char* argv[]) {
if (argc != 2) {
std::cerr << "Usage: server <port>\n";
return 1;
}
signal(SIGCONT, handlerCont); // $ man 7 signal
boost::asio::io_context io_context;
server s(io_context, std::atoi(argv[1]));
io_context.run();
return 0;
}
// client.cpp
// ./boost_1_81_0/doc/html/boost_asio/example/cpp11/timeouts/async_tcp_client.cpp
#include <boost/asio/buffer.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/read_until.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/write.hpp>
#include <functional>
#include <iostream>
#include <string>
#include <sanitizer/lsan_interface.h>
using boost::asio::steady_timer;
using boost::asio::ip::tcp;
using std::placeholders::_1;
using std::placeholders::_2;
void handlerCont(int signum){
if (signum == SIGCONT) {
printf("Got SIGCONT\n");
}
#ifndef NDEBUG
__lsan_do_recoverable_leak_check();
#endif
}
struct client {
client(boost::asio::io_context& io_context)
: socket(io_context) { }
void start(tcp::resolver::results_type endpoints,
const std::string& msg)
{
endpoints_ = endpoints;
msg_ = msg;
start_connect(endpoints_.begin());
}
void start_connect(tcp::resolver::results_type::iterator
endpoint_iter)
{
if (endpoint_iter != endpoints_.end()) {
socket.async_connect(endpoint_iter->endpoint(),
std::bind(&client::handle_connect,
this, _1, endpoint_iter));
}
}
void handle_connect(const boost::system::error_code& error,
tcp::resolver::results_type::iterator endpoint_iter)
{
if (!socket.is_open()) {
std::cout << "Connect timed out\n";
start_connect(++endpoint_iter);
} else if (error) {
std::cout << "Connect error: " << error.message() << "\n";
socket.close();
start_connect(++endpoint_iter);
} else {
std::cout << "Connected to " << endpoint_iter->endpoint() << "\n";
//do not have to write before read
start_write();
start_read();
}
}
void start_write() {
boost::asio::async_write(socket,
boost::asio::buffer("hello server " + msg_ + " " +
std::to_string(cnt++) + "\n"),
std::bind(&client::handle_write, this, _1));
}
void handle_write(const boost::system::error_code& error) {
if (!error) {
sleep(1); //test
start_write();
} else {
std::cout << error.message() << "\n";
}
}
void start_read() {
boost::asio::async_read_until(socket,
boost::asio::dynamic_buffer(input_buffer), '\n',
std::bind(&client::handle_read, this, _1, _2));
}
void handle_read(const boost::system::error_code& error,
std::size_t n)
{
if (!error) {
std::string line(input_buffer.substr(0, n - 1));
input_buffer.erase(0, n);
if (!line.empty()) {
std::cout << line << "\n";
}
start_read();
} else {
std::cout << error.message() << "\n";
}
}
tcp::resolver::results_type endpoints_;
tcp::socket socket;
std::string input_buffer;
std::string msg_;
size_t cnt = 0;
};
int main(int argc, char* argv[]) {
if (argc != 4) {
std::cerr << "Usage: client <host> <port> <msg>\n";
return 1;
}
signal(SIGCONT, handlerCont); // $ man 7 signal
boost::asio::io_context io_context;
tcp::resolver r(io_context);
client c(io_context);
c.start(r.resolve(argv[1], argv[2]), argv[3]);
io_context.run();
return 0;
}
# apt install libboost-dev
#
# or, manually
# cd boost_1_81_0
# ./bootstrap.sh
# ./b2
# include path: /home/ljh/Downloads/boost_1_81_0
# library path: /home/ljh/Downloads/boost_1_81_0/stage/lib
cmake_minimum_required(VERSION 3.18)
project(MyProject VERSION 1.0)
set(CMAKE_BUILD_RPATH_USE_ORIGIN TRUE)
set(CMAKE_INSTALL_RPATH "$ORIGIN/../lib")
if (CMAKE_BUILD_TYPE STREQUAL Debug)
set(CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS} -fsanitize=address) # .cpp
endif()
set(CMAKE_CXX_STANDARD 11) # .cpp
set(CMAKE_CXX_STANDARD_REQUIRED TRUE) # .cpp
#file(GLOB SOURCES *.cpp) # .cpp
#add_executable(main ${SOURCES})
add_executable(server server.cpp)
add_executable(client client.cpp)
#add_library(main SHARED ${SOURCES})
#set_target_properties(main PROPERTIES VERSION 1.2.3 SOVERSION 1.2)
target_link_libraries(server PUBLIC pthread)
target_link_directories(server PUBLIC
/home/ljh/Downloads/boost_1_81_0/stage/lib)
target_include_directories(server PUBLIC
/home/ljh/Downloads/boost_1_81_0/
/home/ljh/Downloads/boost_1_81_0/boost
)
target_link_libraries(client PUBLIC pthread)
target_link_directories(client PUBLIC
/home/ljh/Downloads/boost_1_81_0/stage/lib)
target_include_directories(client PUBLIC
/home/ljh/Downloads/boost_1_81_0/
/home/ljh/Downloads/boost_1_81_0/boost
)
#add_subdirectory(./foo)
# make DESTDIR=~/myproject install
install(TARGETS server client # foo
)
#file(GLOB LIBEVENT_LIB /home/ljh/Downloads/libevent/build/lib/*)
#install(FILES ${LIBEVENT_LIB} DESTINATION lib
# )
Hello Chris,
I am doing a simple example based on both echo/async_tcp_echo_server.cpp and timeouts/async_tcp_client.cpp examples in cpp11 folder.
1.
Is it possible to do it without nested function calls?
The server example calls functions recursively: do_read() -> do_write() -> do_read()
2.
Order of function calls matters?
Because the server calls read first. On client example, I need to call write first. Or, no one is going to receive any messages.
in libevent I register read and write callbacks with bufferevent_setcb. There is no limitation on the order of function calls.
Thanks