Skip to content

Instantly share code, notes, and snippets.

@SCP002
Last active March 12, 2025 07:48
Show Gist options
  • Save SCP002/6c21348635264a90c0cc7b642c446399 to your computer and use it in GitHub Desktop.
Save SCP002/6c21348635264a90c0cc7b642c446399 to your computer and use it in GitHub Desktop.
Python: Start a process. Keep StdOut and StdErr in their original order. Display output in real time, character by character. Capture the combined output.
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import dataclasses
import io
import subprocess
import sys
from collections.abc import Callable
from typing import IO
@dataclasses.dataclass()
class ProcessOptions:
command: list[str]
encoding: str = 'utf-8'
cwd: str | None = None
wait: bool = True
shell: bool = False
capture: bool = True
print: bool = True
new_console: bool = False
hide: bool = False
on_char: Callable[[str, subprocess.Popen[str]], None] | None = None
on_line: Callable[[str, subprocess.Popen[str]], None] | None = None
@dataclasses.dataclass()
class ProcessResult:
exit_code: int | None = None
output: str = ''
def _pump_output(
    stream: IO[str],
    proc: subprocess.Popen[str],
    opts: ProcessOptions,
    sink: io.StringIO,
) -> None:
    """Read ``stream`` one character at a time until EOF and dispatch each one.

    Every character is forwarded to ``opts.on_char``, accumulated into lines
    for ``opts.on_line``, written to ``sink`` when ``opts.capture`` is set and
    echoed with ``print`` when ``opts.print`` is set.
    """
    pending: list[str] = []  # Characters of the line currently being built.
    while True:
        char = stream.read(1)
        if char == '':
            # An empty read means EOF on the pipe. Stop here even if the
            # process has not exited yet: the caller waits for it, and
            # looping would spin and feed empty strings to the callbacks.
            break
        if opts.on_char:
            opts.on_char(char, proc)
        if opts.on_line:
            if char in ('\n', '\r'):
                opts.on_line(''.join(pending), proc)
                # A fresh list per line avoids the StringIO.truncate(0)
                # pitfall, which keeps the old position and pads the next
                # line with NUL characters.
                pending.clear()
            else:
                pending.append(char)
        if opts.capture:
            sink.write(char)
        if opts.print:
            print(char, flush=True, end='')
    # Deliver a final line that was not terminated by a newline.
    if opts.on_line and pending:
        opts.on_line(''.join(pending), proc)


def start_process(opts: ProcessOptions) -> ProcessResult:
    """Start a process and optionally stream and capture its combined output.

    StdErr is redirected into StdOut so both streams keep their original
    order. Output is piped (and therefore readable) only when neither
    ``opts.new_console`` nor ``opts.hide`` is set; it is read only when
    ``opts.wait`` is true.

    Args:
        opts: Launch settings; see ``ProcessOptions``. ``new_console`` and
            ``hide`` rely on Windows-only ``subprocess`` features and are
            ignored on other platforms (output is still not piped there, to
            keep the behavior uniform).

    Returns:
        A ``ProcessResult`` holding ``Popen.poll()`` at return time (``None``
        if the process is still running, e.g. with ``wait=False`` or right
        after an interrupt) and the captured output.
    """
    can_capture_out: bool = not opts.new_console and not opts.hide

    # STARTUPINFO and creation flags exist only on Windows; Popen rejects
    # non-default values for them elsewhere.
    startup_info = None
    creation_flags = 0
    if sys.platform == 'win32':
        startup_info = subprocess.STARTUPINFO()
        if opts.new_console:
            creation_flags |= subprocess.CREATE_NEW_CONSOLE
        if opts.hide:
            startup_info.dwFlags |= subprocess.STARTF_USESHOWWINDOW

    std_out: int | IO | None = None
    std_err: int | IO | None = None
    std_in: int | IO | None = None
    if can_capture_out:
        std_out = subprocess.PIPE
        std_err = subprocess.STDOUT  # Merge StdErr into StdOut to keep ordering.
        std_in = sys.stdin

    # NOTE(review): with wait=False the pipe is created but never read, so a
    # child that writes a lot may block once the OS pipe buffer fills. Kept
    # as-is for backward compatibility.
    proc = subprocess.Popen(
        opts.command,
        encoding=opts.encoding,
        cwd=opts.cwd,
        shell=opts.shell,
        bufsize=1,
        startupinfo=startup_info,
        creationflags=creation_flags,
        stdout=std_out,
        stderr=std_err,
        stdin=std_in,
    )

    captured_out = io.StringIO()
    try:
        if opts.wait:
            if can_capture_out and proc.stdout is not None:
                _pump_output(proc.stdout, proc, opts, captured_out)
            proc.wait()
    except KeyboardInterrupt:
        proc.terminate()
    finally:
        # Release our end of the pipe once we are done reading from it.
        if opts.wait and proc.stdout is not None:
            proc.stdout.close()

    return ProcessResult(
        exit_code=proc.poll(),
        output=captured_out.getvalue(),
    )
def main() -> None:
    """Run a sample shell pipeline through ``start_process`` and print the result."""

    def handle_char(char: str, proc: subprocess.Popen[str]) -> None:
        """Per-character hook; deliberately does nothing in this demo."""

    def handle_line(line: str, proc: subprocess.Popen[str]) -> None:
        """Per-line hook; deliberately does nothing in this demo."""

    options = ProcessOptions(
        command=['dir', 'C:\\', '|', 'findstr', '/i', 'Program Files'],
        encoding='cp866',
        wait=True,
        shell=True,
        capture=True,
        print=True,
        new_console=False,
        hide=False,
        on_char=handle_char,
        on_line=handle_line,
    )
    outcome = start_process(options)

    separator = '-' * 50
    print(separator)
    print('\033[92mCaptured output will be displayed below:\033[0m')
    print(outcome.output, end='')
    print(separator)
    print(f'\033[92mExit code: {outcome.exit_code}\033[0m')


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment