Source code for akvo.rsr.usecases.audit_project_aggregation

import math
import tablib
from dataclasses import dataclass, field
from decimal import Decimal
from typing import List, Optional
from django.conf import settings
from django.db.models import Q
from akvo.rsr.models import Project, IndicatorPeriod
from akvo.rsr.models.result.utils import QUANTITATIVE
from akvo.rsr.usecases.period_update_aggregation import calculate_period_actual_value
from akvo.utils import ensure_decimal, rsr_send_mail

# Absolute tolerance used when comparing a period's stored actual value with
# the recalculated one (passed to math.isclose as abs_tol).
DECIMAL_TOLERANCE = 0.0001


@dataclass(frozen=True)
class FailureItem:
    """A period whose stored actual value did not match the recalculated one.

    Instances are immutable (frozen dataclass).
    """

    # The indicator period that failed the audit.
    period: IndicatorPeriod
    # The value the aggregation calculation produced for that period.
    expected_value: Decimal
@dataclass
class AuditResult:
    """Running tally of an aggregation audit.

    Collects how many periods passed and keeps a list describing every
    period that failed.
    """

    # Number of periods that passed the audit.
    success: int = 0
    # One entry per period that failed the audit.
    failures: List[FailureItem] = field(default_factory=list)

    def increment_success(self) -> None:
        """Count one more period as having passed."""
        self.success += 1

    def add_failure(self, failure: FailureItem) -> None:
        """Append ``failure`` to the list of failed periods."""
        self.failures.append(failure)

    @property
    def failure_count(self) -> int:
        """Number of failures recorded so far."""
        return len(self.failures)

    @property
    def total_count(self) -> int:
        """Number of audited periods: successes plus failures."""
        return self.success + self.failure_count
def audit_period_aggregation(period: IndicatorPeriod, result: Optional[AuditResult] = None) -> AuditResult:
    """Audit ``period`` and, recursively, every one of its child periods.

    The period's stored ``actual_value`` is compared with the first value
    returned by ``calculate_period_actual_value``; the two are considered
    equal when they are within ``DECIMAL_TOLERANCE`` of each other.

    :param period: the period to audit.
    :param result: an existing tally to add to; a fresh one is created when
        omitted.
    :return: the tally that was updated (the one passed in, if any).
    """
    audit = result or AuditResult()

    expected, *_ = calculate_period_actual_value(period)
    stored = ensure_decimal(period.actual_value)

    # A mismatch is still a pass when the carried-over value in a cumulative
    # period has not been saved yet: it is only saved once the actual update
    # for that period is approved.
    if math.isclose(stored, expected, abs_tol=DECIMAL_TOLERANCE) or (
        period.indicator.is_cumulative() and not period.approved_updates.exists()
    ):
        audit.increment_success()
    else:
        audit.add_failure(FailureItem(period=period, expected_value=expected))

    # Descend into the children, accumulating into the same tally.
    for child in period.child_periods.all():
        audit_period_aggregation(child, audit)

    return audit
def audit_project_aggregation(project: Project, send_mail=True):
    """Audit the quantitative periods of ``project`` that have an actual value.

    Every matching period is audited together with its child periods, then
    the outcome is handed to ``_process_audit_result`` for reporting.

    :param project: the project whose periods are audited.
    :param send_mail: forwarded to ``_process_audit_result``.
    :return: the ``AuditResult`` holding the tally for the whole run.
    """
    # Periods with a NULL or empty actual value are left out of the audit.
    without_actual_value = Q(actual_value__isnull=True) | Q(actual_value__exact='')
    periods = (
        IndicatorPeriod.objects
        .filter(indicator__result__project=project, indicator__type=QUANTITATIVE)
        .exclude(without_actual_value)
    )

    audit = AuditResult()
    for period in periods:
        audit_period_aggregation(period, audit)

    _process_audit_result(project, audit, send_mail)
    return audit
def _process_audit_result(project: Project, result: AuditResult, send_mail):
    """Print a summary of ``result`` and report any failures.

    When there are failures, a TSV report is printed and, if ``send_mail``
    is truthy and the ``PROJECT_AGGREGATION_ERROR_RECIPIENTS`` setting is
    non-empty, also mailed to those recipients as an attachment.

    :param project: the audited project, passed to the mail context.
    :param result: the tally produced by the audit.
    :param send_mail: set to a falsy value to suppress the e-mail.
    """
    print(f"Audited {result.total_count} periods, {result.failure_count} errors found")
    if not result.failures:
        return

    # Export once and reuse the text for both the attachment and the console.
    report_tsv = _create_failure_report(result).export('tsv')

    email_recipients = getattr(settings, 'PROJECT_AGGREGATION_ERROR_RECIPIENTS', [])
    if email_recipients and send_mail:
        content = f"The following aggregation problem were detected\n\n{report_tsv}"
        attachments = [{
            'filename': 'aggregation_errors.tsv',
            'content': content,
            'mimetype': 'text/tab-separated-values',
        }]
        # NOTE(review): subject/message look like template paths resolved by
        # rsr_send_mail -- confirm against akvo.utils.
        rsr_send_mail(
            email_recipients,
            subject='audit_aggregation/subject.txt',
            message='audit_aggregation/message.txt',
            msg_context={'project': project, 'result': result},
            attachments=attachments,
        )

    print(report_tsv)


def _create_failure_report(result):
    """Build a tablib ``Dataset`` with one row per failure in ``result``.

    :param result: an object exposing ``failures``, an iterable of items
        with ``period`` and ``expected_value`` attributes.
    :return: the populated ``tablib.Dataset``.
    """
    dataset = tablib.Dataset()
    dataset.headers = [
        'Project',
        'Result',
        'Indicator',
        'Period',
        'Current value',
        'Expected value',
    ]
    for item in result.failures:
        period = item.period
        indicator = period.indicator
        # Named distinctly so the ``result`` parameter is not rebound while
        # its failures are being iterated.
        indicator_result = indicator.result
        project = indicator_result.project
        dataset.append([
            f"{project.title} (ID: {project.id})",
            indicator_result.title,
            indicator.title,
            str(period),
            period.actual_value,
            str(item.expected_value),
        ])
    return dataset