12 4月 2011

[Python] 實作一個 PriorityQueue 與有 pause、resume 功能的 PriorityThread

由於 project 使用的是 Python 2.5,需要用到 PriorityQueue,但標準函式庫從 Python 2.6 以後才支援 PriorityQueue,所以就想說自己寫一個來試試。主要是改寫自 Python 原有的 Queue class,繼承了原有的 access blocking 的 feature,並將原本的 put 改成 insert 與 append 兩種方法,讓有相同 priority 的 task 能有先後之分:在同一個 priority 中,insert 的 task 會先被處理,append 的 task 會較晚被處理。

PriorityQueue
from __future__ import with_statement

from Queue import Queue, Full
import bisect
from time import time as _time

def _DefaultCheckDup(x, y):
    """Default duplicate test: two items are duplicates when they compare equal."""
    return bool(x == y)

class PQTask(object):
    """Wrapper pairing a queued item with its priority (PriorityQueue internal).

    Attributes are ``pri`` (the priority) and ``item`` (the payload).
    Comparison looks at ``pri`` only; an object without a ``pri`` attribute
    is compared as if its priority were 0.
    """

    def __init__(self, pri=0, item=None):
        object.__init__(self)
        self.pri, self.item = pri, item

    def __cmp__(self, b):
        other_pri = getattr(b, 'pri', 0)
        return cmp(self.pri, other_pri)
    
class PriorityQueue(Queue):
    """
    Blocking queue that hands out the highest-priority task first.

    Tasks are kept in a list sorted from low to high priority and get()
    pops the last element.  get() returns the internal PQTask wrapper, so
    callers read ``.pri`` and ``.item`` from the result.

    insert() : the task will be processed first in the same priority
    append() : the task will be processed later in the same priority
    - checkdup specified (accepted by both insert() and append())
        1. True (or any other non-callable true value) :
           check by _DefaultCheckDup()
        2. callable function : check by this function, called as
           checkdup(queued_item, new_item)
    If checkdup is specified, the first queued task reported as a
    duplicate is removed and the new task is added with the new priority.

    put() and put_nowait() are disabled; use insert()/append() instead.
    """
    def __init__(self, maxsize=0):
        Queue.__init__(self, maxsize)

    # remove the default put function, replace it by append(), insert()
    def put(self, item, block=True, timeout=None):
        """Disabled: a priority is required, use insert() or append()."""
        raise NotImplementedError

    def put_nowait(self, item):
        """Disabled: use insert_nowait() or append_nowait()."""
        raise NotImplementedError

    def clear(self):
        """Remove every queued task.

        The dropped tasks can never be fetched, so they are also removed
        from the unfinished-task count, and producers blocked on a full
        queue are woken up because there is room again.
        """
        with self.mutex:
            dropped = len(self.queue)
            self.queue = list()
            if not dropped:
                return
            remaining = self.unfinished_tasks - dropped
            if remaining <= 0:
                remaining = 0
                # nothing left that could still call task_done()
                self.all_tasks_done.notifyAll()
            self.unfinished_tasks = remaining
            # not_full shares self.mutex, which is held here
            self.not_full.notifyAll()

    def insert(self, priority, item, checkdup=None, block=True, timeout=None):
        """Add item so that it is processed first among tasks of equal priority.

        block/timeout behave as in __put(); Full is raised when no free
        slot becomes available.
        """
        self.__put(False, priority, item, checkdup, block, timeout)

    def insert_nowait(self, priority, item, checkdup=None):
        """insert() without blocking; raises Full if the queue is full."""
        self.__put(False, priority, item, checkdup, False)

    def append(self, priority, item, checkdup=None, block=True, timeout=None):
        """Add item so that it is processed last among tasks of equal priority.

        block/timeout behave as in __put(); Full is raised when no free
        slot becomes available.
        """
        self.__put(True, priority, item, checkdup, block, timeout)

    def append_nowait(self, priority, item, checkdup=None):
        """append() without blocking; raises Full if the queue is full."""
        self.__put(True, priority, item, checkdup, False)

    def __put(self, append, priority, item, checkdup=None, block=True, timeout=None):
        """Blocking front end shared by insert() and append().

        Modelled on Queue.put(), with the extra parameters forwarded
        to _put().

        block=False        : raise Full immediately when the queue is full
        block, no timeout  : wait until a slot is free
        block with timeout : wait at most `timeout` seconds, then raise Full
        Raises ValueError for a negative timeout.
        """
        with self.not_full:
            if not block:
                if self._full():
                    raise Full
            elif timeout is None:
                while self._full():
                    self.not_full.wait()
            else:
                if timeout < 0:
                    raise ValueError("'timeout' must be a positive number")
                endtime = _time() + timeout
                while self._full():
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Full
                    self.not_full.wait(remaining)

            # add into queue
            replaced = self._put(append, priority, item, checkdup)

            # A task that replaced a duplicate takes over the slot of the
            # removed one, so only a genuinely new task is counted.
            if not replaced:
                self.unfinished_tasks += 1
            self.not_empty.notify()

    # Initialize the queue representation
    def _init(self, maxsize):
        self.maxsize = maxsize

        # the last item is highest priority
        # queue : [ low priority -> high priority ]
        # get() will try to get last item to process
        self.queue = list()

    def _full(self):
        # Defined locally so __put() does not depend on the base Queue
        # class supplying this helper.  Called with the mutex held.
        return self.maxsize > 0 and len(self.queue) >= self.maxsize

    # put task into queue
    def _put(self, append, p, item, checkdup):
        """Store item with priority p; called with the queue lock held.

        If checkdup is set, the first queued task that checkdup reports as
        a duplicate of item is removed before the new task is added.
        Returns True when such a duplicate was removed, False otherwise.
        """
        replaced = False
        if checkdup:
            if not callable(checkdup):
                checkdup = _DefaultCheckDup
            for idx, qt in enumerate(self.queue):
                if checkdup(qt.item, item):
                    # remove original and add new task in new priority
                    del self.queue[idx]
                    replaced = True
                    break
        t = PQTask(p, item)
        if append:
            # left of equal priorities -> popped after them
            bisect.insort_left(self.queue, t)
        else:
            # right of equal priorities -> popped before them
            bisect.insort_right(self.queue, t)
        return replaced

    # Get an item from the queue
    def _get(self):
        return self.queue.pop()  # get the last(priority largest) task


既然有了PriorityQueue了,就可以用這個class去實作一個PriorityThread了。
並實作了pause, resume與stop的功能,在測試了很多次後,應該是不會有deadlock啦,有的話請告訴我...XD
PriorityThread
from __future__ import with_statement

import threading
import time

# Priority given to the None wake-up task that PriorityThread.pause() and
# PriorityThread.stop() insert when the queue is empty.
SUPER_PRIORITY = 9999

class PriorityThread(threading.Thread):
    """
    Worker thread fed by a PriorityQueue, with pause(), resume() and stop().

    Override processTask() to handle each task.  insertTask() and
    appendTask() forward their arguments to PriorityQueue.insert() and
    PriorityQueue.append(), so the caller must pass the priority first.

    A task whose item is None is reserved as an internal wake-up marker
    (see pause() and stop()) and is never handed to processTask().  Every
    other item, including falsy ones such as 0 or '', is processed.
    """
    def __init__(self, daemon=None):
        """Create the worker; when daemon is not None it is passed to setDaemon()."""
        threading.Thread.__init__(self)

        self.queue = PriorityQueue()

        # run() waits on this condition while pauseEvt is set
        self.wait     = threading.Condition(threading.Lock())
        self.pauseEvt = threading.Event()
        self.stopEvt  = threading.Event()

        if daemon is not None:
            self.setDaemon(daemon)

    def pause(self):
        """Ask run() to stop fetching tasks until resume() is called."""
        with self.wait:
            self.pauseEvt.set()
            if self.queue.empty():
                # the thread may be blocked in queue.get(); put a None task
                # so it wakes up and sees the pause flag
                self.insertTask(SUPER_PRIORITY, None)

    def resume(self):
        """Clear the pause flag and wake run() if it is waiting."""
        with self.wait:
            self.pauseEvt.clear()
            self.wait.notify()

    def stop(self, timeout=None):
        """Ask run() to finish, then join() the thread for up to `timeout`."""
        self.stopEvt.set()
        with self.wait:
            # release a paused worker so it can observe stopEvt
            self.pauseEvt.clear()
            self.wait.notify()
            if self.queue.empty():
                # the thread may be blocked in queue.get(); put a None task
                self.insertTask(SUPER_PRIORITY, None)
        self.join(timeout)

    def insertTask(self, priority, item, *argv, **argd):
        """Forward to PriorityQueue.insert()."""
        self.queue.insert(priority, item, *argv, **argd)

    def appendTask(self, priority, item, *argv, **argd):
        """Forward to PriorityQueue.append()."""
        self.queue.append(priority, item, *argv, **argd)

    def clearTask(self):
        """Drop every queued task."""
        self.queue.clear()

    def lenTask(self):
        """Return the queue's qsize()."""
        return self.queue.qsize()

    def emptyTask(self):
        """Return the queue's empty()."""
        return self.queue.empty()

    def run(self):
        """Thread body: fetch tasks and dispatch them until stopEvt is set."""
        print('[PriorityThread] %s run()' % (type(self).__name__))
        self.onThreadBegin()

        while not self.stopEvt.isSet():
            with self.wait:
                if self.pauseEvt.isSet():
                    self.wait.wait()
                    # re-check stopEvt / pauseEvt after waking up
                    continue

            t = self.queue.get()
            # Only None marks a wake-up task; a truthiness test here would
            # also silently drop real items such as 0 or ''.
            if t.item is not None:
                self.processTask(t.pri, t.item)

            time.sleep(0.01)

        self.onThreadEnd()
        print('[PriorityThread] %s end of run()' % (type(self).__name__))

    ##############################################
    # Overwriteable function
    ##############################################
    def onThreadBegin(self):
        """ overwrite this function for process something before thread.run() """
        pass

    def onThreadEnd(self):
        """ overwrite this function for process something after thread.run() """
        pass

    def processTask(self, priority, item):
        """ overwrite this function for process task """
        pass

以下是我的TestCase
if __name__ == '__main__':
    # Manual test script: exercises PriorityQueue first, then drives a
    # PriorityThread from the console.

    # ---------------------------------------------------------------
    # PriorityQueue
    # ---------------------------------------------------------------
    def TestQueue():
        """Show insert()/append() ordering inside one priority level.

        Prints f, d, e, c, b, b, a, a (one per line).
        """
        print('@TestQueue')
        pq = PriorityQueue()
        for pri, name in ((0, 'a'), (1, 'b'), (2, 'c'), (3, 'd')):
            pq.insert(pri, name)
        pq.append(3, 'e')
        for pri, name in ((3, 'f'), (1, 'b'), (0, 'a')):
            pq.insert(pri, name)
        while not pq.empty():
            print(pq.get().item)

    def TestQueue_CheckDup():
        """Re-adding an equal item with checkdup=True replaces the queued one."""
        print('@TestQueue_CheckDup')
        pq = PriorityQueue()
        for pri, name in ((0, 'a'), (1, 'a'), (2, 'a'), (0, 'b'), (1, 'b')):
            pq.insert(pri, name, checkdup=True)
        # the queue now holds [ (1, b), (2, a) ]
        while not pq.empty():
            task = pq.get()
            print('%s %s' % (task.pri, task.item))

    def TestQueue_CheckDupCustomTask():
        """Same scenario with a custom item class and a custom checkdup callable."""
        print('@TestQueue_CheckDupCustomTask')

        class _Item:
            def __init__(self, v):
                self.val = v

        def _checkdup(x, y):
            return bool(x.val == y.val)

        pq = PriorityQueue()
        for pri, name in ((0, 'a'), (1, 'a'), (2, 'a'), (0, 'b'), (1, 'b')):
            pq.insert(pri, _Item(name), checkdup=_checkdup)
        # the queue now holds [ (1, b), (2, a) ]
        while not pq.empty():
            task = pq.get()
            print('%s %s' % (task.pri, task.item.val))

    TestQueue()
    TestQueue_CheckDup()
    TestQueue_CheckDupCustomTask()

    # ---------------------------------------------------------------
    # PriorityThread
    # ---------------------------------------------------------------
    class MyPriorityThread(PriorityThread):
        """Worker that simply prints every task it receives."""
        def processTask(self, pri, item):
            print(item)

    def _feed(worker, values):
        # queue every value at priority 0, in order
        for value in values:
            worker.appendTask(0, value)

    def TestThread_PauseFirst(worker):
        """Pause before queueing, then resume after three seconds."""
        worker.pause()
        _feed(worker, xrange(100))
        time.sleep(3)
        worker.resume()
        time.sleep(10)

    def TestThread_Normal(worker):
        """Queue, pause, queue more, resume, then queue a large batch."""
        _feed(worker, xrange(100))
        time.sleep(0.1)
        worker.pause()
        _feed(worker, xrange(100, 200))
        time.sleep(0.1)
        worker.resume()

        _feed(worker, xrange(200, 30000))
        time.sleep(0.1)

    def TestThread_PauseResume(worker):
        """Fire pause() and resume() from separate threads once per second."""
        import thread

        _feed(worker, xrange(10000))
        time.sleep(0)

        while not worker.emptyTask():
            thread.start_new_thread(worker.pause, ())
            thread.start_new_thread(worker.resume, ())
            time.sleep(1)

    def TestThread_Control(worker):
        """Drive the worker with single-letter commands read from the console."""
        print('\n\n=== Command control thread ===\ni:initial and add large task \np:pause, \nr:resume \ns:stop \na:add tasks ')
        worker.pause()

        count = 0
        while not worker.emptyTask():
            cmd = raw_input().strip()
            if cmd == 'i':
                worker.resume()
                _feed(worker, range(count, count + 5000))
                count += 5000
            elif cmd == 'p':
                worker.pause()
            elif cmd == 'r':
                worker.resume()
            elif cmd == 'a':
                _feed(worker, range(count, count + 100))
                count += 100
                print('current task count = %d, largets task = %d' % (worker.lenTask(), count))
            elif cmd == 's':
                worker.stop()
                break

    t = MyPriorityThread()
    t.start()

    # Other scenarios that can be run by hand instead of the console one:
    #   TestThread_Normal(t) / TestThread_PauseFirst(t) / TestThread_PauseResume(t)
    TestThread_Control(t)

    t.stop()

    print('Please press enter to continue...')
    raw_input()

沒有留言: