Developer Journal: 22 April 2020

Posted on Wed 22 April 2020 in dev-journal

Creating a new command to save screenshots in my dev journal folder

This meant adding the convert -trim command that I discovered recently to the screenshot command for i3. In addition to that, a new command was created that saves the screenshots in a different location.

# .config/i3/config
# Print: take a screenshot with scrot and move the file to ~/screenshots/
# (any error output from mv is discarded).
bindsym Print exec scrot -e 'mv $f ~/screenshots/ 2>/dev/null'
# Shift+Print (fires on key release): scrot -s with a timestamped file name,
# trim the image with ImageMagick's convert, then move it to ~/screenshots/.
bindsym --release Shift+Print exec scrot -s '%Y-%m-%d-%H%M%S_$wx$h.png' -e 'convert $f -trim +repage $f && mv $f ~/screenshots/'
# $mod+Print (fires on key release): same as Shift+Print, but the result is
# moved to ~/screenshots/dev-journal/ instead.
bindsym --release $mod+Print exec scrot -s '%Y-%m-%d-%H%M%S_$wx$h.png' -e 'convert $f -trim +repage $f && mv $f ~/screenshots/dev-journal/'

Using gevent with RQ

I was working on a prototype to increase the concurrency with which I could deploy jobs. Feeling that I was reinventing the wheel, I started looking into other options. I could switch to Celery, which has the option to specify gevent or eventlet as a pool option... I took another look through the RQ repo and found issue 45 and issue 303, which talked about creating a custom worker class to handle gevent. Thanks to this gist, I was able to get going in the right direction. In the end I had a script that looked like this:

from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

from gevent import monkey, get_hub
from gevent.hub import LoopExit
monkey.patch_all() # noqa

import signal
import gevent
import gevent.pool
from rq import Worker
try:  # for rq >= 0.5.0
    from rq.job import JobStatus
except ImportError:  # for rq <= 0.4.6
    from rq.job import Status as JobStatus
from rq.timeouts import BaseDeathPenalty, JobTimeoutException
from rq.worker import StopRequested, green, blue, WorkerStatus
from rq.exceptions import DequeueTimeout
from rq.logutils import setup_loghandlers
from rq.version import VERSION

# Default values for the ``date_format`` and ``log_format`` parameters of
# GeventWorker.work() / GeventWorker._work(): a time-only timestamp pattern
# and a log-record layout of "<timestamp> <message>".
DEFAULT_LOGGING_DATE_FORMAT = '%H:%M:%S'
DEFAULT_LOGGING_FORMAT = '%(asctime)s %(message)s'


class GeventDeathPenalty(BaseDeathPenalty):
    """Death penalty that enforces the job timeout with ``gevent.Timeout``.

    The timeout object is kept on ``self.gevent_timeout`` between
    ``setup_death_penalty`` and ``cancel_death_penalty``.
    """

    def setup_death_penalty(self):
        """Create and start a gevent timeout of ``self._timeout`` seconds.

        The timeout carries a ``JobTimeoutException`` whose message states
        the exceeded limit.
        """
        message = ('Gevent Job exceeded maximum timeout value (%d seconds).'
                   % self._timeout)
        timer = gevent.Timeout(self._timeout, JobTimeoutException(message))
        self.gevent_timeout = timer
        timer.start()

    def cancel_death_penalty(self):
        """Cancel the timeout started by ``setup_death_penalty``."""
        self.gevent_timeout.cancel()


class GeventWorker(Worker):
    """RQ worker that performs jobs concurrently in a gevent greenlet pool.

    Jobs are dequeued one at a time by the work loop and handed to
    ``self.gevent_pool``; the loop only dequeues while the pool has a free
    slot.  In non-burst mode the work loop itself runs in a greenlet
    (``self.gevent_worker``) so that a warm shutdown can interrupt it.

    Accepts an extra ``pool_size`` keyword argument (default
    ``DEFAULT_POOL_SIZE``); all other arguments go to ``rq.Worker``.
    """
    death_penalty_class = GeventDeathPenalty
    # Pool size used when no ``pool_size`` keyword argument is supplied.
    DEFAULT_POOL_SIZE = 20

    def __init__(self, *args, **kwargs):
        """Create the greenlet pool, then initialise the base worker.

        ``pool_size`` is removed from ``kwargs`` before they are forwarded,
        because the base class is not given that argument.
        """
        pool_size = kwargs.pop('pool_size', self.DEFAULT_POOL_SIZE)
        self.gevent_pool = gevent.pool.Pool(pool_size)
        # Greenlet running the work loop; stays None for burst workers.
        self.gevent_worker = None
        super(GeventWorker, self).__init__(*args, **kwargs)

    def register_birth(self):
        """Register the worker and record its pool size under its key."""
        super(GeventWorker, self).register_birth()
        self.connection.hset(self.key, 'pool_size', self.gevent_pool.size)

    def heartbeat(self, timeout=0, pipeline=None):
        """Send a heartbeat and publish the number of greenlets in the pool.

        :param timeout: forwarded unchanged to the base ``heartbeat``
        :param pipeline: optional pipeline; when given, both the base
            heartbeat and the ``curr_pool_len`` field are written through
            it, otherwise the worker's own connection is used
        """
        connection = pipeline if pipeline is not None else self.connection
        # Forward the pipeline too, so that every heartbeat write goes
        # through the same connection object.
        super(GeventWorker, self).heartbeat(timeout, pipeline=pipeline)
        connection.hset(self.key, 'curr_pool_len', len(self.gevent_pool))

    def _install_signal_handlers(self):
        """Install SIGINT/SIGTERM handlers through gevent.

        The first signal requests a warm shutdown (wait for the pool, then
        stop the work loop); it also re-binds both signals so that a second
        one performs a cold shutdown (kill the pool and exit).
        """
        def request_force_stop():
            self.log.warning('Cold shut down.')
            self.gevent_pool.kill()
            raise SystemExit()

        def request_stop():
            if self._stop_requested:
                return

            # From now on a further signal means "cold shutdown".
            gevent.hub.signal(signal.SIGINT, request_force_stop)
            gevent.hub.signal(signal.SIGTERM, request_force_stop)

            self.log.warning('Warm shut down requested.')
            self.log.warning('Stopping after all greenlets are finished. '
                             'Press Ctrl+C again for a cold shutdown.')

            self._stop_requested = True
            # Let the jobs that are already running finish first.
            self.gevent_pool.join()
            if self.gevent_worker is not None:
                # Interrupt the work-loop greenlet with StopRequested.
                self.gevent_worker.kill(StopRequested)

        gevent.hub.signal(signal.SIGINT, request_stop)
        gevent.hub.signal(signal.SIGTERM, request_stop)

    def set_current_job_id(self, job_id, pipeline=None):
        """Do nothing; this override intentionally discards its arguments.

        NOTE(review): presumably disabled because several jobs can run at
        the same time, so a single "current job" is not meaningful.
        """
        pass

    def _work(self, burst=False, logging_level="INFO",
              date_format=DEFAULT_LOGGING_DATE_FORMAT,
              log_format=DEFAULT_LOGGING_FORMAT,
              max_jobs=None, with_scheduler=False):
        """Starts the work loop.

        Pops and performs all jobs on the current list of queues.  When all
        queues are empty, block and wait for new jobs to arrive on any of the
        queues, unless `burst` mode is enabled.

        The return value indicates whether any jobs were processed.

        :param burst: stop once the queues are empty instead of blocking
        :param logging_level: level passed to ``setup_loghandlers``
        :param date_format: date format passed to ``setup_loghandlers``
        :param log_format: log format passed to ``setup_loghandlers``
        :param max_jobs: accepted for signature compatibility only; this
            loop does not act on it
        :param with_scheduler: accepted for signature compatibility only;
            this loop does not act on it
        """
        setup_loghandlers(logging_level, date_format=date_format,
                          log_format=log_format)
        self._install_signal_handlers()

        self.did_perform_work = False
        self.register_birth()
        self.log.info(
            "RQ GEVENT worker (Greenlet pool size={0}) {1!r} started, version {2}".format(
                self.gevent_pool.size, self.key, VERSION))
        self.set_state(WorkerStatus.STARTED)

        try:
            while True:
                try:
                    self.check_for_suspension(burst)

                    if self.should_run_maintenance_tasks:
                        self.clean_registries()

                    if self._stop_requested:
                        self.log.info('Stopping on request.')
                        break

                    timeout = None if burst else max(1, self.default_worker_ttl - 60)

                    result = self.dequeue_job_and_maintain_ttl(timeout)
                    if result is None and burst:
                        self.log.info("RQ worker {0!r} done, quitting".format(self.key))

                        try:
                            # Make sure dependent jobs are enqueued: hand
                            # control to the hub so outstanding greenlets
                            # (jobs and their completion callbacks) can run.
                            get_hub().switch()
                        except LoopExit:
                            # Expected here: the hub has nothing left to run.
                            pass
                        # Completed jobs may have enqueued dependents, so
                        # look at the queues once more before giving up.
                        result = self.dequeue_job_and_maintain_ttl(timeout)

                    if result is None:
                        break
                except StopRequested:
                    break

                job, queue = result
                self.execute_job(job, queue)

        finally:
            if not self.is_horse:
                self.register_death()
        return self.did_perform_work

    def work(self, burst=False, logging_level="INFO",
             date_format=DEFAULT_LOGGING_DATE_FORMAT,
             log_format=DEFAULT_LOGGING_FORMAT,
             max_jobs=None, with_scheduler=False):
        """Run the work loop and return whether any job was performed.

        A non-burst worker runs the loop in a spawned greenlet so that it
        can be killed while it is blocked dequeueing a job; a burst worker
        runs the loop directly.  All arguments are forwarded to ``_work``
        in both cases.

        :param burst: if it's a burst worker there is no need to spawn a
            greenlet
        :param logging_level: logging level for the worker's log handlers
        :param date_format: date format for the worker's log handlers
        :param log_format: record format for the worker's log handlers
        :param max_jobs: forwarded to ``_work`` (which does not act on it)
        :param with_scheduler: forwarded to ``_work`` (which does not act
            on it)
        """
        loop_args = (burst, logging_level, date_format, log_format,
                     max_jobs, with_scheduler)

        # If this is a burst worker it's not needed to spawn a greenlet
        if burst:
            return self._work(*loop_args)

        self.gevent_worker = gevent.spawn(self._work, *loop_args)
        self.gevent_worker.join()
        return self.gevent_worker.value

    def execute_job(self, job, queue):
        """Spawn ``perform_job(job, queue)`` in the greenlet pool.

        Returns immediately after spawning; a callback linked to the
        greenlet records that work was done, sends a heartbeat, and
        enqueues the job's dependents if the job finished successfully.
        """
        def job_done(child):
            self.did_perform_work = True
            self.heartbeat()
            if job.get_status() == JobStatus.FINISHED:
                queue.enqueue_dependents(job)

        child_greenlet = self.gevent_pool.spawn(self.perform_job, job, queue)
        child_greenlet.link(job_done)

    def dequeue_job_and_maintain_ttl(self, timeout):
        """Wait for a free pool slot, then dequeue the next job.

        Sends a heartbeat before every dequeue attempt and once more before
        returning.  A ``DequeueTimeout`` simply triggers another attempt.

        :param timeout: timeout passed to ``queue_class.dequeue_any``
        :returns: whatever ``dequeue_any`` returned: a ``(job, queue)``
            pair, or ``None``
        :raises StopRequested: if a stop was requested before or while
            waiting for a free pool slot
        """
        result = None
        while True:
            if self._stop_requested:
                raise StopRequested()

            self.heartbeat()

            if self.gevent_pool.full():
                self.set_state(WorkerStatus.BUSY)
                self.log.warning(
                    "RQ GEVENT worker greenlet pool empty current size %s",
                    self.gevent_pool.size)

            # Poll until a greenlet slot frees up, staying responsive to
            # stop requests in the meantime.
            while self.gevent_pool.full():
                gevent.sleep(0.1)
                if self._stop_requested:
                    raise StopRequested()

            try:
                result = self.queue_class.dequeue_any(
                    self.queues, timeout, connection=self.connection
                )
                self.set_state(WorkerStatus.IDLE)
                if result is not None:
                    job, queue = result
                    self.log.info('%s: %s (%s)' % (green(queue.name),
                                  blue(job.description), job.id))
                break
            except DequeueTimeout:
                # Nothing arrived within ``timeout``; heartbeat and retry.
                pass

        self.heartbeat()
        return result


if __name__ == "__main__":
    # Script entry point: run the stock ``rq worker`` CLI command with this
    # module's GeventWorker forced as the worker class.
    import sys
    from rq.cli import worker as rq_main

    # The worker class is fixed by this script, so refuse any attempt to
    # override it, in both the separate ("-w X", "--worker-class X") and
    # the joined ("--worker-class=X") spelling.
    if any(arg in ('-w', '--worker-class') or
           arg.startswith('--worker-class=')
           for arg in sys.argv[1:]):
        print("You cannot specify worker class when using this script, "
              "use the official rqworker instead")
        sys.exit(1)

    # The dotted path assumes this file is importable as ``gevent_rqworker``.
    sys.argv.extend(['-w', 'gevent_rqworker.GeventWorker'])
    rq_main()

Now this allows me to run python gevent_rqworker.py high default to start a worker that listens on two different queues (high and default), using gevent with a pool of 20 greenlets.