yse / easy_profiler

Lightweight profiler library for c++
MIT License
2.18k stars 189 forks source link

Sometimes cannot capture data from application #142

Open alprist opened 5 years ago

alprist commented 5 years ago

Sometimes the profiler_gui cannot capture data from profiled application. Also sometimes it is not possible to connect to device. The problem was inspected on all versions from the 1.1.0 to 2.0.1 (and later, to master) versions. May be the problem was before but I tried only from 1.1.0. I used it on Android devices.

The reason is the SocketListener::listenCapture method realization. It tries to read the header and assumes that it will have all bytes. But sometimes it may have not enough amount. So it need to save the read data and try to read later. But is doesn't. It always starts to fill its' buffer from start.

bytes = m_easySocket.receive(buffer, buffer_size);

The same situation when it tries to read reply blocks.

So I rewrote the method and it started to work properly:

void SocketListener::listenCapture()
{
    qInfo() << "listenCapture: start";

    EASY_STATIC_CONSTEXPR uint32_t bufferSize = 8 * 1024 * 1024;

    char* buffer = new char[bufferSize];
    uint32_t pos = 0;
    uint32_t availableBytes = 0;

    auto timeBegin = std::chrono::system_clock::now();

    bool isListen = true;

    //     availableBytes = 5
    //       |       |
    // 0 1 2 3 4 5 6 7 8 9 ... bufferSize -1
    //       |
    //       pos = 3

    while (isListen && !m_bInterrupt.load(std::memory_order_acquire))
    {
        if (m_bStopReceive.load(std::memory_order_acquire))
        {
            profiler::net::Message request(profiler::net::MessageType::Request_Stop_Capture);
            m_easySocket.send(&request, sizeof(request));
            m_bStopReceive.store(false, std::memory_order_release);
        }

        while (availableBytes < sizeof(profiler::net::Message))
        {
            if (pos != 0)
            {
                //shift buffer to begin
                memmove(buffer, buffer + pos, availableBytes);
                pos = 0;
            }

            int bytes = m_easySocket.receive(buffer + availableBytes, bufferSize - availableBytes);
            if (bytes < 0)
            {
                if (m_easySocket.isDisconnected())
                {
                    qInfo() << "listenCapture: Connection closed: " << bytes;
                    m_bConnected.store(false, std::memory_order_release);
                    isListen = false;
                    break;
                }
                else
                {
                    qInfo() << "listenCapture: read timeout";
                    continue;
                }
            }
            availableBytes += bytes;
        }

        if (!isListen)
        {
            break;
        }

        profiler::net::MessageType type = profiler::net::MessageType::Undefined;

        {
            auto message = reinterpret_cast<const ::profiler::net::Message*>(buffer + pos);
            if (!message->isEasyNetMessage())
            {
                qInfo() << "listenCapture: Received not easy net message";
                continue;
            }
            type = message->type;
        }

        switch (type)
        {
            case profiler::net::MessageType::Connection_Accepted:
            {
                qInfo() << "listenCapture: Receive MessageType::Connection_Accepted";
                //m_easySocket.send(&request, sizeof(request));
                pos += sizeof(profiler::net::Message);
                availableBytes -= sizeof(profiler::net::Message);
                if (availableBytes == 0)
                {
                    pos = 0;
                }
                break;
            }

            case profiler::net::MessageType::Reply_Capturing_Started:
            {
                qInfo() << "listenCapture: Receive MessageType::Reply_Capturing_Started";
                pos += sizeof(profiler::net::Message);
                availableBytes -= sizeof(profiler::net::Message);
                if (availableBytes == 0)
                {
                    pos = 0;
                }
                break;
            }

            case profiler::net::MessageType::Reply_Blocks_End:
            {
                qInfo() << "listenCapture: Receive MessageType::Reply_Blocks_End";
                pos += sizeof(profiler::net::Message);
                availableBytes -= sizeof(profiler::net::Message);
                if (availableBytes == 0)
                {
                    pos = 0;
                }

                const auto dt = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - timeBegin);
                const auto bytesNumber = m_receivedData.str().size();
                qInfo() << "listenCapture: received " << bytesNumber << " bytes, " << dt.count() << " ms, average speed = " << double(bytesNumber) * 1e3 / double(dt.count()) / 1024. << " kBytes/sec";

                isListen = false;

                qInfo() << "listenCapture: End MessageType::Reply_Blocks_End";
                break;
            }

            case profiler::net::MessageType::Reply_Blocks:
            {
                qInfo() << "listenCapture: Receive MessageType::Reply_Blocks, " << availableBytes << ", " << sizeof(profiler::net::DataMessage);

                while (availableBytes < sizeof(profiler::net::DataMessage))
                {
                    if (pos != 0)
                    {
                        //shift buffer to begin
                        memmove(buffer, buffer + pos, availableBytes);
                        pos = 0;
                    }

                    int bytes = m_easySocket.receive(buffer + availableBytes, bufferSize - availableBytes);
                    if (bytes < 0)
                    {
                        if (m_easySocket.isDisconnected())
                        {
                            qInfo() << "listenCapture: Connection closed: " << bytes;
                            m_bConnected.store(false, std::memory_order_release);
                            isListen = false;
                            break;
                        }
                        else
                        {
                            qInfo() << "listenCapture: read timeout";
                            continue;
                        }
                    }
                    availableBytes += bytes;
                }

                if (!isListen)
                {
                    break;
                }

                const profiler::net::DataMessage* dm = reinterpret_cast<const ::profiler::net::DataMessage*>(buffer + pos);
                timeBegin = std::chrono::system_clock::now();
                uint32_t neededSize = dm->size;
                qInfo() << "listenCapture: neededSize=" << neededSize;
                availableBytes -= sizeof(profiler::net::DataMessage);
                if (availableBytes == 0)
                {
                    pos = 0;
                }
                else
                {
                    pos += sizeof(profiler::net::DataMessage);
                }

                while (neededSize > 0)
                {
                    if (availableBytes > 0)
                    {
                        uint32_t bytesNumber = std::min(neededSize, availableBytes);
                        m_receivedSize += bytesNumber;
                        neededSize -= bytesNumber;
                        availableBytes -= bytesNumber;
                        m_receivedData.write(buffer + pos, bytesNumber);
                        if (availableBytes == 0)
                        {
                            pos = 0;
                        }
                        else
                        {
                            pos += bytesNumber;
                        }
                    }
                    else
                    {
                        pos = 0;
                        int bytes = m_easySocket.receive(buffer, bufferSize);
                        if (bytes < 0)
                        {
                            if (m_easySocket.isDisconnected())
                            {
                                qInfo() << "listenCapture: Connection closed: " << bytes;
                                m_bConnected.store(false, std::memory_order_release);
                                isListen = false;
                            }
                            else
                            {
                                qInfo() << "listenCapture: read timeout";
                                continue;
                            }
                        }
                        if (!isListen)
                        {
                            break;
                        }
                        availableBytes += bytes;
                    }
                }

                qInfo() << "listenCapture: End MessageType::Reply_Blocks";
                break;
            }

            default:
                qInfo() << "listenCapture: Receive unknown " << static_cast<int>(type);
                isListen = false;
                break;
        }
    }

    if (m_easySocket.isDisconnected())
    {
        clearData();
    }

    delete[] buffer;

    m_bCaptureReady.store(true, std::memory_order_release);

    qInfo() << "listenCapture: stop";
}

Please could you look at the code and make appropriate fix?

cas4ey commented 5 years ago

Hi @alprist

Thanks for the help :+1:

I can copy-paste the code you suggested or you can create a PR (you will be automaically added to the contributors list in such case). Which option do you prefer? =)

cas4ey commented 5 years ago

Fixed

alprist commented 3 years ago

Hi @cas4ey I have looked at your fix and noticed that you have high possibility to corrupt memory: https://github.com/yse/easy_profiler/blob/develop/profiler_gui/socket_listener.cpp

char* buffer = new char[buffer_size];
...
while (bytes < sizeof(profiler::net::Message))
{
    int receivedBytes = m_easySocket.receive(buffer + seek + bytes, buffer_size);
    if (receivedBytes < 1)
    {
        bytes = receivedBytes;
        break;
    }
    bytes += receivedBytes;
}

...
case profiler::net::MessageType::Reply_Blocks:
{
    qInfo() << "Receive MessageType::Reply_Blocks";

    while (bytes < sizeof(profiler::net::DataMessage))
    {
        int receivedBytes = m_easySocket.receive(buffer + seek + bytes, buffer_size);

The buffer capacity is buffer_size but you shift its pointer on (seek + bytes). So you cannot read full buffer capacity.