workflow引擎(支持pipeline)

引擎代码:

import (
	"errors"
	"fmt"
	"github.com/go-redis/redis"
	"sync"
)

// DFlow describes a distributed workflow: a set of named node functions and
// the dependency graph between them. Run executes the graph after acquiring
// the lock identified by LockKey.
type DFlow struct {
	// RC is the Redis cluster client handed to the lock that Run acquires.
	RC      *redis.ClusterClient
	// LockKey is the key handed to the lock that Run acquires.
	LockKey string
	// Func maps a node name to the function executed for that node.
	Func    map[string]Func
	// Depend maps a node name to the names of the nodes it depends on.
	// Run registers exactly one node per key of this map.
	Depend  map[string][]string
	// Force is applied to every node: when true, a node still runs even if
	// one of its dependencies reported an error or was skipped.
	Force   bool
}

// flowCore is the workflow engine: it holds every registered node, keyed by
// node name.
type flowCore struct {
	// funcs maps a node name to its runtime state.
	funcs map[string]*flowStruct
}

// Func is the signature of a workflow node. The engine passes the result of
// the node's dependency when the node has exactly one dependency, and nil
// otherwise. It returns the node's result and an error.
type Func func(interface{}) (interface{}, error)

// flowStruct is the runtime state of a single workflow node.
type flowStruct struct {
	// Deps lists the names of the nodes this node waits for.
	Deps  []string
	// Ctr is the number of values done sends on C. add sets it to 1 and start
	// increments it once for every dependency entry that names this node.
	Ctr   int
	// Fn is the function executed for this node.
	Fn    Func
	// C carries this node's outcome to the nodes that depend on it.
	C     chan error
	// Res is the first value returned by Fn; it becomes the input of a
	// dependent node that has this node as its only dependency.
	Res   interface{}
	// force makes the node run even when a dependency failed or was skipped.
	force bool
	// once guards close so that C is closed at most once.
	once  sync.Once
}

// done publishes the node's outcome: it sends e on fs.C once per unit of
// fs.Ctr. Each send blocks until the channel can accept the value.
func (fs *flowStruct) done(e error) {
	for remaining := fs.Ctr; remaining > 0; remaining-- {
		fs.C <- e
	}
}

// close closes the node's channel. The call is routed through fs.once, so
// calling close repeatedly closes the channel only the first time.
func (fs *flowStruct) close() {
	shut := func() {
		close(fs.C)
	}
	fs.once.Do(shut)
}

// init gives the node a fresh unbuffered channel for reporting its outcome.
func (fs *flowStruct) init() {
	outcome := make(chan error)
	fs.C = outcome
}

// create returns an empty workflow engine that is ready to accept nodes.
func create() *flowCore {
	core := new(flowCore)
	core.funcs = map[string]*flowStruct{}
	return core
}

// add registers (or replaces) the node called name. d lists the names of the
// nodes it depends on, fn is the function to execute and fc is the node's
// force flag. It returns flw so that calls can be chained.
//
// NOTE(review): Ctr starts at 1 and start adds one per dependent, so done
// sends one more value than there are dependents — confirm this is intended.
func (flw *flowCore) add(name string, d []string, fn Func, fc bool) *flowCore {
	node := new(flowStruct)
	node.Deps, node.Fn, node.force = d, fn, fc
	node.Ctr = 1
	flw.funcs[name] = node
	return flw
}

// start validates the dependency graph and then hands over to do.
//
// A node that depends on itself, or on a name that was never added, aborts
// the start: the returned map then holds a single entry describing the
// problem. Only direct self-dependencies are checked here; longer cycles are
// not detected by this function. For every valid dependency entry the Ctr of
// the node being depended on is incremented.
func (flw *flowCore) start() map[string]error {
	for name, node := range flw.funcs {
		for _, dep := range node.Deps {
			switch target, known := flw.funcs[dep]; {
			case dep == name:
				// A node must not wait for itself.
				return map[string]error{name: errors.New(name + " not depends of it self")}
			case !known:
				// Every dependency has to be a registered node.
				return map[string]error{dep: errors.New(dep + " not exists")}
			default:
				target.Ctr++
			}
		}
	}
	return flw.do()
}

// do runs every registered node in its own goroutine, waits until all of them
// have finished and returns the error (possibly nil) of each node that ran.
//
// A node starts once all of its dependencies have reported. Unless the node's
// force flag is set, it is skipped when any dependency failed, panicked or was
// itself skipped; skipped nodes get no entry in the returned map. A node with
// exactly one dependency receives that dependency's result as its input
// (pipeline mode); every other node receives nil. A panic inside a node is
// converted into an error for that node.
//
// If the graph contains a dependency cycle nothing is executed and the
// returned map holds one error per node that could never become runnable.
func (flw *flowCore) do() map[string]error {
	// Waiting on a cyclic graph would block forever, so reject it up front.
	if errs := flw.cycleErrors(); errs != nil {
		return errs
	}

	// Create every channel before the first goroutine starts so that no node
	// can observe a nil channel. The buffer holds all Ctr values sent by done,
	// which means a finished node never blocks waiting for readers.
	for _, f := range flw.funcs {
		f.C = make(chan error, f.Ctr)
	}

	var (
		mu sync.Mutex
		wg sync.WaitGroup
	)
	result := make(map[string]error, len(flw.funcs))
	// record stores a node's outcome; the mutex serialises the map writes.
	record := func(name string, err error) {
		mu.Lock()
		result[name] = err
		mu.Unlock()
	}

	for name, f := range flw.funcs {
		wg.Add(1)
		go func(name string, fs *flowStruct) {
			defer wg.Done()
			defer func() {
				if r := recover(); r != nil {
					record(name, fmt.Errorf("%s panicked: %v", name, r))
					// Closing the channel releases the dependents of a
					// node that will never call done.
					fs.close()
				}
			}()

			run := true
			for _, dep := range fs.Deps {
				// Always drain every dependency so each node consumes
				// exactly one value per dependency entry.
				err, ok := <-flw.funcs[dep].C
				if !fs.force && (err != nil || !ok) {
					run = false
				}
			}
			if !run {
				// Propagate the skip downstream only; nodes on unrelated
				// branches keep running.
				fs.close()
				return
			}

			// Pipeline mode: a single dependency feeds its result forward.
			var input interface{}
			if len(fs.Deps) == 1 {
				input = flw.funcs[fs.Deps[0]].Res
			}
			res, err := fs.Fn(input)
			fs.Res = res
			record(name, err)
			fs.done(err)
		}(name, f)
	}

	wg.Wait()
	return result
}

// cycleErrors returns one error for every node that sits on, or downstream
// of, a dependency cycle, and nil when the graph is acyclic. Dependencies on
// unknown nodes are ignored here.
func (flw *flowCore) cycleErrors() map[string]error {
	waiting := make(map[string]int, len(flw.funcs))
	dependents := make(map[string][]string, len(flw.funcs))
	for name, f := range flw.funcs {
		count := 0
		for _, dep := range f.Deps {
			if _, known := flw.funcs[dep]; !known {
				continue
			}
			count++
			dependents[dep] = append(dependents[dep], name)
		}
		waiting[name] = count
	}

	ready := make([]string, 0, len(waiting))
	for name, count := range waiting {
		if count == 0 {
			ready = append(ready, name)
		}
	}
	// Repeatedly remove nodes whose dependencies are all resolved; whatever
	// remains afterwards can never run.
	for len(ready) > 0 {
		name := ready[len(ready)-1]
		ready = ready[:len(ready)-1]
		delete(waiting, name)
		for _, next := range dependents[name] {
			waiting[next]--
			if waiting[next] == 0 {
				ready = append(ready, next)
			}
		}
	}

	if len(waiting) == 0 {
		return nil
	}
	errs := make(map[string]error, len(waiting))
	for name := range waiting {
		errs[name] = errors.New(name + " is blocked by a dependency cycle")
	}
	return errs
}

// Run executes the workflow described by df and returns the per-node errors
// produced by the engine.
//
// Every key of df.Depend must have a function in df.Func. If any are missing,
// nothing is executed and the returned map holds one error per offending
// node; without this check such a node would be registered with a nil
// function.
//
// The workflow runs only if the lock for df.LockKey can be acquired; Run
// returns nil when it cannot. The lock is released when Run returns.
func (df *DFlow) Run() map[string]error {
	missing := map[string]error{}
	for name := range df.Depend {
		if df.Func[name] == nil {
			missing[name] = fmt.Errorf("%s has no function registered", name)
		}
	}
	if len(missing) > 0 {
		return missing
	}

	// NOTE(review): LockTime is presumably the lock expiry in seconds —
	// confirm against SyncMutex.
	lock := SyncMutex{LockKey: df.LockKey, LockTime: 15, Rc: df.RC}
	if !lock.Lock() {
		return nil
	}
	defer lock.UnLock()

	fl := create()
	for name, deps := range df.Depend {
		fl.add(name, deps, df.Func[name], df.Force)
	}
	return fl.start()
}

运行示例:

// RC is the Redis client used by the example workflow.
// NOTE(review): the second value returned by RedisConnect (presumably an
// error) is discarded — confirm that this is acceptable.
var RC, _ = RedisConnect()

// test groups the four example steps a, b, c and d used by the demo.
type test struct{}

// banner prints the step name followed by the separator line.
func (t *test) banner(step string) {
	fmt.Println(step)
	fmt.Println("==========")
}

// a ignores its argument and always succeeds with "a ok".
func (t *test) a(interface{}) (interface{}, error) {
	t.banner("a")
	return "a ok", nil
}

// b prints its input and always succeeds with "b ok".
func (t *test) b(i interface{}) (interface{}, error) {
	fmt.Println(i)
	t.banner("b")
	return "b ok", nil
}

// c prints its input and always fails with "c error".
func (t *test) c(i interface{}) (interface{}, error) {
	fmt.Println(i)
	t.banner("c")
	return nil, errors.New("c error")
}

// d prints its input and always succeeds with "d ok".
func (t *test) d(i interface{}) (interface{}, error) {
	fmt.Println(i)
	t.banner("d")
	return "d ok", nil
}
// init wires the four example steps into a linear a -> b -> c -> d workflow,
// runs it under the "workflow_test" lock key and prints the per-node result.
func init() {
	steps := test{}
	funcs := map[string]common.Func{
		"a": steps.a,
		"b": steps.b,
		"c": steps.c,
		"d": steps.d,
	}
	deps := map[string][]string{
		"a": {},
		"b": {"a"},
		"c": {"b"},
		"d": {"c"},
	}
	flow := common.DFlow{RC: RC, LockKey: "workflow_test", Func: funcs, Depend: deps}
	fmt.Println(flow.Run())
}

执行结果:

a //执行
==========
a ok //a执行输出
b //执行
==========
b ok //b执行输出
c //执行出错(返回 c error),流水线中断,依赖 c 的 d 不再执行
==========
map[a: b: c:c error]
//流水线节点执行结果
发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章