haraka / Haraka

A fast, highly extensible, and event driven SMTP server
https://haraka.github.io
MIT License
5.09k stars 662 forks source link

Error: Callback was already called. #1993

Closed sstevan closed 7 years ago

sstevan commented 7 years ago

Hello,

I've been testing multiple mail servers and Haraka has the best performance so far. Although the whole project concept is great I'm having issues with the Outbound plugin due to my lack of knowledge of NodeJS.

Haraka delivery API

I'm trying to bind a route, send-raw, where I can post raw message content to be delivered. Based on the code below, I've implemented the sender.js plugin (file attached), but I could not get outnext to work since there's no next() present in this scope. What should be changed to get the code below to work?

// Example: queue a hand-built plain-text message through the outbound module
// and log the result reported to the completion callback.
var outbound = require('./outbound');

// Captured so the callback below can reach the logging methods on `this`.
var plugin = this;

var to = 'user@example.com';
var from = 'sender@example.com';

// Raw message: header lines, an empty line, then the body; joined with "\n".
var contents = [
    "From: " + from,
    "To: " + to,
    "MIME-Version: 1.0",
    "Content-type: text/plain; charset=us-ascii",
    "Subject: Some subject here",
    "",
    "Some email body here",
    ""].join("\n");

// Completion callback handed to send_email; receives a result code and a message.
// NOTE(review): `next` is not defined anywhere in this snippet, so it only works
// when placed inside a function that has a `next` callback in scope
// (presumably a hook handler - confirm against the caller).
var outnext = function (code, msg) {
    switch (code) {
        // The DENY branch only logs; it does not call next().
        case DENY:  plugin.logerror("Sending mail failed: " + msg);
                    break;
        case OK:    plugin.loginfo("mail sent");
                    next();
                    break;
        // Any other code is logged as unrecognized, then next() is called.
        default:    plugin.logerror("Unrecognized return code from sending email: " + msg);
                    next();
    }
};

outbound.send_email(from, to, contents, outnext);

VMTA plugin

I've also tried to enable the VMTA plugin, and I can see that each IP/host combination is loaded, but when I pass an x-vmta: header in the raw message (contents), the VMTA is not switched and the header is not removed. I guess I should use send_trans_email() instead? I've managed to get at least IP rotation using sample code found here:

var outbound = require('./outbound');

var i = 0;
var ips = ['111.111.111.111', '222.222.222.222'];

exports.hook_get_mx = function (next, hmail, domain) {
   outbound.lookup_mx(domain, function (err, mxs) {
       if (err) return next(DENY, err);
       mxs.forEach(function (mx) {
           mx.bind = ips[i];
           i++;
           if (i == ips.length) i = 0;
       });
       next(OK, mxs);
   })

But I'd really like to utilize VMTA plugin.

Callback was already called

However, I've managed to set up an API (pretty sure it's not the best solution) to test 50K mail delivery. I'm writing stats to a file (sender.js) on the delivered, deferred and bounce hooks (still fighting to set up the graph plugin) and it goes pretty well. Actually too well. I have only 3 IPs and still have not figured out how to set the sending rate per IP (10K messages get delivered in 5 minutes and then start deferring).

At some point, I get this error and Haraka gets shut down. After re-run, it happens even sooner. Is this somehow related to my "custom" made logging - or poorly implemented hooks?

[CRIT] [-] [core] Error: Callback was already called.
[CRIT] [-] [core]     at /usr/local/lib/node_modules/Haraka/node_modules/async/dist/async.js:840:32
[CRIT] [-] [core]     at HMailItem.hmail.next_cb (/usr/local/lib/node_modules/Haraka/outbound.js:109:9)
[CRIT] [-] [core]     at /usr/local/lib/node_modules/Haraka/outbound.js:2236:15
[CRIT] [-] [core]     at FSReqWrap.oncomplete (fs.js:135:15)

Misc

I've seen multiple questions regarding IP rates and pools but I could not find any documentation for this. Is there any way to define an IP pool and set sending rates or a weighted distribution?

I hope you can give me guidance and point me to the "proper" way of doing this. I'm really impressed with the ease of setup and I plan to use Haraka in combination with RocketMQ (I'll try to make it look like the available RabbitMQ plugin).

Edit: Is there any way to start processing messages from queue upon Haraka restart?

Haraka

Stack Version
Haraka Haraka.js — Version: 2.8.13
Node v8.0.0
OS Ubuntu 16.04
openssl OpenSSL 1.0.2g 1 Mar 2016

Sender Plugin

const fs = require("fs");
const util = require("util");
const md5  = require("md5");

let ID;

/**
 * Queue a caller-supplied raw message for outbound delivery.
 *
 * @param {string} from - Sender address handed to outbound.send_email.
 * @param {string} to - Recipient address handed to outbound.send_email.
 * @param {string} contents - Raw message text (headers and body).
 * @returns {{id: string, from: string, to: string, message: string}}
 *   Summary of the request. `id` is an md5 of the sender plus the current
 *   time, generated here. NOTE(review): it is not derived from anything
 *   send_email returns, so it presumably does not match Haraka's own
 *   queue id - confirm before using it for lookups.
 */
function sendRawEmail(from, to, contents) {
  const outbound = require("./outbound");

  // Keep the id local to this call rather than in module-level state, so
  // one request can never observe or overwrite another request's id.
  const id = md5(from + new Date().getTime());
  outbound.send_email(from, to, contents);

  return {
    id,
    from,
    to,
    message: `Message queued as ${id}`
  };
}

/**
 * Queue a fixed plain-text test message from sender@test.com to `sendTo`.
 *
 * @param {string} sendTo - Recipient address; used in the To: header and as
 *   the recipient passed to outbound.send_email.
 * @returns {{id: string, from: string, to: string, message: string}}
 *   Summary of the request; `id` is an md5 of the sender plus the current
 *   time, computed after the message has been handed to send_email.
 */
function sendMail(sendTo) {
  const outbound = require('./outbound');
  const from = 'sender@test.com';

  const headerLines = [
    `From: ${from}`,
    `To: ${sendTo}`,
    "MIME-Version: 1.0",
    "Content-type: text/plain; charset=us-ascii",
    "Subject: Test message with VMTA!",
  ];
  // An empty line separates headers from the body; the trailing empty entry
  // makes the joined text end with a newline.
  const contents = [...headerLines, "", "Some email body here", ""].join("\n");

  outbound.send_email(from, sendTo, contents);

  const id = md5(`${from}${Date.now()}`);
  const message = `Message queued as ${id}`;
  return { id, from, to: sendTo, message };
}

/**
 * Plugin `register` entry point: registers the delivery-outcome hooks.
 *
 * Must be invoked with the plugin object as `this`; each pair below is
 * passed to `this.register_hook(hookName, methodName)` in order.
 * NOTE(review): the method name is presumably resolved against the plugin's
 * exports - confirm that a "bounced" export exists.
 */
function registerPlugin() {
  const hookHandlers = [
    ["delivered", "delivered"],
    ["bounce", "bounced"],
    ["deferred", "deferred"],
  ];

  for (const [hookName, methodName] of hookHandlers) {
    this.register_hook(hookName, methodName);
  }
}

/**
 * init_http hook: mounts the test-sending routes on the HTTP app.
 *
 * Routes:
 *   GET  /send/:email - queues the fixed test message to :email.
 *   POST /send-raw    - queues a raw message built from the `from`, `to`
 *                       and `contents` body fields (JSON or urlencoded).
 *
 * @param {Function} next - Hook callback; invoked once the routes are mounted
 *   so the remaining hook chain can run.
 * @param {Object} Server - Object exposing the HTTP app as `Server.http.app`.
 */
function setupServer(next, Server) {

  const { app } = Server.http;
  const bodyParser = require("body-parser");
  app.use(bodyParser.json());

  const urlEncoded = bodyParser.urlencoded({ extended: true });

  app.get("/send/:email", (req, res) => {
    const { email } = req.params;
    res.json(sendMail(email));
  });

  app.post("/send-raw", urlEncoded, (req, res) => {
    const { from, to, contents } = req.body || {};

    // Reject incomplete requests up front instead of passing undefined
    // values into the outbound send path.
    if (!from || !to || !contents) {
      res.status(400).json({ error: "from, to and contents are required" });
      return;
    }

    res.json(sendRawEmail(from, to, contents));
  });

  // The original never signalled completion of this hook.
  next();
}

/**
 * Append one CSV line ("YYYY-MM-DD HH:MM:SS,<queue_time>,<first recipient>")
 * to stats/<filename> and log the outcome.
 *
 * Must be invoked with the plugin as `this` (uses this.loginfo / this.logerror).
 *
 * @param {string} filename - File name inside the relative "stats/" directory.
 * @param {Function} next - Hook callback. Deliberately NOT called here: the
 *   hook handlers that call writeStats invoke next() themselves, so calling
 *   it here as well would fire the callback twice.
 * @param {Object} hmail - Mail item; reads hmail.todo.queue_time and
 *   hmail.todo.rcpt_to[0].original.
 */
