Created
May 7, 2021 05:55
-
-
Save askrabal/79f511e8f247cc50bf8f17eed0124e6a to your computer and use it in GitHub Desktop.
Example of re-printing a line per subprocess in Python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import random | |
from ctypes import c_char | |
from multiprocessing import Process, Array | |
from sys import argv, stdout | |
from time import sleep | |
# Command-line overrides: argv[1] is the step count (int, default 10) and
# argv[2] is the pause between steps in seconds (float, default 1.0).
arg_end = int(argv[1]) if len(argv) > 1 else 10
arg_rest = float(argv[2]) if len(argv) > 2 else 1.0
def rprint(raw: str, file_name=stdout):
    """Write ``raw`` to a stream exactly as given, with no newline appended.

    Args:
        raw: Text to emit (for example an ANSI escape sequence).
        file_name: Despite its name, a writable stream object, i.e. anything
            with a ``write`` method. Defaults to the ``stdout`` object that
            was in scope when this function was defined.
    """
    emit = file_name.write
    emit(raw)
def reprint_lines(num_lines: int, file_name=stdout):
    """Rewind the cursor over the last ``num_lines`` lines, erasing each one.

    For every line the following ANSI escape sequences are emitted:

    * ``ESC[1A`` - go up one line
    * ``ESC[2K`` - clear that line
    * ``ESC[G``  - move to the beginning of the line

    The whole sequence is sent through a single ``rprint`` call rather than
    three small writes per line.

    Args:
        num_lines: How many previously printed lines to erase. Zero or a
            negative number writes nothing.
        file_name: Stream passed through to ``rprint``; defaults to the
            ``stdout`` object in scope when this function was defined.
    """
    if num_lines <= 0:
        return
    erase_one_line = "\033[1A" "\033[2K" "\033[G"
    rprint(erase_one_line * num_lines, file_name)
def random_counter(arr: Array):
    """Publish a new random number into ``arr`` once per step.

    Performs ``arg_end`` steps. Each step draws an integer from 0 to 99999
    (inclusive), stores ``"Random num: <n>"`` (number right-aligned to width
    5) as bytes in ``arr.value``, then sleeps for ``arg_rest`` seconds.

    Args:
        arr: Shared character buffer whose ``value`` attribute receives the
            encoded text (``main`` creates it as ``Array(c_char, 1024)``).

    Returns:
        ``0`` when interrupted by ``KeyboardInterrupt``; ``None`` when all
        steps complete.
    """
    try:
        for _ in range(arg_end):
            number = random.randint(0, 99999)
            message = f"Random num: {number:5}"
            arr.value = message.encode()
            sleep(arg_rest)
    except KeyboardInterrupt:
        # Ctrl-C simply ends this worker early.
        return 0
def serial_even_counter(arr: Array):
    """Count upwards in steps of two, publishing each value into ``arr``.

    The displayed values are 2, 4, 6, ... with one value for every element of
    ``range(0, arg_end, 2)``. After each value is stored as
    ``"Counting by 2s: <n>"`` (number right-aligned to width 5) in
    ``arr.value``, the function sleeps for ``arg_rest`` seconds.

    Args:
        arr: Shared character buffer whose ``value`` attribute receives the
            encoded text (``main`` creates it as ``Array(c_char, 1024)``).

    Returns:
        ``0`` when interrupted by ``KeyboardInterrupt``; ``None`` when the
        count completes.
    """
    try:
        # Same iteration count as range(0, arg_end, 2), shifted so the loop
        # variable is already the number to display.
        for even in range(2, arg_end + 2, 2):
            message = f"Counting by 2s: {even:5}"
            arr.value = message.encode()
            sleep(arg_rest)
    except KeyboardInterrupt:
        # Ctrl-C simply ends this worker early.
        return 0
def serial_counter(arr: Array):
    """Count from 1 up to ``arg_end``, publishing each value into ``arr``.

    Each step stores ``"Counting to <arg_end>: <n>"`` (``n`` right-aligned to
    width 5) as bytes in ``arr.value`` and then sleeps for ``arg_rest``
    seconds.

    Args:
        arr: Shared character buffer whose ``value`` attribute receives the
            encoded text (``main`` creates it as ``Array(c_char, 1024)``).

    Returns:
        ``0`` when interrupted by ``KeyboardInterrupt``; ``None`` when the
        count completes.
    """
    try:
        for count in range(1, arg_end + 1):
            message = f"Counting to {arg_end}: {count:5}"
            arr.value = message.encode()
            sleep(arg_rest)
    except KeyboardInterrupt:
        # Ctrl-C simply ends this worker early.
        return 0
def main():
    """Run every counter in its own process and redraw their output in place.

    One shared ``Array(c_char, 1024)`` is created per counter function and
    handed to a ``Process`` running that function. While any process is still
    running, the current text of every buffer is printed (one line each), the
    loop sleeps for a tenth of ``arg_rest``, and the printed lines are erased
    with ``reprint_lines`` so the next pass overwrites them. Once all
    processes have finished, the final buffer contents are printed and left
    on screen.
    """
    procs = []
    lines = []
    funcs = (serial_even_counter, random_counter, serial_counter)
    for val in funcs:
        arr = Array(c_char, 1024)
        lines.append(arr)
        proc = Process(target=val, args=(arr,))
        procs.append(proc)
        proc.start()

    while procs:
        for line in lines:
            print(line.value.decode())
        sleep(arg_rest / 10)
        reprint_lines(len(lines))

        # Reap finished workers. Build a new list rather than calling
        # procs.remove() inside the loop: removing from a list while
        # iterating it makes the iterator skip the following element.
        still_running = []
        for pp in procs:
            if pp.is_alive():
                still_running.append(pp)
            else:
                pp.join()
        procs = still_running

    # The last pass erased the display, so print the final state once more.
    for line in lines:
        print(line.value.decode())
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # Treat Ctrl-C as a normal way to stop. Raise SystemExit directly
        # instead of calling exit(): that helper is installed by the ``site``
        # module for interactive sessions and is missing under ``python -S``.
        raise SystemExit(0)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment