# Source code for akvo.rest.views.program_results_geo

# -*- coding: utf-8 -*-

# Akvo RSR is covered by the GNU Affero General Public License.

# See more details in the license.txt file located at the root folder of the Akvo RSR module.
# For additional details on the GNU license please see < http://www.gnu.org/licenses/agpl.html >.


import json

from abc import ABC
from dataclasses import dataclass, field
from datetime import date
from decimal import Decimal
from functools import cached_property, lru_cache
from os.path import dirname, abspath, join
from typing import List, Optional, Set, Dict

from django.conf import settings
from django.shortcuts import get_object_or_404
from rest_framework.authentication import SessionAuthentication
from rest_framework.decorators import api_view, authentication_classes
from rest_framework.response import Response
from rest_framework.status import HTTP_403_FORBIDDEN

from akvo.codelists.models import ResultType
from akvo.rest.authentication import TastyTokenAuthentication
from akvo.rsr.models import Project, IndicatorPeriod, IndicatorPeriodData
from akvo.rsr.models.result.utils import QUANTITATIVE, QUALITATIVE, PERCENTAGE_MEASURE, calculate_percentage
from akvo.utils import ensure_decimal


# -----------------------------------------------------------------------
# Dataclasses

@dataclass(frozen=True)
class ContributorProjectData:
    """Immutable snapshot of the project that owns a contributing period.

    Instances are usually created through :meth:`make` from a flat mapping
    whose keys may all share a common prefix.
    """

    id: Optional[int] = None
    title: str = ''
    country_code: Optional[str] = None
    aggregate_children: bool = True
    aggregate_to_parent: bool = True
    sector_codes: Set[str] = field(default_factory=set)

    @classmethod
    def make(cls, data, prefix=''):
        """Build an instance from a flat mapping.

        :param data: mapping supporting ``.get(key, default)``.
        :param prefix: string put in front of every key that is looked up.
        :return: a new instance; keys that are missing fall back to the
            field defaults. ``sector_codes`` is never read from ``data``.
        """
        def pick(name, fallback):
            return data.get(f"{prefix}{name}", fallback)

        return cls(
            id=pick('id', None),
            title=pick('title', ''),
            country_code=pick('primary_location__country__iso_code', None),
            aggregate_children=pick('aggregate_children', True),
            aggregate_to_parent=pick('aggregate_to_parent', True),
        )
@dataclass(frozen=True)
class PeriodUpdateData:
    """Immutable snapshot of one update reported on an indicator period."""

    id: int
    status: str = IndicatorPeriodData.STATUS_DRAFT_CODE
    value: Optional[Decimal] = None
    numerator: Optional[Decimal] = None
    denominator: Optional[Decimal] = None
    narrative: str = ''
    created_at: Optional[date] = None
    last_modified_at: Optional[date] = None

    @classmethod
    def make(cls, data, prefix=''):
        """Build an instance from a flat mapping.

        :param data: mapping; ``f"{prefix}id"`` is required, every other key
            is optional.
        :param prefix: string put in front of every key that is looked up.
        :raises KeyError: when the id key is absent.
        """
        update_id = data[f"{prefix}id"]
        fallbacks = {
            'status': IndicatorPeriodData.STATUS_DRAFT_CODE,
            'value': None,
            'numerator': None,
            'denominator': None,
            'narrative': '',
            'created_at': None,
            'last_modified_at': None,
        }
        optional = {
            name: data.get(f"{prefix}{name}", fallback)
            for name, fallback in fallbacks.items()
        }
        return cls(id=update_id, **optional)

    @property
    def is_approved(self):
        """True when the status equals the approved status code."""
        return IndicatorPeriodData.STATUS_APPROVED_CODE == self.status
class ReportingPeriodMixin(ABC):
    """Shared value/target roll-up logic for period-like dataclasses.

    The computed attributes read ``self.updates`` and ``self.contributors``
    and are memoised with ``cached_property``.
    """

    target_value: Optional[str] = None
    indicator_type: int = QUANTITATIVE
    indicator_measure: str = ''
    # NOTE(review): this mixin is not decorated as a dataclass itself; the
    # dataclasses in this file that inherit from it redeclare both fields.
    updates: List[PeriodUpdateData] = field(default_factory=list)
    contributors: List['ContributorData'] = field(default_factory=list)

    def _approved_total(self, attr):
        """Sum ``ensure_decimal(update.<attr>)`` over the approved updates, starting at 0."""
        return sum(
            (ensure_decimal(getattr(update, attr)) for update in self.approved_updates),
            0,
        )

    def _plus_contributors(self, start, attr):
        """Add ``ensure_decimal(contributor.<attr>)`` of each direct contributor to ``start``."""
        return sum(
            (ensure_decimal(getattr(contributor, attr)) for contributor in self.contributors),
            start,
        )

    @property
    def is_qualitative(self):
        return self.indicator_type == QUALITATIVE

    @property
    def is_quantitative(self):
        return self.indicator_type != QUALITATIVE

    @property
    def is_percentage(self):
        return self.is_quantitative and self.indicator_measure == PERCENTAGE_MEASURE

    @cached_property
    def approved_updates(self):
        """Updates whose ``is_approved`` flag is set, in their original order."""
        return [update for update in self.updates if update.is_approved]

    @cached_property
    def has_updates(self):
        """True when at least one approved update exists."""
        return bool(self.approved_updates)

    @cached_property
    def updates_value(self):
        """Sum of approved update values; ``None`` for percentage indicators."""
        return None if self.is_percentage else self._approved_total('value')

    @cached_property
    def updates_numerator(self):
        """Sum of approved update numerators; ``None`` unless percentage."""
        return self._approved_total('numerator') if self.is_percentage else None

    @cached_property
    def updates_denominator(self):
        """Sum of approved update denominators; ``None`` unless percentage."""
        return self._approved_total('denominator') if self.is_percentage else None

    @cached_property
    def aggregated_value(self):
        """Own updates value plus contributors' aggregated values.

        ``None`` for percentage and for qualitative indicators.
        """
        if self.is_percentage or self.is_qualitative:
            return None
        return self._plus_contributors(self.updates_value, 'aggregated_value')

    @cached_property
    def aggregated_numerator(self):
        """Own numerator plus contributors' aggregated numerators; ``None`` unless percentage."""
        if not self.is_percentage:
            return None
        return self._plus_contributors(self.updates_numerator, 'aggregated_numerator')

    @cached_property
    def aggregated_denominator(self):
        """Own denominator plus contributors' aggregated denominators; ``None`` unless percentage."""
        if not self.is_percentage:
            return None
        return self._plus_contributors(self.updates_denominator, 'aggregated_denominator')

    @cached_property
    def aggregated_target_value(self):
        """Own target value plus the aggregated target values of the contributors."""
        own_target = ensure_decimal(self.target_value)
        return self._plus_contributors(own_target, 'aggregated_target_value')

    @cached_property
    def aggregated_indicator_target_value(self):
        """Own indicator target plus each direct contributor's indicator target.

        Only the contributors' own ``indicator_target_value`` is added (not an
        aggregated one). A falsy total is reported as ``None``.
        """
        own_target = ensure_decimal(getattr(self, 'indicator_target_value', 0))
        total = self._plus_contributors(own_target, 'indicator_target_value')
        return total if total else None
@dataclass(frozen=True)
class ContributorData(ReportingPeriodMixin):
    """A contributing indicator period together with its own contributors.

    Contributors form a tree through the ``contributors`` list; ``parent``
    holds the id of the period this one contributes to.
    """

    id: int
    parent: Optional[int] = None
    indicator_type: int = QUANTITATIVE
    indicator_measure: str = ''
    target_value: Optional[str] = None
    indicator_target_value: Optional[Decimal] = None
    project: Optional[ContributorProjectData] = None
    updates: List[PeriodUpdateData] = field(default_factory=list)
    contributors: List['ContributorData'] = field(default_factory=list)

    @classmethod
    def make(cls, data, prefix=''):
        """Build an instance from a flat mapping.

        :param data: mapping; ``f"{prefix}id"`` is required, other keys are
            optional.
        :param prefix: string put in front of every key that is looked up,
            including the keys of the nested project.
        :raises KeyError: when the id key is absent.
        """
        return cls(
            id=data[f"{prefix}id"],
            parent=data.get(f"{prefix}parent_period", None),
            indicator_type=data.get(f"{prefix}indicator__type", QUANTITATIVE),
            indicator_measure=data.get(f"{prefix}indicator__measure", ''),
            target_value=data.get(f"{prefix}target_value", None),
            indicator_target_value=data.get(f"{prefix}indicator__target_value", None),
            # The nested project keys must honour the caller's prefix like
            # every other key; with the default prefix this is unchanged.
            project=ContributorProjectData.make(data, f"{prefix}indicator__result__project__"),
        )

    @cached_property
    def contributing_countries(self):
        """Set of country codes contributing through this node.

        Empty when the project has ``aggregate_children`` switched off.
        Otherwise it is the project's own country code (when set) plus the
        countries of every contributor whose project does not have
        ``aggregate_to_parent`` switched off.
        """
        project = self.project
        if project and not project.aggregate_children:
            return set()
        countries = set()
        if project and project.country_code:
            countries.add(project.country_code)
        for contrib in self.contributors:
            if contrib.project and not contrib.project.aggregate_to_parent:
                continue
            countries |= contrib.contributing_countries
        return countries

    @cached_property
    def actual_value(self):
        """Value derived from this node's own approved updates.

        ``None`` for qualitative indicators, a percentage computed by
        ``calculate_percentage`` for percentage indicators, otherwise the
        summed update values.
        """
        if self.is_qualitative:
            return None
        if self.is_percentage:
            return calculate_percentage(self.updates_numerator, self.updates_denominator)
        return self.updates_value

    def accept(self, visitor):
        """Visit all contributors first (depth-first), then this node."""
        for contributor in self.contributors:
            contributor.accept(visitor)
        visitor.visit(self)
@dataclass(frozen=True)
class PeriodData(ReportingPeriodMixin):
    """A root indicator period with its updates and contributor tree."""

    id: int
    period_start: Optional[date] = None
    period_end: Optional[date] = None
    target_value: Optional[str] = None
    target_comment: str = ''
    actual_value: str = ''
    actual_comment: str = ''
    narrative: str = ''
    indicator_type: int = QUANTITATIVE
    indicator_measure: str = ''
    updates: List[PeriodUpdateData] = field(default_factory=list)
    contributors: List[ContributorData] = field(default_factory=list)

    @classmethod
    def make(cls, data, prefix=''):
        """Build an instance from a flat mapping.

        :param data: mapping; ``f"{prefix}id"`` is required, other keys are
            optional.
        :param prefix: string put in front of every key that is looked up.
        :raises KeyError: when the id key is absent.
        """
        period_id = data[f"{prefix}id"]

        def pick(name, fallback):
            return data.get(f"{prefix}{name}", fallback)

        return cls(
            id=period_id,
            period_start=pick('period_start', None),
            period_end=pick('period_end', None),
            target_value=pick('target_value', None),
            target_comment=pick('target_comment', ''),
            actual_value=pick('actual_value', ''),
            actual_comment=pick('actual_comment', ''),
            narrative=pick('narrative', ''),
            indicator_type=pick('indicator__type', QUANTITATIVE),
            indicator_measure=pick('indicator__measure', ''),
        )

    @cached_property
    def contributing_countries(self):
        """Union of the contributing countries of every direct contributor."""
        return set().union(*(contrib.contributing_countries for contrib in self.contributors))

    @cached_property
    def aggregated_value(self):
        """Aggregated value of the period.

        ``None`` for qualitative indicators; for percentage indicators the
        result of ``calculate_percentage`` on the aggregated numerator and
        denominator; otherwise own updates plus contributors' aggregated
        values.
        """
        if self.is_qualitative:
            return None
        if self.is_percentage:
            return calculate_percentage(self.aggregated_numerator, self.aggregated_denominator)
        own_total = self.updates_value
        return sum(
            (ensure_decimal(contributor.aggregated_value) for contributor in self.contributors),
            own_total,
        )
