# -*- coding: utf-8 -*-
"""Akvo RSR is covered by the GNU Affero General Public License.
See more details in the license.txt file located at the root folder of the
Akvo RSR module. For additional details on the GNU license please
see < http://www.gnu.org/licenses/agpl.html >.
"""
import copy
from collections import OrderedDict
from datetime import date
from functools import cached_property
from django.conf import settings
from akvo.rsr.models import IndicatorPeriod, IndicatorPeriodData
from akvo.rsr.models.result.utils import QUALITATIVE, PERCENTAGE_MEASURE, calculate_percentage
from akvo.utils import ensure_decimal, ObjectReaderProxy
from enum import Enum
def is_aggregating_targets(project):
    """Tell whether target aggregation is switched on for ``project``.

    Returns True when the project's id is listed in the
    ``AGGREGATE_TARGETS`` setting, False otherwise.
    """
    enabled_project_ids = settings.AGGREGATE_TARGETS
    return project.id in enabled_project_ids
def merge_unique(l1, l2):
    """Return a new list: every item of ``l1`` plus the unseen items of ``l2``.

    Items of ``l1`` are copied as they are (duplicates included). Items of
    ``l2`` are appended in order, skipping any value already in the result.
    Neither input is modified.
    """
    merged = [*l1]
    for candidate in l2:
        if candidate in merged:
            continue
        merged.append(candidate)
    return merged
def get_periods_with_contributors(root_periods, aggregate_targets=False):
    """Build ``PeriodProxy`` objects for the given root periods.

    The root periods and all their descendants are loaded, arranged into a
    tree along ``parent_period`` and every top-level node is wrapped in a
    ``PeriodProxy`` that receives its child nodes.

    :param root_periods: iterable of periods whose hierarchies are reported.
    :param aggregate_targets: forwarded to every ``PeriodProxy``.
    :return: list of ``PeriodProxy``; empty when no period was found.
    """
    periods = get_periods_hierarchy_flatlist(root_periods)
    # The disaggregation template is taken from the project of the first
    # period of the hierarchy. With no periods at all (e.g. empty
    # ``root_periods``) ``first()`` yields None, so bail out instead of
    # failing on ``None.indicator``.
    first_period = periods.first()
    if first_period is None:
        return []
    periods_tree = make_object_tree_from_flatlist(periods, 'parent_period')
    disaggregations = get_disaggregations(first_period.indicator.result.project)
    return [
        PeriodProxy(node['item'], node['children'], aggregate_targets, disaggregations)
        for node in periods_tree
    ]
def get_disaggregations(project):
    """Return the project's disaggregation template.

    The result is an ``OrderedDict`` keyed by dimension name; each value is
    an ``OrderedDict`` keyed by the dimension's values, all mapped to None.
    """
    return OrderedDict(
        (
            dimension.name,
            OrderedDict((option.value, None) for option in dimension.dimension_values.all()),
        )
        for dimension in project.dimension_names.all()
    )
def get_periods_hierarchy_flatlist(root_periods):
    """Return the root periods and all of their descendants as one queryset.

    Descendants are discovered level by level through ``parent_period``
    until a round adds no new id. The returned ``IndicatorPeriod`` queryset
    selects and prefetches the related objects listed below.
    """
    family = {root.id for root in root_periods}
    while True:
        child_ids = set(
            IndicatorPeriod.objects
            .filter(parent_period__in=family)
            .values_list('id', flat=True)
        )
        # Stop once every child found is already part of the family.
        if child_ids <= family:
            break
        family = family | child_ids

    related_fields = (
        'indicator__result__project',
        'indicator__result__project__primary_location__country',
        'parent_period',
        'label',
    )
    prefetched_fields = (
        'data',
        'data__user',
        'data__approved_by',
        'data__comments',
        'data__comments__user',
        'data__disaggregations',
        'data__disaggregations__dimension_value',
        'data__disaggregations__dimension_value__name',
        'disaggregation_targets',
        'disaggregation_targets__dimension_value',
        'disaggregation_targets__dimension_value__name',
    )
    queryset = IndicatorPeriod.objects.select_related(*related_fields)
    queryset = queryset.prefetch_related(*prefetched_fields)
    return queryset.filter(pk__in=family)
