Created
January 28, 2015 11:36
-
-
Save huandu/66837eba3ee039505d5b to your computer and use it in GitHub Desktop.
一个简单的并发可控、任务可随意拼接的任务队列实现
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// 一个简单的并发可控、任务可随意拼接的任务队列实现。 | |
// 仅作概念演示用,细节不要纠结。 | |
// | |
// 基本结构: | |
// Context:所有任务共享的上下文,任务通过上下文交换数据 | |
// Dispatcher:任务队列管理器,负责创建 Context 并把它放入合适的工作队列 | |
// Worker:任务的抽象接口 | |
// XXXWorker:各个具体的任务处理逻辑 | |
package main | |
import ( | |
"fmt" | |
"time" | |
) | |
type JobId string | |
type JobData string | |
type WorkerFactory func() Worker | |
type WorkerConfig struct { | |
Name JobId | |
Factory WorkerFactory | |
Count int // 需要启动的 worker 数量 | |
} | |
// 所有的任务都会读取 Context 的内容,所以这个结构会很大。 | |
// 当它变得过于复杂的时候需要重构,不过这就不是现在讨论的问题了。 | |
type Context struct { | |
Jobs []JobId | |
// 各种可能被用到的字段 | |
Data JobData | |
Foo string | |
Bar string | |
Player string | |
} | |
// 任务调度器 | |
type Dispatcher struct { | |
done chan bool | |
jobChannels map[JobId]*JobChannels | |
} | |
type JobChannels struct { | |
input chan *Context | |
output chan *Context | |
} | |
// Worker 的接口 | |
type Worker interface { | |
Work(input <-chan *Context, output chan<- *Context) | |
} | |
// 各种 worker | |
type FooWorker struct{} | |
type BarWorker struct{} | |
type PlayerWorker struct{} | |
func main() { | |
fmt.Println("starting...") | |
dispatcher := NewDispatcher() | |
// 这里用来演示通过网络异步收到 job 的情况 | |
go func() { | |
job1 := []JobId{"foo", "bar", "player"} | |
job2 := []JobId{"foo", "player"} // 跳过 bar | |
job3 := []JobId{"bar", "foo"} // 逆序 | |
// 执行任务,每个任务可以带一个自定义数据,现在先简单用 string,未来应该根据设计 | |
dispatcher.Dispatch(job1, "job1") | |
dispatcher.Dispatch(job2, "job2") | |
dispatcher.Dispatch(job3, "job3") | |
time.Sleep(time.Second) | |
dispatcher.Stop() | |
}() | |
dispatcher.Start() | |
} | |
func NewDispatcher() *Dispatcher { | |
return &Dispatcher{ | |
done: make(chan bool), | |
} | |
} | |
var workerConfig = []*WorkerConfig{ | |
&WorkerConfig{"foo", NewFooWorker, 1}, | |
&WorkerConfig{"bar", NewBarWorker, 2}, | |
&WorkerConfig{"player", NewPlayerWorker, 3}, | |
} | |
func (d *Dispatcher) Start() { | |
d.jobChannels = make(map[JobId]*JobChannels) | |
// 启动足够数量的 worker | |
for _, config := range workerConfig { | |
channels := &JobChannels{ | |
input: make(chan *Context), | |
output: make(chan *Context), | |
} | |
d.jobChannels[config.Name] = channels | |
for i := 0; i < config.Count; i++ { | |
worker := config.Factory() | |
go worker.Work(channels.input, channels.output) | |
} | |
} | |
// 做输入输出的调度工作 | |
for _, channels := range d.jobChannels { | |
go d.monitor(channels.output) | |
} | |
<-d.done | |
} | |
func (d *Dispatcher) monitor(output <-chan *Context) { | |
for ctx := range output { | |
go d.dispatch(ctx) | |
} | |
} | |
func (d *Dispatcher) dispatch(ctx *Context) { | |
// 所有任务都完成了 | |
if len(ctx.Jobs) == 0 { | |
fmt.Println("job is done! Name:", ctx.Data, "Data:", *ctx) | |
return | |
} | |
// 把 ctx 放入合适的任务队列,开始执行任务 | |
job := ctx.Jobs[0] | |
ctx.Jobs = ctx.Jobs[1:] | |
channels := d.jobChannels[job] | |
channels.input <- ctx | |
} | |
func (d *Dispatcher) Stop() { | |
d.done <- true | |
} | |
func (d *Dispatcher) Dispatch(jobs []JobId, data JobData) { | |
// 首先初始化一个上下文 | |
ctx := &Context{ | |
Jobs: jobs, | |
Data: data, | |
} | |
// 开始派发任务 | |
d.dispatch(ctx) | |
} | |
func NewFooWorker() Worker { | |
return &FooWorker{} | |
} | |
func NewBarWorker() Worker { | |
return &BarWorker{} | |
} | |
func NewPlayerWorker() Worker { | |
return &PlayerWorker{} | |
} | |
func (foo *FooWorker) Work(input <-chan *Context, output chan<- *Context) { | |
for ctx := range input { | |
fmt.Println("Worker foo: current job name is", ctx.Data) | |
ctx.Foo = "foo-done" | |
output <- ctx | |
} | |
} | |
func (bar *BarWorker) Work(input <-chan *Context, output chan<- *Context) { | |
for ctx := range input { | |
fmt.Println("Worker bar: current job name is", ctx.Data) | |
ctx.Bar = "bar-done" | |
output <- ctx | |
} | |
} | |
func (player *PlayerWorker) Work(input <-chan *Context, output chan<- *Context) { | |
for ctx := range input { | |
fmt.Println("Worker player: current job name is", ctx.Data) | |
ctx.Player = "player-done" | |
output <- ctx | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment