-
-
Save cljoly/1de3b8c4cf92691346b906a144bdf20d to your computer and use it in GitHub Desktop.
Fan-Out with Context in Go
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"errors" | |
"fmt" | |
"math/rand" | |
"os" | |
"os/exec" | |
"os/signal" | |
"runtime" | |
"strings" | |
"sync" | |
"syscall" | |
) | |
type ErrProcessing struct { | |
event string | |
inner error | |
} | |
func (e *ErrProcessing) Error() string { | |
return fmt.Sprintf("error processing %s. extra context: %s", e.event, e.inner.Error()) } | |
func main() { | |
numOfWorkers := runtime.NumCPU() | |
fmt.Fprintln(os.Stdout, "Running on", numOfWorkers, "goroutines.") | |
var ( | |
ctx, cancel = context.WithCancel(context.Background()) // Context | |
inChan = make(chan string) // Input values | |
outChanValues = make(chan string, 10) // Output values | |
outChanErrors = make(chan error, 10) // Output errors | |
succeeded = []string{} // Collected Output values | |
failed = []string{} // Collected Input values that failed | |
) | |
defer cancel() | |
go func() { | |
sigterm := make(chan os.Signal, 1) | |
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) | |
<-sigterm | |
cancel() | |
}() | |
// Create consumers | |
wg := &sync.WaitGroup{} // Waitgroup for workers | |
wg.Add(numOfWorkers) | |
for s := 0; s < numOfWorkers; s++ { | |
go fanoutWorker(ctx, wg, inChan, s, outChanValues, outChanErrors) | |
} | |
// Input data | |
go inputData(inChan) | |
go func() { | |
// Once input data is treated and all workers have returned closed the output channel | |
wg.Wait() | |
close(outChanValues) | |
close(outChanErrors) | |
}() | |
for { | |
select { | |
case value, ok := <-outChanValues: | |
if ok { | |
fmt.Fprintf(os.Stdout, "Success: %s\n", value) | |
succeeded = append(succeeded, value) | |
} else { | |
outChanValues = nil | |
} | |
case err, ok := <-outChanErrors: | |
if ok { | |
var errP *ErrProcessing | |
if errors.As(err, &errP) { | |
failed = append(failed, errP.event) | |
} else { | |
fmt.Fprintln(os.Stderr, "unhandled error:", err) | |
} | |
} else { | |
outChanErrors = nil | |
} | |
} | |
if outChanValues == nil && outChanErrors == nil { | |
break | |
} | |
} | |
fmt.Fprintf(os.Stdout, "Successful (%d): %s\nFailed (%d): %s", | |
len(succeeded), strings.Join(succeeded, ", "), len(failed), strings.Join(failed, ", ")) | |
} | |
// Insert data into the input channel and signal it's done | |
func inputData(inChan chan<- string) { | |
for _, v := range strings.Fields(Names) { | |
inChan <- v | |
} | |
close(inChan) | |
} | |
func fanoutWorker(ctx context.Context, wg *sync.WaitGroup, inChan <-chan string, | |
routineName int, valOut chan<- string, errOut chan<- error) { | |
defer wg.Done() | |
for { | |
select { | |
case <-ctx.Done(): | |
return | |
case name, ok := <-inChan: | |
if !ok { | |
return | |
} | |
// Random sleep between 1-5s handled as syscalls. | |
cmd := exec.CommandContext(ctx, "sleep", fmt.Sprint(rand.Intn(5-1)+1)) | |
if err := cmd.Run(); err == nil { | |
valOut <- name | |
} else { | |
errOut <- &ErrProcessing{event: name, inner: err} | |
} | |
} | |
} | |
} | |
const Names = "dynasty regret appalling creative accessories forlornness bazooka pattern first glow crackdown daughter addictive goon beautiful grave amusement pitch peepshow accountable bloat cyanide fork fight axiom biggest enjoy disfigurement teen foreign company lavender owl hooligan blabbermouth blockade frying melody empire apocalyptic hooves terror believable vibrator sentinel famous convulsion flirtation system heavyhearted" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment