1 # -*- coding: utf-8 -*-
2 from django
import forms
3 from django
.db
import connections
4 from django
.db
.models
import get_model
5 from django
.db
.models
.fields
import Field
6 from django
.core
.urlresolvers
import reverse
, NoReverseMatch
7 from django
.conf
import settings
8 from django
.forms
.formsets
import BaseFormSet
, formset_factory
9 from django
.utils
.importlib
import import_module
10 from django
.utils
.translation
import ugettext
as _
11 from django
.contrib
.contenttypes
.models
import ContentType
13 from django_qbe
.utils
import get_models
14 from django_qbe
.widgets
import CriteriaInput
19 DATABASES
= settings
.DATABASES
20 except AttributeError:
21 # Backwards compatibility for Django versions prior to 1.2 (no settings.DATABASES).
24 'ENGINE': "django.db.backends.%s" % settings
.DATABASE_ENGINE
,
25 'NAME': settings
.DATABASE_NAME
,
31 ("asc", _("Ascending")),
32 ("desc", _("Descending")),
37 ("inactive", "Inactifs"),
40 ("unknown", "Inconnus"),
44 class QueryByExampleForm(forms
.Form
):
45 show
= forms
.BooleanField(label
=_("Show"), required
=False)
46 model
= forms
.CharField(label
=_("Model"))
47 field
= forms
.CharField(label
=_("Field"))
48 criteria
= forms
.CharField(label
=_("Criteria"), required
=False)
49 status
= forms
.ChoiceField(label
=_("Statut"), choices
=STATUS_CHOICES
,
51 sort
= forms
.ChoiceField(label
=_("Sort"), choices
=SORT_CHOICES
,
54 def __init__(self
, *args
, **kwargs
):
55 super(QueryByExampleForm
, self
).__init__(*args
, **kwargs
)
56 model_widget
= forms
.Select(attrs
={'class': "qbeFillModels to:field"})
57 self
.fields
['model'].widget
= model_widget
58 sort_widget
= forms
.Select(attrs
={'disabled': "disabled",
59 'class': 'submitIfChecked'},
61 self
.fields
['sort'].widget
= sort_widget
62 criteria_widget
= CriteriaInput(attrs
={'disabled': "disabled"})
63 self
.fields
['criteria'].widget
= criteria_widget
64 criteria_widgets
= getattr(criteria_widget
, "widgets", [])
66 criteria_len
= len(criteria_widgets
)
67 criteria_names
= ",".join([("criteria_%s" % s
)
68 for s
in range(0, criteria_len
)])
69 field_attr_class
= "qbeFillFields enable:sort,%s" % criteria_names
71 field_attr_class
= "qbeFillFields enable:sort,criteria"
72 status_widget
= forms
.Select(attrs
={'disabled': "disabled",
74 choices
=STATUS_CHOICES
)
75 self
.fields
['status'].widget
= status_widget
76 field_widget
= forms
.Select(attrs
={'class': field_attr_class
})
77 self
.fields
['field'].widget
= field_widget
def clean_model(self):
    """
    Normalise the submitted model name.

    The cleaned value is lower-cased and every dot is turned into an
    underscore, so ``"App.Model"`` becomes ``"app_model"``.
    """
    lowered = self.cleaned_data['model'].lower()
    # Splitting on "." and re-joining with "_" swaps every dot.
    return "_".join(lowered.split("."))
def clean_criteria(self):
    """
    Parse the submitted criteria into an ``(operator, over)`` pair.

    The raw value is expected to be the textual form of a two-item tuple
    of Python literals, e.g. ``"('exact', 'foo')"``.

    SECURITY: the value comes from submitted form data, so it must never
    be passed to ``eval`` (an empty globals dict still exposes
    ``__builtins__``).  ``ast.literal_eval`` only accepts literals and
    never executes an expression.

    Returns ``(operator, over)``, or ``(None, None)`` when the value is
    empty, is not a plain literal, or does not unpack into exactly two
    items.
    """
    # Function-scope import keeps this block free of a new module-level
    # dependency.
    import ast

    criteria = self.cleaned_data['criteria']
    try:
        operator, over = ast.literal_eval(criteria)
    except (ValueError, SyntaxError, TypeError, MemoryError, RuntimeError):
        # Malformed/hostile input, or a literal that is not a 2-item
        # sequence: report "no criteria" instead of raising.
        return (None, None)
    return (operator, over)
92 class BaseQueryByExampleFormSet(BaseFormSet
):
101 _db_alias
= "default"
104 _db_operations
= None
106 def __init__(self
, *args
, **kwargs
):
107 self
._db_alias
= kwargs
.pop("using", "default")
108 self
._db_connection
= connections
["default"]
109 database_properties
= DATABASES
.get(self
._db_alias
, "default")
110 module
= database_properties
['ENGINE']
112 base_mod
= import_module("%s.base" % module
)
113 intros_mod
= import_module("%s.introspection" % module
)
116 if base_mod
and intros_mod
:
117 self
._db_operators
= base_mod
.DatabaseWrapper
.operators
118 DatabaseOperations
= base_mod
.DatabaseOperations
120 self
._db_operations
= DatabaseOperations(self
._db_connection
)
122 # Some engines take no arguments when instantiating DatabaseOperations
123 self
._db_operations
= DatabaseOperations()
124 intros_db
= intros_mod
.DatabaseIntrospection(self
._db_connection
)
125 django_table_names
= intros_db
.django_table_names()
126 table_names
= intros_db
.table_names()
127 self
._db_table_names
= list(django_table_names
.union(table_names
))
128 super(BaseQueryByExampleFormSet
, self
).__init__(*args
, **kwargs
)
132 Checks that there is at least one field to select
135 # Don't bother validating the formset unless each form is valid on
138 selects
, froms
, wheres
, sorts
, params
, statuses
= self
.get_query_parts()
140 validation_message
= _(u
"At least you must check a row to get.")
141 raise forms
.ValidationError
, validation_message
142 self
._selects
= selects
144 self
._wheres
= wheres
146 self
._params
= params
147 self
._statuses
= statuses
149 def translate_model_to_db_table(self
, model_name
):
151 Ensure the full model name matches the DB table name (not app_label
154 app_label
, name
= model_name
.split("_")
156 ct
= ContentType
.objects
.get(app_label
=app_label
, model
=name
)
157 model
= ct
.model_class()
158 return model
._meta
.db_table
162 def get_query_parts(self
):
164 Return SQL query for cleaned data
172 app_model_labels
= None
173 lookup_cast
= self
._db_operations
.lookup_cast
174 qn
= self
._db_operations
.quote_name
175 uqn
= self
._unquote_name
176 for data
in self
.cleaned_data
:
177 if not ("model" in data
and "field" in data
):
179 model
= data
["model"]
180 # HACK: Workaround to handle tables created
181 # by Django for its own use (auto-created models)
182 if not app_model_labels
:
183 app_models
= get_models(include_auto_created
=True,
184 include_deferred
=True)
185 app_model_labels
= [u
"%s_%s" % (a
._meta
.app_label
,
188 if model
in app_model_labels
:
189 position
= app_model_labels
.index(model
)
190 model
= app_models
[position
]._meta
.db_table
191 self
._models
[model
] = app_models
[position
]
192 field
= data
["field"]
194 criteria
= data
["criteria"]
196 status
= data
["status"]
197 db_field
= u
"%s.%s" % (qn(model
), qn(field
))
198 operator
, over
= criteria
200 is_join
= operator
.lower() == 'join'
203 if show
and not is_join
:
204 selects
.append(db_field
)
206 sorts
.append((db_field
, sort
))
208 statuses
.append((model
, status
))
212 over_split
= over
.lower().rsplit(".", 1)
213 join_model
= qn(self
.translate_model_to_db_table(over_split
[0].replace(".", "_")))
214 join_field
= qn(over_split
[1])
216 if model
in self
._models
:
217 _field
= self
._models
[model
]._meta
.get_field(field
)
219 _db_column
= qn(_field
.db_column
)
221 _db_column
= qn(_field
.attname
)
223 join
= u
"%s.%s = %s.%s" \
224 % (join_model
, join_field
, qn(model
),
227 join
= u
"%s.%s = %s" \
228 % (join_model
, join_field
,
230 if (join
not in wheres
231 and uqn(join_model
) in self
._db_table_names
):
233 if join_model
not in froms
:
234 froms
.append(join_model
)
235 # join_select = u"%s.%s" % (join_model, join_field)
236 # if join_select not in selects:
237 # selects.append(join_select)
238 elif operator
in self
._db_operators
:
239 # db_operator = self._db_operators[operator] % over
240 db_operator
= self
._db_operators
[operator
]
241 lookup
= self
._get_lookup(operator
, over
)
242 params
.append(lookup
)
243 wheres
.append(u
"%s %s" \
244 % (lookup_cast(operator
) % db_field
,
246 if qn(model
) not in froms
and model
in self
._db_table_names
:
247 froms
.append(qn(model
))
248 return selects
, froms
, wheres
, sorts
, params
, statuses
250 def get_raw_query(self
, limit
=None, offset
=None, count
=False,
251 add_extra_ids
=False, add_params
=False):
252 qn
= self
._db_operations
.quote_name
254 return self
._raw_query
256 order_by
= u
"ORDER BY %s" % (", ".join([" ".join(x
) for x
in self
._sorts
]))
259 _my_wheres
= self
._wheres
261 for m
, s
in self
._statuses
:
262 # Special case: for employee status, check via the employee's records (rh_dossier).
263 if m
== 'rh_employe':
265 if m
not in self
._froms
:
266 self
._froms
.append(m
)
267 _my_wheres
.append("`rh_employe`.`id` = `rh_dossier`.`employe`")
269 _my_wheres
.append("%s.date_fin < DATE(NOW())" % m
)
272 "(((%s.`date_debut` <= DATE(NOW()) OR %s.`date_debut` IS NULL) AND %s.`date_fin` >= DATE(NOW())) OR "
273 "((%s.`date_fin` >= DATE(NOW()) OR %s.`date_fin` IS NULL) AND %s.`date_debut` <= DATE(NOW())) OR "
274 "(%s.`date_debut` <= DATE(NOW()) AND %s.`date_fin` >= DATE(NOW())))"
275 % (m
, m
, m
, m
, m
, m
, m
, m
))
277 _my_wheres
.append("%s.date_debut > DATE(NOW())" % m
)
279 _my_wheres
.append("(%s.date_debut IS NULL AND %s.date_fin IS NULL)" % (m
, m
))
281 wheres
= u
"WHERE %s" % (" AND ".join(_my_wheres
))
285 selects
= (u
"COUNT(*) as count", )
288 selects
= self
._get_selects_with_extra_ids()
290 selects
= self
._selects
294 limits
= u
"LIMIT %s" % int(limit
)
300 offsets
= u
"OFFSET %s" % int(offset
)
303 sql
= u
"""SELECT %s FROM %s %s %s %s %s;""" \
304 % (", ".join(selects
),
305 ", ".join(self
._froms
),
311 return u
"%s /* %s */" % (sql
, ", ".join(self
._params
))
315 def get_results(self
, limit
=None, offset
=None, query
=None, admin_name
=None,
318 Fetch all results after performing the SQL query and
320 add_extra_ids
= (admin_name
!= None)
322 sql
= self
.get_raw_query(limit
=limit
, offset
=offset
,
323 add_extra_ids
=add_extra_ids
)
328 cursor
= self
._db_connection
.cursor()
329 cursor
.execute(sql
, tuple(self
._params
))
330 query_results
= cursor
.fetchall()
332 selects
= self
._get_selects_with_extra_ids()
338 for r
, row
in enumerate(query_results
):
342 result
= [(r
+ offset
+ 1, u
"#row%s" % (r
+ offset
+ 1))]
346 appmodel
, field
= selects
[i
].split(".")
347 appmodel
= self
._unquote_name(appmodel
)
348 field
= self
._unquote_name(field
)
350 if appmodel
in self
._models
:
351 _model
= self
._models
[appmodel
]
352 _appmodel
= u
"%s_%s" % (_model
._meta
.app_label
,
353 _model
._meta
.module_name
)
356 admin_url
= reverse("%s:%s_change" % (admin_name
,
359 except NoReverseMatch
:
361 result
.append((row
[i
], admin_url
))
363 results
.append(result
)
368 for r
, row
in enumerate(query_results
):
372 results
.append(result
)
378 query
= self
.get_raw_query(count
=True)
379 results
= self
.get_results(query
=query
)
381 return float(results
[0][0])
383 return len(self
.get_results())
386 def get_model(self
, db_prefix
, model
):
387 klass
= get_model(db_prefix
, model
)
389 db_model
= "%s_%s" % (db_prefix
, model
)
390 for table
in self
._models
.keys():
391 if table
== db_model
:
392 return self
._models
[table
]
395 def get_labels(self
, add_extra_ids
=False, row_number
=False):
401 selects
= self
._get_selects_with_extra_ids()
403 selects
= self
._selects
404 if selects
and isinstance(selects
, (tuple, list)):
405 for select
in selects
:
406 label_splits
= select
.replace("`", "").replace("_", ".").split(".")
407 # restore underscore for fields which use it
408 label_field
= "_".join(label_splits
[2:])
409 model
= self
.get_model(label_splits
[0], label_splits
[1])
410 label
= model
._meta
.get_field_by_name(label_field
)[0].verbose_name
411 labels
.append(label
.capitalize())
414 def _unquote_name(self
, name
):
415 quoted_space
= self
._db_operations
.quote_name("")
416 if name
.startswith(quoted_space
[0]) and name
.endswith(quoted_space
[1]):
420 def _get_lookup(self
, operator
, over
):
421 lookup
= Field().get_db_prep_lookup(operator
, over
,
422 connection
=self
._db_connection
,
424 if isinstance(lookup
, (tuple, list)):
428 def _get_selects_with_extra_ids(self
):
429 qn
= self
._db_operations
.quote_name
431 for select
in self
._selects
:
432 appmodel
, field
= select
.split(".")
433 appmodel
= self
._unquote_name(appmodel
)
434 field
= self
._unquote_name(field
)
435 selects
.append(select
)
436 if appmodel
in self
._models
:
437 pk_name
= self
._models
[appmodel
]._meta
.pk
.name
440 selects
.append("%s.%s" % (qn(appmodel
), qn(pk_name
)))
443 QueryByExampleFormSet
= formset_factory(QueryByExampleForm
,
444 formset
=BaseQueryByExampleFormSet
,