Source code for akvo.rest.views.project_overview

# -*- coding: utf-8 -*-

# Akvo RSR is covered by the GNU Affero General Public License.

# See more details in the license.txt file located at the root folder of the Akvo RSR module.
# For additional details on the GNU license please see < http://www.gnu.org/licenses/agpl.html >.


from akvo.codelists.models import ResultType
from akvo.rest.authentication import TastyTokenAuthentication
from akvo.rsr.models import Project, Result, Indicator, IndicatorPeriod, IndicatorPeriodData
from akvo.rsr.models.result.utils import QUANTITATIVE, QUALITATIVE, PERCENTAGE_MEASURE, calculate_percentage
from akvo.utils import ensure_decimal
from dataclasses import dataclass, field
from datetime import date
from decimal import Decimal
from django.conf import settings
from django.db.models import Count, Sum
from django.shortcuts import get_object_or_404
from functools import cached_property, lru_cache
from rest_framework.authentication import SessionAuthentication
from rest_framework.decorators import api_view, authentication_classes
from rest_framework.response import Response
from rest_framework.status import HTTP_403_FORBIDDEN
from typing import Optional, Dict, List, Set


@dataclass(frozen=True)
class ContributingProjectData(object):
    """Project-level facts about a project whose result contributes to another.

    The instance is frozen, but ``partners`` is a plain dict that callers
    fill in after construction (one partner per flattened query row).
    """

    id: int
    title: Optional[str] = None
    country_code: Optional[str] = None
    aggregate_children: bool = True
    aggregate_to_parent: bool = True
    # Maps partner id -> partner name.
    partners: Dict[int, str] = field(default_factory=dict)

    @classmethod
    def make(cls, data, prefix=''):
        """Build an instance from a flat mapping of column name -> value.

        :param data: mapping such as one row of a ``.values()`` query.
        :param prefix: string prepended to every key looked up in ``data``.
        :raises KeyError: if ``data`` has no ``"<prefix>id"`` key.
        """
        obj = cls(
            id=data[f"{prefix}id"],
            title=data.get(f"{prefix}title", None),
            aggregate_children=data.get(f"{prefix}aggregate_children", True),
            aggregate_to_parent=data.get(f"{prefix}aggregate_to_parent", True),
            country_code=data.get(f"{prefix}primary_location__country__iso_code", None),
        )
        partner_id = data.get(f"{prefix}partners__id", None)
        partner_name = data.get(f"{prefix}partners__name", None)
        # Only record a partner when the row actually carries one. The previous
        # condition was inverted and could only ever store a partner under the
        # key ``None``.
        if partner_id is not None and partner_name:
            obj.partners[partner_id] = partner_name
        return obj
@dataclass(frozen=True)
class ContributingResultData(object):
    """A result that contributes to a parent result, plus its own contributors.

    ``contributors`` holds the child nodes; the ``contributing_*`` properties
    walk that subtree and are cached on first access.
    """

    id: int
    project: ContributingProjectData
    parent: Optional[int] = None
    contributors: List['ContributingResultData'] = field(default_factory=list)

    @classmethod
    def make(cls, data, prefix=''):
        """Build an instance from a flat mapping of column name -> value.

        The project is built from the same mapping using the keys prefixed
        with ``"<prefix>project__"``.

        :raises KeyError: if ``data`` has no ``"<prefix>id"`` key.
        """
        return cls(
            id=data[f"{prefix}id"],
            parent=data.get(f"{prefix}parent_result", None),
            project=ContributingProjectData.make(data, f"{prefix}project__")
        )

    @cached_property
    def contributing_countries(self):
        """Set of country codes of this project and its aggregating descendants.

        Empty when this project does not aggregate its children; children
        that do not aggregate to their parent are skipped.
        """
        if not self.project.aggregate_children:
            return set()
        own_code = self.project.country_code
        countries = {own_code} if own_code else set()
        for contrib in self.contributors:
            if not contrib.project.aggregate_to_parent:
                continue
            countries = countries | contrib.contributing_countries
        return countries

    @cached_property
    def contributing_projects(self):
        """Dict of project id -> title for this project and its descendants.

        Same pruning rules as ``contributing_countries``; the first title
        seen for an id wins.
        """
        if not self.project.aggregate_children:
            return {}
        projects = {self.project.id: self.project.title}
        for contrib in self.contributors:
            if not contrib.project.aggregate_to_parent:
                continue
            for project_id, project_title in contrib.contributing_projects.items():
                projects.setdefault(project_id, project_title)
        return projects

    @cached_property
    def contributing_partners(self):
        """Dict of partner id -> name for this project and its descendants.

        Same pruning rules as ``contributing_countries``; the first name
        seen for an id wins.
        """
        if not self.project.aggregate_children:
            return {}
        # Work on a copy: merging descendants' partners straight into
        # ``self.project.partners`` would silently rewrite the project's own
        # partner list.
        partners = dict(self.project.partners)
        for contrib in self.contributors:
            if not contrib.project.aggregate_to_parent:
                continue
            for partner_id, partner_name in contrib.contributing_partners.items():
                partners.setdefault(partner_id, partner_name)
        return partners
@dataclass(frozen=True)
class PeriodData:
    """Immutable record of one indicator period: its id and date range."""

    id: int
    period_start: Optional[date] = None
    period_end: Optional[date] = None

    @classmethod
    def make(cls, data, prefix=''):
        """Build an instance from a flat mapping of column name -> value.

        ``prefix`` is prepended to every key looked up in ``data``. The id is
        required (``KeyError`` if missing); both dates default to ``None``.
        """
        period_id = data[f"{prefix}id"]
        starts_on = data.get(f"{prefix}period_start", None)
        ends_on = data.get(f"{prefix}period_end", None)
        return cls(id=period_id, period_start=starts_on, period_end=ends_on)
@dataclass(frozen=True)
class IndicatorData(object):
    """An indicator's id and title together with the periods attached to it."""

    id: int
    title: Optional[str] = ''
    periods: List[PeriodData] = field(default_factory=list)

    @classmethod
    def make(cls, data, prefix=''):
        """Build an instance (with no periods yet) from a flat mapping.

        ``prefix`` is prepended to every key looked up in ``data``. The id is
        required (``KeyError`` if missing); the title defaults to ``''``.
        """
        indicator_id = data[f"{prefix}id"]
        indicator_title = data.get(f"{prefix}title", '')
        return cls(id=indicator_id, title=indicator_title)

    @cached_property
    def reporting_periods(self):
        """Set of distinct ``(period_start, period_end)`` pairs of the periods."""
        return {(p.period_start, p.period_end) for p in self.periods}
@dataclass(frozen=True)
class ResultData(object):
    """A project result with its indicators and the results contributing to it.

    The derived properties below are computed from ``indicators`` and
    ``contributors`` and cached on first access.
    """

    id: int
    title: Optional[str] = None
    type: Optional[str] = None
    indicators: List[IndicatorData] = field(default_factory=list)
    contributors: List[ContributingResultData] = field(default_factory=list)

    @classmethod
    def make(cls, data, prefix=''):
        """Build an instance (no indicators or contributors yet) from a flat mapping.

        ``prefix`` is prepended to every key looked up in ``data``. The id is
        required (``KeyError`` if missing); title and type default to ``None``.
        """
        result_id = data[f"{prefix}id"]
        result_title = data.get(f"{prefix}title", None)
        result_type = data.get(f"{prefix}type", None)
        return cls(id=result_id, title=result_title, type=result_type)

    @property
    def indicator_count(self):
        """Number of indicators currently attached to this result."""
        return len(self.indicators)

    @cached_property
    def indicator_titles(self):
        """Titles of the attached indicators, in list order."""
        return [indicator.title for indicator in self.indicators]

    @cached_property
    def reporting_periods(self):
        """Union of the ``reporting_periods`` sets of all attached indicators."""
        return set().union(*(indicator.reporting_periods for indicator in self.indicators))

    @cached_property
    def iati_type_name(self):
        """Codelist name for ``self.type`` (``''`` when there is no match)."""
        return self.get_codelist_name(self.type)

    @staticmethod
    @lru_cache
    def get_codelist_name(code, version=settings.IATI_VERSION):
        """Look up the ``ResultType`` name for ``code`` in the given version.

        Returns ``''`` when no matching ``ResultType`` exists. Results are
        memoised per ``(code, version)`` by ``lru_cache``.
        """
        try:
            codelist_entry = ResultType.objects.get(code=code, version__code=version)
        except ResultType.DoesNotExist:
            return ''
        return codelist_entry.name

    @cached_property
    def contributing_countries(self):
        """Union of the country-code sets of all direct contributors."""
        return set().union(*(contrib.contributing_countries for contrib in self.contributors))

    @cached_property
    def contributing_projects(self):
        """Dict of project id -> title gathered from all contributors.

        The first title seen for an id wins.
        """
        merged = {}
        for contrib in self.contributors:
            for project_id, project_title in contrib.contributing_projects.items():
                merged.setdefault(project_id, project_title)
        return merged

    @cached_property
    def contributing_partners(self):
        """Dict of partner id -> name gathered from all contributors.

        The first name seen for an id wins.
        """
        merged = {}
        for contrib in self.contributors:
            for partner_id, partner_name in contrib.contributing_partners.items():
                merged.setdefault(partner_id, partner_name)
        return merged
