caolan / highland

High-level streams library for Node.js and the browser
https://caolan.github.io/highland
Apache License 2.0

Modeling a Queue #573

Open jaidetree opened 7 years ago

jaidetree commented 7 years ago

I'm working on the final part of this build server project. After testing the workflow, it turns out webpack uses a ton of CPU and memory during the minification process. So my app architecture needs to change: the web server should just append to a build queue so that only one build runs at a time.

Is this something that can be easily modeled in Highland? Or does it involve storing an array as my queue and then, when a build completes, using a tap at the end of the build stream to push the next-oldest request from that queue onto the build stream?

let queue = _().series().through(buildProcess).pipe(writableLogStream);

_.of(buildRequest)
  .through(getLastBuild)
  .through(getAssetVersion)
  .through((stream) => {
    queue.write(stream.observe());
    return stream;
  });
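
For comparison, the array-backed alternative described above (keep pending requests in an array and start the next-oldest one when the current build finishes) could be sketched without Highland roughly as follows. This is only an illustration: createArrayQueue, fakeBuild, and the request objects are made-up names, and fakeBuild stands in for the real webpack build.

```javascript
// Hypothetical array-backed build queue: at most one build runs at a time.
function createArrayQueue(runBuild) {
  const pending = []; // oldest request first
  let busy = false;

  function next() {
    if (busy || pending.length === 0) return;
    busy = true;
    const request = pending.shift();
    // Promise.resolve().then(...) also turns a synchronous throw into a rejection.
    Promise.resolve()
      .then(() => runBuild(request))
      .catch((err) => console.error('build failed:', err))
      .then(() => {
        busy = false;
        next(); // start the next-oldest request, if any
      });
  }

  return {
    push(request) {
      pending.push(request);
      next();
    },
  };
}

// Usage with a stand-in for the real build step (assumption: it returns a promise).
let running = 0;
let maxRunning = 0;
const finished = [];

function fakeBuild(request) {
  running += 1;
  maxRunning = Math.max(maxRunning, running);
  return new Promise((resolve) => setTimeout(resolve, 10)).then(() => {
    running -= 1;
    finished.push(request.id);
  });
}

const arrayQueue = createArrayQueue(fakeBuild);
arrayQueue.push({ id: 1 });
arrayQueue.push({ id: 2 });
arrayQueue.push({ id: 3 });
```

The busy flag is what keeps a second build from starting while one is in flight. The trade-off against a stream-based queue is that back-pressure, error handling, and logging all have to be wired up by hand.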

jaidetree commented 7 years ago

After continually playing around I was able to get it working... I think:

function createBuildQueue () {
  let stream = _();

  stream.flatMap((state) => {
    debug('incoming build state', state)
    return _.of(state)
      .through(exports.buildProcess)
      .errors(exports.handleErrors(state))
      .consume(exports.formatStatus)
  })
  .pipe(exports.writeToLog());

  return stream;
}

module.exports = createBuildQueue;

Then what adds to it:

let createBuildQueue = require('./createBuildQueue');
let buildQueue = createBuildQueue();

function createBuildStream (stream) {
  return stream.tap(exports.appendToQueue);
}

//////////////////////////////////////////////////////////////////////////////
// HELPER METHODS
//////////////////////////////////////////////////////////////////////////////

function appendToQueue (state) {
  buildQueue.write(state);
}

module.exports = createBuildStream;
module.exports.appendToQueue = appendToQueue;

Is there anything I'm missing or should add to make sure each build request is handled one-at-a-time?

vqvu commented 7 years ago

Yes, that seems about right. Using a singleton queue and stream#write to append is how I would do it.

vqvu commented 7 years ago

Closing this. Feel free to reopen if you have other questions.

jaidetree commented 7 years ago

The project launched and for the most part is working as intended. However, there's now some inevitable scope creep in what this server needs to do: there will be a build pipeline and a test pipeline, which will share some steps (such as the build process) via through functions.

Because of that additional pipeline, the queue system I have above won't cut it. My instinct is that, instead of pushing the state model onto the stream, I need to push an unconsumed build stream onto the build queue and use the .series() method to ensure only one stream runs at a time.

Currently, I'm having a little trouble putting it all together. I think it would go something like this:

function createBuildQueue () {
  return _()
    .series()
    .consume(exports.formatStatus)
    .pipe(exports.writeToLog());
}

let buildQueue = createBuildQueue();

////////////////////////////////////////////////////////////////////////////
// HELPER METHODS
////////////////////////////////////////////////////////////////////////////

function push (stream) {
  buildQueue.write(stream);
}

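// consume() handlers receive (err, x, push, next); push takes (err, value).
function formatStatus (err, results, push, next) {
  if (err) {
    // Forward the failure as a regular value so it ends up in the log
    push(null, {
      type: 'error',
      message: err.stack || err,
    });
    return next();
  }

  if (results === _.nil) {
    // End of stream: pass the end marker along and stop
    return push(null, _.nil);
  }

  push(null, {
    type: 'build',
    message: JSON.stringify(results.toJS()),
  });
  next();
}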

function writeToLog () {
  return fs.createWriteStream('/var/log/buildfarm.build.log', { flags: 'a+' })
}

exports = module.exports = {
  buildQueue,
  formatStatus,
  push,
  writeToLog,
};

Then what adds to it:

let buildQueue = require('./buildQueue');

function createBuildStream (stream) {
  stream.observe()
    .through(exports.buildProcess)
    .through(exports.deployProcess)
    .through(buildQueue.push);

  // Returns original stream to immediately return a server response
  return stream;
}

module.exports = createBuildStream;

let buildQueue = require('./buildQueue');

function createTestStream (stream) {
  stream.observe()
    .through(exports.buildProcess)
    .through(exports.testProcess)
    .through(buildQueue.push);

  // Returns original stream to immediately return a server response
  return stream;
}

module.exports = createTestStream;

But I'm uncertain of the following:

  1. In both the createBuildStream and createTestStream functions, will those stream.observe() methods be consumed?

  2. Is the way I'm using .through(buildQueue.push) going to work like that? I'm not returning a stream but since it's the end of the chain I thought maybe it makes sense?

I'll be running & testing this myself but hopefully another set of eyes can catch any glaring flaws I'm not seeing.

jaidetree commented 7 years ago

@vqvu Hmm, I'm not sure I'm able to reopen this ticket.

vqvu commented 7 years ago

Sorry about that. I didn't realize GitHub doesn't let you reopen issues that someone else closed. That's pretty silly.

To answer your questions:

> In both the createBuildStream and createTestStream functions, will those stream.observe() methods be consumed?

Yes, they can be consumed as long as the original stream is fully consumed somewhere.

That is, you can't short-circuit the original stream if you want observe to see every value. For example, this code

const stream = _([1, 2, 3, 4, 5]);

stream.observe()
  .each(x => console.log('observer', x));

stream.take(1)
  .each(x => console.log('main stream', x));

will print

observer 1
main stream 1

but not observer 2 or anything after it. That is because take simply stops consuming after the first element. You had this problem with your old tap solution too, so if it worked before, then it should work now. This is just something to keep in mind.

> Is the way I'm using .through(buildQueue.push) going to work like that? I'm not returning a stream but since it's the end of the chain I thought maybe it makes sense?

Yes, that's fine, since it's the end of a chain. through doesn't require you to return a value. You could argue that, since buildQueue.push isn't actually a step of the pipeline (it's not consuming the stream), something like this is more explicit about what you're doing:

buildQueue.push(
    stream
      .observe()
      .through(exports.buildProcess)
      .through(exports.testProcess));

It's up to you what style you want.

BTW, there's a slight bug in the new code that you posted. As before, you want createBuildQueue to return the original stream, not the result of the pipeline, like this:

function createBuildQueue () {
  const stream = _();
  stream
    .series()
    .consume(exports.formatStatus)
    .pipe(exports.writeToLog());
  return stream;
}

jaidetree commented 7 years ago

I agree that is odd behavior on GitHub's part. Perhaps it was intended more for a bug-report workflow, where a bug is reported, a fix is made and thoroughly tested before it's merged into a release, and closing the issue means it's ready to be forgotten about? But who knows, I suppose.

Anyway, thanks for the feedback! The streams should be fully consumed in both the original and the new example, because the outer logic is something along the lines of:

function handleResponse (req, res) { 
  return _.of(req.body)
    .through(createBuildState)
    .through(createBuildStream)
    .through(formatJSONResponse)
    .pipe(res);
}

So I believe every input does get consumed and packaged nicely as a JSON string response from the server.

For some reason I tend to steer away from passing complex logic as arguments to other functions, as in your suggestion for pushing to the queue. Since I've been learning Clojure at the same time to better my understanding of functional programming, it has become less bothersome, since that seems to be the preferred syntax convention in Clojure and other Lisps. Plus, using .through() that way, especially looking at it a day or so later, feels fairly indirect, like I'm abusing the through() method.

Ack! You're right about the createBuildQueue function. That wouldn't have worked as intended, and I made the same error while implementing the first one. I should have caught that.

Thanks for the helpful response once again.