-
Notifications
You must be signed in to change notification settings - Fork 244
/
__init__.py
135 lines (115 loc) · 4.86 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# -*- coding: utf-8 -*-
"""
Tests for greenlet.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import unittest
from gc import collect
from gc import get_objects
from threading import active_count as active_thread_count
from time import sleep
from time import time
from greenlet import greenlet as RawGreenlet
from greenlet import getcurrent
from greenlet._greenlet import get_pending_cleanup_count
from greenlet._greenlet import get_total_main_greenlets
from . import leakcheck
class TestCaseMetaClass(type):
    """
    Metaclass that wraps each test method of a new class with leak checks.

    Every entry of the class namespace whose name starts with ``test`` and
    whose value is callable is replaced by the result of
    ``leakcheck.wrap_refcount`` applied to it. All other entries are left
    untouched.
    """

    def __new__(cls, classname, bases, classDict):
        # pylint and pep8 disagree about the name of the first argument
        # (mcs or cls). pylint is right, but pep8 cannot be disabled for
        # a single scope, so its convention wins here.
        # pylint: disable=bad-mcs-classmethod-argument
        check_totalrefcount = True

        # Iterate over a snapshot because the namespace is mutated inside
        # the loop. On Python 3 iterating the live dict does not actually
        # raise, but under 3.6 the same items ended up being wrapped and
        # re-wrapped over and over.
        for name, member in tuple(classDict.items()):
            if not (name.startswith('test') and callable(member)):
                continue
            # Remove and re-add the entry (rather than assigning in place),
            # exactly as the namespace has always been rebuilt here.
            del classDict[name]
            if check_totalrefcount:
                member = leakcheck.wrap_refcount(member)
            classDict[name] = member

        return type.__new__(cls, classname, bases, classDict)
class TestCase(TestCaseMetaClass(
        "NewBase",
        (unittest.TestCase,),
        {})):
    """
    Base class for the greenlet tests.

    Records the number of active threads, main greenlets and GC-tracked
    greenlets before each test, and waits for pending cleanups to drain
    both before and after each test.
    """

    # Seconds to sleep between two polls while waiting for cleanups.
    cleanup_attempt_sleep_duration = 0.001
    # Upper bound, in seconds, on the polling loop (measured after the
    # first unconditional sleep).
    cleanup_max_sleep_seconds = 1

    # Baselines; ``setUp`` overwrites these on the instance.
    greenlets_before_test = 0
    threads_before_test = 0
    main_greenlets_before_test = 0

    # When true, the main-greenlet count is ignored while waiting.
    expect_greenlet_leak = False

    def wait_for_pending_cleanups(self,
                                  initial_active_threads=None,
                                  initial_main_greenlets=None):
        """
        Poll until pending cleanups, extra threads and extra main
        greenlets are gone, or until the time limit is exceeded.

        A falsy *initial_active_threads* or *initial_main_greenlets*
        (``None`` or ``0``) falls back to the baseline recorded on this
        instance. A garbage collection always runs before returning.
        """
        threads_target = initial_active_threads or self.threads_before_test
        mglets_target = initial_main_greenlets or self.main_greenlets_before_test
        pause = self.cleanup_attempt_sleep_duration

        # NOTE: This is racy! A Python-level thread object may already be
        # dead and gone while the C thread has not yet run its destructors
        # and added to the queue, and there is no particular way to know
        # that is about to happen. Watching the Python threads makes sure
        # that they, at least, have gone away. Counting the main
        # greenlets, which can be done deterministically, also helps.

        # Always sleep at least once to let other threads run.
        sleep(pause)
        deadline = time() + self.cleanup_max_sleep_seconds

        # TODO: We could add an API that calls us back when a particular
        # main greenlet is deleted? It would have to drop the GIL.
        while True:
            still_waiting = (
                get_pending_cleanup_count()
                or active_thread_count() > threads_target
                or (not self.expect_greenlet_leak
                    and get_total_main_greenlets() > mglets_target)
            )
            if not still_waiting:
                break

            sleep(pause)
            if time() > deadline:
                print("Time limit exceeded.")
                print("Threads: Waiting for only", threads_target,
                      "-->", active_thread_count())
                print("MGlets : Waiting for only", mglets_target,
                      "-->", get_total_main_greenlets())
                break

        collect()

    def count_objects(self, kind=list, exact_kind=True):
        """
        Return how many GC-tracked objects are of type *kind*.

        With *exact_kind* true only objects whose type is exactly *kind*
        are counted; otherwise instances of subclasses count as well.
        """
        # pylint:disable=unidiomatic-typecheck
        # Collect the garbage first; three passes.
        for _ in range(3):
            collect()

        if exact_kind:
            return sum(type(obj) is kind for obj in get_objects())
        # Instances, including those of subclasses.
        return sum(isinstance(obj, kind) for obj in get_objects())

    def count_greenlets(self):
        """
        Count every greenlet, including subclass instances, that the GC
        is tracking.
        """
        return self.count_objects(RawGreenlet, False)

    def setUp(self):
        """
        Record the thread, main-greenlet and greenlet baselines for the
        test that is about to run.
        """
        super(TestCase, self).setUp()
        # Make sure the main greenlet exists; otherwise the first test
        # would report a false-positive leak.
        getcurrent()

        thread_baseline = active_thread_count()
        self.threads_before_test = thread_baseline
        mglet_baseline = get_total_main_greenlets()
        self.main_greenlets_before_test = mglet_baseline

        self.wait_for_pending_cleanups(thread_baseline, mglet_baseline)
        self.greenlets_before_test = self.count_greenlets()

    def tearDown(self):
        """
        Wait for cleanups left over by the test, unless the instance has
        a true ``skipTearDown`` attribute, in which case nothing is done.
        """
        if not getattr(self, 'skipTearDown', False):
            self.wait_for_pending_cleanups(
                self.threads_before_test,
                self.main_greenlets_before_test,
            )
            super(TestCase, self).tearDown()