Closed clintongormley closed 10 years ago
So, I will have to look at this a little more closely, but what you are doing sounds sensible in the context of promises.
However, based on a quick review, it seems as if what you really want is the Observable pattern, similar to the Rx library in .NET. Think of a Promise as being a SCALAR value in that it is kind of an all-or-nothing approach, no matter if you have one async operation or a set of them. An Observable is more like an ARRAY in which you can get back each element in a series of async operations when they are ready instead of having to wait for the entire set. Currently there are no good libraries in Perl for this. I started writing one a couple of months ago, but it uses p5-MOP and is really just an experiment at the moment. If you want to take a look at it though, it is here: https://github.com/stevan/react.
I'm not familiar with async patterns, but from your description the Observable pattern doesn't sound like what I need (at least for this particular usecase).
I specifically want to execute each iteration of the loop sequentially, not in parallel. So iteration 2 only happens after iteration 1 has succeeded.
That said, (a) there are probably lots of other use cases for the Observable pattern and (b) my then_discard solution probably exists with a better name and a more thoroughly thought-through implementation.
If you recognise the pattern and can point me to the right name, I'd be happy to look at how to implement it.
Observable sequences can happen in sequence actually, so I think this pattern is the right one; I just described it poorly.
I have to start cooking dinner now, but let me think about it some and get back to you with a better description.
So with Promises, if you want to run a sequence of asynchronous operations you will need to chain promises, just as you have done here. This is different than using the collect operator in that you want the order of operations to be enforced. Through chaining you can force the order of the operations, but still handle them in an async style.
With the Observable pattern, you can still use an async style, but you do not need to chain. The pattern has two main components: the Observer and the Observable.
The Observer is the consumer side of the pattern. It has three methods: on_completed, on_error and on_next. The on_completed method is used to signal that the sequence the observer was subscribed to has been completed. The on_error($e) method is used to signal that there has been an error while processing the sequence, and is passed some kind of error object/message. And finally the on_next($val) method is used to signal that the next element in the sequence is ready, which is passed in as an argument.
A simple observer that will sum all items in a sequence might look like this:
package SumObserver;
use Moose;
with 'Observer'; # lets assume there is a role to enforce the interface with
has 'sum' => ( is => 'rw', isa => 'Int', default => 0 ); # start at 0 so on_next has a defined value to add to
has 'completed' => ( is => 'rw', isa => 'Bool', default => sub { 0 } );
sub on_error {
my ($self, $e) = @_;
die $e;
}
sub on_completed {
my ($self) = @_;
$self->completed(1);
}
sub on_next {
my ($self, $val) = @_;
$self->sum( $self->sum + $val );
}
If this Observer were attached to a sequence, it would simply sum up all the numbers in the sequence, then flip the flag to let people know it has completed.
An Observable is even simpler; it only requires one method: subscribe. The subscribe($observer) method simply takes an instance of Observer and arranges for it to be attached to a sequence in some way.
Here is a simple Observable that will just count from 1 to 10.
package CountingObservable;
use Moose;
with 'Observable'; # again, lets assume a role to enforce the interface
sub subscribe {
my ($self, $observer) = @_;
$observer->on_next( $_ ) for 1 .. 10;
$observer->on_completed;
}
Then we connect the CountingObservable to the SumObserver like so:
my $o = SumObserver->new;
my $seq = CountingObservable->new;
$seq->subscribe( $o );
At this point $o will respond true to completed and will return the correct value from sum. Now this alone is not very exciting, so let's do something asynchronous instead.
We can leave the SumObserver the way it is for now, but let's make the Observable half more interesting.
package AsyncCountingObservable;
use Moose;
with 'Observable'; # again, lets assume a role to enforce the interface
sub subscribe {
my ($self, $observer) = @_;
my $count = 0;
return AnyEvent->timer( after => 0, interval => 1,
cb => sub {
if ( ++$count <= 10 ) {
$observer->on_next( $count ); # emit 1 through 10; $_ is not set inside a timer callback
} else {
$observer->on_completed;
}
}
);
}
Now, let's use it:
my $o = SumObserver->new;
my $seq = AsyncCountingObservable->new;
my $w = $seq->subscribe( $o ); # save the AnyEvent watcher …
Now, at this point $o is not completed and whatever value is returned from sum is incomplete. So now we might do something like this in order to wait for the $seq to finish.
my $cv = AnyEvent->condvar;
my $timer_w; # declare first so the callback below can refer to it
$timer_w = AnyEvent->timer( after => 2, interval => 2,
cb => sub {
if ( $o->completed ) {
undef $timer_w;
$cv->send;
}
}
);
$cv->recv;
After the recv call to the $cv, $o should be completed and we should get the correct value from sum.
So as you can see, the Observer and Observable set up a nice publish/subscribe pattern, but this is not where the usefulness of Observables ends. The key feature of Observables is that they are composable.
There are a number of Observable operators; the simplest one to show is map. It would look like this:
# do everything else as before …
my $o = SumObserver->new;
my $seq = AsyncCountingObservable->new;
my $w = $seq->subscribe( $o ); # save the AnyEvent watcher …
# create another observable with map
my $m = $seq->map(sub { $_ * 2 });
my $o2 = SumObserver->new;
my $w2 = $m->subscribe( $o2 ); # save the AnyEvent watcher again
The call to map on $seq created a second Observable sequence, totally separated from the previous one. This will process each element in the sequence and multiply it by 2, then pass it on to any Observer watching it (which we do with $o2). Keep in mind that the map operation is not just done once all the elements in the sequence are returned, but instead performed as each element arrives. Ponder for a moment the possibilities for operations such as: grep($f), which filters out certain elements in the sequence; take($n), which takes only the first $n elements in the sequence then stops the async operation (either via an AnyEvent $w or some other means); concat($o), which would take a second Observable sequence and string the two together such that they appear to happen in sequence; and more.
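To make the composition idea concrete, here is a rough sketch of map and take in plain Perl with no event loop. Everything in it is hypothetical and written only for illustration: the CallbackObserver and SimpleObservable packages, the producer code ref, and the operator implementations are assumptions, not code from the react repository or from any CPAN module. The source is synchronous, so take here can only ignore later elements rather than stop an async operation.

```perl
use strict;
use warnings;

# Hypothetical observer built from callbacks; a missing callback is a no-op.
package CallbackObserver;

sub new {
    my ($class, %cb) = @_;
    return bless {%cb}, $class;
}
sub on_next      { my $self = shift; $self->{on_next}->(@_)      if $self->{on_next} }
sub on_error     { my $self = shift; $self->{on_error}->(@_)     if $self->{on_error} }
sub on_completed { my $self = shift; $self->{on_completed}->(@_) if $self->{on_completed} }

# Hypothetical observable: wraps a code ref that feeds an observer.
package SimpleObservable;

sub new {
    my ($class, $producer) = @_;
    return bless { producer => $producer }, $class;
}

sub subscribe {
    my ($self, $observer) = @_;
    return $self->{producer}->($observer);
}

# map($f): a new observable that transforms each element as it arrives.
sub map {
    my ($self, $f) = @_;
    return SimpleObservable->new(sub {
        my ($observer) = @_;
        $self->subscribe( CallbackObserver->new(
            on_next => sub {
                my ($value) = @_;
                my $mapped;
                $mapped = $f->($_) for $value;    # alias $_ to the element
                $observer->on_next($mapped);
            },
            on_error     => sub { $observer->on_error(@_) },
            on_completed => sub { $observer->on_completed },
        ) );
    });
}

# take($n): pass on the first $n elements, complete, then ignore the rest.
sub take {
    my ($self, $n) = @_;
    return SimpleObservable->new(sub {
        my ($observer) = @_;
        my ( $seen, $done ) = ( 0, 0 );
        $self->subscribe( CallbackObserver->new(
            on_next => sub {
                return if $done;
                $observer->on_next(@_);
                if ( ++$seen >= $n ) { $done = 1; $observer->on_completed }
            },
            on_error     => sub { $observer->on_error(@_) unless $done },
            on_completed => sub { $observer->on_completed unless $done },
        ) );
    });
}

package main;

# A synchronous source standing in for an async one: emits 1 .. 10.
my $seq = SimpleObservable->new(sub {
    my ($observer) = @_;
    for my $i ( 1 .. 10 ) { $observer->on_next($i) }
    $observer->on_completed;
});

my @seen;
my $completed = 0;
$seq->map( sub { $_ * 2 } )->take(3)->subscribe( CallbackObserver->new(
    on_next      => sub { push @seen, shift },
    on_completed => sub { $completed++ },
) );

print "@seen\n";    # prints "2 4 6"
```

Each operator returns a new observable that subscribes to its parent only when something subscribes to it, which is what lets the calls be chained.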
Does this better explain what the Observable pattern is about and how it differs from Promises? I may be misinterpreting your needs (sorry, I still haven't read all your docs yet), but I have a suspicion that this pattern is really what you are after.
Thank you for the very detailed write-up, most illuminating. And I think that, in part, you are right: I may well want to use this pattern with what I am doing. But the then_discard() patch is intended to solve a different problem at a different level and could be used in conjunction with the Observable pattern.
Let me explain in more detail what I am doing: I'm in the process of writing an async version of Elasticsearch.pm. So for the basic operations you would do:
# sync
$result = $es_sync->search(....);
# async
$es_async->search(...)
->then(
sub { my $result = shift; ....},
sub { warn "@_" }
);
So all of the requests to async ES return a promise. So far so good.
Then there are two helper classes: Scroll and Bulk. While search() returns a page of results, scroll() allows you to keep pulling the next set of results until all have been processed, eg:
$scroll = $es_sync->scroll_helper( query => {....});
while (my $next = $scroll->next) {
say $next->{_id}
}
In the background, the scroll helper pulls a set of results and keeps them in an internal buffer, which is used to feed next().
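As an illustration of that buffering, here is a minimal sketch in plain Perl. It is not the real scroll helper: the BufferedScroll package and its fetch_page callback are hypothetical names, and the "pages" are a hard-coded list standing in for network requests.

```perl
use strict;
use warnings;

# Hypothetical stand-in for the scroll helper. "fetch_page" is an assumed
# callback returning the next page of results, or an empty list when done.
package BufferedScroll;

sub new {
    my ($class, %args) = @_;
    return bless {
        fetch_page => $args{fetch_page},
        buffer     => [],
        finished   => 0,
    }, $class;
}

sub next {
    my ($self) = @_;
    # Refill the buffer only when it has run dry and more pages may exist.
    if ( !@{ $self->{buffer} } && !$self->{finished} ) {
        my @page = $self->{fetch_page}->();
        if (@page) { push @{ $self->{buffer} }, @page }
        else       { $self->{finished} = 1 }
    }
    return shift @{ $self->{buffer} };    # undef once everything is consumed
}

package main;

# Fake data source: three pages of two documents each.
my @pages   = ( [ 1, 2 ], [ 3, 4 ], [ 5, 6 ] );
my $fetches = 0;
my $scroll  = BufferedScroll->new(
    fetch_page => sub { $fetches++; return @{ shift(@pages) || [] } },
);

my @ids;
while ( defined( my $doc = $scroll->next ) ) {
    push @ids, $doc;
}
print "@ids (fetches: $fetches)\n";    # prints "1 2 3 4 5 6 (fetches: 4)"
```

The fourth fetch is the one that comes back empty and marks the scroll as finished.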
Bulk, on the other hand, is used to process multiple CRUD operations in a single network request, so I can do:
$bulk = $es_sync->bulk_helper( index=>'myindex', type=>'mytype');
for (1..10000) {
$bulk->index( { id => $_, source => { my_num => $_}});
}
$bulk->flush;
While I'm adding new documents using index(), the bulk helper will flush the internal buffer whenever it reaches a certain size. Once we're finished, we issue a final flush and we're done.
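The flush-on-threshold behaviour can be sketched in a few lines of plain Perl. This is only an illustration under stated assumptions: BufferedBulk, the send callback, and the max_count parameter name are hypothetical here and do not describe how the real bulk helper is implemented.

```perl
use strict;
use warnings;

# Hypothetical stand-in for the bulk helper. "send" is an assumed callback
# that would issue one network request per batch; "max_count" is an assumed
# name for the flush threshold.
package BufferedBulk;

sub new {
    my ($class, %args) = @_;
    return bless {
        send      => $args{send},
        max_count => $args{max_count} || 1000,
        buffer    => [],
    }, $class;
}

sub index {
    my ($self, @docs) = @_;
    push @{ $self->{buffer} }, @docs;
    # Flush automatically once the buffer reaches the threshold.
    $self->flush if @{ $self->{buffer} } >= $self->{max_count};
    return;
}

sub flush {
    my ($self) = @_;
    return unless @{ $self->{buffer} };
    # splice empties the buffer and hands the whole batch to the sender.
    $self->{send}->( splice @{ $self->{buffer} } );
    return;
}

package main;

my @batch_sizes;
my $bulk = BufferedBulk->new(
    max_count => 4,
    send      => sub { push @batch_sizes, scalar @_ },
);

$bulk->index( { id => $_, source => { my_num => $_ } } ) for 1 .. 10;
$bulk->flush;    # final flush sends the leftover documents

print "@batch_sizes\n";    # prints "4 4 2"
```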
The two helpers can be combined to reindex() documents in one index into another, possibly making changes along the way. For instance, I could do:
$scroll = $es_sync->scroll_helper( index => 'old_index');
$bulk = $es_sync->bulk_helper(
on_error => sub { warn @_ } # warns about failures on individual docs
);
while (my $next = $scroll->next) {
$next->{_index} = 'new_index';
$next->{_source}{my_num} +=2;
$bulk->index($next); # hand the modified doc to bulk, otherwise nothing is reindexed
}
$bulk->flush;
Of course, this is a common pattern and so is provided by the reindex() method:
$bulk = $es_sync->bulk_helper( index=>'new_index');
$bulk->reindex(
source => { index=>'old_index'}, # used to create a $scroll, could also be a sub {}
transform => sub {
my $doc = shift;
$doc->{_source}{my_num}+=2;
return $doc;
}
);
Now to do the same thing with the async version, I can't (shouldn't) use next(). As Mark always stresses, async operations should take a "don't call me, I'll call you" approach, so instead we have:
$scroll = $es_async->scroll_helper(
index => 'old_index',
on_result => sub {
say shift()->{_id}
}
);
$scroll->start->then(
sub { say "Done" },
sub { warn "Failed: @_"}
);
(actually we have on_result, which passes in one result at a time, and on_results, which passes in all results that scroll has received in one go).
Async bulk works in much the same way:
$bulk = $es_async->bulk_helper(
index => 'myindex',
type => 'mytype',
on_error => sub { warn @_ }, # warns about failures on individual docs
on_fatal => sub { warn "FAILED: @_"} # called if something major happened, eg ES not contactable
);
for (1..10000) {
$bulk->index( { id => $_, source => { my_num=> $_}});
}
$bulk->flush->then( sub { say "Done"});
Reindexing is where this gets interesting, because there is two-way interaction between bulk and scroll. Scroll receives the first set of results (asynchronously) and passes them to $bulk->index(...), which adds them to its internal buffer and calls flush() as necessary.
We only want scroll to make its next request once bulk has finished the last set of results because (1) if bulk requests take much longer than scroll requests then we need to exert back-pressure on scroll so that it doesn't just keep pulling, (2) for whatever reason it may be important to the user to process results in order, and to be able to cancel whenever they want (eg once they have 50 matching results, they may want to stop). So this should resemble the sync process although all IO is being performed asynchronously.
(Why not just use the sync client instead? Reindexing may be part of a bigger async application, so other async IO can happen in parallel in the same process that is handling the reindex).
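The back-pressure requirement described above (scroll only fetches again once bulk has finished the previous batch) can be sketched without any promise library. The sketch below is an assumption-laden toy: next_tick and run_loop form a hypothetical one-file event queue, and fetch_page, write_page and pump are made-up names standing in for the scroll request, the bulk request and the driving loop.

```perl
use strict;
use warnings;

# Hypothetical event queue standing in for a real event loop.
my @queue;
sub next_tick { push @queue, shift }
sub run_loop  { while ( my $cb = shift @queue ) { $cb->() } }

my @pages = ( [ 1, 2 ], [ 3, 4 ], [ 5, 6 ] );
my @log;

# Fake async fetch: delivers the next page (or undef) on a later tick.
sub fetch_page {
    my ($on_page) = @_;
    next_tick( sub { push @log, 'fetch'; $on_page->( shift @pages ) } );
}

# Fake async bulk write: reports completion on a later tick.
sub write_page {
    my ( $page, $on_done ) = @_;
    next_tick( sub { push @log, "write(@$page)"; $on_done->() } );
}

# Fetch one page, write it, and only then ask for the next page.
sub pump {
    fetch_page( sub {
        my ($page) = @_;
        return unless $page;            # no more pages: stop
        write_page( $page, \&pump );    # next fetch waits for this write
    } );
}

pump();
run_loop();
print join( ' ', @log ), "\n";
# prints "fetch write(1 2) fetch write(3 4) fetch write(5 6) fetch"
```

Fetches and writes strictly alternate, so a slow writer automatically slows the reader down.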
So the on_results callback looks like this:
on_results => sub {
my @docs = map { $transform->($_)} @_;
$bulk->index(@docs)
->then(
# do nothing if OK
sub { @_ },
# abort scroll if some error
sub { $scroll->finish }
)
}
This passes back a promise, and inside scroll, we can wait for that promise to be resolved before we issue the request for the next set of docs. So the loop that fetches the next set of results and pushes them to bulk looks like:
sub _fetch_loop {
my $self = shift;
my $d = deferred;
my $weak_loop;
my $loop = sub {
if ( $self->is_finished ) {
$d->resolve;
return;
}
$self->es->scroll(
scroll => $self->scroll,
body => $self->_scroll_id
)->then( sub { $self->_next_results(@_) } )
->then_discard( $weak_loop, sub { $d->reject(@_) } );
};
weaken( $weak_loop = $loop );
$loop->();
return $d->promise;
}
The _next_results() method is the part that processes each new result and calls on_results etc. It uses a similar technique to the above.
The important part here is that final then_discard method. If I had just used then there, it would wait for the return result from the next iteration of $weak_loop and the return stack would just get bigger and bigger. Instead, then_discard tells the event loop to call $weak_loop on the next tick, and immediately returns an empty list, breaking the chain. This ensures that the stack never grows beyond a certain size, no matter how many times $loop is executed.
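The effect on the stack can be demonstrated with a small plain-Perl sketch. To be clear about assumptions: this is not how Promises implements then_discard; next_tick, run_loop and stack_depth are hypothetical helpers, and the queue is a toy substitute for a real event loop. It only shows that scheduling the next iteration and returning immediately keeps the call depth constant.

```perl
use strict;
use warnings;

# Hypothetical one-file event queue standing in for a real event loop.
my @queue;
sub next_tick { push @queue, shift }
sub run_loop  { while ( my $cb = shift @queue ) { $cb->() } }

# Measure how deep the Perl call stack currently is.
sub stack_depth {
    my $depth = 0;
    $depth++ while caller($depth);
    return $depth;
}

my $iterations = 1000;
my ( $count, $max_depth ) = ( 0, 0 );

my $loop;
$loop = sub {
    my $depth = stack_depth();
    $max_depth = $depth if $depth > $max_depth;
    if ( ++$count >= $iterations ) {
        undef $loop;    # break the self-reference so the closure can be freed
        return;
    }
    # Schedule the next iteration and return at once, instead of calling
    # $loop->() directly and waiting for its result.
    next_tick($loop);
    return;
};

next_tick($loop);
run_loop();

print "iterations: $count, max stack depth: $max_depth\n";
```

Replacing next_tick($loop) with a direct $loop->() call would make the reported depth grow with the iteration count, which is the growth the paragraph above describes.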
I could imagine the Scroll class being Observable and the Bulk class being an observer, but in your example, the Observable class uses a time watcher to fire off new requests. I wouldn't want to use a time watcher but I suppose I could use an idle watcher in combination with a flag on the $scroll object like is_ready_for_next_batch.
In the various JS examples I've seen on stackoverflow, they all handle these recursive calls using setTimeout to place the $loop onto the event queue. However, my Scroll and Bulk modules should not be tied to a specific event loop - they need to be implemented using just promises. With the Deferred backends that have been added, Promises now have the ability to add callbacks to the queue for whichever event loop is in use. then_discard() just adds the ability to return immediately after doing so.
Thank you for the detailed explanation, I understand the reasoning behind then_discard much better now. I do still think this could be done with Observable/Observer, but considering that no actual code exists for that pattern right now I am not going to push the issue further. Until I actually get around to writing that code that is, after which I will submit a patch to you to replace all then_discard usage with it ;)
For the record, I used the time watcher in the Observable example simply because it was easy, it is not a requirement for Observables at all. I just needed to simulate an async operation.
Okay, so as for this patch, I am happy to accept it, but I have two issues.
1. I don't like the then_discard name. I would prefer a single-word name, something different from then so that the distinction could be clearer. I understand why you chose it, but I think it has negative connotations from the word discard and so I would like to ponder a different name. Let me think this over for the day and get back to you; if I don't come up with anything better, then I will just accept the patch as is.
2. I think the $loop and $weak_loop parts of the example are a little too complicated for a base example. It is a good illustration of your specific problem, but perhaps you could work up a simpler example that doesn't involve weaken? This is not a blocker on this patch, really just a nice-to-have.
Sound good?
after which I will submit a patch to you to replace all then_discard usage with it ;)
splendid
I don't like the then_discard name
finish? finally?
I think the $loop and $weak_loop parts of the example are a little too complicated for a base example. It is a good illustration of your specific problem, but perhaps you could work up a simpler example that doesn't involve weaken? This is not a blocker on this patch, really just a nice-to-have.
Hmm, perhaps I can do it with a method call instead. I'll have a go.
Yes, I was thinking something like finish or finally too. Although finally already has associations with exception handling, perhaps finalize?
++
If that works for you, it works for me :)
Reworking the PR now
Done - and I've rewritten the cookbook as well
Merged! Thanks again, this is great!
Many thanks - promises have made this version of the async ES code so much cleaner and easier to understand than the last version!
Excellent, I am glad they could help. Let me know when you want me to release this version and I will do all the release engineering stuff and ship it to CPAN.
will do - thanks for the help
In my quest to make Promises recursive, I've come across another issue: because promises are chained, the return stack can become enormous. JS seems to use tail calls to flatten this out (I think) but of course this doesn't work in Perl.
For a use case imagine processing each record in a database one after another (not all at the same time), then returning when all done (or when a failure occurs)
What I needed was this:
But I want to write that without knowing ahead of time which event loop is being used. I tried to make it work with goto $loop but that last then() is still waiting for a return value.
The solution I've come up with is then_discard(), which works just like then() but doesn't return a promise.
So the callback is scheduled for execution in the event loop and then it returns, unwinding the return stack.
See the PR for details.
Does the name then_discard() make sense to you? It is so called because the return values are discarded instead of being passed to the next promise.