def make_object_tree_from_flatlist(flatlist, parent_attr):
    """Arrange a flat collection of objects into a forest of nodes.

    Every node is a dict ``{'children': [...], 'item': obj}``. An object is a
    top-level node when it has no parent (or a falsy parent id) or when its
    parent is not part of ``flatlist``; otherwise its node is appended to the
    parent's ``'children'``.

    :param flatlist: iterable of objects exposing ``id``; it is consumed
        once, so one-shot iterables are supported.
    :param parent_attr: name of the attribute holding the parent object
        (or a falsy value when there is none).
    :return: list of top-level nodes, in input order.
    """
    # Materialise once: the items are needed both for the id index and for
    # the tree-building pass.
    items = list(flatlist)
    # Set membership keeps the parent check O(1) per item.
    known_ids = {item.id for item in items}
    tree = []
    lookup = {}
    for obj in items:
        # A child seen earlier may already have created this node's entry.
        node = lookup.setdefault(obj.id, {'children': []})
        node['item'] = obj
        parent_obj = getattr(obj, parent_attr)
        parent_id = parent_obj.id if parent_obj else None
        if not parent_id or parent_id not in known_ids:
            tree.append(node)
        else:
            parent_node = lookup.setdefault(parent_id, {'children': []})
            parent_node['children'].append(node)
    return tree
class IndicatorType(Enum):
    """Kinds of indicator distinguished by the aggregation code."""

    UNIT = 1
    PERCENTAGE = 2
    NARRATIVE = 3

    @classmethod
    def get_type(cls, indicator):
        """Classify ``indicator``.

        Qualitative indicators are NARRATIVE; otherwise indicators measured
        as a percentage are PERCENTAGE and everything else is UNIT.
        """
        if indicator.type == QUALITATIVE:
            return cls.NARRATIVE
        return cls.PERCENTAGE if indicator.measure == PERCENTAGE_MEASURE else cls.UNIT
class PeriodProxy(ObjectReaderProxy):
    """Read-only wrapper around a period that lazily derives report values.

    The wrapped period is read through ``self._real`` (presumably stored by
    ``ObjectReaderProxy`` — confirm against that class). Derived values such
    as updates, contributors, targets and disaggregations are computed on
    first access and cached on the instance.
    """

    def __init__(self, period, children=None, aggregate_targets=False, project_disaggregations=None):
        """
        :param period: the period to wrap; its indicator decides ``self.type``.
        :param children: tree nodes (dicts with ``'item'`` and ``'children'``)
            of the child periods; defaults to no children.
        :param aggregate_targets: when true, non-percentage period targets
            are summed over the whole child tree.
        :param project_disaggregations: nested ``category -> type`` mapping
            used as the template for ``disaggregation_contributions_view``.
        """
        super().__init__(period)
        self.type = IndicatorType.get_type(period.indicator)
        self.aggregate_targets = aggregate_targets
        self._project_disaggregations = project_disaggregations
        # A fresh list per instance; a ``[]`` default in the signature would
        # be shared by every proxy created without children.
        self._children = [] if children is None else children
        self._project = None
        self._updates = None
        self._actual_comment = None
        self._actual_numerator = None
        self._actual_denominator = None
        self._target_value = None
        self._contributors = None
        self._countries = None
        self._locations = None
        self._disaggregation_targets = None
        self._disaggregation_contributions = None
        self._disaggregation_contributions_view = None
        self._indicator_target_value = None
        self._use_indicator_target = None

    @property
    def project(self):
        """Project owning the wrapped period (via indicator and result)."""
        if self._project is None:
            self._project = self._real.indicator.result.project
        return self._project

    @property
    def updates(self):
        """``UpdateCollection`` over the wrapped period's updates."""
        if self._updates is None:
            self._updates = UpdateCollection(self._real, self.type)
        return self._updates

    @property
    def contributors(self):
        """``ContributorCollection`` built from the child nodes.

        The children are ignored when the project does not aggregate its
        children.
        """
        if self._contributors is None:
            children = self._children if self.project.aggregate_children else []
            self._contributors = ContributorCollection(children, self.type, self._project_disaggregations)
        return self._contributors

    @property
    def target_value(self):
        """Period target: raw for narrative, optionally aggregated otherwise."""
        if self._target_value is None:
            if self.type == IndicatorType.NARRATIVE:
                self._target_value = self._real.target_value
            elif self.aggregate_targets and self.type != IndicatorType.PERCENTAGE:
                self._target_value = _aggregate_period_targets(self._real, self._children)
            else:
                self._target_value = ensure_decimal(self._real.target_value)
        return self._target_value

    @property
    def use_indicator_target(self):
        """True when targets are set at indicator level."""
        if self._use_indicator_target is None:
            program = self.project.get_program()
            # NOTE(review): ``self.targets_at`` is looked up on the proxy, so
            # it is presumably forwarded to the wrapped period — confirm the
            # period exposes ``targets_at`` (it may belong on the project).
            targets_at = program.targets_at if program else self.targets_at
            self._use_indicator_target = targets_at == 'indicator'
        return self._use_indicator_target

    @property
    def indicator_target_value(self):
        """Indicator-level target, or 0 when indicator targets are not used."""
        if self._indicator_target_value is None:
            if not self.use_indicator_target:
                self._indicator_target_value = 0
            elif self.type == IndicatorType.NARRATIVE:
                narrative = self.indicator.target_value
                self._indicator_target_value = narrative if narrative else ''
            elif self.type != IndicatorType.PERCENTAGE:
                self._indicator_target_value = _aggregate_indicator_targets(self._real, self._children)
            else:
                self._indicator_target_value = ensure_decimal(self.indicator.target_value)
        return self._indicator_target_value

    @cached_property
    def is_cumulative(self):
        """Whether the period's indicator reports cumulative values."""
        return self.indicator.is_cumulative()

    @property
    def actual_comment(self):
        """Actual comment split on ``' | '``, or None when there is none."""
        if self._actual_comment is None:
            # False (not None) marks "already computed, nothing to show".
            self._actual_comment = self._real.actual_comment.split(' | ') \
                if self._real.actual_comment \
                else False
        return self._actual_comment or None

    @cached_property
    def actual_value(self):
        """Actual value of the period.

        Percentage indicators are computed from the aggregated numerator and
        denominator. Cumulative periods that start in the future report 0.
        """
        if self.type == IndicatorType.PERCENTAGE:
            # Go through the property: the ``_actual_denominator`` slot is
            # still None until ``actual_denominator`` has been evaluated.
            return calculate_percentage(self.actual_numerator, self.actual_denominator)
        if self.is_cumulative and self.period_start and self.period_start > date.today():
            return 0
        return self._real.actual_value

    @property
    def actual_numerator(self):
        """Numerator of own updates plus contributors (percentage only)."""
        if self._actual_numerator is None and self.type == IndicatorType.PERCENTAGE:
            self._actual_numerator = self.updates.total_numerator + self.contributors.total_numerator
        return self._actual_numerator

    @property
    def actual_denominator(self):
        """Denominator of own updates plus contributors (percentage only)."""
        if self._actual_denominator is None and self.type == IndicatorType.PERCENTAGE:
            self._actual_denominator = self.updates.total_denominator + self.contributors.total_denominator
        return self._actual_denominator

    @property
    def countries(self):
        """Countries collected from the contributors."""
        if self._countries is None:
            self._countries = self.contributors.countries
        return self._countries

    @property
    def locations(self):
        """Contributor locations followed by the project's primary location."""
        if self._locations is None:
            location = self.project.primary_location
            self._locations = merge_unique(self.contributors.locations, [location])
        return self._locations

    @property
    def disaggregation_contributions(self):
        """Disaggregations aggregated over the contributors."""
        if self._disaggregation_contributions is None:
            self._disaggregation_contributions = self.contributors.disaggregations
        return self._disaggregation_contributions

    @property
    def disaggregation_targets(self):
        """Disaggregation targets keyed by ``(category, type)``."""
        if self._disaggregation_targets is None:
            disaggregations = [
                DisaggregationTarget(t)
                for t in self._real.disaggregation_targets.all()
            ]
            self._disaggregation_targets = {(d.category, d.type): d for d in disaggregations}
        return self._disaggregation_targets

    def get_disaggregation_target_of(self, category, type):
        """Return the target of one disaggregation as a decimal, or None."""
        key = (category, type)
        if key not in self.disaggregation_targets:
            return None
        return ensure_decimal(self.disaggregation_targets[key].value)

    @property
    def disaggregation_contributions_view(self):
        """Project disaggregation template filled with contributed values.

        Contributions whose category or type is not in the template are
        skipped. Without a template the view is None.
        """
        if self._disaggregation_contributions_view is None:
            view = copy.deepcopy(self._project_disaggregations)
            # With no template there is nothing a contribution could be
            # placed into; skip the loop instead of testing membership
            # against None.
            if view is not None:
                for d in self.disaggregation_contributions.values():
                    category = d['category']
                    type_ = d['type']
                    if category not in view or type_ not in view[category]:
                        continue
                    view[category][type_] = {
                        'value': d['value'],
                        'numerator': d['numerator'],
                        'denominator': d['denominator'],
                    }
            # Publish only the completed view so a failure above cannot
            # leave a half-filled cache behind.
            self._disaggregation_contributions_view = view
        return self._disaggregation_contributions_view

    def get_disaggregation_contribution_of(self, category, type):
        """Return the contributed value of one disaggregation, or None."""
        key = (category, type)
        if key not in self.disaggregation_contributions:
            return None
        return self.disaggregation_contributions[key]['value']
class ContributorCollection(object):
    """Lazily built collection of ``Contributor`` objects with their totals.

    Nothing is computed until a property, ``iter()`` or ``len()`` is used;
    the first such access builds every aggregate in one pass.
    """

    def __init__(self, nodes, type=IndicatorType.UNIT, project_disaggregations=None):
        self.nodes = nodes
        self.type = type
        self._project_disaggregations = project_disaggregations
        # ``_contributors`` doubles as the "already built" marker.
        self._contributors = None
        self._total_value = None
        self._total_numerator = None
        self._total_denominator = None
        self._countries = None
        self._locations = None
        self._disaggregations = None

    @property
    def total_value(self):
        """Sum of contributor values (stays None for percentage type)."""
        self._build()
        return self._total_value

    @property
    def total_numerator(self):
        """Sum of contributor numerators (percentage type only)."""
        self._build()
        return self._total_numerator

    @property
    def total_denominator(self):
        """Sum of contributor denominators (percentage type only)."""
        self._build()
        return self._total_denominator

    @property
    def countries(self):
        """Unique countries of all accepted contributors."""
        self._build()
        return self._countries

    @property
    def locations(self):
        """Unique locations of all accepted contributors."""
        self._build()
        return self._locations

    @property
    def disaggregations(self):
        """Disaggregation entries merged over all accepted contributors."""
        self._build()
        return self._disaggregations

    def __iter__(self):
        self._build()
        return iter(self._contributors)

    def __len__(self):
        self._build()
        return len(self._contributors)

    def _merge_disaggregations(self, incoming):
        """Fold ``incoming`` entries into the collection's disaggregations.

        Unknown keys get a shallow copy of the entry; known keys only have
        their ``'value'`` increased.
        """
        merged = self._disaggregations
        for key, entry in incoming.items():
            if key in merged:
                merged[key]['value'] += entry['value']
            else:
                merged[key] = entry.copy()

    def _build(self):
        """Create the contributors and compute all aggregates, once."""
        if self._contributors is not None:
            return

        self._contributors = []
        self._countries = []
        self._locations = []
        self._disaggregations = {}
        if self.type == IndicatorType.PERCENTAGE:
            self._total_numerator = 0
            self._total_denominator = 0
        else:
            self._total_value = 0

        for node in self.nodes:
            contributor = Contributor(node['item'], node['children'], self.type, self._project_disaggregations)
            # Skip projects that do not aggregate to their parent ...
            if not contributor.project.aggregate_to_parent:
                continue
            # ... and contributors with a value below 1 and no updates.
            if ensure_decimal(contributor.actual_value) < 1 and len(contributor.updates) < 1:
                continue

            self._contributors.append(contributor)
            self._countries = merge_unique(self._countries, contributor.contributing_countries)
            self._locations = merge_unique(self._locations, contributor.contributing_locations)

            # totals
            if self.type == IndicatorType.PERCENTAGE:
                self._total_numerator += contributor.actual_numerator
                self._total_denominator += contributor.actual_denominator
            else:
                self._total_value += ensure_decimal(contributor.actual_value)

            # disaggregations: nested contributors first, then own updates
            self._merge_disaggregations(contributor.contributors.disaggregations)
            self._merge_disaggregations(contributor.updates.disaggregations)
class Contributor(object):
    """A child period contributing to a parent period, with lazy aggregates.

    Values derived from the period (updates, nested contributors, totals,
    locations, disaggregations) are computed on first access and cached.
    """

    def __init__(self, period, children=None, type=IndicatorType.UNIT, project_disaggregations=None):
        """
        :param period: the contributing period.
        :param children: tree nodes (dicts with ``'item'`` and ``'children'``)
            of the period's own child periods; defaults to no children.
        :param type: ``IndicatorType`` governing how values are aggregated.
        :param project_disaggregations: nested ``category -> type`` mapping
            used as the template for ``disaggregations_view``.
        """
        self.period = period
        # A fresh list per instance; a ``[]`` default in the signature would
        # be shared by every contributor created without children.
        self.children = [] if children is None else children
        self.type = type
        self._project_disaggregations = project_disaggregations
        self._project = None
        self._country = None
        self._actual_numerator = None
        self._actual_denominator = None
        self._location = None
        self._updates = None
        self._contributors = None
        self._contributing_countries = None
        self._contributing_locations = None
        self._actual_comment = None
        self._target_value = None
        self._disaggregation_targets = None
        self._disaggregations_view = None

    @property
    def project(self):
        """Project owning the period (via indicator and result)."""
        if self._project is None:
            self._project = self.period.indicator.result.project
        return self._project

    @property
    def contributors(self):
        """Nested ``ContributorCollection`` built from the child nodes.

        The children are ignored when the project does not aggregate its
        children.
        """
        if self._contributors is None:
            children = self.children if self.project.aggregate_children else []
            self._contributors = ContributorCollection(children, self.type, self._project_disaggregations)
        return self._contributors

    @property
    def updates(self):
        """``UpdateCollection`` over the period's updates."""
        if self._updates is None:
            self._updates = UpdateCollection(self.period, self.type)
        return self._updates

    @cached_property
    def is_cumulative(self):
        """Whether the period's indicator reports cumulative values."""
        return self.period.indicator.is_cumulative()

    @cached_property
    def actual_value(self):
        """Actual value of the period.

        Percentage indicators are computed from the aggregated numerator and
        denominator. Cumulative periods that start in the future report 0.
        """
        if self.type == IndicatorType.PERCENTAGE:
            # Go through the property: the ``_actual_denominator`` slot is
            # still None until ``actual_denominator`` has been evaluated.
            return calculate_percentage(self.actual_numerator, self.actual_denominator)
        if self.is_cumulative and self.period.period_start and self.period.period_start > date.today():
            return 0
        return ensure_decimal(self.period.actual_value)

    @property
    def actual_numerator(self):
        """Numerator of own updates plus contributors (percentage only)."""
        if self._actual_numerator is None and self.type == IndicatorType.PERCENTAGE:
            self._actual_numerator = self.updates.total_numerator + self.contributors.total_numerator
        return self._actual_numerator

    @property
    def actual_denominator(self):
        """Denominator of own updates plus contributors (percentage only)."""
        if self._actual_denominator is None and self.type == IndicatorType.PERCENTAGE:
            self._actual_denominator = self.updates.total_denominator + self.contributors.total_denominator
        return self._actual_denominator

    @property
    def actual_comment(self):
        """Actual comment split on ``' | '``, or None when there is none."""
        if self._actual_comment is None:
            # False (not None) marks "already computed, nothing to show".
            self._actual_comment = self.period.actual_comment.split(' | ') \
                if self.period.actual_comment \
                else False
        return self._actual_comment or None

    @property
    def target_value(self):
        """Period target: raw for narrative type, a decimal otherwise."""
        if self._target_value is None:
            self._target_value = ensure_decimal(self.period.target_value) \
                if self.type != IndicatorType.NARRATIVE \
                else self.period.target_value
        return self._target_value

    @property
    def disaggregation_targets(self):
        """Disaggregation targets keyed by ``(category, type)``."""
        if self._disaggregation_targets is None:
            disaggregations = [
                DisaggregationTarget(t)
                for t in self.period.disaggregation_targets.all()
            ]
            self._disaggregation_targets = {(d.category, d.type): d for d in disaggregations}
        return self._disaggregation_targets

    def get_disaggregation_target_of(self, category, type):
        """Return the target of one disaggregation as a decimal, or None."""
        key = (category, type)
        if key not in self.disaggregation_targets:
            return None
        return ensure_decimal(self.disaggregation_targets[key].value)

    @property
    def country(self):
        """Country of the project's primary location, or None."""
        if self._country is None:
            # False (not None) marks "already computed, no country".
            self._country = self.location.country if self.location else False
        return self._country or None

    @property
    def contributing_countries(self):
        """Countries of nested contributors followed by the own country."""
        if self._contributing_countries is None:
            self._contributing_countries = merge_unique(self.contributors.countries, [self.country])
        return self._contributing_countries

    @property
    def location(self):
        """Primary location of the project, or None."""
        if self._location is None:
            # False (not None) marks "already computed, no location".
            self._location = self.project.primary_location or False
        return self._location or None

    @property
    def contributing_locations(self):
        """Locations of nested contributors followed by the own location."""
        if self._contributing_locations is None:
            self._contributing_locations = merge_unique(self.contributors.locations, [self.location])
        return self._contributing_locations

    @property
    def disaggregations_view(self):
        """Project disaggregation template filled from the period's updates.

        Disaggregations whose category or type is not in the template are
        skipped. Without a template the view is None.
        """
        if self._disaggregations_view is None:
            view = copy.deepcopy(self._project_disaggregations)
            # With no template there is nothing a disaggregation could be
            # placed into; skip the loop instead of testing membership
            # against None.
            if view is not None:
                for d in self.updates.disaggregations.values():
                    category = d['category']
                    type_ = d['type']
                    if category not in view or type_ not in view[category]:
                        continue
                    view[category][type_] = {
                        'value': d['value'],
                        'numerator': d['numerator'],
                        'denominator': d['denominator'],
                    }
            # Publish only the completed view so a failure above cannot
            # leave a half-filled cache behind.
            self._disaggregations_view = view
        return self._disaggregations_view

    def get_disaggregation_of(self, category, type):
        """Return the value of one disaggregation from the updates, or None."""
        key = (category, type)
        if key not in self.updates.disaggregations:
            return None
        return self.updates.disaggregations[key]['value']
