recrutement merge
[auf_rh_dae.git] / src / qbe / django_qbe / forms.py
1 # -*- coding: utf-8 -*-
2 from django import forms
3 from django.db import connections
4 from django.db.models import get_model
5 from django.db.models.fields import Field
6 from django.core.urlresolvers import reverse, NoReverseMatch
7 from django.conf import settings
8 from django.forms.formsets import BaseFormSet, formset_factory
9 from django.utils.importlib import import_module
10 from django.utils.translation import ugettext as _
11 from django.contrib.contenttypes.models import ContentType
12
13 from django_qbe.utils import get_models
14 from django_qbe.widgets import CriteriaInput
15
16
# Per-alias database settings, used by the formset to locate the modules of
# the configured database backend.
try:
    DATABASES = settings.DATABASES
except AttributeError:
    # The project settings expose no DATABASES mapping: build a one-entry
    # mapping from the single-database DATABASE_ENGINE / DATABASE_NAME
    # settings instead.
    DATABASES = dict(
        default=dict(
            ENGINE="django.db.backends.%s" % settings.DATABASE_ENGINE,
            NAME=settings.DATABASE_NAME
        )
    )
28
# Choices of the per-row "sort" field; the empty value means the column is
# not used for ordering.
SORT_CHOICES = (("", ""),
                ("asc", _("Ascending")),
                ("desc", _("Descending")))

# Choices of the per-row "status" field; the empty value applies no status
# filter.  Each non-empty value is turned into a date_debut/date_fin
# condition by BaseQueryByExampleFormSet.get_raw_query().
STATUS_CHOICES = (("", "Tous"),
                  ("inactive", "Inactifs"),
                  ("active", "Actifs"),
                  ("futur", "Futurs"),
                  ("unknown", "Inconnus"))
42
43
class QueryByExampleForm(forms.Form):
    """
    One row of the query-by-example grid.

    A row names a model and one of its fields, and optionally says whether
    the field is shown, how it is filtered (criteria), which status filter
    applies and how it is sorted.
    """
    show = forms.BooleanField(label=_("Show"), required=False)
    model = forms.CharField(label=_("Model"))
    field = forms.CharField(label=_("Field"))
    criteria = forms.CharField(label=_("Criteria"), required=False)
    status = forms.ChoiceField(label=_("Statut"), choices=STATUS_CHOICES,
                               required=False)
    sort = forms.ChoiceField(label=_("Sort"), choices=SORT_CHOICES,
                             required=False)

    def __init__(self, *args, **kwargs):
        """
        Replace the default widgets with the ones the QBE page expects.

        The CSS classes set here ("qbeFillModels", "qbeFillFields",
        "enable:...", "submitIfChecked", "hidden") are presumably consumed
        by the page's JavaScript -- TODO confirm against the templates.
        """
        super(QueryByExampleForm, self).__init__(*args, **kwargs)
        model_widget = forms.Select(attrs={'class': "qbeFillModels to:field"})
        self.fields['model'].widget = model_widget
        sort_widget = forms.Select(attrs={'disabled': "disabled",
                                          'class': 'submitIfChecked'},
                                   choices=SORT_CHOICES)
        self.fields['sort'].widget = sort_widget
        criteria_widget = CriteriaInput(attrs={'disabled': "disabled"})
        self.fields['criteria'].widget = criteria_widget
        # When the criteria widget exposes sub-widgets, each of them is
        # listed as "criteria_<index>" in the field selector's class.
        criteria_widgets = getattr(criteria_widget, "widgets", [])
        if criteria_widgets:
            criteria_len = len(criteria_widgets)
            criteria_names = ",".join([("criteria_%s" % s)
                                       for s in range(0, criteria_len)])
            field_attr_class = "qbeFillFields enable:sort,%s" % criteria_names
        else:
            field_attr_class = "qbeFillFields enable:sort,criteria"
        status_widget = forms.Select(attrs={'disabled': "disabled",
                                            'class': 'hidden'},
                                     choices=STATUS_CHOICES)
        self.fields['status'].widget = status_widget
        field_widget = forms.Select(attrs={'class': field_attr_class})
        self.fields['field'].widget = field_widget

    def clean_model(self):
        """
        Return the model name lower-cased, with "." replaced by "_".
        """
        model = self.cleaned_data['model']
        return model.lower().replace(".", "_")

    def clean_criteria(self):
        """
        Return the criteria as an ``(operator, over)`` pair.

        ``(None, None)`` is returned when the submitted value cannot be
        evaluated or does not unpack into exactly two items.
        """
        criteria = self.cleaned_data['criteria']
        try:
            # SECURITY(review): eval() runs arbitrary expressions taken from
            # submitted form data; the empty globals/locals dicts do not
            # remove access to builtins.  If the criteria widget only ever
            # produces a literal tuple, ast.literal_eval would be a safe
            # replacement -- confirm the widget's output format first.
            operator, over = eval(criteria, {}, {})
            return (operator, over)
        except Exception:
            # Deliberate best effort: any unusable criteria means "no
            # criteria".  Unlike a bare ``except``, this lets SystemExit and
            # KeyboardInterrupt propagate.
            return (None, None)
