pharo-contributions / taskit

TaskIt is a library that eases Process usage in Pharo. It provides abstractions to execute and synchronize concurrent tasks, and several pre-built mechanisms that are useful for many application developers.
MIT License

[Question] How to suspend/pause a worker? #124

Open labordep opened 1 week ago

labordep commented 1 week ago

Hi, I'm trying to replace my old "fork" stuff with TaskIt. I cannot find how to suspend a worker, for example I have this code:

thread := [ "blabla" ] fork.
"...."
thread suspend.
"...."
thread resume.

I'm using TKTWorker new to replace fork but there is no way to pause/resume :/

This code doesn't work:

thread := TKTWorker new.
thread schedule:[ "blabla" ].
thread start.
"..."
thread stop.
"..."
thread start. "here my thread is not restarted"

Any solution?

tesonep commented 1 week ago

Hi @labordep, with TaskIt you should not control the worker like this. A worker is a process that runs the tasks we send to it. You don't need to instantiate a new worker; there is a default one in the system. It is set up with 4 threads (so up to 4 tasks will run at the same time).

A task can be seen as a single operation. If you want to run something that executes many times, you need to define a TKTService (the TKTParameterizableService is nicer, as you can pass it a block and a name).

If you want a single-shot task, something like this works well:

[ | x |
    x := Time now printString. 

    ('Start:' , x) traceCr.
    2 seconds wait.
    ('End:', x) traceCr. ] schedule

As you can see, if you execute it many times, at most 4 will run concurrently in a default Pharo 13 image:

10 timesRepeat: [
    [ | x |
    x := Time now printString. 

    ('Start:' , x) traceCr.
    2 seconds wait.
    ('End:', x) traceCr. ] schedule].

That is because the configured pool has 4 threads for running tasks.

On the other hand, if you have a recurring task, it is better to define it as a service:

serv := TKTParameterizableService new
    step: [ ('MyService: ' , Time now printString) traceCr ];
    name: 'MyService';
    stepDelay: 1 second;
    start;
    yourself.

serv stop.

With this you define a single step that runs repeatedly, and the delay between steps.

If you need a task to return a value you should use the future API:

f := [
     'Start ' traceCr.
     (1 to: 100) collect: [ :i |
             ZnEasy get: 'http://www.google.com' asZnUrl ] ] future
         onFailureDo: [ :e | e debug ];
         onSuccessDo: [ :x | x traceCr ];
         yourself.
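
A sketch of reading the value synchronously instead of through callbacks. It assumes the `synchronizeTimeout:` selector described in the TaskIt README is available in the installed version; the computation and the timeout are illustrative only:

| f result |
"Schedule a small computation on the default pool and keep its future"
f := [ (1 to: 10) inject: 0 into: [ :sum :i | sum + i ] ] future.
"Assumption: synchronizeTimeout: blocks the caller until the value is ready or the timeout expires"
result := f synchronizeTimeout: 5 seconds.
result printString traceCr.

Blocking like this is best kept out of the UI process; the callback style above avoids the wait entirely.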

If you need a more complex example, please tell me. Cheers, Pablo

Ducasse commented 1 week ago

Thanks Pablo, maybe we should do a pass on the "documentation" and add such an example.

labordep commented 1 week ago

Thanks @tesonep,

If I understand correctly, to control a job with play/pause I can use:

serv := TKTParameterizableService new
    step: [ "my stuff here" ];
    name: 'MyService';
    stepDelay: 1 second;
    start;
    yourself.

"..."
serv stop.
"..."
serv start.
"...etc...."

?

tesonep commented 1 week ago

Yes, @labordep, that is the idea.

labordep commented 2 days ago

Hi @tesonep, I tested it and it looks good! At first I hadn't understood that a TKTParameterizableService is in fact a looping task. This is nice because I can simplify this code:

[
    | random |
    random := Random new.
    [ true ] whileTrue: [
        1 second wait.
        objects do: [ :o |
            o radiusInMeters: random next * 200000.
            element updateObject: o ] ] ]

with this code:

[
    | random |
    random := Random new.
    objects do: [ :o |
        o radiusInMeters: random next * 200000.
        element updateObject: o ] ]
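
A sketch of how this step block might be plugged into the service shown earlier in the thread. It assumes `objects` and `element` are already defined as in the code above; the service name 'RadiusUpdater' is hypothetical, and `Random new` is moved out of the step so the generator is created once rather than on every step:

| random serv |
random := Random new.
serv := TKTParameterizableService new
    step: [
        "One iteration of the old loop; stepDelay replaces the explicit wait"
        objects do: [ :o |
            o radiusInMeters: random next * 200000.
            element updateObject: o ] ];
    name: 'RadiusUpdater';
    stepDelay: 1 second;
    start;
    yourself.

"Pause and resume, as discussed above"
serv stop.
serv start.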