引擎代码:
import (
"errors"
"fmt"
"github.com/go-redis/redis"
"sync"
)
//分布式workflow
type DFlow struct {
RC *redis.ClusterClient
LockKey string
Func map[string]Func
Depend map[string][]string
Force bool
}
//workflow引擎
type flowCore struct {
funcs map[string]*flowStruct
}
type Func func(interface{}) (interface{}, error)
type flowStruct struct {
Deps []string
Ctr int
Fn Func
C chan error
Res interface{}
force bool
once sync.Once
}
//workflow节点已执行
func (fs *flowStruct) done(e error) {
for i := 0; i < fs.Ctr; i++ {
fs.C <- e
}
}
//关闭workflow节点channel
func (fs *flowStruct) close() {
fs.once.Do(func() {
close(fs.C)
})
}
//初始化channel
func (fs *flowStruct) init() {
fs.C = make(chan error)
}
//创建workflow
func create() *flowCore {
return &flowCore{
funcs: make(map[string]*flowStruct),
}
}
//增加workflow节点
func (flw *flowCore) add(name string, d []string, fn Func, fc bool) *flowCore {
flw.funcs[name] = &flowStruct{
Deps: d,
Fn: fn,
Ctr: 1,
force: fc,
}
return flw
}
//workflow检查并启动
func (flw *flowCore) start() map[string]error {
for name, fn := range flw.funcs {
for _, dep := range fn.Deps {
// prevent self depends
if dep == name {
return map[string]error{name: errors.New(name + " not depends of it self")}
}
// prevent no existing dependencies
if _, exists := flw.funcs[dep]; exists == false {
return map[string]error{dep: errors.New(dep + " not exists")}
}
flw.funcs[dep].Ctr++
}
}
return flw.do()
}
//执行workflow节点
func (flw *flowCore) do() map[string]error {
result := map[string]error{}
for name, f := range flw.funcs {
f.init()
go func(name string, fs *flowStruct) {
do := true
defer func() {
if r := recover(); r != nil {
fmt.Println(r)
}
}()
if len(fs.Deps) > 0 {
for _, dep := range fs.Deps {
err, ok := <-flw.funcs[dep].C
if !fs.force && (err != nil || !ok) {
do = false
}
}
}
if do {
//匹配pipeline条件
if len(fs.Deps) == 1 {
fs.Res, result[name] = fs.Fn(flw.funcs[fs.Deps[0]].Res)
} else {
fs.Res, result[name] = fs.Fn(nil)
}
fs.done(result[name])
} else {
for _, fn := range flw.funcs {
fn.close()
}
}
}(name, f)
}
return result
}
//运行workflow
func (df *DFlow) Run() map[string]error {
lock := SyncMutex{LockKey: df.LockKey, LockTime: 15, Rc: df.RC}
//加锁
if lock.Lock() {
defer func() {
// 释放锁
lock.UnLock()
}()
fl := create()
for k, v := range df.Depend {
fl.add(k, v, df.Func[k], df.Force)
}
return fl.start()
}
return nil
}运行示例:
var (
RC, _ = RedisConnect()
)
type test struct {
}
func (t *test) a(interface{}) (interface{}, error) {
fmt.Println("a")
fmt.Println("==========")
return "a ok", nil
}
func (t *test) b(i interface{}) (interface{}, error) {
fmt.Println(i)
fmt.Println("b")
fmt.Println("==========")
return "b ok", nil
}
func (t *test) c(i interface{}) (interface{}, error) {
fmt.Println(i)
fmt.Println("c")
fmt.Println("==========")
return nil, errors.New("c error")
}
func (t *test) d(i interface{}) (interface{}, error) {
fmt.Println(i)
fmt.Println("d")
fmt.Println("==========")
return "d ok", nil
}
func init() {
t := test{}
Func := map[string]common.Func{"a": t.a, "b": t.b, "c": t.c, "d": t.d}
Depend := map[string][]string{"a": {}, "b": {"a"}, "c": {"b"}, "d": {"c"}}
df := common.DFlow{RC: RC, LockKey: "workflow_test", Func: Func, Depend: Depend}
result := df.Run()
fmt.Println(result)
}执行结果:
a //执行
==========
a ok //a执行输出
b //执行
==========
b ok //b执行输出
c //执行错误,流水线中断
==========
map[a: b: c:c error]
//流水线节点执行结果 | 留言与评论(共有 0 条评论) “” |