function writeStats(filename, next, hmail) {
  // First 19 characters of the ISO timestamp, with the 'T' separator replaced by a space.
  const timestamp = new Date().toISOString().slice(0, 19).replace('T', ' ');
  const content = timestamp + "," +
    hmail.todo.queue_time + "," +
    hmail.todo.rcpt_to[0].original + "\n";

  this.loginfo(content);

  fs.appendFile("stats/" + filename, content, (err) => {
    if (err) {
      // Surface write failures (e.g. a missing stats/ directory) instead of
      // silently dropping the record.
      this.logerror("Failed to append to " + filename + ": " + err.message);
      return;
    }
    this.loginfo("Added to " + filename);
  });
}

/**
 * "delivered" hook handler: logs the event, records it in delivered.csv via
 * writeStats, then continues the hook chain.
 *
 * @param {Function} next - Hook callback, called exactly once at the end.
 * @param {Object} hmail - Mail item forwarded to writeStats.
 * @param {*} params - Unused.
 */
function delivered(next, hmail, params) {
  const plugin = this;

  plugin.loginfo("Message delivered!");
  writeStats.apply(plugin, ["delivered.csv", next, hmail]);

  next();
}

/**
 * Bounce hook handler: logs the event, records it in bounced.csv via
 * writeStats, then continues the hook chain.
 *
 * @param {Function} next - Hook callback, called exactly once at the end.
 * @param {Object} hmail - Mail item forwarded to writeStats.
 * @param {*} error - Unused.
 */
function bounced(next, hmail, error) {
  const plugin = this;

  plugin.loginfo("Message bounced!");
  writeStats.apply(plugin, ["bounced.csv", next, hmail]);

  next();
}

/**
 * "deferred" hook handler: logs the event, records it via writeStats, then
 * continues the hook chain.
 *
 * @param {Function} next - Hook callback, called exactly once at the end.
 * @param {Object} hmail - Mail item forwarded to writeStats.
 * @param {*} msg - Unused.
 */
function deferred(next, hmail, msg) {
  const plugin = this;

  plugin.loginfo("Message deferred!");
  // The output file name keeps its existing spelling so records continue to
  // land in the same file.
  writeStats.apply(plugin, ["deffered.csv", next, hmail]);

  next();
}

// Plugin entry points.
exports.register = registerPlugin;
exports.hook_init_http = setupServer;
exports.delivered = delivered;
exports.bounce = bounced;
// registerPlugin registers the "bounce" hook under the method name "bounced",
// so the handler must also be reachable under that name; `bounce` above is
// kept for backward compatibility.
exports.bounced = bounced;
exports.deferred = deferred;
baudehlo commented 7 years ago

Can you split this into multiple tickets for us? Thanks.

On Jul 4, 2017, at 5:19 AM, Stevan Šandi notifications@github.com wrote:

Hello,

I've been testing multiple mail servers and Haraka has the best performance so far. Although the whole project concept is great I'm having issues with the Outbound plugin due to my lack of knowledge of NodeJS.

Haraka delivery API

I'm trying to bind route send-raw where I can post raw message content to be delivered. According to code below, I've implemented sender.js plugin (file attached) but I could not get outnext defined since there's no next() present in this scope. What should be changed to get the code below to work?

var outbound = require('./outbound');

var plugin = this;

var to = 'user@example.com'; var from = 'sender@example.com';

var contents = [ "From: " + from, "To: " + to, "MIME-Version: 1.0", "Content-type: text/plain; charset=us-ascii", "Subject: Some subject here", "", "Some email body here", ""].join("\n");

var outnext = function (code, msg) { switch (code) { case DENY: plugin.logerror("Sending mail failed: " + msg); break; case OK: plugin.loginfo("mail sent"); next(); break; default: plugin.logerror("Unrecognized return code from sending email: " + msg); next(); } };

outbound.send_email(from, to, contents, outnext); VMTA plugin

I've also tried to enable VTMA plugin, and I can see that each IP/host combination is loaded but when I pass x-vmta: header in raw message (contents) - VMTA is not switched and header is not removed. I guess I should use send_trans_email() instead? I've managed to get at least IP rotation using sample code found here:

var outbound = require('./outbound');

var i = 0; var ips = ['111.111.111.111', '222.222.222.222'];

