Routeur: Ajout temporaire des modules workflow et dae en écriture
[auf_rh_dae.git] / src / qbe / django_qbe / forms.py
1 # -*- coding: utf-8 -*-
2 from django import forms
3 from django.db import connections
4 from django.db.models import get_model
5 from django.db.models.fields import Field
6 from django.core.urlresolvers import reverse, NoReverseMatch
7 from django.conf import settings
8 from django.forms.formsets import BaseFormSet, formset_factory
9 from django.utils.importlib import import_module
10 from django.utils.translation import ugettext as _
11 from django.contrib.contenttypes.models import ContentType
12
13 from django_qbe.utils import get_models
14 from django_qbe.widgets import CriteriaInput
15
16
17 DATABASES = None
18 try:
19 DATABASES = settings.DATABASES
20 except AttributeError:
21 # Backwards compatibility for Django versions prior to 1.1.
22 DATABASES = {
23 'default': {
24 'ENGINE': "django.db.backends.%s" % settings.DATABASE_ENGINE,
25 'NAME': settings.DATABASE_NAME,
26 }
27 }
28
29 SORT_CHOICES = (
30 ("", ""),
31 ("asc", _("Ascending")),
32 ("desc", _("Descending")),
33 )
34
35 STATUS_CHOICES = (
36 ("", "Tous"),
37 ("inactive", "Inactifs"),
38 ("active", "Actifs"),
39 ("futur", "Futurs"),
40 ("unknown", "Inconnus"),
41 )
42
43
44 class QueryByExampleForm(forms.Form):
45 show = forms.BooleanField(label=_("Show"), required=False)
46 model = forms.CharField(label=_("Model"))
47 field = forms.CharField(label=_("Field"))
48 criteria = forms.CharField(label=_("Criteria"), required=False)
49 status = forms.ChoiceField(label=_("Statut"), choices=STATUS_CHOICES,
50 required=False)
51 sort = forms.ChoiceField(label=_("Sort"), choices=SORT_CHOICES,
52 required=False)
53
54 def __init__(self, *args, **kwargs):
55 super(QueryByExampleForm, self).__init__(*args, **kwargs)
56 model_widget = forms.Select(attrs={'class': "qbeFillModels to:field"})
57 self.fields['model'].widget = model_widget
58 sort_widget = forms.Select(attrs={'disabled': "disabled",
59 'class': 'submitIfChecked'},
60 choices=SORT_CHOICES)
61 self.fields['sort'].widget = sort_widget
62 criteria_widget = CriteriaInput(attrs={'disabled': "disabled"})
63 self.fields['criteria'].widget = criteria_widget
64 criteria_widgets = getattr(criteria_widget, "widgets", [])
65 if criteria_widgets:
66 criteria_len = len(criteria_widgets)
67 criteria_names = ",".join([("criteria_%s" % s)
68 for s in range(0, criteria_len)])
69 field_attr_class = "qbeFillFields enable:sort,%s" % criteria_names
70 else:
71 field_attr_class = "qbeFillFields enable:sort,criteria"
72 status_widget = forms.Select(attrs={'disabled': "disabled",
73 'class': 'hidden'},
74 choices=STATUS_CHOICES)
75 self.fields['status'].widget = status_widget
76 field_widget = forms.Select(attrs={'class': field_attr_class})
77 self.fields['field'].widget = field_widget
78
79 def clean_model(self):
80 model = self.cleaned_data['model']
81 return model.lower().replace(".", "_")
82
83 def clean_criteria(self):
84 criteria = self.cleaned_data['criteria']
85 try:
86 operator, over = eval(criteria, {}, {})
87 return (operator, over)
88 except:
89 return (None, None)
90
91
92 class BaseQueryByExampleFormSet(BaseFormSet):
93 _selects = []
94 _froms = []
95 _wheres = []
96 _statuses = []
97 _sorts = []
98 _params = []
99 _models = {}
100 _raw_query = None
101 _db_alias = "default"
102 _db_operators = {}
103 _db_table_names = []
104 _db_operations = None
105
106 def __init__(self, *args, **kwargs):
107 self._db_alias = kwargs.pop("using", "default")
108 self._db_connection = connections["default"]
109 database_properties = DATABASES.get(self._db_alias, "default")
110 module = database_properties['ENGINE']
111 try:
112 base_mod = import_module("%s.base" % module)
113 intros_mod = import_module("%s.introspection" % module)
114 except ImportError:
115 pass
116 if base_mod and intros_mod:
117 self._db_operators = base_mod.DatabaseWrapper.operators
118 DatabaseOperations = base_mod.DatabaseOperations
119 try:
120 self._db_operations = DatabaseOperations(self._db_connection)
121 except TypeError:
122 # Some engines have no params to instance DatabaseOperations
123 self._db_operations = DatabaseOperations()
124 intros_db = intros_mod.DatabaseIntrospection(self._db_connection)
125 django_table_names = intros_db.django_table_names()
126 table_names = intros_db.table_names()
127 self._db_table_names = list(django_table_names.union(table_names))
128 super(BaseQueryByExampleFormSet, self).__init__(*args, **kwargs)
129
130 def clean(self):
131 """
132 Checks that there is almost one field to select
133 """
134 if any(self.errors):
135 # Don't bother validating the formset unless each form is valid on
136 # its own
137 return
138 selects, froms, wheres, sorts, params, statuses = self.get_query_parts()
139 if not selects:
140 validation_message = _(u"At least you must check a row to get.")
141 raise forms.ValidationError, validation_message
142 self._selects = selects
143 self._froms = froms
144 self._wheres = wheres
145 self._sorts = sorts
146 self._params = params
147 self._statuses = statuses
148
149 def translate_model_to_db_table(self, model_name):
150 """
151 Ensure the full model name match the DB table name (not app_label
152 name).
153 """
154 app_label, name = model_name.split("_")
155 try:
156 ct = ContentType.objects.get(app_label=app_label, model=name)
157 model = ct.model_class()
158 return model._meta.db_table
159 except:
160 return model_name
161
162 def get_query_parts(self):
163 """
164 Return SQL query for cleaned data
165 """
166 selects = []
167 froms = []
168 wheres = []
169 sorts = []
170 params = []
171 statuses = []
172 app_model_labels = None
173 lookup_cast = self._db_operations.lookup_cast
174 qn = self._db_operations.quote_name
175 uqn = self._unquote_name
176 for data in self.cleaned_data:
177 if not ("model" in data and "field" in data):
178 break
179 model = data["model"]
180 # HACK: Workaround to handle tables created
181 # by django for its own
182 if not app_model_labels:
183 app_models = get_models(include_auto_created=True,
184 include_deferred=True)
185 app_model_labels = [u"%s_%s" % (a._meta.app_label,
186 a._meta.module_name)
187 for a in app_models]
188 if model in app_model_labels:
189 position = app_model_labels.index(model)
190 model = app_models[position]._meta.db_table
191 self._models[model] = app_models[position]
192 field = data["field"]
193 show = data["show"]
194 criteria = data["criteria"]
195 sort = data["sort"]
196 status = data["status"]
197 db_field = u"%s.%s" % (qn(model), qn(field))
198 operator, over = criteria
199 try:
200 is_join = operator.lower() == 'join'
201 except:
202 is_join = False
203
204 if show and not is_join:
205 try:
206 self._models[model]._meta.get_field(field)
207 selects.append(db_field)
208 except:
209 # dynamic field can't be viewable
210 pass
211 if sort:
212 sorts.append((db_field, sort))
213 if status:
214 statuses.append((model, status))
215
216 if all(criteria):
217 if is_join:
218 over_split = over.lower().rsplit(".", 1)
219 join_model = qn(self.translate_model_to_db_table(over_split[0].replace(".", "_")))
220 join_field = qn(over_split[1])
221
222 if model in self._models:
223 _field = self._models[model]._meta.get_field(field)
224 try:
225 _db_column = qn(_field.db_column)
226 except:
227 _db_column = qn(_field.attname)
228
229 join = u"%s.%s = %s.%s" \
230 % (join_model, join_field, qn(model),
231 qn(_db_column))
232 else:
233 join = u"%s.%s = %s" \
234 % (join_model, join_field,
235 u"%s_id" % db_field)
236 if (join not in wheres
237 and uqn(join_model) in self._db_table_names):
238 wheres.append(join)
239 if join_model not in froms:
240 froms.append(join_model)
241 # join_select = u"%s.%s" % (join_model, join_field)
242 # if join_select not in selects:
243 # selects.append(join_select)
244 elif operator in self._db_operators:
245 # db_operator = self._db_operators[operator] % over
246 db_operator = self._db_operators[operator]
247 lookup = self._get_lookup(operator, over)
248 params.append(lookup)
249 wheres.append(u"%s %s" \
250 % (lookup_cast(operator) % db_field,
251 db_operator))
252 if qn(model) not in froms and model in self._db_table_names:
253 froms.append(qn(model))
254 return selects, froms, wheres, sorts, params, statuses
255
256 def get_raw_query(self, limit=None, offset=None, count=False,
257 add_extra_ids=False, add_params=False):
258 qn = self._db_operations.quote_name
259 if self._raw_query:
260 return self._raw_query
261 if self._sorts:
262 order_by = u"ORDER BY %s" % (", ".join([" ".join(x) for x in self._sorts]))
263 else:
264 order_by = u""
265 _my_wheres = self._wheres
266 if self._statuses:
267 for m, s in self._statuses:
268 if m not in ('rh_employe', 'rh_dossier', 'rh_poste'):
269 continue
270 # Test cas spécial: Pour état employé, vérifier via les dossiers.
271 if m == 'rh_employe':
272 m = qn('rh_dossier')
273 if m not in self._froms:
274 self._froms.append(m)
275 _my_wheres.append("`rh_employe`.`id` = `rh_dossier`.`employe`")
276 if s == "inactive":
277 _my_wheres.append("%s.date_fin < DATE(NOW())" % m)
278 if s == "active":
279 _my_wheres.append(
280 "(((%s.`date_debut` <= DATE(NOW()) OR %s.`date_debut` IS NULL) AND %s.`date_fin` >= DATE(NOW())) OR "
281 "((%s.`date_fin` >= DATE(NOW()) OR %s.`date_fin` IS NULL) AND %s.`date_debut` <= DATE(NOW())) OR "
282 "(%s.`date_debut` <= DATE(NOW()) AND %s.`date_fin` >= DATE(NOW())))"
283 % (m, m, m, m, m, m, m, m))
284 if s == "futur":
285 _my_wheres.append("%s.date_debut > DATE(NOW())" % m)
286 if s == "unknown":
287 _my_wheres.append("(%s.date_debut IS NULL AND %s.date_fin IS NULL)" % (m, m))
288 if _my_wheres:
289 wheres = u"WHERE %s" % (" AND ".join(_my_wheres))
290 else:
291 wheres = u""
292 if count:
293 selects = (u"COUNT(*) as count", )
294 order_by = u""
295 elif add_extra_ids:
296 selects = self._get_selects_with_extra_ids()
297 else:
298 selects = self._selects
299 limits = u""
300 if limit:
301 try:
302 limits = u"LIMIT %s" % int(limit)
303 except ValueError:
304 pass
305 offsets = u""
306 if offset:
307 try:
308 offsets = u"OFFSET %s" % int(offset)
309 except ValueError:
310 pass
311 sql = u"""SELECT %s FROM %s %s %s %s %s;""" \
312 % (", ".join(selects),
313 ", ".join(self._froms),
314 wheres,
315 order_by,
316 limits,
317 offsets)
318 if add_params:
319 return u"%s /* %s */" % (sql, ", ".join(self._params))
320 else:
321 return sql
322
323 def get_results(self, limit=None, offset=None, query=None, admin_name=None,
324 row_number=False):
325 """
326 Fetch all results after perform SQL query and
327 """
328 add_extra_ids = (admin_name != None)
329 if not query:
330 sql = self.get_raw_query(limit=limit, offset=offset,
331 add_extra_ids=add_extra_ids)
332 else:
333 sql = query
334 if settings.DEBUG:
335 print sql
336 cursor = self._db_connection.cursor()
337 cursor.execute(sql, tuple(self._params))
338 query_results = cursor.fetchall()
339 if admin_name:
340 selects = self._get_selects_with_extra_ids()
341 results = []
342 try:
343 offset = int(offset)
344 except ValueError:
345 offset = 0
346 for r, row in enumerate(query_results):
347 i = 0
348 l = len(row)
349 if row_number:
350 result = [(r + offset + 1, u"#row%s" % (r + offset + 1))]
351 else:
352 result = []
353 while i < l:
354 appmodel, field = selects[i].split(".")
355 appmodel = self._unquote_name(appmodel)
356 field = self._unquote_name(field)
357 try:
358 if appmodel in self._models:
359 _model = self._models[appmodel]
360 _appmodel = u"%s_%s" % (_model._meta.app_label,
361 _model._meta.module_name)
362 else:
363 _appmodel = appmodel
364 admin_url = reverse("%s:%s_change" % (admin_name,
365 _appmodel),
366 args=[row[i + 1]])
367 except NoReverseMatch:
368 admin_url = None
369 result.append((row[i], admin_url))
370 i += 2
371 results.append(result)
372 return results
373 else:
374 if row_number:
375 results = []
376 for r, row in enumerate(query_results):
377 result = [r + 1]
378 for cell in row:
379 result.append(cell)
380 results.append(result)
381 return results
382 else:
383 return query_results
384
385 def get_count(self):
386 query = self.get_raw_query(count=True)
387 results = self.get_results(query=query)
388 if results:
389 return float(results[0][0])
390 else:
391 return len(self.get_results())
392
393
394 def get_model(self, db_prefix, model):
395 klass = get_model(db_prefix, model)
396 if klass is None:
397 db_model = "%s_%s" % (db_prefix, model)
398 for table in self._models.keys():
399 if table == db_model:
400 return self._models[table]
401 return klass
402
403 def get_labels(self, add_extra_ids=False, row_number=False):
404 if row_number:
405 labels = [_(u"#")]
406 else:
407 labels = []
408 if add_extra_ids:
409 selects = self._get_selects_with_extra_ids()
410 else:
411 selects = self._selects
412 if selects and isinstance(selects, (tuple, list)):
413 for select in selects:
414 label_splits = select.replace("`", "").replace("_", ".").split(".")
415 # restore underscore for fields which use it
416 label_field = "_".join(label_splits[2:])
417 model = self.get_model(label_splits[0], label_splits[1])
418 label = model._meta.get_field_by_name(label_field)[0].verbose_name
419 labels.append(label.capitalize())
420 return labels
421
422 def _unquote_name(self, name):
423 quoted_space = self._db_operations.quote_name("")
424 if name.startswith(quoted_space[0]) and name.endswith(quoted_space[1]):
425 return name[1:-1]
426 return name
427
428 def _get_lookup(self, operator, over):
429 lookup = Field().get_db_prep_lookup(operator, over,
430 connection=self._db_connection,
431 prepared=True)
432 if isinstance(lookup, (tuple, list)):
433 return lookup[0]
434 return lookup
435
436 def _get_selects_with_extra_ids(self):
437 qn = self._db_operations.quote_name
438 selects = []
439 for select in self._selects:
440 appmodel, field = select.split(".")
441 appmodel = self._unquote_name(appmodel)
442 field = self._unquote_name(field)
443 selects.append(select)
444 if appmodel in self._models:
445 pk_name = self._models[appmodel]._meta.pk.name
446 else:
447 pk_name = u"id"
448 selects.append("%s.%s" % (qn(appmodel), qn(pk_name)))
449 return selects
450
451 QueryByExampleFormSet = formset_factory(QueryByExampleForm,
452 formset=BaseQueryByExampleFormSet,
453 extra=1,
454 can_delete=True)