Results aggregation
In a project hierarchies, it’s possible to see and sum up the data from the child projects. This is an operation that bubbles upwards and is called an aggregation.
The target node of an aggregation is a program.
Process
Once a result is submitted, an aggregation job is queued and executed by the worker. The progress of the job can be tracked, there’s an automatic retry mechanism, and when the max number of retries has been reached, specific users can be notified by email and it’s possible to trigger a retry from the frontend.
See also
Jobs are part of the Task Management.
The main model is
akvo.rsr.models.aggregation_job.IndicatorPeriodAggregationJob
and the execution can be found in
akvo-rsr.doc.akvo.rsr.usecases.jobs.aggregation
.
The general process is as follows
from akvo.rsr.models.result.indicator_period import IndicatorPeriod
from akvo.rsr.usecases.jobs.aggregation import (
schedule_aggregation_jobs,
execute_aggregation_jobs,
)
period = IndicatorPeriod.objects.get(id=some_id)
schedule_aggregation_jobs(period)
# every interval (currently a minute)
execute_aggregation_jobs()
Scheduling aggregation jobs
The process of schedule is simply creating an IndicatorPeriodAggregationJob
.
The conditions are in the function, but they can be sums up to: only schedule if this job hasn’t been schedule yet.
Executing aggregation jobs
execute_aggregation_jobs
takes care of executing all aggregation jobs.
This is a remnant from when the task was executed in a cronjob.
Each aggregation job targets a period and runs all the way until the root of the tree. The algorithm is as follows
def aggregate_period(period):
# Sum values of children
if has_child(period):
child_values = period.children.values()
total = sum(child_values)
period.actual_value = total
period.save()
if has_parent(period):
aggregate_period(period.parent_period)
It is a monolithic, multi-step operation wrapped in a database transaction and there is much room for optimisation e.g
periods could make use of
AkvoTreeModel
for traversing the hierarchyeach level could be done with an independent job using a django-q Chain
it might be possible to improve the model and make this unnecessary by allowing calculation at request time (Sum aggregation)
Handling failures
Should a job fail for whatever reason, the number of failures is updated and the job is marked as failed.
A check is then done to make sure the
akvo/rsr/usecases/jobs/aggregation.MAX_ATTEMPTS
has been reached.
Should that be the case, the job is marked as “maxxed” and an email is sent to subscribers.
Subscribers are configured by the admin at request of the subscriber by setting
akvo.rsr.models.employment.Employment.receives_indicator_aggregation_emails