@dataclass(frozen=True)
class IndicatorData:
    """An indicator and the periods that belong to it."""

    id: int
    title: str = ''
    type: int = QUANTITATIVE
    measure: str = ''
    description: str = ''
    baseline_year: Optional[int] = None
    baseline_value: str = ''
    baseline_comment: str = ''
    target_value: Optional[Decimal] = None
    target_comment: str = ''
    periods: List[PeriodData] = field(default_factory=list)

    @classmethod
    def make(cls, data, prefix=''):
        """Build an instance from a flat mapping.

        :param data: mapping; ``f"{prefix}id"`` is required, other keys are
            optional and fall back to the field defaults.
        :param prefix: string put in front of every key that is looked up.
        :raises KeyError: when the id key is absent.
        """
        return cls(
            id=data[f"{prefix}id"],
            title=data.get(f"{prefix}title", ''),
            type=data.get(f"{prefix}type", QUANTITATIVE),
            measure=data.get(f"{prefix}measure", ''),
            description=data.get(f"{prefix}description", ''),
            baseline_year=data.get(f"{prefix}baseline_year", None),
            baseline_value=data.get(f"{prefix}baseline_value", ''),
            baseline_comment=data.get(f"{prefix}baseline_comment", ''),
            target_value=data.get(f"{prefix}target_value", None),
            # The field is declared ``str = ''``; a missing key must not
            # turn it into None.
            target_comment=data.get(f"{prefix}target_comment", ''),
        )

    @cached_property
    def contributing_countries(self):
        """Union of the contributing countries of every period."""
        codes = set()
        for period in self.periods:
            codes |= period.contributing_countries
        return codes

    @property
    def is_quantitative(self):
        return self.type != QUALITATIVE

    @property
    def is_percentage(self):
        return self.is_quantitative and self.measure == PERCENTAGE_MEASURE
@dataclass(frozen=True)
class ResultData:
    """A result and the indicators that belong to it."""

    id: int
    title: str = ''
    description: str = ''
    type: str = ''
    aggregation_status: Optional[bool] = None
    indicators: List[IndicatorData] = field(default_factory=list)

    @classmethod
    def make(cls, data, prefix=''):
        """Build an instance from a flat mapping.

        :param data: mapping; ``f"{prefix}id"`` is required, other keys are
            optional and fall back to the field defaults.
        :param prefix: string put in front of every key that is looked up.
        :raises KeyError: when the id key is absent.
        """
        return cls(
            id=data[f"{prefix}id"],
            title=data.get(f"{prefix}title", ''),
            description=data.get(f"{prefix}description", ''),
            type=data.get(f"{prefix}type", ''),
            # Declared ``Optional[bool]``: "unknown" is None, not an empty
            # string.
            aggregation_status=data.get(f"{prefix}aggregation_status", None),
        )

    @cached_property
    def contributing_countries(self):
        """Union of the contributing countries of every indicator."""
        codes = set()
        for indicator in self.indicators:
            codes |= indicator.contributing_countries
        return codes

    @cached_property
    def iati_type_name(self):
        """Codelist name for this result's ``type`` code ('' when unknown)."""
        return self.get_codelist_name(self.type)

    @staticmethod
    @lru_cache
    def get_codelist_name(code, version=settings.IATI_VERSION):
        """Return the ``ResultType`` name for ``code`` in the given codelist version.

        Results (including the '' returned for unknown codes) are memoised
        by ``lru_cache`` for the lifetime of the process.

        :param code: result type code to look up.
        :param version: codelist version code; defaults to
            ``settings.IATI_VERSION`` as evaluated at import time.
        :return: the type's name, or '' when no matching ``ResultType`` exists.
        """
        try:
            result_type = ResultType.objects.get(code=code, version__code=version)
        except ResultType.DoesNotExist:
            return ''
        return result_type.name
