# -*- coding: utf-8 -*-
# Akvo RSR is covered by the GNU Affero General Public License.
# See more details in the license.txt file located at the root folder of the Akvo RSR module.
# For additional details on the GNU license please see < http://www.gnu.org/licenses/agpl.html >.
from abc import ABC
from dataclasses import dataclass, field
from datetime import date, datetime
from decimal import Decimal
from enum import Enum
from functools import cached_property, lru_cache
from typing import Optional, List, Set
from akvo.rsr.models import IndicatorPeriodData
from akvo.rsr.models.result.utils import QUANTITATIVE, QUALITATIVE, PERCENTAGE_MEASURE, calculate_percentage
from akvo.utils import ensure_decimal, maybe_decimal
from akvo.codelists.models import ResultType
from django.conf import settings
@dataclass(frozen=True)
class DisaggregationTargetData(object):
    """Immutable record of the target value set for one disaggregation.

    ``category`` and ``type`` together identify the disaggregation; ``make``
    reads them from the ``dimension_value__name__name`` and
    ``dimension_value__value`` keys respectively.
    """

    id: Optional[int] = None
    category: str = ''
    type: str = ''
    value: Optional[Decimal] = None

    @classmethod
    def make(cls, data, prefix=''):
        """Build an instance from a flat mapping.

        :param data: mapping supporting item access and ``.get``. The ``id``
            key is read with item access, so it must be present; every other
            key falls back to the field default when missing.
        :param prefix: string prepended to every key that is looked up.
        :return: a new ``DisaggregationTargetData``.
        """
        attrs = {
            'id': data[f"{prefix}id"],
            'category': data.get(f"{prefix}dimension_value__name__name", ''),
            'type': data.get(f"{prefix}dimension_value__value", ''),
            'value': data.get(f"{prefix}value", None),
        }
        return cls(**attrs)
@dataclass(frozen=True)
class DisaggregationData(object):
    """Immutable record of a reported disaggregation.

    Carries a plain ``value`` as well as a ``numerator``/``denominator``
    pair; ``category`` and ``type`` together identify the disaggregation.
    """

    id: Optional[int] = None
    category: str = ''
    type: str = ''
    value: Optional[Decimal] = None
    numerator: Optional[Decimal] = None
    denominator: Optional[Decimal] = None

    @classmethod
    def make(cls, data, prefix=''):
        """Build an instance from a flat mapping.

        :param data: mapping supporting item access and ``.get``. The ``id``
            key is read with item access, so it must be present; every other
            key falls back to the field default when missing.
        :param prefix: string prepended to every key that is looked up.
        :return: a new ``DisaggregationData``.
        """
        attrs = {
            'id': data[f"{prefix}id"],
            'category': data.get(f"{prefix}dimension_value__name__name", ''),
            'type': data.get(f"{prefix}dimension_value__value", ''),
            'value': data.get(f"{prefix}value", None),
            'numerator': data.get(f"{prefix}numerator", None),
            'denominator': data.get(f"{prefix}denominator", None),
        }
        return cls(**attrs)
@dataclass(frozen=True)
class UserData(object):
    """Immutable record of the name and e-mail address of a user."""

    email: str = ''
    first_name: str = ''
    last_name: str = ''

    @classmethod
    def make(cls, data, prefix=''):
        """Build an instance from a flat mapping.

        :param data: mapping supporting ``.get``; missing keys fall back to
            an empty string.
        :param prefix: string prepended to every key that is looked up.
        :return: a new ``UserData``.
        """
        attrs = {
            'email': data.get(f"{prefix}email", ''),
            'first_name': data.get(f"{prefix}first_name", ''),
            'last_name': data.get(f"{prefix}last_name", ''),
        }
        return cls(**attrs)

    @property
    def full_name(self):
        """Return ``"First Last (email)"``, or just the e-mail when no name is set."""
        display_name = f"{self.first_name} {self.last_name}".strip()
        if not display_name:
            return self.email
        return f"{display_name} ({self.email})"
@dataclass(frozen=True)
class PeriodUpdateData(object):
    """Immutable record of a single update reported on a period.

    ``status`` holds one of the ``IndicatorPeriodData`` status codes and
    defaults to the draft code.
    """

    id: int
    user: Optional[UserData] = None
    status: str = IndicatorPeriodData.STATUS_DRAFT_CODE
    value: Optional[Decimal] = None
    numerator: Optional[Decimal] = None
    denominator: Optional[Decimal] = None
    narrative: str = ''
    created_at: Optional[datetime] = None
    last_modified_at: Optional[datetime] = None
    disaggregations: List[DisaggregationData] = field(default_factory=list)

    @classmethod
    def make(cls, data, prefix=''):
        """Build an instance from a flat mapping.

        The nested ``user`` is built from the same mapping using the
        ``<prefix>user__`` key prefix. ``disaggregations`` is not read from
        ``data`` and therefore starts out as an empty list.

        :param data: mapping supporting item access and ``.get``; the ``id``
            key must be present.
        :param prefix: string prepended to every key that is looked up.
        :return: a new ``PeriodUpdateData``.
        """
        attrs = {
            'id': data[f"{prefix}id"],
            'user': UserData.make(data, f"{prefix}user__"),
            'status': data.get(f"{prefix}status", IndicatorPeriodData.STATUS_DRAFT_CODE),
            'value': data.get(f"{prefix}value", None),
            'numerator': data.get(f"{prefix}numerator", None),
            'denominator': data.get(f"{prefix}denominator", None),
            'narrative': data.get(f"{prefix}narrative", ''),
            'created_at': data.get(f"{prefix}created_at", None),
            'last_modified_at': data.get(f"{prefix}last_modified_at", None),
        }
        return cls(**attrs)

    @property
    def is_pending(self):
        """Whether the update carries the pending status code."""
        return IndicatorPeriodData.STATUS_PENDING_CODE == self.status

    @property
    def is_approved(self):
        """Whether the update carries the approved status code."""
        return IndicatorPeriodData.STATUS_APPROVED_CODE == self.status
@dataclass(frozen=True)
class CountryData(object):
    """Immutable, hashable record of a country (ISO code and name)."""

    iso_code: str = ''
    name: str = ''

    @classmethod
    def make(cls, data, prefix=''):
        """Build an instance from a flat mapping.

        :param data: mapping supporting ``.get``; missing keys fall back to
            an empty string.
        :param prefix: string prepended to every key that is looked up.
        :return: a new ``CountryData``.
        """
        attrs = {
            'iso_code': data.get(f"{prefix}iso_code", ''),
            'name': data.get(f"{prefix}name", ''),
        }
        return cls(**attrs)

    def __hash__(self):
        """Hash on the ``(iso_code, name)`` pair."""
        identity = (self.iso_code, self.name)
        return hash(identity)
@dataclass(frozen=True)
class LocationData(object):
    """Immutable, hashable record of a coordinate pair and its country."""

    latitude: float = 0
    longitude: float = 0
    country: Optional[CountryData] = None

    @classmethod
    def make(cls, data, prefix=''):
        """Build an instance from a flat mapping.

        The nested ``country`` is built from the same mapping using the
        ``<prefix>country__`` key prefix.

        :param data: mapping supporting ``.get``; missing coordinates fall
            back to ``0``.
        :param prefix: string prepended to every key that is looked up.
        :return: a new ``LocationData``.
        """
        attrs = {
            'latitude': data.get(f"{prefix}latitude", 0),
            'longitude': data.get(f"{prefix}longitude", 0),
            'country': CountryData.make(data, f"{prefix}country__"),
        }
        return cls(**attrs)

    def __hash__(self):
        """Hash on the ``(latitude, longitude, country)`` triple."""
        identity = (self.latitude, self.longitude, self.country)
        return hash(identity)
@dataclass(frozen=True)
class ContributorProjectData(object):
    """Immutable, hashable record describing a contributing project."""

    id: Optional[int] = None
    title: str = ''
    subtitle: str = ''
    country: Optional[str] = None
    location: Optional[LocationData] = None
    aggregate_children: bool = True
    aggregate_to_parent: bool = True
    sectors: Set[str] = field(default_factory=set)

    @classmethod
    def make(cls, data, prefix=''):
        """Build an instance from a flat mapping.

        ``country`` is read from ``<prefix>primary_location__country__name``
        and ``location`` is built from the same mapping using the
        ``<prefix>primary_location__`` key prefix. ``sectors`` is not read
        from ``data`` and therefore starts out as an empty set.

        :param data: mapping supporting ``.get``; every key is optional.
        :param prefix: string prepended to every key that is looked up.
        :return: a new ``ContributorProjectData``.
        """
        return cls(
            id=data.get(f"{prefix}id", None),
            title=data.get(f"{prefix}title", ''),
            subtitle=data.get(f"{prefix}subtitle", ''),
            country=data.get(f"{prefix}primary_location__country__name", None),
            location=LocationData.make(data, f"{prefix}primary_location__"),
            aggregate_children=data.get(f"{prefix}aggregate_children", True),
            aggregate_to_parent=data.get(f"{prefix}aggregate_to_parent", True),
        )

    def __hash__(self):
        """Hash on all fields.

        ``sectors`` is hashed as a ``frozenset``: two equal sets may iterate
        in different orders, so hashing ``tuple(self.sectors)`` could give
        equal instances different hashes.
        """
        return hash((
            self.id,
            self.title,
            self.subtitle,
            self.country,
            self.location,
            self.aggregate_children,
            self.aggregate_to_parent,
            frozenset(self.sectors),
        ))
