chriskohlhoff / asio

Asio C++ Library
http://think-async.com/Asio
4.89k stars 1.21k forks source link

Boost ASIO crashes while calling callback function #1355

Open 625Lalit opened 1 year ago

625Lalit commented 1 year ago

I am new to boost library. I have written a server using boost beast async methods, but sometimes it got crashed. The scenario is not common, sometime it will take 3-4 hr to get crashed. I am sharing the screenshot of crash point which I got from application crashed dump file.

Please help me to find the reason of the crash. image

I have done some research on google and also check this link "https://www.boost.org/doc/libs/1_54_0/boost/asio/detail/deadline_timer_service.hpp" but not able to understand it.

Header File

#include "server_certificate.hpp"
#include "xlogger.h"
#include "TSWebSocket.h"

#include <boost/beast/core.hpp>
#include <boost/beast/ssl.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <boost/asio/strand.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/thread/mutex.hpp>
#include "algorithm"
#include "cstdlib"
#include "functional"
#include "iostream"
#include "memory"
#include "string"
#include "thread"
#include "vector"
#include "map"
#include "queue"

namespace beast = boost::beast;         // from <boost/beast.hpp>
namespace http = beast::http;           // from <boost/beast/http.hpp>
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
namespace net = boost::asio;            // from <boost/asio.hpp>
namespace ssl = boost::asio::ssl;       // from <boost/asio/ssl.hpp>
using tcp = boost::asio::ip::tcp;       // from <boost/asio/ip/tcp.hpp>

class ISocketEvents
{
public:
    virtual void OnConnect(unsigned int clientID, bool connect, char* clientIp, unsigned short clientPort)=0;
    virtual void OnSend(unsigned int  clientID, unsigned int noOfBytesSend, SocketError code)=0;
    virtual void OnReceive(unsigned int clientID, char * buff, unsigned int size)=0;
    virtual void OnSocketAlert() = 0;
    virtual bool onHandShake(unsigned int clientId, bool bDone) = 0;
};

//------------------------------------------------------------------------------

// The following ifdef block is the standard way of creating macros which make exporting 
// from a DLL simpler. All files within this DLL are compiled with the WEBSOCKETSERVER_EXPORTS
// symbol defined on the command line. This symbol should not be defined on any project
// that uses this DLL. This way any other project whose source files include this file see 
// WEBSOCKETSERVER_API functions as being imported from a DLL, whereas this DLL sees symbols
// defined with this macro as being exported.
#ifdef WEBSOCKETSERVER_EXPORTS
#define WEBSOCKETSERVER_API __declspec(dllexport)
#else
#define WEBSOCKETSERVER_API __declspec(dllimport)
#endif
class WebSocketSession;
// Accepts incoming connections and launches the sessions
class WEBSOCKETSERVER_API WebSocketListener : public std::enable_shared_from_this<WebSocketListener>
{
private:

    std::shared_ptr<ISocketEvents> m_pEvent;
    CRITICAL_SECTION m_hCSClientidSession;
    std::map<UINT, std::shared_ptr<WebSocketSession>> m_mapClientidSession;
public:
    WebSocketListener(tcp::endpoint endpoint);
    ~WebSocketListener();
    void Run();
    void setEventHandler(std::shared_ptr<ISocketEvents> pEvent);
    bool SendMessageToClient(UINT uClientid, std::string strMessage);
    bool RemoveClient(UINT uClientid);
private:
    void do_accept();
    void on_accept(beast::error_code ec, tcp::socket socket);
};

//------------------------------------------------------------------------------

class WebSocketSession : public std::enable_shared_from_this<WebSocketSession>
{
    websocket::stream<beast::ssl_stream<beast::tcp_stream>> ws_;
    beast::flat_buffer m_ReadBuffer;
    beast::flat_buffer m_WriteBuffer;
    std::shared_ptr<ISocketEvents> m_pEvent;
    SOCKET m_SocketDescriptor;
    bool bHandShake;
public:
    // Take ownership of the socket
    WebSocketSession(tcp::socket&& socket, ssl::context& ctx, std::shared_ptr<ISocketEvents> pEvent, SOCKET socketId);
    void run();
    void closeSocket();
    void on_run();
    void on_handshake(beast::error_code ec);
    void on_accept(beast::error_code ec);
    void on_upgrade(beast::error_code ec, size_t);
    void do_read();
    void on_read(beast::error_code ec, std::size_t bytes_transferred);
    UINT getSocketDescriptor();
    bool SendMessageToClient(std::string strMessage);
    void do_Disconnect();
    ~WebSocketSession();
};

//------------------------------------------------------------------------------

// Factory function that creates instances of the Server Protocol object.
WEBSOCKETSERVER_API std::shared_ptr<WebSocketListener> __stdcall InitializeWebSocketListener(unsigned short port, int maxThreads, std::string certificate, std::string privateKey);
//------------------------------------------------------------------------------

Source File

#include "stdafx.h"
#include "WebSocketServer.h"
//------------------------------------------------------------------------------

#define NO_OF_THREAD 50

static std::shared_ptr<WebSocketListener> objWebSocketListener = nullptr;

// The io_context is required for all I/O
boost::asio::io_context g_ioc{ NO_OF_THREAD };

// The SSL context is required, and holds certificates
ssl::context g_ctx{ ssl::context::tlsv12 };

tcp::acceptor g_acceptor(net::make_strand(g_ioc));
http::request<http::string_body> g_upgrade_request;

void RunIOContextThread();

//It initialize the WebSocketListener
 std::shared_ptr<WebSocketListener> __stdcall InitializeWebSocketListener(unsigned short port, int maxThreads, std::string certificate, std::string privateKey)
{
    auto const address = net::ip::make_address("0.0.0.0");

    // This holds the self-signed certificate used by the server
    load_server_certificate(g_ctx, certificate, privateKey);

    // Create and launch a listening port
    objWebSocketListener = std::make_shared<WebSocketListener>(tcp::endpoint{ address, port });
    if(objWebSocketListener)
        objWebSocketListener->Run();

    // Run the I/O service on the requested number of threads
    for (int i = 0; i < NO_OF_THREAD; ++i)
    {
        std::thread ioContextThread(&RunIOContextThread);
        ioContextThread.detach();
    }

    return objWebSocketListener->shared_from_this();
}

void RunIOContextThread()
{
    stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "RunIOContextThread thread started %d", std::this_thread::get_id());
    g_ioc.run();
}

//------------------------------------------------------------------------------

// Listener class
WebSocketListener::~WebSocketListener()
{
}

WebSocketListener::WebSocketListener(tcp::endpoint endpoint) :
    m_pEvent(NULL)
{
    stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::WebSocketListener Constuctor Enter");
    InitializeCriticalSection(&m_hCSClientidSession);
    beast::error_code ec;

    // Open the acceptor
    g_acceptor.open(endpoint.protocol(), ec);
    if (ec)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::WebSocketListener open failed, error message: %s", ec.message().c_str());
        return;
    }

    // Allow address reuse
    g_acceptor.set_option(net::socket_base::reuse_address(true), ec);
    if (ec)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::WebSocketListener set_option failed, error message: %s", ec.message().c_str());
        return;
    }

    g_acceptor.set_option(tcp::no_delay(true), ec);
    if (ec)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::WebSocketListener set_option(No delay) failed, error message: %s", ec.message().c_str());
        return;
    }

    // Bind to the server address
    g_acceptor.bind(endpoint, ec);
    if (ec)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::WebSocketListener bind failed, error message: %s", ec.message().c_str());
        return;
    }

    // Start listening for connections
    g_acceptor.listen(net::socket_base::max_listen_connections, ec);
    if (ec)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::WebSocketListener listen failed, error message: %s", ec.message().c_str());
        return;
    }
    else
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_INFO, "WebSocketListener::WebSocketListener Started listening at %s:%hu", g_acceptor.local_endpoint().address().to_string().c_str(), g_acceptor.local_endpoint().port());
    }

    stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::WebSocketListener Constuctor Leave");
}

bool WebSocketListener::RemoveClient(UINT uClientid) 
{
    bool bRet = false;
    try
    {
        EnterCriticalSection(&m_hCSClientidSession);
        auto iter = m_mapClientidSession.find(uClientid);
        if (iter != m_mapClientidSession.end())
        {
            iter->second->closeSocket();
            stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_INFO, "WebSocketListener::RemoveClient Removed from map uClientid : %d refCount: %d", uClientid, iter->second.use_count());
            m_mapClientidSession.erase(iter);
            bRet = true;
        }
        LeaveCriticalSection(&m_hCSClientidSession);
    }
    catch (exception& e)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::RemoveClient Inside Catch msg: %s", e.what());
    }
    catch (...)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::RemoveClient Inside Generic Catch");
    }
    return bRet;
}

