root/livinglogic.python.xist/test/test_orasql.py @ 3732:6d390fd934e0

Revision 3732:6d390fd934e0, 7.5 KB (checked in by Walter Doerwald <walter@…>, 11 years ago)

Merge orasql into xist. Drop unneccessary make actions. Add proper oracle URLs to ll.url.

Line 
1#! /usr/bin/env/python
2# -*- coding: utf-8 -*-
3
4## Copyright 2004-2009 by LivingLogic AG, Bayreuth/Germany.
5## Copyright 2004-2009 by Walter Dörwald
6##
7## All Rights Reserved
8##
9## See orasql/__init__.py for the license
10
11
12import sys, os, datetime
13
14import py.test
15
16from ll import orasql
17from ll.orasql.scripts import oracreate, oradrop, oradiff, oramerge, oragrant, orafind
18
19
20dbname = os.environ.get("LL_ORASQL_TEST_CONNECT") # Need a connectstring as environment var
21
22
23if dbname:
24    # here all objects are collected, so we don't need to call :meth:`iterobjects` multiple times
25    objlist = []
26    objdict = {}
27
28    def setup_module(module):
29        db = orasql.connect(dbname)
30        # get all definitions
31        # (this tests that :meth:`iterobjects`, :meth:`iterreferences` and :meth:`iterreferencedby` run to completion)
32        module.objdict = {}
33        for obj in db.iterobjects():
34            if obj.owner is None:
35                module.objlist.append(obj)
36                references = [o for o in obj.iterreferences() if o.owner is None]
37                referencedby = [o for o in obj.iterreferencedby() if o.owner is None]
38                module.objdict[obj] = (references, referencedby)
39
40
41    def teardown_module(module):
42        module.objlist = []
43        module.objdict = {}
44
45
46    def test_connect():
47        db = orasql.connect(dbname)
48        assert isinstance(db, orasql.Connection)
49
50
51    def test_connection_connectstring():
52        db = orasql.connect(dbname)
53        user = dbname.split("/")[0]
54        name = dbname.split("@")[1]
55        assert "%s@%s" % (user, name) == db.connectstring()
56
57
58    def test_connection_iterschema():
59        db = orasql.connect(dbname)
60        list(db.iterschema())
61
62
63    def test_connection_itertables():
64        db = orasql.connect(dbname)
65        list(db.itertables())
66
67
68    def test_connection_iterfks():
69        db = orasql.connect(dbname)
70        list(db.iterfks())
71
72
73    def test_connection_iterprivileges():
74        db = orasql.connect(dbname)
75        list(db.iterprivileges())
76
77
78    def test_referenceconsistency():
79        for (obj, (references, referencedby)) in objdict.iteritems():
80            for refobj in references:
81                # check that :meth:`iterobjects` returned everything from this schema
82                assert refobj.owner is not None or refobj in objdict
83                # check that the referenced object points back to this one (via referencedby)
84                if refobj.owner is None:
85                    assert obj in objdict[refobj][1]
86
87            # do the inverted check
88            for refobj in referencedby:
89                assert refobj.owner is not None or refobj in objdict
90                if refobj.owner is None:
91                    assert obj in objdict[refobj][0]
92
93
94    def test_ddl():
95        # check various ddl methods
96        for obj in objdict:
97            obj.createddl()
98            if isinstance(obj, orasql.Sequence):
99                obj.createddlcopy()
100            obj.dropddl()
101            if isinstance(obj, orasql.ForeignKey):
102                obj.enableddl()
103                obj.disableddl()
104
105
106    def test_repr():
107        # check that each repr method works
108        for obj in objdict:
109            repr(obj)
110
111
112    def test_cudate():
113        # check that cdate/udate method works
114        for obj in objdict:
115            cdate = obj.cdate()
116            assert cdate is None or isinstance(cdate, datetime.datetime)
117            udate = obj.udate()
118            assert udate is None or isinstance(udate, datetime.datetime)
119
120
121    def test_table_columns():
122        for obj in objdict:
123            if isinstance(obj, orasql.Table):
124                for col in obj.itercolumns():
125                    # comments are not output by :meth:`iterobjects`, so we have to call :meth:`iterreferences`
126                    assert obj in col.iterreferences()
127                    # check various methods
128                    # calling :meth:`modifyddl` doesn't make sense
129                    col.addddl()
130                    col.dropddl()
131                    col.cdate()
132                    col.udate()
133                    col.datatype()
134                    col.default()
135                    col.nullable()
136                    col.comment()
137
138
139    def test_table_comments():
140        for obj in objdict:
141            if isinstance(obj, orasql.Table):
142                # comments are output by :meth:`iterobjects`, but not for materialized views
143                if obj.ismview():
144                    for com in obj.itercomments():
145                        assert obj in com.iterreferences()
146                else:
147                    for com in obj.itercomments():
148                        assert obj in objdict[com][0]
149
150
151    def test_table_constraints():
152        for obj in objdict:
153            if isinstance(obj, orasql.Table):
154                for con in obj.iterconstraints():
155                    assert obj in objdict[con][0]
156
157
158    def test_table_records():
159        for obj in objdict:
160            if isinstance(obj, orasql.Table):
161                # fetch only a few records
162                for (i, rec) in enumerate(obj.iterrecords()):
163                    if i >= 5:
164                        break
165
166
167    def test_table_mview():
168        for obj in objdict:
169            if isinstance(obj, orasql.Table):
170                assert (obj.mview() is not None) == obj.ismview()
171
172
173    def test_constraints():
174        for obj in objdict:
175            if isinstance(obj, orasql.Constraint):
176                obj.table()
177                if isinstance(obj, orasql.ForeignKey):
178                    obj.pk()
179
180
181    def test_procedure_arguments():
182        for obj in objdict:
183            if isinstance(obj, orasql.Procedure):
184                list(obj.iterarguments())
185
186
187    def test_procedure_nonexistant():
188        db = orasql.connect(dbname)
189        py.test.raises(orasql.SQLObjectNotFoundError, orasql.Procedure("DOESNOTEXIST"), db.cursor())
190
191
192    def test_createorder():
193        # check that the default output order of :meth:`iterobjects` (i.e. create order) works
194        done = set()
195        for obj in objlist:
196            for refobj in objdict[obj][0]:
197                print obj, refobj
198                assert refobj in done
199            done.add(obj)
200
201
202    class BitBucket(object):
203        def write(self, text):
204            pass
205
206
207    def withbitbucket(func):
208        def decorator(*args, **kwargs):
209            oldstdout = sys.stdout
210            oldstderr = sys.stderr
211            bitbucket = BitBucket()
212            try:
213                sys.stdout = bitbucket
214                sys.stderr = bitbucket
215                return func(*args, **kwargs)
216            finally:
217                sys.stdout = oldstdout
218                sys.stderr = oldstderr
219        return decorator
220
221
222    @withbitbucket
223    def test_scripts_oracreate():
224        # Test oracreate without executing anything
225        args = "--color yes --verbose --seqcopy %s" % dbname
226        oracreate.main(args.split())
227
228
229    @withbitbucket
230    def test_scripts_oradrop():
231        # Test oradrop without executing anything
232        args = "--color yes --verbose %s" % dbname
233        oradrop.main(args.split())
234
235
236    @withbitbucket
237    def test_scripts_oradiff():
238        # Test oradiff (not really: we will not get any differences)
239        allargs = [
240            "--color yes --verbose %s %s" % (dbname, dbname),
241            "--color yes --verbose %s %s -mfull" % (dbname, dbname),
242        ]
243        for args in allargs:
244            oradiff.main(args.split())
245
246
247    @withbitbucket
248    def test_scripts_oramerge():
249        # Test oramerge (not really: we will not get any differences)
250        args = "--color yes --verbose %s %s %s" % (dbname, dbname, dbname)
251        oramerge.main(args.split())
252
253
254    @withbitbucket
255    def test_scripts_oragrant():
256        # Test oragrant
257        args = "--color yes %s" % dbname
258        oragrant.main(args.split())
259
260
261    @withbitbucket
262    def test_scripts_orafind():
263        # Test orafind
264        args = "--ignore-case --color yes %s foo" % dbname
265        orafind.main(args.split())
266
267
268    def test_callprocedure():
269        db = orasql.connect(dbname, readlobs=True)
270        proc = db.getobject("orasql_testprocedure")
271        result = proc(db.cursor(), c_user=u"py.test", p_in=u"abcÀöÌ", p_inout=u"abc"*10000)
272        assert result.p_in == u"abcÀöÌ"
273        assert result.p_out == u"ABCÄÖÜ"
274        assert result.p_inout == u"ABC"*10000 + u"abcÀöÌ"
275
276
277    def test_callfunction():
278        db = orasql.connect(dbname, readlobs=True)
279        func = db.getobject("orasql_testfunction")
280        (result, args) = func(db.cursor(), c_user=u"py.test", p_in=u"abcÀöÌ", p_inout=u"abc"*10000)
281        assert result == u"ABCÄÖÜ"
282        assert args.p_in == u"abcÀöÌ"
283        assert args.p_out == u"ABCÄÖÜ"
284        assert args.p_inout == u"ABC"*10000 + u"abcÀöÌ"
285
286
287    def test_fetch():
288        for obj in objdict:
289            if isinstance(obj, orasql.Table):
290                # fetch only a few records
291                db = orasql.connect(dbname)
292                c = db.cursor()
293                c.execute("select * from %s" % obj.name)
294                c.readlobs = False
295                c.fetchone()
296                c.execute("select * from %s" % obj.name)
297                c.readlobs = True
298                c.fetchone()
299                break
300   
Note: See TracBrowser for help on using the browser.