class ReportingPeriodMixin(ABC):
    """Shared value and disaggregation aggregation logic for period-like records.

    The mixin is not itself decorated with ``@dataclass``; the annotated
    attributes below name what its properties read, and the concrete
    dataclasses in this module (``ContributorData`` and ``PeriodData``)
    re-declare them as real fields.

    Results of the ``cached_property`` members are computed once per instance.
    """

    period_start: Optional[date] = None
    target_value: Optional[Decimal] = None
    indicator_type: int = QUANTITATIVE
    indicator_measure: str = ''
    indicator_cumulative: bool = False
    updates: List[PeriodUpdateData] = field(default_factory=list)
    disaggregation_targets: List[DisaggregationTargetData] = field(default_factory=list)
    contributors: List['ContributorData'] = field(default_factory=list)
    period_disaggregations: List[DisaggregationData] = field(default_factory=list)
    period_actual_value: Optional[Decimal] = None

    @property
    def is_qualitative(self):
        """Whether the indicator type is ``QUALITATIVE``."""
        return self.indicator_type == QUALITATIVE

    @property
    def is_quantitative(self):
        """Whether the indicator type is anything other than ``QUALITATIVE``."""
        return self.indicator_type != QUALITATIVE

    @property
    def is_percentage(self):
        """Whether this is a quantitative indicator measured as a percentage."""
        return self.is_quantitative and self.indicator_measure == PERCENTAGE_MEASURE

    @property
    def is_cumulative(self):
        """Whether the indicator is flagged cumulative and is not a percentage."""
        return self.indicator_cumulative and not self.is_percentage

    @property
    def is_cumulative_future(self):
        """Truthy when cumulative and ``period_start`` lies after today.

        The result is the raw ``and`` chain, so it can be a falsy non-bool
        (for example ``None`` when ``period_start`` is unset).
        """
        return self.is_cumulative and self.period_start and self.period_start > date.today()

    @cached_property
    def approved_updates(self):
        """The subset of ``updates`` whose status is approved."""
        return [u for u in self.updates if u.is_approved]

    @cached_property
    def updates_value(self):
        """Sum of approved update values; ``None`` for percentage indicators."""
        if self.is_percentage:
            return None
        return sum(ensure_decimal(u.value) for u in self.approved_updates)

    @cached_property
    def updates_numerator(self):
        """Sum of approved update numerators; ``None`` unless a percentage."""
        if not self.is_percentage:
            return None
        return sum(ensure_decimal(u.numerator) for u in self.approved_updates)

    @cached_property
    def updates_denominator(self):
        """Sum of approved update denominators; ``None`` unless a percentage."""
        if not self.is_percentage:
            return None
        return sum(ensure_decimal(u.denominator) for u in self.approved_updates)

    @cached_property
    def aggregated_value(self):
        """Own ``updates_value`` plus every contributor's ``aggregated_value``.

        ``None`` for percentage and qualitative indicators.
        """
        if self.is_percentage or self.is_qualitative:
            return None
        return sum(
            (ensure_decimal(c.aggregated_value) for c in self.contributors),
            ensure_decimal(self.updates_value),
        )

    @cached_property
    def aggregated_numerator(self):
        """Own numerator plus every contributor's; ``None`` unless a percentage."""
        if not self.is_percentage:
            return None
        return sum(
            (ensure_decimal(c.aggregated_numerator) for c in self.contributors),
            ensure_decimal(self.updates_numerator),
        )

    @cached_property
    def aggregated_denominator(self):
        """Own denominator plus every contributor's; ``None`` unless a percentage."""
        if not self.is_percentage:
            return None
        return sum(
            (ensure_decimal(c.aggregated_denominator) for c in self.contributors),
            ensure_decimal(self.updates_denominator),
        )

    @cached_property
    def aggregated_target_value(self):
        """Own ``target_value`` plus every contributor's aggregated target."""
        return sum(
            (ensure_decimal(c.aggregated_target_value) for c in self.contributors),
            ensure_decimal(self.target_value),
        )

    @cached_property
    def aggregated_indicator_target_value(self):
        """Own indicator target (``0`` when the attribute is absent) plus the
        direct contributors' ``indicator_target_value``; ``None`` when the
        total is falsy.
        """
        total = sum(
            (ensure_decimal(c.indicator_target_value) for c in self.contributors),
            ensure_decimal(getattr(self, 'indicator_target_value', 0)),
        )
        return total if total else None

    @cached_property
    def actual_value(self):
        """Actual value of the period.

        ``None`` for qualitative indicators, the percentage computed from the
        aggregated numerator/denominator for percentage indicators, and the
        stored ``period_actual_value`` otherwise.
        """
        if self.is_qualitative:
            return None
        if self.is_percentage:
            return calculate_percentage(self.aggregated_numerator, self.aggregated_denominator)
        # if self.is_cumulative_future:
        #     return 0
        return self.period_actual_value

    @cached_property
    def disaggregations(self):
        """Disaggregations collected from the approved updates."""
        return self._get_disaggregations(self.approved_updates)

    @cached_property
    def aggregated_disaggregations(self) -> List[DisaggregationData]:
        """Own disaggregations merged with every contributor's aggregated ones.

        Entries are keyed by ``(category, type)``. For percentage indicators
        the numerators and denominators are summed separately; otherwise the
        values are summed. Components that are never summed stay ``None``.
        """
        items = {}
        # Own entries first, then each contributor's, so result order is stable.
        sources = [self.disaggregations]
        sources.extend(c.aggregated_disaggregations for c in self.contributors)
        for source in sources:
            for d in source:
                item = items.setdefault(
                    (d.category, d.type),
                    {'value': None, 'numerator': None, 'denominator': None},
                )
                if self.is_percentage:
                    item['numerator'] = ensure_decimal(item['numerator']) + ensure_decimal(d.numerator)
                    # The denominator accumulates d.denominator (not d.numerator).
                    item['denominator'] = ensure_decimal(item['denominator']) + ensure_decimal(d.denominator)
                else:
                    item['value'] = ensure_decimal(item['value']) + ensure_decimal(d.value)
        return [
            DisaggregationData(None, category, type, d['value'], d['numerator'], d['denominator'])
            for (category, type), d
            in items.items()
        ]

    @cached_property
    def aggregated_disaggregation_targets(self) -> List[DisaggregationTargetData]:
        """Own disaggregation targets merged with every contributor's
        aggregated ones, summed per ``(category, type)``.
        """
        items = {}
        sources = [self.disaggregation_targets]
        sources.extend(c.aggregated_disaggregation_targets for c in self.contributors)
        for source in sources:
            for d in source:
                key = (d.category, d.type)
                items[key] = ensure_decimal(items.get(key)) + ensure_decimal(d.value)
        return [
            DisaggregationTargetData(None, category, type, value)
            for (category, type), value
            in items.items()
        ]

    def get_disaggregation_target_value(self, category, type):
        """Return the own target value for ``(category, type)``, or ``None``."""
        item = self._select_disaggregation(self.disaggregation_targets, category, type)
        return item.value if item else None

    def get_aggregated_disaggregation_target_value(self, category, type):
        """Return the aggregated target value for ``(category, type)``, or ``None``."""
        item = self._select_disaggregation(self.aggregated_disaggregation_targets, category, type)
        return item.value if item else None

    def get_disaggregation_value(self, category, type):
        """Return the own disaggregation value for ``(category, type)``.

        Cumulative indicators read from ``period_disaggregations``; others
        read from ``disaggregations``. Returns ``None`` for a cumulative
        period that has not started yet or when there is no matching entry;
        percentage indicators return the computed percentage.
        """
        if self.is_cumulative_future:
            return None
        source = self.period_disaggregations if self.is_cumulative else self.disaggregations
        item = self._select_disaggregation(source, category, type)
        if not item:
            return None
        if self.is_percentage:
            return calculate_percentage(item.numerator, item.denominator)
        return item.value

    def get_aggregated_disaggregation_value(self, category, type):
        """Return the aggregated disaggregation value for ``(category, type)``.

        Same rules as ``get_disaggregation_value`` except that non-cumulative
        indicators read from ``aggregated_disaggregations``.
        """
        if self.is_cumulative_future:
            return None
        source = self.period_disaggregations if self.is_cumulative else self.aggregated_disaggregations
        item = self._select_disaggregation(source, category, type)
        if not item:
            return None
        if self.is_percentage:
            return calculate_percentage(item.numerator, item.denominator)
        return item.value

    def _select_disaggregation(self, disaggregations, category, type):
        """Return the first entry matching ``category`` and ``type``, or ``None``."""
        return next((d for d in disaggregations if d.category == category and d.type == type), None)

    def _get_disaggregations(self, updates):
        """Collapse the disaggregations of ``updates`` per ``(category, type)``.

        For non-percentage indicators the values are summed (``None`` counts
        as ``0``).
        """
        items = {}
        for u in updates:
            for d in u.disaggregations:
                key = (d.category, d.type)
                if key not in items:
                    # NOTE(review): numerator/denominator come from the first
                    # entry seen for a key and are not accumulated across
                    # updates; confirm this is intended for percentages.
                    items[key] = {
                        'value': None if self.is_percentage else 0,
                        'numerator': d.numerator,
                        'denominator': d.denominator,
                    }
                if not self.is_percentage:
                    items[key]['value'] += 0 if d.value is None else d.value
        return [
            DisaggregationData(None, category, type, d['value'], d['numerator'], d['denominator'])
            for (category, type), d in items.items()
        ]
@dataclass(frozen=True)
class ContributorData(ReportingPeriodMixin):
    """Immutable record of a contributing period, its project and its own
    nested contributors.
    """

    id: int
    period_start: Optional[date] = None
    parent: Optional[int] = None
    indicator_type: int = QUANTITATIVE
    indicator_measure: str = ''
    indicator_cumulative: bool = False
    target_value: Optional[Decimal] = None
    period_actual_value: Optional[Decimal] = None
    period_actual_comment: str = ''
    period_narrative: str = ''
    indicator_baseline_value: Optional[Decimal] = None
    indicator_target_value: Optional[Decimal] = None
    project: Optional[ContributorProjectData] = None
    updates: List[PeriodUpdateData] = field(default_factory=list)
    disaggregation_targets: List[DisaggregationTargetData] = field(default_factory=list)
    contributors: List['ContributorData'] = field(default_factory=list)
    period_disaggregations: List[DisaggregationData] = field(default_factory=list)

    @classmethod
    def make(cls, data, prefix=''):
        """Build an instance from a flat mapping.

        ``parent`` is read from the ``parent_period`` key and the indicator
        attributes from the ``indicator__*`` keys. The list fields are not
        read from ``data`` and start out empty.

        :param data: mapping supporting item access and ``.get``; the ``id``
            key must be present.
        :param prefix: string prepended to every key that is looked up,
            including the keys of the nested project.
        :return: a new ``ContributorData``.
        """
        return cls(
            id=data[f"{prefix}id"],
            period_start=data.get(f"{prefix}period_start", None),
            parent=data.get(f"{prefix}parent_period", None),
            indicator_type=data.get(f"{prefix}indicator__type", QUANTITATIVE),
            indicator_measure=data.get(f"{prefix}indicator__measure", ''),
            indicator_cumulative=data.get(f"{prefix}indicator__cumulative", False),
            target_value=maybe_decimal(data.get(f"{prefix}target_value", None)),
            period_actual_value=ensure_decimal(data.get(f"{prefix}actual_value", None)),
            period_actual_comment=data.get(f"{prefix}actual_comment", ''),
            period_narrative=data.get(f"{prefix}narrative", ''),
            indicator_baseline_value=data.get(f"{prefix}indicator__baseline_value", None),
            indicator_target_value=data.get(f"{prefix}indicator__target_value", None),
            # Honour ``prefix`` for the nested project too, like every other
            # nested ``make`` call in this module.
            project=ContributorProjectData.make(data, f"{prefix}indicator__result__project__"),
        )

    @cached_property
    def has_contributions(self):
        """Whether this contributor or any nested one has approved updates."""
        if self.approved_updates:
            return True
        return any(contributor.has_contributions for contributor in self.contributors)

    @cached_property
    def locations(self) -> Set[LocationData]:
        """Own project location (when set) plus all nested contributors' locations."""
        result = set()
        if self.project and self.project.location:
            result.add(self.project.location)
        for contributor in self.contributors:
            result.update(contributor.locations)
        return result

    @cached_property
    def countries(self) -> Set[CountryData]:
        """Countries of ``locations``, skipping locations without a country."""
        return {location.country for location in self.locations if location.country}
@dataclass(frozen=True)
class PeriodData(ReportingPeriodMixin):
    """Immutable record of an indicator period together with its updates,
    disaggregations and contributors.
    """

    id: int
    period_start: Optional[date] = None
    period_end: Optional[date] = None
    target_value: Optional[Decimal] = None
    target_comment: str = ''
    period_actual_value: Optional[Decimal] = None
    actual_comment: str = ''
    narrative: str = ''
    indicator_type: int = QUANTITATIVE
    indicator_measure: str = ''
    indicator_cumulative: bool = False
    updates: List[PeriodUpdateData] = field(default_factory=list)
    disaggregation_targets: List[DisaggregationTargetData] = field(default_factory=list)
    contributors: List[ContributorData] = field(default_factory=list)
    period_disaggregations: List[DisaggregationData] = field(default_factory=list)

    @classmethod
    def make(cls, data, prefix=''):
        """Build an instance from a flat mapping.

        The list fields are not read from ``data`` and start out empty.

        :param data: mapping supporting item access and ``.get``; the ``id``
            key must be present.
        :param prefix: string prepended to every key that is looked up.
        :return: a new ``PeriodData``.
        """
        attrs = {
            'id': data[f"{prefix}id"],
            'period_start': data.get(f"{prefix}period_start", None),
            'period_end': data.get(f"{prefix}period_end", None),
            'target_value': maybe_decimal(data.get(f"{prefix}target_value", None)),
            'target_comment': data.get(f"{prefix}target_comment", ''),
            'period_actual_value': ensure_decimal(data.get(f"{prefix}actual_value", None)),
            'actual_comment': data.get(f"{prefix}actual_comment", ''),
            'narrative': data.get(f"{prefix}narrative", ''),
            'indicator_type': data.get(f"{prefix}indicator__type", QUANTITATIVE),
            'indicator_measure': data.get(f"{prefix}indicator__measure", ''),
            'indicator_cumulative': data.get(f"{prefix}indicator__cumulative", False),
        }
        return cls(**attrs)

    @cached_property
    def aggregated_value(self):
        """Aggregated value of the period.

        Checked in order: ``None`` for qualitative indicators, ``0`` for a
        cumulative period that has not started, ``actual_value`` for other
        cumulative periods, the computed percentage for percentage
        indicators, and otherwise the own approved total plus every
        contributor's aggregated value.
        """
        if self.is_qualitative:
            return None
        if self.is_cumulative_future:
            return 0
        if self.is_cumulative:
            return self.actual_value
        if self.is_percentage:
            return calculate_percentage(self.aggregated_numerator, self.aggregated_denominator)
        return sum(
            (ensure_decimal(c.aggregated_value) for c in self.contributors),
            ensure_decimal(self.updates_value),
        )

    @cached_property
    def pending_updates(self):
        """The subset of ``updates`` whose status is pending."""
        return [update for update in self.updates if update.is_pending]

    @cached_property
    def has_pending_updates(self):
        """Whether at least one update is pending."""
        return bool(self.pending_updates)

    @cached_property
    def pending_value(self):
        """Total of the pending updates.

        ``None`` for qualitative indicators and the computed percentage for
        percentage indicators.
        """
        if self.is_qualitative:
            return None
        if self.is_percentage:
            return calculate_percentage(self.pending_numerator, self.pending_denominator)
        return sum(ensure_decimal(update.value) for update in self.pending_updates)

    @cached_property
    def pending_numerator(self):
        """Sum of pending update numerators; ``None`` unless a percentage."""
        if not self.is_percentage:
            return None
        return sum(ensure_decimal(update.numerator) for update in self.pending_updates)

    @cached_property
    def pending_denominator(self):
        """Sum of pending update denominators; ``None`` unless a percentage."""
        if not self.is_percentage:
            return None
        return sum(ensure_decimal(update.denominator) for update in self.pending_updates)

    @cached_property
    def pending_disaggregations(self):
        """Disaggregations collected from the pending updates."""
        return self._get_disaggregations(self.pending_updates)

    def get_pending_disaggregation_value(self, category, type):
        """Return the pending disaggregation ``value`` for ``(category, type)``,
        or ``None`` when there is no matching entry.
        """
        match = self._select_disaggregation(self.pending_disaggregations, category, type)
        if not match:
            return None
        return match.value

    @cached_property
    def locations(self) -> Set[LocationData]:
        """Union of the locations of all contributors."""
        return set().union(*(c.locations for c in self.contributors))

    @cached_property
    def countries(self) -> Set[CountryData]:
        """Union of the countries of all contributors."""
        return set().union(*(c.countries for c in self.contributors))
