Changeset 90:8736c6a3a3e9 in livinglogic.python.tipimaid

Show
Ignore:
Timestamp:
02/10/09 15:25:50 (10 years ago)
Author:
Nik Tautenhahn <nik@…>
Branch:
default
Message:

include an implementation of Queue!

Files:
1 modified

Legend:

Unmodified
Added
Removed
  • liaalh.py

    r89 r90  
    22# -*- coding: utf-8 -*- 
    33 
    4 import sys, os, datetime, errno, bisect, re, gzip, signal, time, threading, Queue 
     4import sys, os, datetime, errno, bisect, re, gzip, signal, time 
     5try: 
     6    import threading 
     7except ImportError: 
     8    import dummy_threading as threading 
     9from collections import deque 
     10from Queue import * 
    511 
    612subprocessworks = False 
     13 
    714try: 
    815    import subprocess # only python >= 2.6 
     
    1118    pass 
    1219 
     20if not hasattr(Queue, "task_done"): # We need a newer implementation of Queue, this one is taken from 2.6 and slightly modified to use old Python-API syntax (notifyAll (old) instead of notify_all (new)) 
     21    class Empty(Exception): 
     22        "Exception raised by Queue.get(block=0)/get_nowait()." 
     23        pass 
     24 
     25    class Full(Exception): 
     26        "Exception raised by Queue.put(block=0)/put_nowait()." 
     27        pass 
     28 
     29    class Queue: 
     30        """Create a queue object with a given maximum size. 
     31 
     32        If maxsize is <= 0, the queue size is infinite. 
     33        """ 
     34        def __init__(self, maxsize=0): 
     35            self.maxsize = maxsize 
     36            self._init(maxsize) 
     37            # mutex must be held whenever the queue is mutating.  All methods 
     38            # that acquire mutex must release it before returning.  mutex 
     39            # is shared between the three conditions, so acquiring and 
     40            # releasing the conditions also acquires and releases mutex. 
     41            self.mutex = threading.Lock() 
     42            # Notify not_empty whenever an item is added to the queue; a 
     43            # thread waiting to get is notified then. 
     44            self.not_empty = threading.Condition(self.mutex) 
     45            # Notify not_full whenever an item is removed from the queue; 
     46            # a thread waiting to put is notified then. 
     47            self.not_full = threading.Condition(self.mutex) 
     48            # Notify all_tasks_done whenever the number of unfinished tasks 
     49            # drops to zero; thread waiting to join() is notified to resume 
     50            self.all_tasks_done = threading.Condition(self.mutex) 
     51            self.unfinished_tasks = 0 
     52 
     53        def task_done(self): 
     54            """Indicate that a formerly enqueued task is complete. 
     55 
     56            Used by Queue consumer threads.  For each get() used to fetch a task, 
     57            a subsequent call to task_done() tells the queue that the processing 
     58            on the task is complete. 
     59 
     60            If a join() is currently blocking, it will resume when all items 
     61            have been processed (meaning that a task_done() call was received 
     62            for every item that had been put() into the queue). 
     63 
     64            Raises a ValueError if called more times than there were items 
     65            placed in the queue. 
     66            """ 
     67            self.all_tasks_done.acquire() 
     68            try: 
     69                unfinished = self.unfinished_tasks - 1 
     70                if unfinished <= 0: 
     71                    if unfinished < 0: 
     72                        raise ValueError('task_done() called too many times') 
     73                    self.all_tasks_done.notifyAll() # old API, in 2.6: notify_all 
     74                self.unfinished_tasks = unfinished 
     75            finally: 
     76                self.all_tasks_done.release() 
     77 
     78        def join(self): 
     79            """Blocks until all items in the Queue have been gotten and processed. 
     80 
     81            The count of unfinished tasks goes up whenever an item is added to the 
     82            queue. The count goes down whenever a consumer thread calls task_done() 
     83            to indicate the item was retrieved and all work on it is complete. 
     84 
     85            When the count of unfinished tasks drops to zero, join() unblocks. 
     86            """ 
     87            self.all_tasks_done.acquire() 
     88            try: 
     89                while self.unfinished_tasks: 
     90                    self.all_tasks_done.wait() 
     91            finally: 
     92                self.all_tasks_done.release() 
     93 
     94        def qsize(self): 
     95            """Return the approximate size of the queue (not reliable!).""" 
     96            self.mutex.acquire() 
     97            n = self._qsize() 
     98            self.mutex.release() 
     99            return n 
     100 
     101        def empty(self): 
     102            """Return True if the queue is empty, False otherwise (not reliable!).""" 
     103            self.mutex.acquire() 
     104            n = not self._qsize() 
     105            self.mutex.release() 
     106            return n 
     107 
     108        def full(self): 
     109            """Return True if the queue is full, False otherwise (not reliable!).""" 
     110            self.mutex.acquire() 
     111            n = 0 < self.maxsize == self._qsize() 
     112            self.mutex.release() 
     113            return n 
     114 
     115        def put(self, item, block=True, timeout=None): 
     116            """Put an item into the queue. 
     117 
     118            If optional args 'block' is true and 'timeout' is None (the default), 
     119            block if necessary until a free slot is available. If 'timeout' is 
     120            a positive number, it blocks at most 'timeout' seconds and raises 
     121            the Full exception if no free slot was available within that time. 
     122            Otherwise ('block' is false), put an item on the queue if a free slot 
     123            is immediately available, else raise the Full exception ('timeout' 
     124            is ignored in that case). 
     125            """ 
     126            self.not_full.acquire() 
     127            try: 
     128                if self.maxsize > 0: 
     129                    if not block: 
     130                        if self._qsize() == self.maxsize: 
     131                            raise Full 
     132                    elif timeout is None: 
     133                        while self._qsize() == self.maxsize: 
     134                            self.not_full.wait() 
     135                    elif timeout < 0: 
     136                        raise ValueError("'timeout' must be a positive number") 
     137                    else: 
     138                        endtime = _time() + timeout 
     139                        while self._qsize() == self.maxsize: 
     140                            remaining = endtime - _time() 
     141                            if remaining <= 0.0: 
     142                                raise Full 
     143                            self.not_full.wait(remaining) 
     144                self._put(item) 
     145                self.unfinished_tasks += 1 
     146                self.not_empty.notify() 
     147            finally: 
     148                self.not_full.release() 
     149 
     150        def put_nowait(self, item): 
     151            """Put an item into the queue without blocking. 
     152 
     153            Only enqueue the item if a free slot is immediately available. 
     154            Otherwise raise the Full exception. 
     155            """ 
     156            return self.put(item, False) 
     157 
     158        def get(self, block=True, timeout=None): 
     159            """Remove and return an item from the queue. 
     160 
     161            If optional args 'block' is true and 'timeout' is None (the default), 
     162            block if necessary until an item is available. If 'timeout' is 
     163            a positive number, it blocks at most 'timeout' seconds and raises 
     164            the Empty exception if no item was available within that time. 
     165            Otherwise ('block' is false), return an item if one is immediately 
     166            available, else raise the Empty exception ('timeout' is ignored 
     167            in that case). 
     168            """ 
     169            self.not_empty.acquire() 
     170            try: 
     171                if not block: 
     172                    if not self._qsize(): 
     173                        raise Empty 
     174                elif timeout is None: 
     175                    while not self._qsize(): 
     176                        self.not_empty.wait() 
     177                elif timeout < 0: 
     178                    raise ValueError("'timeout' must be a positive number") 
     179                else: 
     180                    endtime = _time() + timeout 
     181                    while not self._qsize(): 
     182                        remaining = endtime - _time() 
     183                        if remaining <= 0.0: 
     184                            raise Empty 
     185                        self.not_empty.wait(remaining) 
     186                item = self._get() 
     187                self.not_full.notify() 
     188                return item 
     189            finally: 
     190                self.not_empty.release() 
     191 
     192        def get_nowait(self): 
     193            """Remove and return an item from the queue without blocking. 
     194 
     195            Only get an item if one is immediately available. Otherwise 
     196            raise the Empty exception. 
     197            """ 
     198            return self.get(False) 
     199 
     200        # Override these methods to implement other queue organizations 
     201        # (e.g. stack or priority queue). 
     202        # These will only be called with appropriate locks held 
     203 
     204        # Initialize the queue representation 
     205        def _init(self, maxsize): 
     206            self.queue = deque() 
     207 
     208        def _qsize(self, len=len): 
     209            return len(self.queue) 
     210 
     211        # Put a new item in the queue 
     212        def _put(self, item): 
     213            self.queue.append(item) 
     214 
     215        # Get an item from the queue 
     216        def _get(self): 
     217            return self.queue.popleft() 
     218 
    13219 
    14220class WorkerThread(threading.Thread): 
    15     workqueue = Queue.Queue(0) 
    16  
    17     def __init__(self, do_something): 
    18         threading.Thread.__init__(self) 
    19         self.do_something = do_something 
    20  
    21     def run(self): 
    22         while True: 
    23             args = WorkerThread.workqueue.get() 
    24             self.do_something(args) 
    25             WorkerThread.workqueue.task_done() 
     221    workqueue = Queue(0) 
     222 
     223    def __init__(self, do_something): 
     224        threading.Thread.__init__(self) 
     225        self.do_something = do_something 
     226 
     227    def run(self): 
     228        while True: 
     229            args = WorkerThread.workqueue.get() 
     230            self.do_something(args) 
     231            WorkerThread.workqueue.task_done() 
    26232 
    27233 
    28234class LogLine(tuple): 
    29235    """ 
    30     Helper Class which overwrites "<" and "<=" to do the right things for liaalh-loglines - it should only be sorted according to the datetime of the logline. 
     236    Helper class which overrides "<" and "<=" to do the right thing for 
     237    liaalh-loglines - it should only be sorted according to the datetime 
     238    of the logline. 
    31239    """ 
    32240    def __lt__(self, other): 
     
    66274        self.threads = [WorkerThread(do_something=self.do_something) for i in xrange(self.num_threads)] 
    67275        for thr in self.threads: 
    68             thr.daemon = True 
     276            thr.setDaemon(True) 
    69277            thr.start() 
    70278 
     
    101309    def readlines(self): 
    102310        """ 
    103         Gets lines from stdin and splits the virtual host from the rest of the line if virtual hosts are used. 
     311        Gets lines from stdin and splits the virtual host from the rest of the 
     312        line if virtual hosts are used. 
    104313        """ 
    105314        while True: 
     
    123332    def writeline(self, utclogdate, server, line): 
    124333        """ 
    125         Writes the logline line for server server which has the date utcdate to the right logfile (i.e. it checks if an already opened logfile is still correct or if it has to be rotated). 
    126         This method also triggers the execution of external scripts after a logfile has been rotated. 
     334        Writes the logline ``line`` for server ``server`` which has the date 
     335        ``utclogdate`` to the right logfile (i.e. it checks if an already opened 
     336        logfile is still correct or if it has to be rotated). 
     337 
     338        This method also triggers the execution of external scripts after a 
     339        logfile has been rotated. 
    127340        """ 
    128341        if not self.utcrotate: 
     
    153366        if self.execute: 
    154367            self.set_up_threads() 
    155         for (server, data) in self.readlines(): 
    156             try: 
    157                 if server is None and data is None: # got an empty line 
    158                     continue 
    159                 datestring = self.re_find_date.findall(data)[0] 
    160                 utclogdate = self.apachedate2utc(datestring) 
    161                 self.writeline(utclogdate, server, data) 
    162             except IndexError, exc: # index error because we didn't find an apache date -> malformed logline 
    163                 continue # ignore it 
     368        signal.signal(signal.SIGHUP, self.sighandler) 
     369        signal.signal(signal.SIGTERM, self.sighandler) 
     370        signal.signal(signal.SIGINT, self.sighandler) 
     371        signal.signal(signal.SIGQUIT, self.sighandler) 
     372        try: 
     373            for (server, data) in self.readlines(): 
     374                try: 
     375                    if server is None and data is None: # got an empty line 
     376                        continue 
     377                    datestring = self.re_find_date.findall(data)[0] 
     378                    utclogdate = self.apachedate2utc(datestring) 
     379                    self.writeline(utclogdate, server, data) 
     380                except IndexError, exc: # index error because we didn't find an apache date -> malformed logline 
     381                    continue # ignore it 
     382        except Exception, exc: 
     383            self.flushall() 
     384            raise 
    164385 
    165386    def run_buffered(self): 
     
    227448    def flush(self): 
    228449        """ 
    229         Write buffered loglines to their files if they have been in the buffer for buffertime seconds. 
     450        Write buffered loglines to their files if they have been in the buffer 
     451        for ``buffertime`` seconds. 
    230452        """ 
    231453        while self.data: 
     
    239461    def apachedate2utc(self, d): 
    240462        """ 
    241         Converts an "apachedate", i.e. a string like "01/Jan/2009:01:02:03 +0100", to a (UTC) datetime object. 
     463        Converts an "apachedate", i.e. a string like 
     464        "01/Jan/2009:01:02:03 +0100" 
     465        to a (UTC) datetime object. 
    242466        """ 
    243467        temp = d.split() 
     
    250474    def updateutcoffset(self): 
    251475        """ 
    252         Updates the offset of the local system clock to UTC time. (Daylight savings time might have changed... or you managed to move your server across a timezone without switching it off or...) 
     476        Updates the offset of the local system clock to UTC time. (Daylight 
     477        savings time might have changed... or you managed to move your 
     478        server across a timezone without switching it off or...) 
    253479        """ 
    254480        temp = datetime.datetime.now() - datetime.datetime.utcnow() 
     
    257483    def sighandler(self, signum, frame): 
    258484        """ 
    259         Signal handler which specifies what to do if someone wants to quit, term or interrupt us. (If someone wants to kill us, we can't react...) 
     485        Signal handler which specifies what to do if someone wants to quit, 
     486        term or interrupt us. (If someone wants to kill us, we can't react...) 
    260487        """ 
    261488        self.flushall()