andywer / threads.js

🧡 Make web workers & worker threads as simple as a function call.
https://threads.js.org/
MIT License

Question about intended observable api. #278

Open Nerdinacan opened 4 years ago

Nerdinacan commented 4 years ago

Thanks again for the great work here. I notice some other commenters are using this to analyze COVID-19. I too am using it in software that is being leveraged against it. Great minds thinking alike, and all that.

I have another question regarding observable support. This is a little complex, so I've tried to break the issue down as carefully as I can.

I'll let you decide. :)

Current Behavior

You say your library makes the use of observables transparent, and yet if I...

  1. create an observable in the main thread
  2. then try to emit 2 values from the main thread into the worker
  3. inside the worker, your exposed function gets executed 2 times and creates 2 separate subscriptions, each to a single element of the source data

Why is this a problem?

As it currently stands, your exposed function inside the worker cannot process a continuous stream of input values: each call creates a new observable, and the state of the previous subscription is thrown away. As a result, stateful operators like distinct(), scan(), etc. can't work properly inside the worker, since they have no history.

Every exposed function call is necessarily a subscription to a source stream of only one value.
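The problem can be reduced to a few lines of plain JavaScript. This is a minimal sketch, not threads.js or RxJS code: `makeDistinct`, `processPerCall` and `processLongLived` are hypothetical names, and a function that returns an array of outputs stands in for an observable operator.

```javascript
// Hypothetical stateful "distinct" stage, similar in spirit to RxJS distinct().
function makeDistinct() {
  const seen = new Set() // state lives only as long as this stage instance
  return value => {
    if (seen.has(value)) return [] // duplicate: emit nothing
    seen.add(value)
    return [value] // first occurrence: pass it through
  }
}

// Per-call model: every incoming value gets a fresh stage, so history is lost.
function processPerCall(values) {
  return values.flatMap(value => makeDistinct()(value))
}

// Long-lived model: one stage instance sees the whole stream.
function processLongLived(values) {
  const distinct = makeDistinct()
  return values.flatMap(value => distinct(value))
}

const input = [1, 2, 2, 3, 1]
console.log(processPerCall(input))   // [ 1, 2, 2, 3, 1 ]
console.log(processLongLived(input)) // [ 1, 2, 3 ]
```

The per-call version behaves like one exposed-function invocation per value: the duplicates survive because each stage has only ever seen one element.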

It is true that the worker can create an observable that emits multiple values back to the main thread, but only if that observable creates all its source values from inside the worker before its single execution ends. Thus your API is "transparent" from the worker to the main thread, but it is not "transparent" from the main thread INTO the worker.

My goal was to have your library "transparently" do some heavy processing inside the worker and return the results as an observable result from my source stream. But currently I can't do that if I need a distinct(), or a scan() or any other stateful observable operator.

Workaround

I think I can get the behavior I want by creating a unique subscription ID in the main thread and passing it in as part of the payload to your exposed function (or maybe by materializing the source stream over the pipe and manually reading subscription/cancel events), and then creating a receiver inside the worker that maps each subscription ID to an internal Subject and a stored instance of the observable I want to run.

But all that sounds almost exactly like what your code must already be doing, right? That's what led me to think this might actually be a bug.
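The worker-side half of that workaround might look roughly like this. It is a sketch under assumptions: the `{ key, kind, value }` message shape and the names `materialize`, `createRouter` and `runningSum` are hypothetical (not threads.js APIs), and the `"N"`/`"E"`/`"C"` kinds only mirror the notification kinds that RxJS's `materialize()` produces.

```javascript
// Hypothetical message shape: a plain object, so it survives postMessage / structured cloning.
// kind mirrors RxJS notification kinds: "N" (next), "E" (error), "C" (complete).
function materialize(key, kind, value) {
  return { key, kind, value }
}

// Worker-side router: keeps one long-lived pipeline per subscription key.
// makePipeline() must return { next(value) -> outputs[], complete() -> outputs[] }.
function createRouter(makePipeline) {
  const pipelines = new Map()
  return function handle(message) {
    const { key, kind, value } = message
    if (!pipelines.has(key)) pipelines.set(key, makePipeline())
    const pipeline = pipelines.get(key)
    if (kind === "N") return pipeline.next(value)
    // "C" or "E": the subscription is over, so release its state.
    // In this sketch an error simply produces no output.
    pipelines.delete(key)
    return kind === "C" ? pipeline.complete() : []
  }
}

// Example pipeline: a running sum, roughly what scan((acc, v) => acc + v, 0) does.
function runningSum() {
  let acc = 0
  return {
    next: value => [(acc += value)],
    complete: () => []
  }
}

const handle = createRouter(runningSum)
const out = [
  materialize("a", "N", 1),
  materialize("b", "N", 10),
  materialize("a", "N", 2),
  materialize("b", "N", 20),
  materialize("a", "C")
].map(msg => handle(msg))
console.log(JSON.stringify(out)) // [[1],[10],[3],[30],[]]
```

Interleaved messages from two subscriptions ("a" and "b") each keep their own accumulator, which is exactly what a fresh observable per call cannot do.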

Suggested strategy

Here is what my dream API would look like. I will have to write the adapter functions I've described, unless you already support this behavior and I'm just missing the point.

// master.js
import { spawn, Thread, Worker } from "threads"
import { defer, of } from "rxjs"
import { mergeMap } from "rxjs/operators"

const thread = await spawn(new Worker("./workers/counter"))
const { nameOfInternalFunction } = thread;

let nextKey = 0 // illustrative source of subscription ids

const proxyObservableToWorker = exposedFn => source => defer(() => {
    // create one subscription id per subscription (not per value),
    // so the worker can route every value of this stream to the same state
    const key = nextKey++
    // subscribe to the observable result of calling the exposed function
    return source.pipe(
        mergeMap(sourceVal => exposedFn({ key, value: sourceVal })) // or something
    )
})

const someSourceObservable = of(1, 2, 3) // or fromEvent, or whatever your master does

const result = someSourceObservable.pipe(
    proxyObservableToWorker(nameOfInternalFunction), // I'll currently have to create this proxy operator
    doThings() // placeholder for further operators
);

// worker.js
import { expose } from "threads/worker"
import { pipe } from "rxjs"

const proxyIncomingValue = operator => rawValue => {
   // take a subscription key,
   // make a pool of subjects
   // connect new subject to a stored instance of the operator we want to execute against multiple rawValues
}

const internalObservableOperator = pipe(
    theOperations(),
    iWantToHappen(),
    insideTheWorker()
)

expose({
  nameOfInternalFunction: proxyIncomingValue(internalObservableOperator)
})

Am I just recreating something you're already doing?

Thanks for reading this far, and sorry for the wall of text, but it's kind of a subtle issue.

andywer commented 4 years ago

Hey @Nerdinacan. Oh wow, that’s a lot to process 😉

I am not sure I completely got your issue yet or the workaround, but I can give you a quick first response nevertheless:

The thing is that observables were never supposed to be passed from the main thread to workers, only the other way.

With #273 on the horizon, we might be able to change that, though. Not completely sure yet, since observables receive some "special treatment" under the hood.

andywer commented 4 years ago

PS: Thanks for the warm words. Nice to see that the lib is used for an important thing like Covid research by multiple different people now ❤️

Nerdinacan commented 4 years ago

> The thing is that observables were never supposed to be passed from the main thread to workers, only the other way.
>
> With #273 on the horizon, we might be able to change that, though. Not completely sure yet, since observables receive some "special treatment" under the hood.

Thanks for the clarification!

However, not being able to update an existing worker observable with a new emission eliminates a lot of valuable use cases for using observables in the first place. I'll see if I can make a workaround for what I'm doing right now, then later I'll see if I can get you a PR.

I actually don't think it's going to be that hard to do, and maybe in the end your API might look something like this?


// worker.js

import { expose } from "threads/worker"
import { scan, distinct } from "rxjs/operators"

expose({

    doMagicThings: asObservable(observableSrc => {

          // observableSrc would be a local Subject that the "asObservable" function
          // creates and associates with the external subscription, then would pass
          // incoming function calls onto that Subject

          return observableSrc.pipe(
              doLotsOfProcessing(), // placeholder operators
              scan(/* .... */),
              distinct(),
              etc()
          )
    })
})

I'll see if I can put together a working example for you after I get out from under my current backlog.

Thanks again!

andywer commented 4 years ago

> However, not being able to update an existing worker observable with a new emission eliminates a lot of valuable use cases for using observables in the first place

This sums up the bit that despite the long description above I didn't quite get yet. So what does "worker observable" mean here? Is the observable created in the worker and returned by an expose()-ed function?

After reading the long description a few times: So your problem is that you want to have a stateful worker that creates an observable, returns it from an expose()-ed function and then you want to emit new values from that observable on subsequent calls to exposed worker functions?

The sample code you provided feels rather cryptic, since it's so abstract. I think a one- or two-sentence explanation of the use case might help here 🙂

Having stateful workers is usually simple, though. Just put some "global" variables (those vars will still be scoped to this worker instance) in the worker and track whatever state or long-living observables you have there and use those vars in your exposed functions.
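A sketch of that suggestion for a minmax-style use case. In a real threads.js worker the variables would sit at module scope and the object would be passed to `expose()` from "threads/worker"; here everything runs in one process, and a plain listener set behind a hypothetical `onValues` method stands in for a long-lived observable so the example has no dependencies.

```javascript
// Worker-scoped state: in a real worker this lives at module scope,
// and each spawned worker instance gets its own copy.
let min = Infinity
let max = -Infinity
const listeners = new Set() // stand-in for a long-lived observable / Subject

// The object you would hand to expose() in the worker.
const minmax = {
  add(value) {
    min = Math.min(min, value)
    max = Math.max(max, value)
    for (const listener of listeners) listener({ min, max })
  },
  // Hypothetical subscription method; returns an unsubscribe function.
  onValues(listener) {
    listeners.add(listener)
    return () => listeners.delete(listener)
  },
  reset() {
    min = Infinity
    max = -Infinity
  }
}

// Usage: in practice these calls would go through the spawn()-ed proxy.
const seen = []
const stop = minmax.onValues(range => seen.push(range))
for (const v of [2, 3, 4, 1, 5]) minmax.add(v)
stop()
console.log(seen[seen.length - 1]) // { min: 1, max: 5 }
```

Because `min` and `max` outlive each call, every `add()` builds on the previous ones, which is the statefulness the exposed functions need.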

Nerdinacan commented 4 years ago

Ah hah! I just found your Streaming Results section in your docs, it's very similar to what I'm suggesting.

This does do the kind of thing I need, so that's great! But your API requires exposing 3 functions to achieve it, instead of exposing a simple observable interface as an observable operator. That's not "transparent" in my estimation, but it will certainly work for what I need today. So thanks again.

My goal here is to make it REALLY transparent by adding 2 utility operators. Here's what I mean by that:


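// in the main thread...

import { spawn, Thread, Worker } from "threads"
const minmax = await spawn(new Worker("./workers/minmax"))

// toOperator is the proposed helper (not part of threads.js);
// counted$ is just an illustrative name
export const counted$ = thingThatNeedsCounting$.pipe(
  toOperator(minmax),
);

// new version of minmax that could work with stateful, pipeable observable operators
// (asObservable is the proposed worker-side helper, not part of threads.js)

import { expose } from "threads/worker"
import { scan } from "rxjs/operators"

expose(
  asObservable(input => {
    return input.pipe(
      scan((range, val) => {
        const { min, max } = range;
        return { min: Math.min(min, val), max: Math.max(max, val) };
      }, { min: Infinity, max: -Infinity })
    )
  })
)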

I'm going to do this by writing "asObservable" and "toOperator" helpers that keep track of new subscriptions and automatically generate subjects inside the worker, so that the state is preserved.
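To make the proposal concrete, here is a dependency-free sketch of what the two helpers might do. Everything in it is an assumption: `MiniSubject` is a tiny stand-in for an RxJS Subject with a built-in `scan`, `toOperator` works on an array instead of an observable, and a direct function call replaces the round trip to the worker. The point it illustrates is that `asObservable` builds the user's pipeline once per subscription key, so `scan` keeps its accumulator across calls.

```javascript
// Tiny stand-in for an RxJS Subject (assumption: real code would use rxjs).
class MiniSubject {
  constructor() {
    this.observers = []
  }
  subscribe(fn) {
    this.observers.push(fn)
  }
  next(value) {
    this.observers.forEach(fn => fn(value))
  }
  // Minimal scan: emits the running accumulation of every value seen so far.
  scan(reducer, seed) {
    const out = new MiniSubject()
    let acc = seed
    this.subscribe(value => out.next((acc = reducer(acc, value))))
    return out
  }
}

// Hypothetical asObservable (proposed in this thread, not a threads.js API):
// one input Subject and one pipeline per subscription key, built only once.
function asObservable(buildPipeline) {
  const sessions = new Map()
  return ({ key, value }) => {
    if (!sessions.has(key)) {
      const created = { input: new MiniSubject(), emitted: [] }
      buildPipeline(created.input).subscribe(result => created.emitted.push(result))
      sessions.set(key, created)
    }
    const session = sessions.get(key)
    session.emitted = []      // collect only what this value produces
    session.input.next(value) // pipeline state (e.g. scan's accumulator) persists
    return session.emitted
  }
}

// Hypothetical toOperator: tags every value of one subscription with the same key.
// An array stands in for the source observable; a direct call replaces the worker round trip.
let nextKey = 0
function toOperator(workerFn) {
  return values => {
    const key = nextKey++
    return values.flatMap(value => workerFn({ key, value }))
  }
}

// The minmax pipeline from the comment above, written against MiniSubject.
const minmaxWorkerFn = asObservable(input =>
  input.scan(
    ({ min, max }, val) => ({ min: Math.min(min, val), max: Math.max(max, val) }),
    { min: Infinity, max: -Infinity }
  )
)

const results = toOperator(minmaxWorkerFn)([2, 3, 4, 1, 5])
console.log(results[results.length - 1]) // { min: 1, max: 5 }
```

A second `toOperator(minmaxWorkerFn)` call gets a new key and therefore starts again from `{ min: Infinity, max: -Infinity }`, so separate subscriptions never share state.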

The reason I thought this might be a bug is that creating a new subscription ID and spawning a subject inside the worker is probably how I'd approach building this kind of library in the first place, and I thought you might have already done the groundwork.

Honestly, I should have read some more of your src before I started asking questions. But again, thanks a ton.

Nerdinacan commented 4 years ago

Here's a quick implementation I cobbled together.

https://github.com/Nerdinacan/threads-observable-adapter

andywer commented 4 years ago

Thanks for the sample code. I have to say, I am having a really hard time wrapping my head around this code, though. Take client.js, for instance: The code is so abstract that it's insanely hard to understand if you don't know RxJS by heart.

Based on your second-to-last comment, I will try to summarize what you want to achieve, as I am still not 100% sure we are on the same page here:

Bottom line: all you want is to be able to transparently pass observables from the main thread to workers. Right now you have achieved more or less the same result with a userland workaround: toOperator(), which wraps the worker function in the main thread, and asObservable(), which wraps it in the worker.

In this case it could (as you showed) definitely be implemented without much effort today, but I would say that we should probably still make it part of #273, or of a follow-up PR to it, as that PR generalizes a lot of the serialization, message passing and thread invocation code to work the same both ways (main -> worker, worker -> main). Maybe it already works out of the box with #273; I haven't checked yet.

Correct me if I got something wrong here 😉

Nerdinacan commented 4 years ago

> Take client.js, for instance: The code is so abstract that it's insanely hard to understand if you don't know RxJS by heart.

That code generates a custom observable operator from one of your exposed thread methods. External observable streams can use that operator, so you can mix and match it with other observables without knowing that the processing actually happens inside the worker.

I apologize if my code was opaque. It does indeed rely heavily on the materialize operator from RxJS. I tried to put enough comments in there to explain but I concede that it is a convoluted thing to grasp.

But you definitely understand my goal. I'll check out your PR when I get some breathing room, maybe there is indeed a simpler way to go about this.

Thanks!

andywer commented 4 years ago

Hey @Nerdinacan, any follow-ups?