@dataclass(frozen=True)
class IndicatorData(object):
    """Immutable record of an indicator and its periods."""

    id: int
    title: str = ''
    type: int = QUANTITATIVE
    measure: str = ''
    cumulative: bool = False
    ascending: bool = False
    description: str = ''
    baseline_year: Optional[int] = None
    baseline_value: Optional[Decimal] = None
    baseline_comment: str = ''
    target_value: Optional[Decimal] = None
    target_comment: str = ''
    periods: List[PeriodData] = field(default_factory=list)

    @classmethod
    def make(cls, data, prefix=''):
        """Build an instance from a flat mapping.

        ``periods`` is not read from ``data`` and starts out empty.

        :param data: mapping supporting item access and ``.get``; the ``id``
            key must be present.
        :param prefix: string prepended to every key that is looked up.
        :return: a new ``IndicatorData``.
        """
        return cls(
            id=data[f"{prefix}id"],
            title=data.get(f"{prefix}title", ''),
            type=data.get(f"{prefix}type", QUANTITATIVE),
            measure=data.get(f"{prefix}measure", ''),
            cumulative=data.get(f"{prefix}cumulative", False),
            ascending=data.get(f"{prefix}ascending", False),
            description=data.get(f"{prefix}description", ''),
            baseline_year=data.get(f"{prefix}baseline_year", None),
            baseline_value=maybe_decimal(data.get(f"{prefix}baseline_value", None)),
            baseline_comment=data.get(f"{prefix}baseline_comment", ''),
            target_value=data.get(f"{prefix}target_value", None),
            # Default to '' to match the declared ``str`` field and the
            # sibling ``baseline_comment`` default.
            target_comment=data.get(f"{prefix}target_comment", ''),
        )

    @cached_property
    def has_pending_updates(self):
        """Whether any period has pending updates."""
        return any(period.has_pending_updates for period in self.periods)

    @cached_property
    def aggregated_target_value(self):
        """Aggregated indicator target taken from the first period, or
        ``None`` when there are no periods.
        """
        if not self.periods:
            return None
        return self.periods[0].aggregated_indicator_target_value

    @property
    def is_qualitative(self):
        """Whether the indicator type is ``QUALITATIVE``."""
        return self.type == QUALITATIVE

    @property
    def is_quantitative(self):
        """Whether the indicator type is anything other than ``QUALITATIVE``."""
        return self.type != QUALITATIVE

    @property
    def is_percentage(self):
        """Whether this is a quantitative indicator measured as a percentage."""
        return self.is_quantitative and self.measure == PERCENTAGE_MEASURE

    @property
    def is_cumulative(self):
        """Whether the indicator is flagged cumulative and is not a percentage."""
        return self.cumulative and not self.is_percentage
