PriorityQueue
from __future__ import with_statement

import bisect
from time import time as _time

try:
    from Queue import Queue, Full   # Python 2
except ImportError:
    from queue import Queue, Full   # Python 3


def _DefaultCheckDup(x, y):
    """Default duplicate test: two items are duplicates when they compare equal."""
    return bool(x == y)


class PQTask(object):
    """
    Data structure of a task, for PriorityQueue internal use.

    Tasks are ordered by ``pri`` only; ``item`` is the user payload.
    """

    def __init__(self, pri=0, item=None):
        object.__init__(self)
        self.pri = pri
        self.item = item

    def __cmp__(self, b):
        # Python 2 three-way comparison (anything without 'pri' counts as 0).
        other = getattr(b, 'pri', 0)
        return (self.pri > other) - (self.pri < other)

    def __lt__(self, b):
        # bisect only needs '<'; Python 3 ignores __cmp__, so define it explicitly.
        return self.pri < getattr(b, 'pri', 0)


class PriorityQueue(Queue):
    """
    Priority queue that can reorder tasks by priority.

    The internal list is kept sorted from low to high priority and get()
    pops the last element, so the largest priority is served first.
    get() returns a PQTask (attributes ``pri`` and ``item``).

    insert() : the task will be processed first among the same priority
    append() : the task will be processed last among the same priority

    - checkdup
        1. True (any non-callable true value) : compare with _DefaultCheckDup()
        2. callable(queued_item, new_item)    : compare with this function
        If checkdup is given, the first queued task reported as a duplicate
        is removed before the new task is added with its new priority.

    put() and put_nowait() are disabled; use insert()/append() and their
    *_nowait variants instead.
    """

    def __init__(self, maxsize=0):
        Queue.__init__(self, maxsize)

    # The default put functions are replaced by insert() / append().
    def put(self, item, block=True, timeout=None):
        raise NotImplementedError

    def put_nowait(self, item):
        raise NotImplementedError

    def clear(self):
        """
        Remove every queued task.

        The dropped tasks will never be fetched, so they are also removed
        from the unfinished-task count (otherwise join() could wait forever)
        and producers blocked on a full queue are woken up.
        """
        with self.mutex:
            dropped = len(self.queue)
            self.queue = list()
            if not dropped:
                return
            remaining = self.unfinished_tasks - dropped
            if remaining <= 0:
                remaining = 0
                self.all_tasks_done.notify_all()
            self.unfinished_tasks = remaining
            self.not_full.notify_all()

    def insert(self, priority, item, checkdup=None, block=True, timeout=None):
        """Add item so that it is served first among tasks of equal priority."""
        self.__put(False, priority, item, checkdup, block, timeout)

    def insert_nowait(self, priority, item, checkdup=None):
        """Like insert(), but raise Full immediately instead of blocking."""
        self.__put(False, priority, item, checkdup, False)

    def append(self, priority, item, checkdup=None, block=True, timeout=None):
        """Add item so that it is served last among tasks of equal priority."""
        self.__put(True, priority, item, checkdup, block, timeout)

    def append_nowait(self, priority, item, checkdup=None):
        """Like append(), but raise Full immediately instead of blocking."""
        self.__put(True, priority, item, checkdup, False)

    def _is_full(self):
        # Own fullness test: the base class does not offer a private _full()
        # helper on current Python versions. maxsize <= 0 means unbounded.
        return self.maxsize > 0 and len(self.queue) >= self.maxsize

    def __put(self, append, priority, item, checkdup=None, block=True,
              timeout=None):
        """
        Blocking logic modelled on Queue.put(), extended to pass the custom
        parameters on to _put().

        Raises Full when the queue is full and block is false, or when the
        timeout expires; raises ValueError for a negative timeout.
        """
        with self.not_full:
            if not block:
                if self._is_full():
                    raise Full
            elif timeout is None:
                while self._is_full():
                    self.not_full.wait()
            else:
                if timeout < 0:
                    raise ValueError("'timeout' must be a positive number")
                endtime = _time() + timeout
                while self._is_full():
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Full
                    self.not_full.wait(remaining)
            # add into queue
            replaced = self._put(append, priority, item, checkdup)
            if not replaced:
                # A replacement takes over the count of the task it removed.
                self.unfinished_tasks += 1
            self.not_empty.notify()

    # Initialize the queue representation
    def _init(self, maxsize):
        self.maxsize = maxsize
        # the last item has the highest priority
        # queue : [ low priority -> high priority ]
        # get() takes the last item to process
        self.queue = list()

    # Put a task into the queue
    def _put(self, append, p, item, checkdup):
        """
        Store a new task; if checkdup is set, first drop the first queued
        task that checkdup reports as a duplicate of item.

        Returns True when an existing task was replaced, False otherwise.
        """
        replaced = False
        if checkdup:
            if not callable(checkdup):
                checkdup = _DefaultCheckDup
            for idx, qt in enumerate(self.queue):
                if checkdup(qt.item, item):
                    # remove the original; the new task gets the new priority
                    del self.queue[idx]
                    replaced = True
                    break
        t = PQTask(p, item)
        if append:
            # before equal priorities -> popped later
            bisect.insort_left(self.queue, t)
        else:
            # after equal priorities -> popped earlier
            bisect.insort_right(self.queue, t)
        return replaced

    # Get an item from the queue
    def _get(self):
        return self.queue.pop()  # the last (largest priority) task
既然有了PriorityQueue了,就可以用這個class去實作一個PriorityThread了。
並實作了pause, resume與stop的功能,在測試了很多次後,應該是不會有deadlock啦,有的話請告訴我...XD
PriorityThread
from __future__ import with_statement

import threading
import time

SUPER_PRIORITY = 9999


class PriorityThread(threading.Thread):
    """
    Worker thread fed by a blocking PriorityQueue, with pause/resume/stop.

    Override processTask() to handle each task. insertTask() and
    appendTask() follow the PriorityQueue semantics, so callers must pass
    the priority they want the task queued with.

    A task whose item is None is reserved as an internal wake-up marker and
    is never handed to processTask().
    """

    def __init__(self, daemon=None):
        threading.Thread.__init__(self)
        self.queue = PriorityQueue()
        # run() waits on this condition while pauseEvt is set
        self.wait = threading.Condition(threading.Lock())
        self.pauseEvt = threading.Event()
        self.stopEvt = threading.Event()
        if daemon is not None:
            self.daemon = daemon

    def pause(self):
        """Ask the worker to stop taking tasks until resume() is called."""
        with self.wait:
            self.pauseEvt.set()
            if self.queue.empty():
                # the thread may be blocking in queue.get(); put a None task
                # so it wakes up and notices the pause request
                self.insertTask(SUPER_PRIORITY, None)

    def resume(self):
        """Let a paused worker continue."""
        with self.wait:
            self.pauseEvt.clear()
            self.wait.notify()

    def stop(self, timeout=None):
        """
        Ask the worker to finish and wait for it.

        timeout is passed to join(); tasks still queued are not processed.
        """
        self.stopEvt.set()
        with self.wait:
            # release a paused worker so it can see the stop request
            self.pauseEvt.clear()
            self.wait.notify()
            if self.queue.empty():
                # the thread may be blocking in queue.get(); put a None task
                self.insertTask(SUPER_PRIORITY, None)
        self.join(timeout)

    def insertTask(self, priority, item, *argv, **argd):
        """Queue item via PriorityQueue.insert(); extra arguments are passed through."""
        self.queue.insert(priority, item, *argv, **argd)

    def appendTask(self, priority, item, *argv, **argd):
        """Queue item via PriorityQueue.append(); extra arguments are passed through."""
        self.queue.append(priority, item, *argv, **argd)

    def clearTask(self):
        """Drop every queued task."""
        self.queue.clear()

    def lenTask(self):
        """Return the queue's qsize()."""
        return self.queue.qsize()

    def emptyTask(self):
        """Return the queue's empty()."""
        return self.queue.empty()

    def run(self):
        # Parenthesised single-argument form prints the same text whether
        # print is a statement or a function.
        print('[PriorityThread] %s run()' % (type(self).__name__))
        self.onThreadBegin()
        while not self.stopEvt.is_set():
            with self.wait:
                if self.pauseEvt.is_set():
                    self.wait.wait()
                    # re-check stop/pause after every wake-up
                    continue
            t = self.queue.get()
            try:
                # Only the None marker is skipped; falsy payloads such as
                # 0 or '' are real tasks and must be processed.
                if t.item is not None:
                    self.processTask(t.pri, t.item)
            finally:
                # balance the get() so that queue.join() can return
                self.queue.task_done()
            time.sleep(0.01)
        self.onThreadEnd()
        print('[PriorityThread] %s end of run()' % (type(self).__name__))

    ##############################################
    # Overridable functions
    ##############################################
    def onThreadBegin(self):
        """
        Override to do something in the worker thread before the task
        loop of run() starts.
        """
        pass

    def onThreadEnd(self):
        """
        Override to do something in the worker thread after the task loop
        of run() has ended.
        """
        pass

    def processTask(self, priority, item):
        """
        Override to process one task; called with the task's priority and
        item.
        """
        pass
