RussBaz / enforce

Python 3.5+ runtime type checking for integration testing and data validation
542 stars 21 forks source link

Errors with Threading #36

Closed kenfar closed 7 years ago

kenfar commented 7 years ago

I'm running into odd issues with threading in which it appears that arguements and results are being replaced with None.

It runs fine with enforce with a single worker, and runs with with multiple workers and enforce disabled. But consistently breaks with multiple workers and enforce enabled.

Here's the code:

#!/usr/bin/env python3.5
import os, sys, concurrent.futures, enforce
from typing import Union, Any, List, Tuple

try:
    WORKERS = int(sys.argv[1])
    ENFORCE_ENABLED = True if sys.argv[2] == 'true' else False
    ASSERT_ENABLED = True if sys.argv[3] == 'true' else False
except:
    raise ValueError('invalid args, should be: \n'
                     ' arg 1 (int) = number of workers,\n'
                     ' arg 2 (true or false) = enforce enabling,\n'
                     ' arg 3 (true or false) = assertions enabling')

enforce.config({'enabled': ENFORCE_ENABLED})

def main():
    for i in range(100):
        test_runner(100)
        print('.', end='', flush=True)
    print('test success')

def test_runner(widget_number):

    widgets = [ x for x in range(widget_number) ]
    with concurrent.futures.ThreadPoolExecutor(max_workers=WORKERS) as executor:
        for score in executor.map(test_one_widget,
                                    widgets):
            if ASSERT_ENABLED:
                assert isinstance(score, int)

@enforce.runtime_validation
def test_one_widget(widget_id: int) -> int:
    score = widget_inspector(widget_id, a='foo', b=4, c='bar')
    return  score

@enforce.runtime_validation
def widget_inspector(widget_id: int, a: str, b: int, c: str) -> int:
    if ASSERT_ENABLED:
        assert isinstance(widget_id, int)
        assert isinstance(a, str)
        assert isinstance(b, int)
        assert isinstance(c, str)
    return b

if __name__ == '__main__':
    main()

Test results with 1 worker or multiple workers but enforce off:

(enforce):~/Envs/enforce/enforcechk$ ./threading_test.py   1 false true
....................................................................................................test success
(enforce):~/Envs/enforce/enforcechk$ ./threading_test.py   1 true true                                                                                                                                           
....................................................................................................test success
(enforce):~/Envs/enforce/enforcechk$ ./threading_test.py   40 false true
....................................................................................................test success

Test results with multiple workers and enforce on:

(enforce) kenfar@darth560:~/Envs/enforce/enforcechk$ ./threading_test.py   40 true true                         
..Traceback (most recent call last):
  File "./threading_test.py", line 47, in <module>
    main()
  File "./threading_test.py", line 19, in main
    test_runner(100)
  File "./threading_test.py", line 28, in test_runner
    widgets):
  File "/usr/lib/python3.5/concurrent/futures/_base.py", line 556, in result_iterator
    yield future.result()
  File "/usr/lib/python3.5/concurrent/futures/_base.py", line 398, in result
    return self.__get_result()
  File "/usr/lib/python3.5/concurrent/futures/_base.py", line 357, in __get_result
    raise self._exception
  File "/usr/lib/python3.5/concurrent/futures/thread.py", line 55, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/home/kenfar/Envs/enforce/lib/python3.5/site-packages/enforce/decorators.py", line 105, in universal
    result = wrapped(*_args, **_kwargs)
  File "./threading_test.py", line 34, in test_one_widget
    score = widget_inspector(widget_id, a='foo', b=4, c='bar')
  File "/home/kenfar/Envs/enforce/lib/python3.5/site-packages/enforce/decorators.py", line 105, in universal
    result = wrapped(*_args, **_kwargs)
  File "./threading_test.py", line 42, in widget_inspector
    assert isinstance(b, int)
AssertionError

(enforce) kenfar@darth560:~/Envs/enforce/enforcechk$ ./threading_test.py   40 true true
......Traceback (most recent call last):
  File "./threading_test.py", line 47, in <module>
    main()
  File "./threading_test.py", line 19, in main
    test_runner(100)
  File "./threading_test.py", line 28, in test_runner
    widgets):
  File "/usr/lib/python3.5/concurrent/futures/_base.py", line 556, in result_iterator
    yield future.result()
  File "/usr/lib/python3.5/concurrent/futures/_base.py", line 398, in result
    return self.__get_result()
  File "/usr/lib/python3.5/concurrent/futures/_base.py", line 357, in __get_result
    raise self._exception
  File "/usr/lib/python3.5/concurrent/futures/thread.py", line 55, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/home/kenfar/Envs/enforce/lib/python3.5/site-packages/enforce/decorators.py", line 111, in universal
    return enforcer.validate_outputs(result)
  File "/home/kenfar/Envs/enforce/lib/python3.5/site-packages/enforce/enforcers.py", line 98, in validate_outputs
    raise RuntimeTypeError(exception_text)
