Premiere version : mise en route du suivi.
[auf_roundup.git] / build / lib / roundup / backends / back_postgresql.py
1 #$Id: back_postgresql.py,v 1.44 2008-08-07 05:50:03 richard Exp $
2 #
3 # Copyright (c) 2003 Martynas Sklyzmantas, Andrey Lebedev <andrey@micro.lt>
4 #
5 # This module is free software, and you may redistribute it and/or modify
6 # under the same terms as Python, so long as this copyright message and
7 # disclaimer are retained in their original form.
8 #
9 '''Postgresql backend via psycopg for Roundup.'''
10 __docformat__ = 'restructuredtext'
11
12 import os, shutil, time
13 try:
14 import psycopg
15 from psycopg import QuotedString
16 from psycopg import ProgrammingError
17 except:
18 from psycopg2 import psycopg1 as psycopg
19 from psycopg2.extensions import QuotedString
20 from psycopg2.psycopg1 import ProgrammingError
21 import logging
22
23 from roundup import hyperdb, date
24 from roundup.backends import rdbms_common
25 from roundup.backends import sessions_rdbms
26
27 def connection_dict(config, dbnamestr=None):
28 ''' read_default_group is MySQL-specific, ignore it '''
29 d = rdbms_common.connection_dict(config, dbnamestr)
30 if 'read_default_group' in d:
31 del d['read_default_group']
32 if 'read_default_file' in d:
33 del d['read_default_file']
34 return d
35
36 def db_create(config):
37 """Clear all database contents and drop database itself"""
38 command = "CREATE DATABASE %s WITH ENCODING='UNICODE'"%config.RDBMS_NAME
39 logging.getLogger('hyperdb').info(command)
40 db_command(config, command)
41
42 def db_nuke(config, fail_ok=0):
43 """Clear all database contents and drop database itself"""
44 command = 'DROP DATABASE %s'% config.RDBMS_NAME
45 logging.getLogger('hyperdb').info(command)
46 db_command(config, command)
47
48 if os.path.exists(config.DATABASE):
49 shutil.rmtree(config.DATABASE)
50
51 def db_command(config, command):
52 '''Perform some sort of database-level command. Retry 10 times if we
53 fail by conflicting with another user.
54 '''
55 template1 = connection_dict(config)
56 template1['database'] = 'template1'
57
58 try:
59 conn = psycopg.connect(**template1)
60 except psycopg.OperationalError, message:
61 raise hyperdb.DatabaseError(message)
62
63 conn.set_isolation_level(0)
64 cursor = conn.cursor()
65 try:
66 for n in range(10):
67 if pg_command(cursor, command):
68 return
69 finally:
70 conn.close()
71 raise RuntimeError('10 attempts to create database failed')
72
73 def pg_command(cursor, command):
74 '''Execute the postgresql command, which may be blocked by some other
75 user connecting to the database, and return a true value if it succeeds.
76
77 If there is a concurrent update, retry the command.
78 '''
79 try:
80 cursor.execute(command)
81 except psycopg.ProgrammingError, err:
82 response = str(err).split('\n')[0]
83 if response.find('FATAL') != -1:
84 raise RuntimeError(response)
85 else:
86 msgs = [
87 'is being accessed by other users',
88 'could not serialize access due to concurrent update',
89 ]
90 can_retry = 0
91 for msg in msgs:
92 if response.find(msg) == -1:
93 can_retry = 1
94 if can_retry:
95 time.sleep(1)
96 return 0
97 raise RuntimeError(response)
98 return 1
99
100 def db_exists(config):
101 """Check if database already exists"""
102 db = connection_dict(config, 'database')
103 try:
104 conn = psycopg.connect(**db)
105 conn.close()
106 return 1
107 except:
108 return 0
109
110 class Sessions(sessions_rdbms.Sessions):
111 def set(self, *args, **kwargs):
112 try:
113 sessions_rdbms.Sessions.set(self, *args, **kwargs)
114 except ProgrammingError, err:
115 response = str(err).split('\n')[0]
116 if -1 != response.find('ERROR') and \
117 -1 != response.find('could not serialize access due to concurrent update'):
118 # another client just updated, and we're running on
119 # serializable isolation.
120 # see http://www.postgresql.org/docs/7.4/interactive/transaction-iso.html
121 self.db.rollback()
122
123 class Database(rdbms_common.Database):
124 arg = '%s'
125
126 # used by some code to switch styles of query
127 implements_intersect = 1
128
129 def getSessionManager(self):
130 return Sessions(self)
131
132 def sql_open_connection(self):
133 db = connection_dict(self.config, 'database')
134 logging.getLogger('hyperdb').info('open database %r'%db['database'])
135 try:
136 conn = psycopg.connect(**db)
137 except psycopg.OperationalError, message:
138 raise hyperdb.DatabaseError(message)
139
140 cursor = conn.cursor()
141
142 return (conn, cursor)
143
144 def open_connection(self):
145 if not db_exists(self.config):
146 db_create(self.config)
147
148 self.conn, self.cursor = self.sql_open_connection()
149
150 try:
151 self.load_dbschema()
152 except psycopg.ProgrammingError, message:
153 if str(message).find('schema') == -1:
154 raise
155 self.rollback()
156 self.init_dbschema()
157 self.sql("CREATE TABLE schema (schema TEXT)")
158 self.sql("CREATE TABLE dual (dummy integer)")
159 self.sql("insert into dual values (1)")
160 self.create_version_2_tables()
161
162 def create_version_2_tables(self):
163 # OTK store
164 self.sql('''CREATE TABLE otks (otk_key VARCHAR(255),
165 otk_value TEXT, otk_time REAL)''')
166 self.sql('CREATE INDEX otks_key_idx ON otks(otk_key)')
167
168 # Sessions store
169 self.sql('''CREATE TABLE sessions (
170 session_key VARCHAR(255), session_time REAL,
171 session_value TEXT)''')
172 self.sql('''CREATE INDEX sessions_key_idx ON
173 sessions(session_key)''')
174
175 # full-text indexing store
176 self.sql('CREATE SEQUENCE ___textids_ids')
177 self.sql('''CREATE TABLE __textids (
178 _textid integer primary key, _class VARCHAR(255),
179 _itemid VARCHAR(255), _prop VARCHAR(255))''')
180 self.sql('''CREATE TABLE __words (_word VARCHAR(30),
181 _textid integer)''')
182 self.sql('CREATE INDEX words_word_idx ON __words(_word)')
183 self.sql('CREATE INDEX words_by_id ON __words (_textid)')
184 self.sql('CREATE UNIQUE INDEX __textids_by_props ON '
185 '__textids (_class, _itemid, _prop)')
186
187 def fix_version_2_tables(self):
188 # Convert journal date column to TIMESTAMP, params column to TEXT
189 self._convert_journal_tables()
190
191 # Convert all String properties to TEXT
192 self._convert_string_properties()
193
194 # convert session / OTK *_time columns to REAL
195 for name in ('otk', 'session'):
196 self.sql('drop index %ss_key_idx'%name)
197 self.sql('drop table %ss'%name)
198 self.sql('''CREATE TABLE %ss (%s_key VARCHAR(255),
199 %s_value VARCHAR(255), %s_time REAL)'''%(name, name, name,
200 name))
201 self.sql('CREATE INDEX %ss_key_idx ON %ss(%s_key)'%(name, name,
202 name))
203
204 def fix_version_3_tables(self):
205 rdbms_common.Database.fix_version_3_tables(self)
206 self.sql('''CREATE INDEX words_both_idx ON public.__words
207 USING btree (_word, _textid)''')
208
209 def add_actor_column(self):
210 # update existing tables to have the new actor column
211 tables = self.database_schema['tables']
212 for name in tables:
213 self.sql('ALTER TABLE _%s add __actor VARCHAR(255)'%name)
214
215 def __repr__(self):
216 return '<roundpsycopgsql 0x%x>' % id(self)
217
218 def sql_commit(self, fail_ok=False):
219 ''' Actually commit to the database.
220 '''
221 logging.getLogger('hyperdb').info('commit')
222
223 try:
224 self.conn.commit()
225 except psycopg.ProgrammingError, message:
226 # we've been instructed that this commit is allowed to fail
227 if fail_ok and str(message).endswith('could not serialize '
228 'access due to concurrent update'):
229 logging.getLogger('hyperdb').info('commit FAILED, but fail_ok')
230 else:
231 raise
232
233 # open a new cursor for subsequent work
234 self.cursor = self.conn.cursor()
235
236 def sql_stringquote(self, value):
237 ''' psycopg.QuotedString returns a "buffer" object with the
238 single-quotes around it... '''
239 return str(QuotedString(str(value)))[1:-1]
240
241 def sql_index_exists(self, table_name, index_name):
242 sql = 'select count(*) from pg_indexes where ' \
243 'tablename=%s and indexname=%s'%(self.arg, self.arg)
244 self.sql(sql, (table_name, index_name))
245 return self.cursor.fetchone()[0]
246
247 def create_class_table(self, spec, create_sequence=1):
248 if create_sequence:
249 sql = 'CREATE SEQUENCE _%s_ids'%spec.classname
250 self.sql(sql)
251
252 return rdbms_common.Database.create_class_table(self, spec)
253
254 def drop_class_table(self, cn):
255 sql = 'drop table _%s'%cn
256 self.sql(sql)
257
258 sql = 'drop sequence _%s_ids'%cn
259 self.sql(sql)
260
261 def newid(self, classname):
262 sql = "select nextval('_%s_ids') from dual"%classname
263 self.sql(sql)
264 return str(self.cursor.fetchone()[0])
265
266 def setid(self, classname, setid):
267 sql = "select setval('_%s_ids', %s) from dual"%(classname, int(setid))
268 self.sql(sql)
269
270 def clear(self):
271 rdbms_common.Database.clear(self)
272
273 # reset the sequences
274 for cn in self.classes:
275 self.cursor.execute('DROP SEQUENCE _%s_ids'%cn)
276 self.cursor.execute('CREATE SEQUENCE _%s_ids'%cn)
277
278 class PostgresqlClass:
279 order_by_null_values = '(%s is not NULL)'
280
281 class Class(PostgresqlClass, rdbms_common.Class):
282 pass
283 class IssueClass(PostgresqlClass, rdbms_common.IssueClass):
284 pass
285 class FileClass(PostgresqlClass, rdbms_common.FileClass):
286 pass
287
288 # vim: set et sts=4 sw=4 :