Last active
August 29, 2015 14:06
-
-
Save manadart/5e6431122ee54bbabb92 to your computer and use it in GitHub Desktop.
Go Concurrent Slice Blog Post
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Payload type extending a slice of items. | |
type Payload []*itemType | |
// Use a pool of Goroutines to run the input action against all the items, with the input concurrency factor. | |
func (payload Payload) ExecuteAction(action func(*itemType), concFactor int) { | |
itemsChl := make(chan *itemType) | |
wg := new(sync.WaitGroup) | |
wg.Add(concFactor) | |
// Set up the pool with a number of Goroutines equal to our concurrency factor. | |
wrapped := ItemActionWrapper(action) | |
for i := 0; i < concFactor; i++ { | |
go wrapped(itemsChl, wg) | |
} | |
// Send the items. | |
for _, i := range payload { | |
itemsChl <- i | |
} | |
// Close the channel and wait for the workers to finish. | |
close(itemsChl) | |
wg.Wait() | |
} | |
// Executes the wrapped action while the channel is open, then notifies the WaitGroup of completion. | |
func ItemActionWrapper(action func(*itemType)) func(chan *itemType, *sync.WaitGroup) { | |
return func(itemsChl chan *itemType, wg *sync.WaitGroup) { | |
defer wg.Done() | |
for i := range itemsChl { | |
action(i) | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
func someItemAction(item *itemType) { | |
// Do something. | |
} | |
func anotherItemAction(item *itemType) { | |
// Do another thing. | |
} | |
func main() { | |
var payload Payload | |
// Populate somehow. | |
payload.ExecuteAction(someItemAction, 8) | |
payload.ExecuteAction(anotherItemAction, 16) | |
// Do something else with the payload. | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
func worker(itemsChl chan *itemType, wg *sync.WaitGroup) { | |
defer wg.Done() | |
for item := range itemsChl { | |
// Do something with item. | |
} | |
} | |
func main() { | |
sliceOfItems := make(*itemType, 100) | |
// Populate the slice somehow. | |
itemsChl := make(chan *itemType) | |
wg := new(sync.WaitGroup) | |
// Create the pool of 8 workers and set them watching the channel. | |
for i := 0; i < 8; i++ { | |
wg.Add(1) | |
go worker(itemsChl, wg) | |
} | |
// Send all the items down the channel. | |
for _, item := range sliceOfItems { | |
itemsChl <- item | |
} | |
// Close the channel and wait for the workers to finish. | |
close(itemsChl) | |
wg.Wait() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment