# -*- coding: utf-8 -*-
# Akvo RSR is covered by the GNU Affero General Public License.
# See more details in the license.txt file located at the root folder of the Akvo RSR module.
# For additional details on the GNU license please see < http://www.gnu.org/licenses/agpl.html >.
import re
from xml.sax.saxutils import XMLGenerator, escape as __escape
from django.contrib.syndication.views import FeedDoesNotExist, Feed
from django.shortcuts import get_object_or_404
from django.urls import reverse
from django.utils.feedgenerator import Rss201rev2Feed
from django.utils.translation import gettext_lazy as _
from akvo.rsr.models import Project, ProjectUpdate, Organisation
# Matches one complete CDATA section. The quantifier is non-greedy so that each
# section ends at its own first "]]>" (which is where XML ends it); with a
# greedy match, two sections in one string would be treated as a single span
# and the text between them would never be escaped. re.DOTALL lets "." span
# line breaks inside a section.
_CDATA_SECTION = re.compile(r'<!\[CDATA\[.*?\]\]>', re.DOTALL)


def escape(data, entities=None):
    """Escape &, <, and > in a string of data, leaving CDATA sections untouched.

    Modification of xml.sax.saxutils.escape that detects ``<![CDATA[...]]>``
    sections and copies them to the result verbatim; only the text outside of
    those sections is escaped.

    You can escape other strings of data by passing a dictionary as the
    optional ``entities`` parameter. The keys and values must all be strings;
    each key will be replaced with its corresponding value.

    :param data: the string to escape
    :param entities: optional dict of additional replacements, handed on to
        xml.sax.saxutils.escape; ``None`` means no additional replacements
    :return: the escaped string
    """
    if entities is None:
        entities = {}

    pieces = []
    cursor = 0
    for section in _CDATA_SECTION.finditer(data):
        begin, end = section.span()
        # text between the previous section (or the start) and this one
        pieces.append(__escape(data[cursor:begin], entities))
        # the CDATA section itself is passed through unchanged
        pieces.append(section.group())
        cursor = end
    # escape the tail after the last section (the whole string if none matched)
    pieces.append(__escape(data[cursor:], entities))
    return ''.join(pieces)
class RSRSimplerXMLGenerator(XMLGenerator):
    """XMLGenerator subclass that writes character data through this module's escape().

    Subclassed to be able to call the custom escape() function instead of the
    one XMLGenerator uses by default.
    """

    def characters(self, content):
        """Write ``content`` as character data, escaped with escape().

        Content that is not a ``str`` is decoded using the generator's
        encoding. Empty content writes nothing.
        """
        if not isinstance(content, str):
            content = str(content, self._encoding)
        if content:
            # When the generator was created with short_empty_elements=True,
            # startElement() leaves the start tag open; it has to be closed
            # before any text is written, otherwise the text ends up inside
            # the tag itself. This is a no-op when no start tag is pending.
            self._finish_pending_start_element()
            self._write(escape(content))

    def addQuickElement(self, name, contents=None, attrs=None):
        """Convenience method for adding an element with no children.

        :param name: the element name
        :param contents: optional character data for the element; ``None``
            produces an element without content
        :param attrs: optional mapping of attributes; ``None`` means none
        """
        self.startElement(name, {} if attrs is None else attrs)
        if contents is not None:
            self.characters(contents)
        self.endElement(name)
