Flink番外篇~流与表的相互转换

流处理可以产生很多的商业价值。许多组织都意识到了实时管理大量数据、快速响应以及为客户提供大规模实时服务的好处,而业务逻辑明确的流应用程序则可以提供更多的竞争优势。

Flink DataStream抽象是一个功能强大的API,该API允许您定义基本的、复杂的流式管道。

此外,它还提供了像 Async IO、ProcessFunctions这样的底层操作。但对大部分用户来说,这些深层次的底层API是用不到的,他们真正需要的是能解决80%用例且只需少数代码就可搞定的API。

为了向更多用户提供强大的流处理功能,Apache Flink社区开发了一套抽象更简单,语法更简洁的API,以便用户可以专注于他们的业务逻辑(而非高级流概念)。

与其他API(例如:用来处理流中复杂事件的CEP)一起,Flink还提供了一个可统一批流处理的关系API:Table & SQL API(通常称为Table API)。

最近,来自Alibaba,Huawei,data Artisans公司的贡献者们决定进一步开发Table API。在过去的一年中,Table API已被完全重写。从Flink 1.1开始,其核心就改为了Apache Calcite,后者能解析处理SQL、优化所有关系查询。今天,Table API可以在批处理和流环境中使用统一语义来处理各种用例。

本文总结了Flink Table API的当前状态,并展示了Apache Flink最近添加的一些功能。这里介绍的功能包括统一访问批、流数据,数据转换,以及窗口算子。

下面的段落不仅提供了Table API的一般概述,还可说明将来关系API的潜力。

由于Table API是基于Flink核心API来构建的,因此DataStreams/DataSets和Table可以相互转换(转换过程不会产生太多开销)。

我们还将展示:如何从不同的源创建表,并指定可以在本地或分布式环境中执行的程序。

本文,虽然我们用的是Scala版本的Table API,但对于Java来说也说一样的(具有等价功能的SQL API)。

Data Transformation and ETL

数据处理管道的常见任务是从一个或多个系统导入数据,执行一些转换,然后再将转换后的数据导出到另一个系统。

Table API可以帮助管理这些重复任务。 对于读取数据,API提供了一组即用型TableSource,例如CsvTableSource和KafkaTableSource。如果这些即用型TableSource不能满足您的要求,您可以实现自定义TableSource,这些TableSource可以隐藏流概念中那些陌生的配置细节(例如水印生成)。

假设我们有一个存储了客户信息的CSV文件,其值由“|”字符分隔,内容包含客户标识符,名称,上次更新的时间戳,以及以逗号分隔的偏好键值对:

42|Bob Smith|2016-07-23 16:10:11|color=12,length=200,size=200

下面的示例展示了在将文件数据转换为常规DataStream程序前,如何利用Flink来读取CSV文件并执行某些数据清理。

