Last active
March 6, 2025 03:24
-
-
Save Segmentational/5c1394cd1cdbbc8bd0be07d60615cf6b to your computer and use it in GitHub Desktop.
Parallel Command Executions
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bufio" | |
"bytes" | |
"context" | |
_ "embed" | |
"encoding/json" | |
"errors" | |
"fmt" | |
"golang.org/x/sys/execabs" | |
"gopkg.in/yaml.v3" | |
"io" | |
"os" | |
"path/filepath" | |
"runtime" | |
"slices" | |
"strings" | |
"sync" | |
) | |
// cwd: "../.." | |
// shell: bash | |
// workers: 0 | |
// commands: | |
// - echo "hello world" | |
// - echo "hello world" | |
// - echoo "bad" | |
//go:embed commands.yaml | |
var commands []byte | |
// Configuration is the decoded form of the embedded commands file.
type Configuration struct {
	// CWD is the directory commands are executed in. main falls back to the
	// process working directory when it is nil and makes relative values
	// absolute.
	CWD *string `yaml:"cwd" json:"cwd"`

	// Shell is the interpreter each command string is handed to. main falls
	// back to the SHELL environment variable when it is empty.
	Shell string `yaml:"shell" json:"shell"`

	// Workers is the requested worker count; nil when not configured.
	Workers *int `yaml:"workers" json:"workers"`

	// Commands lists the command lines to execute, one job per entry.
	Commands []string `yaml:"commands" json:"commands"`
}
// Command describes one job: a program plus the arguments passed to it.
type Command struct {
	// Executable is the program to run.
	Executable string `json:"executable" yaml:"executable"`

	// Arguments are passed to Executable in order.
	Arguments []string `json:"arguments" yaml:"arguments"`
}
type Failure struct { | |
Command Command `json:"command" yaml:"command"` | |
Status *int `json:"status" yaml:"status"` | |
Exception string `json:"error" yaml:"error"` | |
Output string `json:"output,omitempty" yaml:"output,omitempty"` | |
} | |
// Sentinel errors reported while validating a Command.
var (
	// ErrInvalidExecutablePath wraps a failed executable lookup.
	ErrInvalidExecutablePath = errors.New("invalid executable path")

	// ErrNilCommand is returned when verification is attempted on a nil *Command.
	ErrNilCommand = errors.New("nil command instance")
)
func (c *Command) verify() (e error) { | |
if c == nil { | |
// slog.ErrorContext(ctx, "Invalid Verification on Nil Command Instance") | |
return ErrNilCommand | |
} | |
c.Executable, e = execabs.LookPath(c.Executable) | |
if e != nil { | |
e = fmt.Errorf("%w: %w", ErrInvalidExecutablePath, e) | |
} | |
return | |
} | |
// Pool struct manages command execution in parallel | |
type Pool struct { | |
workers int | |
output io.Writer | |
error io.Writer | |
jobs chan Command | |
wg sync.WaitGroup | |
stdoutstream chan string | |
stderrstream chan string | |
failures []Failure | |
mutex sync.Mutex | |
} | |
// New initializes the worker pool | |
func New(workers int, output, error io.Writer) *Pool { | |
return &Pool{ | |
workers: runtime.NumCPU(), | |
output: output, | |
error: error, | |
jobs: make(chan Command, workers), | |
stdoutstream: make(chan string, 128), // Buffered channel for logging | |
stderrstream: make(chan string, 128), // Buffered channel for logging | |
failures: []Failure{}, | |
} | |
} | |
// worker executes bash commands and logs output | |
func (p *Pool) worker(ctx context.Context, id int, cwd string) { | |
defer p.wg.Done() | |
for cmd := range p.jobs { | |
p.run(ctx, id, cwd, cmd.Executable, cmd.Arguments...) | |
} | |
} | |
// run executes a shell command and logs its output | |
func (p *Pool) run(ctx context.Context, id int, cwd string, executable string, arguments ...string) { | |
literal := fmt.Sprintf("%s %s", executable, strings.Join(arguments, " ")) | |
p.stdoutstream <- fmt.Sprintf("[Worker (%d)] Command: %s", id, literal) | |
cmd := execabs.CommandContext(ctx, executable, arguments...) | |
cmd.Dir = cwd | |
// Buffer to capture logs specific to the command in the event of failure | |
buffer := bytes.NewBuffer(nil) | |
stdout, err := cmd.StdoutPipe() | |
if err != nil { | |
p.stderrstream <- fmt.Sprintf("[Worker (%d)] Error getting stdout: %v", id, err) | |
return | |
} | |
stderr, err := cmd.StderrPipe() | |
if err != nil { | |
p.stderrstream <- fmt.Sprintf("[Worker (%d)] Error getting stderr: %v", id, err) | |
return | |
} | |
if e := cmd.Start(); e != nil { | |
message := fmt.Sprintf("[Worker (%d)] Error starting command: %v", id, e) | |
p.stderrstream <- message | |
p.mutex.Lock() | |
p.failures = append(p.failures, Failure{Command: Command{Executable: executable, Arguments: arguments}, Exception: e.Error()}) | |
p.mutex.Unlock() | |
return | |
} | |
// Capture output from stdout pipe. | |
go func() { | |
scanner := bufio.NewScanner(stdout) | |
for scanner.Scan() { | |
// buffer.Write(scanner.Bytes()) | |
p.stdoutstream <- fmt.Sprintf("[Worker (%d)] %s: %s", id, "Output", scanner.Text()) | |
} | |
}() | |
// Capture output from stderr pipe. | |
go func() { | |
scanner := bufio.NewScanner(stderr) | |
for scanner.Scan() { | |
buffer.Write(scanner.Bytes()) | |
p.stderrstream <- fmt.Sprintf("[Worker (%d)] %s: %s", id, "Error", scanner.Text()) | |
} | |
}() | |
// Wait for command to complete. | |
if e := cmd.Wait(); e != nil { | |
var status *int | |
if state := cmd.ProcessState; state != nil { | |
v := state.ExitCode() | |
status = &v | |
} | |
message := fmt.Sprintf("[Worker (%d)] Command failed: %v", id, e) | |
content, exception := io.ReadAll(buffer) | |
if exception != nil { | |
exception = fmt.Errorf("unable to read content from tee reader: %w", exception) | |
p.stderrstream <- exception.Error() | |
} | |
p.stderrstream <- message | |
p.mutex.Lock() | |
p.failures = append(p.failures, Failure{Command: Command{Executable: executable, Arguments: arguments}, Exception: e.Error(), Output: string(content), Status: status}) | |
p.mutex.Unlock() | |
} | |
} | |
// Start initializes and runs worker goroutines | |
func (p *Pool) Start(ctx context.Context, cwd string) { | |
go func() { | |
for buffer := range p.stdoutstream { | |
fmt.Fprintf(p.output, "%s\n", buffer) | |
} | |
}() | |
go func() { | |
for buffer := range p.stderrstream { | |
fmt.Fprintf(p.error, "%s\n", buffer) | |
} | |
}() | |
for i := 0; i < p.workers; i++ { | |
p.wg.Add(1) | |
go p.worker(ctx, i, cwd) | |
} | |
} | |
// Add adds a command to the job queue. The Pool.Start command must be first executed prior to calling. | |
func (p *Pool) Add(executable string, arguments ...string) { | |
cmd := Command{Executable: executable, Arguments: arguments} | |
if e := cmd.verify(); e != nil { | |
e = fmt.Errorf("unable to add command: %w", e) | |
p.mutex.Lock() | |
p.failures = append(p.failures, Failure{Command: Command{Executable: executable, Arguments: arguments}, Exception: e.Error()}) | |
p.mutex.Unlock() | |
return | |
} | |
p.jobs <- cmd | |
return | |
} | |
// Signal signals workers to stop after finishing current jobs. | |
func (p *Pool) Signal() { | |
defer close(p.jobs) | |
} | |
// Wait waits for all workers to complete. | |
func (p *Pool) Wait() { | |
p.wg.Wait() | |
} | |
// Close closes the logging stream(s). | |
func (p *Pool) Close() { | |
close(p.stdoutstream) | |
close(p.stderrstream) | |
} | |
// Failures returns a channel of failed command(s). | |
// | |
// - Ensure to run Pool.Terminate & Pool.Wait prior to reading from the failures. | |
func (p *Pool) Failures() []Failure { | |
return p.failures | |
} | |
// Main function to run the program | |
func main() { | |
ctx, cancel := context.WithCancel(context.Background()) | |
_ = cancel | |
var configuration Configuration | |
if e := yaml.Unmarshal(commands, &configuration); e != nil { | |
panic(e) | |
} | |
pwd, e := os.Getwd() | |
if e != nil { | |
panic(e) | |
} | |
var cwd string = pwd | |
if configuration.CWD != nil { | |
cwd = *(configuration.CWD) | |
} | |
if !(filepath.IsAbs(cwd)) { | |
v, e := filepath.Abs(cwd) | |
if e != nil { | |
panic(e) | |
} | |
cwd = v | |
} | |
shell := configuration.Shell | |
if shell == "" { | |
shell = os.Getenv("SHELL") | |
} | |
var flag string | |
switch shell { | |
case "sh", "bash", "zsh", "fish": | |
flag = "-c" | |
} | |
workers := runtime.NumCPU() // Number of workers | |
if configuration.Workers != nil || *(configuration.Workers) < 1 { | |
workers = *(configuration.Workers) | |
} | |
wp := New(workers, os.Stdout, os.Stdout) | |
wp.Start(ctx, cwd) | |
// Add commands to the job queue | |
for index := range configuration.Commands { | |
cmd := configuration.Commands[index] | |
// cmd = strings.ReplaceAll(cmd, "\"", "\\\"") | |
// cmd = fmt.Sprintf("\"%s\"", cmd) | |
arguments := []string{flag, cmd} | |
arguments = slices.DeleteFunc(arguments, func(instance string) bool { | |
return instance == "" | |
}) | |
wp.Add(shell, arguments...) | |
} | |
wp.Signal() | |
wp.Wait() | |
wp.Close() | |
for _, failure := range wp.Failures() { | |
fmt.Printf("Failure: %+v\n\n", failure) | |
} | |
{ | |
failures := wp.Failures() | |
content, e := json.MarshalIndent(failures, "", " ") | |
if e != nil { | |
panic(e) | |
} | |
os.WriteFile("errors.json", content, 0777) | |
} | |
//for _, failure := range failures { | |
// fmt.Println("Failure:", failures) | |
//} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment