root/livinglogic.python.xist/src/ll/sisyphus.py @ 4529:2af1ce09d6b7

Revision 4529:2af1ce09d6b7, 20.5 KB (checked in by Walter Doerwald <walter@…>, 8 years ago)

Fix sisyphus: Use ul4c.Template instead of ul4c.compile.

Line 
1#! /usr/bin/env python
2# -*- coding: utf-8 -*-
3
4## Copyright 2000-2011 by LivingLogic AG, Bayreuth/Germany.
5## Copyright 2000-2011 by Walter Dörwald
6##
7## All Rights Reserved
8##
9## See ll/__init__.py for the license
10
11
12"""
13:mod:`ll.sisyphus` simplifies running Python stuff as cron jobs.
14
15There will be no more than one sisyphus job of a certain name running at any
16given time. A job has a maximum allowed runtime. If this maximum is exceeded,
17the job will kill itself.
18
19In addition to that, job execution can be logged.
20
21To use this module, you must derive your own class from :class:`Job` and
22implement the :meth:`execute` method.
23
24Logs will (by default) be created in the :dir:`~/ll.sisyphus` directory.
25This can be changed by deriving a new subclass and overwriting the appropriate
26class attribute.
27
28To execute a job, use the module level function :func:`execute` (or
29:func:`executewithargs` when you want to support command line arguments).
30
31Example
32-------
33
34The following example illustrates the use of this module::
35
36    #!/usr/bin/env python
37
38    import os
39    import urllib
40    from ll import sisyphus
41
42    class Fetch(sisyphus.Job):
43        projectname = "ACME.FooBar"
44        jobname = "Fetch"
45        argdescription = "fetch http://www.python.org/ and save it to a local file"
46        maxtime = 180
47
48        def __init__(self):
49            self.url = "http://www.python.org/"
50            self.tmpname = "Fetch_Tmp_{}.html".format(os.getpid())
51            self.officialname = "Python.html"
52
53        def execute(self):
54            self.log("fetching data from {!r}".format(self.url))
55            data = urllib.urlopen(self.url).read()
56            datasize = len(data)
57            self.log("writing file {!r} ({} bytes)".format(self.tmpname, datasize))
58            open(self.tmpname, "wb").write(data)
59            self.log("renaming file {!r} to {!r}".format(self.tmpname, self.officialname))
60            os.rename(self.tmpname, self.officialname)
61            return "cached {!r} as {!r} ({} bytes)".format(self.url, self.officialname, datasize)
62
63    if __name__=="__main__":
64        sisyphus.executewithargs(Fetch())
65
66You will find the log files for this job in ``~/ll.sisyphus/ACME.FooBar/Fetch/``.
67"""
68
69
70import sys, os, signal, fcntl, codecs, traceback, errno, pprint, datetime, re, contextlib, argparse
71
72from ll import url, ul4c, misc
73
74
75__docformat__ = "reStructuredText"
76
77
def literaldecode(exc):
    """
    Codec error handler that replaces every offending byte with its two digit
    hexadecimal value in square brackets (e.g. ``[ff]``).

    :var:`exc` is the :class:`UnicodeError` passed in by the codec machinery.
    The return value is the usual ``(replacement, resume position)`` tuple
    expected from an error handler; decoding resumes at ``exc.end``.
    """
    offending = exc.object[exc.start:exc.end]
    # Iterating a byte string yields 1-character strings on Python 2, but
    # integers for ``bytearray`` (and for ``bytes`` on Python 3), so accept both.
    codes = (c if isinstance(c, int) else ord(c) for c in offending)
    return (u"".join(u"[%02x]" % code for code in codes), exc.end)

codecs.register_error("literaldecode", literaldecode)
82
83
# Regular expression for a source encoding declaration such as ``coding: utf-8``
# or ``coding=latin-1``; group 1 is the name of the encoding.
encodingdeclaration = re.compile(r"coding[:=]\s*([-\w.]+)")
85
86
class MaximumRuntimeExceeded(Exception):
    """
    Exception signalling that the maximum allowed runtime has been exceeded.

    :var:`maxtime` is the runtime limit in seconds; it is available as the
    attribute ``maxtime`` and is used in the string representation.
    """

    def __init__(self, maxtime):
        # Initialize the base class too, so that ``args`` is populated (this
        # gives a meaningful ``repr`` and makes the exception picklable).
        Exception.__init__(self, maxtime)
        self.maxtime = maxtime

    def __str__(self):
        return "maximum runtime of {} seconds exceeded".format(self.maxtime)
