Latest posts for tag asyncio

I am writing a little application server for microservices written as compiled binaries, and I would like to log execution statistics from getrusage(2).

The application server is written using asyncio, and processes are managed using asyncio subprocesses.

Unfortunately, asyncio uses os.waitpid instead of os.wait4 to reap child processes, and to get rusage information one has to delve into the asyncio innards, and provide a custom ChildWatcher implementation. Here's how I did it:

import asyncio
from asyncio.log import logger
from contextlib import contextmanager
import os


class ExtendedResults:
    def __init__(self):
        self.rusage = None
        self.returncode = None


class SafeChildWatcherWithRusage(asyncio.SafeChildWatcher):
    """
    SafeChildWatcher that uses os.wait4 to also get rusage information.
    """
    rusage_results = {}

    @classmethod
    @contextmanager
    def monitor(cls, proc):
        """
        Return an ExtendedResults that gets filled when the process exits
        """
        assert proc.pid > 0
        pid = proc.pid
        extended_results = ExtendedResults()
        cls.rusage_results[pid] = extended_results
        try:
            yield extended_results
        finally:
            cls.rusage_results.pop(pid, None)

    def _do_waitpid(self, expected_pid):
        # The original is in asyncio/unix_events.py; on new python versions, it
        # makes sense to check changes to it and port them here
        assert expected_pid > 0

        try:
            pid, status, rusage = os.wait4(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = self._compute_returncode(status)
            if self._loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        extended_results = self.rusage_results.get(pid)
        if extended_results is not None:
            extended_results.rusage = rusage
            extended_results.returncode = returncode

        try:
            callback, args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            if self._loop.get_debug():
                logger.warning("Child watcher got an unexpected pid: %r",
                               pid, exc_info=True)
        else:
            callback(pid, returncode, *args)

    @classmethod
    def install(cls):
        loop = asyncio.get_event_loop()
        child_watcher = cls()
        child_watcher.attach_loop(loop)
        asyncio.set_child_watcher(child_watcher)

To use it:

from .hacks import SafeChildWatcherWithRusage
SafeChildWatcherWithRusage.install()

...

    @coroutine
    def run(self, *args, **kw):
        kw["stdin"] = asyncio.subprocess.PIPE
        kw["stdout"] = asyncio.subprocess.PIPE
        kw["stderr"] = asyncio.subprocess.PIPE
        self.started = time.time()

        self.proc = yield from asyncio.create_subprocess_exec(*args, **kw)

        from .hacks import SafeChildWatcherWithRusage
        with SafeChildWatcherWithRusage.monitor(self.proc) as results:
            yield from asyncio.tasks.gather(
                self.write_stdin(self.proc.stdin),
                self.read_stdout(self.proc.stdout),
                self.read_stderr(self.proc.stderr)
            )
        self.returncode = yield from self.proc.wait()
        self.rusage = results.rusage
        self.ended = time.time()