Last active
April 12, 2017 22:06
-
-
Save rohanthewiz/73f68f6af94f88843ac2bc43317d2448 to your computer and use it in GitHub Desktop.
A Pattern for concurrency using Wait Groups and Buffered Channels
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// A Concurrency Pattern using Wait Groups and Buffered Channels | |
package main | |
import ( | |
"time" | |
"fmt" | |
"sync" | |
) | |
// Synopsis | |
// We create a buffered channel that will receive the basic unit of processing. Here we use a slice (array in other langs) of strings | |
// An unbuffered "done" channel is used to signal when poller processing complete | |
var process_strings_channel chan []string | |
const processChannelSize = 2000 | |
var generate_strings_wait_group = new(sync.WaitGroup) | |
func main() { | |
process_strings_channel = make(chan []string, processChannelSize) | |
process_strings_done := make(chan bool) | |
// Start the listener to the process_strings_channel | |
go pollForStrings(process_strings_channel, process_strings_done) | |
// Send some slices | |
for i := 0; i < 5; i++ { | |
generate_strings_wait_group.Add(1) // basically increment the number of outstanding generate_string goroutines | |
go generateStrings(i) | |
} | |
// Wait for generateStrings goroutines to finish (number of outstanding goroutines is 0) | |
generate_strings_wait_group.Wait() | |
// Close the channel so nothing else can be added and the consumer (poller) knows the end of the line | |
close(process_strings_channel) | |
// Wait for the consumer to send back on the unbuffered 'done' channel (process_strings_done) when it has completed *all* processing | |
<- process_strings_done // wait for final stage to complete | |
} | |
func generateStrings(gr_num int) { | |
gr := fmt.Sprintf("GR:%d", gr_num) | |
arr_str := []string{ | |
gr, "cat", "dog", "mouse", | |
} | |
// Send to the channel | |
// This will block if the channel is full i.e. at processChannelSize capacity | |
fmt.Println(gr, "sending...") | |
process_strings_channel <- arr_str // send to the channel. | |
generate_strings_wait_group.Done() // decrement the number of outstanding generate_string goroutines | |
} | |
// Poll the process_strings_channel for incoming messages of []string | |
// We pass in input and signal channels so this function could live in a separate package | |
func pollForStrings(process_strings_channel <-chan []string, done chan <- bool) { | |
defer func() { | |
done <- true // signal caller when we are done | |
}() | |
for { | |
select { // Select can multiplex cases reading from multiple channels | |
case attrs, ok := <- process_strings_channel: // we will block till there is a message on the channel | |
if !ok { // the channel is closed *and* empty | |
// wrap up | |
return // the defer will send `true` to the done channel | |
} else { // do some work on our attributes | |
fmt.Println("Poller received new array of strings") | |
for _, attr := range attrs { | |
fmt.Println(attr) | |
} | |
} | |
case <- time.After(10 * time.Second): // After 10 seconds we will receive from the channel returned by time.After | |
fmt.Println("Timeout waiting for messages") | |
return | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment