本文共 3795 字,大约阅读时间需要 12 分钟。
@【
转载:
请到原文查看。。。目标:从有序的简单事件流中发现一些高阶特征
输入:一个或多个简单事件构成的事件流
处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件
输出:满足规则的复杂事件
flinkCEP 提供了 Pattern API,定义用于对输入流数据进行复杂事件的规则,用来提取符合规则的事件结果
处理事件的规则,被叫做“模式(Pattern)”
包含四个步骤
输入事件流的创建
Pattern 的定义
Pattern 应用在事件流上检测
选取结果
// 定义一个 Pattern val pattern = Pattern.begin[event]("start").where(_.getId == 42) .next("middle").subtype(classOf(SubEvent)).where(_.getTemp >= 10.0) .followedBy("end").where(_.getName == "end")// 将创建好的 Pattern 应用到输入事件流上val patternStream = CEP.pattern(inputStream.pattern)// 获取事件序列,得到处理结果val result:DataStream(alert) = patternStream.select(createAlert(_))
Pattern API 模式有几下几种分类
个体模式(Individual Patterns)
模式序列\组合模式(Combining Patterns )
模式组(Groups of patterns)
个体模式就是组成复杂规则的每个单独的模式定义
个体模式可以包括“单利模式 singleton” 和“循环模式 loogping”
单例模式只接收一个事件,而循环模式可以接收多个事件
start.times(3).where(_.behavior.startWith(“fav”))
可以在一个个体模式后追加量词,也就是指定循环次数
// 匹配出现4次start.times(4)// 匹配出现 2-4 次start.times(2,4)// 匹配出现0次或4次start.times(4).optional// 匹配出现 2-4 次,并且尽可能多的重复匹配start.times(2,4).greedy// 匹配出现 1 次或多次start.oneOrMore// 匹配出现 0 次、两次或多次start.timesOrMore(2).optional.greedy
条件(Condition)
.where() .or() .until()
来指定条件简单条件
通过.where()
方法对事件中的字段进行判断筛选,决定是否接受该事件
start.where(event=>event.getName.startWith(“foo”))
组合条件
.or()
方法表示逻辑相连,.where()
的直接组合就是 ANDpattern.where(event=>…).or(event=>…)
终止条件
.until()
作为终止条件,以便清理状态迭代条件(Iterative Condition)
.where((value,ctx)=>{…})
可以调用 ctx.getEventsForPattern(“name”)
val start = Pattern.begin(“start”)
.next()
指定“a next b “
事件序列 【a,c,b,d】
则没有匹配.followBy()
指定“a followBy b “
事件序列 【a,c,b,d】
匹配为{a,b}
.followedByAny()
指定“a followBy b “
事件序列 【a,c,b1,b2】匹配为{a,b1},{a,b2}
除了上述模式之外,还可以定义“不希望出现某种近邻关系”
.notNext
: 不想让某个事件严格紧邻前一个事件发生.notFollowedBy
:不想让某个事件在两个事件之间发生.begin()
开始.notFollowedBy()
结束“not”
类型的模式不能被 optional所修饰next.within(Time.seconds(10))
指定要查找的模式序列后,就可以将其应用于输入流以检测潜在匹配
调用CEP.pattern()
,给定输入流和模式,就能得到一个 PatternStream val input:DataStream[Event] = ...val pattern:Pattern[Event,_] = ...val patternStream:PatternStream[Event] = CEP.pattern(input,pattern)
创建 PatternStream 之后,就可以应用 select 或者 flatselect方法,从检测到的事件序列中提取事件了
select()方法需要输入一个 select function作为参数,每个成功匹配的事件序列都会调用它
select() 以一个 Map[String,Iterable[N]] 来接收匹配到的事件序列,其中 key 就是每个模式的名称,而 value 就是所接收到的事件的 Iterable 类型
def selectFn(pattern:Map[String,Iterable[In]]):OUT= { val startEvent = pattern.get("start").get.next val endEvent = pattern.get("end").get.next OUT(startEvent,endEvent)}
当一个模式通过 within关键字定义了检测窗口时间时,部分时间序列可能因为超过窗口长度而被丢弃,为了能够处理这部分超时的数据,select 和 flatSelect API 调用允许指定超时处理程序
超时处理程序会接收到目前为止模式匹配到的所有事件,由一个 OutputTag 定义接收到的额超时事件序列
val patternStream:PatternStream[Event] = CEP.pattern(input,pattern)val outputTag = OutputTag[String]("side-output")val result = patternStream.select(outputTag){ (pattern:Map[String,Iterable[Event]],timestamp:Long)=>TimeoutEvent()}{ pattern:Map[String,Iterable[Event]] => complexEvent()}val timeoutResult:DataStream= result.getSideoutput(outputTag)