Latest posts for tag python

A little gitpython recipe to list the paths of all files in a commit:


import git
from pathlib import Path
import sys

def list_paths(root_tree, path=Path(".")):
    # Files directly contained in this tree
    for blob in root_tree.blobs:
        yield path / blob.name
    # Recurse into subdirectories
    for tree in root_tree.trees:
        yield from list_paths(tree, path / tree.name)

repo = git.Repo(".", search_parent_directories=True)
commit = repo.commit(sys.argv[1])
for path in list_paths(commit.tree):
    print(path)

It can be a good base, for example, for writing a script that, given two git branches, shows which django migrations are in one and not in the other, without doing any git checkout of the code.
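
As a rough sketch of that idea, once the paths of two commits have been listed, finding the migrations that exist only in one of them is a set difference. The function name, the sample paths and the assumption that migrations live in a directory called `migrations` are illustrative, not part of the recipe above:

```python
from pathlib import Path


def migrations_only_in(first_paths, second_paths):
    """Return migration files present in first_paths but not in second_paths."""
    def migrations(paths):
        # Assumption: migrations are .py files inside a "migrations" directory
        return {
            p for p in map(Path, paths)
            if p.parent.name == "migrations"
            and p.suffix == ".py"
            and p.name != "__init__.py"
        }
    return sorted(migrations(first_paths) - migrations(second_paths))


# Hypothetical path listings, as list_paths() could produce for two branches
branch_a = [
    "app/models.py",
    "app/migrations/0001_initial.py",
    "app/migrations/0002_add_field.py",
]
branch_b = ["app/models.py", "app/migrations/0001_initial.py"]
print(migrations_only_in(branch_a, branch_b))
```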

One of the programs I maintain for work is a GUI data browser that uses Tornado as a backend and a web browser as a front-end.

It is quite convenient to start the command and have the browser open automatically on the right URL. It's quite annoying to start the command and be told that the default port is already in use.

I've needed this trick quite often, also when writing unit tests, and it's time I note it down somewhere, so it's easier to find than going through Tornado's unittest code where I found it the first time.

This is how to start Tornado on a free random port:

from tornado.options import define, options
import tornado.netutil
import tornado.httpserver

define("web_port", type=int, default=None, help="listening port for web interface")

application = Application(self.db_url)

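if options.web_port is None:
    # Binding to port 0 asks the kernel to pick any free port
    sockets = tornado.netutil.bind_sockets(0, '')
    self.web_port = sockets[0].getsockname()[:2][1]
    server = tornado.httpserver.HTTPServer(application)
    server.add_sockets(sockets)
else:
    server = tornado.httpserver.HTTPServer(application)
    server.listen(options.web_port)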
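
The trick does not depend on Tornado: binding to port 0 makes the operating system choose a free port, and getsockname() reports which one was chosen. A minimal sketch with only the standard library (the function name and the loopback default are my own choices for illustration):

```python
import socket


def bind_free_port(host="127.0.0.1"):
    """Bind a listening socket to a port chosen by the operating system."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind((host, 0))  # port 0 means "pick any free port"
    sock.listen()
    return sock


sock = bind_free_port()
port = sock.getsockname()[1]  # the port that was actually assigned
print("listening on port", port)
sock.close()
```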

I am writing a little application server for microservices written as compiled binaries, and I would like to log execution statistics from getrusage(2).

The application server is written using asyncio, and processes are managed using asyncio subprocesses.

Unfortunately, asyncio uses os.waitpid instead of os.wait4 to reap child processes, and to get rusage information one has to delve into the asyncio innards, and provide a custom ChildWatcher implementation. Here's how I did it:

import asyncio
from asyncio.log import logger
from contextlib import contextmanager
import os

class ExtendedResults:
    def __init__(self):
        self.rusage = None
        self.returncode = None

class SafeChildWatcherWithRusage(asyncio.SafeChildWatcher):
    """
    SafeChildWatcher that uses os.wait4 to also get rusage information.
    """
    rusage_results = {}

    @classmethod
    @contextmanager
    def monitor(cls, proc):
        """
        Return an ExtendedResults that gets filled when the process exits
        """
        assert proc.pid > 0
        pid = proc.pid
        extended_results = ExtendedResults()
        cls.rusage_results[pid] = extended_results
        try:
            yield extended_results
        finally:
            cls.rusage_results.pop(pid, None)

    def _do_waitpid(self, expected_pid):
        # The original is in asyncio/; on new python versions, it
        # makes sense to check changes to it and port them here
        assert expected_pid > 0

        try:
            pid, status, rusage = os.wait4(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            rusage = None  # no usage information is available in this case
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = self._compute_returncode(status)
            if self._loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        extended_results = self.rusage_results.get(pid)
        if extended_results is not None:
            extended_results.rusage = rusage
            extended_results.returncode = returncode

        try:
            callback, args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            if self._loop.get_debug():
                logger.warning("Child watcher got an unexpected pid: %r",
                               pid, exc_info=True)
        else:
            callback(pid, returncode, *args)

    @classmethod
    def install(cls):
        # Make this watcher the one used by the current event loop
        loop = asyncio.get_event_loop()
        child_watcher = cls()
        child_watcher.attach_loop(loop)
        asyncio.set_child_watcher(child_watcher)

To use it:

from .hacks import SafeChildWatcherWithRusage

# Install the watcher once, before starting any subprocess
SafeChildWatcherWithRusage.install()

    def run(self, *args, **kw):
        kw["stdin"] = asyncio.subprocess.PIPE
        kw["stdout"] = asyncio.subprocess.PIPE
        kw["stderr"] = asyncio.subprocess.PIPE
        self.started = time.time()

        self.proc = yield from asyncio.create_subprocess_exec(*args, **kw)

        from .hacks import SafeChildWatcherWithRusage
        with SafeChildWatcherWithRusage.monitor(self.proc) as results:
            yield from asyncio.tasks.gather(
                # ... coroutines that exchange data with the process go here ...
            )
        self.returncode = yield from self.proc.wait()
        self.rusage = results.rusage
        self.ended = time.time()
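
To see what os.wait4 provides without any asyncio machinery, here is a small synchronous sketch. It is Unix only, the function name is made up for the example, and os.waitstatus_to_exitcode needs Python 3.9 or later:

```python
import os
import sys


def run_and_measure(argv):
    """Run argv, reap it with os.wait4, and return (returncode, rusage)."""
    pid = os.spawnv(os.P_NOWAIT, argv[0], argv)
    # wait4 behaves like waitpid, but also returns the child's resource usage
    _, status, rusage = os.wait4(pid, 0)
    return os.waitstatus_to_exitcode(status), rusage


returncode, rusage = run_and_measure(
    [sys.executable, "-c", "sum(range(100000))"])
print(returncode, rusage.ru_utime, rusage.ru_maxrss)
```

The rusage value has the same fields as the result of resource.getrusage(), such as ru_utime, ru_stime and ru_maxrss.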

Python mailbox.mbox is not good at opening compressed mailboxes:

>>> import mailbox
>>> print(len(mailbox.mbox("/tmp/test.mbox")))
>>> print(len(mailbox.mbox("/tmp/test.mbox.gz")))
>>> print(len(mailbox.mbox("/tmp/test1.mbox.xz")))

For a prototype rewrite of the MIA team's Echelon (the engine behind mia-query), I needed to scan compressed mailboxes, and I had to work around this limitation.

Here is the alternative mailbox.mbox implementation:

import lzma
import gzip
import bz2
import mailbox
from pathlib import Path
from typing import BinaryIO, Generator

class StreamMbox(mailbox.mbox):
    """
    mailbox.mbox does not support opening a stream, which is sad.

    This is a subclass that works around it
    """
    def __init__(self, fd: BinaryIO, factory=None, create: bool = True):
        # Do not call parent __init__, just redo everything here to be able to
        # open a stream. This will need to be re-reviewed for every new version
        # of python's stdlib.

        # Mailbox constructor
        self._path = None
        self._factory = factory

        # _singlefileMailbox constructor
        self._file = fd
        self._toc = None
        self._next_key = 0
        self._pending = False       # No changes require rewriting the file.
        self._pending_sync = False  # No need to sync the file
        self._locked = False
        self._file_length = None    # Used to record mailbox size

        # mbox constructor
        self._message_factory = mailbox.mboxMessage

    def flush(self):
        raise NotImplementedError("StreamMbox is a readonly class")

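class UsageExample:
    # Map file suffixes to the function that opens them
    DECOMPRESS = {
        ".xz": lzma.open,
        ".gz": gzip.open,
        ".bz2": bz2.open,
    }

    @classmethod
    def scan(cls, path: Path) -> Generator[ScannedEmail, None, None]:
        decompress = cls.DECOMPRESS.get(path.suffix)
        if decompress is None:
            with open(path.as_posix(), "rb") as fd:
                yield from cls.scan_fd(path, fd)
        else:
            with decompress(path.as_posix(), "rb") as fd:
                yield from cls.scan_fd(path, fd)

    @classmethod
    def scan_fd(cls, path: Path, fd: BinaryIO) -> Generator[ScannedEmail, None, None]:
        mbox = StreamMbox(fd)
        for msg in mbox:
            # Assumption: ScannedEmail (not shown here) wraps the path and the message
            yield ScannedEmail(path, msg)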
This code:


class Test:
    def __init__(self, items=[]):
        self.items = items

    def add(self, item):
        self.items.append(item)

a = Test()
a.add("foo")
b = Test()
b.add("bar")
print(a.items)
print(b.items)
"obviously" prints:

['foo', 'bar']
['foo', 'bar']

Because the default value of the items argument is a mutable list constructed just once, when the function definition is evaluated, and then reused by every call.

So, in Python, mutable items in default arguments are a good way to get more fun time with debugging.
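
The usual way out is to use None as the default and build the list inside the function, so that every call gets a fresh one. A minimal sketch, reusing the class from the example:

```python
class Test:
    def __init__(self, items=None):
        # A new list is created on every call, not once at definition time
        self.items = [] if items is None else items

    def add(self, item):
        self.items.append(item)


a = Test()
a.add("foo")
b = Test()
b.add("bar")
print(a.items)  # ['foo']
print(b.items)  # ['bar']
```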

I am exposing some library functions using a TurboGears2 controller (see Building a web-based API with Turbogears2). It turns out that some functions return a dict, some a list, some a string, and TurboGears 2 only allows JSON serialisation for dicts.

A simple work-around for this is to wrap the function result into a dict, something like this:

@validate(validator_dispatcher, error_handler=api_validation_error)
def list_colours(self, filter=None, productID=None, maxResults=100, **kw):
    # Call API
    res = self.engine.list_colours(filter, productID, maxResults)

    # Return result
    return dict(r=res)

It would be nice, however, to have an @webapi() decorator that automatically wraps the function result with the dict:

def webapi(func):
    def dict_wrap(*args, **kw):
        return dict(r=func(*args, **kw))
    return dict_wrap

# the controller...

    @validate(validator_dispatcher, error_handler=api_validation_error)
    @webapi
    def list_colours(self, filter=None, productID=None, maxResults=100, **kw):
        # Call API
        res = self.engine.list_colours(filter, productID, maxResults)

        # Return result
        return res

This works, as long as @webapi appears last in the list of decorators. This is because if it appears last it will be the first to wrap the function, and so it will not interfere with the tg.decorators machinery.
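
The ordering rule is plain Python and has nothing to do with TurboGears: decorators are applied bottom-up, so the one written last wraps the original function first. A small sketch with a made-up `tag` decorator standing in for the framework ones:

```python
import functools


def tag(label):
    """Hypothetical decorator that shows where it sits in the wrapping order."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kw):
            return f"{label}({func(*args, **kw)})"
        return wrapper
    return decorator


def webapi(func):
    @functools.wraps(func)
    def dict_wrap(*args, **kw):
        return dict(r=func(*args, **kw))
    return dict_wrap


@tag("outer")
@webapi
def list_colours():
    return ["red", "green"]


print(list_colours())
```

Here `tag` receives the function already wrapped by `webapi`, which is why the dict ends up inside the outer label. functools.wraps copies the name and docstring of the wrapped function; it is an addition of mine and not something the decorators in this post do.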

Would it be possible to create a decorator that can be put anywhere among the decorator list? Yes, it is possible but tricky, and it gives me the feeling that it may break in any future version of TurboGears:

class webapi(object):
    def __call__(self, func):
        def dict_wrap(*args, **kw):
            return dict(r=func(*args, **kw))
        # Migrate the decoration attribute to our new function
        if hasattr(func, 'decoration'):
            dict_wrap.decoration = func.decoration
            dict_wrap.decoration.controller = dict_wrap
            delattr(func, 'decoration')
        return dict_wrap

# the controller...

    @validate(validator_dispatcher, error_handler=api_validation_error)
    def list_colours(self, filter=None, productID=None, maxResults=100, **kw):
        # Call API
        res = self.engine.list_colours(filter, productID, maxResults)

        # Return result
        return res

As a convenience, TurboGears 2 offers, in the decorators module, a way to build decorator "hooks":

class before_validate(_hook_decorator):
    '''A list of callables to be run before validation is performed'''
    hook_name = 'before_validate'

class before_call(_hook_decorator):
    '''A list of callables to be run before the controller method is called'''
    hook_name = 'before_call'

class before_render(_hook_decorator):
    '''A list of callables to be run before the template is rendered'''
    hook_name = 'before_render'

class after_render(_hook_decorator):
    '''A list of callables to be run after the template is rendered.

    Will be run before it is returned up the WSGI stack'''

    hook_name = 'after_render'

The way these are invoked can be found in the _perform_call function in tg/

To show an example use of those hooks, let's add some polygen wisdom to every data structure we return:

class wisdom(decorators.before_render):
    def __init__(self, grammar):
        super(wisdom, self).__init__(self.add_wisdom)
        self.grammar = grammar
    def add_wisdom(self, remainder, params, output):
        from subprocess import Popen, PIPE
        output["wisdom"] = Popen(["polyrun", self.grammar], stdout=PIPE).communicate()[0]

# the controller...

    @validate(validator_dispatcher, error_handler=api_validation_error)
    def list_colours(self, filter=None, productID=None, maxResults=100, **kw):
        # Call API
        res = self.engine.list_colours(filter, productID, maxResults)

        # Return result
        return res

These hooks cannot, however, be used for what I need, that is, to wrap the result inside a dict. The reason is that they are called in this way:

        controller.decoration.run_hooks(
                'before_render', remainder, params, output)

and not in this way:

        output = controller.decoration.run_hooks(
                'before_render', remainder, params, output)

So it is possible to modify the output (if it is a mutable structure) but not to exchange it with something else.
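
The difference can be reproduced in a few lines of plain Python. The `run_hooks` below is a simplified stand-in written for this example, not the TurboGears implementation: it ignores the return value of each hook, so a hook can mutate the output it receives but cannot replace it.

```python
def run_hooks(hooks, output):
    """Call each hook and ignore what it returns."""
    for hook in hooks:
        hook(output)


def add_wisdom(output):
    output["wisdom"] = "42"    # mutation: visible to the caller


def wrap_in_dict(output):
    output = dict(r=output)    # rebinding: only changes the local name
    return output              # ...and the return value is thrown away


result = {"colours": ["red"]}
run_hooks([add_wisdom, wrap_in_dict], result)
print(result)
```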

Can we do even better? Sure we can. We can assimilate @expose and @validate inside @webapi to avoid repeating those same many decorator lines over and over again:

class webapi(object):
    def __init__(self, error_handler = None):
        self.error_handler = error_handler

    def __call__(self, func):
        def dict_wrap(*args, **kw):
            return dict(r=func(*args, **kw))
        res = expose("json")(dict_wrap)
        res = validate(validator_dispatcher, error_handler=self.error_handler)(res)
        return res

# the controller...

    def api_validation_error(self, **kw):
        pylons.response.status = "400 Error"
        return dict(e="validation error on input fields", form_errors=pylons.c.form_errors)

    @webapi(error_handler=api_validation_error)
    def list_colours(self, filter=None, productID=None, maxResults=100, **kw):
        # Call API
        res = self.engine.list_colours(filter, productID, maxResults)

        # Return result
        return res

This got rid of @expose and @validate, and provides almost all the default values that I need. Unfortunately I could not find out how to access api_validation_error from the decorator so that I can pass it to the validator, therefore I remain with the inconvenience of having to explicitly pass it every time.

I am using TurboGears2 to export a python API over the web. Every API method is wrapped by a controller method that validates the parameters and returns the results encoded in JSON.

The basic idea is this:

def list_colours(self, filter=None, productID=None, maxResults=100, **kw):
    # Call API
    res = self.engine.list_colours(filter, productID, maxResults)

    # Return result
    return res

To validate the parameters we can use forms, it's their job after all:

class ListColoursForm(TableForm):
    fields = [
            # One field per parameter
            twf.TextField("filter", help_text="Please enter the string to use as a filter"),
            twf.TextField("productID", help_text="Please enter the product ID"),
            twf.TextField("maxResults", validator=twfv.Int(min=0), default=200, size=5, help_text="Please enter the maximum number of results"),
    ]

list_colours_form = ListColoursForm()

    @validate(list_colours_form, error_handler=list_colours_validation_error)
    def list_colours(self, filter=None, productID=None, maxResults=100, **kw):
        # Parameter validation is done by the form

        # Call API
        res = self.engine.list_colours(filter, productID, maxResults)

        # Return result
        return res

All straightforward so far. However, this means that we need two exposed methods for every API call: one for the API call and one error handler. For every API call, we have to type the name several times, which is error prone and risks getting things mixed up.

We can however have a single error handler for all methods:

def get_method():
    """
    The method name is the first url component after the controller name that
    does not start with 'test'
    """
    found_controller = False
    for name in pylons.c.url.split("/"):
        if not found_controller and name == "controllername":
            found_controller = True
            continue
        if name.startswith("test"):
            continue
        if found_controller:
            return name
    return None

class ValidatorDispatcher:
    """
    Validate using the right form according to the value of the "method" field
    """
    def validate(self, args, state):
        method = args.get("method", None)
        # Extract the method from the URL if it is missing
        if method is None:
            method = get_method()
            args["method"] = method
        return forms[method].validate(args, state)

validator_dispatcher = ValidatorDispatcher()

This validator will try to find the method name, either as a form field or by parsing the URL. It will then use the method name to find the form to use for validation, and pass control to the validate method of that form.
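
The dispatching logic can be tried outside TurboGears with a self-contained sketch. Everything here is a simplified stand-in: the URL is passed in explicitly instead of being read from pylons, plain functions take the place of the forms, and the names `Dispatcher` and `validate_list_colours` are invented for the example.

```python
def get_method(url, controller="controllername"):
    """Return the first URL component after the controller that is not test*."""
    found_controller = False
    for name in url.split("/"):
        if not found_controller:
            found_controller = name == controller
            continue
        if name.startswith("test"):
            continue
        return name
    return None


class Dispatcher:
    def __init__(self, validators):
        self.validators = validators

    def validate(self, args, url):
        # Use the "method" field if present, else fall back to the URL
        method = args.get("method") or get_method(url)
        args["method"] = method
        return self.validators[method](args)


def validate_list_colours(args):
    # Stand-in for a form: convert maxResults to an integer
    return dict(args, maxResults=int(args.get("maxResults", 100)))


dispatcher = Dispatcher({"list_colours": validate_list_colours})
print(dispatcher.validate({"maxResults": "5"}, "/controllername/test/list_colours"))
```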

We then need to add an extra "method" field to our forms, and arrange the forms inside a dictionary:

class ListColoursForm(TableForm):
    fields = [
            # One hidden field to have a place for the method name
            twf.HiddenField("method"),
            # One field per parameter
            twf.TextField("filter", help_text="Please enter the string to use as a filter"),
            # ...
    ]

forms = {}
forms["list_colours"] = ListColoursForm()

And now our methods become much nicer to write:

    def api_validation_error(self, **kw):
        pylons.response.status = "400 Error"
        return dict(form_errors=pylons.c.form_errors)

    @validate(validator_dispatcher, error_handler=api_validation_error)
    def list_colours(self, filter=None, productID=None, maxResults=100, **kw):
        # Parameter validation is done by the form

        # Call API
        res = self.engine.list_colours(filter, productID, maxResults)

        # Return result
        return res

api_validation_error is interesting: it returns a proper HTTP error status, and a JSON body with the details of the error, taken straight from the form validators. It took me a while to find out that the form errors are in pylons.c.form_errors (and for reference, the form values are in pylons.c.form_values). pylons.response is a WebOb Response that we can play with.

So now our client side is able to call the API methods, and get a proper error if it calls them wrong.

But now that we have the forms ready, it doesn't take much to display them in web pages as well:

def _describe(self, method):
    "Return a dict describing an API method"
    ldesc = getattr(self.engine, method).__doc__.strip()
    sdesc = ldesc.split("\n")[0]
    return dict(name=method, sdesc = sdesc, ldesc = ldesc)

def index(self):
    "Show an index of exported API methods"
    methods = dict()
    for m in forms.keys():
        methods[m] = self._describe(m)
    return dict(methods=methods)

def testform(self, method, **kw):
    "Show a form with the parameters of an API method"
    kw["method"] = method
    return dict(method=method, action="/myapp/test/"+method, value=kw, info=self._describe(method), form=forms[method])

@validate(validator_dispatcher, error_handler=testform)
def test(self, method, **kw):
    "Run an API method and show its prettyprinted result"
    res = getattr(self, str(method))(**kw)
    return pprint.pformat(res)

In a few lines, we have all we need: an index of the API methods (including their documentation taken from the docstrings!), and for each method a form to invoke it and a page to see the results.

Make the forms children of AjaxForm, and you can even see the results together with the form.

It is possible to create process pipelines using subprocess.Popen, by just using stdout=subprocess.PIPE and stdin=otherproc.stdout.


In a pipeline created in this way, the stdout of all processes except the last is opened twice: once in the script that has run the subprocess and another time in the standard input of the next process in the pipeline.

This is a problem because if a process closes its stdin, the previous process in the pipeline does not get SIGPIPE when trying to write to its stdout, because that pipe is still open on the caller process. If this happens, a wait on that process will hang forever: the child process waits for the parent to read its stdout, the parent process waits for the child process to exit.

The trick is to close the stdout of each process in the pipeline except the last just after creating them:

# coding=utf-8

import subprocess

def pipe(*args):
    '''
    Takes as parameters several dicts, each with the same
    parameters passed to popen.

    Runs the various processes in a pipeline, connecting
    the stdout of every process except the last with the
    stdin of the next process.
    '''
    if len(args) < 2:
        raise ValueError("pipe needs at least 2 processes")
    # Set stdout=PIPE in every subprocess except the last
    for i in args[:-1]:
        i["stdout"] = subprocess.PIPE

    # Runs all subprocesses connecting stdins and stdouts to create the
    # pipeline. Closes stdouts to avoid deadlocks.
    popens = [subprocess.Popen(**args[0])]
    for i in range(1,len(args)):
        args[i]["stdin"] = popens[i-1].stdout
        popens.append(subprocess.Popen(**args[i]))
        # The next process now holds the pipe: drop our copy of it
        popens[i-1].stdout.close()

    # Returns the array of subprocesses just created
    return popens

At this point, it's nice to write a function that waits for the whole pipeline to terminate and returns an array of result codes:

def pipe_wait(popens):
    '''
    Given an array of Popen objects returned by the
    pipe method, wait for all processes to terminate
    and return the array with their return values.
    '''
    results = [0] * len(popens)
    while popens:
        last = popens.pop(-1)
        results[len(popens)] = last.wait()
    return results

And, lo and behold, we can now easily run a pipeline and get the return codes of every single process in it:

process1 = dict(args='sleep 1; grep line2 testfile', shell=True)
process2 = dict(args='awk \'{print $3}\'', shell=True)
process3 = dict(args='true', shell=True)
popens = pipe(process1, process2, process3)
result = pipe_wait(popens)
print(result)
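
For reference, here is a compact sketch of the same idea for Python 3, taking argument lists instead of dicts. The function name and the two small Python child processes are my own, chosen so that the example does not depend on shell tools:

```python
import subprocess
import sys


def run_pipeline(*commands):
    """Run the commands as a pipeline and return their exit codes, in order."""
    procs = []
    for i, cmd in enumerate(commands):
        stdin = procs[-1].stdout if procs else None
        stdout = subprocess.PIPE if i < len(commands) - 1 else None
        procs.append(subprocess.Popen(cmd, stdin=stdin, stdout=stdout))
        if stdin is not None:
            # The child has its own copy of the pipe: close ours, so that
            # the writer gets SIGPIPE if the reader goes away
            stdin.close()
    return [p.wait() for p in procs]


producer = [sys.executable, "-c", "print('line1'); print('line2')"]
consumer = [sys.executable, "-c",
            "import sys; sys.exit(0 if 'line2' in sys.stdin.read() else 1)"]
print(run_pipeline(producer, consumer))
```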

Update: Colin Watson suggests an improvement to compensate for Python's nonstandard SIGPIPE handling.

Colin Watson has a similar library for C.

How do you create a list of similar functions in Python?

As a simple example, let's say we want to create an array of 10 elements like this:

a[0] = lambda x: x
a[1] = lambda x: x+1
a[2] = lambda x: x+2
...
a[9] = lambda x: x+9

The obvious way is simple and straightforward...

>>> a = []
>>> for i in range(0,10): a.append(lambda x: x+i)

...but wrong:

>>> a[0](1)
10

What happened here? In Python, that lambda x: x+i uses the value that i will have when the function is invoked.

This is the trick to get it right:

>>> a = []
>>> for i in range(0,10): a.append(lambda x, i=i: x + i)
>>> a[0](1)
1

What happens here is explained in the section "A Jedi Mind Trick" of the Instant Python article: i=i assigns as the default value of the parameter i the current value of i.
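
Besides the default argument, two other common ways to freeze the current value are a factory function and functools.partial. A small sketch for Python 3 comparing all of them (the names are made up for the example):

```python
from functools import partial
from operator import add

# Every lambda looks up i when called, and finds its final value
late = [lambda x: x + i for i in range(10)]

# The default argument is evaluated when each lambda is created
defaults = [lambda x, i=i: x + i for i in range(10)]


def make_adder(n):
    # n is a fresh local variable for each call of make_adder
    return lambda x: x + n


factory = [make_adder(i) for i in range(10)]
partials = [partial(add, i) for i in range(10)]

print(late[0](1), defaults[0](1), factory[0](1), partials[0](1))
```

Only the first list gives the surprising result: the others all add 0 in their first element.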

Strangely enough the same article has "A Note About Python 2.1 and Nested Scopes" which seems to imply that from Python 2.2 the scoping has changed to "work as it should". I don't understand: the examples above are run on Python 2.4.4.

Googling for keywords related to python closure scoping only yields various sorts of complicated PEPs and an even uglier list trick:

a lot of people might not know about the trick of using a list to box variables within a closure.

Now I know about the trick, but I wish I didn't need to know :-(
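
For the record, this is roughly what the list trick looks like. A closure cannot rebind a variable of the enclosing function with a plain assignment, but it can modify an object that the variable refers to, so the value is "boxed" in a one-element list. The counter is just an illustration of mine; in Python 3 the nonlocal statement makes the trick unnecessary.

```python
def make_counter():
    count = [0]  # the list boxes the value so that the closure can change it

    def increment():
        count[0] += 1
        return count[0]

    return increment


counter = make_counter()
counter()
print(counter())
```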

Collecting strings from .kid files

tg-admin i18n collect won't collect strings from your .kid files: you need the toolbox web interface for that.

Indentation problems in .kid files

The toolbox web interface chokes on indentation errors in your .kid files.

To see the name of the .kid file that causes the error, look at the tg-admin toolbox output in the terminal for lines like Working on app/Foo/templates/bar.kid.

What happens is that the .kid files are converted to python files, and if there are indentation glitches they end up in the python files, and python will complain.

Once you see from the tg-admin toolbox standard error which .kid file has the problem, edit it and make sure that all closing tags are at exactly the same indentation level as their corresponding opening tags. Even a single space matters.

Bad i18n bug in TurboKid versions earlier than 1.0.1

faide on #turbogears also says:

It is of outmost importance that you use TurboKid 1.0.1 because it is the first version that corrects a BIG bug regarding i18n filters ...

The version below had a bug where the filters kept being added at each page load in such a way that after a few hundreds of pages you could have page loading times as long as 5 minutes!

If one has a previous version of TurboKid, one (and only one) of these is needed:

So, in short, all i18n users should upgrade to TurboGears or patch TurboKid using