Latest posts for tag python

A little gitpython recipe to list the paths of all files in a commit:

#!/usr/bin/python3

import git
from pathlib import Path
import sys


def list_paths(root_tree, path=Path(".")):
    for blob in root_tree.blobs:
        yield path / blob.name
    for tree in root_tree.trees:
        yield from list_paths(tree, path / tree.name)


repo = git.Repo(".", search_parent_directories=True)
commit = repo.commit(sys.argv[1])
for path in list_paths(commit.tree):
    print(path)

It can be a good base, for example, for a script that, given two git branches, shows which Django migrations exist in one but not in the other, without checking out any code.
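
Here is a minimal sketch of that idea, reusing the list_paths() function above. The branch names come from the command line, and filtering on a "migrations" path component is an assumption about the project layout:

import sys
import git

repo = git.Repo(".", search_parent_directories=True)
branch_a, branch_b = sys.argv[1], sys.argv[2]

# Collect the migration files reachable from each branch, without checking anything out
paths_a = {p for p in list_paths(repo.commit(branch_a).tree) if "migrations" in p.parts}
paths_b = {p for p in list_paths(repo.commit(branch_b).tree) if "migrations" in p.parts}

print("Only in", branch_a)
for p in sorted(paths_a - paths_b):
    print("  ", p)

print("Only in", branch_b)
for p in sorted(paths_b - paths_a):
    print("  ", p)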

One of the pieces of software I maintain for work is a GUI data browser that uses Tornado as a backend and a web browser as a front-end.

It is quite convenient to start the command and have the browser open automatically on the right URL. It's quite annoying to start the command and be told that the default port is already in use.

I've needed this trick quite often, also when writing unit tests, and it's time I note it down somewhere, so it's easier to find than going through Tornado's unittest code where I found it the first time.

This is how to start Tornado on a free random port:

from tornado.options import define, options
import tornado.netutil
import tornado.httpserver

define("web_port", type=int, default=None, help="listening port for web interface")

# Application is this program's own tornado.web.Application subclass; the
# snippet uses self because it is excerpted from a method of the server object
application = Application(self.db_url)

if options.web_port is None:
    # Asking for port 0 makes the kernel pick a free port; getsockname()
    # on the bound socket tells us which port we actually got
    sockets = tornado.netutil.bind_sockets(0, '127.0.0.1')
    self.web_port = sockets[0].getsockname()[:2][1]
    server = tornado.httpserver.HTTPServer(application)
    server.add_sockets(sockets)
else:
    server = tornado.httpserver.HTTPServer(application)
    server.listen(options.web_port)
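
For unit tests, Tornado wraps the same trick in a helper, tornado.testing.bind_unused_port(), which returns the bound socket together with the port the kernel picked. A minimal sketch with a placeholder Application:

import tornado.httpserver
import tornado.ioloop
import tornado.testing
import tornado.web

application = tornado.web.Application([])  # placeholder app for illustration

sock, port = tornado.testing.bind_unused_port()
server = tornado.httpserver.HTTPServer(application)
server.add_sockets([sock])
print("listening on http://127.0.0.1:%d/" % port)
tornado.ioloop.IOLoop.current().start()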

I am writing a little application server for microservices written as compiled binaries, and I would like to log execution statistics from getrusage(2).

The application server is written using asyncio, and processes are managed using asyncio subprocesses.

Unfortunately, asyncio uses os.waitpid instead of os.wait4 to reap child processes, so to get rusage information one has to delve into asyncio's innards and provide a custom ChildWatcher implementation. Here's how I did it:

import asyncio
from asyncio.log import logger
from contextlib import contextmanager
import os


class ExtendedResults:
    def __init__(self):
        self.rusage = None
        self.returncode = None


class SafeChildWatcherWithRusage(asyncio.SafeChildWatcher):
    """
    SafeChildWatcher that uses os.wait4 to also get rusage information.
    """
    rusage_results = {}

    @classmethod
    @contextmanager
    def monitor(cls, proc):
        """
        Return an ExtendedResults that gets filled when the process exits
        """
        assert proc.pid > 0
        pid = proc.pid
        extended_results = ExtendedResults()
        cls.rusage_results[pid] = extended_results
        try:
            yield extended_results
        finally:
            cls.rusage_results.pop(pid, None)

    def _do_waitpid(self, expected_pid):
        # The original is in asyncio/unix_events.py; on new python versions, it
        # makes sense to check changes to it and port them here
        assert expected_pid > 0

        try:
            pid, status, rusage = os.wait4(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = self._compute_returncode(status)
            if self._loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        extended_results = self.rusage_results.get(pid)
        if extended_results is not None:
            extended_results.rusage = rusage
            extended_results.returncode = returncode

        try:
            callback, args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            if self._loop.get_debug():
                logger.warning("Child watcher got an unexpected pid: %r",
                               pid, exc_info=True)
        else:
            callback(pid, returncode, *args)

    @classmethod
    def install(cls):
        loop = asyncio.get_event_loop()
        child_watcher = cls()
        child_watcher.attach_loop(loop)
        asyncio.set_child_watcher(child_watcher)

To use it:

from .hacks import SafeChildWatcherWithRusage
SafeChildWatcherWithRusage.install()

...

    @coroutine
    def run(self, *args, **kw):
        kw["stdin"] = asyncio.subprocess.PIPE
        kw["stdout"] = asyncio.subprocess.PIPE
        kw["stderr"] = asyncio.subprocess.PIPE
        self.started = time.time()

        self.proc = yield from asyncio.create_subprocess_exec(*args, **kw)

        from .hacks import SafeChildWatcherWithRusage
        with SafeChildWatcherWithRusage.monitor(self.proc) as results:
            yield from asyncio.tasks.gather(
                self.write_stdin(self.proc.stdin),
                self.read_stdout(self.proc.stdout),
                self.read_stderr(self.proc.stderr)
            )
        self.returncode = yield from self.proc.wait()
        self.rusage = results.rusage
        self.ended = time.time()
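
Once run() has completed, the collected statistics can be inspected. A minimal sketch, where proc_runner stands for whatever object the run() coroutine above belongs to, and the field names are those of resource.struct_rusage as returned by os.wait4:

# proc_runner is hypothetical: the object whose run() coroutine was awaited
rusage = proc_runner.rusage
print("wall clock: %.3fs" % (proc_runner.ended - proc_runner.started))
print("user CPU:   %.3fs" % rusage.ru_utime)
print("system CPU: %.3fs" % rusage.ru_stime)
print("max RSS:    %d KiB" % rusage.ru_maxrss)  # kilobytes on Linux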

Python mailbox.mbox is not good at opening compressed mailboxes:

>>> import mailbox
>>> print(len(mailbox.mbox("/tmp/test.mbox")))
9
>>> print(len(mailbox.mbox("/tmp/test.mbox.gz")))
0
>>> print(len(mailbox.mbox("/tmp/test1.mbox.xz")))
0

For a prototype rewrite of the MIA team's Echelon (the engine behind mia-query), I needed to scan compressed mailboxes, and I had to work around this limitation.

Here is the alternative mailbox.mbox implementation:

import lzma
import gzip
import bz2
import mailbox
from pathlib import Path
from typing import BinaryIO, Generator

# ScannedEmail is an application-specific result type, defined elsewhere


class StreamMbox(mailbox.mbox):
    """
    mailbox.mbox does not support opening a stream, which is sad.

    This is a subclass that works around it
    """
    def __init__(self, fd: BinaryIO, factory=None, create: bool = True):
        # Do not call parent __init__, just redo everything here to be able to
        # open a stream. This will need to be re-reviewed for every new version
        # of python's stdlib.

        # Mailbox constructor
        self._path = None
        self._factory = factory

        # _singlefileMailbox constructor
        self._file = fd
        self._toc = None
        self._next_key = 0
        self._pending = False       # No changes require rewriting the file.
        self._pending_sync = False  # No need to sync the file
        self._locked = False
        self._file_length = None    # Used to record mailbox size

        # mbox constructor
        self._message_factory = mailbox.mboxMessage

    def flush(self):
        raise NotImplementedError("StreamMbox is a readonly class")


class UsageExample:
    DECOMPRESS = {
        ".xz": lzma.open,
        ".gz": gzip.open,
        ".bz2": bz2.open,
    }

    @classmethod
    def scan(cls, path: Path) -> Generator[ScannedEmail, None, None]:
        decompress = cls.DECOMPRESS.get(path.suffix)
        if decompress is None:
            with open(path.as_posix(), "rb") as fd:
                yield from cls.scan_fd(path, fd)
        else:
            with decompress(path.as_posix(), "rb") as fd:
                yield from cls.scan_fd(path, fd)

    @classmethod
    def scan_fd(cls, path: Path, fd: BinaryIO) -> Generator[ScannedEmail, None, None]:
        mbox = StreamMbox(fd)
        for msg in mbox:
            ...
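
As a quick check, StreamMbox can also be used directly to repeat the len() experiment from the top of this post on a compressed mailbox (the file name is the same hypothetical test file):

import gzip

with gzip.open("/tmp/test.mbox.gz", "rb") as fd:
    mbox = StreamMbox(fd)
    print(len(mbox))  # now reports the real number of messages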

This code:

#!/usr/bin/python3

class Test:
    def __init__(self, items=[]):
        self.items = items

    def add(self, item):
        self.items.append(item)


a = Test()
a.add("foo")
b = Test()
b.add("bar")
print(repr(a.items))
print(repr(b.items))

"obviously" prints:

['foo', 'bar']
['foo', 'bar']

Because the default value of the items argument is a mutable list, constructed just once when the function definition is evaluated (not on each call), and then reused by every call that relies on the default.

So, in Python, mutable default argument values are a good way to get some extra fun out of debugging.
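
The usual way out is to default to None and create the list inside the function, so that each instance gets its own:

class Test:
    def __init__(self, items=None):
        # Create a fresh list per instance instead of sharing one default
        self.items = items if items is not None else []

    def add(self, item):
        self.items.append(item)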

I am exposing some library functions using a TurboGears2 controller (see Building a web-based API with Turbogears2). It turns out that some functions return a dict, some a list, some a string, and TurboGears 2 only allows JSON serialisation for dicts.

A simple work-around for this is to wrap the function result into a dict, something like this:

@expose("json")
@validate(validator_dispatcher, error_handler=api_validation_error)
def list_colours(self, filter=None, productID=None, maxResults=100, **kw):
    # Call API
    res = self.engine.list_colours(filter, productID, maxResults)

    # Return result
    return dict(r=res)

It would be nice, however, to have an @webapi() decorator that automatically wraps the function result with the dict:

def webapi(func):
    def dict_wrap(*args, **kw):
        return dict(r=func(*args, **kw))
    return dict_wrap

# ...in the controller...

    @expose("json")
    @validate(validator_dispatcher, error_handler=api_validation_error)
    @webapi
    def list_colours(self, filter=None, productID=None, maxResults=100, **kw):
        # Call API
        res = self.engine.list_colours(filter, productID, maxResults)

        # Return result
        return res

This works, as long as @webapi appears last in the list of decorators (that is, closest to the function): decorators are applied bottom-up, so @webapi wraps the bare function first and does not interfere with the tg.decorators machinery.
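
In other words, the stacked version above expands roughly to:

# Equivalent expansion: @webapi wraps the bare function first, and the
# TurboGears decorators then see (and annotate) dict_wrap instead
list_colours = expose("json")(
    validate(validator_dispatcher, error_handler=api_validation_error)(
        webapi(list_colours)
    )
)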

Would it be possible to create a decorator that can be put anywhere among the decorator list? Yes, it is possible but tricky, and it gives me the feeling that it may break in any future version of TurboGears:

class webapi(object):
    def __call__(self, func):
        def dict_wrap(*args, **kw):
            return dict(r=func(*args, **kw))
        # Migrate the decoration attribute to our new function
        if hasattr(func, 'decoration'):
            dict_wrap.decoration = func.decoration
            dict_wrap.decoration.controller = dict_wrap
            delattr(func, 'decoration')
        return dict_wrap

# ...in the controller...

    @expose("json")
    @validate(validator_dispatcher, error_handler=api_validation_error)
    @webapi()
    def list_colours(self, filter=None, productID=None, maxResults=100, **kw):
        # Call API
        res = self.engine.list_colours(filter, productID, maxResults)

        # Return result
        return res

As a convenience, TurboGears 2 offers, in the decorators module, a way to build decorator "hooks":

class before_validate(_hook_decorator):
    '''A list of callables to be run before validation is performed'''
    hook_name = 'before_validate'

class before_call(_hook_decorator):
    '''A list of callables to be run before the controller method is called'''
    hook_name = 'before_call'

class before_render(_hook_decorator):
    '''A list of callables to be run before the template is rendered'''
    hook_name = 'before_render'

class after_render(_hook_decorator):
    '''A list of callables to be run after the template is rendered.

    Will be run before it is returned up the WSGI stack'''

    hook_name = 'after_render'

The way these are invoked can be found in the _perform_call function in tg/controllers.py.

To show an example use of those hooks, let's add some polygen wisdom to every data structure we return:

class wisdom(decorators.before_render):
    def __init__(self, grammar):
        super(wisdom, self).__init__(self.add_wisdom)
        self.grammar = grammar
    def add_wisdom(self, remainder, params, output):
        from subprocess import Popen, PIPE
        output["wisdom"] = Popen(["polyrun", self.grammar], stdout=PIPE).communicate()[0]

# ...in the controller...

    @wisdom("genius")
    @expose("json")
    @validate(validator_dispatcher, error_handler=api_validation_error)
    def list_colours(self, filter=None, productID=None, maxResults=100, **kw):
        # Call API
        res = self.engine.list_colours(filter, productID, maxResults)

        # Return result
        return res

These hooks cannot, however, be used for what I need, which is to wrap the result inside a dict. The reason is that they are called like this:

        controller.decoration.run_hooks(
                'before_render', remainder, params, output)

and not in this way:

        output = controller.decoration.run_hooks(
                'before_render', remainder, params, output)

So it is possible to modify the output (if it is a mutable structure) but not to exchange it with something else.

Can we do even better? Sure we can. We can assimilate @expose and @validate inside @webapi to avoid repeating the same stack of decorator lines over and over again:

class webapi(object):
    def __init__(self, error_handler = None):
        self.error_handler = error_handler

    def __call__(self, func):
        def dict_wrap(*args, **kw):
            return dict(r=func(*args, **kw))
        res = expose("json")(dict_wrap)
        res = validate(validator_dispatcher, error_handler=self.error_handler)(res)
        return res

# ...in the controller...

    @expose("json")
    def api_validation_error(self, **kw):
        pylons.response.status = "400 Error"
        return dict(e="validation error on input fields", form_errors=pylons.c.form_errors)

    @webapi(error_handler=api_validation_error)
    def list_colours(self, filter=None, productID=None, maxResults=100, **kw):
        # Call API
        res = self.engine.list_colours(filter, productID, maxResults)

        # Return result
        return res

This gets rid of @expose and @validate, and provides almost all the defaults I need. Unfortunately I could not find a way to access api_validation_error from inside the decorator so that I can pass it to the validator, so I am left with the inconvenience of having to pass it explicitly every time.

I am using TurboGears2 to export a Python API over the web. Every API method is wrapped by a controller method that validates the parameters and returns the results encoded in JSON.

The basic idea is this:

@expose("json")
def list_colours(self, filter=None, productID=None, maxResults=100, **kw):
    # Call API
    res = self.engine.list_colours(filter, productID, maxResults)

    # Return result
    return res

To validate the parameters we can use forms; it's their job after all:

class ListColoursForm(TableForm):
    fields = [
            # One field per parameter
            twf.TextField("filter", help_text="Please enter the string to use as a filter"),
            twf.TextField("productID", help_text="Please enter the product ID"),
            twf.TextField("maxResults", validator=twfv.Int(min=0), default=200, size=5, help_text="Please enter the maximum number of results"),
    ]
list_colours_form=ListColoursForm()

#...

    @expose("json")
    @validate(list_colours_form, error_handler=list_colours_validation_error)
    def list_colours(self, filter=None, productID=None, maxResults=100, **kw):
        # Parameter validation is done by the form

        # Call API
        res = self.engine.list_colours(filter, productID, maxResults)

        # Return result
        return res

All straightforward so far. However, this means that we need two exposed methods for every API call: one for the call itself and one for its error handler. For every API call we have to type the name several times, which is error-prone and risks getting things mixed up.

We can however have a single error handler for all methods:

def get_method():
    '''
    The method name is the first url component after the controller name that
    does not start with 'test'
    '''
    found_controller = False
    for name in pylons.c.url.split("/"):
        if not found_controller and name == "controllername":
            found_controller = True
            continue
        if name.startswith("test"):
            continue
        if found_controller:
            return name
    return None

class ValidatorDispatcher:
    '''
    Validate using the right form according to the value of the "method" field
    '''
    def validate(self, args, state):
        method = args.get("method", None)
        # Extract the method from the URL if it is missing
        if method is None:
            method = get_method()
            args["method"] = method
        return forms[method].validate(args, state)

validator_dispatcher = ValidatorDispatcher()

This validator will try to find the method name, either as a form field or by parsing the URL. It will then use the method name to look up the form to validate with, and pass control to that form's validate method. For example, with a controller mounted at /controllername, a request to /controllername/test/list_colours is validated with the list_colours form.

We then need to add an extra "method" field to our forms, and arrange the forms inside a dictionary:

class ListColoursForm(TableForm):
    fields = [
            # One hidden field to have a place for the method name
            twf.HiddenField("method"),
            # One field per parameter
            twf.TextField("filter", help_text="Please enter the string to use as a filter"),
            #...

forms["list_colours"] = ListColoursForm()

And now our methods become much nicer to write:

    @expose("json")
    def api_validation_error(self, **kw):
        pylons.response.status = "400 Error"
        return dict(form_errors=pylons.c.form_errors)

    @expose("json")
    @validate(validator_dispatcher, error_handler=api_validation_error)
    def list_colours(self, filter=None, productID=None, maxResults=100, **kw):
        # Parameter validation is done by the form

        # Call API
        res = self.engine.list_colours(filter, productID, maxResults)

        # Return result
        return res

api_validation_error is interesting: it returns a proper HTTP error status, and a JSON body with the details of the error, taken straight from the form validators. It took me a while to find out that the form errors are in pylons.c.form_errors (and for reference, the form values are in pylons.c.form_values). pylons.response is a WebOb Response that we can play with.
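
For example, calling list_colours with a non-numeric maxResults gets a 400 response whose JSON body corresponds to a dict along these lines (the exact message text is whatever the Int validator produces):

# Hypothetical error payload for a request with maxResults="many"
dict(form_errors={"maxResults": "Please enter an integer value"})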

So now our client side is able to call the API methods, and get a proper error if it calls them wrong.

But now that we have the forms ready, it doesn't take much to display them in web pages as well:

def _describe(self, method):
    "Return a dict describing an API method"
    ldesc = getattr(self.engine, method).__doc__.strip()
    sdesc = ldesc.split("\n")[0]
    return dict(name=method, sdesc = sdesc, ldesc = ldesc)

@expose("myappserver.templates.myappapi")
def index(self):
    '''
    Show an index of exported API methods
    '''
    methods = dict()
    for m in forms.keys():
        methods[m] = self._describe(m)
    return dict(methods=methods)

@expose('myappserver.templates.testform')
def testform(self, method, **kw):
    '''
    Show a form with the parameters of an API method
    '''
    kw["method"] = method
    return dict(method=method, action="/myapp/test/"+method, value=kw, info=self._describe(method), form=forms[method])

@expose(content_type="text/plain")
@validate(validator_dispatcher, error_handler=testform)
def test(self, method, **kw):
    '''
    Run an API method and show its prettyprinted result
    '''
    res = getattr(self, str(method))(**kw)
    return pprint.pformat(res)

In a few lines, we have all we need: an index of the API methods (including their documentation taken from the docstrings!), and for each method a form to invoke it and a page to see the results.

Make the forms children of AjaxForm, and you can even see the results together with the form.

It is possible to create process pipelines using subprocess.Popen, by just using stdout=subprocess.PIPE and stdin=otherproc.stdout.

Almost.

In a pipeline created this way, the stdout of every process except the last is open twice: once in the script that spawned the subprocesses, and once as the standard input of the next process in the pipeline.

This is a problem because, if a process closes its stdin, the previous process in the pipeline does not get SIGPIPE when trying to write to its stdout, because that pipe is still open in the calling process. When that happens, a wait on that process hangs forever: the child waits for the parent to read its stdout, and the parent waits for the child to exit.
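
Here is a minimal sketch of the failure mode, using yes and head as stand-ins for real processes:

import subprocess

# The parent keeps its copy of p1.stdout open, so even after head exits
# the pipe never closes on the read side: yes eventually blocks writing
# into a full pipe buffer, and p1.wait() never returns.
p1 = subprocess.Popen(["yes"], stdout=subprocess.PIPE)
p2 = subprocess.Popen(["head", "-n", "1"], stdin=p1.stdout, stdout=subprocess.PIPE)
print(p2.communicate()[0])
p1.wait()  # hangs forever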

The trick is to close the stdout of each process in the pipeline except the last just after creating them:

#!/usr/bin/python
# coding=utf-8

import subprocess

def pipe(*args):
    '''
    Takes as parameters several dicts, each with the same
    parameters passed to popen.

    Runs the various processes in a pipeline, connecting
    the stdout of every process except the last with the
    stdin of the next process.
    '''
    if len(args) < 2:
        raise ValueError("pipe needs at least 2 processes")
    # Set stdout=PIPE in every subprocess except the last
    for i in args[:-1]:
        i["stdout"] = subprocess.PIPE

    # Runs all subprocesses connecting stdins and stdouts to create the
    # pipeline. Closes stdouts to avoid deadlocks.
    popens = [subprocess.Popen(**args[0])]
    for i in range(1,len(args)):
        args[i]["stdin"] = popens[i-1].stdout
        popens.append(subprocess.Popen(**args[i]))
        popens[i-1].stdout.close()

    # Returns the array of subprocesses just created
    return popens

At this point, it's nice to write a function that waits for the whole pipeline to terminate and returns an array of result codes:

def pipe_wait(popens):
    '''
    Given an array of Popen objects returned by the
    pipe method, wait for all processes to terminate
    and return the array with their return values.
    '''
    results = [0] * len(popens)
    while popens:
        last = popens.pop(-1)
        results[len(popens)] = last.wait()
    return results

And, lo and behold, we can now easily run a pipeline and get the return codes of every single process in it:

process1 = dict(args='sleep 1; grep line2 testfile', shell=True)
process2 = dict(args='awk \'{print $3}\'', shell=True)
process3 = dict(args='true', shell=True)
popens = pipe(process1, process2, process3)
result = pipe_wait(popens)
print(result)

Update: Colin Watson suggests an improvement to compensate for Python's nonstandard SIGPIPE handling.
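
(For reference: Python sets SIGPIPE to "ignore" at startup, and exec'd children inherit that disposition, so they see EPIPE write errors instead of being killed by the signal. The usual compensation, presumably the gist of that improvement, is to restore the default handler in the child before exec; a hedged sketch:)

import signal
import subprocess

def restore_sigpipe():
    # Children inherit Python's "ignore SIGPIPE" setting across exec;
    # put the default disposition back so they die cleanly on a broken pipe
    signal.signal(signal.SIGPIPE, signal.SIG_DFL)

proc = subprocess.Popen(["grep", "line2", "testfile"],
                        stdout=subprocess.PIPE,
                        preexec_fn=restore_sigpipe)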

Colin Watson has a similar library for C.

Suppose you have a User that can be a member of a Company. In SQLObject you model it something like this:

    class Company(SQLObject):
        name = UnicodeCol(length=16, alternateID=True, alternateMethodName="by_name")
        display_name = UnicodeCol(length=255)

    class User(InheritableSQLObject):
        company = ForeignKey("Company", notNull=False, cascade='null')

Then you want to make a form that lets you choose which company a user belongs to:

def companies():
    return [ [ -1, 'None' ] ] + [ [c.id, c.display_name] for c in Company.select() ]

class NewUserFields(WidgetsList):
    """Fields for editing general settings"""
    user_name = TextField(label="User name")
    companyID = SingleSelectField(label="Company", options=companies)

Ok. Now you want to run tests:

  1. nosetests imports the controller to see if there's any initialisation code.
  2. The NewUserFields class is created.
  3. The SingleSelectField is created.
  4. The SingleSelectField constructor tries to guess the validator and peeks at the first option.
  5. This calls companies.
  6. companies accesses the database.
  7. The testing database has not yet been created because nosetests imported the module before giving the test code a chance to set up the test database.
  8. Bang.

The solution is to add an explicit validator, which disables this guessing code that is the source of so much trouble:

class NewUserFields(WidgetsList):
    """Fields for editing general settings"""
    user_name = TextField(label="User name")
    companyID = SingleSelectField(label="Company", options=companies, validator=v.Int(not_empty=True))

In case your RemoteForm mysteriously behaves like a normal HTTP form, refreshing the page on submit, and the only hint that something is wrong is this bit in Iceweasel's error console:

Errore: uncaught exception: [Exception... "Component returned failure
code: 0x80070057 (NS_ERROR_ILLEGAL_VALUE) [nsIXMLHttpRequest.open]"
nsresult: "0x80070057 (NS_ERROR_ILLEGAL_VALUE)"  location: "JS frame ::
javascript: eval(__firebugTemp__); :: anonymous :: line 1"  data: no]

the problem may simply be a missing action= attribute on the form.

I found out after:

  1. reading the TurboGears remoteform wiki: "For some reason, the RemoteForm is acting like a regular html form, serving up a new page instead of performing the replacements we're looking for. I'll update this page as soon as I figure out why this is happening."

  2. finding this page on Google and meditating for a while while staring at it. I don't speak German, but often enough I manage to solve problems after meditating over Google results in all sorts of languages unknown or unreadable to me. I will call this practice Webomancy.