class UpdateFeed(Feed):
    """Base class generating Update feeds.

    Supplies the feed link and the per-item attributes (link, title,
    description, publication date, author name, photo credit); subclasses
    define which object the feed is about and which items it contains.
    """

    feed_type = RSRMediaRssFeed

    def link(self, obj):
        """Return the absolute URL of ``obj``.

        :raises FeedDoesNotExist: if ``obj`` is falsy
        """
        if not obj:
            raise FeedDoesNotExist
        return obj.get_absolute_url()

    def item_link(self, item):
        """Return the absolute URL of the update."""
        return item.get_absolute_url()

    def item_title(self, item):
        """Return the title of the update."""
        return item.title

    def item_description(self, item):
        """Return the description for the update.

        When the photo can be used, the description is a CDATA section with
        the linked thumbnail followed by the update text; if anything fails
        while building it, the plain update text is returned instead.
        """
        try:
            # Probe the photo; presumably this raises when the update has no
            # usable photo file, which sends us to the text-only fallback.
            item.photo.size
            url = item.get_absolute_url()
            thumbnail_url = item.photo.thumbnail.absolute_url
            # A literal "]]>" in the text would end the CDATA section early and
            # expose the rest of the text as raw markup. Splitting it over two
            # adjacent CDATA sections keeps the text intact for XML parsers.
            # NOTE(review): assumes the feed writer passes CDATA sections
            # through unescaped, as escape() in this module does.
            text = str(item.text).replace(']]>', ']]]]><![CDATA[>')
            return '<![CDATA[<p><a href="%s"><img src="%s" alt="" /></a></p><p>%s</p>]]>' % (
                url,
                thumbnail_url,
                text,
            )
        except Exception:
            # Deliberately broad: the description is best effort, any problem
            # with the photo must not break the feed.
            return item.text

    def item_pubdate(self, item):
        """Return the creation timestamp of the update."""
        return item.created_at

    def item_author_name(self, item):
        """Return the full name of the user the update belongs to."""
        return item.user.get_full_name()

    def item_credit(self, item):
        """Return the photo credit of the update."""
        return item.photo_credit
class ProjectUpdates(UpdateFeed):
    """RSS feed with the updates of a single project, capped at 25 items."""

    def get_object(self, request, project_id):
        """Fetch the Project whose primary key equals ``project_id``."""
        projects = Project.objects
        return projects.get(pk__exact=project_id)

    def title(self, obj):
        """Build the feed title from the project's id and title."""
        values = {
            'id': obj.id,
            'project_title': obj.title,
        }
        return _('Akvo RSR project %(id)d: %(project_title)s') % values

    def description(self, obj):
        """Build the feed description from the project's title."""
        values = {'project_title': obj.title}
        return _('Project updates for project %(project_title)s') % values

    def items(self, obj):
        """Return the project's updates, sliced to the first 25."""
        project_updates = ProjectUpdate.objects.filter(project__id=obj.id)
        # The cap of 25 is there to prevent gateway timeouts.
        return project_updates[:25]
class OrganisationUpdates(UpdateFeed):
    """RSS feed with the updates of an organisation, capped at 25 items."""

    feed_type = RSRMediaRssFeed

    def get_object(self, request, org_id):
        """Fetch the Organisation with id ``org_id``, or respond with a 404."""
        organisation_id = int(org_id)
        return get_object_or_404(Organisation, id=organisation_id)

    def title(self, obj):
        """Build the feed title from the organisation's name."""
        values = {'org_name': obj.name}
        return _('Projects of %(org_name)s') % values

    def description(self, obj):
        """Build the feed description.

        The long name is appended only when it differs from the name.
        """
        if obj.name != obj.long_name:
            values = {'org_name': obj.name, 'long_name': obj.long_name}
            return _(
                "Project updates for projects partnered by %(org_name)s - %(long_name)s"
            ) % values
        return _("Project updates for projects partnered by %(org_name)s") % {
            'org_name': obj.name,
        }

    def items(self, obj):
        """Return the organisation's updates, sliced to the first 25."""
        organisation_updates = obj.all_updates()
        # The cap of 25 is there to prevent gateway timeouts.
        return organisation_updates[:25]

    def item_title(self, item):
        """Build the item title from the project's id and title and the update's title."""
        project = item.project
        values = {
            'project_id': project.id,
            'project_title': project.title,
            'update_title': item.title,
        }
        return _('Project %(project_id)d - %(project_title)s: %(update_title)s') % values
class AllProjectUpdates(UpdateFeed):
    """RSS feed with updates across all RSR projects, capped at 25 items."""

    title = _('Last 25 RSR project updates')
    description = _('Project updates for all Akvo RSR projects')

    def link(self):
        """Return the URL reversed from the 'update-directory' name."""
        return reverse('update-directory')

    def items(self):
        """Return all project updates, sliced to the first 25."""
        every_update = ProjectUpdate.objects.all()
        # The cap of 25 is there to prevent gateway timeouts.
        return every_update[:25]

    def item_title(self, item):
        """Build the item title from the project's id and title and the update's title."""
        project = item.project
        values = {
            'project_id': project.id,
            'project_title': project.title,
            'update_title': item.title,
        }
        return _('Project %(project_id)d - %(project_title)s: %(update_title)s') % values