chriskohlhoff / asio

Asio C++ Library
http://think-async.com/Asio
4.88k stars 1.21k forks source link

Boost::Asio Asynchronous Client and Server Proxy implementation #1376

Open Saravanan1987 opened 11 months ago

Saravanan1987 commented 11 months ago

I'm working on implementing a client and server proxy system where the proxy is responsible for receiving commands from the client and forwarding them to the server, and vice versa. However, I'm facing a challenge in accessing the client socket from the server instance to forward messages to the client (and vice versa). I'm currently learning about Boost.Asio behavior. Could you suggest the most effective approach for sharing the socket between the server and client instances? Below is the current implementation.

In cmd_handler::read_cmd_done, client_socket_.is_open() returns 0 instead of the expected 1.

#pragma once
#ifndef __OAMIP_PROXY_H__
#define __OAMIP_PROXY_H__

#include <deque>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

#include <boost/asio.hpp>

#include "config.h"

namespace asio = boost::asio;
class cmd_handler: public std::enable_shared_from_this<cmd_handler>
{
public:
    cmd_handler(asio::io_context &io_context, AppConfig* appConfig);
    ~cmd_handler();
    asio::ip::tcp::socket &socket();
    asio::ip::tcp::socket &c_socket();
    asio::ip::tcp::endpoint remote_endpoint();
    asio::ip::tcp::endpoint c_remote_endpoint();
    asio::io_context &m_io_context();
    void start();
    void read_cmd();
    void read_cmd_done(boost::system::error_code const &ec, std::size_t bytes_transferred);
    void read_cmd_client();
    void read_cmd_client_done(boost::system::error_code const &ec, std::size_t bytes_transferred);
private:
    asio::io_context& io_context_;
    asio::ip::tcp::socket server_socket_;
    asio::ip::tcp::socket client_socket_;
    asio::io_context::strand write_strand_;
    asio::streambuf in_packet_;
    std::deque<std::string> send_cmd_queue;
    std::mutex queue_mutex_;  // Added for thread safety
    AppConfig *app_config;
};

/// Proxy front end: owns the io_context together with the acceptor and
/// resolver built on top of it, and creates cmd_handler instances for the
/// listening side (start_server) and the outgoing side (start_client).
class ProxyServer
{
    using shared_handler_t = std::shared_ptr<cmd_handler>;
public:
    /// @param thread_count stored in thread_count_.
    /// @param appConfig    non-owning configuration pointer handed to every handler.
    ProxyServer(int thread_count, AppConfig* appConfig);
    /// Stops the io_context and joins the threads held in thread_pool_.
    ~ProxyServer();
    /// Opens the acceptor on ip_addr:port, starts accepting and runs the io_context.
    void start_server(std::string ip_addr, int port);
    /// Connects a handler's client socket to ip_addr:port and runs the io_context.
    void start_client(std::string ip_addr, int port);
    /// Completion step for an accept: starts reading on `handler` and queues the next accept.
    void handle_new_connection(shared_handler_t handler, boost::system::error_code const &ec);
private:
    // io_context_ is declared first because acceptor_ and resolver_ are
    // constructed from it in the constructor's initialiser list.
    asio::io_context io_context_;
    int thread_count_;
    asio::ip::tcp::acceptor acceptor_;
    asio::ip::tcp::resolver resolver_;
    std::vector<std::thread> thread_pool_;
    asio::streambuf buffer_;
    AppConfig *app_config;   // non-owning
};

#endif //  __OAMIP_PROXY_H__

nagarajans1@PA168951:~/Projects/OpenAMIPProxy$ cat proxy.cpp
#include "proxy.h"
#include "parser.cpp"
#include "send.cpp"

