Source code for akvo.utils

# -*- coding: utf-8 -*-

# Akvo RSR is covered by the GNU Affero General Public License.
# See more details in the license.txt file located at the root folder of the Akvo RSR module.
# For additional details on the GNU license please see < http://www.gnu.org/licenses/agpl.html >.

# utility functions for RSR
from collections import namedtuple
import hashlib
import inspect
import json
import logging
from os.path import splitext
import zipfile

from decimal import Decimal, InvalidOperation
from django.conf import settings
from django.contrib.admin.models import LogEntry, ADDITION, CHANGE, DELETION
from django.contrib.auth.models import Group
from django.contrib.contenttypes.models import ContentType
from django.core.cache import cache
from django.core.mail import EmailMultiAlternatives, get_connection
from django.core.paginator import Paginator, EmptyPage, PageNotAnInteger
from django.core.signing import TimestampSigner
from django.apps import apps
from django.http import HttpResponse
from django.template import loader
from django.utils import timezone
from django.utils.text import slugify
import pytz
from sorl.thumbnail import get_thumbnail as get_sorl_thumbnail
from sorl.thumbnail.parsers import parse_geometry
from storages.backends.gcloud import GoogleCloudStorage

from akvo.rsr.iso3166 import COUNTRY_CONTINENTS, CONTINENTS, ISO_3166_COUNTRIES


logger = logging.getLogger('akvo.rsr')


