Installation du nouveau QBE ici
[auf_rh_dae.git] / src / qbe / django_qbe / forms.py
CommitLineData
5cf90361
PP
1# -*- coding: utf-8 -*-
2from django import forms
3from django.db import connections
4from django.db.models import get_model
5from django.db.models.fields import Field
6from django.core.urlresolvers import reverse, NoReverseMatch
7from django.conf import settings
8from django.forms.formsets import BaseFormSet, formset_factory
9from django.utils.importlib import import_module
10from django.utils.translation import ugettext as _
11
12from django_qbe.utils import get_models
13from django_qbe.widgets import CriteriaInput
14
15
# Resolve the database configuration once, at import time.
try:
    DATABASES = settings.DATABASES
except AttributeError:
    # Backwards compatibility: settings without a DATABASES dictionary only
    # expose DATABASE_ENGINE / DATABASE_NAME, so wrap them in the same shape.
    DATABASES = {
        'default': dict(
            ENGINE="django.db.backends.%s" % settings.DATABASE_ENGINE,
            NAME=settings.DATABASE_NAME,
        ),
    }

# (value, label) pairs offered by the "sort" field of a query row.
SORT_CHOICES = (("", ""),
                ("asc", _("Ascending")),
                ("des", _("Descending")))
33
34
class QueryByExampleForm(forms.Form):
    """
    One row of the query-by-example grid.

    A row names a model and one of its fields and optionally asks for the
    field to be shown, filtered by a criteria and/or used for sorting.
    """
    show = forms.BooleanField(label=_("Show"), required=False)
    model = forms.CharField(label=_("Model"))
    field = forms.CharField(label=_("Field"))
    criteria = forms.CharField(label=_("Criteria"), required=False)
    sort = forms.ChoiceField(label=_("Sort"), choices=SORT_CHOICES,
                             required=False)

    def __init__(self, *args, **kwargs):
        """
        Build the form and swap in the widgets used by the QBE grid.

        The "sort" and "criteria" widgets start disabled. The CSS classes
        set here ("qbeFillModels", "qbeFillFields", "enable:...") are
        presumably read by the client-side script -- TODO confirm.
        """
        super(QueryByExampleForm, self).__init__(*args, **kwargs)
        model_widget = forms.Select(attrs={'class': "qbeFillModels to:field"})
        self.fields['model'].widget = model_widget
        sort_widget = forms.Select(attrs={'disabled': "disabled",
                                          'class': 'submitIfChecked'},
                                   choices=SORT_CHOICES)
        self.fields['sort'].widget = sort_widget
        criteria_widget = CriteriaInput(attrs={'disabled': "disabled"})
        self.fields['criteria'].widget = criteria_widget
        # When the criteria widget exposes sub-widgets, each one gets its
        # own "criteria_<n>" name in the list of inputs to enable.
        criteria_widgets = getattr(criteria_widget, "widgets", [])
        if criteria_widgets:
            criteria_names = ",".join(
                "criteria_%s" % index
                for index in range(len(criteria_widgets)))
            field_attr_class = "qbeFillFields enable:sort,%s" % criteria_names
        else:
            field_attr_class = "qbeFillFields enable:sort,criteria"
        field_widget = forms.Select(attrs={'class': field_attr_class})
        self.fields['field'].widget = field_widget

    def clean_model(self):
        """Return the model label lower-cased, with dots turned into "_"."""
        model = self.cleaned_data['model']
        return model.lower().replace(".", "_")

    def clean_criteria(self):
        """
        Parse the submitted criteria into an ``(operator, over)`` pair.

        The value is expected to be the textual form of a two-item sequence.
        Anything that cannot be parsed or unpacked into exactly two items
        yields ``(None, None)``.
        """
        # Imported locally so this class does not depend on a change to the
        # module's import block.
        import ast
        criteria = self.cleaned_data['criteria']
        try:
            # SECURITY: this used to be eval(criteria, {}, {}), which runs
            # arbitrary expressions from the request (empty globals still
            # expose the builtins). literal_eval only accepts literals.
            operator, over = ast.literal_eval(criteria)
        except Exception:
            # Best effort on purpose: a malformed criteria means "none".
            return (None, None)
        return (operator, over)
75
76
class BaseQueryByExampleFormSet(BaseFormSet):
    """
    Formset that assembles and runs a raw SQL query from its rows.

    ``clean()`` stores the pieces of the query (selected columns, tables,
    conditions, ordering and parameters) on the instance; the ``get_*``
    methods then build the SQL text, execute it and format the results.
    """
    # Class-level defaults. ``__init__`` gives every instance its own
    # containers so that no state is shared between formsets.
    _selects = []
    _froms = []
    _wheres = []
    _sorts = []
    _params = []
    _models = {}
    _raw_query = None
    _db_alias = "default"
    _db_operators = {}
    _db_table_names = []
    _db_operations = None

    def __init__(self, *args, **kwargs):
        """
        Set up database helpers, then initialise the formset.

        Accepts an extra ``using`` keyword argument (default ``"default"``)
        naming the database alias to query; it is removed from ``kwargs``
        before they are handed to ``BaseFormSet``. An alias missing from
        ``DATABASES`` falls back to ``"default"`` for the connection and the
        engine, while ``_db_alias`` keeps the value that was passed.
        """
        self._db_alias = kwargs.pop("using", "default")
        if self._db_alias in DATABASES:
            alias = self._db_alias
        else:
            alias = "default"
        # Use the connection of the same alias the engine modules are
        # loaded for, so operations and introspection match the database.
        self._db_connection = connections[alias]
        database_properties = DATABASES[alias]
        # Per-instance containers: the class-level ones would otherwise be
        # shared (``_models`` is filled in place by get_query_parts).
        self._selects = []
        self._froms = []
        self._wheres = []
        self._sorts = []
        self._params = []
        self._models = {}
        module = database_properties['ENGINE']
        base_mod = intros_mod = None
        try:
            base_mod = import_module("%s.base" % module)
            intros_mod = import_module("%s.introspection" % module)
        except ImportError:
            # Best effort: without the backend modules the class-level
            # defaults for operators/operations/table names stay in place.
            pass
        if base_mod and intros_mod:
            self._db_operators = base_mod.DatabaseWrapper.operators
            DatabaseOperations = base_mod.DatabaseOperations
            try:
                self._db_operations = DatabaseOperations(self._db_connection)
            except TypeError:
                # Some engines have no params to instance DatabaseOperations
                self._db_operations = DatabaseOperations()
            intros_db = intros_mod.DatabaseIntrospection(self._db_connection)
            django_table_names = intros_db.django_table_names()
            table_names = intros_db.table_names()
            self._db_table_names = list(
                set(django_table_names).union(table_names))
        super(BaseQueryByExampleFormSet, self).__init__(*args, **kwargs)

    def clean(self):
        """
        Check that at least one field is selected and store the query parts.

        Raises ``forms.ValidationError`` when no row contributes a column to
        the SELECT list.
        """
        if any(self.errors):
            # Don't bother validating the formset unless each form is valid on
            # its own
            return
        selects, froms, wheres, sorts, params = self.get_query_parts()
        if not selects:
            validation_message = _(u"At least you must check a row to get.")
            raise forms.ValidationError(validation_message)
        self._selects = selects
        self._froms = froms
        self._wheres = wheres
        self._sorts = sorts
        self._params = params

    def get_query_parts(self):
        """
        Build the pieces of the SQL query from the cleaned rows.

        Returns a ``(selects, froms, wheres, sorts, params)`` tuple of lists.
        As a side effect, ``self._models`` maps the table name of every
        recognised model to its model class.
        """
        selects = []
        froms = []
        wheres = []
        sorts = []
        params = []
        app_models = []
        app_model_labels = None
        lookup_cast = self._db_operations.lookup_cast
        qn = self._db_operations.quote_name
        uqn = self._unquote_name
        for data in self.cleaned_data:
            if not ("model" in data and "field" in data):
                # Processing stops at the first row lacking a model/field.
                break
            model = data["model"]
            # HACK: Workaround to handle tables created
            #       by django for its own
            if not app_model_labels:
                app_models = get_models(include_auto_created=True,
                                        include_deferred=True)
                app_model_labels = [u"%s_%s" % (a._meta.app_label,
                                                a._meta.module_name)
                                    for a in app_models]
            if model in app_model_labels:
                position = app_model_labels.index(model)
                model = app_models[position]._meta.db_table
                self._models[model] = app_models[position]
            field = data["field"]
            show = data["show"]
            criteria = data["criteria"]
            sort = data["sort"]
            db_field = u"%s.%s" % (qn(model), qn(field))
            operator, over = criteria
            # The operator is None when the criteria could not be parsed.
            is_join = (hasattr(operator, "lower")
                       and operator.lower() == 'join')
            if show and not is_join:
                selects.append(db_field)
            if sort:
                # "des" is the value of the "Descending" sort choice.
                if sort == "des":
                    sorts.append(u"%s DESC" % db_field)
                else:
                    sorts.append(db_field)
            if all(criteria):
                if is_join:
                    join_parts = self._get_join(model, field, over)
                    if join_parts is not None:
                        join_model, join = join_parts
                        if (join not in wheres
                                and uqn(join_model) in self._db_table_names):
                            wheres.append(join)
                            if join_model not in froms:
                                froms.append(join_model)
                elif operator in self._db_operators:
                    db_operator = self._db_operators[operator]
                    lookup = self._get_lookup(operator, over)
                    params.append(lookup)
                    wheres.append(u"%s %s"
                                  % (lookup_cast(operator) % db_field,
                                     db_operator))
            if qn(model) not in froms and model in self._db_table_names:
                froms.append(qn(model))
        return selects, froms, wheres, sorts, params

    def _get_join(self, model, field, over):
        """
        Return ``(quoted_join_table, condition)`` for a join row.

        ``over`` names the target as ``<model label>.<field>``; dots in the
        model label become underscores to form the table name. Returns
        ``None`` when ``over`` is not text of that shape.
        """
        qn = self._db_operations.quote_name
        if not hasattr(over, "lower"):
            return None
        over_split = over.lower().rsplit(".", 1)
        if len(over_split) != 2:
            return None
        join_model = qn(over_split[0].replace(".", "_"))
        join_field = qn(over_split[1])
        if model in self._models:
            _field = self._models[model]._meta.get_field(field)
            # db_column is only set when declared explicitly on the field.
            column = _field.db_column or _field.column
        else:
            # Unknown table: guess the "<field>_id" column. The suffix goes
            # inside the quotes so the identifier stays valid.
            column = u"%s_id" % field
        condition = u"%s.%s = %s.%s" % (join_model, join_field,
                                        qn(model), qn(column))
        return join_model, condition

    def get_raw_query(self, limit=None, offset=None, count=False,
                      add_extra_ids=False, add_params=False):
        """
        Return the SQL text built from the stored query parts.

        ``limit`` / ``offset`` add LIMIT / OFFSET clauses when they convert
        to ``int``. ``count`` selects ``COUNT(*)`` and drops the ordering.
        ``add_extra_ids`` follows every selected column with its table's
        primary key. ``add_params`` appends the parameters as an SQL
        comment. A truthy ``self._raw_query`` is returned unchanged.
        """
        if self._raw_query:
            return self._raw_query
        if self._sorts:
            order_by = u"ORDER BY %s" % (", ".join(self._sorts))
        else:
            order_by = u""
        if self._wheres:
            wheres = u"WHERE %s" % (" AND ".join(self._wheres))
        else:
            wheres = u""
        if count:
            selects = (u"COUNT(*) as count", )
            order_by = u""
        elif add_extra_ids:
            selects = self._get_selects_with_extra_ids()
        else:
            selects = self._selects
        limits = u""
        if limit:
            try:
                limits = u"LIMIT %s" % int(limit)
            except (TypeError, ValueError):
                pass
        offsets = u""
        if offset:
            try:
                offsets = u"OFFSET %s" % int(offset)
            except (TypeError, ValueError):
                pass
        sql = u"""SELECT %s FROM %s %s %s %s %s;""" \
              % (", ".join(selects),
                 ", ".join(self._froms),
                 wheres,
                 order_by,
                 limits,
                 offsets)
        if add_params:
            # Parameters are not necessarily strings; format each one.
            shown_params = u", ".join(u"%s" % param for param in self._params)
            return u"%s /* %s */" % (sql, shown_params)
        return sql

    def get_results(self, limit=None, offset=None, query=None, admin_name=None,
                    row_number=False):
        """
        Execute the query and return its rows.

        Without ``query`` the SQL comes from ``get_raw_query``. Without
        ``admin_name`` the cursor rows are returned as they are, each one
        prefixed by its 1-based position when ``row_number`` is set. With
        ``admin_name`` every row is a list of ``(value, admin_url)`` pairs,
        where ``admin_url`` is the reversed "<admin_name>:<app>_<model>_change"
        URL or ``None`` when it cannot be reversed.
        """
        add_extra_ids = admin_name is not None
        if not query:
            sql = self.get_raw_query(limit=limit, offset=offset,
                                     add_extra_ids=add_extra_ids)
        else:
            sql = query
        if settings.DEBUG:
            print(sql)
        cursor = self._db_connection.cursor()
        cursor.execute(sql, tuple(self._params))
        query_results = cursor.fetchall()
        if not admin_name:
            if row_number:
                return [[r + 1] + list(row)
                        for r, row in enumerate(query_results)]
            return query_results
        selects = self._get_selects_with_extra_ids()
        results = []
        try:
            offset = int(offset)
        except (TypeError, ValueError):
            # No usable offset (including the default None): start at 0.
            offset = 0
        for r, row in enumerate(query_results):
            if row_number:
                position = r + offset + 1
                result = [(position, u"#row%s" % position)]
            else:
                result = []
            # Columns come in pairs: the selected value, then the primary
            # key of its table.
            for i in range(0, len(row), 2):
                appmodel, _field_name = selects[i].split(".")
                appmodel = self._unquote_name(appmodel)
                try:
                    if appmodel in self._models:
                        _model = self._models[appmodel]
                        _appmodel = u"%s_%s" % (_model._meta.app_label,
                                                _model._meta.module_name)
                    else:
                        _appmodel = appmodel
                    admin_url = reverse("%s:%s_change" % (admin_name,
                                                          _appmodel),
                                        args=[row[i + 1]])
                except NoReverseMatch:
                    admin_url = None
                result.append((row[i], admin_url))
            results.append(result)
        return results

    def get_count(self):
        """
        Return the number of rows the query yields.

        The value of the COUNT(*) query is returned as a float; when that
        query gives no rows, the rows of the full query are counted instead.
        """
        query = self.get_raw_query(count=True)
        results = self.get_results(query=query)
        if results:
            return float(results[0][0])
        else:
            return len(self.get_results())

    def get_labels(self, add_extra_ids=False, row_number=False):
        """
        Return the column headers: the verbose name of each selected field.

        ``row_number`` prepends a "#" header; ``add_extra_ids`` also labels
        the primary key column that follows every selected column.
        """
        if row_number:
            labels = [_(u"#")]
        else:
            labels = []
        if add_extra_ids:
            selects = self._get_selects_with_extra_ids()
        else:
            selects = self._selects
        if selects and isinstance(selects, (tuple, list)):
            for select in selects:
                # Strip the backend's own quoting rather than assuming
                # backticks, and keep underscores inside names intact.
                table, column = [self._unquote_name(part)
                                 for part in select.split(".")]
                model = self._models.get(table)
                if model is None:
                    # Table not recorded by get_query_parts: read it as
                    # "<app label>_<model name>".
                    app_label, _sep, model_name = table.partition("_")
                    model = get_model(app_label, model_name)
                if model is None:
                    label = column.replace("_", " ")
                else:
                    field = model._meta.get_field_by_name(column)[0]
                    label = field.verbose_name
                labels.append(label.capitalize())
        return labels

    def _unquote_name(self, name):
        """Return ``name`` without the quote characters of ``quote_name``."""
        # Quoting the empty string yields just the opening/closing quotes.
        quoted_space = self._db_operations.quote_name("")
        if (len(quoted_space) >= 2 and len(name) >= 2
                and name.startswith(quoted_space[0])
                and name.endswith(quoted_space[1])):
            return name[1:-1]
        return name

    def _get_lookup(self, operator, over):
        """
        Return the query parameter for ``over`` under lookup ``operator``.

        The value is prepared by a bare ``Field``; when the result is a
        sequence only its first item is used.
        """
        lookup = Field().get_db_prep_lookup(operator, over,
                                            connection=self._db_connection,
                                            prepared=True)
        if isinstance(lookup, (tuple, list)):
            return lookup[0]
        return lookup

    def _get_selects_with_extra_ids(self):
        """
        Return the selected columns, each followed by its table's primary key.

        The key name comes from the model recorded in ``self._models``, or
        is assumed to be "id" for tables without a recorded model.
        """
        qn = self._db_operations.quote_name
        selects = []
        for select in self._selects:
            appmodel, _field_name = select.split(".")
            appmodel = self._unquote_name(appmodel)
            selects.append(select)
            if appmodel in self._models:
                pk_name = self._models[appmodel]._meta.pk.name
            else:
                pk_name = u"id"
            selects.append("%s.%s" % (qn(appmodel), qn(pk_name)))
        return selects
365
# Formset of QueryByExampleForm rows driven by BaseQueryByExampleFormSet:
# one extra blank row, and each row can be flagged for deletion.
QueryByExampleFormSet = formset_factory(
    QueryByExampleForm,
    can_delete=True,
    extra=1,
    formset=BaseQueryByExampleFormSet,
)