Beyond simple command execution, subprocess enables Unix-style piping between processes.

Basic Pipe

import subprocess
 
# Capture output
result = subprocess.run(
    ['ls', '-la'],
    capture_output=True,
    text=True
)
print(result.stdout)

Piping Between Commands

import subprocess
 
# ls | grep ".py"
ls = subprocess.Popen(
    ['ls', '-la'],
    stdout=subprocess.PIPE
)
 
grep = subprocess.Popen(
    ['grep', '.py'],
    stdin=ls.stdout,
    stdout=subprocess.PIPE,
    text=True
)
 
ls.stdout.close()  # Allow ls to receive SIGPIPE
output, _ = grep.communicate()
print(output)

Chain Multiple Commands

import subprocess
 
def pipe(*commands):
    """Chain commands with pipes."""
    procs = []
    
    for i, cmd in enumerate(commands):
        stdin = procs[-1].stdout if procs else None
        proc = subprocess.Popen(
            cmd,
            stdin=stdin,
            stdout=subprocess.PIPE,
            text=True
        )
        if procs:
            procs[-1].stdout.close()
        procs.append(proc)
    
    output, _ = procs[-1].communicate()
    
    # Wait for all processes
    for proc in procs[:-1]:
        proc.wait()
    
    return output
 
# cat file | grep pattern | sort | uniq
result = pipe(
    ['cat', 'data.txt'],
    ['grep', 'error'],
    ['sort'],
    ['uniq', '-c']
)

Streaming Output

import subprocess
 
# Stream line by line
proc = subprocess.Popen(
    ['tail', '-f', '/var/log/syslog'],
    stdout=subprocess.PIPE,
    text=True
)
 
for line in proc.stdout:
    print(f"LOG: {line.strip()}")
    if 'error' in line.lower():
        break
 
proc.terminate()

Feed Input to Process

import subprocess
 
# Echo input through process
proc = subprocess.Popen(
    ['sort'],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    text=True
)
 
output, _ = proc.communicate(input='banana\napple\ncherry\n')
print(output)  # apple\nbanana\ncherry\n

Bidirectional Communication

import subprocess
 
proc = subprocess.Popen(
    ['python3', '-c', 'while True: print(input().upper())'],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    text=True,
    bufsize=1  # Line buffered
)
 
proc.stdin.write('hello\n')
proc.stdin.flush()
print(proc.stdout.readline())  # HELLO
 
proc.stdin.write('world\n')
proc.stdin.flush()
print(proc.stdout.readline())  # WORLD
 
proc.terminate()

Timeout with Pipes

import subprocess
 
proc = subprocess.Popen(
    ['long_running_command'],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    text=True
)
 
try:
    stdout, stderr = proc.communicate(timeout=30)
except subprocess.TimeoutExpired:
    proc.kill()
    stdout, stderr = proc.communicate()
    print("Process timed out")

stderr Handling

import subprocess
 
# Capture separately
result = subprocess.run(
    ['ls', '/nonexistent'],
    capture_output=True,
    text=True
)
print(f"stdout: {result.stdout}")
print(f"stderr: {result.stderr}")
 
# Merge stderr into stdout
proc = subprocess.Popen(
    ['ls', '/nonexistent'],
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
    text=True
)
output, _ = proc.communicate()
 
# Discard stderr
proc = subprocess.Popen(
    ['command'],
    stdout=subprocess.PIPE,
    stderr=subprocess.DEVNULL
)

Shell Pipes (Use Carefully)

import subprocess
 
# Using shell=True for complex pipes
result = subprocess.run(
    'cat data.txt | grep error | wc -l',
    shell=True,
    capture_output=True,
    text=True
)
print(result.stdout)
 
# DANGER: Never use shell=True with user input
# user_input = "; rm -rf /"  # Malicious!
# subprocess.run(f'echo {user_input}', shell=True)  # BAD!

Process Substitution

import subprocess
import tempfile
 
def process_substitution(cmd):
    """Simulate bash's <(command)."""
    proc = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE
    )
    return proc.stdout
 
# diff <(cmd1) <(cmd2)
with process_substitution(['sort', 'file1.txt']) as f1, \
     process_substitution(['sort', 'file2.txt']) as f2:
    diff = subprocess.run(
        ['diff', '/dev/fd/3', '/dev/fd/4'],
        stdin=subprocess.DEVNULL,
        pass_fds=(f1.fileno(), f2.fileno()),
        capture_output=True
    )

Async Subprocess

import asyncio
 
async def run_command(cmd):
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )
    stdout, stderr = await proc.communicate()
    return stdout.decode()
 
async def main():
    results = await asyncio.gather(
        run_command(['ls', '-la']),
        run_command(['df', '-h']),
        run_command(['uptime'])
    )
    for r in results:
        print(r[:100])
 
asyncio.run(main())

Context Manager Pattern

import subprocess
from contextlib import contextmanager
 
@contextmanager
def piped_process(cmd, **kwargs):
    proc = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        **kwargs
    )
    try:
        yield proc
    finally:
        proc.terminate()
        proc.wait()
 
with piped_process(['tail', '-f', 'log.txt'], text=True) as proc:
    for line in proc.stdout:
        if 'done' in line:
            break
        print(line.strip())

Real-Time Output

import subprocess
import sys
 
# Stream output as it happens
proc = subprocess.Popen(
    ['ping', '-c', '5', 'google.com'],
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
    text=True,
    bufsize=1
)
 
for line in iter(proc.stdout.readline, ''):
    sys.stdout.write(line)
    sys.stdout.flush()
 
proc.wait()

Practical: Log Processor

import subprocess
from collections import Counter
 
def count_log_levels(log_file):
    """Count ERROR, WARN, INFO in log file."""
    grep = subprocess.Popen(
        ['grep', '-oE', '(ERROR|WARN|INFO)', log_file],
        stdout=subprocess.PIPE,
        text=True
    )
    
    counts = Counter()
    for line in grep.stdout:
        counts[line.strip()] += 1
    
    grep.wait()
    return dict(counts)
 
# Or pure Python pipe
def pipe_count(log_file):
    cat = subprocess.Popen(['cat', log_file], stdout=subprocess.PIPE)
    grep = subprocess.Popen(
        ['grep', '-oE', 'ERROR|WARN|INFO'],
        stdin=cat.stdout,
        stdout=subprocess.PIPE,
        text=True
    )
    sort = subprocess.Popen(
        ['sort'],
        stdin=grep.stdout,
        stdout=subprocess.PIPE,
        text=True
    )
    uniq = subprocess.Popen(
        ['uniq', '-c'],
        stdin=sort.stdout,
        stdout=subprocess.PIPE,
        text=True
    )
    
    cat.stdout.close()
    grep.stdout.close()
    sort.stdout.close()
    
    output, _ = uniq.communicate()
    return output

Summary

subprocess pipe patterns:

  • Basic pipe: stdout=subprocess.PIPE
  • Chain commands: Connect stdout → stdin
  • Stream output: Iterate proc.stdout
  • Bidirectional: Use bufsize=1 for line buffering
  • Error handling: stderr=PIPE, STDOUT, or DEVNULL
  • Async: asyncio.create_subprocess_exec()

Avoid shell=True with untrusted input. Use explicit pipes for safety.

React to this post: