# -*- coding: utf-8 -*-
# Akvo RSR is covered by the GNU Affero General Public License.
# See more details in the license.txt file located at the root folder of the Akvo RSR module.
# For additional details on the GNU license please see < http://www.gnu.org/licenses/agpl.html >.
import json
import os
from akvo.rsr.models import IndicatorPeriodData, IndicatorPeriodDataComment, Project, Indicator
from akvo.rest.authentication import TastyTokenAuthentication, JWTAuthentication
from akvo.rsr.models.result.utils import QUANTITATIVE, PERCENTAGE_MEASURE
from akvo.rsr.usecases.previous_cumulative_update_by_user import get_previous_cumulative_update_value
from ..serializers import (IndicatorPeriodDataSerializer, IndicatorPeriodDataFrameworkSerializer,
IndicatorPeriodDataCommentSerializer)
from ..viewsets import PublicProjectViewSet
from django.shortcuts import get_object_or_404
from django.http import HttpResponseBadRequest, HttpResponseForbidden
from django.contrib.admin.models import LogEntry, ADDITION, CHANGE, DELETION
from django.contrib.contenttypes.models import ContentType
from rest_framework import status
from rest_framework.authentication import SessionAuthentication
from rest_framework.decorators import api_view, authentication_classes
from rest_framework.response import Response
from rest_framework.utils.encoders import JSONEncoder
class IndicatorPeriodDataViewSet(PublicProjectViewSet):
    """
    REST endpoint exposing ``IndicatorPeriodData`` records (indicator period updates).

    On top of the filtering inherited from ``PublicProjectViewSet``, results are
    narrowed to what ``IndicatorPeriodData.get_user_viewable_updates`` returns for
    the requesting user.
    """

    # Lookup path from an update to its project, consumed by the parent viewset.
    project_relation = 'period__indicator__result__project__'
    serializer_class = IndicatorPeriodDataSerializer
    queryset = IndicatorPeriodData.objects.select_related('user', 'approved_by').all()

    def filter_queryset(self, queryset):
        """Apply the inherited filters, then keep only updates the user may view."""
        filtered = super().filter_queryset(queryset)
        return IndicatorPeriodData.get_user_viewable_updates(filtered, self.request.user)
class IndicatorPeriodDataFrameworkViewSet(PublicProjectViewSet):
    """
    REST endpoint for ``IndicatorPeriodData`` using the "framework" serializer.

    The queryset eagerly loads the related period, user and approver, and
    prefetches the update's comments and disaggregations. Both list results and
    single-object lookups are restricted to the updates returned by
    ``IndicatorPeriodData.get_user_viewable_updates`` for the requesting user.
    """

    authentication_classes = (SessionAuthentication, TastyTokenAuthentication, JWTAuthentication)

    queryset = IndicatorPeriodData.objects.select_related(
        'period',
        'user',
        'approved_by',
    ).prefetch_related(
        'comments',
        'disaggregations',
    ).all()
    serializer_class = IndicatorPeriodDataFrameworkSerializer
    # Lookup path from an update to its project, consumed by the parent viewset.
    project_relation = 'period__indicator__result__project__'

    def get_object(self):
        """
        Return the update identified by the ``pk`` URL kwarg.

        Responds with 404 when the update is not part of ``get_queryset()`` and
        calls ``permission_denied`` when it exists but is not among the updates
        viewable by the requesting user.

        NOTE(review): this override does not call ``filter_queryset`` or
        ``check_object_permissions`` -- confirm that is intended.
        """
        pk = self.kwargs['pk']
        base_queryset = self.get_queryset()
        obj = get_object_or_404(base_queryset, pk=pk)
        # check whether the user has permission
        viewables = IndicatorPeriodData.get_user_viewable_updates(
            base_queryset.filter(pk=pk),
            self.request.user
        )
        # exists() only asks the database whether a row is present; no COUNT needed.
        if not viewables.exists():
            self.permission_denied(self.request)
        return obj

    def filter_queryset(self, queryset):
        """Apply the inherited filters, then keep only updates the user may view."""
        queryset = super().filter_queryset(queryset)
        return IndicatorPeriodData.get_user_viewable_updates(
            queryset, self.request.user
        )
def _log_update_files_change(user, update, messages):
    """Write an audit-trail ``LogEntry`` (CHANGE) listing file changes made to ``update``."""
    log_data = {'audit_trail': True, 'data': {'files': messages}}
    LogEntry.objects.log_action(
        user_id=user.id,
        content_type_id=ContentType.objects.get_for_model(IndicatorPeriodData).id,
        object_id=update.id,
        object_repr=str(update),
        action_flag=CHANGE,
        change_message=json.dumps(log_data)
    )


@api_view(['POST', 'DELETE'])
@authentication_classes([SessionAuthentication, TastyTokenAuthentication, JWTAuthentication])
def period_update_files(request, update_pk, file_pk=None):
    """
    Attach files to, or remove a file from, an indicator period update.

    ``POST`` without ``file_pk`` validates the payload with the framework
    serializer, saves it and responds with the serialized ``file_set``.
    ``DELETE`` with ``file_pk`` deletes that file and responds with 204.
    Both actions set the update's user to the requester and write an
    audit-trail log entry.

    :param request: The REST framework request.
    :param update_pk: Primary key of the ``IndicatorPeriodData`` instance.
    :param file_pk: Primary key of the attached file to delete (DELETE only).
    :return: 403 if the user lacks ``rsr.change_indicatorperioddata`` on the
        update, 404 if the update or the file does not exist, 405 for any
        other method/argument combination.
    """
    update = get_object_or_404(IndicatorPeriodData, pk=update_pk)
    user = request.user
    if not user.has_perm('rsr.change_indicatorperioddata', update):
        return Response({'error': 'User has no permission to add/remove files'}, status=status.HTTP_403_FORBIDDEN)

    if request.method == 'POST' and not file_pk:
        serializer = IndicatorPeriodDataFrameworkSerializer(instance=update, data=request.data, partial=True)
        serializer.is_valid(raise_exception=True)
        # Collect the names before saving, while the uploads are still in validated_data.
        uploaded = [
            f'Uploaded file "{upload.name}"'
            for upload in serializer.validated_data.get('files', [])
        ]
        serializer.save(user=user)
        _log_update_files_change(user, update, uploaded)
        return Response(serializer.data['file_set'])

    if request.method == 'DELETE' and file_pk:
        # An unknown file id is a client error: answer 404 instead of letting
        # DoesNotExist bubble up as a server error.
        attachment = get_object_or_404(update.indicatorperioddatafile_set.all(), pk=file_pk)
        filename = os.path.basename(attachment.file.name)
        attachment.delete()
        update.user = user
        update.save(update_fields=['user'])
        _log_update_files_change(user, update, [f'Removed file "{filename}"'])
        return Response(status=status.HTTP_204_NO_CONTENT)

    return Response(status=status.HTTP_405_METHOD_NOT_ALLOWED)
@api_view(['POST', 'DELETE'])
@authentication_classes([SessionAuthentication, TastyTokenAuthentication, JWTAuthentication])
def period_update_photos(request, update_pk, photo_pk=None):
    """
    Attach photos to, or remove a photo from, an indicator period update.

    Only the user recorded on the update may add or remove photos. ``POST``
    without ``photo_pk`` validates the payload with the framework serializer,
    saves it and responds with the serialized ``photo_set``. ``DELETE`` with
    ``photo_pk`` deletes that photo and responds with 204.

    :param request: The REST framework request.
    :param update_pk: Primary key of the ``IndicatorPeriodData`` instance.
    :param photo_pk: Primary key of the attached photo to delete (DELETE only).
    :return: 403 if the requester is not the update's user, 404 if the update
        or the photo does not exist, 405 for any other method/argument
        combination.
    """
    update = get_object_or_404(IndicatorPeriodData, pk=update_pk)
    user = request.user
    if user != update.user:
        return Response({'error': 'User has no permission to add/remove photos'}, status=status.HTTP_403_FORBIDDEN)

    if request.method == 'POST' and not photo_pk:
        serializer = IndicatorPeriodDataFrameworkSerializer(instance=update, data=request.data, partial=True)
        serializer.is_valid(raise_exception=True)
        serializer.save(user=user)
        return Response(serializer.data['photo_set'])

    if request.method == 'DELETE' and photo_pk:
        # An unknown photo id is a client error: answer 404 instead of letting
        # DoesNotExist bubble up as a server error.
        photo = get_object_or_404(update.indicatorperioddataphoto_set.all(), pk=photo_pk)
        photo.delete()
        return Response(status=status.HTTP_204_NO_CONTENT)

    return Response(status=status.HTTP_405_METHOD_NOT_ALLOWED)
