Skip to content

Instantly share code, notes, and snippets.

@Segmentational
Last active March 6, 2025 03:24
Show Gist options
  • Save Segmentational/5c1394cd1cdbbc8bd0be07d60615cf6b to your computer and use it in GitHub Desktop.
Parallel Command Executions
package main
import (
"bufio"
"bytes"
"context"
_ "embed"
"encoding/json"
"errors"
"fmt"
"golang.org/x/sys/execabs"
"gopkg.in/yaml.v3"
"io"
"os"
"path/filepath"
"runtime"
"slices"
"strings"
"sync"
)
// Example commands.yaml document consumed by this program:
//
// cwd: "../.."
// shell: bash
// workers: 0
// commands:
// - echo "hello world"
// - echo "hello world"
// - echoo "bad"

// commands holds the raw bytes of commands.yaml, embedded at build time.
// The file must exist next to this source file or the build fails.
//go:embed commands.yaml
var commands []byte
// Configuration models the embedded commands.yaml document.
type Configuration struct {
	CWD      *string  `yaml:"cwd" json:"cwd"`           // optional working directory; process CWD is used when nil
	Shell    string   `yaml:"shell" json:"shell"`       // shell used to run each command; falls back to $SHELL when empty
	Workers  *int     `yaml:"workers" json:"workers"`   // optional worker count; runtime.NumCPU() is the default
	Commands []string `yaml:"commands" json:"commands"` // shell command strings to execute in parallel
}
// Command is a single invocation: an executable plus its argument list.
type Command struct {
	Executable string   `json:"executable" yaml:"executable"` // program name; resolved to an absolute path by verify
	Arguments  []string `json:"arguments" yaml:"arguments"`   // arguments passed verbatim to the program
}
// Failure records a command that could not be verified, could not be started,
// or exited unsuccessfully, together with any captured diagnostics.
type Failure struct {
	Command   Command `json:"command" yaml:"command"`                   // the command as originally submitted
	Status    *int    `json:"status" yaml:"status"`                     // exit code; nil when the process never produced one
	Exception string  `json:"error" yaml:"error"`                       // error message describing the failure
	Output    string  `json:"output,omitempty" yaml:"output,omitempty"` // captured stderr content, when available
}
// ErrInvalidExecutablePath indicates that a command's executable could not be
// resolved to an absolute path. Compare with errors.Is.
//
// Idiom fix: fmt.Errorf with no formatting verbs is errors.New (go vet / S1039).
var ErrInvalidExecutablePath = errors.New("invalid executable path")

