
Add incremental updating of open streams count and closed_streams state #1185

Open · wants to merge 9 commits into master
Conversation

@kahuang commented Feb 17, 2019

This fixes #1184

@kahuang (Author) commented Feb 17, 2019

With the old code, here is how test_concurrent_stream_open_performance runs:

============================================================= FAILURES =============================================================
___________________________ TestConcurrentStreamOpenPerformance.test_concurrent_stream_open_performance ____________________________

self = <test_concurrent_stream_open.TestConcurrentStreamOpenPerformance object at 0x107d86a50>
frame_factory = <helpers.FrameFactory object at 0x107d86cd0>

    def test_concurrent_stream_open_performance(self, frame_factory):
        """
            Opening many concurrent streams is constant time operation
            """
        num_concurrent_streams = 10000
        c = h2.connection.H2Connection()
        c.initiate_connection()
        start = time.time()
        for i in xrange(num_concurrent_streams):
            c.send_headers(1 + (2 * i), self.example_request_headers, end_stream=False)
            c.clear_outbound_data_buffer()
        end = time.time()
    
        run_time = end - start
>       assert run_time < 3
E       assert 36.598387002944946 < 3

test/test_concurrent_stream_open.py:51: AssertionError
==================================================== 1 failed in 36.66 seconds =====================================================

New code:

============================================================================================================ test session starts =============================================================================================================
platform darwin -- Python 2.7.15, pytest-3.4.2, py-1.7.0, pluggy-0.6.0
hypothesis profile 'default' -> database=DirectoryBasedExampleDatabase('/Users/andrew/workspace/hyper-h2/.hypothesis/examples')
rootdir: /Users/andrew/workspace/hyper-h2, inifile:
plugins: xdist-1.22.2, profiling-1.6.0, forked-1.0.2, cov-2.5.1, hypothesis-4.5.11
collected 1 item                                                                                                                                                                                                                             

test/test_concurrent_stream_open.py .                                                                                                                                                                                                  [100%]

========================================================================================================== 1 passed in 1.06 seconds ==========================================================================================================

Note that for this example, I commented out the second assertion in the test_concurrent_stream_open_performance function, since the old code fails the first one.

As you can see, this is roughly a 36x performance improvement.
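To illustrate the shape of the problem: the old code recounted all streams every time one was opened, so opening N streams costs O(N²) overall, while an incrementally maintained counter makes each open O(1). The sketch below uses illustrative names, not hyper-h2's actual classes or API:

```python
# Hypothetical sketch of the quadratic recount vs. an incremental counter.
# Names (CountingConnection, IncrementalConnection) are illustrative only.

class CountingConnection:
    """Recounts every stream on each open: O(n) per call, O(n^2) overall."""
    def __init__(self):
        self.streams = {}

    def open_stream(self, stream_id):
        self.streams[stream_id] = "open"
        # Full scan on every open -- this is the quadratic hot spot.
        return sum(1 for s in self.streams.values() if s == "open")


class IncrementalConnection:
    """Maintains a running count updated on each state change: O(1) per open."""
    def __init__(self):
        self.streams = {}
        self.open_count = 0

    def open_stream(self, stream_id):
        self.streams[stream_id] = "open"
        self.open_count += 1          # incremental update, no scan
        return self.open_count

    def close_stream(self, stream_id):
        if self.streams.get(stream_id) == "open":
            self.streams[stream_id] = "closed"
            self.open_count -= 1      # keep the cached count in sync
```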

@kahuang (Author) commented Feb 17, 2019

So I can clean up the lint failures (I assume I can just run autopep8 or something similar?), but the code coverage tests are failing on some Python versions for these lines:

h2/stream.py 479 0 106 1 99% 809->814

Pasting the code here for reference:

def sync_state_change(func):
    def wrapper(self, *args, **kwargs):
        # Collect state at the beginning.
        start_state = self.state_machine.state
        started_open = self.open
        started_closed = not started_open

        # Do the state change (if any).
        result = func(self, *args, **kwargs)

        # Collect state at the end.
        end_state = self.state_machine.state
        ended_open = self.open
        ended_closed = not ended_open

        # If at any point we've transitioned to the CLOSED state
        # from any other state, close our stream.
        if end_state == StreamState.CLOSED and start_state != end_state:
            if self._close_stream_callback:
                self._close_stream_callback(self.stream_id)
                # Clear callback so we only call this once per stream
                self._close_stream_callback = None

        # If we were open, but are now closed, decrement
        # the open stream count, and call the close callback.
        if started_open and ended_closed:
            if self._decrement_open_stream_count_callback:
                self._decrement_open_stream_count_callback(self.stream_id,
                                                           -1,)
                # Clear callback so we only call this once per stream
                self._decrement_open_stream_count_callback = None

        # If we were closed, but are now open, increment
        # the open stream count.
        elif started_closed and ended_open:
            # This is the branch coverage flags as partial (809->814):
            if self._increment_open_stream_count_callback:
                self._increment_open_stream_count_callback(self.stream_id,
                                                           1,)
                # Clear callback so we only call this once per stream
                self._increment_open_stream_count_callback = None
        return result
    return wrapper

This is odd, since I can insert a print statement there and verify that the code is getting called; besides, the counts of open inbound/outbound streams would be completely wrong if that code weren't getting called.

Is this a quirk with the code coverage tool?

@kahuang (Author) commented Feb 17, 2019

Oh I see, it's because the `if` never evaluates to `False`. If I remove the conditional, the coverage tests pass.

The reason that conditional is there is for defensive reasons. A function wrapped by sync_state_change can call another function that is also wrapped by sync_state_change, and we don't want to update state twice.

I can write a specific test to exercise that conditional.
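A simplified sketch of the double-update scenario that conditional defends against (one callback instead of the PR's three, and hypothetical names throughout): the inner decorated call fires and clears the callback first, so the outer wrapper's check finds it already `None` and skips the second update.

```python
# Hypothetical, simplified version of sync_state_change showing why the
# defensive check is needed when decorated methods call each other.

def sync_state_change(func):
    def wrapper(self, *args, **kwargs):
        started_open = self.open
        result = func(self, *args, **kwargs)
        if started_open and not self.open:
            if self._on_close:          # defensive: may already be cleared
                self._on_close(self.stream_id)
                self._on_close = None   # fire at most once per stream
        return result
    return wrapper


class Stream:
    def __init__(self, stream_id, on_close):
        self.stream_id = stream_id
        self.open = True
        self._on_close = on_close

    @sync_state_change
    def receive_rst(self):
        # Calls another decorated method; the inner wrapper fires the
        # callback, the outer wrapper must not fire it again.
        self._close()

    @sync_state_change
    def _close(self):
        self.open = False
```

Without the `if self._on_close:` guard, `receive_rst` would invoke the close callback twice for a single stream closure.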

@kahuang (Author) commented Feb 21, 2019

@Lukasa @pgjones This is ready for review (picking these names based on recent merge commits)

@pgjones (Member) commented Apr 21, 2019

Thanks, I've managed to find time to understand the problem - but I'm not sure about the solution. It would be helpful if @Lukasa could comment on the general solution and how it fits in with the codebase. I'd then be happy to comment on the details.

@kahuang (Author) commented Jun 2, 2019

Any updates on this @pgjones @Lukasa ?

@dimaqq commented Mar 23, 2020

I'm very glad someone already thought that this open stream count may be a problem 😍

However, the @sync_state_change decorator on every method in H2Stream seems invasive and possibly error-prone, and its flip-side, callbacks, doesn't seem very Pythonic 🤔

The open stream count is essentially a cache; could it be implemented in some other way? For example:

- Could it be something akin to a weakset/dict?
- Could the count be recalculated on demand (i.e. on new stream or stream end, mark the cache dirty; recalculate next time it is read)?
- Could the stream state machine have an output STREAM_CLOSED that the connection receives after pumping the stream's events through the state machine, so the stream is removed from .streams then, rather than during counting?
- Could .streams simply be split into .inbound_streams/.outbound_streams/.promises, so that only the relevant set is ever evaluated (against our own or the peer's MAX_CONCURRENT_STREAMS)?
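The "mark dirty, recalculate on demand" alternative could look roughly like the sketch below (hypothetical names, not hyper-h2's API). State changes are O(1); the O(n) recount runs at most once per batch of changes, when the count is next read:

```python
# Hypothetical sketch of a dirty-flag cached open-stream count.

class Connection:
    def __init__(self):
        self.streams = {}
        self._open_count = 0
        self._dirty = False

    def _note_stream_change(self, stream_id, state):
        self.streams[stream_id] = state
        self._dirty = True            # O(1): just invalidate the cache

    @property
    def open_streams(self):
        if self._dirty:
            # Lazy O(n) recount, amortized across a batch of changes.
            self._open_count = sum(
                1 for s in self.streams.values() if s == "open"
            )
            self._dirty = False
        return self._open_count
```

One caveat with this design: if the count is read after every single state change (as it effectively is when enforcing MAX_CONCURRENT_STREAMS on each send_headers), the lazy recount degenerates back to the quadratic behavior this PR is trying to avoid.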

Successfully merging this pull request may close: Quadratic performance of h2.connection.H2Connection._open_streams

3 participants