root/livinglogic.python.tipimaid/liaalh_sender.py @ 82:22eec11ad852

Revision 82:22eec11ad852, 8.5 KB (checked in by Nik Tautenhahn <nik@…>, 10 years ago)

use socket.settimeout() instead of raising socket.timeout()s manually

  • Property exe set to *
Line 
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3
4import sys, socket, select, signal, datetime, os
5
6def intify(s, defaultint=0):
7    try:
8        return int(s)
9    except ValueError, exc:
10        return defaultint
11
12class Sender(object):
13    def __init__(self, ip, port, buffertime=0, backuppath=None, stream=sys.stdin, progname="liaalh_sender"): # TODO: Put the right name in here
14        self.ip = ip
15        self.port = port
16        self.stream = stream
17        self.progname = progname
18        self.check_every = datetime.timedelta(seconds=10)
19        if buffertime > 3:
20            self.buffertime = datetime.timedelta(seconds=buffertime - 3) # just to be sure...
21        else:
22            self.buffertime = datetime.timedelta(seconds=0)
23        self.buffer = []
24        self.startedbuffering = datetime.datetime.now()
25        self.s = None
26        self.f = None
27        if backuppath: # test if we may write to the backuppath-directory
28            backuppath = backuppath.rstrip(os.path.sep)
29            try:
30                os.chdir(backuppath)
31            except OSError, exc:
32                sys.stderr.write("Unable to enter directory %s\n" % backuppath)
33                raise
34            try:
35                filename = str(hash(datetime.datetime.now()))
36                f = open(os.path.join(backuppath, filename), "a")
37                f.close()
38                os.remove(os.path.join(backuppath, filename))
39            except IOError, exc:
40                sys.stderr.write("No write permissions for directory %s\n" % backuppath)
41                raise
42            self.backuppath = backuppath
43        socket.setdefaulttimeout(5) # applies only to *new* sockets so we have to set it here
44        self.create_socket()
45        self.last_check_connection = datetime.datetime.now()
46#       signal.signal(signal.SIGALRM, self.timeout)
47        signal.signal(signal.SIGHUP, self.sighandler)
48        signal.signal(signal.SIGTERM, self.sighandler)
49        signal.signal(signal.SIGINT, self.sighandler)
50        signal.signal(signal.SIGQUIT, self.sighandler)
51
52    def readlines(self):
53        while True:
54            try:
55                line = self.stream.readline()
56            except IOError, exc:
57                if exc[0] == errno.EINTR:
58                    continue
59                else:
60                    raise
61            if not line:
62                break
63            yield line
64
65    def create_socket(self):
66        if self.s is None:
67            for (af, socktype, proto, canonname, sa) in socket.getaddrinfo(self.ip, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM):
68                try:
69                    s = socket.socket(af, socktype, proto)
70                except socket.error, msg:
71                    s = None
72                    continue
73                try:
74                    s.connect(sa)
75                except socket.error, exc:
76                    s.close()
77                    s = None
78                    continue
79                break
80            self.s = s
81
82    def timeout(self, signum, frame):
83        raise socket.timeout
84
85    def send_socket(self, line):
86        (l, t) = (len(line), 0)
87        while t < l: # it might be that only a part of line gets transmitted
88            t += self.s.send(line[t:])
89            try:
90                ret = self.s.recv(1024)
91                if ret == "":
92                    raise socket.timeout
93            except (socket.timeout, socket.error), exc:
94                self.s.close()
95                self.s = None
96                self.last_check_connection = self.startedbuffering = datetime.datetime.now()
97                sys.stderr.write("%s (%s): Socket connection lost, starting to log to buffer and/or recovery file\n" % (self.progname, self.startedbuffering.isoformat()))
98                self.send_tempfile(line)
99
100    def send_tempfile(self, line):
101        if datetime.datetime.now() - self.startedbuffering < self.buffertime:
102            self.buffer.append(line)
103        else: # this entry is too late - write it to a tempfile
104            filename = os.path.join(self.backuppath, datetime.datetime.now().strftime("%Y%m%d_recovery.log"))
105            if self.f:
106                if self.f.name != filename:
107                    self.f.close()
108                    self.f = open(filename, "a", 1)
109            else:
110                self.f = open(filename, "a", 1)
111            if self.buffer: # if we have buffered entries these must be handled before "line"
112                sys.stderr.write("%s (%s): Dumping buffered data to local recovery file\n" % (self.progname, datetime.datetime.now().isoformat()))
113                for oldline in self.buffer:
114                    self.f.write(oldline)
115                self.buffer = []
116            self.f.write(line)
117        td = datetime.datetime.now() - self.last_check_connection
118        if td > self.check_every:
119            self.create_socket()
120            self.last_check_connection = datetime.datetime.now()
121            if self.s:
122                if self.f:
123                    self.f.close()
124                    self.f = None
125                if self.buffer: #  the good case: we have buffered entries and got our connection back before the buffertime was over
126                    for (i, oldline) in enumerate(self.buffer):
127                        if datetime.datetime.now() - self.startedbuffering < self.buffertime: # make sure we are still in buffertime
128                            (l, t) = (len(oldline), 0)
129                            while t < l: # it might be that only a part of line gets transmitted
130                                try:
131                                    t += self.s.send(oldline[t:])
132                                    ret = self.s.recv(1024)
133                                    if ret == "":
134                                        raise socket.timeout
135                                except (socket.timeout, socket.error), exc:
136                                    self.s.close()
137                                    self.s = None
138                                    self.buffer = self.buffer[i:] # the first i items were transmitted successfully
139                                    sys.stderr.write("%s (%s): Socket connection died again, while sending buffered data\n" % (self.progname, datetime.datetime.now().isoformat()))
140                                    return # our socket died while we were transmitting buffered items - go back to "normal" behaviour
141                        else: # some buffered entries come too late, maybe because it took so long to wait for the server's response.
142                            self.s.close() # make sure that we go to this method again to write away all buffered content which is too old
143                            self.s = None # that's why we throw away our socket
144                            self.buffer = self.buffer[i:] # and the content which was transmitted to the server in time
145                            return
146                    sys.stderr.write("%s (%s): Socket connection established again, all data could be sent to the server\n" % (self.progname, datetime.datetime.now().isoformat()))
147                    self.buffer = [] # if we arrive here everything was sent successfully
148
149    def flush_buffer(self):
150        if not self.s:
151            self.create_socket()
152            self.last_check_connection = datetime.datetime.now()
153        if self.s:
154            for (i, oldline) in enumerate(self.buffer):
155                if datetime.datetime.now() - self.startedbuffering < self.buffertime: # make sure we are still in buffertime
156                    (l, t) = (len(oldline), 0)
157                    while t < l: # it might be that only a part of line gets transmitted
158                        try:
159                            t += self.s.send(oldline[t:])
160                            ret = self.s.recv(1024)
161                            if ret == "":
162                                raise socket.timeout
163                        except (socket.timeout, socket.error), exc:
164                            self.s.close()
165                            self.s = None
166                            self.buffer = self.buffer[i:] # the first i items were transmitted successfully
167                            break
168                    else:
169                        continue # if we didn't break in the while loop, continue for loop
170                    break # otherwise break the for loop, too
171                else: # some buffered entries come too late, maybe because it took so long to wait for the server's response.
172                    self.buffer = self.buffer[i:]
173                    break
174            else: # we didn't break the for loop so all data was transmitted successfully
175                self.buffer = []
176                if self.f:
177                    self.f.close()
178                    self.f = None
179                return
180        if self.buffer:
181            filename = os.path.join(self.backuppath, datetime.datetime.now().strftime("%Y%m%d_recovery.log"))
182            if self.f:
183                if self.f.name != filename:
184                    self.f.close()
185                    self.f = open(filename, "a", 1)
186            else:
187                self.f = open(filename, "a", 1)
188            for oldline in self.buffer:
189                self.f.write(oldline)
190            self.buffer = []
191            self.f.close()
192            self.f = None
193
194    def send(self):
195        try:
196            for line in self.readlines():
197                if self.s:
198                    self.send_socket(line)
199                else:
200                    self.send_tempfile(line)
201            if self.s:
202                self.s.close()
203        except Exception, exc:
204            if self.buffer:
205                self.flush_buffer()
206            raise
207
208    def sighandler(self, signum, frame):
209        self.flush_buffer()
210        if signum in (signal.SIGQUIT, signal.SIGTERM, signal.SIGINT):
211            sys.exit(signum)
212
213
214def main(args=None):
215    import optparse
216    p = optparse.OptionParser(usage="usage: %prog ip port [options]")
217    p.add_option("-b", "--buffertime", dest="buffertime", type="int", action="store", help="Time in seconds for which log entries are buffered, default=0. Set to 0 to disable buffering; for buffering it should be set to at least 10.", default=0)
218    p.add_option("-p", "--backuppath", dest="backuppath", type="string", action="store", help="Directory where recovery logs should be stored if the network connection dies", default=None)
219    (options, args) = p.parse_args()
220    if len(args) != 2 or intify(args[1], None) is None or options.buffertime == 0 or options.backuppath is None:
221        p.print_usage(sys.stderr)
222        sys.stderr.write("%s: Please specify ip, port, a buffertime and a path for local backups!\n" % p.get_prog_name()) # required options... not so nice but necessary
223        sys.stderr.flush()
224        return 1
225    S = Sender(args[0], intify(args[1], None), intify(options.buffertime, 0), options.backuppath, sys.stdin, p.get_prog_name())
226    S.send()
227
228
229if __name__ == "__main__":
230    sys.exit(main())
Note: See TracBrowser for help on using the browser.