root/livinglogic.python.tipimaid/liaalh_sender.py @ 81:2ba82a3a2b6d

Revision 81:2ba82a3a2b6d, 8.9 KB (checked in by Walter Doerwald <walter@…>, 10 years ago)

Whitespace.

  • Property exe set to *
Line 
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3
4import sys, socket, select, signal, datetime, os
5# TODO: What happens if we have to send in multiple chunks and bad things happen after one but before the last chunk? ATM this would result in liaalh-hiccup :-(
6def intify(s, defaultint=0):
7    try:
8        return int(s)
9    except ValueError, exc:
10        return defaultint
11
12class Sender(object):
13    def __init__(self, ip, port, buffertime=0, backuppath=None, stream=sys.stdin, progname="liaalh_sender"): # TODO: Put the right name in here
14        self.ip = ip
15        self.port = port
16        self.stream = stream
17        self.progname = progname
18        self.check_every = datetime.timedelta(seconds=10)
19        if buffertime > 3:
20            self.buffertime = datetime.timedelta(seconds=buffertime - 3) # just to be sure...
21        else:
22            self.buffertime = datetime.timedelta(seconds=0)
23        self.buffer = []
24        self.startedbuffering = datetime.datetime.now()
25        self.s = None
26        self.f = None
27        if backuppath: # test if we may write to the backuppath-directory
28            backuppath = backuppath.rstrip(os.path.sep)
29            try:
30                os.chdir(backuppath)
31            except OSError, exc:
32                sys.stderr.write("Unable to enter directory %s\n" % backuppath)
33                raise
34            try:
35                filename = str(hash(datetime.datetime.now()))
36                f = open(os.path.join(backuppath, filename), "a")
37                f.close()
38                os.remove(os.path.join(backuppath, filename))
39            except IOError, exc:
40                sys.stderr.write("No write permissions for directory %s\n" % backuppath)
41                raise
42            self.backuppath = backuppath
43        self.create_socket()
44        self.last_check_connection = datetime.datetime.now()
45        signal.signal(signal.SIGALRM, self.timeout)
46        signal.signal(signal.SIGHUP, self.sighandler)
47        signal.signal(signal.SIGTERM, self.sighandler)
48        signal.signal(signal.SIGINT, self.sighandler)
49        signal.signal(signal.SIGQUIT, self.sighandler)
50
51    def readlines(self):
52        while True:
53            try:
54                line = self.stream.readline()
55            except IOError, exc:
56                if exc[0] == errno.EINTR:
57                    continue
58                else:
59                    raise
60            if not line:
61                break
62            yield line
63
64    def create_socket(self):
65        if self.s is None:
66            for (af, socktype, proto, canonname, sa) in socket.getaddrinfo(self.ip, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM):
67                try:
68                    s = socket.socket(af, socktype, proto)
69                except socket.error, msg:
70                    s = None
71                    continue
72                try:
73                    s.connect(sa)
74                except socket.error, exc:
75                    s.close()
76                    s = None
77                    continue
78                break
79            self.s = s
80
81    def timeout(self, signum, frame):
82        raise socket.timeout
83
84    def send_socket(self, line):
85        (l, t) = (len(line), 0)
86        while t < l: # it might be that only a part of line gets transmitted
87            t += self.s.send(line[t:])
88            try:
89                signal.alarm(5) # wait 5 seconds for server to notify us that it got the message
90                ret = self.s.recv(1024)
91                if ret == "":
92                    raise socket.timeout
93            except (socket.timeout, socket.error), exc:
94                signal.alarm(0)
95                self.s.close()
96                self.s = None
97                self.last_check_connection = self.startedbuffering = datetime.datetime.now()
98                sys.stderr.write("%s (%s): Socket connection lost, starting to log to buffer and/or recovery file\n" % (self.progname, self.startedbuffering.isoformat()))
99                self.send_tempfile(line)
100            signal.alarm(0)
101
102    def send_tempfile(self, line):
103        if datetime.datetime.now() - self.startedbuffering < self.buffertime:
104            self.buffer.append(line)
105        else: # this entry is too late - write it to a tempfile
106            filename = os.path.join(self.backuppath, datetime.datetime.now().strftime("%Y%m%d_recovery.log"))
107            if self.f:
108                if self.f.name != filename:
109                    self.f.close()
110                    self.f = open(filename, "a", 1)
111            else:
112                self.f = open(filename, "a", 1)
113            if self.buffer: # if we have buffered entries these must be handled before "line"
114                sys.stderr.write("%s (%s): Dumping buffered data to local recovery file\n" % (self.progname, datetime.datetime.now().isoformat()))
115                for oldline in self.buffer:
116                    self.f.write(oldline)
117                self.buffer = []
118            self.f.write(line)
119        td = datetime.datetime.now() - self.last_check_connection
120        if td > self.check_every:
121            self.create_socket()
122            self.last_check_connection = datetime.datetime.now()
123            if self.s:
124                if self.f:
125                    self.f.close()
126                    self.f = None
127                if self.buffer: #  the good case: we have buffered entries and got our connection back before the buffertime was over
128                    for (i, oldline) in enumerate(self.buffer):
129                        if datetime.datetime.now() - self.startedbuffering < self.buffertime: # make sure we are still in buffertime
130                            (l, t) = (len(oldline), 0)
131                            while t < l: # it might be that only a part of line gets transmitted
132                                try:
133                                    t += self.s.send(oldline[t:])
134                                    signal.alarm(2) # wait 2 seconds for server to notify us that it got the message
135                                    ret = self.s.recv(1024)
136                                    if ret == "":
137                                        raise socket.timeout
138                                except (socket.timeout, socket.error), exc:
139                                    signal.alarm(0)
140                                    self.s.close()
141                                    self.s = None
142                                    self.buffer = self.buffer[i:] # the first i items were transmitted successfully
143                                    sys.stderr.write("%s (%s): Socket connection died again, while sending buffered data\n" % (self.progname, datetime.datetime.now().isoformat()))
144                                    return # our socket died while we were transmitting buffered items - go back to "normal" behaviour
145                                signal.alarm(0)
146                        else: # some buffered entries come too late, maybe because it took so long to wait for the server's response.
147                            self.s.close() # make sure that we go to this method again to write away all buffered content which is too old
148                            self.s = None # that's why we throw away our socket
149                            self.buffer = self.buffer[i:] # and the content which was transmitted to the server in time
150                            return
151                    sys.stderr.write("%s (%s): Socket connection established again, all data could be sent to the server\n" % (self.progname, datetime.datetime.now().isoformat()))
152                    self.buffer = [] # if we arrive here everything was sent successfully
153
154    def flush_buffer(self):
155        if not self.s:
156            self.create_socket()
157            self.last_check_connection = datetime.datetime.now()
158        if self.s:
159            for (i, oldline) in enumerate(self.buffer):
160                if datetime.datetime.now() - self.startedbuffering < self.buffertime: # make sure we are still in buffertime
161                    (l, t) = (len(oldline), 0)
162                    while t < l: # it might be that only a part of line gets transmitted
163                        try:
164                            t += self.s.send(oldline[t:])
165                            signal.alarm(2) # wait 2 seconds for server to notify us that it got the message
166                            ret = self.s.recv(1024)
167                            if ret == "":
168                                raise socket.timeout
169                            signal.alarm(0)
170                        except (socket.timeout, socket.error), exc:
171                            signal.alarm(0)
172                            self.s.close()
173                            self.s = None
174                            self.buffer = self.buffer[i:] # the first i items were transmitted successfully
175                            break
176                    else:
177                        continue # if we didn't break in the while loop, continue for loop
178                    break # otherwise break the for loop, too
179                else: # some buffered entries come too late, maybe because it took so long to wait for the server's response.
180                    self.buffer = self.buffer[i:]
181                    break
182            else: # we didn't break the for loop so all data was transmitted successfully
183                self.buffer = []
184                if self.f:
185                    self.f.close()
186                    self.f = None
187                return
188        if self.buffer:
189            filename = os.path.join(self.backuppath, datetime.datetime.now().strftime("%Y%m%d_recovery.log"))
190            if self.f:
191                if self.f.name != filename:
192                    self.f.close()
193                    self.f = open(filename, "a", 1)
194            else:
195                self.f = open(filename, "a", 1)
196            for oldline in self.buffer:
197                self.f.write(oldline)
198            self.buffer = []
199            self.f.close()
200            self.f = None
201
202    def send(self):
203        try:
204            for line in self.readlines():
205                if self.s:
206                    self.send_socket(line)
207                else:
208                    self.send_tempfile(line)
209            if self.s:
210                self.s.close()
211        except Exception, exc:
212            if self.buffer:
213                self.flush_buffer()
214            raise
215
216    def sighandler(self, signum, frame):
217        self.flush_buffer()
218        if signum in (signal.SIGQUIT, signal.SIGTERM, signal.SIGINT):
219            sys.exit(signum)
220
221
222def main(args=None):
223    import optparse
224    p = optparse.OptionParser(usage="usage: %prog ip port [options]")
225    p.add_option("-b", "--buffertime", dest="buffertime", type="int", action="store", help="Time in seconds for which log entries are buffered, default=0. Set to 0 to disable buffering; for buffering it should be set to at least 10.", default=0)
226    p.add_option("-p", "--backuppath", dest="backuppath", type="string", action="store", help="Directory where recovery logs should be stored if the network connection dies", default=None)
227    (options, args) = p.parse_args()
228    if len(args) != 2 or intify(args[1], None) is None or options.buffertime== 0 or options.backuppath is None:
229        p.print_usage(sys.stderr)
230        sys.stderr.write("%s: Please specify ip, port, a buffertime and a path for local backups!\n" % p.get_prog_name()) # required options... not so nice but necessary
231        sys.stderr.flush()
232        return 1
233    S = Sender(args[0], intify(args[1], None), intify(options.buffertime, 0), options.backuppath, sys.stdin, p.get_prog_name())
234    S.send()
235
236
237if __name__ == "__main__":
238    sys.exit(main())
Note: See TracBrowser for help on using the browser.