lmj / lparallel

Parallelism for Common Lisp
http://lparallel.org
BSD 3-Clause "New" or "Revised" License
244 stars 29 forks source link

Waiting for completion of all pmap tasks after an error is encountered #18

Closed i-perminov closed 9 years ago

i-perminov commented 9 years ago

Hi,

I have a case where I want to use pmap with a function that A. can signal an error B. is not interrupt-safe C. is not thread-safe when called with the same argument, i.e. it is ok to run (f x) and (f y) in parallel only if x /= y. I am not sure how to deal with errors in this case. If a task signals an error, the error gets transferred to the caller and the caller can run the same pmap again, which is not thread-safe (because of C). Wrapping pmacar call with with-kill-on-abort is unsafe, because my function is not interrupt-safe. So, it looks like I need to wait for completion of all the pmap tasks before transferring the error. It would be nice to have macro with-wait-on-abort (similar to with-kill-on-abort), but I do not see an easy way to implement such a macro on top of pmap. Any suggestions?

Thanks, Ilya

lmj commented 9 years ago

Some quick background: internally, pmap submits tasks and goes into a receive-result loop. When there is an error inside a task, the task returns an error info object. When receive-result sees such an object, the corresponding error is signaled immediately.

If I understand your problem, I think it would be solved if pmap could continue receiving results after an error is received. A CONTINUE restart inside lparallel seems reasonable when there is an internal channel hidden from the user. Perhaps a wait-after-errors macro for convenience. This can't be done without a change to lparallel, of course.

In the meantime, you could temporarily suppress errors so that pmap's receiving loop won't be shortcutted by an error.

(defstruct wrapped-error value)

(defun wrap (fn)
  (lambda (&rest args)
    (handler-case (apply fn args)
      (error (e) (make-wrapped-error :value e)))))

(defun unwrap (object)
  (if (wrapped-error-p object)
      (error (wrapped-error-value object))
      object))

(defun pmap/wait (result-type fn &rest seqs)
  (let ((result (apply #'lparallel:pmap (or result-type 'vector)
                       (wrap fn) seqs)))
    (map nil #'unwrap result)
    (if result-type result nil)))

(defun test ()
  (lparallel:task-handler-bind ((error #'lparallel:invoke-transfer-error))
    (pmap/wait nil
               (lambda (x)
                 (when (zerop x)
                   (error "zero!"))
                 (sleep 10)
                 x)
               (loop for i below (lparallel:kernel-worker-count)
                     collect i))))

If the number of workers is greater than 1, then test runs for 10 seconds. If we replace pmap/wait with pmap then test returns immediately but leaves behind running tasks, as shown by task-categories-running.

WIth regard to leftover running tasks in general, the problem is that the ideal solution -- safely canceling all tasks once an error is detected somewhere -- requires deep control from the user, who must set up an exit flag and know where/when it is OK to bail from each task. See with-submit-cancelable for how I did this with pevery and friends.

i-perminov commented 9 years ago

Thanks a lot! I think I will use your pmap/wait approach for now. It is a bit cumbersome in general case though, because one will need to have special wrappers for pdotimes, preduce, etc. I like the idea of having wait-after-errors. If you implement a CONTINUE restart and expose it to the user, what will a lparallel function (pmap, preduce, pevery, etc) return if the restart is used? Just NIL?

lmj commented 9 years ago

Since the restart doesn't really match the semantics of cl:continue, it would be better to name it defer-error (which also mnemonically matches transfer-error). Invoking defer-error would receive the remaining results while discarding any errors, then re-signal the original error. with-deferred-errors would signal at most once, at the end. Perhaps defer-error would just be an implementation detail.

Though the implementation is straightforward (the only change to pmap et al would be using a channel subclass), with-deferred-errors may be too much. It occurred to me that you can also wait on tasks manually.

(defun test ()
  (let ((q (lparallel.queue:make-queue))
        (seq (map-into (make-array 100) (let ((i -1))
                                          (lambda () (incf i))))))
    (flet ((wrap (fn)
             (lambda (&rest args)
               (unwind-protect (apply fn args)
                 (lparallel.queue:push-queue t q))))
           (handler (err)
             (declare (ignore err))
             (loop repeat (length seq)
                   do (lparallel.queue:pop-queue q))))
      (handler-bind ((simple-error #'handler))
        (lparallel:task-handler-bind ((error #'lparallel:invoke-transfer-error))
          (lparallel:pmap 'vector
                          (wrap (lambda (x)
                                  (when (or (zerop x) (= x 57))
                                    (error "bad value"))
                                  x))
                          :parts (length seq)
                          seq))))))

A general solution for any :parts value would be more complicated, but if the calculation for each element in the sequence is heavy then setting the number of parts to the number of elements is already ideal.

i-perminov commented 9 years ago

I never doubted Common Lisp is Turing complete :-) I searched for pmap-like calls in our code base and there are only 5 places out of 50 where with-deferred-errors would be useful and all of them are pmap calls, so it looks like pmap/wait is good enough for my needs.

lmj commented 9 years ago

Closing, considering there are two ways to solve this without changing lparallel, though this does leave something to think about.