Playing with python, terminfo and command output

I am experimenting with showing progress on the terminal for a subcommand that is being run, showing what is happening without scrolling away the output of the main program, and I came up with this little toy. It shows the last X lines of a subcommand's output, then gets rid of everything after the command has ended.

Usability-wise, it feels like a tease to me: it looks like I'm being shown all sorts of information, which is then taken away from me before I have managed to make sense of it. However, I find it cute enough to share:

#!/usr/bin/env python3
#coding: utf-8
# Copyright 2015 Enrico Zini <enrico@enricozini.org>.  Licensed under the terms
# of the GNU General Public License, version 2 or any later version.

import argparse
import fcntl
import select
import curses
import contextlib
import subprocess
import os
import sys
import collections
import shlex
import shutil
import logging

def stream_output(proc):
    """
    Multiplex the output of a subprocess, line by line.

    Take a subprocess.Popen object whose stdout and stderr are pipes and
    generate ("stdout", line) or ("stderr", line) pairs as output arrives.
    Each line is a bytes object without its trailing newline. If a pipe
    reaches end of file with an unterminated line pending, that line is
    generated too.

    At process termination it generates one last element:
    ("result", return_code) with the return code of the process.
    """
    # Per-pipe state, keyed by the pipe's file object: the stream name and
    # the bytes of the line not yet completed. Keeping both in one record
    # means that closing one pipe can never make the other pipe pick up the
    # wrong buffer.
    pending = {
        proc.stdout: ["stdout", b""],
        proc.stderr: ["stderr", b""],
    }
    # Set both pipes as non-blocking, preserving their other status flags
    for fd in pending:
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
    # Multiplex stdout and stderr with different prefixes
    while pending:
        readable, _, _ = select.select(list(pending), (), ())
        for fd in readable:
            chunk = fd.read()
            if chunk is None:
                # A non-blocking read returns None when no data is available
                # yet: just wait for the next select() round
                continue
            state = pending[fd]
            if not chunk:
                # End of file: stop watching this pipe and hand out whatever
                # unterminated line is left
                del pending[fd]
                if state[1]:
                    yield state[0], state[1]
                continue
            lines = (state[1] + chunk).split(b"\n")
            # The last element is the (possibly empty) start of the next line
            state[1] = lines.pop()
            for line in lines:
                yield state[0], line
    yield "result", proc.wait()

@contextlib.contextmanager
def miniscreen(has_fancyterm, name, maxlines=3, silent=False):
    """
    Show the output of a process scrolling in a portion of the screen.

    has_fancyterm: true if the terminal supports fancy features; if false, just
    write lines to standard output. The fancy mode queries terminfo through
    curses and does not call curses.setupterm() itself, so the caller is
    expected to have done that already.

    name: name of the process being run, to use as a header

    maxlines: maximum height of the miniscreen

    silent: do nothing whatsoever, used to disable this without needing to
            change the code structure

    The context manager yields a function taking (type, line), where type is
    "stdout" or "stderr" and line is the text to show.

    Usage:
        with miniscreen(True, "my process", 5) as print_line:
            for i in range(10):
                print_line(("stdout", "stderr")[i % 2], "Line #{}".format(i))
    """
    if silent:
        def _progress_line(type, line):
            pass
        yield _progress_line
        return

    if not has_fancyterm:
        def _progress_line(type, line):
            print("{}: {}".format(type, line))
        yield _progress_line
        return

    def capability(capname):
        """
        Return a terminfo string capability as text, or an empty string if
        the terminal does not provide it
        """
        seq = curses.tigetstr(capname)
        return str(seq, "ascii") if seq else ""

    # Discover all the terminal control sequences that we need
    output_normal = capability("sgr0")
    output_up = capability("cuu1")
    output_clreol = capability("el")
    output_width = shutil.get_terminal_size().columns

    # Switch the foreground to color number 6, if the terminal can set colors
    # at all. tparm needs the raw bytes capability, so this is looked up
    # without converting it to text.
    fg_color = curses.tigetstr("setaf") or curses.tigetstr("setf")
    if fg_color:
        sys.stdout.write(str(curses.tparm(fg_color, 6), "ascii"))

    output_lines = collections.deque(maxlen=maxlines)

    def print_lines():
        """
        Print the lines in our buffer, then move back to the beginning
        """
        sys.stdout.write("{} progress:".format(name))
        sys.stdout.write(output_clreol)
        for msg in output_lines:
            sys.stdout.write("\n")
            sys.stdout.write(msg)
            sys.stdout.write(output_clreol)
        sys.stdout.write(output_up * len(output_lines))
        sys.stdout.write("\r")
        # What we write does not end with a newline, so without an explicit
        # flush the tail of the redraw would stay in the buffer and the
        # screen would lag behind
        sys.stdout.flush()

    try:
        print_lines()

        def _progress_line(type, line):
            """
            Print a new line to the miniscreen
            """
            # Add the new line to our output buffer, truncated so that it
            # does not wrap
            msg = "{} {}".format("." if type == "stdout" else "!", line)
            if len(msg) > output_width - 4:
                msg = msg[:output_width - 4] + "..."
            output_lines.append(msg)
            # Update the miniscreen
            print_lines()

        yield _progress_line

        # Clear the miniscreen by filling our ring buffer with empty lines
        # then printing them out
        for _ in range(maxlines):
            output_lines.append("")
        print_lines()
    finally:
        # Always restore the terminal attributes, even if the body raised
        sys.stdout.write(output_normal)
        sys.stdout.flush()

