Source code for akvo.rest.viewsets

# -*- coding: utf-8 -*-

# Akvo RSR is covered by the GNU Affero General Public License.
# See more details in the license.txt file located at the root folder of the Akvo RSR module.
# For additional details on the GNU license please see < http://www.gnu.org/licenses/agpl.html >.
import hashlib
import logging

from django.db import transaction
from django.db.models import QuerySet
from django.db.models.fields.related import ForeignObject
from django.core.exceptions import FieldError, FieldDoesNotExist
from django.db.models.deletion import ProtectedError
from django.utils.translation import gettext_lazy as _
from rest_framework.response import Response
from rest_framework import status

from akvo.cache import cache_with_key
from akvo.cache.prepo import QuerysetPrePo
from akvo.rsr.models import PublishingStatus, Project, User
from akvo.rsr.usecases.iati_validation import schedule_iati_activity_validation
from akvo.rest.authentication import JWTAuthentication, TastyTokenAuthentication
from akvo.rest.cache import delete_project_from_project_directory_cache
from akvo.utils import log_project_changes, get_project_for_object

from rest_framework import authentication, exceptions, filters, permissions, viewsets

from .filters import RSRGenericFilterBackend
from .pagination import TastypieOffsetPagination

logger = logging.getLogger(__name__)


[docs]class SafeMethodsPermissions(permissions.DjangoObjectPermissions): """ Base class to allow any safe methods ('GET', 'OPTIONS' and 'HEAD') without needing to authenticate. """
[docs] def has_permission(self, request, view): if request.method in permissions.SAFE_METHODS: return True return super(SafeMethodsPermissions, self).has_permission(request, view)
[docs]class BaseRSRViewSet(viewsets.ModelViewSet): """ Base class used for the view sets for RSR models. Provides unified auth and perms settings. """ authentication_classes = (authentication.SessionAuthentication, TastyTokenAuthentication, JWTAuthentication) permission_classes = (SafeMethodsPermissions, ) filter_backends = (filters.OrderingFilter, RSRGenericFilterBackend,) ordering_fields = '__all__'
[docs] def paginate_queryset(self, queryset): """ Custom offset-based pagination for the Tastypie API emulation """ if self.request and '/api/v1/' in self.request.path: self.pagination_class = TastypieOffsetPagination return super(BaseRSRViewSet, self).paginate_queryset(queryset)
[docs] def filter_queryset(self, queryset): def django_filter_filters(request): """ Support emulating the DjangoFilterBackend-based filtering that some views used to have """ # query string keys reserved by the RSRGenericFilterBackend qs_params = ['filter', 'exclude', 'select_related', 'prefetch_related', ] # query string keys used by core DRF, OrderingFilter and Akvo custom views exclude_params = ['limit', 'format', 'page', 'offset', 'ordering', 'partner_type', 'sync_owner', 'reporting_org', ] filters = {} for key in request.query_params: if key not in qs_params + exclude_params and not key.startswith('image_thumb_'): filters.update({key: request.query_params.get(key)}) return filters def get_lookups_from_filters(legacy_filters): """ Cast the values in DjangoFilterBackend-styled query string filters to correct types to be able to use them in regular queryset-filter() calls """ # types of lookups supported by the views using DjangoFilterBackend LEGACY_FIELD_LOOKUPS = ['exact', 'contains', 'icontains', 'gt', 'gte', 'lt', 'lte', ] query_set_lookups = [] for key, value in legacy_filters.items(): parts = key.split('__') if parts[-1] in LEGACY_FIELD_LOOKUPS: parts = parts[:-1] model = queryset.model for part in parts: try: field_object = model._meta.get_field(part) related_model = field_object.model direct = not field_object.auto_created or field_object.concrete if direct: if issubclass(field_object.__class__, ForeignObject): model = field_object.related_model else: value = field_object.to_python(value) break else: model = related_model except FieldDoesNotExist: pass query_set_lookups += [{key: value}] return query_set_lookups queryset = super().filter_queryset(queryset) # support for old DjangoFilterBackend-based filtering if not pk is given if not self.kwargs.get('pk'): # find all "old styled" filters legacy_filters = django_filter_filters(self.request) # create lookup dicts from the filters found lookups = get_lookups_from_filters(legacy_filters) for lookup in lookups: try: queryset = queryset.filter(**lookup) except (FieldError, ValueError): # In order to mimick 'old' behaviour of the API, we should ignore non-valid # parameters or values. Returning a warning would be more preferable. pass return queryset
[docs]class ReadOnlyPublicProjectViewSet(viewsets.ReadOnlyModelViewSet): """ Read only viewset for public projects or objects related to public projects will be shown. """ authentication_classes = (authentication.SessionAuthentication, TastyTokenAuthentication, ) permission_classes = (SafeMethodsPermissions, ) ordering_fields = '__all__' project_relation = 'project__'
[docs] def get_queryset(self): request = self.request user = request.user queryset = super(ReadOnlyPublicProjectViewSet, self).get_queryset() # filter projects if user is "non-privileged" if user.is_anonymous or not (user.is_superuser or user.is_admin) and self.action == 'list': queryset = _projects_filter_for_non_privileged_users( user, queryset, self.project_relation, action=self.action ) return queryset.distinct()
[docs]class PublicProjectViewSet(BaseRSRViewSet): """ Only public projects or objects related to public projects will be shown. """ # project_relation is the default string for constructing a field lookup to the is_public field # on the related Project. Override this in when the viewset is for a model that doesn't have a # direct FK to Project or the FK field isn't named project. E.g. IndicatorViewSet: # project_relation = 'result__project__' # The lookup is used to filter out objects associated with private projects, see below. project_relation = 'project__'
[docs] def filter_queryset(self, queryset): if hasattr(self, '_cached_filtered_queryset'): return self._cached_filtered_queryset self._cached_filtered_queryset = self._filter_queryset(queryset) return self._cached_filtered_queryset
def _filter_queryset(self, queryset): request = self.request user = request.user queryset = super().filter_queryset(queryset) # filter projects if user is "non-privileged" if user.is_anonymous or not (user.is_superuser or user.is_admin) and self.action == 'list': queryset = self.projects_filter_for_non_privileged_users( user, queryset, self.project_relation, action=self.action ) return queryset.distinct()
[docs] def create(self, request, *args, **kwargs): model_name = self.queryset.model._meta.model_name app_name = self.queryset.model._meta.app_label perm = '{}.add_{}'.format(app_name, model_name) with transaction.atomic(): response = super(PublicProjectViewSet, self).create(request, *args, **kwargs) user = request.user obj = self.queryset.model.objects.get(pk=response.data['id']) project = get_project_for_object(Project, obj) # Delete the object if the user doesn't have the right permissions # to create this. The object may get created without checking for # permissions, since the viewset returns True if the user just has # the required role in any organisation. If the newly created # object is not a project, and the user doesn't have permissions to # create it, we delete the object. if obj != project and not (user.has_perm('rsr.view_project', project) and user.has_perm(perm, obj)): obj.delete() if obj.pk is None: raise exceptions.PermissionDenied elif project is not None: log_project_changes(request.user, project, obj, {}, 'added') delete_project_from_project_directory_cache(project.pk) schedule_iati_activity_validation(project) return response
[docs] def destroy(self, request, *args, **kwargs): obj = self.get_object() project = get_project_for_object(Project, obj) try: response = super(PublicProjectViewSet, self).destroy(request, *args, **kwargs) except ProtectedError: msg = _("{}s with updates cannot be deleted".format(self.queryset.model.__name__)) return Response(msg, status=status.HTTP_405_METHOD_NOT_ALLOWED) if project is not None: log_project_changes(request.user, project, obj, {}, 'deleted') delete_project_from_project_directory_cache(project.pk) if not isinstance(obj, Project): schedule_iati_activity_validation(project) return response
[docs] def update(self, request, *args, **kwargs): response = super(PublicProjectViewSet, self).update(request, *args, **kwargs) obj = self.get_object() project = get_project_for_object(Project, obj) if project is not None: log_project_changes(request.user, project, obj, request.data, 'changed') delete_project_from_project_directory_cache(project.pk) schedule_iati_activity_validation(project) return response
[docs] @staticmethod def projects_filter_for_non_privileged_users(user, queryset, project_relation, action='create'): return _projects_filter_for_non_privileged_users(user, queryset, project_relation, action)
[docs]def make_projects_filter_cache_prefix(user: User): """Makes a prefix for cache keys for calls to _projects_filter_for_non_privileged_users""" return f"projects_filter:user_{user.id}"
[docs]def make_projects_filter_cache_key(user: User, queryset: QuerySet, project_relation: str, action: str) -> str: """Makes a cache key that can be used for _projects_filter_for_non_privileged_users""" # we hash it because the queryset.query (sql query) can be quite long args_hash = hashlib.md5(f"{queryset.query if queryset else ''}{project_relation}{action}".encode()).hexdigest() return f"{make_projects_filter_cache_prefix(user)}:{args_hash}"
# Stop-gap solution until a reorg and optimization of the models is done @cache_with_key( make_projects_filter_cache_key, timeout=3600, cache_name='default', prepo_pickle=QuerysetPrePo, ) def _projects_filter_for_non_privileged_users(user: User, queryset: QuerySet, project_relation: str, action: str = 'create'): if not user.is_anonymous and (user.is_admin or user.is_superuser): return queryset.distinct() # Construct the public projects filter field lookup. project_filter = project_relation + 'is_public' # Filter the object list into two querysets; # One where the related Projects are public and one where they are private public_objects = queryset.filter(**{project_filter: True}).distinct() private_objects = queryset.filter(**{project_filter: False}).distinct() # In case of an anonymous user, only return the public objects if user.is_anonymous: unpublished_exclude = project_relation + 'publishingstatus__status' queryset = public_objects.exclude( **{unpublished_exclude: PublishingStatus.STATUS_UNPUBLISHED} ).distinct() # Otherwise, check to which objects the user has (change) permission elif private_objects.exists(): include_user_owned = hasattr(queryset.model, 'user') if action == 'list': # The view permission is new, and previously only the change # permission existed. To avoid adding new view permissions for # all the objects, we also check if a user has change # permissions, which implicitly implies view permissions. change_permission = type(private_objects[0])._meta.db_table.replace('_', '.change_') change_filter = user.get_permission_filter( change_permission, project_relation, include_user_owned ) change_objects = private_objects.filter(change_filter).distinct() # Check if user has view permission on the queryset view_permission = change_permission.replace('.change_', '.view_') view_filter = user.get_permission_filter( view_permission, project_relation, include_user_owned ) view_objects = private_objects.filter(view_filter).distinct() private_objects = (change_objects | view_objects).distinct() else: permission = type(private_objects[0])._meta.db_table.replace('_', '.change_') filter_ = user.get_permission_filter(permission, project_relation, include_user_owned) private_objects = private_objects.filter(filter_).distinct() queryset = public_objects | private_objects return queryset.distinct()
[docs]def is_project_editor_change(request): try: project_editor_change = request.data.get('project_editor_change', False) except exceptions.UnsupportedMediaType: project_editor_change = False return project_editor_change