Ajout champ Etat pour actif/inactif/etc...
[auf_rh_dae.git] / src / qbe / django_qbe / forms.py
1 # -*- coding: utf-8 -*-
2 from django import forms
3 from django.db import connections
4 from django.db.models import get_model
5 from django.db.models.fields import Field
6 from django.core.urlresolvers import reverse, NoReverseMatch
7 from django.conf import settings
8 from django.forms.formsets import BaseFormSet, formset_factory
9 from django.utils.importlib import import_module
10 from django.utils.translation import ugettext as _
11 from django.contrib.contenttypes.models import ContentType
12
13 from django_qbe.utils import get_models
14 from django_qbe.widgets import CriteriaInput
15
16
17 DATABASES = None
18 try:
19 DATABASES = settings.DATABASES
20 except AttributeError:
21 # Backwards compatibility for Django versions prior to 1.1.
22 DATABASES = {
23 'default': {
24 'ENGINE': "django.db.backends.%s" % settings.DATABASE_ENGINE,
25 'NAME': settings.DATABASE_NAME,
26 }
27 }
28
29 SORT_CHOICES = (
30 ("", ""),
31 ("asc", _("Ascending")),
32 ("desc", _("Descending")),
33 )
34
35 STATUS_CHOICES = (
36 ("", "Tous"),
37 ("inactive", "Inactifs"),
38 ("active", "Actifs"),
39 ("futur", "Futurs"),
40 ("unknown", "Inconnus"),
41 )
42
43
44 class QueryByExampleForm(forms.Form):
45 show = forms.BooleanField(label=_("Show"), required=False)
46 model = forms.CharField(label=_("Model"))
47 status = forms.ChoiceField(label=_("Statut"), choices=STATUS_CHOICES,
48 required=False)
49 field = forms.CharField(label=_("Field"))
50 criteria = forms.CharField(label=_("Criteria"), required=False)
51 sort = forms.ChoiceField(label=_("Sort"), choices=SORT_CHOICES,
52 required=False)
53
54 def __init__(self, *args, **kwargs):
55 super(QueryByExampleForm, self).__init__(*args, **kwargs)
56 model_widget = forms.Select(attrs={'class': "qbeFillModels to:field"})
57 self.fields['model'].widget = model_widget
58 sort_widget = forms.Select(attrs={'disabled': "disabled",
59 'class': 'submitIfChecked'},
60 choices=SORT_CHOICES)
61 self.fields['sort'].widget = sort_widget
62 criteria_widget = CriteriaInput(attrs={'disabled': "disabled"})
63 self.fields['criteria'].widget = criteria_widget
64 criteria_widgets = getattr(criteria_widget, "widgets", [])
65 if criteria_widgets:
66 criteria_len = len(criteria_widgets)
67 criteria_names = ",".join([("criteria_%s" % s)
68 for s in range(0, criteria_len)])
69 field_attr_class = "qbeFillFields enable:sort,%s" % criteria_names
70 else:
71 field_attr_class = "qbeFillFields enable:sort,criteria"
72 field_widget = forms.Select(attrs={'class': field_attr_class})
73 self.fields['field'].widget = field_widget
74
75 def clean_model(self):
76 model = self.cleaned_data['model']
77 return model.lower().replace(".", "_")
78
79 def clean_criteria(self):
80 criteria = self.cleaned_data['criteria']
81 try:
82 operator, over = eval(criteria, {}, {})
83 return (operator, over)
84 except:
85 return (None, None)
86
87
88 class BaseQueryByExampleFormSet(BaseFormSet):
89 _selects = []
90 _froms = []
91 _wheres = []
92 _statuses = []
93 _sorts = []
94 _params = []
95 _models = {}
96 _raw_query = None
97 _db_alias = "default"
98 _db_operators = {}
99 _db_table_names = []
100 _db_operations = None
101
102 def __init__(self, *args, **kwargs):
103 self._db_alias = kwargs.pop("using", "default")
104 self._db_connection = connections["default"]
105 database_properties = DATABASES.get(self._db_alias, "default")
106 module = database_properties['ENGINE']
107 try:
108 base_mod = import_module("%s.base" % module)
109 intros_mod = import_module("%s.introspection" % module)
110 except ImportError:
111 pass
112 if base_mod and intros_mod:
113 self._db_operators = base_mod.DatabaseWrapper.operators
114 DatabaseOperations = base_mod.DatabaseOperations
115 try:
116 self._db_operations = DatabaseOperations(self._db_connection)
117 except TypeError:
118 # Some engines have no params to instance DatabaseOperations
119 self._db_operations = DatabaseOperations()
120 intros_db = intros_mod.DatabaseIntrospection(self._db_connection)
121 django_table_names = intros_db.django_table_names()
122 table_names = intros_db.table_names()
123 self._db_table_names = list(django_table_names.union(table_names))
124 super(BaseQueryByExampleFormSet, self).__init__(*args, **kwargs)
125
126 def clean(self):
127 """
128 Checks that there is almost one field to select
129 """
130 if any(self.errors):
131 # Don't bother validating the formset unless each form is valid on
132 # its own
133 return
134 selects, froms, wheres, sorts, params, statuses = self.get_query_parts()
135 if not selects:
136 validation_message = _(u"At least you must check a row to get.")
137 raise forms.ValidationError, validation_message
138 self._selects = selects
139 self._froms = froms
140 self._wheres = wheres
141 self._sorts = sorts
142 self._params = params
143 self._statuses = statuses
144
145 def translate_model_to_db_table(self, model_name):
146 """
147 Ensure the full model name match the DB table name (not app_label
148 name).
149 """
150 app_label, name = model_name.split("_")
151 try:
152 ct = ContentType.objects.get(app_label=app_label, model=name)
153 model = ct.model_class()
154 return model._meta.db_table
155 except:
156 return model_name
157
158 def get_query_parts(self):
159 """
160 Return SQL query for cleaned data
161 """
162 selects = []
163 froms = []
164 wheres = []
165 sorts = []
166 params = []
167 statuses = []
168 app_model_labels = None
169 lookup_cast = self._db_operations.lookup_cast
170 qn = self._db_operations.quote_name
171 uqn = self._unquote_name
172 for data in self.cleaned_data:
173 if not ("model" in data and "field" in data):
174 break
175 model = data["model"]
176 # HACK: Workaround to handle tables created
177 # by django for its own
178 if not app_model_labels:
179 app_models = get_models(include_auto_created=True,
180 include_deferred=True)
181 app_model_labels = [u"%s_%s" % (a._meta.app_label,
182 a._meta.module_name)
183 for a in app_models]
184 if model in app_model_labels:
185 position = app_model_labels.index(model)
186 model = app_models[position]._meta.db_table
187 self._models[model] = app_models[position]
188 field = data["field"]
189 show = data["show"]
190 criteria = data["criteria"]
191 sort = data["sort"]
192 status = data["status"]
193 db_field = u"%s.%s" % (qn(model), qn(field))
194 operator, over = criteria
195 is_join = operator.lower() == 'join'
196 if show and not is_join:
197 selects.append(db_field)
198 if sort:
199 sorts.append((db_field, sort))
200 if status:
201 statuses.append((model, status))
202 if all(criteria):
203 if is_join:
204 over_split = over.lower().rsplit(".", 1)
205 join_model = qn(self.translate_model_to_db_table(over_split[0].replace(".", "_")))
206 join_field = qn(over_split[1])
207
208 if model in self._models:
209 _field = self._models[model]._meta.get_field(field)
210 try:
211 _db_column = qn(_field.db_column)
212 except:
213 _db_column = qn(_field.attname)
214
215 join = u"%s.%s = %s.%s" \
216 % (join_model, join_field, qn(model),
217 qn(_db_column))
218 else:
219 join = u"%s.%s = %s" \
220 % (join_model, join_field,
221 u"%s_id" % db_field)
222 if (join not in wheres
223 and uqn(join_model) in self._db_table_names):
224 wheres.append(join)
225 if join_model not in froms:
226 froms.append(join_model)
227 # join_select = u"%s.%s" % (join_model, join_field)
228 # if join_select not in selects:
229 # selects.append(join_select)
230 elif operator in self._db_operators:
231 # db_operator = self._db_operators[operator] % over
232 db_operator = self._db_operators[operator]
233 lookup = self._get_lookup(operator, over)
234 params.append(lookup)
235 wheres.append(u"%s %s" \
236 % (lookup_cast(operator) % db_field,
237 db_operator))
238 if qn(model) not in froms and model in self._db_table_names:
239 froms.append(qn(model))
240 return selects, froms, wheres, sorts, params, statuses
241
242 def get_raw_query(self, limit=None, offset=None, count=False,
243 add_extra_ids=False, add_params=False):
244 qn = self._db_operations.quote_name
245 if self._raw_query:
246 return self._raw_query
247 if self._sorts:
248 order_by = u"ORDER BY %s" % (", ".join([" ".join(x) for x in self._sorts]))
249 else:
250 order_by = u""
251 _my_wheres = self._wheres
252 if self._statuses:
253 for m, s in self._statuses:
254 # Test cas spécial: Pour état employé, vérifier via les dossiers.
255 if m == 'rh_employe':
256 m = qn('rh_dossier')
257 if m not in self._froms:
258 self._froms.append(m)
259 _my_wheres.append("`rh_employe`.`id` = `rh_dossier`.`employe`")
260 if s == "inactive":
261 _my_wheres.append("%s.date_fin < DATE(NOW())" % m)
262 if s == "active":
263 _my_wheres.append("(%s.date_debut IS NULL OR %s.date_debut <= DATE(NOW())) AND (%s.date_fin IS NULL OR %s.date_fin >= DATE(NOW()))" %
264 (m, m, m, m))
265 if s == "futur":
266 _my_wheres.append("%s.date_debut > DATE(NOW())" % m)
267 if s == "unknown":
268 _my_wheres.append("(%s.date_debut IS NULL AND %s.date_fin IS NULL)" % (m, m))
269 if _my_wheres:
270 wheres = u"WHERE %s" % (" AND ".join(_my_wheres))
271 else:
272 wheres = u""
273 if count:
274 selects = (u"COUNT(*) as count", )
275 order_by = u""
276 elif add_extra_ids:
277 selects = self._get_selects_with_extra_ids()
278 else:
279 selects = self._selects
280 limits = u""
281 if limit:
282 try:
283 limits = u"LIMIT %s" % int(limit)
284 except ValueError:
285 pass
286 offsets = u""
287 if offset:
288 try:
289 offsets = u"OFFSET %s" % int(offset)
290 except ValueError:
291 pass
292 sql = u"""SELECT %s FROM %s %s %s %s %s;""" \
293 % (", ".join(selects),
294 ", ".join(self._froms),
295 wheres,
296 order_by,
297 limits,
298 offsets)
299 if add_params:
300 return u"%s /* %s */" % (sql, ", ".join(self._params))
301 else:
302 return sql
303
304 def get_results(self, limit=None, offset=None, query=None, admin_name=None,
305 row_number=False):
306 """
307 Fetch all results after perform SQL query and
308 """
309 add_extra_ids = (admin_name != None)
310 if not query:
311 sql = self.get_raw_query(limit=limit, offset=offset,
312 add_extra_ids=add_extra_ids)
313 else:
314 sql = query
315 if settings.DEBUG:
316 print sql
317 cursor = self._db_connection.cursor()
318 cursor.execute(sql, tuple(self._params))
319 query_results = cursor.fetchall()
320 if admin_name:
321 selects = self._get_selects_with_extra_ids()
322 results = []
323 try:
324 offset = int(offset)
325 except ValueError:
326 offset = 0
327 for r, row in enumerate(query_results):
328 i = 0
329 l = len(row)
330 if row_number:
331 result = [(r + offset + 1, u"#row%s" % (r + offset + 1))]
332 else:
333 result = []
334 while i < l:
335 appmodel, field = selects[i].split(".")
336 appmodel = self._unquote_name(appmodel)
337 field = self._unquote_name(field)
338 try:
339 if appmodel in self._models:
340 _model = self._models[appmodel]
341 _appmodel = u"%s_%s" % (_model._meta.app_label,
342 _model._meta.module_name)
343 else:
344 _appmodel = appmodel
345 admin_url = reverse("%s:%s_change" % (admin_name,
346 _appmodel),
347 args=[row[i + 1]])
348 except NoReverseMatch:
349 admin_url = None
350 result.append((row[i], admin_url))
351 i += 2
352 results.append(result)
353 return results
354 else:
355 if row_number:
356 results = []
357 for r, row in enumerate(query_results):
358 result = [r + 1]
359 for cell in row:
360 result.append(cell)
361 results.append(result)
362 return results
363 else:
364 return query_results
365
366 def get_count(self):
367 query = self.get_raw_query(count=True)
368 results = self.get_results(query=query)
369 if results:
370 return float(results[0][0])
371 else:
372 return len(self.get_results())
373
374
375 def get_model(self, db_prefix, model):
376 klass = get_model(db_prefix, model)
377 if klass is None:
378 db_model = "%s_%s" % (db_prefix, model)
379 for table in self._models.keys():
380 if table == db_model:
381 return self._models[table]
382 return klass
383
384 def get_labels(self, add_extra_ids=False, row_number=False):
385 if row_number:
386 labels = [_(u"#")]
387 else:
388 labels = []
389 if add_extra_ids:
390 selects = self._get_selects_with_extra_ids()
391 else:
392 selects = self._selects
393 if selects and isinstance(selects, (tuple, list)):
394 for select in selects:
395 label_splits = select.replace("`", "").replace("_", ".").split(".")
396 # restore underscore for fields which use it
397 label_field = "_".join(label_splits[2:])
398 model = self.get_model(label_splits[0], label_splits[1])
399 label = model._meta.get_field_by_name(label_field)[0].verbose_name
400 labels.append(label.capitalize())
401 return labels
402
403 def _unquote_name(self, name):
404 quoted_space = self._db_operations.quote_name("")
405 if name.startswith(quoted_space[0]) and name.endswith(quoted_space[1]):
406 return name[1:-1]
407 return name
408
409 def _get_lookup(self, operator, over):
410 lookup = Field().get_db_prep_lookup(operator, over,
411 connection=self._db_connection,
412 prepared=True)
413 if isinstance(lookup, (tuple, list)):
414 return lookup[0]
415 return lookup
416
417 def _get_selects_with_extra_ids(self):
418 qn = self._db_operations.quote_name
419 selects = []
420 for select in self._selects:
421 appmodel, field = select.split(".")
422 appmodel = self._unquote_name(appmodel)
423 field = self._unquote_name(field)
424 selects.append(select)
425 if appmodel in self._models:
426 pk_name = self._models[appmodel]._meta.pk.name
427 else:
428 pk_name = u"id"
429 selects.append("%s.%s" % (qn(appmodel), qn(pk_name)))
430 return selects
431
432 QueryByExampleFormSet = formset_factory(QueryByExampleForm,
433 formset=BaseQueryByExampleFormSet,
434 extra=1,
435 can_delete=True)