root/livinglogic.python.tipimaid/liaalh.py @ 92:b201ba1353fb

Revision 92:b201ba1353fb, 18.4 KB (checked in by Nik Tautenhahn <nik@…>, 10 years ago)

start executables in a shell

  • Property exe set to *
Line 
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3
4import sys, os, datetime, errno, bisect, re, gzip, signal, time
5try:
6    import threading
7except ImportError:
8    import dummy_threading as threading
9from collections import deque
10from Queue import *
11
# Detect whether the subprocess module is available (only Python >= 2.4/2.6
# era installs have it); do_something() falls back to os.system() otherwise.
try:
    import subprocess
except ImportError:
    subprocessworks = False
else:
    subprocessworks = True
if not hasattr(Queue, "task_done"): # We need a newer implementation of Queue, this one is taken from 2.6 and slightly modified to use old Python-API syntax (notifyAll (old) instead of notify_all (new))
    # Bug fix: the stdlib 2.6 Queue module does "from time import time as _time".
    # "from Queue import *" does NOT export _time, so without this alias the
    # timeout branches of put()/get() below would raise a NameError.
    _time = time.time

    class Empty(Exception):
        "Exception raised by Queue.get(block=0)/get_nowait()."
        pass

    class Full(Exception):
        "Exception raised by Queue.put(block=0)/put_nowait()."
        pass

    class Queue:
        """Create a queue object with a given maximum size.

        If maxsize is <= 0, the queue size is infinite.
        """
        def __init__(self, maxsize=0):
            self.maxsize = maxsize
            self._init(maxsize)
            # mutex must be held whenever the queue is mutating.  All methods
            # that acquire mutex must release it before returning.  mutex
            # is shared between the three conditions, so acquiring and
            # releasing the conditions also acquires and releases mutex.
            self.mutex = threading.Lock()
            # Notify not_empty whenever an item is added to the queue; a
            # thread waiting to get is notified then.
            self.not_empty = threading.Condition(self.mutex)
            # Notify not_full whenever an item is removed from the queue;
            # a thread waiting to put is notified then.
            self.not_full = threading.Condition(self.mutex)
            # Notify all_tasks_done whenever the number of unfinished tasks
            # drops to zero; thread waiting to join() is notified to resume
            self.all_tasks_done = threading.Condition(self.mutex)
            self.unfinished_tasks = 0

        def task_done(self):
            """Indicate that a formerly enqueued task is complete.

            Used by Queue consumer threads.  For each get() used to fetch a task,
            a subsequent call to task_done() tells the queue that the processing
            on the task is complete.

            If a join() is currently blocking, it will resume when all items
            have been processed (meaning that a task_done() call was received
            for every item that had been put() into the queue).

            Raises a ValueError if called more times than there were items
            placed in the queue.
            """
            self.all_tasks_done.acquire()
            try:
                unfinished = self.unfinished_tasks - 1
                if unfinished <= 0:
                    if unfinished < 0:
                        raise ValueError('task_done() called too many times')
                    self.all_tasks_done.notifyAll() # old API, in 2.6: notify_all
                self.unfinished_tasks = unfinished
            finally:
                self.all_tasks_done.release()

        def join(self):
            """Blocks until all items in the Queue have been gotten and processed.

            The count of unfinished tasks goes up whenever an item is added to the
            queue. The count goes down whenever a consumer thread calls task_done()
            to indicate the item was retrieved and all work on it is complete.

            When the count of unfinished tasks drops to zero, join() unblocks.
            """
            self.all_tasks_done.acquire()
            try:
                while self.unfinished_tasks:
                    self.all_tasks_done.wait()
            finally:
                self.all_tasks_done.release()

        def qsize(self):
            """Return the approximate size of the queue (not reliable!)."""
            self.mutex.acquire()
            n = self._qsize()
            self.mutex.release()
            return n

        def empty(self):
            """Return True if the queue is empty, False otherwise (not reliable!)."""
            self.mutex.acquire()
            n = not self._qsize()
            self.mutex.release()
            return n

        def full(self):
            """Return True if the queue is full, False otherwise (not reliable!)."""
            self.mutex.acquire()
            n = 0 < self.maxsize == self._qsize()
            self.mutex.release()
            return n

        def put(self, item, block=True, timeout=None):
            """Put an item into the queue.

            If optional args 'block' is true and 'timeout' is None (the default),
            block if necessary until a free slot is available. If 'timeout' is
            a positive number, it blocks at most 'timeout' seconds and raises
            the Full exception if no free slot was available within that time.
            Otherwise ('block' is false), put an item on the queue if a free slot
            is immediately available, else raise the Full exception ('timeout'
            is ignored in that case).
            """
            self.not_full.acquire()
            try:
                if self.maxsize > 0:
                    if not block:
                        if self._qsize() == self.maxsize:
                            raise Full
                    elif timeout is None:
                        while self._qsize() == self.maxsize:
                            self.not_full.wait()
                    elif timeout < 0:
                        raise ValueError("'timeout' must be a positive number")
                    else:
                        endtime = _time() + timeout
                        while self._qsize() == self.maxsize:
                            remaining = endtime - _time()
                            if remaining <= 0.0:
                                raise Full
                            self.not_full.wait(remaining)
                self._put(item)
                self.unfinished_tasks += 1
                self.not_empty.notify()
            finally:
                self.not_full.release()

        def put_nowait(self, item):
            """Put an item into the queue without blocking.

            Only enqueue the item if a free slot is immediately available.
            Otherwise raise the Full exception.
            """
            return self.put(item, False)

        def get(self, block=True, timeout=None):
            """Remove and return an item from the queue.

            If optional args 'block' is true and 'timeout' is None (the default),
            block if necessary until an item is available. If 'timeout' is
            a positive number, it blocks at most 'timeout' seconds and raises
            the Empty exception if no item was available within that time.
            Otherwise ('block' is false), return an item if one is immediately
            available, else raise the Empty exception ('timeout' is ignored
            in that case).
            """
            self.not_empty.acquire()
            try:
                if not block:
                    if not self._qsize():
                        raise Empty
                elif timeout is None:
                    while not self._qsize():
                        self.not_empty.wait()
                elif timeout < 0:
                    raise ValueError("'timeout' must be a positive number")
                else:
                    endtime = _time() + timeout
                    while not self._qsize():
                        remaining = endtime - _time()
                        if remaining <= 0.0:
                            raise Empty
                        self.not_empty.wait(remaining)
                item = self._get()
                self.not_full.notify()
                return item
            finally:
                self.not_empty.release()

        def get_nowait(self):
            """Remove and return an item from the queue without blocking.

            Only get an item if one is immediately available. Otherwise
            raise the Empty exception.
            """
            return self.get(False)

        # Override these methods to implement other queue organizations
        # (e.g. stack or priority queue).
        # These will only be called with appropriate locks held

        # Initialize the queue representation
        def _init(self, maxsize):
            self.queue = deque()

        def _qsize(self, len=len):
            return len(self.queue)

        # Put a new item in the queue
        def _put(self, item):
            self.queue.append(item)

        # Get an item from the queue
        def _get(self):
            return self.queue.popleft()