bool WebSocketListener::SendMessageToClient(UINT uClientid, std::string strMessage)
{
    bool bRet = false;
    try
    {

        EnterCriticalSection(&m_hCSClientidSession);
        auto iter = m_mapClientidSession.find(uClientid);
        if (iter != m_mapClientidSession.end())
        {
            bRet = iter->second->SendMessageToClient(strMessage);
        }
        else
        {
            stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::SendMessageToClient ClientID: %d not found", uClientid);
        }
        LeaveCriticalSection(&m_hCSClientidSession);
    }
    catch (exception& e)
    {
        LeaveCriticalSection(&m_hCSClientidSession);
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::SendMessageToClient Inside Catch msg: %s", e.what());
    }
    catch (...)
    {
        LeaveCriticalSection(&m_hCSClientidSession);
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::SendMessageToClient Inside Generic Catch");
    }
    return bRet;
}

void WebSocketListener::setEventHandler(std::shared_ptr<ISocketEvents> pEvent)
{
    m_pEvent = pEvent;
}

// Start accepting incoming connections
void WebSocketListener::Run()
{
    do_accept();
}

void WebSocketListener::do_accept()
{
    try
    {
        // The new connection gets its own strand
        g_acceptor.async_accept(
            net::make_strand(g_ioc),
            beast::bind_front_handler(
                &WebSocketListener::on_accept,
                shared_from_this()));
    }
    catch(exception& e)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::do_accept Inside Catch msg: %s", e.what());
    }
    catch (...) 
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::do_accept Inside Generic Catch");
    }
}

void WebSocketListener::on_accept(beast::error_code ec, tcp::socket socket)
{
    try
    {
        if (ec)
        {
            stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::on_accept accept failed, error SocketID: %d, message: %s", socket.native_handle(), ec.message().c_str());
        }
        else
        {
            socket.set_option(tcp::no_delay(true), ec);
            if (ec)
            {
                stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::on_accept set_option(No delay) failed, error message: %s", ec.message().c_str());
                return;
            }

            std::string sClientIp = socket.remote_endpoint().address().to_string();
            unsigned short uiClientPort = socket.remote_endpoint().port();
            SOCKET socketId = socket.native_handle();
            // Create the WebSocketSession and run it
            std::shared_ptr<WebSocketSession> objSession = std::make_shared<WebSocketSession>(std::move(socket), g_ctx, m_pEvent, socketId);
            if (objSession != nullptr)
            {
                objSession->run();

                EnterCriticalSection(&m_hCSClientidSession);
                auto itr = m_mapClientidSession.find(socketId);
                if (itr != m_mapClientidSession.end())
                {
                    itr->second = objSession;
                    stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::on_accept update client id : %d", socketId);
                }
                else
                {
                    m_mapClientidSession.insert(pair<unsigned int, std::shared_ptr<WebSocketSession >>(socketId, objSession));
                    stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::on_accept insert client id : %d", socketId);
                }
                LeaveCriticalSection(&m_hCSClientidSession);

                stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_INFO, "WebSocketListener::on_accept Incoming connection request from SocketID: %d, IP: %s Port: %hu", socketId, sClientIp.c_str(), uiClientPort);
            }
        }
    }
    catch (exception& e)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::on_accept Inside Catch msg: %s", e.what());
    }
    catch (...)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketListener::on_accept Inside Generic Catch");
    }
    // Accept another connection
    do_accept();
}

//------------------------------------------------------------------------------

// Session class
WebSocketSession::WebSocketSession(tcp::socket&& socket, ssl::context& ctx, std::shared_ptr<ISocketEvents> pEvent, SOCKET socketId)
    : ws_(std::move(socket), ctx), m_pEvent(pEvent), m_SocketDescriptor(socketId)
{
    m_WriteBuffer.reserve(1000);
    m_ReadBuffer.reserve(1000);
    bHandShake = false;
}

WebSocketSession::~WebSocketSession()
{
    stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::~WebSocketSession SocketID: %d Closing ", m_SocketDescriptor);
    if (ws_.is_open())
    {
        beast::websocket::close_reason closeReason;
        beast::error_code ec;
        ws_.close(closeReason, ec);
        if (ec)
            stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::~WebSocketSession SocketID: %d clsoing error msg: %s", ec.message().c_str());
    }
}

// Get on the correct executor
void WebSocketSession::run()
{
    try {
        // We need to be executing within a strand to perform async operations
        // on the I/O objects in this WebSocketSession. Although not strictly necessary
        // for single-threaded contexts, this example code is written to be
        // thread-safe by default.
        net::dispatch(ws_.get_executor(),
            beast::bind_front_handler(
                &WebSocketSession::on_run,
                shared_from_this()));
    }
    catch (exception& e)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::run Inside Catch msg: %s", e.what());
    }
    catch (...)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::run Inside Generic Catch");
    }
}

