Skip to content

Commit

Permalink
Switch from incorrect use of RLock to regular Lock.
Browse files Browse the repository at this point in the history
Also avoid deadlocks in `SqliteQueueDatabase`. Refs #2874.
  • Loading branch information
coleifer committed Apr 19, 2024
1 parent fc899f8 commit e6483e2
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 22 deletions.
31 changes: 14 additions & 17 deletions peewee.py
Original file line number Diff line number Diff line change
Expand Up @@ -3184,7 +3184,7 @@ def __init__(self, database, thread_safe=True, autorollback=False,
self.thread_safe = thread_safe
if thread_safe:
self._state = _ConnectionLocal()
self._lock = threading.RLock()
self._lock = threading.Lock()
else:
self._state = _ConnectionState()
self._lock = _NoopLock()
Expand Down Expand Up @@ -3401,26 +3401,23 @@ def default_values_insert(self, ctx):
return ctx.literal('DEFAULT VALUES')

def session_start(self):
with self._lock:
return self.transaction().__enter__()
return self.transaction().__enter__()

def session_commit(self):
with self._lock:
try:
txn = self.pop_transaction()
except IndexError:
return False
txn.commit(begin=self.in_transaction())
return True
try:
txn = self.pop_transaction()
except IndexError:
return False
txn.commit(begin=self.in_transaction())
return True

def session_rollback(self):
with self._lock:
try:
txn = self.pop_transaction()
except IndexError:
return False
txn.rollback(begin=self.in_transaction())
return True
try:
txn = self.pop_transaction()
except IndexError:
return False
txn.rollback(begin=self.in_transaction())
return True

def in_transaction(self):
return bool(self._state.transactions)
Expand Down
14 changes: 9 additions & 5 deletions playhouse/sqliteq.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import weakref
from threading import local as thread_local
from threading import Event
from threading import Lock
from threading import Thread
try:
from Queue import Queue
Expand Down Expand Up @@ -194,6 +195,9 @@ def __init__(self, database, use_gevent=False, autostart=True,
queue_max_size=None, results_timeout=None, *args, **kwargs):
kwargs['check_same_thread'] = False

# Lock around starting and stopping write thread operations.
self._qlock = Lock()

# Ensure that journal_mode is WAL. This value is passed to the parent
# class constructor below.
pragmas = self._validate_journal_mode(kwargs.pop('pragmas', None))
Expand Down Expand Up @@ -255,7 +259,7 @@ def execute_sql(self, sql, params=None, commit=None, timeout=None):
return cursor

def start(self):
with self._lock:
with self._qlock:
if not self._is_stopped:
return False
def run():
Expand All @@ -269,7 +273,7 @@ def run():

def stop(self):
logger.debug('environment stop requested.')
with self._lock:
with self._qlock:
if self._is_stopped:
return False
self._write_queue.put(SHUTDOWN)
Expand All @@ -278,15 +282,15 @@ def stop(self):
return True

def is_stopped(self):
with self._lock:
with self._qlock:
return self._is_stopped

def pause(self):
with self._lock:
with self._qlock:

This comment has been minimized.

Copy link
@travnick

travnick Apr 19, 2024

queue is already thread safe, no lock is required.

self._write_queue.put(PAUSE)

def unpause(self):
with self._lock:
with self._qlock:

This comment has been minimized.

Copy link
@travnick

travnick Apr 19, 2024

queue is already thread safe, no lock is required.

self._write_queue.put(UNPAUSE)

def __unsupported__(self, *args, **kwargs):
Expand Down

0 comments on commit e6483e2

Please sign in to comment.