Advertisement

Go 实现任务队列

阅读量:

问题提出

当某个对象的接口仅能在单线程(协程)调用时, 所有针对该对象的接口调用都应该处于同一个调度队列中。比如串口的读或者写, 如果多线程读(写)串口,难免出现数据完整性被破坏的问题。为了避免被破坏,就需要在调用接口时加锁保护。更简单一种方法就是将所有对该对象的接口调用都放在同一个调度队列中,这样每一次调用都会按先后顺序执行,不会有任何抢占资源的问题。

实现原理

在传统的编程语言中, 一般的思路是:

  • 一个存放任务的队列,一个用于任务队列的资源锁, 一个单独的内部线程
  • 内部线程不断的取出队列中的新任务执行
  • 设定一个线程退出标志,当标志被设置的时候,内部线程自动结束

Go中的实现原理

  • 带资源锁的任务队列可以使用 chan func()来实现
  • 线程可以用goroutine来代替
  • 退出标志可以是一个简单的 chan bool

队列的数据结构

复制代码
    type DispatchQueue struct {
    	work    chan func() // 任务队列
    	quit    chan bool // 退出标志
    	Working bool  // 运行状态
    }

队列实例化

复制代码
    func NewQueue() *DispatchQueue {
    	q := &DispatchQueue{work: make(chan func()), quit: make(chan bool), Working:true}
    	go func() {
    		var job func()
    		for {
    			select {
    			case job = <- q.work:
    			case <-q.quit:
    				q.Working = false
    				return
    
    			}
    			job()
    		}
    	}()
    	return q
    }

说明 :

其中的 go func() {...}调用就是使用goroutine开启单独的任务执行协程。
里面是一个无限循环, 循环中使用select语句来获取队列中的任务来执行或者获取到退出信号终止循环从而结束该协程

队列调度任务

  • work channel发送一个任务
复制代码
    func (q *DispatchQueue) invoke(work func())  {
    	q.work <- work
    }

结束任务队列

  • quit channel发送信号
复制代码
    func (q *DispatchQueue) stop()  {
    	q.quit <- true
    }

重启任务队列的工作协程

复制代码
    func (q *DispatchQueue) start()  {
    	if q.Working {
    		return
    	}
    
    	go func() {
    		var job func()
    
    		q.Working = true
    
    		for {
    			select {
    			case job = <- q.work:
    			case <-q.quit:
    				q.Working = false
    				return
    
    			}
    			job()
    		}
    	}()
    }

使用示例

复制代码
    func main() {
    	queue := NewQueue()
    
    	go func() {
    		for i := range make([]bool, 10) {
    			queue.invoke(func() {
    				println(i)
    			})
    			<- time.After(time.Second)
    		}
    	}()
    
    	<- time.After(3 * time.Second)
    	queue.stop()
    	println("stopped")
    	println("will restart after 1s")
    	<- time.After(time.Second)
    	queue.start()
    	<- time.After(10 * time.Second)
    }

输出:

复制代码
    0
    1
    2
    3
    stopped
    will restart after 1s
    4
    5
    6
    7
    8
    9

输出说明:

在新协程中,每间隔1s发送一次任务。虽然一共有10次往任务队列发送任务,但是在主协程(main)中仅仅等待了3s就将任务队列的工作协程结束了,因此在新协程里面3s后发送的任务都不会得到执行,除非重新调用 queue.start()来重新启动工作协程才会继续输出

全部评论 (0)

还没有任何评论哟~