exports.hook_get_mx = function (next, hmail, domain) { outbound.lookup_mx(domain, function (err, mxs) { if (err) return next(DENY, err); mxs.forEach(function (mx) { mx.bind = ips[i]; i++; if (i == ips.length) i = 0; }); next(OK, mxs); }) But I'd really like to utilize VMTA plugin.

Callback was already called

However, I've managed to setup API (pretty sure not the best solution) to test 50K mail delivery. I'm writing stats to file (sender.js) on delivered, deferred and bounce hooks (still fighting to set up graph plugin) and it goes pretty well. Actually too good. I have only 3 IPs and still not figured out how to set sending rate per IP (10K messages get delivered in 5mins and then start deferring).

At some point, I get this error and Haraka gets shut down. After re-run, it happens even sooner. Is this somehow related to my "custom" made logging - or poorly implemented hooks?

[CRIT] [-] [core] Error: Callback was already called. [CRIT] [-] [core] at /usr/local/lib/node_modules/Haraka/node_modules/async/dist/async.js:840:32 [CRIT] [-] [core] at HMailItem.hmail.next_cb (/usr/local/lib/node_modules/Haraka/outbound.js:109:9) [CRIT] [-] [core] at /usr/local/lib/node_modules/Haraka/outbound.js:2236:15 [CRIT] [-] [core] at FSReqWrap.oncomplete (fs.js:135:15) Misc

I've seen multiple questions regarding IP rates and pools but I could not find any documentation for this. Is there any way to define IP pool and set sending rates or weighed distribution?

I hope you can give me guidance and point me to "proper" way of doing this. I'm really impressed with ease of setup and I plan to use Haraka in combination with RocketMQ (I'll try to make it look like RabittMQ plugin available).

Haraka

Stack Version Haraka Haraka.js — Version: 2.8.13 Node v8.0.0 OS Ubuntu 16.04 openssl OpenSSL 1.0.2g 1 Mar 2016 Sender Plugin

const fs = require("fs"); const util = require("util"); const md5 = require("md5");

let ID;

function sendRawEmail(from, to, contents) { const outbound = require("./outbound"); const plugin = this;

ID = md5(from + new Date().getTime()); outbound.send_email(from, to, contents);

return { id: ID, from, to, message: Message queued as ${ID} } }

function sendMail(sendTo) { const outbound = require('./outbound'); const plugin = this;

const from = 'sender@test.com';

const contents = [ "From: " + from, "To: " + sendTo, "MIME-Version: 1.0", "Content-type: text/plain; charset=us-ascii", "Subject: Test message with VMTA!", "", "Some email body here", ""].join("\n");

outbound.send_email(from, sendTo, contents);

const id = md5(from + new Date().getTime()); return { id, from, to: sendTo, message: Message queued as ${id} }; }

function registerPlugin() { const plugin = this; plugin.register_hook("delivered", "delivered"); plugin.register_hook("bounce", "bounced"); plugin.register_hook("deferred", "deferred"); }

function setupServer(next, Server) {

const { app } = Server.http; const bodyParser = require("body-parser"); app.use(bodyParser.json());

const urlEncoded = bodyParser.urlencoded({ extended: true });

app.get("/send/:email", (req, res) => { const { email } = req.params; res.json(sendMail(email)); });

app.post("/send-raw", urlEncoded, (req, res) => { const { from, to, contents } = req.body; res.json(sendRawEmail(from, to, contents)); });

}

function writeStats(filename, next, hmail) { let content = new Date().toISOString().slice(0, 19).replace('T', ' ') + ","; content += hmail.todo.queue_time + ","; content += hmail.todo.rcpt_to[0].original + "\n";

this.loginfo(content);

fs.appendFile("stats/" + filename, content, (err) => { if(!err) { this.loginfo("Added to " + filename) } }); }

function delivered(next, hmail, params) { this.loginfo("Message delivered!"); writeStats.call(this, "delivered.csv", next, hmail); next(); }

function bounced(next, hmail, error) { this.loginfo("Message bounced!"); writeStats.call(this, "bounced.csv", next, hmail); next(); }

function deferred(next, hmail, msg) { this.loginfo("Message deferred!"); writeStats.call(this, "deffered.csv", next, hmail); next(); }

exports.register = registerPlugin; exports.hook_init_http = setupServer; exports.delivered = delivered; exports.bounce = bounced; exports.deferred = deferred; — You are receiving this because you are subscribed to this thread. Reply to this email directly, view it on GitHub, or mute the thread.

sstevan commented 7 years ago

Sorry about the long post. I'll split this into multiple issues and I'll ask these general questions on the IRC channel as suggested.