Uber 的全球数据仓库团队使用统一的、 PB 级、集中建模的数据湖使所有 Uber 的数据民主化。数据湖由使用维度数据建模技术[1]开发的基础事实、维度和聚合表组成,工程师和数据科学家可以自助方式访问这些表,为 Uber 的数据工程、数据科学、机器学习和报告提供支持。因此,计算这些表的 ETL(提取、转换、加载)管道对 Uber 的应用程序和服务至关重要,为乘客安全、ETA 预测、欺诈检测等核心平台功能提供支持。在Uber数据新鲜度是一项关键业务需求。Uber在工程方面投入了大量资金以尽快处理数据,使其与现实世界中发生的事情保持同步。
为了在我们的 ETL 管道中实现这种数据新鲜度,一个关键挑战是增量更新这些建模表,而不是在每次新的 ETL 运行时重新计算所有数据。这对于在 Uber 的巨大规模下经济高效地运营这些管道也是必要的。事实上早在2016年,Uber就通过Apache Hudi[2]项目引入了具有强大增量数据处理能力[3]的全新“交易数据湖[4]”范式来应对这些挑战。我们后来将该项目捐赠给了 Apache 软件基金会。Apache Hudi 现在是一个顶级的 Apache 项目,在 lakehouse 新兴技术类别中被广泛使用。在此期间我们很高兴地看到该行业已在很大程度上从批量数据摄取转向 Apache Hudi 在 Uber 引入的更加增量的摄取模型。在此博客中,我们分享了过去一年左右的工作,将这种增量数据处理模型扩展到我们复杂的 ETL 管道,以解锁真正的端到端增量数据处理。
背景
在我们深入研究之前,让我们花时间回顾一下增量处理的一些背景知识,以及它与传统批处理和较新的流处理的关系。使用流处理,可以尽可能快地实时处理数据流,并产生更多的数据流供下游处理。流处理系统内置支持处理迟到/事后[5]数据,使用事件时间或处理时间语义处理数据。尽管理想情况下事件时间和处理时间应该相同,但由于系统重试、数据丢失或损坏、网络延迟、业务逻辑等原因,漂移非常常见。虽然批处理数据处理可以处理大量数据,但它们处理的是迟到的数据 当数据到达或与延迟数据协调时,效果不佳且无法有效处理数据。为了克服这个问题,批处理管道通常在数据稳定几个小时后触发,或者通过每天多次有效地重新计算整个表或时间窗口来触发。这意味着迟到/事后数据将需要重新计算整个分区,即使实际更改的数据与整个分区相比是最小的。
增量处理背后的想法[6]非常简单。增量处理通过每次运行仅处理新数据然后增量更新新结果,将处理流数据的语义扩展到批处理管道。由于更短的批处理管道以及由于能够更频繁地运行它们而导致的数据新鲜度加速,这可以节省大量成本。Apache Hudi 的设计初衷是为了在湖上提供增量数据处理。流系统使用某种版本化状态存储来处理延迟到达的数据,该存储具有点查找功能以读取和修改数据。同样,Apache Hudi 支持时间点读取、强大的索引功能、优化的 Merge-On-Read 存储格式和索引元数据,以处理对表的快速、频繁的突变。在传统的海量数据湖中,重新计算以处理延迟数据意味着触发所有受影响分区的重新计算(在分区表的情况下)并将此过程级联到所有下游表。Apache Hudi 支持强大的变更数据捕获能力,以实现从摄取数据到计算建模表乃至上述下游数据应用程序的增量链式数据处理。
长期以来,传统数据湖中的数据被认为是不可变的。尽管 Lakehouse 架构通过添加类似于数据仓库的事务/更新/删除来挑战这一点,但我们认为需要类似于流处理状态存储的类似数据库的功能才能充分实现这种增量数据处理模型的优势。
用例
现在让我们检查 Uber 的两个示例场景,其中增量数据处理可以产生重大影响。
司机和快递员收入
让我们举一个非常重要的例子,增量读写可以带来比传统批处理数据处理显着的性能提升。假设数据集包含每个司机每天的司机收入。有时乘客可以选择在行程结束数小时后给司机小费,这将是对包含基本收入信息的初始记录的延迟更新。下面的图 2 详细说明了特定司机是如何发生这种情况的,他在周一的行程中赚了 10 美元,并在周二早上的周一行程中获得了 1 美元的小费。
在典型的批处理 ETL 世界中,我们不知道输入数据是如何变化的。我们将假设“N”天回顾(基于操作启发式)并重新处理所有分区,在必要时将司机收入更新到目标表。事实证明,这是一个非常耗费时间和资源的过程,因为在几个月前的分区中可能只有非常少量的记录需要更新。在下图中可以看到在一次运行中触及的所有日期分区,以及需要为该特定日期更新的事件数。
正如从上图中看到的那样,甚至有少量关于司机收入的更新可以追溯到几个月前,当我们假设一个 N 天回溯窗口时,这些更新将被遗漏。在增量 ETL 方法中,所有这些更新都可以在增量管道的每次执行期间使用,并以记录级别反映在目标 Apache Hudi 表中。通过这个过程,我们能够在建模数据集中实现 100% 的数据准确性和完整性,而无需定期重新处理数月的分区。
Uber Eats 优食商家经常更新菜单
增量读取和更新插入在性能和成本上都胜过批处理模型的另一个用例是,当我们必须满足频繁更新并使用短 SLA 获取建模表中的数据时。假设有多个表包含有关以各种粒度更新的餐厅菜单的信息。例如一个表可能包含菜单项信息,而另一个表可能存储菜单级别信息。每个商家可以在一天内根据需要多次更改此信息,在我们的建模表中,我们希望尽快显示这些更改的最新状态。
迎合此用例的批处理方法是一次提取一整天的更改,然后将其与每个现有记录的最新状态合并,以获取所有实体的最新图片,然后使用此信息填充所有其他表 . 如上图(图 4)所示,每天进入的实体更新百分比占实体总数的很大一部分(4.08 亿增量变化与 110 亿实体总数相比)。批处理方法的问题是计算是在数据被认为是完整的之后完成的,并增加了所有下游表的 SLA。由于涉及大量数据,管道需要大约 14 小时才能完成,因此一次故障将再次使 SLA 增加许多小时。在增量 ETL 方法中,由于能够运行更频繁、更短的管道运行,故障对数据新鲜度 SLA 的影响大大降低。
增量 ETL 模型
我们的目标是使用 Apache Hudi 的增量处理原语查询来改善建模数据集的延迟和质量,然后在不重写整个分区的情况下更新记录。这也减少了很多操作开销,以扩展非常大的批处理管道。通过这样做我们能够使用此架构为所有派生数据集实现 100% 的数据完整性。下图详细说明了Uber如何从原始数据到它们下游的派生数据集启用增量处理。在本节中我们提供了一个模型,用于根据 ETL 逻辑的性质选择如何组合这些增量数据管道。
读取和Join策略
在我们的 ETL 管道中,需要处理可以使用 Apache Hudi 执行的各种类型的读取和Join。这涉及对单个源的增量读取,以及对多个原始数据、派生和查找表进行Join的增量读取。此外我们还需要处理单个或多个表上回填的快照读取。
下表总结了我们如何处理这些情况。
| 场景 | 如何处理 |
| 增量读取单个源 | 使用 Apache Hudi 的增量读取器并更新到目标表 |
| 增量读取+多原始数据表连接 | 使用Apache Hudi主表增量读取,其他原始数据表左外连接T-24小时增量拉取数据 |
| 增量读取 + 连接多个派生表和查找表 | 在主表上使用 Apache Hudi 的增量读取,并在其他派生表上执行左外连接,仅获取受影响的分区 |
| 回填用例 | 在 etl_start_date 和 etl_end_date 内的单个或多个表上使用快照读取 |
写策略
我们现在将介绍在 Apache Hudi 中对分区表和非分区表应用增量更新的各种方法,包括插入和插入覆盖。此外,我们将讨论针对分区表中非增量列的目标合并和更新语句的使用。我们还探讨了避免非分区表中非增量列的数据质量问题的策略。
| 表类型 | 如何处理 |
| 分区表 | 1. 使用 upsert 仅应用增量更新;2. 在执行回填操作时使用插入覆盖更新所有受影响的分区;3. 使用 Apache Spark SQL 对非增量列使用目标合并/更新语句 |
| 非分区表 | 1. 使用 upsert 仅处理增量更新;2. 在目标表上使用完全外部连接连接增量行时使用插入覆盖来更新增量列和非增量列(以避免非增量列上的 DQ 问题) |
回填策略
与流处理管道一样,增量数据管道也需要一种在业务逻辑发生变化时回填表的方法。由于 Apache Hudi 还支持批量写入操作,例如 insert_overwrite,我们通过在源表上读取快照,然后在同一表/分区上并发写入来无缝处理此类回填场景。在这种情况下,Apache Hudi 中的一些关键设计选择和功能值得强调。Apache Hudi 支持记录键和预组合键,通过允许增量写入器和回填过程在没有意外副作用(如最新写入被回填过程覆盖)的情况下运行,从而使回填过程变得轻松。Apache Hudi 还提供了运行表服务的能力,这些服务可以同时优化和管理表,而不会阻塞增量/回填写入,从而帮助我们为表实现更低的 SLA。Apache Hudi 还使我们能够确保回填过程不会更新或影响增量写入的检查点。
实现
在本节中,我们将探讨使用 Apache Hudi、Apache Spark 和 Uber 的工作流管理系统 Piper[7] 构建和管理增量 ETL 管道所需的基本构建块(尽管它也应该可以在 Apache Airflow 等系统上运行)。在Uber我们构建了一个 Apache Spark ETL 框架,能够大规模管理和操作 ETL 管道,这是通过 Piper 进行调度的。该框架建立在 Apache Hudi 的增量数据处理工具“DeltaStreamer[8]”之上,该工具最初由Uber贡献,现在许多其他组织出于类似目的使用。在Uber我们现在使用该工具从我们遗留的 Hive ETL 框架中迁移出来。我们新的 Apache Spark ETL 框架让用户可以通过简单的步骤编写和管理 ETL 管道。用户可以自定义作业运行的频率和作业消耗的资源,以获得最佳的表新鲜度。以下是定义每个管道的最低限度要求的用户输入。
表定义
一个 DDL 定义文件,其中包含预期数据集的架构信息,并将表的格式声明为 Apache Hudi 格式。可以在下图中看到示例 DDL 文件:
用于定义 DeltaStreamer 配置的 YAML
该文件将包含 Apache Spark DeltaStreamer 应用程序所期望的配置列表。在下图中可以看到 DeltaStreamer 作业中使用的示例 YAML 配置文件。
让我们回顾一些重要的配置:
•
hoodie.datasource.recordkey.field这是目标表中的主键。对主键进行去重,如果有重复的记录,则根据hoodie.datasource.write.precombine.field标识的列的最大值将其缩减为一条记录。即使对于仅追加表,这也非常有用。•
hoodie.datasource.write.operationUpsert 指示记录级更新应该使用 SQL 转换生成的有效负载在目标表上执行。
基于 SQL 的转换
我们提供了一个包含 SQL 转换的文件,其中包含 DeltaStreamer 将使用 Apache Spark SQL 执行的业务逻辑。然后,最终有效负载将用于对目标表执行记录级更新。下图是 SQL 转换示例。
请注意 DeltaStreamer 还允许从 Kafka 源而不是 Apache Hudi 表进行增量读取。
代码转换
对于更高级的用户,除了 SQL 文件之外,还可以选择提供自定义的基于 Scala/Java Apache Spark RDD 的转换器,该转换器将在运行时由 DeltaStreamer 执行。为此只需实施位于 Apache Hudi Utilities Bundle 中的转换器接口。下图中可以看到一个简单的自定义转换类,它转换维度驱动表中的增量数据并连接维度城市表:
同样用户也可以提供多个自定义转换,这些转换将被链接起来并按顺序执行。
成果
通过在 CoreGDW、Rides、Driver、UberEats、Finance 和 Earnings 等 Uber 组织中推出这个新的增量框架,我们能够实现以下好处。
性能和成本节约
从下表中可以看到通过将我们的批处理 ETL 管道转换为使用 Apache Hudi 的 DeltaStreamer 增量读取和更新插入,我们观察到的巨大性能提升。通过这种方法我们能够将管道运行时间减少 50%,并将 SLA 减少 60%。
| 管道 | vcore_seconds | memory_seconds | 成本 | 运行时间(分钟) |
| 维度驱动表的批量ETL | 3,129,130 | 23,815,200 | $11.39 | 220 |
| 度驱动表的增量 ETL | 1,280,928 | 6,427,500 | $2.44 | 39 |
| 差别 | 1,848,202 | 17,387,700 | $8.95 | 181 |
| %改进 | 59.06% | 73.01% | 78.57% | 82.27% |
| 司机状态事实表的批量 ETL | 2,162,362 | 5,658,785 | $3.30 | 94 |
| 司机状态事实表的增量 ETL | 1,640,438 | 3,862,490 | $2.45 | 48 |
| 差别 | 521,924 | 1,796 | $0.85 | 46 |
| %改进 | 24.13% | 31.74% | 25.75% | 48.93% |
跨双活数据中心的数据强一致性
Uber 拥有跨多个数据中心的双活架构。在不同数据中心的表之间实现 100% 强数据一致性对于 Uber 的规模来说至关重要,可以运行工作负载而不必担心数据不一致
通过迁移到 Apache Hudi(与 Hive 中的普通Parquet表相比),我们能够跨多个数据中心的数据湖构建高度一致的复制。为此我们在主数据中心计算一次表后复制了一个表,然后使用使用 Apache Hudi 元数据的复制器服务仅移动增量更改的文件。
提高数据质量
Apache Hudi 支持写入-审计-发布 (WAP) 模式,我们可以在发布数据之前执行预加载数据质量检查——使用这种模式,我们可以防止不良数据进入生产数据集。Apache Hudi 提供预提交验证器[9],因此在配置后可以在数据发布之前对数据运行多个基于 SQL 的质量检查。
可观测性改进
Apache Hudi 的 DeltaStreamer 有多个关键指标[10],这些指标将提供有关 ETL 执行中发生的事情的详细见解,例如正在进行的提交数量、完成的提交、插入/更新/删除的总记录等。下面的 Grafana 仪表板显示了几个示例 在 DeltaStreamer 中捕获的指标。这些指标在设置监控和警报系统以立即了解 ETL 管道何时落后于其上游源时非常有用。
下图显示了正在进行的提交与已处理的提交,这将告诉我们上游和下游管道之间的滞后,以及增量 ETL 管道处理的源表中生成的每个提交之间的滞后。当增量 ETL 出现故障时,将向我们的值班人员发送警报。
结论
在 Apache Hudi 和增量 ETL 的帮助下,我们可以增量读取更新,只对增量更改运行计算逻辑,然后将记录更新插入到我们的 Apache Hudi 表中。增量数据处理模型为数据工程社区带来了许多优势,包括大量资源节省、较低的数据新鲜度和最佳数据完整性,同时允许下游工作负载效仿。事实上这是我们能够降低系统成本,同时享受性能提升的难得机会之一。
虽然毫无疑问增量处理应该是数据Lakehouse的实际模型,但要解锁它还有很多工作要做。例如在我们的实现中,我们将自己限制在单流、多表连接上,同时在选择要用作增量扫描或完整快照扫描的表时,也依赖于对业务领域的某种程度的理解。我们期待与 Apache Hudi 和开源社区合作,充分利用我们框架中现有的 SQL 功能,并在 Apache Spark 和 Flink 等引擎上实现通用增量 SQL。
推荐阅读
引用链接
[1] 维度数据建模技术: [https://medium.com/walmartglobaltech/implementation-of-scd-2-slowly-changing-dimension-with-apache-hudi-465e0eb94a5](https://medium.com/walmartglobaltech/implementation-of-scd-2-slowly-changing-dimension-with-apache-hudi-465e0eb94a5)[2] Apache Hudi: [https://hudi.apache.org/](https://hudi.apache.org/)[3] 增量数据处理能力: [https://www.uber.com/blog/hoodie/](https://www.uber.com/blog/hoodie/)[4] 交易数据湖: [https://www.uber.com/blog/apache-hudi-graduation/](https://www.uber.com/blog/apache-hudi-graduation/)[5] 迟到/事后: [https://www.oreilly.com/radar/the-world-beyond-batch-streaming-101/](https://www.oreilly.com/radar/the-world-beyond-batch-streaming-101/)[6] 想法: [https://www.oreilly.com/content/ubers-case-for-incremental-processing-on-hadoop/](https://www.oreilly.com/content/ubers-case-for-incremental-processing-on-hadoop/)[7] Piper: [https://www.uber.com/blog/managing-data-workflows-at-scale/](https://www.uber.com/blog/managing-data-workflows-at-scale/)[8] DeltaStreamer: [https://hudi.apache.org/docs/hoodie_deltastreamer](https://hudi.apache.org/docs/hoodie_deltastreamer)[9] 预提交验证器: [https://hudi.apache.org/docs/precommit_validator](https://hudi.apache.org/docs/precommit_validator)[10] 关键指标: [https://hudi.apache.org/docs/metrics](https://hudi.apache.org/docs/metrics)