chilts / mongodb-queue

Message queues which use MongoDB.
209 stars 91 forks source link

for await...of as interface for queue consumer #33

Open ad-m opened 4 years ago

ad-m commented 4 years ago

Hello,

I developed a small wrapper around the library to support promises. I also added an asyncIterator interface, which I like a lot for this use case, so I would like to share it.

Is there any ongoing work to add support for Promises without wrapping the library?

/**
 * Promise-returning facade over a callback-style queue created by
 * `mongoDbQueue(...)`.
 *
 * Every wrapped method forwards its arguments to the underlying queue and
 * appends a Node-style `(err, result)` callback. Instances are also async
 * iterable: each step calls `get()` and iteration stops as soon as `get()`
 * resolves to a falsy value.
 */
class promiseQueue {
    /**
     * @param {...*} args - Passed through unchanged to `mongoDbQueue`.
     */
    constructor(...args) {
        this.q = mongoDbQueue(...args);

        const wrappedMethods = [
            'createIndexes',
            'add',
            'get',
            'ping',
            'ack',
            'clean',
            'total',
            'size',
            'inFlight',
            'done',
        ];
        wrappedMethods.forEach((method) => {
            this[method] = this._promisify(method);
        });
    }

    /**
     * Builds a promise-returning wrapper for the queue method called `name`.
     *
     * The method is looked up on the underlying queue at call time, not when
     * the wrapper is created.
     *
     * @param {string} name - Method name on the underlying queue.
     * @returns {function(...*): Promise<*>} Wrapper that rejects with the
     *   callback's error when it is truthy, otherwise resolves with the first
     *   result value passed to the callback.
     */
    _promisify(name) {
        const target = this.q;
        return (...params) => new Promise((fulfil, fail) => {
            const onComplete = (error, ...results) => {
                if (error) {
                    fail(error);
                    return;
                }
                // A promise settles with a single value; extra results are dropped.
                fulfil(results[0]);
            };
            target[name](...params, onComplete);
        });
    }

    /**
     * Async iteration support, e.g. `for await (const task of queue)`.
     *
     * @returns {{next: function(): Promise<{value?: *, done: boolean}>}}
     *   Iterator whose `next()` resolves to the message returned by `get()`,
     *   or to `{ done: true }` when `get()` yields nothing.
     */
    [Symbol.asyncIterator]() {
        const pull = async () => {
            const message = await this.get();
            return message ? { value: message, done: false } : { done: true };
        };
        return { next: pull };
    }
}

/**
 * Example consumer: connects to MongoDB, enqueues one random number on
 * 'my-queue', then receives and acknowledges messages via `for await`.
 *
 * The client is closed in a `finally` block so the connection is released
 * even when index creation, enqueueing, iteration or acknowledgement fails.
 *
 * @returns {Promise<string>} Resolves to 'Finished' once the loop ends and
 *   the client has been closed.
 * @throws Rethrows any error raised while connecting or working with the
 *   queue (after the client has been closed, if it was opened).
 */
const main = async () => {
    const client = await mongodb.MongoClient.connect(url, { useNewUrlParser: true, useUnifiedTopology: true });
    try {
        const db = client.db('test');
        const queue = new promiseQueue(db, 'my-queue');
        await queue.createIndexes();
        await queue.add(Math.random());
        for await (const task of queue) {
            console.log(task);
            // Acknowledge each message using the ack id carried on the task.
            await queue.ack(task.ack);
        }
    } finally {
        // Always release the connection, on success and on failure alike.
        await client.close();
    }
    return 'Finished';
};