root/livinglogic.python.tipimaid/liaalh_sender.py @ 57:a207bc82c12a

Revision 57:a207bc82c12a, 8.7 KB (checked in by Nikolas Tautenhahn <nik@…>, 10 years ago)

Made buffertime and backuppath "required options".

  • Property exe set to *
Line 
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3
4import sys, socket, select, signal, datetime, os
5# TODO: What happens if we have to send in multiple chunks and bad things happen after one but before the last chunk? ATM this would result in liaalh-hiccup :-(
def intify(s, defaultint=0):
    """Convert *s* to an int, falling back to *defaultint*.

    s -- value to convert (typically a string from the command line).
    defaultint -- value returned when *s* cannot be converted.

    Returns int(s), or *defaultint* if int() raises ValueError (e.g. a
    non-numeric string) or TypeError (e.g. None).
    """
    try:
        return int(s)
    except (TypeError, ValueError):
        return defaultint
11
class Sender(object):
    """Forward lines read from a stream to a TCP server, with local fallback.

    Every chunk written to the socket must be answered by the server; if no
    answer arrives in time (or the socket fails) the connection is dropped.
    While disconnected, lines are first kept in an in-memory buffer for
    ``buffertime`` and afterwards appended to a daily recovery file
    (``YYYYMMDD_recovery.log``) inside ``backuppath``. A reconnect is
    attempted at most every ``check_every`` (10 seconds).
    """

    def __init__(self, ip, port, buffertime=0, backuppath=None, stream=sys.stdin, progname="liaalh_sender"): # TODO: Put the right name in here
        """Set up state, verify the backup directory, connect and install signal handlers.

        ip, port -- address of the server to connect to.
        buffertime -- seconds for which lines are buffered in memory after a
            connection loss; 3 seconds are subtracted as a safety margin.
        backuppath -- directory for recovery files. If given, the process
            changes its working directory to it and checks that it is
            writable. If omitted, recovery files go to the current directory.
        stream -- object with a readline() method that supplies the lines.
        progname -- name used as prefix in messages written to stderr.

        Raises OSError/IOError if backuppath cannot be entered or written to.
        """
        self.ip = ip
        self.port = port
        self.stream = stream
        self.progname = progname
        self.check_every = datetime.timedelta(seconds=10)
        # Keep a 3 second safety margin; anything up to 3 disables buffering.
        self.buffertime = datetime.timedelta(seconds=max(buffertime - 3, 0))
        self.buffer = []
        self.startedbuffering = datetime.datetime.now()
        self.s = None
        self.f = None
        # Always define the attribute so later file handling cannot hit an
        # AttributeError when no backup directory was configured.
        self.backuppath = None
        if backuppath: # test if we may write to the backuppath-directory
            # Make the path absolute *before* chdir, otherwise a relative
            # path would be resolved a second time inside the new directory.
            backuppath = os.path.abspath(backuppath)
            try:
                os.chdir(backuppath)
            except OSError:
                sys.stderr.write("Unable to enter directory %s\n" % backuppath)
                raise
            probe = os.path.join(backuppath, str(hash(datetime.datetime.now())))
            try:
                open(probe, "a").close()
                os.remove(probe)
            except (IOError, OSError):
                sys.stderr.write("No write permissions for directory %s\n" % backuppath)
                raise
            self.backuppath = backuppath
        self.create_socket()
        self.last_check_connection = datetime.datetime.now()
        signal.signal(signal.SIGALRM, self.timeout)
        for signum in (signal.SIGHUP, signal.SIGTERM, signal.SIGINT, signal.SIGQUIT):
            signal.signal(signum, self.sighandler)

    def readlines(self):
        """Yield lines from self.stream until readline() returns an empty value.

        A read interrupted by a signal (IOError with errno EINTR) is retried;
        every other IOError is propagated.
        """
        import errno # not imported at module level
        while True:
            try:
                line = self.stream.readline()
            except IOError as exc:
                if exc.errno == errno.EINTR:
                    continue
                raise
            if not line:
                break
            yield line

    def create_socket(self):
        """Connect to (self.ip, self.port) unless a socket already exists.

        Tries every address returned by getaddrinfo() and keeps the first
        socket that connects. On failure (including a failed name lookup)
        self.s stays None.
        """
        if self.s is not None:
            return
        try:
            candidates = socket.getaddrinfo(self.ip, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM)
        except socket.error:
            # Name resolution can fail exactly when the network is down;
            # treat that like an unreachable server instead of crashing.
            return
        for (af, socktype, proto, canonname, sa) in candidates:
            try:
                s = socket.socket(af, socktype, proto)
            except socket.error:
                continue
            try:
                s.connect(sa)
            except socket.error:
                s.close()
                continue
            self.s = s
            return

    def timeout(self, signum, frame):
        """SIGALRM handler: abort the pending socket operation."""
        raise socket.timeout

    def _drop_socket(self):
        """Close the current socket (if any) and forget it."""
        if self.s is not None:
            self.s.close()
            self.s = None

    def _transmit(self, data, wait):
        """Send data over self.s, waiting up to *wait* seconds for each reply.

        Returns True if all of data was sent and answered. On a timeout,
        socket error or empty reply the socket is dropped and False is
        returned.
        """
        (total, sent) = (len(data), 0)
        try:
            while sent < total: # it might be that only a part of data gets transmitted
                sent += self.s.send(data[sent:])
                signal.alarm(wait) # wait for server to notify us that it got the message
                if not self.s.recv(1024): # empty reply: peer closed the connection
                    raise socket.timeout
                signal.alarm(0)
        except (socket.timeout, socket.error):
            signal.alarm(0)
            self._drop_socket()
            return False
        return True

    def _recovery_path(self):
        """Return the path of today's recovery file."""
        directory = self.backuppath or os.curdir
        return os.path.join(directory, datetime.datetime.now().strftime("%Y%m%d_recovery.log"))

    def _write_recovery(self, lines):
        """Append lines to today's recovery file, (re)opening it if needed."""
        path = self._recovery_path()
        if self.f and self.f.name != path: # the date changed - switch files
            self.f.close()
            self.f = None
        if not self.f:
            self.f = open(path, "a", 1)
        for entry in lines:
            self.f.write(entry)

    def _resend_buffer(self):
        """Transmit buffered lines in order while still inside buffertime.

        Lines that were transmitted are removed from self.buffer. Stops at
        the first line that fails or comes too late. Returns True if the
        buffer is empty afterwards.
        """
        done = 0
        for oldline in self.buffer:
            if datetime.datetime.now() - self.startedbuffering >= self.buffertime:
                break # too late for the remaining entries
            if not self._transmit(oldline, 2):
                break # socket died (and was dropped by _transmit)
            done += 1
        self.buffer = self.buffer[done:] # the first `done` items were transmitted successfully
        return not self.buffer

    def send_socket(self, line):
        """Send line to the server; on failure hand it to send_tempfile()."""
        if self._transmit(line, 5):
            return
        self.last_check_connection = self.startedbuffering = datetime.datetime.now()
        sys.stderr.write("%s (%s): Socket connection lost, starting to log to buffer and/or recovery file\n" % (self.progname, self.startedbuffering.isoformat()))
        self.send_tempfile(line)

    def send_tempfile(self, line):
        """Store line locally and periodically try to get the connection back.

        Inside buffertime the line is appended to the memory buffer; after
        that the buffer and the line are written to the recovery file. If
        the last connection check is older than check_every, a reconnect is
        attempted and, on success, buffered lines are transmitted.
        """
        if datetime.datetime.now() - self.startedbuffering < self.buffertime:
            self.buffer.append(line)
        else: # this entry is too late - write it to a tempfile
            # buffered entries must be handled before "line"
            self._write_recovery(self.buffer + [line])
            self.buffer = []
        if datetime.datetime.now() - self.last_check_connection <= self.check_every:
            return
        self.create_socket()
        self.last_check_connection = datetime.datetime.now()
        if not self.s:
            return
        if self.f:
            self.f.close()
            self.f = None
        if not self.buffer:
            return
        if self._resend_buffer():
            # the good case: we got our connection back before the buffertime was over
            sys.stderr.write("%s (%s): Socket connection established again, all data could be sent to the server\n" % (self.progname, self.last_check_connection.isoformat()))
        else:
            # Either the socket died or entries came too late. Throw the
            # socket away so the next line comes back here and the remaining
            # buffered content is written to the recovery file.
            self._drop_socket()

    def flush_buffer(self):
        """Get rid of all buffered lines: send them if possible, else write them to the recovery file.

        Afterwards the buffer is empty and the recovery file is closed.
        """
        if not self.s:
            self.create_socket()
            self.last_check_connection = datetime.datetime.now()
        if self.s and self.buffer:
            self._resend_buffer()
        if self.buffer: # only what could not be transmitted goes to the file
            self._write_recovery(self.buffer)
            self.buffer = []
        if self.f:
            self.f.close()
            self.f = None

    def send(self):
        """Process the whole input stream, then flush leftovers and close the socket.

        If an exception occurs while processing, buffered lines are flushed
        before the exception is re-raised.
        """
        try:
            for line in self.readlines():
                if self.s:
                    self.send_socket(line)
                else:
                    self.send_tempfile(line)
        except Exception:
            if self.buffer:
                self.flush_buffer()
            raise
        # End of input: do not lose lines that are still only in memory and
        # do not leave the recovery file open.
        if self.buffer or self.f:
            self.flush_buffer()
        self._drop_socket()

    def sighandler(self, signum, frame):
        """Signal handler: flush buffered lines; exit on SIGQUIT, SIGTERM and SIGINT."""
        self.flush_buffer()
        if signum in (signal.SIGQUIT, signal.SIGTERM, signal.SIGINT):
            sys.exit(signum)
209
210
def main(args=None):
    """Command line entry point: parse arguments and run a Sender on stdin.

    args -- list of command line arguments (without the program name);
        None means sys.argv[1:].

    Returns 1 after printing a usage message if ip, port, buffertime or
    backuppath are missing or the port is not an integer; otherwise runs
    until the input is exhausted and returns None.
    """
    import optparse
    p = optparse.OptionParser(usage="usage: %prog ip port [options]")
    p.add_option("-b", "--buffertime", dest="buffertime", type="int", action="store", help="Time in seconds for which log entries are buffered, default=0. Set to 0 to disable buffering; for buffering it should be set to at least 10.", default=0)
    p.add_option("-p", "--backuppath", dest="backuppath", type="string", action="store", help="Directory where recovery logs should be stored if the network connection dies", default=None)
    # Pass args through so callers can supply their own argument list;
    # parse_args(None) falls back to sys.argv[1:].
    (options, args) = p.parse_args(args)
    port = intify(args[1], None) if len(args) == 2 else None
    if port is None or options.buffertime == 0 or options.backuppath is None:
        p.print_usage(sys.stderr)
        sys.stderr.write("%s: Please specify ip, port, a buffertime and a path for local backups!\n" % p.get_prog_name()) # required options... not so nice but necessary
        sys.stderr.flush()
        return 1
    S = Sender(args[0], port, intify(options.buffertime, 0), options.backuppath, sys.stdin, p.get_prog_name())
    S.send()
224
225
# Run as a script: use main()'s return value as the process exit status.
if __name__ == "__main__":
    exit_status = main()
    sys.exit(exit_status)
Note: See TracBrowser for help on using the browser.