Closed skylize closed 7 years ago
hello @skylize thanks for the question,
I found one very basic example on StackOverflow
this seems to be pointing to a link about firebase, so maybe you meant to link to a different one?
but I can't seem to locate the create method (maybe it doesn't exist anymore?)
the create
method you are looking for is here https://github.com/mostjs/create
however based on the code you display below it seems like you want the most-subject
package
https://github.com/mostjs-community/subject
Also if you want something a bit different maybe you can explain your example a bit more and we can try to work it out 😄
@skylize Hi. It would perhaps be easier to address if you presented your use case. It may be that you don't need to "create" events of arbitrary data the way that you imagine.
@davidchase
this seems to be pointing to a link about firebase
Whoops. Sorry. I fixed the link in the original post.
the create method you are looking for is here ... seems like you want the most-subject package
Thanks for the link. That looks good. The example there is much clearer to me than the slightly broken example on SO. At first glance, this looks like pretty much what I need. I'll take a look at that and post back.
I had seen most-subject
already, but turned away because of this comment in it's documentation.
This library was created out of ... the need to create circular dependencies. [author of most.js and I] strongly urge you to find a way to create a custom Stream factory using the standard most.js architectural patterns before deferring to this library.
It was very clear to me that I was missing something and I was not likely to need Subjects (at least not yet). So then the obvious question is "What is the standard most.js architectural pattern for creating a custom Stream factory?" That's why I opened this issue, because the documentation doesn't seem to tell me that.
@Frikki
Whether or not I "need" to create an arbitrary data stream is not actually the question. My main goal right now is a reasonably solid understanding of observable streams, and in particular Streams as implemented by most.js
.
The exercise I'm using to do that is to reimplement Redux as a state stream. The exercise helped me quickly spot a huge gap in my understanding: how to make some kind of stream for unknown quantities of future data without an event emitter.
As implied by the most-subject
docs, there is some kind of standard pattern for this, but I wasn't able to find documentation for it. Hopefully most.create()
will do the trick. I'll let you know how it goes.
No. most.create
is definitely not the pattern I am after here. This is clearly for one-off streams, once again expecting the data to already be available before creating the stream. Actually the explicitly unsupported code near the end looks closer to the basic functionality I had in mind:
// Unsupported: let emitEvent, emitEnd, emitError const stream = most.create((add, end, error) => { emitEvent = add emitEnd = end emitError = error }) emitEvent(123) emitEnd()
Is there a sensible pattern for this type of behavior? Should I be looking closer at Subjects? Is there a related pattern that I'm completely missing that would get around the need for this in most situations?
I have a feeling at this point that what I need to be doing is working with the Stream core and building something with a Sink and a Source, but I don't quite grok the concepts and the API-docs are are pretty opaque about these for someone who doesn't already have good knowledge of how a stream should behave under the hood.
@skylize Am I understanding you correctly that you expect a source to eventually provide data, which should propagate through a stream (e.g., a third-party service eventually feeding data)?
@frikki No. No 3rd party service. I am generating data within my own app. But as far as the stream should be concerned, the quantity/frequency of that data is basically a complete unknown. A reasonable way to describe it is I'm looking to re-implement EventEmitter as purely a stream with no emitter underneath.
Maybe I'm expecting too much from the "observables" paradigm? If streams are all they are cracked up to be, this seems like it shouldn't be such a difficult proposition. Why should I need some 3rd party service to provide me emissions to be able to open a stream. I just want a single point that I can push data onto that then propagates out to anyone subscribing to that source.
I've been playing around with the API a bit and so far everything I've come up with that looks promising involves the most ridiculous convolutions of scoping as to be laughable, even before I reach an actual working example.
The only "working" example I did get so far is to just pull in the EventEmitter class from node (also works with my own simplified but still very OOPish emitter) and then usefromEvent
on the emitter. But that broke, as soon as I tried to pass streams into the emitter, because I could not resolve the values from streams before the end of the event loop they were emitted on.
Here, let me add some code, so we can discuss this with a more concrete example. This is a very minimal at this point and far from battle-hardened, but the code below works for the included tests using Dan Abramov's first basic example of a reducer function. Basically I'm trying to get rid of of the EventEmitter, so the whole thing is streamed from beginning to end, thus allowing all the data to actually process in the correct order when we start passing streams to or from the reducer instead of just plain values.
index.js
import { fromEvent} from 'most'
import EventEmitter from 'events'
export const createStore = (reducer, state) => {
const emitter = new EventEmitter()
const state$ = fromEvent( 'state', emitter )
.skipWhile(() => state === undefined)
const dispatch = action => {
const newState = reducer(state, action)
state = newState
emitter.emit('state', state)
}
return {
state$,
dispatch
}
}
test.js
import test from 'ava'
import { from } from 'most'
import { createStore } from './'
// very basic reducer function
const counter = (state = 0, action = {}) => {
if (action.type === 'INCREMENT') return state + 1
if (action.type === 'DECREMENT') return state - 1
return state
}
// 3 tests crammed into one for brevity
test('resolves default state, increment action, decrement action', async t => {
setTimeout(() =>
t.fail('Test took too long, likely indicates unresolved promise'), 1000)
const store = createStore(counter)
const done = store.state$
.slice(0,12)
.reduce((states, s) => {
states = [...states, s]
return states
}, [])
.then(states => t.deepEqual(states, [0,1,2,3,4,5,4,3,2,1,0,-1]))
.catch(t.fail)
// can resolve a default store state
store.dispatch() // can resolve a default state if none provided
// can increment
from([0,1,2,3,4]).forEach(_ => store.dispatch({type: 'INCREMENT'}))
// can decrement
from([0,1,2,3,4,5]).forEach(_ => store.dispatch({type: 'DECREMENT'}))
await done
})
I think scan
would be perfect for this example. And get rid of the EventEmitter
.
@frikki, scan
sounds fine, but it works on a stream. How do I get the initial Stream
that I can scan? How do I push new data into that stream?
Hi @skylize. I wanted to try to address what seems like an underlying assumption you may have made about what a most.js Stream is. I'll be direct, and I hope you'll bear with me. I really do think this will help us help you with your use case.
I'm looking to re-implement EventEmitter as purely a stream with no emitter underneath
This isn't a goal of most.js. A most.js Stream is neither an event emitter, nor a pub sub topic, nor a queue. it's not a goal of most.js to provide an API to create a Stream and then imperatively push values into it from the outside.
A most.js Stream is a declarative data structure representing a sequence of events--values that occur at specific times. Declaring (i.e. creating) a most.js Stream is conceptually equivalent to declaring the sequence of events that it contains.
This declarative programming model can be a different way of thinking but also has many advantages. For example, it avoids publisher/subscriber race conditions of event emitters, as well as the potential for infinite buffering that arises from trying to avoid such race conditions.
So, most
intentionally deals only in declarative event streams.
Other community libraries build on that declarative base to provide additional functionality. For example, there are libs to provide declarative Stream sources for various things, like the DOM.
That's why most-subject
and @most/create
exist as separate packages. They do not represent the primary declarative usage model of most.js. Some third-party libs, like React, have programming models that essentially require imperatively pushing events into a stream. In such focused instances, it may be necessary to accept the tradeoffs of packages most-subject
or @most/create
in order to bridge from imperative to declarative.
Another option is implementing the Source/Sink API directly, as many community packages such as most-subject do. I'm sorry that the documentation for doing this is insufficient currently. We would be happy to help with that, and there are several examples in the mostjs-community repo.
I hope that helps, and I invite you to join us in the gitter chat to discuss your particular use case.
Thanks for your comments @briancavalier. I think I follow you for the most part.
But I guess my question still is, How do you make a stream and then push data into it? I am absolutely not stuck on trying to do anything imperative. It was just easiest in this case to conceive of the what to do imperatively.
I have played around with most-subject
since my last post, and successfully replaced the event emitter with a Subject. But I immediately ran into the same race conditions you mention.
I would be much happier to learn a declarative way to handle what I'm trying to do. I would be much happier using the API as closely as possible to how it was intended. Looking at the example code, how would you implement something that more-or-less accomplishes what that code does? Is it possible with the existing API of most
? Is it a reasonable thing to try to do?
I am not here to try and strong-arm most
into doing something it can't do. I'm here to ask "How am I supposed to be doing this?"
@skylize If you're in browser environment, dispatching CustomEvent
s as your actions and listening to them via fromEvent
+ scan
may do the trick?
@TylorS Thanks for the idea. We're back to an event emitter there. Just using a clunkier api than node core's EventEmitter. Event emitters run into race condition problems just like Subject, they just don't show up as early in the implementation as they do with Subject. :(
I still feel like I'm missing something foundational here, which is why I keep posting. There's lot's of little stream types. As I read and re-read the documentation, I keep feeling like I've almost got a solution for doing this declaratively with the existing API. I just can't quite put my finger on it. Something like a never
with a switchLatest
or a periodic
with a zip
or something like that.
I keep coming back to the same question, but most of the responses keep dancing around the main question, getting overly concerned with "accomplishing my use-case" which is really just an example and not being concerned enough with answering my 1 question.
So let me try wording it differently:
Is it possible using only most
's public api with a bit of declarative vanilla js to create a generic stream, which a consumer can subscribe to, and then somehow push unknown quantities of data that arrive at an unknown future time into that stream for whoever is already subscribed to receive that new data?
If it is possible. That is what I want to know about, is how to generate a stream from nothing to somehow push into later. If that's not possible, then my only real request is some examples of usage on the Source/Sink API.
It would be really surprising to me if in all the time spent developing this library, nobody has ever wanted to deal with unknown quantities of future data generated by your own application. This seems to me like a core aspect of why we would even want streams.
I'm not only interested in data from browser events and fetch calls, as nice as it may be to have a clean interface to those things. There's all kinds of miscellaneous asynchronous data sources with all sorts of different interfaces. You shouldn't need to go through an event emitter first to get an asynchronous data source into a stream. You should be able to declaratively build a plugin that directly matches whatever your data looks like on one side and the stream input on the other. Am I completely bonkers to think this?
... most of the responses keep dancing around the main question, getting overly concerned with "accomplishing my use-case" ...
That's because we as developers solve use cases, which I don't think is being overly concerned but to the point.
For a stream to exist, it must have a data source. If you cannot describe that data source, then I cannot see how we can declare a stream of data. And in that respect, the data can surely be of an unknown quantity at an unknown future time; still, there must be a source. No source == no sink == no events to consume.
@Frikki, I can see right off that there is a never
stream which is an infinite source with no data. So that can quite easily solve the "no source" problem without a hitch. The question then becomes, after subscriber is already listening, how can we manipulate the stream to inject data into.
You seem to misunderstand the concept. You can't. A never stream is a stream that never emits events.
But tell me, what is the source of this new data that you want to supply in a stream?
@Frikki
That's because we as developers solve use cases, which I don't think is being overly concerned but to the point.
But as I explained even before posting a use-case, the example is just an exercise, a toy. Just like a teacher would use a simple example case to explain a skill, I'm using a simple example to explain the question so I can find out what the skill entails. The real use-case is anytime I have a need to deal with unknown asynchronous data that doesn't come from the dom or a promisified library.
You seem to misunderstand the concept. You can't. A never stream is a stream that never emits events.
Yes, I understand perfectly that it never emits events. That's why it makes a good source when you don't have data yet. It will run forever keeping your subscription open without dumping junk into your output. Who wrote the code for the never stream? Why does it exist? I assume it was written for a purpose and not just to take up space in the codebase and the documentation. I would guess that purpose is so you can use it as a source when your data doesn't exist yet. That part looks clear enough to me. What I'm lost on is how I can then switch to another source when there is actual data to emit if some consumer is already listening to my silent never
.
But tell me, what is the source of this new data that you want to supply in a stream?
In my toy example, the new data comes from the results of pushing the current application state through the counter
reducer. In the bigger picture, the new data is whatever random async data source you may come across that doesn't fit nicely in the pretty little box of event emitter or promise.
The real use-case is anytime I have a need to deal with unknown asynchronous data that doesn't come from the dom or a promisified library.
Unknown being the keyword here. For any software to consume data, it must have a source. It cannot be unknown; we're not dealing with wormholes.
That's why it [a
never
stream] makes a good source when you don't have data yet.
No. It is not a source. It is a stream. A stream has a source. In the case of never
, the source is not feeding any values to be encapsulated in events, and the never
stream doesn't have an end event. Thus, it is infinite.
Who wrote the code for the never stream?
I guess Brian Cavalier did.
Why does it exist?
Likely because it is useful during development when you want to specify a stream but don't know the source of data yet. When you do, you can change the never
stream for an actual event emitting stream.
What I'm lost on is how I can then switch to another source when there is actual data to emit if some consumer is already listening to my silent never.
That's because you cannot.
... the new data comes from the results of pushing the current application state ...
So the source is the application state. Hence, you could use scan
as suggested several times.
That's why it [a never stream] makes a good source when you don't have data yet.
No. It is not a source. It is a stream. A stream has a source. In the case of never, the source is not feeding any values to be encapsulated in events, and the never stream doesn't have an end event. Thus, it is infinite.
Ok. Sorry. I'm not a stream expert. Please forgive me if I slightly mix up the terminology a bit. The main point still stands that the never
stream is perfectly capable of satisfying the need for a source.
Likely because it is useful during development when you want to specify a stream but don't know the source of data yet. When you do, you can change the never stream for an actual event emitting stream.
What I'm lost on is how I can then switch to another source when there is actual data to emit if some consumer is already listening to my silent never.
That's because you cannot.
@briancavalier Why did you write the never
stream? Why did you take the time make it available on the public API? Why did you take the time to write it into the documentation.
Likely because it is useful during development when you want to specify a stream but don't know the source of data yet. When you do, you can change the never stream for an actual event emitting stream.
Hmm.... Why does this sound so eerily familiar? I wonder if maybe it relates to the question of "how do I deal with unknown future data?"
@skylize Maybe something like this can give you inspiration. https://www.webpackbin.com/bins/-Kwb-ywVKKupfVib5lSU
The main point still stands that the never stream is perfectly capable of satisfying the need for a source.
No. It does not because you cannot replace it at runtime.
Hmm.... Why does this sound so eerily familiar? I wonder if maybe it relates to the question of "how do I deal with unknown future data?"
Perhaps I wasn't clear to mention that the replacement would be in the development phase, i.e., a code change.
During development when you don't know the data source yet:
const stream = never()
During development when you know the data source:
// Never has been replaced.
const stream = someDataSourceToStream()
@Frikki
During development when you don't know the data source yet:
Who's development are you talking about? Do you mean I as a developer would benefit from a never
stream as a stand-in? If so, could you please offer an example of this? I honestly don't see what good it would do me. Or are you saying that @briancavalier benefits from the never
stream while building most
, in which case, why did he bother to document it.
@TylorS
It's going to take me a while to grok everything you've done here, but one thing that stands out quickly is that you are definitely relying on the Source/Sink API. If dealing with source/sink is required for dealing with arbitrary data sources, then the documentation on the "developer" part of the API definitely needs some nice updates. Thank you for your code contribution.
Who's development? Are you talking about? Do you mean I as a developer would benefit from a never stream as a stand-in? If so, could you please offer an example of this?
Yes, you as a developer can benefit from never()
as a stand-in. Here's a brief example.
I am developing a UI component, which is going to return a view stream, e.g., a stream of virtual nodes to be used by a virtual DOM. However, the implementation of this view is not given to me but to some other developer. In the meantime, we still need the component to be incorporated in orchestration with other components, and someplace in the code, there will be a listener to the view stream. Here is where the never
stream comes in handy:
function MyComponent(source) {
// Here I can do other stuff, such as preparing my model for the view.
...
const viewStream = never()
...
return viewStream
}
Later when the view has been made by the other developer, I may do this:
function MyComponent(source) {
// Here I have prepared my model.
...
const viewStream = map(view, modelStream)
...
return viewStream
}
Now, the data flow won't break during development, because it is possible in another place to already consume MyComponent
:
// Somewhere else in the pipeline.
import { MyComponent } from 'ui/MyComponent'
const viewStream = MyComponent(data)
...
// Later in some vdom function
runEffects(viewStream, newDefaultScheduler()).then( // do stuff )
Here is where the never stream comes in handy:
Hmm. That feels somewhat contrived at first glance. But actually I can definitely see how that might be useful.
I just want to pause here for a moment, because it feels like things are getting a little contentious.
I am not trying to be argumentative, but I think I am starting to argue. I apologize to anyone who might feel like I am yelling at you.
I don't know if I've given a bad example or what. I am really just trying to understand something that I would expect to be a core concept. I am honestly just really confused why this is seen as a weird problem that needs to be solved by directly attacking my specific use-case.
If my problem is SO hard to solve, then really I believe everyone who contributes to most
should be jumping on this question and trying to answer it. The promise of observables is to deal really nicely with arbitrary incoming data, yet somehow I can only deal with that incoming data if it happens to be in an event or a promise? Something about that seems broken.
I have said over and over that I think I am missing something, but honestly it's starting to sound like most
is missing something and I am just the first person to bother pointing it out. Somebody please please correct me on this.
To be honest, I think this issue is solved by most-subject
(disclaimer: I'm the author). If you truly need to dispatch events separate from the place at which you declare the stream, then that is exactly what it is for.
The warning in the README, which seems to have, unfortunately, put you (and others) off (maybe it should be changed?), is basically there to try and get people to have this conversation with us about use cases. The goal is to help people understand how to solve their problems declaratively if they can, because quite often a subject can be used as a crutch, rather than the right tool for the job.
it's starting to sound like most is missing something and I am just the first person to bother pointing it out
It was my first issue too! lol https://github.com/cujojs/most/issues/164
Update: Here's a webpackbin https://www.webpackbin.com/bins/-KwbGeRW-bOoHXY-U7ZU
That feels somewhat contrived at first glance.
I'm sorry you feel that way. It is not at all contrived but practiced widely.
... something that I would expect to be a core concept.
The core concept of streams is not to produce them imperatively, as you seem to suggest. It is to declare them and listen to them. A data provider will act as the source of a stream. If you want to be actually creating events for the stream in code, then you are not touching a core concept, which is why it has been separated from the core functionality of Most.js. You are basically left with three options:
1) use an existing declarative source,
2) use a subject or most.create
and own the responsibility of race conditions, or
3) create a declarative source using the Source/Sinks API.
(kudos to @briancavalier)
The promise of observables is to deal really nicely with arbitrary incoming data ...
The keyword being incoming, i.e., a source, which is what Most.js does tremendously well.
I can only deal with that incoming data if it happens to be in an event or a promise?
No, it can practically be in any shape or form. JSON, an array, a simple integer, whatever.
it's starting to sound like
most
is missing something and I am just the first person to bother pointing it out.
No, it is sounding like you are missing something, and I am trying to point it out. I don't mean to come off rude with that in any way. You are not the first who is "struggling" going from imperative to declarative and functional reactive.
@TylorS has above provided you some solutions to your non-use-case example.
The warning in the README, which seems to have, unfortunately, put you (and others) off (maybe it should be changed?), is basically there to try and get people to have this conversation with us about use cases.
Haha. It worked! 😆 I'm here having the conversation. 👍
Actually I think this conversation needs to be had. I have played with most-subject
since I opened this thread. And in the very limited scope of my toy example above, it actually mostly solves the problem of the toy example. but it falls apart quickly because the imperative handling of the data leaves all sorts of opportunities for race conditions.
There is a deeper issue at play here. I quite agree with your disclaimer that a "Subject should be a last resort", but I'm having this conversation now to try and find out what is the "first resort" and nobody seems willing to offer me any answer beyond "use an event emitter" or "use a Subject".
That feels somewhat contrived at first glance.
I sorry you feel that way. It is not at all contrived but practiced widely.
@Frikki You skipped over the rest of that sentence. I was saying that even though my initial impression was "contrived", after a bit more thought I quite recognized the value of that pattern. I was agreeing with you, with a sidenote that it at first came across as contrived. :wink:
The core concept of streams is not to produce them imperatively...
I am not wanting to do anything imperatively here. I am sorry if my initial post gave you that impression, since it offered a fairly imperative concept. I guess my example code really drove home that initial imperative impression because I imperatively pushed results from event emissions. The whole point of the post from the get-go was "how am I supposed to be doing this?" with the base assumption that there would be a correct declarative pattern for achieving similar-ish behavior that someone would tell me as soon as I bothered to ask.
Next quote edited to use numbered options:
You are basically left with three options:
- use an existing declarative source,
- use a subject or most.create and own the responsibility of race conditions, or
- create a declarative source using the Source/Sinks API.
Yes, obviously that is the "default" from our previous discussions. I.e. use a dom event, fetch promise, etc. The whole point of this thread is "How do I not rely on this existing source?"
Ok. Well if I use a Subject, I can "own" the responsibility of race conditions. That's fine. But what tools are offered for dealing with these race conditions when they arirse? I don't see anything offered to deal with them form most
or from most-subject
. Most.create
doesn't' even offer me that much. create
just says flat out, you will define the beginning and end of this stream right now, and forever hold your peace. That is totally unsuitable to the question at hand.
I guess maybe that is what I'm asking for? I guess I assumed incorrectly that there would be some sort of middle ground between using the public API and the Source/Sink API to declare a custom stream? If there is not such a pattern in existence, then the core API documentation needs a big overhaul, because I truly believe random async data should be considered a core use-case of a stream
library.
it's starting to sound like
most
is missing something and I am just the first person to bother pointing it out.No, it is sounding like you are missing something, and I am trying to point it out. I don't mean to come off rude with that in any way. You are not the first who is "struggling" going from imperative to declarative and functional reactive.
Please, be as rude as you need to be. I really don't care if you are a total asshole, if it leads to solving this 🤣 .
Good. I'm glad that it is Me that is missing something and not the library, but, please help me figure out what I am missing. I am definitely NOT NOT NOT trying to do imperative actions, even if my example code looks imperative.
@TylorS your code is definitely bookmarked. If we don't uncover a way to deal with this using the public API, I predict your WebpackBin will hold the answer for me to extend my toy example slightly beyond being a toy.
Hi @skylize. Thanks to everyone for keeping this civil. Let's please continue to do that.
I guess I assumed incorrectly that there would be some sort of middle ground between using the public API and the Source/Sink API to declare a custom stream?
Implementing a Source is the way to create a new kind of declarative source. @most/dom-event
is a simple example. The Architecture wiki is also a good place to learn about Sources.
As I mentioned previously, we'd be happy to help with that in gitter. I feel that would be a better place than this issue. And PRs that help to improve documentation are certainly welcomed.
I truly believe random async data should be considered a core use-case of a stream library.
I respect that. However, it's an expectation you've brought with you. It isn't a goal of most.js.
If we don't uncover a way to deal with this
After catching up on this thread, it's still not obvious to me what "this" is. Without a use case, and with only imperative examples, it's been much more difficult to help.
My impression is that learning how to create a new declarative source for
@briancavalier Thank you for your civil response. I hope I can continue to stay civil and I hope I can successfully avoid saying something that will invoke others to quit staying civil.
At this moment (immediately after reading your response), I think vagueness in the Architecture docs might be the culprit behind this extremely long thread. Each of the pieces in Architecture are explained separately, but there is not even one single example that uses each of those pieces together.
I do remember seeing that page previously, and it seems quite likely to me that I would have stopped there if I found a good example for building my own stream-type from scratch when reading that page.
I would be happy to jump in Gitter to solve my specific use-case if somebody can convince me that I'm asking for a non-generic use of the API.
At the moment, I still feel like I'm asking for a pretty generic use-case resolution. With a generic case, It seems better to me if the conversation is public so anybody can benefit from the results if we resolve it, and anybody can confidently turn away if we prove that most
cannot handle a specific type of use-case.
Let me clarify that this is a sacrifice on my part. I am sure you guys could solve my toy example much faster in gitter. But I think I've come across a core issue that should be dealt with. That's why I keep asking questions here and trying to elaborate more and more clearly the question instead of going to gitter to get the question answered as quickly as possible.
Hey @skylize, I've been observing silently on the sidelines and thought I'd throw in a very simple source I use for a number of my own use cases, in order to show you how trivial a simple source can be to write. Brian is correct in what he's saying about creating your own sources for your personal use cases. Once you realise how incredibly easy it is, you'll probably find yourself doing so regularly.
This little example is kind of the equivalent of now(foo)
in @most/core (or just(foo)
in cujojs/most), but allows you to supply a function that supplies the initial value when the stream is first subscribed to, which means you can latebind the creation of the value, rather than supplying it up front. This can be handy if the stream isn't guaranteed to be observed, and the construction of the stream graph is complex and may be a performance concern. Use of closures means it can also be used as a simple method for generating a proxy stream (i.e. getting access to the stream reference before you are able to initialize one of its dependencies).
import { join, propagateTask } from '@most/core';
import { asap } from '@most/scheduler';
export function defer (f, seed) {
return new Resolve(f, seed);
}
export function deferStream (f, seed) {
return join(defer(f, seed));
}
export class Resolve {
constructor (resolve, seed) {
this.resolve = resolve;
this.seed = seed;
}
run (sink, scheduler) {
return asap(propagateTask(task, this.resolve(this.seed), sink), scheduler);
}
}
function task (time, value, sink) {
sink.event(time, value);
sink.end(time);
}
... and now you have a stream source. This example uses it to generate a random number that resolves a new random number for every observer. There are many ways to skin a cat; this is just for demonstration purposes.
const max = n => a => Math.floor(a * n);
const num = map(max(100), defer(Math.random));
I have a quite elaborate, but less-than-half-written response in the works.
But I need to sleep right now, so I just want to highlight this for the moment:
import { asap } from '@most/scheduler';
Both @axefrog
and and @TylorS
included a call to asap
in their suggested solutions (although I guess @TylorS edited his link, I can't find that anymore), but as far as I have found, this asap
function has zero documentation.
... as far as I have found, this asap function has zero documentation.
It's here: https://github.com/mostjs/core/blob/31db9dcab2e06b16f200914e580a1228b8b72864/docs/api.rst#asap
asap()
schedules to run a task as soon as possible but not in the current call stack.
To be honest, I think this issue is solved by most-subject (disclaimer: I'm the author). If you truly need to dispatch events separate from the place at which you declare the stream, then that is exactly what it is for.
I think I'm coming to the conclusion that at present, using most-subject
is just the right way to handle something like this. Thanks for your work on that @TylorS and your examples. I think maybe your readme there is overly critical of the pattern, considering there is not an acceptable alternative to fit the types of use-cases it was designed for.
I feel like there is an opportunity here though to offer something that fits better with the core interface and with the popular catch-phrase that "everything is a stream". I envision being able to do something like this:
// The injectable starts as a never stream, but has a special method
// that allows injecting a switchLatest that propagates out to existing observers.
// If the newly switched stream closes, then the InjectorStream reverts to its
// underlying never stream instead of closing.
const myStream = injectable()
myStream.observe(console.log) // logs: "hello"
myStream.inject( of('hello') )
How crazy is this idea? Does it seem viable? Does it seem like it could be a good pattern if implemented?
You're talking about a proxy stream. See https://github.com/mostjs-community/proxy for an implementation, though I believe @TylorS is currently merging this implementation with most-subject for @most/core.
Also, you can get a quick and easy solution using my example above.
import { emitSourceValues } from 'your-library';
export function createLateBoundStream() {
return deferStream(emitSourceValues);
}
You're talking about a proxy stream.
Ah yes. Thanks @axefrog. That looks pretty dang close to my suggestion, and I like that it returns stream
and attach
separately for nicer composing.
However both proxy
and your deferStream
are missing something that maybe I failed to explain properly in my vision, which is infinite switching. Let me extend the previous example a bit to demonstrate.
// Default behavior should be to use switch and internally discard old streams after switching
// to a new one. But it might be nice to also allow providing a combinator. Or perhaps there
// should be no default? In which case, providing a combinator would be required.
const myStream = injectable()
myStream.observe(console.log) // logs: "hello" "tick" "tick" "tick" "stop the clock"
myStream.inject( of('hello') )
myStream.inject( constant('tick', periodic(100)) )
setTimeout(() => myStream.inject( of('stop the clock') ), 250)
const myOtherStream = injectable()
myOtherStream.observe(console.log) // logs: "a" "b" "a" "b" "a" "b"....
const a = constant('a', periodic(100))
const b = constant('b', periodic(100))
myOtherStream.inject( a )
// provide join combinator to merge instead of switch
myOtherStream.inject( join, b )
Use a higher-order stream for that. Are you familiar with chain
, merge
, mergeConcurrently
, and switch
?
Yes. I am familiar with them. I don't understand what you are suggesting though.
I think the main problem we've all had is that most of what you've described has been your perceived idea of what the solution should entail, and the features to achieve that solution, which are apparently lacking from Most.
Could you instead describe a minimal example that demonstrates what you're actually doing? If we had a clearer picture of what your use case looks like, at least in the abstract sense, perhaps we could offer a different way to look at your approach which would then lead you to the real solution. Note when I say use case, I mean, for example, what are your data sources? Why do they come "later"? Why are there multiple sources? How do they relate to each other? And so forth.
@skylize If you need infinite switching you'll be able to do this with the newest most-subject
API.
I whipped together a quick example: https://www.webpackbin.com/bins/-KwgnmjoXnMIrScB5Vjy
Well I feel like we've arrived at the conclusion that the use-case as I described it is "solved" by using a Subject, and imperatively pushing data in. So as far as answering the original question of "how am I supposed to do this?" I think that's more or less resolved.
But the problem is Subjects are imperative. So now I'm trying to brainstorm a way to get much of the benefit's of a Subject, that could be built like Proxy streams to "Create circular stream dependencies that is declarative and designed to avoid memory leaks." But Proxy only allows you to attach a stream once, so, at least by itself, it falls way short of replacing the usefulness of Subjects.
Cool @TylorS , that looks nice. Seems like it has similarities to what I was suggesting.
I feel like we've arrived at the conclusion that the use-case as I described it ... using most-subject is just the right way to handle something like this
That's great :+1:
Does it seem viable?
I think it's an interesting construct. In fact, now that most-subject 6.0.0 has been released, it's simple to create something similar using most-subject
's create
paired with @most/core
's switchLatest
(haha, which I see @TylorS also just mentioned as I was writing this). Maybe that can be a good starting point for continuing to explore the idea.
Does it seem like it could be a good pattern if implemented?
Most's current direction is to encourage smaller, separate packages. So, I encourage you to implement it and publish it as a package. It will be interesting to learn about use cases in which people find it to be useful (you've probably guessed by now that we're very use-case driven).
I do think you've reminded us that our documentation can always be better, and specifically, that it'd be helpful to add documentation and examples around implementing a custom Source. Thank you.
Given that most-subject provides a solution to the original question, and it could serve as a basis for your injectable
ideal, I'm closing this issue.
Thanks everyone for your help and for putting up with any frustration I may have caused by not having a clear enough use-case to build from.
I think we have in-fact arrived at a conclusion by 1 pinpointing basic use of a Subject as the appropriate current pattern for the minimal use-case asked about, 2 getting some ideas on how to explore a more declarative replacement for a Subject, and 3 hopefully putting some steam behind your sails for improving the custom Source documentation.
👍
Summary
How would you create a simple base stream for emitting arbitrary data that doesn't exist yet? All the examples in the documentation seem to begin with either a dom event, or an existing dataset. I found one very basic example on StackOverflow that seems to do more-or-less what I'm after, but I can't seem to locate the
create
method (maybe it doesn't exist anymore?).Happy to work directly with
Stream
if that's what I need to do, but again, can't find any documentation that demonstrates how this works beyond the raw API descriptions ofStream
,Sink
,Disposable
,Scheduler
. It would be nice to have at least one simple example that shows how these pieces fit together for those of us who are not already streaming masters.Would be even happier to use some "functional" judo to create it, if that's possible. I feel like maybe there's something here about the
never
stream merged or switched or something that could get me going, but I just can't come up with anything myself that even hints at a solution.Versions
Code to reproduce