90
91
class BaseQueryByExampleFormSet(BaseFormSet):
    """
    Formset that turns its rows into a raw SQL query and runs it.

    ``get_query_parts`` builds the pieces of the query from the cleaned
    rows, ``clean`` stores them on the instance, ``get_raw_query``
    assembles the SQL string and ``get_results`` executes it.
    """
    # Query parts; clean() rebinds each of these on the instance.
    _selects = []
    _froms = []
    _wheres = []
    _statuses = []
    _sorts = []
    _params = []
    # Maps a database table name to its model class; filled in by
    # get_query_parts().  NOTE(review): this dict is a class attribute that
    # is mutated in place, so its entries are shared by all instances unless
    # an instance rebinds the attribute.
    _models = {}
    # When set, get_raw_query() returns this value instead of building SQL;
    # nothing in this module assigns it.
    _raw_query = None
    # Database alias taken from the ``using`` keyword of __init__.
    _db_alias = "default"
    # Backend helpers loaded in __init__: lookup operators, known table
    # names and the backend's DatabaseOperations instance.
    _db_operators = {}
    _db_table_names = []
    _db_operations = None
105
def __init__(self, *args, **kwargs):
    """
    Load the database backend helpers, then initialise the formset.

    Accepts the usual ``BaseFormSet`` arguments plus an optional ``using``
    keyword naming the database alias ("default" when omitted); ``using``
    is removed from ``kwargs`` before they are passed on.

    When the backend's ``base`` or ``introspection`` module cannot be
    imported, the operator, operations and table-name attributes keep
    their class-level defaults.
    """
    self._db_alias = kwargs.pop("using", "default")
    # An alias missing from the settings falls back to "default" for both
    # the connection and the engine, so the two always describe the same
    # database.
    if self._db_alias in DATABASES:
        alias = self._db_alias
    else:
        alias = "default"
    self._db_connection = connections[alias]
    module = DATABASES[alias]['ENGINE']
    # Give each formset its own table-name -> model cache instead of
    # mutating the dict shared at class level.
    self._models = {}
    base_mod = None
    intros_mod = None
    try:
        base_mod = import_module("%s.base" % module)
        intros_mod = import_module("%s.introspection" % module)
    except ImportError:
        # Best effort: without the backend modules the defaults are kept.
        pass
    if base_mod and intros_mod:
        self._db_operators = base_mod.DatabaseWrapper.operators
        DatabaseOperations = base_mod.DatabaseOperations
        try:
            self._db_operations = DatabaseOperations(self._db_connection)
        except TypeError:
            # Some engines have no params to instance DatabaseOperations
            self._db_operations = DatabaseOperations()
        intros_db = intros_mod.DatabaseIntrospection(self._db_connection)
        django_table_names = intros_db.django_table_names()
        table_names = intros_db.table_names()
        self._db_table_names = list(django_table_names.union(table_names))
    super(BaseQueryByExampleFormSet, self).__init__(*args, **kwargs)
129
def clean(self):
    """
    Check that at least one field is selected and store the query parts.

    Nothing is done when any form of the formset has errors.  Otherwise
    the parts returned by ``get_query_parts`` are stored on the instance.

    Raises ``forms.ValidationError`` when no column is selected.
    """
    if any(self.errors):
        # Don't bother validating the formset unless each form is valid on
        # its own
        return
    selects, froms, wheres, sorts, params, statuses = self.get_query_parts()
    if not selects:
        validation_message = _(u"At least you must check a row to get.")
        # Call form of ``raise``: valid on Python 2 and Python 3 alike.
        raise forms.ValidationError(validation_message)
    self._selects = selects
    self._froms = froms
    self._wheres = wheres
    self._sorts = sorts
    self._params = params
    self._statuses = statuses
148
def translate_model_to_db_table(self, model_name):
    """
    Return the database table of the model named ``<app_label>_<model>``.

    The app label or the model name may themselves contain underscores,
    so every split position is tried in turn and the first one matching a
    ContentType wins.  ``model_name`` is returned unchanged when no split
    resolves to a model (including when it has no underscore at all).
    """
    parts = model_name.split("_")
    for position in range(1, len(parts)):
        app_label = "_".join(parts[:position])
        name = "_".join(parts[position:])
        try:
            ct = ContentType.objects.get(app_label=app_label, model=name)
            model = ct.model_class()
            return model._meta.db_table
        except Exception:
            # Deliberate best effort: a missing ContentType, or one whose
            # model class is no longer available, just means this split is
            # not the right one.
            continue
    return model_name
161
def get_query_parts(self):
    """
    Build the parts of the SQL query from the cleaned rows.

    Rows are read in order from ``self.cleaned_data``; processing stops at
    the first row that lacks a "model" or "field" entry.

    Returns a 6-tuple ``(selects, froms, wheres, sorts, params, statuses)``:

    * selects  -- quoted ``table.column`` strings of the shown rows
      (rows whose operator is "join" are never selected)
    * froms    -- quoted table names
    * wheres   -- SQL conditions (join conditions and operator conditions)
    * sorts    -- ``(quoted column, sort value)`` pairs
    * params   -- lookup values for the operator conditions in ``wheres``
    * statuses -- ``(table name, status value)`` pairs
    """
    selects = []
    froms = []
    wheres = []
    sorts = []
    params = []
    statuses = []
    app_models = None
    app_model_labels = None
    lookup_cast = self._db_operations.lookup_cast
    qn = self._db_operations.quote_name
    uqn = self._unquote_name
    for data in self.cleaned_data:
        if not ("model" in data and "field" in data):
            break
        model = data["model"]
        # HACK: Workaround to handle tables created
        #       by django for its own
        if app_model_labels is None:
            # Loaded lazily, once, on the first usable row.
            app_models = get_models(include_auto_created=True,
                                    include_deferred=True)
            app_model_labels = [u"%s_%s" % (a._meta.app_label,
                                            a._meta.module_name)
                                for a in app_models]
        if model in app_model_labels:
            # Swap the "<app_label>_<model>" name for the real table name
            # and remember which model class owns that table.
            position = app_model_labels.index(model)
            model = app_models[position]._meta.db_table
            self._models[model] = app_models[position]
        field = data["field"]
        show = data["show"]
        criteria = data["criteria"]
        sort = data["sort"]
        status = data["status"]
        db_field = u"%s.%s" % (qn(model), qn(field))
        operator, over = criteria
        try:
            is_join = operator.lower() == 'join'
        except (AttributeError, TypeError):
            # No operator (None) or a non-string one: not a join.
            is_join = False
        if show and not is_join:
            selects.append(db_field)
        if sort:
            sorts.append((db_field, sort))
        if status:
            statuses.append((model, status))

        if all(criteria):
            if is_join:
                # ``over`` reads "<app>.<model>.<field>": the last dot
                # separates the joined model from its field.
                over_split = over.lower().rsplit(".", 1)
                join_model = qn(self.translate_model_to_db_table(
                    over_split[0].replace(".", "_")))
                join_field = qn(over_split[1])

                if model in self._models:
                    _field = self._models[model]._meta.get_field(field)
                    # db_column is empty unless set explicitly on the
                    # field; the attribute name is used in that case.
                    _db_column = qn(_field.db_column or _field.attname)
                    join = u"%s.%s = %s.%s" \
                         % (join_model, join_field, qn(model), _db_column)
                else:
                    # Unknown model: assume the "<field>_id" column.  The
                    # suffix goes inside the quotes so the column name
                    # stays a single quoted identifier.
                    join = u"%s.%s = %s.%s" \
                         % (join_model, join_field, qn(model),
                            qn(u"%s_id" % field))
                if (join not in wheres
                        and uqn(join_model) in self._db_table_names):
                    wheres.append(join)
                    if join_model not in froms:
                        froms.append(join_model)
            elif operator in self._db_operators:
                db_operator = self._db_operators[operator]
                lookup = self._get_lookup(operator, over)
                params.append(lookup)
                wheres.append(u"%s %s"
                              % (lookup_cast(operator) % db_field,
                                 db_operator))
        if qn(model) not in froms and model in self._db_table_names:
            froms.append(qn(model))
    return selects, froms, wheres, sorts, params, statuses
