Source code for akvo.rsr.management.commands.django_q_probettp

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Akvo Reporting is covered by the GNU Affero General Public License.
# See more details in the license.txt file located at the root folder of the Akvo RSR module.
# For additional details on the GNU license please see < http://www.gnu.org/licenses/agpl.html >.

"""
Provides a localhost HTTP server to query the local status of the django-q cluster
"""
import logging
import signal
import socket
from datetime import timedelta
from http.server import BaseHTTPRequestHandler, HTTPServer

from django.core.management.base import BaseCommand
from django.utils import timezone
from django_q.models import Success
from django_q.status import Stat

logger = logging.getLogger(__name__)


[docs]class Command(BaseCommand): help = __doc__
[docs] def handle(self, *args, **options): server = HTTPServer(("localhost", 8080), DjangoQRequestHandler) def handle_end(*_): logger.info("Stopping server") server.shutdown() signal.signal(signal.SIGINT, handle_end) signal.signal(signal.SIGTERM, handle_end) logger.info("Starting server...") server.serve_forever()
[docs]class DjangoQRequestHandler(BaseHTTPRequestHandler): """ A handler to be used with HTTPServer to get the status of the local django-q cluster """
[docs] def do_GET(self): """ Handle GET requests to return response with the status code indicating django-q cluster is still running """ if self.is_worker_running(): self.send_response(200) else: self.send_response(500) self.end_headers() self.wfile.write(b'')
[docs] def is_worker_running(self): hostname = socket.gethostname() # Find local cluster local_stat = next(iter(stat for stat in Stat.get_all() if stat.host == hostname), None) if local_stat: logger.info(f"Cluster status: {local_stat.status}") return True # Sometimes Stat.get_all() returns [] even though the cluster is still running. # Since we have tasks that runs every minute, we can use it to make sure. # Check to see if we have any successful result within the last 10 minutes. now = timezone.now() past = now - timedelta(minutes=10) results = Success.objects.filter(started__gte=past) logger.info(f"successful tasks within 10 minutes: {results.count()}") # If any results are returned, then the background worker is running! return results.exists()
[docs] def log_message(self, format: str, *args) -> None: logger.debug(format, *args)