yubox-node-org / AsyncTCPSock

Reimplementation of the API of me-no-dev/AsyncTCP using high-level BSD sockets
GNU Lesser General Public License v3.0
20 stars, 9 forks

Crash on page refresh and no web response after a while #6

Closed: zekageri closed this issue 2 years ago

zekageri commented 3 years ago

I get this error very rarely on page load:

/home/runner/work/esp32-arduino-lib-builder/esp32-arduino-lib-builder/esp-idf/components/freertos/queue.c:1442 (xQueueGenericReceive)- assert failed!
abort() was called at PC 0x4009056d on core 1

ELF file SHA256: 0000000000000000

Backtrace: 0x4008f5c4:0x3ffd82a0 0x4008f83d:0x3ffd82c0 0x4009056d:0x3ffd82e0 0x401a55dc:0x3ffd8320 0x401a5728:0x3ffd8340 0x401a57cd:0x3ffd8370 0x401a6275:0x3ffd83a0 0x401a53cb:0x3ffd8400 0x40090842:0x3ffd8450
  #0  0x4008f5c4:0x3ffd82a0 in invoke_abort at /home/runner/work/esp32-arduino-lib-builder/esp32-arduino-lib-builder/esp-idf/components/esp32/panic.c:715
  #1  0x4008f83d:0x3ffd82c0 in abort at /home/runner/work/esp32-arduino-lib-builder/esp32-arduino-lib-builder/esp-idf/components/esp32/panic.c:715
  #2  0x4009056d:0x3ffd82e0 in xQueueGenericReceive at /home/runner/work/esp32-arduino-lib-builder/esp32-arduino-lib-builder/esp-idf/components/freertos/queue.c:2038
  #3  0x401a55dc:0x3ffd8320 in AsyncClient::_clearWriteQueue() at lib\AsyncTCPSock-master\src/AsyncTCP.cpp:339
  #4  0x401a5728:0x3ffd8340 in AsyncClient::_error(signed char) at lib\AsyncTCPSock-master\src/AsyncTCP.cpp:339
  #5  0x401a57cd:0x3ffd8370 in AsyncClient::_notifyWrittenBuffers(std::deque<AsyncClient::notify_writebuf, std::allocator<AsyncClient::notify_writebuf> >&, int) at lib\AsyncTCPSock-master\src/AsyncTCP.cpp:339
  #6  0x401a6275:0x3ffd83a0 in AsyncClient::_sockIsWriteable() at lib\AsyncTCPSock-master\src/AsyncTCP.cpp:339
  #7  0x401a53cb:0x3ffd8400 in _asynctcpsock_task(void*) at lib\AsyncTCPSock-master\src/AsyncTCP.cpp:339
  #8  0x40090842:0x3ffd8450 in vPortTaskWrapper at /home/runner/work/esp32-arduino-lib-builder/esp32-arduino-lib-builder/esp-idf/components/freertos/port.c:355 (discriminator 1)

Rebooting...

@avillacis @ullix @Pablo2048 Previous discussion here

https://github.com/me-no-dev/ESPAsyncWebServer/issues/984

avillacis commented 3 years ago

Do you have a minimal reproducer or test case? It looks as though the derived destructor of AsyncClient is unprotected against a concurrent check for a writable socket, but since the error is rarely encountered, I am not sure whether my potential fix will actually fix this.

zekageri commented 3 years ago

I will try to find a reproduction case. For now it looks like it happens when four or more clients are connected to one websocket, but I'm not sure yet.

avillacis commented 3 years ago

I have just uploaded commit 8ee20adb2bbc159a2e5155953fd455090eaf9fbf, which wraps the AsyncClient destructor with the global list semaphore, so that destruction cannot start from another task (and invalidate the socket and the write mutex) while the asyncTcpSock task is still processing the client. This probably fixes your scenario, but to be sure I would need a reproducer. I believe this scenario happens when a websocket or server-sent-event handler decides to close or destroy the connection in a task other than the socket callback task (asyncTcpSock).
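The idea behind this commit can be sketched in portable C++. This is not the library's code: std::mutex stands in for the FreeRTOS semaphore, and g_listMutex, g_clients, Client and serviceClientsOnce() are hypothetical names. The worker holds the list lock for the whole time it touches a client, and the destructor takes the same lock before unregistering, so a client cannot be destroyed halfway through a service pass.

```cpp
#include <algorithm>
#include <mutex>
#include <vector>

// Hypothetical stand-ins for the library's global client list and its lock.
static std::mutex g_listMutex;
class Client;
static std::vector<Client*> g_clients;

class Client {
public:
    Client() {
        std::lock_guard<std::mutex> lock(g_listMutex);
        g_clients.push_back(this);
    }

    ~Client() {
        // Take the same lock the worker holds while servicing clients, so the
        // object cannot be torn down in the middle of a service pass.
        std::lock_guard<std::mutex> lock(g_listMutex);
        g_clients.erase(std::remove(g_clients.begin(), g_clients.end(), this),
                        g_clients.end());
        // ... closing the socket and freeing the write-queue mutex would go here ...
    }

    void serviceWritable() { ++writesServiced; }  // placeholder for real work

    int writesServiced = 0;
};

// One pass of the hypothetical worker loop: every client is touched only
// while the list lock is held, so no destructor can run concurrently.
void serviceClientsOnce() {
    std::lock_guard<std::mutex> lock(g_listMutex);
    for (Client* c : g_clients) {
        c->serviceWritable();
    }
}
```

Because std::mutex is not recursive, a destructor triggered from inside serviceWritable() would deadlock in this sketch; a real implementation has to handle that path separately.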

zekageri commented 3 years ago

I downloaded it again this morning and have been testing since then, but I can't reproduce it. Then, with the webserver just idling (no refreshes or anything), the exact same crash happened out of the blue.

/home/runner/work/esp32-arduino-lib-builder/esp32-arduino-lib-builder/esp-idf/components/freertos/queue.c:1442 (xQueueGenericReceive)- assert failed!
abort() was called at PC 0x4009056d on core 1

ELF file SHA256: 0000000000000000

Backtrace: 0x4008f5c4:0x3ffd8760 0x4008f83d:0x3ffd8780 0x4009056d:0x3ffd87a0 0x401a6d24:0x3ffd87e0 0x401a6e88:0x3ffd8800 0x401a6f2d:0x3ffd8830 0x401a79d5:0x3ffd8860 0x401a6b13:0x3ffd88c0 0x40090842:0x3ffd8910
  #0  0x4008f5c4:0x3ffd8760 in invoke_abort at /home/runner/work/esp32-arduino-lib-builder/esp32-arduino-lib-builder/esp-idf/components/esp32/panic.c:715     
  #1  0x4008f83d:0x3ffd8780 in abort at /home/runner/work/esp32-arduino-lib-builder/esp32-arduino-lib-builder/esp-idf/components/esp32/panic.c:715
  #2  0x4009056d:0x3ffd87a0 in xQueueGenericReceive at /home/runner/work/esp32-arduino-lib-builder/esp32-arduino-lib-builder/esp-idf/components/freertos/queue.c:2038
  #3  0x401a6d24:0x3ffd87e0 in AsyncClient::_clearWriteQueue() at lib\AsyncTCPSock-master\src/AsyncTCP.cpp:347
  #4  0x401a6e88:0x3ffd8800 in AsyncClient::_error(signed char) at lib\AsyncTCPSock-master\src/AsyncTCP.cpp:347
  #5  0x401a6f2d:0x3ffd8830 in AsyncClient::_notifyWrittenBuffers(std::deque<AsyncClient::notify_writebuf, std::allocator<AsyncClient::notify_writebuf> >&, int) at lib\AsyncTCPSock-master\src/AsyncTCP.cpp:347
  #6  0x401a79d5:0x3ffd8860 in AsyncClient::_sockIsWriteable() at lib\AsyncTCPSock-master\src/AsyncTCP.cpp:347
  #7  0x401a6b13:0x3ffd88c0 in _asynctcpsock_task(void*) at lib\AsyncTCPSock-master\src/AsyncTCP.cpp:347
  #8  0x40090842:0x3ffd8910 in vPortTaskWrapper at /home/runner/work/esp32-arduino-lib-builder/esp32-arduino-lib-builder/esp-idf/components/freertos/port.c:355 (discriminator 1)

Rebooting...

There was one socket connection, no refresh, just basic websocket data being sent.

avillacis commented 3 years ago

@zekageri Did you actually replace the code with the version that contains the locking around the destructor? You should check that you are not using a stale version of the library. In any case, your backtrace is not completely useful, since all of the line numbers within AsyncTCP.cpp are identical, which cannot possibly be the case. It is strange; the Linux version of the backtrace decoder does not do this.

zekageri commented 3 years ago

Oh. Silly me. I did not replace it; I thought you had merged this for some reason. Hmm. The decoder I use is PlatformIO's built-in one. I will replace the library soon. Thank you.

zekageri commented 3 years ago

So far it seems to me that this crash occurs when I upload a file to the filesystem and immediately refresh the page.

/home/runner/work/esp32-arduino-lib-builder/esp32-arduino-lib-builder/esp-idf/components/freertos/queue.c:1442 (xQueueGenericReceive)- assert failed!
abort() was called at PC 0x4009056d on core 1

ELF file SHA256: 0000000000000000

Backtrace: 0x4008f5c4:0x3ffd8440 0x4008f83d:0x3ffd8460 0x4009056d:0x3ffd8480 0x401a7270:0x3ffd84c0 0x401a73d4:0x3ffd84e0 0x401a7479:0x3ffd8510 0x401a7f21:0x3ffd8540 0x401a705f:0x3ffd85a0 0x40090842:0x3ffd85f0
  #0  0x4008f5c4:0x3ffd8440 in invoke_abort at /home/runner/work/esp32-arduino-lib-builder/esp32-arduino-lib-builder/esp-idf/components/esp32/panic.c:715
  #1  0x4008f83d:0x3ffd8460 in abort at /home/runner/work/esp32-arduino-lib-builder/esp32-arduino-lib-builder/esp-idf/components/esp32/panic.c:715
  #2  0x4009056d:0x3ffd8480 in xQueueGenericReceive at /home/runner/work/esp32-arduino-lib-builder/esp32-arduino-lib-builder/esp-idf/components/freertos/queue.c:2038
  #3  0x401a7270:0x3ffd84c0 in AsyncClient::_clearWriteQueue() at lib\AsyncTCPSock-master\src/AsyncTCP.cpp:347
  #4  0x401a73d4:0x3ffd84e0 in AsyncClient::_error(signed char) at lib\AsyncTCPSock-master\src/AsyncTCP.cpp:347
  #5  0x401a7479:0x3ffd8510 in AsyncClient::_notifyWrittenBuffers(std::deque<AsyncClient::notify_writebuf, std::allocator<AsyncClient::notify_writebuf> >&, int) at lib\AsyncTCPSock-master\src/AsyncTCP.cpp:347
  #6  0x401a7f21:0x3ffd8540 in AsyncClient::_sockIsWriteable() at lib\AsyncTCPSock-master\src/AsyncTCP.cpp:347
  #7  0x401a705f:0x3ffd85a0 in _asynctcpsock_task(void*) at lib\AsyncTCPSock-master\src/AsyncTCP.cpp:347
  #8  0x40090842:0x3ffd85f0 in vPortTaskWrapper at /home/runner/work/esp32-arduino-lib-builder/esp32-arduino-lib-builder/esp-idf/components/freertos/port.c:355 (discriminator 1)

Rebooting...

Same error message. It happened a few times over several days, but it definitely needs some user interaction.

zekageri commented 3 years ago

I can reproduce it if two clients try to reach the web page simultaneously (with the changes you made in this commit).

zekageri commented 2 years ago

/home/runner/work/esp32-arduino-lib-builder/esp32-arduino-lib-builder/esp-idf/components/freertos/queue.c:1442 (xQueueGenericReceive)- assert failed!
abort() was called at PC 0x4008f2dd on core 1     

ELF file SHA256: 0000000000000000

Backtrace: 0x4008e334:0x3ffd9390 0x4008e5ad:0x3ffd93b0 0x4008f2dd:0x3ffd93d0 0x4016de00:0x3ffd9410 0x4016df64:0x3ffd9430 0x4016e009:0x3ffd9460 0x4016eaa5:0x3ffd9490 0x4016dbef:0x3ffd94f0 0x4008f5b2:0x3ffd9540
  #0  0x4008e334:0x3ffd9390 in invoke_abort at /home/runner/work/esp32-arduino-lib-builder/esp32-arduino-lib-builder/esp-idf/components/esp32/panic.c:715
  #1  0x4008e5ad:0x3ffd93b0 in abort at /home/runner/work/esp32-arduino-lib-builder/esp32-arduino-lib-builder/esp-idf/components/esp32/panic.c:715    
  #2  0x4008f2dd:0x3ffd93d0 in xQueueGenericReceive at /home/runner/work/esp32-arduino-lib-builder/esp32-arduino-lib-builder/esp-idf/components/freertos/queue.c:2038
  #3  0x4016de00:0x3ffd9410 in AsyncClient::_clearWriteQueue() at lib\AsyncTCPSock-master\src/AsyncTCP.cpp:347
  #4  0x4016df64:0x3ffd9430 in AsyncClient::_error(signed char) at lib\AsyncTCPSock-master\src/AsyncTCP.cpp:347
  #5  0x4016e009:0x3ffd9460 in AsyncClient::_notifyWrittenBuffers(std::deque<AsyncClient::notify_writebuf, std::allocator<AsyncClient::notify_writebuf> >&, int) at lib\AsyncTCPSock-master\src/AsyncTCP.cpp:347
  #6  0x4016eaa5:0x3ffd9490 in AsyncClient::_sockIsWriteable() at lib\AsyncTCPSock-master\src/AsyncTCP.cpp:347
  #7  0x4016dbef:0x3ffd94f0 in _asynctcpsock_task(void*) at lib\AsyncTCPSock-master\src/AsyncTCP.cpp:347
  #8  0x4008f5b2:0x3ffd9540 in vPortTaskWrapper at /home/runner/work/esp32-arduino-lib-builder/esp32-arduino-lib-builder/esp-idf/components/freertos/port.c:355 (discriminator 1)

Rebooting...

I can confirm that it crashes if two or more websocket clients try to connect at nearly the same time. Always this error message. Meanwhile, I have 190 KB of free RAM available.

zekageri commented 2 years ago

My socket callback looks like this:

const size_t SOCKET_DATA_SIZE = 65000;
char * socketData;
int currSocketBufferIndex = 0;

void setup(){
    socketData = (char *) ps_malloc (SOCKET_DATA_SIZE * sizeof (char));
}

void onWsEvent(AsyncWebSocket* server, AsyncWebSocketClient* client, AwsEventType type, void* arg, uint8_t* data, size_t len) {
    if (type == WS_EVT_CONNECT) {
        Serial.println("A client connected");
        if( ws.count() >= maxSocketClients ){
            shouldCleanup = true;
        }
    } else if (type == WS_EVT_DISCONNECT) {
        Serial.println("A client disconnected");
    }else if (type == WS_EVT_DATA) {
        AwsFrameInfo * info = (AwsFrameInfo*)arg;
        if(info->opcode == WS_TEXT){
            for (size_t i = 0; i < len; i++){
                socketData[currSocketBufferIndex] = data[i];
                currSocketBufferIndex++;
            }
        }
        if( currSocketBufferIndex >= info->len ){
            canProcessSocketData = true;
            socketClientID = client->id();
        }
    }
}

static const inline void processSocketData(){
    if(canProcessSocketData){
        canProcessSocketData = false;
        //Process socket data inside socketData...
    }
}

void loop(){
    processSocketData();
}

zekageri commented 2 years ago

Any idea?

avillacis commented 2 years ago

Sorry, I have been busy with a project in my day job.

I must ask: is the quoted code above the actual production code used in your project? The code that handles any asynchronous socket event, including any handlers for HTTP responses, websockets, etcetera, runs in a completely different task from the main loop, the one that invokes processSocketData() repeatedly. The one thing that can be somewhat depended on is that each handler must return before the next one (on any socket object) can be invoked.

In your sample code, the variables socketData (as well as any memory pointed to by this pointer) and currSocketBufferIndex are being shared between at least two tasks, as are canProcessSocketData (declaration not shown, but assumed from context to be a bool) and socketClientID (declaration not shown, but assumed to be a uint32_t or similar). There is no evidence of any task synchronization primitive (semaphore, mutex, queue, critical section) being used to arbitrate concurrent access to these variables from both the loop task and the AsyncTCPSock task (the one running the websocket handler). Therefore, this code is ripe for race conditions in which the loop task (presumed to touch socketData and currSocketBufferIndex) starts processing, and possibly updates the index afterwards, at the same time as a second websocket event (not even from a different connection, but a second message from the same one) starts shoveling data using the same pointer and index that are being updated from the loop task. Or, since the code saves the socketClientID, the loop task might prepare a response for one client while the websocket handler changes the value under the other task's feet when receiving a message from a different client. Result: a response sent to the wrong client (the last one that sent a message), or a message attributed to the wrong client, depending on the exact processing done by processSocketData.

At the very least, uncoordinated access to a memory buffer can and will result in random memory corruption.

Not to mention the apparent buffer overflow that might happen if, even without the previously-explained data races, the websocket handler manages to fill up the socketData buffer beyond SOCKET_DATA_SIZE because the handler was invoked enough times (from one or more clients), before the loop task has had a chance to run. This is likely to happen, since the AsyncTCPSock task has a higher default priority than the loop task, and if both are runnable, the AsyncTCPSock task will be scheduled first.
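A minimal sketch of the missing bounds check, assuming the same fixed-size buffer as in the snippet above; appendFragment() is a hypothetical helper. It only prevents the overflow. The cross-task race described earlier still needs a lock around every access to the buffer and its index.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>

// Appends one websocket fragment to a fixed-size buffer and keeps it
// '\0'-terminated. Returns false and writes nothing if the fragment (plus
// the terminator) would not fit in the remaining space.
bool appendFragment(char* buffer, std::size_t capacity, std::size_t& used,
                    const std::uint8_t* data, std::size_t len) {
    if (used >= capacity || len > capacity - used - 1) {
        return false;  // would overflow: the caller should drop or reject the message
    }
    std::memcpy(buffer + used, data, len);
    used += len;
    buffer[used] = '\0';
    return true;
}
```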

Unless and until this code is encapsulated and wrapped around proper task synchronization primitives, I must assume that any corruption you see (including corruption to the AsyncTCPSock structures) is merely a result of this uncoordinated access between two different tasks.

zekageri commented 2 years ago

Oh god. Thanks for your time. You must be absolutely right; I never thought about that. I will implement proper task synchronization in these functions. But if I implement a semaphore so that only one task at a time is handled, doesn't that mean that one client will connect successfully while the other fails, because the semaphore won't let the task process the next client? And if I implement a queue and start putting the long socket packets into a char array while another client sends its own packet, the char array will fill up with both clients' packets mixed together. I need some kind of ring buffer.

Really appreciate your effort to answer. Thanks again.
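One possible shape for that ring buffer, sketched in standard C++ with std::mutex in place of a FreeRTOS mutex: each slot holds one complete message plus the ID of the client that sent it, so packets from two clients can never mix inside one array. MessageRing, kSlots and kMaxMessage are hypothetical names and sizes. The lock is held only while a slot is copied in or out, so a second client waits briefly instead of failing. On the ESP32, a FreeRTOS queue of pointers to per-message buffers would be a common alternative.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <mutex>

// Fixed-capacity ring of complete messages, each tagged with its sender.
// Producer: the websocket handler, once a whole message has been assembled.
// Consumer: the loop task. The lock is held only while copying one slot.
class MessageRing {
public:
    static constexpr std::size_t kSlots = 4;         // hypothetical depth
    static constexpr std::size_t kMaxMessage = 256;  // hypothetical max size

    // Returns false if the message is too large or the ring is full.
    bool push(std::uint32_t clientId, const char* msg, std::size_t len) {
        if (len >= kMaxMessage) return false;  // leave room for '\0'
        std::lock_guard<std::mutex> lock(mutex_);
        if (count_ == kSlots) return false;    // full: caller may retry later
        Slot& s = slots_[(head_ + count_) % kSlots];
        s.clientId = clientId;
        s.length = len;
        std::memcpy(s.text, msg, len);
        s.text[len] = '\0';
        ++count_;
        return true;
    }

    // Copies the oldest message into `out`; returns false if the ring is
    // empty or `out` is too small (in which case nothing is removed).
    bool pop(std::uint32_t& clientId, char* out, std::size_t outCapacity) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (count_ == 0) return false;
        const Slot& s = slots_[head_];
        if (s.length >= outCapacity) return false;
        std::memcpy(out, s.text, s.length + 1);  // includes the '\0'
        clientId = s.clientId;
        head_ = (head_ + 1) % kSlots;
        --count_;
        return true;
    }

private:
    struct Slot {
        std::uint32_t clientId = 0;
        std::size_t length = 0;
        char text[kMaxMessage] = {};
    };
    Slot slots_[kSlots];
    std::size_t head_ = 0;   // index of the oldest message
    std::size_t count_ = 0;  // number of stored messages
    std::mutex mutex_;
};
```

The ring reserves kSlots × kMaxMessage bytes up front, which matters when messages are large.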

zekageri commented 2 years ago

Or I could create a new class instance for every client on connect and handle each client's messages in its own instance. But that would eat up all my RAM quickly.

zekageri commented 2 years ago

I'm thinking about something like this:

static const inline void processSocketClientMessage(JsonObject packet,int clientID);

#define CLIENT_STRUCT_SIZE 180000
#define CLIENT_MSG_SIZE 150000
#define MAX_SOCKET_CLIENT 10

typedef struct {
    int messageIndex;
    uint32_t clientID;
    char* clientMessage;
    boolean canProcessData = false;
} clientStruct;
clientStruct * socketClients[MAX_SOCKET_CLIENT] EXT_RAM_ATTR;

static const inline void removeClientFromStruct(uint32_t clientID){
    for (size_t i = 0; i < MAX_SOCKET_CLIENT; i++){
        if( socketClients[i] != 0 ){
            if( socketClients[i]->clientID == clientID ){
                free(socketClients[i]->clientMessage);
                free(socketClients[i]);
                socketClients[i] = 0;
                break;
            }
        }
    }
}

static const inline byte addClientIfNotExists(uint32_t clientID){
    int availableIndex = -1;
    boolean foundClient = false;
    for(byte structIndex = 0; structIndex < MAX_SOCKET_CLIENT; structIndex++){
        if( socketClients[structIndex] != 0 && socketClients[structIndex] != NULL ){
            if( socketClients[structIndex]->clientID == clientID ){
                availableIndex  = structIndex;
                foundClient     = true;
                break;
            }
        }else{
            availableIndex = structIndex;
        }
    }

    if( availableIndex != -1 && !foundClient ){
        socketClients[availableIndex]                   = new clientStruct; //(clientStruct *) ps_malloc (CLIENT_STRUCT_SIZE * sizeof (clientStruct));
        socketClients[availableIndex]->clientMessage    = (char *) ps_malloc (CLIENT_MSG_SIZE * sizeof (char));
        socketClients[availableIndex]->messageIndex     = 0;
        socketClients[availableIndex]->clientID         = clientID;
        socketClients[availableIndex]->canProcessData   = false;
    }
    return availableIndex;
}

static const inline void addMessageToClientStruct(uint8_t * data, size_t len, uint32_t clientID, int totalMessageLength ){
    byte clientIndex = addClientIfNotExists(clientID);
    if( clientIndex != -1 ){
        Serial.printf("Found client index: %d\n",clientIndex);
        for (size_t i = 0; i < len; i++){
            socketClients[clientIndex]->clientMessage[i] = data[i];
            socketClients[clientIndex]->messageIndex++;
        }

        if( socketClients[clientIndex]->messageIndex >= totalMessageLength ){
            socketClients[clientIndex]->clientMessage[ socketClients[clientIndex]->messageIndex ] = '\0';
            socketClients[clientIndex]->canProcessData = true;
        }
    }
}

byte globalClientIndex = 0;
static const inline void processSocketDataLoop(){
    if( socketClients[globalClientIndex] != 0 && socketClients[globalClientIndex] != NULL && socketClients[globalClientIndex]->canProcessData ){
        SpiRamJsonDocument doc(CLIENT_MSG_SIZE);
        DeserializationError error  =  deserializeJson(doc, socketClients[globalClientIndex]->clientMessage);
        socketClients[globalClientIndex]->canProcessData  = false;
        socketClients[globalClientIndex]->messageIndex    = 0;
        if( !error ){
            JsonObject packet = doc.as<JsonObject>();
            processSocketClientMessage(packet, socketClients[globalClientIndex]->clientID);
        }else{
            // For some reason a strange client ID got printed on every loop, but none of the others: 15204602
            Serial.printf("%d Client message deserialization error: %s\n", socketClients[globalClientIndex]->clientID, error.c_str());
        }
    }
    globalClientIndex++;
    if(globalClientIndex > MAX_SOCKET_CLIENT){globalClientIndex = 0;}
}

void onWsEvent(AsyncWebSocket* server, AsyncWebSocketClient* client, AwsEventType type, void* arg, uint8_t* data, size_t len) {
    if (type == WS_EVT_CONNECT) {
        latestSocketCLientID = client->id();
    } else if (type == WS_EVT_DISCONNECT) {
        latestSocketCLientID = client->id();
        removeClientFromStruct(client->id());
    }else if (type == WS_EVT_DATA) {
        AwsFrameInfo * info = (AwsFrameInfo*)arg;
        if(info->opcode == WS_TEXT){
            addMessageToClientStruct(data, len, client->id(), info->len);
        }
    }
}

static const inline void processSocketClientMessage(JsonObject packet,int clientID){
    if( packet["type"] == "example" ){
        processExamplePacket(packet);
    }
}

void serverTask(void* parameter) {
    // Set every index to 0 in the client array
    memset(socketClients, 0, sizeof(socketClients));
    ESP_SERVER hshServer;
    hshServer.setup();
    for ever {
        if( !firmwareIsUpdating ){
            processSocketDataLoop();
            vTaskDelay(1);
        }else{
            vTaskDelay(100);
        }
    }
}

Is this good enough?

Thank you for your response.

zekageri commented 2 years ago

Okay, so far it is working with a few tweaks. I tried to allocate the structs in the array in PSRAM, but unfortunately it does not work for some reason. So I'm sticking with new: I allocate all my structs, and the messages inside them, at task start and manage connections and disconnections with an isEmpty flag. I will test this code and get back here.

Here is the code in its raw form now:

#define CLIENT_STRUCT_SIZE 180000
#define CLIENT_MSG_SIZE 150000
#define MAX_SOCKET_CLIENT 10

static const inline void createClientSpaces();
static const inline void removeClientFromStruct(uint32_t clientID);
static const inline byte addClientIfNotExists(uint32_t clientID);
static const inline void addMessageToClientStruct(uint8_t * data, size_t len, uint32_t clientID, int totalMessageLength );
static const inline void processSocketDataLoop();

typedef struct {
    int messageIndex        = -1;
    int clientID            = -1;
    char* clientMessage;
    boolean isEmpty         = true;
    boolean canProcessData  = false;
} clientStruct;
clientStruct * socketClients[MAX_SOCKET_CLIENT] EXT_RAM_ATTR;

void serverTask(void* parameter) {
    // Here I allocate all the structs in the array.
    createClientSpaces();
    // Starting the server
    ESP_SERVER hshServer;
    hshServer.setup();
    for ever {
        if( !firmwareIsUpdating ){
            processSocketDataLoop();
            vTaskDelay(1);
        }else{
            vTaskDelay(100);
        }
    }
}

// Allocating memory for client structs
static const inline void createClientSpaces(){
    for( size_t i = 0; i < MAX_SOCKET_CLIENT; i++){
        socketClients[i]                   = new clientStruct;
        socketClients[i]->clientMessage    = (char *) ps_malloc (CLIENT_MSG_SIZE * sizeof (char));
        socketClients[i]->messageIndex     = 0;
        socketClients[i]->clientID         = -1;
        socketClients[i]->canProcessData   = false;
        socketClients[i]->isEmpty          = true;
    }
}

// Remove a client from the struct on disconnect.
static const inline void removeClientFromStruct(uint32_t clientID){
    for (size_t i = 0; i < MAX_SOCKET_CLIENT; i++){
        if( !socketClients[i]->isEmpty && socketClients[i]->clientID == clientID ){
            Serial.printf("Removing client %d from struct\n",clientID);
            socketClients[i]->clientID          = -1;
            socketClients[i]->messageIndex      = 0;
            socketClients[i]->canProcessData    = false;
            socketClients[i]->isEmpty           = true;
            break;
        }
    }
}

// Add a client to the struct if not exists already, return the client index in the array.
static const inline byte addClientIfNotExists(uint32_t clientID){
    int availableIndex = -1;
    boolean foundClient = false;
    for(byte structIndex = 0; structIndex < MAX_SOCKET_CLIENT; structIndex++){
        if( !socketClients[structIndex]->isEmpty ){
            if( socketClients[structIndex]->clientID == clientID ){
                availableIndex  = structIndex;
                foundClient     = true;
                break;
            }
        }else{
            availableIndex = structIndex;
        }
    }

    if( availableIndex != -1 && !foundClient ){
        socketClients[availableIndex]->messageIndex     = 0;
        socketClients[availableIndex]->clientID         = clientID;
        socketClients[availableIndex]->canProcessData   = false;
        socketClients[availableIndex]->isEmpty          = false;
    }
    return availableIndex;
}

// Add the message to the client in the struct with the same ID.
static const inline void addMessageToClientStruct(uint8_t * data, size_t len, uint32_t clientID, int totalMessageLength ){
    byte clientIndex = addClientIfNotExists(clientID);
    if( clientIndex != -1 ){
        Serial.printf("Found client index: %d\n",clientIndex);
        for (size_t i = 0; i < len; i++){
            socketClients[clientIndex]->clientMessage[i] = data[i];
            socketClients[clientIndex]->messageIndex++;
        }

        if( socketClients[clientIndex]->messageIndex >= totalMessageLength ){
            socketClients[clientIndex]->clientMessage[ socketClients[clientIndex]->messageIndex ] = '\0';
            socketClients[clientIndex]->canProcessData = true;
        }
    }
}

// Check if there is unprocessed data in a struct.
byte globalClientIndex = 0;
static const inline void processSocketDataLoop(){
    if( !socketClients[globalClientIndex]->isEmpty && socketClients[globalClientIndex]->canProcessData ){
        SpiRamJsonDocument doc(CLIENT_MSG_SIZE);
        DeserializationError error  =  deserializeJson(doc, socketClients[globalClientIndex]->clientMessage);
        socketClients[globalClientIndex]->canProcessData  = false;
        socketClients[globalClientIndex]->messageIndex    = 0;
        if( !error ){
            JsonObject packet = doc.as<JsonObject>();
            processSocketClientMessage(packet, socketClients[globalClientIndex]->clientID);
        }else{
            Serial.printf("%d Client message deserialization error: %s\n", socketClients[globalClientIndex]->clientID, error.c_str());
        }
    }
    globalClientIndex++;
    if(globalClientIndex >= MAX_SOCKET_CLIENT){globalClientIndex = 0;}
}

// Process the message from the clients one by one.
static const inline void processSocketClientMessage(JsonObject packet,int clientID){
    if( packet["type"] == "examplePacket" ){
      // process packet...
    }
}

// Socket callback
void onWsEvent(AsyncWebSocket* server, AsyncWebSocketClient* client, AwsEventType type, void* arg, uint8_t* data, size_t len) {
    if (type == WS_EVT_CONNECT) {
        latestSocketCLientID = client->id();
        clientID_Sent = false;
    } else if (type == WS_EVT_DISCONNECT) {
        latestSocketCLientID = client->id();
        removeClientFromStruct(client->id());
    }else if (type == WS_EVT_DATA) {
        AwsFrameInfo * info = (AwsFrameInfo*)arg;
        if(info->opcode == WS_TEXT){
            addMessageToClientStruct(data, len, client->id(), info->len);
        }
    }
}
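Independently of the locking question, the append logic above has two bugs worth isolating. First, addClientIfNotExists() returns byte, so its -1 "not found" value becomes 255 and the check clientIndex != -1 is always true. Second, each fragment is copied to clientMessage[i], i.e. back to the start of the buffer, instead of at messageIndex, so a message delivered over several WS_EVT_DATA calls overwrites itself; the terminating '\0' is also written without a bounds check. The sketch below uses an int index, writes at the current offset, and checks the bounds. ClientSlot, findOrClaimSlot() and appendToSlot() are hypothetical names, and because the slots are still shared with the loop task, they still need a mutex.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>

// Hypothetical per-client reassembly slot, mirroring clientStruct above.
struct ClientSlot {
    std::int64_t clientID = -1;     // -1 marks an empty slot; wide enough for any uint32_t ID
    std::size_t messageIndex = 0;   // bytes received so far
    bool canProcessData = false;
    char* clientMessage = nullptr;  // preallocated buffer of `capacity` bytes
    std::size_t capacity = 0;
};

// Returns the slot index for clientID, claiming the first free slot if the
// client is new, or -1 if every slot is taken. The int return type matters:
// a byte cannot represent -1.
int findOrClaimSlot(ClientSlot* slots, int count, std::uint32_t clientID) {
    int freeIndex = -1;
    for (int i = 0; i < count; ++i) {
        if (slots[i].clientID == static_cast<std::int64_t>(clientID)) return i;
        if (slots[i].clientID == -1 && freeIndex == -1) freeIndex = i;
    }
    if (freeIndex != -1) {
        slots[freeIndex].clientID = clientID;
        slots[freeIndex].messageIndex = 0;
        slots[freeIndex].canProcessData = false;
    }
    return freeIndex;
}

// Appends one fragment at the current write offset (not at offset 0) and
// refuses fragments that would overflow the buffer or its '\0' terminator.
bool appendToSlot(ClientSlot& s, const std::uint8_t* data, std::size_t len,
                  std::size_t totalMessageLength) {
    if (s.messageIndex + len + 1 > s.capacity) return false;
    std::memcpy(s.clientMessage + s.messageIndex, data, len);
    s.messageIndex += len;
    if (s.messageIndex >= totalMessageLength) {
        s.clientMessage[s.messageIndex] = '\0';
        s.canProcessData = true;
    }
    return true;
}
```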

avillacis commented 2 years ago

> Oh god. Thanks for your time. You must be absolutely right; I never thought about that. I will implement proper task synchronization in these functions. But if I implement a semaphore so that only one task at a time is handled, doesn't that mean that one client will connect successfully while the other fails, because the semaphore won't let the task process the next client? And if I implement a queue and start putting the long socket packets into a char array while another client sends its own packet, the char array will fill up with both clients' packets mixed together. I need some kind of ring buffer.
>
> Really appreciate your effort to answer. Thanks again.

What I do normally in this situation is something like this:

  1. Encapsulate all data involving the shared access between tasks in a struct or class definition. This class definition has a mutex handle that is initialized to a valid mutex object on startup.
  2. The network handler (HTTP request, SSE event, websocket event, whatever) needs access to this shared data. So, take the mutex right before the start of the known shared access, do whatever manipulations are necessary, then release the mutex right before the network handler terminates. If using a class instance, the class methods take and release the mutex within the class method call.
  3. The main loop task (or any other task) needs the same shared access for a different step of the processing. So do the same: take mutex, do whatever, release mutex. If all manipulations are done within class methods that take care of taking and releasing the mutex, this becomes easier to manage.

What should not be done, unless you really know what you are doing, is to take the mutex and leave it in the "taken" state while the method terminates. All taking and releasing of mutexes should be balanced and (preferably) done in the same method. This should take care of allowing all clients to be serviced without data races.
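The three steps can be sketched as a small class. This is a hypothetical SharedInbox written with std::mutex; on the ESP32 the FreeRTOS equivalents would be xSemaphoreCreateMutex(), xSemaphoreTake() and xSemaphoreGive(). Each method takes the lock through std::lock_guard, which releases it on every return path, so taking and releasing are always balanced within one method. The message and its sender ID are handed over in a single locked step, and any slow processing happens after take() returns, outside the lock.

```cpp
#include <cstdint>
#include <mutex>
#include <string>
#include <utility>

// Shared state between the network task (producer) and the loop task
// (consumer). All access goes through methods that lock internally.
class SharedInbox {
public:
    // Called from the websocket handler. Refuses a new message while the
    // previous one has not been consumed, instead of overwriting it.
    bool deliver(std::uint32_t clientId, std::string message) {
        std::lock_guard<std::mutex> lock(mutex_);  // released on every return path
        if (pending_) return false;
        clientId_ = clientId;
        message_ = std::move(message);
        pending_ = true;
        return true;
    }

    // Called from the loop task. Hands over the message and its sender in
    // one locked step, so the ID cannot change between the two reads.
    bool take(std::uint32_t& clientId, std::string& message) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (!pending_) return false;
        clientId = clientId_;
        message = std::move(message_);
        message_.clear();
        pending_ = false;
        return true;
    }

private:
    std::mutex mutex_;
    bool pending_ = false;
    std::uint32_t clientId_ = 0;
    std::string message_;
};
```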

avillacis commented 2 years ago

> Okay, so far it is working with a few tweaks. I tried to allocate the structs in the array in PSRAM, but unfortunately it does not work for some reason. So I'm sticking with new: I allocate all my structs, and the messages inside them, at task start and manage connections and disconnections with an isEmpty flag. I will test this code and get back here.
>
> Here is the code in its raw form now:

Without knowing the high-level purpose of your project, I cannot be really sure. But do you actually need to process 150 KB per client message? And even if so, do you actually need to copy the message to a new buffer and then enqueue these humongous messages to be processed by a different task? Why not just deserialize the message in place, right inside the websocket handler (avoiding the copy completely, and with it the need to ps_malloc the buffer copy), and dispatch whatever the message means right there? If your processing allows this, the entire need to synchronize with the other task disappears: the data is no longer shared, but handled exclusively by the AsyncTCPSock task that invokes the handler.

Additionally, ArduinoJson supports zero-copy processing of JSON data. If your source buffer is a uint8_t* and not a const uint8_t*, the ArduinoJson library can modify the buffer in place to avoid copying strings into the SpiRamJsonDocument doc(CLIENT_MSG_SIZE) you are reserving. This can also reduce memory requirements considerably. If the JSON document is something like an array of items, all the same size, you could even deserialize it a chunk at a time, reducing the RAM needed even more. See here for details.
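The zero-copy idea can be illustrated without ArduinoJson: when the input buffer is writable, a parser can null-terminate each string in place and keep pointers into the input instead of allocating copies. The sketch applies that idea to a hypothetical key=value;key=value format. It shows the technique only; it is not ArduinoJson's implementation or API.

```cpp
#include <cstddef>
#include <cstring>

struct Field {
    const char* key;    // points into the caller's buffer
    const char* value;  // points into the caller's buffer
};

// Splits "k1=v1;k2=v2" in place: '=' and ';' are overwritten with '\0' and
// the returned fields point into `input`, so no string is copied.
// Returns the number of fields stored (at most maxFields).
std::size_t parseInPlace(char* input, Field* fields, std::size_t maxFields) {
    std::size_t n = 0;
    char* p = input;
    while (*p != '\0' && n < maxFields) {
        char* key = p;
        char* eq = std::strchr(p, '=');
        if (eq == nullptr) break;  // malformed tail: stop parsing
        *eq = '\0';
        char* value = eq + 1;
        char* semi = std::strchr(value, ';');
        if (semi != nullptr) {
            *semi = '\0';
            p = semi + 1;
        } else {
            p = value + std::strlen(value);  // reached the end of the input
        }
        fields[n++] = Field{key, value};
    }
    return n;
}
```

The returned pointers are only valid while the original buffer stays alive and unmodified, which is the same constraint zero-copy deserialization places on its input.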

zekageri commented 2 years ago

Thank you for your wise words. Unfortunately, a message can be as small as around 100 bytes, or it can grow much, much larger. All socket messages are JSON-formatted and come from a JS client. The big JSON document that I am copying all the time is a user-made set of instructions, like a really simple program that users can put together for themselves, and it can grow really large: it contains multiple sets of graph data with Unix timestamps and things like that. The reason for it to be global is that multiple tasks can take from and put into this buffer, which avoids regular flash writes. Each modification starts (or restarts) a 20-second timer that saves this user program to flash, to avoid writing on every modification. On startup, this JSON is copied into this same global buffer. Every task runs on the same core, which prevents this buffer from being modified by two tasks at the same time.
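The save policy described here (each modification restarts a 20-second timer, and the program is written to flash only when the timer expires) is a debounce. Below is a sketch with the current time passed in as milliseconds, so it can be tested off-device; SaveDebouncer is a hypothetical name, and on the ESP32 the time could come from millis(). A FreeRTOS software timer restarted with xTimerReset() is another way to get the same behaviour. If markModified() is called from more than one task, the debouncer's members are shared state too and need the same locking as the buffer.

```cpp
#include <cstdint>

// Debounced "save to flash": every modification restarts the delay, and
// shouldSave() reports true once, after the data has been quiet for delayMs.
class SaveDebouncer {
public:
    explicit SaveDebouncer(std::uint32_t delayMs) : delayMs_(delayMs) {}

    void markModified(std::uint32_t nowMs) {
        lastChangeMs_ = nowMs;
        dirty_ = true;
    }

    // Call periodically from one task; returns true when a save is due.
    bool shouldSave(std::uint32_t nowMs) {
        // Unsigned subtraction stays correct when the millisecond counter wraps.
        if (dirty_ && nowMs - lastChangeMs_ >= delayMs_) {
            dirty_ = false;
            return true;
        }
        return false;
    }

private:
    std::uint32_t delayMs_;
    std::uint32_t lastChangeMs_ = 0;
    bool dirty_ = false;
};
```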

So do you think I will still need a semaphore even if there is a separate buffer defined for each client? A client can only write to its own buffer one message at a time, can't it? But I will look into this more carefully and report back.

Again, thanks for the clarification and your help. I'm really glad I know you exist.

zekageri commented 2 years ago

One question if you don't mind.

If a task takes a semaphore to modify this big buffer, and meanwhile the socket callback wants to modify the same buffer and tries to take the same semaphore, the RTOS will not give the semaphore to the callback until the task is done modifying the buffer. So will the socket callback skip this chunk and continue, leaving the new data behind, or will it wait for the semaphore to be released?

zekageri commented 2 years ago

Ahh, shoot. I just put semaphores around the things that other tasks use, and I got this error on page reload agaaain.

/home/runner/work/esp32-arduino-lib-builder/esp32-arduino-lib-builder/esp-idf/components/freertos/queue.c:1442 (xQueueGenericReceive)- assert failed!
abort() was called at PC 0x40090431 on core 0

ELF file SHA256: 0000000000000000

Backtrace: 0x4008f488:0x3ffdbd00 0x4008f701:0x3ffdbd20 0x40090431:0x3ffdbd40 0x4017a8b4:0x3ffdbd80 0x4017aa18:0x3ffdbda0 0x4017aabd:0x3ffdbdd0 0x4017b559:0x3ffdbe00 0x4017a6a3:0x3ffdbe60 0x40090706:0x3ffdbeb0
  #0  0x4008f488:0x3ffdbd00 in invoke_abort at /home/runner/work/esp32-arduino-lib-builder/esp32-arduino-lib-builder/esp-idf/components/esp32/panic.c:715
  #1  0x4008f701:0x3ffdbd20 in abort at /home/runner/work/esp32-arduino-lib-builder/esp32-arduino-lib-builder/esp-idf/components/esp32/panic.c:715
  #2  0x40090431:0x3ffdbd40 in xQueueGenericReceive at /home/runner/work/esp32-arduino-lib-builder/esp32-arduino-lib-builder/esp-idf/components/freertos/queue.c:2038
  #3  0x4017a8b4:0x3ffdbd80 in AsyncClient::_clearWriteQueue() at lib\AsyncTCPSock\src/AsyncTCP.cpp:347
  #4  0x4017aa18:0x3ffdbda0 in AsyncClient::_error(signed char) at lib\AsyncTCPSock\src/AsyncTCP.cpp:347
  #5  0x4017aabd:0x3ffdbdd0 in AsyncClient::_notifyWrittenBuffers(std::deque<AsyncClient::notify_writebuf, std::allocator<AsyncClient::notify_writebuf> >&, int) at lib\AsyncTCPSock\src/AsyncTCP.cpp:347
  #6  0x4017b559:0x3ffdbe00 in AsyncClient::_sockIsWriteable() at lib\AsyncTCPSock\src/AsyncTCP.cpp:347
  #7  0x4017a6a3:0x3ffdbe60 in _asynctcpsock_task(void*) at lib\AsyncTCPSock\src/AsyncTCP.cpp:347
  #8  0x40090706:0x3ffdbeb0 in vPortTaskWrapper at /home/runner/work/esp32-arduino-lib-builder/esp32-arduino-lib-builder/esp-idf/components/freertos/port.c:355 (discriminator 1)

Rebooting...

There is no way that it wanted to modify the same variable again. :|