def fetch_periods(project):
    """Return one flat dict per indicator period of ``project``.

    Rows are ordered by result order, then indicator order, and carry the
    period's id and dates plus the id/title of its indicator and the
    id/title/type of its result.
    """
    columns = (
        'id', 'period_start', 'period_end',
        'indicator__id', 'indicator__title',
        'indicator__result__id', 'indicator__result__title', 'indicator__result__type'
    )
    periods = IndicatorPeriod.objects.select_related('indicator', 'indicator__result')
    periods = periods.filter(indicator__result__project=project)
    periods = periods.order_by('indicator__result__order', 'indicator__order')
    return periods.values(*columns)
def get_contributor_ids(root_result_ids: Set[int]):
    """Return the ids of every result descending from ``root_result_ids``.

    Walks the ``parent_result`` relation one level per query. Ids that were
    already collected are never expanded again, so the walk terminates even
    if the stored hierarchy contains a cycle.

    :param root_result_ids: ids of the results to start from.
    :return: set of descendant result ids (empty when there are none).
    """
    collected = set()
    frontier = set(root_result_ids)
    while frontier:
        children = set(
            Result.objects.filter(parent_result__in=frontier).values_list('id', flat=True)
        )
        # Only ids seen for the first time need to be expanded further.
        frontier = children - collected
        collected |= frontier
    return collected
def fetch_contributing_results(root_result_ids: List[int]):
    """Return flat rows for every result descending from ``root_result_ids``.

    Each row holds the result id, its parent result, and the owning
    project's id, title, aggregation flags, primary-location country code
    and one partner (id and name), so a result appears once per partner.
    """
    columns = (
        'id',
        'parent_result',
        'project__id',
        'project__title',
        'project__aggregate_children',
        'project__aggregate_to_parent',
        'project__primary_location__country__iso_code',
        'project__partners__id',
        'project__partners__name',
    )
    contributor_ids = get_contributor_ids(set(root_result_ids))
    return Result.objects.filter(id__in=contributor_ids).values(*columns)
def get_flat_contributors(root_result_ids: List[int]):
    """Collapse the per-partner rows into one object per contributing result.

    :param root_result_ids: ids of the results whose descendants are wanted.
    :return: the values view of a dict mapping result id ->
        ``ContributingResultData``, each with its project's ``partners``
        filled in from all rows seen for that result.
    """
    lookup = {}
    for row in fetch_contributing_results(root_result_ids):
        contributor_id = row['id']
        # Always resolve the contributor through the lookup: the rows are not
        # ordered, so the object created on a previous iteration may belong to
        # a different result.
        contributor = lookup.get(contributor_id)
        if contributor is None:
            contributor = lookup[contributor_id] = ContributingResultData.make(row)
        partner_id = row['project__partners__id']
        # A project without partners yields a row whose partner id is None;
        # that is not a partner and must not end up in the mapping.
        if partner_id is not None and partner_id not in contributor.project.partners:
            contributor.project.partners[partner_id] = row['project__partners__name']
    return lookup.values()
def hierarchize_contributors(contributors):
    """Turn a flat collection of contributors into a forest.

    Every item whose ``parent`` is the ``id`` of another item is appended to
    that item's ``contributors`` list; all remaining items (no parent, or a
    parent outside the collection) are returned as the top-level list.
    """
    by_id = {item.id: item for item in contributors}
    roots = []
    for node in contributors:
        parent_id = node.parent
        if parent_id and parent_id in by_id:
            by_id[parent_id].contributors.append(node)
        else:
            roots.append(node)
    return roots
def get_contributors(root_result_ids: List[int]):
    """Return the contributing results below ``root_result_ids`` as a forest.

    Fetches the flat contributor objects and nests them by parent; the
    returned list contains only the top-level contributors.
    """
    return hierarchize_contributors(get_flat_contributors(root_result_ids))
def get_results(project):
    """Assemble the results of ``project`` with indicators, periods and contributors.

    Period rows are folded into ``ResultData`` -> ``IndicatorData`` ->
    ``PeriodData`` objects (each created once per id), then every top-level
    contributor is attached to the result it names as parent. Returns the
    values view of the id -> ``ResultData`` dict.
    """
    results = {}
    indicators = {}
    periods = {}

    for row in fetch_periods(project):
        result_id = row['indicator__result__id']
        indicator_id = row['indicator__id']
        period_id = row['id']

        if result_id not in results:
            results[result_id] = ResultData.make(row, 'indicator__result__')
        result = results[result_id]

        if indicator_id in indicators:
            indicator = indicators[indicator_id]
        else:
            indicator = IndicatorData.make(row, 'indicator__')
            result.indicators.append(indicator)
            indicators[indicator_id] = indicator

        if period_id not in periods:
            period = PeriodData.make(row)
            indicator.periods.append(period)
            periods[period_id] = period

    for contributor in get_contributors(results.keys()):
        parent_id = contributor.parent
        if parent_id in results:
            results[parent_id].contributors.append(contributor)

    return results.values()
@api_view(['GET'])
@authentication_classes([SessionAuthentication, TastyTokenAuthentication])
def project_results(request, pk):
    """Return a summary of every result of project ``pk``.

    Responds 404 when the project does not exist and 403 when the user lacks
    ``rsr.view_project`` on it. Otherwise the payload holds the project's
    id/title/subtitle/targets_at and, per result, its indicator titles and
    count, type name, reporting periods and contributing countries, projects
    and partners.
    """
    project = get_object_or_404(Project.objects.prefetch_related('results'), pk=pk)
    if not request.user.has_perm('rsr.view_project', project):
        return Response('Request not allowed', status=HTTP_403_FORBIDDEN)

    serialized_results = []
    for result in get_results(project):
        serialized_results.append({
            'id': result.id,
            'title': result.title,
            'indicator_count': result.indicator_count,
            'indicator_titles': result.indicator_titles,
            'type': result.iati_type_name,
            'countries': result.contributing_countries,
            'contributors': result.contributing_projects,
            'partners': result.contributing_partners,
            'periods': result.reporting_periods,
        })

    return Response({
        'id': project.id,
        'title': project.title,
        'subtitle': project.subtitle,
        'targets_at': project.targets_at,
        'results': serialized_results,
    })
def is_aggregating_targets(project):
    """Tell whether ``project``'s id is listed in ``settings.AGGREGATE_TARGETS``."""
    project_id = project.id
    return project_id in settings.AGGREGATE_TARGETS
@api_view(['GET'])
@authentication_classes([SessionAuthentication, TastyTokenAuthentication])
def project_result_overview(request, project_pk, result_pk):
    """Return result ``result_pk`` with a full drill-down of each indicator.

    Responds 404 when the result does not exist, and 403 when it does not
    belong to project ``project_pk`` or the user lacks ``rsr.view_project``
    on that project. ``targets_at`` is taken from the project's program when
    it has one, otherwise from the project itself.
    """
    results = Result.objects.prefetch_related(
        'indicators', 'indicators__periods').select_related('project')
    result = get_object_or_404(results, pk=result_pk)
    project = result.project

    allowed = (
        project.id == int(project_pk)
        and request.user.has_perm('rsr.view_project', project)
    )
    if not allowed:
        return Response('Request not allowed', status=HTTP_403_FORBIDDEN)

    program = project.get_program()
    if program:
        targets_at = program.targets_at
    else:
        targets_at = project.targets_at
    aggregate_targets = is_aggregating_targets(project)

    # Stored measure code -> name exposed by the API; any other code maps to None.
    measure_names = {'1': 'unit', '2': 'percentage'}

    indicators = []
    for indicator in result.indicators.all():
        indicators.append({
            'id': indicator.id,
            'title': indicator.title,
            'description': indicator.description,
            'period_count': len(indicator.periods.all()),
            'type': 'quantitative' if indicator.type == QUANTITATIVE else 'qualitative',
            'baseline_year': indicator.baseline_year,
            'baseline_value': indicator.baseline_value,
            'target_value': _get_indicator_target(indicator, targets_at, aggregate_targets),
            'score_options': indicator.scores,
            'measure': measure_names.get(indicator.measure),
            'periods': _drilldown_indicator_periods_contributions(indicator, aggregate_targets),
            'disaggregation_targets': _transform_disaggregation_targets(indicator),
            'cumulative': indicator.cumulative,
        })

    return Response({
        'id': result.id,
        'title': result.title,
        'indicators': indicators,
    })
@api_view(['GET'])
@authentication_classes([SessionAuthentication, TastyTokenAuthentication])
def project_indicator_overview(request, project_pk, indicator_pk):
    """Return indicator ``indicator_pk`` with a full drill-down of its periods.

    Responds 404 when the indicator does not exist, and 403 when it does not
    belong to project ``project_pk`` or the user lacks ``rsr.view_project``
    on that project. ``targets_at`` is taken from the project's program when
    it has one, otherwise from the project itself.
    """
    queryset = Indicator.objects.prefetch_related('periods').select_related('result__project')
    indicator = get_object_or_404(queryset, pk=indicator_pk)
    project = indicator.result.project
    if project.id != int(project_pk) or not request.user.has_perm('rsr.view_project', project):
        return Response('Request not allowed', status=HTTP_403_FORBIDDEN)

    program = project.get_program()
    targets_at = program.targets_at if program else project.targets_at
    aggregate_targets = is_aggregating_targets(project)

    data = {
        'id': indicator.id,
        'title': indicator.title,
        'description': indicator.description,
        'period_count': len(indicator.periods.all()),
        'type': 'quantitative' if indicator.type == QUANTITATIVE else 'qualitative',
        'baseline_year': indicator.baseline_year,
        'baseline_value': indicator.baseline_value,
        'target_value': _get_indicator_target(indicator, targets_at, aggregate_targets),
        'measure': (
            'unit' if indicator.measure == '1' else 'percentage' if indicator.measure == '2' else None),
        # Pass the aggregation flag through so period targets follow the same
        # rule as the indicator target above and as project_result_overview.
        'periods': _drilldown_indicator_periods_contributions(indicator, aggregate_targets),
        'disaggregation_targets': _transform_disaggregation_targets(indicator),
    }
    return Response(data)
def _drilldown_indicator_periods_contributions(indicator, aggregate_targets=False):
    """Return the serialized periods of ``indicator`` with nested contributions.

    Loads the indicator's periods and all their descendants, arranges them
    into a tree via ``parent_period`` and serializes every root node.
    """
    periods = _get_indicator_periods_hierarchy_flatlist(indicator)
    periods_tree = _make_objects_hierarchy_tree(periods, 'parent_period')
    return [_transform_period_contributions_node(n, aggregate_targets) for n in periods_tree]


def _get_indicator_hierarchy_ids(indicator):
    """Return the ids of ``indicator`` and every indicator descending from it."""
    family = {indicator.id}
    while True:
        children = set(
            Indicator.objects.filter(parent_indicator__in=family).values_list('pk', flat=True))
        # Stop once a pass discovers nothing new.
        if children <= family:
            break
        family |= children
    return family


def _get_indicator_target(indicator, targets_at=None, aggregate_targets=False):
    """Return the indicator-level target, or ``None`` if targets live elsewhere.

    Qualitative targets are returned as stored. Percentage targets, and
    targets of projects that do not aggregate, are converted with
    ``ensure_decimal``. Otherwise the targets of the whole indicator
    hierarchy are summed.
    """
    if targets_at != 'indicator':
        return None
    if indicator.type == QUALITATIVE:
        return indicator.target_value
    if indicator.measure == PERCENTAGE_MEASURE or not aggregate_targets:
        return ensure_decimal(indicator.target_value)
    hierarchy_ids = _get_indicator_hierarchy_ids(indicator)
    result = Indicator.objects.filter(id__in=hierarchy_ids).aggregate(Sum('target_value'))
    return ensure_decimal(result['target_value__sum'])


def _get_indicator_periods_hierarchy_flatlist(indicator):
    """Return a queryset of the indicator's periods and all their descendants.

    The queryset pulls in everything the serializers below read (project,
    country, updates, comments, disaggregations, targets) and annotates each
    period with its number of aggregation jobs and child aggregation jobs.
    """
    family = {period.id for period in indicator.periods.all()}
    while True:
        children = set(
            IndicatorPeriod.objects.filter(parent_period__in=family).values_list('pk', flat=True))
        # Stop once a pass discovers nothing new.
        if children <= family:
            break
        family |= children

    periods = IndicatorPeriod.objects.select_related(
        'indicator__result__project',
        'indicator__result__project__primary_location__country',
        'parent_period',
    ).prefetch_related(
        'data',
        'data__user',
        'data__approved_by',
        'data__comments',
        'data__comments__user',
        'data__disaggregations',
        'data__disaggregations__dimension_value',
        'data__disaggregations__dimension_value__name',
        'disaggregation_targets',
        'disaggregation_targets__dimension_value',
        'disaggregation_targets__dimension_value__name'
    ).annotate(
        num_aggregation_jobs=Count("aggregation_jobs"),
        num_child_aggregation_jobs=Count("child_aggregation_jobs"),
    ).filter(pk__in=family)

    return periods


def _make_objects_hierarchy_tree(objects, parent_attr):
    """Arrange ``objects`` into a forest of ``{'item': obj, 'children': [...]}`` nodes.

    :param objects: re-iterable collection of objects with an ``id``.
    :param parent_attr: name of the attribute holding the parent object
        (or a falsy value when there is none).
    :return: list of the root nodes, i.e. those whose parent is missing or
        not part of ``objects``.
    """
    tree = []
    lookup = {}
    # A set keeps the per-object membership test constant-time.
    ids = {o.id for o in objects}

    for obj in objects:
        item_id = obj.id
        # The node may already exist as a placeholder created by a child that
        # was visited before its parent.
        if item_id not in lookup:
            lookup[item_id] = {'children': []}
        lookup[item_id]['item'] = obj
        node = lookup[item_id]

        parent_obj = getattr(obj, parent_attr)
        parent_id = parent_obj.id if parent_obj else None
        if not parent_id or parent_id not in ids:
            tree.append(node)
        else:
            if parent_id not in lookup:
                lookup[parent_id] = {'children': []}
            lookup[parent_id]['children'].append(node)

    return tree


def _transform_period_contributions_node(node, aggregate_targets=False):
    """Serialize a root period node together with its contributor subtree."""
    period = node['item']
    is_percentage = period.indicator.measure == PERCENTAGE_MEASURE
    is_qualitative = period.indicator.type == QUALITATIVE
    actual_numerator, actual_denominator = None, None
    updates_value, updates_numerator, updates_denominator = None, None, None
    contributors, countries, aggregates, disaggregations = _transform_contributions_hierarchy(
        node['children'],
        is_percentage,
        node['item'].num_child_aggregation_jobs,
    )
    aggregated_value, aggregated_numerator, aggregated_denominator = aggregates
    updates = _transform_updates(period)

    if is_percentage:
        # Percentages are rebuilt from the summed numerators/denominators of
        # the period's own approved updates and of its contributors.
        updates_numerator, updates_denominator = _extract_percentage_updates(updates)
        updates_value = calculate_percentage(updates_numerator, updates_denominator)
        actual_numerator, actual_denominator = updates_numerator, updates_denominator
        if aggregated_numerator:
            actual_numerator += aggregated_numerator
        if aggregated_denominator:
            actual_denominator += aggregated_denominator
        actual_value = calculate_percentage(actual_numerator, actual_denominator)
    else:
        actual_value = ensure_decimal(period.actual_value)
        updates_value = _calculate_update_values(updates)

    if is_qualitative:
        target = period.target_value
    elif aggregate_targets and not is_percentage:
        target = _aggregate_targets(node)
    else:
        target = ensure_decimal(period.target_value)

    result = {
        'period_id': period.id,
        'period_start': period.period_start,
        'period_end': period.period_end,
        'locked': period.locked,
        'actual_comment': period.actual_comment.split(' | ') if period.actual_comment else None,
        'actual_value': actual_value,
        'actual_numerator': actual_numerator,
        'actual_denominator': actual_denominator,
        'can_add_update': period.can_save_update(),
        'target_value': target,
        'countries': countries,
        'updates': updates,
        'updates_value': updates_value,
        'updates_numerator': updates_numerator,
        'updates_denominator': updates_denominator,
        'updates_score_index': period.score_index,
        'updates_score_indices': period.score_indices,
        'contributors': contributors,
        'disaggregation_contributions': list(disaggregations.values()),
        'disaggregation_targets': _transform_disaggregation_targets(period),
    }

    return result


def _aggregate_targets(node):
    """Sum the target of ``node``'s period and of every period below it."""
    aggregate = ensure_decimal(node['item'].target_value)
    for child in node['children']:
        aggregate += _aggregate_targets(child)
    return aggregate


