You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

853 lines
25 KiB

4 months ago
from collections import OrderedDict
from collections.abc import Iterable
from datetime import timedelta
from itertools import chain
from django import forms
from django.core.validators import MaxValueValidator
from django.db.models import Q
from django.db.models.constants import LOOKUP_SEP
from django.forms.utils import pretty_name
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
from .conf import settings
from .constants import EMPTY_VALUES
from .fields import (
BaseCSVField,
BaseRangeField,
ChoiceField,
DateRangeField,
DateTimeRangeField,
IsoDateTimeField,
IsoDateTimeRangeField,
LookupChoiceField,
ModelChoiceField,
ModelMultipleChoiceField,
MultipleChoiceField,
RangeField,
TimeRangeField,
)
from .utils import get_model_field, label_for_filter
__all__ = [
"AllValuesFilter",
"AllValuesMultipleFilter",
"BaseCSVFilter",
"BaseInFilter",
"BaseRangeFilter",
"BooleanFilter",
"CharFilter",
"ChoiceFilter",
"DateFilter",
"DateFromToRangeFilter",
"DateRangeFilter",
"DateTimeFilter",
"DateTimeFromToRangeFilter",
"DurationFilter",
"Filter",
"IsoDateTimeFilter",
"IsoDateTimeFromToRangeFilter",
"LookupChoiceFilter",
"ModelChoiceFilter",
"ModelMultipleChoiceFilter",
"MultipleChoiceFilter",
"NumberFilter",
"NumericRangeFilter",
"OrderingFilter",
"RangeFilter",
"TimeFilter",
"TimeRangeFilter",
"TypedChoiceFilter",
"TypedMultipleChoiceFilter",
"UUIDFilter",
]
class Filter:
creation_counter = 0
field_class = forms.Field
def __init__(
self,
field_name=None,
lookup_expr=None,
*,
label=None,
method=None,
distinct=False,
exclude=False,
**kwargs
):
if lookup_expr is None:
lookup_expr = settings.DEFAULT_LOOKUP_EXPR
self.field_name = field_name
self.lookup_expr = lookup_expr
self.label = label
self.method = method
self.distinct = distinct
self.exclude = exclude
self.extra = kwargs
self.extra.setdefault("required", False)
self.creation_counter = Filter.creation_counter
Filter.creation_counter += 1
def get_method(self, qs):
"""Return filter method based on whether we're excluding
or simply filtering.
"""
return qs.exclude if self.exclude else qs.filter
def method():
"""
Filter method needs to be lazily resolved, as it may be dependent on
the 'parent' FilterSet.
"""
def fget(self):
return self._method
def fset(self, value):
self._method = value
# clear existing FilterMethod
if isinstance(self.filter, FilterMethod):
del self.filter
# override filter w/ FilterMethod.
if value is not None:
self.filter = FilterMethod(self)
return locals()
method = property(**method())
def label():
def fget(self):
if self._label is None and hasattr(self, "model"):
self._label = label_for_filter(
self.model, self.field_name, self.lookup_expr, self.exclude
)
return self._label
def fset(self, value):
self._label = value
return locals()
label = property(**label())
@property
def field(self):
if not hasattr(self, "_field"):
field_kwargs = self.extra.copy()
if settings.DISABLE_HELP_TEXT:
field_kwargs.pop("help_text", None)
self._field = self.field_class(label=self.label, **field_kwargs)
return self._field
def filter(self, qs, value):
if value in EMPTY_VALUES:
return qs
if self.distinct:
qs = qs.distinct()
lookup = "%s__%s" % (self.field_name, self.lookup_expr)
qs = self.get_method(qs)(**{lookup: value})
return qs
class CharFilter(Filter):
field_class = forms.CharField
class BooleanFilter(Filter):
field_class = forms.NullBooleanField
class ChoiceFilter(Filter):
field_class = ChoiceField
def __init__(self, *args, **kwargs):
self.null_value = kwargs.get("null_value", settings.NULL_CHOICE_VALUE)
super().__init__(*args, **kwargs)
def filter(self, qs, value):
if value != self.null_value:
return super().filter(qs, value)
qs = self.get_method(qs)(
**{"%s__%s" % (self.field_name, self.lookup_expr): None}
)
return qs.distinct() if self.distinct else qs
class TypedChoiceFilter(Filter):
field_class = forms.TypedChoiceField
class UUIDFilter(Filter):
field_class = forms.UUIDField
class MultipleChoiceFilter(Filter):
"""
This filter performs OR(by default) or AND(using conjoined=True) query
on the selected options.
Advanced usage
--------------
Depending on your application logic, when all or no choices are selected,
filtering may be a no-operation. In this case you may wish to avoid the
filtering overhead, particularly if using a `distinct` call.
You can override `get_filter_predicate` to use a custom filter.
By default it will use the filter's name for the key, and the value will
be the model object - or in case of passing in `to_field_name` the
value of that attribute on the model.
Set `always_filter` to `False` after instantiation to enable the default
`is_noop` test. You can override `is_noop` if you need a different test
for your application.
`distinct` defaults to `True` as to-many relationships will generally
require this.
"""
field_class = MultipleChoiceField
always_filter = True
def __init__(self, *args, **kwargs):
kwargs.setdefault("distinct", True)
self.conjoined = kwargs.pop("conjoined", False)
self.null_value = kwargs.get("null_value", settings.NULL_CHOICE_VALUE)
super().__init__(*args, **kwargs)
def is_noop(self, qs, value):
"""
Return `True` to short-circuit unnecessary and potentially slow
filtering.
"""
if self.always_filter:
return False
# A reasonable default for being a noop...
if self.extra.get("required") and len(value) == len(self.field.choices):
return True
return False
def filter(self, qs, value):
if not value:
# Even though not a noop, no point filtering if empty.
return qs
if self.is_noop(qs, value):
return qs
if not self.conjoined:
q = Q()
for v in set(value):
if v == self.null_value:
v = None
predicate = self.get_filter_predicate(v)
if self.conjoined:
qs = self.get_method(qs)(**predicate)
else:
q |= Q(**predicate)
if not self.conjoined:
qs = self.get_method(qs)(q)
return qs.distinct() if self.distinct else qs
def get_filter_predicate(self, v):
name = self.field_name
if name and self.lookup_expr != settings.DEFAULT_LOOKUP_EXPR:
name = LOOKUP_SEP.join([name, self.lookup_expr])
try:
return {name: getattr(v, self.field.to_field_name)}
except (AttributeError, TypeError):
return {name: v}
class TypedMultipleChoiceFilter(MultipleChoiceFilter):
field_class = forms.TypedMultipleChoiceField
class DateFilter(Filter):
field_class = forms.DateField
class DateTimeFilter(Filter):
field_class = forms.DateTimeField
class IsoDateTimeFilter(DateTimeFilter):
"""
Uses IsoDateTimeField to support filtering on ISO 8601 formatted datetimes.
For context see:
* https://code.djangoproject.com/ticket/23448
* https://github.com/encode/django-rest-framework/issues/1338
* https://github.com/carltongibson/django-filter/pull/264
"""
field_class = IsoDateTimeField
class TimeFilter(Filter):
field_class = forms.TimeField
class DurationFilter(Filter):
field_class = forms.DurationField
class QuerySetRequestMixin:
"""
Add callable functionality to filters that support the ``queryset``
argument. If the ``queryset`` is callable, then it **must** accept the
``request`` object as a single argument.
This is useful for filtering querysets by properties on the ``request``
object, such as the user.
Example::
def departments(request):
company = request.user.company
return company.department_set.all()
class EmployeeFilter(filters.FilterSet):
department = filters.ModelChoiceFilter(queryset=departments)
...
The above example restricts the set of departments to those in the logged-in
user's associated company.
"""
def __init__(self, *args, **kwargs):
self.queryset = kwargs.get("queryset")
super().__init__(*args, **kwargs)
def get_request(self):
try:
return self.parent.request
except AttributeError:
return None
def get_queryset(self, request):
queryset = self.queryset
if callable(queryset):
return queryset(request)
return queryset
@property
def field(self):
request = self.get_request()
queryset = self.get_queryset(request)
if queryset is not None:
self.extra["queryset"] = queryset
return super().field
class ModelChoiceFilter(QuerySetRequestMixin, ChoiceFilter):
field_class = ModelChoiceField
def __init__(self, *args, **kwargs):
kwargs.setdefault("empty_label", settings.EMPTY_CHOICE_LABEL)
super().__init__(*args, **kwargs)
class ModelMultipleChoiceFilter(QuerySetRequestMixin, MultipleChoiceFilter):
field_class = ModelMultipleChoiceField
class NumberFilter(Filter):
field_class = forms.DecimalField
def get_max_validator(self):
"""
Return a MaxValueValidator for the field, or None to disable.
"""
return MaxValueValidator(1e50)
@property
def field(self):
if not hasattr(self, "_field"):
field = super().field
max_validator = self.get_max_validator()
if max_validator:
field.validators.append(max_validator)
self._field = field
return self._field
class NumericRangeFilter(Filter):
field_class = RangeField
def filter(self, qs, value):
if value:
if value.start is not None and value.stop is not None:
value = (value.start, value.stop)
elif value.start is not None:
self.lookup_expr = "startswith"
value = value.start
elif value.stop is not None:
self.lookup_expr = "endswith"
value = value.stop
return super().filter(qs, value)
class RangeFilter(Filter):
field_class = RangeField
def filter(self, qs, value):
if value:
if value.start is not None and value.stop is not None:
self.lookup_expr = "range"
value = (value.start, value.stop)
elif value.start is not None:
self.lookup_expr = "gte"
value = value.start
elif value.stop is not None:
self.lookup_expr = "lte"
value = value.stop
return super().filter(qs, value)
def _truncate(dt):
return dt.date()
class DateRangeFilter(ChoiceFilter):
choices = [
("today", _("Today")),
("yesterday", _("Yesterday")),
("week", _("Past 7 days")),
("month", _("This month")),
("year", _("This year")),
]
filters = {
"today": lambda qs, name: qs.filter(
**{
"%s__year" % name: now().year,
"%s__month" % name: now().month,
"%s__day" % name: now().day,
}
),
"yesterday": lambda qs, name: qs.filter(
**{
"%s__year" % name: (now() - timedelta(days=1)).year,
"%s__month" % name: (now() - timedelta(days=1)).month,
"%s__day" % name: (now() - timedelta(days=1)).day,
}
),
"week": lambda qs, name: qs.filter(
**{
"%s__gte" % name: _truncate(now() - timedelta(days=7)),
"%s__lt" % name: _truncate(now() + timedelta(days=1)),
}
),
"month": lambda qs, name: qs.filter(
**{"%s__year" % name: now().year, "%s__month" % name: now().month}
),
"year": lambda qs, name: qs.filter(
**{
"%s__year" % name: now().year,
}
),
}
def __init__(self, choices=None, filters=None, *args, **kwargs):
if choices is not None:
self.choices = choices
if filters is not None:
self.filters = filters
all_choices = list(
chain.from_iterable(
[subchoice[0] for subchoice in choice[1]]
if isinstance(choice[1], (list, tuple)) # This is an optgroup
else [choice[0]]
for choice in self.choices
)
)
unique = set(all_choices) ^ set(self.filters)
assert not unique, (
"Keys must be present in both 'choices' and 'filters'. Missing keys: "
"'%s'" % ", ".join(sorted(unique))
)
# null choice not relevant
kwargs.setdefault("null_label", None)
super().__init__(choices=self.choices, *args, **kwargs)
def filter(self, qs, value):
if not value:
return qs
assert value in self.filters
qs = self.filters[value](qs, self.field_name)
return qs.distinct() if self.distinct else qs
class DateFromToRangeFilter(RangeFilter):
field_class = DateRangeField
class DateTimeFromToRangeFilter(RangeFilter):
field_class = DateTimeRangeField
class IsoDateTimeFromToRangeFilter(RangeFilter):
field_class = IsoDateTimeRangeField
class TimeRangeFilter(RangeFilter):
field_class = TimeRangeField
class AllValuesFilter(ChoiceFilter):
@property
def field(self):
qs = self.model._default_manager.distinct()
qs = qs.order_by(self.field_name).values_list(self.field_name, flat=True)
self.extra["choices"] = [(o, o) for o in qs]
return super().field
class AllValuesMultipleFilter(MultipleChoiceFilter):
@property
def field(self):
qs = self.model._default_manager.distinct()
qs = qs.order_by(self.field_name).values_list(self.field_name, flat=True)
self.extra["choices"] = [(o, o) for o in qs]
return super().field
class BaseCSVFilter(Filter):
"""
Base class for CSV type filters, such as IN and RANGE.
"""
base_field_class = BaseCSVField
def __init__(self, *args, **kwargs):
kwargs.setdefault("help_text", _("Multiple values may be separated by commas."))
super().__init__(*args, **kwargs)
class ConcreteCSVField(self.base_field_class, self.field_class):
pass
ConcreteCSVField.__name__ = self._field_class_name(
self.field_class, self.lookup_expr
)
self.field_class = ConcreteCSVField
@classmethod
def _field_class_name(cls, field_class, lookup_expr):
"""
Generate a suitable class name for the concrete field class. This is not
completely reliable, as not all field class names are of the format
<Type>Field.
ex::
BaseCSVFilter._field_class_name(DateTimeField, 'year__in')
returns 'DateTimeYearInField'
"""
# DateTimeField => DateTime
type_name = field_class.__name__
if type_name.endswith("Field"):
type_name = type_name[:-5]
# year__in => YearIn
parts = lookup_expr.split(LOOKUP_SEP)
expression_name = "".join(p.capitalize() for p in parts)
# DateTimeYearInField
return str("%s%sField" % (type_name, expression_name))
class BaseInFilter(BaseCSVFilter):
def __init__(self, *args, **kwargs):
kwargs.setdefault("lookup_expr", "in")
super().__init__(*args, **kwargs)
class BaseRangeFilter(BaseCSVFilter):
base_field_class = BaseRangeField
def __init__(self, *args, **kwargs):
kwargs.setdefault("lookup_expr", "range")
super().__init__(*args, **kwargs)
class LookupChoiceFilter(Filter):
"""
A combined filter that allows users to select the lookup expression from a dropdown.
* ``lookup_choices`` is an optional argument that accepts multiple input
formats, and is ultimately normalized as the choices used in the lookup
dropdown. See ``.get_lookup_choices()`` for more information.
* ``field_class`` is an optional argument that allows you to set the inner
form field class used to validate the value. Default: ``forms.CharField``
ex::
price = django_filters.LookupChoiceFilter(
field_class=forms.DecimalField,
lookup_choices=[
('exact', 'Equals'),
('gt', 'Greater than'),
('lt', 'Less than'),
]
)
"""
field_class = forms.CharField
outer_class = LookupChoiceField
def __init__(
self, field_name=None, lookup_choices=None, field_class=None, **kwargs
):
self.empty_label = kwargs.pop("empty_label", settings.EMPTY_CHOICE_LABEL)
super(LookupChoiceFilter, self).__init__(field_name=field_name, **kwargs)
self.lookup_choices = lookup_choices
if field_class is not None:
self.field_class = field_class
@classmethod
def normalize_lookup(cls, lookup):
"""
Normalize the lookup into a tuple of ``(lookup expression, display value)``
If the ``lookup`` is already a tuple, the tuple is not altered.
If the ``lookup`` is a string, a tuple is returned with the lookup
expression used as the basis for the display value.
ex::
>>> LookupChoiceFilter.normalize_lookup(('exact', 'Equals'))
('exact', 'Equals')
>>> LookupChoiceFilter.normalize_lookup('has_key')
('has_key', 'Has key')
"""
if isinstance(lookup, str):
return (lookup, pretty_name(lookup))
return (lookup[0], lookup[1])
def get_lookup_choices(self):
"""
Get the lookup choices in a format suitable for ``django.forms.ChoiceField``.
If the filter is initialized with ``lookup_choices``, this value is normalized
and passed to the underlying ``LookupChoiceField``. If no choices are provided,
they are generated from the corresponding model field's registered lookups.
"""
lookups = self.lookup_choices
if lookups is None:
field = get_model_field(self.model, self.field_name)
lookups = field.get_lookups()
return [self.normalize_lookup(lookup) for lookup in lookups]
@property
def field(self):
if not hasattr(self, "_field"):
inner_field = super().field
lookups = self.get_lookup_choices()
self._field = self.outer_class(
inner_field,
lookups,
label=self.label,
empty_label=self.empty_label,
required=self.extra["required"],
)
return self._field
def filter(self, qs, lookup):
if not lookup:
return super().filter(qs, None)
self.lookup_expr = lookup.lookup_expr
return super().filter(qs, lookup.value)
class OrderingFilter(BaseCSVFilter, ChoiceFilter):
"""
Enable queryset ordering. As an extension of ``ChoiceFilter`` it accepts
two additional arguments that are used to build the ordering choices.
* ``fields`` is a mapping of {model field name: parameter name}. The
parameter names are exposed in the choices and mask/alias the field
names used in the ``order_by()`` call. Similar to field ``choices``,
``fields`` accepts the 'list of two-tuples' syntax that retains order.
``fields`` may also just be an iterable of strings. In this case, the
field names simply double as the exposed parameter names.
* ``field_labels`` is an optional argument that allows you to customize
the display label for the corresponding parameter. It accepts a mapping
of {field name: human readable label}. Keep in mind that the key is the
field name, and not the exposed parameter name.
Additionally, you can just provide your own ``choices`` if you require
explicit control over the exposed options. For example, when you might
want to disable descending sort options.
This filter is also CSV-based, and accepts multiple ordering params. The
default select widget does not enable the use of this, but it is useful
for APIs.
"""
descending_fmt = _("%s (descending)")
def __init__(self, *args, **kwargs):
"""
``fields`` may be either a mapping or an iterable.
``field_labels`` must be a map of field names to display labels
"""
fields = kwargs.pop("fields", {})
fields = self.normalize_fields(fields)
field_labels = kwargs.pop("field_labels", {})
self.param_map = {v: k for k, v in fields.items()}
if "choices" not in kwargs:
kwargs["choices"] = self.build_choices(fields, field_labels)
kwargs.setdefault("label", _("Ordering"))
kwargs.setdefault("help_text", "")
kwargs.setdefault("null_label", None)
super().__init__(*args, **kwargs)
def get_ordering_value(self, param):
descending = param.startswith("-")
param = param[1:] if descending else param
field_name = self.param_map.get(param, param)
return "-%s" % field_name if descending else field_name
def filter(self, qs, value):
if value in EMPTY_VALUES:
return qs
ordering = [
self.get_ordering_value(param)
for param in value
if param not in EMPTY_VALUES
]
return qs.order_by(*ordering)
@classmethod
def normalize_fields(cls, fields):
"""
Normalize the fields into an ordered map of {field name: param name}
"""
# fields is a mapping, copy into new OrderedDict
if isinstance(fields, dict):
return OrderedDict(fields)
# convert iterable of values => iterable of pairs (field name, param name)
assert isinstance(
fields, Iterable
), "'fields' must be an iterable (e.g., a list, tuple, or mapping)."
# fields is an iterable of field names
assert all(
isinstance(field, str)
or isinstance(field, Iterable)
and len(field) == 2 # may need to be wrapped in parens
for field in fields
), "'fields' must contain strings or (field name, param name) pairs."
return OrderedDict([(f, f) if isinstance(f, str) else f for f in fields])
def build_choices(self, fields, labels):
ascending = [
(param, labels.get(field, _(pretty_name(param))))
for field, param in fields.items()
]
descending = [
("-%s" % param, labels.get("-%s" % param, self.descending_fmt % label))
for param, label in ascending
]
# interleave the ascending and descending choices
return [val for pair in zip(ascending, descending) for val in pair]
class FilterMethod:
"""
This helper is used to override Filter.filter() when a 'method' argument
is passed. It proxies the call to the actual method on the filter's parent.
"""
def __init__(self, filter_instance):
self.f = filter_instance
def __call__(self, qs, value):
if value in EMPTY_VALUES:
return qs
return self.method(qs, self.f.field_name, value)
@property
def method(self):
"""
Resolve the method on the parent filterset.
"""
instance = self.f
# noop if 'method' is a function
if callable(instance.method):
return instance.method
# otherwise, method is the name of a method on the parent FilterSet.
assert hasattr(
instance, "parent"
), "Filter '%s' must have a parent FilterSet to find '.%s()'" % (
instance.field_name,
instance.method,
)
parent = instance.parent
method = getattr(parent, instance.method, None)
assert callable(
method
), "Expected parent FilterSet '%s.%s' to have a '.%s()' method." % (
parent.__class__.__module__,
parent.__class__.__name__,
instance.method,
)
return method

Powered by TurnKey Linux.