PriorityQueue
from __future__ import with_statement
from Queue import Queue
import bisect
from time import time as _time
_DefaultCheckDup = lambda x,y: bool(x==y)
class PQTask(object):
""" Data structure of task, for PriorityQueue internal use. """
def __init__(self, pri=0, item=None):
object.__init__(self)
self.pri = pri
self.item = item
def __cmp__(self, b):
return cmp(self.pri, getattr(b, 'pri', 0))
class PriorityQueue(Queue):
"""
PriorityQueue, support reorder the task in queue with priority.
insert() : the task will be processed first in the same priority
append() : the task will be processed later in the same priority
- checkdup specified
1. True : check by _DefaultCheckDup()
2. callable function : check by this function
If checkdup is specified,
_put() will try to kill original task in queue then add new task(with last priority)
"""
def __init__(self, maxsize=0):
Queue.__init__(self, maxsize)
# remove the default put function, replace it by _append(), _insert()
def put(self, item, block=True, timeout=None):
raise NotImplementedError
def put_nowait(self, item):
raise NotImplementedError
def clear(self):
""" clear all task in queue """
with self.mutex:
self.queue = list()
def insert(self, priority, item, checkdup=None, block=True, timeout=None):
"""insert into the queue in the same priority."""
self.__put(False, priority, item, checkdup, block, timeout)
def insert_nowait(self, priority, item, checkdup=None):
"""insert item into the queue without blocking."""
self.__put(False, priority, item, checkdup, False)
def append(self, priority, item, checkdup=None, block=True, timeout=None):
"""append item into the queue in the same priority."""
self.__put(True, priority, item, checkdup, block, timeout)
def append_nowait(self, priority, item, checkdup=None):
"""append item into the queue without blocking."""
self.__put(True, priority, item, checkdup, False)
def __put(self, append, priority, item, checkdup=None, block=True, timeout=None):
""" reference & modify from Queue::put(),
for add custom parameter into _put() """
self.not_full.acquire()
try:
if not block:
if self._full():
raise Full
elif timeout is None:
while self._full():
self.not_full.wait()
else:
if timeout < 0:
raise ValueError("'timeout' must be a positive number")
endtime = _time() + timeout
while self._full():
remaining = endtime - _time()
if remaining <= 0.0:
raise Full
self.not_full.wait(remaining)
#add into queue
self._put(append, priority, item, checkdup)
self.unfinished_tasks += 1
self.not_empty.notify()
finally:
self.not_full.release()
# Initialize the queue representation
def _init(self, maxsize):
self.maxsize = maxsize
#the last item is highest priority
# queue : [ low priority -> high priority ]
# get() will try to get last item to process
self.queue = list()
# put task into queue
def _put(self, append, p, item, checkdup):
""" if checkdup is set, check the task is duplicate in queue before add """
if checkdup:
if not callable(checkdup):
checkdup = _DefaultCheckDup
for idx, qt in enumerate(self.queue):
if checkdup(qt.item, item):
# remove original and add new task in new priority
del self.queue[idx]
break
t = PQTask(p, item)
if append:
bisect.insort_left(self.queue, t)
else:
bisect.insort_right(self.queue, t)
# Get an item from the queue
def _get(self):
return self.queue.pop() #get the last(priority largest) task
既然有了PriorityQueue了,就可以用這個class去實作一個PriorityThread了。
並實作了pause, resume與stop的功能,在測試了很多次後,應該是不會有deadlock啦,有的話請告訴我...XD
PriorityThread
from __future__ import with_statement
import threading
import time
SUPER_PRIORITY = 9999
class PriorityThread(threading.Thread):
"""
PriorityThread, support pause, resume and blocking priority queue
Please overwrite processTask() to do sometime when get task
follow the feature of PriorityQueue, insertTask() & appendTask()
should pass the correct priority into queue
"""
def __init__(self, daemon=None):
threading.Thread.__init__(self)
self.queue = PriorityQueue()
self.wait = threading.Condition(threading.Lock()) #wait when pauseEvt is set
self.pauseEvt = threading.Event()
self.stopEvt = threading.Event()
if daemon is not None:
self.setDaemon(daemon)
def pause(self):
with self.wait:
self.pauseEvt.set()
if self.queue.empty():
#thread is bolcing in queue.get(), put a None task
self.insertTask(SUPER_PRIORITY, None)
def resume(self):
with self.wait:
self.pauseEvt.clear()
self.wait.notify()
def stop(self, timeout=None):
self.stopEvt.set()
with self.wait:
self.pauseEvt.clear()
self.wait.notify()
if self.queue.empty():
#thread is bolcing in queue.get(), put a None task
self.insertTask(SUPER_PRIORITY, None)
self.join(timeout)
def insertTask(self, priority, item, *argv, **argd):
self.queue.insert(priority, item, *argv, **argd)
def appendTask(self, priority, item, *argv, **argd):
self.queue.append(priority, item, *argv, **argd)
def clearTask(self):
self.queue.clear()
def lenTask(self):
return self.queue.qsize()
def emptyTask(self):
return self.queue.empty()
def run(self):
print '[PriorityThread] %s run()' % (type(self).__name__)
self.onThreadBegin()
while not self.stopEvt.isSet():
with self.wait:
if self.pauseEvt.isSet():
self.wait.wait()
continue
t = self.queue.get()
if t.item:
self.processTask(t.pri, t.item)
time.sleep(0.01)
self.onThreadEnd()
print '[PriorityThread] %s end of run()' % (type(self).__name__)
##############################################
# Overwriteable function
##############################################
def onThreadBegin(self):
""" overwrite this function for process something before thread.run() """
pass
def onThreadEnd(self):
""" overwrite this function for process something after thread.run() """
pass
def processTask(self, priority, item):
""" overwrite this function for process task """
pass以下是我的TestCase
if __name__ == '__main__':
####################################
# Test PriorityQueue #
####################################
def TestQueue():
print '@TestQueue'
q = PriorityQueue()
q.insert(0, 'a')
q.insert(1, 'b')
q.insert(2, 'c')
q.insert(3, 'd')
q.append(3, 'e')
q.insert(3, 'f')
q.insert(1, 'b')
q.insert(0, 'a')
while not q.empty():
print q.get().item
def TestQueue_CheckDup():
print '@TestQueue_CheckDup'
q = PriorityQueue()
q.insert(0, 'a', checkdup=True)
q.insert(1, 'a', checkdup=True)
q.insert(2, 'a', checkdup=True)
q.insert(0, 'b', checkdup=True)
q.insert(1, 'b', checkdup=True)
#>>> the queue result : [ (1, b), (2, a) ]
while not q.empty():
t = q.get()
print t.pri, t.item
def TestQueue_CheckDupCustomTask():
''' Custom _Item, and custom checkdup function '''
print '@TestQueue_CheckDupCustomTask'
class _Item:
def __init__(self, v):
self.val = v
def _checkdup(x,y):
return bool(x.val == y.val)
q = PriorityQueue()
q.insert(0, _Item('a'), checkdup=_checkdup)
q.insert(1, _Item('a'), checkdup=_checkdup)
q.insert(2, _Item('a'), checkdup=_checkdup)
q.insert(0, _Item('b'), checkdup=_checkdup)
q.insert(1, _Item('b'), checkdup=_checkdup)
#>>> the queue result : [ (1, b), (2, a) ]
while not q.empty():
t = q.get()
print t.pri, t.item.val
TestQueue()
TestQueue_CheckDup()
TestQueue_CheckDupCustomTask()
###################################
# Test PriorityThread #
###################################
class MyPriorityThread(PriorityThread):
def processTask(self, pri, item):
print item
def TestThread_PauseFirst(t):
t.pause()
for v in xrange(100):
t.appendTask(0, v)
time.sleep(3)
t.resume()
time.sleep(10)
def TestThread_Normal(t):
for v in xrange(100):
t.appendTask(0, v)
time.sleep(0.1)
t.pause()
for v in xrange(100, 200):
t.appendTask(0, v)
time.sleep(0.1)
t.resume()
for v in xrange(200, 30000):
t.appendTask(0, v)
time.sleep(0.1)
def TestThread_PauseResume(t):
for v in xrange(10000):
t.appendTask(0,v)
time.sleep(0)
while not t.emptyTask():
import thread
thread.start_new_thread(lambda t:t.pause(), (t,))
thread.start_new_thread(lambda t:t.resume(), (t,))
time.sleep(1)
def TestThread_Control(t):
print '\n\n=== Command control thread ===\ni:initial and add large task \np:pause, \nr:resume \ns:stop \na:add tasks '
t.pause()
count = 0
while not t.emptyTask():
cmd = raw_input()
cmd = cmd.strip()
if cmd == 'i':
t.resume()
[ t.appendTask(0,c) for c in range(count, count+5000) ]
count += 5000
if cmd == 'p':
t.pause()
elif cmd == 'r':
t.resume()
elif cmd == 'a':
[ t.appendTask(0,c) for c in range(count, count+100) ]
count += 100
print 'current task count = %d, largets task = %d' % (t.lenTask(), count)
elif cmd == 's':
t.stop()
break
t = MyPriorityThread()
t.start()
#TestThread_Normal(t)
#TestThread_PauseFirst(t)
#TestThread_PauseResume(t)
TestThread_Control(t)
t.stop()
print 'Please press enter to continue...'
raw_input()
沒有留言:
張貼留言