LF Edgee Kuiper是Golang实现的轻量级物联网边缘分析、流式处理开源软件,可以运行在各类资源受限的边缘设备上。eKuiper的主要目标是在边缘端提供一个流媒体软件架构(类似于Apache Flink)。eKuiper的规则引擎允许用户提供基于SQL或基于图形(类似于Node-RED)的规则,在几分钟内创建物联网边缘分析应用。
近日,eKuiper发布了1.8.0版本。该版本的主要亮点有:
零编码AI推理:通过通用AI函数,用户无需编码即可针对流式数据或视频流实现实时AI算法推理。该函数可以推理任意的Tensor Flow Lite模型。用户模型训练完成后下发模型即可使用,十分灵活快捷。
可视化规则创建:管理控制台中集成了可视化规则编辑器Flow Editor。用户使用免费的eKuiper manager管理控制台时,可通过可视化拖拽UI进行规则的新建和编辑。
更灵活的数据传输配置:重构了外部连接source/sink的格式和序列化实现,解耦了格式和传输协议,并支持更多的格式如csv和自定义格式。
同时,产品团队也重构了文档结构,更新了安装和应用场景文档,方便用户快速找到有用的文档信息。
之前的版本中,eKuiper支持通过扩展的方式,在插件中调用AI/ML模型进行流式数据算法推理。这种方法方便用户进行算法的预处理和后处理,但有较高的使用门槛,运维更新也比较复杂。
新版本提供了Tensor Flow Lite函数插件,用于在流式计算和视频流中进行实时AI推理。这个函数为通用的AI函数,可用于处理大部分已预训练好的Tensor Flow Lite模型。使用中,用户只需上传或提前部署好需要使用到的模型,无需额外编码即可在规则中使用这些模型。
TfLite函数接收两个参数,其中第一个参数为模型(扩展名须为.tflite)的名称,第二个参数为模型的输入。假设用户预先训练好了文本分类模型text_model和智能回复模型smart_reply_model,需要对实时流入eKuiper的数据应用这两个模型分析。使用时仅需要两个步骤:
1.下发模型到eKuiper部署的边缘端,可通过eKuiper的upload API或者其他应用管理。
2.配置规则,使用tfLite函数,指定模型名称即可使用。
函数会在eKuiper层面针对输入数据格式进行验证。用户可以通过更多的SQL语句对模型的输入和输出做预处理或者后处理。
图像/视频流推理
配合新版本提供的视频流源(详情见下文),eKuiper提供了视频接入并定时获取图像帧的能力。图像帧可在规则中,使用tfLite函数进行AI推理。Tensor Flow模型通常是针对特定的图像大小进行训练的,对图像进行推理时,经常需要进行变更大小等预处理。eKuiper也提供了resize、thumnail等预处理方法。函数会返回output tensor的数组表示供后续规则或应用处理。
在以下的规则ruleTf中,我们调用了label.tflite模型,对传入的图像先进行预处理,大小调整为224*224。
使用通用AI函数,用户可以快速部署、验证和更新AI模型,加快应用的迭代更新。
eKuiper从1.6.0版本开始提供适合面向可视化界面的图规则API,相比于SQL更适合于构建UI界面。在1.8.0版本中,我们正式在免费的eKuiper manager管理控制台中提供了Flow Editor可视化编辑器。用户在创建和编辑规则时,可选择使用原有的SQL规则编辑器或使用试用版本的Flow Editor。
Flow Editor的界面如下图所示。它的使用遵循主流可视化工作量编辑器的风格和使用逻辑。左侧是可用节点,用户自定义插件和函数也会出现在列表中。中间是画布,用户可拖拽节点并连线;右侧是属性配置视图,点击节点后可在此配置。
除了集成原有功能到Flow Editor中,新版本中还添加了两种节点:
Switch node:该节点允许消息被路由到不同的流程分支,类似于编程语言中的switch语句。
Script node:该节点允许针对传递的信息运行JavaScript代码。
有了这两种节点,Flow Editor可以创建传统多分支工作流并且更加容易进行节点的扩展,实现脚本编写。
eKuiper通过source/sink与外部系统进行连接、读入或写出数据。以source为例,每种类型的source读取数据时都需要经过连接(connect)和序列化(serialization)两个步骤。例如,MQTT source,连接意味着遵循MQTT协议连接broker,而序列化则是将读取到的数据payload解析成eKuiper内部的map格式。
连接和序列化
此前,连接和序列化通常在source内部实现,因此当用户需要解析自定义格式时,即使连接协议是MQTT等已支持协议,仍然需要编写完整的source插件。新的版本中,格式和source类型进一步分离,用户可以自定义格式,而各种格式可以与不同的连接类型结合使用。
例如,创建MQTT类型的数据流时可定义各种不同的payload格式。
Schema
此前eKuiper支持在Create Stream的时候指定数据结构类型等。但该方式存在一些不足:
额外性能消耗。当前的Schema没有与数据原本的格式Schema关联,因此在数据解码之后,需要再额外进行一次验证/转换;而且该过程基于反射动态完成,性能较差。例如,使用Protobuf等强Schema时,经Protobuf解码之后的数据应当已经符合格式,不应再进行转换。
Schema定义繁琐。同样无法利用数据本身格式的Schema,而是需要额外配置。
新的版本中,Stream定义时支持逻辑Schema和格式中的物理Schema定义。SQL解析时,会自动合并物理Schema和逻辑Schema,用于指导SQL的验证和优化。同时,我们也提供了API,用于外部系统获取数据流的实际推断Schema。
格式列表
新版本中,支持的格式扩展到如下几种。部分格式包含内置的序列化;部分格式(如Protobuf)既可以使用内置的动态序列化方式也可以由用户提供静态序列化插件以获得更好的性能。在Schema支持方面,部分格式带有Schema,而自定义格式也可以提供Schema实现。
新的版本继续加强了有状态分析函数的能力,同时提供了统计函数,提升了产品原生的分析能力。
有条件分析函数
分析函数添加了WHEN条件判断子句,根据是否满足条件来确定当前事件是否为有效事件。当为有效事件时,根据分析函数语意计算结果并更新状态。当为无效事件时,忽略事件值,复用保存的状态值。
增加了WHEN子句之后,分析函数可以实现更加复杂的有状态分析。lag(StartTime) OVER (WHENhad_changed(true,StatusCode))
统计函数
新的版本中,我们提供了多个聚合统计函数,例如标准差、方差和百分位的计算。
eKuiper可以处理二进制图像数据,但是此前的测试中,图像都是经由MQTT、HTTP等偏向文本数据传输的协议来发送。新版本提供了视频流源,增加了一种新的二进制数据源。另外,我们大幅增强了文件source的能力,支持更多文件类型并支持流式消费文件内容。
文件源
之前版本的文件源主要用于创建Table,对流式处理的支持不够完善。新的版本中,文件源也支持作为用作流,此时通常需要设置interval参数以定时拉取更新。同时增加了文件夹的支持,多种文件格式的支持和更多的配置项。
新版本中支持的文件类型有:
json:标准的JSON数组格式文件。如果文件格式是行分隔的JSON字符串,需要用lines格式定义。
csv:支持逗号分隔的csv文件,以及自定义分隔符。
lines:以行分隔的文件。每行的解码方法可以通过流定义中的格式参数来定义。例如,对于一个行分开的JSON字符串,文件类型应设置为lines,格式应设置为JSON。
视频流源
视频源用于接入视频流,例如来自摄像头的视频或者直播视频流。视频流源定期采集视频流中的帧,作为二进制流接入eKuiper中进行处理。
通过视频源接入的数据,可以使用已有的SQL功能,例如AI推理函数功能等,转换成数据进行计算或输出为新的二进制图像等。
部署在边缘端的规则运维相对困难。而边缘端的部署数量通常较大,手工重启规则或重启eKuiper也会成为较为繁琐的工作。新的版本中,我们增强了规则的自治和自适应能力。
规则自动重启策略
规则因各种原因出现异常时可能会停止运行,其中有些错误是可恢复的。eKuiper 1.8.0提供了可配置的规则自动重启功能,使得规则失败后可以自动重试从而从可恢复的错误中恢复运行。
用户可配置全局的规则重启策略,也可以针对每个规则配置单独的重启策略。规则重启配置的选项包括:
重试次数
重试间隔
重试间隔系数,即重试失败后重试时间增加的倍数
最大重试间隔
随机重试延迟,防止多个规则总是在同一个时间点重试,造成拥塞
通过配置重试,可以在出现偶发错误时自动恢复,减少人工运维的需要。
数据导入导出
新版本中提供了RESTAPI和CLI接口,用于导入导出当前eKuiper实例中的所有配置(流、表、规则、插件、源配置、动作配置、模式)。这样可以快速地备份配置或者移植配置到新的eKuiper实例中。导入导出的规则集为文本的JSON格式,可读性较强,也可以手工编辑。
导出配置的rest接口为GET/data/export,通过此API可导出当前节点的所有配置
导出配置的rest接口为POST/data/import,通过此API可导入已有配置至目标eKuiper实例中
如果导入的配置中包含插件(native)、静态模式(static schema)的更新,则需要调用接口POST/data/import?stop=1
导入配置的状态统计可用GET/data/import/status接口查看
Portable插件热更新
相比原生插件,Portable插件更加容易打包和部署,因此也有更多的更新需求。之前的版本中,Portable插件更新后无法立即生效,需要手动重启使用插件的规则或者重启eKuiper。eKuiper 1.8.0中,插件更新后,使用插件的规则可无缝切换到新的插件实现中,减少运维工作。
2023-04-07 EMQ 映云科技发布了 《EMQ&阿里云Lindorm联合方案:解决物联网关键业务场景数据处理难题》的文章
2023-03-24 EMQ 映云科技发布了 《来2023全球边缘计算大会与EMQ探讨云边协同落地实践》的文章
2023-03-20 EMQ 映云科技发布了 《EMQ&南洋万邦|深度激活工业数据潜力,加速绿色制造数智化转型升级》的文章
2023-02-24 EMQ 映云科技发布了 《EMQX在Kubernetes中如何进行优雅升级》的文章
2023-02-24 EMQ 映云科技发布了 《EMQX Cloud Serverless正式上线:实现三秒部署的MQTT Serverless云服务》的文章