# ------------------------------------------------------------------------
# Fetch Data
def fetch_periods(project):
    """Return the indicator periods of ``project`` as a ``values()`` queryset.

    Rows are ordered by result order, indicator order and then by
    ``period_start`` descending. Each row carries the period columns plus
    the ``indicator__*`` and ``indicator__result__*`` columns listed below.

    :param project: value used for the ``indicator__result__project`` filter.
    """
    ordering = ('indicator__result__order', 'indicator__order', '-period_start')
    columns = (
        'id',
        'period_start',
        'period_end',
        'target_value',
        'indicator__id',
        'indicator__title',
        'indicator__type',
        'indicator__measure',
        'indicator__target_value',
        'indicator__result__id',
        'indicator__result__type',
        'indicator__result__title',
    )
    project_periods = (
        IndicatorPeriod.objects
        .select_related('indicator', 'indicator__result', 'indicator__result__project')
        .filter(indicator__result__project=project)
    )
    return project_periods.order_by(*ordering).values(*columns)
def fetch_contributors(root_period_ids):
    """Return every descendant period of the given root periods.

    The descendants are found level by level by following ``parent_period``
    links; the roots themselves are excluded from the result.

    :param root_period_ids: any iterable of period ids (set, list, ...).
    :return: a ``values()`` queryset with the contributor period columns,
        the owning project's columns and the ``data__*`` update columns.
        NOTE(review): because update columns are selected, a period with
        several updates presumably appears in several rows;
        ``get_contributors`` de-duplicates by id.
    """
    # Normalise once so that a list/tuple/queryset works for the set
    # arithmetic below, not only a real set.
    roots = set(root_period_ids)
    family = set(roots)
    frontier = roots
    # Only the newly discovered ids are queried on each pass; ids already in
    # ``family`` are never re-expanded, which also guards against cycles.
    while frontier:
        children = set(
            IndicatorPeriod.objects
            .filter(parent_period__in=frontier)
            .values_list('id', flat=True)
        )
        frontier = children - family
        family |= frontier
    contributor_ids = family - roots
    return IndicatorPeriod.objects\
        .select_related('indicator__result__project')\
        .prefetch_related('data')\
        .filter(id__in=contributor_ids)\
        .values(
            'id',
            'parent_period',
            'target_value',
            'indicator__id',
            'indicator__type',
            'indicator__measure',
            'indicator__target_value',
            'indicator__result__project__id',
            'indicator__result__project__title',
            'indicator__result__project__aggregate_children',
            'indicator__result__project__aggregate_to_parent',
            'indicator__result__project__primary_location__country__iso_code',
            'data__id',
            'data__status',
            'data__value',
            'data__numerator',
            'data__denominator',
        )
# -------------------------------------------------------------------------
# Process data
def hierarchize_contributors(contributors):
    """Link a flat collection of contributors into trees.

    Every item whose ``parent`` is the ``id`` of another item in the
    collection is appended to that item's ``contributors`` list. Items with
    a falsy parent, or with a parent that is not part of the collection, are
    returned as the top-level nodes.

    :param contributors: re-iterable collection of objects exposing ``id``,
        ``parent`` and a mutable ``contributors`` list.
    :return: list of top-level items, in input order.
    """
    by_id = {node.id: node for node in contributors}
    roots = []
    for node in contributors:
        parent_id = node.parent
        owner = by_id.get(parent_id) if parent_id else None
        if owner is None:
            roots.append(node)
        else:
            owner.contributors.append(node)
    return roots
def get_contributors(root_period_ids):
    """Load the contributors below the given root periods as trees.

    Rows from ``fetch_contributors`` are folded into one ``ContributorData``
    per period id. Each distinct update id (``data__id``) is attached once,
    to the contributor of the row where it is first seen; rows without an
    update id only contribute the period itself.

    :param root_period_ids: ids of the root periods.
    :return: list of top-level contributors as produced by
        ``hierarchize_contributors``.
    """
    contributors_by_id = {}
    seen_update_ids = set()
    for row in fetch_contributors(root_period_ids):
        contributor_id = row['id']
        update_id = row['data__id']
        contributor = contributors_by_id.get(contributor_id)
        if contributor is None:
            contributor = ContributorData.make(row)
            contributors_by_id[contributor_id] = contributor
        if update_id is None or update_id in seen_update_ids:
            continue
        contributor.updates.append(PeriodUpdateData.make(row, 'data__'))
        seen_update_ids.add(update_id)
    return hierarchize_contributors(contributors_by_id.values())