@api_view(['POST', 'DELETE'])
def indicator_upload_file(request, pk=None):
    """
    Special API call for directly uploading a file.

    ``POST`` stores the uploaded ``file`` in the update's ``photo`` or ``file``
    field, selected by the ``type`` form value, and responds with the stored
    URL(s). ``DELETE`` clears the field named by ``type`` and responds with 204.

    :param request: A Django request object.
    :param pk: The primary key of an IndicatorPeriodData instance.
    :return: 403 for unauthenticated users, 404 when the update does not exist,
        400 (with an ``error`` message) when the payload is incomplete, the
        ``type`` is neither ``photo`` nor ``file``, or saving fails.
    """
    # Permissions
    user = getattr(request, 'user', None)
    # An anonymous user object is truthy, so also check is_authenticated.
    if not user or not user.is_authenticated:
        return Response({'error': 'User is not logged in'}, status=status.HTTP_403_FORBIDDEN)

    # TODO: Check if user is allowed to upload a file
    # if not user.has_perm('rsr.change_project', update.period.indicator.result.project):
    #     return Response({'error': 'User has no permission to place an update'},
    #                     status=status.HTTP_403_FORBIDDEN)

    update = get_object_or_404(IndicatorPeriodData, pk=pk)

    try:
        if request.method == 'DELETE':
            file_type = request.data['type']
            if file_type in ('photo', 'file'):
                # The field to clear carries the same name as the requested type.
                setattr(update, file_type, '')
                update.save(update_fields=[file_type])
                return Response({}, status=status.HTTP_204_NO_CONTENT)
        else:  # POST
            upload_file = request.data['file']
            file_type = request.POST['type']
            if file_type == 'photo':
                update.photo = upload_file
                update.save(update_fields=['photo'])
                # Add photo member to be able to distinguish from file URL in new results version
                # while keeping the old API
                return Response({'file': update.photo.url, 'photo': update.photo.url})
            if file_type == 'file':
                update.file = upload_file
                update.save(update_fields=['file'])
                return Response({'file': update.file.url})
    except Exception as e:
        # Boundary handler kept from the original API: any failure (missing key,
        # storage error, ...) is reported to the client as a bad request.
        return Response({'error': str(e)}, status=status.HTTP_400_BAD_REQUEST)

    # Unknown type: a view must always return a Response, so answer explicitly
    # rather than falling through and returning None.
    return Response(
        {'error': 'Unsupported type, expected "photo" or "file"'},
        status=status.HTTP_400_BAD_REQUEST
    )
@api_view(['POST'])
@authentication_classes([SessionAuthentication, TastyTokenAuthentication])
def set_updates_status(request, project_pk):
    """Bulk update IndicatorPeriodData.status attributes of a project.

    Expects ``updates`` (a non-empty list of IndicatorPeriodData ids) and
    ``status`` in the request data. Only updates that belong to the given
    project are changed, and an audit-trail log entry is written for each
    update that was actually changed.

    :param request: The REST framework request.
    :param project_pk: Primary key of the project whose updates are changed.
    :return: ``{'success': True}`` on success; 400 when ``updates`` is missing,
        empty or malformed, or ``status`` is missing; 404 when the project
        does not exist; 403 when the user lacks ``rsr.change_project`` on it.
    """
    update_ids = request.data.get('updates', [])
    # Named new_status so the imported rest_framework ``status`` module is not shadowed.
    new_status = request.data.get('status', None)
    if not isinstance(update_ids, (list, tuple)) or not update_ids or new_status is None:
        return HttpResponseBadRequest()

    user = request.user
    project = get_object_or_404(Project, pk=project_pk)
    if not user.has_perm('rsr.change_project', project):
        return HttpResponseForbidden()

    # Resolve the ids that really belong to this project first, so the audit
    # trail below only mentions updates whose status was changed.
    try:
        matched_ids = list(
            IndicatorPeriodData.objects
            .filter(id__in=update_ids, period__indicator__result__project=project)
            .values_list('id', flat=True)
        )
    except (TypeError, ValueError):
        # Ids that cannot be coerced to the primary key type are a client error.
        return HttpResponseBadRequest()

    IndicatorPeriodData.objects.filter(id__in=matched_ids).update(status=new_status)

    log_message = json.dumps({'audit_trail': True, 'data': {'status': new_status}})
    content_type_id = ContentType.objects.get_for_model(IndicatorPeriodData).id
    for update_id in matched_ids:
        LogEntry.objects.log_action(
            user_id=user.id,
            content_type_id=content_type_id,
            object_id=update_id,
            object_repr='IndicatorPeriodData',
            action_flag=CHANGE,
            change_message=log_message
        )
    return Response({'success': True})
@api_view(['GET'])
@authentication_classes([SessionAuthentication, TastyTokenAuthentication])
def indicator_previous_cumulative_update(request, project_pk, indicator_pk):
    """
    Return the requesting user's previous cumulative update value for an indicator.

    :param request: The REST framework request.
    :param project_pk: Primary key of the project the indicator must belong to.
    :param indicator_pk: Primary key of the indicator.
    :return: A response wrapping the result of
        ``get_previous_cumulative_update_value``; 404 when the indicator does
        not exist; 403 when the indicator's project is not ``project_pk`` or
        the user lacks ``rsr.view_project`` on it.
    """
    requester = request.user
    indicator = get_object_or_404(
        Indicator.objects.select_related('result__project'),
        pk=indicator_pk,
    )
    project = indicator.result.project

    # The permission lookup only runs when the indicator belongs to the project.
    in_requested_project = project.id == int(project_pk)
    if not (in_requested_project and requester.has_perm('rsr.view_project', project)):
        return HttpResponseForbidden()

    return Response(get_previous_cumulative_update_value(requester, indicator))