Changeset 28:476c2ca22126 in livinglogic.python.tipimaid

Show
Ignore:
Timestamp:
11/27/08 19:24:28 (10 years ago)
Author:
Nikolas Tautenhahn <nik@…>
Branch:
default
Message:

decoupled buffered and unbuffered mode, default to unbuffered mode

Files:
1 modified

Legend:

Unmodified
Added
Removed
  • liaalh.py

    r27 r28  
    1616 
    1717class Buffer(object): 
    18     def __init__(self, pattern='', gzip_logs=None, buffertime=5, stream=sys.stdin): 
     18    def __init__(self, pattern='', gzip_logs=None, buffertime=0, stream=sys.stdin): 
    1919        self.pattern = pattern 
    2020        self.gzip_logs = gzip_logs 
     
    2222        self.servers = {} 
    2323        self.buffertime = datetime.timedelta(seconds=buffertime) 
    24         self.buffer_enabled = buffertime > 0 
    2524        self.stream = stream 
    2625        self.re_find_date = re.compile(" \[(.*?)\] ") 
     
    2827            self.pattern = "%s.gz" % self.pattern 
    2928        self.handlevirtualhost = "%v" in pattern 
    30  
    31     def add(self, logline): 
    32         if not self.buffer_enabled or self.data or self.data[-1] <= logline: 
    33             self.data.append(logline) 
    34         else: 
    35             bisect.insort_right(self.data, logline) 
    36         self.write() 
    37  
    38     def flush(self): 
    39         self.write(all=True) 
    40  
    41     def write(self, all=False): 
    42         while self.data: 
    43             line = self.data[0] 
    44             if not all and self.buffer_enabled: 
    45                 utcnow = datetime.datetime.utcnow() 
    46                 if utcnow - line[0] < self.buffertime: 
    47                     return 
    48             (utclogdate, server, logdata) = line 
    49             filename = datetime.datetime.now().strftime(self.pattern) 
    50             if self.handlevirtualhost: 
    51                 filename = filename.replace("%v", server) 
    52             if server in self.servers: 
    53                 f = self.servers[server] 
    54                 if f.name != filename: 
    55                     f.flush() 
    56                     f.close() 
    57                     self.servers[server] = f = self.openfile(filename) 
    58             else: 
    59                 self.servers[server] = f = self.openfile(filename) 
    60             f.write(logdata) 
    61             self.data.pop(0) 
    62             f.flush() 
     29        self.run = self.run_buffered if buffertime > 0 else self.run_unbuffered 
    6330 
    6431    def openfile(self, filename): 
     
    7643        return f 
    7744 
     45    def readlines(self): 
     46        while True: 
     47            try: 
     48                line = self.stream.readline() 
     49            except IOError, exc: 
     50                if exc[0] == errno.EINTR: 
     51                    continue 
     52                else: 
     53                    raise 
     54            if not line: 
     55                break 
     56            if self.handlevirtualhost: 
     57                yield line.split(None, 1) 
     58            else: 
     59                yield (None, line) 
     60 
     61    def writeline(self, server, line): 
     62        filename = datetime.datetime.now().strftime(self.pattern) 
     63        if self.handlevirtualhost: 
     64            filename = filename.replace("%v", server) 
     65        if server in self.servers: 
     66            f = self.servers[server] 
     67            if f.name != filename: 
     68                f.flush() 
     69                f.close() 
     70                self.servers[server] = f = self.openfile(filename) 
     71        else: 
     72            self.servers[server] = f = self.openfile(filename) 
     73        f.write(line) 
     74        f.flush() 
     75 
     76    def run_unbuffered(self): 
     77        for (server, data) in self.readlines(): 
     78            self.writeline(server, data) 
     79 
     80    def run_buffered(self): 
     81        signal.signal(signal.SIGHUP, self.sighandler) 
     82        signal.signal(signal.SIGTERM, self.sighandler) 
     83        signal.signal(signal.SIGINT, self.sighandler) 
     84        signal.signal(signal.SIGQUIT, self.sighandler) 
     85        try: 
     86            for (server, data) in self.readlines(): 
     87                datestring = self.re_find_date.findall(data)[0] 
     88                utctime = self.apachedate2utc(datestring) 
     89                self.add(LogLine((utctime, server, data))) 
     90        except Exception, exc: 
     91            self.flushall() 
     92            raise 
     93 
     94    def add(self, logline): 
     95        if self.data or self.data[-1] <= logline: 
     96            self.data.append(logline) 
     97        else: 
     98            bisect.insort_right(self.data, logline) 
     99        self.flush() 
     100 
     101    def flushall(self): 
     102        for (utclogdate, server, logdata) in self.data: 
     103            self.writeline(server, logdata) 
     104        self.data = [] 
     105 
     106    def flush(self): 
     107        while self.data: 
     108            line = self.data[0] 
     109            (utclogdate, server, logdata) = line 
     110            if datetime.datetime.utcnow() - utclogdate < self.buffertime: 
     111                return 
     112            self.writeline(server, logdata) 
     113            self.data.pop(0) 
     114 
    78115    def apachedate2utc(self, d): 
    79116        temp = d.split() 
     
    84121        return utcdate 
    85122 
    86     def run(self): 
    87         try: 
    88             while True: 
    89                 try: 
    90                     line = self.stream.readline() 
    91                 except IOError, exc: 
    92                     if exc[0] == errno.EINTR: 
    93                         continue 
    94                     else: 
    95                         raise 
    96                 if not line: 
    97                     break 
    98                 if self.buffer_enabled: 
    99                     datestring = self.re_find_date.findall(line)[0] 
    100                     utctime = self.apachedate2utc(datestring) 
    101                 else: 
    102                     utctime = None 
    103                 if self.handlevirtualhost: 
    104                     (server, data) = line.split(None, 1) 
    105                 else: 
    106                     (server, data) = (None, line) 
    107                 self.add(LogLine((utctime, server, data))) 
    108         except Exception, exc: 
    109             self.flush() 
    110             raise 
    111  
    112123    def sighandler(self, signum, frame): 
    113         self.flush() 
     124        self.flushall() 
    114125        if signum in (signal.SIGQUIT, signal.SIGTERM, signal.SIGINT): 
    115126            sys.exit(signum) 
     
    120131    p = optparse.OptionParser(usage="usage: %prog filename-pattern [options]") 
    121132    p.add_option("-z", "--gzip", dest="gzip", type="int", action="store", help="If set, logs are gzipped with this compression level (lowest: 1, highest: 9)", default=None) 
    122     p.add_option("-b", "--buffertime", dest="buffertime", type="int", action="store", help="Time in seconds for which log entries are buffered, default=10. Set to 0 to disable buffering", default=10) 
     133    p.add_option("-b", "--buffertime", dest="buffertime", type="int", action="store", help="Time in seconds for which log entries are buffered, default=0. Set to 0 to disable buffering", default=0) 
    123134    (options, args) = p.parse_args() 
    124135    if options.gzip is not None: 
     
    133144        return 1 
    134145    buf = Buffer(pattern=args[0], gzip_logs=options.gzip, buffertime=options.buffertime) 
    135     signal.signal(signal.SIGHUP, buf.sighandler) 
    136     signal.signal(signal.SIGTERM, buf.sighandler) 
    137     signal.signal(signal.SIGINT, buf.sighandler) 
    138     signal.signal(signal.SIGQUIT, buf.sighandler) 
    139146    buf.run() 
    140147