def get_results_framework(project):
    """Build the results -> indicators -> periods -> contributors tree.

    Rows from ``fetch_periods`` are folded into one ``ResultData``,
    ``IndicatorData`` and ``PeriodData`` per distinct id, nested in row
    order. Top-level contributors from ``get_contributors`` are then
    attached to the period named by their ``parent``; contributors whose
    parent is not one of the fetched periods are dropped.

    :param project: project whose results framework is loaded.
    :return: list of ``ResultData`` in first-seen order.
    """
    results, indicators, periods = {}, {}, {}
    for row in fetch_periods(project):
        result_id = row['indicator__result__id']
        indicator_id = row['indicator__id']
        period_id = row['id']

        result = results.get(result_id)
        if result is None:
            result = results[result_id] = ResultData.make(row, 'indicator__result__')

        indicator = indicators.get(indicator_id)
        if indicator is None:
            indicator = indicators[indicator_id] = IndicatorData.make(row, 'indicator__')
            result.indicators.append(indicator)

        if period_id not in periods:
            period = periods[period_id] = PeriodData.make(row)
            indicator.periods.append(period)

    # Every fetched row registered its id above, so the lookup keys are
    # exactly the root period ids.
    for contributor in get_contributors(set(periods)):
        period = periods.get(contributor.parent)
        if period is not None:
            period.contributors.append(contributor)

    return list(results.values())
def get_countries_geojson():
    """Load and parse ``countries.geojson`` located next to this module.

    The file is read as UTF-8 explicitly, so parsing does not depend on the
    locale's default encoding.

    :return: the decoded JSON document. A fresh object is returned on every
        call, so callers may mutate it.
    :raises OSError: when the file cannot be opened.
    :raises json.JSONDecodeError: when the content is not valid JSON.
    """
    filepath = join(dirname(abspath(__file__)), 'countries.geojson')
    with open(filepath, 'r', encoding='utf-8') as f:
        return json.load(f)
@dataclass
class ContributionValue:
    """Mutable accumulator of target/value/fraction totals.

    Every total starts as ``None`` and becomes a ``Decimal`` the first time
    something is added to it.
    """

    target: Optional[Decimal] = None
    value: Optional[Decimal] = None
    numerator: Optional[Decimal] = None
    denominator: Optional[Decimal] = None

    def _start_at_zero(self, name):
        """Replace a still-unset (``None``) total with ``Decimal(0)``."""
        if getattr(self, name) is None:
            setattr(self, name, Decimal(0))

    def add_target(self, target):
        """Add ``target`` to the running target total."""
        self._start_at_zero('target')
        self.target += target

    def add_value(self, value):
        """Add ``value`` to the running value total."""
        self._start_at_zero('value')
        self.value += value

    def add_fraction(self, numerator, denominator):
        """Add to the running numerator and denominator totals."""
        self._start_at_zero('numerator')
        self._start_at_zero('denominator')
        self.numerator += numerator
        self.denominator += denominator