219
class WorkerThread(threading.Thread):
    """
    Worker thread that endlessly pulls items (finished logfile names) from
    the shared class-level ``workqueue`` and runs the supplied callable on
    each one, acknowledging every item via ``task_done()`` so that
    ``workqueue.join()`` can wait for completion.
    """
    # Single unbounded queue shared by all worker instances.
    workqueue = Queue(0)

    def __init__(self, do_something):
        threading.Thread.__init__(self)
        self.do_something = do_something

    def run(self):
        queue = WorkerThread.workqueue
        while True:
            task = queue.get()
            self.do_something(task)
            queue.task_done()
232
233
class LogLine(tuple):
    """
    Tuple subclass for (utcdate, server, data) loglines whose "<" and "<="
    comparisons look ONLY at element 0 (the datetime of the logline), so
    that sorting/bisecting the buffer orders lines purely by timestamp and
    never compares the server/data fields.
    """
    def __lt__(self, other):
        own_stamp = self[0]
        other_stamp = other[0]
        return own_stamp < other_stamp

    def __le__(self, other):
        own_stamp = self[0]
        other_stamp = other[0]
        return own_stamp <= other_stamp
245
246
247class Buffer(object):
248    """
249    The main class of liaalh
250    """
251    def __init__(self, pattern='', gzip_logs=None, buffertime=0, stream=sys.stdin, utcrotate=False, symlinkpattern=None, execute=None, progname="liaalh", num_threads=3):
252        self.pattern = pattern
253        self.gzip_logs = gzip_logs
254        self.data = []
255        self.servers = {}
256        self.buffertime = datetime.timedelta(seconds=buffertime)
257        self.stream = stream
258        self.re_find_date = re.compile(" \[(.*?)\] ")
259        if gzip_logs is not None and not pattern.endswith(".gz"):
260            self.pattern = "%s.gz" % self.pattern
261        self.utcrotate = utcrotate
262        self.execute = execute
263        self.updateutcoffset()
264        self.handlevirtualhost = "%v" in pattern
265        self.symlinkpattern = symlinkpattern
266        self.progname = progname
267        self.num_threads = num_threads
268        if buffertime > 0:
269            self.run = self.run_buffered
270        else:
271            self.run = self.run_unbuffered
272
273    def set_up_threads(self):
274        self.threads = [WorkerThread(do_something=self.do_something) for i in xrange(self.num_threads)]
275        for thr in self.threads:
276            thr.setDaemon(True)
277            thr.start()
278
279    def openfile(self, filename, server):
280        """
281        opens a file filename for server server which may be continuously gzipped. Closes old files, creates directories, if necessary
282        """
283        try:
284            f = open(filename, "a", 1)
285        except IOError, exc:
286            if exc.errno == errno.ENOENT:
287                os.makedirs(os.path.dirname(filename))
288                f = open(filename, "a", 1)
289            else:
290                raise
291        if self.symlinkpattern:
292            symlinkname = self.symlinkpattern.replace("%v", server)
293            try:
294                os.symlink(os.path.abspath(filename), symlinkname)
295            except OSError, exc:
296                if exc.errno == errno.ENOENT:
297                    os.makedirs(os.path.dirname(symlinkname))
298                    os.symlink(os.path.abspath(filename), symlinkname)
299                elif exc.errno == errno.EEXIST:
300                    os.remove(symlinkname)
301                    os.symlink(os.path.abspath(filename), symlinkname)
302                else:
303                    raise
304        if self.gzip_logs is not None:
305            f = gzip.GzipFile(fileobj=f, compresslevel=self.gzip_logs)
306            f.name = f.fileobj.name # gzip-fileobjects don't have a name attribute
307        return f
308
309    def readlines(self):
310        """
311        Gets lines from stdin and splits the virtual host from the rest of the
312        line if virtual hosts are used.
313        """
314        while True:
315            try:
316                line = self.stream.readline()
317            except IOError, exc:
318                if exc[0] == errno.EINTR:
319                    continue
320                else:
321                    raise
322            if not line:
323                break
324            if self.handlevirtualhost:
325                ret = line.split(None, 1)
326                if ret == []:
327                    ret = [None, None]
328                yield ret
329            else:
330                yield (None, line)
331
332    def writeline(self, utclogdate, server, line):
333        """
334        Writes the logline ``line`` for server ``server`` which has the date
335        ``utcdate`` to the right logfile (i.e. it checks if an already opened
336        logfile is still correct or if it has to be rotated).
337
338        This method also triggers the execution of external scripts after a
339        logfile has been rotated.
340        """
341        if not self.utcrotate:
342            utclogdate += self.localutcoffset
343        filename = utclogdate.strftime(self.pattern)
344        if self.handlevirtualhost:
345            filename = filename.replace("%v", server)
346        if server in self.servers:
347            f = self.servers[server]
348            if f.name != filename:
349                f.flush()
350                f.close()
351                if self.execute is not None:
352                    WorkerThread.workqueue.put(os.path.abspath(f.name))
353#                   thr = threading.Thread(target=self.do_something, args=[os.path.abspath(f.name)])
354#                   thr.start()
355                self.updateutcoffset()
356                self.servers[server] = f = self.openfile(filename, server)
357        else:
358            self.servers[server] = f = self.openfile(filename, server)
359        f.write(line)
360        f.flush()
361
362    def run_unbuffered(self):
363        """
364        Tries to find the date/time of a logline. This is the unbuffered case.
365        """
366        if self.execute:
367            self.set_up_threads()
368        signal.signal(signal.SIGHUP, self.sighandler)
369        signal.signal(signal.SIGTERM, self.sighandler)
370        signal.signal(signal.SIGINT, self.sighandler)
371        signal.signal(signal.SIGQUIT, self.sighandler)
372        try:
373            for (server, data) in self.readlines():
374                try:
375                    if server is None and data is None: # got an empty line
376                        continue
377                    datestring = self.re_find_date.findall(data)[0]
378                    utclogdate = self.apachedate2utc(datestring)
379                    self.writeline(utclogdate, server, data)
380                except IndexError, exc: # index error because we didn't find an apache date -> malformed logline
381                    continue # ignore it
382        except Exception, exc:
383            self.flushall()
384            raise
385
386    def run_buffered(self):
387        """
388        Tries to find the date/time of a logline. This is the buffered case.
389        """
390        if self.execute:
391            self.set_up_threads()
392        signal.signal(signal.SIGHUP, self.sighandler)
393        signal.signal(signal.SIGTERM, self.sighandler)
394        signal.signal(signal.SIGINT, self.sighandler)
395        signal.signal(signal.SIGQUIT, self.sighandler)
396        try:
397            for (server, data) in self.readlines():
398                try:
399                    if server is None and data is None: # got an empty line
400                        continue
401                    datestring = self.re_find_date.findall(data)[0]
402                    utclogdate = self.apachedate2utc(datestring)
403                    self.add(LogLine((utclogdate, server, data)))
404                except IndexError, exc: # index error because we didn't find an apache date -> malformed logline
405                    continue # ignore it
406                except Exception, exc:
407                    sys.stderr.write("[%s] [%s] Exception encountered: %r\nVhost was: %s\nData was: %r\n" % (datetime.datetime.now().strftime("%a %b %d %H:%M:%S %Y"), self.progname, exc, server, data))
408                    sys.stderr.flush()
409                    raise
410        except Exception, exc:
411            self.flushall()
412            raise
413
414    def do_something(self, filename):
415        """
416        Execute the command which was given with the --execute option
417        """
418        try:
419            if subprocessworks:
420                retcode = subprocess.call("%s %s" % (self.execute, filename), shell=True)
421            else:
422                retcode = os.system("%s %s" % (self.execute, filename))
423            if retcode != 0:
424                sys.stderr.write("[%s] [%s] Subprocess \"%s %s\" returned error code %s\n" % (datetime.datetime.now().strftime("%a %b %d %H:%M:%S %Y"), self.progname, self.execute, filename, retcode))
425                sys.stderr.flush()
426        except OSError, exc:
427            sys.stderr.write("[%s] [%s] Subprocess \"%s %s\" caused exception %r\n" % (datetime.datetime.now().strftime("%a %b %d %H:%M:%S %Y"), self.progname, self.execute, filename, retcode))
428            sys.stderr.flush()
429
430    def add(self, logline):
431        """
432        Keeps entries in the buffer sorted.
433        """
434        if not self.data or self.data[-1] <= logline:
435            self.data.append(logline)
436        else:
437            bisect.insort_right(self.data, logline)
438        self.flush()
439
440    def flushall(self):
441        """
442        Write all loglines content to their files.
443        """
444        for (utclogdate, server, logdata) in self.data:
445            self.writeline(utclogdate, server, logdata)
446        self.data = []
447
448    def flush(self):
449        """
450        Write buffered loglines to their files if they have been in the buffer
451        for ``buffertime`` seconds.
452        """
453        while self.data:
454            line = self.data[0]
455            (utclogdate, server, logdata) = line
456            if datetime.datetime.utcnow() - utclogdate < self.buffertime:
457                return
458            self.writeline(utclogdate, server, logdata)
459            self.data.pop(0)
460
461    def apachedate2utc(self, d):
462        """
463        Converts an "apachedate", i.e. a string like
464        "01/Jan/2009:01:02:03 +0100"
465        to a (UTC) datetime object.
466        """
467        temp = d.split()
468        utcdate = datetime.datetime(*(time.strptime(temp[0], "%d/%b/%Y:%H:%M:%S")[0:6])) # support ancient distributions with python < 2.5
469        minsoff = int("%s%s" % (temp[1][0], temp[1][-2:]))
470        hrsoff = int("%s%s" % (temp[1][0], temp[1][1:3]))
471        utcdate -= datetime.timedelta(hours=hrsoff, minutes=minsoff)
472        return utcdate
473
474    def updateutcoffset(self):
475        """
476        Updates the offset of the local system clock to UTC time. (Daylight
477        savings time might have changed... or you managed to move your
478        server across a timezone without switching it off or...)
479        """
480        temp = datetime.datetime.now() - datetime.datetime.utcnow()
481        self.localutcoffset = datetime.timedelta(days=temp.days, seconds=temp.seconds+1, microseconds=0)
482
483    def sighandler(self, signum, frame):
484        """
485        Signal handler which specifies what to do if someone wants to quit,
486        term or interrupt us. (If someone wants to kill us, we can't react...)
487        """
488        self.flushall()
489        if signum in (signal.SIGQUIT, signal.SIGTERM, signal.SIGINT):
490            WorkerThread.workqueue.join() # execute everything which is scheduled to be executed by now
491            sys.exit(signum)
492        self.flushall() # finishing workers might take some time...
493
def main(args=None):
    """
    Command line entry point: parse options, validate the filename pattern /
    symlink pattern combination and start the Buffer main loop.

    Returns 1 (after printing usage to stderr) on invalid invocation.
    """
    import optparse
    p = optparse.OptionParser(usage="usage: %prog filename-pattern [options]\nIf you use virtual hosts please note that the virtual host column (%v) has to be the first column in every logfile!")
    p.add_option("-b", "--buffertime", dest="buffertime", metavar="SECONDS", type="int", action="store", help="Time in seconds for which log entries are buffered, default=0. Set to 0 to disable buffering.", default=0)
    p.add_option("-z", "--continuous-gzip", dest="gzip", metavar="COMPRESSIONLEVEL", type="int", action="store", help="If set, logs are (continuously!) gzipped with this compression level (lowest: 1, highest: 9).", default=None)
    p.add_option("-u", "--utcrotate", dest="utcrotate", action="store_true", help="If set, UTC time determines the time for filenames and rotation. Otherwise local time is used.", default=False)
    p.add_option("-s", "--symlink", dest="symlinkpattern", metavar="FILEPATTERN", action="store", help="""Create a symlink pointing to the most recent log file (of each virtual host if you use %v). Needs a filename pattern for the symlink (e.g. %v/access.log or symlinks/access-%v.log). Only "%v" is allowed in the pattern as symlinks which include time/date data are useless.""", default=None)
    p.add_option("-x", "--execute", dest="execute", metavar="COMMAND", type="string", action="store", help="""After writing to a logfile is finished and a new one is created (e.g. after rotating the logs), the given executable is started with the finished logfile as its parameter. You could use gzip, bzip2, a self-written bash-script or even rm here. Use quotation marks if your command contains parameters of its own, e.g. -x "gzip -9".""", default=None)
    p.add_option("-t", "--threads", dest="num_threads", metavar="NUMBER_OF_THREADS", type="int", action="store", help="Specifies the number of allowed worker threads for -x/--execute, default is 3", default=3)
    (options, args) = p.parse_args()
    if options.gzip is not None:
        # clamp the compression level into gzip's valid 1..9 range
        if options.gzip < 1:
            options.gzip = 1
        elif options.gzip > 9:
            options.gzip = 9
    if len(args) != 1:
        p.print_usage(sys.stderr)
        sys.stderr.write("%s: We need a filename-pattern\n" % p.get_prog_name())
        sys.stderr.flush()
        return 1
    # When splitting by virtual host, the symlink pattern must contain "%v"
    # and nothing but "%v" as a placeholder (no time/date placeholders).
    if options.symlinkpattern is not None and ("%v" in args[0] and ("%v" in options.symlinkpattern and options.symlinkpattern.count("%") > 1) or ("%v" not in options.symlinkpattern)):
        p.print_usage(sys.stderr)
        # Bug fix: "%v" must be escaped as "%%v" inside a %-formatted string,
        # otherwise this write raised ValueError instead of printing the hint.
        sys.stderr.write("%s: If you split logfiles by virtual hosts you should use virtual hosts (%%v) in the symlink-pattern as well. But you shouldn't use any patterns for time/date data.\n" % p.get_prog_name())
        sys.stderr.flush()
        return 1

    buf = Buffer(pattern=args[0], gzip_logs=options.gzip, buffertime=options.buffertime, utcrotate=options.utcrotate, symlinkpattern=options.symlinkpattern, execute=options.execute, progname=p.get_prog_name(), num_threads=options.num_threads)
    buf.run()
522
523
524if __name__ == "__main__":
525    sys.exit(main())
Note: See TracBrowser for help on using the browser.