Skip to content

Commit

Permalink
Prevent cached callables being invoked multiple times when multithreading
Browse files Browse the repository at this point in the history

Stop wrapped function being called multiple times when multithreading

Add argument-level locks to func.py as well as decorators.py

Use reentrant lock instead of standard threading lock in decorators.py

Change cachedmethod to not call the wrapped function multiple times when multithreading

Short-circuit when early caller completes and blocked caller successfully retrieves the value

Cleanup variable naming and add more useful comments
  • Loading branch information
Jack Brown committed Sep 30, 2021
1 parent 40d2710 commit 04379f7
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 38 deletions.
58 changes: 38 additions & 20 deletions src/cachetools/decorators.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import functools
from threading import RLock

from .keys import hashkey

Expand Down Expand Up @@ -32,20 +33,29 @@ def wrapper(*args, **kwargs):

else:

_key_level_locks = {}

def wrapper(*args, **kwargs):
k = key(*args, **kwargs)
try:
with lock:
with lock:
try:
return cache[k]
except KeyError:
pass # key not found
v = func(*args, **kwargs)
# in case of a race, prefer the item already in the cache
try:
with lock:
except KeyError:
pass # key not found
_key_level_lock = _key_level_locks.setdefault(k, RLock())
# Only one caller may generate or retrieve a value for this key
with _key_level_lock:
try:
# In the case that this thread was blocked, retrieve and return the now computed value
return cache[k]
except KeyError:
# Otherwise compute it on this thread since the key is not in the cache
v = func(*args, **kwargs)
_key_level_locks.pop(k)
try:
return cache.setdefault(k, v)
except ValueError:
return v # value too large
except ValueError:
return v # value too large

return functools.update_wrapper(wrapper, func)

Expand Down Expand Up @@ -78,24 +88,32 @@ def wrapper(self, *args, **kwargs):
return v

else:
_key_level_locks = {}

def wrapper(self, *args, **kwargs):
c = cache(self)
if c is None:
return method(self, *args, **kwargs)
k = key(*args, **kwargs)
try:
with lock(self):
with lock(self):
try:
return c[k]
except KeyError:
pass # key not found
v = method(self, *args, **kwargs)
# in case of a race, prefer the item already in the cache
try:
with lock(self):
except KeyError:
pass
_key_level_lock = _key_level_locks.setdefault(k, RLock())
# Only one caller may generate or retrieve a value for this key
with _key_level_lock:
try:
# In the case that this thread was blocked, retrieve and return the now computed value
return c[k]
except KeyError:
# Otherwise compute it on this thread since the key is not in the cache
v = method(self, *args, **kwargs)
_key_level_locks.pop(k)
try:
return c.setdefault(k, v)
except ValueError:
return v # value too large
except ValueError:
return v # value too large

return functools.update_wrapper(wrapper, method)

Expand Down
22 changes: 15 additions & 7 deletions src/cachetools/func.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def _cache(cache, typed):
def decorator(func):
key = keys.typedkey if typed else keys.hashkey
lock = RLock()
key_level_locks = {}
stats = [0, 0]

def wrapper(*args, **kwargs):
Expand All @@ -63,13 +64,20 @@ def wrapper(*args, **kwargs):
return v
except KeyError:
stats[1] += 1
v = func(*args, **kwargs)
# in case of a race, prefer the item already in the cache
try:
with lock:
return cache.setdefault(k, v)
except ValueError:
return v # value too large
_key_level_lock = key_level_locks.setdefault(k, RLock())
# Only one caller may generate or retrieve a value for this key
with _key_level_lock:
try:
# In the case that this thread was blocked, retrieve and return the now computed value
return cache[k]
except KeyError:
# Otherwise compute it on this thread since the key is not in the cache
v = func(*args, **kwargs)
key_level_locks.pop(k)
try:
return cache.setdefault(k,v)
except ValueError:
return v # value too large

def cache_info():
with lock:
Expand Down
14 changes: 7 additions & 7 deletions tests/test_method.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,10 @@ def test_locked_dict(self):
cached = Locked({})

self.assertEqual(cached.get(0), 1)
self.assertEqual(cached.get(1), 3)
self.assertEqual(cached.get(1), 3)
self.assertEqual(cached.get(1.0), 3)
self.assertEqual(cached.get(2.0), 7)
self.assertEqual(cached.get(1), 2)
self.assertEqual(cached.get(1), 2)
self.assertEqual(cached.get(1.0), 2)
self.assertEqual(cached.get(2.0), 5)

def test_locked_nocache(self):
cached = Locked(None)
Expand All @@ -159,7 +159,7 @@ def test_locked_nospace(self):
cached = Locked(LRUCache(maxsize=0))

self.assertEqual(cached.get(0), 1)
self.assertEqual(cached.get(1), 2)
self.assertEqual(cached.get(1), 3)
self.assertEqual(cached.get(1), 5)
self.assertEqual(cached.get(1.0), 7)
self.assertEqual(cached.get(1.0), 9)
self.assertEqual(cached.get(1.0), 4)
self.assertEqual(cached.get(1.0), 5)
38 changes: 34 additions & 4 deletions tests/test_wrapper.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import time
import unittest
from concurrent.futures import ThreadPoolExecutor

import cachetools
import cachetools.keys
Expand Down Expand Up @@ -92,11 +94,11 @@ def __exit__(self, *exc):
self.assertEqual(len(cache), 0)
self.assertEqual(wrapper.__wrapped__, self.func)
self.assertEqual(wrapper(0), 0)
self.assertEqual(Lock.count, 2)
self.assertEqual(Lock.count, 1)
self.assertEqual(wrapper(1), 1)
self.assertEqual(Lock.count, 4)
self.assertEqual(Lock.count, 2)
self.assertEqual(wrapper(1), 1)
self.assertEqual(Lock.count, 5)
self.assertEqual(Lock.count, 3)


class CacheWrapperTest(unittest.TestCase, DecoratorTestMixin):
Expand Down Expand Up @@ -132,7 +134,35 @@ def __exit__(self, *exc):

self.assertEqual(wrapper(0), 0)
self.assertEqual(len(cache), 0)
self.assertEqual(Lock.count, 2)
self.assertEqual(Lock.count, 1)

def test_doesnt_execute_multiple_times_when_multithreading(self):
class Lock:

count = 0

def __enter__(self):
Lock.count += 1

def __exit__(self, *exc):
pass
def _long_func(*args, **kwargs):
time.sleep(1)
return self.func(*args, **kwargs)

cache = self.cache(5)
wrapper = cachetools.cached(cache, lock=Lock())(_long_func)

self.assertEqual(len(cache), 0)
self.assertEqual(wrapper.__wrapped__, _long_func)
with ThreadPoolExecutor(max_workers=5) as executor:
executor.map(wrapper, [1] * 10)
# only called the wrapped function once
self.assertEqual(self.func(), 1)
# Accessed cache under lock 10 times
self.assertEqual(Lock.count, 10)
# all of our arguments were the same (1)
self.assertEqual(len(cache), 1)


class DictWrapperTest(unittest.TestCase, DecoratorTestMixin):
Expand Down

0 comments on commit 04379f7

Please sign in to comment.