
Key-level locking which corrects multithreading performance #224

Closed

Conversation

@northyorkshire commented Sep 29, 2021

A cache is generally applied to functions and methods which are either slow or expensive to execute, in order to minimize both caller latency and stress on underlying services.

As it stands today, calling a cachetools-cached function multiple times from separate threads with the same key may cause the function body to be evaluated multiple times. This means that a cached, 10-second reference data load may be invoked once per calling thread during the first 10 seconds of execution, potentially swamping underlying services.

Cachetools today:

For example, setting up a REST (I used FastAPI) server to call the following function per request yields multiple calls even though the function is cached. (Note that each timestamped line represents a call to the FastAPI endpoint)

This is because @cached only locks access to the cache, not the generation of the value when the key is not present. Between the first call for a given key and the completion of that call (or of a subsequent one), the wrapped function will always be evaluated.

import time

from cachetools import TTLCache, cached

cache = TTLCache(maxsize=1024, ttl=600)

@cached(cache)
def test(self):
    print("Function body called")
    time.sleep(10)

> 2021-09-29 13:29:42,240 [.....
> Function body called
> 2021-09-29 13:29:44,137 [.....
> Function body called
> 2021-09-29 13:29:45,474 [.....
> Function body called
> 2021-09-29 13:29:46,974 [.....
> Function body called
> 2021-09-29 13:29:48,527 [.....
> Function body called
> 2021-09-29 13:29:50,242 [.....
> Function body called
> 2021-09-29 13:29:51,895 [.....
> Function body called
> 2021-09-29 13:29:51,895 [.....
> 2021-09-29 13:29:53.543 [.....
> 2021-09-29 13:29:57.213 [.....
> 2021-09-29 13:29:59.753 [.....

Another, more self-contained example is as follows:

from cachetools import TTLCache
from cachetools.decorators import cached
from time import sleep
from concurrent.futures import ThreadPoolExecutor


cache = TTLCache(maxsize=100, ttl=600)
calls = 0

@cached(cache)
def method(*args):
    global calls
    sleep(1)
    calls += 1
    print("Doing something expensive!")
    return args

with ThreadPoolExecutor(max_workers=5) as executor:
    executor.map(method, ['arg']*10)

print(calls)

> Doing something expensive!
> Doing something expensive!
> Doing something expensive!Doing something expensive!
> Doing something expensive!
> 5

Cachetools post-fix:

After the fixes I'm proposing, the expensive underlying function is executed only once per unique key.

For the first example:

cache = TTLCache(maxsize=1024, ttl=600)

@cached(cache)
def test(self):
    print("Function body called")
    time.sleep(10)

> 2021-09-29 13:59:17,391 [...
> Function body called
> 2021-09-29 13:59:17,996 [.... subsequent calls to the API
> 2021-09-29 13:59:21,140 [.... subsequent calls to the API
> 2021-09-29 13:59:22,758 [.... subsequent calls to the API
> 2021-09-29 13:59:24,222 [.... subsequent calls to the API
> 2021-09-29 13:59:25,740 [.... subsequent calls to the API
> 2021-09-29 13:59:27,289 [.... Original call unblocks
> 2021-09-29 13:59:27,290 [.... All subsequent calls unblock once call 1 finishes 
> 2021-09-29 13:59:27,292 [.... All subsequent calls unblock once call 1 finishes 
> 2021-09-29 13:59:27,293 [.... All subsequent calls unblock once call 1 finishes 
> 2021-09-29 13:59:27,293 [.... All subsequent calls unblock once call 1 finishes 
> 2021-09-29 13:59:27,294 [.... All subsequent calls unblock once call 1 finishes 

I have manually added some commentary to the log lines. Note how the first call hits our expensive function, while subsequent calls wait for it to complete.

10 seconds after the first call has come in, all other calls instantly return, since the value is now available.
The request at 13:59:25 took only two seconds to respond; before the fix it would not only have taken 10 seconds, it would also have added more stress to the underlying services called from within test().

In this second, self-contained example, note how only one call to the cached function is logged, even though the code is functionally identical to before.

from cachetools import TTLCache  # Still using cachetools TTLCache
from cachetools_fixed.decorators import cached  # Fixed @cached decorator
from time import sleep
from concurrent.futures import ThreadPoolExecutor


cache = TTLCache(maxsize=100, ttl=600)
calls = 0

@cached(cache)
def method(*args):
    global calls
    sleep(1)
    calls += 1
    print("Doing something expensive!")
    return args

with ThreadPoolExecutor(max_workers=5) as executor:
    executor.map(method, ['arg']*10)

print(calls)

> Doing something expensive!
> 1

I'll also add that key-level locking still behaves as expected: repeated calls with different keys perform just as they did before this fix, since only calls that share a key are serialized against each other.
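
For reference, the shape of the change is roughly the following (a simplified sketch of the per-key locking idea, not the exact diff; for brevity it uses a single decorator-level lock to guard both the cache and the lock table, whereas the review comments below discuss separating those concerns and other corner cases such as exception safety):

_key_level_locks = {}                           # one lock per in-flight key

def wrapper(*args, **kwargs):
    k = key(*args, **kwargs)
    with lock:
        try:
            return cache[k]                     # fast path: value already cached
        except KeyError:
            _key_level_lock = _key_level_locks.setdefault(k, RLock())
    with _key_level_lock:                       # only one thread per key computes the value
        try:
            with lock:
                return cache[k]                 # a previously blocked thread picks up the fresh value
        except KeyError:
            v = func(*args, **kwargs)
            with lock:
                cache[k] = v
            return v
        finally:
            with lock:
                _key_level_locks.pop(k, None)   # drop the per-key lock once the value exists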

@tkem (Owner) commented Sep 29, 2021

First of all, thanks for your interest and for providing detailed analysis and explanation for what you are trying to achieve. I very much appreciate that, especially from a first-time contributor!

I thought about something like this when I initially implemented the decorators, but at that time decided the use case (multiple threads requesting the same key concurrently) wasn't common enough to warrant the extra complexity, especially since

  • it's not trivial to get right (please see my review comments)
  • I wanted to keep the user-supplied lock more generic, i.e. not tied to threading in particular
  • I really like the simplicity of the current wrapper implementation

So I decided to follow the standard library's lru_cache strategy of simply not locking calls to the underlying function.

Anyway, I'd like to hear more about your particular real-world use case, which apparently triggers this behavior, before making up my mind about this.

@tkem (Owner) commented Sep 29, 2021

On a side note, I've learned myself that many maintainers prefer PRs which consist of a single commit, to keep their git log short and clean. Rebasing and squashing is therefore always a good idea before submitting a PR.

@northyorkshire (Author) commented Sep 30, 2021

Thank you for considering our improvement! I've got some responses to your comments; you do raise some good points:

it's not trivial to get right (please see my review comments)

Whereabouts can I find your review comments? I understand that it's a non-trivial change to get right, but I feel that the work required to make it right is worth it, at least in our case. Given the complexity, however, would you be averse to splitting the @cached decorator into key-level-locked and non-key-level-locked flavors? Perhaps a @cached(key_level_locking=True) argument could be passed, which routes the caller to a separate implementation? I can see that you merged a decent refactor yesterday, so I'm happy to make a change to this effect at the same time as rebasing onto your changes.

I wanted to keep the user-supplied lock more generic, i.e. not tied to threading in particular

Agreed that we shouldn't hide the key-level locking behind the fact that the user has supplied a Lock. Once we decide how to proceed on the previous point I'll get that cleaned up, not a problem.

I really like the simplicity of the current wrapper implementation

As do we! It's very easy to read, which may be a positive for splitting the implementation into one with key level locking and one without.

Anyway, I'd like to hear more about your particular real-world use case, which apparently triggers this behavior, before making up my mind about this.

Here goes: our use case is a standard (inherently multithreaded) FastAPI endpoint which fields upwards of 150 requests per second.
Each request to our endpoint requires reference data lookups in order to be serviced correctly; however, the backend services (specifically the databases, which are not only shared but also upwards of 15 years old) that we hit for these lookups are extremely sensitive to bursty load patterns, and a cache miss may take upwards of a second to return.
We decided to batch our reference data read in such a way that if a request is made for data on date X, we load all data for date X behind a cache, such that subsequent requests to this date are filled from memory. This means that theoretically we only need to hit the database once to service any request on date X.

We put a cachetools ttl_cache(ttl=60*60) on our lookup function, keyed by date, in the hopes that we'd have minimal impact on the underlying services which we rely on.

Past the first ~1.5 seconds after a new date is requested, we run perfectly. During those first ~1.5 seconds, however, every single request for that date (and we get ~150 per second) triggers a number of database queries, which completely crushes our backend and leads to response times of up to 30 seconds as our databases become fully congested. We implemented this key-level locking in cachetools so that subsequent requests for a given date are blocked; not only is our worst-case response time now ~1.5 seconds (blocked threads re-use the newly computed value), we've also mitigated almost all bursty load on our database.
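
In code, the pattern is roughly the following (a sketch with hypothetical names - load_reference_data and query_reference_database are illustrative, not our production code):

from datetime import date

from cachetools import TTLCache, cached

ref_data_cache = TTLCache(maxsize=32, ttl=60 * 60)

@cached(ref_data_cache)
def load_reference_data(business_date: date):
    # One slow, expensive query per date; every request for the same date
    # should be served from the cache instead of the shared databases.
    return query_reference_database(business_date)  # hypothetical backend call

Without key-level locking, every request that arrives during the first lookup for a new date falls straight through to the database.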

@@ -1,4 +1,5 @@
import functools
from threading import RLock

tkem (Owner): Use RLock only to lock access to _key_level_lock; cache access should be guarded with user-provided lock object.

with _key_level_lock:
try:
# In the case that this thread was blocked, retrieve and return the now computed value
return cache[k]

tkem (Owner): cache access not protected by user-supplied lock, maybe not MT-safe and not conforming to documentation

except KeyError:
# Otherwise compute it on this thread since the key is not in the cache
v = func(*args, **kwargs)
_key_level_locks.pop(k)

tkem (Owner): multiple threads may call _key_level_locks with different k concurrently

with _key_level_lock:
try:
# In the case that this thread was blocked, retrieve and return the now computed value
return c[k]

tkem (Owner): See above

except KeyError:
# Otherwise compute it on this thread since the key is not in the cache
v = method(self, *args, **kwargs)
_key_level_locks.pop(k)

tkem (Owner): See above

with _key_level_lock:
try:
# In the case that this thread was blocked, retrieve and return the now computed value
return cache[k]

tkem (Owner): See above

except KeyError:
# Otherwise compute it on this thread since the key is not in the cache
v = func(*args, **kwargs)
key_level_locks.pop(k)

tkem (Owner): See above

self.assertEqual(cached.get(1), 3)
self.assertEqual(cached.get(1.0), 3)
self.assertEqual(cached.get(2.0), 7)
self.assertEqual(cached.get(1), 2)

tkem (Owner): AFAICS you should _not_ need to change existing unit tests.

self.assertEqual(cached.get(1), 3)
self.assertEqual(cached.get(1), 5)

tkem (Owner): AFAICS you should _not_ need to change existing unit tests.

@@ -92,11 +94,11 @@ def __exit__(self, *exc):
self.assertEqual(len(cache), 0)
self.assertEqual(wrapper.__wrapped__, self.func)
self.assertEqual(wrapper(0), 0)
self.assertEqual(Lock.count, 2)

tkem (Owner): See above

return cache[k]
except KeyError:
# Otherwise compute it on this thread since the key is not in the cache
v = func(*args, **kwargs)

tkem (Owner): if func() raises an exception, k is never popped from _key_level_locks => possible memory leak?
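
One way to make that cleanup exception-safe (a sketch using the names from the hunk above; lock stands for whatever guards the lock table) is to move the pop into a finally block, which is the direction the later hunks in this PR take:

try:
    v = func(*args, **kwargs)
finally:
    with lock:
        _key_level_locks.pop(k, None)   # runs even if func() raises, so the per-key lock entry is not leaked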

@tkem (Owner) commented Sep 30, 2021

@northyorkshire: You should be able to see the review comments now, sorry for the confusion. And yes, I already started some long-postponed (and in retrospect somewhat ill-fated) refactoring when your PR arrived, so sorry for that inconvenience, too. In fact, only the location of the @cached and @cachedmethod decorators has changed, they now reside in src/cachetools/__init__.py.

@northyorkshire (Author) commented Sep 30, 2021

That's okay - it's nothing that we can't deal with :). I'll have a look at addressing the things you've pointed out now. Nice spot on the memory leak, btw!

Did you have any more thought on how you'd like us to feature-toggle this key-level-locking behavior? Or if you'd even want it toggleable to begin with?

return cache[k]
except KeyError:
# Otherwise compute it on this thread since the key is not in the cache
v = func(*args, **kwargs)

tkem (Owner): exception may cause memory leak

@tkem (Owner) commented Sep 30, 2021

@northyorkshire: Thanks for explaining your use case - it's always informative to learn how this is being used.

Regarding integration of key-level locking in the decorators, however, I'm still somewhat undecided - sure, the current situation is less than optimal and can probably be quite surprising (I guess you spent quite some time to figure out what's going on here). But I'm not sure the added complexity is worth it, especially since this issue hasn't been brought up before.

I also don't like the idea of selecting behavior by passing an extra argument to @cached, since this would effectively mean having to maintain two different versions of the decorators...

@tkem (Owner) commented Sep 30, 2021

I also toyed a little with your example code, thinking about how this could be solved without any changes to cachetools itself. I came up with a different decorator, which can be "stacked" on top of @cached but may also be used in its own right - it essentially prevents a function from being called with the same arguments concurrently:

from concurrent.futures import ThreadPoolExecutor
from functools import update_wrapper
from threading import RLock
from time import sleep

from cachetools import TTLCache, cached
from cachetools.keys import hashkey

cache = TTLCache(maxsize=100, ttl=600)
calls = 0

def key_level_locking(func):

    lock = RLock()
    # locks holds [lock, refcount] items
    locks = {}

    def wrapper(*args, **kwargs):
        k = hashkey(*args, **kwargs)
        try:
            with lock:
                klock = locks.setdefault(k, [RLock(), 0])
                klock[1] += 1  # increase reference count
            with klock[0]:
                v = func(*args, **kwargs)
        finally:
            with lock:
                klock = locks[k]
                klock[1] -= 1  # decrease reference count
                if klock[1] == 0:
                    del locks[k]
        return v

    return update_wrapper(wrapper, func)

@key_level_locking
@cached(cache)
def method(*args):
    global calls
    sleep(1)
    calls += 1
    print("Doing something expensive!")
    return args

with ThreadPoolExecutor(max_workers=5) as executor:
    executor.map(method, ['arg']*10)

print(calls)

Now this has some disadvantages compared with adding key-level locking to the decorators directly, performance-wise:

  • hashkey is called twice, once in each decorator
  • a key-level lock is created each time a key is accessed, even when it's already in the cache
  • reference counting of key level locks adds some overhead, too

However, I find reasoning about correctness and thread-safety easier with smaller building blocks, and the performance issues may be negligible in your case.

@tkem (Owner) commented Sep 30, 2021

Another, much simpler solution that comes to mind would be to somehow pre-seed the cache with the next date before that data is being requested. If that's predictable in some way, this would even improve your response times for the first request(s).
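
A minimal sketch of what such pre-seeding could look like, reusing the hypothetical names from the use-case sketch above (how and when preseed() gets scheduled is left out):

from cachetools.keys import hashkey

def preseed(next_date):
    # Store the value under the same key the @cached decorator would use
    # (hashkey is the decorator's default key function), so the first real
    # request for next_date is already a cache hit.
    ref_data_cache[hashkey(next_date)] = query_reference_database(next_date)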

@northyorkshire (Author) commented:

I'm glad that you can appreciate our use case for this! It was indeed a tricky one to track down; we almost never suspect the fault to be with a library we're using and always try to assume it's with our own code.

Are you intending to keep func.py? I can see that some of the files have been re-added

Our view is that while we are proposing added complexity, we're also adding and correcting functionality. I do like your idea of the key_level_locking decorator, although I'm not convinced that the reference counter is strictly necessary:

def key_level_locking(func):

    lock = RLock()
    locks = {}

    def wrapper(*args, **kwargs):
        k = hashkey(*args, **kwargs)
        try:
            with lock:
                klock = locks.setdefault(k, RLock())
            with klock:
                return func(*args, **kwargs)
        finally:
            with lock:
                if k in locks:
                    del locks[k]

    return update_wrapper(wrapper, func)

My reservation with this approach, though, is that we'd rarely use cachetools without the key_level_locking decorator, since it's fundamentally more correct to have duplicate callers wait on a cache miss until the main worker thread has returned - it saves both resources and time.

If we build this logic into the cached decorator itself then all three of your disadvantages disappear, and the worst-case performance for strictly single-threaded applications is largely unaffected.

Another, much simpler solution that comes to mind would be to somehow pre-seed the cache with the next date before that data is being requested. If that's predictable in some way, this would even improve your response times for the first request(s).

As for pre-warming the cache, our data becomes stale quite quickly (hence our need for a ttl_cache), and there would be a significant amount of data to pre-load if we grabbed every date, which makes this unfeasible.

@tkem (Owner) commented Sep 30, 2021

Are you intending to keep func.py? I can see that some of the files have been re-added

Yes, for the foreseeable future cachetools.func will stay as it is. As for the re-adding... I've been somewhat naive regarding how this module is being used out there - see #225 for details. Nothing you should worry about, though.

I'm not convinced that the reference counter is strictly necessary

I think it's necessary when using the decorator "stand-alone", i.e. to guarantee that the underlying function is not called concurrently with the same arguments. What you present looks exactly like my initial attempt, and may be sufficient when combined with a caching decorator for your use case, but I haven't fully thought it through.

it's fundamentally more correct to have duplicate callers wait on cache-miss

I agree, and that's why I sympathize with this PR in principle ;-)

I've just become somewhat conservative when it comes to major changes and/or introducing added complexity, mainly for maintenance reasons and backward compatibility (unless I'm convinced it's just a little refactoring that noone will notice... see #225). I'd also have to be convinced that this is 100% thread-safe and does behave as expected under all circumstances, and the fact that the standard library's @lru_cache does without this also makes me wonder.

So it may take a while and a couple of iterations before I feel it's safe to include this, maybe even until the next major version release.

# Otherwise compute it on this thread since the key is not in the cache
v = func(*args, **kwargs)
finally:
_key_level_locks.pop(k)

tkem (Owner): Unlocked access to _key_level_locks - since k provides its own __hash__ function, dict operations are not thread-safe, not even in CPython with its global interpreter lock...

northyorkshire (Author): It does not look like the GIL gets released; however, I've protected this call either way, since it doesn't look like it will give rise to a deadlock (all operations under the global, non-key-level lock are context-managed and finite).

https://github.com/python/cpython/blob/main/Objects/setobject.c#L630

@northyorkshire (Author) commented Oct 5, 2021

I've just become somewhat conservative when it comes to major changes

Fully understandable, and we appreciate that this is quite a major, albeit backend-only, change.

I'd also have to be convinced that this is 100% thread-safe

This is very difficult to prove; however, we've been hitting our caches quite aggressively for a few weeks now and have not encountered any problems. My reasoning for thread safety is that, to my eyes, no lock can be acquired in a way that would leave a separate caller deadlocked, since all operations under the non-key-level lock are deterministic and independent of the underlying wrapped function.

So it may take a while and a couple of iterations before I feel it's safe to include this, maybe even until the next major version release.

That's promising! I'm happy to work with you to correctly feature toggle this behavior if need be, however I appreciate your prior concerns around maintaining yet another implementation of the caching decorator

@@ -526,20 +527,33 @@ def wrapper(*args, **kwargs):

else:

_key_level_locks = {}

horpto: Maybe better to use defaultdict to not allocate an extra RLock?

northyorkshire (Author): Interesting! That datatype looks ideal here. Out of curiosity, where is the extra RLock being allocated here?

horpto commented Oct 21, 2021: Not here, actually, but a couple of lines below, on _key_level_lock = _key_level_locks.setdefault(k, RLock()). An RLock is always created even when it's not needed. Since that line is under the lock, it is already atomic.
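
For illustration, the defaultdict variant being suggested would look roughly like this (a sketch; lock and k stand for the surrounding wrapper's decorator-level lock and key):

from collections import defaultdict
from threading import RLock

_key_level_locks = defaultdict(RLock)

# Inside the wrapper, still under the decorator-level lock, this replaces
# _key_level_locks.setdefault(k, RLock()) and only constructs an RLock
# when k is actually missing:
with lock:
    _key_level_lock = _key_level_locks[k]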

@gp-slick-coder commented:

I always thought passing a lock object would handle this problem, but I was wrong. Thanks for the key_level_locking function - I'll go with that solution.

import threading
import time

from cachetools import TTLCache, cached

# from memoization import cached

from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache, _make_key, update_wrapper, wraps
from collections import defaultdict

from cachetools.keys import hashkey

cache_lock = threading.Lock()


def threadsafe_lru(func):
    # https://noamkremen.github.io/a-simple-threadsafe-caching-decorator.html
    func = lru_cache()(func)
    lock_dict = defaultdict(threading.Lock)

    def _thread_lru(*args, **kwargs):
        key = _make_key(args, kwargs, typed=False)
        with lock_dict[key]:
            return func(*args, **kwargs)

    return _thread_lru


def key_level_locking(key=hashkey):
    def decorator(func):
        # https://github.com/tkem/cachetools/pull/224
        lock = threading.RLock()
        # locks holds [lock, refcount] items
        locks = {}

        def wrapper(*args, **kwargs):
            k = key(*args, **kwargs)
            try:
                with lock:
                    klock = locks.setdefault(k, [threading.RLock(), 0])
                    klock[1] += 1  # increase reference count
                with klock[0]:
                    v = func(*args, **kwargs)
            finally:
                with lock:
                    klock = locks[k]
                    klock[1] -= 1  # decrease reference count
                    if klock[1] == 0:
                        del locks[k]
            return v

        return update_wrapper(wrapper, func)

    return decorator


def get_key(o: int):
    # print("key: ", o)
    # return "xxx"
    return str(o)


cache = TTLCache(maxsize=1024, ttl=25)


# @threadsafe_lru # no ttl, no custom key
@key_level_locking(key=get_key)
@cached(cache, key=get_key)
# @cached(cache, lock=cache_lock, key=get_key)  # not working
def method(arg: int):
    print(f"Doing something expensive with {arg} ...")
    time.sleep(5)
    return arg


import random


def simple():
    r = random.randrange(1, 100)
    r = 10
    method(r)
    print("...done.")


if False:
    with ThreadPoolExecutor(max_workers=5) as executor:
        print("start execution")
        executor.map(simple)

if True:
    for i in range(10):
        t = threading.Thread(target=simple, name=f"worker-{i}")
        print("start execution")
        t.start()
        time.sleep(1)


# Resources
# https://stackoverflow.com/questions/31771286/python-in-memory-cache-with-time-to-live

Output:

start execution
Doing something expensive with 10 ...
start execution
start execution
start execution
start execution
...done.
...done.
...done.
...done.
...done.
start execution
...done.
start execution
...done.
start execution
...done.
start execution
...done.
start execution
...done.

@northyorkshire (Author) commented:

@GBreeze I'm glad that you found some use from this!

@tkem have you had any more thoughts on merging this PR?

@lmeyerov commented Mar 27, 2022

@tkem @northyorkshire This is of interest to us as well, for asyncio servers.

  1. Ideally, resolving handlers would take a key-level write lock without blocking the event loop, allowing requests on other keys to make progress:
# some multi-process server,
# where each process uses asyncio greenthread (single physical thread / event loop) concurrency in each process
@cached(TTLCache(...), key_rw_lock_class=aiorwlock.RWLock)
async def route_handler(...):
   out = await fan_out_async()
   return out

or, worse, at least only block requests to the same route (even if they use different keys):

# more global/conservative blocking of all use
@cached(TTLCache(...), lock=threading.Lock())
async def route_handler(...):
   out = await fan_out_async()
   return out
  2. As an interim, since cachetools does not support async, it sounds like we're stuck fully blocking the server:
async def route_handler(...):
   out = route_handler_sync_helper(...)
   return out

# no value in adding lock=... as the event loop will be blocked anyway
@cached(TTLCache(...))
def route_handler_sync_helper(...):
   """
   Fully blocks asyncio event loop
   """
   out = fan_out_sync()
   return out 
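
A per-key asyncio variant of the same locking idea could look roughly like this (a sketch, not something cachetools provides; it only covers the per-key serialization part, assumes a single event loop, and never cleans up its lock table):

import asyncio
from collections import defaultdict
from functools import update_wrapper

from cachetools.keys import hashkey

def async_key_level_locking(func):
    # One asyncio.Lock per key; entries are never removed, so this grows
    # with the number of distinct keys seen.
    locks = defaultdict(asyncio.Lock)

    async def wrapper(*args, **kwargs):
        k = hashkey(*args, **kwargs)
        async with locks[k]:                  # calls on other keys keep making progress
            return await func(*args, **kwargs)

    return update_wrapper(wrapper, func)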

@tkem (Owner) commented Apr 5, 2022

@northyorkshire Sorry for the delay... Due to the added complexity and runtime overhead for simple uses (e.g. the fib() example from the README) I'm somewhat reluctant to include this, at least with the "standard" decorator. Issues/questions relating to alternative "locks" used with the decorator are also not resolved, AFAICS.
So, for the time being, and with other issues in the backlog, I don't see myself merging this too soon, sorry!
Maybe publishing this on your own as a "cachetools extension" would be an option?

@hellocoldworld commented:

Hello @northyorkshire! Are you planning to publish this as a separate package?