249
def get_raw_query(self, limit=None, offset=None, count=False,
                  add_extra_ids=False, add_params=False):
    """
    Assemble the SQL string from the query parts stored by ``clean``.

    * limit / offset -- added as LIMIT / OFFSET when truthy and
      convertible to int; silently ignored otherwise
    * count          -- select ``COUNT(*) as count`` and drop the ORDER BY
    * add_extra_ids  -- select each column followed by the primary key of
      its table (ignored when ``count`` is set)
    * add_params     -- append the query parameters as a trailing SQL
      comment

    ``self._raw_query`` is returned as is when it is set.  The stored
    query parts are never modified, so repeated calls build the same SQL.

    NOTE(review): the status conditions use backtick quoting and
    ``DATE(NOW())``, which looks MySQL specific -- confirm before using
    another backend.
    """
    qn = self._db_operations.quote_name
    if self._raw_query:
        return self._raw_query
    if self._sorts:
        order_by = u"ORDER BY %s" \
                 % (", ".join([" ".join(x) for x in self._sorts]))
    else:
        order_by = u""
    # Work on copies: the status conditions are derived on every call and
    # must not pile up in the stored parts.
    froms = list(self._froms)
    where_parts = list(self._wheres)
    for m, s in self._statuses:
        # Special case: the status of an employee is checked through the
        # employee's dossiers.
        if m == 'rh_employe':
            m = qn('rh_dossier')
            if m not in froms:
                froms.append(m)
            dossier_join = "`rh_employe`.`id` = `rh_dossier`.`employe`"
            if dossier_join not in where_parts:
                where_parts.append(dossier_join)
        if s == "inactive":
            where_parts.append("%s.date_fin < DATE(NOW())" % m)
        if s == "active":
            where_parts.append(
                "(((%s.`date_debut` <= DATE(NOW()) OR %s.`date_debut` IS NULL) AND %s.`date_fin` >= DATE(NOW())) OR "
                "((%s.`date_fin` >= DATE(NOW()) OR %s.`date_fin` IS NULL) AND %s.`date_debut` <= DATE(NOW())) OR "
                "(%s.`date_debut` <= DATE(NOW()) AND %s.`date_fin` >= DATE(NOW())))"
                % (m, m, m, m, m, m, m, m))
        if s == "futur":
            where_parts.append("%s.date_debut > DATE(NOW())" % m)
        if s == "unknown":
            where_parts.append(
                "(%s.date_debut IS NULL AND %s.date_fin IS NULL)" % (m, m))
    if where_parts:
        wheres = u"WHERE %s" % (" AND ".join(where_parts))
    else:
        wheres = u""
    if count:
        selects = (u"COUNT(*) as count", )
        order_by = u""
    elif add_extra_ids:
        selects = self._get_selects_with_extra_ids()
    else:
        selects = self._selects
    limits = u""
    if limit:
        try:
            limits = u"LIMIT %s" % int(limit)
        except (TypeError, ValueError):
            pass
    offsets = u""
    if offset:
        try:
            offsets = u"OFFSET %s" % int(offset)
        except (TypeError, ValueError):
            pass
    sql = u"""SELECT %s FROM %s %s %s %s %s;""" \
        % (", ".join(selects),
           ", ".join(froms),
           wheres,
           order_by,
           limits,
           offsets)
    if add_params:
        # Parameters are not necessarily strings; format each one before
        # joining.
        shown_params = ", ".join([u"%s" % (p,) for p in self._params])
        return u"%s /* %s */" % (sql, shown_params)
    else:
        return sql