enforce.exceptions.RuntimeTypeError: 
  The following runtime type errors were encountered:
        Return value was not of type <class 'int'>. Actual type was NoneType.

(enforce) kenfar@darth560:~/Envs/enforce/enforcechk$ ./threading_test.py   40 true false
..................Traceback (most recent call last):
  File "./threading_test.py", line 47, in <module>
    main()
  File "./threading_test.py", line 19, in main
    test_runner(100)
  File "./threading_test.py", line 28, in test_runner
    widgets):
  File "/usr/lib/python3.5/concurrent/futures/_base.py", line 556, in result_iterator
    yield future.result()
  File "/usr/lib/python3.5/concurrent/futures/_base.py", line 398, in result
    return self.__get_result()
  File "/usr/lib/python3.5/concurrent/futures/_base.py", line 357, in __get_result
    raise self._exception
  File "/usr/lib/python3.5/concurrent/futures/thread.py", line 55, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/home/kenfar/Envs/enforce/lib/python3.5/site-packages/enforce/decorators.py", line 105, in universal
    result = wrapped(*_args, **_kwargs)
  File "./threading_test.py", line 34, in test_one_widget
    score = widget_inspector(widget_id, a='foo', b=4, c='bar')
  File "/home/kenfar/Envs/enforce/lib/python3.5/site-packages/enforce/decorators.py", line 97, in universal
    _args, _kwargs, _ = enforcer.validate_inputs(parameters)
  File "/home/kenfar/Envs/enforce/lib/python3.5/site-packages/enforce/enforcers.py", line 86, in validate_inputs
    raise RuntimeTypeError(exception_text)
enforce.exceptions.RuntimeTypeError: 
  The following runtime type errors were encountered:
       Argument 'widget_id' was not of type <class 'int'>. Actual type was NoneType.

The results show failures in various areas running the same code. Sometimes caught by enforce, othertimes caught by assertions.

RussBaz commented 7 years ago

Honestly, dealing with threaded code is annoying.

I added an extra statement before calling widget_inspector and the error is fixed!

I feel like I understand nothing right now.

       @enforce.runtime_validation
        def test_one_widget(widget_id: int) -> int:
            print('ID:', widget_id)
            score = widget_inspector(widget_id, a='foo', b=4, c='bar')
            return  score

Any ideas? I am personally not very good at threaded usage of python.

UPDATE: It fails in debug though.

UPDATE2:

I think I found a quick fix for your problem using RLock. Please give it a try (dev branch) and tell me if you experience any problems with it. Thanks!

kenfar commented 7 years ago

The sample code I provided is highlighting that it behaves somewhat randomly - where you can't really be sure exactly when & where it'll break and adding a line of code may alter the pattern a bit.

But I've got a ton of code I can run this through this weekend, and will let you know.

RussBaz commented 7 years ago

Yes, please try the dev branch out. The CI running the snippet you posted here now succeeds.

kenfar commented 7 years ago

Yep, that seems to work fine - even when pushed hard with a ton of threads.

On a separate note - it fails with multiprocessing. To test it for multiprocessing all you need to do is replace this line: with concurrent.futures.ThreadPoolExecutor(max_workers=WORKERS) as executor: with this line: with concurrent.futures.ProcessPoolExecutor(max_workers=WORKERS) as executor:

However, I'd consider the multiprocessing issue to be separate, and it's still a success to work with threading even if multiprocessing doesn't work.

TheDataLeek commented 7 years ago

I am unconvinced that this is a problem on our end.

Dig into the SO page here: https://stackoverflow.com/questions/9336646/python-decorator-with-multiprocessing-fails

TL;DR ProcessPools can only accept pickleable things to spawn into processes. Decorated functions are inherently not pickleable, since it's a function wrapper and not an actual function.

A semi-reasonable solution is as follows:

def test_one_widget(widget_id: int) -> int:
    @enforce.runtime_validation
    def foo(widget_id: int) -> int:
        score = widget_inspector(widget_id, a='foo', b=4, c='bar')
        return  score
    return foo(widget_id)
kenfar commented 7 years ago

Yep, on looking more closely this is definitely a general multiprocessing issue with using a decorator on the initial reference.

RussBaz commented 7 years ago

By the way, if everyone is happy with the current state of dev branch, I will upload an updated version of enforce to PyPi this weekend.

RussBaz commented 7 years ago

Closing this issue and opening a separate one for multiprocessing.