Latest posts for tag python

A common logging pattern in Python is to have loggers named after the modules that use them:

import logging

log = logging.getLogger(__name__)


class Bill:
    def load_bill(self, filename: str):
        log.info("%s: loading file", filename)

However, I often find myself wanting loggers related to something context-dependent, like the kind of file being processed. For example, I'd like to log the loading of bills when it is done by the expenses module, but not when it is done by the printing module.

I came up with a little hack that keeps the same API as before, and allows a context-dependent logger to be propagated to the called code:

# Call this file log.py
from __future__ import annotations
import contextlib
import contextvars
import logging

_log: contextvars.ContextVar[logging.Logger] = contextvars.ContextVar('log', default=logging.getLogger())


@contextlib.contextmanager
def logger(name: str):
    """
    Set a default logger for the duration of this context manager
    """
    old = _log.set(logging.getLogger(name))
    try:
        yield
    finally:
        _log.reset(old)


def debug(*args, **kw):
    _log.get().debug(*args, **kw)


def info(*args, **kw):
    _log.get().info(*args, **kw)


def warning(*args, **kw):
    _log.get().warning(*args, **kw)


def error(*args, **kw):
    _log.get().error(*args, **kw)

And now I can do this:

from . import log

# …
    with log.logger("expenses"):
        bill = load_bill(filename)


# This code did not change!
class Bill:
    def load_bill(self, filename: str):
        log.info("%s: loading file", filename)

Suppose you have a tool that archives images or scientific data, and it has a test suite. It would be good to collect sample files for the test suite, but they are often so big that one can't really bloat the repository with them.

But does the test suite need everything that is in those files? Not necessarily. For example, if one is testing code that reads EXIF metadata, one doesn't care about what is in the image itself.
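
This is not mktestsample's actual code, but as a minimal sketch of the idea, assuming Pillow is available: keep the EXIF block, and replace the pixels with a flat image saved with the most aggressive settings:

from PIL import Image

def shrink_jpeg(path: str) -> None:
    with Image.open(path) as im:
        exif = im.info.get("exif", b"")
        size = im.size
    # Same size, all-black pixels: compresses to almost nothing
    blank = Image.new("RGB", size)
    blank.save(path, "JPEG", quality=1, subsampling=2, exif=exif)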

Blanking out the parts the tests don't need works extremely well. I can take GRIB files that are several megabytes in size, zero out their data payload, and get nice 1KB samples for the test suite.

I've started to collect and organise the little hacks I use for this into a tool I called mktestsample:

$ mktestsample -v samples1/*
2021-11-23 20:16:32 INFO common samples1/cosmo_2d+0.grib: size went from 335168b to 120b
2021-11-23 20:16:32 INFO common samples1/grib2_ifs.arkimet: size went from 4993448b to 39393b
2021-11-23 20:16:32 INFO common samples1/polenta.jpg: size went from 3191475b to 94517b
2021-11-23 20:16:32 INFO common samples1/test-ifs.grib: size went from 1986469b to 4860b

Those are massive savings, but I'm not satisfied with those almost 94KB of JPEG:

$ ls -la samples1/polenta.jpg
-rw-r--r-- 1 enrico enrico 94517 Nov 23 20:16 samples1/polenta.jpg
$ gzip samples1/polenta.jpg
$ ls -la samples1/polenta.jpg.gz
-rw-r--r-- 1 enrico enrico 745 Nov 23 20:16 samples1/polenta.jpg.gz

I believe I did all I could: completely blank out image data, set quality to zero, maximize subsampling, and tweak quantization to throw everything away.

Still, the result is a 94KB file that can be gzipped down to 745 bytes. Is there something I'm missing?

I suppose JPEG is better at storing an image than at storing the lack of an image. I cannot really complain :)

I can still commit compressed samples of large images to a git repository, and they take up very little space indeed. That's really nice!

help2man is quite nice for autogenerating manpages from command line help, making sure that they stay up to date as command line options evolve.

It works quite well, except for commands with subcommands, like Python programs that use argparse's add_subparsers.

So, here's a quick hack that calls help2man for each subcommand, and stitches everything together in a simple manpage.

#!/usr/bin/python3

import re
import shutil
import subprocess
import sys
import tempfile

# TODO: move to argparse
command = sys.argv[1]

# Use setup.py to get the program version
res = subprocess.run([sys.executable, "setup.py", "--version"], stdout=subprocess.PIPE, text=True, check=True)
version = res.stdout.strip()

# Call the main commandline help to get a list of subcommands
res = subprocess.run([sys.executable, command, "--help"], stdout=subprocess.PIPE, text=True, check=True)
subcommands = re.sub(r'^.+\{(.+)\}.+$', r'\1', res.stdout, flags=re.DOTALL).split(',')

# Generate a help2man --include file with an extra section for each subcommand
with tempfile.NamedTemporaryFile("wt") as tf:
    print("[>DESCRIPTION]", file=tf)

    for subcommand in subcommands:
        res = subprocess.run(
                ["help2man", f"--name={command}", "--section=1",
                 "--no-info", "--version-string=dummy", f"./{command} {subcommand}"],
                stdout=subprocess.PIPE, text=True, check=True)
        subcommand_doc = re.sub(r'^.+\.SH DESCRIPTION', '', res.stdout, flags=re.DOTALL)
        print(".SH ", subcommand.upper(), " SUBCOMMAND", file=tf)
        tf.write(subcommand_doc)

    with open(f"{command}.1.in", "rt") as fd:
        shutil.copyfileobj(fd, tf)

    tf.flush()

    # Call help2man on the main command line help, with the extra include file
    # we just generated
    subprocess.run(
            ["help2man", f"--include={tf.name}", f"--name={command}",
             "--section=1", "--no-info", f"--version-string={version}",
             f"--output={command}.1", f"./{command}"],
            check=True)
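
The subcommand list is scraped from argparse's main help output, whose usage line renders subparsers as {one,two,three}. A quick sanity check of that regex, on made-up help text:

import re

help_text = """usage: mycmd [-h] {process,dispatch,report} ...

positional arguments:
  {process,dispatch,report}
"""
subcommands = re.sub(r'^.+\{(.+)\}.+$', r'\1', help_text, flags=re.DOTALL).split(',')
print(subcommands)  # ['process', 'dispatch', 'report']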

I had to package a nontrivial Python codebase, and I needed to put dependencies in setup.py.

I could do git grep -h import | sort -u, then review the output by hand, but I lacked the motivation for it. Much better to take a stab at solving the general problem.

The result is at https://github.com/spanezz/python-devel-tools.

One fun part is scanning a directory tree, using ast to find import statements scattered around the code:

import ast
import logging
import os
import stat
from typing import Set, TextIO

# dirfd_open() (an fd-relative open() wrapper) and re_python_shebang (a
# regex matching Python shebang lines) are helpers defined elsewhere in
# the repository
log = logging.getLogger(__name__)


class Scanner:
    def __init__(self):
        self.names: Set[str] = set()

    def scan_dir(self, root: str):
        for dirpath, dirnames, filenames, dir_fd in os.fwalk(root):
            for fn in filenames:
                if fn.endswith(".py"):
                    with dirfd_open(fn, dir_fd=dir_fd) as fd:
                        self.scan_file(fd, os.path.join(dirpath, fn))
                st = os.stat(fn, dir_fd=dir_fd)
                if st.st_mode & (stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH):
                    with dirfd_open(fn, dir_fd=dir_fd) as fd:
                        try:
                            lead = fd.readline()
                        except UnicodeDecodeError:
                            continue
                        if re_python_shebang.match(lead):
                            fd.seek(0)
                            self.scan_file(fd, os.path.join(dirpath, fn))

    def scan_file(self, fd: TextIO, pathname: str):
        log.info("Reading file %s", pathname)
        try:
            tree = ast.parse(fd.read(), pathname)
        except SyntaxError as e:
            log.warning("%s: file cannot be parsed", pathname, exc_info=e)
            return

        self.scan_tree(tree)

    def scan_tree(self, tree: ast.AST):
        for stm in tree.body:
            if isinstance(stm, ast.Import):
                for alias in stm.names:
                    if not isinstance(alias.name, str):
                        # Defensive check: alias.name should always be a str
                        print("NAME", repr(alias.name), stm)
                    self.names.add(alias.name)
            elif isinstance(stm, ast.ImportFrom):
                if stm.module is not None:
                    self.names.add(stm.module)
            elif hasattr(stm, "body"):
                self.scan_tree(stm)

Another fun part is grouping the imported module names by where in sys.path they have been found:

    scanner = Scanner()
    scanner.scan_dir(args.dir)

    sys.path.append(args.dir)
    by_sys_path: Dict[str, List[str]] = collections.defaultdict(list)
    for name in sorted(scanner.names):
        spec = importlib.util.find_spec(name)
        if spec is None or spec.origin is None:
            by_sys_path[""].append(name)
        else:
            for sp in sys.path:
                if spec.origin.startswith(sp):
                    by_sys_path[sp].append(name)
                    break
            else:
                by_sys_path[spec.origin].append(name)

    for sys_path, names in sorted(by_sys_path.items()):
        print(f"{sys_path or 'unidentified'}:")
        for name in names:
            print(f"  {name}")

Here's an example run. It's kind of nice how it can at least tell stdlib modules apart, so one doesn't need to read through those:

$ ./scan-imports …/himblick
unidentified:
  changemonitor
  chroot
  cmdline
  mediadir
  player
  server
  settings
  static
  syncer
  utils
…/himblick:
  himblib.cmdline
  himblib.host_setup
  himblib.player
  himblib.sd
/usr/lib/python3.9:
  __future__
  argparse
  asyncio
  collections
  configparser
  contextlib
  datetime
  io
  json
  logging
  mimetypes
  os
  pathlib
  re
  secrets
  shlex
  shutil
  signal
  subprocess
  tempfile
  textwrap
  typing
/usr/lib/python3/dist-packages:
  asyncssh
  parted
  progressbar
  pyinotify
  setuptools
  tornado
  tornado.escape
  tornado.httpserver
  tornado.ioloop
  tornado.netutil
  tornado.web
  tornado.websocket
  yaml
built-in:
  sys
  time

Maybe such a tool already exists and works much better than this? From a quick search I didn't find it, and it was fun to (re)invent it.

Updates:

Jakub Wilk pointed me to an old python-modules script that finds Debian dependencies.

The AST scanning code should be refactored to use ast.NodeVisitor.
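
For reference, a minimal sketch of what that refactoring might look like:

import ast
from typing import Set

class ImportVisitor(ast.NodeVisitor):
    """Collect imported module names from a parsed source tree."""

    def __init__(self) -> None:
        self.names: Set[str] = set()

    def visit_Import(self, node: ast.Import) -> None:
        for alias in node.names:
            self.names.add(alias.name)

    def visit_ImportFrom(self, node: ast.ImportFrom) -> None:
        if node.module is not None:
            self.names.add(node.module)

visitor = ImportVisitor()
visitor.visit(ast.parse("import os\nfrom typing import Set"))
print(visitor.names)  # {'os', 'typing'}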

Here's a little toy program that displays a message like a split-flap display:

#!/usr/bin/python3

import sys
import time

def display(line: str):
    cur = '0' * len(line)
    while True:
        print(cur, end="\r")
        if cur == line:
            break
        time.sleep(0.09)
        cur = "".join(chr(min(ord(c) + 1, ord(oc))) for c, oc in zip(cur, line))
    print()

message = " ".join(sys.argv[1:])
display(message.upper())

This only works if the script's stdout is unbuffered or going to a terminal. Pipe the output through cat, and you get a long wait, and then the final string, without the animation.

What happens is that when the output is not going to a terminal, optimizations kick in: output is buffered and sent in bigger chunks, to make bulk I/O more efficient.
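
One can check at runtime which case applies:

import sys

if sys.stdout.isatty():
    print("stdout goes to a terminal: output is line-buffered")
else:
    print("stdout goes to a pipe or file: output is block-buffered")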

I haven't found a good introductory explanation of buffering in Python's documentation. The details seem to be scattered in the io module documentation and they mostly assume that one is already familiar with concepts like unbuffered, line-buffered or block-buffered. The libc documentation has a good quick introduction that one can read to get up to speed.

Controlling buffering in Python

In Python, one can force a buffer flush with the flush() method of the output file object, like sys.stdout.flush(), to make sure pending buffered output gets sent.

Python's print() function also supports flush=True as an optional argument:

    print(cur, end="\r", flush=True)

If one wants to change the default buffering for a file object, since Python 3.7 there's a convenient reconfigure() method, which can at least turn on line buffering:

sys.stdout.reconfigure(line_buffering=True)

Otherwise, the technique is to reassign sys.stdout to something that has the behaviour one wants (code from this StackOverflow thread):

import io
import sys
# Python 3, open as binary, then wrap in a TextIOWrapper with write-through.
sys.stdout = io.TextIOWrapper(open(sys.stdout.fileno(), 'wb', 0), write_through=True)

If one needs all this to implement a progressbar, one should make sure to have a look at the progressbar module first.

I'm too lazy to manually check code blocks in autogenerated sphinx documentation to see if they are valid and reasonably up to date. Doing it automatically feels much more interesting to me: here's how I did it.

(continue reading)

Python comes with a built-in way of collecting profile information, documented at https://docs.python.org/3/library/profile.html.

In a nutshell, it boils down to:

python3 -m cProfile -o profile.out command.py arguments…

This post is an attempt to document what's in profile.out, since the Python documentation does not cover it. I have tried to figure this out by looking at the sources of the pstats module and of gprof2dot, and by trying to make sense of, and navigate, the decoded structure.
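
Before digging into the raw format, note that the pstats module can already load and summarize the file:

import pstats

stats = pstats.Stats("profile.out")
stats.sort_stats("cumulative").print_stats(10)  # top 10 by cumulative time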

(continue reading)

A little gitpython recipe to list the paths of all files in a commit:

#!/usr/bin/python3

import git
from pathlib import Path
import sys


def list_paths(root_tree, path=Path(".")):
    for blob in root_tree.blobs:
        yield path / blob.name
    for tree in root_tree.trees:
        yield from list_paths(tree, path / tree.name)


repo = git.Repo(".", search_parent_directories=True)
commit = repo.commit(sys.argv[1])
for path in list_paths(commit.tree):
    print(path)

It can be a good base, for example, for writing a script that, given two git branches, shows which Django migrations are in one and not in the other, without doing any git checkout of the code; see the sketch below.
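
A hypothetical sketch of that idea, reusing list_paths from above (the branch names are made up):

def migration_paths(commit):
    """Return the set of Django migration files in a commit."""
    return {
        str(p) for p in list_paths(commit.tree)
        if "/migrations/" in str(p) and str(p).endswith(".py")
    }

main = migration_paths(repo.commit("main"))
feature = migration_paths(repo.commit("feature"))
print("only in feature:", sorted(feature - main))
print("only in main:", sorted(main - feature))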

One piece of software I maintain for work is a GUI data browser that uses Tornado as a backend and a web browser as a front-end.

It is quite convenient to start the command and have the browser open automatically on the right URL. It's quite annoying to start the command and be told that the default port is already in use.

I've needed this trick quite often, including when writing unit tests, and it's time I noted it down somewhere, so it's easier to find than digging through Tornado's unittest code, where I found it the first time.

This is how to start Tornado on a free random port (the snippet comes from a method, hence the self references):

from tornado.options import define, options
import tornado.netutil
import tornado.httpserver

define("web_port", type=int, default=None, help="listening port for web interface")

application = Application(self.db_url)

if options.web_port is None:
    sockets = tornado.netutil.bind_sockets(0, '127.0.0.1')
    self.web_port = sockets[0].getsockname()[1]
    server = tornado.httpserver.HTTPServer(application)
    server.add_sockets(sockets)
else:
    server = tornado.httpserver.HTTPServer(application)
    server.listen(options.web_port)
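
Once the port is known, opening the browser on the right URL is a one-liner; a sketch using the standard library's webbrowser module, reusing the names from the snippet above:

import webbrowser

port = self.web_port if options.web_port is None else options.web_port
webbrowser.open(f"http://127.0.0.1:{port}/")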