def run_command_fancy(name, cmd, env=None, logfd=None, fancy=True, debug=False):
    """
    Run a command, showing its output in a miniscreen while it runs.

    name: description of the command, used in log messages and as the
          miniscreen header

    cmd: the command to run, as a list of arguments

    env: environment for the command, passed as is to subprocess.Popen

    logfd: if set, file object where the command line, every line of output
           and the return code are written

    fancy: passed to miniscreen as has_fancyterm

    debug: if true, the miniscreen is disabled; output lines are still logged
           at debug level

    If the command exits with a nonzero code, the last lines (at most 5) of
    its standard error are logged as errors.

    Returns the return code of the command.
    """
    quoted_cmd = " ".join(shlex.quote(x) for x in cmd)
    log.info("%s running command %s", name, quoted_cmd)
    if logfd: print("runcmd:", quoted_cmd, file=logfd)

    # With env=None the command inherits our own environment; pass an explicit
    # mapping to control exactly what the command sees
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)

    retval = None
    stderr = []
    with miniscreen(fancy, name, silent=debug) as progress:
        for kind, val in stream_output(proc):
            if kind == "result":
                if logfd: print("retval:", val, file=logfd)
                log.debug("%s retval: %d", name, val)
                retval = val
                continue
            # The command may output anything: do not let bytes that are not
            # valid UTF-8 abort the run
            text = val.decode("utf-8", errors="replace")
            if logfd: print(kind + ":", text, file=logfd)
            if kind == "stderr":
                stderr.append(text)
            log.debug("%s %s: %s", name, kind, text)
            progress(kind, text)

    if retval != 0:
        lastlines = min(len(stderr), 5)
        log.error("%s exited with code %s", name, retval)
        log.error("Last %d lines of standard error:", lastlines)
        for line in stderr[-lastlines:]:
            log.error("%s: %s", name, line)

    return retval


parser = argparse.ArgumentParser(description="run a command showing only a portion of its output")
parser.add_argument("--logfile", action="store", help="specify a file where the full execution log will be written")
parser.add_argument("--debug", action="store_true", help="debugging output on the terminal")
parser.add_argument("--verbose", action="store_true", help="verbose output on the terminal")
# At least one argument is needed: there is nothing to run otherwise
parser.add_argument("command", nargs="+", help="command to run")
args = parser.parse_args()

if args.debug:
    loglevel = logging.DEBUG
elif args.verbose:
    loglevel = logging.INFO
else:
    loglevel = logging.WARN
logging.basicConfig(level=loglevel, stream=sys.stderr)
# Module-level logger, used by run_command_fancy
log = logging.getLogger()

# Use the miniscreen only on a terminal with color support, and not when
# debugging output is requested
fancy = False
if not args.debug and sys.stdout.isatty():
    curses.setupterm()
    if curses.tigetnum("colors") > 0:
        fancy = True

# Write the execution log to the file the user asked for
logfd = open(args.logfile, "wt") if args.logfile else None
try:
    # Pass on what we detected, so that the curses code is only used after
    # setupterm() has been called
    retval = run_command_fancy("miniscreen example", args.command, logfd=logfd,
                               fancy=fancy, debug=args.debug)
finally:
    if logfd is not None:
        logfd.close()

sys.exit(retval)