314
def get_results(self, limit=None, offset=None, query=None, admin_name=None,
                row_number=False):
    """
    Run the SQL query and return the fetched rows.

    * limit / offset -- passed to ``get_raw_query`` when ``query`` is not
      given
    * query          -- ready-made SQL to run instead of the built query
    * admin_name     -- admin namespace used to reverse the change URLs
    * row_number     -- prepend a row number to every row

    With ``admin_name``, every row is returned as a list of
    ``(value, admin_url)`` pairs; the URL is None when it cannot be
    reversed.  The row number, when requested, is the pair
    ``(number, u"#row<number>")`` and takes ``offset`` into account.

    Without ``admin_name``, the rows are returned as fetched, or as lists
    starting with the row number when ``row_number`` is set.
    NOTE(review): that number ignores ``offset``, unlike the admin
    branch -- confirm whether this is intended.
    """
    add_extra_ids = admin_name is not None
    if query:
        sql = query
    else:
        sql = self.get_raw_query(limit=limit, offset=offset,
                                 add_extra_ids=add_extra_ids)
    if settings.DEBUG:
        # Parenthesised so the line is valid on Python 2 and Python 3.
        print(sql)
    cursor = self._db_connection.cursor()
    cursor.execute(sql, tuple(self._params))
    query_results = cursor.fetchall()
    if admin_name:
        selects = self._get_selects_with_extra_ids()
        results = []
        try:
            offset = int(offset)
        except (TypeError, ValueError):
            # No offset given (None) or not a number: count from zero.
            offset = 0
        for r, row in enumerate(query_results):
            if row_number:
                number = r + offset + 1
                result = [(number, u"#row%s" % number)]
            else:
                result = []
            # Columns come in pairs: the selected value, then the primary
            # key of its table.
            for i in range(0, len(row), 2):
                appmodel, field = selects[i].split(".")
                appmodel = self._unquote_name(appmodel)
                try:
                    if appmodel in self._models:
                        _model = self._models[appmodel]
                        _appmodel = u"%s_%s" % (_model._meta.app_label,
                                                _model._meta.module_name)
                    else:
                        _appmodel = appmodel
                    admin_url = reverse("%s:%s_change" % (admin_name,
                                                          _appmodel),
                                        args=[row[i + 1]])
                except NoReverseMatch:
                    admin_url = None
                result.append((row[i], admin_url))
            results.append(result)
        return results
    if row_number:
        return [[r + 1] + list(row) for r, row in enumerate(query_results)]
    return query_results
376
def get_count(self):
    """
    Return the number of rows matched by the query.

    The figure comes from a ``COUNT(*)`` query and is returned as a float.
    When that query yields no row, all results are fetched and counted
    instead, giving an int.
    """
    count_rows = self.get_results(query=self.get_raw_query(count=True))
    if not count_rows:
        return len(self.get_results())
    return float(count_rows[0][0])
384
385
def get_model(self, db_prefix, model):
    """
    Return the model class for ``db_prefix`` and ``model``.

    The module-level ``get_model`` is asked first.  When it returns None,
    the models recorded by this formset are searched for the table named
    ``<db_prefix>_<model>``.  None is returned when neither knows it.
    """
    klass = get_model(db_prefix, model)
    if klass is not None:
        return klass
    return self._models.get("%s_%s" % (db_prefix, model))
394
def get_labels(self, add_extra_ids=False, row_number=False):
    """
    Return the column headers matching the selected columns.

    * add_extra_ids -- also label the primary-key columns added by
      ``_get_selects_with_extra_ids``
    * row_number    -- start the list with the translated "#" header

    Each label is the capitalised ``verbose_name`` of the selected field.
    When the model of a column cannot be resolved, the capitalised column
    name is used instead, so the labels stay aligned with the result
    columns.
    """
    if row_number:
        labels = [_(u"#")]
    else:
        labels = []
    if add_extra_ids:
        selects = self._get_selects_with_extra_ids()
    else:
        selects = self._selects
    if selects and isinstance(selects, (tuple, list)):
        for select in selects:
            unquoted = select.replace("`", "")
            # "<app>_<model>.<field>" is split on both "_" and ".": the
            # first two pieces are taken as app label and model name.
            label_splits = unquoted.replace("_", ".").split(".")
            # restore underscore for fields which use it
            label_field = "_".join(label_splits[2:])
            model = self.get_model(label_splits[0], label_splits[1])
            if model is None:
                # Unresolvable model (e.g. an app label that itself
                # contains "_"): fall back to the bare column name.
                label = unquoted.rsplit(".", 1)[-1]
            else:
                label = model._meta.get_field_by_name(label_field)[0].verbose_name
            labels.append(label.capitalize())
    return labels
413
414 def _unquote_name(self, name):
415 quoted_space = self._db_operations.quote_name("")
416 if name.startswith(quoted_space[0]) and name.endswith(quoted_space[1]):
417 return name[1:-1]
418 return name
419
def _get_lookup(self, operator, over):
    """
    Return the query parameter for the lookup ``operator`` on ``over``.

    The value is prepared by a plain ``Field`` for this formset's
    connection.  When the prepared value is a list or tuple, only its
    first item is returned.
    """
    prepared_value = Field().get_db_prep_lookup(
        operator, over, connection=self._db_connection, prepared=True)
    if not isinstance(prepared_value, (tuple, list)):
        return prepared_value
    return prepared_value[0]
427
428 def _get_selects_with_extra_ids(self):
429 qn = self._db_operations.quote_name
430 selects = []
431 for select in self._selects:
432 appmodel, field = select.split(".")
433 appmodel = self._unquote_name(appmodel)
434 field = self._unquote_name(field)
435 selects.append(select)
436 if appmodel in self._models:
437 pk_name = self._models[appmodel]._meta.pk.name
438 else:
439 pk_name = u"id"
440 selects.append("%s.%s" % (qn(appmodel), qn(pk_name)))
441 return selects
442
# Formset class used by the QBE views: rows are QueryByExampleForm
# instances, one extra blank row is offered and rows can be deleted.
QueryByExampleFormSet = formset_factory(
    QueryByExampleForm,
    formset=BaseQueryByExampleFormSet,
    can_delete=True,
    extra=1)