ProxyServer::ProxyServer(int thread_count,
                         AppConfig *appconfig)
    : thread_count_(thread_count),
      acceptor_(io_context_),
      resolver_(io_context_),
      thread_pool_(),
      app_config(appconfig)
{
}
/*******************************************************************************************/
/// Stops the io_context and waits for every worker thread in thread_pool_.
ProxyServer::~ProxyServer()
{
    // Stop the io_context so that any thread blocked in run() returns.
    io_context_.stop();
    for (auto &thread : thread_pool_)
    {
        // join() on a thread that is not joinable throws std::system_error,
        // and an exception escaping a destructor terminates the program.
        if (thread.joinable())
        {
            thread.join();
        }
    }
}
/*******************************************************************************************/
void ProxyServer::start_server(std::string ip_addr, int port)
{
    std::cout << "Starting Server, IP:" << ip_addr << ", Port:" << port << std::endl;
    auto handler = std::make_shared<cmd_handler>(io_context_, app_config);
    asio::ip::address_v4 ipv4_address = asio::ip::address_v4::from_string(ip_addr);
    asio::ip::tcp::endpoint endpoint(ipv4_address, port);
    acceptor_.open(endpoint.protocol());
    acceptor_.set_option(asio::ip::tcp::acceptor::reuse_address(true));
    acceptor_.bind(endpoint);
    acceptor_.listen();
    std::cout << "Start listening" << std::endl;
    try
    {
        acceptor_.async_accept(handler->socket(), [=](auto ec)
                               {
                                    if(ec)
                                        std::cerr << "Error accepting connection from : " << ec.message() << std::endl;
                                    else
                                        handle_new_connection(handler, ec); });
    }
    catch (const std::exception &e)
    {
        std::cerr << "Exception caught: " << e.what() << std::endl;
    }
    io_context_.run();
        // start pool of threads to process the asio events
 /*        for (int i = 0; i < thread_count_; ++i)
    {
        thread_pool_.emplace_back([=]
                                  { io_context_.run(); });
    }
    for (auto &thread : thread_pool_)
    {
        thread.join();
    } */
}
void ProxyServer::start_client(std::string ip_addr, int port)
{
    std::cout << "Starting client" << std::endl;
    auto handler = std::make_shared<cmd_handler>(io_context_, app_config);
    asio::ip::tcp::resolver::query query(ip_addr, std::to_string(port));
    asio::ip::tcp::resolver::iterator endpoint_iterator = resolver_.resolve(query);
    asio::connect(handler->c_socket(), endpoint_iterator);

    std::cout << "Connected to server on port " << port << std::endl;

    handler->read_cmd_client();
    io_context_.run();
}

/*******************************************************************************************/
/// Completion step for an asynchronous accept.
///
/// @param handler handler whose socket() was the target of the accept.
/// @param ec      result of the accept operation.
///
/// On success the handler starts reading commands. In both the success and
/// the recoverable-error case a fresh handler is created and the next accept
/// is queued, so that one failed accept does not stop the server from
/// accepting further connections. The loop ends only when the accept was
/// cancelled or the acceptor has been closed.
void ProxyServer::handle_new_connection(shared_handler_t handler, boost::system::error_code const &ec)
{
    std::cout << "Handle connection" << std::endl;
    if (ec)
    {
        std::cerr << "Error accepting connection from client: " << ec.message() << std::endl;
        if (ec == asio::error::operation_aborted || !acceptor_.is_open())
        {
            return; // shutting down: do not queue another accept
        }
    }
    else
    {
        handler->read_cmd();
    }

    auto new_handler = std::make_shared<cmd_handler>(io_context_, app_config);
    acceptor_.async_accept(new_handler->socket(),
                           [this, new_handler](const boost::system::error_code &accept_ec)
                           { handle_new_connection(new_handler, accept_ec); });
}
/*******************************************************************************************/
cmd_handler::cmd_handler(asio::io_context &io_context, AppConfig *appConfig)
    : io_context_(io_context), server_socket_(io_context), client_socket_(io_context),write_strand_(io_context), app_config(appConfig)
{

}
/*******************************************************************************************/
/// Discards every byte still held in the input buffer before the members
/// are destroyed.
cmd_handler::~cmd_handler()
{
    const auto unread_bytes = in_packet_.size();
    in_packet_.consume(unread_bytes);
}
/*******************************************************************************************/
/// Gives mutable access to server_socket_ (used as the accept target).
asio::ip::tcp::socket &cmd_handler::socket()
{
    asio::ip::tcp::socket &accepted_side = server_socket_;
    return accepted_side;
}
/// Gives mutable access to client_socket_ (used as the connect target).
asio::ip::tcp::socket &cmd_handler::c_socket()
{
    asio::ip::tcp::socket &connecting_side = client_socket_;
    return connecting_side;
}
/// Returns the peer endpoint of server_socket_. The throwing overload is
/// used, so a failure to obtain the endpoint surfaces as an exception.
asio::ip::tcp::endpoint cmd_handler::remote_endpoint()
{
    asio::ip::tcp::endpoint peer = server_socket_.remote_endpoint();
    return peer;
}
/// Returns the peer endpoint of client_socket_. The throwing overload is
/// used, so a failure to obtain the endpoint surfaces as an exception.
asio::ip::tcp::endpoint cmd_handler::c_remote_endpoint()
{
    asio::ip::tcp::endpoint peer = client_socket_.remote_endpoint();
    return peer;
}
/// Gives access to the io_context this handler was constructed with.
asio::io_context &cmd_handler::m_io_context()
{
    asio::io_context &context = io_context_;
    return context;
}
/*******************************************************************************************/
void cmd_handler::start()
{
    read_cmd();
}
/*******************************************************************************************/
void cmd_handler::read_cmd()
{
    auto remote_ip = remote_endpoint().address().to_string();
    std::cout << "Read command from " << remote_ip << std::endl;
    asio::async_read_until(server_socket_,
                           in_packet_,
                           '\n',
                           [me = shared_from_this()](boost::system::error_code const &ec, std::size_t bytes_xfer)
                           {
                               if (ec == asio::error::eof)
                               {
                                   std::cout << "Connection closed by client:" << std::endl;
                                   return; // No need to read further; the connection is closed.
                               }
                               else if (ec)
                               {
                                   std::cerr << "Error in async_read_until: " << ec.message() << std::endl;
                                   return;
                               }
                               else
                               {
                                   me->read_cmd_done(ec, bytes_xfer);
                               }
                           });
}
void cmd_handler::read_cmd_client()
{
    std::cout << "Read cmd client" << std::endl;
    auto remote_ip = c_remote_endpoint().address().to_string();
    std::cout << client_socket_.is_open() << std::endl;
    std::cout << "Read command from " << remote_ip << std::endl;
    asio::async_read_until(client_socket_,
                           in_packet_,
                           '\n',
                           [me = shared_from_this()](boost::system::error_code const &ec, std::size_t bytes_xfer)
                           {
                               if (ec == asio::error::eof)
                               {
                                   std::cout << "Connection closed by client:" << std::endl;
                                   return; // No need to read further; the connection is closed.
                               }
                               else if (ec)
                               {
                                   std::cerr << "Error in async_read_until: " << ec.message() << std::endl;
                                   return;
                               }
                               else
                               {
                                   me->read_cmd_client_done(ec, bytes_xfer);
                               }
                           });
}
/// Completion step for a read started by read_cmd_client().
///
/// @param ec                result of the read on client_socket_.
/// @param bytes_transferred number of bytes up to and including the '\n'.
///
/// Extracts one command line from in_packet_, runs it through Parser and
/// queues the next read on client_socket_.
///
/// The data was read from client_socket_, so the peer address is taken from
/// that socket and the next read is queued on it as well. Querying or
/// reading server_socket_ here would fail on a handler whose server_socket_
/// was never accepted. The endpoint lookup uses the non-throwing overload so
/// that a peer which has already disconnected cannot throw out of a
/// completion handler.
void cmd_handler::read_cmd_client_done(boost::system::error_code const &ec, std::size_t bytes_transferred)
{
    boost::system::error_code endpoint_ec;
    const asio::ip::tcp::endpoint peer = client_socket_.remote_endpoint(endpoint_ec);
    const std::string remote_ip = endpoint_ec ? std::string("<unknown>") : peer.address().to_string();

    if (ec == asio::error::eof)
    {
        std::cout << "Connection closed by client:" << remote_ip << std::endl;
        return; // No need to read further; the connection is closed.
    }
    else if (ec)
    {
        std::cerr << "Error accepting packet from the client: " << remote_ip << "," << ec.message() << std::endl;
        return;
    }

    // Keep the buffer sequence in a named object: the iterators refer to it.
    const auto pending = in_packet_.data();
    std::string command(asio::buffers_begin(pending), asio::buffers_begin(pending) + bytes_transferred);
    in_packet_.consume(bytes_transferred);
    std::cout << "Connected server IP: " << remote_ip << std::endl;
    std::cout << "command:" << command << std::endl;
    Parser parser(app_config);
    std::string recv_cmd = parser.process_cmd(command);
    // Forwarding of recv_cmd to server_socket_ is currently disabled:
    //CmdSender sender(server_socket_);
    //sender.send_cmd(recv_cmd);
    read_cmd_client();
}
/*******************************************************************************************/
/// Completion step for a read started by read_cmd().
///
/// @param ec                result of the read on server_socket_.
/// @param bytes_transferred number of bytes up to and including the '\n'.
///
/// Extracts one command line from in_packet_, runs it through Parser,
/// forwards the parser's result over client_socket_ when that socket is open
/// and queues the next read on server_socket_.
///
/// client_socket_ belongs to this handler instance and is open only if it
/// was connected through c_socket() on this same instance. start_client()
/// connects the client socket of a separate handler, so on handlers created
/// for accepted connections the forward step is skipped with a log message.
/// The endpoint lookup uses the non-throwing overload so that a peer which
/// has already disconnected cannot throw out of a completion handler.
void cmd_handler::read_cmd_done(boost::system::error_code const &ec, std::size_t bytes_transferred)
{
    boost::system::error_code endpoint_ec;
    const asio::ip::tcp::endpoint peer = server_socket_.remote_endpoint(endpoint_ec);
    const std::string remote_ip = endpoint_ec ? std::string("<unknown>") : peer.address().to_string();

    if (ec == asio::error::eof)
    {
        std::cout << "Connection closed by client:" << remote_ip << std::endl;
        return; // No need to read further; the connection is closed.
    }
    else if (ec)
    {
        std::cerr << "Error accepting packet from the client: " << remote_ip << "," << ec.message() << std::endl;
        return;
    }

    // Keep the buffer sequence in a named object: the iterators refer to it.
    const auto pending = in_packet_.data();
    std::string command(asio::buffers_begin(pending), asio::buffers_begin(pending) + bytes_transferred);
    in_packet_.consume(bytes_transferred);
    std::cout << "Connected client IP: " << remote_ip << std::endl;
    std::cout << "command:" << command << std::endl;
    Parser parser(app_config);
    std::string recv_cmd = parser.process_cmd(command);
    std::cout << client_socket_.is_open() <<c_socket().is_open() << socket().is_open()<< std::endl;

    if (client_socket_.is_open())
    {
        CmdSender sender(client_socket_);
        sender.send_cmd(recv_cmd);
    }
    else
    {
        // Nothing to forward to: this handler's client socket was never opened.
        std::cerr << "Cannot forward command: client socket is not open" << std::endl;
    }
    read_cmd();
}
/*******************************************************************************************/
/// Program entry point: starts one thread running the listening side and one
/// thread running the outgoing side of the proxy, then waits for both.
///
/// NOTE(review): appConfig and proxy_server are not declared in this
/// function; presumably they are defined at file scope elsewhere — confirm.
int main() {
  try
    {

        const int thread_count = 1;
        int port = appConfig.serverConfig.port;
        int s_port = appConfig.clientConfig.port;
        std::vector<std::thread> thread_pool;
        // NOTE(review): the outgoing connection below reuses the server-side
        // IP together with the client-side port — verify this is intended.
        std::string ip_addr = appConfig.serverConfig.ip;

        // An exception that escapes a thread's entry function terminates the
        // whole process and is never seen by the catch block at the bottom,
        // so each thread body reports its own failures.
        for (int i = 0; i < thread_count; ++i)
        {
            thread_pool.emplace_back([&]()
                                     {
                                         try
                                         {
                                             proxy_server.start_server(ip_addr, port);
                                         }
                                         catch (const std::exception &e)
                                         {
                                             std::cerr << "Exception caught in server thread: " << e.what() << std::endl;
                                         }
                                     });
        }
        for (int i = 0; i < thread_count; ++i)
        {
            thread_pool.emplace_back([&]()
                                     {
                                         try
                                         {
                                             proxy_server.start_client(ip_addr, s_port);
                                         }
                                         catch (const std::exception &e)
                                         {
                                             std::cerr << "Exception caught in client thread: " << e.what() << std::endl;
                                         }
                                     });
        }
        // The lambdas capture locals by reference, so every thread must be
        // joined before this scope ends.
        for (auto &thread : thread_pool)
        {
            thread.join();
        }
    }
    catch (const std::exception &e)
    {
        std::cerr << "Exception caught: " << e.what() << std::endl;
    }

    return 0;
}