1 # -*- coding: utf-8 -*-
2 from django
import forms
3 from django
.db
import connections
4 from django
.db
.models
import get_model
5 from django
.db
.models
.fields
import Field
6 from django
.core
.urlresolvers
import reverse
, NoReverseMatch
7 from django
.conf
import settings
8 from django
.forms
.formsets
import BaseFormSet
, formset_factory
9 from django
.utils
.importlib
import import_module
10 from django
.utils
.translation
import ugettext
as _
11 from django
.contrib
.contenttypes
.models
import ContentType
13 from django_qbe
.utils
import get_models
14 from django_qbe
.widgets
import CriteriaInput
19 DATABASES
= settings
.DATABASES
20 except AttributeError:
21 # Backwards compatibility for Django versions prior to 1.1.
24 'ENGINE': "django.db.backends.%s" % settings
.DATABASE_ENGINE
,
25 'NAME': settings
.DATABASE_NAME
,
31 ("asc", _("Ascending")),
32 ("des", _("Descending")),
36 class QueryByExampleForm(forms
.Form
):
37 show
= forms
.BooleanField(label
=_("Show"), required
=False)
38 model
= forms
.CharField(label
=_("Model"))
39 field
= forms
.CharField(label
=_("Field"))
40 criteria
= forms
.CharField(label
=_("Criteria"), required
=False)
41 sort
= forms
.ChoiceField(label
=_("Sort"), choices
=SORT_CHOICES
,
44 def __init__(self
, *args
, **kwargs
):
45 super(QueryByExampleForm
, self
).__init__(*args
, **kwargs
)
46 model_widget
= forms
.Select(attrs
={'class': "qbeFillModels to:field"})
47 self
.fields
['model'].widget
= model_widget
48 sort_widget
= forms
.Select(attrs
={'disabled': "disabled",
49 'class': 'submitIfChecked'},
51 self
.fields
['sort'].widget
= sort_widget
52 criteria_widget
= CriteriaInput(attrs
={'disabled': "disabled"})
53 self
.fields
['criteria'].widget
= criteria_widget
54 criteria_widgets
= getattr(criteria_widget
, "widgets", [])
56 criteria_len
= len(criteria_widgets
)
57 criteria_names
= ",".join([("criteria_%s" % s
)
58 for s
in range(0, criteria_len
)])
59 field_attr_class
= "qbeFillFields enable:sort,%s" % criteria_names
61 field_attr_class
= "qbeFillFields enable:sort,criteria"
62 field_widget
= forms
.Select(attrs
={'class': field_attr_class
})
63 self
.fields
['field'].widget
= field_widget
65 def clean_model(self
):
66 model
= self
.cleaned_data
['model']
67 return model
.lower().replace(".", "_")
69 def clean_criteria(self
):
70 criteria
= self
.cleaned_data
['criteria']
72 operator
, over
= eval(criteria
, {}, {})
73 return (operator
, over
)
78 class BaseQueryByExampleFormSet(BaseFormSet
):
91 def __init__(self
, *args
, **kwargs
):
92 self
._db_alias
= kwargs
.pop("using", "default")
93 self
._db_connection
= connections
["default"]
94 database_properties
= DATABASES
.get(self
._db_alias
, "default")
95 module
= database_properties
['ENGINE']
97 base_mod
= import_module("%s.base" % module
)
98 intros_mod
= import_module("%s.introspection" % module
)
101 if base_mod
and intros_mod
:
102 self
._db_operators
= base_mod
.DatabaseWrapper
.operators
103 DatabaseOperations
= base_mod
.DatabaseOperations
105 self
._db_operations
= DatabaseOperations(self
._db_connection
)
107 # Some engines have no params to instance DatabaseOperations
108 self
._db_operations
= DatabaseOperations()
109 intros_db
= intros_mod
.DatabaseIntrospection(self
._db_connection
)
110 django_table_names
= intros_db
.django_table_names()
111 table_names
= intros_db
.table_names()
112 self
._db_table_names
= list(django_table_names
.union(table_names
))
113 super(BaseQueryByExampleFormSet
, self
).__init__(*args
, **kwargs
)
117 Checks that there is almost one field to select
120 # Don't bother validating the formset unless each form is valid on
123 selects
, froms
, wheres
, sorts
, params
= self
.get_query_parts()
125 validation_message
= _(u
"At least you must check a row to get.")
126 raise forms
.ValidationError
, validation_message
127 self
._selects
= selects
129 self
._wheres
= wheres
131 self
._params
= params
133 def translate_model_to_db_table(self
, model_name
):
135 Ensure the full model name match the DB table name (not app_label
138 app_label
, name
= model_name
.split("_")
140 ct
= ContentType
.objects
.get(app_label
=app_label
, model
=name
)
141 model
= ct
.model_class()
142 return model
._meta
.db_table
146 def get_query_parts(self
):
148 Return SQL query for cleaned data
155 app_model_labels
= None
156 lookup_cast
= self
._db_operations
.lookup_cast
157 qn
= self
._db_operations
.quote_name
158 uqn
= self
._unquote_name
159 for data
in self
.cleaned_data
:
160 if not ("model" in data
and "field" in data
):
162 model
= data
["model"]
163 # HACK: Workaround to handle tables created
164 # by django for its own
165 if not app_model_labels
:
166 app_models
= get_models(include_auto_created
=True,
167 include_deferred
=True)
168 app_model_labels
= [u
"%s_%s" % (a
._meta
.app_label
,
171 if model
in app_model_labels
:
172 position
= app_model_labels
.index(model
)
173 model
= app_models
[position
]._meta
.db_table
174 self
._models
[model
] = app_models
[position
]
175 field
= data
["field"]
177 criteria
= data
["criteria"]
179 db_field
= u
"%s.%s" % (qn(model
), qn(field
))
180 operator
, over
= criteria
181 is_join
= operator
.lower() == 'join'
182 if show
and not is_join
:
183 selects
.append(db_field
)
185 sorts
.append(db_field
)
188 over_split
= over
.lower().rsplit(".", 1)
189 join_model
= qn(self
.translate_model_to_db_table(over_split
[0].replace(".", "_")))
190 join_field
= qn(over_split
[1])
192 if model
in self
._models
:
193 _field
= self
._models
[model
]._meta
.get_field(field
)
194 join
= u
"%s.%s = %s.%s" \
195 % (join_model
, join_field
, qn(model
),
196 qn(_field
.db_column
))
198 join
= u
"%s.%s = %s" \
199 % (join_model
, join_field
,
201 if (join
not in wheres
202 and uqn(join_model
) in self
._db_table_names
):
204 if join_model
not in froms
:
205 froms
.append(join_model
)
206 # join_select = u"%s.%s" % (join_model, join_field)
207 # if join_select not in selects:
208 # selects.append(join_select)
209 elif operator
in self
._db_operators
:
210 # db_operator = self._db_operators[operator] % over
211 db_operator
= self
._db_operators
[operator
]
212 lookup
= self
._get_lookup(operator
, over
)
213 params
.append(lookup
)
214 wheres
.append(u
"%s %s" \
215 % (lookup_cast(operator
) % db_field
,
217 if qn(model
) not in froms
and model
in self
._db_table_names
:
218 froms
.append(qn(model
))
219 return selects
, froms
, wheres
, sorts
, params
221 def get_raw_query(self
, limit
=None, offset
=None, count
=False,
222 add_extra_ids
=False, add_params
=False):
224 return self
._raw_query
226 order_by
= u
"ORDER BY %s" % (", ".join(self
._sorts
))
230 wheres
= u
"WHERE %s" % (" AND ".join(self
._wheres
))
234 selects
= (u
"COUNT(*) as count", )
237 selects
= self
._get_selects_with_extra_ids()
239 selects
= self
._selects
243 limits
= u
"LIMIT %s" % int(limit
)
249 offsets
= u
"OFFSET %s" % int(offset
)
252 sql
= u
"""SELECT %s FROM %s %s %s %s %s;""" \
253 % (", ".join(selects
),
254 ", ".join(self
._froms
),
260 return u
"%s /* %s */" % (sql
, ", ".join(self
._params
))
264 def get_results(self
, limit
=None, offset
=None, query
=None, admin_name
=None,
267 Fetch all results after perform SQL query and
269 add_extra_ids
= (admin_name
!= None)
271 sql
= self
.get_raw_query(limit
=limit
, offset
=offset
,
272 add_extra_ids
=add_extra_ids
)
277 cursor
= self
._db_connection
.cursor()
278 cursor
.execute(sql
, tuple(self
._params
))
279 query_results
= cursor
.fetchall()
281 selects
= self
._get_selects_with_extra_ids()
287 for r
, row
in enumerate(query_results
):
291 result
= [(r
+ offset
+ 1, u
"#row%s" % (r
+ offset
+ 1))]
295 appmodel
, field
= selects
[i
].split(".")
296 appmodel
= self
._unquote_name(appmodel
)
297 field
= self
._unquote_name(field
)
299 if appmodel
in self
._models
:
300 _model
= self
._models
[appmodel
]
301 _appmodel
= u
"%s_%s" % (_model
._meta
.app_label
,
302 _model
._meta
.module_name
)
305 admin_url
= reverse("%s:%s_change" % (admin_name
,
308 except NoReverseMatch
:
310 result
.append((row
[i
], admin_url
))
312 results
.append(result
)
317 for r
, row
in enumerate(query_results
):
321 results
.append(result
)
327 query
= self
.get_raw_query(count
=True)
328 results
= self
.get_results(query
=query
)
330 return float(results
[0][0])
332 return len(self
.get_results())
335 def get_model(self
, db_prefix
, model
):
336 klass
= get_model(db_prefix
, model
)
338 db_model
= "%s_%s" % (db_prefix
, model
)
339 for table
in self
._models
.keys():
340 if table
== db_model
:
341 return self
._models
[table
]
344 def get_labels(self
, add_extra_ids
=False, row_number
=False):
350 selects
= self
._get_selects_with_extra_ids()
352 selects
= self
._selects
353 if selects
and isinstance(selects
, (tuple, list)):
354 for select
in selects
:
355 label_splits
= select
.replace("`", "").replace("_", ".").split(".")
356 # restore underscore for fields which use it
357 label_field
= "_".join(label_splits
[2:])
358 model
= self
.get_model(label_splits
[0], label_splits
[1])
359 label
= model
._meta
.get_field_by_name(label_field
)[0].verbose_name
360 labels
.append(label
.capitalize())
363 def _unquote_name(self
, name
):
364 quoted_space
= self
._db_operations
.quote_name("")
365 if name
.startswith(quoted_space
[0]) and name
.endswith(quoted_space
[1]):
369 def _get_lookup(self
, operator
, over
):
370 lookup
= Field().get_db_prep_lookup(operator
, over
,
371 connection
=self
._db_connection
,
373 if isinstance(lookup
, (tuple, list)):
377 def _get_selects_with_extra_ids(self
):
378 qn
= self
._db_operations
.quote_name
380 for select
in self
._selects
:
381 appmodel
, field
= select
.split(".")
382 appmodel
= self
._unquote_name(appmodel
)
383 field
= self
._unquote_name(field
)
384 selects
.append(select
)
385 if appmodel
in self
._models
:
386 pk_name
= self
._models
[appmodel
]._meta
.pk
.name
389 selects
.append("%s.%s" % (qn(appmodel
), qn(pk_name
)))
392 QueryByExampleFormSet
= formset_factory(QueryByExampleForm
,
393 formset
=BaseQueryByExampleFormSet
,