Premiere version : mise en route du suivi.
[auf_roundup.git] / roundup / backends / .svn / text-base / back_postgresql.py.svn-base
1 #$Id: back_postgresql.py,v 1.44 2008-08-07 05:50:03 richard Exp $
2 #
3 # Copyright (c) 2003 Martynas Sklyzmantas, Andrey Lebedev <andrey@micro.lt>
4 #
5 # This module is free software, and you may redistribute it and/or modify
6 # under the same terms as Python, so long as this copyright message and
7 # disclaimer are retained in their original form.
8 #
9 '''Postgresql backend via psycopg for Roundup.'''
10 __docformat__ = 'restructuredtext'
11
12 import os, shutil, time
13 try:
14     import psycopg
15     from psycopg import QuotedString
16     from psycopg import ProgrammingError
17 except:
18     from psycopg2 import psycopg1 as psycopg
19     from psycopg2.extensions import QuotedString
20     from psycopg2.psycopg1 import ProgrammingError
21 import logging
22
23 from roundup import hyperdb, date
24 from roundup.backends import rdbms_common
25 from roundup.backends import sessions_rdbms
26
27 def connection_dict(config, dbnamestr=None):
28     ''' read_default_group is MySQL-specific, ignore it '''
29     d = rdbms_common.connection_dict(config, dbnamestr)
30     if 'read_default_group' in d:
31         del d['read_default_group']
32     if 'read_default_file' in d:
33         del d['read_default_file']
34     return d
35
36 def db_create(config):
37     """Clear all database contents and drop database itself"""
38     command = "CREATE DATABASE %s WITH ENCODING='UNICODE'"%config.RDBMS_NAME
39     logging.getLogger('hyperdb').info(command)
40     db_command(config, command)
41
42 def db_nuke(config, fail_ok=0):
43     """Clear all database contents and drop database itself"""
44     command = 'DROP DATABASE %s'% config.RDBMS_NAME
45     logging.getLogger('hyperdb').info(command)
46     db_command(config, command)
47
48     if os.path.exists(config.DATABASE):
49         shutil.rmtree(config.DATABASE)
50
51 def db_command(config, command):
52     '''Perform some sort of database-level command. Retry 10 times if we
53     fail by conflicting with another user.
54     '''
55     template1 = connection_dict(config)
56     template1['database'] = 'template1'
57
58     try:
59         conn = psycopg.connect(**template1)
60     except psycopg.OperationalError, message:
61         raise hyperdb.DatabaseError(message)
62
63     conn.set_isolation_level(0)
64     cursor = conn.cursor()
65     try:
66         for n in range(10):
67             if pg_command(cursor, command):
68                 return
69     finally:
70         conn.close()
71     raise RuntimeError('10 attempts to create database failed')
72
73 def pg_command(cursor, command):
74     '''Execute the postgresql command, which may be blocked by some other
75     user connecting to the database, and return a true value if it succeeds.
76
77     If there is a concurrent update, retry the command.
78     '''
79     try:
80         cursor.execute(command)
81     except psycopg.ProgrammingError, err:
82         response = str(err).split('\n')[0]
83         if response.find('FATAL') != -1:
84             raise RuntimeError(response)
85         else:
86             msgs = [
87                 'is being accessed by other users',
88                 'could not serialize access due to concurrent update',
89             ]
90             can_retry = 0
91             for msg in msgs:
92                 if response.find(msg) == -1:
93                     can_retry = 1
94             if can_retry:
95                 time.sleep(1)
96                 return 0
97             raise RuntimeError(response)
98     return 1
99
100 def db_exists(config):
101     """Check if database already exists"""
102     db = connection_dict(config, 'database')
103     try:
104         conn = psycopg.connect(**db)
105         conn.close()
106         return 1
107     except:
108         return 0
109
110 class Sessions(sessions_rdbms.Sessions):
111     def set(self, *args, **kwargs):
112         try:
113             sessions_rdbms.Sessions.set(self, *args, **kwargs)
114         except ProgrammingError, err:
115             response = str(err).split('\n')[0]
116             if -1 != response.find('ERROR') and \
117                -1 != response.find('could not serialize access due to concurrent update'):
118                 # another client just updated, and we're running on
119                 # serializable isolation.
120                 # see http://www.postgresql.org/docs/7.4/interactive/transaction-iso.html
121                 self.db.rollback()
122
123 class Database(rdbms_common.Database):
124     arg = '%s'
125
126     # used by some code to switch styles of query
127     implements_intersect = 1
128
129     def getSessionManager(self):
130         return Sessions(self)
131
132     def sql_open_connection(self):
133         db = connection_dict(self.config, 'database')
134         logging.getLogger('hyperdb').info('open database %r'%db['database'])
135         try:
136             conn = psycopg.connect(**db)
137         except psycopg.OperationalError, message:
138             raise hyperdb.DatabaseError(message)
139
140         cursor = conn.cursor()
141
142         return (conn, cursor)
143
144     def open_connection(self):
145         if not db_exists(self.config):
146             db_create(self.config)
147
148         self.conn, self.cursor = self.sql_open_connection()
149
150         try:
151             self.load_dbschema()
152         except psycopg.ProgrammingError, message:
153             if str(message).find('schema') == -1:
154                 raise
155             self.rollback()
156             self.init_dbschema()
157             self.sql("CREATE TABLE schema (schema TEXT)")
158             self.sql("CREATE TABLE dual (dummy integer)")
159             self.sql("insert into dual values (1)")
160             self.create_version_2_tables()
161
162     def create_version_2_tables(self):
163         # OTK store
164         self.sql('''CREATE TABLE otks (otk_key VARCHAR(255),
165             otk_value TEXT, otk_time REAL)''')
166         self.sql('CREATE INDEX otks_key_idx ON otks(otk_key)')
167
168         # Sessions store
169         self.sql('''CREATE TABLE sessions (
170             session_key VARCHAR(255), session_time REAL,
171             session_value TEXT)''')
172         self.sql('''CREATE INDEX sessions_key_idx ON
173             sessions(session_key)''')
174
175         # full-text indexing store
176         self.sql('CREATE SEQUENCE ___textids_ids')
177         self.sql('''CREATE TABLE __textids (
178             _textid integer primary key, _class VARCHAR(255),
179             _itemid VARCHAR(255), _prop VARCHAR(255))''')
180         self.sql('''CREATE TABLE __words (_word VARCHAR(30),
181             _textid integer)''')
182         self.sql('CREATE INDEX words_word_idx ON __words(_word)')
183         self.sql('CREATE INDEX words_by_id ON __words (_textid)')
184         self.sql('CREATE UNIQUE INDEX __textids_by_props ON '
185                  '__textids (_class, _itemid, _prop)')
186
187     def fix_version_2_tables(self):
188         # Convert journal date column to TIMESTAMP, params column to TEXT
189         self._convert_journal_tables()
190
191         # Convert all String properties to TEXT
192         self._convert_string_properties()
193
194         # convert session / OTK *_time columns to REAL
195         for name in ('otk', 'session'):
196             self.sql('drop index %ss_key_idx'%name)
197             self.sql('drop table %ss'%name)
198             self.sql('''CREATE TABLE %ss (%s_key VARCHAR(255),
199                 %s_value VARCHAR(255), %s_time REAL)'''%(name, name, name,
200                 name))
201             self.sql('CREATE INDEX %ss_key_idx ON %ss(%s_key)'%(name, name,
202                 name))
203
204     def fix_version_3_tables(self):
205         rdbms_common.Database.fix_version_3_tables(self)
206         self.sql('''CREATE INDEX words_both_idx ON public.__words
207             USING btree (_word, _textid)''')
208
209     def add_actor_column(self):
210         # update existing tables to have the new actor column
211         tables = self.database_schema['tables']
212         for name in tables:
213             self.sql('ALTER TABLE _%s add __actor VARCHAR(255)'%name)
214
215     def __repr__(self):
216         return '<roundpsycopgsql 0x%x>' % id(self)
217
218     def sql_commit(self, fail_ok=False):
219         ''' Actually commit to the database.
220         '''
221         logging.getLogger('hyperdb').info('commit')
222
223         try:
224             self.conn.commit()
225         except psycopg.ProgrammingError, message:
226             # we've been instructed that this commit is allowed to fail
227             if fail_ok and str(message).endswith('could not serialize '
228                     'access due to concurrent update'):
229                 logging.getLogger('hyperdb').info('commit FAILED, but fail_ok')
230             else:
231                 raise
232
233         # open a new cursor for subsequent work
234         self.cursor = self.conn.cursor()
235
236     def sql_stringquote(self, value):
237         ''' psycopg.QuotedString returns a "buffer" object with the
238             single-quotes around it... '''
239         return str(QuotedString(str(value)))[1:-1]
240
241     def sql_index_exists(self, table_name, index_name):
242         sql = 'select count(*) from pg_indexes where ' \
243             'tablename=%s and indexname=%s'%(self.arg, self.arg)
244         self.sql(sql, (table_name, index_name))
245         return self.cursor.fetchone()[0]
246
247     def create_class_table(self, spec, create_sequence=1):
248         if create_sequence:
249             sql = 'CREATE SEQUENCE _%s_ids'%spec.classname
250             self.sql(sql)
251
252         return rdbms_common.Database.create_class_table(self, spec)
253
254     def drop_class_table(self, cn):
255         sql = 'drop table _%s'%cn
256         self.sql(sql)
257
258         sql = 'drop sequence _%s_ids'%cn
259         self.sql(sql)
260
261     def newid(self, classname):
262         sql = "select nextval('_%s_ids') from dual"%classname
263         self.sql(sql)
264         return str(self.cursor.fetchone()[0])
265
266     def setid(self, classname, setid):
267         sql = "select setval('_%s_ids', %s) from dual"%(classname, int(setid))
268         self.sql(sql)
269
270     def clear(self):
271         rdbms_common.Database.clear(self)
272
273         # reset the sequences
274         for cn in self.classes:
275             self.cursor.execute('DROP SEQUENCE _%s_ids'%cn)
276             self.cursor.execute('CREATE SEQUENCE _%s_ids'%cn)
277
278 class PostgresqlClass:
279     order_by_null_values = '(%s is not NULL)'
280
281 class Class(PostgresqlClass, rdbms_common.Class):
282     pass
283 class IssueClass(PostgresqlClass, rdbms_common.IssueClass):
284     pass
285 class FileClass(PostgresqlClass, rdbms_common.FileClass):
286     pass
287
288 # vim: set et sts=4 sw=4 :