93
94
95class Job(object):
96    """
97    A Job object executes a task once.
98
99    To use this class, derive your own class from it and overwrite the
100    :meth:`execute` method.
101
102    Logging itself is done by calling ``self.log``::
103
104        self.log("can't parse XML file {}".format(filename))
105
106    This logs the argument without tagging the line. To add tags to the logging
107    call, simply access attributes of ``self.log``::
108
109        self.log.xml.warning("can't parse XML file {}".format(filename))
110
111    This adds the tags ``"xml"`` and ``"warning"`` to the log line.
112
113    :mod:`ll.sisyphus` itself uses the following tags:
114
115        ``sisyphus``
116            This tag will be added to all log lines produced by :mod:`ll.sisyphus`
117            itself
118
119        ``init``
120            This tag is used for the log lines output at the start of the job
121
122        ``result``
123            This tag is used for the final line in the log file that shows a
124            summary of what the job did (or why it failed)
125
126        ``fail``
127            This tag is used in the result line if the job failed with an exception.
128
129        ``kill``
130            This tag is used in the result line if the job was killed because it
131            exceeded the maximum allowed runtime.
132
133    The job can be configured in three ways. By class attributes in the
134    :class:`Job` subclass, by attributes of the :class:`Job` instance (e.g. set
135    in :meth:`__init__`) and by command line arguments (if :func:`executewithargs`
136    is used). The following attributes are supported:
137
138    ``projectname`` : :option:`-p` or :option:`--projectname`
139        The name of the project this job belongs to. This might be a dot-separated
140        hierarchical project name (e.g. including customer names or similar stuff).
141
142    ``jobname`` : :option:`-j` or :option:`--jobname`
143        The name of the job itself (defaulting to the name of the class if none
144        is given).
145
146    ``argdescription`` : No command line equivalent
147        Description for help message of the command line argument parser.
148
149    ``maxtime`` : :option:`-m` or :option:`--maxtime`
150        Maximum allowed runtime for the job (as the number of seconds). If the job
151        runs longer than that it will kill itself.
152
153    ``fork`` : :option:`--fork`
154        Forks the process and does the work in the child process. The parent
155        process is responsible for monitoring the maximum runtime (this is the
156        default). In non-forking mode the single process does both the work and
157        the runtime monitoring.
158
159    ``noisykills`` : :option:`--noisykills`
160        Should a message be printed when the maximum runtime is exceeded?
161
162    ``logfilename`` : :option:`--logfilename`
163        Path/name of the logfile for this job as an UL4 template. Variables
164        available in the template include ``user_name``, ``projectname``,
165        ``jobname`` and ``starttime``.
166
167    ``loglinkname`` : :option:`--loglinkname`
168        A link that points to the currently active logfile (as an UL4 template).
169        If this is :const:`None` no link will be created.
170
171    ``log2file`` : :option:`-f` or :option:`--log2file`
172        Should a logfile be written at all?
173
174    ``formatlogline`` : :option:`--formatlogline`
175        An UL4 template for formatting each line in the logfile. Available
176        variables are ``time`` (current time), ``starttime`` (start time of the
177        job), ``tags`` (list of tags for the line) and ``line`` (the log line
178        itself).
179
180    ``keepfilelogs`` : :option:`--keepfilelogs`
181        The number of days the logfiles are kept. Old logfiles (i.e. any file in
182        the same directory as the current logfile that's more than
183        ``keepfilelogs`` days old) will be removed at the end of the job.
184
185    ``inputencoding`` : :option:`--inputencoding`
186        The encoding to be used for data that is supposed to be unicode, but isn't
187        (e.g. host/user/network info, lines passed to ``self.log`` etc.)
188
189    ``inputerrors`` : :option:`--inputerrors`
190        Decoding error handler name (goes with ``inputencoding``)
191
192    ``outputencoding`` : :option:`--outputencoding`
193        The encoding to be used for the logfile.
194
195    ``outputerrors`` : :option:`--outputerrors`
196        Encoding error handler name (goes with ``outputencoding``)
197
198    Command line arguments take precedence over instance attributes (if
199    :func:`executewithargs` is used) and those take precedence over class
200    attributes.
201    """
202
203    projectname = None
204    jobname = None
205
206    argdescription = "execute the job"
207
208    maxtime = 5 * 60
209
210    fork = True
211
212    noisykills = False
213
214    logfilename = u"~/ll.sisyphus/<?print projectname?>/<?print jobname?>/<?print starttime.format('%Y-%m-%d-%H-%M-%S-%f')?>.sisyphuslog"
215    loglinkname = u"~/ll.sisyphus/<?print projectname?>/<?print jobname?>/current.sisyphuslog"
216
217    log2file = True
218    log2stdout = False
219    log2stderr = False
220
221    formatlogline = u"[<?print time?>]=[t+<?print time-starttime?>]<?for tag in tags?>[<?print tag?>]<?end for?>: <?print line?>"
222
223    keepfilelogs = 30
224
225    inputencoding = u"utf-8"
226    inputerrors = u"literaldecode"
227
228    outputencoding = u"utf-8"
229    outputerrors = u"strict"
230
231    def execute(self):
232        """
233        Execute the job once. The return value is a one line summary of what the
234        job did. Overwrite in subclasses.
235        """
236        return u"done"
237
238    def failed(self):
239        """
240        Called when running the job generated an exception. Overwrite in
241        subclasses, to e.g. rollback your database transactions.
242        """
243        pass
244
245    def argparser(self):
246        """
247        Return an :mod:`argparse` parser for parsing the command line arguments.
248        This can be overwritten in subclasses to add more arguments.
249        """
250        p = argparse.ArgumentParser(description=self.argdescription, epilog="For more info see http://www.livinglogic.de/Python/sisyphus/")
251        p.add_argument("-p", "--projectname", dest="projectname", metavar="NAME", help="The name of the project this job belongs to (default: %(default)s)", type=self._string, default=self.projectname)
252        p.add_argument("-j", "--jobname", dest="jobname", metavar="NAME", help="The name of the job (default: %(default)s)", type=self._string, default=self.jobname if self.jobname is not None else self.__class__.__name__)
253        p.add_argument("-m", "--maxtime", dest="maxtime", metavar="SECONDS", help="Maximum number of seconds the job is allowed to run (default: %(default)s)", type=int, default=self.maxtime)
254        p.add_argument(      "--fork", dest="fork", help="Fork the process and do the work in the child process? (default: %(default)s)", action=misc.FlagAction, default=self.fork)
255        p.add_argument("-f", "--log2file", dest="log2file", help="Should the job log into a file? (default: %(default)s)", action=misc.FlagAction, default=self.log2file)
256        p.add_argument("-o", "--log2stdout", dest="log2stdout", help="Should the job log to stdout? (default: %(default)s)", action=misc.FlagAction, default=self.log2stdout)
257        p.add_argument("-e", "--log2stderr", dest="log2stderr", help="Should the job log to stderr? (default: %(default)s)", action=misc.FlagAction, default=self.log2stderr)
258        p.add_argument(      "--keepfilelogs", dest="keepfilelogs", metavar="DAYS", help="Number of days log files are kept (default: %(default)s)", type=float, default=self.keepfilelogs)
259        p.add_argument(      "--inputencoding", dest="inputencoding", metavar="ENCODING", help="Encoding for system data (i.e. crontab etc.) (default: %(default)s)", default=self.inputencoding)
260        p.add_argument(      "--inputerrors", dest="inputerrors", metavar="METHOD", help="Error handling method for encoding errors in system data (default: %(default)s)", default=self.inputerrors)
261        p.add_argument(      "--outputencoding", dest="outputencoding", metavar="ENCODING", help="Encoding for the log file (default: %(default)s)", default=self.outputencoding)
262        p.add_argument(      "--outputerrors", dest="outputerrors", metavar="METHOD", help="Error handling method for encoding errors in log texts (default: %(default)s)", default=self.outputerrors)
263        p.add_argument(      "--noisykills", dest="noisykills", help="Should a message be printed if the maximum runtime is exceeded? (default: %(default)s)", action=misc.FlagAction, default=self.noisykills)
264        return p
265
266    def parseargs(self, args=None):
267        """
268        Use the parser returned by :meth:`argparser` to parse the argument
269        sequence :var:`args`, modify :var:`self` accordingly and return
270        the result of the parsers :meth:`parse_args` call.
271        """
272        p = self.argparser()
273        args = p.parse_args(args)
274        self.projectname = args.projectname
275        self.jobname = args.jobname
276        self.maxtime = args.maxtime
277        self.fork = args.fork
278        self.noisykills = args.noisykills
279        self.log2file = args.log2file
280        self.log2stdout = args.log2stdout
281        self.log2stderr = args.log2stderr
282        self.keepfilelogs = datetime.timedelta(days=args.keepfilelogs)
283        self.inputencoding = args.inputencoding
284        self.inputerrors = args.inputerrors
285        self.outputencoding = args.outputencoding
286        self.outputerrors = args.outputerrors
287        return args
288
289    def _alarm_fork(self, signum, frame):
290        os.kill(self.killpid, signal.SIGTERM) # Kill our child
291        maxtime = datetime.timedelta(seconds=self.maxtime)
292        if self._logfile is not None:
293            self.log.sisyphus.result.kill(u"Terminated child after {}".format(maxtime))
294            self._logfile.close()
295        if self.noisykills:
296            print "Terminated forked job {} (pid {}) after {}".format(self.info.sysinfo.scriptname, self.info.sysinfo.pid, maxtime)
297        os._exit(1)
298
299    def _alarm_nofork(self, signum, frame):
300        self._prefix = ""
301        maxtime = datetime.timedelta(seconds=self.maxtime)
302        if self._logfile is not None:
303            self.log.sisyphus.result.kill(u"Terminated after {}".format(maxtime))
304            self._logfile.close()
305        if self.noisykills:
306            print "Terminated job {} (pid {}) after {}".format(self.info.sysinfo.scriptname, self.info.sysinfo.pid, maxtime)
307        os._exit(1)
308
309    def _handleexecution(self):
310        """
311        Handle executing the job including handling of duplicate or hanging jobs.
312        """
313        self.info = AttrDict()
314        self.info.sysinfo = misc.SysInfo(self.inputencoding, self.inputerrors)
315        self.info.projectname = self._string(self.projectname)
316        self.info.jobname = self._string(self.jobname)
317        self.info.maxtime = self.maxtime
318        self._prefix = ""
319
320        # Obtain a lock on the script file to make sure we're the only one running
321        with open(self.info.sysinfo.scriptname, "rb") as f:
322            try:
323                fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
324            except IOError, exc:
325                if exc[0] not in (errno.EACCES, errno.EAGAIN): # some other error
326                    raise
327                # The previous invocation of the job is still running
328                return # Return without calling :meth:`execute`
329
330            # we were able to obtain the lock, so we are the only one running
331            self.info.starttime = datetime.datetime.now()
332
333            self._getscriptsource() # Get source code
334            self._getcrontab() # Get crontab
335            self.lineno = 1 # Current line number
336            self.log = Tag(self._log) # Create tagged logger
337            self._formatlogline = ul4c.Template(self.formatlogline.replace("\n", "").replace("\r", "") + u"\n", "formatlogline") # Log line formatting template
338            self._createlog() # Create log file and link
339
340            self.log.sisyphus.init(u"{} (max time {}; pid {})".format(self.info.sysinfo.scriptname, datetime.timedelta(seconds=self.maxtime), self.info.sysinfo.pid))
341
342            if self.fork: # Forking mode?
343                # Fork the process; the child will do the work; the parent will monitor the maximum runtime
344                self.killpid = pid = os.fork()
345                if pid: # We are the parent process
346                    # set a signal to kill the child process after the maximum runtime
347                    signal.signal(signal.SIGALRM, self._alarm_fork)
348                    signal.alarm(self.maxtime)
349                    os.wait() # Wait for the child process to terminate
350                    return # Exit normally
351                self.log.sisyphus.init(u"forked worker child (child pid {})".format(os.getpid()))
352            else: # We didn't fork
353                # set a signal to kill ourselves after the maximum runtime
354                signal.signal(signal.SIGALRM, self._alarm_nofork)
355                signal.alarm(self.maxtime)
356
357            try:
358                with url.Context():
359                    result = self.execute()
360                self._cleanupoldlogs() # Clean up old logfiles
361            except BaseException, exc:
362                # log the error to the logfile, because the job probably didn't have a chance to do it
363                self.log.sisyphus.exc(exc)
364                result = u"failed with {}".format(self._exc(exc))
365                self.log.sisyphus.result.fail(result)
366                self.failed()
367                raise
368            else:
369                # log the result
370                self.log.sisyphus.result.ok(self._string(result))
371            finally:
372                if self._logfile is not None:
373                    self._logfile.close()
374                fcntl.flock(f, fcntl.LOCK_UN | fcntl.LOCK_NB)
375            if self.fork:
376                os._exit(0)
377
378    @contextlib.contextmanager
379    def prefix(self, prefix):
380        """
381        :meth:`prefix` is a context manager. For the duration of the ``with`` block
382        :var:`prefix` will be prepended to all log lines. :meth:`prefix` calls can
383        be nested.
384        """
385        oldprefix = self._prefix
386        self._prefix += prefix
387        try:
388            yield
389        finally:
390            self._prefix = oldprefix
391
392    def _log(self, tags, *texts):
393        """
394        Log items in :var:`texts` to the log file using :var:`tags` as the list
395        of tags.
396        """
397        if self.log2file or self.log2stdout or self.log2stderr:
398            timestamp = datetime.datetime.now()
399            for text in texts:
400                text = self._string(text)
401                if isinstance(text, BaseException):
402                    if "exc" not in tags:
403                        tags += ("exc",)
404                    tb = u"".join(map(self._string, traceback.format_tb(sys.exc_info()[-1])))
405                    text = u"{tb}\n{exc}".format(tb=tb, exc=self._exc(text))
406                elif not isinstance(text, unicode):
407                    text = self._string(pprint.pformat(text))
408                lines = text.splitlines()
409                if lines and not lines[-1].strip():
410                    del lines[-1]
411                for line in lines:
412                    text = self._formatlogline.renders(line=self._prefix+line, time=timestamp, tags=tags, **self.info)
413                    text = text.encode(self.outputencoding, self.outputerrors)
414                    if self.log2file:
415                        self._logfile.write(text)
416                        self._logfile.flush()
417                    if self.log2stdout:
418                        sys.stdout.write(text)
419                        sys.stdout.flush()
420                    if self.log2stderr:
421                        sys.stderr.write(text)
422                        sys.stderr.flush()
423                    self.lineno += 1
424
425    def _getscriptsource(self):
426        """
427        Reads the source code of the script into ``self.source``.
428        """
429        try:
430            with open(self.info.sysinfo.scriptname.rstrip("c"), "rb") as f:
431                source = f.read()
432                lines = source.splitlines()
433                # find encoding in the first two lines
434                encoding = self.inputencoding
435                if lines and lines[0].startswith(codecs.BOM_UTF8):
436                    encoding = "utf-8"
437                else:
438                    for line in lines[:2]:
439                        match = encodingdeclaration.search(line)
440                        if match is not None:
441                            encoding = match.group(1)
442                self.source = source.decode(encoding, self.inputerrors)
443        except IOError: # Script might have called ``os.chdir()`` before
444            self.source = None
445
446    def _getcrontab(self):
447        """
448        Reads the current crontab into ``self.crontab``.
449        """
450        self.crontab = self._string(os.popen("crontab -l 2>/dev/null").read())
451
452    def _createlog(self):
453        """
454        Create the logfile and the link to the logfile (if requested).
455        """
456        self._logfile = None
457        self._logfilename = None
458        self._loglinkname = None
459        if self.log2file:
460            # Create the log file
461            logfilename = ul4c.Template(self.logfilename, "logfilename").renders(**self.info)
462            lf = self._logfilename = url.File(logfilename).abs()
463            self._logfile = lf.openwrite()
464            if self.loglinkname is not None:
465                # Create the log link
466                loglinkname = ul4c.Template(self.loglinkname, "loglinkname").renders(**self.info)
467                ll = self._loglinkname = url.File(loglinkname).abs()
468                lf = self._logfilename
469                try:
470                    lf.symlink(ll)
471                except OSError, exc:
472                    if exc[0] == errno.EEXIST:
473                        ll.remove()
474                        lf.symlink(ll)
475                    else:
476                        raise
477
478    def _cleanupoldlogs(self):
479        """
480        Remove old logfiles.
481        """
482        if self._logfile is not None and self.keepfilelogs is not None:
483            removedany = False
484            keepfilelogs = self.keepfilelogs
485            if not isinstance(keepfilelogs, datetime.timedelta):
486                keepfilelogs = datetime.timedelta(days=keepfilelogs)
487            threshold = datetime.datetime.utcnow() - keepfilelogs # Files older that this will be deleted
488            logdir = self._logfile.url.withoutfile()
489            for fileurl in logdir/logdir.files():
490                fileurl = logdir/fileurl
491                # Never delete the current log file or link, even if keepfilelogs is 0
492                if fileurl == self._logfilename or fileurl == self._loglinkname:
493                    continue
494                # If the file is to old, delete it (note that this might delete files that were not produced by sisyphus)
495                if fileurl.mdate() < threshold:
496                    if not removedany: # Only log this line for the first logfile we remove
497                        self.log.sisyphus.info("Removing logfiles older than {}".format(keepfilelogs))
498                        removedany = True
499                    self.log.sisyphus.info("Removing logfile {}".format(fileurl.local()))
500                    fileurl.remove()
501
502    def _string(self, s):
503        """
504        Convert :var:`s` to unicode if it's a :class:`str`.
505        """
506        if isinstance(s, str):
507            s = s.decode(self.inputencoding, self.inputerrors)
508        return s
509
510    def _exc(self, exc):
511        """
512        Format an exception object for logging.
513        """
514        if exc.__class__.__module__ not in ("__builtin__", "exceptions"):
515            fmt = u"{0.__class__.__module__}.{0.__class__.__name__}: {1}"
516        else:
517            fmt = u"{0.__class__.__name__}: {1}"
518        try:
519            strexc = unicode(exc)
520        except UnicodeError:
521            try:
522                strexc = str(exc).decode("iso-8859-1") # latin-1 might be wrong, but it will not produce an exception
523            except Exception:
524                strexc = u"?"
525        return fmt.format(exc, strexc)
526
527
class Tag(object):
    """
    A :class:`Tag` object can be used to call a function with an additional list
    of tags. Tags can be added via :meth:`__getattr__` or :meth:`__getitem__` calls.

    Calling the object calls the function :var:`log` with the tuple of tags as
    the first argument, followed by the arguments of the call.

    Special names (``__xxx__``) and the names of the internal attributes are
    never treated as tags on attribute access, so that protocol lookups (e.g.
    by :mod:`copy` or :mod:`pickle`) get a proper :exc:`AttributeError`. Such
    tags can still be added via :meth:`__getitem__`.
    """
    def __init__(self, log, *tags):
        self.log = log
        self.tags = tags
        self._map = {} # Cache of the derived :class:`Tag` objects (by tag name)

    def _gettag(self, tag):
        """
        Return the :class:`Tag` object that has the additional tag :var:`tag`.
        """
        if tag in self.tags: # Avoid duplicate tags
            return self
        try:
            return self._map[tag]
        except KeyError:
            newtag = self._map[tag] = Tag(self.log, *(self.tags + (tag,)))
            return newtag

    def __getattr__(self, tag):
        # ``__getattr__`` is only called when the normal lookup fails. For the
        # internal attributes this means the object isn't initialized yet (e.g.
        # during unpickling); treating them as tags would recurse endlessly.
        if tag in ("log", "tags", "_map") or (tag.startswith("__") and tag.endswith("__")):
            raise AttributeError(tag)
        return self._gettag(tag)

    def __getitem__(self, tag):
        return self._gettag(tag)

    def __call__(self, *texts, **kwargs):
        self.log(self.tags, *texts, **kwargs)
552
553
class AttrDict(dict):
    """
    :class:`dict` subclass that makes keys available as attributes.

    Reading an attribute returns the value for the key of the same name (and
    raises :exc:`AttributeError` if there is no such key); setting an attribute
    sets the key.
    """
    def __getattr__(self, name):
        try:
            return self[name]
        except KeyError:
            # Include the name, so that the error message is useful
            raise AttributeError(name)

    def __setattr__(self, name, value):
        self[name] = value
566
567
def execute(job):
    """
    Run the job :var:`job` once, using the configuration of the job object
    as it is (no command line arguments are parsed).
    """
    run = job._handleexecution
    run()
573
574
def executewithargs(job, args=None):
    """
    Run the job :var:`job` once, after configuring it from command line
    arguments.

    :var:`args` is the sequence of command line arguments; with :const:`None`
    (the default) ``sys.argv`` is used.
    """
    # Configure the job from the arguments first, then run it
    job.parseargs(args)
    run = job._handleexecution
    run()
Note: See TracBrowser for help on using the browser.