@dataclass(frozen=True)
class ResultData(object):
    """Immutable record of a result and its indicators."""

    id: int
    title: str = ''
    description: str = ''
    type: str = ''
    aggregation_status: Optional[bool] = None
    indicators: List[IndicatorData] = field(default_factory=list)

    @classmethod
    def make(cls, data, prefix=''):
        """Build an instance from a flat mapping.

        ``indicators`` is not read from ``data`` and starts out empty.

        :param data: mapping supporting item access and ``.get``; the ``id``
            key must be present.
        :param prefix: string prepended to every key that is looked up.
        :return: a new ``ResultData``.
        """
        return cls(
            id=data[f"{prefix}id"],
            title=data.get(f"{prefix}title", ''),
            description=data.get(f"{prefix}description", ''),
            type=data.get(f"{prefix}type", ''),
            # Default to None to match the declared ``Optional[bool]`` field.
            aggregation_status=data.get(f"{prefix}aggregation_status", None),
        )

    @cached_property
    def has_pending_updates(self):
        """Whether any indicator has pending updates."""
        return any(indicator.has_pending_updates for indicator in self.indicators)

    @cached_property
    def iati_type_name(self):
        """Codelist name for this result's ``type`` code ('' when unknown)."""
        return self.get_codelist_name(self.type)

    @staticmethod
    @lru_cache
    def get_codelist_name(code, version=settings.IATI_VERSION):
        """Return the ``ResultType`` name for ``code`` in codelist ``version``.

        Results, including the '' returned for unknown codes, are memoised by
        ``lru_cache``.

        :param code: result type code to look up.
        :param version: codelist version code; defaults to
            ``settings.IATI_VERSION`` as evaluated at class definition time.
        :return: the matching name, or '' when no ``ResultType`` exists.
        """
        try:
            result_type = ResultType.objects.get(code=code, version__code=version)
        except ResultType.DoesNotExist:
            return ''
        return result_type.name