@dataclass(frozen=True)
class ContributionPerCountryVisitor:
    """Visitor that totals contributor figures per country for one period.

    The instance is frozen, but the ``countries`` dict it holds is filled in
    place while contributors are visited.
    """

    result: ResultData
    indicator: IndicatorData
    period: PeriodData
    countries: Dict[str, ContributionValue] = field(default_factory=dict)

    @classmethod
    def collect(cls, result, indicator, period):
        """Create a visitor and run it over all contributors of ``period``.

        :return: the visitor, with ``countries`` populated.
        """
        visitor = cls(result=result, indicator=indicator, period=period)
        for contributor in period.contributors:
            contributor.accept(visitor)
        return visitor

    def visit(self, contributor):
        """Add one contributor's target and update totals to its country.

        Contributors are skipped when they have neither approved updates nor
        a target value, or when no country code can be determined (no
        project, or a project without a country code).
        """
        if not contributor.has_updates and contributor.target_value is None:
            return
        # ``project`` is declared Optional on the contributor; guard it the
        # same way ``ContributorData.contributing_countries`` does.
        project = contributor.project
        country_code = project.country_code if project else None
        if country_code is None:
            return
        if country_code not in self.countries:
            self.countries[country_code] = ContributionValue()
        contribution = self.countries[country_code]
        if contributor.target_value:
            contribution.add_target(ensure_decimal(contributor.target_value))
        # The root period decides whether fractions or plain values are
        # accumulated.
        if self.period.is_percentage:
            contribution.add_fraction(contributor.updates_numerator, contributor.updates_denominator)
        else:
            contribution.add_value(contributor.updates_value)
def get_indicators_by_country(project):
    """Group per-period contribution totals by country and indicator.

    :param project: project whose results framework is analysed.
    :return: nested dict of the form
        ``{country_code: {indicator_id: {'is_percentage': ..., 'periods':
        {period_id: {...}}}}}`` where each period entry holds the period
        dates, the period's own and aggregated target, and the country's
        accumulated target/value/numerator/denominator.
    """
    # First collect every visitor, then fold them into the nested result.
    visitors = [
        ContributionPerCountryVisitor.collect(result, indicator, period)
        for result in get_results_framework(project)
        for indicator in result.indicators
        for period in indicator.periods
    ]

    by_countries = {}
    for visitor in visitors:
        indicator, period = visitor.indicator, visitor.period
        for country_code, contribution in visitor.countries.items():
            by_country = by_countries.setdefault(country_code, {})
            if indicator.id not in by_country:
                by_country[indicator.id] = {
                    'is_percentage': indicator.is_percentage,
                    'periods': {},
                }
            period_entry = {
                'period_start': period.period_start,
                'period_end': period.period_end,
                'period_target': ensure_decimal(period.target_value),
                'aggregated_period_target': period.aggregated_target_value,
                'target': contribution.target,
                'value': contribution.value,
                'numerator': contribution.numerator,
                'denominator': contribution.denominator,
            }
            by_country[indicator.id]['periods'][period.id] = period_entry
    return by_countries
def get_geo_data(project):
    """Build a GeoJSON FeatureCollection of the countries with contributions.

    Only features whose lower-cased ``iso_code`` property is a key of the
    per-country data are kept. Their ``properties`` are replaced by the
    original ``iso_code`` plus an ``indicators`` mapping in which each
    indicator's periods are flattened to a list.

    :param project: project whose contributions are reported.
    :return: dict with ``type`` ``'FeatureCollection'`` and the kept features.
    """
    by_countries = get_indicators_by_country(project)
    selected = []
    for feature in get_countries_geojson()['features']:
        iso_code = feature['properties']['iso_code']
        country_indicators = by_countries.get(iso_code.lower())
        if country_indicators is None:
            continue
        feature['properties'] = {
            'iso_code': iso_code,
            'indicators': {
                indicator_id: {
                    'is_percentage': entry['is_percentage'],
                    'periods': list(entry['periods'].values()),
                }
                for indicator_id, entry in country_indicators.items()
            },
        }
        selected.append(feature)
    return {'type': 'FeatureCollection', 'features': selected}
# -------------------------------------------------------------------------
# Endpoint
@api_view(['GET'])
@authentication_classes([SessionAuthentication, TastyTokenAuthentication])
def get_program_results_geo(request, project_pk):
    """GET endpoint returning the geo data of a program.

    Responds with 404 when the project does not exist and with 403 when the
    requesting user lacks the ``rsr.view_project`` permission on it.

    :param request: the incoming request.
    :param project_pk: primary key of the project.
    """
    project = get_object_or_404(Project, pk=project_pk)
    if request.user.has_perm('rsr.view_project', project):
        return Response(get_geo_data(project))
    return Response('Request not allowed', status=HTTP_403_FORBIDDEN)