[docs]class HttpResponseNoContent(HttpResponse): status_code = 204
# tuple holding a Django table full name, app name and model name. # Used in the project_editor DRF code DjangoModel = namedtuple('DjangoModel', 'table_name, app, model_name')
[docs]def rsr_image_path(instance, file_name, path_template='db/project/%s/%s'): """ Use to set ImageField upload_to attribute. Create path for image storing. When a new object instance is created we save in MEDIA_ROOT/db/project/temp/img_name.ext first and then immediately call save on the ImageFieldFile when the object instance has been saved to the db, so the path changes to MEDIA_ROOT/db/project/org.pk/img_name.ext. Modify path by supplying a path_tempate string """ if instance.pk: instance_pk = str(instance.pk) else: # for new objects that have no id yet instance_pk = 'temp' return path_template % locals()
[docs]def send_mail_with_attachments(subject, message, from_email, recipient_list, fail_silently=False, auth_user=None, auth_password=None, connection=None, html_message=None, attachments=None): """ Extension of django.core.main.send_mail to allow the inclusion of attachments Easy wrapper for sending a single message to a recipient list. All members of the recipient list will see the other recipients in the 'To' field. If auth_user is None, the EMAIL_HOST_USER setting is used. If auth_password is None, the EMAIL_HOST_PASSWORD setting is used. Note: The API for this method is frozen. New code wanting to extend the functionality should use the EmailMessage class directly. attachments must be a list of dicts of the form {'filename': <file name>, 'content': <attachment data>, 'mimetype': mime type} """ connection = connection or get_connection(username=auth_user, password=auth_password, fail_silently=fail_silently) mail = EmailMultiAlternatives(subject, message, from_email, recipient_list, connection=connection) if html_message: mail.attach_alternative(html_message, 'text/html') if attachments: for attachment in attachments: mail.attach(**attachment) logger.info('Sending email to "%s" with the subject "%s"', recipient_list, subject) return mail.send()
[docs]def rsr_send_mail(to_list, subject='templates/email/test_subject.txt', message='templates/email/test_message.txt', subject_context=None, msg_context=None, html_message=None, attachments=None): """ Send template driven email. to_list is a list of email addresses subject and message are templates for use as email subject and message body subject_context and msg_context are dicts used when renedering the respective templates html_message is the HTML template for use as message body settings.RSR_DOMAIN is added to both contexts as current_site, defaulting to 'akvo.org' if undefined """ subject_context = subject_context or {} msg_context = msg_context or {} current_site = getattr(settings, 'RSR_DOMAIN', 'rsr.akvo.org') subject_context.update({'site': current_site}) subject = loader.render_to_string(subject, subject_context) # Email subject *must not* contain newlines subject = ''.join(subject.splitlines()) msg_context.update({'site': current_site}) message = loader.render_to_string(message, msg_context) if html_message: html_message = loader.render_to_string(html_message, msg_context) send_mail_with_attachments( subject, message, settings.DEFAULT_FROM_EMAIL, to_list, html_message=html_message, attachments=attachments )
[docs]def rsr_send_mail_to_users(users, subject='test/test_subject.txt', message='test/test_message.txt', subject_context=None, msg_context=None): """ Send mail to many users supplied through a queryset """ if not subject_context: subject_context = {} if not msg_context: msg_context = {} to_list = [user.email for user in users if user.email] rsr_send_mail(to_list, subject, message, subject_context, msg_context)
[docs]def model_and_instance_based_filename(object_name, pk, field_name, img_name): """ Create a file name for an image based on the model name, the current object's pk, the field name of the model and the current date and time""" return "%s_%s_%s_%s%s" % ( object_name, pk or '', field_name, timezone.now().strftime("%Y-%m-%d_%H.%M.%S"), splitext(img_name)[1], )
[docs]def who_am_i(): "introspecting function returning the name of the function where who_am_i is called" return inspect.stack()[1][3]
[docs]def who_is_parent(): """ introspecting function returning the name of the caller of the function where who_is_parent is called """ return inspect.stack()[2][3]
# convert naive datetime to GMT format
[docs]def to_gmt(dt): gmt = pytz.timezone('GMT') return dt.replace(tzinfo=gmt).astimezone(gmt)
[docs]def custom_get_or_create_country(iso_code, country=None): """ add the missing fields to a skeleton country object from the admin or create a new one with the given iso_code if it doesn't already exist """ # Importing Country at the module level doesn't work, because of circular imports from akvo.rsr.models import Country iso_code = iso_code.lower() if not country: try: country = Country.objects.get(iso_code=iso_code) return country except Country.DoesNotExist: country = Country() country.iso_code = iso_code continent_code = COUNTRY_CONTINENTS[iso_code] country.name = dict(ISO_3166_COUNTRIES)[iso_code] country.continent = dict(CONTINENTS)[continent_code] country.continent_code = continent_code country.save() return country
[docs]def right_now_in_akvo(): """ Calculate the numbers used in the "Right now in Akvo" box on the home page. """ projects = apps.get_model('rsr', 'Project').objects.public().published() organisations = apps.get_model('rsr', 'Organisation').objects.all() updates = apps.get_model('rsr', 'ProjectUpdate').objects.all() people_served = projects.get_largest_value_sum( getattr(settings, 'AFFECTED_BENCHMARKNAME', 'people affected') ) return { 'number_of_organisations': organisations.count(), 'number_of_projects': projects.count(), 'people_served': int(people_served / 1000) * 1000, 'projects_budget_millions': round(projects.budget_sum() / 100000) / 10.0, 'number_of_project_updates': updates.count(), }
[docs]def rsr_show_keywords(instance): if len(instance.keywords.all()) > 0: keyword_str = '<ul>' for key in instance.keywords.all(): keyword_str += '<li>%s</li>' % key.label keyword_str += '</ul>' return keyword_str else: return 'None'
[docs]def filter_query_string(qs): """ Takes a QueryDict and returns a string that can be prepended to paginated links. Since pagination is handled outside of this function we pop the page item. """ q = dict(qs.lists()) # to Python dict q.pop('page', None) if not bool(q): return '' return '&{}'.format( '&'.join(['{}={}'.format(k, ''.join(v)) for (k, v) in q.items()]))
[docs]def codelist_choices(codelist, show_code=True): """ Based on a model from the codelists app, returns a list of tuples with the available choices. :param codelist: Codelist from codelists store :param show_code: Show the code (e.g. '1 - ..') in front of the name, True by default :return: List of tuples with available choices, tuples in the form of (code, name) """ fields = codelist[0] try: name_index = fields.index('name') except Exception: name_index = None # the code field has to exist or we're in trouble code_index = fields.index('code') list_items = codelist[1:] if name_index is not None and show_code: return [ (item[code_index], '{} - {}'.format(item[code_index], item[name_index])) for item in list_items ] else: return [ (item[code_index], item[name_index] if name_index is not None else item[code_index]) for item in list_items ]
[docs]def codelist_value(model, instance, field, version=settings.IATI_VERSION): """ Looks up the value of a codelist :param model: Model from codelists app :param instance: Instance from model :param field: String of the lookup field (e.g. 'type') :param version: String of version (optional) :return: String of the codelist instance """ value = getattr(instance, field, None) if not value: return '' key = '{}-{}-{}'.format(version, model.__name__, value,) # Memcached keys can't have whitespace and has a max length of 250 # https://github.com/memcached/memcached/blob/master/doc/protocol.txt#L41 key = slugify(key).encode('utf-8')[:250] result = cache.get(key) if result is not None: return result try: objects = getattr(model, 'objects') result = objects.get(code=value, version__code=version) except model.DoesNotExist: result = value else: # Update the cache only if the required data is in the DB! cache.set(key, result) finally: return result
[docs]def codelist_has_value(model, value, version=settings.IATI_VERSION): """ Check if value exists """ objects = getattr(model, 'objects') return objects.filter(code=value, version__code=version).exists()
[docs]def codelist_name(model, instance, field, version=settings.IATI_VERSION): """ Looks up the name of a codelist, returns the field value if the lookup fails :param model: Model from codelists app :param instance: Instance from model :param field: String of the lookup field (e.g. 'type') :param version: String of version (optional) :return: String of the codelist instance """ value = codelist_value(model, instance, field, version) return value.name if hasattr(value, 'name') else value
[docs]def check_auth_groups(group_names): for group_name in group_names: Group.objects.get_or_create(name=group_name)
[docs]def file_from_zip_archive(zip, file_name): # pragma: no cover """ Return a file from a zip archive :param zip: zip file or file name :param file_name: name of the file to retrieve from the archive :return: the file or None """ zip = zipfile.ZipFile(zip, 'r') # TODO: in test try: return zip.open(file_name) except KeyError: return None
[docs]def get_sha1_hash(s): """ return the sha1 hash of the string you call with""" hash = hashlib.sha1() hash.update(s) return hash.hexdigest()
[docs]def single_period_dates(name): try: config = settings.SINGLE_PERIOD_INDICATORS[name] return config['needs_reporting_timeout_days'], config['period_start'], config['period_end'] except KeyError: return None, None, None
[docs]def get_placeholder_thumbnail(file_, geometry_string, **options): """Return a place holder url for the given geometry string""" x, y = parse_geometry(geometry_string, ratio=1) url = '//placehold.it/{}x{}'.format(x, y) Url = namedtuple('Url', ('url',)) return Url(url=url)
local_dev = settings.RSR_DOMAIN == 'rsr.localdev.akvo.org' get_thumbnail = get_placeholder_thumbnail if local_dev else get_sorl_thumbnail
[docs]def get_report_thumbnail(file_): """ Helper function to guarantee the same settings for existing and new thumbs of project update photos """ return get_thumbnail(file_, settings.RS_THUMB_GEOMETRY, quality=settings.RS_THUMB_QUALITY)
[docs]def log_project_changes(user, project, related_obj, data, action): """Logs all changes to Django's LogEntry model.""" Project = apps.get_model('rsr', 'Project') if action == 'changed': action_flag = CHANGE change = {action: {'fields': list(data.keys())}, "source": "API"} elif action == 'added': action_flag = ADDITION change = {action: {}, "source": "API"} elif action == 'deleted': action_flag = DELETION change = {action: '', "source": "API"} LogEntry.objects.log_action( user_id=user.pk, content_type_id=ContentType.objects.get_for_model(related_obj).pk, object_id=related_obj.pk, object_repr=str(related_obj), action_flag=action_flag, change_message=json.dumps([change]) ) if not isinstance(related_obj, Project): obj_name = related_obj._meta.model_name if action_flag in {ADDITION, DELETION}: project_fields = {'name': obj_name, 'object': related_obj.pk} else: project_fields = { 'fields': ['{}__{}'.format(obj_name, key) for key in data], 'object': related_obj.pk, } project_change = dict(change) project_change[action] = project_fields LogEntry.objects.log_action( user_id=user.pk, content_type_id=ContentType.objects.get_for_model(project).pk, object_id=project.pk, object_repr=str(project), action_flag=CHANGE, change_message=json.dumps([project_change]) ) return
[docs]def get_country(*args, **kwargs): """Stub function since one of the migrations imports this function"""
[docs]def get_project_for_object(Project, obj): """Return the Project to which an object is associated.""" obj_model = obj._meta.model if obj is not None else None model_project_relation = getattr(obj_model, 'project_relation', None) if model_project_relation: query = {model_project_relation: [obj.id]} project = Project.objects.get(**query) elif obj_model == Project: project = obj elif hasattr(obj, 'project'): project = obj.project else: logger.info('%s does not define a relation to a project', obj_model) project = None return project
[docs]def send_user_invitation(email, user, invited_user, employment=None, project=None): _, token_date, token = TimestampSigner().sign(email).split(':') subject = 'registration/invited_user_subject.txt' msg_context = { 'user': user, 'invited_user': invited_user, 'token': token, 'token_date': token_date, } if employment is not None: msg_context['employment'] = employment message = 'registration/invited_user_message.txt' html_message = 'registration/invited_user_message.html' else: is_a4a = project.reporting_org and project.reporting_org.id == settings.A4A_ORG_ID msg_context['project'] = project message = 'registration/a4a_project_invited_user_message.txt' if is_a4a else 'registration/project_invited_user_message.txt' html_message = 'registration/a4a_project_invited_user_message.html' if is_a4a else 'registration/project_invited_user_message.html' params = dict( subject=subject, message=message, html_message=html_message, msg_context=msg_context) rsr_send_mail([email], **params)
[docs]def ensure_decimal(value): try: return Decimal(value) except (InvalidOperation, TypeError): return Decimal(0)
[docs]def maybe_decimal(value): try: return Decimal(value) except (InvalidOperation, TypeError): return None
BOOL_STRINGS = ( "1", "true", "yes", )
[docs]def to_bool(obj) -> bool: """Converts a given object to a boolean""" return str(obj).lower() in BOOL_STRINGS
[docs]def save_image(img, name, field_name): if isinstance(img.storage, GoogleCloudStorage): new_name = img.field.generate_filename(img.instance, name) blob = img.storage.bucket.blob(img.name) img.storage.bucket.rename_blob(blob, new_name) img.name = new_name img.instance.save(update_fields=[field_name]) else: img.save(name, img)
[docs]class ObjectReaderProxy(object): """ Proxy object to wrap other object for read only purpose. Main use case is for wrapping models to be used in the presentation layer. Additional property/method can be added to encapsulate presentation logic in sub-classes. """ def __init__(self, real): self._real = real def __getattr__(self, attr): return getattr(self._real, attr)
[docs]def make_safe_timezone_aware_date(d): ''' If datetime is naive and is an invalid datetime of current timezone, then assume datetime is in UTC and convert it to a datetime of current timezone. ''' if not timezone.is_naive(d): return d try: return timezone.make_aware(d, timezone.get_current_timezone()) except pytz.InvalidTimeError: date_utc = timezone.make_aware(d, timezone.utc) return date_utc.astimezone(timezone.get_current_timezone())