def _transform_contributions_hierarchy(tree, is_percentage, root_has_aggregation_job):
    """Serialize sibling contributor nodes and total up what they contribute.

    :return: ``(contributors, countries, aggregates, disaggregations)`` where
        ``aggregates`` is ``(value, numerator, denominator)`` — only the value
        is tracked for non-percentage indicators, only numerator/denominator
        for percentage ones — and ``disaggregations`` maps
        ``(category, type)`` to a merged disaggregation dict.
    """
    contributors = []
    contributor_countries = []
    aggregated_value = Decimal(0) if not is_percentage else None
    aggregated_numerator = Decimal(0) if is_percentage else None
    aggregated_denominator = Decimal(0) if is_percentage else None
    disaggregations = {}

    for node in tree:
        contributor, countries = _transform_contributor_node(
            node, is_percentage, root_has_aggregation_job)
        if not contributor:
            # The node was filtered out by _transform_contributor.
            continue

        contributors.append(contributor)
        contributor_countries = _merge_unique(contributor_countries, countries)
        if not is_percentage:
            aggregated_value += contributor['actual_value']
        else:
            aggregated_numerator += contributor['actual_numerator']
            aggregated_denominator += contributor['actual_denominator']

        disaggregation_contributions = _extract_disaggregation_contributions(contributor)
        for key in disaggregation_contributions:
            if key not in disaggregations:
                disaggregations[key] = disaggregation_contributions[key].copy()
            else:
                disaggregations[key]['value'] = (
                    ensure_decimal(disaggregations[key]['value'])
                    + ensure_decimal(disaggregation_contributions[key]['value'])
                )

    aggregates = (aggregated_value, aggregated_numerator, aggregated_denominator)

    return contributors, contributor_countries, aggregates, disaggregations


def _extract_disaggregation_contributions(contributor):
    """Merge the disaggregations of a contributor's approved updates.

    Returns a dict keyed by ``(category, type)``; values of entries sharing
    a key are summed.
    """
    disaggregations = {}
    for update in contributor['updates']:
        # NOTE(review): the sibling helpers compare against
        # IndicatorPeriodData.STATUS_APPROVED_CODE; presumably that is this
        # same 'A' code — confirm and unify.
        if update['status']['code'] == 'A':
            for d in update['disaggregations']:
                key = (d['category'], d['type'])
                if key not in disaggregations:
                    disaggregations[key] = d.copy()
                else:
                    disaggregations[key]['value'] = (
                        ensure_decimal(disaggregations[key]['value'])
                        + ensure_decimal(d['value'])
                    )

    return disaggregations


def _extract_percentage_updates(updates):
    """Sum numerators and denominators of approved updates that have both."""
    numerator = Decimal(0)
    denominator = Decimal(0)
    for update in updates:
        if (
            update['numerator'] is not None
            and update['denominator'] is not None
            and update['status']['code'] == IndicatorPeriodData.STATUS_APPROVED_CODE
        ):
            numerator += update['numerator']
            denominator += update['denominator']

    return numerator, denominator


def _transform_contributor_node(node, is_percentage, root_has_aggregation_job):
    """Serialize one contributor node, recursing into its children.

    :return: ``(contributor, countries)``; ``contributor`` is ``None`` (and
        ``countries`` empty) when the node's period is filtered out.
    """
    contributor, aggregate_children = _transform_contributor(
        node['item'], is_percentage, root_has_aggregation_job)
    if not contributor:
        return contributor, []

    contributor_countries = []
    if contributor['country']:
        contributor_countries.append(contributor['country'])

    if is_percentage:
        actual_numerator, actual_denominator = _extract_percentage_updates(contributor['updates'])
        contributor['actual_numerator'] = actual_numerator
        contributor['actual_denominator'] = actual_denominator

    # Projects that do not aggregate their children are reported as leaves.
    if not aggregate_children:
        return contributor, contributor_countries

    contributors, countries, aggregates, disaggregations = _transform_contributions_hierarchy(
        node['children'],
        is_percentage,
        node['item'].num_child_aggregation_jobs or root_has_aggregation_job
    )
    aggregated_value, aggregated_numerator, aggregated_denominator = aggregates
    contributors_count = len(contributors)
    if contributors_count:
        if aggregated_numerator:
            contributor['actual_numerator'] += aggregated_numerator
        if aggregated_denominator:
            contributor['actual_denominator'] += aggregated_denominator
        contributor['contributors'] = contributors
        contributor['disaggregation_contributions'] = list(disaggregations.values())
        contributor_countries = _merge_unique(contributor_countries, countries)

    return contributor, contributor_countries


