Latest posts for tag asyncio

I am writing a little application server for microservices written as compiled binaries, and I would like to log execution statistics from getrusage(2).

The application server is written using asyncio, and processes are managed using asyncio subprocesses.

Unfortunately, asyncio reaps child processes with os.waitpid rather than os.wait4, so getting rusage information means delving into asyncio's innards and providing a custom ChildWatcher implementation. Here's how I did it:

import asyncio
from asyncio.log import logger
from contextlib import contextmanager
import os

class ExtendedResults:
    """
    Plain holder for what is known about a child process once it has exited.

    Attributes:
        rusage: resource usage information for the process, or None while
            it has not been collected.
        returncode: exit code of the process, or None while it has not
            been collected.
    """

    def __init__(self):
        # Nothing is known at creation time; both fields start out empty.
        self.rusage = self.returncode = None

class SafeChildWatcherWithRusage(asyncio.SafeChildWatcher):
    """
    SafeChildWatcher that uses os.wait4 to also get rusage information.

    Call ``install()`` once to make this the active child watcher, then wrap
    the code that waits for a subprocess in ``monitor(proc)`` to receive its
    rusage and return code.
    """
    # Maps pid -> ExtendedResults for the processes currently being monitored.
    # Deliberately class-level: monitor() is a classmethod that registers
    # entries, and the watcher instance fills them in from _do_waitpid().
    rusage_results = {}

    @classmethod
    @contextmanager
    def monitor(cls, proc):
        """
        Return an ExtendedResults that gets filled when the process exits

        Context manager: the results object for ``proc.pid`` stays registered
        for the duration of the ``with`` block and is unregistered on exit,
        even if the block raises.
        """
        assert proc.pid > 0
        pid = proc.pid
        extended_results = ExtendedResults()
        cls.rusage_results[pid] = extended_results
        try:
            yield extended_results
        finally:
            # Tolerate the entry being already gone.
            cls.rusage_results.pop(pid, None)

    def _do_waitpid(self, expected_pid):
        """
        Reap ``expected_pid`` without blocking, record its rusage and return
        code for any active monitor(), then invoke the registered callback.
        """
        # The original is in asyncio/unix_events.py; on new python versions, it
        # makes sense to check changes to it and port them here
        assert expected_pid > 0

        try:
            pid, status, rusage = os.wait4(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            # No resource usage is available for a child reaped elsewhere;
            # bind the name so the bookkeeping below does not fail.
            rusage = None
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            # NOTE(review): _compute_returncode is a private helper of the
            # base watcher; confirm it exists on the targeted Python version.
            returncode = self._compute_returncode(status)
            if self._loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        # Hand the results to whoever is monitoring this pid, if anyone.
        extended_results = self.rusage_results.get(pid)
        if extended_results is not None:
            extended_results.rusage = rusage
            extended_results.returncode = returncode

        try:
            callback, args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            if self._loop.get_debug():
                logger.warning("Child watcher got an unexpected pid: %r",
                               pid, exc_info=True)
        else:
            callback(pid, returncode, *args)

    @classmethod
    def install(cls):
        """
        Create a watcher of this class, attach it to the current event loop
        and register it as asyncio's child watcher.
        """
        loop = asyncio.get_event_loop()
        child_watcher = cls()
        child_watcher.attach_loop(loop)
        asyncio.set_child_watcher(child_watcher)

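To use it, install the watcher once at startup with SafeChildWatcherWithRusage.install(), then wrap the code that waits for the process in SafeChildWatcherWithRusage.monitor():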

from .hacks import SafeChildWatcherWithRusage


    def run(self, *args, **kw):
        # Generator-based coroutine (uses `yield from`): starts a subprocess
        # from *args / **kw, then stores its return code, rusage and the
        # start/end timestamps on self.

        # All three standard streams are forced to pipes, overriding any
        # value the caller passed for them in kw.
        kw["stdin"] = asyncio.subprocess.PIPE
        kw["stdout"] = asyncio.subprocess.PIPE
        kw["stderr"] = asyncio.subprocess.PIPE
        self.started = time.time()

        self.proc = yield from asyncio.create_subprocess_exec(*args, **kw)

        from .hacks import SafeChildWatcherWithRusage
        # Register for rusage collection while the process is being served.
        with SafeChildWatcherWithRusage.monitor(self.proc) as results:
            # NOTE(review): the arguments to gather() and its closing
            # parenthesis are missing from this excerpt; restore them from
            # the full source before running this code.
            yield from asyncio.tasks.gather(
        self.returncode = yield from self.proc.wait()
        # NOTE(review): results.rusage is presumably filled in by the child
        # watcher when it reaps the process; it stays None otherwise --
        # confirm the watcher has been installed before relying on it.
        self.rusage = results.rusage
        self.ended = time.time()