@dataclass(frozen=True)
class ProjectData(object):
    """Immutable record of a project, its partners, countries and results."""

    id: int
    title: str
    subtitle: str
    iati_activity_id: str
    date_start_planned: Optional[date]
    date_end_planned: Optional[date]
    date_start_actual: Optional[date]
    date_end_actual: Optional[date]
    targets_at: str
    recipient_countries: List[str] = field(default_factory=list)
    partners: List[str] = field(default_factory=list)
    results: List[ResultData] = field(default_factory=list)

    @classmethod
    def make(cls, data, prefix=''):
        """Build an instance from a flat mapping.

        ``targets_at`` falls back to ``'period'``. The list fields are not
        read from ``data`` and start out empty.

        :param data: mapping supporting item access and ``.get``; the ``id``
            key must be present.
        :param prefix: string prepended to every key that is looked up.
        :return: a new ``ProjectData``.
        """
        return cls(
            id=data[f"{prefix}id"],
            title=data.get(f"{prefix}title", ''),
            subtitle=data.get(f"{prefix}subtitle", ''),
            iati_activity_id=data.get(f"{prefix}iati_activity_id", ''),
            date_start_planned=data.get(f"{prefix}date_start_planned", None),
            date_end_planned=data.get(f"{prefix}date_end_planned", None),
            date_start_actual=data.get(f"{prefix}date_start_actual", None),
            date_end_actual=data.get(f"{prefix}date_end_actual", None),
            targets_at=data.get(f"{prefix}targets_at", 'period'),
        )

    @property
    def country_codes(self):
        """Comma-separated, lower-cased, de-duplicated recipient countries.

        Duplicates are dropped with ``dict.fromkeys`` rather than a ``set``
        so the output keeps first-seen order and is identical across
        interpreter runs.
        """
        return ', '.join(dict.fromkeys(it.lower() for it in self.recipient_countries))

    @property
    def partner_names(self):
        """Comma-separated, de-duplicated partner names in first-seen order."""
        return ', '.join(dict.fromkeys(self.partners))
class IndicatorType(Enum):
    """Indicator kinds, valued by the ``QUANTITATIVE``/``QUALITATIVE`` constants."""

    Quantitative = QUANTITATIVE
    Qualitative = QUALITATIVE
@dataclass(frozen=True)
class ResultWithIndicatorType:
    """View over a ``ResultData`` exposing only indicators of one type.

    Attributes not found on the view itself are looked up on the wrapped
    ``result``.
    """

    type: IndicatorType
    result: ResultData

    @cached_property
    def indicators(self):
        """Indicators of the wrapped result that match ``type``."""
        return list(filter(self._check_type, self.result.indicators))

    def __getattr__(self, attr):
        """Delegate any attribute missing on the view to the wrapped result."""
        wrapped = self.result
        return getattr(wrapped, attr)

    def _check_type(self, indicator: IndicatorData):
        """Tell whether ``indicator`` matches the requested indicator type."""
        if self.type == IndicatorType.Qualitative:
            return indicator.is_qualitative
        return indicator.is_quantitative
def filter_results_by_indicator_type(type: IndicatorType, results: List[ResultData]):
    """Wrap each result in a ``ResultWithIndicatorType`` for ``type`` and keep
    only the wrappers that have at least one matching indicator.

    :param type: the indicator type to filter on.
    :param results: results to wrap, in order.
    :return: list of ``ResultWithIndicatorType`` in the order of ``results``.
    """
    matching = []
    for result in results:
        view = ResultWithIndicatorType(type=type, result=result)
        if view.indicators:
            matching.append(view)
    return matching
def group_results_by_types(results):
    """Group results by their ``iati_type_name``.

    Results whose type name is falsy are left out. Groups appear in the
    order their type name is first seen, and results keep their input order
    within a group.

    :param results: iterable of objects exposing ``iati_type_name``.
    :return: dict mapping type name to the list of results of that type.
    """
    grouped = {}
    for result in results:
        type_name = result.iati_type_name
        if type_name:
            if type_name not in grouped:
                grouped[type_name] = []
            grouped[type_name].append(result)
    return grouped
def has_cumulative_indicator(results):
    """Tell whether any indicator of any result is cumulative.

    :param results: iterable of objects exposing ``indicators``, each of
        which exposes ``is_cumulative``.
    :return: ``True`` as soon as a cumulative indicator is found, else ``False``.
    """
    return any(
        indicator.is_cumulative
        for result in results
        for indicator in result.indicators
    )