root/livinglogic.python.tipimaid/liaalh_sender.py @ 83:ea8737f6934e

Revision 83:ea8737f6934e, 8.4 KB (checked in by Nik Tautenhahn <nik@…>, 10 years ago)

cosmetics (remove unneeded function, comments)

  • Property exe set to *
Line 
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3
4import sys, socket, select, signal, datetime, os
5
6def intify(s, defaultint=0):
7    try:
8        return int(s)
9    except ValueError, exc:
10        return defaultint
11
12class Sender(object):
13    def __init__(self, ip, port, buffertime=0, backuppath=None, stream=sys.stdin, progname="liaalh_sender"): # TODO: Put the right name in here
14        self.ip = ip
15        self.port = port
16        self.stream = stream
17        self.progname = progname
18        self.check_every = datetime.timedelta(seconds=10)
19        if buffertime > 3:
20            self.buffertime = datetime.timedelta(seconds=buffertime - 3) # just to be sure...
21        else:
22            self.buffertime = datetime.timedelta(seconds=0)
23        self.buffer = []
24        self.startedbuffering = datetime.datetime.now()
25        self.s = None
26        self.f = None
27        if backuppath: # test if we may write to the backuppath-directory
28            backuppath = backuppath.rstrip(os.path.sep)
29            try:
30                os.chdir(backuppath)
31            except OSError, exc:
32                sys.stderr.write("Unable to enter directory %s\n" % backuppath)
33                raise
34            try:
35                filename = str(hash(datetime.datetime.now()))
36                f = open(os.path.join(backuppath, filename), "a")
37                f.close()
38                os.remove(os.path.join(backuppath, filename))
39            except IOError, exc:
40                sys.stderr.write("No write permissions for directory %s\n" % backuppath)
41                raise
42            self.backuppath = backuppath
43        socket.setdefaulttimeout(5) # applies only to *new* sockets so we have to set it here
44        self.create_socket()
45        self.last_check_connection = datetime.datetime.now()
46        signal.signal(signal.SIGHUP, self.sighandler)
47        signal.signal(signal.SIGTERM, self.sighandler)
48        signal.signal(signal.SIGINT, self.sighandler)
49        signal.signal(signal.SIGQUIT, self.sighandler)
50
51    def readlines(self):
52        while True:
53            try:
54                line = self.stream.readline()
55            except IOError, exc:
56                if exc[0] == errno.EINTR:
57                    continue
58                else:
59                    raise
60            if not line:
61                break
62            yield line
63
64    def create_socket(self):
65        if self.s is None:
66            for (af, socktype, proto, canonname, sa) in socket.getaddrinfo(self.ip, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM):
67                try:
68                    s = socket.socket(af, socktype, proto)
69                except socket.error, msg:
70                    s = None
71                    continue
72                try:
73                    s.connect(sa)
74                except socket.error, exc:
75                    s.close()
76                    s = None
77                    continue
78                break
79            self.s = s
80
81    def send_socket(self, line):
82        (l, t) = (len(line), 0)
83        while t < l: # it might be that only a part of line gets transmitted
84            t += self.s.send(line[t:])
85            try:
86                ret = self.s.recv(1024)
87                if ret == "":
88                    raise socket.timeout
89            except (socket.timeout, socket.error), exc:
90                self.s.close()
91                self.s = None
92                self.last_check_connection = self.startedbuffering = datetime.datetime.now()
93                sys.stderr.write("%s (%s): Socket connection lost, starting to log to buffer and/or recovery file\n" % (self.progname, self.startedbuffering.isoformat()))
94                self.send_tempfile(line)
95
96    def send_tempfile(self, line):
97        if datetime.datetime.now() - self.startedbuffering < self.buffertime:
98            self.buffer.append(line)
99        else: # this entry is too late - write it to a tempfile
100            filename = os.path.join(self.backuppath, datetime.datetime.now().strftime("%Y%m%d_recovery.log"))
101            if self.f:
102                if self.f.name != filename:
103                    self.f.close()
104                    self.f = open(filename, "a", 1)
105            else:
106                self.f = open(filename, "a", 1)
107            if self.buffer: # if we have buffered entries these must be handled before "line"
108                sys.stderr.write("%s (%s): Dumping buffered data to local recovery file\n" % (self.progname, datetime.datetime.now().isoformat()))
109                for oldline in self.buffer:
110                    self.f.write(oldline)
111                self.buffer = []
112            self.f.write(line)
113        td = datetime.datetime.now() - self.last_check_connection
114        if td > self.check_every:
115            self.create_socket()
116            self.last_check_connection = datetime.datetime.now()
117            if self.s:
118                if self.f:
119                    self.f.close()
120                    self.f = None
121                if self.buffer: #  the good case: we have buffered entries and got our connection back before the buffertime was over
122                    for (i, oldline) in enumerate(self.buffer):
123                        if datetime.datetime.now() - self.startedbuffering < self.buffertime: # make sure we are still in buffertime
124                            (l, t) = (len(oldline), 0)
125                            while t < l: # it might be that only a part of line gets transmitted
126                                try:
127                                    t += self.s.send(oldline[t:])
128                                    ret = self.s.recv(1024)
129                                    if ret == "":
130                                        raise socket.timeout
131                                except (socket.timeout, socket.error), exc:
132                                    self.s.close()
133                                    self.s = None
134                                    self.buffer = self.buffer[i:] # the first i items were transmitted successfully
135                                    sys.stderr.write("%s (%s): Socket connection died again, while sending buffered data\n" % (self.progname, datetime.datetime.now().isoformat()))
136                                    return # our socket died while we were transmitting buffered items - go back to "normal" behaviour
137                        else: # some buffered entries come too late, maybe because it took so long to wait for the server's response.
138                            self.s.close() # make sure that we go to this method again to write away all buffered content which is too old
139                            self.s = None # that's why we throw away our socket
140                            self.buffer = self.buffer[i:] # and the content which was transmitted to the server in time
141                            return
142                    sys.stderr.write("%s (%s): Socket connection established again, all data could be sent to the server\n" % (self.progname, datetime.datetime.now().isoformat()))
143                    self.buffer = [] # if we arrive here everything was sent successfully
144
145    def flush_buffer(self):
146        if not self.s:
147            self.create_socket()
148            self.last_check_connection = datetime.datetime.now()
149        if self.s:
150            for (i, oldline) in enumerate(self.buffer):
151                if datetime.datetime.now() - self.startedbuffering < self.buffertime: # make sure we are still in buffertime
152                    (l, t) = (len(oldline), 0)
153                    while t < l: # it might be that only a part of line gets transmitted
154                        try:
155                            t += self.s.send(oldline[t:])
156                            ret = self.s.recv(1024)
157                            if ret == "":
158                                raise socket.timeout
159                        except (socket.timeout, socket.error), exc:
160                            self.s.close()
161                            self.s = None
162                            self.buffer = self.buffer[i:] # the first i items were transmitted successfully
163                            break
164                    else:
165                        continue # if we didn't break in the while loop, continue for loop
166                    break # otherwise break the for loop, too
167                else: # some buffered entries come too late, maybe because it took so long to wait for the server's response.
168                    self.buffer = self.buffer[i:]
169                    break
170            else: # we didn't break the for loop so all data was transmitted successfully
171                self.buffer = []
172                if self.f:
173                    self.f.close()
174                    self.f = None
175                return
176        if self.buffer:
177            filename = os.path.join(self.backuppath, datetime.datetime.now().strftime("%Y%m%d_recovery.log"))
178            if self.f:
179                if self.f.name != filename:
180                    self.f.close()
181                    self.f = open(filename, "a", 1)
182            else:
183                self.f = open(filename, "a", 1)
184            for oldline in self.buffer:
185                self.f.write(oldline)
186            self.buffer = []
187            self.f.close()
188            self.f = None
189
190    def send(self):
191        try:
192            for line in self.readlines():
193                if self.s:
194                    self.send_socket(line)
195                else:
196                    self.send_tempfile(line)
197            if self.s:
198                self.s.close()
199        except Exception, exc:
200            if self.buffer:
201                self.flush_buffer()
202            raise
203
204    def sighandler(self, signum, frame):
205        self.flush_buffer()
206        if signum in (signal.SIGQUIT, signal.SIGTERM, signal.SIGINT):
207            sys.exit(signum)
208
209
210def main(args=None):
211    import optparse
212    p = optparse.OptionParser(usage="usage: %prog ip port [options]")
213    p.add_option("-b", "--buffertime", dest="buffertime", type="int", action="store", help="Time in seconds for which log entries are buffered, default=0. Set to 0 to disable buffering; for buffering it should be set to at least 10.", default=0)
214    p.add_option("-p", "--backuppath", dest="backuppath", type="string", action="store", help="Directory where recovery logs should be stored if the network connection dies", default=None)
215    (options, args) = p.parse_args()
216    if len(args) != 2 or intify(args[1], None) is None or options.buffertime == 0 or options.backuppath is None:
217        p.print_usage(sys.stderr)
218        sys.stderr.write("%s: Please specify ip, port, a buffertime and a path for local backups!\n" % p.get_prog_name()) # required options... not so nice but necessary
219        sys.stderr.flush()
220        return 1
221    S = Sender(args[0], intify(args[1], None), intify(options.buffertime, 0), options.backuppath, sys.stdin, p.get_prog_name())
222    S.send()
223
224
225if __name__ == "__main__":
226    sys.exit(main())
Note: See TracBrowser for help on using the browser.