diff --git a/util/queue.py b/util/queue.py index 9765658..9e4f39d 100644 --- a/util/queue.py +++ b/util/queue.py @@ -18,8 +18,10 @@ # along with this program. If not, see . +import sys import multiprocessing import multiprocessing.queues +from collections import namedtuple class SharedCounter(object): @@ -51,6 +53,9 @@ def value(self): return self.count.value +QueueState = namedtuple('QueueState', ['queue', 'size']) + + class Queue(multiprocessing.queues.Queue): """ A portable implementation of multiprocessing.Queue. @@ -66,9 +71,20 @@ class Queue(multiprocessing.queues.Queue): """ def __init__(self, *args, **kwargs): + if sys.version_info >= (3, 4) and 'ctx' not in kwargs: + kwargs['ctx'] = multiprocessing.get_context() super(Queue, self).__init__(*args, **kwargs) self._size = SharedCounter(0) + # __getstate__ and __setstate__ are needed for pickling, otherwise _size won't be copied. + def __getstate__(self): + return QueueState(queue=super(Queue, self).__getstate__(), + size=self._size) + + def __setstate__(self, state): + self._size = state.size + super(Queue, self).__setstate__(state.queue) + def put(self, *args, **kwargs): super(Queue, self).put(*args, **kwargs) self._size.increment(1)