def _calculate_update_values(updates):
    """Sum the truthy ``value`` of every approved update."""
    total = 0
    for update in updates:
        if update['value'] and update['status']['code'] == IndicatorPeriodData.STATUS_APPROVED_CODE:
            total += update['value']
    return total


def _transform_contributor(period, is_percentage, root_has_aggregation_job):
    """Serialize a contributing period without its children.

    :return: ``(contributor, aggregate_children)``, or ``(None, None)`` when
        the period is skipped: either it is an empty quantitative period with
        no aggregation job above it, or its project does not aggregate to its
        parent.
    """
    value = ensure_decimal(period.actual_value)

    is_qualitative = period.indicator.type == QUALITATIVE
    # FIXME: Not sure why the value < 1 check is being used, if it is a float
    # comparison issue, we need to resolve it in a better fashion.
    # Return early if there are not updates and value is "0" for quantitative updates
    if not root_has_aggregation_job and not is_qualitative and value < 1 and period.data.count() < 1:
        return None, None

    project = period.indicator.result.project
    if not project.aggregate_to_parent:
        return None, None

    country = project.primary_location.country if project.primary_location else None
    updates = _transform_updates(period)
    updates_value, updates_numerator, updates_denominator = None, None, None
    if is_percentage:
        updates_numerator, updates_denominator = _extract_percentage_updates(updates)
        updates_value = calculate_percentage(updates_numerator, updates_denominator)
    else:
        updates_value = _calculate_update_values(updates)

    if is_qualitative:
        target = period.target_value
    else:
        target = ensure_decimal(period.target_value)

    contributor = {
        'project_id': project.id,
        'project_title': project.title,
        'project_subtitle': project.subtitle,
        'period_id': period.id,
        'country': {'iso_code': country.iso_code} if country else None,
        'actual_comment': period.actual_comment.split(' | ') if period.actual_comment else None,
        'actual_value': value,
        'actual_numerator': None,
        'actual_denominator': None,
        'target_value': target,
        'score_index': period.score_index,
        'score_indices': period.score_indices,
        'updates': updates,
        'updates_value': updates_value,
        'updates_numerator': updates_numerator,
        'updates_denominator': updates_denominator,
        'contributors': [],
        'disaggregation_contributions': [],
        'disaggregation_targets': _transform_disaggregation_targets(period),
    }

    return contributor, project.aggregate_children


def _transform_user(user):
    """Return the id/email/full-name dict for ``user``, or ``None`` without one."""
    if not user:
        return None
    return {
        'user_id': user.id,
        'email': user.email,
        'name': user.get_full_name(),
    }


def _transform_updates(period):
    """Serialize every update of ``period`` with its comments and disaggregations."""
    return [
        {
            'update_id': u.id,
            'status': {'code': u.status, 'name': dict(IndicatorPeriodData.STATUSES)[u.status]},
            'user': _transform_user(u.user),
            # Each person is described from their own record; the name used
            # to be taken from the update's author for the approver and for
            # every comment author.
            'approved_by': _transform_user(u.approved_by),
            'value': u.value,
            'numerator': u.numerator,
            'denominator': u.denominator,
            'text': u.text,
            'narrative': u.narrative,
            'score_index': u.score_index,
            'score_indices': u.score_indices,
            'comments': [
                {
                    'comment_id': c.id,
                    'user': _transform_user(c.user),
                    'comment': c.comment,
                    'created_at': c.created_at,
                }
                for c in u.comments.all()
            ],
            'disaggregations': [
                {
                    'category': d.dimension_value.name.name,
                    'category_id': d.dimension_value.name.id,
                    'type': d.dimension_value.value,
                    'type_id': d.dimension_value.id,
                    'value': d.value,
                    'numerator': d.numerator,
                    'denominator': d.denominator,
                }
                for d in u.disaggregations.all()
            ],
            'created_at': u.created_at,
            'last_modified_at': u.last_modified_at,
        }
        for u in period.data.all()
    ]


def _transform_disaggregation_targets(obj):
    """Serialize the disaggregation targets attached to ``obj``."""
    return [
        {
            'category': t.dimension_value.name.name,
            'category_id': t.dimension_value.name.id,
            'type': t.dimension_value.value,
            'type_id': t.dimension_value.id,
            'value': t.value,
        }
        for t in obj.disaggregation_targets.all()
    ]


def _merge_unique(l1, l2):
    """Return a new list: ``l1`` followed by the items of ``l2`` not yet in it."""
    out = list(l1)
    for i in l2:
        if i not in out:
            out.append(i)
    return out