class UpdateCollection(object):
    """Lazily built collection of a period's updates with their totals.

    All updates are wrapped and listed; only approved ones count towards the
    totals and the disaggregations.
    """

    def __init__(self, period, type):
        self.period = period
        self.type = type
        # ``_updates`` doubles as the "already built" marker.
        self._updates = None
        self._total_value = None
        self._total_numerator = None
        self._total_denominator = None
        self._disaggregations = None

    @property
    def total_value(self):
        """Total of approved values, or the percentage for percentage type."""
        self._build()
        return self._total_value

    @property
    def total_numerator(self):
        """Sum of approved numerators (percentage type only)."""
        self._build()
        return self._total_numerator

    @property
    def total_denominator(self):
        """Sum of approved denominators (percentage type only)."""
        self._build()
        return self._total_denominator

    @property
    def disaggregations(self):
        """Disaggregation entries keyed by ``(category, type)``."""
        self._build()
        return self._disaggregations

    def __iter__(self):
        self._build()
        return iter(self._updates)

    def __len__(self):
        self._build()
        return len(self._updates)

    def _build(self):
        """Wrap every update and compute the aggregates, once."""
        if self._updates is not None:
            return

        is_percentage = self.type == IndicatorType.PERCENTAGE
        self._updates = []
        self._total_value = 0
        if is_percentage:
            self._total_numerator = 0
            self._total_denominator = 0
        self._disaggregations = {}

        for update in self.period.data.all():
            self._updates.append(UpdateProxy(update))
            # Only approved updates contribute to the aggregates.
            if update.status != IndicatorPeriodData.STATUS_APPROVED_CODE:
                continue

            if is_percentage:
                if update.numerator is not None and update.denominator is not None:
                    self._total_numerator += update.numerator
                    self._total_denominator += update.denominator
            elif update.value:
                self._total_value += update.value

            for d in update.disaggregations.all():
                category = d.dimension_value.name.name
                option = d.dimension_value.value
                key = (category, option)
                entry = self._disaggregations.get(key)
                if entry is None:
                    # Numerator and denominator come from the first
                    # disaggregation seen for this key.
                    entry = self._disaggregations[key] = {
                        'category': category,
                        'type': option,
                        'value': 0,
                        'numerator': d.numerator,
                        'denominator': d.denominator,
                    }
                entry['value'] += ensure_decimal(d.value)

        if is_percentage and self._total_denominator > 0:
            self._total_value = calculate_percentage(self._total_numerator, self._total_denominator)
class UpdateProxy(ObjectReaderProxy):
    """Plain reader proxy around a period update; adds no behaviour."""
class DisaggregationTarget(ObjectReaderProxy):
    """Reader proxy around a disaggregation target with cached labels."""

    def __init__(self, target):
        super().__init__(target)
        self._category = self._type = None

    @property
    def category(self):
        """Name of the dimension that the target's value belongs to."""
        cached = self._category
        if cached is None:
            cached = self._category = self.dimension_value.name.name
        return cached

    @property
    def type(self):
        """Value of the target's dimension value."""
        cached = self._type
        if cached is None:
            cached = self._type = self.dimension_value.value
        return cached
def _aggregate_period_targets(period, children):
    """Sum the target of ``period`` and of every period in its child tree.

    ``children`` is a list of tree nodes; each node holds the child period
    under ``'item'`` and, optionally, its own nodes under ``'children'``.
    """
    own_target = ensure_decimal(period.target_value)
    return sum(
        (
            _aggregate_period_targets(child['item'], child.get('children', []))
            for child in children
        ),
        own_target,
    )
def _aggregate_indicator_targets(period, children):
    """Sum the indicator target of ``period`` and of its whole child tree.

    ``children`` is a list of tree nodes; each node holds the child period
    under ``'item'`` and, optionally, its own nodes under ``'children'``.
    """
    own_target = ensure_decimal(period.indicator.target_value)
    return sum(
        (
            _aggregate_indicator_targets(child['item'], child.get('children', []))
            for child in children
        ),
        own_target,
    )