open-wa / wa-automate-nodejs

💬 🤖 The most reliable tool for chatbots with advanced features. Be sure to 🌟 this repository for updates!
https://docs.openwa.dev/
Other
3.11k stars 588 forks source link

Queue #1592

Closed edneijunior closed 3 years ago

edneijunior commented 3 years ago

Describe the bug: It is not a bug, but a cry for help!

create() code

//w2api - Version 0.0.2
/*
 * Replaces Array.prototype.find with a version built on findIndex:
 * returns the element at the first index accepted by findIndex,
 * or undefined when findIndex reports no match (-1).
 */
Array.prototype.find = function (...args) {
    const position = this.findIndex(...args);
    return position < 0 ? undefined : this[position];
};

// The open-wa library is published on `global` so the handlers below can call
// openWA.create / openWA.decryptMedia / openWA.ev without a local binding.
global.openWA = require('@open-wa/wa-automate');
const fs = require('fs');
const async = require("async");
const request = require('request');
const moment = require('moment');
const mime = require('mime-types');
const PQueue  = require("p-queue");
// NOTE(review): `queue` is not referenced anywhere else in the visible code.
// NOTE(review): depending on the installed p-queue version the constructor may
// be exposed as `require("p-queue").default` — confirm against package.json.
const queue = new PQueue({ concurrency: 2, timeout: 1000 });
// User-agent string handed to openWA.decryptMedia() as its second argument.
global.uaOverride = 'WhatsApp/2.16.352 Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_1) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Safari/605.1.15';

// Placeholder; reassigned to a WHATS_API instance inside the 'ready' handler.
global.WA_CLIENT = {};

/*
 * Reads the file at `file` synchronously and returns its whole content
 * as a base64-encoded string. Throws whatever fs.readFileSync throws
 * (e.g. when the file does not exist).
 */
function base64Encode(file) {
    return fs.readFileSync(file, {encoding: 'base64'});
}

/*
 * WhatsApp API SUPER CLASS
 * Personal regards to:
 * Mohhamed Shah (openWA) - 
 * Peter Sírka (TotalJS) - 
 * This library was built using openWA and pieces of 
 */
/*
 * Constructor: logs a creation banner for USER_ID and initialises the
 * instance state (empty QR code, webhook and token, no connection yet).
 * Must be called with `new`.
 */
function WHATS_API(USER_ID) {
    [
        "\n====================================================",
        "@@Creating WhatsApp connection for: " + USER_ID,
        "====================================================\n"
    ].forEach(function (line) {
        console.log(line);
    });

    Object.assign(this, {
        QR_CODE: "",
        WEBHOOK: "",
        TOKEN: "",
        INSTANCE: USER_ID,
        CONNECTION: {}
    });
}

/*
 * Builds the JSON body POSTed to the webhook for an ack event.
 * Edit with care: the webhook consumer depends on this shape.
 * ACK EVENTS (value forwarded as `status`):
 * 1 - sent
 * 2 - delivered
 * 3 - viewed
 * 4 - listened
 *
 * Returns a JSON string: {"ack":[{id, chatId, status, body}],"instanceId":...}
 * NOTE(review): chatId is filled from data.id, the same field used for id
 * (id is its string form) - confirm this is intended.
 */
var SANITIZE_ACK = function (instanceID, data) {
    var entry = {
        id: data.id.toString(),
        chatId: data.id,
        status: data.ack,
        body: data.body
    };
    return JSON.stringify({ack: [entry], instanceId: instanceID});
};

/*
 * Builds the JSON body POSTed to the webhook for an incoming message.
 * Edit with care: the webhook consumer depends on this shape.
 *
 * filelink, caption and quotedMsgBody fall back to null when the source
 * field is falsy; quotedMsgBody carries the whole data.quotedMsgObj object.
 * data.chat and data.sender must be objects, otherwise a TypeError is thrown.
 *
 * Returns a JSON string: {"messages":[{...}],"instanceId":...}
 */
var SANITIZE_MSG = function (instanceID, data) {
    var entry = {
        id: data.id,
        body: data.body,
        filelink: data.filelink || null,
        fromMe: data.fromMe,
        self: data.self,
        isForwarded: data.isForwarded,
        author: data.from,
        time: data.t,
        lat: data.lat,
        lng: data.lng,
        ack: data.ack,
        chatId: data.chat.id,
        type: data.type,
        senderName: data.sender.pushname,
        caption: data.caption || null,
        quotedMsgBody: data.quotedMsgObj || null,
        chatName: data.sender.formattedName
    };
    return JSON.stringify({messages: [entry], instanceId: instanceID});
};

