本期作者
王翔宇
哔哩哔哩资深开发工程师
2017年加入B站,现服务于基础架构实时团队。先后负责B站日志系统、实时流式传输工作。
魏泽丰
哔哩哔哩高级开发工程师
2021年加入B站,现服务于基础架构实时团队,负责实时流式传输以及Flink CDC相关工作。
高瑞超
哔哩哔哩资深开发工程师
2021年加入B站,现服务于基础架构实时团队,负责实时流式传输以及Flink connector相关工作。
01 背景
Lancer是B站的实时流式传输平台,承载全站服务端、客户端的数据上报/采集、传输、集成工作,秒级延迟,作为数仓入口是B站数据平台的生命线。目前每日峰值 5000w/s rps, 3PB/天, 4K+条流的数据同步能力。
服务如此大的数据规模,对产品的可靠性、可扩展性和可维护性提出了很高的要求。流式传输的实现是一个很有挑战的事情,聚焦快、准、稳的需求, Lancer整体演进经历了大管道模型、BU粒度管道模型、单流单作业模型三个阶段的演进,下面我们娓娓道来。
02 关键词说明
logid:每个业务方上报的数据流以logid进行标识,logid是数据在传输+集成过程中的元信息标识。
数据源:数据进入到lancer的入口,例如:log-agent,bfe-agent,flink cdc
lancer-gateway(数据网关):接收数据上报的网关。
数据缓冲层:也叫做内部kafka,用于解耦数据上报和数据分发。
lancer-collector(数据分发层):也叫做数据同步,可以根据实际场景完成不同端到端的数据同步。
03 技术演进
整个B站流式数据传输架构的演进大致经历了三个阶段。
3.1 架构V1.0-基于flume的
大管道数据传输架构(2019之前)
B站流式传输架构建立之初,数据流量和数据流条数相对较少,因此采用了全站的数据流混合在一个管道中进行处理,基于flume二次定制化的数据传输架构,架构如下:
v1.0架构在使用中暴露出一些的痛点:
1. 数据源端对于数据上报的可控性和容错性较差,例如:
2. 整体架构是一个大管道模型,资源的划分和隔离不明确,整体维护成本高,自身故障隔离性差。
3. 基于flume二次迭代的一些缺陷:
3.2 架构V2.0-BU粒度的
管道化架构(2020-2021)
针对v1.0的缺陷,我们引入了架构v2.0,架构如下:
此架构的关键细节如下:
1. 强化了数据上报源端的边缘可控能力
2. 数据管道以BU为粒度搭建,管道间资源隔离,每个管道包含整套独立的完整数据传输链路,并且数据管道支持基于airflow快速搭建。故障隔离做到BU级别。
3. 数据网关升级到自研lancer-gateway2.0,逻辑精简,支持流控反压,并且适配kafka failover, 基于k8s进行部署。
4. hdfs分发基于flink jar进行实现:支持exactly once语义保证。
V2.0架构相对于v1.0, 重点提升了数据上报边缘的可控力、BU粒度管道间的资源划分和隔离性。但是随着B站流式数据传输规模的快速增加,对数据传输的时效性、成本、质量也提出了越来越高的要求,V2.0也逐渐暴露出了一些缺陷:
1. logid级别隔离性差:
2. 网关是异步发送模型,极端情况下(组件崩溃),存在数据丢失风险。
3. ods层局部热点/故障影响放大
4. hdfs小文件问题放大
针对上述痛点,最直接的解决思路就是整体架构做进一步的隔离,以单logid为维度实现数据传输+分发。面临的挑战主要有以下几个方面:
3.3 架构V3.0-基于Flink SQL的
单流单作业数据集成方案
在V3.0架构中,我们对整体传输链路进行了单作业单数据流隔离改造,并且基于Flink SQL支撑数据分发场景。架构如下:
相比v2.0, 资源池容量管理上依然以BU为粒度,但是每个logid的传输和分发相互独立,互不影响。具体逻辑如下 :
相较于之前的实现,v3.0架构具有以下的优势:
1. 可靠性:
2. 可维护性上:
3. 可扩展性:
04 V3.0架构具体实现
我们重点介绍下,当前V3.0结构各个分层的实现。
4.1 数据上报边缘层
4.1.1 log-agent
基于go自研,插件化架构,部署于物理机,可靠、高效的支持服务端数据上报。
时间架构分为收集、处理、发送三层,具有以下主要特性:
4.1.2 bfe-agent
基于go自研,部署于cdn,用于承载公网数据上报。
边缘cdn节点,cdn服务器上部署nginx和bfe-agent,bfe-agent整体实现架构与log-agent类似,对于web和app端数据上报请求QPS高、突发性强的特点,主要强化了以下能力:
4.2 数据上报网关层
v3.0方案中,数据数据网关的架构如下:
数据网关功能特性如下:
整个数据网关中的实现难点是:单gateway承载多logid处理的过程中如何保证隔离性和公平性,我们参考了Golang 中GMP的机制,整体数据流程如下:
1. 收到的请求,会把请求放到logid对应的请求队列,如果队列满,直接拒绝请求
2. 每个kafka集群,会初始化一个N大小的kafka producer pool,其中每个producer会遍历所有的队列,进行数据的发送。
3. 对于每个logid的请求队列,会从两个维护限制资源的占用,以保证公平性和隔离性
4.3 数据上报分发层
随着flink在实时计算领域的成熟,其高性能、低延迟、exactly once语义保证、批流一体、丰富的数据源支持、活跃的社区等优势,促使我们选择了以flink sql作为数据分发层的解决方案。当前我们主要支持了kafka→hive, kafka→kafka, cdc→kafka->hudi/hive三种场景:
1. kafka→hive
2. kafka→kafka
3. cdc→kafka->hudi/hive
05 Flink connector功能迭代
在Flink SQL数据分发场景的支持中,针对我们遇到的实际需求,对社区原生connector进行了对应的优化,我们针对性的介绍下。
5.1 hive sink connector优化
断流空分区提交
背景:B站离线作业的拉起依赖上游分区提交,HDFS分区提交的判断依赖于作业整体watermark的推进,但是某些logid在断流的情况下,如何进行分区的提交呢
解决办法:
如图所示:当所有的StreamFileWriter连续两次checkpoint内未处理任何数据的情况下,StreamingFileCommiter会判定发生了断流,按照当前时间提交分区。
支持下游增量数据同步
背景:传统方式ods到dwd的数据同步只有当ods层分区ready之后才会开始,时效性较差,如何加速数据的同步?
解决办法:
orc+zstd
背景:相较于行式存储,列式存储在压缩比上有着显著的优势。
解决办法:支持orc+zstd, 经过测试,相较于text+lzo,空间节省在40%以上。
hdfs异步close
背景:snapshot阶段flush数据,close文件经常因为个别文件慢拖累整体吞吐。
解决办法:
小文件合并
背景:rolling on checkpoint的滚动策略,会导致文件数量的膨胀,对namenode产生较大的压力。
解决办法:
5.2 kafka connector优化
支持protobuf format
背景:用户有处理protobuf格式数据的需求
解决办法:
kafka sink支持自定义分流
背景:用户希望在一个sql作业中根据需要,灵活定制将消息发送到指定kafka 集群和topic。
解决办法:
CREATE TABLE sink_test (
broker_name_arg varchar,
topic_name_arg varchar,
message string,
t1 string
) WITH(
'bootstrapServers' = 'BrokerUdf(broker_name_arg)', // 根据broker_name_arg作为udf参数计算brokers
'bootstrapServers.udf.class' = 'com.bilibili.lancer.udf.BrokerUdf', // 获取brokers Udf
'topic' = 'TopicUdf(broker_name_arg, topic_name_arg)', // 根据broker_name_arg和topic_name_arg作为udf参数计算topic
'topic.udf.class' = 'com.bilibili.lancer.udf.TopicUdf', // 计算topoc Udf
'udf.cache.min' = '1', // 缓存时间
'exclude.udf.field' = 'false', // udf的相关字段是否输出
'connector' = 'kafka-diversion'
);
5.3 cdc connector优化
sql场景下多库多表场景支持
背景:原生的flink cdc source在单个sql任务中,只能同步相同DDL定义的表,如果需要同步异构DDL,不得不启动多个独立的job进行同步。这样会存在资源的额外开销。
解决办法:
断流场景分区提交支持
背景:由于整个cdc方案存在上游和下游两个独立的job,并且都是基于event time推进watermark做分区的提交,下游watermark的推进受阻可能受到数据正常断流或者上游作业异常两种原因的影响,如果正确判断呢?
解决办法:
CREATE TABLE mysql_binlog (
host_name STRING METADATA FROM 'host_name' VIRTUAL,
db_name STRING METADATA FROM 'database_name' VIRTUAL,
table_name STRING METADATA FROM 'table_name' VIRTUAL,
operation_ts TIMESTAMP(3) METADATA FROM 'op_ts' VIRTUAL,
sequence BIGINT METADATA FROM 'sequence' VIRTUAL, // sequence严格单调递增
heartbeat BOOLEAN METADATA FROM 'heartbeat'VIRTUAL, // 对于心跳信息标识为true
mtime TIMESTAMP(3) METADATA FROM 'mtime'VIRTUAL, // 提取mtime,用于下游推进watermark
id BIGINT NOT NULL,
filed_list BYTES NOT NULL, // 去DDL,在source内部数据全部按照changelog-json格式进行序列化、
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'xxxx',
'port' = '3552',
'username' = 'datacenter_cdc',
'password' = 'xxx',
'database-name' = 'xxx',
'debezium.format' = 'bytes',
'table-name' = 'xxx',
'server-time-zone' = 'Asia/Shanghai',
'heartbeat.enable'='true',
'scan.incremental.snapshot.chunk.size' = '80960'
);
06 架构稳定性优化
为了保障流式传输稳定和高效运行,我们在以下几个方面做了一些优化,分别介绍下:
6.1 管道热点优化
作业在正常运行的过程中,经常遇到局部热点问题,例如kafka/hdfs io热点导致局部并行度消费速度下降或者写入受阻、yarn队列机器load不均匀导致作业部分并行度消费能力不如,虽然原因多种多样,但是本质看,这些问题的一个共性就是由于局部热点导致局部数据延迟。针对这个问题,我们分别从局部流量调度和全局流量调度两个维度进行优化。
局部流量调度
局部流量调度的优化思路是在单个producer和task内部,分区之间进行流量的重分配。目前在两个点就行了优化:
全局流量调度
全局流量调度的优化思路是整个传输链路层级之间的流量调配,目前我们将生产者(lancer-gateway)与消费者(flink sql kafka source)进行联动,当消费者出现tp消费lag的情况,通过注册黑名单(lag partition)到zookeeper,上游生产者感知黑名单,停止向高lag partition中继续发送数据。
Flink kafka source中基于flink AggregateFunction机制,kafka source subtask上报lag到job manager,job manager基于全局lag判断注册黑名单到zookeeper
黑名单判断逻辑:当单tp lag > min(全局lag平均值,全局lag中位数)* 倍数 && 单tp lag 大于 lag绝对值, 其中 "单tp lag 大于 lag绝对值" 是为了规避此机制过于敏感,"单tp lag > min(全局lag平均值,全局lag中位数)* 倍数" 用于筛选出头部的lag tp。为了防止黑名单比例过大,黑名单剔除的tp数量上限不得大于全部tp数量的一定比例。
局部流量调度和全局流量调度在管道热点优化效果上存在一定的互补性,但是也各有优势。
6.2 全链路埋点质量监控
数据质量是重要一环,通常数据质量包含完整性、时效性、准确性、一致性、唯一性等方面,对于数据传输场景,当面我们重点关注完整性和时效性两个方面
整体质量方案大致包含监控数据采集和规则配置两个大的方向,整体架构如下:
监控数据采集
我们自研了trace系统:以logid为单位,我们在数据处理流程中的每一层都进行了监控埋点
监控报警规则
我们针对数据流进行了分级,每个等级指定了不同的保障级别(SLA),SLA破线,报警通知oncall同学处理。
延迟归档报警:hdfs分区提交延迟,触发报警。
实时完整性监控:基于trace数据,实时监控端到端的完整性,接收条数/落地条数
离线数据完整性:hdfs分区ready后,触发dqc规则运行,对比接收条数(trace数据)/落地条数(hive查询条数)
传输延迟监控:基于trace数据,计算端到端数据传输延迟的分位数。
DQC阻塞:离线数据完整性异常后,阻塞下游作业的调度。
6.3 kafka同步断流重复优化
相对比2.0方案中flume方案,基于flink sql的kafka到kafka的实现方案明显的一个变化就是作业的重启、故障恢复会导致整体的断流和一定比例的数据重复(从checkpoint恢复),因此如何降低用户对此类问题的感知,至关重要。
首先梳理下可能造成问题出现的原因:1)作业升级重启 2)task manager故障 3)job manager 故障 4)checkpoint连续失败,同时根据flink job整体提交流程,影响作业恢复速度的关键环节是资源的申请。根据上述分析和针对性测试,针对kafka同步场景的断流重复采用了如下优化方式:
经过上述优化,经过测试一个(50core,400GB memory,50 slot)规模的作业,优化效果如下:
6.4 kafka流量动态failover能力
为了保证数据及时上报,Lancer对于数据缓冲层的kafka的发送成功率依赖性很高,经常遇到的case是高峰期或者流量抖动导致的kafka写入瓶颈。参考Netflix Hystrix 熔断原理,我们在网关层实现了一种动态 kafka failover机制:网关可以根据实时的数据发送情况计算熔断率,根据熔断率将流量在normal kafka和failover kafka之间动态调节。
6.5 全链路流控、反压、降级
从端上上报到数据落地的整个流程中,为了保证稳定性和可控性,除了前述手段,我们还引入了整体流控、反压、降级等技术手段,下面综合介绍下。
从后向前,分为几个环节:
1. 数据分发层:
2. 数据网关层:
3. 数据上报层:
6.6 开发阶段质量验证
为了在开发阶段保证整体服务的正确性和稳定性,开发阶段我们设计了一套完整的测试框架。
07 未来展望
作者:王翔宇,魏泽丰,高瑞超
来源:微信公众号:哔哩哔哩技术
出处:https://mp.weixin.qq.com/s/NawxeiP-_DFpyoekRrzlLQ
留言与评论(共有 0 条评论) “” |