caolan / highland

High-level streams library for Node.js and the browser
https://caolan.github.io/highland
Apache License 2.0

Async mapping with promises #517

Closed ghost closed 8 years ago

ghost commented 8 years ago

I don't know if it has been asked before (could only find issue #313, which doesn't cover promises), but it would be really neat if mapping would work with promises, such as this:

// db.getUserFromId returns a promise
_([1,2,3,4]).map(id => db.getUserFromId(id)).each(user => {
   console.log(` - ${user.name}`)
})

Doing this would make, in my eyes, the library a lot more powerful. Also see is-promise for a predicate which determines if a returned value is a promise.

Is this worth debating?

megawac commented 8 years ago

Might be interesting to extend _.wrapCallback to support promises ala asyncify

svozza commented 8 years ago

-1 on this. It's not map's job to unwrap promises, it's supposed to apply a function to the contents of a functor. As alluded to in #313, it's only a couple of extra characters to use flatMap and it maintains the lawfulness of the API:

_([1,2,3,4]).flatMap(id => _(db.getUserFromId(id))).each(user => {
   console.log(` - ${user.name}`)
})

That said maybe some equivalent to wrapCallback like @megawac suggests could be interesting, because you could go point free with the flatMap example there.

ghost commented 8 years ago

@svozza Thanks for the example, and I get the point you are making. You're right. Something like wrapPromised would be kind of useful however.

vqvu commented 8 years ago

Something like wrapPromised would be kind of useful however.

@samvv We accept PRs :wink:.

megawac commented 8 years ago

Porting async.asyncify would be a good starter ;)

lpww commented 8 years ago

I'm working on this. Is there a preference for extending wrapCallback vs adding a new wrapPromise function?

vqvu commented 8 years ago

There should be a new wrapPromised function. There's no way to tell the difference between a node callback function and a function that returns a promise.

lpww commented 8 years ago

You can call the passed function and check for a .then function afterwards, which is what I did in my pull request.
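The ".then" check described here can be sketched as a small predicate. This is only an illustration: the name isThenable is hypothetical (it is not part of Highland), and it is similar in spirit to the is-promise package mentioned earlier in the thread rather than a copy of it.

```javascript
// Hypothetical predicate: true for any value that exposes a callable `then`.
function isThenable(value) {
  return value !== null &&
    (typeof value === 'object' || typeof value === 'function') &&
    typeof value.then === 'function';
}

const checks = [
  isThenable(Promise.resolve(1)), // a native promise
  isThenable({ then() {} }),      // any object with a then method also passes
  isThenable(42),                 // a plain number does not
  isThenable(null),               // null is rejected before property access
];
```

Note that the check can only run after the wrapped function has already been called, so it says nothing about which arguments that function expected.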

I am happy to break out the logic into separate functions. I was trying to mimic the asyncify function referenced in this thread.

Also, I cut the branch from 2.x but I'm wondering if this work should actually be done on master?

vqvu commented 8 years ago

Oh, I see what you wanted to do. That's not the functionality intended here. The stream constructor already does that. What we want is a way to wrap a function that returns a promise and turn it into an equivalent function that returns a stream.

You won't need much of the logic from asyncify, as it's already done for you in the stream constructor. The function really just needs to catch errors and wrap it in _.fromError*.
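A minimal sketch of the wrapper described above, not the code that was eventually merged. The helper makeWrapAsync and the stand-ins fakeStream and fakeError are hypothetical; they are injected so that the snippet runs without Highland installed. With Highland, the two arguments would presumably be the stream constructor _ and _.fromError.

```javascript
// Sketch only: `toStream` and `fromError` are injected so this runs standalone.
// Assumption: with Highland they would be `_` and `_.fromError`.
function makeWrapAsync(toStream, fromError) {
  return function wrapAsync(f) {
    return function (...args) {
      try {
        // No callback is appended; the promise is handed to the stream constructor.
        return toStream(f.apply(this, args));
      } catch (err) {
        // A synchronous throw from `f` becomes a stream that carries the error.
        return fromError(err);
      }
    };
  };
}

// Hypothetical stand-ins that only tag their input.
const fakeStream = (source) => ({ kind: 'stream', source });
const fakeError = (error) => ({ kind: 'error', error });

const wrapAsync = makeWrapAsync(fakeStream, fakeError);
const ok = wrapAsync((id) => Promise.resolve({ id }))(1);
const failed = wrapAsync(() => { throw new Error('boom'); })();
```

Here ok wraps the returned promise and failed wraps the thrown error, which is the only logic the wrapper itself has to provide.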

Also, I cut the branch from 2.x but I'm wondering if this work should actually be done on master?

This should be done on master, yes. Only bugfixes and documentation fixes go on 2.x.

* This was a new function in 2.10, but we accidentally did not expose it in the docs before now.

vqvu commented 8 years ago

Now that I think about it, it would be trivial to extend wrapPromised into a generic streamify function that turns all supported types (i.e., Arrays, Promises, Iterators, Iterables, and Node Streams) into streams, not just promises.

Is this worth doing? @svozza?

svozza commented 8 years ago

Yes, sounds good. Would we deprecate wrapCallback then?

vqvu commented 8 years ago

I don't think so. The problem is there's no way to tell whether a function is meant to be called with a callback without the caller actually telling us. Deprecating wrapCallback would require us to add another argument to streamify to specify the correct behavior, which doesn't seem all that different from having a separate function.

svozza commented 8 years ago

Good point. The only issue though is that there will be a mismatch between the behaviour of streamify and streamifyAll.

vqvu commented 8 years ago

Oh, that's tricky. I forgot about streamifyAll. They definitely should not do different things. And the callback case should definitely remain the default because it's the more non-trivial functionality (and probably more likely to be used).

Alright, let's just bail on this idea. It's not like there's demand for support for the other types, and simplicity is best. We'll stick with just support for promises. Though call it streamifyAsync? For async function() { }.

Then adding streamify as an alias for wrapCallback.

lpww commented 8 years ago

There's no way to tell the difference between a node callback function and a function that returns a promise.

@vqvu You don't think calling the function and checking if it returns an object with a .then property is a good way to do that?

Oh, I see what you wanted to do. That's not the functionality intended here. The stream constructor already does that. What we want is a way to wrap a function that returns a promise and turn it into an equivalent function that returns a stream.

You won't need much of the logic from asyncify, as it's already done for you in the stream constructor. The function really just needs to catch errors and wrap it in _.fromError*.

I'm not sure that I follow along with you here. Did you see my pull request? This is what I thought I was doing so now I'm a little confused. Are you saying that the constructor already detects a promise and converts it to a stream?

We'll just stick with just support for promises. Though call it streamifyAsync? For async function() { }.

Then adding streamify as an alias for wrapCallback.

I agree that just supporting promises and callbacks is probably fine for now. I'm not sure about naming the promise handler streamifyAsync and the callback handler streamify though. That naming makes it seem like streamify is synchronous, which is not necessarily true. I think the original wrapCallback and wrapPromise are the clearest names.

vqvu commented 8 years ago

You don't think calling the function and checking if it returns an object with a .then property is a good way to do that?

Sorry, I wasn't being clear. The problem with your PR isn't the way that it detects a promise. The problem is that it passes a callback all the time.

var result = f.apply(self, args.concat([cb]));

This is not OK because people write varargs functions and functions with optional arguments. Here's an example of a bad case,

function doAThing(optionalString) {
    // Fall back to 'default' when the optional argument is omitted.
    var result = optionalString || 'default';
    return new Promise((res, rej) => {
        res('The result is: ' + result);
    });
}

var doAThingStream = wrapCallback(doAThing);

doAThing().then(x => {
    // x === 'The result is: default'
});

doAThingStream().toArray(xs => {
    // xs !== ['The result is: default']
    // This is bad.
});

When I say "There's no way to tell the difference", I am referring to this problem. We cannot pass the callback unless we know for sure that f supports a callback argument, and there is no way to automatically tell whether or not a particular function is a callback-style function.

Currently, the fact that someone is using wrapCallback tells us that the f requires a callback as the last argument. If we extend it to support functions that return promises, we no longer know that.
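The failure mode can be reproduced without Highland. In this sketch, naiveWrap is a hypothetical wrapper that always appends a callback, as the rejected approach did, and seen is a hypothetical log of what doAThing receives as its first argument.

```javascript
// Records the type of the first argument on every call (illustration only).
const seen = [];

// Promise-returning function with an optional argument, as in the example above.
function doAThing(optionalString) {
  seen.push(typeof optionalString);
  const result = optionalString || 'default';
  return Promise.resolve('The result is: ' + result);
}

// Hypothetical wrapper that unconditionally appends a callback argument.
function naiveWrap(f) {
  return function (...args) {
    const cb = function () {};
    return f.apply(this, args.concat([cb]));
  };
}

doAThing();            // called directly: the optional argument is undefined
naiveWrap(doAThing)(); // called through the wrapper: the callback fills that slot
```

After both calls, seen holds 'undefined' followed by 'function': the appended callback has silently taken the place of the optional argument, so the default is never used.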

Did you see my pull request? This is what I thought I was doing so now I'm a little confused.

I misread your PR when I made that comment. Feel free to ignore.

Are you saying that the constructor already detects a promise and converts it to a stream?

Yes. You can do var stream = _(aPromise); and things will work as expected.

That naming makes it seem like streamify is synchronous, which is not necessarily true. I think the original wrapCallback and wrapPromise are the clearest names.

streamify is synchronous. It is only a conversion function. The function that it is converting is not, but streamify itself definitely is.

Furthermore, I don't like the wrapPromise name because you're not wrapping a promise. You're wrapping a function that returns a promise. I'm fine with wrapAsync too. The use of the streamify prefix was only to match the streamifyAll utility that we already have.

vqvu commented 8 years ago

There is prior art for the streamify name. Bluebird uses promisify and promisifyAll for their analogue of wrapCallback. In fact, streamifyAll was inspired by promisifyAll.

lpww commented 8 years ago

@vqvu Thanks for the detailed response, I appreciate you taking the time to answer my questions.

This is not OK because people write varargs functions and functions with optional arguments.

I had not considered this use case, it makes sense now.

Yes. You can do var stream = _(aPromise); and things will work as expected.

Understood. My confusion is that I thought this was the functionality being requested.

Furthermore, I don't like the wrapPromise name because you're not wrapping a promise. You're wrapping a function that returns a promise.

Good point. Should I name it wrapAsync for now and change it to streamifyAsync if we add the streamify alias for wrapCallback? I'm happy to add the alias as part of this work too; I'm just not sure where we landed on it.

vqvu commented 8 years ago

Yes. Let's just start with wrapAsync for now so we don't get bogged down in naming.

charandas commented 6 years ago

I have been using wrapAsync pretty successfully and was wondering if it would make sense to support a Node 8 AsyncFunction under the same API (one that gets generated using the async keyword)?

vqvu commented 6 years ago

As far as I know, wrapAsync should work with AsyncFunction out-of-the-box, since it can be treated as a function that returns a Promise.
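The claim that an AsyncFunction can be treated as a promise-returning function can be checked in plain JavaScript, independent of Highland. The function names below are illustrative only.

```javascript
// An async function and a plain promise-returning function with the same result.
async function asyncFunction(array) {
  return array[0] + array[1];
}

function promiseReturning(array) {
  return Promise.resolve(array[0] + array[1]);
}

const fromAsync = asyncFunction([1, 2]);
const fromPlain = promiseReturning([1, 2]);
const bothArePromises =
  fromAsync instanceof Promise && fromPlain instanceof Promise;

// One difference: an error thrown inside an async function surfaces as a
// rejected promise, never as a synchronous throw at the call site.
async function failing() {
  throw new Error('boom');
}

let threwSynchronously = false;
let rejected;
try {
  rejected = failing();
} catch (e) {
  threwSynchronously = true;
}
if (rejected) {
  rejected.catch(() => {}); // handled here to avoid an unhandled-rejection warning
}
```

From the caller's side both functions hand back a native promise, so a wrapper that only inspects the return value sees no difference between them.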

charandas commented 6 years ago

It did not work, @vqvu. In fact, I ended up spending 2-3 hours debugging this, as my code wasn't covered with tests and I had converted parts of it to async/await a week ago.

My use case entailed something like the following:

highland(something).batch(1000).map(wrapAsync(promiseReturningFunction))

All I did was change the promiseReturningFunction to an asyncFunction. Would I need more changes on the highland processing chain?

vqvu commented 6 years ago

If it worked before, it should still work now unless you have some sort of race condition going on. Maybe you were implicitly relying on some specific execution order?

Can you provide a test case that fails? I tried this simple pipeline in Node 8.9.0 and didn't see an issue:

const _ = require('highland');

function stream() {
  let i = 0;
  return _((push, next) => {
    if (i < 10) {
      setTimeout(() => {
        push(null, i++);
        next();
      }, i * 50);
    } else {
      push(null, _.nil);
    }
  });
}

function delay(x) {
  return new Promise((resolve) => setTimeout(resolve, x));
}

function promiseReturning(array) {
  return delay(1000).then(() => array[0] + array[1]);
}

async function asyncFunction(array) {
  await delay(1000);
  return array[0] + array[1];
}

stream()
    .batch(2)
    .map(_.wrapAsync(promiseReturning))
    .merge()
    .toArray(x => console.log(`promiseReturning: ${x}`));

stream()
    .batch(2)
    .map(_.wrapAsync(asyncFunction))
    .merge()
    .toArray(x => console.log(`asyncFunction: ${x}`));

benkeil commented 3 years ago

wrapAsync removes type information in TypeScript.