// 设置执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
// 配置table source
val customerSource = CsvTableSource.builder()
.path("/path/to/customer_data.csv")
.ignoreFirstLine()
.fieldDelimiter("|")
.field("id", Types.LONG)
.field("name", Types.STRING)
.field("last_update", Types.TIMESTAMP)
.field("prefs", Types.STRING)
.build()
// 命名table source
tEnv.registerTableSource("customers", customerSource)
// 定义table程序
val table = tEnv
.scan("customers")
.filter('name.isNotNull && 'last_update > "2016-01-01 00:00:00".toTimestamp)
.select('id, 'name.lowerCase(), 'prefs)
// 转换为data stream
val ds = table.toDataStream[Row]
ds.print()
env.execute()

Table API附带了大量的内置函数,通过这些函数,我们可以轻松地使用语言集成查询语法(LINQ)来定义业务逻辑。

在上面的例子中,我们先过滤了无效用户名,然后再查找最近更新了用户偏好的记录。同时,为了对数据进行规范化,我们还将姓名转换为了小写。

出于调试,这里我们只将table转换为了DataStream ,并将结果打印到了标准输出中。

CsvTableSource既支持批处理环境,又支持流处理环境。如果程序员想在批处理程序中执行上述程序,那么只需要将执行环境替换为ExecutionEnvironment,并将DataStream修改为DataSet即可。 Table API 程序本身是不需要改动的。

在上面的示例中,我们将table程序转换为了Row对象的数据流。但,这并意味着只能用row数据类型。Table API支持底层API中的所有类型,例如: Java/Scala中的Tuples, Case Classes, POJOs,或能通过Kryo序列化的泛型类型。

假设我们不想要通用的row数据类型,而是想返回一个带有下面格式的POJO类型:

class Customer {
var id: Int = _
var name: String = _
var update: Long = _
var prefs: java.util.Properties = _
}

要实现上述需求, 我们可在table程序中将CSV文件转换成Customer对象。Flink会自动为我们创建对象和映射字段。

val ds = tEnv
.scan("customers")
.select('id, 'name, 'last_update as 'update, parseProperties('prefs) as 'prefs)
.toDataStream[Customer]

或许你已经注意到了,在上面的例子中,我们使用了一个函数来解析偏好字段。尽管Flink Table API 附带了大量的内置函数,但通常还得为特定业务实现自定义的标量函数(scalar functions)。

在上面的例子中,我们使用了一个自定义函数parseProperties。下面的代码片段就是该函数的实现。

object parseProperties extends ScalarFunction {
def eval(str: String): Properties = {
val props = new Properties()
str
.split(",")
.map(\_.split("="))
.foreach(split => props.setProperty(split(0), split(1)))
props
}
}

标量函数(Scalar functions)可用来执行反序列化,抽取,或转换等操作。通过覆盖open()方法,我们甚至可以访问运行时信息(例如:分布缓存文件或指标)。注意:open()方法只会在运行时的任务生命周期中调用一次。

静态和流式数据的统一窗口

另一种常见任务(特别是在处理连续数据时)是将流分割成有限大小的窗口来进行计算。

当前,Table API支持三种不同类型的窗口:滑动窗口(sliding windows),翻滚窗口(tumbling windows),以及会话窗口(session windows)。有关这三种窗口类型的定义,可参考Flink文档。

这三种窗口都可基于事件时间或处理时间来处理事件。会话窗口可按时间间隔定义,滑动窗口和翻滚窗口可按时间间隔或行数来定义。

假设上面示例中的数据是客户更新其偏好时产生的更新事件流,并假设TableSource中的所有事件都已正确地分配了时间戳和水印。

下面我们将再次使用LINQ风格来定义窗口。下面的示例会算出1天内更新的偏好次数。

table
.window(Tumble over 1.day on 'rowtime as 'w)
.groupBy('id, 'w)
.select('id, 'w.start as 'from, 'w.end as 'to, 'prefs.count as 'updates)

通过on()参数,我们可以指定窗口是否需按事件时间来处理。Table API假设在使用事件时间时已经正确地分配了时间戳和水印。时间戳小于最后接收水印的元素会被删除。

由于时间戳的提取和水印的生成依赖于数据源,因此通常要由TableSource或上游DataStream来负责分配这些属性。

下面的代码展示了如何定义其它类型的窗口:

// 使用处理时间
table.window(Tumble over 100.rows as 'manyRowWindow)
// 使用事件时间
table.window(Session withGap 15.minutes on 'rowtime as 'sessionWindow)
table.window(Slide over 1.day every 1.hour on 'rowtime as 'dailyWindow)

由于批处理只是流处理的一种特例(批处理有明确的起点和终点), 因此这些窗口在批处理环境中也同样适用。由于我们指定了一个名为“rowtime”的列,因此可在不修改table程序的情况下直接基于DataSet来运行上述代码。这一点对于精确计算相当有用,因为它可以处理那些严重无序到达的延迟事件。

目前,Table API仅支持DataStream API中的“分组窗口(group windows)”。 其他窗口,如SQL的OVER条件窗口将在Flink 1.3中实现。

为了展示API的强大功能,下面的代码片断会展示一个高级示例:它会在一小时的滑动窗口上衰减移动平均值,并每秒返回一次聚合结果。在这个table程序中,最新订单的权重比老订单更重。这个例子是从Apache Calcite中借来的,它展示了未来Flink版本中Table API和SQL的可能性。

table
.window(Slide over 1.hour every 1.second as 'w)
.groupBy('productId, 'w)
.select(
'w.end,
'productId,
('unitPrice * ('rowtime - 'w.start).exp() / 1.hour).sum / (('rowtime - 'w.start).exp() / 1.hour).sum)

用户自定义Table函数

用户自定义表函数是Flink 1.2中加入的新功能。 该功能对于那些包含非原子值的表列(在处理之前,需将这些非原子值提取和映射为单独字段)非常有用。表函数可接受任意数目的标量值作为输入,并可返回任意数目的行作为输出(不仅仅输出单个值),这一点类似于DataStream/DataSet API中的flatMap函数。之后,还可使用左外连接或交叉连接将表函数的输出与表中的原始行连接。

还是使用前面提到的客户表,假设现在我们想要生成一个包含颜色和大小偏好的表作为单独的列。

那么表程序看起来会像下面这样:

// 创建一个表函数实例
val extractPrefs = new PropertiesExtractor()
// 派生行,并将它们与原始行进行连接
table
.join(extractPrefs('prefs) as ('color, 'size))
.select('id, 'username, 'color, 'size)

PropertiesExtractor是一个可提取颜色和大小的用户自定义表函数。我们只对包含这两个偏好的用户感兴趣,因此如果字符串值中没有这两个属性,那么不会发出任何内容。由于表程序使用的是交叉连接,因此它会过滤掉连接右侧没有结果的客户。

class PropertiesExtractor extends TableFunction[Row] {
def eval(prefs: String): Unit = {
// 将string拆分成(key, value)对
val pairs = prefs
.split(",")
.map { kv =>
val split = kv.split("=")
(split(0), split(1))
}
val color = pairs.find(\_.\_1 == "color").map(\_.\_2)
val size = pairs.find(\_.\_1 == "size").map(\_.\_2)
//如果指定了颜色和大小,则发出一行数据
(color, size) match {
case (Some(c), Some(s)) => collect(Row.of(c, s))
case _ => // skip
}
}
override def getResultType = new RowTypeInfo(Types.STRING, Types.STRING)
}

结论

当下,人们对易于访问和使用的流更感兴趣。 虽然Flink Table API还在不断发展,但我们相信很快您就能使用纯关系API来实现批流传输,甚至将现有的Flink作业转换为表程序。

Table API已是一个非常有用的工具,因此您可在DataSet/DataStream抽象和Table抽象之间来回切换来应对限制和缺少功能。

支持Apache Hive UDF,外部目录,更多TableSource,更多窗口,以及更多算子等功能将使Table API成为更有用的工具。 尤其是,即将推出的动态表格,它表明即使在2017年,新的关系API也打开了许多可能性大门。

原文:https://flink.apache.org/news/2017/03/29/table-sql-api-update.html

发表评论
留言与评论(共有 0 条评论)
   
验证码:

相关文章

推荐文章

'); })();