[#3328] QBD: Tri décroissant ne fonctionne pas
[auf_rh_dae.git] / src / qbe / django_qbe / forms.py
CommitLineData
5cf90361
PP
1# -*- coding: utf-8 -*-
2from django import forms
3from django.db import connections
4from django.db.models import get_model
5from django.db.models.fields import Field
6from django.core.urlresolvers import reverse, NoReverseMatch
7from django.conf import settings
8from django.forms.formsets import BaseFormSet, formset_factory
9from django.utils.importlib import import_module
10from django.utils.translation import ugettext as _
6dc0bee7 11from django.contrib.contenttypes.models import ContentType
5cf90361
PP
12
13from django_qbe.utils import get_models
14from django_qbe.widgets import CriteriaInput
15
16
# Per-alias database configuration, used further down to locate the backend
# modules of the selected connection.
try:
    DATABASES = settings.DATABASES
except AttributeError:
    # The project settings predate the multi-database ``DATABASES`` layout:
    # wrap the single-database settings into an equivalent 'default' entry.
    DATABASES = {
        'default': dict(
            ENGINE="django.db.backends.%s" % settings.DATABASE_ENGINE,
            NAME=settings.DATABASE_NAME,
        ),
    }

# (value, label) pairs offered by the "sort" select of every row; the empty
# value is the "no selection" entry.
SORT_CHOICES = (
    ("", ""),
    ("asc", _("Ascending")),
    ("desc", _("Descending")),
)
34
35
class QueryByExampleForm(forms.Form):
    """
    One row of the query-by-example grid.

    A row names a model and one of its fields, and optionally says whether
    the field is shown, which criteria applies to it and how to sort on it.
    """
    show = forms.BooleanField(label=_("Show"), required=False)
    model = forms.CharField(label=_("Model"))
    field = forms.CharField(label=_("Field"))
    criteria = forms.CharField(label=_("Criteria"), required=False)
    sort = forms.ChoiceField(label=_("Sort"), choices=SORT_CHOICES,
                             required=False)

    def __init__(self, *args, **kwargs):
        """
        Build the form, then swap in the widgets carrying the CSS classes
        and ``disabled`` attributes set below.
        """
        super(QueryByExampleForm, self).__init__(*args, **kwargs)
        fields = self.fields
        fields['model'].widget = forms.Select(
            attrs={'class': "qbeFillModels to:field"})
        fields['sort'].widget = forms.Select(
            attrs={'disabled': "disabled", 'class': 'submitIfChecked'},
            choices=SORT_CHOICES)
        criteria_widget = CriteriaInput(attrs={'disabled': "disabled"})
        fields['criteria'].widget = criteria_widget
        # A multi-part criteria widget exposes its parts as ``widgets``; each
        # part is then listed as ``criteria_<index>`` in the "enable:" list of
        # the field selector. Otherwise the single "criteria" name is used.
        subwidgets = getattr(criteria_widget, "widgets", [])
        if subwidgets:
            enabled_names = ",".join(
                ["criteria_%s" % index for index in range(len(subwidgets))])
        else:
            enabled_names = "criteria"
        field_css = "qbeFillFields enable:sort,%s" % enabled_names
        fields['field'].widget = forms.Select(attrs={'class': field_css})

    def clean_model(self):
        """
        Return the model name lower-cased with dots replaced by underscores.
        """
        return self.cleaned_data['model'].lower().replace(".", "_")

    def clean_criteria(self):
        """
        Parse the criteria text into an ``(operator, over)`` pair.

        Returns ``(None, None)`` when the text cannot be evaluated or does
        not unpack into exactly two values.
        """
        raw_criteria = self.cleaned_data['criteria']
        # SECURITY NOTE(review): eval() runs on text submitted through the
        # form. Empty globals/locals do not remove access to builtins. If the
        # criteria widget only ever produces literal tuples, consider
        # ast.literal_eval -- confirm against CriteriaInput first.
        try:
            operator, over = eval(raw_criteria, {}, {})
        except:
            return (None, None)
        return (operator, over)
76
77
class BaseQueryByExampleFormSet(BaseFormSet):
    """
    Formset that turns its cleaned rows into a raw SQL query and runs it.

    ``clean()`` stores the SELECT / FROM / WHERE / ORDER BY fragments and the
    query parameters on the instance; ``get_raw_query()``, ``get_results()``,
    ``get_count()`` and ``get_labels()`` then work from that stored state.

    Accepts one extra keyword argument, ``using``: the database alias whose
    connection and backend modules are used (``"default"`` when omitted or
    when the alias is not configured).
    """
    # Class-level defaults, kept so the attributes always exist. Every
    # instance gets its own fresh containers in __init__, so nothing mutable
    # is shared between formsets.
    _selects = []
    _froms = []
    _wheres = []
    _sorts = []
    _params = []
    _models = {}
    _raw_query = None
    _db_alias = "default"
    _db_operators = {}
    _db_table_names = []
    _db_operations = None

    def __init__(self, *args, **kwargs):
        """
        Resolve the database alias and load the backend helpers for it.

        The ``using`` keyword is consumed here; every other argument is
        passed on unchanged to ``BaseFormSet.__init__``.
        """
        alias = kwargs.pop("using", "default")
        if alias not in DATABASES:
            # Unknown alias: fall back to the default database.
            alias = "default"
        self._db_alias = alias
        # Use the connection of the selected alias, so the connection matches
        # the engine modules loaded below.
        self._db_connection = connections[alias]
        module = DATABASES[alias]['ENGINE']
        # Per-instance state (see the class-level defaults above).
        self._selects = []
        self._froms = []
        self._wheres = []
        self._sorts = []
        self._params = []
        self._models = {}
        base_mod = None
        intros_mod = None
        try:
            base_mod = import_module("%s.base" % module)
            intros_mod = import_module("%s.introspection" % module)
        except ImportError:
            # Best effort: without the backend modules the class-level
            # defaults stay in place (no operators, no known tables).
            pass
        if base_mod and intros_mod:
            self._db_operators = base_mod.DatabaseWrapper.operators
            DatabaseOperations = base_mod.DatabaseOperations
            try:
                self._db_operations = DatabaseOperations(self._db_connection)
            except TypeError:
                # Some engines have no params to instance DatabaseOperations
                self._db_operations = DatabaseOperations()
            intros_db = intros_mod.DatabaseIntrospection(self._db_connection)
            # set() makes the union work whether the introspection returns a
            # set or a list.
            django_table_names = set(intros_db.django_table_names())
            table_names = intros_db.table_names()
            self._db_table_names = list(django_table_names.union(table_names))
        super(BaseQueryByExampleFormSet, self).__init__(*args, **kwargs)

    def clean(self):
        """
        Check that at least one field is selected and store the query parts.

        Raises ``forms.ValidationError`` when no row contributes a column to
        the SELECT clause. Does nothing when any form already has errors.
        """
        if any(self.errors):
            # Don't bother validating the formset unless each form is valid on
            # its own
            return
        selects, froms, wheres, sorts, params = self.get_query_parts()
        if not selects:
            validation_message = _(u"At least you must check a row to get.")
            raise forms.ValidationError(validation_message)
        self._selects = selects
        self._froms = froms
        self._wheres = wheres
        self._sorts = sorts
        self._params = params

    def translate_model_to_db_table(self, model_name):
        """
        Return the DB table name for an ``<app_label>_<model>`` name.

        The name is split at each underscore in turn, starting from the last
        one, and each split is looked up as a ContentType. The table of the
        first match is returned. ``model_name`` is returned unchanged when
        nothing matches or when it contains no underscore.
        """
        pieces = model_name.split("_")
        for cut in range(len(pieces) - 1, 0, -1):
            app_label = "_".join(pieces[:cut])
            name = "_".join(pieces[cut:])
            try:
                ct = ContentType.objects.get(app_label=app_label, model=name)
                return ct.model_class()._meta.db_table
            except Exception:
                # No such content type, or no model class behind it: try the
                # next split position.
                continue
        return model_name

    def get_query_parts(self):
        """
        Build the pieces of the SQL query from the cleaned rows.

        Returns a ``(selects, froms, wheres, sorts, params)`` tuple of lists.
        ``sorts`` holds ``(quoted_column, direction)`` pairs and ``params``
        the values for the placeholders used in ``wheres``. Processing stops
        at the first row that lacks a model or a field.

        Side effect: models resolved from the rows are recorded in
        ``self._models``, keyed by DB table name.
        """
        selects = []
        froms = []
        wheres = []
        sorts = []
        params = []
        models_by_label = None
        lookup_cast = self._db_operations.lookup_cast
        qn = self._db_operations.quote_name
        uqn = self._unquote_name
        for data in self.cleaned_data:
            if not ("model" in data and "field" in data):
                break
            model = data["model"]
            # HACK: Workaround to handle tables created
            #       by django for its own
            if models_by_label is None:
                # Built once, on the first usable row. setdefault keeps the
                # first model for a duplicated label.
                models_by_label = {}
                for app_model in get_models(include_auto_created=True,
                                            include_deferred=True):
                    label = u"%s_%s" % (app_model._meta.app_label,
                                        app_model._meta.module_name)
                    models_by_label.setdefault(label, app_model)
            if model in models_by_label:
                model_class = models_by_label[model]
                model = model_class._meta.db_table
                self._models[model] = model_class
            field = data["field"]
            show = data["show"]
            criteria = data["criteria"]
            sort = data["sort"]
            db_field = u"%s.%s" % (qn(model), qn(field))
            operator, over = criteria
            # The operator is None when the row has no usable criteria.
            is_join = (hasattr(operator, "lower")
                       and operator.lower() == 'join')
            if show and not is_join:
                selects.append(db_field)
            if sort:
                sorts.append((db_field, sort))
            if all(criteria):
                if is_join:
                    over_split = over.lower().rsplit(".", 1)
                    join_model = qn(self.translate_model_to_db_table(
                        over_split[0].replace(".", "_")))
                    join_field = qn(over_split[1])
                    if model in self._models:
                        _field = self._models[model]._meta.get_field(field)
                        # db_column is only set when the field overrides its
                        # column name; otherwise use attname. Quote once.
                        _db_column = qn(_field.db_column or _field.attname)
                        join = u"%s.%s = %s.%s" \
                               % (join_model, join_field, qn(model),
                                  _db_column)
                    else:
                        join = u"%s.%s = %s" \
                               % (join_model, join_field,
                                  u"%s_id" % db_field)
                    if (join not in wheres
                            and uqn(join_model) in self._db_table_names):
                        wheres.append(join)
                        if join_model not in froms:
                            froms.append(join_model)
                elif operator in self._db_operators:
                    db_operator = self._db_operators[operator]
                    lookup = self._get_lookup(operator, over)
                    params.append(lookup)
                    wheres.append(u"%s %s"
                                  % (lookup_cast(operator) % db_field,
                                     db_operator))
            if qn(model) not in froms and model in self._db_table_names:
                froms.append(qn(model))
        return selects, froms, wheres, sorts, params

    def get_raw_query(self, limit=None, offset=None, count=False,
                      add_extra_ids=False, add_params=False):
        """
        Assemble the SQL statement from the parts stored by ``clean()``.

        ``limit`` / ``offset`` are added as LIMIT / OFFSET when they convert
        to integers. ``count`` selects ``COUNT(*)`` and drops the ORDER BY.
        ``add_extra_ids`` adds a primary key column after every selected
        column. ``add_params`` appends the parameters as an SQL comment.

        When ``self._raw_query`` is set it is returned as is and every
        argument is ignored.
        """
        if self._raw_query:
            return self._raw_query
        if self._sorts:
            order_by = u"ORDER BY %s" % (
                ", ".join([" ".join(x) for x in self._sorts]))
        else:
            order_by = u""
        if self._wheres:
            wheres = u"WHERE %s" % (" AND ".join(self._wheres))
        else:
            wheres = u""
        if count:
            selects = (u"COUNT(*) as count", )
            order_by = u""
        elif add_extra_ids:
            selects = self._get_selects_with_extra_ids()
        else:
            selects = self._selects
        limits = u""
        if limit:
            try:
                limits = u"LIMIT %s" % int(limit)
            except ValueError:
                pass
        offsets = u""
        if offset:
            try:
                offsets = u"OFFSET %s" % int(offset)
            except ValueError:
                pass
        sql = u"""SELECT %s FROM %s %s %s %s %s;""" \
              % (", ".join(selects),
                 ", ".join(self._froms),
                 wheres,
                 order_by,
                 limits,
                 offsets)
        if add_params:
            # Parameters are not necessarily strings (e.g. numbers), so
            # format each one before joining.
            shown_params = ", ".join([u"%s" % param
                                      for param in self._params])
            return u"%s /* %s */" % (sql, shown_params)
        else:
            return sql

    def get_results(self, limit=None, offset=None, query=None, admin_name=None,
                    row_number=False):
        """
        Run the SQL query and return the fetched rows.

        ``query`` overrides the statement built by ``get_raw_query()``; the
        stored parameters are passed to the cursor either way.

        With ``admin_name`` every row becomes a list of
        ``(value, admin_url)`` pairs, where ``admin_url`` is the reversed
        ``<admin_name>:<app>_<model>_change`` URL or ``None`` when it cannot
        be reversed. With ``row_number`` a leading row-number cell is added.
        Without either, the raw ``fetchall()`` result is returned.
        """
        add_extra_ids = admin_name is not None
        if not query:
            sql = self.get_raw_query(limit=limit, offset=offset,
                                     add_extra_ids=add_extra_ids)
        else:
            sql = query
        if settings.DEBUG:
            print(sql)
        cursor = self._db_connection.cursor()
        cursor.execute(sql, tuple(self._params))
        query_results = cursor.fetchall()
        if admin_name:
            selects = self._get_selects_with_extra_ids()
            results = []
            try:
                offset = int(offset)
            except (TypeError, ValueError):
                # offset defaults to None, which int() rejects with TypeError.
                offset = 0
            for r, row in enumerate(query_results):
                if row_number:
                    result = [(r + offset + 1, u"#row%s" % (r + offset + 1))]
                else:
                    result = []
                # Columns come in (value, primary key) pairs.
                for i in range(0, len(row), 2):
                    appmodel, unused_field = selects[i].split(".")
                    appmodel = self._unquote_name(appmodel)
                    if appmodel in self._models:
                        _model = self._models[appmodel]
                        _appmodel = u"%s_%s" % (_model._meta.app_label,
                                                _model._meta.module_name)
                    else:
                        _appmodel = appmodel
                    try:
                        admin_url = reverse("%s:%s_change" % (admin_name,
                                                              _appmodel),
                                            args=[row[i + 1]])
                    except NoReverseMatch:
                        admin_url = None
                    result.append((row[i], admin_url))
                results.append(result)
            return results
        else:
            if row_number:
                # NOTE(review): unlike the admin branch above, the row number
                # here ignores ``offset`` -- confirm whether that is intended.
                results = []
                for r, row in enumerate(query_results):
                    results.append([r + 1] + list(row))
                return results
            else:
                return query_results

    def get_count(self):
        """
        Return the number of rows the query yields.

        The COUNT(*) value is returned as a float. If the count query gives
        no rows, the full result set is fetched and its length returned.
        """
        query = self.get_raw_query(count=True)
        results = self.get_results(query=query)
        if results:
            return float(results[0][0])
        else:
            return len(self.get_results())

    def get_model(self, db_prefix, model):
        """
        Return the model class for ``db_prefix`` (app label) and ``model``.

        Falls back to the models recorded by ``get_query_parts()`` under the
        ``<db_prefix>_<model>`` table name; returns ``None`` if neither
        lookup finds a model.
        """
        klass = get_model(db_prefix, model)
        if klass is None:
            klass = self._models.get("%s_%s" % (db_prefix, model))
        return klass

    def get_labels(self, add_extra_ids=False, row_number=False):
        """
        Return the capitalized column labels of the selected fields.

        The label is the field's ``verbose_name``. The model is looked up
        first in the table-to-model map filled by ``get_query_parts()``, then
        by splitting the table name into app label and model name. A column
        whose table has no model is labelled with its own name.

        ``row_number`` prepends the ``#`` label; ``add_extra_ids`` also
        labels the primary key columns added after each selected column.
        """
        if row_number:
            labels = [_(u"#")]
        else:
            labels = []
        if add_extra_ids:
            selects = self._get_selects_with_extra_ids()
        else:
            selects = self._selects
        if selects and isinstance(selects, (tuple, list)):
            for select in selects:
                # Strip the backend's own quote characters rather than
                # assuming backticks.
                parts = [self._unquote_name(part)
                         for part in select.split(".")]
                model = None
                label_field = parts[-1]
                if len(parts) == 2:
                    model = self._models.get(parts[0])
                if model is None:
                    label_splits = ".".join(parts).replace("_", ".").split(".")
                    if len(label_splits) > 2:
                        # restore underscore for fields which use it
                        label_field = "_".join(label_splits[2:])
                        model = self.get_model(label_splits[0],
                                               label_splits[1])
                if model is None:
                    # Table without a Django model: label it by column name.
                    labels.append(label_field.capitalize())
                    continue
                label = model._meta.get_field_by_name(label_field)[0].verbose_name
                labels.append(label.capitalize())
        return labels

    def _unquote_name(self, name):
        """
        Strip the backend's quote characters from ``name``, if present.

        The quote characters are taken from what ``quote_name("")`` returns.
        """
        quoted_space = self._db_operations.quote_name("")
        if (len(name) >= 2 and name.startswith(quoted_space[0])
                and name.endswith(quoted_space[1])):
            return name[1:-1]
        return name

    def _get_lookup(self, operator, over):
        """
        Return the query parameter for applying ``operator`` to ``over``.

        The value is prepared by a plain ``Field`` for this connection; when
        that yields a list or tuple, only its first item is returned.
        """
        lookup = Field().get_db_prep_lookup(operator, over,
                                            connection=self._db_connection,
                                            prepared=True)
        if isinstance(lookup, (tuple, list)):
            return lookup[0]
        return lookup

    def _get_selects_with_extra_ids(self):
        """
        Return the selected columns, each followed by a primary key column.

        The primary key is that of the column's model when the model is
        known, and ``id`` otherwise.
        """
        qn = self._db_operations.quote_name
        selects = []
        for select in self._selects:
            appmodel, unused_field = select.split(".")
            appmodel = self._unquote_name(appmodel)
            selects.append(select)
            if appmodel in self._models:
                # NOTE(review): this is the pk field *name*; it presumably
                # differs from the DB column for some models -- confirm
                # whether pk.column should be used here.
                pk_name = self._models[appmodel]._meta.pk.name
            else:
                pk_name = u"id"
            selects.append("%s.%s" % (qn(appmodel), qn(pk_name)))
        return selects
396
# Formset class of QueryByExampleForm rows built on
# BaseQueryByExampleFormSet: one extra blank row, rows can be deleted.
QueryByExampleFormSet = formset_factory(
    QueryByExampleForm,
    formset=BaseQueryByExampleFormSet,
    extra=1,
    can_delete=True
)