案例来源于 [美]威廉·肯尼迪(William Kennedy)布赖恩·克特森(Brian Ketelsen)埃里克·圣马丁(Erik St. Martin). Go语言实战 (Kindle Location 3997). 人民邮电出版社. Kindle Edition.一书第7章 并发模式 之 7.1 runner。
设计
runner包用于展示如何使用通道来监视程序的执行时间,如果程序运行时间太长,也可以用runner包来终止程序。当开发需要调度后台处理任务的程序的时候,这种模式会很有用。这也是目前我们使用跑数平台抓取数据和执行临时脚本的关键所在,虽然实现各不相同,但大致思路都是相同的。即是添加命令,记录输出,控制命令执行完毕或中断……
教材中给出的案例相对比较简单,传入参数随意,只传入一个下标ID值。但是go中参数传递并不容易,更啰嗦的办法或者是总结业务中需要用到的形式来提炼出更抽象的方式,这样会显得相对简单,但维护会增加好几倍的精力。但是所有的抽象都统一到一个函数中,只会让参数更抽象,使用go语言的接口interface{}类型,因为该类型可以接收函数传入任意类型的参数,但是在函数中,参数的类型依然是interface类型,需要特别注意,必须进行处理后方可使用。
package util
import (
"errors"
"os"
"os/signal"
"time"
)
// 注册任务名称和参数
type Task struct {
Name func(...interface{})
Numbers []int
Strings []string
}
// 任务管理控制
type runner struct {
// signal包实现了对输入信号的访问
// 该通道用于报告从操作系统发送的所有信号
interrupt chan os.Signal
// 报告处理任务完成结果
complete chan error
// 报告处理任务已经超时
timeout <-chan time.Time
// 需要处理的注册任务
tasks []Task
}
// 任务执行超时错误
var ErrTimeout = errors.New("received timeout")
// 接收到中断信号
var ErrInterrupt = errors.New("received interrupt")
// 生成一个任务控制实例
func New(d time.Duration) *runner {
return &runner{
interrupt: make(chan os.Signal, 1),
complete: make(chan error),
timeout: time.After(d),
}
}
// 注册任务到任务列表中
// 可以连续注册多个任务
func (r *runner) Add(task ...Task) {
r.tasks = append(r.tasks, task...)
}
// 管理注册任务
// 监听任务执行完毕和超时的信号
func (r *runner) Start() error {
// 原型: func Notify(c chan<- os.Signal, sig ...os.Signal)
// Notify函数让signal包将输入信号转发到c
// 如果没有列出要传递的信号, 会将所有输入信号传递到c;
// 否则只传递列出的输入信号
//
// signal包不会为了向c发送信息而阻塞(就是说如果发送时c阻塞了, signal包会直接放弃)
// 调用者应该保证c有足够的缓存空间可以跟上期望的信号频率
// 对使用单一信号用于通知的通道, 缓存为1就足够了
//
// 可以使用同一通道多次调用Notify
// 每一次都会扩展该通道接收的信号集
// 唯一从信号集去除信号的方法是调用Stop
// 可以使用同一信号和不同通道多次调用Notify; 每一个通道都会独立接收到该信号的一个拷贝
signal.Notify(r.interrupt, os.Interrupt)
// 异步执行每一个注册的任务
go func() {
r.complete <- r.run()
}()
// 接收任务完成和超时信号
select {
case err := <-r.complete:
return err
case <-r.timeout:
return ErrTimeout
}
}
// 执行注册任务
func (r *runner) run() error {
for _, task := range r.tasks {
if r.goInterrupt() {
return ErrInterrupt
}
task.Name(task.Numbers, task.Strings)
}
return nil
}
// 接收中断信号
// 一般来说, select 语句在没有任何要接收的数据时会阻塞
// 默认的 default 分支将其转化为非阻塞模式
func (r *runner) goInterrupt() bool {
select {
// 接收到中断信号后, 停止继续接收信号
case <-r.interrupt:
signal.Stop(r.interrupt)
return true
default:
return false
}
}
测试
测试也给的比较简单,测试整数切片和字符串切片,可以正常使用,超时也可以实现,但是中断事件还有一些问题,留待下次研究。go语言的信号机制还没怎么去了解,接下来会整理一篇关于信号的基础知识。
package main
import (
"fmt"
"notes/go/util"
"os"
"time"
)
func main() {
r := util.New(10 * time.Second)
r.Add(
util.Task{testTask, []int{1, 2, 3}, []string{"2017-09-01", "2017-09-10"}},
util.Task{testTask, []int{1, 2, 3}, []string{}},
util.Task{testTask, []int{}, []string{"2017-09-01", "2017-09-10"}},
)
if err := r.Start(); err != nil {
switch err {
case util.ErrTimeout:
fmt.Println(util.ErrTimeout)
os.Exit(1)
case util.ErrInterrupt:
fmt.Println(util.ErrInterrupt)
os.Exit(2)
}
}
}
// 测试传入整数
// interface{}可用于向函数传递任意类型的变量, 但对于函数内部, 该变量仍然为interface{}类型
// 接口类型向普通类型的转换称为类型断言(运行期确定)
func testTask(n ...interface{}) {
// 故意制造超时事件
//time.Sleep(5 * time.Second)
// 写一个死循环, 手动kill中断运行
i := 0
for {
i += 1
}
// 处理正常事件
for _, res := range n {
switch r := res.(type) {
case nil:
fmt.Println("empty value")
case []int:
for _, num := range r {
fmt.Println(num)
}
case []string:
for _, s := range r {
fmt.Println(s)
}
default:
fmt.Println("unknown type error")
}
}
}
超时时的输出:
zhgxun-pro:go zhgxun$ go run tasktest.go
received timeout
exit status 1
zhgxun-pro:go zhgxun$
正常执行完任务时的输出:
zhgxun-pro:go zhgxun$ go run tasktest.go
1
2
3
2017-09-01
2017-09-10
1
2
3
2017-09-01
2017-09-10
zhgxun-pro:go zhgxun$