/*
 * Forwards an incoming message to the webhook.
 * The message is serialised with SANITIZE_MSG and POSTed as JSON to
 * this.WEBHOOK. A transport error or any status code other than 200 is
 * reported through ERROR_CATCHER; on success the payload is logged.
 * Edit with care: the webhook consumer depends on this flow.
 */
WHATS_API.prototype.PROCESS_MESSAGE = function (data) {
    var self = this;
    var payload = SANITIZE_MSG(self.INSTANCE, data);
    var options = {
        method: 'POST',
        url: self.WEBHOOK,
        headers: {'Content-Type': 'application/json'},
        body: payload
    };

    request(options, function (err, response, body) {
        if (err) {
            ERROR_CATCHER(err);
            return;
        }
        if (response.statusCode != 200) {
            ERROR_CATCHER("Status Code error: " + response.statusCode, response);
            return;
        }
        console.log(payload);
    });
};

/*
 * Forwards an ack event to the webhook.
 * The event is serialised with SANITIZE_ACK and POSTed as JSON to
 * this.WEBHOOK. A transport error or any status code other than 200 is
 * reported through ERROR_CATCHER; on success the payload is logged.
 * Edit with care: the webhook consumer depends on this flow.
 */
WHATS_API.prototype.PROCESS_ACK = function (data) {
    var self = this;
    var payload = SANITIZE_ACK(self.INSTANCE, data);
    var options = {
        method: 'POST',
        url: self.WEBHOOK,
        headers: {'Content-Type': 'application/json'},
        body: payload
    };

    request(options, function (err, response, body) {
        if (err) {
            ERROR_CATCHER(err);
            return;
        }
        if (response.statusCode != 200) {
            ERROR_CATCHER("Status Code WRONG: " + response.statusCode, response);
            return;
        }
        console.log(payload);
    });
};

/*
 * TODO: raise webhook events when something goes wrong with the API.
 * For now a state change is only written to the console.
 * If you have any knowledge about this, help is welcome.
 */
WHATS_API.prototype.PROCESS_STATE = function (data) {
    var label = "[STATE CHANGED] -";
    console.log(label, data);
};

/*
 * Binds an openWA client to this instance and registers its event listeners.
 *
 * CLIENT        - client object resolved by openWA.create()
 * WEBHOOK_INPUT - URL stored as this.WEBHOOK (target of the webhook POSTs)
 * TOKEN_INPUT   - value stored as this.TOKEN
 *
 * Message handling:
 *  - If the quoted message carries media (quotedMsgObj.mimetype), that media
 *    is processed; otherwise media on the message itself (message.mimetype).
 *  - Media is decrypted, written to the CDN directory as "<message.t>.<ext>",
 *    its `body` is replaced by a data URI and `filelink` is set, then the
 *    message is forwarded with PROCESS_MESSAGE.
 *  - If writing the file fails, the original body is wrapped in the data URI
 *    instead; filelink is still set, as before.
 *  - If decryption fails, the error is logged and the message is forwarded
 *    unchanged instead of being dropped with an unhandled rejection.
 *  - Messages without media are forwarded as they are.
 */
WHATS_API.prototype.SETUP = function (CLIENT, WEBHOOK_INPUT, TOKEN_INPUT) {
    var that = this;
    var CDN_DIR = '/var/www/w2api/public/cdn/';

    that.WEBHOOK = WEBHOOK_INPUT;
    that.TOKEN = TOKEN_INPUT;
    that.CONNECTION = CLIENT;

    CLIENT.onMessage(message => {
        // Pick the object that carries the media; quoted media wins.
        var quoted = message.quotedMsgObj;
        var media = null;
        if (quoted && quoted.mimetype) {
            media = quoted;
        } else if (message.mimetype) {
            media = message;
        }

        if (!media) {
            that.PROCESS_MESSAGE(message);
            return;
        }

        openWA.decryptMedia(media, uaOverride).then(function (DECRYPTED_DATA) {
            var filename = `${message.t}.${mime.extension(media.mimetype)}`;
            var content = Buffer.from(DECRYPTED_DATA, 'base64');
            // No encoding argument: fs.writeFile ignores it when data is a Buffer.
            fs.writeFile(CDN_DIR + filename, content, function (err) {
                if (err) {
                    console.log("#Error on saving file");
                    media['body'] = `data:${media.mimetype};base64,${media['body']}`;
                } else {
                    // Encode the bytes already in memory instead of re-reading
                    // the file synchronously; the result is the same string.
                    media['body'] = `data:${media.mimetype};base64,${content.toString('base64')}`;
                }
                media['filelink'] = 'cdn/' + filename;
                that.PROCESS_MESSAGE(message);
            });
        }).catch(function (err) {
            // A failed decrypt must not lose the message or leave the
            // promise rejection unhandled.
            console.log("#Error on decrypting media", err);
            that.PROCESS_MESSAGE(message);
        });
    });
    CLIENT.onAck(ack => {
        that.PROCESS_ACK(ack);
    });
    CLIENT.onStateChanged(state => {
        that.PROCESS_STATE(state);
    });
};
/*
 * Publishes a new QR code: sends it to qrCodeManager when that object is
 * truthy, then stores it on the instance as QR_CODE.
 */
WHATS_API.prototype.SET_QRCODE = function (code) {
    if (qrCodeManager) {
        qrCodeManager.send({qr: code});
    }
    this.QR_CODE = code;
};

module.exports = WHATS_API;

ON('ready', function () {

    /*
     * Build the WhatsApp wrapper and publish it through the WA_CLIENT global.
     * The instance id comes from the `instance` key of the /config file.
     */
    WA_CLIENT = new WHATS_API(F.config['instance']);

    /*
     * Every QR code emitted by openWA is handed to the wrapper,
     * which makes it available on address/qrCode.
     */
    openWA.ev.on('qr.**', (qrcode, sessionId) => {
        WA_CLIENT.SET_QRCODE(qrcode);
    });

    /*
     * Launch options for the headless browser session.
     * Pay attention to the `headless` flag.
     */
    var launchOptions = {
        sessionId: "/whatsSessions/" + F.config['instance'],
        useChrome: false,
        headless: true,
        throwErrorOnTosBlock: true,
        qrTimeout: 0, // 0 = wait forever for a qr scan
        authTimeout: 0, // 0 = wait forever for connection to phone
        autoRefresh: true, // default is true
        safeMode: false,
        disableSpins: true
    };

    /*
     * Create the connection; once the client is available, report it to
     * qrCodeManager (when present) and wire the listeners through SETUP.
     * NOTE(review): there is no rejection handler on this promise.
     */
    openWA.create(launchOptions).then((client) => {
        if (qrCodeManager) {
            qrCodeManager.send({connected: true});
        }
        WA_CLIENT.SETUP(client, F.config['webhook'], F.config['token']);
    });

});

Expected behavior: an implemented queue

smashah commented 3 years ago

hey, @edneijunior I got your email and was working on a long term solution today.

The solution I came up with was to implement an optional queue on listener (onMessage, onAnyMessage, onAck, etc.) callbacks (the function you provide as the main parameter of these functions). This way, you can control your outflow of messages implicitly by controlling the inflow.

Here is an example of the upcoming changes:

image

In the above example, 1 message will be processed every 10 seconds via onAnyMessage whereas onMessage will remain 'real-time'. This is achieved simply by providing PQueue.options as the second parameter when setting these listeners.

Here is the corresponding console output from the above example:

image

This p-queue implementation will be unopinionated, meaning you will HAVE TO provide the p-queue options object as the second parameter in order to use the p-queue at all. This is because those options will be subjective to the desired behaviour of your automation and your desired throughput.

Hopefully, this will satisfy your needs for a p-queue. Please let me know if there are any other considerations.

Thanks

smashah commented 3 years ago

@github-actions run

⚡ Release! ⚡
```js
(async () => {
  function exec(cmd) {
    console.log(execSync(cmd).toString());
  }

  // Config
  const gitUserEmail = "github-actions[bot]@users.noreply.github.com";
  const gitUserName = "github-actions[bot]";

  exec(`echo "//registry.npmjs.org/:_authToken=$NPM_TOKEN" > .npmrc`);
  exec(`git config --global user.email "${gitUserEmail}"`);
  exec(`git config --global user.name "${gitUserName}"`);
  exec(`npm i -D`);
  exec(`npm run release-ci minor`);

  //comment on the issue
  var result = execSync(`npx auto-changelog -o ./tempchangelog.txt --commit-limit false --template ./compact-keepachangelog.hbs --stdout`).toString();
  await postComment(result);

  //create changelog image
  exec(`npm run release-image`);
  exec(`git commit -a -m 'updated release-image'`);
  exec(`git push --force`);
})();
```
smashah commented 3 years ago

Changelog

🚀 Release 3.11.0 (2021-04-17)

smashah commented 3 years ago

@edneijunior if you share your desired pqueue behaviour I can reply with the code that will work in the latest version of the library

lesmo commented 3 years ago

... what about outgoing messages when they're not sent in response to incoming messages? 🤔

smashah commented 3 years ago

@lesmo for those you can implement a PQueue manually

edneijunior commented 3 years ago

Bro, you are awesome. Thank you very much for so much consideration in your response.

edneijunior commented 3 years ago

Sem título

WA Version 3.11.1

/*
 * Queued onMessage listener (second argument = p-queue options).
 * Quoted media takes precedence over media on the message itself.
 * Media is decrypted, stored in the CDN directory, its body replaced by a
 * data URI and filelink set before the message is forwarded.
 * A decrypt failure is logged and the message is forwarded unchanged
 * instead of being dropped with an unhandled rejection.
 */
