Skip to content

Commit

Permalink
Ensure that pause() and unpause() conform to documentation.
Browse files Browse the repository at this point in the history
These will now block the caller until they are handled by the queue
thread.

Fixes #2877
  • Loading branch information
coleifer committed Apr 22, 2024
1 parent 6187416 commit 1cca5dc
Showing 1 changed file with 22 additions and 13 deletions.
35 changes: 22 additions & 13 deletions playhouse/sqliteq.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ def fetchone(self):
return None

SHUTDOWN = StopIteration
QUERY = object()
PAUSE = object()
UNPAUSE = object()

Expand Down Expand Up @@ -143,30 +144,34 @@ def run(self):
self.database._state.reset()

def wait_unpause(self):
obj = self.queue.get()
if obj is UNPAUSE:
op, obj = self.queue.get()
if op is UNPAUSE:
logger.info('writer unpaused - reconnecting to database.')
obj.set()
return True
elif obj is SHUTDOWN:
elif op is SHUTDOWN:
raise ShutdownException()
elif obj is PAUSE:
elif op is PAUSE:
logger.error('writer received pause, but is already paused.')
obj.set()
else:
obj.set_result(None, WriterPaused())
logger.warning('writer paused, not handling %s', obj)

def loop(self, conn):
obj = self.queue.get()
if isinstance(obj, AsyncCursor):
op, obj = self.queue.get()
if op is QUERY:
self.execute(obj)
elif obj is PAUSE:
elif op is PAUSE:
logger.info('writer paused - closing database connection.')
self.database._close(conn)
self.database._state.reset()
obj.set()
return
elif obj is UNPAUSE:
elif op is UNPAUSE:
logger.error('writer received unpause, but is already running.')
elif obj is SHUTDOWN:
obj.set()
elif op is SHUTDOWN:
raise ShutdownException()
else:
logger.error('writer received unsupported object: %s', obj)
Expand Down Expand Up @@ -255,7 +260,7 @@ def execute_sql(self, sql, params=None, commit=None, timeout=None):
sql=sql,
params=params,
timeout=self._results_timeout if timeout is None else timeout)
self._write_queue.put(cursor)
self._write_queue.put((QUERY, cursor))
return cursor

def start(self):
Expand All @@ -276,7 +281,7 @@ def stop(self):
with self._qlock:
if self._is_stopped:
return False
self._write_queue.put(SHUTDOWN)
self._write_queue.put((SHUTDOWN, None))
self._writer.join()
self._is_stopped = True
return True
Expand All @@ -286,10 +291,14 @@ def is_stopped(self):
return self._is_stopped

def pause(self):
self._write_queue.put(PAUSE)
evt = self._thread_helper.event()
self._write_queue.put((PAUSE, evt))
evt.wait()

def unpause(self):
self._write_queue.put(UNPAUSE)
evt = self._thread_helper.event()
self._write_queue.put((UNPAUSE, evt))
evt.wait()

def __unsupported__(self, *args, **kwargs):
raise ValueError('This method is not supported by %r.' % type(self))
Expand Down

0 comments on commit 1cca5dc

Please sign in to comment.