void WebSocketSession::closeSocket() 
{
    stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::closeSocket Closing SocketID: %d", m_SocketDescriptor);
    beast::websocket::close_reason closeReason;
    beast::error_code ec; 
    UINT ID = m_SocketDescriptor;
    ws_.async_close(closeReason, [=](beast::error_code ec)
    {
        if (ec)
        {
            stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::closeSocket SocketID: %d, Code: %d Message: %s", ID, closeReason, ec.message().c_str());
        }
    }
    );
}

// Start the asynchronous operation
void WebSocketSession::on_run()
{
    try
    {
        // Set the timeout.
        beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30));

        // Perform the SSL handshake
        ws_.next_layer().async_handshake(
            ssl::stream_base::server,
            beast::bind_front_handler(
                &WebSocketSession::on_handshake,
                shared_from_this()));
    }
    catch (exception& e)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_run Inside Catch msg: %s", e.what());
    }
    catch (...)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_run Inside Generic Catch");
    }
}

void WebSocketSession::on_handshake(beast::error_code ec)
{
    try
    {
        if (ec)
        {
            stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_handshake handshake failed, error SocketID: %d, message: %s", m_SocketDescriptor, ec.message().c_str());
            return;
        }
        // Turn off the timeout on the tcp_stream, because
        // the websocket stream has its own timeout system.
        beast::get_lowest_layer(ws_).expires_never();

        // Set suggested timeout settings for the websocket
        ws_.set_option(
            websocket::stream_base::timeout::suggested(
                beast::role_type::server));

        // Set a decorator to change the Server of the handshake
        ws_.set_option(websocket::stream_base::decorator(
            [](websocket::response_type& res)
        {
            res.set(http::field::server,
                std::string(BOOST_BEAST_VERSION_STRING) +
                " websocket-server-async-ssl");
        }));

        http::async_read(ws_.next_layer(), m_ReadBuffer, g_upgrade_request,
            beast::bind_front_handler(
                &WebSocketSession::on_upgrade,
                shared_from_this()));
    }
    catch (exception& e)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_handshake Inside Catch msg: %s", e.what());
    }
    catch (...)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_handshake Inside Generic Catch");
    }
}

void WebSocketSession::on_upgrade(beast::error_code ec, size_t)
{
    try
    {
        if (ec)
        {
            stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_upgrade upgrade failed, error SocketID: %d, message: %s", m_SocketDescriptor, ec.message().c_str());
            return;
        }

        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_INFO, "WebSocketSession::on_upgrade Handshake completed for SocketID: %d, WebSocket Headers: %s", m_SocketDescriptor, boost::lexical_cast<std::string>(g_upgrade_request.base()));

        if (m_pEvent != nullptr)
        {
            m_pEvent->OnConnect(m_SocketDescriptor, true, "", 0);
        }
        // Accept the websocket handshake
        ws_.async_accept(
            g_upgrade_request,
            beast::bind_front_handler(&WebSocketSession::on_accept, shared_from_this()));
    }
    catch (exception& e)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_upgrade Inside Catch msg: %s", e.what());
    }
    catch (...)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_upgrade Inside Generic Catch");
    }
}

void WebSocketSession::on_accept(beast::error_code ec)
{
    if (ec)
    {
        do_Disconnect();
        return;
    }
    // Read a message
    do_read();
}

void WebSocketSession::do_read()
{
    static boost::mutex objMutex;
    try
    {
        objMutex.lock();
        if (m_pEvent && !bHandShake)
        {
            bHandShake = true;
            m_pEvent->onHandShake(m_SocketDescriptor, bHandShake);
        }
        // Read a message into our buffer

        ws_.async_read(
                m_ReadBuffer,
                beast::bind_front_handler(
                    &WebSocketSession::on_read,
                    shared_from_this()));
        objMutex.unlock();
    }
    catch (exception& e)
    {
        objMutex.unlock();
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::do_read Inside Catch msg: %s", e.what());
    }
    catch (...)
    {
        objMutex.unlock();
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::do_read Inside Generic Catch");
    }
}

