syrusakbary / promise

Ultra-performant Promise implementation in Python
MIT License

Combining with futures seems problematic #8

Closed qbolec closed 8 years ago

qbolec commented 8 years ago

I am having difficulty combining promises with futures. Here is a minimal program that reproduces my issue:

import concurrent.futures
from promise import Promise
executor = concurrent.futures.ThreadPoolExecutor(max_workers=40000)

def combine(r,n):
  print("Completed promise_factorial({})={}*promise_factorial({})".format(n,n,n-1))
  return r*n

def promise_factorial(n):
  print("Inside promise_factorial({})".format(n))
  if n < 2 :
    return 1
  print("Schedulling promise_factorial({})".format(n-1))
  # Run the recursive call on the thread pool; `a` is a concurrent.futures.Future.
  a = executor.submit(promise_factorial,n-1)
  print("Making a promise promise_factorial({})".format(n-1))
  return Promise.promisify(a).then(lambda r:  combine(r,n))

def done(r):
  print("Done all computations! Results is {}".format(r))

# Nothing below blocks, so the main thread can reach the end of the script early.
promise_factorial(10).then(lambda r: done(r))

When I run it with python concurrency_test.py, it nondeterministically either succeeds fully or fails silently like this:

$ python concurrency_test.py
Inside promise_factorial(10)
Schedulling promise_factorial(9)
Inside promise_factorial(9)
Schedulling promise_factorial(8)
 Making a promise promise_factorial(9)
Inside promise_factorial(8)
 Making a promise promise_factorial(8)
Schedulling promise_factorial(7)
$ echo $?
0

My machine has 64 cores. I am not experienced in reasoning about concurrency and race conditions, but I suspect the problem arises when the promise has not been fulfilled by the time the thread's main function returns. Am I right? If so, is there a way to block and wait until the promise is fulfilled before the thread exits?

qbolec commented 8 years ago

I was desperate enough to look into the source code, where I found the undocumented methods wait and get. I then tried another approach in which each worker thread blocks until p.get() returns:

import concurrent.futures
import threading
from promise import Promise

executor = concurrent.futures.ThreadPoolExecutor(max_workers=20)

def worker(foo,arg):
  try:
    return foo(arg).get()
  except Exception as ex:
    print(ex)

def my_executor(foo,arg):
  try:
    return Promise.promisify(executor.submit(worker,foo,arg))
  except Exception as ex:
    print(ex)

def combine(r,n):
  try:
    print("Completed promise_factorial({})={}*promise_factorial({}) inside thread {}".format(n,n,n-1,threading.current_thread().ident))
  except Exception as ex:
    print(ex)
  return r*n

def promise_factorial(n):
  print("Inside promise_factorial({})".format(n))
  if n < 2 :
    return Promise.resolve(1)

  return my_executor(promise_factorial,n-1).then(lambda r:  combine(r,n))

def done(r):
  print("Done all computations! Results is {}".format(r))

p = promise_factorial(30).then(lambda r: done(r))

Unfortunately, this approach always seems to end in a deadlock.
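One plausible cause of this deadlock, not confirmed anywhere in this thread, is pool exhaustion: every worker blocks on a task submitted to the same bounded pool, so once the recursion depth (30) exceeds max_workers (20), no thread is left to run the innermost calls. The sketch below reproduces that pattern with plain concurrent.futures instead of the promise library, and it uses a timeout so the stall is reported rather than hanging forever. The names nested_factorial and try_factorial are hypothetical helpers introduced only for illustration.

```python
import concurrent.futures


def nested_factorial(executor, n, timeout):
    """Compute n! by submitting (n-1)! to the same pool and blocking on it."""
    if n < 2:
        return 1
    child = executor.submit(nested_factorial, executor, n - 1, timeout)
    # Blocking here keeps this worker thread occupied until the child finishes.
    return n * child.result(timeout=timeout)


def try_factorial(n, max_workers, timeout=0.5):
    """Return n!, or None if a blocked wait timed out (assumed pool exhaustion)."""
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
    try:
        return executor.submit(nested_factorial, executor, n, timeout).result()
    except concurrent.futures.TimeoutError:
        return None
    finally:
        # Let any leftover queued calls drain before returning.
        executor.shutdown(wait=True)
```

Under these assumptions, try_factorial(5, max_workers=8) completes because every nested call gets its own thread, while try_factorial(5, max_workers=2) returns None because both workers are stuck waiting on calls that can never start.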

qbolec commented 8 years ago

I was able to make some progress by ditching futures and thread pools entirely in favor of a lower-level approach:

import threading
from promise import Promise

def worker(foo,arg,resolve,reject):
  try:
    # Call foo inside the try block so its own exceptions also reject the promise.
    resolve(foo(arg).get())
  except Exception as ex:
    reject(ex)

def my_executor(foo,arg):
  def promiseConfig(resolve,reject):
    t = threading.Thread(target=worker,args=(foo,arg,resolve,reject))
    t.start()

  p = Promise(promiseConfig)
  return p

def combine(r,n):
  try:
    print("Completed promise_factorial({})={}*promise_factorial({}) inside thread {}".format(n,n,n-1,threading.current_thread().ident))
  except Exception as ex:
    print(ex)
  return r*n

def promise_factorial(n):
  print("Inside promise_factorial({})".format(n))
  if n < 2 :
    return Promise.resolve(1)

  return my_executor(promise_factorial,n-1).then(lambda r:  combine(r,n))

def done(r):
  print("Done all computations! Results is {}".format(r))

p = promise_factorial(30).then(lambda r: done(r))

This one always succeeds.

syrusakbary commented 8 years ago

Thanks for the detailed examples, taking a look now.

syrusakbary commented 8 years ago

I got your example working.

import time
import concurrent.futures
from promise import Promise
executor = concurrent.futures.ThreadPoolExecutor(max_workers=40000)

def combine(r,n):
    return r * n

def promise_factorial(n):
    time.sleep(.01)
    if n < 2:
        return 1
    a = executor.submit(promise_factorial, n - 1)
    return Promise.promisify(a).then(lambda r: combine(r, n))

def test_factorial():
    p = promise_factorial(10)
    assert p.get() == 3628800

Please let me know if it works for you too, and reopen the issue if not :)

qbolec commented 8 years ago

If I understand correctly, your example differs from mine in two ways: the time.sleep(.01) call and the p.get() call in the main thread.

I've tested that you can remove the time.sleep(.01) and it still works. The important part seems to be calling p.get() in the main thread. My understanding is that this prevents the main program from exiting before all the computations are done, and that the early exit is what produced the truncated output I originally reported.
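The conclusion above, that only the main thread should block on the final result, can be sketched with the standard library alone. This is not the promise library's API: it is a continuation-style analogue in which each step hands the next one to the pool and returns immediately, and the calling thread waits on a single Future, much as p.get() does in the example above. The names chained_factorial and factorial_blocking_in_main are hypothetical, and creating a concurrent.futures.Future by hand is a shortcut taken here for brevity.

```python
import concurrent.futures


def chained_factorial(executor, n):
    """Return a Future for n! in which no worker waits on another task."""
    final = concurrent.futures.Future()  # hand-made Future: illustration only

    def step(k, acc):
        try:
            if k < 2:
                final.set_result(acc)  # chain finished: publish the product
            else:
                # Hand the next step to the pool and return immediately.
                executor.submit(step, k - 1, acc * k)
        except Exception as ex:
            final.set_exception(ex)

    executor.submit(step, n, 1)
    return final


def factorial_blocking_in_main(n, max_workers=2):
    """Block the calling thread, and only that thread, until n! is ready."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # The timeout is a safety net so a bug cannot hang the caller forever.
        return chained_factorial(executor, n).result(timeout=5)
```

Because no worker ever blocks, a small pool is enough in this sketch: factorial_blocking_in_main(10) should finish with just two workers.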