1 # -*- coding: utf-8 -*-
2 from django
import forms
3 from django
.db
import connections
4 from django
.db
.models
import get_model
5 from django
.db
.models
.fields
import Field
6 from django
.core
.urlresolvers
import reverse
, NoReverseMatch
7 from django
.conf
import settings
8 from django
.forms
.formsets
import BaseFormSet
, formset_factory
9 from django
.utils
.importlib
import import_module
10 from django
.utils
.translation
import ugettext
as _
11 from django
.contrib
.contenttypes
.models
import ContentType
13 from django_qbe
.utils
import get_models
14 from django_qbe
.widgets
import CriteriaInput
19 DATABASES
= settings
.DATABASES
20 except AttributeError:
21 # Backwards compatibility for Django versions prior to 1.1.
24 'ENGINE': "django.db.backends.%s" % settings
.DATABASE_ENGINE
,
25 'NAME': settings
.DATABASE_NAME
,
31 ("asc", _("Ascending")),
32 ("desc", _("Descending")),
37 ("inactive", "Inactifs"),
40 ("unknown", "Inconnus"),
44 class QueryByExampleForm(forms
.Form
):
45 show
= forms
.BooleanField(label
=_("Show"), required
=False)
46 model
= forms
.CharField(label
=_("Model"))
47 status
= forms
.ChoiceField(label
=_("Statut"), choices
=STATUS_CHOICES
,
49 field
= forms
.CharField(label
=_("Field"))
50 criteria
= forms
.CharField(label
=_("Criteria"), required
=False)
51 sort
= forms
.ChoiceField(label
=_("Sort"), choices
=SORT_CHOICES
,
54 def __init__(self
, *args
, **kwargs
):
55 super(QueryByExampleForm
, self
).__init__(*args
, **kwargs
)
56 model_widget
= forms
.Select(attrs
={'class': "qbeFillModels to:field"})
57 self
.fields
['model'].widget
= model_widget
58 sort_widget
= forms
.Select(attrs
={'disabled': "disabled",
59 'class': 'submitIfChecked'},
61 self
.fields
['sort'].widget
= sort_widget
62 criteria_widget
= CriteriaInput(attrs
={'disabled': "disabled"})
63 self
.fields
['criteria'].widget
= criteria_widget
64 criteria_widgets
= getattr(criteria_widget
, "widgets", [])
66 criteria_len
= len(criteria_widgets
)
67 criteria_names
= ",".join([("criteria_%s" % s
)
68 for s
in range(0, criteria_len
)])
69 field_attr_class
= "qbeFillFields enable:sort,%s" % criteria_names
71 field_attr_class
= "qbeFillFields enable:sort,criteria"
72 field_widget
= forms
.Select(attrs
={'class': field_attr_class
})
73 self
.fields
['field'].widget
= field_widget
75 def clean_model(self
):
76 model
= self
.cleaned_data
['model']
77 return model
.lower().replace(".", "_")
79 def clean_criteria(self
):
80 criteria
= self
.cleaned_data
['criteria']
82 operator
, over
= eval(criteria
, {}, {})
83 return (operator
, over
)
88 class BaseQueryByExampleFormSet(BaseFormSet
):
100 _db_operations
= None
102 def __init__(self
, *args
, **kwargs
):
103 self
._db_alias
= kwargs
.pop("using", "default")
104 self
._db_connection
= connections
["default"]
105 database_properties
= DATABASES
.get(self
._db_alias
, "default")
106 module
= database_properties
['ENGINE']
108 base_mod
= import_module("%s.base" % module
)
109 intros_mod
= import_module("%s.introspection" % module
)
112 if base_mod
and intros_mod
:
113 self
._db_operators
= base_mod
.DatabaseWrapper
.operators
114 DatabaseOperations
= base_mod
.DatabaseOperations
116 self
._db_operations
= DatabaseOperations(self
._db_connection
)
118 # Some engines have no params to instance DatabaseOperations
119 self
._db_operations
= DatabaseOperations()
120 intros_db
= intros_mod
.DatabaseIntrospection(self
._db_connection
)
121 django_table_names
= intros_db
.django_table_names()
122 table_names
= intros_db
.table_names()
123 self
._db_table_names
= list(django_table_names
.union(table_names
))
124 super(BaseQueryByExampleFormSet
, self
).__init__(*args
, **kwargs
)
128 Checks that there is almost one field to select
131 # Don't bother validating the formset unless each form is valid on
134 selects
, froms
, wheres
, sorts
, params
, statuses
= self
.get_query_parts()
136 validation_message
= _(u
"At least you must check a row to get.")
137 raise forms
.ValidationError
, validation_message
138 self
._selects
= selects
140 self
._wheres
= wheres
142 self
._params
= params
143 self
._statuses
= statuses
145 def translate_model_to_db_table(self
, model_name
):
147 Ensure the full model name match the DB table name (not app_label
150 app_label
, name
= model_name
.split("_")
152 ct
= ContentType
.objects
.get(app_label
=app_label
, model
=name
)
153 model
= ct
.model_class()
154 return model
._meta
.db_table
158 def get_query_parts(self
):
160 Return SQL query for cleaned data
168 app_model_labels
= None
169 lookup_cast
= self
._db_operations
.lookup_cast
170 qn
= self
._db_operations
.quote_name
171 uqn
= self
._unquote_name
172 for data
in self
.cleaned_data
:
173 if not ("model" in data
and "field" in data
):
175 model
= data
["model"]
176 # HACK: Workaround to handle tables created
177 # by django for its own
178 if not app_model_labels
:
179 app_models
= get_models(include_auto_created
=True,
180 include_deferred
=True)
181 app_model_labels
= [u
"%s_%s" % (a
._meta
.app_label
,
184 if model
in app_model_labels
:
185 position
= app_model_labels
.index(model
)
186 model
= app_models
[position
]._meta
.db_table
187 self
._models
[model
] = app_models
[position
]
188 field
= data
["field"]
190 criteria
= data
["criteria"]
192 status
= data
["status"]
193 db_field
= u
"%s.%s" % (qn(model
), qn(field
))
194 operator
, over
= criteria
195 is_join
= operator
.lower() == 'join'
196 if show
and not is_join
:
197 selects
.append(db_field
)
199 sorts
.append((db_field
, sort
))
201 statuses
.append((model
, status
))
204 over_split
= over
.lower().rsplit(".", 1)
205 join_model
= qn(self
.translate_model_to_db_table(over_split
[0].replace(".", "_")))
206 join_field
= qn(over_split
[1])
208 if model
in self
._models
:
209 _field
= self
._models
[model
]._meta
.get_field(field
)
211 _db_column
= qn(_field
.db_column
)
213 _db_column
= qn(_field
.attname
)
215 join
= u
"%s.%s = %s.%s" \
216 % (join_model
, join_field
, qn(model
),
219 join
= u
"%s.%s = %s" \
220 % (join_model
, join_field
,
222 if (join
not in wheres
223 and uqn(join_model
) in self
._db_table_names
):
225 if join_model
not in froms
:
226 froms
.append(join_model
)
227 # join_select = u"%s.%s" % (join_model, join_field)
228 # if join_select not in selects:
229 # selects.append(join_select)
230 elif operator
in self
._db_operators
:
231 # db_operator = self._db_operators[operator] % over
232 db_operator
= self
._db_operators
[operator
]
233 lookup
= self
._get_lookup(operator
, over
)
234 params
.append(lookup
)
235 wheres
.append(u
"%s %s" \
236 % (lookup_cast(operator
) % db_field
,
238 if qn(model
) not in froms
and model
in self
._db_table_names
:
239 froms
.append(qn(model
))
240 return selects
, froms
, wheres
, sorts
, params
, statuses
242 def get_raw_query(self
, limit
=None, offset
=None, count
=False,
243 add_extra_ids
=False, add_params
=False):
244 qn
= self
._db_operations
.quote_name
246 return self
._raw_query
248 order_by
= u
"ORDER BY %s" % (", ".join([" ".join(x
) for x
in self
._sorts
]))
251 _my_wheres
= self
._wheres
253 for m
, s
in self
._statuses
:
254 # Test cas spécial: Pour état employé, vérifier via les dossiers.
255 if m
== 'rh_employe':
257 if m
not in self
._froms
:
258 self
._froms
.append(m
)
259 _my_wheres
.append("`rh_employe`.`id` = `rh_dossier`.`employe`")
261 _my_wheres
.append("%s.date_fin < DATE(NOW())" % m
)
263 _my_wheres
.append("(%s.date_debut IS NULL OR %s.date_debut <= DATE(NOW())) AND (%s.date_fin IS NULL OR %s.date_fin >= DATE(NOW()))" %
266 _my_wheres
.append("%s.date_debut > DATE(NOW())" % m
)
268 _my_wheres
.append("(%s.date_debut IS NULL AND %s.date_fin IS NULL)" % (m
, m
))
270 wheres
= u
"WHERE %s" % (" AND ".join(_my_wheres
))
274 selects
= (u
"COUNT(*) as count", )
277 selects
= self
._get_selects_with_extra_ids()
279 selects
= self
._selects
283 limits
= u
"LIMIT %s" % int(limit
)
289 offsets
= u
"OFFSET %s" % int(offset
)
292 sql
= u
"""SELECT %s FROM %s %s %s %s %s;""" \
293 % (", ".join(selects
),
294 ", ".join(self
._froms
),
300 return u
"%s /* %s */" % (sql
, ", ".join(self
._params
))
304 def get_results(self
, limit
=None, offset
=None, query
=None, admin_name
=None,
307 Fetch all results after perform SQL query and
309 add_extra_ids
= (admin_name
!= None)
311 sql
= self
.get_raw_query(limit
=limit
, offset
=offset
,
312 add_extra_ids
=add_extra_ids
)
317 cursor
= self
._db_connection
.cursor()
318 cursor
.execute(sql
, tuple(self
._params
))
319 query_results
= cursor
.fetchall()
321 selects
= self
._get_selects_with_extra_ids()
327 for r
, row
in enumerate(query_results
):
331 result
= [(r
+ offset
+ 1, u
"#row%s" % (r
+ offset
+ 1))]
335 appmodel
, field
= selects
[i
].split(".")
336 appmodel
= self
._unquote_name(appmodel
)
337 field
= self
._unquote_name(field
)
339 if appmodel
in self
._models
:
340 _model
= self
._models
[appmodel
]
341 _appmodel
= u
"%s_%s" % (_model
._meta
.app_label
,
342 _model
._meta
.module_name
)
345 admin_url
= reverse("%s:%s_change" % (admin_name
,
348 except NoReverseMatch
:
350 result
.append((row
[i
], admin_url
))
352 results
.append(result
)
357 for r
, row
in enumerate(query_results
):
361 results
.append(result
)
367 query
= self
.get_raw_query(count
=True)
368 results
= self
.get_results(query
=query
)
370 return float(results
[0][0])
372 return len(self
.get_results())
375 def get_model(self
, db_prefix
, model
):
376 klass
= get_model(db_prefix
, model
)
378 db_model
= "%s_%s" % (db_prefix
, model
)
379 for table
in self
._models
.keys():
380 if table
== db_model
:
381 return self
._models
[table
]
384 def get_labels(self
, add_extra_ids
=False, row_number
=False):
390 selects
= self
._get_selects_with_extra_ids()
392 selects
= self
._selects
393 if selects
and isinstance(selects
, (tuple, list)):
394 for select
in selects
:
395 label_splits
= select
.replace("`", "").replace("_", ".").split(".")
396 # restore underscore for fields which use it
397 label_field
= "_".join(label_splits
[2:])
398 model
= self
.get_model(label_splits
[0], label_splits
[1])
399 label
= model
._meta
.get_field_by_name(label_field
)[0].verbose_name
400 labels
.append(label
.capitalize())
403 def _unquote_name(self
, name
):
404 quoted_space
= self
._db_operations
.quote_name("")
405 if name
.startswith(quoted_space
[0]) and name
.endswith(quoted_space
[1]):
409 def _get_lookup(self
, operator
, over
):
410 lookup
= Field().get_db_prep_lookup(operator
, over
,
411 connection
=self
._db_connection
,
413 if isinstance(lookup
, (tuple, list)):
417 def _get_selects_with_extra_ids(self
):
418 qn
= self
._db_operations
.quote_name
420 for select
in self
._selects
:
421 appmodel
, field
= select
.split(".")
422 appmodel
= self
._unquote_name(appmodel
)
423 field
= self
._unquote_name(field
)
424 selects
.append(select
)
425 if appmodel
in self
._models
:
426 pk_name
= self
._models
[appmodel
]._meta
.pk
.name
429 selects
.append("%s.%s" % (qn(appmodel
), qn(pk_name
)))
432 QueryByExampleFormSet
= formset_factory(QueryByExampleForm
,
433 formset
=BaseQueryByExampleFormSet
,