Request for Asyncio Support #78
This ought to be implemented with … This ought to improve performance as well, since it looks like `acquire` currently uses a busy-wait: https://github.com/benediktschmitt/py-filelock/blob/master/filelock.py#L284
That would imply an event loop running, which is often not the case, so we cannot add asyncio support straight up. However, if someone were to make this library sans-io style, we'd be happy to review that PR 👍
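For reference, a sans-io design separates the single non-blocking acquisition attempt from the waiting strategy, so sync and async front ends can share one core. Below is a rough sketch under assumptions of mine: `LockCore` is a hypothetical name, not filelock's actual API, and it uses an `O_EXCL` lock file instead of filelock's platform-specific locking.

```python
import os


class LockCore:
    """Hypothetical sans-io core: one non-blocking acquisition attempt.

    How (and whether) to wait between attempts is left to the caller,
    so a sync wrapper can time.sleep() and an async one can asyncio.sleep().
    """

    def __init__(self, path: str) -> None:
        self.path = path
        self._fd: int | None = None

    def try_acquire(self) -> bool:
        if self._fd is not None:
            return True  # already held by us
        try:
            # O_EXCL makes creation atomic: only one holder can succeed.
            self._fd = os.open(self.path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            return True
        except FileExistsError:
            return False

    def release(self) -> None:
        if self._fd is not None:
            os.close(self._fd)
            os.remove(self.path)
            self._fd = None
```

An async driver then becomes a loop of `try_acquire()` plus `await asyncio.sleep(...)`, while the existing sync classes keep their blocking wait unchanged.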
I've gone over the code and, as far as I can tell, there is no way to make things nicely async. Fundamentally, the locking via … is blocking. If you are fine with having a busy loop, I would suggest (and can provide) an async variant. This could either be a separate class (say …)
My 2c is that we should do this totally in parallel with the existing sync classes 😊 so yeah, adding …
The busy polling (`asyncio.wait_for`) to acquire the lock is inevitable. But, to lessen the burden, could we watch the lock file before entering the busy loop? Using true async I/O, it would be possible to use inotify with epoll, or the Windows equivalent. I do not know of any library that does this; it is just an idea.
I have implemented a semi-efficient asynchronous filelock using anyio: https://github.com/dolamroth/starlette-web/blob/main/starlette_web/common/files/filelock.py
It passes some basic tests: https://github.com/dolamroth/starlette-web/blob/main/starlette_web/tests/core/helpers/base_cache_tester.py
We had implemented async support using:

```python
import asyncio
from contextlib import asynccontextmanager

from filelock import BaseFileLock


@asynccontextmanager
async def async_lock(lock: BaseFileLock):
    loop = asyncio.get_event_loop()
    await loop.run_in_executor(None, lock.acquire)
    yield
    await loop.run_in_executor(None, lock.release)
```

However, the recent change to make locks thread-local broke this and resulted in our code deadlocking. We have moved to using a static single-worker pool:

```python
from concurrent.futures import ThreadPoolExecutor

LOCK_POOL = ThreadPoolExecutor(max_workers=1)


@asynccontextmanager
async def async_lock(lock: BaseFileLock):
    loop = asyncio.get_event_loop()
    await loop.run_in_executor(LOCK_POOL, lock.acquire)
    yield
    await loop.run_in_executor(LOCK_POOL, lock.release)
```
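The single-worker pool matters because `run_in_executor(None, ...)` may service `acquire` and `release` on different threads of the default pool, and a thread-local lock acquired on one thread cannot be released on another. A `ThreadPoolExecutor(max_workers=1)` pins both calls to the same thread. A small self-contained check of that property (plain `threading.get_ident`, no filelock involved, names are mine):

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor

# A dedicated single-worker pool, as in the snippet above.
LOCK_POOL = ThreadPoolExecutor(max_workers=1)


async def main() -> bool:
    loop = asyncio.get_running_loop()
    # Both calls are serviced by the pool's only worker thread, so a
    # thread-local lock acquired in the first call could be released
    # in the second.
    t1 = await loop.run_in_executor(LOCK_POOL, threading.get_ident)
    t2 = await loop.run_in_executor(LOCK_POOL, threading.get_ident)
    return t1 == t2


same_thread = asyncio.run(main())  # True: single worker, single thread
```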
I ended up with the following implementation:

```python
import asyncio
import contextlib
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager

from filelock import BaseFileLock


@asynccontextmanager
async def _acquire_lock(fl: BaseFileLock) -> AsyncGenerator[None, None]:
    """
    Acquire filelock, async implementation
    """
    for _ in range(int(10 / 0.05)):
        with contextlib.suppress(TimeoutError):
            try:
                fl.acquire(blocking=False)
                # We want to have the exclusive lock
                if fl.lock_counter <= 1:
                    yield
                    break
            finally:
                fl.release()
        await asyncio.sleep(0.05)
    else:
        raise TimeoutError("Could not obtain lock after 10 seconds")
```

For me the main point is that acquiring the lock does not block the event loop. For me it was also important that only one lock is active:

```python
def cite1():
    with lock:
        with open(file_path, "a") as f:
            f.write("I hate it when he does that.")


def cite2():
    with lock:
        with open(file_path, "a") as f:
            f.write("You don't want to sell me death sticks.")


# The lock is acquired here.
with lock:
    cite1()
    cite2()
# And released
```

However, transferring this example to an async world shows that it is not valid:

```python
# cite1 and cite2 are implemented async and with multiple writes
async def run():
    async with TaskGroup() as tg:
        tg.create_task(cite1())
        tg.create_task(cite2())  # potentially boom, as the same file is opened/written to at the same time
```

Of course, also wrapping the TaskGroup in an … would avoid that.

I also wrote a little test that shows my implementation works as expected for me:

```python
import asyncio
import time
from pathlib import Path

import pytest


@pytest.mark.asyncio
async def test_async_file_lock(tmp_path: Path) -> None:
    """
    Acquiring the lock is non-blocking
    """
    test_file = tmp_path / "test_file.txt"
    start = time.perf_counter_ns()
    regular_calls: list[float] = []

    def get_elapsed() -> int:
        return time.perf_counter_ns() - start

    def debug(msg: str) -> None:
        print(f"{get_elapsed() / 10**9:0.5f}: {msg}")

    async def reader(delay: float) -> tuple[bytes, float]:
        """
        Read the file after *delay* seconds
        """
        await asyncio.sleep(delay)
        debug(f"Obtaining lock for reading after {delay}s delay")
        async with pkg_project_mapping._file_lock(test_file):
            debug(f"Got lock for reading after {delay}s delay")
            result = test_file.read_bytes(), get_elapsed() / 1e9
        debug(f"Released lock for reading after {delay}s delay")
        return result

    async def writer() -> None:
        """
        Write to the file every 0.1 seconds and append to it again after one second of waiting
        """
        for repeat in range(2):
            debug(f"Obtain write lock {repeat}")
            async with pkg_project_mapping._file_lock(test_file):
                debug(f"Got write lock {repeat}")
                with test_file.open("a") as f:
                    for i in range(10):
                        f.write(f"{i};")
                        debug(f"Wrote {i} ({repeat})")
                        await asyncio.sleep(0.1)
                    f.write("\n")
                debug(f"Release lock for write {repeat}")
            await asyncio.sleep(1)

    async def regular_caller() -> None:
        """
        Simulate something that runs regularly;
        this ensures that we can check that writing/reading/locking is non-blocking for
        other async tasks
        """
        for i in range(int(5 / 0.05)):  # 5 sec
            await asyncio.sleep(0.05)
            elapsed = get_elapsed() / 1e9
            if regular_calls:
                debug(f"Called regular {i:03} after {elapsed - regular_calls[-1]:0.4f}s")
            else:
                debug(f"Called regular {i:03}")
            regular_calls.append(elapsed)

    async with asyncio.TaskGroup() as tg:
        debug("Print Start Tasks")
        tg.create_task(regular_caller())
        tg.create_task(writer())
        reader1 = tg.create_task(reader(0.02))
        reader2 = tg.create_task(reader(2.5))
        reader3 = tg.create_task(reader(2.6))
        reader4 = tg.create_task(reader(2.7))

    # check that the regular task was running non-blocking
    for n in range(len(regular_calls) - 1):
        diff = regular_calls[n + 1] - regular_calls[n]
        assert diff == pytest.approx(0.05, abs=0.02)

    expected_str = b"0;1;2;3;4;5;6;7;8;9;\n"
    assert reader1.result()[0] == expected_str
    # expected runtime: 1.0s for the delay and 0.05s for the actual writing
    assert reader1.result()[1] == pytest.approx(1.05, abs=0.1)
    assert reader2.result()[0] == expected_str * 2
    # expected runtime: 3s for the delay and 0.1s for the actual writing
    assert reader2.result()[1] == pytest.approx(3.1, abs=0.2)
    assert reader3.result()[0] == expected_str * 2
    # expected runtime: 3s for the delay and 0.1s for the actual writing
    assert reader3.result()[1] == pytest.approx(3.105, abs=0.2)
    assert reader4.result()[0] == expected_str * 2
    # expected runtime: 3s for the delay and 0.1s for the actual writing
    assert reader4.result()[1] == pytest.approx(3.11, abs=0.2)
```
Acquiring a lock currently blocks other coroutines. I hope it can wait for the lock in a non-blocking way, such as with `asyncio.sleep`. By the way, in the meantime I'm using this way to keep other coroutines running; hope this code can help someone facing the same problem.
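A minimal sketch of the `asyncio.sleep` polling idea, using only the standard library: the "lock" here is a plain `O_CREAT|O_EXCL` lock file rather than filelock's `FileLock`, and the function names are mine, not the library's.

```python
import asyncio
import os


async def acquire_lockfile(path: str, poll: float = 0.05, timeout: float = 10.0) -> None:
    """Acquire by atomically creating the lock file, polling without blocking."""
    deadline = asyncio.get_running_loop().time() + timeout
    while True:
        try:
            # O_EXCL makes creation atomic: exactly one waiter succeeds.
            fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            os.close(fd)
            return
        except FileExistsError:
            if asyncio.get_running_loop().time() >= deadline:
                raise TimeoutError(f"could not acquire {path} within {timeout}s")
            # Yield to other coroutines between attempts instead of busy-waiting.
            await asyncio.sleep(poll)


def release_lockfile(path: str) -> None:
    os.remove(path)
```

The event loop stays responsive because every failed attempt awaits `asyncio.sleep` instead of spinning; the trade-off is up to `poll` seconds of extra latency after the lock is freed.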