构造动态的计算流程

论坛 期权论坛 期权     
大可不加冰   2019-6-8 04:29   487   0
上周临时被人拉壮丁,火线派了个任务,在UCloud TIC大会上简单介绍了一下USQL和StepFlow两个产品。USQL是一个优秀的数据湖产品,使得用户可以用简单的SQL语句直接在对象存储服务中的海量数据上进行查询,无需建表、ETL/ELT、设计数据仓库等等。而StepFlow则是受AWS StepFunctions的启发,设计的一个用来编排服务工作流的产品。
其实我对这两款产品的了解程度也仅限于产品团队提供给我的公开材料的水平,但是我发现,很多朋友对构造一个动态的计算过程感到迷茫,如果是我自己要设计一个很简单的工作流,读取用户定制的调用逻辑顺序,动态执行,我应该怎么做?我感觉这是一个有趣的问题,可以给有兴趣的朋友提供一个思路。
[h1]如何编排服务[/h1]当下微服务架构简直是当红炸子鸡,原先在一个单体应用中的各个模块被拆分成了各自独立的服务,但是要完成一项业务操作仍然需要模块间的协同,我们需要一个扮演原先单体应用的服务来把这些独立的服务以某种正确的结构粘在一起。这种结构实际上就是我们最为熟悉的那些——顺序执行、if、while、for、switch case、try catch等等,当然也可以有一些函数式的功能,map、flatMap、filter、reduce等等,只不过中间执行的每一条语句都可能是工作流引擎提供的内置方法,或者是一个对外部服务的调用。
一个工作流程可以看成是由语句(Statement)组成的,而语句可以是由其他语句以及表达式(Expression)组成的。
[h1]表达式[/h1]表达式可以有很多种
  • 各种类型的字面值表达式,比如true, false, 100, 100.0, "Hello World"
  • 二元操作表达式,比如1+1,true && false
  • 一元表达式,比如i++,!true
  • 取变量表达式,比如person.name(person代表一个名为person的对象变量)
  • 函数调用表达式,比如time.Now()
等等等等,不一而足。表达式之间也可以是组合关系,比如这个为了举例而完全没有任何实际意义的例子:person.IsValid && !(time.IsAfterNoon() || person.Grade > 10)


[h1]语句[/h1]语句就是构成流程的主体,一个工作流可以看作是一个语句的序列。一条语句可以是执行一个特定任务,比如调用一个服务,也可以是由一组语句构成的。

解释执行这个AST主要依赖Visitor模式,比如一个简单的Visit方法:
  1. func Visit(visitor Visitor, stmt Node, ctx Context) {
  2.     switch stmt.(type) {
  3.     case InvokeSvcStmt:
  4.         {
  5.             stmt.(InvokeSvcStmt).Evaluate(visitor, ctx)
  6.         }
  7.     case IfElseStmt:
  8.         {
  9.             stmt.(IfElseStmt).Evaluate(visitor, ctx)
  10.         }
  11.     case BoolExpr:
  12.         {
  13.             visitor.Push(stmt.(BoolExpr).Evaluate(visitor, ctx))
  14.         }
  15.     case BinaryExpr:
  16.         {
  17.             visitor.Push(stmt.(BinaryExpr).Evaluate(visitor, ctx))
  18.         }
  19.     case VarExpr:
  20.         {
  21.             visitor.Push(stmt.(VarExpr).Evaluate(visitor, ctx))
  22.         }
  23.     }
  24. }
复制代码
在这里Visitor就相当于一个线程栈,记录了调用顺序和栈上数据。
对于语句,调用对应的Evaluate方法;对于表达式,则进行求值后压栈,这样后续的计算过程中可以通过推栈读取之前操作的结果。
我们将工作流看成是一组语句的序列,那么执行一个工作流其实就是执行这个语句序列(在这里偷了个懒,直接用WorkFlow类型实现了Context接口):
  1. func (flow WorkFlow) VisitWorkFlow(visitor Visitor) {
  2.     for _, stmt := range flow.Stmts {
  3.         Visit(visitor, stmt, flow)
  4.     }
  5. }
复制代码
if else的statement,它的结构就是:
  1. type IfElseStmt struct {
  2.     Cond Expr
  3.     Body []Stmt
  4.     Else []Stmt
  5. }
复制代码
它由三部分组成,代表条件的Cond,代表为true时执行的Body,以及代表false时的Else。执行IfElse语句的逻辑如下:
  1. func (ifStmt IfElseStmt) Evaluate(visitor Visitor, ctx Context) {
  2.     cond := EvaluateExpr(visitor, ifStmt.Cond, ctx).(bool)
  3.     stmts := ifStmt.Body
  4.     if !cond {
  5.         stmts = ifStmt.Else
  6.     }
  7.     for _, s := range stmts {
  8.         Visit(visitor, s, ctx)
  9.     }
  10. }
  11. func EvaluateExpr(visitor Visitor, e Expr, ctx Context) interface{} {
  12.     Visit(visitor, e, ctx)
  13.     return visitor.Pop()
  14. }
复制代码
对于访问变量的表达式,就是去Context读取记录:
  1. func (stmt VarExpr) Evaluate(visitor Visitor, ctx Context) interface{} {
  2.     return ctx.GetVar(stmt.Name)
  3. }
复制代码
你会发现,我们只需要把构造工作流所需要的积木块一点点构建出来,然后通过Visit方法将其正确地串接起来,我们可以实现任意复杂的计算逻辑。所谓的方法调用也好,Binary操作也好,都只是针对程序栈的压栈或推栈操作。
如果这个工作流跑到一半,进程意外宕机了怎么办?有没有办法使得计算流程能够从距离宕机最近时刻的位置继续执行下去?这也不难做到。我们借鉴程序计数器(Program Counter)的概念,为工作流和语句块引入PC的概念:
  1. type ProgramCounter struct {
  2.     count int
  3. }
  4. func (pc ProgramCounter) IncreasePC() {
  5.     pc.count++
  6. }
  7. func (pc ProgramCounter) PC() int {
  8.     return pc.count
  9. }
  10. type WorkFlow struct {
  11.     ProgramCounter
  12.     Stmts []Stmt
  13.     vars map[string]interface{}
  14. }
复制代码
如果工作流执行到一半宕机了,如何才能恢复?
  1. func (wf WorkFlow) VisitWorkFlow(visitor Visitor) {
  2.     for i := wf.PC(); i < len(wf.Stmts); i ++ {
  3.         Visit(visitor, wf.Stmts[i], wf)
  4.         wf.IncreasePC()
  5.         PersistWorkFlow(wf)
  6.     }
  7. }
复制代码
我们改造了一下执行工作流的流程,读取工作流当前的PC,从PC处开始执行语句,跳过已经执行过的,每执行完一个语句就递增一下PC,然后持久化状态,这样即使宕机后,其他机器从持久化存储中恢复了工作流的状态,也会从最新状态处开始继续执行。
对于每一个语句内部的执行状态也可以用类似的方法来记录:
  1. type IfElseStmt struct {
  2.     ProgramCounter
  3.     Cond     *bool
  4.     CondExpr Expr
  5.     Body     []Stmt
  6.     Else     []Stmt
  7. }
  8. func (ifStmt IfElseStmt) Evaluate(visitor Visitor, ctx Context) {
  9.     pc := ifStmt.PC()
  10.     if ifStmt.Cond == nil {
  11.         cond := EvaluateExpr(visitor, ifStmt.CondExpr, ctx).(bool)
  12.         ifStmt.Cond = &cond
  13.         PersistStmt(ifStmt)
  14.     }
  15.     stmts := ifStmt.Body
  16.     if !*ifStmt.Cond {
  17.         stmts = ifStmt.Else
  18.     }
  19.     for i := pc; i < len(stmts); i++ {
  20.         Visit(visitor, stmts[i], ctx)
  21.         ifStmt.IncreasePC()
  22.         PersistStmt(ifStmt)
  23.     }
  24. }
复制代码
对if else的执行也可以做这样的改造,把if条件的判定结果立即持久化,这样恢复后不会重复调用判定,以防止得到和宕机时不一样的判定结果,然后就是执行对应分支的语句。
这种方法,可以保证对语句的At Least Once的执行,但有可能会执行多次,所以操作要设计成幂等的。
到目前为止,我们可以动态执行用于定义的流程,并且配合持久化存储后可以在意外宕机后从最接近宕机时的状态开始继续执行,但目前的做法,次完整的工作流执行被限制在一个进程内完成,无法做到集群分布式执行语句,这带来一些限制,比如说想要定义一个定时执行逻辑,在一个死循环中设置一个长达1周的sleep,实现每周自动执行一次的逻辑,这种需求会使得我们的goroutine要sleep一周,几乎不太现实。如果读者们有兴趣的话,下次可以讲讲,如何用actor模型来解决这个问题。

分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:
帖子:
精华:
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP