Changeset 52:c4352bf4ac95 in livinglogic.python.tipimaid

Show
Ignore:
Timestamp:
12/22/08 18:59:06 (10 years ago)
Author:
Nikolas Tautenhahn <nik@…>
Branch:
default
Message:

buffer locally (in memory) as long as interruption is < buffertime. If we are lucky, liaalh's buffertime is enough to fix it. If not, write it to a file.

Files:
1 modified

Legend:

Unmodified
Added
Removed
  • liaalh_sender.py

    r51 r52  
    88 
    99class Sender(object): 
    10     def __init__(self, ip, port, backuppath, stream=sys.stdin): 
     10    def __init__(self, ip, port, buffertime=0, backuppath=None, stream=sys.stdin): 
    1111        self.ip = ip 
    1212        self.port = port 
    1313        self.stream = stream 
    1414        self.check_every = datetime.timedelta(seconds=10) 
     15        if buffertime > 3: 
     16            self.buffertime = datetime.timedelta(seconds=buffertime - 3) # just to be sure... 
     17        else: 
     18            self.buffertime = 0 
     19        self.buffer = [] 
     20        self.startedbuffering = datetime.datetime.now() 
    1521        self.s = None 
    1622        self.f = None 
     
    7278        while t < l: # it might be that only a part of line gets transmitted 
    7379            t += self.s.send(line[t:]) 
    74             signal.alarm(5) # wait 5 seconds for server to notify us that it got the message 
    7580            try: 
     81                signal.alarm(5) # wait 5 seconds for server to notify us that it got the message 
    7682                ret = self.s.recv(1024) 
    7783                if ret == "": 
     
    8187                self.s.close() 
    8288                self.s = None 
    83                 self.last_check_connection = datetime.datetime.now() 
     89                self.last_check_connection = self.startedbuffering = datetime.datetime.now() 
    8490                self.send_tempfile(line) 
    8591            signal.alarm(0) 
    8692 
    8793    def send_tempfile(self, line): 
    88         if self.f: 
    89             if self.f.name != os.path.join(self.backuppath, datetime.datetime.now().strftime("%Y%m%d_recovery.log")): 
    90                 self.f.close() 
    91                 self.f = open(os.path.join(self.backuppath, datetime.datetime.now().strftime("%Y%m%d_recovery.log")), "a") 
    92         else: 
    93             self.f = open(os.path.join(self.backuppath, datetime.datetime.now().strftime("%Y%m%d_recovery.log")), "a") 
    94         self.f.write(line) 
     94        if datetime.datetime.now() - self.startedbuffering < self.buffertime: 
     95            self.buffer.append(line) 
     96        else: # this entry is too late - write it to a tempfile 
     97            if self.f: 
     98                if self.f.name != os.path.join(self.backuppath, datetime.datetime.now().strftime("%Y%m%d_recovery.log")): 
     99                    self.f.close() 
     100                    self.f = open(os.path.join(self.backuppath, datetime.datetime.now().strftime("%Y%m%d_recovery.log")), "a", 1) 
     101            else: 
     102                self.f = open(os.path.join(self.backuppath, datetime.datetime.now().strftime("%Y%m%d_recovery.log")), "a", 1) 
     103            if self.buffer: # write away all buffered entries 
     104                for oldline in self.buffer: 
     105                    self.f.write(oldline) 
     106                self.buffer = [] 
     107            self.f.write(line) 
    95108        td = datetime.datetime.now() - self.last_check_connection 
    96109        if td > self.check_every: 
     
    98111            self.last_check_connection = datetime.datetime.now() 
    99112            if self.s: 
    100                 self.f.close() 
    101                 self.f = None 
     113                if self.f: 
     114                    self.f.close() 
     115                    self.f = None 
     116                if self.buffer: #  the good case: we have buffered entries and got our connection back before the buffertime was over 
     117                    for (i, oldline) in enumerate(self.buffer): 
     118                        (l, t) = (len(oldline), 0) 
     119                        while t < l: # it might be that only a part of line gets transmitted 
     120                            t += self.s.send(oldline[t:]) 
     121                            try: 
     122                                signal.alarm(2) # wait 2 seconds for server to notify us that it got the message 
     123                                ret = self.s.recv(1024) 
     124                                if ret == "": 
     125                                    raise socket.timeout 
     126                            except (socket.timeout, socket.error), exc: 
     127                                signal.alarm(0) 
     128                                self.s.close() 
     129                                self.s = None 
     130                                self.buffer = self.buffer[i:] # the first i items were transmitted successfully 
     131                                return # our socket died while we were transmitting buffered items - go back to "normal" behaviour 
     132                            signal.alarm(0) 
    102133 
    103134    def send(self): 
    104         for line in self.readlines(): 
     135        try: 
     136            for line in self.readlines(): 
     137                print repr(self.s) 
     138                if self.s: 
     139                    self.send_socket(line) 
     140                else: 
     141                    self.send_tempfile(line) 
    105142            if self.s: 
    106                 self.send_socket(line) 
    107             else: 
    108                 self.send_tempfile(line) 
    109         if self.s: 
    110             self.s.close() 
     143                self.s.close() 
     144        except Exception, exc: 
     145            if self.f: 
     146                self.f.close() 
     147            if self.s: 
     148                self.s.close() 
     149            raise 
    111150 
    112151def main(args=None): 
    113152    import optparse 
    114153    p = optparse.OptionParser(usage="usage: %prog ip port [options]") 
    115     p.add_option("-b", "--backuppath", dest="backuppath", type="string", action="store", help="Directory where recovery logs should be stored if the network connection dies", default=None) 
     154    p.add_option("-b", "--buffertime", dest="buffertime", type="int", action="store", help="Time in seconds for which log entries are buffered, default=0. Set to 0 to disable buffering; for buffering it should be set to at least 10.", default=0) 
     155    p.add_option("-p", "--backuppath", dest="backuppath", type="string", action="store", help="Directory where recovery logs should be stored if the network connection dies", default=None) 
    116156    (options, args) = p.parse_args() 
    117157    if len(args) != 2 or intify(args[1], None) is None: 
     
    120160        sys.stderr.flush() 
    121161        return 1 
    122     S = Sender(args[0], intify(args[1], None), options.backuppath) 
     162    S = Sender(args[0], intify(args[1], None), intify(options.buffertime, 0), options.backuppath) 
    123163    S.send() 
    124164