nedbat / coveragepy

The code coverage tool for Python
https://coverage.readthedocs.io
Apache License 2.0

Coverage.py not recording code executed asynchronously by Celery (For Flask) #689

Open tyrelkostyk opened 5 years ago

tyrelkostyk commented 5 years ago

I'm running a Docker-deployed Flask app and have been testing it with Python's built-in unittest library, monitoring test coverage with coverage.py. It was working great until we integrated some asynchronous tasks using Celery.

When we run these tasks, they pass all of their unit tests. But no matter what we do, the tasks remain "uncovered" in our coverage reports, which we know to be incorrect from our observations. Is there any known bug that would cause this, or a known fix? Any and all help is appreciated!

setUp & tearDown for unittests:

class APITestCase(unittest.TestCase):
    def setUp(self):
        """Create test environment"""
        self.codec = GzipJsonCodec()
        self.EXAMPLE_JSON_MESSAGE = \
            self.codec.compress_file('./tests/example.json')
        self.EXAMPLE_COMPACT_JSON_MESSAGE = \
            self.codec.compress_file('./tests/example_compact.json')
        with open("./tests/example_corrupt.gz", "rb") as f:
            self.EXAMPLE_CORRUPT_GZIP = f.read()
        self.Backend = create_app('testing')
        self.app_context = self.Backend.app_context()
        self.app_context.push()
        db.create_all()
        self.client = self.Backend.test_client()

    def tearDown(self):
        """Close test environment"""
        try:
            cache.clear()
        except RedisError:
            print('Redis port is closed, the redis server '
                  'does not appear to be running.')
        db.session.remove()
        db.drop_all()
        self.app_context.pop()

Example Test Case

    def test_statistics(self):
        # add user
        user = User(username='user',
                    password='password',
                    confirmed=True)
        db.session.add(user)
        db.session.commit()

        # set up data
        example_json = self.EXAMPLE_JSON_MESSAGE

        # post data
        response = self.client.post(
            url_for('api.new_post'),
            headers=self.get_api_headers(
                'user',
                'password',
                True),
            data=example_json)
        self.assertEqual(response.status_code, 201)

        # call API endpoint to kickoff Celery task
        response = self.client.get(
            url_for('api.statistics'),
            headers=self.get_api_headers(
                'user',
                'password',
                True))
        self.assertEqual(response.status_code, 202)
        stats_location = response.headers['location']
        time.sleep(0.5)   # sleep to allow Celery task time to complete

        # test receiving the result
        response = self.client.get(
            stats_location,
            headers=self.get_api_headers(
                'user',
                'password',
                True))
        self.assertEqual(response.status_code, 200)

Brief description of the API flow: make a POST to the DB via api.new_post, then request statistical data through api.statistics, which simply calls the Celery task async_stats. The task returns its value to stats_location (this URL is passed back in the response from the initial api.statistics endpoint, built from the Celery task.id).
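Editorial aside: the example test waits a fixed `time.sleep(0.5)` for the task to finish, which can be too short on a slow worker. One alternative is to poll until the result is ready. The helper below is only a sketch; `wait_until` is a hypothetical name, not part of unittest, Flask, or Celery, and the commented usage assumes the status URL returns something other than 200 while the task is still pending, which the thread does not confirm.

```python
import time


def wait_until(predicate, timeout=5.0, interval=0.05):
    """Call predicate() repeatedly until it returns a truthy value.

    Returns that value, or raises TimeoutError once `timeout` seconds pass.
    """
    deadline = time.monotonic() + timeout
    while True:
        result = predicate()
        if result:
            return result
        if time.monotonic() >= deadline:
            raise TimeoutError("condition not met within %.2f s" % timeout)
        time.sleep(interval)


# Possible use inside test_statistics, replacing time.sleep(0.5)
# (assumes a pending task does not answer with status 200):
#
#     def finished():
#         r = self.client.get(
#             stats_location,
#             headers=self.get_api_headers('user', 'password', True))
#         return r if r.status_code == 200 else None
#
#     response = wait_until(finished)
```

This changes only how long the test waits; it does not by itself make the worker's code show up in the coverage report.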

Simplified API endpoint that triggers the async task

@api.route('/statistics', methods=['GET'])
def statistics():
    """
    API Endpoint to run statistical analysis, which calls the async
    background function.
    """
    task = async_stats.delay()
    return jsonify({}), 202, {'Location': url_for('api.stats_status',
                                                  task_id=task.id)}

And this all works; our actual unit tests are more robust and in-depth than this, and they pass every time. Adding lots of print/logging statements to the async_stats task function and monitoring the web/Celery logs proves that the asynchronous tasks are being run; they just refuse to show up in coverage reports. We have seen good results with pytest-cov, but we already have over 200 test cases using unittest and coverage.py, and would rather not redo all of those...

requirements.txt for docker container

alabaster==0.7.11
alembic==1.0.0
amqp==2.3.2
astroid==2.0.1
Babel==2.6.0
backports.functools-lru-cache==1.5
billiard==3.5.0.4
blinker==1.4
celery==4.2.1
certifi==2018.4.16
chardet==3.0.4
click==6.7
colorclass==2.2.0
configparser==3.5.0
CoolProp==6.0.0
coverage==4.5.1
docopt==0.6.2
docutils==0.14
dominate==2.3.1
enum34==1.1.6
Flask==1.0.2
Flask-Bootstrap==3.3.7.1
Flask-Caching==1.4.0
Flask-HTTPAuth==3.2.4
Flask-Login==0.4.1
Flask-Mail==0.9.1
Flask-Migrate==2.2.1
Flask-Script==2.0.6
Flask-SQLAlchemy==2.3.2
Flask-WTF==0.14.2
flower==0.9.2
gunicorn==19.9.0
httpie==0.9.9
idna==2.7
imagesize==1.0.0
isort==4.3.4
itsdangerous==0.24
Jinja2==2.10
kombu==4.2.1
lazy-object-proxy==1.3.1
Mako==1.0.7
MarkupSafe==1.0
mccabe==0.6.1
numpy==1.15.0
packaging==17.1
pandas==0.23.3
passlib==1.7.1
pep8==1.7.1
pip-upgrader==1.4.6
postgres==2.2.1
psycopg2==2.7.5
Pygments==2.2.0
pylint==2.0.1
pyparsing==2.2.0
python-dateutil==2.7.3
python-editor==1.0.3
pytz==2018.5
rauth==0.7.3
redis==2.10.6
requests==2.19.1
scipy==1.1.0
singledispatch==3.4.0.3
six==1.11.0
snowballstemmer==1.2.1
Sphinx==1.7.6
sphinx-rtd-theme==0.4.1
sphinxcontrib-httpdomain==1.7.0
sphinxcontrib-websupport==1.1.0
SQLAlchemy==1.2.10
strict-rfc3339==0.7
terminaltables==3.1.0
tornado==5.1
typed-ast==1.1.0
urllib3==1.23
vine==1.1.4
visitor==0.1.3
Werkzeug==0.14.1
wrapt==1.10.11
WTForms==2.2.1

JunchengDwain commented 5 years ago

How did you set the Flask app config? Coverage does not record when the app config has debug set to True. Hope this helps.

butla commented 4 years ago

Also, it seems that Celery uses billiard to create worker processes, and that prevents coverage from getting info on the code that gets run in the workers. According to this

RazerM commented 4 years ago

multiproc.py can be copied almost exactly by replacing multiprocessing with billiard everywhere and removing the env.PYVERSION check.

I think what might be missing from coverage is an appropriate plugin entrypoint to make use of it, however. I had to use non-public API:

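from coverage import Coverage, CoveragePlugin
from coverage.config import CoverageConfig

# patch_billiard comes from a local copy of coverage/multiproc.py with
# multiprocessing replaced by billiard; import it from wherever that copy lives.

class BilliardCoveragePlugin(CoveragePlugin):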
    def configure(self, config):
        # coverage does not want us to do this, see
        # https://github.com/nedbat/coveragepy/blob/6ddd8c8e3abf1321277a07ecd2a5b1c857163ee1/coverage/control.py#L271-L275
        if isinstance(config, CoverageConfig):
            config_file = config.config_file
        elif isinstance(config, Coverage):
            config_file = config.config.config_file
        else:
            raise RuntimeError('Unexpected config type.')

        patch_billiard(rcfile=config_file)

def coverage_init(reg, options):
    reg.add_configurer(BilliardCoveragePlugin())

Here's the referenced snippet inline: https://github.com/nedbat/coveragepy/blob/6ddd8c8e3abf1321277a07ecd2a5b1c857163ee1/coverage/control.py#L271-L275

I assume this will break in some scenarios since it's not called from _init_for_start.

nedbat commented 4 years ago

@RazerM Can you show me how you used this code? Where is patch_billiard? If it would help, we can make the config_file available.

RazerM commented 4 years ago

@nedbat patch_billiard is from a copy of coverage/multiproc.py with s/multiprocessing/billiard.

As far as the original topic of this issue goes, I fear I may have spoken too soon. Celery uses billiard and the above code works for simple uses of billiard.Process, but I couldn't get it to work with code that runs in celery workers. So I didn't end up using it...

For the benefit of others, I got coverage to work in the workers using something like the following in the tasks file that your celery app imports:

from celery.signals import worker_process_init, worker_process_shutdown
from coverage import Coverage

COV = None

@worker_process_init.connect
def start_coverage(**kwargs):
    global COV

    COV = Coverage(data_suffix=True)
    COV.start()

@worker_process_shutdown.connect
def save_coverage(**kwargs):
    if COV is not None:
        COV.stop()
        COV.save()
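
Editorial aside: because the snippet above passes `data_suffix=True`, each worker process writes its own suffixed data file (named `.coverage.*`) rather than a single `.coverage` file, so the files still have to be merged before reporting. A minimal sketch of that step with the coverage.py API follows. It assumes the worker data files end up in a directory the reporting process can read, for example a volume shared between containers; `combine_and_report` and `data_dir` are hypothetical names chosen for illustration.

```python
def combine_and_report(data_dir="."):
    """Merge per-process coverage data files from data_dir and print a report.

    Returns the total coverage percentage reported by coverage.py.
    """
    # Imported here so this module can be loaded even where coverage.py
    # is not installed; it is a third-party package (pip install coverage).
    from coverage import Coverage

    cov = Coverage()
    # Collect the suffixed data files found in data_dir and merge them.
    cov.combine(data_paths=[data_dir])
    cov.save()
    return cov.report()


if __name__ == "__main__":
    combine_and_report()
```

Running `coverage combine` followed by `coverage report` on the command line is the equivalent. By default, combining removes the per-process files once they have been merged.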

ShaheedHaque commented 2 years ago

Another possible cause is how the Celery pool (at least in the default "prefork" configuration) uses forking to create workers. Forking causes several issues including, in my experience, difficulty gathering coverage data.

See https://github.com/celery/celery/issues/6036#issuecomment-1152424962 for a possible workaround.
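
Editorial aside: a small standard-library illustration of why work done in a forked worker is invisible to a collector running in the parent. This is not Celery or coverage.py code. The `executed` list is a hypothetical stand-in for a tracer's in-memory data, and the "fork" start method is POSIX only.

```python
import multiprocessing as mp

# Stand-in for a tracer's in-memory record of executed code (hypothetical).
executed = []


def task():
    """Pretend task body: records that it ran, in the current process only."""
    executed.append("task ran")


def run_task_in_forked_child():
    """Run task() in a forked child process and return the child's exit code."""
    ctx = mp.get_context("fork")  # POSIX only
    child = ctx.Process(target=task)
    child.start()
    child.join()
    return child.exitcode


if __name__ == "__main__":
    print("child exit code:", run_task_in_forked_child())
    print("recorded in parent:", executed)
```

The child appends to its own copy of `executed`, so the parent's list stays empty even though the task ran. Coverage data behaves the same way: whatever a worker process records has to be saved by that process, which is what the worker signal handlers earlier in the thread do.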