python-hyper / h2

HTTP/2 State-Machine based protocol implementation
https://h2.readthedocs.io/en/stable
MIT License

Add incremental updating of open streams count and closed_streams state #1185

Open kahuang opened 5 years ago

kahuang commented 5 years ago

This fixes https://github.com/python-hyper/hyper-h2/issues/1184

kahuang commented 5 years ago

With the old code, here is how test_concurrent_stream_open_performance runs:

============================================================= FAILURES =============================================================
___________________________ TestConcurrentStreamOpenPerformance.test_concurrent_stream_open_performance ____________________________

self = <test_concurrent_stream_open.TestConcurrentStreamOpenPerformance object at 0x107d86a50>
frame_factory = <helpers.FrameFactory object at 0x107d86cd0>

    def test_concurrent_stream_open_performance(self, frame_factory):
        """
            Opening many concurrent streams is constant time operation
            """
        num_concurrent_streams = 10000
        c = h2.connection.H2Connection()
        c.initiate_connection()
        start = time.time()
        for i in xrange(num_concurrent_streams):
            c.send_headers(1 + (2 * i), self.example_request_headers, end_stream=False)
            c.clear_outbound_data_buffer()
        end = time.time()

        run_time = end - start
>       assert run_time < 3
E       assert 36.598387002944946 < 3

test/test_concurrent_stream_open.py:51: AssertionError
==================================================== 1 failed in 36.66 seconds =====================================================

New code:

============================================================================================================ test session starts =============================================================================================================
platform darwin -- Python 2.7.15, pytest-3.4.2, py-1.7.0, pluggy-0.6.0
hypothesis profile 'default' -> database=DirectoryBasedExampleDatabase('/Users/andrew/workspace/hyper-h2/.hypothesis/examples')
rootdir: /Users/andrew/workspace/hyper-h2, inifile:
plugins: xdist-1.22.2, profiling-1.6.0, forked-1.0.2, cov-2.5.1, hypothesis-4.5.11
collected 1 item                                                                                                                                                                                                                             

test/test_concurrent_stream_open.py .                                                                                                                                                                                                  [100%]

========================================================================================================== 1 passed in 1.06 seconds ==========================================================================================================

Note that for this example, I commented out the second "test" in the test_concurrent_stream_open_performance function since the old code fails the first "test".

As you can see, this is roughly a 35x perf improvement (36.66 seconds down to 1.06 seconds for the test run).
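To illustrate the kind of difference being measured, here is a toy model rather than the actual h2 code. It assumes the slow path recounts open streams by scanning every known stream each time a stream is opened, while the fast path keeps a running counter. The class names, the `steps` attribute, and the `run` helper are all hypothetical.

```python
class ScanningConnection:
    """Toy model: recount open streams by scanning on every open."""

    def __init__(self):
        self.streams = {}  # stream_id -> is_open
        self.steps = 0     # number of streams visited while counting

    def open_stream_count(self):
        count = 0
        for is_open in self.streams.values():
            self.steps += 1
            if is_open:
                count += 1
        return count

    def open_stream(self, stream_id):
        # Assumption: the count is needed on every open (e.g. for a limit check).
        current = self.open_stream_count()
        self.streams[stream_id] = True
        return current + 1


class CountingConnection:
    """Toy model: keep the open stream count up to date incrementally."""

    def __init__(self):
        self.streams = {}
        self.open_count = 0
        self.steps = 0

    def open_stream(self, stream_id):
        self.steps += 1  # constant work per open
        self.streams[stream_id] = True
        self.open_count += 1
        return self.open_count


def run(conn, num_streams):
    """Open num_streams odd-numbered streams and report the work done."""
    for i in range(num_streams):
        conn.open_stream(1 + 2 * i)
    return conn.steps


print(run(ScanningConnection(), 1000))  # quadratic growth in visited streams
print(run(CountingConnection(), 1000))  # linear growth
```

In this model the scanning version visits n(n-1)/2 streams in total for n opens, which is why the run time grows so quickly with the number of concurrent streams.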

kahuang commented 5 years ago

I can clean up the lint failures (I assume I can just run autopep8 or something similar?), but the code coverage checks are failing for some versions of Python on these lines:

h2/stream.py 479 0 106 1 99% 809->814

Pasting the code here for reference:

def sync_state_change(func):
    def wrapper(self, *args, **kwargs):
        # Collect state at the beginning.
        start_state = self.state_machine.state
        started_open = self.open
        started_closed = not started_open

        # Do the state change (if any).
        result = func(self, *args, **kwargs)

        # Collect state at the end.
        end_state = self.state_machine.state
        ended_open = self.open
        ended_closed = not ended_open

        # If at any point we've transitioned to the CLOSED state
        # from any other state, close our stream.
        if end_state == StreamState.CLOSED and start_state != end_state:
            if self._close_stream_callback:
                self._close_stream_callback(self.stream_id)
                # Clear callback so we only call this once per stream
                self._close_stream_callback = None

        # If we were open, but are now closed, decrement
        # the open stream count.
        if started_open and ended_closed:
            if self._decrement_open_stream_count_callback:
                self._decrement_open_stream_count_callback(self.stream_id,
                                                           -1,)
                # Clear callback so we only call this once per stream
                self._decrement_open_stream_count_callback = None

        # If we were closed, but are now open, increment
        # the open stream count.
        elif started_closed and ended_open:
>          if self._increment_open_stream_count_callback:
>              self._increment_open_stream_count_callback(self.stream_id,
>                                                          1,)
>              # Clear callback so we only call this once per stream
>              self._increment_open_stream_count_callback = None
        return result
    return wrapper

Which is odd, since I can insert a print statement there and verify that the code is getting called, not to mention the counts of open outbound/inbound streams would be completely wrong if that code wasn't getting called.

Is this a quirk with the code coverage tool?

kahuang commented 5 years ago

Oh I see, it's because the if condition never evaluates to False in the tests, so the branch that skips the block is never taken. If I remove the conditional, the coverage tests pass.

The conditional is there for defensive reasons: a function wrapped by sync_state_change can call another function that is also wrapped by sync_state_change, and we don't want to update state twice.

I can write a specific test to exercise this behavior for that conditional
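A minimal sketch of the nested-call case, using a simplified stand-in for the decorator that only tracks the closed-to-open transition. `sync_open_count`, `ToyStream`, and `_increment_callback` are hypothetical names for illustration, not the code in this PR.

```python
import functools


def sync_open_count(func):
    """Simplified stand-in for sync_state_change (closed -> open only)."""
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        started_open = self.open
        result = func(self, *args, **kwargs)
        if not started_open and self.open:
            # Guard: the callback is cleared after its first use, so an
            # outer wrapper observing the same transition does nothing.
            if self._increment_callback:
                self._increment_callback(self.stream_id, 1)
                self._increment_callback = None
        return result
    return wrapper


class ToyStream:
    def __init__(self, stream_id, increment_callback):
        self.stream_id = stream_id
        self.open = False
        self._increment_callback = increment_callback

    @sync_open_count
    def _open(self):
        self.open = True

    @sync_open_count
    def send_headers(self):
        # Calls another wrapped method, so both wrappers see closed -> open.
        self._open()


calls = []
stream = ToyStream(1, lambda stream_id, delta: calls.append((stream_id, delta)))
stream.send_headers()
print(calls)  # [(1, 1)]
```

Without the `if self._increment_callback:` check, the outer wrapper would fire the callback a second time for the same transition. A test along these lines would also exercise the False branch that the coverage report flags.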

kahuang commented 5 years ago

@Lukasa @pgjones This is ready for review (picking these names based on recent merge commits)

pgjones commented 5 years ago

Thanks, I've managed to find time to understand the problem - but I'm not sure about the solution. It would be helpful if @Lukasa could comment on the general solution and how it fits in with the codebase. I'd then be happy to comment on the details.

kahuang commented 5 years ago

Any updates on this @pgjones @Lukasa ?

dimaqq commented 4 years ago

I'm very glad someone already thought that this open stream count may be a problem 😍

However, the @sync_state_change decorator on every method in H2Stream here seems invasive and possibly error-prone. Its flip side, callbacks, doesn't seem very Pythonic 🤔

The open stream count is essentially a cache; could it be implemented in some other way? For example, could it be something akin to a weak set/dict? Or perhaps the count could be recalculated on demand (i.e. mark the cache dirty when a stream is opened or ends, then recalculate the next time the count is needed)? Or perhaps the stream state machine could have a STREAM_CLOSED output that the connection receives after pumping the stream's events through the state machine, at which point the stream can be removed from .streams, as opposed to being removed during counting? Or maybe .streams could simply be split into .inbound_streams/.outbound_streams/.promises so that only the relevant set is ever evaluated (against our own or the peer's MAX_CONCURRENT_STREAMS)?
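As a rough sketch of the STREAM_CLOSED idea only: the stream reports its own closure as an event, and the connection removes it from its stream table at that point, so the count is just the table size. Everything here (`StreamClosed`, `ToyStream`, `ToyConnection`) is hypothetical, and the toy assumes every stream in the table is open, which is not true of the real `.streams`.

```python
from dataclasses import dataclass


@dataclass
class StreamClosed:
    """Hypothetical event emitted when a stream reaches its closed state."""
    stream_id: int


class ToyStream:
    def __init__(self, stream_id):
        self.stream_id = stream_id
        self.closed = False

    def end_stream(self):
        # The stream reports its own closure as an event, not a callback.
        self.closed = True
        return [StreamClosed(self.stream_id)]


class ToyConnection:
    def __init__(self):
        self.streams = {}

    def open_stream(self, stream_id):
        self.streams[stream_id] = ToyStream(stream_id)

    def end_stream(self, stream_id):
        events = self.streams[stream_id].end_stream()
        for event in events:
            if isinstance(event, StreamClosed):
                # Remove the stream as soon as the closure is observed,
                # rather than while counting open streams later.
                del self.streams[event.stream_id]
        return events

    @property
    def open_stream_count(self):
        # Assumption: only open streams remain in the table.
        return len(self.streams)


conn = ToyConnection()
for stream_id in (1, 3, 5):
    conn.open_stream(stream_id)
conn.end_stream(3)
print(conn.open_stream_count)  # 2
```

This keeps the bookkeeping in the connection and avoids decorating each stream method, at the cost of the connection having to inspect every event list a stream returns.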