Changeset 88:9c6fe8cd30df in livinglogic.python.tipimaid

Show
Ignore:
Timestamp:
02/10/09 11:29:16 (10 years ago)
Author:
Nik Tautenhahn <nik@…>
Branch:
default
Message:

workerthreads + queues for liaalh, cosmetics for the liaalh_server

Files:
2 modified

Legend:

Unmodified
Added
Removed
  • liaalh.py

    r86 r88  
    22# -*- coding: utf-8 -*- 
    33 
    4 import sys, os, datetime, errno, bisect, re, gzip, signal, time, threading 
     4import sys, os, datetime, errno, bisect, re, gzip, signal, time, threading, Queue 
    55 
    66subprocessworks = False 
     
    1111    pass 
    1212 
     13 
     14class WorkerThread(threading.Thread): 
     15    workqueue = Queue.Queue(0) 
     16 
     17    def __init__(self, do_something): 
     18        threading.Thread.__init__(self) 
     19        self.do_something = do_something 
     20 
     21    def run(self): 
     22        while True: 
     23            args = WorkerThread.workqueue.get() 
     24            self.do_something(args) 
     25            WorkerThread.workqueue.task_done() 
     26 
     27 
    1328class LogLine(tuple): 
    1429    """ 
     
    2641    The main class of liaalh 
    2742    """ 
    28     def __init__(self, pattern='', gzip_logs=None, buffertime=0, stream=sys.stdin, utcrotate=False, symlinkpattern=None, execute=None, progname="liaalh"): # TODO: Name 
     43    def __init__(self, pattern='', gzip_logs=None, buffertime=0, stream=sys.stdin, utcrotate=False, symlinkpattern=None, execute=None, progname="liaalh", num_threads=3): 
    2944        self.pattern = pattern 
    3045        self.gzip_logs = gzip_logs 
     
    4257        self.symlinkpattern = symlinkpattern 
    4358        self.progname = progname 
     59        self.num_threads = num_threads 
    4460        if buffertime > 0: 
    4561            self.run = self.run_buffered 
    4662        else: 
    4763            self.run = self.run_unbuffered 
     64 
     65    def set_up_threads(self): 
     66        self.threads = [WorkerThread(do_something=self.do_something) for i in xrange(self.num_threads)] 
     67        for thr in self.threads: 
     68            thr.daemon = True 
     69            thr.start() 
    4870 
    4971    def openfile(self, filename, server): 
     
    115137                f.close() 
    116138                if self.execute is not None: 
    117                     thr = threading.Thread(target=self.do_something, args=[os.path.abspath(f.name)]) 
    118                     thr.start() 
     139                    WorkerThread.workqueue.put(os.path.abspath(f.name)) 
     140#                   thr = threading.Thread(target=self.do_something, args=[os.path.abspath(f.name)]) 
     141#                   thr.start() 
    119142                self.updateutcoffset() 
    120143                self.servers[server] = f = self.openfile(filename, server) 
     
    128151        Tries to find the date/time of a logline. This is the unbuffered case. 
    129152        """ 
     153        if self.execute: 
     154            self.set_up_threads() 
    130155        for (server, data) in self.readlines(): 
    131156            try: 
     
    142167        Tries to find the date/time of a logline. This is the buffered case. 
    143168        """ 
     169        if self.execute: 
     170            self.set_up_threads() 
    144171        signal.signal(signal.SIGHUP, self.sighandler) 
    145172        signal.signal(signal.SIGTERM, self.sighandler) 
     
    194221        Write all loglines content to their files. 
    195222        """ 
     223        WorkerThread.workqueue.join() # execute everything which is scheduled to be executed by nows 
    196224        for (utclogdate, server, logdata) in self.data: 
    197225            self.writeline(utclogdate, server, logdata) 
     
    245273    p.add_option("-s", "--symlink", dest="symlinkpattern", metavar="FILEPATTERN", action="store", help="""Create a symlink pointing to the most recent log file (of each virtual host if you use %v). Needs a filename pattern for the symlink (e.g. %v/access.log or symlinks/access-%v.log). Only "%v" is allowed in the pattern as symlinks which include time/date data are useless.""", default=None) 
    246274    p.add_option("-x", "--execute", dest="execute", metavar="COMMAND", type="string", action="store", help="After writing to a logfile is finished and a new one is created (e.g. after rotating the logs), the given executable is started with the finished logfile as its first (and only) parameter. You could use gzip, bzip2, a self-written bash-script or even rm here.", default=None) 
     275    p.add_option("-t", "--threads", dest="num_threads", metavar="NUMBER_OF_THREADS", type="int", action="store", help="Specifies the number of allowed worker threads for -x/--execute, default is 3", default=3) 
    247276    (options, args) = p.parse_args() 
    248277    if options.gzip is not None: 
     
    262291        return 1 
    263292 
    264     buf = Buffer(pattern=args[0], gzip_logs=options.gzip, buffertime=options.buffertime, utcrotate=options.utcrotate, symlinkpattern=options.symlinkpattern, execute=options.execute, progname=p.get_prog_name()) 
     293    buf = Buffer(pattern=args[0], gzip_logs=options.gzip, buffertime=options.buffertime, utcrotate=options.utcrotate, symlinkpattern=options.symlinkpattern, execute=options.execute, progname=p.get_prog_name(), num_threads=options.num_threads) 
    265294    buf.run() 
    266295 
  • liaalh_server.py

    r86 r88  
    7777    import optparse 
    7878    p = optparse.OptionParser(usage="usage: %prog port") 
    79     p.add_option("-n", "--netcat-compatible", dest="netcat", action="store_true", help="If set, %prog switches to netcat-compatible mode. Otherwise %prog assumes that it talks to its own sender which has better recovery in case of a broken connection.\nDo NOT use this switch if you use liaalh_sender!", default=None) # TODO: Fix name liaalh_sender 
     79    p.add_option("-n", "--netcat-compatible", dest="netcat", action="store_true", help="If set, %s switches to netcat-compatible mode. Otherwise %s assumes that it talks to its own sender which has better recovery in case of a broken connection.\nDo NOT use this switch if you use liaalh_sender!" % (sys.argv[0], sys.argv[0]), default=None) # TODO: Fix name liaalh_sender 
    8080    (options, args) = p.parse_args() 
    8181    if len(args) != 1 or intify(args[0], None) is None: