Installation de la version modifié de QBE dans src/
[auf_rh_dae.git] / src / qbe / django_qbe / forms.py
1 # -*- coding: utf-8 -*-
2 from django import forms
3 from django.db import connections
4 from django.db.models.fields import Field
5 from django.core.urlresolvers import reverse, NoReverseMatch
6 from django.conf import settings
7 from django.forms.formsets import BaseFormSet, formset_factory
8 from django.utils.importlib import import_module
9 from django.utils.translation import ugettext as _
10
11 from django_qbe.utils import get_models
12 from django_qbe.widgets import CriteriaInput
13
14
15 DATABASES = None
16 try:
17 DATABASES = settings.DATABASES
18 except AttributeError:
19 # Backwards compatibility for Django versions prior to 1.1.
20 DATABASES = {
21 'default': {
22 'ENGINE': "django.db.backends.%s" % settings.DATABASE_ENGINE,
23 'NAME': settings.DATABASE_NAME,
24 }
25 }
26
27 SORT_CHOICES = (
28 ("", ""),
29 ("asc", _("Ascending")),
30 ("des", _("Descending")),
31 )
32
33
34 class QueryByExampleForm(forms.Form):
35 show = forms.BooleanField(label=_("Show"), required=False)
36 model = forms.CharField(label=_("Model"))
37 field = forms.CharField(label=_("Field"))
38 criteria = forms.CharField(label=_("Criteria"), required=False)
39 sort = forms.ChoiceField(label=_("Sort"), choices=SORT_CHOICES,
40 required=False)
41
42 def __init__(self, *args, **kwargs):
43 super(QueryByExampleForm, self).__init__(*args, **kwargs)
44 model_widget = forms.Select(attrs={'class': "qbeFillModels to:field"})
45 self.fields['model'].widget = model_widget
46 sort_widget = forms.Select(attrs={'disabled': "disabled",
47 'class': 'submitIfChecked'},
48 choices=SORT_CHOICES)
49 self.fields['sort'].widget = sort_widget
50 criteria_widget = CriteriaInput(attrs={'disabled': "disabled"})
51 self.fields['criteria'].widget = criteria_widget
52 criteria_widgets = getattr(criteria_widget, "widgets", [])
53 if criteria_widgets:
54 criteria_len = len(criteria_widgets)
55 criteria_names = ",".join([("criteria_%s" % s)
56 for s in range(0, criteria_len)])
57 field_attr_class = "qbeFillFields enable:sort,%s" % criteria_names
58 else:
59 field_attr_class = "qbeFillFields enable:sort,criteria"
60 field_widget = forms.Select(attrs={'class': field_attr_class})
61 self.fields['field'].widget = field_widget
62
63 def clean_model(self):
64 model = self.cleaned_data['model']
65 return model.lower().replace(".", "_")
66
67 def clean_criteria(self):
68 criteria = self.cleaned_data['criteria']
69 try:
70 operator, over = eval(criteria, {}, {})
71 return (operator, over)
72 except:
73 return (None, None)
74
75
76 class BaseQueryByExampleFormSet(BaseFormSet):
77 _selects = []
78 _froms = []
79 _wheres = []
80 _sorts = []
81 _params = []
82 _models = {}
83 _raw_query = None
84 _db_alias = "default"
85 _db_operators = {}
86 _db_table_names = []
87 _db_operations = None
88
89 def __init__(self, *args, **kwargs):
90 self._db_alias = kwargs.pop("using", "default")
91 self._db_connection = connections["default"]
92 database_properties = DATABASES.get(self._db_alias, "default")
93 module = database_properties['ENGINE']
94 try:
95 base_mod = import_module("%s.base" % module)
96 intros_mod = import_module("%s.introspection" % module)
97 except ImportError:
98 pass
99 if base_mod and intros_mod:
100 self._db_operators = base_mod.DatabaseWrapper.operators
101 DatabaseOperations = base_mod.DatabaseOperations
102 try:
103 self._db_operations = DatabaseOperations(self._db_connection)
104 except TypeError:
105 # Some engines have no params to instance DatabaseOperations
106 self._db_operations = DatabaseOperations()
107 intros_db = intros_mod.DatabaseIntrospection(self._db_connection)
108 django_table_names = intros_db.django_table_names()
109 table_names = intros_db.table_names()
110 self._db_table_names = list(django_table_names.union(table_names))
111 super(BaseQueryByExampleFormSet, self).__init__(*args, **kwargs)
112
113 def clean(self):
114 """
115 Checks that there is almost one field to select
116 """
117 if any(self.errors):
118 # Don't bother validating the formset unless each form is valid on
119 # its own
120 return
121 selects, froms, wheres, sorts, params = self.get_query_parts()
122 if not selects:
123 validation_message = _(u"At least you must check a row to get.")
124 raise forms.ValidationError, validation_message
125 self._selects = selects
126 self._froms = froms
127 self._wheres = wheres
128 self._sorts = sorts
129 self._params = params
130
131 def get_query_parts(self):
132 """
133 Return SQL query for cleaned data
134 """
135 selects = []
136 froms = []
137 wheres = []
138 sorts = []
139 params = []
140 app_model_labels = None
141 lookup_cast = self._db_operations.lookup_cast
142 qn = self._db_operations.quote_name
143 uqn = self._unquote_name
144 for data in self.cleaned_data:
145 if not ("model" in data and "field" in data):
146 break
147 model = data["model"]
148 # HACK: Workaround to handle tables created
149 # by django for its own
150 if not app_model_labels:
151 app_models = get_models(include_auto_created=True,
152 include_deferred=True)
153 app_model_labels = [u"%s_%s" % (a._meta.app_label,
154 a._meta.module_name)
155 for a in app_models]
156 if model in app_model_labels:
157 position = app_model_labels.index(model)
158 model = app_models[position]._meta.db_table
159 self._models[model] = app_models[position]
160 field = data["field"]
161 show = data["show"]
162 criteria = data["criteria"]
163 sort = data["sort"]
164 db_field = u"%s.%s" % (qn(model), qn(field))
165 operator, over = criteria
166 is_join = operator.lower() == 'join'
167 if show and not is_join:
168 selects.append(db_field)
169 if sort:
170 sorts.append(db_field)
171 if all(criteria):
172 if is_join:
173 over_split = over.lower().rsplit(".", 1)
174 join_model = qn(over_split[0].replace(".", "_"))
175 join_field = qn(over_split[1])
176 if model in self._models:
177 _field = self._models[model]._meta.get_field(field)
178 join = u"%s.%s = %s.%s" \
179 % (join_model, join_field, qn(model),
180 qn(_field.db_column))
181 else:
182 join = u"%s.%s = %s" \
183 % (join_model, join_field,
184 u"%s_id" % db_field)
185 if (join not in wheres
186 and uqn(join_model) in self._db_table_names):
187 wheres.append(join)
188 if join_model not in froms:
189 froms.append(join_model)
190 # join_select = u"%s.%s" % (join_model, join_field)
191 # if join_select not in selects:
192 # selects.append(join_select)
193 elif operator in self._db_operators:
194 # db_operator = self._db_operators[operator] % over
195 db_operator = self._db_operators[operator]
196 lookup = self._get_lookup(operator, over)
197 params.append(lookup)
198 wheres.append(u"%s %s" \
199 % (lookup_cast(operator) % db_field,
200 db_operator))
201 if qn(model) not in froms and model in self._db_table_names:
202 froms.append(qn(model))
203 return selects, froms, wheres, sorts, params
204
205 def get_raw_query(self, limit=None, offset=None, count=False,
206 add_extra_ids=False, add_params=False):
207 if self._raw_query:
208 return self._raw_query
209 if self._sorts:
210 order_by = u"ORDER BY %s" % (", ".join(self._sorts))
211 else:
212 order_by = u""
213 if self._wheres:
214 wheres = u"WHERE %s" % (" AND ".join(self._wheres))
215 else:
216 wheres = u""
217 if count:
218 selects = (u"COUNT(*) as count", )
219 order_by = u""
220 elif add_extra_ids:
221 selects = self._get_selects_with_extra_ids()
222 else:
223 selects = self._selects
224 limits = u""
225 if limit:
226 try:
227 limits = u"LIMIT %s" % int(limit)
228 except ValueError:
229 pass
230 offsets = u""
231 if offset:
232 try:
233 offsets = u"OFFSET %s" % int(offset)
234 except ValueError:
235 pass
236 sql = u"""SELECT %s FROM %s %s %s %s %s;""" \
237 % (", ".join(selects),
238 ", ".join(self._froms),
239 wheres,
240 order_by,
241 limits,
242 offsets)
243 if add_params:
244 return u"%s /* %s */" % (sql, ", ".join(self._params))
245 else:
246 return sql
247
248 def get_results(self, limit=None, offset=None, query=None, admin_name=None,
249 row_number=False):
250 """
251 Fetch all results after perform SQL query and
252 """
253 add_extra_ids = (admin_name != None)
254 if not query:
255 sql = self.get_raw_query(limit=limit, offset=offset,
256 add_extra_ids=add_extra_ids)
257 else:
258 sql = query
259 if settings.DEBUG:
260 print sql
261 cursor = self._db_connection.cursor()
262 cursor.execute(sql, tuple(self._params))
263 query_results = cursor.fetchall()
264 if admin_name:
265 selects = self._get_selects_with_extra_ids()
266 results = []
267 try:
268 offset = int(offset)
269 except ValueError:
270 offset = 0
271 for r, row in enumerate(query_results):
272 i = 0
273 l = len(row)
274 if row_number:
275 result = [(r + offset + 1, u"#row%s" % (r + offset + 1))]
276 else:
277 result = []
278 while i < l:
279 appmodel, field = selects[i].split(".")
280 appmodel = self._unquote_name(appmodel)
281 field = self._unquote_name(field)
282 try:
283 if appmodel in self._models:
284 _model = self._models[appmodel]
285 _appmodel = u"%s_%s" % (_model._meta.app_label,
286 _model._meta.module_name)
287 else:
288 _appmodel = appmodel
289 admin_url = reverse("%s:%s_change" % (admin_name,
290 _appmodel),
291 args=[row[i + 1]])
292 except NoReverseMatch:
293 admin_url = None
294 result.append((row[i], admin_url))
295 i += 2
296 results.append(result)
297 return results
298 else:
299 if row_number:
300 results = []
301 for r, row in enumerate(query_results):
302 result = [r + 1]
303 for cell in row:
304 result.append(cell)
305 results.append(result)
306 return results
307 else:
308 return query_results
309
310 def get_count(self):
311 query = self.get_raw_query(count=True)
312 results = self.get_results(query=query)
313 if results:
314 return float(results[0][0])
315 else:
316 return len(self.get_results())
317
318 def get_labels(self, add_extra_ids=False, row_number=False):
319 if row_number:
320 labels = [_(u"#")]
321 else:
322 labels = []
323 if add_extra_ids:
324 selects = self._get_selects_with_extra_ids()
325 else:
326 selects = self._selects
327 if selects and isinstance(selects, (tuple, list)):
328 for select in selects:
329 label_splits = select.replace("_", ".").split(".")
330 label_splits_field = " ".join(label_splits[2:]).capitalize()
331 label = u"%s.%s: %s" % (label_splits[0].capitalize(),
332 label_splits[1].capitalize(),
333 label_splits_field)
334 labels.append(label)
335 return labels
336
337 def _unquote_name(self, name):
338 quoted_space = self._db_operations.quote_name("")
339 if name.startswith(quoted_space[0]) and name.endswith(quoted_space[1]):
340 return name[1:-1]
341 return name
342
343 def _get_lookup(self, operator, over):
344 lookup = Field().get_db_prep_lookup(operator, over,
345 connection=self._db_connection,
346 prepared=True)
347 if isinstance(lookup, (tuple, list)):
348 return lookup[0]
349 return lookup
350
351 def _get_selects_with_extra_ids(self):
352 qn = self._db_operations.quote_name
353 selects = []
354 for select in self._selects:
355 appmodel, field = select.split(".")
356 appmodel = self._unquote_name(appmodel)
357 field = self._unquote_name(field)
358 selects.append(select)
359 if appmodel in self._models:
360 pk_name = self._models[appmodel]._meta.pk.name
361 else:
362 pk_name = u"id"
363 selects.append("%s.%s" % (qn(appmodel), qn(pk_name)))
364 return selects
365
366 QueryByExampleFormSet = formset_factory(QueryByExampleForm,
367 formset=BaseQueryByExampleFormSet,
368 extra=1,
369 can_delete=True)