mostjs / core

Most.js core event stream
http://mostcore.rtfd.io
MIT License

Most and Stomp JS? #283

Closed scoobyshi closed 5 years ago

scoobyshi commented 5 years ago

I'm hoping to leverage Most to handle updates via Stomp over WS, but am struggling a little with the new Most (Core/v2?) API. Since the Stomp JS client uses a "subscribe" and an associated callback to receive updates, I don't believe I can use an existing helper like "domEvent" as I did with Socket IO (which worked great). I believe I need to write my own "Stream".

I did stumble across this: https://github.com/cujojs/most/issues/34, which then mentioned the "w3msg" project, but that appears to be abandoned/deprecated. I also came across this: https://github.com/cujojs/most/issues/41, however, he was using "most.create" which I understand is not recommended, due to compatibility with Core, from various other threads.

The API docs describe what a Stream is, but include no examples that explain its anatomy. I did find some good threads that gave me some inkling.

So, given this currently:

const client = webstomp.client(url);
client.connect(user, token, () => {
  client.subscribe(`/queue/${user}`, (message) => {
    console.log('Received Message:', JSON.parse(message.body));
  });
});

I believe then that I need to do something like this if I want to utilize Most (this doesn't actually work, I believe I'm missing something with respect to the callback)?

const client = webstomp.client(url);

function stream(topic, f) {
  return new Stream(topic, f);
}

class Stream {
  constructor (topic, method) {
    this.topic = topic;
    this.method = method;
  }

  run (sink, scheduler) {
    return asap(propagateTask(task, this.method(this.topic), sink), scheduler);
  }
}

function task (time, value, sink) {
  sink.event(time, value);
}

client.connect(user, token, () => {
  const updateStream = stream(`/queue/${user}`, client.subscribe);
  const renderMessage = message => console.log('Received Message:', JSON.parse(message.body));
  runEffects(tap(renderMessage, updateStream), newDefaultScheduler());
});

briancavalier commented 5 years ago

Hi @scoobyshi. Thanks for sharing example code. I think implementing the stream interface is a good way to go here. I'm not intimately familiar with the webstomp client API, but the code seems on the right track.

I'd like to offer a couple suggestions. First, I'll show how I might write the class, and then comment on a few of the differences from the example code. I may not get some of the webstomp API particulars right, but hopefully this will still be helpful.

// other imports omitted for brevity
import { currentTime } from '@most/scheduler'

class Stream {
  constructor (topic, client) {
    this.topic = topic
    this.client = client
  }

  run (sink, scheduler) {
    const subscription = this.client.subscribe(
      this.topic,
      message => sink.event(currentTime(scheduler), message)
    )

    return {
      dispose: () => subscription.unsubscribe()
    }
  }
}

client.connect(user, token, () => {
  const updateStream = new Stream(`/queue/${user}`, client);
  const renderMessage = message => console.log('Received Message:', JSON.parse(message.body));
  runEffects(tap(renderMessage, updateStream), newDefaultScheduler());
});

A few things of interest:

  1. The stream holds the webstomp client. It seems likely that webstomp wants its client's methods to be called as, well, methods on the client :). The original example code detaches the subscribe property from the client and eventually calls it as a method on a Stream. That changes subscribe's this, and subscribe's implementation does indeed depend on having a webstomp Client as its this arg.
  2. run returns a disposable that wraps the subscription returned from client.subscribe, allowing proper automatic unsubscription within the @most/core architecture. This particular disposable is ad hoc, i.e. an object literal that implements the Disposable interface. If you decide to use TypeScript or Flow, this would still be valid, and checked by those type systems for you. You could also use @most/disposable's disposeWith, or write a class that implements the Disposable interface. Personally, I prefer ad hoc disposables for simple cases like this.
  3. run only uses the scheduler to timestamp events it sends into the sink. Stomp messages are already async, and will already uphold @most/core's always-async requirement. So there's no need for additional scheduling complexity.
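
To see the three points in action without a broker, here is a minimal sketch that runs in plain Node with no dependencies. Everything in it other than the run/dispose shape is an assumption made for illustration: FakeClient is a hypothetical stand-in that only models subscribe and unsubscribe (it is not the webstomp API), deliver is a made-up helper for pushing a message, the local currentTime stands in for the one exported by @most/scheduler, and the sink and scheduler are hand-written objects rather than the ones @most/core would supply.

```javascript
// Hypothetical stand-in for a webstomp client; it only models subscribe/unsubscribe.
class FakeClient {
  constructor () {
    this.handlers = new Map()
  }

  subscribe (topic, handler) {
    // Depends on `this`, as the real client's subscribe is said to above.
    this.handlers.set(topic, handler)
    return { unsubscribe: () => this.handlers.delete(topic) }
  }

  // Made-up helper (not part of any real API): deliver a message to a topic.
  deliver (topic, body) {
    const handler = this.handlers.get(topic)
    if (handler) handler({ body })
  }
}

// Local stand-in for currentTime from @most/scheduler.
const currentTime = scheduler => scheduler.currentTime()

class StompStream {
  constructor (topic, client) {
    this.topic = topic
    this.client = client
  }

  run (sink, scheduler) {
    // Called as a method on the client, so `this` stays intact (point 1).
    const subscription = this.client.subscribe(
      this.topic,
      // The scheduler is used only to timestamp events (point 3).
      message => sink.event(currentTime(scheduler), message)
    )
    // Ad hoc disposable wrapping the subscription (point 2).
    return { dispose: () => subscription.unsubscribe() }
  }
}

// Hand-written sink and scheduler, just enough to observe the behavior.
const received = []
const sink = {
  event: (time, message) => received.push(JSON.parse(message.body)),
  end: () => {},
  error: (time, err) => { throw err }
}
const scheduler = { currentTime: () => Date.now() }

const client = new FakeClient()
const disposable = new StompStream('/queue/alice', client).run(sink, scheduler)

client.deliver('/queue/alice', '{"n":1}') // reaches the sink
disposable.dispose()                      // unsubscribes from the topic
client.deliver('/queue/alice', '{"n":2}') // dropped: no handler remains

// Point 1 again: a detached subscribe loses its `this` and fails.
let detachedFailed = false
try {
  const subscribe = client.subscribe
  subscribe('/queue/alice', () => {})
} catch (e) {
  detachedFailed = e instanceof TypeError
}
```

In a real application @most/core calls run for you (via runEffects) and disposes the returned object when the stream ends or is no longer needed, so the manual run and dispose calls here exist only to make the lifecycle visible.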

I hope this helps. Cheers!

scoobyshi commented 5 years ago

Thanks very much Brian, this does really help. I was definitely stuck on the paradigm and vernacular of this lib, but looking at it now makes perfect sense, "a picture is worth a thousand words" I guess.

There are some great examples in the repo for dom events and promises, but I couldn't find any like this for custom streams. I've already applied this example to another one I was stuck with (related to background geolocation tracking) and it worked great.

I have a variety of examples now I've garnered from myriad issues in the cujojs/most and mostjs/core repos, that aren't in the docs, not sure it will be any use to anyone else, but they've helped me begin to wrap my head around the use of this library (in combination with your notes/explanations). Thanks again!

briancavalier commented 5 years ago

Great, I'm glad that was helpful, @scoobyshi.