uNetworking / uWebSockets

Simple, secure & standards compliant web server for the most demanding of applications
Apache License 2.0

Program crash when `publish` was called too many times in a short time in websocket server #1731

Closed. suzixin closed this issue 6 months ago

suzixin commented 6 months ago

Hi team

This is my program.

struct HttpWorker
{
    HttpWorker(WorkerType type) : type_(type) {}
    HttpWorker(const json &config)
    {
        port_ = config["port"];
    }
    void work();
    void send_message(json &message);

    us_listen_socket_t *listen_socket_;
    uWS::Loop *loop_;
    WorkerType type_;
    std::shared_ptr<uWS::App> app_;
    std::shared_ptr<std::thread> thread_;
    MdFixRouterInitiatorPtr initator_;
    int port_;
    void setInitiatorPtr(MdFixRouterInitiatorPtr initator)
    {
        this->initator_ = initator;
    }
};

void HttpWorker::work() {
    loop_ = uWS::Loop::get();
    app_ = std::make_shared<uWS::App>();

    std::string wsPath = "/ws";
    app_->ws<PerSocketData>(wsPath, {
        .compression = uWS::SHARED_COMPRESSOR,
        .maxPayloadLength = 16 * 1024 * 1024,
        .idleTimeout = 10,
        .sendPingsAutomatically = true,
        .open = [](auto *ws) {
            ws->subscribe("broadcast");
        },
        .message = [this](auto *ws, std::string_view message, uWS::OpCode opCode) {
            if (message == "closedown") {
                us_listen_socket_close(0, listen_socket_);
                ws->close();
            } else {
                ws->send(message, opCode);
            }
        },
        .ping = [this](auto *ws, std::string_view) {
            json message = {
                {"topic", "Ping"},
                {"data", std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count()}
            };
            app_->publish("broadcast", message.dump(), uWS::TEXT);
        },
        .close = [](auto *ws, int code, std::string_view message) {
            ws->close();
        }
    });
    app_->listen(port_, [this](auto *token) {
        listen_socket_ = token;
        if (listen_socket_) {
            std::cout << "Thread " << std::this_thread::get_id() << " listening on port_ " << this->port_ << std::endl;
        } else {
            std::cout << "Thread " << std::this_thread::get_id() << " failed to listen on port_ " << this->port_ << std::endl;
        }
    }).run();
}

void HttpWorker::send_message(json& message) {
    app_->publish("broadcast", message.dump(), uWS::TEXT);
}

The program crashes when HttpWorker::send_message is called too many times in a short period.

#0  __memmove_avx_unaligned_erms_rtm () at ../sysdeps/x86_64/multiarch/memmove-vec-unaligned-erms.S:317
#1  0x0000555555669763 in uWS::protocol::formatMessage<true> (
    dst=0x7fffe8003df0 "\364~\001U{\"data\":{\"Account\":\"137F\",\"AvgPx\":21.09,\"ClOrdID\":\"6600903040131000001\",\"CumQty\":1600.0,\"LeavesQty\":100.0,\"OrdStatus\":49,\"OrdType\":49,\"OrderID\":\"107476\",\"OrderQty\":1700.0,\"Price\":21.09,\"QuoteId\":\""..., src=0x0, length=341, opCode=116, reportedLength=341, compressed=false, fin=true)
    at /home/suzx/github/cpp-qi-trader-fix/md_router/include/uWebSockets/WebSocketProtocol.h:241
#2  0x000055555566a570 in uWS::WebSocket<false, true, int>::send (this=0x7fffe8055aa0, 
    message=<error reading variable: Cannot create a lazy string with address 0x0, and a non-zero length.>, 
    opCode=116, compress=false, fin=true)
    at /home/suzx/github/cpp-qi-trader-fix/md_router/include/uWebSockets/WebSocket.h:163
#3  0x0000555555664f76 in uWS::TemplatedApp<false>::ws<PerSocketData>(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, uWS::TemplatedApp<false>::WebSocketBehavior<PerSocketData>&&)::{lambda(uWS::Subscriber*, uWS::TopicTreeMessage&, uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::IteratorFlags)#1}::operator()(uWS::Subscriber*, uWS::TopicTreeMessage&, uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::IteratorFlags) (__closure=0x7fffe8008ed8, s=0x7fffe8055bd0, message=..., 
    flags=(uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::LAST | uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::FIRST))
    at /home/suzx/github/cpp-qi-trader-fix/md_router/include/uWebSockets/App.h:317
#4  0x0000555555682d38 in std::__invoke_impl<bool, uWS::TemplatedApp<false>::ws<PerSocketData>(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, uWS::TemplatedApp<false>::WebSocketBehavior<PerSocketData>&&)::{lambda(uWS::Subscriber*, uWS::TopicTreeMessage&, uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::IteratorFlags)#1}&, uWS::Subscriber*, uWS::TopicTreeMessage&, uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::IteratorFlags>(std::__invoke_other, uWS::TemplatedApp<false>::ws<PerSocketData>(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, uWS::TemplatedApp<false>::WebSocketBehavior<PerSocketData>&&)::{lambda(uWS::Subscriber*, uWS::TopicTreeMessage&, uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::IteratorFlags)#1}&, uWS::Subscriber*&&, uWS::TopicTreeMessage&, uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::IteratorFlags&&) (__f=...) at /usr/include/c++/11/bits/invoke.h:61
#5  0x000055555567d109 in std::__invoke_r<bool, uWS::TemplatedApp<false>::ws<PerSocketData>(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, uWS::TemplatedApp<false>::WebSocketBehavior<PerSocketData>&&)::{lambda(uWS::Subscriber*, uWS::TopicTreeMessage&, uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::IteratorFlags)#1}&, uWS::Subscriber*, uWS::TopicTreeMessage&, uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::IteratorFlags>(uWS::TemplatedApp<false>::ws<PerSocketData>(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, uWS::TemplatedApp<false>::WebSocketBehavior<PerSocketData>&&)::{lambda(uWS::Subscriber*, uWS::TopicTreeMessage&, uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::IteratorFlags)#1}&, uWS::Subscriber*&&, uWS::TopicTreeMessage&, uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::IteratorFlags&&) (__fn=...) at /usr/include/c++/11/bits/invoke.h:114
#6  0x0000555555673d2c in std::_Function_handler<bool (uWS::Subscriber*, uWS::TopicTreeMessage&, uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::IteratorFlags), uWS::TemplatedApp<false>::ws<PerSocketData>(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, uWS::TemplatedApp<false>::WebSocketBehavior<PerSocketData>&&)::{lambda(uWS::Subscriber*, uWS::TopicTreeMessage&, uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::IteratorFlags)#1}>::_M_invoke(std::_Any_data const&, uWS::Subscriber*&&, uWS::TopicTreeMessage&, uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::IteratorFlags&&) (__functor=..., 
    __args#0=@0x7ffff70df790: 0x7fffe8055bd0, __args#1=..., 
    __args#2=@0x7ffff70df784: (uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::LAST | uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::FIRST)) at /usr/include/c++/11/bits/std_function.h:290
#7  0x000055555567bd81 in std::function<bool (uWS::Subscriber*, uWS::TopicTreeMessage&, uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::IteratorFlags)>::operator()(uWS::Subscriber*, uWS::TopicTreeMessage&, uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::IteratorFlags) const (this=0x7fffe8008ed8, 
    __args#0=0x7fffe8055bd0, __args#1=..., 
    __args#2=(uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::LAST | uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::FIRST)) at /usr/include/c++/11/bits/std_function.h:590
#8  0x0000555555671b2d in uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::drainImpl (
    this=0x7fffe8008ed0, s=0x7fffe8055bd0)
    at /home/suzx/github/cpp-qi-trader-fix/md_router/include/uWebSockets/TopicTree.h:126
#9  0x000055555566c67a in uWS::TopicTree<uWS::TopicTreeMessage, uWS::TopicTreeBigMessage>::drain (
    this=0x7fffe8008ed0) at /home/suzx/github/cpp-qi-trader-fix/md_router/include/uWebSockets/TopicTree.h:274
#10 0x000055555566502f in uWS::TemplatedApp<false>::ws<PerSocketData>(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, uWS::TemplatedApp<false>::WebSocketBehavior<PerSocketData>&&)::{lambda(uWS::Loop*)#2}::operator()(uWS::Loop*) const (__closure=0x7fffe8008f88)
    at /home/suzx/github/cpp-qi-trader-fix/md_router/include/uWebSockets/App.h:342
#11 0x000055555568e75c in std::__invoke_impl<void, uWS::TemplatedApp<false>::ws<PerSocketData>(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, uWS::TemplatedApp<false>::WebSocketBehavior<PerSocketData>&&)::{lambda(uWS::Loop*)#2}&, uWS::Loop*>(std::__invoke_other, uWS::TemplatedApp<false>::ws<PerSocketData>(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, uWS::TemplatedApp<false>::WebSocketBehavior<PerSocketData>&&)::{lambda(uWS::Loop*)#2}&, uWS::Loop*&&) (__f=...) at /usr/include/c++/11/bits/invoke.h:61
#12 0x00005555556896b0 in std::__invoke<uWS::TemplatedApp<false>::ws<PerSocketData>(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, uWS::TemplatedApp<false>::WebSocketBehavior<PerSocketData>&&)::{lambda(uWS::Loop*)#2}&, uWS::Loop*>(uWS::TemplatedApp<false>::ws<PerSocketData>(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, uWS::TemplatedApp<false>::WebSocketBehavior<PerSocketData>&&)::{lambda(uWS::Loop*)#2}&, uWS::Loop*&&) (__fn=...) at /usr/include/c++/11/bits/invoke.h:96
#13 0x0000555555682f24 in std::invoke<uWS::TemplatedApp<false>::ws<PerSocketData>(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, uWS::TemplatedApp<false>::WebSocketBehavior<PerSocketData>&&)::{lambda(uWS::Loop*)#2}&, uWS::Loop*>(uWS::TemplatedApp<false>::ws<PerSocketData>(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, uWS::TemplatedApp<false>::WebSocketBehavior<PerSocketData>&&)::{lambda(uWS::Loop*)#2}&, uWS::Loop*&&) (__fn=...) at /usr/include/c++/11/functional:97
#14 0x000055555567d430 in ofats::any_detail::handler_traits<void, uWS::Loop*>::small_handler<uWS::TemplatedApp<false>::ws<PerSocketData>(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, uWS::TemplatedApp<false>::WebSocketBehavior<PerSocketData>&&)::{lambda(uWS::Loop*)#2}>::call(ofats::any_detail::storage&, uWS::Loop*) (s=..., args#0=0x7fffe8000ba0)
    at /home/suzx/github/cpp-qi-trader-fix/md_router/include/uWebSockets/MoveOnlyFunction.h:133
#15 0x0000555555668056 in ofats::any_detail::any_invocable_impl<void, false, uWS::Loop*>::call (
    this=0x7fffe8008f88, args#0=0x7fffe8000ba0)
    at /home/suzx/github/cpp-qi-trader-fix/md_router/include/uWebSockets/MoveOnlyFunction.h:247
#16 0x000055555566224f in ofats::any_invocable<void (uWS::Loop*)>::operator()(uWS::Loop*) (this=0x7fffe8008f88, 
    args#0=0x7fffe8000ba0)
    at /home/suzx/github/cpp-qi-trader-fix/md_router/include/uWebSockets/MoveOnlyFunction.h:354
#17 0x000055555565931f in uWS::Loop::postCb (loop=0x7fffe8000ba0)
    at /home/suzx/github/cpp-qi-trader-fix/md_router/include/uWebSockets/Loop.h:58
#18 0x000055555569ea5b in us_loop_run (loop=0x7fffe8000ba0) at ../../uWebSockets/uSockets/src/loop.c:194
#19 0x0000555555659806 in uWS::Loop::run (this=0x7fffe8000ba0)
    at /home/suzx/github/cpp-qi-trader-fix/md_router/include/uWebSockets/Loop.h:182
#20 0x000055555565981f in uWS::run ()
    at /home/suzx/github/cpp-qi-trader-fix/md_router/include/uWebSockets/Loop.h:199
#21 0x00005555556678ad in uWS::TemplatedApp<false>::run (this=0x7fffe8007e60)
    at /home/suzx/github/cpp-qi-trader-fix/md_router/include/uWebSockets/App.h:591

uNetworkingAB commented 6 months ago

This is not a complete program and it's not a proper bug report. You can't just dump your project and expect someone else to solve your problems; you need to specify an isolated problem with the library, not just "my project is not working, fix it".

suzixin commented 6 months ago

It clearly crashes inside this library.

uNetworkingAB commented 6 months ago

Any single threaded library will crash if misused from different threads, or if passed invalid references.

There's an arrogance I don't need. But I can say from the fragment of your program that you probably mix threads for HttpWorker (the name implies multithreaded use).

But since you only posted a fragment of your program, nobody on Earth will know or want to know.

This is not a usable bug report.

suzixin commented 5 months ago

Thank you for pointing out this issue. It is indeed possible that multiple threads are calling the same instance's app_->publish("broadcast", message.dump(), uWS::TEXT). However, even after adding locks to protect this call, the problem persists. Could you advise on how best to utilize your internal synchronization mechanisms in this situation? I have tried reviewing the examples and documentation, but haven't found a clear approach. Any additional resources you could provide would be greatly appreciated.

uNetworkingAB commented 5 months ago

A lock doesn't do anything for a library that is single-threaded. The only function you can call from multiple threads is Loop::defer; it is mentioned in the READMORE manual.
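
A note on why the lock did not help, and what deferring looks like. In the backtrace above, frames #8 to #17 show the crash inside TopicTree::drain, reached from Loop::postCb on the loop thread. That code runs without the caller's mutex, so a lock around send_message cannot stop it from racing with a publish made on another thread. With Loop::defer, the other threads only enqueue a callback and the loop thread runs it. In the reporter's code that would presumably mean wrapping the publish, for example loop_->defer([this, payload = message.dump()]() { app_->publish("broadcast", payload, uWS::TEXT); });, assuming loop_ was already set by the worker thread in work(). The payload is captured by value because the callback runs later.

The sketch below models that pattern using only the standard library. ToyLoop, ToyApp and run_demo are hypothetical names made up for illustration; this is not uWebSockets code and not how the library implements defer internally.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for a single-threaded event loop. Only defer()
// may be called from other threads; drain() belongs to the loop thread.
class ToyLoop {
public:
    // Thread-safe: queue a callback to be run later by the loop thread.
    void defer(std::function<void()> cb) {
        std::lock_guard<std::mutex> lock(mutex_);
        deferred_.push_back(std::move(cb));
    }

    // Loop thread only: take everything queued so far and run it.
    void drain() {
        std::vector<std::function<void()>> batch;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            batch.swap(deferred_);
        }
        for (auto &cb : batch) {
            cb();
        }
    }

private:
    std::mutex mutex_;
    std::vector<std::function<void()>> deferred_;
};

// Hypothetical stand-in for loop-owned state (think of an app's topic
// tree). It has no locking of its own, like a single-threaded library.
struct ToyApp {
    std::vector<std::string> published;
    void publish(const std::string &payload) { published.push_back(payload); }
};

// Producer threads never touch ToyApp directly; they only defer work.
// Returns the number of messages the "loop thread" ended up publishing.
std::size_t run_demo(int producers, int messages_each) {
    ToyLoop loop;
    ToyApp app;
    std::vector<std::thread> threads;
    for (int p = 0; p < producers; ++p) {
        threads.emplace_back([&loop, &app, p, messages_each] {
            for (int i = 0; i < messages_each; ++i) {
                std::string payload =
                    "msg " + std::to_string(p) + ":" + std::to_string(i);
                // Copy the payload into the callback: it runs later.
                loop.defer([&app, payload] { app.publish(payload); });
            }
        });
    }
    for (auto &t : threads) {
        t.join();
    }
    // The calling thread plays the role of the loop thread here.
    loop.drain();
    return app.published.size();
}
```

Calling run_demo(4, 1000) queues 4000 callbacks from four threads and publishes all of them from a single thread. The only shared state the producers touch is the mutex-protected queue inside defer().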