以下是我的TestCase
if __name__ == '__main__': #################################### # Test PriorityQueue # #################################### def TestQueue(): print '@TestQueue' q = PriorityQueue() q.insert(0, 'a') q.insert(1, 'b') q.insert(2, 'c') q.insert(3, 'd') q.append(3, 'e') q.insert(3, 'f') q.insert(1, 'b') q.insert(0, 'a') while not q.empty(): print q.get().item def TestQueue_CheckDup(): print '@TestQueue_CheckDup' q = PriorityQueue() q.insert(0, 'a', checkdup=True) q.insert(1, 'a', checkdup=True) q.insert(2, 'a', checkdup=True) q.insert(0, 'b', checkdup=True) q.insert(1, 'b', checkdup=True) #>>> the queue result : [ (1, b), (2, a) ] while not q.empty(): t = q.get() print t.pri, t.item def TestQueue_CheckDupCustomTask(): ''' Custom _Item, and custom checkdup function ''' print '@TestQueue_CheckDupCustomTask' class _Item: def __init__(self, v): self.val = v def _checkdup(x,y): return bool(x.val == y.val) q = PriorityQueue() q.insert(0, _Item('a'), checkdup=_checkdup) q.insert(1, _Item('a'), checkdup=_checkdup) q.insert(2, _Item('a'), checkdup=_checkdup) q.insert(0, _Item('b'), checkdup=_checkdup) q.insert(1, _Item('b'), checkdup=_checkdup) #>>> the queue result : [ (1, b), (2, a) ] while not q.empty(): t = q.get() print t.pri, t.item.val TestQueue() TestQueue_CheckDup() TestQueue_CheckDupCustomTask() ################################### # Test PriorityThread # ################################### class MyPriorityThread(PriorityThread): def processTask(self, pri, item): print item def TestThread_PauseFirst(t): t.pause() for v in xrange(100): t.appendTask(0, v) time.sleep(3) t.resume() time.sleep(10) def TestThread_Normal(t): for v in xrange(100): t.appendTask(0, v) time.sleep(0.1) t.pause() for v in xrange(100, 200): t.appendTask(0, v) time.sleep(0.1) t.resume() for v in xrange(200, 30000): t.appendTask(0, v) time.sleep(0.1) def TestThread_PauseResume(t): for v in xrange(10000): t.appendTask(0,v) time.sleep(0) while not t.emptyTask(): import thread 
thread.start_new_thread(lambda t:t.pause(), (t,)) thread.start_new_thread(lambda t:t.resume(), (t,)) time.sleep(1) def TestThread_Control(t): print '\n\n=== Command control thread ===\ni:initial and add large task \np:pause, \nr:resume \ns:stop \na:add tasks ' t.pause() count = 0 while not t.emptyTask(): cmd = raw_input() cmd = cmd.strip() if cmd == 'i': t.resume() [ t.appendTask(0,c) for c in range(count, count+5000) ] count += 5000 if cmd == 'p': t.pause() elif cmd == 'r': t.resume() elif cmd == 'a': [ t.appendTask(0,c) for c in range(count, count+100) ] count += 100 print 'current task count = %d, largets task = %d' % (t.lenTask(), count) elif cmd == 's': t.stop() break t = MyPriorityThread() t.start() #TestThread_Normal(t) #TestThread_PauseFirst(t) #TestThread_PauseResume(t) TestThread_Control(t) t.stop() print 'Please press enter to continue...' raw_input()
沒有留言:
張貼留言