// ErrNilCommand indicates that verify was invoked on a nil *Command.
var ErrNilCommand = errors.New("nil command instance")
// verify resolves the receiver's Executable to an absolute path via
// execabs.LookPath, rewriting the field in place.
//
// Returns ErrNilCommand for a nil receiver, or an error wrapping
// ErrInvalidExecutablePath when the lookup fails (in which case Executable is
// left holding LookPath's empty result, matching the original behavior).
func (c *Command) verify() error {
	if c == nil {
		return ErrNilCommand
	}
	path, err := execabs.LookPath(c.Executable)
	c.Executable = path // assigned even on failure, as the original did
	if err != nil {
		return fmt.Errorf("%w: %w", ErrInvalidExecutablePath, err)
	}
	return nil
}
// Pool struct manages command execution in parallel
type Pool struct {
	workers      int            // number of worker goroutines launched by Start
	output       io.Writer      // destination for the stdout log stream
	error        io.Writer      // destination for the stderr log stream
	jobs         chan Command   // queue of verified commands awaiting a worker
	wg           sync.WaitGroup // tracks running workers; released in worker, awaited in Wait
	stdoutstream chan string    // buffered stream of stdout log lines, drained by Start
	stderrstream chan string    // buffered stream of stderr log lines, drained by Start
	failures     []Failure      // accumulated failures; guarded by mutex
	mutex        sync.Mutex     // protects failures
}
// New initializes the worker pool.
//
// workers sets both the number of worker goroutines and the capacity of the
// job queue; values below 1 fall back to runtime.NumCPU(). output and stderr
// receive the pool's stdout/stderr log streams.
func New(workers int, output, stderr io.Writer) *Pool {
	// Bug fix: the workers argument was previously ignored — the field was
	// always set to runtime.NumCPU(). Honor the argument, defaulting only
	// when it is not positive.
	if workers < 1 {
		workers = runtime.NumCPU()
	}
	return &Pool{
		workers:      workers,
		output:       output,
		error:        stderr,
		jobs:         make(chan Command, workers),
		stdoutstream: make(chan string, 128), // buffered so workers rarely block on logging
		stderrstream: make(chan string, 128), // buffered so workers rarely block on logging
		failures:     []Failure{},
	}
}
// worker drains the job queue, executing each command in turn, and returns
// once the queue is closed (see Signal). Releases the pool's WaitGroup on exit.
func (p *Pool) worker(ctx context.Context, id int, cwd string) {
	defer p.wg.Done()
	for job := range p.jobs {
		p.run(ctx, id, cwd, job.Executable, job.Arguments...)
	}
}
// run executes a single command, streaming its stdout/stderr line-by-line to
// the pool's log channels. Any failure (pipe setup, start, or non-zero exit)
// is appended to p.failures; a non-zero exit additionally records the exit
// status and the captured stderr content.
func (p *Pool) run(ctx context.Context, id int, cwd string, executable string, arguments ...string) {
	literal := fmt.Sprintf("%s %s", executable, strings.Join(arguments, " "))
	p.stdoutstream <- fmt.Sprintf("[Worker (%d)] Command: %s", id, literal)
	cmd := execabs.CommandContext(ctx, executable, arguments...)
	cmd.Dir = cwd
	// Buffer to capture stderr specific to the command in the event of failure.
	buffer := bytes.NewBuffer(nil)
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		p.stderrstream <- fmt.Sprintf("[Worker (%d)] Error getting stdout: %v", id, err)
		return
	}
	stderr, err := cmd.StderrPipe()
	if err != nil {
		p.stderrstream <- fmt.Sprintf("[Worker (%d)] Error getting stderr: %v", id, err)
		return
	}
	if e := cmd.Start(); e != nil {
		p.stderrstream <- fmt.Sprintf("[Worker (%d)] Error starting command: %v", id, e)
		p.mutex.Lock()
		p.failures = append(p.failures, Failure{Command: Command{Executable: executable, Arguments: arguments}, Exception: e.Error()})
		p.mutex.Unlock()
		return
	}
	// Bug fix: the pipe-reading goroutines previously raced with cmd.Wait —
	// the os/exec docs require all reads from StdoutPipe/StderrPipe to
	// complete before Wait is called (Wait closes the pipes), and the stderr
	// goroutine wrote to buffer while the failure path read it concurrently.
	// A WaitGroup sequences the readers ahead of Wait, fixing both issues.
	var readers sync.WaitGroup
	readers.Add(2)
	// Capture output from the stdout pipe.
	go func() {
		defer readers.Done()
		scanner := bufio.NewScanner(stdout)
		for scanner.Scan() {
			p.stdoutstream <- fmt.Sprintf("[Worker (%d)] %s: %s", id, "Output", scanner.Text())
		}
	}()
	// Capture output from the stderr pipe, retaining it for failure reports.
	go func() {
		defer readers.Done()
		scanner := bufio.NewScanner(stderr)
		for scanner.Scan() {
			buffer.Write(scanner.Bytes()) // note: Scanner strips newlines, so lines are concatenated
			p.stderrstream <- fmt.Sprintf("[Worker (%d)] %s: %s", id, "Error", scanner.Text())
		}
	}()
	readers.Wait()
	// Wait for the command to complete and record a failure on non-zero exit.
	if e := cmd.Wait(); e != nil {
		var status *int
		if state := cmd.ProcessState; state != nil {
			v := state.ExitCode()
			status = &v
		}
		p.stderrstream <- fmt.Sprintf("[Worker (%d)] Command failed: %v", id, e)
		p.mutex.Lock()
		p.failures = append(p.failures, Failure{Command: Command{Executable: executable, Arguments: arguments}, Exception: e.Error(), Output: buffer.String(), Status: status})
		p.mutex.Unlock()
	}
}
// Start launches the pool: one goroutine per log stream to forward lines to
// the configured writers, plus p.workers worker goroutines that consume jobs
// from the queue. Must be called before Add.
func (p *Pool) Start(ctx context.Context, cwd string) {
	// Forward the stdout log stream until Close closes the channel.
	go func() {
		for line := range p.stdoutstream {
			fmt.Fprintln(p.output, line)
		}
	}()
	// Forward the stderr log stream until Close closes the channel.
	go func() {
		for line := range p.stderrstream {
			fmt.Fprintln(p.error, line)
		}
	}()
	// Spin up the workers; each is tracked by the pool's WaitGroup.
	for id := 0; id < p.workers; id++ {
		p.wg.Add(1)
		go p.worker(ctx, id, cwd)
	}
}
// Add adds a command to the job queue. The Pool.Start command must be first executed prior to calling.
//
// The executable is resolved to an absolute path before queueing; a command
// that fails verification is recorded as a Failure instead of being queued.
func (p *Pool) Add(executable string, arguments ...string) {
	command := Command{Executable: executable, Arguments: arguments}
	e := command.verify()
	if e == nil {
		p.jobs <- command
		return
	}
	wrapped := fmt.Errorf("unable to add command: %w", e)
	p.mutex.Lock()
	p.failures = append(p.failures, Failure{Command: Command{Executable: executable, Arguments: arguments}, Exception: wrapped.Error()})
	p.mutex.Unlock()
}
// Signal signals workers to stop after finishing current jobs.
//
// Closing the job queue lets each worker's range loop drain and terminate.
// Call only after all Add calls have completed; Add on a closed queue panics.
func (p *Pool) Signal() {
	// Idiom fix: deferring close as the function's only statement was a
	// needless indirection — close the channel directly.
	close(p.jobs)
}
// Wait waits for all workers to complete.
//
// Blocks until every worker goroutine started by Start has returned, which in
// turn requires Signal to have closed the job queue.
func (p *Pool) Wait() {
	p.wg.Wait()
}
// Close closes the logging stream(s).
//
// Call only after Wait, once no worker can still send on either stream —
// sending on a closed channel panics.
func (p *Pool) Close() {
	close(p.stdoutstream)
	close(p.stderrstream)
}
// Failures returns the slice of failed command(s) accumulated by the pool.
//
// - Ensure to run Pool.Signal & Pool.Wait prior to reading from the failures.
//
// NOTE(review): the original comment said this returns a channel and referenced
// a Pool.Terminate method; this file defines neither — the slice is returned
// directly (not copied) and Signal is the shutdown method.
func (p *Pool) Failures() []Failure {
	return p.failures
}
// main loads the embedded YAML configuration, fans the configured commands out
// across a worker pool, prints any failures, and persists them to errors.json.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // bug fix: cancel was previously discarded, leaking the context's resources

	var configuration Configuration
	if e := yaml.Unmarshal(commands, &configuration); e != nil {
		panic(e)
	}

	// Resolve the working directory: default to the process CWD, allow the
	// configuration to override it, and normalize to an absolute path.
	pwd, e := os.Getwd()
	if e != nil {
		panic(e)
	}
	cwd := pwd
	if configuration.CWD != nil {
		cwd = *(configuration.CWD)
	}
	if !filepath.IsAbs(cwd) {
		v, e := filepath.Abs(cwd)
		if e != nil {
			panic(e)
		}
		cwd = v
	}

	// Resolve the shell and the flag that makes it run a command string.
	shell := configuration.Shell
	if shell == "" {
		shell = os.Getenv("SHELL")
	}
	var flag string
	switch shell {
	case "sh", "bash", "zsh", "fish":
		flag = "-c"
	}

	// Worker count: default to the CPU count, honoring a positive configured
	// override. Bug fix: the original condition
	// (Workers != nil || *Workers < 1) dereferenced a nil pointer whenever
	// "workers" was omitted from the configuration, and accepted non-positive
	// overrides when it was present.
	workers := runtime.NumCPU()
	if configuration.Workers != nil && *(configuration.Workers) > 0 {
		workers = *(configuration.Workers)
	}

	wp := New(workers, os.Stdout, os.Stdout)
	wp.Start(ctx, cwd)

	// Queue each configured command as "<shell> -c <command>".
	for index := range configuration.Commands {
		cmd := configuration.Commands[index]
		arguments := []string{flag, cmd}
		// Drop the empty flag when no known shell matched above.
		arguments = slices.DeleteFunc(arguments, func(instance string) bool {
			return instance == ""
		})
		wp.Add(shell, arguments...)
	}

	wp.Signal()
	wp.Wait()
	wp.Close()

	for _, failure := range wp.Failures() {
		fmt.Printf("Failure: %+v\n\n", failure)
	}

	// Persist the failures as JSON. Bug fix: the WriteFile error was silently
	// ignored, and 0777 made the report executable; 0o644 is sufficient.
	failures := wp.Failures()
	content, e := json.MarshalIndent(failures, "", " ")
	if e != nil {
		panic(e)
	}
	if e := os.WriteFile("errors.json", content, 0o644); e != nil {
		panic(e)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment