Commit | Line | Data |
---|---|---|
c638d827 CR |
1 | #$Id: back_postgresql.py,v 1.44 2008-08-07 05:50:03 richard Exp $ |
2 | # | |
3 | # Copyright (c) 2003 Martynas Sklyzmantas, Andrey Lebedev <andrey@micro.lt> | |
4 | # | |
5 | # This module is free software, and you may redistribute it and/or modify | |
6 | # under the same terms as Python, so long as this copyright message and | |
7 | # disclaimer are retained in their original form. | |
8 | # | |
9 | '''Postgresql backend via psycopg for Roundup.''' | |
10 | __docformat__ = 'restructuredtext' | |
11 | ||
12 | import os, shutil, time | |
13 | try: | |
14 | import psycopg | |
15 | from psycopg import QuotedString | |
16 | from psycopg import ProgrammingError | |
17 | except: | |
18 | from psycopg2 import psycopg1 as psycopg | |
19 | from psycopg2.extensions import QuotedString | |
20 | from psycopg2.psycopg1 import ProgrammingError | |
21 | import logging | |
22 | ||
23 | from roundup import hyperdb, date | |
24 | from roundup.backends import rdbms_common | |
25 | from roundup.backends import sessions_rdbms | |
26 | ||
27 | def connection_dict(config, dbnamestr=None): | |
28 | ''' read_default_group is MySQL-specific, ignore it ''' | |
29 | d = rdbms_common.connection_dict(config, dbnamestr) | |
30 | if 'read_default_group' in d: | |
31 | del d['read_default_group'] | |
32 | if 'read_default_file' in d: | |
33 | del d['read_default_file'] | |
34 | return d | |
35 | ||
36 | def db_create(config): | |
37 | """Clear all database contents and drop database itself""" | |
38 | command = "CREATE DATABASE %s WITH ENCODING='UNICODE'"%config.RDBMS_NAME | |
39 | logging.getLogger('hyperdb').info(command) | |
40 | db_command(config, command) | |
41 | ||
42 | def db_nuke(config, fail_ok=0): | |
43 | """Clear all database contents and drop database itself""" | |
44 | command = 'DROP DATABASE %s'% config.RDBMS_NAME | |
45 | logging.getLogger('hyperdb').info(command) | |
46 | db_command(config, command) | |
47 | ||
48 | if os.path.exists(config.DATABASE): | |
49 | shutil.rmtree(config.DATABASE) | |
50 | ||
51 | def db_command(config, command): | |
52 | '''Perform some sort of database-level command. Retry 10 times if we | |
53 | fail by conflicting with another user. | |
54 | ''' | |
55 | template1 = connection_dict(config) | |
56 | template1['database'] = 'template1' | |
57 | ||
58 | try: | |
59 | conn = psycopg.connect(**template1) | |
60 | except psycopg.OperationalError, message: | |
61 | raise hyperdb.DatabaseError(message) | |
62 | ||
63 | conn.set_isolation_level(0) | |
64 | cursor = conn.cursor() | |
65 | try: | |
66 | for n in range(10): | |
67 | if pg_command(cursor, command): | |
68 | return | |
69 | finally: | |
70 | conn.close() | |
71 | raise RuntimeError('10 attempts to create database failed') | |
72 | ||
73 | def pg_command(cursor, command): | |
74 | '''Execute the postgresql command, which may be blocked by some other | |
75 | user connecting to the database, and return a true value if it succeeds. | |
76 | ||
77 | If there is a concurrent update, retry the command. | |
78 | ''' | |
79 | try: | |
80 | cursor.execute(command) | |
81 | except psycopg.ProgrammingError, err: | |
82 | response = str(err).split('\n')[0] | |
83 | if response.find('FATAL') != -1: | |
84 | raise RuntimeError(response) | |
85 | else: | |
86 | msgs = [ | |
87 | 'is being accessed by other users', | |
88 | 'could not serialize access due to concurrent update', | |
89 | ] | |
90 | can_retry = 0 | |
91 | for msg in msgs: | |
92 | if response.find(msg) == -1: | |
93 | can_retry = 1 | |
94 | if can_retry: | |
95 | time.sleep(1) | |
96 | return 0 | |
97 | raise RuntimeError(response) | |
98 | return 1 | |
99 | ||
100 | def db_exists(config): | |
101 | """Check if database already exists""" | |
102 | db = connection_dict(config, 'database') | |
103 | try: | |
104 | conn = psycopg.connect(**db) | |
105 | conn.close() | |
106 | return 1 | |
107 | except: | |
108 | return 0 | |
109 | ||
110 | class Sessions(sessions_rdbms.Sessions): | |
111 | def set(self, *args, **kwargs): | |
112 | try: | |
113 | sessions_rdbms.Sessions.set(self, *args, **kwargs) | |
114 | except ProgrammingError, err: | |
115 | response = str(err).split('\n')[0] | |
116 | if -1 != response.find('ERROR') and \ | |
117 | -1 != response.find('could not serialize access due to concurrent update'): | |
118 | # another client just updated, and we're running on | |
119 | # serializable isolation. | |
120 | # see http://www.postgresql.org/docs/7.4/interactive/transaction-iso.html | |
121 | self.db.rollback() | |
122 | ||
123 | class Database(rdbms_common.Database): | |
124 | arg = '%s' | |
125 | ||
126 | # used by some code to switch styles of query | |
127 | implements_intersect = 1 | |
128 | ||
129 | def getSessionManager(self): | |
130 | return Sessions(self) | |
131 | ||
132 | def sql_open_connection(self): | |
133 | db = connection_dict(self.config, 'database') | |
134 | logging.getLogger('hyperdb').info('open database %r'%db['database']) | |
135 | try: | |
136 | conn = psycopg.connect(**db) | |
137 | except psycopg.OperationalError, message: | |
138 | raise hyperdb.DatabaseError(message) | |
139 | ||
140 | cursor = conn.cursor() | |
141 | ||
142 | return (conn, cursor) | |
143 | ||
144 | def open_connection(self): | |
145 | if not db_exists(self.config): | |
146 | db_create(self.config) | |
147 | ||
148 | self.conn, self.cursor = self.sql_open_connection() | |
149 | ||
150 | try: | |
151 | self.load_dbschema() | |
152 | except psycopg.ProgrammingError, message: | |
153 | if str(message).find('schema') == -1: | |
154 | raise | |
155 | self.rollback() | |
156 | self.init_dbschema() | |
157 | self.sql("CREATE TABLE schema (schema TEXT)") | |
158 | self.sql("CREATE TABLE dual (dummy integer)") | |
159 | self.sql("insert into dual values (1)") | |
160 | self.create_version_2_tables() | |
161 | ||
162 | def create_version_2_tables(self): | |
163 | # OTK store | |
164 | self.sql('''CREATE TABLE otks (otk_key VARCHAR(255), | |
165 | otk_value TEXT, otk_time REAL)''') | |
166 | self.sql('CREATE INDEX otks_key_idx ON otks(otk_key)') | |
167 | ||
168 | # Sessions store | |
169 | self.sql('''CREATE TABLE sessions ( | |
170 | session_key VARCHAR(255), session_time REAL, | |
171 | session_value TEXT)''') | |
172 | self.sql('''CREATE INDEX sessions_key_idx ON | |
173 | sessions(session_key)''') | |
174 | ||
175 | # full-text indexing store | |
176 | self.sql('CREATE SEQUENCE ___textids_ids') | |
177 | self.sql('''CREATE TABLE __textids ( | |
178 | _textid integer primary key, _class VARCHAR(255), | |
179 | _itemid VARCHAR(255), _prop VARCHAR(255))''') | |
180 | self.sql('''CREATE TABLE __words (_word VARCHAR(30), | |
181 | _textid integer)''') | |
182 | self.sql('CREATE INDEX words_word_idx ON __words(_word)') | |
183 | self.sql('CREATE INDEX words_by_id ON __words (_textid)') | |
184 | self.sql('CREATE UNIQUE INDEX __textids_by_props ON ' | |
185 | '__textids (_class, _itemid, _prop)') | |
186 | ||
187 | def fix_version_2_tables(self): | |
188 | # Convert journal date column to TIMESTAMP, params column to TEXT | |
189 | self._convert_journal_tables() | |
190 | ||
191 | # Convert all String properties to TEXT | |
192 | self._convert_string_properties() | |
193 | ||
194 | # convert session / OTK *_time columns to REAL | |
195 | for name in ('otk', 'session'): | |
196 | self.sql('drop index %ss_key_idx'%name) | |
197 | self.sql('drop table %ss'%name) | |
198 | self.sql('''CREATE TABLE %ss (%s_key VARCHAR(255), | |
199 | %s_value VARCHAR(255), %s_time REAL)'''%(name, name, name, | |
200 | name)) | |
201 | self.sql('CREATE INDEX %ss_key_idx ON %ss(%s_key)'%(name, name, | |
202 | name)) | |
203 | ||
204 | def fix_version_3_tables(self): | |
205 | rdbms_common.Database.fix_version_3_tables(self) | |
206 | self.sql('''CREATE INDEX words_both_idx ON public.__words | |
207 | USING btree (_word, _textid)''') | |
208 | ||
209 | def add_actor_column(self): | |
210 | # update existing tables to have the new actor column | |
211 | tables = self.database_schema['tables'] | |
212 | for name in tables: | |
213 | self.sql('ALTER TABLE _%s add __actor VARCHAR(255)'%name) | |
214 | ||
215 | def __repr__(self): | |
216 | return '<roundpsycopgsql 0x%x>' % id(self) | |
217 | ||
218 | def sql_commit(self, fail_ok=False): | |
219 | ''' Actually commit to the database. | |
220 | ''' | |
221 | logging.getLogger('hyperdb').info('commit') | |
222 | ||
223 | try: | |
224 | self.conn.commit() | |
225 | except psycopg.ProgrammingError, message: | |
226 | # we've been instructed that this commit is allowed to fail | |
227 | if fail_ok and str(message).endswith('could not serialize ' | |
228 | 'access due to concurrent update'): | |
229 | logging.getLogger('hyperdb').info('commit FAILED, but fail_ok') | |
230 | else: | |
231 | raise | |
232 | ||
233 | # open a new cursor for subsequent work | |
234 | self.cursor = self.conn.cursor() | |
235 | ||
236 | def sql_stringquote(self, value): | |
237 | ''' psycopg.QuotedString returns a "buffer" object with the | |
238 | single-quotes around it... ''' | |
239 | return str(QuotedString(str(value)))[1:-1] | |
240 | ||
241 | def sql_index_exists(self, table_name, index_name): | |
242 | sql = 'select count(*) from pg_indexes where ' \ | |
243 | 'tablename=%s and indexname=%s'%(self.arg, self.arg) | |
244 | self.sql(sql, (table_name, index_name)) | |
245 | return self.cursor.fetchone()[0] | |
246 | ||
247 | def create_class_table(self, spec, create_sequence=1): | |
248 | if create_sequence: | |
249 | sql = 'CREATE SEQUENCE _%s_ids'%spec.classname | |
250 | self.sql(sql) | |
251 | ||
252 | return rdbms_common.Database.create_class_table(self, spec) | |
253 | ||
254 | def drop_class_table(self, cn): | |
255 | sql = 'drop table _%s'%cn | |
256 | self.sql(sql) | |
257 | ||
258 | sql = 'drop sequence _%s_ids'%cn | |
259 | self.sql(sql) | |
260 | ||
261 | def newid(self, classname): | |
262 | sql = "select nextval('_%s_ids') from dual"%classname | |
263 | self.sql(sql) | |
264 | return str(self.cursor.fetchone()[0]) | |
265 | ||
266 | def setid(self, classname, setid): | |
267 | sql = "select setval('_%s_ids', %s) from dual"%(classname, int(setid)) | |
268 | self.sql(sql) | |
269 | ||
270 | def clear(self): | |
271 | rdbms_common.Database.clear(self) | |
272 | ||
273 | # reset the sequences | |
274 | for cn in self.classes: | |
275 | self.cursor.execute('DROP SEQUENCE _%s_ids'%cn) | |
276 | self.cursor.execute('CREATE SEQUENCE _%s_ids'%cn) | |
277 | ||
278 | class PostgresqlClass: | |
279 | order_by_null_values = '(%s is not NULL)' | |
280 | ||
281 | class Class(PostgresqlClass, rdbms_common.Class): | |
282 | pass | |
283 | class IssueClass(PostgresqlClass, rdbms_common.IssueClass): | |
284 | pass | |
285 | class FileClass(PostgresqlClass, rdbms_common.FileClass): | |
286 | pass | |
287 | ||
288 | # vim: set et sts=4 sw=4 : |