- ref supprime
[auf_rh_dae.git] / src / qbe / django_qbe / forms.py
1 # -*- coding: utf-8 -*-
2 from django import forms
3 from django.db import connections
4 from django.db.models import get_model
5 from django.db.models.fields import Field
6 from django.core.urlresolvers import reverse, NoReverseMatch
7 from django.conf import settings
8 from django.forms.formsets import BaseFormSet, formset_factory
9 from django.utils.importlib import import_module
10 from django.utils.translation import ugettext as _
11 from django.contrib.contenttypes.models import ContentType
12
13 from django_qbe.utils import get_models
14 from django_qbe.widgets import CriteriaInput
15
16
# Per-alias database configuration, resolved once when the module is imported.
try:
    DATABASES = settings.DATABASES
except AttributeError:
    # Backwards compatibility for Django versions that predate the
    # ``DATABASES`` setting (introduced in Django 1.2): build an equivalent
    # single-entry mapping from the legacy DATABASE_* settings.
    DATABASES = {
        'default': dict(
            ENGINE="django.db.backends.%s" % settings.DATABASE_ENGINE,
            NAME=settings.DATABASE_NAME,
        ),
    }
28
# (value, label) pairs offered by the "sort" select of each query row.
# The empty value means "do not sort on this column".
# NOTE(review): ``_`` is bound to the eager ``ugettext`` in this module, so
# these labels are translated once, at import time -- confirm that is
# intended (``ugettext_lazy`` is the usual choice for module-level strings).
SORT_CHOICES = (
    ("", ""),
    ("asc", _("Ascending")),
    ("desc", _("Descending")),
)
34
# (value, label) pairs offered by the "status" select of each query row.
# The values are the keys ``get_raw_query`` switches on when it appends the
# date_debut / date_fin conditions; the empty value applies no status filter.
STATUS_CHOICES = (
    ("", "Tous"),
    ("inactive", "Inactifs"),
    ("active", "Actifs"),
    ("futur", "Futurs"),
    ("unknown", "Inconnus"),
)
42
43
class QueryByExampleForm(forms.Form):
    """
    One row of the query-by-example grid.

    A row names a model and one of its fields, and optionally says whether
    the column is shown, which criteria and status filter apply to it, and
    how the results are sorted on it.
    """
    # Declaration order is kept as-is: it is the order the fields render in.
    show = forms.BooleanField(required=False, label=_("Show"))
    model = forms.CharField(label=_("Model"))
    field = forms.CharField(label=_("Field"))
    criteria = forms.CharField(required=False, label=_("Criteria"))
    status = forms.ChoiceField(required=False, label=_("Statut"),
                               choices=STATUS_CHOICES)
    sort = forms.ChoiceField(required=False, label=_("Sort"),
                             choices=SORT_CHOICES)
53
def __init__(self, *args, **kwargs):
    """
    Build the form, then replace the default widgets of every field.

    The CSS classes and ``disabled`` attributes set here are plain HTML
    attributes on the rendered widgets. The ``enable:`` part of the field
    select's class lists ``sort`` plus one ``criteria_N`` name per
    sub-widget of the criteria input (or plain ``criteria`` when the
    criteria widget exposes no ``widgets``).
    """
    super(QueryByExampleForm, self).__init__(*args, **kwargs)
    fields = self.fields

    fields['model'].widget = forms.Select(
        attrs={'class': "qbeFillModels to:field"})

    fields['sort'].widget = forms.Select(
        attrs={'disabled': "disabled", 'class': 'submitIfChecked'},
        choices=SORT_CHOICES)

    criteria_input = CriteriaInput(attrs={'disabled': "disabled"})
    fields['criteria'].widget = criteria_input

    # A multi-part criteria widget gets one "criteria_<index>" entry per
    # sub-widget; a simple widget is referred to as just "criteria".
    sub_widgets = getattr(criteria_input, "widgets", [])
    if not sub_widgets:
        field_css = "qbeFillFields enable:sort,criteria"
    else:
        names = ["criteria_%s" % index
                 for index in range(len(sub_widgets))]
        field_css = "qbeFillFields enable:sort,%s" % ",".join(names)

    fields['status'].widget = forms.Select(
        attrs={'disabled': "disabled", 'class': 'hidden'},
        choices=STATUS_CHOICES)

    fields['field'].widget = forms.Select(attrs={'class': field_css})
78
def clean_model(self):
    """
    Normalise the submitted model label.

    The value is lower-cased and every dot becomes an underscore, so
    ``"App.Model"`` is returned as ``"app_model"``.
    """
    lowered = self.cleaned_data['model'].lower()
    return "_".join(lowered.split("."))
82
def clean_criteria(self):
    """
    Parse the submitted criteria into an ``(operator, over)`` pair.

    The field value is expected to be the textual form of a two-item
    Python literal such as ``('exact', 'foo')``.

    Returns:
        The ``(operator, over)`` tuple, or ``(None, None)`` when the value
        is empty, is not a Python literal, or does not unpack into exactly
        two items.
    """
    # SECURITY: this value comes straight from the submitted form. It used
    # to be passed to eval(), which lets a client run arbitrary Python in
    # the server process. ast.literal_eval() only accepts literals
    # (strings, numbers, tuples, lists, dicts, booleans, None); anything
    # else is treated as "no criteria".
    import ast  # local import so the fix needs no module-level change

    criteria = self.cleaned_data['criteria']
    try:
        # eval() tolerated leading blanks; strip to keep accepting them.
        operator, over = ast.literal_eval(criteria.strip())
    except (AttributeError, ValueError, SyntaxError, TypeError,
            MemoryError, RuntimeError):
        # Deliberately best-effort: malformed criteria mean "no filter".
        return (None, None)
    return (operator, over)
90
91
class BaseQueryByExampleFormSet(BaseFormSet):
    """
    Formset that turns its rows into a raw SQL query and runs it.
    """
    # NOTE(review): the list and dict defaults below are class attributes,
    # i.e. single objects shared by every instance until an instance
    # rebinds them. ``_models`` in particular is written through ``self``
    # by ``get_query_parts``.

    # Query fragments, stored by ``clean()`` from ``get_query_parts()``.
    _selects = []
    _froms = []
    _wheres = []
    _statuses = []
    _sorts = []
    _params = []

    # Maps a database table name to the model class found for it.
    _models = {}

    # When truthy, ``get_raw_query()`` returns this instead of building SQL.
    _raw_query = None

    # Database alias taken from the ``using`` keyword argument, and the
    # backend details looked up for it in ``__init__``.
    _db_alias = "default"
    _db_operators = {}
    _db_table_names = []
    _db_operations = None
105
def __init__(self, *args, **kwargs):
    """
    Bind the formset to a database alias and load its backend details.

    Keyword arguments:
        using: database alias to query (default ``"default"``). It is
            removed from ``kwargs`` before the base class sees them. An
            alias missing from ``DATABASES`` falls back to ``"default"``.

    From the alias' ``ENGINE`` module this loads the backend's lookup
    operators, a ``DatabaseOperations`` instance and the list of table
    names. If the backend's ``base`` or ``introspection`` module cannot
    be imported those attributes keep their empty defaults.
    """
    self._db_alias = kwargs.pop("using", "default")

    # Give every instance its own containers: the class-level defaults are
    # shared objects, and ``_models`` is mutated in place later on.
    self._selects = []
    self._froms = []
    self._wheres = []
    self._statuses = []
    self._sorts = []
    self._params = []
    self._models = {}
    self._raw_query = None
    self._db_operators = {}
    self._db_table_names = []
    self._db_operations = None

    # Unknown aliases fall back to the default database. (The previous
    # fallback value was the *string* "default", which then failed on the
    # ['ENGINE'] lookup.)
    if self._db_alias in DATABASES:
        effective_alias = self._db_alias
    else:
        effective_alias = "default"
    database_properties = DATABASES[effective_alias]
    # Use the connection of the same alias the engine module is read from,
    # so operators, introspection and execution all agree.
    self._db_connection = connections[effective_alias]

    module = database_properties['ENGINE']
    base_mod = None
    intros_mod = None
    try:
        base_mod = import_module("%s.base" % module)
        intros_mod = import_module("%s.introspection" % module)
    except ImportError:
        # Best effort: without the backend modules the formset still
        # instantiates, it just has no operators / table names.
        pass
    if base_mod and intros_mod:
        self._db_operators = base_mod.DatabaseWrapper.operators
        DatabaseOperations = base_mod.DatabaseOperations
        try:
            self._db_operations = DatabaseOperations(self._db_connection)
        except TypeError:
            # Some engines have no params to instance DatabaseOperations
            self._db_operations = DatabaseOperations()
        intros_db = intros_mod.DatabaseIntrospection(self._db_connection)
        # set() keeps this working whether django_table_names() returns a
        # set or a list.
        django_table_names = set(intros_db.django_table_names())
        table_names = intros_db.table_names()
        self._db_table_names = list(django_table_names.union(table_names))
    super(BaseQueryByExampleFormSet, self).__init__(*args, **kwargs)
129
def clean(self):
    """
    Check that at least one field is selected and cache the query parts.

    Nothing is validated or stored when any individual form already has
    errors. Otherwise the fragments returned by ``get_query_parts()`` are
    stored on the instance (``_selects``, ``_froms``, ``_wheres``,
    ``_sorts``, ``_params``, ``_statuses``).

    Raises:
        forms.ValidationError: when no row contributes a selected column.
    """
    if any(self.errors):
        # Don't bother validating the formset unless each form is valid on
        # its own
        return
    selects, froms, wheres, sorts, params, statuses = self.get_query_parts()
    if not selects:
        validation_message = _(u"At least you must check a row to get.")
        # Call form of raise: the ``raise X, msg`` statement is Python 2
        # only syntax.
        raise forms.ValidationError(validation_message)
    self._selects = selects
    self._froms = froms
    self._wheres = wheres
    self._sorts = sorts
    self._params = params
    self._statuses = statuses
148
def translate_model_to_db_table(self, model_name):
    """
    Ensure the full model name match the DB table name (not app_label
    name).

    ``model_name`` is an ``<app_label>_<model>`` string. It is split on
    its *last* underscore, so app labels that themselves contain
    underscores still resolve; names with exactly one underscore behave
    as before.

    Returns:
        The ``db_table`` of the matching content type's model class, or
        ``model_name`` unchanged when the name has no underscore or the
        lookup fails for any reason.
    """
    parts = model_name.rsplit("_", 1)
    if len(parts) != 2:
        # Nothing to split into app label and model: previously this
        # raised ValueError out of the method.
        return model_name
    app_label, name = parts
    try:
        ct = ContentType.objects.get(app_label=app_label, model=name)
        model = ct.model_class()
        return model._meta.db_table
    except Exception:
        # Best effort by design: unknown content type, stale content type
        # (model_class() is None), ... -> keep the given name.
        return model_name
161
def get_query_parts(self):
    """
    Build the SQL fragments described by the cleaned form rows.

    Returns:
        A 6-tuple of lists ``(selects, froms, wheres, sorts, params,
        statuses)``:

        * selects  -- quoted ``table.column`` strings of the shown rows;
        * froms    -- quoted table names;
        * wheres   -- SQL conditions (join equalities and operator
                      templates taken from the backend's operators);
        * sorts    -- ``(quoted table.column, direction)`` pairs;
        * params   -- lookup values matching the operator conditions;
        * statuses -- ``(table, status)`` pairs.

    Side effect: every row whose model label matches a known model
    records ``db_table -> model class`` in ``self._models``.
    """
    selects = []
    froms = []
    wheres = []
    sorts = []
    params = []
    statuses = []
    app_models = None
    app_model_labels = None
    lookup_cast = self._db_operations.lookup_cast
    qn = self._db_operations.quote_name
    uqn = self._unquote_name
    # NOTE(review): ``model``/``field`` (and the join target) come from
    # submitted form data and are only passed through quote_name();
    # whether that neutralises embedded quote characters depends on the
    # backend -- confirm before exposing this to untrusted users.
    for data in self.cleaned_data:
        if not ("model" in data and "field" in data):
            # NOTE(review): this stops at the first incomplete row, so any
            # rows after it are ignored -- confirm ``break`` (rather than
            # ``continue``) is intended.
            break
        model = data["model"]
        # HACK: Workaround to handle tables created
        #       by django for its own
        if app_model_labels is None:
            # Loaded once per call (the old truthiness test reloaded the
            # models on every row when the list came back empty).
            app_models = get_models(include_auto_created=True,
                                    include_deferred=True)
            app_model_labels = [u"%s_%s" % (a._meta.app_label,
                                            a._meta.module_name)
                                for a in app_models]
        if model in app_model_labels:
            position = app_model_labels.index(model)
            model = app_models[position]._meta.db_table
            self._models[model] = app_models[position]
        field = data["field"]
        show = data["show"]
        criteria = data["criteria"]
        sort = data["sort"]
        status = data["status"]
        db_field = u"%s.%s" % (qn(model), qn(field))
        operator, over = criteria
        try:
            is_join = operator.lower() == 'join'
        except (AttributeError, TypeError):
            # No operator (None) or a non-string one.
            is_join = False
        if show and not is_join:
            selects.append(db_field)
        if sort:
            sorts.append((db_field, sort))
        if status:
            statuses.append((model, status))
        if all(criteria):
            if is_join:
                # ``over`` is "<app>.<model>.<field>": the last dot
                # separates the target table from its column.
                try:
                    join_target, join_column = over.lower().rsplit(".", 1)
                except (AttributeError, ValueError):
                    # Not a dotted string: there is nothing to join on, so
                    # skip the join instead of failing the whole query.
                    join_target = join_column = None
                if join_target is not None:
                    join_model = qn(self.translate_model_to_db_table(
                        join_target.replace(".", "_")))
                    join_field = qn(join_column)
                    if model in self._models:
                        _field = self._models[model]._meta.get_field(field)
                        # Explicit db_column if the field declares one,
                        # otherwise its attname.
                        column = _field.db_column or _field.attname
                    else:
                        # Unknown table: assume the "<field>_id" column
                        # naming. The suffix goes inside the quotes (the
                        # previous code produced `table`.`field`_id).
                        column = u"%s_id" % field
                    join = u"%s.%s = %s.%s" \
                           % (join_model, join_field, qn(model), qn(column))
                    if (join not in wheres
                            and uqn(join_model) in self._db_table_names):
                        wheres.append(join)
                        if join_model not in froms:
                            froms.append(join_model)
            elif operator in self._db_operators:
                db_operator = self._db_operators[operator]
                lookup = self._get_lookup(operator, over)
                params.append(lookup)
                wheres.append(u"%s %s"
                              % (lookup_cast(operator) % db_field,
                                 db_operator))
        if qn(model) not in froms and model in self._db_table_names:
            froms.append(qn(model))
    return selects, froms, wheres, sorts, params, statuses
248
def get_raw_query(self, limit=None, offset=None, count=False,
                  add_extra_ids=False, add_params=False):
    """
    Assemble the SQL statement from the stored query parts.

    Arguments:
        limit, offset: added as ``LIMIT`` / ``OFFSET`` when truthy and
            convertible with ``int()``; silently ignored otherwise.
        count: select ``COUNT(*) as count`` and drop the ordering.
        add_extra_ids: select each column followed by its table's primary
            key (see ``_get_selects_with_extra_ids``). Ignored if
            ``count`` is set.
        add_params: append the query parameters as a trailing SQL comment.
            NOTE(review): the values are not escaped inside the comment;
            this form looks meant for display, not execution.

    Returns:
        The SQL string, or ``self._raw_query`` unchanged when it is set.

    The stored ``_froms`` / ``_wheres`` lists are not modified, so calling
    this method repeatedly yields the same statement every time.
    """
    qn = self._db_operations.quote_name
    if self._raw_query:
        return self._raw_query
    if self._sorts:
        order_by = u"ORDER BY %s" % (
            ", ".join([" ".join(x) for x in self._sorts]))
    else:
        order_by = u""

    # Work on copies: the status conditions below used to be appended to
    # self._wheres itself (and tables to self._froms), so every further
    # call added them once more.
    froms = list(self._froms)
    where_clauses = list(self._wheres)
    for m, s in self._statuses:
        # Special case: an employee's status is checked through the
        # employee's dossiers.
        if m == 'rh_employe':
            m = qn('rh_dossier')
            if m not in froms:
                froms.append(m)
                where_clauses.append(
                    "`rh_employe`.`id` = `rh_dossier`.`employe`")
        if s == "inactive":
            where_clauses.append("%s.date_fin < DATE(NOW())" % m)
        elif s == "active":
            # The whole OR chain is parenthesised: AND binds tighter than
            # OR, so without the outer parentheses the other conditions
            # (joins, filters) only applied to the first alternative.
            where_clauses.append(
                "("
                "((%s.`date_debut` <= DATE(NOW()) OR %s.`date_debut` IS NULL) AND %s.`date_fin` >= DATE(NOW())) OR "
                "((%s.`date_fin` >= DATE(NOW()) OR %s.`date_fin` IS NULL) AND %s.`date_debut` <= DATE(NOW())) OR "
                "(%s.`date_debut` <= DATE(NOW()) AND %s.`date_fin` >= DATE(NOW()))"
                ")"
                % (m, m, m, m, m, m, m, m))
        elif s == "futur":
            where_clauses.append("%s.date_debut > DATE(NOW())" % m)
        elif s == "unknown":
            where_clauses.append(
                "(%s.date_debut IS NULL AND %s.date_fin IS NULL)" % (m, m))
    if where_clauses:
        wheres = u"WHERE %s" % (" AND ".join(where_clauses))
    else:
        wheres = u""

    if count:
        selects = (u"COUNT(*) as count", )
        order_by = u""
    elif add_extra_ids:
        selects = self._get_selects_with_extra_ids()
    else:
        selects = self._selects

    limits = u""
    if limit:
        try:
            limits = u"LIMIT %s" % int(limit)
        except (TypeError, ValueError):
            pass
    offsets = u""
    if offset:
        try:
            offsets = u"OFFSET %s" % int(offset)
        except (TypeError, ValueError):
            pass

    sql = u"""SELECT %s FROM %s %s %s %s %s;""" \
          % (", ".join(selects),
             ", ".join(froms),
             wheres,
             order_by,
             limits,
             offsets)
    if add_params:
        # Parameters are not necessarily strings (numbers, dates, ...).
        shown_params = u", ".join([u"%s" % (p, ) for p in self._params])
        return u"%s /* %s */" % (sql, shown_params)
    else:
        return sql
313
def get_results(self, limit=None, offset=None, query=None, admin_name=None,
                row_number=False):
    """
    Run the query and return its rows.

    Arguments:
        limit, offset: passed to ``get_raw_query`` when ``query`` is not
            given; ``offset`` also shifts the row numbers in admin mode.
        query: SQL to execute instead of the generated statement.
        admin_name: admin site name used to reverse
            ``"<admin_name>:<app>_<model>_change"`` URLs.
        row_number: prepend a 1-based row number to every row.

    Returns:
        * with a truthy ``admin_name``: a list of rows, each a list of
          ``(value, admin_url)`` pairs (``admin_url`` is ``None`` when the
          URL cannot be reversed), preceded by
          ``(number, "#row<number>")`` if ``row_number`` is set;
        * otherwise, with ``row_number``: a list of ``[number, cell, ...]``
          lists;
        * otherwise: whatever ``cursor.fetchall()`` returned.
    """
    # NOTE(review): the extra id columns are requested for any admin_name
    # that is not None, but only unpacked below when admin_name is truthy;
    # an empty-string admin_name therefore returns the id columns inline.
    add_extra_ids = admin_name is not None
    if not query:
        sql = self.get_raw_query(limit=limit, offset=offset,
                                 add_extra_ids=add_extra_ids)
    else:
        sql = query
    if settings.DEBUG:
        # Parenthesised so the line parses on Python 2 and 3 alike; the
        # output is the same as the former print statement.
        print(sql)
    cursor = self._db_connection.cursor()
    cursor.execute(sql, tuple(self._params))
    query_results = cursor.fetchall()

    if admin_name:
        selects = self._get_selects_with_extra_ids()
        try:
            offset = int(offset)
        except (TypeError, ValueError):
            # offset defaults to None, which int() rejects with TypeError.
            offset = 0
        results = []
        for r, row in enumerate(query_results):
            if row_number:
                number = r + offset + 1
                result = [(number, u"#row%s" % number)]
            else:
                result = []
            # Columns come in pairs: the value, then the primary key of
            # the row it belongs to.
            for i in range(0, len(row), 2):
                appmodel, _column = selects[i].split(".")
                appmodel = self._unquote_name(appmodel)
                try:
                    if appmodel in self._models:
                        _model = self._models[appmodel]
                        _appmodel = u"%s_%s" % (_model._meta.app_label,
                                                _model._meta.module_name)
                    else:
                        _appmodel = appmodel
                    admin_url = reverse("%s:%s_change" % (admin_name,
                                                          _appmodel),
                                        args=[row[i + 1]])
                except NoReverseMatch:
                    admin_url = None
                result.append((row[i], admin_url))
            results.append(result)
        return results

    if row_number:
        return [[r + 1] + list(row)
                for r, row in enumerate(query_results)]
    return query_results
375
def get_count(self):
    """
    Return the number of rows the current query matches.

    The ``COUNT(*)`` form of the query is executed first and the first
    column of its first row is returned as a ``float``. If that yields no
    rows, the full query is run and its number of rows is returned.
    """
    count_sql = self.get_raw_query(count=True)
    count_rows = self.get_results(query=count_sql)
    if not count_rows:
        return len(self.get_results())
    first_row = count_rows[0]
    return float(first_row[0])
383
384
def get_model(self, db_prefix, model):
    """
    Find the model class for an app label / model name pair.

    The module-level ``get_model`` imported from ``django.db.models`` is
    asked first. When it returns ``None``, the model recorded in
    ``self._models`` under the table name ``"<db_prefix>_<model>"`` is
    returned instead, or ``None`` if there is none.
    """
    klass = get_model(db_prefix, model)
    if klass is not None:
        return klass
    return self._models.get("%s_%s" % (db_prefix, model))
393
def get_labels(self, add_extra_ids=False, row_number=False):
    """
    Return the column headings for the current query.

    Arguments:
        add_extra_ids: label the select list that includes the extra
            primary-key columns instead of the plain one.
        row_number: start the list with a ``#`` heading.

    Returns:
        A list with the capitalised ``verbose_name`` of every selected
        field, in select order.

    The model of each select is looked up by table name in
    ``self._models`` first, exactly as the other select helpers do. Only
    when the table is not recorded there is the model derived from the
    select text itself (``<app>_<model>.<field>``), which cannot work for
    app labels or table names containing extra underscores.
    """
    if row_number:
        labels = [_(u"#")]
    else:
        labels = []
    if add_extra_ids:
        selects = self._get_selects_with_extra_ids()
    else:
        selects = self._selects
    if not (selects and isinstance(selects, (tuple, list))):
        return labels
    for select in selects:
        model = None
        label_field = None
        parts = select.split(".")
        if len(parts) == 2:
            model = self._models.get(self._unquote_name(parts[0]))
            if model is not None:
                label_field = self._unquote_name(parts[1])
        if model is None:
            # Legacy path: treat every underscore as a separator, then
            # restore underscore for fields which use it.
            label_splits = select.replace("`", "").replace("_", ".").split(".")
            label_field = "_".join(label_splits[2:])
            model = self.get_model(label_splits[0], label_splits[1])
        label = model._meta.get_field_by_name(label_field)[0].verbose_name
        labels.append(label.capitalize())
    return labels
412
def _unquote_name(self, name):
    """
    Strip the backend's identifier quotes from ``name``.

    The quote characters are the two characters the backend's
    ``quote_name`` produces for an empty name. ``name`` is returned
    without its first and last character when it starts with the first
    and ends with the second of them, and unchanged otherwise.
    """
    quote_pair = self._db_operations.quote_name("")
    if not name.startswith(quote_pair[0]):
        return name
    if not name.endswith(quote_pair[1]):
        return name
    return name[1:-1]
418
def _get_lookup(self, operator, over):
    """
    Prepare ``over`` as a query parameter for the ``operator`` lookup.

    The value is run through ``get_db_prep_lookup`` of a bare ``Field``
    using this formset's connection. When that returns a tuple or list
    only its first item is returned; any other result is returned as is.
    """
    prepared = Field().get_db_prep_lookup(operator, over,
                                          connection=self._db_connection,
                                          prepared=True)
    if not isinstance(prepared, (tuple, list)):
        return prepared
    return prepared[0]
426
def _get_selects_with_extra_ids(self):
    """
    Return the select list with a primary-key column after every column.

    For each ``table.column`` entry of ``self._selects`` the result holds
    the entry itself followed by ``table.<pk>``, where ``<pk>`` is the
    primary key name of the model recorded for that table in
    ``self._models``, or ``id`` when the table is not recorded there.
    """
    quote = self._db_operations.quote_name
    expanded = []
    for column in self._selects:
        # Each select must be exactly "<table>.<column>".
        table, _column_name = column.split(".")
        table = self._unquote_name(table)
        known_model = self._models.get(table) if table in self._models else None
        if table in self._models:
            pk_name = known_model._meta.pk.name
        else:
            pk_name = u"id"
        expanded.extend([column, "%s.%s" % (quote(table), quote(pk_name))])
    return expanded
441
# Formset class used by callers: QueryByExampleForm rows handled by
# BaseQueryByExampleFormSet, with one extra blank row and a delete checkbox
# on every row.
QueryByExampleFormSet = formset_factory(
    QueryByExampleForm,
    formset=BaseQueryByExampleFormSet,
    can_delete=True,
    extra=1,
)