nevill / zongji

A mysql binlog listener running on Node.js.

back pressure #118

Open stephen-dahl opened 4 years ago

stephen-dahl commented 4 years ago

zongji is overwhelming downstream processes with too much data too fast. I need a way to slow it down until my processor is ready for more. Can this implement a streams interface with back pressure?
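
A minimal sketch of what such a wrapper could look like: an object-mode Readable that stops the binlog listener when its buffer fills and restarts it once the consumer drains. It assumes ZongJi's documented start()/stop() and 'binlog' event; that events expose nextPosition and that start() accepts a position option are assumptions that would need verifying against the version in use.

const { Readable } = require('stream');
const ZongJi = require('zongji');

class BinlogStream extends Readable {
  constructor(dsn) {
    super({ objectMode: true, highWaterMark: 64 });
    this.dsn = dsn;          // mysql connection settings
    this.resumeFrom = null;  // last checkpoint, if any
    this.running = false;
  }

  _read() {
    // Called whenever the consumer wants more data; (re)start the listener.
    if (this.running) return;
    this.running = true;

    this.zongji = new ZongJi(this.dsn);
    this.zongji.on('binlog', (evt) => {
      // push() returns false once the stream's internal buffer is full:
      // checkpoint and stop reading until the consumer catches up.
      const wantMore = this.push(evt);
      // Assumption: evt.nextPosition exists. Simplification: a real
      // checkpoint would also track the current binlog filename.
      this.resumeFrom = { position: evt.nextPosition };
      if (!wantMore) {
        this.zongji.stop();
        this.running = false; // the next _read() resumes from the checkpoint
      }
    });
    this.zongji.start(this.resumeFrom || { startAtEnd: true });
  }
}

// Usage: pipe into any Writable; a slow consumer now throttles the source.
// new BinlogStream({ host: 'localhost', user: 'x', password: 'y' }).pipe(slowWritable);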

8secz-johndpope commented 4 years ago

I used zmq to dispatch events to separate node worker processes. Pseudo code below.

// BROKER.js // ZMQ socket - very simple + robust

In your trigger, farm out work to the worker processes:

const zmq = require('zeromq');

const ZMQ_ADDRESS = 'tcp://127.0.0.1:28332'; // a full endpoint, not a bare port
const sock = new zmq.Push();

const updateTrigger = {
  name: 'MonitorUpdate',
  expression: '*',
  statement: MySQLEvents.STATEMENTS.UPDATE,
  onEvent: async (event) => { // You will receive the events here
    const row = event.affectedRows[0];
    const xid = row.after.id;
    const data = { event, xid };

    const watched = ['VideoLike', 'VideoComment', 'Challenge', 'Competition'];
    if (watched.includes(event.table)) {
      dispatchWork(data).catch((err) => {
        console.log(err);
        console.log('Attempt to heal zmq');
        rebindZmq();
      });
    }
  },
};

async function bindZmqPush() {
  console.log('bind....');
  try {
    await sock.bind(ZMQ_ADDRESS);
    return true;
  } catch (e) {
    console.error('bindZmqPush error: ' + e);
    return false;
  }
}
async function rebindZmq() {
  console.log('attempting to unbind....');
  try {
    await sock.unbind(ZMQ_ADDRESS);
    await sock.bind(ZMQ_ADDRESS);
    return true;
  } catch (e) {
    console.error('rebindZmq error: ' + e);
    return false;
  }
}

async function dispatchWork(dataObj) {
  await sock.send(JSON.stringify(dataObj));
}

// ZMQ-Worker.js
const zmq = require('zeromq');

const ZMQ_ADDRESS = 'tcp://127.0.0.1:28332'; // must match the broker

async function run() {
  // https://stackoverflow.com/questions/17814436/difference-between-pub-sub-and-push-pull-pattern-in-zeromq
  const sock = new zmq.Pull();

  sock.connect(ZMQ_ADDRESS);
  console.log('Firebase Worker connected to port 28332');

  for await (const [msg] of sock) {
    console.log('work: %s', msg.toString());
    const object = JSON.parse(msg.toString());
    const event = object.event;
    const xid = object.xid; // e.g. id - 101
    console.log('event:', event);
    console.log('xid:', xid);
  }
}

run().catch(e => console.error(e.stack));
stephen-dahl commented 4 years ago

That is interesting, but it just moves the problem to the ZeroMQ send buffer.

8secz-johndpope commented 4 years ago

Some ideas up my sleeve / yet to explore.

Will getting the code working on more cores / more machines help? Do you want to avoid dropping messages? Or do you need certainty that everything has been processed?

At what number of transactions per second do things go pear-shaped?

I have a plan in the backlog to get the code to support active/active simultaneous connections, round-robin style - no single point of failure. i.e. have one machine process the odd event/SQL ids and the other machine process the even ones. ZMQ does this out of the box.

The underlying design of this uses replication - so when the core triggers start banking up, we would know via SECONDS_BEHIND_MASTER, right? Could this be used to help back off?

SECONDS_BEHIND_MASTER=$($MYSQLPATH -e "SHOW SLAVE STATUS\G" | grep "Seconds_Behind_Master" | awk -F": " '{print $2}')
https://github.com/mhamet2/replicamonit/blob/master/brokenreplica.sh
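
A sketch of that back-off idea, polling Seconds_Behind_Master from Node instead of shell. It assumes a mysql2/promise connection; pauseDispatch() and resumeDispatch() are hypothetical hooks into whatever feeds the workers, and the threshold is a placeholder.

const mysql = require('mysql2/promise');

const LAG_THRESHOLD_SECONDS = 30; // placeholder threshold

async function watchReplicaLag() {
  const conn = await mysql.createConnection({ host: 'localhost', user: 'monitor' });
  setInterval(async () => {
    const [rows] = await conn.query('SHOW SLAVE STATUS');
    const lag = rows[0] && rows[0].Seconds_Behind_Master;
    if (lag != null && lag > LAG_THRESHOLD_SECONDS) {
      pauseDispatch();  // hypothetical: stop feeding work out
    } else {
      resumeDispatch(); // hypothetical: resume once caught up
    }
  }, 5000);
}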

(YET TO TRY THIS) Make the work more akin to playing back a tape - why? It would guarantee consistency: everything is stored, so everything is accounted for. The downside is that it's not real time.

  const instance = new MySQLEvents(connection, {
    startAtEnd: true, // WHAT IF WE SET THIS TO FALSE AND REPLAY THE ENTIRE BINLOG???
    excludedSchemas: {
      mysql: true,
    },
  });

Here we could take our time - mark the events/transactions that we've processed, then on subsequent starts skip all of those.

ZMQ has a few more options to throw at this and introduces the concept of a high water mark - is that a relevant concept here, for handling a spike in traffic / conditions you want to absorb?

https://stackoverflow.com/questions/41941702/one-zeromq-socket-per-thread-or-per-call

ZMQ high water mark / will start dropping inbound messages (I get that this may be unacceptable for your use case) http://api.zeromq.org/4-2:zmq-setsockopt
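
Worth noting: for a PUSH/PULL pipeline like the snippets above, hitting the send high water mark should block rather than drop, which is itself a form of backpressure - the send promise only resolves once there is room in the queue. A sketch, assuming the zeromq v6 API used above; the address and HWM value are placeholders.

const zmq = require('zeromq');

const sock = new zmq.Push();
sock.sendHighWaterMark = 100; // cap queued outbound messages (placeholder value)

async function main() {
  await sock.bind('tcp://127.0.0.1:28332');
  for (let i = 0; i < 100000; i++) {
    // Once 100 messages are queued and no worker is pulling, this await
    // does not resolve, so the producing loop slows to the workers' pace.
    await sock.send(JSON.stringify({ xid: i }));
  }
}

main().catch(console.error);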

UPDATE

Did some more digging into backpressure / streams - it seems @bllevy partially solved this problem with Node.js streams for MySQL -> Google BigQuery: https://github.com/bllevy/mysql_to_gbq

https://github.com/bllevy/mysql_to_gbq/blob/master/config.js#L86 https://github.com/bllevy/mysql_to_gbq/blob/master/mysql_to_bq.js#L362

Found these Node.js libraries - perhaps something off the shelf can be used: https://npm.io/search/keyword:backpressure @nevill - do you have any thoughts?

UPDATE: @nicolaspearson seems to have approached some similar code, using the ZongJi connector + AWS Kinesis. Did you encounter any backpressure issues, nicolaspearson? Did introducing Kinesis into the mix solve them? https://github.com/nicolaspearson/node.kinesis/blob/c4f1463b8a1a5587b2d1c3011497ba9465a61d9a/src/app/NodeKinesis.ts#L68

https://aws.amazon.com/kinesis/data-streams/faqs/

UPDATE: this library has pause and resume (and stop), @stephen-dahl - I'm looking to use this in conjunction with the last processed id on startup. https://openbase.io/js/@rodrigogs/mysql-events/documentation
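
A sketch of how those pause()/resume() calls could act as the backpressure valve: pause the binlog reader when an in-memory queue gets deep, resume once the consumer drains it. `connection`, `processEvent`, and the watermark values are assumptions for illustration.

const MySQLEvents = require('@rodrigogs/mysql-events');

const HIGH_WATER = 1000; // pause the binlog reader above this queue depth
const LOW_WATER = 100;   // resume once the consumer has drained to here
const queue = [];

const instance = new MySQLEvents(connection, { startAtEnd: true }); // `connection` assumed

instance.addTrigger({
  name: 'backpressured',
  expression: '*',
  statement: MySQLEvents.STATEMENTS.ALL,
  onEvent: (event) => {
    queue.push(event);
    if (queue.length >= HIGH_WATER) instance.pause(); // stop reading the binlog
  },
});

async function consume() {
  for (;;) {
    const event = queue.shift();
    if (event) {
      await processEvent(event); // hypothetical downstream processor
      if (queue.length <= LOW_WATER) instance.resume();
    } else {
      await new Promise((resolve) => setTimeout(resolve, 50)); // idle wait
    }
  }
}

instance.start().then(consume);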

UPDATE: I got the resume functionality working using DynamoDB (I just save the last processed row id once I send it off for processing, set playback to start from the beginning, and drop everything less than that id). I abandoned ZMQ for AWS SQS because of high availability across boxes: SQS simplified a heap of code, and no sockets are needed. SQS dispatches work to any compute instance that's ready for work, and this can be elastically scaled. There's also a health check from the ELB (Elastic Load Balancer), so it can offer active/passive failover (as opposed to being a single point of failure).
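
A sketch of that checkpoint-and-skip pattern, assuming aws-sdk v2; the queue URL, table name, and key scheme are placeholders.

const AWS = require('aws-sdk');

const sqs = new AWS.SQS();
const dynamo = new AWS.DynamoDB.DocumentClient();
const QUEUE_URL = 'https://sqs.us-east-1.amazonaws.com/000000000000/binlog-work'; // placeholder
const CHECKPOINT_TABLE = 'binlog-checkpoint'; // placeholder

// Read the last processed row id saved by a previous run.
async function loadCheckpoint() {
  const res = await dynamo
    .get({ TableName: CHECKPOINT_TABLE, Key: { stream: 'zongji' } })
    .promise();
  return res.Item ? res.Item.lastId : 0;
}

// Dispatch one event, skipping anything at or below the checkpoint.
async function dispatch(event, xid, lastId) {
  if (xid <= lastId) return; // already processed during a previous run
  await sqs
    .sendMessage({ QueueUrl: QUEUE_URL, MessageBody: JSON.stringify({ event, xid }) })
    .promise();
  // Record the new high-water mark once the work is handed off.
  await dynamo
    .put({ TableName: CHECKPOINT_TABLE, Item: { stream: 'zongji', lastId: xid } })
    .promise();
}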