void WebSocketSession::on_read(beast::error_code ec, std::size_t bytes_transferred)
{
    try
    {
        boost::ignore_unused(bytes_transferred);

        // This indicates that the WebSocketSession was closed
        if (ec == websocket::error::closed)
        {
            do_Disconnect();
            stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_INFO, "WebSocketSession::on_read Client closed  SocketID: %d", m_SocketDescriptor);
            return;
        }

        if (ec)
        {
            do_Disconnect();
            return;
        }

        if (m_pEvent != nullptr)
        {
            if (m_ReadBuffer.cdata().size() > 0)
            {
                const unsigned int iLen = m_ReadBuffer.cdata().size();
                stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_INFO, "WebSocketSession::on_read Client SocketID: %d dataRecv: %d cpacity: %d", m_SocketDescriptor, iLen, m_ReadBuffer.capacity());

                std::shared_ptr<char> objData(new char[iLen], std::default_delete<char[]>());
                ::memset(objData.get(), 0, iLen);
                if (objData)
                {
                    memcpy(objData.get(), m_ReadBuffer.cdata().data(), iLen);
                    m_pEvent->OnReceive(m_SocketDescriptor, objData.get(), iLen);
                }
            }
        }
        m_ReadBuffer.consume(bytes_transferred);
    }
    catch (exception& e)
    {

        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_read Inside Catch msg: %s", e.what());
    }
    catch (...)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::on_read Inside Generic Catch");
    }
    do_read();
}

UINT WebSocketSession::getSocketDescriptor()
{
    return m_SocketDescriptor;
}

void testDelete(char* p) 
{
    delete[] p;
}

struct DeleteChar {
    void operator()(char* p) const {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
            "Deleting the buffer");
        delete[] p;
    }
};

bool WebSocketSession::SendMessageToClient(std::string strMessage) 
{
    bool bRet = false;
    static std::mutex obj;
    try {
        obj.lock();

        if (strMessage.length() > 0)
        {
            stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                "WebSocketSession::SendMessageToClient SocketID: %d, BufferSize: %u, BufferCapacity: %u msgLen: %d, msg:%s",
                m_SocketDescriptor, m_WriteBuffer.size(), m_WriteBuffer.capacity(), strMessage.length(), strMessage.c_str());

            if (m_WriteBuffer.size() > 40960) // Pending data size is greater than 40 MB
            {
                stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                    "WebSocketSession::SendMessageToClient SocketID: %d, BufferSize: %u, Disconnecting client due to pending buffer limit crossed",
                    m_SocketDescriptor, m_WriteBuffer.size());
                do_Disconnect();
            }
            else
            {
                boost::beast::ostream(m_WriteBuffer) << strMessage << ":";

                stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                    "WebSocketSession::SendMessageToClient SocketID: %d, BufferSize: %u, BufferCapacity: %u",
                    m_SocketDescriptor, m_WriteBuffer.size(), m_WriteBuffer.capacity());

                ws_.async_write(m_WriteBuffer.data(),
                    [this](beast::error_code ec, std::size_t transfer)
                {
                    boost::ignore_unused(transfer);
                    if (ec)
                    {
                        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                            "WebSocketSession::SendMessageToClient callback error SocketID: %d, ErrorMsg:%s"/* ,Message: %s"*/, m_SocketDescriptor, ec.message().c_str()/*,strMessage.c_str()*/);

                        do_Disconnect();
                    }

                    obj.lock();
                    m_WriteBuffer.consume(transfer);
                    obj.unlock();
                });

                bRet = true;
            }
        }
        obj.unlock();
    }
    catch (exception& e)
    {
        obj.unlock();
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::SendMessageToClient Inside Catch msg: %s", e.what());
    }
    catch (...)
    {
        obj.unlock();
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::SendMessageToClient Inside Generic Catch");
    }
    return bRet;
}

void WebSocketSession::do_Disconnect()
{
    try {
        if (m_pEvent)
        {
            stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                "WebSocketSession::do_Disconnect disconnecting SocketID: %d", m_SocketDescriptor);
            m_pEvent->OnConnect(m_SocketDescriptor, false, "", 0);
        }
        else
        {
            stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR,
                "WebSocketSession::do_Disconnect Removing SocketID: %d", m_SocketDescriptor);
            objWebSocketListener->RemoveClient(m_SocketDescriptor);
        }
    }
    catch (exception& e)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::do_Disconnect Inside Catch msg: %s", e.what());
    }
    catch (...)
    {
        stl::log::trace(stl::log::LOG_GROUP_ERROR, stl::log::LOG_LEVEL_ERROR, "WebSocketSession::do_Disconnect Inside Generic Catch");
    }
}