CLIENT.onMessage(message => {
    const cdnDir = '/var/www/w2api/public/cdn/';
    const quoted = message.quotedMsgObj;

    // Pick the object that carries the media; quoted media wins.
    let media = null;
    if (quoted && quoted.mimetype) {
        media = quoted;
    } else if (message.mimetype) {
        media = message;
    }

    if (!media) {
        that.PROCESS_MESSAGE(message);
        return;
    }

    openWA.decryptMedia(media, uaOverride).then(function (DECRYPTED_DATA) {
        const filename = `${message.t}.${mime.extension(media.mimetype)}`;
        const content = Buffer.from(DECRYPTED_DATA, 'base64');
        // No encoding argument: fs.writeFile ignores it when data is a Buffer.
        fs.writeFile(cdnDir + filename, content, function (err) {
            if (err) {
                console.log("#Error on saving file");
                media['body'] = `data:${media.mimetype};base64,${media['body']}`;
            } else {
                // Encode the in-memory bytes instead of re-reading the file.
                media['body'] = `data:${media.mimetype};base64,${content.toString('base64')}`;
            }
            media['filelink'] = 'cdn/' + filename;
            that.PROCESS_MESSAGE(message);
        });
    }).catch(function (err) {
        console.log("#Error on decrypting media", err);
        that.PROCESS_MESSAGE(message);
    });
}, {
    interval:10000,
    concurrency: 1,
    intervalCap:1
});
edneijunior commented 3 years ago

@edneijunior if you share your desired pqueue behaviour I can reply with the code that will work in the latest version of the library

I would like to send messages following the best practices. I want to send messages without overloading the session.

smashah commented 3 years ago

Sem título

WA Version 3.11.1

/*
 * Queued onMessage listener (second argument = p-queue options).
 * Quoted media takes precedence over media on the message itself.
 * Media is decrypted, stored in the CDN directory, its body replaced by a
 * data URI and filelink set before the message is forwarded.
 * A decrypt failure is logged and the message is forwarded unchanged
 * instead of being dropped with an unhandled rejection.
 */
CLIENT.onMessage(message => {
    const cdnDir = '/var/www/w2api/public/cdn/';
    const quoted = message.quotedMsgObj;

    // Pick the object that carries the media; quoted media wins.
    let media = null;
    if (quoted && quoted.mimetype) {
        media = quoted;
    } else if (message.mimetype) {
        media = message;
    }

    if (!media) {
        that.PROCESS_MESSAGE(message);
        return;
    }

    openWA.decryptMedia(media, uaOverride).then(function (DECRYPTED_DATA) {
        const filename = `${message.t}.${mime.extension(media.mimetype)}`;
        const content = Buffer.from(DECRYPTED_DATA, 'base64');
        // No encoding argument: fs.writeFile ignores it when data is a Buffer.
        fs.writeFile(cdnDir + filename, content, function (err) {
            if (err) {
                console.log("#Error on saving file");
                media['body'] = `data:${media.mimetype};base64,${media['body']}`;
            } else {
                // Encode the in-memory bytes instead of re-reading the file.
                media['body'] = `data:${media.mimetype};base64,${content.toString('base64')}`;
            }
            media['filelink'] = 'cdn/' + filename;
            that.PROCESS_MESSAGE(message);
        });
    }).catch(function (err) {
        console.log("#Error on decrypting media", err);
        that.PROCESS_MESSAGE(message);
    });
}, {
    interval:10000,
    concurrency: 1,
    intervalCap:1
});

can you explain what's going on here and what is your question?

Regarding setting up to follow best practices, you should implement a setup that works for your use case. If you're not getting that many incoming messages then maybe you don't need to implement a queue. Based on community discussions, people get away with high message throughput without any issues.

The example provided is a bit extreme (1 message every 10 seconds) so I'll give you a starting point that should be safe but also not cause a huge backlog of queued messages:

  1. Every 5 seconds, process at most 10 messages, process 2 messages at a time.

    {
    interval: 5000,
    intervalCap: 10,
    concurrency: 2,
    carryoverConcurrencyCount: true //<=== important to set this to true so you don't miss messages!
    }
  2. Every 2 seconds, process at most 5 messages, process one message at a time.

    {
    interval: 2000,
    intervalCap: 5,
    concurrency: 1,
    carryoverConcurrencyCount: true
    }
  3. Every second, process at most 1 message, (concurrency at 1 is redundant, but still good practice to add it)

    {
    interval: 1000,
    intervalCap: 1,
    concurrency: 1,
    carryoverConcurrencyCount: true
    }

if you have any more discussion around p-queue, please join the discord (click the badge in the readme) and ask in the #pq channel

thanks