Creating pipelines with subprocess

It is possible to create process pipelines with subprocess.Popen simply by passing stdout=subprocess.PIPE to each process and stdin=otherproc.stdout to the one that follows it.


In a pipeline created this way, the stdout of every process except the last is open twice: once in the script that started the subprocess, and once as the standard input of the next process in the pipeline.

This is a problem because, if a process closes its stdin, the previous process in the pipeline does not get SIGPIPE when it tries to write to its stdout: that pipe is still open in the calling process. When this happens, a wait on that process hangs forever: the child process waits for the parent to read its stdout, while the parent process waits for the child process to exit.
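The double-open effect can be reproduced without any subprocess at all. The sketch below is only an illustration, under the assumption that os.dup can stand in for the copy of the pipe held by the calling script; try_write is a name made up for the example. Python ignores SIGPIPE in its own process, so here the broken pipe shows up as an EPIPE error rather than as a signal.

```python
import errno
import os


def try_write(fd):
    """Return True if a write succeeds, False if it fails with EPIPE."""
    try:
        os.write(fd, b"x")
        return True
    except OSError as exc:
        if exc.errno != errno.EPIPE:
            raise
        return False


read_end, write_end = os.pipe()
# Second open copy of the read end: stands in for the copy that the
# calling script keeps when it passes proc.stdout to the next process.
parent_copy = os.dup(read_end)

os.close(read_end)                    # the "next process" closes its stdin
still_open = try_write(write_end)     # a reader still exists: write succeeds

os.close(parent_copy)                 # the caller closes its own copy
after_close = try_write(write_end)    # no readers left: write fails with EPIPE

os.close(write_end)
```

As long as parent_copy is open, the writer has no way to notice that its real consumer has gone away.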

The trick is to close, in the calling script, the stdout of each process in the pipeline except the last, as soon as the process that reads from it has been started:

# coding=utf-8

import subprocess

def pipe(*args):
    """
    Takes as parameters several dicts, each with the same
    parameters passed to popen.

    Runs the various processes in a pipeline, connecting
    the stdout of every process except the last with the
    stdin of the next process.
    """
    if len(args) < 2:
        raise ValueError("pipe needs at least 2 processes")
    # Set stdout=PIPE in every subprocess except the last
    for i in args[:-1]:
        i["stdout"] = subprocess.PIPE

    # Runs all subprocesses connecting stdins and stdouts to create the
    # pipeline. Closes stdouts to avoid deadlocks.
    popens = [subprocess.Popen(**args[0])]
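    for i in range(1, len(args)):
        args[i]["stdin"] = popens[i-1].stdout
        popens.append(subprocess.Popen(**args[i]))
        # The next process now holds its own copy of the pipe: close ours,
        # so that the previous process can get SIGPIPE
        popens[i-1].stdout.close()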

    # Returns the array of subprocesses just created
    return popens
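The same idea, written out by hand for two processes, may make the effect of the close easier to see. This is a minimal sketch, assuming a POSIX system with yes and head available on the PATH; first_line_of_yes is a name made up for the example.

```python
import subprocess


def first_line_of_yes():
    """Run the equivalent of `yes | head -n 1` and return output and exit codes."""
    producer = subprocess.Popen(["yes"], stdout=subprocess.PIPE)
    consumer = subprocess.Popen(["head", "-n", "1"],
                                stdin=producer.stdout,
                                stdout=subprocess.PIPE)
    # Close our copy of the pipe: once head exits there are no readers
    # left, so yes gets a broken pipe instead of blocking forever.
    producer.stdout.close()
    output, _ = consumer.communicate()
    # yes never finishes on its own: it ends either killed by SIGPIPE or
    # with a write error, depending on how SIGPIPE is set up in the child.
    return output, producer.wait(), consumer.returncode


output, producer_rc, consumer_rc = first_line_of_yes()
```

Without the producer.stdout.close() line, yes would fill the pipe and block, and producer.wait() would never return.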

At this point, it's nice to write a function that waits for the whole pipeline to terminate and returns an array of result codes:

def pipe_wait(popens):
    """
    Given an array of Popen objects returned by the
    pipe method, wait for all processes to terminate
    and return the array with their return values.
    """
    results = [0] * len(popens)
    while popens:
        last = popens.pop(-1)
        results[len(popens)] = last.wait()
    return results

And, lo and behold, we can now easily run a pipeline and get the return codes of every single process in it:

process1 = dict(args='sleep 1; grep line2 testfile', shell=True)
process2 = dict(args='awk \'{print $3}\'', shell=True)
process3 = dict(args='true', shell=True)
popens = pipe(process1, process2, process3)
result = pipe_wait(popens)
print(result)

Update: Colin Watson suggests an improvement to compensate for Python's nonstandard SIGPIPE handling.
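The suggestion itself is not reproduced here. One common way to compensate, which is assumed to be close to what is meant, is to give SIGPIPE back its default action in the child through the preexec_fn parameter of Popen. In the sketch below, restore_sigpipe and with_default_sigpipe are hypothetical helpers, and yes is assumed to be on the PATH. Python 3.2 and later already do this by default through the restore_signals=True parameter of Popen, so the helper mostly matters on older versions.

```python
import signal
import subprocess


def restore_sigpipe():
    """Runs in the child just before exec: SIGPIPE gets its default action."""
    signal.signal(signal.SIGPIPE, signal.SIG_DFL)


def with_default_sigpipe(spec):
    """Return a copy of a dict of Popen parameters with preexec_fn set.

    Hypothetical helper, not part of subprocess.
    """
    spec = dict(spec)
    spec["preexec_fn"] = restore_sigpipe
    return spec


proc = subprocess.Popen(
    **with_default_sigpipe(dict(args=["yes"], stdout=subprocess.PIPE)))
first_line = proc.stdout.readline()
proc.stdout.close()        # no readers left: the next write in yes raises SIGPIPE
returncode = proc.wait()   # negative signal number when killed by a signal
```

Each dict given to pipe() could be passed through with_default_sigpipe first, so that every process in the pipeline dies of SIGPIPE the way a shell pipeline would.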

Colin Watson has a similar library for C.