1 # -*- coding: utf-8 -*-
2 from django
import forms
3 from django
.db
import connections
4 from django
.db
.models
import get_model
5 from django
.db
.models
.fields
import Field
6 from django
.core
.urlresolvers
import reverse
, NoReverseMatch
7 from django
.conf
import settings
8 from django
.forms
.formsets
import BaseFormSet
, formset_factory
9 from django
.utils
.importlib
import import_module
10 from django
.utils
.translation
import ugettext
as _
12 from django_qbe
.utils
import get_models
13 from django_qbe
.widgets
import CriteriaInput
18 DATABASES
= settings
.DATABASES
19 except AttributeError:
20 # Backwards compatibility for Django versions prior to 1.1.
23 'ENGINE': "django.db.backends.%s" % settings
.DATABASE_ENGINE
,
24 'NAME': settings
.DATABASE_NAME
,
30 ("asc", _("Ascending")),
31 ("des", _("Descending")),
35 class QueryByExampleForm(forms
.Form
):
36 show
= forms
.BooleanField(label
=_("Show"), required
=False)
37 model
= forms
.CharField(label
=_("Model"))
38 field
= forms
.CharField(label
=_("Field"))
39 criteria
= forms
.CharField(label
=_("Criteria"), required
=False)
40 sort
= forms
.ChoiceField(label
=_("Sort"), choices
=SORT_CHOICES
,
43 def __init__(self
, *args
, **kwargs
):
44 super(QueryByExampleForm
, self
).__init__(*args
, **kwargs
)
45 model_widget
= forms
.Select(attrs
={'class': "qbeFillModels to:field"})
46 self
.fields
['model'].widget
= model_widget
47 sort_widget
= forms
.Select(attrs
={'disabled': "disabled",
48 'class': 'submitIfChecked'},
50 self
.fields
['sort'].widget
= sort_widget
51 criteria_widget
= CriteriaInput(attrs
={'disabled': "disabled"})
52 self
.fields
['criteria'].widget
= criteria_widget
53 criteria_widgets
= getattr(criteria_widget
, "widgets", [])
55 criteria_len
= len(criteria_widgets
)
56 criteria_names
= ",".join([("criteria_%s" % s
)
57 for s
in range(0, criteria_len
)])
58 field_attr_class
= "qbeFillFields enable:sort,%s" % criteria_names
60 field_attr_class
= "qbeFillFields enable:sort,criteria"
61 field_widget
= forms
.Select(attrs
={'class': field_attr_class
})
62 self
.fields
['field'].widget
= field_widget
64 def clean_model(self
):
65 model
= self
.cleaned_data
['model']
66 return model
.lower().replace(".", "_")
68 def clean_criteria(self
):
69 criteria
= self
.cleaned_data
['criteria']
71 operator
, over
= eval(criteria
, {}, {})
72 return (operator
, over
)
77 class BaseQueryByExampleFormSet(BaseFormSet
):
90 def __init__(self
, *args
, **kwargs
):
91 self
._db_alias
= kwargs
.pop("using", "default")
92 self
._db_connection
= connections
["default"]
93 database_properties
= DATABASES
.get(self
._db_alias
, "default")
94 module
= database_properties
['ENGINE']
96 base_mod
= import_module("%s.base" % module
)
97 intros_mod
= import_module("%s.introspection" % module
)
100 if base_mod
and intros_mod
:
101 self
._db_operators
= base_mod
.DatabaseWrapper
.operators
102 DatabaseOperations
= base_mod
.DatabaseOperations
104 self
._db_operations
= DatabaseOperations(self
._db_connection
)
106 # Some engines have no params to instance DatabaseOperations
107 self
._db_operations
= DatabaseOperations()
108 intros_db
= intros_mod
.DatabaseIntrospection(self
._db_connection
)
109 django_table_names
= intros_db
.django_table_names()
110 table_names
= intros_db
.table_names()
111 self
._db_table_names
= list(django_table_names
.union(table_names
))
112 super(BaseQueryByExampleFormSet
, self
).__init__(*args
, **kwargs
)
116 Checks that there is almost one field to select
119 # Don't bother validating the formset unless each form is valid on
122 selects
, froms
, wheres
, sorts
, params
= self
.get_query_parts()
124 validation_message
= _(u
"At least you must check a row to get.")
125 raise forms
.ValidationError
, validation_message
126 self
._selects
= selects
128 self
._wheres
= wheres
130 self
._params
= params
132 def get_query_parts(self
):
134 Return SQL query for cleaned data
141 app_model_labels
= None
142 lookup_cast
= self
._db_operations
.lookup_cast
143 qn
= self
._db_operations
.quote_name
144 uqn
= self
._unquote_name
145 for data
in self
.cleaned_data
:
146 if not ("model" in data
and "field" in data
):
148 model
= data
["model"]
149 # HACK: Workaround to handle tables created
150 # by django for its own
151 if not app_model_labels
:
152 app_models
= get_models(include_auto_created
=True,
153 include_deferred
=True)
154 app_model_labels
= [u
"%s_%s" % (a
._meta
.app_label
,
157 if model
in app_model_labels
:
158 position
= app_model_labels
.index(model
)
159 model
= app_models
[position
]._meta
.db_table
160 self
._models
[model
] = app_models
[position
]
161 field
= data
["field"]
163 criteria
= data
["criteria"]
165 db_field
= u
"%s.%s" % (qn(model
), qn(field
))
166 operator
, over
= criteria
167 is_join
= operator
.lower() == 'join'
168 if show
and not is_join
:
169 selects
.append(db_field
)
171 sorts
.append(db_field
)
174 over_split
= over
.lower().rsplit(".", 1)
175 join_model
= qn(over_split
[0].replace(".", "_"))
176 join_field
= qn(over_split
[1])
177 if model
in self
._models
:
178 _field
= self
._models
[model
]._meta
.get_field(field
)
179 join
= u
"%s.%s = %s.%s" \
180 % (join_model
, join_field
, qn(model
),
181 qn(_field
.db_column
))
183 join
= u
"%s.%s = %s" \
184 % (join_model
, join_field
,
186 if (join
not in wheres
187 and uqn(join_model
) in self
._db_table_names
):
189 if join_model
not in froms
:
190 froms
.append(join_model
)
191 # join_select = u"%s.%s" % (join_model, join_field)
192 # if join_select not in selects:
193 # selects.append(join_select)
194 elif operator
in self
._db_operators
:
195 # db_operator = self._db_operators[operator] % over
196 db_operator
= self
._db_operators
[operator
]
197 lookup
= self
._get_lookup(operator
, over
)
198 params
.append(lookup
)
199 wheres
.append(u
"%s %s" \
200 % (lookup_cast(operator
) % db_field
,
202 if qn(model
) not in froms
and model
in self
._db_table_names
:
203 froms
.append(qn(model
))
204 return selects
, froms
, wheres
, sorts
, params
206 def get_raw_query(self
, limit
=None, offset
=None, count
=False,
207 add_extra_ids
=False, add_params
=False):
209 return self
._raw_query
211 order_by
= u
"ORDER BY %s" % (", ".join(self
._sorts
))
215 wheres
= u
"WHERE %s" % (" AND ".join(self
._wheres
))
219 selects
= (u
"COUNT(*) as count", )
222 selects
= self
._get_selects_with_extra_ids()
224 selects
= self
._selects
228 limits
= u
"LIMIT %s" % int(limit
)
234 offsets
= u
"OFFSET %s" % int(offset
)
237 sql
= u
"""SELECT %s FROM %s %s %s %s %s;""" \
238 % (", ".join(selects
),
239 ", ".join(self
._froms
),
245 return u
"%s /* %s */" % (sql
, ", ".join(self
._params
))
249 def get_results(self
, limit
=None, offset
=None, query
=None, admin_name
=None,
252 Fetch all results after perform SQL query and
254 add_extra_ids
= (admin_name
!= None)
256 sql
= self
.get_raw_query(limit
=limit
, offset
=offset
,
257 add_extra_ids
=add_extra_ids
)
262 cursor
= self
._db_connection
.cursor()
263 cursor
.execute(sql
, tuple(self
._params
))
264 query_results
= cursor
.fetchall()
266 selects
= self
._get_selects_with_extra_ids()
272 for r
, row
in enumerate(query_results
):
276 result
= [(r
+ offset
+ 1, u
"#row%s" % (r
+ offset
+ 1))]
280 appmodel
, field
= selects
[i
].split(".")
281 appmodel
= self
._unquote_name(appmodel
)
282 field
= self
._unquote_name(field
)
284 if appmodel
in self
._models
:
285 _model
= self
._models
[appmodel
]
286 _appmodel
= u
"%s_%s" % (_model
._meta
.app_label
,
287 _model
._meta
.module_name
)
290 admin_url
= reverse("%s:%s_change" % (admin_name
,
293 except NoReverseMatch
:
295 result
.append((row
[i
], admin_url
))
297 results
.append(result
)
302 for r
, row
in enumerate(query_results
):
306 results
.append(result
)
312 query
= self
.get_raw_query(count
=True)
313 results
= self
.get_results(query
=query
)
315 return float(results
[0][0])
317 return len(self
.get_results())
319 def get_labels(self
, add_extra_ids
=False, row_number
=False):
325 selects
= self
._get_selects_with_extra_ids()
327 selects
= self
._selects
328 if selects
and isinstance(selects
, (tuple, list)):
329 for select
in selects
:
330 label_splits
= select
.replace("`", "").replace("_", ".").split(".")
331 label_field
= " ".join(label_splits
[2:])
332 model
= get_model(label_splits
[0], label_splits
[1])
333 label
= model
._meta
.get_field_by_name(label_field
)[0].verbose_name
334 labels
.append(label
.capitalize())
337 def _unquote_name(self
, name
):
338 quoted_space
= self
._db_operations
.quote_name("")
339 if name
.startswith(quoted_space
[0]) and name
.endswith(quoted_space
[1]):
343 def _get_lookup(self
, operator
, over
):
344 lookup
= Field().get_db_prep_lookup(operator
, over
,
345 connection
=self
._db_connection
,
347 if isinstance(lookup
, (tuple, list)):
351 def _get_selects_with_extra_ids(self
):
352 qn
= self
._db_operations
.quote_name
354 for select
in self
._selects
:
355 appmodel
, field
= select
.split(".")
356 appmodel
= self
._unquote_name(appmodel
)
357 field
= self
._unquote_name(field
)
358 selects
.append(select
)
359 if appmodel
in self
._models
:
360 pk_name
= self
._models
[appmodel
]._meta
.pk
.name
363 selects
.append("%s.%s" % (qn(appmodel
), qn(pk_name
)))
366 QueryByExampleFormSet
= formset_factory(QueryByExampleForm
,
367 formset
=BaseQueryByExampleFormSet
,