From ebb925435100edc9ca4c465bd4c5db82564f5eaa Mon Sep 17 00:00:00 2001 From: Jason Madden Date: Sat, 7 Aug 2021 14:11:38 -0500 Subject: [PATCH 1/2] Add failing test for #251. All tested versions of Python are demonstrating a growth of one extra list. --- src/greenlet/__init__.py | 3 +- src/greenlet/tests/test_leaks.py | 176 +++++++++++++++++++++---------- 2 files changed, 122 insertions(+), 57 deletions(-) diff --git a/src/greenlet/__init__.py b/src/greenlet/__init__.py index 924d2ca5..3141da0b 100644 --- a/src/greenlet/__init__.py +++ b/src/greenlet/__init__.py @@ -48,7 +48,8 @@ from ._greenlet import settrace except ImportError: # Tracing wasn't supported. - # TODO: Remove the option to disable it. + # XXX: The option to disable it was removed in 1.0, + # so this branch should be dead code. pass ### diff --git a/src/greenlet/tests/test_leaks.py b/src/greenlet/tests/test_leaks.py index 2b24ea0f..7aec9636 100644 --- a/src/greenlet/tests/test_leaks.py +++ b/src/greenlet/tests/test_leaks.py @@ -4,82 +4,146 @@ import time import weakref -import greenlet import threading +import greenlet + +class TestLeaks(unittest.TestCase): -class ArgRefcountTests(unittest.TestCase): def test_arg_refs(self): args = ('a', 'b', 'c') refcount_before = sys.getrefcount(args) + # pylint:disable=unnecessary-lambda g = greenlet.greenlet( lambda *args: greenlet.getcurrent().parent.switch(*args)) - for i in range(100): + for _ in range(100): g.switch(*args) self.assertEqual(sys.getrefcount(args), refcount_before) def test_kwarg_refs(self): kwargs = {} + # pylint:disable=unnecessary-lambda g = greenlet.greenlet( lambda **kwargs: greenlet.getcurrent().parent.switch(**kwargs)) - for i in range(100): + for _ in range(100): g.switch(**kwargs) self.assertEqual(sys.getrefcount(kwargs), 2) - if greenlet.GREENLET_USE_GC: - # These only work with greenlet gc support - - def recycle_threads(self): - # By introducing a thread that does sleep we allow other threads, - # that have triggered their __block condition, but did not have a - # chance to deallocate their thread state yet, to finally do so. - # The way it works is by requiring a GIL switch (different thread), - # which does a GIL release (sleep), which might do a GIL switch - # to finished threads and allow them to clean up. - def worker(): - time.sleep(0.001) + assert greenlet.GREENLET_USE_GC # Option to disable this was removed in 1.0 + + def recycle_threads(self): + # By introducing a thread that does sleep we allow other threads, + # that have triggered their __block condition, but did not have a + # chance to deallocate their thread state yet, to finally do so. + # The way it works is by requiring a GIL switch (different thread), + # which does a GIL release (sleep), which might do a GIL switch + # to finished threads and allow them to clean up. + def worker(): + time.sleep(0.001) + t = threading.Thread(target=worker) + t.start() + time.sleep(0.001) + t.join() + + def test_threaded_leak(self): + gg = [] + def worker(): + # only main greenlet present + gg.append(weakref.ref(greenlet.getcurrent())) + for _ in range(2): t = threading.Thread(target=worker) t.start() - time.sleep(0.001) t.join() + del t + greenlet.getcurrent() # update ts_current + self.recycle_threads() + greenlet.getcurrent() # update ts_current + gc.collect() + greenlet.getcurrent() # update ts_current + for g in gg: + self.assertIsNone(g()) - def test_threaded_leak(self): - gg = [] - def worker(): - # only main greenlet present - gg.append(weakref.ref(greenlet.getcurrent())) - for i in range(2): - t = threading.Thread(target=worker) - t.start() - t.join() - del t - greenlet.getcurrent() # update ts_current - self.recycle_threads() - greenlet.getcurrent() # update ts_current - gc.collect() - greenlet.getcurrent() # update ts_current - for g in gg: - self.assertTrue(g() is None) - - def test_threaded_adv_leak(self): - gg = [] - def worker(): - # main and additional *finished* greenlets - ll = greenlet.getcurrent().ll = [] - def additional(): - ll.append(greenlet.getcurrent()) - for i in range(2): - greenlet.greenlet(additional).switch() - gg.append(weakref.ref(greenlet.getcurrent())) - for i in range(2): - t = threading.Thread(target=worker) - t.start() - t.join() - del t - greenlet.getcurrent() # update ts_current - self.recycle_threads() - greenlet.getcurrent() # update ts_current + def test_threaded_adv_leak(self): + gg = [] + def worker(): + # main and additional *finished* greenlets + ll = greenlet.getcurrent().ll = [] + def additional(): + ll.append(greenlet.getcurrent()) + for _ in range(2): + greenlet.greenlet(additional).switch() + gg.append(weakref.ref(greenlet.getcurrent())) + for _ in range(2): + t = threading.Thread(target=worker) + t.start() + t.join() + del t + greenlet.getcurrent() # update ts_current + self.recycle_threads() + greenlet.getcurrent() # update ts_current + gc.collect() + greenlet.getcurrent() # update ts_current + for g in gg: + self.assertIsNone(g()) + + def test_issue251_killing_cross_thread_leaks_list(self): + # See https://github.com/python-greenlet/greenlet/issues/251 + # Killing a greenlet (probably not the main one) + # in one thread from another thread would + # result in leaking a list (the ts_delkey list). + + # For the test to be valid, even empty lists have to be tracked by the + # GC + assert gc.is_tracked([]) + + def count_lists(): + # pylint:disable=unidiomatic-typecheck + # Collect the garbage. + for _ in range(3): + gc.collect() gc.collect() - greenlet.getcurrent() # update ts_current - for g in gg: - self.assertTrue(g() is None) + return sum( + 1 + for x in gc.get_objects() + if type(x) is list + ) + + background_glet_running = threading.Event() + background_glet_killed = threading.Event() + background_greenlets = [] + def background_greenlet(): + # Throw control back to the main greenlet. + greenlet.getcurrent().parent.switch() + + def background_thread(): + glet = greenlet.greenlet(background_greenlet) + background_greenlets.append(glet) + glet.switch() # Be sure it's active. + # Control is ours again. + del glet # Delete one reference from the thread it runs in. + background_glet_running.set() + background_glet_killed.wait() + # To trigger the background collection of the dead greenlet, + # we need to run some APIs. See issue 252. + greenlet.getcurrent() + + + t = threading.Thread(target=background_thread) + t.start() + background_glet_running.wait() + + lists_before = count_lists() + + assert len(background_greenlets) == 1 + self.assertFalse(background_greenlets[0].dead) + # Delete the last reference to the background greenlet + # from a different thread. This puts it in the background thread's + # ts_delkey list. + del background_greenlets[:] + background_glet_killed.set() + + # Now wait for the background thread to die. + t.join(10) + + lists_after = count_lists() + self.assertEqual(lists_before, lists_after) From da4ddc0509008da0345a63d637792ca56d10747f Mon Sep 17 00:00:00 2001 From: Jason Madden Date: Sat, 7 Aug 2021 16:00:54 -0500 Subject: [PATCH 2/2] Fix #251 by getting the reference count of the list correct. There's still some cycle though, meaning you must both call getcurrent() from the thread that hosted the dead greenlet, as well as some other thread to be sure that all the greenlets get deleted. --- CHANGES.rst | 6 ++++ src/greenlet/greenlet.c | 24 +++++++++++++--- src/greenlet/tests/test_leaks.py | 49 +++++++++++++++++++++++++------- 3 files changed, 65 insertions(+), 14 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index 74919cb5..2e7904b5 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -11,6 +11,12 @@ memory corruption could result. See `issue 245 `_. Patch by fygao-wish. +- Fix a leak of a list object when the last reference to a greenlet + was deleted from some other thread than the one it belonged too. For + this to work correctly, you must call a greenlet API like + ``getcurrent()`` before the thread owning the greenlet exits; we + hope to lift this limitation. Note that in some cases this may also + fix leaks of greenlet objects themselves. See `issue 251 <>`_. 1.1.1 (2021-08-06) diff --git a/src/greenlet/greenlet.c b/src/greenlet/greenlet.c index 465e3150..cd22f7dc 100644 --- a/src/greenlet/greenlet.c +++ b/src/greenlet/greenlet.c @@ -254,6 +254,11 @@ green_updatecurrent(void) it stores them in the thread dict; delete them now. */ deleteme = PyDict_GetItem(tstate->dict, ts_delkey); if (deleteme != NULL) { + /* The only reference to these greenlets should be in this list, so + clearing the list should let them be deleted again, triggering + calls to green_dealloc() in the correct thread. This may run + arbitrary Python code? + */ PyList_SetSlice(deleteme, 0, INT_MAX, NULL); } @@ -267,7 +272,6 @@ green_updatecurrent(void) /* release an extra reference */ Py_DECREF(current); - /* restore current exception */ PyErr_Restore(exc, val, tb); @@ -276,7 +280,6 @@ green_updatecurrent(void) if (ts_current->run_info != tstate->dict) { goto green_updatecurrent_restart; } - return 0; } @@ -988,10 +991,16 @@ kill_greenlet(PyGreenlet* self) lst = PyDict_GetItem(self->run_info, ts_delkey); if (lst == NULL) { lst = PyList_New(0); - if (lst == NULL || - PyDict_SetItem(self->run_info, ts_delkey, lst) < 0) { + if (lst == NULL + || PyDict_SetItem(self->run_info, ts_delkey, lst) < 0) { return -1; } + /* PyDict_SetItem now holds a strong reference. PyList_New also + returned a fresh reference. We need to DECREF it now and let + the dictionary keep sole ownership. Frow now on, we're working + with a borrowed reference that will go away when the thread + dies. */ + Py_DECREF(lst); } if (PyList_Append(lst, (PyObject*)self) < 0) { return -1; @@ -1581,6 +1590,13 @@ green_repr(PyGreenlet* self) #endif if (_green_not_dead(self)) { + /* XXX: The otid= is almost useless becasue you can't correlate it to + any thread identifier exposed to Python. We could use + PyThreadState_GET()->thread_id, but we'd need to save that in the + greenlet, or save the whole PyThreadState object itself. + + As it stands, its only useful for identifying greenlets from the same thread. + */ result = GNative_FromFormat( "<%s object at %p (otid=%p)%s%s%s%s>", Py_TYPE(self)->tp_name, diff --git a/src/greenlet/tests/test_leaks.py b/src/greenlet/tests/test_leaks.py index 7aec9636..2b02bfdb 100644 --- a/src/greenlet/tests/test_leaks.py +++ b/src/greenlet/tests/test_leaks.py @@ -86,7 +86,7 @@ def additional(): for g in gg: self.assertIsNone(g()) - def test_issue251_killing_cross_thread_leaks_list(self): + def test_issue251_killing_cross_thread_leaks_list(self, manually_collect_background=True): # See https://github.com/python-greenlet/greenlet/issues/251 # Killing a greenlet (probably not the main one) # in one thread from another thread would @@ -96,7 +96,7 @@ def test_issue251_killing_cross_thread_leaks_list(self): # GC assert gc.is_tracked([]) - def count_lists(): + def count_objects(kind=list): # pylint:disable=unidiomatic-typecheck # Collect the garbage. for _ in range(3): @@ -105,9 +105,15 @@ def count_lists(): return sum( 1 for x in gc.get_objects() - if type(x) is list + if type(x) is kind ) + # XXX: The main greenlet of a dead thread is only released + # when one of the proper greenlet APIs is used from a different + # running thread. See #252 (https://github.com/python-greenlet/greenlet/issues/252) + greenlet.getcurrent() + greenlets_before = count_objects(greenlet.greenlet) + background_glet_running = threading.Event() background_glet_killed = threading.Event() background_greenlets = [] @@ -123,16 +129,18 @@ def background_thread(): del glet # Delete one reference from the thread it runs in. background_glet_running.set() background_glet_killed.wait() - # To trigger the background collection of the dead greenlet, - # we need to run some APIs. See issue 252. - greenlet.getcurrent() + # To trigger the background collection of the dead + # greenlet, thus clearing out the contents of the list, we + # need to run some APIs. See issue 252. + if manually_collect_background: + greenlet.getcurrent() t = threading.Thread(target=background_thread) t.start() background_glet_running.wait() - lists_before = count_lists() + lists_before = count_objects() assert len(background_greenlets) == 1 self.assertFalse(background_greenlets[0].dead) @@ -144,6 +152,27 @@ def background_thread(): # Now wait for the background thread to die. t.join(10) - - lists_after = count_lists() - self.assertEqual(lists_before, lists_after) + del t + + # Free the background main greenlet by forcing greenlet to notice a difference. + greenlet.getcurrent() + greenlets_after = count_objects(greenlet.greenlet) + + lists_after = count_objects() + # On 2.7, we observe that lists_after is smaller than + # lists_before. No idea what lists got cleaned up. All the + # Python 3 versions match exactly. + self.assertLessEqual(lists_after, lists_before) + + self.assertEqual(greenlets_before, greenlets_after) + + @unittest.expectedFailure + def test_issue251_issue252_need_to_collect_in_background(self): + # This still fails because the leak of the list + # still exists when we don't call a greenlet API before exiting the + # thread. The proximate cause is that neither of the two greenlets + # from the background thread are actually being destroyed, even though + # the GC is in fact visiting both objects. + # It's not clear where that leak is? For some reason the thread-local dict + # holding it isn't being cleaned up. + self.test_issue251_killing_cross_thread_leaks_list(manually_collect_background=False)